diff --git a/lib/kafee/producer/async_adapter.ex b/lib/kafee/producer/async_adapter.ex index a0388ee..d4a4239 100644 --- a/lib/kafee/producer/async_adapter.ex +++ b/lib/kafee/producer/async_adapter.ex @@ -281,7 +281,11 @@ defmodule Kafee.Producer.AsyncAdapter do |> Keyword.put(:topic, topic) |> Keyword.put(:partition, partition) - with {:error, {:already_started, pid}} <- Supervisor.start_child(producer, {AsyncWorker, worker_options}) do + brod_client_name = brod_client(producer) + custom_child_id = "#{brod_client_name}-AsyncWorker-#{topic}-#{partition}" + + with {:error, {:already_started, pid}} <- + Supervisor.start_child(producer, %{id: custom_child_id, start: {AsyncWorker, :start_link, [worker_options]}}) do {:ok, pid} end end diff --git a/test/kafee/producer/async_adapter_test.exs b/test/kafee/producer/async_adapter_test.exs index 4c61ae0..fa318ed 100644 --- a/test/kafee/producer/async_adapter_test.exs +++ b/test/kafee/producer/async_adapter_test.exs @@ -1,5 +1,6 @@ defmodule Kafee.Producer.AsyncAdapterTest do use Kafee.KafkaCase + import ExUnit.CaptureLog alias Kafee.Producer.{AsyncWorker, Message} @@ -45,6 +46,15 @@ defmodule Kafee.Producer.AsyncAdapterTest do end describe "produce/2" do + defp assert_registry_lookup_pid(brod_client, topic, partition) do + {:via, Registry, {registry_name, registry_key}} = + AsyncWorker.process_name(brod_client, topic, partition) + + [{pid, _value}] = Registry.lookup(registry_name, registry_key) + assert is_pid(pid) + pid + end + test "queues messages to the async worker", %{topic: topic} do messages = BrodApi.generate_producer_message_list(topic, 5) @@ -73,9 +83,7 @@ defmodule Kafee.Producer.AsyncAdapterTest do assert :ok = MyProducer.produce(messages) assert_called_once(AsyncWorker.queue(_pid, _messages)) - {:via, Registry, {registry_name, registry_key}} = AsyncWorker.process_name(MyProducer.BrodClient, topic, 0) - assert [{pid, _value}] = Registry.lookup(registry_name, registry_key) - assert is_pid(pid) + 
assert_registry_lookup_pid(MyProducer.BrodClient, topic, 0) # Send more messages messages = BrodApi.generate_producer_message_list(topic, 5) @@ -83,9 +91,47 @@ defmodule Kafee.Producer.AsyncAdapterTest do assert_called(AsyncWorker.queue(_pid, _messages), 2) # Assert there is still only a single async worker process - {:via, Registry, {registry_name, registry_key}} = AsyncWorker.process_name(MyProducer.BrodClient, topic, 0) - assert [{pid, _value}] = Registry.lookup(registry_name, registry_key) - assert is_pid(pid) + assert_registry_lookup_pid(MyProducer.BrodClient, topic, 0) + end + + @tag capture_log: true + test "uses different async worker processes for different partitions", %{topic: topic} do + # messages list with all unique partitions + messages = BrodApi.generate_producer_partitioned_message_list(topic, 5, 5) + + capture_log(fn -> + assert :ok = MyProducer.produce(messages) + end) + + assert_called(AsyncWorker.queue(_pid, _messages), 5) + + worker_pids = + for x <- 0..4 do + assert_registry_lookup_pid(MyProducer.BrodClient, topic, x) + end + + assert 5 == worker_pids |> Enum.uniq() |> length() + + # Send more messages through same 5 partitions + messages = BrodApi.generate_producer_partitioned_message_list(topic, 5, 5) + + assert :ok = MyProducer.produce(messages) + assert_called(AsyncWorker.queue(_pid, _messages), 10) + + # Assert no new worker processes got created + worker_pids_2 = + for x <- 0..4 do + assert_registry_lookup_pid(MyProducer.BrodClient, topic, x) + end + + assert worker_pids_2 |> MapSet.new() |> MapSet.equal?(MapSet.new(worker_pids)) + + # clean up because kafka in test only has one partition, so errors happen + capture_log(fn -> + for pid <- worker_pids do + GenServer.stop(pid) + end + end) end end end diff --git a/test/support/brod_api.ex b/test/support/brod_api.ex index 6870b75..c813a24 100644 --- a/test/support/brod_api.ex +++ b/test/support/brod_api.ex @@ -93,6 +93,34 @@ defmodule Kafee.BrodApi do } end + @doc """ + Generates a list 
of messages that tries to spread evenly across the given number of partitions. + Returns error if number of partitions is greater than number of messages to create. + """ + def generate_producer_partitioned_message_list(topic, number_of_messages, partitions \\ 1) + + def generate_producer_partitioned_message_list(_topic, number_of_messages, partitions) + when number_of_messages < partitions do + {:error, "number of partitions is greater than number of messages"} + end + + def generate_producer_partitioned_message_list(topic, number_of_messages, partitions) do + messages = generate_producer_message_list(topic, number_of_messages) + chunk_every = Kernel.ceil(number_of_messages / partitions) + + messages + |> Enum.chunk_every(chunk_every) + |> Enum.with_index() + |> Enum.flat_map(fn {chunked_messages, idx} -> + Enum.map(chunked_messages, fn message -> + %{message | partition: idx} + end) + end) + + # # change partitions + # messages = messages |> Enum.with_index(1) |> Enum.map(fn {message, idx} -> %{message | partition: idx} end) + end + + @doc """ + Returns a simple map of all of the message fields we send to brod and Kafka. + """