feat: SIGNAL-7060 preemptively drop large messages from queue #101

Merged · 16 commits · Oct 1, 2024

Changes from all commits
55 changes: 38 additions & 17 deletions lib/kafee/producer/async_worker.ex
@@ -1,10 +1,28 @@
defmodule Kafee.Producer.AsyncWorker do
# A simple GenServer for every topic * partition in Kafka. It holds an
# erlang `:queue` and sends messages every so often. On process close, we
# attempt to send all messages to Kafka, and in the unlikely event we can't
# we write all messages to the logs.
@moduledoc """
A simple GenServer for every topic * partition in Kafka. It holds an
erlang `:queue` and sends messages every so often.

@moduledoc false
## Note on termination

On process close, we attempt to send all messages to Kafka,
and in the unlikely event we can't, we write all messages to the logs.

## When a message is larger than what Kafka allows

### Configuring the `max_request_bytes` option in `Kafee.Producer.AsyncAdapter`

Although the maximum message size is set in your Kafka cloud settings,
it is `state.max_request_bytes` that must be set correctly for any
size-based triaging to happen, so setting it is critical for large message handling.

### During message queuing

In `handle_cast({:queue, messages}, state)`, large messages are dropped so they never enter the queue.
These dropped messages show up in the logs (see the private function `queue_without_large_messages/2`);
they should be picked up from the logs and triaged accordingly.

"""

use GenServer,
shutdown: :timer.seconds(25)
@@ -226,10 +244,13 @@ defmodule Kafee.Producer.AsyncWorker do
@doc false
def handle_info(_, state), do: {:noreply, state}

# A simple request to add more messages to the queue. Nothing fancy here.
# A simple request to add more messages to the queue.
# Note: large messages are dropped and never added to the queue.
@doc false
def handle_cast({:queue, messages}, state) do
new_queue = :queue.join(state.queue, :queue.from_list(messages))
new_messages_queue = messages |> :queue.from_list() |> queue_without_large_messages(state.max_request_bytes)
Contributor
Just to make sure I understand: the primary change here is that before this PR, we would still attempt to publish messages that are too large unless the async worker process was terminating, in which case we would filter them out.

Whereas with this change, we will pre-emptively drop the large messages and never try to publish them. Is that correct?

I think that makes sense from a functional perspective, but I want to double-check my understanding before approving!

Contributor Author
@seungjinstord seungjinstord Sep 24, 2024
Yup, that's 100% correct. Originally we would ignore `state.max_request_bytes` on the first send attempt, just push first, and then handle the errors bubbling up from brod.

Adding to the context: due to increased traffic and message size, we are observing brod's related processes failing, and the AsyncWorker layer isn't getting the "raw" error messages with the actual message payload (it seems brod is eating it before bubbling up).

Therefore the other approach is: if we already know and have set `state.max_request_bytes`, we should be safe filtering the large messages out ahead of time, so the queue won't even contain them.
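The pre-emptive filtering described above can be sketched roughly as follows. This is a simplified, hypothetical sketch: the module name, the message shape, and the size heuristic (summing the byte sizes of key and value) are assumptions for illustration; the actual logic lives in `queue_without_large_messages/2` in this PR.

```elixir
defmodule SizeFilterSketch do
  # Splits an Erlang :queue of messages into {kept_queue, dropped_list},
  # dropping any message whose estimated encoded size exceeds the limit.
  def split_by_size(queue, max_request_bytes) do
    :queue.fold(
      fn message, {kept, dropped} ->
        # Rough size estimate: key bytes + value bytes (assumption; the
        # real implementation may account for headers and encoding overhead).
        size = byte_size(message.key) + byte_size(message.value)

        if size <= max_request_bytes do
          {:queue.in(message, kept), dropped}
        else
          {kept, [message | dropped]}
        end
      end,
      {:queue.new(), []},
      queue
    )
  end
end
```

Callers would then join `kept` onto the worker's existing queue and log each entry in `dropped`, so oversized payloads never reach brod in the first place.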

new_queue = :queue.join(state.queue, new_messages_queue)

emit_queue_telemetry(state, :queue.len(new_queue))

Process.send_after(self(), :send, state.throttle_ms)
@@ -252,7 +273,13 @@ defmodule Kafee.Producer.AsyncWorker do
def terminate(_reason, %{send_task: nil} = state) do
# We only focus on triaging the queue in state. Messages that are too big are logged and not sent.
# Update the state with a queue containing only acceptable messages.
state = %{state | queue: state_queue_without_large_messages(state)}
state = %{state | queue: queue_without_large_messages(state.queue, state.max_request_bytes)}

count = :queue.len(state.queue)

if count > 0 do
Logger.info("Attempting to send #{count} messages to Kafka before terminate")
end

terminate_send(state)
end
@@ -316,21 +343,21 @@ defmodule Kafee.Producer.AsyncWorker do
:ok
end

defp state_queue_without_large_messages(state) do
defp queue_without_large_messages(queue, max_request_bytes) do
# messages_beyond_max_bytes are going to be logged and not processed,
# as they are individually already over max_request_bytes in size.

{messages_within_max_bytes_queue, messages_beyond_max_bytes_reversed} =
:queue.fold(
fn message, {acc_queue_messages_within_limit, acc_messages_beyond_limit} ->
if message_within_max_bytes?(message, state.max_request_bytes) do
if message_within_max_bytes?(message, max_request_bytes) do
{:queue.in(message, acc_queue_messages_within_limit), acc_messages_beyond_limit}
else
{acc_queue_messages_within_limit, [message | acc_messages_beyond_limit]}
end
end,
{:queue.new(), []},
state.queue
queue
)

messages_beyond_max_bytes = Enum.reverse(messages_beyond_max_bytes_reversed)
@@ -339,12 +366,6 @@ defmodule Kafee.Producer.AsyncWorker do
Logger.error("Message in queue is too large, will not push to Kafka", data: message)
end)

count = :queue.len(messages_within_max_bytes_queue)

if count > 0 do
Logger.info("Attempting to send #{count} messages to Kafka before terminate")
end

messages_within_max_bytes_queue
end

135 changes: 105 additions & 30 deletions test/kafee/producer/async_worker_test.exs
@@ -59,11 +59,86 @@ defmodule Kafee.Producer.AsyncWorkerTest do
end

describe "queue/2" do
setup(%{topic: topic}) do
[small_message] = BrodApi.generate_producer_message_list(topic, 1)
message_fixture = File.read!("test/support/example/large_message.json")
large_message_fixture = String.duplicate(message_fixture, 10)

# This message will skip being sent to Kafka, and only be logged
large_message_1 =
topic
|> BrodApi.generate_producer_message()
|> Map.put(:value, large_message_fixture)
|> Map.put(:key, "large_msg_1")

large_message_2 =
topic
|> BrodApi.generate_producer_message()
|> Map.put(:value, large_message_fixture)
|> Map.put(:key, "large_msg_2")

[small_message: small_message, large_message_1: large_message_1, large_message_2: large_message_2]
end

test "queue a list of messages will send them", %{pid: pid, topic: topic} do
messages = BrodApi.generate_producer_message_list(topic, 2)
assert :ok = AsyncWorker.queue(pid, messages)
assert_receive {^topic, {GenServer, :cast, {:queue, ^messages}}}
end

@tag capture_log: true
test "any messages too large gets logged and dropped from queue when small message is first in list to enqueue", %{
pid: pid,
topic: topic,
small_message: small_message,
large_message_1: large_message_1,
large_message_2: large_message_2
} do
messages = [small_message, large_message_1, large_message_2]

log =
capture_log(fn ->
assert :ok = AsyncWorker.queue(pid, messages)
Process.sleep(@wait_timeout)
end)

expected_large_message_error_log = "Message in queue is too large, will not push to Kafka"
brod_message = BrodApi.to_kafka_message(small_message)
assert_called(:brod.produce(_client_id, ^topic, 0, :undefined, [^brod_message]))

assert 2 == (log |> String.split(expected_large_message_error_log) |> length()) - 1
async_worker_state = pid |> Patch.Listener.target() |> :sys.get_state()

# all of the messages in queue are processed or dropped
assert 0 == :queue.len(async_worker_state.queue)
end

@tag capture_log: true
test "any messages too large gets logged and dropped from queue when large message is first in list to enqueue", %{
pid: pid,
topic: topic,
small_message: small_message,
large_message_1: large_message_1,
large_message_2: large_message_2
} do
messages = [large_message_1, small_message, large_message_2]

log =
capture_log(fn ->
assert :ok = AsyncWorker.queue(pid, messages)
Process.sleep(@wait_timeout)
end)

expected_large_message_error_log = "Message in queue is too large, will not push to Kafka"
brod_message = BrodApi.to_kafka_message(large_message_1)
refute_called(:brod.produce(_client_id, ^topic, 0, :undefined, [^brod_message]))
brod_message = BrodApi.to_kafka_message(large_message_2)
refute_called(:brod.produce(_client_id, ^topic, 0, :undefined, [^brod_message]))
assert 2 == (log |> String.split(expected_large_message_error_log) |> length()) - 1

async_worker_state = pid |> Patch.Listener.target() |> :sys.get_state()
assert 0 == :queue.len(async_worker_state.queue)
end
end

describe "handle_info :send" do
@@ -100,18 +175,18 @@ defmodule Kafee.Producer.AsyncWorkerTest do
test ":ok removes sent messages from the queue", %{state: state, topic: topic} do
task = make_fake_task()
send_messages = BrodApi.generate_producer_message_list(topic, 4)
remaining_messages = BrodApi.generate_producer_message_list(topic, 3)
state = %{state | send_task: task, queue: :queue.from_list(send_messages ++ remaining_messages)}
messages = BrodApi.generate_producer_message_list(topic, 3)
state = %{state | send_task: task, queue: :queue.from_list(send_messages ++ messages)}

assert {:noreply, new_state} = AsyncWorker.handle_info({task.ref, {:ok, 4, 0}}, state)
assert ^remaining_messages = :queue.to_list(new_state.queue)
assert ^messages = :queue.to_list(new_state.queue)
end

test ":ok emits telemetry of remaining messages", %{state: state, topic: topic} do
task = make_fake_task()
send_messages = BrodApi.generate_producer_message_list(topic, 4)
remaining_messages = BrodApi.generate_producer_message_list(topic, 3)
state = %{state | send_task: task, queue: :queue.from_list(send_messages ++ remaining_messages)}
messages = BrodApi.generate_producer_message_list(topic, 3)
state = %{state | send_task: task, queue: :queue.from_list(send_messages ++ messages)}

assert {:noreply, _new_state} = AsyncWorker.handle_info({task.ref, {:ok, 4, 0}}, state)
assert_receive {:telemetry_event, [:kafee, :queue], %{count: 3}, %{partition: 0, topic: ^topic}}
@@ -133,7 +208,7 @@ defmodule Kafee.Producer.AsyncWorkerTest do
end

@tag capture_log: true
test "any single message too large gets logged and dropped from queue", %{pid: pid, topic: topic} do
test "any single message too large gets logged and not added to queue", %{pid: pid, topic: topic} do
message_fixture = File.read!("test/support/example/large_message.json")
large_message = String.duplicate(message_fixture, 10)

@@ -149,8 +224,8 @@ defmodule Kafee.Producer.AsyncWorkerTest do
end)

brod_message = BrodApi.to_kafka_message(message)
assert_called(:brod.produce(_client_id, ^topic, 0, :undefined, [^brod_message]))
assert log =~ "Message in queue is too large"
refute_called(:brod.produce(_client_id, ^topic, 0, :undefined, [^brod_message]))
assert log =~ "Message in queue is too large, will not push to Kafka"

async_worker_state = pid |> Patch.Listener.target() |> :sys.get_state()
assert 0 == :queue.len(async_worker_state.queue)
@@ -271,21 +346,21 @@ defmodule Kafee.Producer.AsyncWorkerTest do
end

test "waits for in flight tasks to complete", %{state: state, topic: topic} do
remaining_messages = BrodApi.generate_producer_message_list(topic, 20)
remaining_brod_messages = BrodApi.to_kafka_message(remaining_messages)
messages = BrodApi.generate_producer_message_list(topic, 20)
remaining_brod_messages = BrodApi.to_kafka_message(messages)

state = %{state | queue: :queue.from_list(remaining_messages)}
state = %{state | queue: :queue.from_list(messages)}

assert :ok = AsyncWorker.terminate(:normal, state)
assert_called_once(:brod.produce(_client_id, ^topic, 0, _key, ^remaining_brod_messages))
end

test "waits for in flight send and sends remaining messages", %{state: state, topic: topic} do
remaining_messages = BrodApi.generate_producer_message_list(topic, 20)
remaining_brod_messages = BrodApi.to_kafka_message(remaining_messages)
messages = BrodApi.generate_producer_message_list(topic, 20)
remaining_brod_messages = BrodApi.to_kafka_message(messages)

task = make_fake_task()
state = %{state | queue: :queue.from_list(remaining_messages), send_task: task, send_timeout: :infinity}
state = %{state | queue: :queue.from_list(messages), send_task: task, send_timeout: :infinity}

Process.send_after(self(), {task.ref, {:ok, 0, 0}}, 10)
assert :ok = AsyncWorker.terminate(:normal, state)
@@ -294,31 +369,31 @@ defmodule Kafee.Producer.AsyncWorkerTest do

@tag capture_log: true
test "waits for in flight error and retries sending messages", %{state: state, topic: topic} do
remaining_messages = BrodApi.generate_producer_message_list(topic, 20)
remaining_brod_messages = BrodApi.to_kafka_message(remaining_messages)
messages = BrodApi.generate_producer_message_list(topic, 20)
remaining_brod_messages = BrodApi.to_kafka_message(messages)

task = make_fake_task()
state = %{state | queue: :queue.from_list(remaining_messages), send_task: task, send_timeout: :infinity}
state = %{state | queue: :queue.from_list(messages), send_task: task, send_timeout: :infinity}

Process.send_after(self(), {task.ref, {:error, :internal_error}}, 10)
assert :ok = AsyncWorker.terminate(:normal, state)
assert_called_once(:brod.produce(_client_id, ^topic, 0, _key, ^remaining_brod_messages))
end

test "waits for timeout and retries sending messages", %{state: state, topic: topic} do
remaining_messages = BrodApi.generate_producer_message_list(topic, 20)
remaining_brod_messages = BrodApi.to_kafka_message(remaining_messages)
messages = BrodApi.generate_producer_message_list(topic, 20)
remaining_brod_messages = BrodApi.to_kafka_message(messages)

task = make_fake_task()
state = %{state | queue: :queue.from_list(remaining_messages), send_task: task, send_timeout: 10}
state = %{state | queue: :queue.from_list(messages), send_task: task, send_timeout: 10}

assert :ok = AsyncWorker.terminate(:normal, state)
assert_called_once(:brod.produce(_client_id, ^topic, 0, _key, ^remaining_brod_messages))
end

test "any brod errors are logged before terminate", %{state: state, topic: topic} do
remaining_messages = BrodApi.generate_producer_message_list(topic, 10)
state = %{state | queue: :queue.from_list(remaining_messages), send_timeout: :infinity}
messages = BrodApi.generate_producer_message_list(topic, 10)
state = %{state | queue: :queue.from_list(messages), send_timeout: :infinity}

patch(:brod, :sync_produce_request_offset, fn _ref, _timeout ->
{:error, :timeout}
Expand All @@ -335,8 +410,8 @@ defmodule Kafee.Producer.AsyncWorkerTest do
end

test "any raised errors are logged before terminate", %{state: state, topic: topic} do
remaining_messages = BrodApi.generate_producer_message_list(topic, 10)
state = %{state | queue: :queue.from_list(remaining_messages), send_timeout: :infinity}
messages = BrodApi.generate_producer_message_list(topic, 10)
state = %{state | queue: :queue.from_list(messages), send_timeout: :infinity}

patch(:brod, :sync_produce_request_offset, fn _ref, _timeout ->
raise RuntimeError, message: "test"
@@ -369,8 +444,8 @@ defmodule Kafee.Producer.AsyncWorkerTest do
|> Map.put(:value, large_message_fixture)
|> Map.put(:key, "large_msg_1")

remaining_messages = [small_message_1, large_message_1, small_message_2]
state = %{state | queue: :queue.from_list(remaining_messages), send_timeout: :infinity}
messages = [small_message_1, large_message_1, small_message_2]
state = %{state | queue: :queue.from_list(messages), send_timeout: :infinity}

log =
capture_log(fn ->
@@ -401,9 +476,9 @@ defmodule Kafee.Producer.AsyncWorkerTest do
small_message_unit_size = kafka_message_size_bytes(small_message)

small_message_total = Kernel.ceil(max_request_bytes / small_message_unit_size) * 2
remaining_messages = BrodApi.generate_producer_message_list(topic, small_message_total)
messages = BrodApi.generate_producer_message_list(topic, small_message_total)

state = %{state | queue: :queue.from_list(remaining_messages), send_timeout: :infinity}
state = %{state | queue: :queue.from_list(messages), send_timeout: :infinity}

log =
capture_log(fn ->
@@ -440,8 +515,8 @@ defmodule Kafee.Producer.AsyncWorkerTest do
batch = private(AsyncWorker.build_message_batch(messages, 1_040_384))
assert length(batch) in 10_000..15_000

{_batched_messages, remaining_messages} = batch |> length() |> :queue.split(messages)
remaining_batch = private(AsyncWorker.build_message_batch(remaining_messages, 1_040_384))
{_batched_messages, messages} = batch |> length() |> :queue.split(messages)
remaining_batch = private(AsyncWorker.build_message_batch(messages, 1_040_384))
assert length(remaining_batch) in 5_000..10_000

assert 20_000 = length(batch) + length(remaining_batch)
1 change: 0 additions & 1 deletion test/kafee/producer_integration_test.exs
@@ -66,7 +66,6 @@ defmodule Kafee.ProducerIntegrationTest do
end)

refute log =~ "Message in queue is too large"
assert log =~ "brod producer process is currently down"
assert log =~ "Successfully sent messages to Kafka"
end
end