Skip to content

Commit

Permalink
(#1308) - optionally fail commit attempts for generations that are no…
Browse files Browse the repository at this point in the history
…t valid anymore.
  • Loading branch information
nachogiljaldo committed Nov 15, 2024
1 parent 7b9c99d commit 49e263b
Show file tree
Hide file tree
Showing 7 changed files with 343 additions and 78 deletions.
3 changes: 3 additions & 0 deletions batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,9 @@ func (batch *Batch) ReadMessage() (Message, error) {
msg.HighWaterMark = batch.highWaterMark
msg.Time = makeTime(timestamp)
msg.Headers = headers
if batch.conn != nil {
msg.GenerationId = batch.conn.generationId
}

return msg, err
}
Expand Down
14 changes: 8 additions & 6 deletions commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,20 @@ package kafka
// A commit represents the instruction of publishing an update of the last
// offset read by a program for a topic and partition.
type commit struct {
topic string
partition int
offset int64
topic string
partition int
offset int64
generationId int32
}

// makeCommit builds a commit value from a message, the resulting commit takes
// its topic, partition, and offset from the message.
func makeCommit(msg Message) commit {
return commit{
topic: msg.Topic,
partition: msg.Partition,
offset: msg.Offset + 1,
topic: msg.Topic,
partition: msg.Partition,
offset: msg.Offset + 1,
generationId: msg.GenerationId,
}
}

Expand Down
6 changes: 6 additions & 0 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
var (
errInvalidWriteTopic = errors.New("writes must NOT set Topic on kafka.Message")
errInvalidWritePartition = errors.New("writes must NOT set Partition on kafka.Message")

undefinedGenerationId int32 = -1
)

// Conn represents a connection to a kafka broker.
Expand Down Expand Up @@ -65,6 +67,8 @@ type Conn struct {
apiVersions atomic.Value // apiVersionMap

transactionalID *string

generationId int32
}

type apiVersionMap map[apiKey]ApiVersion
Expand Down Expand Up @@ -182,6 +186,7 @@ func NewConnWith(conn net.Conn, config ConnConfig) *Conn {
offset: FirstOffset,
requiredAcks: -1,
transactionalID: emptyToNullable(config.TransactionalID),
generationId: undefinedGenerationId,
}

c.wb.w = &c.wbuf
Expand Down Expand Up @@ -388,6 +393,7 @@ func (c *Conn) joinGroup(request joinGroupRequestV1) (joinGroupResponseV1, error
return joinGroupResponseV1{}, Error(response.ErrorCode)
}

c.generationId = response.GenerationID
return response, nil
}

