Learning Elixir's `GenServer` with a real world example (WIP)
I started learning Elixir a few months ago, mostly through hacking on Papercups. I'm ashamed to say most of my Elixir education has been through trial and error, figuring things out as I go along. So this past week I decided to take some time off from Papercups to go a bit deeper into Elixir.
In particular, I was itching to learn more about handling concurrency in Elixir. This, of course, led me to GenServers.
Anyone who's moderately familiar with Elixir has probably at least heard of `GenServer`. (If you haven't, that's ok too!) Strictly speaking, it's one of those things you can avoid for a while and still be reasonably productive, especially with something like the Phoenix framework.
But it's an incredibly useful feature of the language to add to your toolbox! There are so many nice things you can do with it. For example, with GenServers, you can:
- create a simple cache, buffer, or rate limiter
- set up recurring tasks to run every few hours (e.g. check out José's answer to "How can I schedule code to run every few hours in Elixir?" on Stack Overflow)
- handle async processes much more easily, with built-in functionality for tracing and error reporting, plus automatic restarts when run under a supervisor
...all without any external dependencies!
While learning about GenServers, I found many of the examples to be a bit contrived... a lot of "key-value" stores, shopping lists, and things like that. I wanted to understand how GenServers are being used "out in the wild", so I turned to one of my favorite Elixir open-source repos: https://github.com/plausible/analytics
A bit of background: Plausible is an open-source analytics platform, with >1000 paying customers. Because they are a real-world production application serving many users, I figured I could learn a lot from diagnosing one of their modules. 🤓
For the sake of this post, I'm going to go through one of the simpler GenServers in their repo: Plausible.Event.WriteBuffer, which can be found at /lib/plausible/event/write_buffer.ex
At a high level, this GenServer allows Plausible to insert large quantities of events into the database in batches, rather than doing an insertion for each individual event. (This seems particularly important for a product like Plausible's, where they are potentially ingesting thousands of events per second!)
At the code level, what this process is doing is essentially:
- ingesting events via the `insert/1` method
- adding them to a `buffer` in the process's internal state, represented by a list (i.e. `[]`)
- "flushing" the buffer every 5 seconds, or when it reaches its capacity of 10,000 events, whichever comes first
  - where "flushing" means saving all the events in the buffer to a database (in this case, ClickHouse)
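To make the buffering rule concrete, here is a minimal sketch of just the decision logic as a pure function, with no processes, timers, or database involved. The module name `BufferLogic` and the `capacity` parameter are hypothetical; the real module inlines this check in its `handle_cast/2` callback.

```elixir
defmodule BufferLogic do
  # Hypothetical pure-function model of the write buffer's rule (not Plausible's code).
  @max_buffer_size 10_000

  # Prepend the event, then decide whether the buffer must be flushed.
  # Returns {:flush, events, []} at capacity, otherwise {:keep, new_buffer}.
  def add(buffer, event, capacity \\ @max_buffer_size) do
    new_buffer = [event | buffer]

    if length(new_buffer) >= capacity do
      {:flush, new_buffer, []}
    else
      {:keep, new_buffer}
    end
  end
end

# With a capacity of 3, the third insert triggers a flush.
{:keep, b1} = BufferLogic.add([], :e1, 3)
{:keep, b2} = BufferLogic.add(b1, :e2, 3)
{:flush, flushed, []} = BufferLogic.add(b2, :e3, 3)
IO.inspect(flushed)
# [:e3, :e2, :e1]
```

Prepending keeps each insert cheap, but `length/1` walks the whole list, so the capacity check is linear in the buffer size; tracking a running count in the state would be one way to avoid that if it ever mattered.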
(For the full context, let's take a look at the codebase: we can see that the `Plausible.Event.WriteBuffer` GenServer is used in the `PlausibleWeb.Api.ExternalController.event/2` method, which handles the logic for the `POST /api/event` API endpoint, which in turn gets called from their JavaScript tracking snippet.)
Before we jump into it, here's the full module, with some minor stylistic tweaks and annotations I've added myself for some additional context:
defmodule Plausible.Event.WriteBuffer do
use GenServer
require Logger
@flush_interval_ms 5_000
@max_buffer_size 10_000
# Client APIs
def start_link(_opts) do
GenServer.start_link(__MODULE__, [], name: __MODULE__)
end
def insert(event) do
GenServer.cast(__MODULE__, {:insert, event})
{:ok, event}
end
def flush() do
GenServer.call(__MODULE__, :flush, :infinity)
:ok
end
# Server (callbacks)
@impl true
def init(buffer) do
Process.flag(:trap_exit, true)
timer = Process.send_after(self(), :tick, @flush_interval_ms)
{:ok, %{buffer: buffer, timer: timer}}
end
@impl true
def handle_cast({:insert, event}, %{buffer: buffer, timer: timer} = state) do
new_buffer = [event | buffer]
if length(new_buffer) >= @max_buffer_size do
Logger.info("Buffer full, flushing to disk")
Process.cancel_timer(timer)
do_flush(new_buffer)
new_timer = Process.send_after(self(), :tick, @flush_interval_ms)
{:noreply, %{buffer: [], timer: new_timer}}
else
{:noreply, %{state | buffer: new_buffer}}
end
end
@impl true
def handle_info(:tick, %{buffer: buffer} = _state) do
do_flush(buffer)
timer = Process.send_after(self(), :tick, @flush_interval_ms)
{:noreply, %{buffer: [], timer: timer}}
end
@impl true
def handle_call(:flush, _from, %{buffer: buffer, timer: timer} = _state) do
Process.cancel_timer(timer)
do_flush(buffer)
new_timer = Process.send_after(self(), :tick, @flush_interval_ms)
{:reply, nil, %{buffer: [], timer: new_timer}}
end
@impl true
def terminate(_reason, %{buffer: buffer} = _state) do
Logger.info("Flushing event buffer before shutdown...")
do_flush(buffer)
end
# Private/utility methods
defp do_flush(buffer) do
case buffer do
[] ->
nil
events ->
Logger.info("Flushing #{length(events)} events")
events = Enum.map(events, &(Map.from_struct(&1) |> Map.delete(:__meta__)))
Plausible.ClickhouseRepo.insert_all(Plausible.ClickhouseEvent, events)
end
end
end
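Because the module above depends on `Plausible.ClickhouseRepo`, it won't run on its own. Below is a rough, self-contained sketch of the same pattern that you can paste into `iex` to experiment with. It is not Plausible's code: `WriteBufferSketch` is a hypothetical name, batches are handed to a `:sink` function instead of ClickHouse, and the interval and capacity are passed in as options (assumptions made purely so the behaviour is easy to observe).

```elixir
defmodule WriteBufferSketch do
  # Hypothetical variant of the pattern above: flushed batches go to a `:sink`
  # function instead of ClickHouse; interval and capacity are start-up options.
  use GenServer

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts, name: __MODULE__)

  def insert(event) do
    GenServer.cast(__MODULE__, {:insert, event})
    {:ok, event}
  end

  def flush, do: GenServer.call(__MODULE__, :flush, :infinity)

  @impl true
  def init(opts) do
    # Trap exits so terminate/2 runs when a supervisor shuts us down.
    Process.flag(:trap_exit, true)

    state = %{
      buffer: [],
      sink: Keyword.fetch!(opts, :sink),
      interval: Keyword.get(opts, :flush_interval_ms, 5_000),
      max: Keyword.get(opts, :max_buffer_size, 10_000)
    }

    {:ok, schedule(state)}
  end

  @impl true
  def handle_cast({:insert, event}, state) do
    new_buffer = [event | state.buffer]

    if length(new_buffer) >= state.max do
      # Capacity reached: cancel the pending tick, flush now, start a fresh timer.
      Process.cancel_timer(state.timer)
      {:noreply, state |> flush_events(new_buffer) |> schedule()}
    else
      {:noreply, %{state | buffer: new_buffer}}
    end
  end

  @impl true
  def handle_info(:tick, state) do
    # Periodic flush, then schedule the next tick.
    {:noreply, state |> flush_events(state.buffer) |> schedule()}
  end

  @impl true
  def handle_call(:flush, _from, state) do
    Process.cancel_timer(state.timer)
    {:reply, :ok, state |> flush_events(state.buffer) |> schedule()}
  end

  @impl true
  def terminate(_reason, state) do
    flush_events(state, state.buffer)
    :ok
  end

  # Store a fresh timer reference in the state.
  defp schedule(state) do
    Map.put(state, :timer, Process.send_after(self(), :tick, state.interval))
  end

  # Hand non-empty batches (newest first, as in the original) to the sink.
  defp flush_events(state, []), do: %{state | buffer: []}

  defp flush_events(state, events) do
    state.sink.(events)
    %{state | buffer: []}
  end
end

# Usage: tiny capacity and a long interval, so only the size limit triggers a flush.
test_pid = self()

{:ok, _pid} =
  WriteBufferSketch.start_link(
    sink: fn events -> send(test_pid, {:flushed, events}) end,
    flush_interval_ms: 60_000,
    max_buffer_size: 3
  )

Enum.each([:a, :b, :c], &WriteBufferSketch.insert/1)

receive do
  {:flushed, events} -> IO.inspect(events)
end
# [:c, :b, :a]
```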
Note: If you're able to read the code above and know exactly what's going on at every step, you probably don't need to read any further!
Let's start off where the core of the logic actually lives: in the GenServer callbacks.
The callbacks implemented in this particular module are:
- `init/1` (required): invoked when the server is started, and sets the initial state
- `handle_cast/2`: invoked when `GenServer.cast/2` is called, to handle asynchronous messages
- `handle_call/3`: invoked when `GenServer.call/3` is called, to handle synchronous messages
  - `GenServer.call/3` will block until a reply is received (unless the call times out or nodes are disconnected)
- `handle_info/2`: invoked to handle all other messages (i.e. outside of those triggered by `GenServer.call/3` and `GenServer.cast/2`)
  - e.g. messages/events handled internally within the GenServer
- `terminate/2`: invoked when the GenServer is about to exit
Let's go through each one! 🚀
The `init/1` callback is invoked when the GenServer is started, and handles setting the initial internal state of the server process.
Here's the method definition from above:
@impl true
def init(buffer) do
Process.flag(:trap_exit, true)
timer = Process.send_after(self(), :tick, @flush_interval_ms)
{:ok, %{buffer: buffer, timer: timer}}
end
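The first line of this method (`Process.flag(:trap_exit, true)`) sets us up to "trap exits": exit signals from linked processes (such as the shutdown signal a supervisor sends) are turned into messages instead of killing the process immediately. This gives us a chance to handle any "clean-up" tasks before the process terminates, in the `terminate/2` callback. (We'll discuss this below!)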
From Elixir in Action:
"Graceful termination of a GenServer worker involves invoking the terminate/2 callback, but only if the worker process is trapping exits. Therefore, if you want to do some cleanup from a GenServer process, make sure you set up an exit trap from an init/1 callback."
Next, we set up a timer that will send a `:tick` message to this same process in 5 seconds (`@flush_interval_ms == 5_000`). As we'll see below, the `:tick` handler schedules the next `:tick` every time it runs, so that it effectively fires every 5 seconds while the server is alive. (That explains why it's called "tick"!)
Finally, we store the initial `buffer` (in this case, an empty list, i.e. `[]`) and the `timer` in the internal state.
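The `timer` stored in the state is just a reference returned by `Process.send_after/3`. Here is a quick sketch of how these references behave (plain Elixir, independent of Plausible): cancelling a pending timer returns the milliseconds that were left, while a timer that has already fired returns `false`, and its message stays in the mailbox.

```elixir
# Start a 5-second timer, then cancel it right away.
ref = Process.send_after(self(), :tick, 5_000)
remaining = Process.cancel_timer(ref)
IO.inspect(is_integer(remaining))  # true: cancelled with time left

receive do
  :tick -> IO.puts("tick arrived")
after
  100 -> IO.puts("no tick")  # the cancelled timer never delivers :tick
end

# A timer that has already fired cannot be cancelled: its message is
# already in the mailbox, and cancel_timer/1 does not remove it.
late = Process.send_after(self(), :late_tick, 0)
Process.sleep(20)
IO.inspect(Process.cancel_timer(late))  # false

receive do
  :late_tick -> IO.puts("late tick still delivered")
after
  100 -> IO.puts("no late tick")
end
```

The last case is worth keeping in mind for the `Process.cancel_timer/1` calls in `handle_cast/2` and `handle_call/3`: if a `:tick` has already been delivered when the timer is cancelled, `handle_info(:tick, ...)` will still run afterwards and schedule its own timer, which would leave two `:tick` timers running.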
The `handle_cast/2` callback is invoked when `GenServer.cast/2` is called, to handle asynchronous messages.
Here's the method definition from above:
@impl true
def handle_cast({:insert, event}, %{buffer: buffer, timer: timer} = state) do
new_buffer = [event | buffer]
if length(new_buffer) >= @max_buffer_size do
Logger.info("Buffer full, flushing to disk")
Process.cancel_timer(timer)
do_flush(new_buffer)
new_timer = Process.send_after(self(), :tick, @flush_interval_ms)
{:noreply, %{buffer: [], timer: new_timer}}
else
{:noreply, %{state | buffer: new_buffer}}
end
end
All this is doing is:
- Inserting a new event into the internal state's `buffer`, by prepending it to the list
- Checking if the buffer has reached capacity, and if so, it:
  - cancels the current timer to prevent the next automatic flush from happening
  - performs the flush operation manually with `do_flush/1`
  - sets up a new timer which will trigger the next automatic flush in 5 seconds
  - resets the internal state's `buffer` to an empty list, and updates the timer as well
- If the buffer is not at capacity, it simply updates the internal state's `buffer`
The reason this is handled in a `handle_cast/2` callback rather than a `handle_call/3` callback is so that the client can fire off the insert asynchronously, without blocking on the completion of a potential `do_flush/1` invocation. ("Flushing"/saving 10,000 events is a potentially expensive operation that could take a few seconds to complete.) The GenServer itself still runs the flush before it handles its next message, so any inserts that arrive in the meantime simply wait in its mailbox.
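A small experiment can make the cast/call difference visible. The sketch below uses a hypothetical `SlowServer` whose handlers sleep for 500 ms to stand in for an expensive `do_flush/1`; the timings are approximate and will vary by machine.

```elixir
defmodule SlowServer do
  # Hypothetical server whose handlers sleep, standing in for a slow flush.
  use GenServer

  def start_link(_), do: GenServer.start_link(__MODULE__, :ok)

  @impl true
  def init(:ok), do: {:ok, nil}

  @impl true
  def handle_cast(:work, state) do
    Process.sleep(500)
    {:noreply, state}
  end

  @impl true
  def handle_call(:work, _from, state) do
    Process.sleep(500)
    {:reply, :done, state}
  end
end

{:ok, pid} = SlowServer.start_link([])

# :timer.tc/1 returns {elapsed_microseconds, result}.
{cast_us, :ok} = :timer.tc(fn -> GenServer.cast(pid, :work) end)
{call_us, :done} = :timer.tc(fn -> GenServer.call(pid, :work) end)

IO.puts("cast returned after ~#{div(cast_us, 1000)} ms")
IO.puts("call returned after ~#{div(call_us, 1000)} ms")
```

The cast returns almost instantly, while the call takes roughly twice as long as its own handler: the server is still busy with the earlier cast, since a GenServer handles one message at a time.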
Let's take a quick look at the `do_flush/1` method to see what's going on there:
defp do_flush(buffer) do
case buffer do
[] ->
nil
events ->
Logger.info("Flushing #{length(events)} events")
events = Enum.map(events, &(Map.from_struct(&1) |> Map.delete(:__meta__)))
Plausible.ClickhouseRepo.insert_all(Plausible.ClickhouseEvent, events)
end
end
This method is just standard Elixir. All it's doing is taking the events in the buffer, converting each struct into a plain map (dropping the `:__meta__` key that Ecto schema structs carry), and then persisting them to the database (ClickHouse) in a single `insert_all` call. (If we wanted a slightly more explicit name for this method, we could call it something like `save_to_clickhouse/1` instead.)
Note that if we wanted to save some whitespace, this could also be written like:
defp do_flush([]), do: nil
defp do_flush(events) do
Logger.info("Flushing #{length(events)} events")
events = Enum.map(events, &(Map.from_struct(&1) |> Map.delete(:__meta__)))
Plausible.ClickhouseRepo.insert_all(Plausible.ClickhouseEvent, events)
end
Hooray for pattern matching!
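To see what the `Enum.map/2` line does to each event, here's a sketch with a hypothetical `FakeEvent` struct standing in for `Plausible.ClickhouseEvent`. `FlushDemo.prepare/1` (also hypothetical) mirrors the two-clause `do_flush/1` but returns the prepared rows instead of inserting them.

```elixir
defmodule FakeEvent do
  # Hypothetical stand-in for an Ecto schema struct, which carries a
  # :__meta__ field that is presumably not a database column.
  defstruct [:__meta__, :name, :domain]
end

defmodule FlushDemo do
  # Same two-clause shape as do_flush/1, minus the database call.
  def prepare([]), do: nil

  def prepare(events) do
    Enum.map(events, &(&1 |> Map.from_struct() |> Map.delete(:__meta__)))
  end
end

events = [%FakeEvent{__meta__: :loaded, name: "pageview", domain: "example.com"}]
rows = FlushDemo.prepare(events)

# Map.from_struct/1 drops :__struct__; Map.delete/2 drops :__meta__.
IO.inspect(rows == [%{name: "pageview", domain: "example.com"}])  # true
IO.inspect(FlushDemo.prepare([]))  # nil
```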
The `handle_call/3` callback is invoked when `GenServer.call/3` is called, to handle synchronous messages. Note that `GenServer.call/3` will block until a reply is received (unless the call times out or nodes are disconnected).
Here's the method definition from above:
@impl true
def handle_call(:flush, _from, %{buffer: buffer, timer: timer} = _state) do
Process.cancel_timer(timer)
do_flush(buffer)
new_timer = Process.send_after(self(), :tick, @flush_interval_ms)
{:reply, nil, %{buffer: [], timer: new_timer}}
end
This code looks quite similar to the logic above, in the `handle_cast/2` callback. In this case, we're allowing the `:flush` message to be sent manually (via the `flush/0` client function), which takes whatever is currently in the state's `buffer` and saves it to the database (in the `do_flush/1` method). Once again, the `buffer` is reset to an empty list, and the `timer` is reset as well.
The `handle_info/2` callback is invoked to handle all other messages (i.e. outside of those triggered by `GenServer.call/3` and `GenServer.cast/2`). In our case, we use it to handle messages passed around internally within the GenServer itself.
Here's the method definition from above:
@impl true
def handle_info(:tick, %{buffer: buffer} = _state) do
do_flush(buffer)
timer = Process.send_after(self(), :tick, @flush_interval_ms)
{:noreply, %{buffer: [], timer: timer}}
end
The `:tick` message handles flushing the events in our `buffer` at a regular interval (in this case, every 5 seconds). It does this by scheduling the next `:tick` for itself with `Process.send_after(self(), :tick, @flush_interval_ms)` every time it runs.
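The self-rescheduling pattern is easy to try in isolation. Below, a hypothetical `Ticker` GenServer reports a counter to a given process on every `:tick` and then schedules the next one, just like `handle_info(:tick, ...)` does (minus the flushing).

```elixir
defmodule Ticker do
  # Hypothetical example of the self-rescheduling :tick pattern.
  use GenServer

  def start_link(notify, interval), do: GenServer.start_link(__MODULE__, {notify, interval})

  @impl true
  def init({notify, interval}) do
    # Schedule the very first tick.
    Process.send_after(self(), :tick, interval)
    {:ok, %{notify: notify, interval: interval, count: 0}}
  end

  @impl true
  def handle_info(:tick, state) do
    count = state.count + 1
    send(state.notify, {:tick, count})
    # Schedule the next tick; this is what keeps the loop going.
    Process.send_after(self(), :tick, state.interval)
    {:noreply, %{state | count: count}}
  end
end

{:ok, _pid} = Ticker.start_link(self(), 50)

for _ <- 1..3 do
  receive do
    {:tick, n} -> IO.puts("tick #{n}")
  end
end
# tick 1
# tick 2
# tick 3
```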
The `terminate/2` callback is invoked when the server is about to exit, and can handle any clean-up tasks.
Here's the method definition from above:
@impl true
def terminate(_reason, %{buffer: buffer} = _state) do
Logger.info("Flushing event buffer before shutdown...")
do_flush(buffer)
end
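By trapping exits in the `init/1` callback above, we can ensure that before this GenServer process terminates, we flush anything that's left in our `buffer`. Otherwise, the process could be terminated before the events still in memory had a chance to be saved to the database, and they would be lost. (This is best-effort: if the process is killed outright, for example with `Process.exit(pid, :kill)`, or the whole node crashes, `terminate/2` never runs. By default, a supervisor also waits only 5 seconds for a worker to shut down before killing it.)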
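A quick way to convince yourself that trapping exits matters: the sketch below starts a hypothetical `Flusher` worker under a supervisor, stops the supervisor, and checks whether `terminate/2` ran. `Flusher` just sends a message where `WriteBuffer` would flush, and the `trap?` flag is an assumption added purely for the comparison.

```elixir
defmodule Flusher do
  # Hypothetical worker: terminate/2 reports to `test_pid` instead of flushing.
  use GenServer

  def start_link({test_pid, trap?}), do: GenServer.start_link(__MODULE__, {test_pid, trap?})

  @impl true
  def init({test_pid, trap?}) do
    Process.flag(:trap_exit, trap?)
    {:ok, test_pid}
  end

  @impl true
  def terminate(reason, test_pid) do
    send(test_pid, {:terminate_called, reason})
  end
end

check = fn trap? ->
  {:ok, sup} = Supervisor.start_link([{Flusher, {self(), trap?}}], strategy: :one_for_one)
  # Stopping the supervisor sends a :shutdown exit signal to its children.
  Supervisor.stop(sup)

  receive do
    {:terminate_called, reason} -> {:called, reason}
  after
    500 -> :not_called
  end
end

IO.inspect(check.(true))   # {:called, :shutdown}
IO.inspect(check.(false))  # :not_called
```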
The APIs exposed to the client are mostly boilerplate, and pretty straightforward:
def start_link(_opts) do
GenServer.start_link(__MODULE__, [], name: __MODULE__)
end
def insert(event) do
GenServer.cast(__MODULE__, {:insert, event})
{:ok, event}
end
def flush() do
GenServer.call(__MODULE__, :flush, :infinity)
:ok
end
Let's break it down!
`start_link/1` is probably the least intuitive of the three methods above; once we understand the code here, the rest should more or less fall into place.
def start_link(_opts) do
GenServer.start_link(__MODULE__, [], name: __MODULE__)
end
As you might guess, the `start_link/1` method is responsible for, well, starting the GenServer process. All it's doing is calling `start_link/3` on `GenServer` itself, with a few strange arguments:
- `__MODULE__`, which is simply an alias for the module itself (in this case, `Plausible.Event.WriteBuffer`). This first argument passes `Plausible.Event.WriteBuffer` in as the callback module, so that the server callbacks we implemented above are used when messages are passed to the server process via `GenServer.call` and `GenServer.cast`.
  - Note that calling `GenServer.start_link(Plausible.Event.WriteBuffer, [], name: Plausible.Event.WriteBuffer)` would result in identical behavior. The main advantage of using `__MODULE__` is that if we decide to rename the module, we only have to update it in one place!
- An empty list (i.e. `[]`) is passed in as the second argument. This is passed to our `init/1` callback as the initial `buffer`, as we saw above.
- The last argument is a keyword list of options. Here, we assign the process a name with `name: __MODULE__` (which is the same as `name: Plausible.Event.WriteBuffer`). This allows us to pass messages to the GenServer using the module name (i.e. `GenServer.call(__MODULE__, message, timeout)` / `GenServer.cast(__MODULE__, message)`). Otherwise, after starting the GenServer, we would have to keep a reference to its process ID (or "PID", which `GenServer.start_link/3` returns as `{:ok, pid}`), and then pass it in explicitly. (Registering a name like this is a very common practice.)
(GenServer also has a function called simply `start/3`, which takes the same arguments as `start_link/3`. The only difference is that `start_link/3` "links" the GenServer process to the current process, which is what we want when starting the GenServer as part of a "supervision tree". Starting a process with `start_link` under a `Supervisor` lets the supervisor detect errors/crashes in the process and automatically restart it if necessary.)
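To see the supervision benefit in action, this sketch starts a hypothetical `NamedWorker` (registered by module name, like `WriteBuffer`) under a supervisor, kills it, and checks that a new process has taken over the name. The 50 ms sleep is just a crude way to give the restart time to happen.

```elixir
defmodule NamedWorker do
  # Hypothetical worker registered under its module name, like WriteBuffer.
  use GenServer

  def start_link(_opts), do: GenServer.start_link(__MODULE__, [], name: __MODULE__)

  @impl true
  def init(buffer), do: {:ok, buffer}
end

{:ok, _sup} = Supervisor.start_link([NamedWorker], strategy: :one_for_one)

old_pid = Process.whereis(NamedWorker)
# Simulate a crash.
Process.exit(old_pid, :kill)
# Give the supervisor a moment to restart the worker.
Process.sleep(50)
new_pid = Process.whereis(NamedWorker)

IO.inspect(is_pid(new_pid) and new_pid != old_pid)  # true
```

Because the worker is registered by name, clients calling `GenServer.cast(NamedWorker, ...)` keep working after the restart without needing to know the new PID.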
In practice, this is where the process is started: https://github.com/plausible/analytics/blob/master/lib/plausible/application.ex
Here's the relevant code below:
defmodule Plausible.Application do
use Application
def start(_type, _args) do
children = [
# ...
Plausible.Event.WriteBuffer,
# ...
]
opts = [strategy: :one_for_one, name: Plausible.Supervisor]
# ...
Supervisor.start_link(children, opts)
end
# ...
end
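GenServer's `start_link/3` function is what enables us to start the process under this `Supervisor`. (When we include `Plausible.Event.WriteBuffer` among the children of the supervisor, `Supervisor.start_link/2` calls `Plausible.Event.WriteBuffer.start_link/1` automatically. Listing a bare module is shorthand for `{Plausible.Event.WriteBuffer, []}`, and the `child_spec/1` function generated by `use GenServer` tells the supervisor to call `start_link([])`; that `[]` is the `_opts` argument our `start_link/1` ignores.)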
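You can inspect the generated child spec yourself. This sketch uses a hypothetical `SpecDemo` module rather than Plausible's; calling `child_spec([])` shows the `start` tuple the supervisor will use.

```elixir
defmodule SpecDemo do
  # Hypothetical module; `use GenServer` generates child_spec/1 for it.
  use GenServer

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts, name: __MODULE__)

  @impl true
  def init(opts), do: {:ok, opts}
end

# Listing `SpecDemo` in a supervisor's children is shorthand for {SpecDemo, []}.
spec = SpecDemo.child_spec([])
IO.inspect(spec.id)     # SpecDemo
IO.inspect(spec.start)  # {SpecDemo, :start_link, [[]]}
```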
Hopefully at this point, we have a pretty good understanding of what's going on in `Plausible.Event.WriteBuffer.start_link/1`! Now all that's left to cover are the two methods exposed in our client API: `insert/1` and `flush/0`.
One last time, here's the `insert/1` method:
def insert(event) do
GenServer.cast(__MODULE__, {:insert, event})
{:ok, event}
end
Here we see that all this is doing is taking an `event` and sending an asynchronous message to our GenServer via the `cast/2` method. (Since we're using `cast/2` here instead of `call/3`, we know that this is a non-blocking call which will return immediately, regardless of what happens in the callback.)
The first argument of `GenServer.cast/2` takes a process ID (i.e. PID) or a registered server name. In this case, we set the name to `__MODULE__` in the third argument of our `GenServer.start_link/3` call above, which is why we're using that here. (This is a very common practice with GenServers.)
The second argument is the "message" we want to send, often in the form of a tuple or an atom. Here we're sending `{:insert, event}`, which will be handled in the `handle_cast({:insert, event}, ...)` callback discussed above.
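Since we don't wait for a return value, the `insert/1` method just returns an echo of the event that was passed in, in the form of `{:ok, event}`. (Note that `GenServer.cast/2` returns `:ok` even if the server isn't running, so `{:ok, event}` doesn't actually guarantee that the event was buffered.)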
We're almost done! Let's take a look at the `flush/0` method:
def flush() do
GenServer.call(__MODULE__, :flush, :infinity)
:ok
end
This is quite similar to what we saw in `insert/1`. The only differences are:
- `GenServer.call/3`, unlike `GenServer.cast/2`, blocks the calling process until the server replies. So if the `:flush` message ends up taking 10 seconds to handle, the `flush/0` method will not return `:ok` until 10 seconds have passed.
- `call/3` takes a third argument, which we see here is `:infinity`. This is the maximum amount of time (in milliseconds) we allow the call to take before the caller exits with a timeout error. By default, this is set to `5_000`, or 5 seconds. By passing in `:infinity`, we're allowing the flush to take as long as it needs to complete.
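Here's a sketch of what a finite timeout looks like in practice, using a hypothetical `SlowFlush` server whose `:flush` takes about 300 ms. When the timeout is exceeded, `GenServer.call/3` doesn't return an error tuple; the caller exits, so we use `catch :exit` to observe it.

```elixir
defmodule SlowFlush do
  # Hypothetical server whose :flush takes ~300 ms, standing in for a large insert.
  use GenServer

  def start_link(_), do: GenServer.start_link(__MODULE__, :ok)

  @impl true
  def init(:ok), do: {:ok, nil}

  @impl true
  def handle_call(:flush, _from, state) do
    Process.sleep(300)
    {:reply, :ok, state}
  end
end

{:ok, pid} = SlowFlush.start_link([])

# With a 100 ms timeout the caller exits; we catch the exit to inspect it.
result =
  try do
    GenServer.call(pid, :flush, 100)
  catch
    :exit, {:timeout, _} -> :timed_out
  end

IO.inspect(result)                                  # :timed_out
IO.inspect(GenServer.call(pid, :flush, :infinity))  # :ok
```

The server still finishes the first, timed-out flush before handling the second call; only the caller gave up waiting.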
Let's take one last look at the full module definition, to double check that we understand everything that's going on. I've added some comments in the code to help out a bit. 😉
defmodule Plausible.Event.WriteBuffer do
use GenServer
require Logger
# Flush/save every 5 seconds
@flush_interval_ms 5_000
# Allow a maximum of 10,000 events in our buffer list
@max_buffer_size 10_000
############################################################################
# Client APIs
############################################################################
@doc """
Starts a linked GenServer process, passing in the current module
(`__MODULE__` == `Plausible.Event.WriteBuffer`) as both the callback module
and the alias/name of the server process for future reference.
We also pass in an empty list (`[]`) as the initial value of the internal
state's `buffer` (handled in the `init/1` callback below).
"""
def start_link(_opts) do
GenServer.start_link(__MODULE__, [], name: __MODULE__)
end
@doc """
Sends an asynchronous `:insert` message to the current module's GenServer
process, which adds the provided `event` to the internal state's `buffer`.
"""
def insert(event) do
GenServer.cast(__MODULE__, {:insert, event})
{:ok, event}
end
@doc """
Sends a (synchronous) `:flush` message to the current module's GenServer
process, which manually "flushes" all events in the internal state's `buffer`
to the database.
Sets the `timeout` option to `:infinity`, allowing this call to take as long
as it needs to complete.
"""
def flush() do
GenServer.call(__MODULE__, :flush, :infinity)
:ok
end
############################################################################
# Server callbacks
############################################################################
@doc """
Sets up the initial state of the GenServer, and traps exits so that we can
gracefully handle a process termination in our `terminate/2` callback below.
"""
@impl true
def init(buffer) do
Process.flag(:trap_exit, true)
timer = Process.send_after(self(), :tick, @flush_interval_ms)
{:ok, %{buffer: buffer, timer: timer}}
end
@doc """
Callback that handles `:insert` messages passed to `GenServer.cast/2`.
In this callback, we add the new event to the internal state's `buffer`.
If we reach the maximum capacity of the `buffer`, we "flush" it to the database
and reset our state. Otherwise, we simply update the internal state with the
updated `buffer`.
"""
@impl true
def handle_cast({:insert, event}, %{buffer: buffer, timer: timer} = state) do
new_buffer = [event | buffer]
if length(new_buffer) >= @max_buffer_size do
Logger.info("Buffer full, flushing to disk")
Process.cancel_timer(timer)
do_flush(new_buffer)
new_timer = Process.send_after(self(), :tick, @flush_interval_ms)
{:noreply, %{buffer: [], timer: new_timer}}
else
{:noreply, %{state | buffer: new_buffer}}
end
end
@doc """
Callback that handles internal `:tick` messages.
In this callback, we automatically flush whatever is in the `buffer`
every 5 seconds (i.e. `@flush_interval_ms`).
"""
@impl true
def handle_info(:tick, %{buffer: buffer} = _state) do
do_flush(buffer)
timer = Process.send_after(self(), :tick, @flush_interval_ms)
{:noreply, %{buffer: [], timer: timer}}
end
@doc """
Callback that handles `:flush` messages passed to `GenServer.call/3`.
In this callback, we handle manually flushing whatever is currently in
the internal state's `buffer` to the database.
"""
@impl true
def handle_call(:flush, _from, %{buffer: buffer, timer: timer} = _state) do
Process.cancel_timer(timer)
do_flush(buffer)
new_timer = Process.send_after(self(), :tick, @flush_interval_ms)
{:reply, nil, %{buffer: [], timer: new_timer}}
end
@doc """
Callback that gets invoked when the process is about to shut down.
In this callback, we make sure to flush whatever is currently in the internal
state's `buffer` to the database before the process terminates.
"""
@impl true
def terminate(_reason, %{buffer: buffer} = _state) do
Logger.info("Flushing event buffer before shutdown...")
do_flush(buffer)
end
############################################################################
# Private/utility methods
############################################################################
# Performs a "flush" of the events in the `buffer` to the database. By "flush",
# we simply mean a bulk insertion of the events into ClickHouse (the db).
defp do_flush(buffer) do
case buffer do
[] ->
nil
events ->
Logger.info("Flushing #{length(events)} events")
events = Enum.map(events, &(Map.from_struct(&1) |> Map.delete(:__meta__)))
Plausible.ClickhouseRepo.insert_all(Plausible.ClickhouseEvent, events)
end
end
end
Here's the GenServer being used in the wild: https://github.com/plausible/analytics/blob/d27dad729904a4b38350259f9c460d8c78047f25/lib/plausible_web/controllers/api/external_controller.ex#L111
- Frontend hitting `POST /api/event` endpoint: TODO
- `/api/event` definition in router: TODO
- EventController handling incoming requests: TODO
- Where the GenServer is used: TODO
As we can see, this controller defines the logic for the `POST /api/event` endpoint.
TODO: maybe describe how this GenServer is used at the top for context, rather than down here?
TODO: how much more of an explanation is necessary?
TODO: maybe it would be better to explain the client APIs first, and work our way from top to bottom?
TODO: use analogy of Redux store: dispatching events and handling them in a reducer (immutable state and all that jazz)
- (In a way, it's similar to how Redux operates. We dispatch "actions" to the Redux store, which handles updating the application state in a "reducer". In Elixir, our "actions" look like `{:insert, event}`; in JavaScript, they would look something like `{type: 'insert', payload: event}`.)
(TODO: include code for basic implementation of a GenServer, see ServerProcess module from book)
`ServerProcess`:
defmodule ServerProcess do
def start(cb_module) do
spawn(fn ->
initial_state = cb_module.init()
loop(cb_module, initial_state)
end)
end
def call(pid, request) do
send(pid, {:call, request, self()})
receive do
{:response, response} -> response
end
end
def cast(pid, request) do
send(pid, {:cast, request})
end
defp loop(cb_module, state) do
receive do
{:call, request, caller} ->
{:reply, response, new_state} = cb_module.handle_call(request, caller, state)
send(caller, {:response, response})
loop(cb_module, new_state)
{:cast, request} ->
{:noreply, new_state} = cb_module.handle_cast(request, state)
loop(cb_module, new_state)
end
end
end
Quotes:
There are two types of requests you can send to a GenServer: calls and casts. Calls are synchronous and the server must send a response back to such requests. While the server computes the response, the client is waiting. Casts are asynchronous: the server won’t send a response back and therefore the client won’t wait for one. Both requests are messages sent to the server, and will be handled in sequence. In the above implementation, we pattern-match on the :create messages, to be handled as cast, and on the :lookup messages, to be handled as call.
A GenServer is implemented in two parts: the client API and the server callbacks. You can either combine both parts into a single module or you can separate them into a client module and a server module. The client is any process that invokes the client function. The server is always the process identifier or process name that we will explicitly pass as argument to the client API. Here we’ll use a single module for both the server callbacks and the client API.
[GenServer is] a behaviour module for implementing the server of a client-server relation.
A GenServer is a process like any other Elixir process and it can be used to keep state, execute code asynchronously and so on. The advantage of using a generic server process (GenServer) implemented using this module is that it will have a standard set of interface functions and include functionality for tracing and error reporting. It will also fit into a supervision tree.
A GenServer holds a "state", which is just a piece of data. This data can be a number, like `1`, or a list, like `[1, 2, 3]`, or a more complex data structure, like a map: `%{foo: "bar", list: [1, 2, 3], flag: true}`, etc.
In this example, our state is a map, with a reference to a timer, and a list to act as a buffer:
state = %{
timer: <timer_reference_id>,
buffer: []
}
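When we invoke the `call` function on our GenServer, we're effectively doing something like the following (using the simplified `ServerProcess` above as a mental model; the real `GenServer` internals differ in the details):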
GenServer.call(server, :increment)
# Conceptually, in the client process (cf. ServerProcess.call/2):
send(server, {:call, :increment, self()})
# Conceptually, in the server process (cf. ServerProcess.loop/2):
receive do
{:call, request, caller} ->
{:reply, response, new_state} = cb_module.handle_call(request, caller, state)
send(caller, {:response, response})
# etc
end