Skip to content

Commit

Permalink
producer_worker: add OnAcked()
Browse files Browse the repository at this point in the history
There was a race-y access to a map within the producer worker.
Add `OnAcked()` to allow for proper access of the producer's lock
during reading and writing.
  • Loading branch information
WillemKauf committed Nov 22, 2024
1 parent 1a36209 commit 5ef813d
Showing 1 changed file with 16 additions and 11 deletions.
27 changes: 16 additions & 11 deletions pkg/worker/verifier/producer_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,9 +208,22 @@ func NewProducerWorkerStatus(topic string) ProducerWorkerStatus {
}
}

func (pw *ProducerWorker) OnAcked(r *kgo.Record) {
pw.Status.lock.Lock()
defer pw.Status.lock.Unlock()

pw.Status.OnAcked(r.Partition, r.Offset)

pw.validOffsets.Insert(r.Partition, r.Offset)
if pw.config.producesTombstones && r.Value != nil {
pw.validOffsets.SetLastConsumableOffset(r.Partition, r.Offset)
}
if pw.validateLatestValues {
pw.latestValueProduced.Insert(r.Partition, string(r.Key), string(r.Value))
}
}

func (self *ProducerWorkerStatus) OnAcked(Partition int32, Offset int64) {
self.lock.Lock()
defer self.lock.Unlock()
self.Acked += 1

currentMax, present := self.MaxOffsetsProduced[Partition]
Expand Down Expand Up @@ -406,17 +419,9 @@ func (pw *ProducerWorker) produceInner(n int64) (int64, []BadOffset, error) {
log.Debugf("errored = %t", errored)
} else {
ackLatency := time.Now().Sub(sentAt)
pw.Status.OnAcked(r.Partition, r.Offset)
pw.OnAcked(r)
pw.Status.latency.Update(ackLatency.Microseconds())
log.Debugf("Wrote partition %d at %d", r.Partition, r.Offset)
pw.validOffsets.Insert(r.Partition, r.Offset)
if pw.config.producesTombstones && r.Value != nil {
pw.validOffsets.SetLastConsumableOffset(r.Partition, r.Offset)
}
if pw.validateLatestValues {
pw.latestValueProduced.Insert(r.Partition, string(r.Key), string(r.Value))
}

}
wg.Done()
}
Expand Down

0 comments on commit 5ef813d

Please sign in to comment.