-
Notifications
You must be signed in to change notification settings - Fork 796
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add the field to avoid creating missing topic. #873
Changes from all commits
054fc07
59a3338
682d91a
dc5d0c6
55102ba
5d70fac
f6edf1d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -185,6 +185,9 @@ type Writer struct { | |
// If nil, DefaultTransport is used. | ||
Transport RoundTripper | ||
|
||
// AllowAutoTopicCreation notifies writer to create topic is missing. | ||
AllowAutoTopicCreation bool | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Any reason to not introduce this in the writer config too? If I am using WriterConfig to create the writer, is it safe to just set this config after calling There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @erikdw @achille-roussel Here's a PR in case 1278 is a desirable feature request: #1279 if |
||
|
||
// Manages the current set of partition-topic writers. | ||
group sync.WaitGroup | ||
mutex sync.Mutex | ||
|
@@ -733,7 +736,7 @@ func (w *Writer) partitions(ctx context.Context, topic string) (int, error) { | |
// caching recent results (the kafka.Transport types does). | ||
r, err := client.transport().RoundTrip(ctx, client.Addr, &metadataAPI.Request{ | ||
TopicNames: []string{topic}, | ||
AllowAutoTopicCreation: true, | ||
AllowAutoTopicCreation: w.AllowAutoTopicCreation, | ||
}) | ||
if err != nil { | ||
return 0, err | ||
|
@@ -941,7 +944,7 @@ func newBatchQueue(initialSize int) batchQueue { | |
bq := batchQueue{ | ||
queue: make([]*writeBatch, 0, initialSize), | ||
mutex: &sync.Mutex{}, | ||
cond: &sync.Cond{}, | ||
cond: &sync.Cond{}, | ||
} | ||
|
||
bq.cond.L = bq.mutex | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
While it's not a breaking change per-say since the code will continue to compile, the new behavior we introduce is the inverse of what the program used to do.
Do you think we could add a bit more documentation to reflect that fact?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@achille-roussel hey! indeed, you are right wrt docu update. I update the test case as well.