diff --git a/lib/kafee/consumer/broadway_adapter.ex b/lib/kafee/consumer/broadway_adapter.ex index 7813b3b..82cbe44 100644 --- a/lib/kafee/consumer/broadway_adapter.ex +++ b/lib/kafee/consumer/broadway_adapter.ex @@ -217,7 +217,8 @@ defmodule Kafee.Consumer.BroadwayAdapter do if batch_config[:async_run] do # No need for Task.Supervisor as it is not running under a GenServer, # and Kafee.Consumer.Adapter.push_message does already have error handling. - Enum.each(messages, &Task.async(fn -> do_consumer_work(&1, consumer, options) end)) + tasks = Enum.map(messages, &Task.async(fn -> do_consumer_work(&1, consumer, options) end)) + Task.await_many(tasks) else Enum.each(messages, fn message -> do_consumer_work(message, consumer, options) end) end