diff --git a/lib/kafee/consumer/adapter.ex b/lib/kafee/consumer/adapter.ex index 7a21483..0f7dc7e 100644 --- a/lib/kafee/consumer/adapter.ex +++ b/lib/kafee/consumer/adapter.ex @@ -44,7 +44,7 @@ defmodule Kafee.Consumer.Adapter do end """ - @spec push_message(atom(), Kafee.Consumer.options(), Message.t()) :: :ok + @spec push_message(atom(), Kafee.Consumer.options(), Message.t()) :: :ok | {:error, Exception.t()} def push_message(consumer, options, %Message{} = message) do Message.set_logger_metadata(message) @@ -79,7 +79,6 @@ defmodule Kafee.Consumer.Adapter do :ok rescue error -> - consumer.handle_failure(error, message) - :ok + {:error, error} end end diff --git a/lib/kafee/consumer/broadway_adapter.ex b/lib/kafee/consumer/broadway_adapter.ex index 1b1f459..c66f8cd 100644 --- a/lib/kafee/consumer/broadway_adapter.ex +++ b/lib/kafee/consumer/broadway_adapter.ex @@ -221,34 +221,60 @@ defmodule Kafee.Consumer.BroadwayAdapter do tasks = Enum.map(messages, &Task.async(fn -> do_consumer_work(&1, consumer, options) end)) Task.await_many(tasks, :infinity) else - Enum.each(messages, fn message -> do_consumer_work(message, consumer, options) end) + Enum.map(messages, fn message -> do_consumer_work(message, consumer, options) end) end + catch + kind, reason -> + Logger.error( + "Caught #{kind} attempting to handle batch : #{Exception.format(kind, reason, __STACKTRACE__)}", + consumer: inspect(consumer), + messages: inspect(messages) + ) - messages + # If we catch at this stage we have no way of knowing which messages successfully processed + # So we will mark all messages as failed. + Enum.map(messages, &Broadway.Message.failed(&1, reason)) end + # Dialyzer can't recognize that :ok is a valid return type for this function + # due to the rescue clause in push_message/3 + @dialyzer {:nowarn_function, do_consumer_work: 3} defp do_consumer_work( %Broadway.Message{ data: value, metadata: metadata - }, + } = message, consumer, options ) do - Kafee.Consumer.Adapter.push_message( - consumer, - options, - %Kafee.Consumer.Message{ - key: metadata.key, - value: value, - topic: metadata.topic, - partition: metadata.partition, - offset: metadata.offset, - consumer_group: options[:consumer_group_id], - timestamp: DateTime.from_unix!(metadata.ts, :millisecond), - headers: metadata.headers - } - ) + result = + Kafee.Consumer.Adapter.push_message( + consumer, + options, + %Kafee.Consumer.Message{ + key: metadata.key, + value: value, + topic: metadata.topic, + partition: metadata.partition, + offset: metadata.offset, + consumer_group: options[:consumer_group_id], + timestamp: DateTime.from_unix!(metadata.ts, :millisecond), + headers: metadata.headers + } + ) + + case result do + :ok -> message + error -> Broadway.Message.failed(message, error) + end + catch + kind, reason -> + Logger.error("Caught #{kind} attempting to process message: #{Exception.format(kind, reason, __STACKTRACE__)}", + consumer: inspect(consumer), + message: inspect(message) + ) + + Broadway.Message.failed(message, reason) end @doc false @@ -258,8 +284,11 @@ defmodule Kafee.Consumer.BroadwayAdapter do # function above because `Kafee.Consumer.Adapter.push_message/2` will catch any # errors. - error = %RuntimeError{message: "Error converting a Broadway message to Kafee.Consumer.Message"} - messages |> List.wrap() |> Enum.each(&consumer.handle_failure(error, &1)) + messages + |> List.wrap() + |> Enum.each(fn message -> + consumer.handle_failure(message.status, message) + end) messages end diff --git a/test/kafee/consumer/adapter_test.exs b/test/kafee/consumer/adapter_test.exs index 6b42452..ae1b459 100644 --- a/test/kafee/consumer/adapter_test.exs +++ b/test/kafee/consumer/adapter_test.exs @@ -20,15 +20,21 @@ defmodule Kafee.Consumer.AdapterTest do end describe "push_message/2" do - test "catches any error raised and returns :ok" do + test "returns ok when the message processes correctly" do + message = Kafee.BrodApi.generate_consumer_message() + + patch(MyConsumer, :handle_message, fn _ -> :ok end) + + assert :ok = Adapter.push_message(MyConsumer, @consumer_options, message) + end + + test "catches any error raised and returns and error" do message = Kafee.BrodApi.generate_consumer_message() error = %RuntimeError{message: "testing error handling"} patch(MyConsumer, :handle_message, fn _ -> raise error end) - assert :ok = Adapter.push_message(MyConsumer, @consumer_options, message) - # credo:disable-for-next-line Credo.Check.Readability.NestedFunctionCalls - assert_called MyConsumer.handle_failure(^error, ^message) + assert {:error, ^error} = Adapter.push_message(MyConsumer, @consumer_options, message) end test "calls Datadog.DataStreams.Integration.Kafka.trace_consume/2" do diff --git a/test/kafee/consumer/broadway_adapter_integration_test.exs b/test/kafee/consumer/broadway_adapter_integration_test.exs index 55d7c6b..36f82ba 100644 --- a/test/kafee/consumer/broadway_adapter_integration_test.exs +++ b/test/kafee/consumer/broadway_adapter_integration_test.exs @@ -128,8 +128,11 @@ defmodule Kafee.Consumer.BroadwayAdapterIntegrationTest do # assert they were done asynchronously assert 100 == task_pids |> Enum.uniq() |> length - assert_receive {:error_reason, "%RuntimeError{message: \"Error handling a message for key-fail-1\"}"} - assert_receive {:error_reason, "%RuntimeError{message: \"Error handling a message for key-fail-2\"}"} + assert_receive {:error_reason, + "{:failed, {:error, %RuntimeError{message: \"Error handling a message for key-fail-1\"}}}"} + + assert_receive {:error_reason, + "{:failed, {:error, %RuntimeError{message: \"Error handling a message for key-fail-2\"}}}"} end end end