Expand Down
30 changes: 18 additions & 12 deletions consumergroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,12 @@ func (g *Generation) Start(fn func(ctx context.Context)) {
// consumer group coordinator. This can be used to reset the consumer to
// explicit offsets.
func (g *Generation) CommitOffsets(offsets map[string]map[int]int64) error {
return g.CommitOffsetsForGenID(g.ID, offsets)
}

// CommitOffsetsForGenID commits the provided topic+partition+offset combos to the
// consumer group coordinator specifying the given genID.
func (g *Generation) CommitOffsetsForGenID(genID int32, offsets map[string]map[int]int64) error {
if len(offsets) == 0 {
return nil
}
Expand All @@ -434,7 +440,7 @@ func (g *Generation) CommitOffsets(offsets map[string]map[int]int64) error {

request := offsetCommitRequestV2{
GroupID: g.GroupID,
GenerationID: g.ID,
GenerationID: genID,
MemberID: g.MemberID,
RetentionTime: g.retentionMillis,
Topics: topics,
Expand Down Expand Up @@ -925,12 +931,12 @@ func (cg *ConsumerGroup) coordinator() (coordinator, error) {
// the leader. Otherwise, GroupMemberAssignments will be nil.
//
// Possible kafka error codes returned:
// * GroupLoadInProgress:
// * GroupCoordinatorNotAvailable:
// * NotCoordinatorForGroup:
// * InconsistentGroupProtocol:
// * InvalidSessionTimeout:
// * GroupAuthorizationFailed:
// * GroupLoadInProgress:
// * GroupCoordinatorNotAvailable:
// * NotCoordinatorForGroup:
// * InconsistentGroupProtocol:
// * InvalidSessionTimeout:
// * GroupAuthorizationFailed:
func (cg *ConsumerGroup) joinGroup(conn coordinator, memberID string) (string, int32, GroupMemberAssignments, error) {
request, err := cg.makeJoinGroupRequestV1(memberID)
if err != nil {
Expand Down Expand Up @@ -1073,11 +1079,11 @@ func (cg *ConsumerGroup) makeMemberProtocolMetadata(in []joinGroupResponseMember
// Readers subscriptions topic => partitions
//
// Possible kafka error codes returned:
// * GroupCoordinatorNotAvailable:
// * NotCoordinatorForGroup:
// * IllegalGeneration:
// * RebalanceInProgress:
// * GroupAuthorizationFailed:
// * GroupCoordinatorNotAvailable:
// * NotCoordinatorForGroup:
// * IllegalGeneration:
// * RebalanceInProgress:
// * GroupAuthorizationFailed:
func (cg *ConsumerGroup) syncGroup(conn coordinator, memberID string, generationID int32, memberAssignments GroupMemberAssignments) (map[string][]int32, error) {
request := cg.makeSyncGroupRequestV0(memberID, generationID, memberAssignments)
response, err := conn.syncGroup(request)
Expand Down
4 changes: 4 additions & 0 deletions message.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ type Message struct {
Value []byte
Headers []Header

// If the message has been sent by a consumer group, it contains the
// generation's id. Value is -1 if not using consumer groups.
GenerationId int32

// This field is used to hold arbitrary data you wish to include, so it
// will be available when handle it on the Writer's `Completion` method,
// this support the application can do any post operation on each message.
Expand Down
94 changes: 75 additions & 19 deletions reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func (r *Reader) unsubscribe() {
// another consumer to avoid such a race.
}

func (r *Reader) subscribe(allAssignments map[string][]PartitionAssignment) {
func (r *Reader) subscribe(generationId int32, allAssignments map[string][]PartitionAssignment) {
offsets := make(map[topicPartition]int64)
for topic, assignments := range allAssignments {
for _, assignment := range assignments {
Expand All @@ -134,7 +134,7 @@ func (r *Reader) subscribe(allAssignments map[string][]PartitionAssignment) {
}

r.mutex.Lock()
r.start(offsets)
r.start(generationId, offsets)
r.mutex.Unlock()

r.withLogger(func(l Logger) {
Expand All @@ -150,35 +150,73 @@ func (r *Reader) commitOffsetsWithRetry(gen *Generation, offsetStash offsetStash
backoffDelayMax = 5 * time.Second
)

for attempt := 0; attempt < retries; attempt++ {
if attempt != 0 {
if !sleep(r.stctx, backoff(attempt, backoffDelayMin, backoffDelayMax)) {
return
messagesToSendForGeneration := make(map[int32]map[string]map[int]int64)
for topic, partitionsInfo := range offsetStash {
for partition, commitInfo := range partitionsInfo {
if _, ok := messagesToSendForGeneration[commitInfo.generationID]; !ok {
messagesToSendForGeneration[commitInfo.generationID] = make(map[string]map[int]int64)
}
msgsForTopic := messagesToSendForGeneration[commitInfo.generationID]
if _, ok := msgsForTopic[topic]; !ok {
msgsForTopic[topic] = make(map[int]int64)
}
msgsForPartition := msgsForTopic[topic]
msgsForPartition[partition] = commitInfo.offset
}
}
var illegalGenerationErr bool
for generationID, messages := range messagesToSendForGeneration {
for attempt := 0; attempt < retries; attempt++ {
if attempt != 0 {
if !sleep(r.stctx, backoff(attempt, backoffDelayMin, backoffDelayMax)) {
continue
}
}

if err = gen.CommitOffsets(offsetStash); err == nil {
return
if err = gen.CommitOffsetsForGenID(generationID, messages); err == nil {
break
}

// IllegalGeneration error is not retriable, but we should attempt to
// perform the remaining commits
if errors.Is(err, IllegalGeneration) {
r.withErrorLogger(func(l Logger) { l.Printf("generation %d - %v", generationID, err) })
offsetStash.removeGenerationID(generationID)
illegalGenerationErr = true
err = nil
break
}
}
}

// if configured to ignore the error
if illegalGenerationErr && r.config.ErrorOnWrongGenerationCommit {
err = IllegalGeneration
}
return // err will not be nil
}

// offsetStash holds offsets by topic => partition => offset.
type offsetStash map[string]map[int]int64
// offsetStash holds offsets by topic => partition => offsetEntry.
type offsetEntry struct {
offset int64
generationID int32
}
type offsetStash map[string]map[int]offsetEntry

// merge updates the offsetStash with the offsets from the provided messages.
func (o offsetStash) merge(commits []commit) {
for _, c := range commits {
offsetsByPartition, ok := o[c.topic]
if !ok {
offsetsByPartition = map[int]int64{}
offsetsByPartition = map[int]offsetEntry{}
o[c.topic] = offsetsByPartition
}

if offset, ok := offsetsByPartition[c.partition]; !ok || c.offset > offset {
offsetsByPartition[c.partition] = c.offset
if offset, ok := offsetsByPartition[c.partition]; !ok || c.offset > offset.offset {
offsetsByPartition[c.partition] = offsetEntry{
offset: c.offset,
generationID: c.generationId,
}
}
}
}
Expand All @@ -190,6 +228,19 @@ func (o offsetStash) reset() {
}
}

func (o offsetStash) removeGenerationID(genID int32) {
for topic, offsetsForTopic := range o {
for partition, offsetsForPartition := range offsetsForTopic {
if offsetsForPartition.generationID == genID {
delete(offsetsForTopic, partition)
}
if len(offsetsForTopic) == 0 {
delete(o, topic)
}
}
}
}

// commitLoopImmediate handles each commit synchronously.
func (r *Reader) commitLoopImmediate(ctx context.Context, gen *Generation) {
offsets := offsetStash{}
Expand Down Expand Up @@ -329,7 +380,7 @@ func (r *Reader) run(cg *ConsumerGroup) {

r.stats.rebalances.observe(1)

r.subscribe(gen.Assignments)
r.subscribe(gen.ID, gen.Assignments)

gen.Start(func(ctx context.Context) {
r.commitLoop(ctx, gen)
Expand Down Expand Up @@ -522,6 +573,10 @@ type ReaderConfig struct {
// This flag is being added to retain backwards-compatibility, so it will be
// removed in a future version of kafka-go.
OffsetOutOfRangeError bool

// ErrorOnWrongGenerationCommit indicates that we should return an error when
// attempting to commit a message to a generation different than the current one.
ErrorOnWrongGenerationCommit bool
}

// Validate method validates ReaderConfig properties.
Expand Down Expand Up @@ -819,7 +874,7 @@ func (r *Reader) FetchMessage(ctx context.Context) (Message, error) {
r.mutex.Lock()

if !r.closed && r.version == 0 {
r.start(r.getTopicPartitionOffset())
r.start(undefinedGenerationId, r.getTopicPartitionOffset())
}

version := r.version
Expand Down Expand Up @@ -1040,7 +1095,7 @@ func (r *Reader) SetOffset(offset int64) error {
r.offset = offset

if r.version != 0 {
r.start(r.getTopicPartitionOffset())
r.start(undefinedGenerationId, r.getTopicPartitionOffset())
}

r.activateReadLag()
Expand Down Expand Up @@ -1178,7 +1233,7 @@ func (r *Reader) readLag(ctx context.Context) {
}
}

func (r *Reader) start(offsetsByPartition map[topicPartition]int64) {
func (r *Reader) start(generationId int32, offsetsByPartition map[topicPartition]int64) {
if r.closed {
// don't start child reader if parent Reader is closed
return
Expand Down Expand Up @@ -1216,7 +1271,7 @@ func (r *Reader) start(offsetsByPartition map[topicPartition]int64) {

// backwards-compatibility flags
offsetOutOfRangeError: r.config.OffsetOutOfRangeError,
}).run(ctx, offset)
}).run(ctx, generationId, offset)
}(ctx, key, offset, &r.join)
}
}
Expand Down Expand Up @@ -1253,7 +1308,7 @@ type readerMessage struct {
error error
}

func (r *reader) run(ctx context.Context, offset int64) {
func (r *reader) run(ctx context.Context, generationId int32, offset int64) {
// This is the reader's main loop, it only ends if the context is canceled
// and will keep attempting to reader messages otherwise.
//
Expand Down Expand Up @@ -1306,6 +1361,7 @@ func (r *reader) run(ctx context.Context, offset int64) {
}
continue
}
conn.generationId = generationId

// Resetting the attempt counter ensures that if a failure occurs after
// a successful initialization we don't keep increasing the backoff
Expand Down
Loading

0 comments on commit 49e263b

Please sign in to comment.