Skip to content

Commit

Permalink
fix: Improve Kafee exception handling (#126)
Browse files Browse the repository at this point in the history
## Related Ticket(s)

<!--
Enter the Jira issue below in the following format: PROJECT-##
-->

SIGNAL-7451

## Checklist

<!--
For each bullet, ensure your pr meets the criteria and write a note
explaining how this PR relates. Mark them as complete as they are done.
All top-level checkboxes should be checked regardless of their relevance
to the pr with a note explaining whether they are relevant or not.
-->

- [x] Code conforms to the [Elixir
Styleguide](https://github.com/christopheradams/elixir_style_guide)

## Problem

<!--
What is the problem you're solving or feature you're implementing? Link
to any Jira tickets or previous discussions of the issue.
-->

We've noticed on wms-service that under high load, sometimes we get
error messages in our consumers. These bubble up and trigger the
exception handling of `Broadway`. This PR aims to more gracefully catch
exits that occur in `Task.await_many/2` and mark the messages as failed.

## Details

<!--
Include a brief overview of the technical process you took (or are going
to take!) to get from the problem to the solution.
-->

There is also a shift in approach - instead of manually calling
`handle_failure` and then returning the message in failed cases, I am
opting to shift our usage to reflect the library. `handle_failure` is
called by the library itself, so there's no need for Kafee to call it
manually if we use `Broadway.Message.failed/2`
  • Loading branch information
jondthomas authored Dec 12, 2024
1 parent 3ab284c commit fa5bd44
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 28 deletions.
5 changes: 2 additions & 3 deletions lib/kafee/consumer/adapter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ defmodule Kafee.Consumer.Adapter do
end
"""
@spec push_message(atom(), Kafee.Consumer.options(), Message.t()) :: :ok
@spec push_message(atom(), Kafee.Consumer.options(), Message.t()) :: :ok | {:error, Exception.t()}
def push_message(consumer, options, %Message{} = message) do
Message.set_logger_metadata(message)

Expand Down Expand Up @@ -79,7 +79,6 @@ defmodule Kafee.Consumer.Adapter do
:ok
rescue
error ->
consumer.handle_failure(error, message)
:ok
{:error, error}
end
end
67 changes: 48 additions & 19 deletions lib/kafee/consumer/broadway_adapter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -221,34 +221,60 @@ defmodule Kafee.Consumer.BroadwayAdapter do
tasks = Enum.map(messages, &Task.async(fn -> do_consumer_work(&1, consumer, options) end))
Task.await_many(tasks, :infinity)
else
Enum.each(messages, fn message -> do_consumer_work(message, consumer, options) end)
Enum.map(messages, fn message -> do_consumer_work(message, consumer, options) end)
end
catch
kind, reason ->
Logger.error(
"Caught #{kind} attempting to handle batch : #{Exception.format(kind, reason, __STACKTRACE__)}",
consumer: inspect(consumer),
messages: inspect(messages)
)

messages
# If we catch at this stage we have no way of knowing which messages successfully processed
# So we will mark all messages as failed.
Enum.map(messages, &Broadway.Message.failed(&1, reason))
end

# Dialyzer can't recognize that :ok is a valid return type for this function
# due to the rescue clause in push_message/3
@dialyzer {:nowarn_function, do_consumer_work: 3}
defp do_consumer_work(
%Broadway.Message{
data: value,
metadata: metadata
},
} = message,
consumer,
options
) do
Kafee.Consumer.Adapter.push_message(
consumer,
options,
%Kafee.Consumer.Message{
key: metadata.key,
value: value,
topic: metadata.topic,
partition: metadata.partition,
offset: metadata.offset,
consumer_group: options[:consumer_group_id],
timestamp: DateTime.from_unix!(metadata.ts, :millisecond),
headers: metadata.headers
}
)
result =
Kafee.Consumer.Adapter.push_message(
consumer,
options,
%Kafee.Consumer.Message{
key: metadata.key,
value: value,
topic: metadata.topic,
partition: metadata.partition,
offset: metadata.offset,
consumer_group: options[:consumer_group_id],
timestamp: DateTime.from_unix!(metadata.ts, :millisecond),
headers: metadata.headers
}
)

case result do
:ok -> message
error -> Broadway.Message.failed(message, error)
end
catch
kind, reason ->
Logger.error("Caught #{kind} attempting to process message: #{Exception.format(kind, reason, __STACKTRACE__)}",
consumer: inspect(consumer),
message: inspect(message)
)

Broadway.Message.failed(message, reason)
end

@doc false
Expand All @@ -258,8 +284,11 @@ defmodule Kafee.Consumer.BroadwayAdapter do
# function above because `Kafee.Consumer.Adapter.push_message/2` will catch any
# errors.

error = %RuntimeError{message: "Error converting a Broadway message to Kafee.Consumer.Message"}
messages |> List.wrap() |> Enum.each(&consumer.handle_failure(error, &1))
messages
|> List.wrap()
|> Enum.each(fn message ->
consumer.handle_failure(message.status, message)
end)

messages
end
Expand Down
14 changes: 10 additions & 4 deletions test/kafee/consumer/adapter_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,21 @@ defmodule Kafee.Consumer.AdapterTest do
end

describe "push_message/2" do
test "catches any error raised and returns :ok" do
test "returns ok when the message processes correctly" do
message = Kafee.BrodApi.generate_consumer_message()

patch(MyConsumer, :handle_message, fn _ -> :ok end)

assert :ok = Adapter.push_message(MyConsumer, @consumer_options, message)
end

test "catches any error raised and returns and error" do
message = Kafee.BrodApi.generate_consumer_message()
error = %RuntimeError{message: "testing error handling"}

patch(MyConsumer, :handle_message, fn _ -> raise error end)

assert :ok = Adapter.push_message(MyConsumer, @consumer_options, message)
# credo:disable-for-next-line Credo.Check.Readability.NestedFunctionCalls
assert_called MyConsumer.handle_failure(^error, ^message)
assert {:error, ^error} = Adapter.push_message(MyConsumer, @consumer_options, message)
end

test "calls Datadog.DataStreams.Integration.Kafka.trace_consume/2" do
Expand Down
7 changes: 5 additions & 2 deletions test/kafee/consumer/broadway_adapter_integration_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,11 @@ defmodule Kafee.Consumer.BroadwayAdapterIntegrationTest do
# assert they were done asynchronously
assert 100 == task_pids |> Enum.uniq() |> length

assert_receive {:error_reason, "%RuntimeError{message: \"Error handling a message for key-fail-1\"}"}
assert_receive {:error_reason, "%RuntimeError{message: \"Error handling a message for key-fail-2\"}"}
assert_receive {:error_reason,
"{:failed, {:error, %RuntimeError{message: \"Error handling a message for key-fail-1\"}}}"}

assert_receive {:error_reason,
"{:failed, {:error, %RuntimeError{message: \"Error handling a message for key-fail-2\"}}}"}
end
end
end

0 comments on commit fa5bd44

Please sign in to comment.