Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: expose information about rebalance events in th reader #1331

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions consumergroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -925,12 +925,12 @@ func (cg *ConsumerGroup) coordinator() (coordinator, error) {
// the leader. Otherwise, GroupMemberAssignments will be nil.
//
// Possible kafka error codes returned:
// * GroupLoadInProgress:
// * GroupCoordinatorNotAvailable:
// * NotCoordinatorForGroup:
// * InconsistentGroupProtocol:
// * InvalidSessionTimeout:
// * GroupAuthorizationFailed:
// - GroupLoadInProgress:
// - GroupCoordinatorNotAvailable:
// - NotCoordinatorForGroup:
// - InconsistentGroupProtocol:
// - InvalidSessionTimeout:
// - GroupAuthorizationFailed:
func (cg *ConsumerGroup) joinGroup(conn coordinator, memberID string) (string, int32, GroupMemberAssignments, error) {
request, err := cg.makeJoinGroupRequestV1(memberID)
if err != nil {
Expand Down Expand Up @@ -1073,11 +1073,11 @@ func (cg *ConsumerGroup) makeMemberProtocolMetadata(in []joinGroupResponseMember
// Readers subscriptions topic => partitions
//
// Possible kafka error codes returned:
// * GroupCoordinatorNotAvailable:
// * NotCoordinatorForGroup:
// * IllegalGeneration:
// * RebalanceInProgress:
// * GroupAuthorizationFailed:
// - GroupCoordinatorNotAvailable:
// - NotCoordinatorForGroup:
// - IllegalGeneration:
// - RebalanceInProgress:
// - GroupAuthorizationFailed:
func (cg *ConsumerGroup) syncGroup(conn coordinator, memberID string, generationID int32, memberAssignments GroupMemberAssignments) (map[string][]int32, error) {
request := cg.makeSyncGroupRequestV0(memberID, generationID, memberAssignments)
response, err := conn.syncGroup(request)
Expand Down
6 changes: 3 additions & 3 deletions example_groupbalancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ func ExampleNewReader_rackAffinity() {
}

// findRack is the basic rack resolver strategy for use in AWS. It supports
// * ECS with the task metadata endpoint enabled (returns the container
// instance's availability zone)
// * Linux EC2 (returns the instance's availability zone)
// - ECS with the task metadata endpoint enabled (returns the container
// instance's availability zone)
// - Linux EC2 (returns the instance's availability zone)
func findRack() string {
switch whereAmI() {
case "ecs":
Expand Down
22 changes: 12 additions & 10 deletions groupbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,15 @@ type GroupBalancer interface {
// RangeGroupBalancer groups consumers by partition
//
// Example: 5 partitions, 2 consumers
// C0: [0, 1, 2]
// C1: [3, 4]
//
// C0: [0, 1, 2]
// C1: [3, 4]
//
// Example: 6 partitions, 3 consumers
// C0: [0, 1]
// C1: [2, 3]
// C2: [4, 5]
//
// C0: [0, 1]
// C1: [2, 3]
// C2: [4, 5]
type RangeGroupBalancer struct{}

func (r RangeGroupBalancer) ProtocolName() string {
Expand Down Expand Up @@ -92,14 +93,15 @@ func (r RangeGroupBalancer) AssignGroups(members []GroupMember, topicPartitions
// RoundrobinGroupBalancer divides partitions evenly among consumers
//
// Example: 5 partitions, 2 consumers
// C0: [0, 2, 4]
// C1: [1, 3]
//
// C0: [0, 2, 4]
// C1: [1, 3]
//
// Example: 6 partitions, 3 consumers
// C0: [0, 3]
// C1: [1, 4]
// C2: [2, 5]
//
// C0: [0, 3]
// C1: [1, 4]
// C2: [2, 5]
type RoundRobinGroupBalancer struct{}

func (r RoundRobinGroupBalancer) ProtocolName() string {
Expand Down
12 changes: 12 additions & 0 deletions kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,3 +189,15 @@ func (l *testKafkaLogger) Printf(msg string, args ...interface{}) {
l.T.Logf(msg, args...)
}
}

type testRebalanceEventCallback struct {
NoticeChan chan map[string][]PartitionAssignment
}

func newTestRebalanceEventCallback(c chan map[string][]PartitionAssignment) RebalanceEventInterceptor {
return &testRebalanceEventCallback{NoticeChan: c}
}

func (c *testRebalanceEventCallback) Callback(partitionAssignments map[string][]PartitionAssignment) {
c.NoticeChan <- partitionAssignments
}
11 changes: 6 additions & 5 deletions logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@ type Logger interface {

// LoggerFunc is a bridge between Logger and any third party logger
// Usage:
// l := NewLogger() // some logger
// r := kafka.NewReader(kafka.ReaderConfig{
// Logger: kafka.LoggerFunc(l.Infof),
// ErrorLogger: kafka.LoggerFunc(l.Errorf),
// })
//
// l := NewLogger() // some logger
// r := kafka.NewReader(kafka.ReaderConfig{
// Logger: kafka.LoggerFunc(l.Infof),
// ErrorLogger: kafka.LoggerFunc(l.Errorf),
// })
type LoggerFunc func(string, ...interface{})

func (f LoggerFunc) Printf(msg string, args ...interface{}) { f(msg, args...) }
13 changes: 13 additions & 0 deletions reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,10 @@ func (r *Reader) run(cg *ConsumerGroup) {

r.subscribe(gen.Assignments)

r.withRebalanceEventInterceptor(func(l RebalanceEventInterceptor) {
l.Callback(gen.Assignments)
})

gen.Start(func(ctx context.Context) {
r.commitLoop(ctx, gen)
})
Expand Down Expand Up @@ -522,6 +526,9 @@ type ReaderConfig struct {
// This flag is being added to retain backwards-compatibility, so it will be
// removed in a future version of kafka-go.
OffsetOutOfRangeError bool

// If not nil, specifies a callback usd to report rebalance events
RebalanceEventInterceptor RebalanceEventInterceptor
}

// Validate method validates ReaderConfig properties.
Expand Down Expand Up @@ -1142,6 +1149,12 @@ func (r *Reader) withErrorLogger(do func(Logger)) {
}
}

func (r *Reader) withRebalanceEventInterceptor(do func(RebalanceEventInterceptor)) {
if r.config.RebalanceEventInterceptor != nil {
do(r.config.RebalanceEventInterceptor)
}
}

func (r *Reader) activateReadLag() {
if r.config.ReadLagInterval > 0 && atomic.CompareAndSwapUint32(&r.once, 0, 1) {
// read lag will only be calculated when not using consumer groups
Expand Down
46 changes: 46 additions & 0 deletions reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,52 @@ func deleteTopic(t *testing.T, topic ...string) {
}
}

func TestReaderCollectsRebalanceEvents(t *testing.T) {
const GroupId = "a"
const Partitions = 5
topic := makeTopic()
createTopic(t, topic, Partitions)
defer deleteTopic(t, topic)

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
c := make(chan map[string][]PartitionAssignment)
eventReceived := false
defer func() {
if !eventReceived {
t.Error("no rebalance event received")
}
}()
go func() {
firstAssignment := <-c
if len(firstAssignment) != 1 {
t.Error("multiple topics assigned")
}
info, ok := firstAssignment[topic]
if !ok {
t.Error("wrong topic assigned")
}
if len(info) != Partitions {
t.Error("wrong number of partitions assigned")
}
eventReceived = true
}()

r := NewReader(ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: topic,
GroupID: GroupId,
MinBytes: 1,
MaxBytes: 1000,
MaxWait: 100 * time.Millisecond,
RebalanceEventInterceptor: newTestRebalanceEventCallback(c),
})
defer r.Close()

prepareReader(t, ctx, r, makeTestSequence(1)...)
_, _ = r.ReadMessage(ctx)
}

func TestReaderOnNonZeroPartition(t *testing.T) {
tests := []struct {
scenario string
Expand Down
12 changes: 12 additions & 0 deletions rebalance_callback.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package kafka

// RebalanceEventInterceptor defines the rebalance event callback API
type RebalanceEventInterceptor interface {
Callback(map[string][]PartitionAssignment)
}

type RebalanceFunc func(map[string][]PartitionAssignment)

func (f RebalanceFunc) Callback(partitionAssignments map[string][]PartitionAssignment) {
f(partitionAssignments)
}