You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
in my code I use a buffered channel to hold messages obtained from a kafka consumer which I then want to write out using a producer to a different broker.
for msg := range mChan.Messages {
message := kafka.Message{Value:[]byte(msg)}
w := &kafka.Writer{
Addr: kafka.TCP(broker),
Topic: topic,
AllowAutoTopicCreation: true,
Balancer: &kafka.LeastBytes{},
}
err := w.WriteMessages(context.Background(), message)
if err != nil {
log.Println("failed to write messages:", err)
}
if err := w.Close(); err != nil {
log.Println("failed to close writer:", err)
}
}
my question is, should I be creating a new kafka.Writer like this for every message? i tried moving the creation of kafka.Writer outside the range loop thinking I can re-use the same same instance of kafka.Writer but this just results in
failed to write messages: io: read/write on closed pipe errors when i try to WriteMessages
ie:
w := &kafka.Writer{
Addr: kafka.TCP(broker),
Topic: topic,
AllowAutoTopicCreation: true,
Balancer: &kafka.LeastBytes{},
}
for msg := range mChan.Messages {
message := kafka.Message{Value:[]byte(msg)}
err := w.WriteMessages(context.Background(), message)
if err != nil {
log.Println("failed to write messages:", err)
}
if err := w.Close(); err != nil {
log.Println("failed to close writer:", err)
}
}
reacted with thumbs up emoji reacted with thumbs down emoji reacted with laugh emoji reacted with hooray emoji reacted with confused emoji reacted with heart emoji reacted with rocket emoji reacted with eyes emoji
-
Hi, new to go so this may be a silly question.
in my code I use a buffered channel to hold messages obtained from a kafka consumer which I then want to write out using a producer to a different broker.
my question is, should I be creating a new kafka.Writer like this for every message? i tried moving the creation of kafka.Writer outside the range loop thinking I can re-use the same same instance of kafka.Writer but this just results in
failed to write messages: io: read/write on closed pipe errors when i try to WriteMessages
ie:
any help would be appreciated
thanks
Beta Was this translation helpful? Give feedback.
All reactions