Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wip: auto discovery of members with zeroconf #135

Merged
merged 13 commits into from
Jan 1, 2024
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ large number of concurrent users and complex interactions.

## Features

- guaranteed message delivery on actor failure (buffer mechanism)
- fire & forget or request & response messaging, or both.
- Guaranteed message delivery on actor failure (buffer mechanism)
- Fire & forget or request & response messaging, or both
- High performance dRPC as the transport layer
- Optimized proto buffers without reflection
- lightweight and highly customizable
- cluster support [wip]
- Lightweight and highly customizable
- Cluster support with DNS auto discovery for nodes that are on the same network

# Benchmarks

Expand Down
4 changes: 2 additions & 2 deletions actor/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ type Context struct {
engine *Engine
receiver Receiver
message any
// the context of the parent, if this is the context of a child.
// we need this so we can remove the child from the parent Context
// the context of the parent if we are a child.
// we need this parentCtx, so we can remove the child from the parent Context
// when the child dies.
parentCtx *Context
children *safemap.SafeMap[string, *PID]
Expand Down
19 changes: 16 additions & 3 deletions actor/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,21 @@ import (
"github.com/stretchr/testify/require"
)

func TestChildEventNoRaceCondition(t *testing.T) {
e, err := NewEngine(nil)
assert.Nil(t, err)

parentPID := e.SpawnFunc(func(c *Context) {
switch c.Message().(type) {
case Started:
child := c.SpawnChildFunc(func(childctx *Context) {
}, "child")
c.engine.Subscribe(child)
}
}, "parent")
e.Poison(parentPID).Wait()
}

func TestContextSendRepeat(t *testing.T) {
var (
wg = &sync.WaitGroup{}
Expand Down Expand Up @@ -145,9 +160,7 @@ func TestSpawnChild(t *testing.T) {
}, "parent", WithMaxRestarts(0))

wg.Wait()
stopwg := &sync.WaitGroup{}
e.Poison(pid, stopwg)
stopwg.Wait()
e.Poison(pid).Wait()

assert.Nil(t, e.Registry.get(NewPID("local", "child")))
assert.Nil(t, e.Registry.get(pid))
Expand Down
3 changes: 2 additions & 1 deletion actor/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type Engine struct {
eventStream *PID
}

// EngineConfig holds the configuration of the engine.
type EngineConfig struct {
Remote Remoter
}
Expand Down Expand Up @@ -71,6 +72,7 @@ func (e *Engine) Spawn(p Producer, kind string, opts ...OptFunc) *PID {
return e.SpawnProc(proc)
}

// SpawnFunc spawns the given function as a stateless receiver/actor.
func (e *Engine) SpawnFunc(f func(*Context), kind string, opts ...OptFunc) *PID {
return e.Spawn(newFuncReceiver(f), kind, opts...)
}
Expand Down Expand Up @@ -193,7 +195,6 @@ func (e *Engine) SendRepeat(pid *PID, msg any, interval time.Duration) SendRepea

// Stop will send a non-graceful poisonPill message to the process that is associated with the given PID.
// The process will shut down immediately, once it has processed the poisonPill messsage.
// If given a WaitGroup, it blocks till the process is completely shutdown.
func (e *Engine) Stop(pid *PID, wg ...*sync.WaitGroup) *sync.WaitGroup {
return e.sendPoisonPill(pid, false, wg...)
}
Expand Down
18 changes: 5 additions & 13 deletions actor/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,9 +234,7 @@ func TestStopWaitGroup(t *testing.T) {
}, "foo")
wg.Wait()

pwg := &sync.WaitGroup{}
e.Stop(pid, pwg)
pwg.Wait()
e.Stop(pid).Wait()
assert.Equal(t, int32(1), atomic.LoadInt32(&x))
}

Expand All @@ -258,9 +256,7 @@ func TestStop(t *testing.T) {
}, "foo", WithID(tag))

wg.Wait()
stopwg := &sync.WaitGroup{}
e.Stop(pid, stopwg)
stopwg.Wait()
e.Stop(pid).Wait()
// When a process is poisoned it should be removed from the registry.
// Hence, we should get nil when looking it up in the registry.
assert.Nil(t, e.Registry.get(pid))
Expand All @@ -286,9 +282,7 @@ func TestPoisonWaitGroup(t *testing.T) {
}, "foo")
wg.Wait()

pwg := &sync.WaitGroup{}
e.Poison(pid, pwg)
pwg.Wait()
e.Poison(pid).Wait()
assert.Equal(t, int32(1), atomic.LoadInt32(&x))
}

Expand All @@ -310,9 +304,7 @@ func TestPoison(t *testing.T) {
}, "foo", WithID(tag))

wg.Wait()
stopwg := &sync.WaitGroup{}
e.Poison(pid, stopwg)
stopwg.Wait()
e.Poison(pid).Wait()
// When a process is poisoned it should be removed from the registry.
// Hence, we should get NIL when we try to get it.
assert.Nil(t, e.Registry.get(pid))
Expand Down Expand Up @@ -379,7 +371,7 @@ func TestPoisonPillPrivate(t *testing.T) {
}
}

