feat: additional options to allow for batching and asynchronous batch handling for BroadwayAdapter (#103)

## Related Ticket(s)
SIGNAL-7088


## Checklist


- [x] Code conforms to the [Elixir
Styleguide](https://github.com/christopheradams/elixir_style_guide)

## Problem

Using Kafee's BroadwayAdapter with the FirehoseConsumer in wms-service,
I ran a comparison test between a dummy handler running through the
ProcessAdapter and an equivalent dummy handler running through the BroadwayAdapter.

The ProcessAdapter version took 7 seconds with a lot of DBConnection pool
errors, while the BroadwayAdapter version took 66 seconds (at 12 partitions).

That is roughly a 10x difference, so a crude way to close the gap would be
to raise the partition count from 12 to 120.

Naturally I then raised the partition count to 120 while keeping
consumer_concurrency at 12, and it did help: the run now took only 44
seconds.

Then I raised consumer_concurrency to 120, and I started to get
DBConnection pool/page issues because my local database couldn't sustain a
pool of 120 connections, even after increasing the pool size in dev.exs.
This "vanilla" approach to scaling therefore won't work.


## Details

My first thought was to check whether naively tweaking batch config values
would work: changing concurrency, batch size, or the partitioning function.
Overriding Kafka's partitioning with a custom partitioning function is
locked down by BroadwayKafka, so that's not possible; the concurrent
processes used would only ever be up to the number of partitions, which I
suppose is just abiding by Kafka's rules. Regardless,
[batching](https://hexdocs.pm/broadway/Broadway.html#module-batching)
does expose a place where a chunked operation can happen.

I concluded that our events are fairly idempotent and not bound by rigid
chronological-ordering rules; this is already battle tested, since the
current ProcessAdapter runs the event handlers
[asynchronously
already](https://github.com/stordco/wms-service/blob/main/lib/warehouse/helpers.ex#L22).

If we use batching, we mimic exactly the pattern that ProcessAdapter goes
through:

* ProcessAdapter records the events for a given request_id, in chronological
order, in its process state, then runs them through the event handlers
asynchronously.
* BroadwayAdapter, with this PR, can group messages arriving in chronological
order from the partitions into batches, and a new configuration option lets
the messages in each batch be handled asynchronously (see the sketch below).
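
The batch dispatch added in this PR boils down to the following simplified sketch (adapted from `handle_batch/4` in the diff below; `do_consumer_work/3` converts each Broadway message into a `Kafee.Consumer.Message` and pushes it through `Kafee.Consumer.Adapter.push_message/3`):

```elixir
# Simplified sketch of the new batch handling; see the diff below for the real code.
if batch_config[:async_run] do
  # push_message already handles errors internally, so a bare Task.async is enough here.
  Enum.each(messages, &Task.async(fn -> do_consumer_work(&1, consumer, options) end))
else
  Enum.each(messages, fn message -> do_consumer_work(message, consumer, options) end)
end
```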

For simplicity of code paths, a pragmatic approach was chosen: messages
always go through a default batching step, with a size of 1 unless
overriding config options are passed.
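
Concretely, when no `batching` option is passed, the adapter falls back to a default batcher equivalent to the following (taken from `@batching_default_options` in the diff below), so behaviour matches the previous one-message-at-a-time path:

```elixir
# Default batcher used when no `batching` option is given: one message per batch.
batchers: [
  default: [
    concurrency: System.schedulers_online() * 2,
    batch_size: 1
  ]
]
```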

### Local test result

Using the same 400-threshold automation config trigger (see the description
in this [PR](stordco/wms-service#4156)), running it asynchronously with the
following settings took **7 seconds (previously 66 seconds)** to go through
the same number of events, with no DBConnection errors popping up! The batch
options used were:

```elixir
batching: [
  concurrency: System.schedulers_online() * 2,
  size: 100,
  timeout: 500,
  async_run: true
]
```
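
For reference, these options travel with the adapter tuple in the consumer options, roughly as in the hypothetical sketch below (the consumer module, host, port, topic, and group id are placeholders; only the option keys come from this PR):

```elixir
# Hypothetical wiring sketch; values are placeholders, option keys match the adapter schema.
Kafee.Consumer.BroadwayAdapter.start_link(MyApp.FirehoseConsumer,
  host: "localhost",
  port: 9092,
  topic: "wms-firehose",
  consumer_group_id: "wms-service",
  adapter:
    {Kafee.Consumer.BroadwayAdapter,
     [
       consumer_concurrency: 12,
       batching: [
         concurrency: System.schedulers_online() * 2,
         size: 100,
         timeout: 500,
         async_run: true
       ]
     ]}
)
```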



seungjinstord authored Oct 7, 2024
1 parent 33cdb5f commit f003f0b
Showing 2 changed files with 288 additions and 64 deletions.
215 changes: 173 additions & 42 deletions lib/kafee/consumer/broadway_adapter.ex
@@ -1,4 +1,8 @@
defmodule Kafee.Consumer.BroadwayAdapter do
@batching_default_options [
concurrency: System.schedulers_online() * 2,
batch_size: 1
]
@options_schema NimbleOptions.new!(
connect_timeout: [
default: :timer.seconds(10),
@@ -26,6 +30,60 @@ defmodule Kafee.Consumer.BroadwayAdapter do
Broadway default, which is `System.schedulers_online() * 2`.
""",
type: :non_neg_integer
],
batching: [
required: false,
doc: """
Optional.
Options for the `batchers` configuration in Broadway.
See its [documentation](https://hexdocs.pm/broadway/Broadway.html#module-the-default-batcher).
## Notes
1. Batching is _always turned on_, with a size of 1.
2. Only the `:default` batcher key is supported.
3. On top of this, there's an optional `async_run` option which, when set,
runs `Kafee.Consumer.Adapter.push_message` asynchronously across all messages in that batch.
""",
type: :non_empty_keyword_list,
keys: [
concurrency: [
required: true,
type: :non_neg_integer,
doc: """
Set concurrency for batches.
Note that, due to Kafka's limitations, typically a single process will be assigned per partition.
For example, for a 12-partition topic, assigning 50 concurrent batches to run will only end up
using around 12 processes; the rest of the concurrency capacity will not be used.
Default is System.schedulers_online() * 2, which is recommended by BroadwayKafka.
"""
],
size: [
required: true,
type: :non_neg_integer,
doc: """
Max number of messages to have per batch.
Default size is 1.
"""
],
timeout: [
default: 1_000,
type: :non_neg_integer,
doc: """
Maximum time to wait for messages to be added to the batch before the batch starts processing.
"""
],
async_run: [
default: false,
type: :boolean,
doc: """
Flag that decides whether batch processing will be done asynchronously.
"""
]
]
]
)

@@ -40,6 +98,13 @@ defmodule Kafee.Consumer.BroadwayAdapter do
## Options
#{NimbleOptions.docs(@options_schema)}
## Note that batching is always turned on
To simplify the options and the configuration logic paths, we took a pragmatic approach.
[Batching](https://hexdocs.pm/broadway_kafka/BroadwayKafka.Producer.html#module-concurrency-and-partitioning)
is always turned on, and the default batch size is 1 unless increased.
"""

@behaviour Broadway
@@ -54,38 +119,70 @@ defmodule Kafee.Consumer.BroadwayAdapter do
@impl Kafee.Consumer.Adapter
@spec start_link(module(), Kafee.Consumer.options()) :: Supervisor.on_start()
def start_link(consumer, options) do
with {:ok, adapter_options} <- validate_adapter_options(options) do
Broadway.start_link(
__MODULE__,
broadway_config(consumer, options, adapter_options)
)
end
end

defp validate_adapter_options(options) do
adapter_options =
case options[:adapter] do
nil -> []
adapter when is_atom(adapter) -> []
{_adapter, adapter_options} -> adapter_options
end

with {:ok, adapter_options} <- NimbleOptions.validate(adapter_options, @options_schema) do
Broadway.start_link(__MODULE__,
name: consumer,
context: %{
consumer: consumer,
consumer_group: options[:consumer_group_id],
options: options
},
producer: [
module:
{BroadwayKafka.Producer,
[
hosts: [{options[:host], options[:port]}],
group_id: options[:consumer_group_id],
topics: [options[:topic]],
client_config: client_config(options)
]},
concurrency: adapter_options[:consumer_concurrency]
],
processors: [
default: [
concurrency: processor_concurrency(adapter_options)
]
NimbleOptions.validate(adapter_options, @options_schema)
end

defp broadway_config(consumer, options, adapter_options) do
base_config = [
name: consumer,
context: %{
consumer: consumer,
consumer_group: options[:consumer_group_id],
options: options,
adapter_options: adapter_options
},
producer: [
module:
{BroadwayKafka.Producer,
[
hosts: [{options[:host], options[:port]}],
group_id: options[:consumer_group_id],
topics: [options[:topic]],
client_config: client_config(options)
]},
concurrency: adapter_options[:consumer_concurrency]
],
processors: [
default: [
concurrency: processor_concurrency(adapter_options)
]
)
]
]

case Keyword.fetch(adapter_options, :batching) do
{:ok, batching} ->
Keyword.merge(base_config,
batchers: [
default: [
concurrency: batching[:concurrency],
batch_size: batching[:size],
batch_timeout: batching[:timeout]
]
]
)

:error ->
Keyword.merge(base_config,
batchers: [
default: @batching_default_options
]
)
end
end

@@ -100,34 +197,68 @@ defmodule Kafee.Consumer.BroadwayAdapter do

@doc false
@impl Broadway
def handle_message(:default, %Broadway.Message{data: value, metadata: metadata} = message, %{
consumer: consumer,
options: options
}) do
Kafee.Consumer.Adapter.push_message(consumer, options, %Kafee.Consumer.Message{
key: metadata.key,
value: value,
topic: metadata.topic,
partition: metadata.partition,
offset: metadata.offset,
consumer_group: options[:consumer_group_id],
timestamp: DateTime.from_unix!(metadata.ts, :millisecond),
headers: metadata.headers
})

def handle_message(:default, %Broadway.Message{} = message, _context) do
message
end

@impl Broadway
def handle_batch(
:default,
messages,
_batch_info,
%{
adapter_options: adapter_options,
consumer: consumer,
options: options
} = _context
) do
batch_config = adapter_options[:batching]

if batch_config[:async_run] do
# No need for Task.Supervisor as this is not running under a GenServer,
# and Kafee.Consumer.Adapter.push_message already has error handling.
Enum.each(messages, &Task.async(fn -> do_consumer_work(&1, consumer, options) end))
else
Enum.each(messages, fn message -> do_consumer_work(message, consumer, options) end)
end

messages
end

defp do_consumer_work(
%Broadway.Message{
data: value,
metadata: metadata
},
consumer,
options
) do
Kafee.Consumer.Adapter.push_message(
consumer,
options,
%Kafee.Consumer.Message{
key: metadata.key,
value: value,
topic: metadata.topic,
partition: metadata.partition,
offset: metadata.offset,
consumer_group: options[:consumer_group_id],
timestamp: DateTime.from_unix!(metadata.ts, :millisecond),
headers: metadata.headers
}
)
end

@doc false
@impl Broadway
def handle_failed(message, %{consumer: consumer}) do
def handle_failed(messages, %{consumer: consumer}) do
# This error only occurs when there is an issue with the `handle_message/2`
# function above because `Kafee.Consumer.Adapter.push_message/2` will catch any
# errors.

error = %RuntimeError{message: "Error converting a Broadway message to Kafee.Consumer.Message"}
consumer.handle_failure(error, message)
messages |> List.wrap() |> Enum.each(&consumer.handle_failure(error, &1))

message
messages
end
end