diff --git a/pkg/worker/verifier/producer_worker.go b/pkg/worker/verifier/producer_worker.go index d7de09b..6dc0497 100644 --- a/pkg/worker/verifier/producer_worker.go +++ b/pkg/worker/verifier/producer_worker.go @@ -208,9 +208,22 @@ func NewProducerWorkerStatus(topic string) ProducerWorkerStatus { } } +func (pw *ProducerWorker) OnAcked(r *kgo.Record) { + pw.Status.lock.Lock() + defer pw.Status.lock.Unlock() + + pw.Status.OnAcked(r.Partition, r.Offset) + + pw.validOffsets.Insert(r.Partition, r.Offset) + if pw.config.producesTombstones && r.Value != nil { + pw.validOffsets.SetLastConsumableOffset(r.Partition, r.Offset) + } + if pw.validateLatestValues { + pw.latestValueProduced.Insert(r.Partition, string(r.Key), string(r.Value)) + } +} + func (self *ProducerWorkerStatus) OnAcked(Partition int32, Offset int64) { - self.lock.Lock() - defer self.lock.Unlock() self.Acked += 1 currentMax, present := self.MaxOffsetsProduced[Partition] @@ -406,17 +419,9 @@ func (pw *ProducerWorker) produceInner(n int64) (int64, []BadOffset, error) { log.Debugf("errored = %t", errored) } else { ackLatency := time.Now().Sub(sentAt) - pw.Status.OnAcked(r.Partition, r.Offset) + pw.OnAcked(r) pw.Status.latency.Update(ackLatency.Microseconds()) log.Debugf("Wrote partition %d at %d", r.Partition, r.Offset) - pw.validOffsets.Insert(r.Partition, r.Offset) - if pw.config.producesTombstones && r.Value != nil { - pw.validOffsets.SetLastConsumableOffset(r.Partition, r.Offset) - } - if pw.validateLatestValues { - pw.latestValueProduced.Insert(r.Partition, string(r.Key), string(r.Value)) - } - } wg.Done() }