Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: SIGNAL-7090 UPDATE async worker id setting per partition #97

Merged
5 changes: 4 additions & 1 deletion lib/kafee/producer/async_adapter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,10 @@ defmodule Kafee.Producer.AsyncAdapter do
|> Keyword.put(:topic, topic)
|> Keyword.put(:partition, partition)

with {:error, {:already_started, pid}} <- Supervisor.start_child(producer, {AsyncWorker, worker_options}) do
custom_child_id = "#{AsyncWorker}#{partition}"
seungjinstord marked this conversation as resolved.
Show resolved Hide resolved

with {:error, {:already_started, pid}} <-
Supervisor.start_child(producer, %{id: custom_child_id, start: {AsyncWorker, :start_link, [worker_options]}}) do
{:ok, pid}
end
end
Expand Down
51 changes: 51 additions & 0 deletions test/kafee/producer/async_adapter_test.exs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
defmodule Kafee.Producer.AsyncAdapterTest do
use Kafee.KafkaCase
import ExUnit.CaptureLog

alias Kafee.Producer.{AsyncWorker, Message}

Expand Down Expand Up @@ -87,5 +88,55 @@ defmodule Kafee.Producer.AsyncAdapterTest do
assert [{pid, _value}] = Registry.lookup(registry_name, registry_key)
assert is_pid(pid)
end

@tag capture_log: true
test "uses different async worker processes for different partitions", %{topic: topic} do
# Send original messages to start the worker
messages = BrodApi.generate_producer_message_list(topic, 5)
# change partitions
messages = messages |> Enum.with_index(1) |> Enum.map(fn {message, idx} -> %{message | partition: idx} end)

capture_log(fn ->
assert :ok = MyProducer.produce(messages)
end)

assert_called(AsyncWorker.queue(_pid, _messages), 5)

worker_pids =
for x <- 1..5 do
{:via, Registry, {registry_name, registry_key}} = AsyncWorker.process_name(MyProducer.BrodClient, topic, x)
assert [{pid, _value}] = Registry.lookup(registry_name, registry_key)
assert is_pid(pid)
pid
end

assert 5 == worker_pids |> Enum.uniq() |> length()

# Send more messages
messages = BrodApi.generate_producer_message_list(topic, 5)
# change partitions
messages = messages |> Enum.with_index(1) |> Enum.map(fn {message, idx} -> %{message | partition: idx} end)
seungjinstord marked this conversation as resolved.
Show resolved Hide resolved

assert :ok = MyProducer.produce(messages)
assert_called(AsyncWorker.queue(_pid, _messages), 10)

# Assert no new worker processes got created
worker_pids_2 =
for x <- 1..5 do
{:via, Registry, {registry_name, registry_key}} = AsyncWorker.process_name(MyProducer.BrodClient, topic, x)
assert [{pid, _value}] = Registry.lookup(registry_name, registry_key)
seungjinstord marked this conversation as resolved.
Show resolved Hide resolved
assert is_pid(pid)
pid
end

assert worker_pids_2 |> MapSet.new() |> MapSet.equal?(MapSet.new(worker_pids))

# clean up because kafka in test only has one partition, so errors happen
capture_log(fn ->
for pid <- worker_pids do
GenServer.stop(pid)
end
end)
end
end
end
Loading