Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: SIGNAL-7090 UPDATE async worker id setting per partition #97

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion lib/kafee/producer/async_adapter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,11 @@ defmodule Kafee.Producer.AsyncAdapter do
|> Keyword.put(:topic, topic)
|> Keyword.put(:partition, partition)

with {:error, {:already_started, pid}} <- Supervisor.start_child(producer, {AsyncWorker, worker_options}) do
brod_client_name = brod_client(producer)
custom_child_id = "#{brod_client_name}-AsyncWorker-#{topic}-#{partition}"

with {:error, {:already_started, pid}} <-
Supervisor.start_child(producer, %{id: custom_child_id, start: {AsyncWorker, :start_link, [worker_options]}}) do
{:ok, pid}
end
end
Expand Down
58 changes: 52 additions & 6 deletions test/kafee/producer/async_adapter_test.exs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
defmodule Kafee.Producer.AsyncAdapterTest do
use Kafee.KafkaCase
import ExUnit.CaptureLog

alias Kafee.Producer.{AsyncWorker, Message}

Expand Down Expand Up @@ -45,6 +46,15 @@ defmodule Kafee.Producer.AsyncAdapterTest do
end

describe "produce/2" do
defp assert_registry_lookup_pid(brod_client, topic, partition) do
{:via, Registry, {registry_name, registry_key}} =
AsyncWorker.process_name(brod_client, topic, partition)

[{pid, _value}] = Registry.lookup(registry_name, registry_key)
assert is_pid(pid)
pid
end

test "queues messages to the async worker", %{topic: topic} do
messages = BrodApi.generate_producer_message_list(topic, 5)

Expand Down Expand Up @@ -73,19 +83,55 @@ defmodule Kafee.Producer.AsyncAdapterTest do
assert :ok = MyProducer.produce(messages)
assert_called_once(AsyncWorker.queue(_pid, _messages))

{:via, Registry, {registry_name, registry_key}} = AsyncWorker.process_name(MyProducer.BrodClient, topic, 0)
assert [{pid, _value}] = Registry.lookup(registry_name, registry_key)
assert is_pid(pid)
assert_registry_lookup_pid(MyProducer.BrodClient, topic, 0)

# Send more messages
messages = BrodApi.generate_producer_message_list(topic, 5)
assert :ok = MyProducer.produce(messages)
assert_called(AsyncWorker.queue(_pid, _messages), 2)

# Assert there is still only a single async worker process
{:via, Registry, {registry_name, registry_key}} = AsyncWorker.process_name(MyProducer.BrodClient, topic, 0)
assert [{pid, _value}] = Registry.lookup(registry_name, registry_key)
assert is_pid(pid)
assert_registry_lookup_pid(MyProducer.BrodClient, topic, 0)
end

@tag capture_log: true
test "uses different async worker processes for different partitions", %{topic: topic} do
# messages list with all unique partitions
messages = BrodApi.generate_producer_partitioned_message_list(topic, 5, 5)

capture_log(fn ->
assert :ok = MyProducer.produce(messages)
end)

assert_called(AsyncWorker.queue(_pid, _messages), 5)

worker_pids =
for x <- 0..4 do
assert_registry_lookup_pid(MyProducer.BrodClient, topic, x)
end

assert 5 == worker_pids |> Enum.uniq() |> length()

# Send more messages through same 5 partitions
messages = BrodApi.generate_producer_partitioned_message_list(topic, 5, 5)

assert :ok = MyProducer.produce(messages)
assert_called(AsyncWorker.queue(_pid, _messages), 10)

# Assert no new worker processes got created
worker_pids_2 =
for x <- 0..4 do
assert_registry_lookup_pid(MyProducer.BrodClient, topic, x)
end

assert worker_pids_2 |> MapSet.new() |> MapSet.equal?(MapSet.new(worker_pids))

# clean up because kafka in test only has one partition, so errors happen
capture_log(fn ->
for pid <- worker_pids do
GenServer.stop(pid)
end
end)
end
end
end
28 changes: 28 additions & 0 deletions test/support/brod_api.ex
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,34 @@ defmodule Kafee.BrodApi do
}
end

@doc """
Generates a list of messages that tries to spread evenly across the given number of partitions.
Returns error if number of partitions is greater than number of messages to create.
"""
def generate_producer_partitioned_message_list(topic, number_of_messages, partitions \\ 1)

def generate_producer_partitioned_message_list(_topic, number_of_messages, partitions)
when length(number_of_messages) < length(partitions) do
{:error, "number of partitions is greather than number of messages"}
end

def generate_producer_partitioned_message_list(topic, number_of_messages, partitions) do
messages = generate_producer_message_list(topic, number_of_messages)
chunk_every = Kernel.floor(number_of_messages / partitions)

messages
|> Enum.chunk_every(chunk_every)
|> Enum.with_index()
|> Enum.flat_map(fn {chunked_messages, idx} ->
Enum.map(chunked_messages, fn message ->
%{message | partition: idx}
end)
end)

# # change partitions
# messages = messages |> Enum.with_index(1) |> Enum.map(fn {message, idx} -> %{message | partition: idx} end)
end

@doc """
Returns a simple map of all of the message fields we send to brod and Kafka.
"""
Expand Down