Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Improve Kafee exception handling #126

Merged
merged 12 commits into from
Dec 12, 2024
5 changes: 2 additions & 3 deletions lib/kafee/consumer/adapter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ defmodule Kafee.Consumer.Adapter do
end

"""
@spec push_message(atom(), Kafee.Consumer.options(), Message.t()) :: :ok
@spec push_message(atom(), Kafee.Consumer.options(), Message.t()) :: :ok | {:error, Exception.t()}
def push_message(consumer, options, %Message{} = message) do
Message.set_logger_metadata(message)

Expand Down Expand Up @@ -79,7 +79,6 @@ defmodule Kafee.Consumer.Adapter do
:ok
rescue
error ->
consumer.handle_failure(error, message)
:ok
{:error, error}
end
end
67 changes: 48 additions & 19 deletions lib/kafee/consumer/broadway_adapter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -221,34 +221,60 @@ defmodule Kafee.Consumer.BroadwayAdapter do
tasks = Enum.map(messages, &Task.async(fn -> do_consumer_work(&1, consumer, options) end))
Task.await_many(tasks, :infinity)
else
Enum.each(messages, fn message -> do_consumer_work(message, consumer, options) end)
Enum.map(messages, fn message -> do_consumer_work(message, consumer, options) end)
kinson marked this conversation as resolved.
Show resolved Hide resolved
end
catch
kind, reason ->
Logger.error(
"Caught #{kind} attempting to handle batch : #{Exception.format(kind, reason, __STACKTRACE__)}",
consumer: inspect(consumer),
messages: inspect(messages)
)

messages
# If we catch at this stage we have no way of knowing which messages successfully processed
# So we will mark all messages as failed.
Enum.map(messages, &Broadway.Message.failed(&1, reason))
end

# Dialyzer can't recognize that :ok is a valid return type for this function
# due to the rescue clause in push_message/3
@dialyzer {:nowarn_function, do_consumer_work: 3}
Comment on lines +239 to +241
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe the rescue on Kafee.Consumer.Adapter.push_message makes Dialyzer think that {:error, Exception.t()} is the only possible return... I've tried a couple different ways to fix without luck. Open to ideas

defp do_consumer_work(
%Broadway.Message{
data: value,
metadata: metadata
},
} = message,
consumer,
options
) do
Kafee.Consumer.Adapter.push_message(
consumer,
options,
%Kafee.Consumer.Message{
key: metadata.key,
value: value,
topic: metadata.topic,
partition: metadata.partition,
offset: metadata.offset,
consumer_group: options[:consumer_group_id],
timestamp: DateTime.from_unix!(metadata.ts, :millisecond),
headers: metadata.headers
}
)
result =
Kafee.Consumer.Adapter.push_message(
consumer,
options,
%Kafee.Consumer.Message{
key: metadata.key,
value: value,
topic: metadata.topic,
partition: metadata.partition,
offset: metadata.offset,
consumer_group: options[:consumer_group_id],
timestamp: DateTime.from_unix!(metadata.ts, :millisecond),
headers: metadata.headers
}
)

case result do
:ok -> message
error -> Broadway.Message.failed(message, error)
end
catch
kind, reason ->
Logger.error("Caught #{kind} attempting to process message: #{Exception.format(kind, reason, __STACKTRACE__)}",
consumer: inspect(consumer),
message: inspect(message)
)

Broadway.Message.failed(message, reason)
end

@doc false
Expand All @@ -258,8 +284,11 @@ defmodule Kafee.Consumer.BroadwayAdapter do
# function above because `Kafee.Consumer.Adapter.push_message/2` will catch any
# errors.

error = %RuntimeError{message: "Error converting a Broadway message to Kafee.Consumer.Message"}
messages |> List.wrap() |> Enum.each(&consumer.handle_failure(error, &1))
messages
|> List.wrap()
|> Enum.each(fn message ->
consumer.handle_failure(message.status, message)
end)
kinson marked this conversation as resolved.
Show resolved Hide resolved

messages
end
Expand Down
14 changes: 10 additions & 4 deletions test/kafee/consumer/adapter_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,21 @@ defmodule Kafee.Consumer.AdapterTest do
end

describe "push_message/2" do
test "catches any error raised and returns :ok" do
test "returns ok when the message processes correctly" do
message = Kafee.BrodApi.generate_consumer_message()

patch(MyConsumer, :handle_message, fn _ -> :ok end)

assert :ok = Adapter.push_message(MyConsumer, @consumer_options, message)
end

test "catches any error raised and returns and error" do
message = Kafee.BrodApi.generate_consumer_message()
error = %RuntimeError{message: "testing error handling"}

patch(MyConsumer, :handle_message, fn _ -> raise error end)

assert :ok = Adapter.push_message(MyConsumer, @consumer_options, message)
# credo:disable-for-next-line Credo.Check.Readability.NestedFunctionCalls
assert_called MyConsumer.handle_failure(^error, ^message)
assert {:error, ^error} = Adapter.push_message(MyConsumer, @consumer_options, message)
end

test "calls Datadog.DataStreams.Integration.Kafka.trace_consume/2" do
Expand Down
7 changes: 5 additions & 2 deletions test/kafee/consumer/broadway_adapter_integration_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,11 @@ defmodule Kafee.Consumer.BroadwayAdapterIntegrationTest do
# assert they were done asynchronously
assert 100 == task_pids |> Enum.uniq() |> length

assert_receive {:error_reason, "%RuntimeError{message: \"Error handling a message for key-fail-1\"}"}
assert_receive {:error_reason, "%RuntimeError{message: \"Error handling a message for key-fail-2\"}"}
assert_receive {:error_reason,
"{:failed, {:error, %RuntimeError{message: \"Error handling a message for key-fail-1\"}}}"}

assert_receive {:error_reason,
"{:failed, {:error, %RuntimeError{message: \"Error handling a message for key-fail-2\"}}}"}
end
end
end
Loading