Skip to content

Commit

Permalink
Console leader election (#153)
Browse files Browse the repository at this point in the history
  • Loading branch information
michaeljguarino authored Nov 29, 2022
1 parent fb417d9 commit 5942ea2
Show file tree
Hide file tree
Showing 11 changed files with 233 additions and 17 deletions.
51 changes: 41 additions & 10 deletions lib/console/deployer.ex
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
defmodule Console.Deployer do
use GenServer
alias Console.Commands.{Plural, Command}
alias Console.Services.{Builds, Users}
alias Console.Services.{Builds, Users, LeaderElection}
alias Console.Schema.Build
alias Console.Plural.Context
require Logger

@poll_interval 10_000
@group :deployer
@leader "deployer"

defmodule State, do: defstruct [:storage, :ref, :pid, :build, :id, :timer, :leader]

Expand All @@ -24,25 +25,45 @@ defmodule Console.Deployer do
Logger.info "Starting deployer"
:pg2.create(@group)
:pg2.join(@group, self())
{:ok, ref} = :timer.send_interval(@poll_interval, :poll)
{:ok, _} = :timer.send_interval(@poll_interval, :poll)
send self(), :init
if Console.conf(:initialize) do
:timer.send_interval(:timer.minutes(2), :sync)
end
{:ok, %State{storage: storage, id: Ecto.UUID.generate(), ref: ref}}
{:ok, %State{storage: storage, id: Ecto.UUID.generate()}}
end

# Identity of this deployer in the cluster: the locally registered process
# name paired with the current node. This tuple is what gets stored as the
# leader `ref`, and it is directly usable as a GenServer server reference.
#
# The pre-change `wake/0` and `cancel/0` definitions (which called
# `__MODULE__` directly) used to sit around this function; they are dropped
# because, being earlier clauses of the same name/arity, they shadowed the
# leader-routed versions defined further down.
def me(), do: {__MODULE__, node()}
# Attempts to make this deployer the cluster leader (or to renew its lease if
# it already is) by delegating to `LeaderElection.elect/2` under the
# `@leader` name. Returns whatever `LeaderElection.elect/2` returns.
def elect() do
  # me/0 is a {module, node} tuple; tuples do not implement String.Chars, so
  # it has to go through inspect/1 before being interpolated.
  Logger.info "attempting to elect #{inspect(me())} as leader"
  LeaderElection.elect(me(), @leader)
end

# Resolves the server reference that client calls should be sent to: the
# `ref` stored on the "deployer" leader record when one exists, otherwise the
# locally registered deployer process.
def leader() do
  with %{ref: elected} <- LeaderElection.get(@leader) do
    elected
  else
    _ -> __MODULE__
  end
end

# Client API. Every wrapper sends its request to `leader/0`, so the call is
# served by the elected deployer (falling back to the local process when no
# leader record exists). The stale pre-change versions of state/0, ping/0,
# update/3 and exec/1 that called `__MODULE__` directly are removed: they
# were earlier clauses of the same name/arity and shadowed these.

# Reads `path` on the leader; replies with the `File.read/1` result.
def file(path), do: GenServer.call(leader(), {:file, path})

# Asks the leader to poll for pending builds right away.
def wake(), do: GenServer.call(leader(), :poll)

# Sends the :cancel request to the leader.
def cancel(), do: GenServer.call(leader(), :cancel)

# Sends the :state request to the leader.
def state(), do: GenServer.call(leader(), :state)

# Sends the :ping request to the leader.
def ping(), do: GenServer.call(leader(), :ping)

# Sends an {:update, repo, content, tool} request to the leader.
def update(repo, content, tool), do: GenServer.call(leader(), {:update, repo, content, tool})

# Sends an {:exec, fun} request to the leader.
def exec(fun), do: GenServer.call(leader(), {:exec, fun})

# Serves a {:file, path} request: replies with the File.read/1 result for
# `path` and leaves the server state untouched.
def handle_call({:file, path}, _from, state), do: {:reply, File.read(path), state}

def handle_call(:poll, _, %State{} = state) do
send(self(), :poll)
Expand Down Expand Up @@ -73,6 +94,7 @@ defmodule Console.Deployer do
# Handles a :sync cast while no `ref` is stored in the state (the :poll
# handler stores the ref returned by perform/2, so presumably this means no
# build is in flight - TODO confirm). Re-initialises the storage backend and
# then runs its doctor/0 check; the state is returned unchanged.
def handle_cast(:sync, %State{ref: nil, storage: storage} = state) do
Logger.info "Resyncing git state"
storage.init()
# doctor/0 is declared as a Console.Storage callback; its result is ignored here.
storage.doctor()
{:noreply, state}
end
# Any other :sync cast (a ref is present in the state) is a no-op.
def handle_cast(:sync, state), do: {:noreply, state}
Expand All @@ -85,13 +107,17 @@ defmodule Console.Deployer do

def handle_info(:poll, %State{pid: nil, storage: storage, id: id} = state) do
Logger.info "Checking for pending builds, pid: #{inspect(self())}, node: #{node()}"
with {:ok, %Build{} = build} <- Builds.poll(id) do
with {:ok, _} <- elect(),
{:ok, %Build{} = build} <- Builds.poll(id) do
{pid, ref} = perform(storage, build)
{:noreply, %{state | ref: ref, pid: pid, build: build}}
else
{:error, :locked} ->
Logger.info "deployer is locked"
{:noreply, state}
{:error, :following} ->
Logger.info "#{node()} is a follower"
{:noreply, state}
_ ->
Logger.info "No build found"
{:noreply, state}
Expand All @@ -100,6 +126,7 @@ defmodule Console.Deployer do

# Handles a :poll tick while a build pid is already tracked in the state.
# No new build is started; the state is passed through ping/1 (defined
# outside this view) before being returned.
def handle_info(:poll, %State{pid: pid} = state) when is_pid(pid) do
Logger.info "Build #{inspect(pid)} already running"
# Re-run the election so the leader heartbeat is refreshed while the build
# runs (LeaderElection.elect/2 upserts a current heartbeat for the owner).
# The result is deliberately ignored.
elect()
{:noreply, ping(state)}
end

Expand All @@ -115,6 +142,10 @@ defmodule Console.Deployer do

# GenServer terminate/2 callback. GenServer invokes it as
# terminate(reason, state), so the parameters are named in that order to keep
# the log labels truthful. On the way out, leadership is released if this
# deployer currently holds it so another node can take over without waiting
# for the heartbeat to go stale.
def terminate(reason, state) do
  Logger.info "Terminating with state: #{inspect(state)} reason #{inspect(reason)}"

  # me/0 is a tuple and cannot be interpolated directly; inspect it instead.
  case LeaderElection.clear(me(), @leader) do
    {:ok, _} -> Logger.info "removed #{inspect(me())} as cluster leader"
    _ -> Logger.info "#{inspect(me())} is not leader, moving on"
  end

  :ok
end

Expand Down
9 changes: 5 additions & 4 deletions lib/console/plural/context.ex
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
defmodule Console.Plural.Context do
import Console
alias Console.Deployer

defstruct [:configuration, :bundles, :smtp]

Expand Down Expand Up @@ -34,10 +35,10 @@ defmodule Console.Plural.Context do
end

# Loads the context file and builds the context from its "spec" section.
#
# The file is read through Deployer.file/1, so the contents come from the
# elected deployer rather than the local filesystem, and are then parsed as
# YAML from the returned string. Any failure (read error, parse error, or a
# document without a "spec" key) is reported as {:error, :not_found}.
def get() do
  with {:ok, content} <- Deployer.file(location()),
       {:ok, %{"spec" => spec}} <- YamlElixir.read_from_string(content) do
    {:ok, new(spec)}
  else
    _ -> {:error, :not_found}
  end
end
Expand Down
24 changes: 24 additions & 0 deletions lib/console/schema/leader.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
defmodule Console.Schema.Leader do
  @moduledoc """
  Row in the `leaders` table recording who currently owns a named leadership
  role. `ref` identifies the owner and `heartbeat` is the timestamp written
  each time the record is saved through an election.
  """
  use Piazza.Ecto.Schema

  # Every field is both castable and required.
  @valid ~w(name ref heartbeat)a

  schema "leaders" do
    field :name, :string
    field :ref, Piazza.Ecto.Types.Erlang
    field :heartbeat, :utc_datetime_usec

    timestamps()
  end

  @doc """
  Adds a `FOR UPDATE` row lock to `query` (defaults to the whole schema).
  """
  def with_lock(query \\ __MODULE__), do: from(l in query, lock: "FOR UPDATE")

  @doc """
  Casts `attrs` onto `model`, registers the unique constraint on `name` and
  requires all of `name`, `ref` and `heartbeat` to be present.
  """
  def changeset(model, attrs \\ %{}) do
    casted = cast(model, attrs, @valid)

    casted
    |> unique_constraint(:name)
    |> validate_required(@valid)
  end
end
61 changes: 61 additions & 0 deletions lib/console/services/leader_election.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
defmodule Console.Services.LeaderElection do
  @moduledoc """
  Database-backed leader election.

  A leadership role is a single `Console.Schema.Leader` row keyed by `name`.
  The owner keeps the role by calling `elect/2` repeatedly, which refreshes
  the row's heartbeat; another caller can only take the role over once that
  heartbeat is older than the staleness window.
  """
  use Console.Services.Base
  alias Console.Schema.Leader

  @type error :: {:error, term}
  @type leader_resp :: {:ok, Leader.t} | error

  # Seconds without a heartbeat after which a leader may be replaced.
  @stale_after_seconds 30

  @doc """
  Fetches the leader record for `name` without locking, or `nil` if none exists.
  """
  @spec get(binary) :: Leader.t | nil
  def get(name), do: Console.Repo.get_by(Leader, name: name)

  @doc """
  Same as `get/1`, but selects the row `FOR UPDATE`. Only meaningful inside a transaction.
  """
  @spec atomic_get(binary) :: Leader.t | nil
  def atomic_get(name), do: Console.Repo.get_by(Leader.with_lock(), name: name)

  @doc """
  Wipes the leader record if `ref` owns `name`, otherwise fails
  """
  @spec clear(term, binary) :: leader_resp
  def clear(ref, name) do
    start_transaction()
    |> add_operation(:fetch, fn _ ->
      case atomic_get(name) do
        %Leader{ref: ^ref} = l -> {:ok, l}
        _ -> {:error, :following}
      end
    end)
    |> add_operation(:update, fn %{fetch: l} -> Console.Repo.delete(l) end)
    |> execute(extract: :update)
  end

  @doc """
  Locks the record for `name`, then upserts it with `ref` as owner and a current
  heartbeat when any of the following holds:

    * no record exists yet
    * `ref` already owns the record
    * another owner holds it but its heartbeat is older than #{@stale_after_seconds} seconds

  If another owner holds the record with a fresh heartbeat, the fetch step
  fails with `{:error, :following}` and nothing is written.
  """
  @spec elect(term, binary) :: leader_resp
  def elect(ref, name) do
    start_transaction()
    |> add_operation(:fetch, fn _ ->
      case atomic_get(name) do
        %Leader{ref: ^ref} = leader -> {:ok, leader}
        %Leader{} = leader -> check_heartbeat(leader)
        nil -> {:ok, %Leader{name: name}}
      end
    end)
    |> add_operation(:update, fn %{fetch: fetch} ->
      fetch
      |> Leader.changeset(%{ref: ref, heartbeat: Timex.now()})
      |> Console.Repo.insert_or_update()
    end)
    |> execute(extract: :update)
  end

  # A foreign leader may only be replaced once its last heartbeat falls
  # before the staleness cutoff; otherwise the caller stays a follower.
  defp check_heartbeat(%Leader{heartbeat: beat} = leader) do
    expired = Timex.now() |> Timex.shift(seconds: -@stale_after_seconds)

    case Timex.before?(beat, expired) do
      true -> {:ok, leader}
      false -> {:error, :following}
    end
  end
end
5 changes: 3 additions & 2 deletions lib/console/services/plural.ex
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
defmodule Console.Services.Plural do
alias Console.Deployer
alias Console.Schema.{User, Manifest}
alias Console.Services.{Builds}
alias Console.Plural.{Repositories, Users, Recipe, Installation, OIDCProvider, Manifest, Context}
Expand All @@ -21,12 +22,12 @@ defmodule Console.Services.Plural do

# Returns the File.read/1-style result for the repository's terraform file,
# read through Deployer.file/1 so the contents come from the elected
# deployer. The leftover `|> File.read()` step is removed: it fed a result
# tuple, not a path, into Deployer.file/1.
def terraform_file(repository) do
  repository
  |> terraform_filename()
  |> Deployer.file()
end

# Returns the File.read/1-style result for the repository's values file,
# read through Deployer.file/1 so the contents come from the elected
# deployer. The leftover `|> File.read()` step is removed: it fed a result
# tuple, not a path, into Deployer.file/1.
def values_file(repository) do
  repository
  |> vals_filename()
  |> Deployer.file()
end

def update_configuration(repository, update, tool) do
Expand Down
1 change: 1 addition & 0 deletions lib/console/storage.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ defmodule Console.Storage do

@type result :: :ok | {:error, term}

@callback doctor() :: result

@callback init() :: result

Expand Down
14 changes: 14 additions & 0 deletions lib/console/storage/git.ex
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,20 @@ defmodule Console.Storage.Git do
do: git("clean", ["-f"])
end

# Repairs the local checkout: first makes sure HEAD is not detached, and only
# if that step reports {:ok, _} pulls. Any other result from the detached
# check is returned unchanged.
def doctor() do
  case check_detached() do
    {:ok, _} -> pull()
    failure -> failure
  end
end

# Checks what HEAD resolves to and, when the symbolic name itself starts with
# "HEAD" (presumably git's output for a detached HEAD - confirm against the
# git rev-parse docs), stashes local changes and checks out branch().
# Returns the checkout result in that case, and {:ok, ""} in every other
# case, including when the rev-parse command itself fails.
defp check_detached() do
case git("rev-parse", ["--symbolic-full-name", "HEAD"]) do
{:ok, "HEAD" <> _} ->
# NOTE(review): the stash result is ignored, so a failed stash does not stop the checkout.
git("stash")
git("checkout", [branch()])
_ -> {:ok, ""}
end
end

# Runs `git <cmd> <args...>` via cmd/3 in the workspace() directory.
def git(cmd, args \\ []) do
  cmd("git", [cmd | args], workspace())
end

Expand Down
16 changes: 16 additions & 0 deletions priv/repo/migrations/20221128153822_add_leaders.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Creates the `leaders` table backing Console.Schema.Leader: one row per
# named leadership role.
defmodule Console.Repo.Migrations.AddLeaders do
use Ecto.Migration

def change do
# The default primary key is disabled so `id` can be declared explicitly as a uuid.
create table(:leaders, primary_key: false) do
add :id, :uuid, primary_key: true
add :heartbeat, :utc_datetime_usec
# Stored as raw binary; the schema maps this column with Piazza.Ecto.Types.Erlang.
add :ref, :binary
add :name, :string

timestamps()
end

# At most one leader row per name; the schema changeset relies on this
# index through unique_constraint(:name).
create unique_index(:leaders, [:name])
end
end
2 changes: 1 addition & 1 deletion test/console/plural/context_test.exs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
defmodule Console.Plural.ContextTest do
use ExUnit.Case
use Console.DataCase, async: false
alias Console.Plural.Context

describe "#merge/2" do
Expand Down
59 changes: 59 additions & 0 deletions test/console/services/leader_election_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
defmodule Console.Services.LeaderElectionTest do
  use Console.DataCase, async: true
  alias Console.Services.LeaderElection

  describe "elect/2" do
    test "if no leader exists it will elect" do
      assert {:ok, elected} = LeaderElection.elect(self(), "usa")

      assert elected.name == "usa"
      assert elected.ref == self()
      assert elected.heartbeat
    end

    test "if another leader exists, it will fail" do
      # a fresh heartbeat owned by someone else blocks the election
      insert(:leader, ref: :else, name: "usa")

      assert {:error, _} = LeaderElection.elect(self(), "usa")
    end

    test "if a stale leader exists, it will take ownership" do
      # one minute old is beyond the staleness window
      stale_beat = Timex.shift(Timex.now(), minutes: -1)
      insert(:leader, ref: :else, name: "usa", heartbeat: stale_beat)

      assert {:ok, elected} = LeaderElection.elect(self(), "usa")

      assert elected.name == "usa"
      assert elected.ref == self()
      assert elected.heartbeat
    end

    test "if you are leader, the hearbeat will be updated" do
      earlier_beat = Timex.shift(Timex.now(), seconds: -10)
      previous = insert(:leader, name: "usa", heartbeat: earlier_beat)

      assert {:ok, elected} = LeaderElection.elect(self(), "usa")

      assert elected.name == "usa"
      assert elected.ref == self()
      assert Timex.after?(elected.heartbeat, previous.heartbeat)
    end
  end

  describe "#clear/2" do
    test "if you're leader, it will clear" do
      owned = insert(:leader, name: "usa")

      assert {:ok, removed} = LeaderElection.clear(self(), "usa")

      assert removed.id == owned.id
      refute refetch(removed)
    end

    test "if you aren't leader, it will ignore" do
      foreign = insert(:leader, ref: :other, name: "usa")

      assert {:error, _} = LeaderElection.clear(self(), "usa")

      assert refetch(foreign)
    end
  end
end
8 changes: 8 additions & 0 deletions test/support/factory.ex
Original file line number Diff line number Diff line change
Expand Up @@ -125,4 +125,12 @@ defmodule Console.Factory do
seen_at: Timex.now()
}
end

# Builds a leader record owned by the calling process, with a sequenced
# unique name and a heartbeat of "now".
def leader_factory do
  unique_name = sequence(:leader, fn n -> "leader-#{n}" end)

  %Schema.Leader{
    name: unique_name,
    ref: self(),
    heartbeat: Timex.now()
  }
end
end

0 comments on commit 5942ea2

Please sign in to comment.