Skip to content

Commit

Permalink
Add a listener for rebalances.
Browse files Browse the repository at this point in the history
  • Loading branch information
nachogiljaldo committed Jul 2, 2024
1 parent 7b9c99d commit f8eddfc
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 0 deletions.
20 changes: 20 additions & 0 deletions reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,23 @@ func (r *Reader) run(cg *ConsumerGroup) {
for attempt := 1; attempt <= r.config.MaxAttempts; attempt++ {
gen, err = cg.Next(r.stctx)
if err == nil {
if r.config.AssignmentListener != nil {
assignments := make([]GroupMemberTopic, 0, len(gen.Assignments))
for topic, partitions := range gen.Assignments {
assignedPartitions := make([]int, 0, len(partitions))
for _, partition := range partitions {
assignedPartitions = append(assignedPartitions, partition.ID)
}
sort.Slice(assignedPartitions, func(i, j int) bool {
return assignedPartitions[i] < assignedPartitions[j]
})
assignments = append(assignments, GroupMemberTopic{
Topic: topic,
Partitions: assignedPartitions,
})
}
r.config.AssignmentListener(assignments)
}
break
}
if errors.Is(err, r.stctx.Err()) {
Expand Down Expand Up @@ -522,6 +539,9 @@ type ReaderConfig struct {
// This flag is being added to retain backwards-compatibility, so it will be
// removed in a future version of kafka-go.
OffsetOutOfRangeError bool

// AsignmentListener is called when a reassignment happens indicating what are the new partitions
AssignmentListener func(partitions []GroupMemberTopic)
}

// Validate method validates ReaderConfig properties.
Expand Down
44 changes: 44 additions & 0 deletions reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -890,6 +891,49 @@ func TestReaderConsumerGroup(t *testing.T) {
}
}

func TestAssignmentListener(t *testing.T) {
// It appears that some of the tests depend on all these tests being
// run concurrently to pass... this is brittle and should be fixed
// at some point.
t.Parallel()

topic := makeTopic()
createTopic(t, topic, 10)
defer deleteTopic(t, topic)

var lock sync.Mutex
assignments := make([][]GroupMemberTopic, 0)
groupID := makeGroupID()
r := NewReader(ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: topic,
GroupID: groupID,
HeartbeatInterval: 2 * time.Second,
CommitInterval: 1 * time.Second,
RebalanceTimeout: 2 * time.Second,
RetentionTime: time.Hour,
MinBytes: 1,
MaxBytes: 1e6,
AssignmentListener: func(partitions []GroupMemberTopic) {
lock.Lock()
defer lock.Unlock()
assignments = append(assignments, partitions)
},
})
defer r.Close()

assert.Eventually(t, func() bool {
return reflect.DeepEqual(assignments, [][]GroupMemberTopic{
{
GroupMemberTopic{
Topic: topic,
Partitions: []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9},
},
},
})
}, 10*time.Second, 100*time.Millisecond)
}

func testReaderConsumerGroupHandshake(t *testing.T, ctx context.Context, r *Reader) {
prepareReader(t, context.Background(), r, makeTestSequence(5)...)

Expand Down

0 comments on commit f8eddfc

Please sign in to comment.