Skip to content

Commit

Permalink
fix: update dialyzer types for producer module (#74)
Browse files Browse the repository at this point in the history
This adds a `Kafee.Producer.Message.input` type that includes the
optional nil field values that get autofilled by the module. This fixes
the dialyzer on our first Kafee 3.0 producer PR
stordco/product_catalog_service#500 🎊
  • Loading branch information
btkostner authored Nov 22, 2023
1 parent 1fd61db commit f1fa928
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 26 deletions.
9 changes: 5 additions & 4 deletions lib/kafee/producer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -225,9 +225,10 @@ defmodule Kafee.Producer do
Sends a single message to the configured adapter to be
sent to Kafka.
"""
@spec produce(Kafee.Producer.Message.t() | [Kafee.Producer.Message.t()]) :: :ok | {:error, term()}
def produce(%Kafee.Producer.Message{} = message),
do: produce([message])
@spec produce(Kafee.Producer.Message.input() | [Kafee.Producer.Message.input()]) :: :ok | {:error, term()}
def produce(%Kafee.Producer.Message{} = message) do
Kafee.Producer.produce(__MODULE__, [message])
end

@doc """
Sends a list of messages to the configured adapter to be
Expand Down Expand Up @@ -260,7 +261,7 @@ defmodule Kafee.Producer do
@doc """
Produces a list of messages via the given `Kafee.Producer` module.
"""
@spec produce(module(), [Kafee.Producer.Message.t()]) :: :ok | {:error, term()}
@spec produce(module(), [Kafee.Producer.Message.input()]) :: :ok | {:error, term()}
def produce(producer, messages) do
options = :ets.lookup_element(:kafee_config, producer, 2)

Expand Down
2 changes: 1 addition & 1 deletion lib/kafee/producer/adapter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -51,5 +51,5 @@ defmodule Kafee.Producer.Adapter do
@doc """
Sends all of the given messages to the adapter for sending.
"""
@callback produce([Producer.Message.t()], module(), Producer.options()) :: :ok | {:error, term()}
@callback produce([Producer.Message.input()], module(), Producer.options()) :: :ok | {:error, term()}
end
2 changes: 1 addition & 1 deletion lib/kafee/producer/async_adapter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ defmodule Kafee.Producer.AsyncAdapter do
to be sent to Kafka in the future.
"""
@impl Kafee.Producer.Adapter
@spec produce([Message.t()], module(), Kafee.Producer.options()) :: :ok | {:error, term()}
@spec produce([Message.input()], module(), Kafee.Producer.options()) :: :ok | {:error, term()}
def produce(messages, producer, options) do
message_groups =
messages
Expand Down
52 changes: 34 additions & 18 deletions lib/kafee/producer/message.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,22 @@ defmodule Kafee.Producer.Message do

@derive {Jason.Encoder, except: []}

defstruct [
:key,
:value,
:topic,
:partition,
:partition_fun,
headers: []
]
defstruct key: "",
value: "",
topic: nil,
partition: nil,
partition_fun: nil,
headers: []

defmodule ValidationError do
@moduledoc false
defexception [:error_key, :kafee_message, :message]
end

@typedoc """
A fully populated producer message. This does not include
any `nil` fields as they have all been set.
"""
@type t :: %Message{
key: Kafee.key(),
value: Kafee.value() | any(),
Expand All @@ -32,6 +34,19 @@ defmodule Kafee.Producer.Message do
headers: Kafee.headers()
}

@typedoc """
A Kafka message producer input. This includes `nil` fields that
are auto populated by the producer module.
"""
@type input :: %Message{
key: Kafee.key(),
value: Kafee.value() | any(),
topic: Kafee.topic() | nil,
partition: Kafee.partition() | nil,
partition_fun: Kafee.partition_fun() | nil,
headers: Kafee.headers()
}

@doc """
Sets message keys that are `nil` to values from the producer
module. This allows you to _not_ specify a topic or partition
Expand All @@ -47,7 +62,7 @@ defmodule Kafee.Producer.Message do
%Message{partition_fun: :hash}
"""
@spec set_module_values(t(), module(), Kafee.Producer.options()) :: t()
@spec set_module_values(input(), module(), Kafee.Producer.options()) :: input()
def set_module_values(%Message{} = message, _producer, options) do
message
|> replace_nil(:topic, options[:topic])
Expand All @@ -71,7 +86,7 @@ defmodule Kafee.Producer.Message do
%Message{value: ~s({"key":"value"}), headers: [{"kafka_contentType", "application/json"}]}
"""
@spec encode(t(), module(), Kafee.Producer.options()) :: t()
@spec encode(input(), module(), Kafee.Producer.options()) :: input()
def encode(%Message{} = message, _producer, options) do
case Keyword.get(options, :encoder, nil) do
nil ->
Expand Down Expand Up @@ -104,7 +119,7 @@ defmodule Kafee.Producer.Message do
%Message{key: "key", value: "value", partition: 0, partition_fun: :random}
"""
@spec partition(t(), module(), Kafee.Producer.options()) :: t()
@spec partition(input(), module(), Kafee.Producer.options()) :: input()
def partition(%Message{partition: nil} = message, producer, opts) do
# We override the topic because messages being consumed and technically
# be published to dynamic topics.
Expand Down Expand Up @@ -153,7 +168,7 @@ defmodule Kafee.Producer.Message do
}
"""
@spec set_request_id(t(), String.t()) :: t()
@spec set_request_id(input(), String.t()) :: input()
def set_request_id(%Message{} = message, request_id) do
new_headers =
message.headers
Expand All @@ -177,7 +192,7 @@ defmodule Kafee.Producer.Message do
}
"""
@spec set_request_id_from_logger(t()) :: t()
@spec set_request_id_from_logger(input()) :: input()
def set_request_id_from_logger(%Message{} = message) do
case Keyword.get(Logger.metadata(), :request_id, nil) do
request_id when is_binary(request_id) -> set_request_id(message, request_id)
Expand All @@ -188,7 +203,7 @@ defmodule Kafee.Producer.Message do
@doc """
Sets the `kafka_contentType` header in a message.
"""
@spec set_content_type(t(), binary()) :: t()
@spec set_content_type(input(), binary()) :: input()
def set_content_type(%Message{} = message, content_type) do
new_headers =
message.headers
Expand All @@ -201,7 +216,7 @@ defmodule Kafee.Producer.Message do
@doc """
Checks if the given message has a header with the given key.
"""
@spec has_header?(t(), binary()) :: boolean()
@spec has_header?(input(), binary()) :: boolean()
def has_header?(%Message{} = message, key) do
Enum.any?(message.headers, fn {k, _v} -> k == key end)
end
Expand All @@ -210,7 +225,7 @@ defmodule Kafee.Producer.Message do
Validates that the requires fields in a message are set. Raises
`Kafee.Producer.Message.ValidationError` on a missing or incorrect field.
"""
@spec validate!(t()) :: t()
@spec validate!(input()) :: t()
def validate!(%Message{topic: nil} = message),
do:
raise(ValidationError,
Expand Down Expand Up @@ -266,7 +281,7 @@ defmodule Kafee.Producer.Message do
"(anonymous) publish"
"""
@spec get_otel_span_name(t()) :: String.t()
@spec get_otel_span_name(input()) :: String.t()
def get_otel_span_name(%Message{} = message) do
prefix = if is_nil(message.topic), do: "(anonymous)", else: message.topic
prefix <> " publish"
Expand Down Expand Up @@ -311,6 +326,7 @@ defmodule Kafee.Producer.Message do
}
iex> get_otel_span_attributes(%Message{
...> key: nil,
...> value: "message value"
...> })
%{
Expand All @@ -319,7 +335,7 @@ defmodule Kafee.Producer.Message do
}
"""
@spec get_otel_span_attributes(t() | [t()]) :: map()
@spec get_otel_span_attributes(input() | [input()]) :: map()
def get_otel_span_attributes([message | _] = messages) when is_list(messages) do
message
|> get_otel_span_attributes()
Expand Down
2 changes: 1 addition & 1 deletion lib/kafee/producer/sync_adapter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ defmodule Kafee.Producer.SyncAdapter do
Calls the `:brod.produce_sync/5` function.
"""
@impl Kafee.Producer.Adapter
@spec produce([Message.t()], module(), Kafee.Producer.options()) :: :ok | {:error, term()}
@spec produce([Message.input()], module(), Kafee.Producer.options()) :: :ok | {:error, term()}
def produce(messages, producer, options) do
for message <- messages do
message =
Expand Down
2 changes: 1 addition & 1 deletion lib/kafee/producer/test_adapter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ defmodule Kafee.Producer.TestAdapter do
Adds messages to the internal memory.
"""
@impl Kafee.Producer.Adapter
@spec produce([Message.t()], module(), Kafee.Producer.options()) :: :ok | {:error, term()}
@spec produce([Message.input()], module(), Kafee.Producer.options()) :: :ok | {:error, term()}
def produce(messages, producer, options) do
pid = Application.get_env(:kafee, :test_process, self())

Expand Down

0 comments on commit f1fa928

Please sign in to comment.