// 56 ns/op
// 45.84 ns/op 25 B/op => 13th Gen Intel(R) Core(TM) i9-13900KF
func BenchmarkSendMessageLocal(b *testing.B) {
e, err := NewEngine(nil)
require.NoError(b, err)
Expand Down
4 changes: 2 additions & 2 deletions actor/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type ActorStartedEvent struct {
}

func (e ActorStartedEvent) Log() (slog.Level, string, []any) {
return slog.LevelInfo, "Actor started", []any{"pid", e.PID}
return slog.LevelDebug, "Actor started", []any{"pid", e.PID}
}

// ActorInitializedEvent is broadcasted over the eventStream before an actor
Expand All @@ -45,7 +45,7 @@ type ActorStoppedEvent struct {
}

func (e ActorStoppedEvent) Log() (slog.Level, string, []any) {
return slog.LevelInfo, "Actor stopped", []any{"pid", e.PID}
return slog.LevelDebug, "Actor stopped", []any{"pid", e.PID}
}

// ActorRestartedEvent is broadcasted when an actor crashes and gets restarted
Expand Down
26 changes: 12 additions & 14 deletions actor/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ type Processer interface {
Shutdown(*sync.WaitGroup)
}

const (
procStateRunning int32 = iota
procStateStopped
)

type process struct {
Opts

Expand Down Expand Up @@ -178,29 +183,22 @@ func (p *process) tryRestart(v any) {
}

func (p *process) cleanup(wg *sync.WaitGroup) {
p.inbox.Stop()
p.context.engine.Registry.Remove(p.pid)
p.context.message = Stopped{}
applyMiddleware(p.context.receiver.Receive, p.Opts.Middleware...)(p.context)

// We are a child if the parent context is not nil
// No need for a mutex here, cause this is getting called inside the
// the parents children foreach loop, which already locks.
if p.context.parentCtx != nil {
p.context.parentCtx.children.Delete(p.Kind)
}

// We are a parent if we have children running, shutdown all the children.
if p.context.children.Len() > 0 {
children := p.context.Children()
for _, pid := range children {
if wg != nil {
wg.Add(1)
}
proc := p.context.engine.Registry.get(pid)
proc.Shutdown(wg)
p.context.engine.Poison(pid).Wait()
}
}

p.inbox.Stop()
p.context.engine.Registry.Remove(p.pid)
p.context.message = Stopped{}
applyMiddleware(p.context.receiver.Receive, p.Opts.Middleware...)(p.context)

p.context.engine.BroadcastEvent(ActorStoppedEvent{PID: p.pid, Timestamp: time.Now()})
if wg != nil {
wg.Done()
Expand Down
4 changes: 2 additions & 2 deletions cluster/activator.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ type ActivationDetails struct {

type defaultActivationStrategy struct{}

// DefaultActivationStrategy selects a random member in the cluster.
func DefaultActivationStrategy() defaultActivationStrategy {
// NewDefaultActivationStrategy selects a random member in the cluster.
func NewDefaultActivationStrategy() defaultActivationStrategy {
return defaultActivationStrategy{}
}

Expand Down
37 changes: 16 additions & 21 deletions cluster/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,17 @@ import (
"golang.org/x/exp/maps"
)

type getActive struct {
id string
}

type getMembers struct{}

type getKinds struct{}

type activate struct {
kind string
id string
region string
}

type deactivate struct {
pid *actor.PID
}
type (
activate struct {
kind string
id string
region string
}
getMembers struct{}
getKinds struct{}
deactivate struct{ pid *actor.PID }
getActive struct{ id string }
)

type Agent struct {
members *MemberSet
Expand Down Expand Up @@ -59,6 +53,7 @@ func NewAgent(c *Cluster) actor.Producer {
func (a *Agent) Receive(c *actor.Context) {
switch msg := c.Message().(type) {
case actor.Started:
case actor.Stopped:
case *ActorTopology:
a.handleActorTopology(msg)
case *Members:
Expand Down Expand Up @@ -129,7 +124,7 @@ func (a *Agent) activate(kind, id, region string) *actor.PID {
slog.Warn("could not find any members with kind", "kind", kind)
return nil
}
owner := a.cluster.activationStrategy.ActivateOnMember(ActivationDetails{
owner := a.cluster.config.activationStrategy.ActivateOnMember(ActivationDetails{
Members: members,
Region: region,
Kind: kind,
Expand All @@ -149,7 +144,7 @@ func (a *Agent) activate(kind, id, region string) *actor.PID {
// Remote activation

// TODO: topology hash
resp, err := a.cluster.engine.Request(activatorPID, req, a.cluster.requestTimeout).Result()
resp, err := a.cluster.engine.Request(activatorPID, req, a.cluster.config.requestTimeout).Result()
if err != nil {
slog.Error("failed activation request", "err", err)
return nil
Expand Down Expand Up @@ -213,7 +208,7 @@ func (a *Agent) memberJoin(member *Member) {
Member: member,
})

slog.Debug("member joined", "id", member.ID, "host", member.Host, "kinds", member.Kinds, "region", member.Region)
slog.Info("[CLUSTER] member joined", "id", member.ID, "host", member.Host, "kinds", member.Kinds, "region", member.Region)
}

func (a *Agent) memberLeave(member *Member) {
Expand All @@ -229,7 +224,7 @@ func (a *Agent) memberLeave(member *Member) {

a.cluster.engine.BroadcastEvent(MemberLeaveEvent{Member: member})

slog.Debug("member left", "id", member.ID, "host", member.Host, "kinds", member.Kinds)
slog.Info("[CLUSTER] member left", "id", member.ID, "host", member.Host, "kinds", member.Kinds)
}

func (a *Agent) bcast(msg any) {
Expand Down
Loading