wip: auto discovery of members with zeroconf (#135)
* wip: auto discovery of members with zeroconf

* Fixed cluster cleanup

* bench before fixing race condition

* fixed minor doc typos and added a test for the race condition when a child is subscribed to the event stream

* wip: fixing race condition

* fixed race condition on children subscribing to the event stream

* auto discovery, but also with bootstrap if needed

* new config construction for cluster

* fixed examples

* updated README

* Added some more documentation to the cluster configuration

* more docs and changed activation config
anthdm authored Jan 1, 2024
1 parent 699addd commit 220df8b
Showing 18 changed files with 854 additions and 402 deletions.
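
The headline change is zeroconf-based member discovery plus a reworked cluster configuration ("new config construction for cluster" in the commit message). Below is a minimal sketch of what constructing a cluster member might look like after this commit; the `NewConfig`/`With*` builder names and the `cluster.New` signature are assumptions, and only `NewDefaultActivationStrategy` appears verbatim in the diff:

```go
package main

import (
	"log"

	"github.com/anthdm/hollywood/cluster"
)

func main() {
	// Assumed builder-style config: members on the same network are
	// expected to find each other automatically via zeroconf/DNS,
	// with explicit bootstrap still possible when needed.
	config := cluster.NewConfig().
		WithID("member-1").
		WithRegion("eu-west").
		WithActivationStrategy(cluster.NewDefaultActivationStrategy())

	c, err := cluster.New(config)
	if err != nil {
		log.Fatal(err)
	}
	c.Start()
	select {} // keep the member alive
}
```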
8 changes: 4 additions & 4 deletions README.md
@@ -29,12 +29,12 @@ large number of concurrent users and complex interactions.

## Features

- guaranteed message delivery on actor failure (buffer mechanism)
- fire & forget or request & response messaging, or both.
- Guaranteed message delivery on actor failure (buffer mechanism)
- Fire & forget or request & response messaging, or both
- High performance dRPC as the transport layer
- Optimized proto buffers without reflection
- lightweight and highly customizable
- cluster support [wip]
- Lightweight and highly customizable
- Cluster support with DNS auto discovery for nodes that are on the same network

# Benchmarks

4 changes: 2 additions & 2 deletions actor/context.go
@@ -16,8 +16,8 @@ type Context struct {
engine *Engine
receiver Receiver
message any
// the context of the parent, if this is the context of a child.
// we need this so we can remove the child from the parent Context
// the context of the parent if we are a child.
// we need this parentCtx, so we can remove the child from the parent Context
// when the child dies.
parentCtx *Context
children *safemap.SafeMap[string, *PID]
19 changes: 16 additions & 3 deletions actor/context_test.go
@@ -9,6 +9,21 @@ import (
"github.com/stretchr/testify/require"
)

func TestChildEventNoRaceCondition(t *testing.T) {
e, err := NewEngine(nil)
assert.Nil(t, err)

parentPID := e.SpawnFunc(func(c *Context) {
switch c.Message().(type) {
case Started:
child := c.SpawnChildFunc(func(childctx *Context) {
}, "child")
c.engine.Subscribe(child)
}
}, "parent")
e.Poison(parentPID).Wait()
}

func TestContextSendRepeat(t *testing.T) {
var (
wg = &sync.WaitGroup{}
@@ -145,9 +160,7 @@ func TestSpawnChild(t *testing.T) {
}, "parent", WithMaxRestarts(0))

wg.Wait()
stopwg := &sync.WaitGroup{}
e.Poison(pid, stopwg)
stopwg.Wait()
e.Poison(pid).Wait()

assert.Nil(t, e.Registry.get(NewPID("local", "child")))
assert.Nil(t, e.Registry.get(pid))
3 changes: 2 additions & 1 deletion actor/engine.go
@@ -32,6 +32,7 @@ type Engine struct {
eventStream *PID
}

// EngineConfig holds the configuration of the engine.
type EngineConfig struct {
Remote Remoter
}
@@ -71,6 +72,7 @@ func (e *Engine) Spawn(p Producer, kind string, opts ...OptFunc) *PID {
return e.SpawnProc(proc)
}

// SpawnFunc spawns the given function as a stateless receiver/actor.
func (e *Engine) SpawnFunc(f func(*Context), kind string, opts ...OptFunc) *PID {
return e.Spawn(newFuncReceiver(f), kind, opts...)
}
@@ -193,7 +195,6 @@ func (e *Engine) SendRepeat(pid *PID, msg any, interval time.Duration) SendRepeater {

// Stop will send a non-graceful poisonPill message to the process that is associated with the given PID.
// The process will shut down immediately, once it has processed the poisonPill message.
// If given a WaitGroup, it blocks till the process is completely shutdown.
func (e *Engine) Stop(pid *PID, wg ...*sync.WaitGroup) *sync.WaitGroup {
return e.sendPoisonPill(pid, false, wg...)
}
18 changes: 5 additions & 13 deletions actor/engine_test.go
@@ -234,9 +234,7 @@ func TestStopWaitGroup(t *testing.T) {
}, "foo")
wg.Wait()

pwg := &sync.WaitGroup{}
e.Stop(pid, pwg)
pwg.Wait()
e.Stop(pid).Wait()
assert.Equal(t, int32(1), atomic.LoadInt32(&x))
}

@@ -258,9 +256,7 @@ func TestStop(t *testing.T) {
}, "foo", WithID(tag))

wg.Wait()
stopwg := &sync.WaitGroup{}
e.Stop(pid, stopwg)
stopwg.Wait()
e.Stop(pid).Wait()
// When a process is poisoned it should be removed from the registry.
// Hence, we should get nil when looking it up in the registry.
assert.Nil(t, e.Registry.get(pid))
@@ -286,9 +282,7 @@ func TestPoisonWaitGroup(t *testing.T) {
}, "foo")
wg.Wait()

pwg := &sync.WaitGroup{}
e.Poison(pid, pwg)
pwg.Wait()
e.Poison(pid).Wait()
assert.Equal(t, int32(1), atomic.LoadInt32(&x))
}

@@ -310,9 +304,7 @@ func TestPoison(t *testing.T) {
}, "foo", WithID(tag))

wg.Wait()
stopwg := &sync.WaitGroup{}
e.Poison(pid, stopwg)
stopwg.Wait()
e.Poison(pid).Wait()
// When a process is poisoned it should be removed from the registry.
// Hence, we should get NIL when we try to get it.
assert.Nil(t, e.Registry.get(pid))
@@ -379,7 +371,7 @@ func TestPoisonPillPrivate(t *testing.T) {
}
}

// 56 ns/op
// 45.84 ns/op 25 B/op => 13th Gen Intel(R) Core(TM) i9-13900KF
func BenchmarkSendMessageLocal(b *testing.B) {
e, err := NewEngine(nil)
require.NoError(b, err)
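
The repeated three-line-to-one-line edit in these tests is the API change driving most of this file: `Stop` and `Poison` now return a `*sync.WaitGroup` instead of accepting one, so callers simply chain `.Wait()`. A minimal sketch of the new call pattern, using only calls that appear in this diff:

```go
package main

import (
	"fmt"

	"github.com/anthdm/hollywood/actor"
)

func main() {
	e, err := actor.NewEngine(nil)
	if err != nil {
		panic(err)
	}
	pid := e.SpawnFunc(func(c *actor.Context) {
		switch c.Message().(type) {
		case actor.Started:
			fmt.Println("started")
		case actor.Stopped:
			fmt.Println("stopped")
		}
	}, "worker")

	// Graceful shutdown: the returned WaitGroup is done once the
	// process (and any children) has fully stopped.
	e.Poison(pid).Wait()
}
```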
4 changes: 2 additions & 2 deletions actor/event.go
@@ -23,7 +23,7 @@ type ActorStartedEvent struct {
}

func (e ActorStartedEvent) Log() (slog.Level, string, []any) {
return slog.LevelInfo, "Actor started", []any{"pid", e.PID}
return slog.LevelDebug, "Actor started", []any{"pid", e.PID}
}

// ActorInitializedEvent is broadcasted over the eventStream before an actor
@@ -45,7 +45,7 @@ type ActorStoppedEvent struct {
}

func (e ActorStoppedEvent) Log() (slog.Level, string, []any) {
return slog.LevelInfo, "Actor stopped", []any{"pid", e.PID}
return slog.LevelDebug, "Actor stopped", []any{"pid", e.PID}
}

// ActorRestartedEvent is broadcasted when an actor crashes and gets restarted
26 changes: 12 additions & 14 deletions actor/process.go
@@ -24,6 +24,11 @@ type Processer interface {
Shutdown(*sync.WaitGroup)
}

const (
procStateRunning int32 = iota
procStateStopped
)

type process struct {
Opts

@@ -178,29 +183,22 @@ func (p *process) tryRestart(v any) {
}

func (p *process) cleanup(wg *sync.WaitGroup) {
p.inbox.Stop()
p.context.engine.Registry.Remove(p.pid)
p.context.message = Stopped{}
applyMiddleware(p.context.receiver.Receive, p.Opts.Middleware...)(p.context)

// We are a child if the parent context is not nil
// No need for a mutex here, cause this is getting called inside the
// the parents children foreach loop, which already locks.
if p.context.parentCtx != nil {
p.context.parentCtx.children.Delete(p.Kind)
}

// We are a parent if we have children running, shutdown all the children.
if p.context.children.Len() > 0 {
children := p.context.Children()
for _, pid := range children {
if wg != nil {
wg.Add(1)
}
proc := p.context.engine.Registry.get(pid)
proc.Shutdown(wg)
p.context.engine.Poison(pid).Wait()
}
}

p.inbox.Stop()
p.context.engine.Registry.Remove(p.pid)
p.context.message = Stopped{}
applyMiddleware(p.context.receiver.Receive, p.Opts.Middleware...)(p.context)

p.context.engine.BroadcastEvent(ActorStoppedEvent{PID: p.pid, Timestamp: time.Now()})
if wg != nil {
wg.Done()
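
The reordering in `cleanup` is the race-condition fix from the commit message: a dying process now detaches from its parent and gracefully poisons each child, waiting for it, before stopping its own inbox, leaving the registry, and delivering `Stopped` through the middleware chain. A sketch of the observable effect, built from the same spawn helpers used in the tests above:

```go
package main

import "github.com/anthdm/hollywood/actor"

func main() {
	e, err := actor.NewEngine(nil)
	if err != nil {
		panic(err)
	}
	parent := e.SpawnFunc(func(c *actor.Context) {
		if _, ok := c.Message().(actor.Started); ok {
			// The child is spawned under the parent's context, so the
			// parent's cleanup is responsible for stopping it.
			c.SpawnChildFunc(func(*actor.Context) {}, "child")
		}
	}, "parent")

	// Poison now waits for "child" to be fully stopped before the
	// parent finishes its own cleanup and leaves the registry.
	e.Poison(parent).Wait()
}
```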
4 changes: 2 additions & 2 deletions cluster/activator.go
@@ -25,8 +25,8 @@ type ActivationDetails struct {

type defaultActivationStrategy struct{}

// DefaultActivationStrategy selects a random member in the cluster.
func DefaultActivationStrategy() defaultActivationStrategy {
// NewDefaultActivationStrategy selects a random member in the cluster.
func NewDefaultActivationStrategy() defaultActivationStrategy {
return defaultActivationStrategy{}
}

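
The rename to `NewDefaultActivationStrategy` follows Go constructor conventions. Judging from the call site in agent.go below, a strategy receives `ActivationDetails` (Members, Region, Kind) and returns the member that should own the activation, so a custom strategy might look like this sketch; the exact interface name and the `[]*Member` element type are assumptions:

```go
package strategy

import "github.com/anthdm/hollywood/cluster"

// regionFirstStrategy prefers a member in the caller's region and
// falls back to the first candidate. Illustrative only.
type regionFirstStrategy struct{}

func (regionFirstStrategy) ActivateOnMember(details cluster.ActivationDetails) *cluster.Member {
	for _, m := range details.Members {
		if m.Region == details.Region {
			return m
		}
	}
	if len(details.Members) > 0 {
		return details.Members[0]
	}
	return nil
}
```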
37 changes: 16 additions & 21 deletions cluster/agent.go
@@ -8,23 +8,17 @@ import (
"golang.org/x/exp/maps"
)

type getActive struct {
id string
}

type getMembers struct{}

type getKinds struct{}

type activate struct {
kind string
id string
region string
}

type deactivate struct {
pid *actor.PID
}
type (
activate struct {
kind string
id string
region string
}
getMembers struct{}
getKinds struct{}
deactivate struct{ pid *actor.PID }
getActive struct{ id string }
)

type Agent struct {
members *MemberSet
@@ -59,6 +53,7 @@ func NewAgent(c *Cluster) actor.Producer {
func (a *Agent) Receive(c *actor.Context) {
switch msg := c.Message().(type) {
case actor.Started:
case actor.Stopped:
case *ActorTopology:
a.handleActorTopology(msg)
case *Members:
@@ -129,7 +124,7 @@ func (a *Agent) activate(kind, id, region string) *actor.PID {
slog.Warn("could not find any members with kind", "kind", kind)
return nil
}
owner := a.cluster.activationStrategy.ActivateOnMember(ActivationDetails{
owner := a.cluster.config.activationStrategy.ActivateOnMember(ActivationDetails{
Members: members,
Region: region,
Kind: kind,
@@ -149,7 +144,7 @@
// Remote activation

// TODO: topology hash
resp, err := a.cluster.engine.Request(activatorPID, req, a.cluster.requestTimeout).Result()
resp, err := a.cluster.engine.Request(activatorPID, req, a.cluster.config.requestTimeout).Result()
if err != nil {
slog.Error("failed activation request", "err", err)
return nil
@@ -213,7 +208,7 @@ func (a *Agent) memberJoin(member *Member) {
Member: member,
})

slog.Debug("member joined", "id", member.ID, "host", member.Host, "kinds", member.Kinds, "region", member.Region)
slog.Info("[CLUSTER] member joined", "id", member.ID, "host", member.Host, "kinds", member.Kinds, "region", member.Region)
}

func (a *Agent) memberLeave(member *Member) {
Expand All @@ -229,7 +224,7 @@ func (a *Agent) memberLeave(member *Member) {

a.cluster.engine.BroadcastEvent(MemberLeaveEvent{Member: member})

slog.Debug("member left", "id", member.ID, "host", member.Host, "kinds", member.Kinds)
slog.Info("[CLUSTER] member left", "id", member.ID, "host", member.Host, "kinds", member.Kinds)
}

func (a *Agent) bcast(msg any) {
(Diffs for the remaining 9 changed files are not shown.)
