From fae6c0fe7237f1d59ec8ea648e9d376027b87503 Mon Sep 17 00:00:00 2001 From: Per Buer Date: Mon, 1 Jan 2024 11:53:55 +0100 Subject: [PATCH 1/4] cleans up the stack trace, (#138) * cleans up the stack trace, removing the lines from hollywood itself. the stack trace is still printed, but on a single, long line. * test the stack trace cleanup. * make the test fail faster rather than just hanging. --- actor/engine_test.go | 2 +- actor/process.go | 26 +++++++++++++++++++++-- actor/process_test.go | 49 +++++++++++++++++++++++++++++++++++++++++++ go.mod | 1 + go.sum | 3 +++ 5 files changed, 78 insertions(+), 3 deletions(-) create mode 100644 actor/process_test.go diff --git a/actor/engine_test.go b/actor/engine_test.go index 8f0e327..a23741a 100644 --- a/actor/engine_test.go +++ b/actor/engine_test.go @@ -134,7 +134,7 @@ func TestRestarts(t *testing.T) { if msg.data != 10 { panic("I failed to process this message") } else { - fmt.Println("finally processed all my messsages after borking.", msg.data) + fmt.Println("finally processed all my messages after borking", msg.data) wg.Done() } } diff --git a/actor/process.go b/actor/process.go index de08d6a..8c91977 100644 --- a/actor/process.go +++ b/actor/process.go @@ -1,7 +1,9 @@ package actor import ( + "bytes" "fmt" + "github.com/DataDog/gostackparse" "log/slog" "runtime/debug" "sync" @@ -150,8 +152,7 @@ func (p *process) tryRestart(v any) { p.Start() return } - stackTrace := debug.Stack() - fmt.Println(string(stackTrace)) + stackTrace := cleanTrace(debug.Stack()) // If we reach the max restarts, we shutdown the inbox and clean // everything up. if p.restarts == p.MaxRestarts { @@ -211,3 +212,24 @@ func (p *process) Send(_ *PID, msg any, sender *PID) { p.inbox.Send(Envelope{Msg: msg, Sender: sender}) } func (p *process) Shutdown(wg *sync.WaitGroup) { p.cleanup(wg) } + +func cleanTrace(stack []byte) []byte { + goros, err := gostackparse.Parse(bytes.NewReader(stack)) + if err != nil { + slog.Error("failed to parse stacktrace", "err", err) + return stack + } + if len(goros) != 1 { + slog.Error("expected only one goroutine", "goroutines", len(goros)) + return stack + } + // skip the first frames: + goros[0].Stack = goros[0].Stack[4:] + buf := bytes.NewBuffer(nil) + _, _ = fmt.Fprintf(buf, "goroutine %d [%s]\n", goros[0].ID, goros[0].State) + for _, frame := range goros[0].Stack { + _, _ = fmt.Fprintf(buf, "%s\n", frame.Func) + _, _ = fmt.Fprint(buf, "\t", frame.File, ":", frame.Line, "\n") + } + return buf.Bytes() +} diff --git a/actor/process_test.go b/actor/process_test.go new file mode 100644 index 0000000..b0e7dc9 --- /dev/null +++ b/actor/process_test.go @@ -0,0 +1,49 @@ +package actor + +import ( + "bytes" + "fmt" + "github.com/stretchr/testify/require" + "testing" + "time" +) + +// Test_CleanTrace tests that the stack trace is cleaned up correctly and that the function +// which triggers the panic is at the top of the stack trace. +func Test_CleanTrace(t *testing.T) { + e, err := NewEngine(nil) + require.NoError(t, err) + type triggerPanic struct { + data int + } + stopCh := make(chan struct{}) + pid := e.SpawnFunc(func(c *Context) { + fmt.Printf("Got message type %T\n", c.Message()) + switch c.Message().(type) { + case Started: + c.Engine().Subscribe(c.pid) + case triggerPanic: + panicWrapper() + case ActorRestartedEvent: + m := c.Message().(ActorRestartedEvent) + // split the panic into lines: + lines := bytes.Split(m.Stacktrace, []byte("\n")) + // check that the second line is the panicWrapper function: + if bytes.Contains(lines[1], []byte("panicWrapper")) { + fmt.Println("stack trace contains panicWrapper at the right line") + stopCh <- struct{}{} + } + } + }, "foo", WithMaxRestarts(1)) + e.Send(pid, triggerPanic{1}) + select { + case <-stopCh: + fmt.Println("test passed") + case <-time.After(time.Second): + t.Error("test timed out. stack trace likely did not contain panicWrapper at the right line") + } +} + +func panicWrapper() { + panic("foo") +} diff --git a/go.mod b/go.mod index f27f6e5..3ebebd4 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/anthdm/hollywood go 1.21 require ( + github.com/DataDog/gostackparse v0.7.0 github.com/grandcat/zeroconf v1.0.0 github.com/planetscale/vtprotobuf v0.4.0 github.com/prometheus/client_golang v1.15.0 diff --git a/go.sum b/go.sum index d653c10..fe99f3e 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +github.com/DataDog/gostackparse v0.7.0 h1:i7dLkXHvYzHV308hnkvVGDL3BR4FWl7IsXNPz/IGQh4= +github.com/DataDog/gostackparse v0.7.0/go.mod h1:lTfqcJKqS9KnXQGnyQMCugq3u1FP6UZMfWR0aitKFMM= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4= @@ -49,6 +51,7 @@ github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/f github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= From 699addd77dd91e53731bd8fa044c22a97684b28f Mon Sep 17 00:00:00 2001 From: AR1011 <77502086+AR1011@users.noreply.github.com> Date: Mon, 1 Jan 2024 10:54:23 +0000 Subject: [PATCH 2/4] Make Request timeout configurable (#141) * increase request timeout + fix cluster example build output names in the makefile * make timeout configurable * fix --- Makefile | 4 ++-- cluster/agent.go | 2 +- cluster/cluster.go | 20 ++++++++++++-------- examples/cluster/member_1/main.go | 2 ++ examples/cluster/member_2/main.go | 2 ++ 5 files changed, 19 insertions(+), 11 deletions(-) diff --git a/Makefile b/Makefile index 4f49733..776da60 100644 --- a/Makefile +++ b/Makefile @@ -15,8 +15,8 @@ build: go build -o bin/metrics examples/metrics/main.go go build -o bin/chatserver examples/chat/server/main.go go build -o bin/chatclient examples/chat/client/main.go - go build -o bin/cluster examples/cluster/member_1/main.go - go build -o bin/cluster examples/cluster/member_2/main.go + go build -o bin/cluster_member_1 examples/cluster/member_1/main.go + go build -o bin/cluster_member_2 examples/cluster/member_2/main.go bench: go run ./_bench/. diff --git a/cluster/agent.go b/cluster/agent.go index 5d21980..9daac75 100644 --- a/cluster/agent.go +++ b/cluster/agent.go @@ -149,7 +149,7 @@ func (a *Agent) activate(kind, id, region string) *actor.PID { // Remote activation // TODO: topology hash - resp, err := a.cluster.engine.Request(activatorPID, req, requestTimeout).Result() + resp, err := a.cluster.engine.Request(activatorPID, req, a.cluster.requestTimeout).Result() if err != nil { slog.Error("failed activation request", "err", err) return nil diff --git a/cluster/cluster.go b/cluster/cluster.go index 0f169ea..dbaa5e5 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -10,8 +10,6 @@ import ( "github.com/google/uuid" ) -var requestTimeout = time.Millisecond * 50 - // Producer is a function that can produce an actor.Producer. // Pretty simple, but yet powerfull tool to construct receivers // depending on Cluster. @@ -27,11 +25,13 @@ type Config struct { ActivationStrategy ActivationStrategy Engine *actor.Engine ClusterProvider Producer + RequestTimeout time.Duration } type Cluster struct { - id string - region string + id string + region string + requestTimeout time.Duration provider Producer engine *actor.Engine @@ -61,6 +61,9 @@ func New(cfg Config) (*Cluster, error) { if len(cfg.Region) == 0 { cfg.Region = "default" } + if cfg.RequestTimeout == 0 { + cfg.RequestTimeout = time.Second + } return &Cluster{ id: cfg.ID, region: cfg.Region, @@ -68,6 +71,7 @@ func New(cfg Config) (*Cluster, error) { engine: cfg.Engine, kinds: []kind{}, activationStrategy: cfg.ActivationStrategy, + requestTimeout: cfg.RequestTimeout, }, nil } @@ -109,7 +113,7 @@ func (c *Cluster) Activate(kind string, config *ActivationConfig) *actor.PID { id: config.ID, region: config.Region, } - resp, err := c.engine.Request(c.agentPID, msg, requestTimeout).Result() + resp, err := c.engine.Request(c.agentPID, msg, c.requestTimeout).Result() if err != nil { slog.Error("activation failed", "err", err) return nil @@ -155,7 +159,7 @@ func (c *Cluster) HasKindLocal(name string) bool { // Members returns all the members that are part of the cluster. func (c *Cluster) Members() []*Member { - resp, err := c.engine.Request(c.agentPID, getMembers{}, requestTimeout).Result() + resp, err := c.engine.Request(c.agentPID, getMembers{}, c.requestTimeout).Result() if err != nil { return []*Member{} } @@ -168,7 +172,7 @@ func (c *Cluster) Members() []*Member { // HasKind returns true whether the given kind is available for activation on // the cluster. func (c *Cluster) HasKind(name string) bool { - resp, err := c.engine.Request(c.agentPID, getKinds{}, requestTimeout).Result() + resp, err := c.engine.Request(c.agentPID, getKinds{}, c.requestTimeout).Result() if err != nil { return false } @@ -183,7 +187,7 @@ func (c *Cluster) HasKind(name string) bool { } func (c *Cluster) GetActivated(id string) *actor.PID { - resp, err := c.engine.Request(c.agentPID, getActive{id: id}, requestTimeout).Result() + resp, err := c.engine.Request(c.agentPID, getActive{id: id}, c.requestTimeout).Result() if err != nil { return nil } diff --git a/examples/cluster/member_1/main.go b/examples/cluster/member_1/main.go index 19fdc26..cc29fb3 100644 --- a/examples/cluster/member_1/main.go +++ b/examples/cluster/member_1/main.go @@ -2,6 +2,7 @@ package main import ( "log" + "time" "github.com/anthdm/hollywood/actor" "github.com/anthdm/hollywood/cluster" @@ -22,6 +23,7 @@ func main() { Region: "eu-west", ClusterProvider: cluster.NewSelfManagedProvider(), ActivationStrategy: shared.RegionBasedActivationStrategy("eu-west"), + RequestTimeout: time.Second, }) if err != nil { log.Fatal(err) diff --git a/examples/cluster/member_2/main.go b/examples/cluster/member_2/main.go index ebb3713..91609a5 100644 --- a/examples/cluster/member_2/main.go +++ b/examples/cluster/member_2/main.go @@ -2,6 +2,7 @@ package main import ( "log" + "time" "github.com/anthdm/hollywood/actor" "github.com/anthdm/hollywood/cluster" @@ -26,6 +27,7 @@ func main() { Region: "us-west", ClusterProvider: cluster.NewSelfManagedProvider(bootstrapAddr), ActivationStrategy: shared.RegionBasedActivationStrategy("eu-west"), + RequestTimeout: time.Second, }) if err != nil { log.Fatal(err) From 220df8b137e4f42e7044d14b3664af3cf0444629 Mon Sep 17 00:00:00 2001 From: Anthony De Meulemeester Date: Mon, 1 Jan 2024 19:33:08 +0100 Subject: [PATCH 3/4] wip: auto discovery of members with zeroconf (#135) * wip: auto discovery of members with zeroconf * Fixed cluster cleanup * bench before fixing race condition * minor doc typos and added test for testing race condition when child is subscribed to event stream. * wip: fixing racecon * fixed race condition on childs subscribing to the event stream * auto discover, but also with bootstrap if needed * new config construction for cluster * fixed examples * updated README * Added some more documentation to the cluster configuration * more docs and changed activation config --- README.md | 8 +- actor/context.go | 4 +- actor/context_test.go | 19 ++- actor/engine.go | 3 +- actor/engine_test.go | 18 +-- actor/event.go | 4 +- actor/process.go | 26 ++- cluster/activator.go | 4 +- cluster/agent.go | 37 ++--- cluster/cluster.go | 247 ++++++++++++++++++++--------- cluster/cluster.pb.go | 253 +++++++++++++++++++----------- cluster/cluster.proto | 5 + cluster/cluster_test.go | 122 +++++++------- cluster/cluster_vtproto.pb.go | 224 ++++++++++++++++++++++++++ cluster/selfmanaged.go | 224 ++++++++++++++++++-------- examples/cluster/member_1/main.go | 31 ++-- examples/cluster/member_2/main.go | 25 +-- remote/remote.go | 2 - 18 files changed, 854 insertions(+), 402 deletions(-) diff --git a/README.md b/README.md index e6caac8..83b4b22 100644 --- a/README.md +++ b/README.md @@ -29,12 +29,12 @@ large number of concurrent users and complex interactions. ## Features -- guaranteed message delivery on actor failure (buffer mechanism) -- fire & forget or request & response messaging, or both. +- Guaranteed message delivery on actor failure (buffer mechanism) +- Fire & forget or request & response messaging, or both - High performance dRPC as the transport layer - Optimized proto buffers without reflection -- lightweight and highly customizable -- cluster support [wip] +- Lightweight and highly customizable +- Cluster support with DNS auto discovery for nodes that are on the same network # Benchmarks diff --git a/actor/context.go b/actor/context.go index aa314cd..04d1073 100644 --- a/actor/context.go +++ b/actor/context.go @@ -16,8 +16,8 @@ type Context struct { engine *Engine receiver Receiver message any - // the context of the parent, if this is the context of a child. - // we need this so we can remove the child from the parent Context + // the context of the parent if we are a child. + // we need this parentCtx, so we can remove the child from the parent Context // when the child dies. parentCtx *Context children *safemap.SafeMap[string, *PID] diff --git a/actor/context_test.go b/actor/context_test.go index 44e2bd9..963678e 100644 --- a/actor/context_test.go +++ b/actor/context_test.go @@ -9,6 +9,21 @@ import ( "github.com/stretchr/testify/require" ) +func TestChildEventNoRaceCondition(t *testing.T) { + e, err := NewEngine(nil) + assert.Nil(t, err) + + parentPID := e.SpawnFunc(func(c *Context) { + switch c.Message().(type) { + case Started: + child := c.SpawnChildFunc(func(childctx *Context) { + }, "child") + c.engine.Subscribe(child) + } + }, "parent") + e.Poison(parentPID).Wait() +} + func TestContextSendRepeat(t *testing.T) { var ( wg = &sync.WaitGroup{} @@ -145,9 +160,7 @@ func TestSpawnChild(t *testing.T) { }, "parent", WithMaxRestarts(0)) wg.Wait() - stopwg := &sync.WaitGroup{} - e.Poison(pid, stopwg) - stopwg.Wait() + e.Poison(pid).Wait() assert.Nil(t, e.Registry.get(NewPID("local", "child"))) assert.Nil(t, e.Registry.get(pid)) diff --git a/actor/engine.go b/actor/engine.go index 125a4ac..259442b 100644 --- a/actor/engine.go +++ b/actor/engine.go @@ -32,6 +32,7 @@ type Engine struct { eventStream *PID } +// EngineConfig holds the configuration of the engine. type EngineConfig struct { Remote Remoter } @@ -71,6 +72,7 @@ func (e *Engine) Spawn(p Producer, kind string, opts ...OptFunc) *PID { return e.SpawnProc(proc) } +// SpawnFunc spawns the given function as a stateless receiver/actor. func (e *Engine) SpawnFunc(f func(*Context), kind string, opts ...OptFunc) *PID { return e.Spawn(newFuncReceiver(f), kind, opts...) } @@ -193,7 +195,6 @@ func (e *Engine) SendRepeat(pid *PID, msg any, interval time.Duration) SendRepea // Stop will send a non-graceful poisonPill message to the process that is associated with the given PID. // The process will shut down immediately, once it has processed the poisonPill messsage. -// If given a WaitGroup, it blocks till the process is completely shutdown. func (e *Engine) Stop(pid *PID, wg ...*sync.WaitGroup) *sync.WaitGroup { return e.sendPoisonPill(pid, false, wg...) } diff --git a/actor/engine_test.go b/actor/engine_test.go index a23741a..83b0aa1 100644 --- a/actor/engine_test.go +++ b/actor/engine_test.go @@ -234,9 +234,7 @@ func TestStopWaitGroup(t *testing.T) { }, "foo") wg.Wait() - pwg := &sync.WaitGroup{} - e.Stop(pid, pwg) - pwg.Wait() + e.Stop(pid).Wait() assert.Equal(t, int32(1), atomic.LoadInt32(&x)) } @@ -258,9 +256,7 @@ func TestStop(t *testing.T) { }, "foo", WithID(tag)) wg.Wait() - stopwg := &sync.WaitGroup{} - e.Stop(pid, stopwg) - stopwg.Wait() + e.Stop(pid).Wait() // When a process is poisoned it should be removed from the registry. // Hence, we should get nil when looking it up in the registry. assert.Nil(t, e.Registry.get(pid)) @@ -286,9 +282,7 @@ func TestPoisonWaitGroup(t *testing.T) { }, "foo") wg.Wait() - pwg := &sync.WaitGroup{} - e.Poison(pid, pwg) - pwg.Wait() + e.Poison(pid).Wait() assert.Equal(t, int32(1), atomic.LoadInt32(&x)) } @@ -310,9 +304,7 @@ func TestPoison(t *testing.T) { }, "foo", WithID(tag)) wg.Wait() - stopwg := &sync.WaitGroup{} - e.Poison(pid, stopwg) - stopwg.Wait() + e.Poison(pid).Wait() // When a process is poisoned it should be removed from the registry. // Hence, we should get NIL when we try to get it. assert.Nil(t, e.Registry.get(pid)) @@ -379,7 +371,7 @@ func TestPoisonPillPrivate(t *testing.T) { } } -// 56 ns/op +// 45.84 ns/op 25 B/op => 13th Gen Intel(R) Core(TM) i9-13900KF func BenchmarkSendMessageLocal(b *testing.B) { e, err := NewEngine(nil) require.NoError(b, err) diff --git a/actor/event.go b/actor/event.go index 3162790..fb2bcf0 100644 --- a/actor/event.go +++ b/actor/event.go @@ -23,7 +23,7 @@ type ActorStartedEvent struct { } func (e ActorStartedEvent) Log() (slog.Level, string, []any) { - return slog.LevelInfo, "Actor started", []any{"pid", e.PID} + return slog.LevelDebug, "Actor started", []any{"pid", e.PID} } // ActorInitializedEvent is broadcasted over the eventStream before an actor @@ -45,7 +45,7 @@ type ActorStoppedEvent struct { } func (e ActorStoppedEvent) Log() (slog.Level, string, []any) { - return slog.LevelInfo, "Actor stopped", []any{"pid", e.PID} + return slog.LevelDebug, "Actor stopped", []any{"pid", e.PID} } // ActorRestartedEvent is broadcasted when an actor crashes and gets restarted diff --git a/actor/process.go b/actor/process.go index 8c91977..853eee5 100644 --- a/actor/process.go +++ b/actor/process.go @@ -24,6 +24,11 @@ type Processer interface { Shutdown(*sync.WaitGroup) } +const ( + procStateRunning int32 = iota + procStateStopped +) + type process struct { Opts @@ -178,29 +183,22 @@ func (p *process) tryRestart(v any) { } func (p *process) cleanup(wg *sync.WaitGroup) { - p.inbox.Stop() - p.context.engine.Registry.Remove(p.pid) - p.context.message = Stopped{} - applyMiddleware(p.context.receiver.Receive, p.Opts.Middleware...)(p.context) - - // We are a child if the parent context is not nil - // No need for a mutex here, cause this is getting called inside the - // the parents children foreach loop, which already locks. if p.context.parentCtx != nil { p.context.parentCtx.children.Delete(p.Kind) } - // We are a parent if we have children running, shutdown all the children. if p.context.children.Len() > 0 { children := p.context.Children() for _, pid := range children { - if wg != nil { - wg.Add(1) - } - proc := p.context.engine.Registry.get(pid) - proc.Shutdown(wg) + p.context.engine.Poison(pid).Wait() } } + + p.inbox.Stop() + p.context.engine.Registry.Remove(p.pid) + p.context.message = Stopped{} + applyMiddleware(p.context.receiver.Receive, p.Opts.Middleware...)(p.context) + p.context.engine.BroadcastEvent(ActorStoppedEvent{PID: p.pid, Timestamp: time.Now()}) if wg != nil { wg.Done() diff --git a/cluster/activator.go b/cluster/activator.go index 9499e4b..0725afe 100644 --- a/cluster/activator.go +++ b/cluster/activator.go @@ -25,8 +25,8 @@ type ActivationDetails struct { type defaultActivationStrategy struct{} -// DefaultActivationStrategy selects a random member in the cluster. -func DefaultActivationStrategy() defaultActivationStrategy { +// NewDefaultActivationStrategy selects a random member in the cluster. +func NewDefaultActivationStrategy() defaultActivationStrategy { return defaultActivationStrategy{} } diff --git a/cluster/agent.go b/cluster/agent.go index 9daac75..0f91b3b 100644 --- a/cluster/agent.go +++ b/cluster/agent.go @@ -8,23 +8,17 @@ import ( "golang.org/x/exp/maps" ) -type getActive struct { - id string -} - -type getMembers struct{} - -type getKinds struct{} - -type activate struct { - kind string - id string - region string -} - -type deactivate struct { - pid *actor.PID -} +type ( + activate struct { + kind string + id string + region string + } + getMembers struct{} + getKinds struct{} + deactivate struct{ pid *actor.PID } + getActive struct{ id string } +) type Agent struct { members *MemberSet @@ -59,6 +53,7 @@ func NewAgent(c *Cluster) actor.Producer { func (a *Agent) Receive(c *actor.Context) { switch msg := c.Message().(type) { case actor.Started: + case actor.Stopped: case *ActorTopology: a.handleActorTopology(msg) case *Members: @@ -129,7 +124,7 @@ func (a *Agent) activate(kind, id, region string) *actor.PID { slog.Warn("could not find any members with kind", "kind", kind) return nil } - owner := a.cluster.activationStrategy.ActivateOnMember(ActivationDetails{ + owner := a.cluster.config.activationStrategy.ActivateOnMember(ActivationDetails{ Members: members, Region: region, Kind: kind, @@ -149,7 +144,7 @@ func (a *Agent) activate(kind, id, region string) *actor.PID { // Remote activation // TODO: topology hash - resp, err := a.cluster.engine.Request(activatorPID, req, a.cluster.requestTimeout).Result() + resp, err := a.cluster.engine.Request(activatorPID, req, a.cluster.config.requestTimeout).Result() if err != nil { slog.Error("failed activation request", "err", err) return nil @@ -213,7 +208,7 @@ func (a *Agent) memberJoin(member *Member) { Member: member, }) - slog.Debug("member joined", "id", member.ID, "host", member.Host, "kinds", member.Kinds, "region", member.Region) + slog.Info("[CLUSTER] member joined", "id", member.ID, "host", member.Host, "kinds", member.Kinds, "region", member.Region) } func (a *Agent) memberLeave(member *Member) { @@ -229,7 +224,7 @@ func (a *Agent) memberLeave(member *Member) { a.cluster.engine.BroadcastEvent(MemberLeaveEvent{Member: member}) - slog.Debug("member left", "id", member.ID, "host", member.Host, "kinds", member.Kinds) + slog.Info("[CLUSTER] member left", "id", member.ID, "host", member.Host, "kinds", member.Kinds) } func (a *Agent) bcast(msg any) { diff --git a/cluster/cluster.go b/cluster/cluster.go index dbaa5e5..56f9f68 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -3,84 +3,149 @@ package cluster import ( fmt "fmt" "log/slog" + "math" + "math/rand" "reflect" + "sync" "time" "github.com/anthdm/hollywood/actor" - "github.com/google/uuid" + "github.com/anthdm/hollywood/remote" ) -// Producer is a function that can produce an actor.Producer. -// Pretty simple, but yet powerfull tool to construct receivers -// depending on Cluster. +// pick a reasonable timeout so nodes of long distance networks (should) work. +var defaultRequestTimeout = time.Second + +// Producer is a function that produces an actor.Producer given a *cluster.Cluster. +// Pretty simple, but yet powerfull tool to construct receivers that are depending on Cluster. type Producer func(c *Cluster) actor.Producer // Config holds the cluster configuration type Config struct { - // The individual ID of this specific node - ID string - // The region this node is hosted - Region string + listenAddr string + id string + region string + activationStrategy ActivationStrategy + engine *actor.Engine + provider Producer + requestTimeout time.Duration +} - ActivationStrategy ActivationStrategy - Engine *actor.Engine - ClusterProvider Producer - RequestTimeout time.Duration +// NewConfig returns a Config that is initialized with default values. +func NewConfig() Config { + return Config{ + listenAddr: getRandomListenAddr(), + id: fmt.Sprintf("%d", rand.Intn(math.MaxInt)), + region: "default", + activationStrategy: NewDefaultActivationStrategy(), + provider: NewSelfManagedProvider(NewSelfManagedConfig()), + requestTimeout: defaultRequestTimeout, + } } -type Cluster struct { - id string - region string - requestTimeout time.Duration +// WithRequestTimeout set's the maximum duration of how long a request +// can take between members of the cluster. +// +// Defaults to 1 second to support communication between nodes in +// other regions. +func (config Config) WithRequestTimeout(d time.Duration) Config { + config.requestTimeout = d + return config +} - provider Producer - engine *actor.Engine - agentPID *actor.PID - providerPID *actor.PID +// WithProvider set's the cluster provider. +// +// Defaults to the SelfManagedProvider. +func (config Config) WithProvider(p Producer) Config { + config.provider = p + return config +} - isStarted bool +// WithEngine set's the internal actor engine that will be used +// to power the actors running on the node. +// +// If no engine is given the cluster will instanciate a new +// engine and remote. +func (config Config) WithEngine(e *actor.Engine) Config { + config.engine = e + return config +} - activationStrategy ActivationStrategy +// TODO: Still not convinced about the name "ActivationStrategy". +// TODO: Document this more. +// WithActivationStrategy +func (config Config) WithActivationStrategy(s ActivationStrategy) Config { + config.activationStrategy = s + return config +} - kinds []kind +// WithListenAddr set's the listen address of the underlying remote. +// +// Defaults to a random port number. +func (config Config) WithListenAddr(addr string) Config { + config.listenAddr = addr + return config } -func New(cfg Config) (*Cluster, error) { - if cfg.Engine == nil { - return nil, fmt.Errorf("engine parameter not provided") - } - if cfg.ClusterProvider == nil { - return nil, fmt.Errorf("cluster provider parameter not provided") - } - if cfg.ActivationStrategy == nil { - cfg.ActivationStrategy = DefaultActivationStrategy() - } - if len(cfg.ID) == 0 { - cfg.ID = uuid.New().String() - } - if len(cfg.Region) == 0 { - cfg.Region = "default" +// WithID set's the ID of this node. +// +// Defaults to a random generated ID. +func (config Config) WithID(id string) Config { + config.id = id + return config +} + +// WithRegion set's the region where the member will be hosted. +// +// Defaults to "default" +func (config Config) WithRegion(region string) Config { + config.region = region + return config +} + +// Cluster allows you to write distributed actors. It combines Engine, Remote, and +// Provider which allows members of the cluster to send messages to eachother in a +// self discovering environment. +type Cluster struct { + config Config + engine *actor.Engine + agentPID *actor.PID + providerPID *actor.PID + isStarted bool + kinds []kind +} + +// New returns a new cluster given a Config. +func New(config Config) (*Cluster, error) { + if config.engine == nil { + remote := remote.New(config.listenAddr, nil) + e, err := actor.NewEngine(&actor.EngineConfig{Remote: remote}) + if err != nil { + return nil, err + } + config.engine = e } - if cfg.RequestTimeout == 0 { - cfg.RequestTimeout = time.Second + c := &Cluster{ + config: config, + engine: config.engine, + kinds: make([]kind, 0), } - return &Cluster{ - id: cfg.ID, - region: cfg.Region, - provider: cfg.ClusterProvider, - engine: cfg.Engine, - kinds: []kind{}, - activationStrategy: cfg.ActivationStrategy, - requestTimeout: cfg.RequestTimeout, - }, nil + return c, nil } // Start the cluster func (c *Cluster) Start() { - c.agentPID = c.engine.Spawn(NewAgent(c), "cluster", actor.WithID(c.id)) - c.providerPID = c.engine.Spawn(c.provider(c), "provider", actor.WithID(c.id)) + c.agentPID = c.engine.Spawn(NewAgent(c), "cluster", actor.WithID(c.config.id)) + c.providerPID = c.engine.Spawn(c.config.provider(c), "provider", actor.WithID(c.config.id)) c.isStarted = true - return +} + +// Stop will shutdown the cluster poisoning all its actors. +func (c *Cluster) Stop() *sync.WaitGroup { + wg := sync.WaitGroup{} + c.engine.Poison(c.agentPID, &wg) + c.engine.Poison(c.providerPID, &wg) + return &wg } // Spawn an actor locally on the node with cluster awareness. @@ -95,25 +160,44 @@ func (c *Cluster) Spawn(p actor.Producer, id string, opts ...actor.OptFunc) *act return pid } -// TODO: Doc this when its more usefull. type ActivationConfig struct { - // if empty, a unique identifier will be generated. - ID string - Region string + id string + region string +} + +// NewActivationConfig returns a new default config. +func NewActivationConfig() ActivationConfig { + return ActivationConfig{ + id: fmt.Sprintf("%d", rand.Intn(math.MaxInt)), + region: "default", + } +} + +// WithID set's the id of the actor that will be activated on the cluster. +// +// Defaults to a random identifier. +func (config ActivationConfig) WithID(id string) ActivationConfig { + config.id = id + return config +} + +// WithRegion set's the region on where this actor (potentially) will be spawned +// +// Defaults to a "default". +func (config ActivationConfig) WithRegion(region string) ActivationConfig { + config.region = region + return config } // Activate actives the given actor kind with an optional id. If there is no id // given, the engine will create an unique id automatically. -func (c *Cluster) Activate(kind string, config *ActivationConfig) *actor.PID { - if config == nil { - config = &ActivationConfig{} - } +func (c *Cluster) Activate(kind string, config ActivationConfig) *actor.PID { msg := activate{ kind: kind, - id: config.ID, - region: config.Region, + id: config.id, + region: config.region, } - resp, err := c.engine.Request(c.agentPID, msg, c.requestTimeout).Result() + resp, err := c.engine.Request(c.agentPID, msg, c.config.requestTimeout).Result() if err != nil { slog.Error("activation failed", "err", err) return nil @@ -159,7 +243,7 @@ func (c *Cluster) HasKindLocal(name string) bool { // Members returns all the members that are part of the cluster. func (c *Cluster) Members() []*Member { - resp, err := c.engine.Request(c.agentPID, getMembers{}, c.requestTimeout).Result() + resp, err := c.engine.Request(c.agentPID, getMembers{}, c.config.requestTimeout).Result() if err != nil { return []*Member{} } @@ -172,7 +256,7 @@ func (c *Cluster) Members() []*Member { // HasKind returns true whether the given kind is available for activation on // the cluster. func (c *Cluster) HasKind(name string) bool { - resp, err := c.engine.Request(c.agentPID, getKinds{}, c.requestTimeout).Result() + resp, err := c.engine.Request(c.agentPID, getKinds{}, c.config.requestTimeout).Result() if err != nil { return false } @@ -186,8 +270,9 @@ func (c *Cluster) HasKind(name string) bool { return false } +// TODO: Weird func (c *Cluster) GetActivated(id string) *actor.PID { - resp, err := c.engine.Request(c.agentPID, getActive{id: id}, c.requestTimeout).Result() + resp, err := c.engine.Request(c.agentPID, getActive{id: id}, c.config.requestTimeout).Result() if err != nil { return nil } @@ -197,11 +282,6 @@ func (c *Cluster) GetActivated(id string) *actor.PID { return nil } -// PID returns the reachable actor process id, which is the Agent actor. -func (c *Cluster) PID() *actor.PID { - return c.agentPID -} - // Member returns the member info of this node. func (c *Cluster) Member() *Member { kinds := make([]string, len(c.kinds)) @@ -209,10 +289,10 @@ func (c *Cluster) Member() *Member { kinds[i] = c.kinds[i].name } m := &Member{ - ID: c.id, + ID: c.config.id, Host: c.engine.Address(), Kinds: kinds, - Region: c.region, + Region: c.config.region, } return m } @@ -224,5 +304,24 @@ func (c *Cluster) Engine() *actor.Engine { // Region return the region of the cluster. func (c *Cluster) Region() string { - return c.region + return c.config.region +} + +// ID returns the ID of the cluster. +func (c *Cluster) ID() string { + return c.config.id +} + +// Address returns the host/address of the cluster. +func (c *Cluster) Address() string { + return c.agentPID.Address +} + +// PID returns the reachable actor process id, which is the Agent actor. +func (c *Cluster) PID() *actor.PID { + return c.agentPID +} + +func getRandomListenAddr() string { + return fmt.Sprintf("127.0.0.1:%d", rand.Intn(50000)+10000) } diff --git a/cluster/cluster.pb.go b/cluster/cluster.pb.go index c24672a..9da4ea0 100644 --- a/cluster/cluster.pb.go +++ b/cluster/cluster.pb.go @@ -210,6 +210,7 @@ func (x *Members) GetMembers() []*Member { return nil } +// TODO: Deprecated type MembersJoin struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -304,6 +305,53 @@ func (x *MembersLeave) GetMembers() []*Member { return nil } +type Handshake struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Member *Member `protobuf:"bytes,1,opt,name=Member,proto3" json:"Member,omitempty"` +} + +func (x *Handshake) Reset() { + *x = Handshake{} + if protoimpl.UnsafeEnabled { + mi := &file_cluster_proto_msgTypes[5] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Handshake) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Handshake) ProtoMessage() {} + +func (x *Handshake) ProtoReflect() protoreflect.Message { + mi := &file_cluster_proto_msgTypes[5] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Handshake.ProtoReflect.Descriptor instead. +func (*Handshake) Descriptor() ([]byte, []int) { + return file_cluster_proto_rawDescGZIP(), []int{5} +} + +func (x *Handshake) GetMember() *Member { + if x != nil { + return x.Member + } + return nil +} + type Topology struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -319,7 +367,7 @@ type Topology struct { func (x *Topology) Reset() { *x = Topology{} if protoimpl.UnsafeEnabled { - mi := &file_cluster_proto_msgTypes[5] + mi := &file_cluster_proto_msgTypes[6] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -332,7 +380,7 @@ func (x *Topology) String() string { func (*Topology) ProtoMessage() {} func (x *Topology) ProtoReflect() protoreflect.Message { - mi := &file_cluster_proto_msgTypes[5] + mi := &file_cluster_proto_msgTypes[6] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -345,7 +393,7 @@ func (x *Topology) ProtoReflect() protoreflect.Message { // Deprecated: Use Topology.ProtoReflect.Descriptor instead. func (*Topology) Descriptor() ([]byte, []int) { - return file_cluster_proto_rawDescGZIP(), []int{5} + return file_cluster_proto_rawDescGZIP(), []int{6} } func (x *Topology) GetHash() uint64 { @@ -394,7 +442,7 @@ type ActorInfo struct { func (x *ActorInfo) Reset() { *x = ActorInfo{} if protoimpl.UnsafeEnabled { - mi := &file_cluster_proto_msgTypes[6] + mi := &file_cluster_proto_msgTypes[7] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -407,7 +455,7 @@ func (x *ActorInfo) String() string { func (*ActorInfo) ProtoMessage() {} func (x *ActorInfo) ProtoReflect() protoreflect.Message { - mi := &file_cluster_proto_msgTypes[6] + mi := &file_cluster_proto_msgTypes[7] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -420,7 +468,7 @@ func (x *ActorInfo) ProtoReflect() protoreflect.Message { // Deprecated: Use ActorInfo.ProtoReflect.Descriptor instead. func (*ActorInfo) Descriptor() ([]byte, []int) { - return file_cluster_proto_rawDescGZIP(), []int{6} + return file_cluster_proto_rawDescGZIP(), []int{7} } func (x *ActorInfo) GetPID() *actor.PID { @@ -441,7 +489,7 @@ type ActorTopology struct { func (x *ActorTopology) Reset() { *x = ActorTopology{} if protoimpl.UnsafeEnabled { - mi := &file_cluster_proto_msgTypes[7] + mi := &file_cluster_proto_msgTypes[8] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -454,7 +502,7 @@ func (x *ActorTopology) String() string { func (*ActorTopology) ProtoMessage() {} func (x *ActorTopology) ProtoReflect() protoreflect.Message { - mi := &file_cluster_proto_msgTypes[7] + mi := &file_cluster_proto_msgTypes[8] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -467,7 +515,7 @@ func (x *ActorTopology) ProtoReflect() protoreflect.Message { // Deprecated: Use ActorTopology.ProtoReflect.Descriptor instead. func (*ActorTopology) Descriptor() ([]byte, []int) { - return file_cluster_proto_rawDescGZIP(), []int{7} + return file_cluster_proto_rawDescGZIP(), []int{8} } func (x *ActorTopology) GetActors() []*ActorInfo { @@ -488,7 +536,7 @@ type Activation struct { func (x *Activation) Reset() { *x = Activation{} if protoimpl.UnsafeEnabled { - mi := &file_cluster_proto_msgTypes[8] + mi := &file_cluster_proto_msgTypes[9] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -501,7 +549,7 @@ func (x *Activation) String() string { func (*Activation) ProtoMessage() {} func (x *Activation) ProtoReflect() protoreflect.Message { - mi := &file_cluster_proto_msgTypes[8] + mi := &file_cluster_proto_msgTypes[9] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -514,7 +562,7 @@ func (x *Activation) ProtoReflect() protoreflect.Message { // Deprecated: Use Activation.ProtoReflect.Descriptor instead. func (*Activation) Descriptor() ([]byte, []int) { - return file_cluster_proto_rawDescGZIP(), []int{8} + return file_cluster_proto_rawDescGZIP(), []int{9} } func (x *Activation) GetPID() *actor.PID { @@ -535,7 +583,7 @@ type Deactivation struct { func (x *Deactivation) Reset() { *x = Deactivation{} if protoimpl.UnsafeEnabled { - mi := &file_cluster_proto_msgTypes[9] + mi := &file_cluster_proto_msgTypes[10] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -548,7 +596,7 @@ func (x *Deactivation) String() string { func (*Deactivation) ProtoMessage() {} func (x *Deactivation) ProtoReflect() protoreflect.Message { - mi := &file_cluster_proto_msgTypes[9] + mi := &file_cluster_proto_msgTypes[10] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -561,7 +609,7 @@ func (x *Deactivation) ProtoReflect() protoreflect.Message { // Deprecated: Use Deactivation.ProtoReflect.Descriptor instead. func (*Deactivation) Descriptor() ([]byte, []int) { - return file_cluster_proto_rawDescGZIP(), []int{9} + return file_cluster_proto_rawDescGZIP(), []int{10} } func (x *Deactivation) GetPID() *actor.PID { @@ -585,7 +633,7 @@ type ActivationRequest struct { func (x *ActivationRequest) Reset() { *x = ActivationRequest{} if protoimpl.UnsafeEnabled { - mi := &file_cluster_proto_msgTypes[10] + mi := &file_cluster_proto_msgTypes[11] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -598,7 +646,7 @@ func (x *ActivationRequest) String() string { func (*ActivationRequest) ProtoMessage() {} func (x *ActivationRequest) ProtoReflect() protoreflect.Message { - mi := &file_cluster_proto_msgTypes[10] + mi := &file_cluster_proto_msgTypes[11] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -611,7 +659,7 @@ func (x *ActivationRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use ActivationRequest.ProtoReflect.Descriptor instead. func (*ActivationRequest) Descriptor() ([]byte, []int) { - return file_cluster_proto_rawDescGZIP(), []int{10} + return file_cluster_proto_rawDescGZIP(), []int{11} } func (x *ActivationRequest) GetKind() string { @@ -655,7 +703,7 @@ type ActivationResponse struct { func (x *ActivationResponse) Reset() { *x = ActivationResponse{} if protoimpl.UnsafeEnabled { - mi := &file_cluster_proto_msgTypes[11] + mi := &file_cluster_proto_msgTypes[12] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -668,7 +716,7 @@ func (x *ActivationResponse) String() string { func (*ActivationResponse) ProtoMessage() {} func (x *ActivationResponse) ProtoReflect() protoreflect.Message { - mi := &file_cluster_proto_msgTypes[11] + mi := &file_cluster_proto_msgTypes[12] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -681,7 +729,7 @@ func (x *ActivationResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use ActivationResponse.ProtoReflect.Descriptor instead. func (*ActivationResponse) Descriptor() ([]byte, []int) { - return file_cluster_proto_rawDescGZIP(), []int{11} + return file_cluster_proto_rawDescGZIP(), []int{12} } func (x *ActivationResponse) GetPID() *actor.PID { @@ -732,49 +780,52 @@ var file_cluster_proto_rawDesc = []byte{ 0x72, 0x73, 0x22, 0x39, 0x0a, 0x0c, 0x4d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x73, 0x4c, 0x65, 0x61, 0x76, 0x65, 0x12, 0x29, 0x0a, 0x07, 0x6d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x2e, 0x4d, 0x65, - 0x6d, 0x62, 0x65, 0x72, 0x52, 0x07, 0x6d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x73, 0x22, 0xc2, 0x01, - 0x0a, 0x08, 0x54, 0x6f, 0x70, 0x6f, 0x6c, 0x6f, 0x67, 0x79, 0x12, 0x12, 0x0a, 0x04, 0x68, 0x61, - 0x73, 0x68, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x04, 0x68, 0x61, 0x73, 0x68, 0x12, 0x29, - 0x0a, 0x07, 0x6d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, - 0x0f, 0x2e, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x2e, 0x4d, 0x65, 0x6d, 0x62, 0x65, 0x72, - 0x52, 0x07, 0x6d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x73, 0x12, 0x23, 0x0a, 0x04, 0x6c, 0x65, 0x66, - 0x74, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, - 0x72, 0x2e, 0x4d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x52, 0x04, 0x6c, 0x65, 0x66, 0x74, 0x12, 0x27, - 0x0a, 0x06, 0x6a, 0x6f, 0x69, 0x6e, 0x65, 0x64, 0x18, 0x04, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0f, + 0x6d, 0x62, 0x65, 0x72, 0x52, 0x07, 0x6d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x73, 0x22, 0x34, 0x0a, + 0x09, 0x48, 0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x12, 0x27, 0x0a, 0x06, 0x4d, 0x65, + 0x6d, 0x62, 0x65, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x63, 0x6c, 0x75, + 0x73, 0x74, 0x65, 0x72, 0x2e, 0x4d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x52, 0x06, 0x4d, 0x65, 0x6d, + 0x62, 0x65, 0x72, 0x22, 0xc2, 0x01, 0x0a, 0x08, 0x54, 0x6f, 0x70, 0x6f, 0x6c, 0x6f, 0x67, 0x79, + 0x12, 0x12, 0x0a, 0x04, 0x68, 0x61, 0x73, 0x68, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x04, + 0x68, 0x61, 0x73, 0x68, 0x12, 0x29, 0x0a, 0x07, 0x6d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x73, 0x18, + 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x2e, + 0x4d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x52, 0x07, 0x6d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x73, 0x12, + 0x23, 0x0a, 0x04, 0x6c, 0x65, 0x66, 0x74, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0f, 0x2e, + 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x2e, 0x4d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x52, 0x04, + 0x6c, 0x65, 0x66, 0x74, 0x12, 0x27, 0x0a, 0x06, 0x6a, 0x6f, 0x69, 0x6e, 0x65, 0x64, 0x18, 0x04, + 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x2e, 0x4d, + 0x65, 0x6d, 0x62, 0x65, 0x72, 0x52, 0x06, 0x6a, 0x6f, 0x69, 0x6e, 0x65, 0x64, 0x12, 0x29, 0x0a, + 0x07, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x65, 0x64, 0x18, 0x05, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x2e, 0x4d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x52, - 0x06, 0x6a, 0x6f, 0x69, 0x6e, 0x65, 0x64, 0x12, 0x29, 0x0a, 0x07, 0x62, 0x6c, 0x6f, 0x63, 0x6b, - 0x65, 0x64, 0x18, 0x05, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x63, 0x6c, 0x75, 0x73, 0x74, - 0x65, 0x72, 0x2e, 0x4d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x52, 0x07, 0x62, 0x6c, 0x6f, 0x63, 0x6b, - 0x65, 0x64, 0x22, 0x29, 0x0a, 0x09, 0x41, 0x63, 0x74, 0x6f, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x12, - 0x1c, 0x0a, 0x03, 0x50, 0x49, 0x44, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0a, 0x2e, 0x61, - 0x63, 0x74, 0x6f, 0x72, 0x2e, 0x50, 0x49, 0x44, 0x52, 0x03, 0x50, 0x49, 0x44, 0x22, 0x3b, 0x0a, - 0x0d, 0x41, 0x63, 0x74, 0x6f, 0x72, 0x54, 0x6f, 0x70, 0x6f, 0x6c, 0x6f, 0x67, 0x79, 0x12, 0x2a, - 0x0a, 0x06, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x12, - 0x2e, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x2e, 0x41, 0x63, 0x74, 0x6f, 0x72, 0x49, 0x6e, - 0x66, 0x6f, 0x52, 0x06, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x73, 0x22, 0x2a, 0x0a, 0x0a, 0x41, 0x63, - 0x74, 0x69, 0x76, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x1c, 0x0a, 0x03, 0x50, 0x49, 0x44, 0x18, - 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0a, 0x2e, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x2e, 0x50, 0x49, - 0x44, 0x52, 0x03, 0x50, 0x49, 0x44, 0x22, 0x2c, 0x0a, 0x0c, 0x44, 0x65, 0x61, 0x63, 0x74, 0x69, - 0x76, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x1c, 0x0a, 0x03, 0x50, 0x49, 0x44, 0x18, 0x01, 0x20, - 0x01, 0x28, 0x0b, 0x32, 0x0a, 0x2e, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x2e, 0x50, 0x49, 0x44, 0x52, - 0x03, 0x50, 0x49, 0x44, 0x22, 0x73, 0x0a, 0x11, 0x41, 0x63, 0x74, 0x69, 0x76, 0x61, 0x74, 0x69, - 0x6f, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x4b, 0x69, 0x6e, - 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x4b, 0x69, 0x6e, 0x64, 0x12, 0x0e, 0x0a, - 0x02, 0x49, 0x44, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x49, 0x44, 0x12, 0x16, 0x0a, - 0x06, 0x52, 0x65, 0x67, 0x69, 0x6f, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x52, - 0x65, 0x67, 0x69, 0x6f, 0x6e, 0x12, 0x22, 0x0a, 0x0c, 0x74, 0x6f, 0x70, 0x6f, 0x6c, 0x6f, 0x67, - 0x79, 0x48, 0x61, 0x73, 0x68, 0x18, 0x04, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0c, 0x74, 0x6f, 0x70, - 0x6f, 0x6c, 0x6f, 0x67, 0x79, 0x48, 0x61, 0x73, 0x68, 0x22, 0x70, 0x0a, 0x12, 0x41, 0x63, 0x74, - 0x69, 0x76, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, - 0x1c, 0x0a, 0x03, 0x50, 0x49, 0x44, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0a, 0x2e, 0x61, - 0x63, 0x74, 0x6f, 0x72, 0x2e, 0x50, 0x49, 0x44, 0x52, 0x03, 0x50, 0x49, 0x44, 0x12, 0x18, 0x0a, - 0x07, 0x73, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x07, - 0x73, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x12, 0x22, 0x0a, 0x0c, 0x74, 0x6f, 0x70, 0x6f, 0x6c, - 0x6f, 0x67, 0x79, 0x48, 0x61, 0x73, 0x68, 0x18, 0x03, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0c, 0x74, - 0x6f, 0x70, 0x6f, 0x6c, 0x6f, 0x67, 0x79, 0x48, 0x61, 0x73, 0x68, 0x42, 0x25, 0x5a, 0x23, 0x67, - 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x61, 0x6e, 0x74, 0x68, 0x64, 0x6d, - 0x2f, 0x68, 0x6f, 0x6c, 0x6c, 0x79, 0x77, 0x6f, 0x6f, 0x64, 0x2f, 0x63, 0x6c, 0x75, 0x73, 0x74, - 0x65, 0x72, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x07, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x65, 0x64, 0x22, 0x29, 0x0a, 0x09, 0x41, 0x63, 0x74, 0x6f, + 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x1c, 0x0a, 0x03, 0x50, 0x49, 0x44, 0x18, 0x02, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x0a, 0x2e, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x2e, 0x50, 0x49, 0x44, 0x52, 0x03, + 0x50, 0x49, 0x44, 0x22, 0x3b, 0x0a, 0x0d, 0x41, 0x63, 0x74, 0x6f, 0x72, 0x54, 0x6f, 0x70, 0x6f, + 0x6c, 0x6f, 0x67, 0x79, 0x12, 0x2a, 0x0a, 0x06, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x73, 0x18, 0x01, + 0x20, 0x03, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x2e, 0x41, + 0x63, 0x74, 0x6f, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x06, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x73, + 0x22, 0x2a, 0x0a, 0x0a, 0x41, 0x63, 0x74, 0x69, 0x76, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x1c, + 0x0a, 0x03, 0x50, 0x49, 0x44, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0a, 0x2e, 0x61, 0x63, + 0x74, 0x6f, 0x72, 0x2e, 0x50, 0x49, 0x44, 0x52, 0x03, 0x50, 0x49, 0x44, 0x22, 0x2c, 0x0a, 0x0c, + 0x44, 0x65, 0x61, 0x63, 0x74, 0x69, 0x76, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x1c, 0x0a, 0x03, + 0x50, 0x49, 0x44, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0a, 0x2e, 0x61, 0x63, 0x74, 0x6f, + 0x72, 0x2e, 0x50, 0x49, 0x44, 0x52, 0x03, 0x50, 0x49, 0x44, 0x22, 0x73, 0x0a, 0x11, 0x41, 0x63, + 0x74, 0x69, 0x76, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, + 0x12, 0x0a, 0x04, 0x4b, 0x69, 0x6e, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x4b, + 0x69, 0x6e, 0x64, 0x12, 0x0e, 0x0a, 0x02, 0x49, 0x44, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x02, 0x49, 0x44, 0x12, 0x16, 0x0a, 0x06, 0x52, 0x65, 0x67, 0x69, 0x6f, 0x6e, 0x18, 0x03, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x06, 0x52, 0x65, 0x67, 0x69, 0x6f, 0x6e, 0x12, 0x22, 0x0a, 0x0c, 0x74, + 0x6f, 0x70, 0x6f, 0x6c, 0x6f, 0x67, 0x79, 0x48, 0x61, 0x73, 0x68, 0x18, 0x04, 0x20, 0x01, 0x28, + 0x04, 0x52, 0x0c, 0x74, 0x6f, 0x70, 0x6f, 0x6c, 0x6f, 0x67, 0x79, 0x48, 0x61, 0x73, 0x68, 0x22, + 0x70, 0x0a, 0x12, 0x41, 0x63, 0x74, 0x69, 0x76, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1c, 0x0a, 0x03, 0x50, 0x49, 0x44, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x0a, 0x2e, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x2e, 0x50, 0x49, 0x44, 0x52, 0x03, + 0x50, 0x49, 0x44, 0x12, 0x18, 0x0a, 0x07, 0x73, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x18, 0x02, + 0x20, 0x01, 0x28, 0x08, 0x52, 0x07, 0x73, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x12, 0x22, 0x0a, + 0x0c, 0x74, 0x6f, 0x70, 0x6f, 0x6c, 0x6f, 0x67, 0x79, 0x48, 0x61, 0x73, 0x68, 0x18, 0x03, 0x20, + 0x01, 0x28, 0x04, 0x52, 0x0c, 0x74, 0x6f, 0x70, 0x6f, 0x6c, 0x6f, 0x67, 0x79, 0x48, 0x61, 0x73, + 0x68, 0x42, 0x25, 0x5a, 0x23, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, + 0x61, 0x6e, 0x74, 0x68, 0x64, 0x6d, 0x2f, 0x68, 0x6f, 0x6c, 0x6c, 0x79, 0x77, 0x6f, 0x6f, 0x64, + 0x2f, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -789,41 +840,43 @@ func file_cluster_proto_rawDescGZIP() []byte { return file_cluster_proto_rawDescData } -var file_cluster_proto_msgTypes = make([]protoimpl.MessageInfo, 12) +var file_cluster_proto_msgTypes = make([]protoimpl.MessageInfo, 13) var file_cluster_proto_goTypes = []interface{}{ (*CID)(nil), // 0: cluster.CID (*Member)(nil), // 1: cluster.Member (*Members)(nil), // 2: cluster.Members (*MembersJoin)(nil), // 3: cluster.MembersJoin (*MembersLeave)(nil), // 4: cluster.MembersLeave - (*Topology)(nil), // 5: cluster.Topology - (*ActorInfo)(nil), // 6: cluster.ActorInfo - (*ActorTopology)(nil), // 7: cluster.ActorTopology - (*Activation)(nil), // 8: cluster.Activation - (*Deactivation)(nil), // 9: cluster.Deactivation - (*ActivationRequest)(nil), // 10: cluster.ActivationRequest - (*ActivationResponse)(nil), // 11: cluster.ActivationResponse - (*actor.PID)(nil), // 12: actor.PID + (*Handshake)(nil), // 5: cluster.Handshake + (*Topology)(nil), // 6: cluster.Topology + (*ActorInfo)(nil), // 7: cluster.ActorInfo + (*ActorTopology)(nil), // 8: cluster.ActorTopology + (*Activation)(nil), // 9: cluster.Activation + (*Deactivation)(nil), // 10: cluster.Deactivation + (*ActivationRequest)(nil), // 11: cluster.ActivationRequest + (*ActivationResponse)(nil), // 12: cluster.ActivationResponse + (*actor.PID)(nil), // 13: actor.PID } var file_cluster_proto_depIdxs = []int32{ - 12, // 0: cluster.CID.PID:type_name -> actor.PID + 13, // 0: cluster.CID.PID:type_name -> actor.PID 1, // 1: cluster.Members.members:type_name -> cluster.Member 1, // 2: cluster.MembersJoin.members:type_name -> cluster.Member 1, // 3: cluster.MembersLeave.members:type_name -> cluster.Member - 1, // 4: cluster.Topology.members:type_name -> cluster.Member - 1, // 5: cluster.Topology.left:type_name -> cluster.Member - 1, // 6: cluster.Topology.joined:type_name -> cluster.Member - 1, // 7: cluster.Topology.blocked:type_name -> cluster.Member - 12, // 8: cluster.ActorInfo.PID:type_name -> actor.PID - 6, // 9: cluster.ActorTopology.actors:type_name -> cluster.ActorInfo - 12, // 10: cluster.Activation.PID:type_name -> actor.PID - 12, // 11: cluster.Deactivation.PID:type_name -> actor.PID - 12, // 12: cluster.ActivationResponse.PID:type_name -> actor.PID - 13, // [13:13] is the sub-list for method output_type - 13, // [13:13] is the sub-list for method input_type - 13, // [13:13] is the sub-list for extension type_name - 13, // [13:13] is the sub-list for extension extendee - 0, // [0:13] is the sub-list for field type_name + 1, // 4: cluster.Handshake.Member:type_name -> cluster.Member + 1, // 5: cluster.Topology.members:type_name -> cluster.Member + 1, // 6: cluster.Topology.left:type_name -> cluster.Member + 1, // 7: cluster.Topology.joined:type_name -> cluster.Member + 1, // 8: cluster.Topology.blocked:type_name -> cluster.Member + 13, // 9: cluster.ActorInfo.PID:type_name -> actor.PID + 7, // 10: cluster.ActorTopology.actors:type_name -> cluster.ActorInfo + 13, // 11: cluster.Activation.PID:type_name -> actor.PID + 13, // 12: cluster.Deactivation.PID:type_name -> actor.PID + 13, // 13: cluster.ActivationResponse.PID:type_name -> actor.PID + 14, // [14:14] is the sub-list for method output_type + 14, // [14:14] is the sub-list for method input_type + 14, // [14:14] is the sub-list for extension type_name + 14, // [14:14] is the sub-list for extension extendee + 0, // [0:14] is the sub-list for field type_name } func init() { file_cluster_proto_init() } @@ -893,7 +946,7 @@ func file_cluster_proto_init() { } } file_cluster_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*Topology); i { + switch v := v.(*Handshake); i { case 0: return &v.state case 1: @@ -905,7 +958,7 @@ func file_cluster_proto_init() { } } file_cluster_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*ActorInfo); i { + switch v := v.(*Topology); i { case 0: return &v.state case 1: @@ -917,7 +970,7 @@ func file_cluster_proto_init() { } } file_cluster_proto_msgTypes[7].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*ActorTopology); i { + switch v := v.(*ActorInfo); i { case 0: return &v.state case 1: @@ -929,7 +982,7 @@ func file_cluster_proto_init() { } } file_cluster_proto_msgTypes[8].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*Activation); i { + switch v := v.(*ActorTopology); i { case 0: return &v.state case 1: @@ -941,7 +994,7 @@ func file_cluster_proto_init() { } } file_cluster_proto_msgTypes[9].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*Deactivation); i { + switch v := v.(*Activation); i { case 0: return &v.state case 1: @@ -953,7 +1006,7 @@ func file_cluster_proto_init() { } } file_cluster_proto_msgTypes[10].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*ActivationRequest); i { + switch v := v.(*Deactivation); i { case 0: return &v.state case 1: @@ -965,6 +1018,18 @@ func file_cluster_proto_init() { } } file_cluster_proto_msgTypes[11].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ActivationRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_cluster_proto_msgTypes[12].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*ActivationResponse); i { case 0: return &v.state @@ -983,7 +1048,7 @@ func file_cluster_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_cluster_proto_rawDesc, NumEnums: 0, - NumMessages: 12, + NumMessages: 13, NumExtensions: 0, NumServices: 0, }, diff --git a/cluster/cluster.proto b/cluster/cluster.proto index c164f41..4498f38 100644 --- a/cluster/cluster.proto +++ b/cluster/cluster.proto @@ -21,6 +21,7 @@ message Members { repeated Member members = 1; } +// TODO: Deprecated message MembersJoin { repeated Member members = 1; } @@ -28,6 +29,10 @@ message MembersJoin { message MembersLeave { repeated Member members = 1; } + +message Handshake { + Member Member = 1; +} message Topology { uint64 hash = 1; diff --git a/cluster/cluster_test.go b/cluster/cluster_test.go index 9676186..07ef320 100644 --- a/cluster/cluster_test.go +++ b/cluster/cluster_test.go @@ -28,18 +28,19 @@ func NewInventory() actor.Receiver { func (i Inventory) Receive(c *actor.Context) {} -func TestClusterShouldWorkWithDefaultValues(t *testing.T) { - remote := remote.New(getRandomLocalhostAddr(), nil) - e, err := actor.NewEngine(&actor.EngineConfig{Remote: remote}) +func TestFooBarBaz(t *testing.T) { + config := NewConfig() + cluster, err := New(config) assert.Nil(t, err) - cfg := Config{ - ClusterProvider: NewSelfManagedProvider(), - Engine: e, - } - c, err := New(cfg) + _ = cluster +} + +func TestClusterShouldWorkWithDefaultValues(t *testing.T) { + config := NewConfig() + c, err := New(config) assert.Nil(t, err) - assert.True(t, len(c.id) > 0) - assert.Equal(t, c.region, "default") + assert.True(t, len(c.config.id) > 0) + assert.Equal(t, c.config.region, "default") } func TestRegisterKind(t *testing.T) { @@ -51,18 +52,15 @@ func TestRegisterKind(t *testing.T) { } func TestClusterSpawn(t *testing.T) { - c1Addr := getRandomLocalhostAddr() - c1 := makeCluster(t, c1Addr, "A", "eu-west") - c2 := makeCluster(t, getRandomLocalhostAddr(), "B", "eu-west", MemberAddr{ - ListenAddr: c1Addr, - ID: "A", - }) - - expectedPID := actor.NewPID(c1Addr, "player/1") + var ( + c1Addr = getRandomLocalhostAddr() + c1 = makeCluster(t, c1Addr, "A", "eu-west") + c2 = makeCluster(t, getRandomLocalhostAddr(), "B", "eu-west") + wg = sync.WaitGroup{} + expectedPID = actor.NewPID(c1Addr, "player/1") + ) - wg := sync.WaitGroup{} wg.Add(2) - eventPID := c1.engine.SpawnFunc(func(c *actor.Context) { switch msg := c.Message().(type) { case MemberJoinEvent: @@ -88,15 +86,14 @@ func TestClusterSpawn(t *testing.T) { c1.Start() c2.Start() wg.Wait() + + c1.Stop().Wait() + c2.Stop().Wait() } func TestMemberJoin(t *testing.T) { - addr := getRandomLocalhostAddr() - c1 := makeCluster(t, addr, "A", "eu-west") - c2 := makeCluster(t, getRandomLocalhostAddr(), "B", "eu-west", MemberAddr{ - ListenAddr: addr, - ID: "A", - }) + c1 := makeCluster(t, getRandomLocalhostAddr(), "A", "eu-west") + c2 := makeCluster(t, getRandomLocalhostAddr(), "B", "eu-west") c2.RegisterKind("player", NewPlayer, nil) wg := sync.WaitGroup{} @@ -105,6 +102,7 @@ func TestMemberJoin(t *testing.T) { switch msg := c.Message().(type) { // we do this so we are 100% sure nodes are connected with eachother. case MemberJoinEvent: + fmt.Println(msg) if msg.Member.ID == "B" { _ = msg wg.Done() @@ -118,15 +116,17 @@ func TestMemberJoin(t *testing.T) { wg.Wait() assert.Equal(t, len(c1.Members()), 2) assert.True(t, c1.HasKind("player")) + + c1.Stop().Wait() + c2.Stop().Wait() } func TestActivate(t *testing.T) { - addr := getRandomLocalhostAddr() - c1 := makeCluster(t, addr, "A", "eu-west") - c2 := makeCluster(t, getRandomLocalhostAddr(), "B", "eu-west", MemberAddr{ - ListenAddr: addr, - ID: "A", - }) + var ( + addr = getRandomLocalhostAddr() + c1 = makeCluster(t, addr, "A", "eu-west") + c2 = makeCluster(t, getRandomLocalhostAddr(), "B", "eu-west") + ) c2.RegisterKind("player", NewPlayer, nil) expectedPID := actor.NewPID(c2.engine.Address(), "player/1") @@ -139,7 +139,7 @@ func TestActivate(t *testing.T) { if msg.Member.ID == "B" { // Because c1 doesnt have player registered locally we can only spawned // the player on c2 - pid := c1.Activate("player", &ActivationConfig{ID: "1"}) + pid := c1.Activate("player", NewActivationConfig().WithID("1")) assert.True(t, pid.Equals(expectedPID)) } wg.Done() @@ -154,15 +154,15 @@ func TestActivate(t *testing.T) { assert.Equal(t, len(c1.Members()), 2) assert.True(t, c1.HasKind("player")) assert.True(t, c1.GetActivated("player/1").Equals(expectedPID)) + + c1.Stop().Wait() + c2.Stop().Wait() } func TestDeactivate(t *testing.T) { addr := getRandomLocalhostAddr() c1 := makeCluster(t, addr, "A", "eu-west") - c2 := makeCluster(t, getRandomLocalhostAddr(), "B", "eu-west", MemberAddr{ - ListenAddr: addr, - ID: "A", - }) + c2 := makeCluster(t, getRandomLocalhostAddr(), "B", "eu-west") c2.RegisterKind("player", NewPlayer, nil) expectedPID := actor.NewPID(c2.engine.Address(), "player/1") @@ -172,7 +172,7 @@ func TestDeactivate(t *testing.T) { switch msg := c.Message().(type) { case MemberJoinEvent: if msg.Member.ID == "B" { - pid := c1.Activate("player", &ActivationConfig{ID: "1"}) + pid := c1.Activate("player", NewActivationConfig().WithID("1")) assert.True(t, pid.Equals(expectedPID)) } case ActivationEvent: @@ -190,27 +190,25 @@ func TestDeactivate(t *testing.T) { assert.Equal(t, len(c1.Members()), 2) assert.True(t, c1.HasKind("player")) assert.Nil(t, c1.GetActivated("player/1")) + + c1.Stop().Wait() + c2.Stop().Wait() } func TestMemberLeave(t *testing.T) { c1Addr := getRandomLocalhostAddr() c2Addr := getRandomLocalhostAddr() - remote := remote.New(c2Addr, nil) + remote := remote.New(c2Addr, nil) e, err := actor.NewEngine(&actor.EngineConfig{Remote: remote}) if err != nil { log.Fatal(err) } - cfg := Config{ - ClusterProvider: NewSelfManagedProvider(MemberAddr{ - ListenAddr: c1Addr, - ID: "A", - }), - ID: "B", - Region: "eu-east", - Engine: e, - } - c2, err := New(cfg) + config := NewConfig(). + WithID("B"). + WithRegion("eu-east"). + WithEngine(e) + c2, err := New(config) assert.Nil(t, err) c1 := makeCluster(t, c1Addr, "A", "eu-west") @@ -226,7 +224,7 @@ func TestMemberLeave(t *testing.T) { remote.Stop().Wait() } case MemberLeaveEvent: - assert.Equal(t, msg.Member.ID, c2.id) + assert.Equal(t, msg.Member.ID, c2.ID()) wg.Done() } }, "event") @@ -236,6 +234,9 @@ func TestMemberLeave(t *testing.T) { wg.Wait() assert.Equal(t, len(c1.Members()), 1) assert.False(t, c1.HasKind("player")) + + c1.Stop().Wait() + c2.Stop().Wait() } func TestMembersExcept(t *testing.T) { @@ -268,23 +269,16 @@ func TestMembersExcept(t *testing.T) { assert.Equal(t, am[0].ID, "C") } -func makeCluster(t *testing.T, addr, id, region string, members ...MemberAddr) *Cluster { - remote := remote.New(addr, nil) - e, err := actor.NewEngine(&actor.EngineConfig{Remote: remote}) - if err != nil { - log.Fatal(err) - } - cfg := Config{ - ClusterProvider: NewSelfManagedProvider(members...), - ID: id, - Region: region, - Engine: e, - } - c, err := New(cfg) +func makeCluster(t *testing.T, addr, id, region string) *Cluster { + config := NewConfig(). + WithID(id). + WithListenAddr(addr). + WithRegion(region) + c, err := New(config) assert.Nil(t, err) return c } func getRandomLocalhostAddr() string { - return fmt.Sprintf("localhost:%d", rand.Intn(50000)+10000) + return fmt.Sprintf("127.0.0.1:%d", rand.Intn(50000)+10000) } diff --git a/cluster/cluster_vtproto.pb.go b/cluster/cluster_vtproto.pb.go index 9d6b21d..30e2e70 100644 --- a/cluster/cluster_vtproto.pb.go +++ b/cluster/cluster_vtproto.pb.go @@ -141,6 +141,24 @@ func (m *MembersLeave) CloneMessageVT() proto.Message { return m.CloneVT() } +func (m *Handshake) CloneVT() *Handshake { + if m == nil { + return (*Handshake)(nil) + } + r := &Handshake{ + Member: m.Member.CloneVT(), + } + if len(m.unknownFields) > 0 { + r.unknownFields = make([]byte, len(m.unknownFields)) + copy(r.unknownFields, m.unknownFields) + } + return r +} + +func (m *Handshake) CloneMessageVT() proto.Message { + return m.CloneVT() +} + func (m *Topology) CloneVT() *Topology { if m == nil { return (*Topology)(nil) @@ -491,6 +509,25 @@ func (this *MembersLeave) EqualMessageVT(thatMsg proto.Message) bool { } return this.EqualVT(that) } +func (this *Handshake) EqualVT(that *Handshake) bool { + if this == that { + return true + } else if this == nil || that == nil { + return false + } + if !this.Member.EqualVT(that.Member) { + return false + } + return string(this.unknownFields) == string(that.unknownFields) +} + +func (this *Handshake) EqualMessageVT(thatMsg proto.Message) bool { + that, ok := thatMsg.(*Handshake) + if !ok { + return false + } + return this.EqualVT(that) +} func (this *Topology) EqualVT(that *Topology) bool { if this == that { return true @@ -1011,6 +1048,49 @@ func (m *MembersLeave) MarshalToSizedBufferVT(dAtA []byte) (int, error) { return len(dAtA) - i, nil } +func (m *Handshake) MarshalVT() (dAtA []byte, err error) { + if m == nil { + return nil, nil + } + size := m.SizeVT() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBufferVT(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *Handshake) MarshalToVT(dAtA []byte) (int, error) { + size := m.SizeVT() + return m.MarshalToSizedBufferVT(dAtA[:size]) +} + +func (m *Handshake) MarshalToSizedBufferVT(dAtA []byte) (int, error) { + if m == nil { + return 0, nil + } + i := len(dAtA) + _ = i + var l int + _ = l + if m.unknownFields != nil { + i -= len(m.unknownFields) + copy(dAtA[i:], m.unknownFields) + } + if m.Member != nil { + size, err := m.Member.MarshalToSizedBufferVT(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarint(dAtA, i, uint64(size)) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + func (m *Topology) MarshalVT() (dAtA []byte, err error) { if m == nil { return nil, nil @@ -1721,6 +1801,49 @@ func (m *MembersLeave) MarshalToSizedBufferVTStrict(dAtA []byte) (int, error) { return len(dAtA) - i, nil } +func (m *Handshake) MarshalVTStrict() (dAtA []byte, err error) { + if m == nil { + return nil, nil + } + size := m.SizeVT() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBufferVTStrict(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *Handshake) MarshalToVTStrict(dAtA []byte) (int, error) { + size := m.SizeVT() + return m.MarshalToSizedBufferVTStrict(dAtA[:size]) +} + +func (m *Handshake) MarshalToSizedBufferVTStrict(dAtA []byte) (int, error) { + if m == nil { + return 0, nil + } + i := len(dAtA) + _ = i + var l int + _ = l + if m.unknownFields != nil { + i -= len(m.unknownFields) + copy(dAtA[i:], m.unknownFields) + } + if m.Member != nil { + size, err := m.Member.MarshalToSizedBufferVTStrict(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarint(dAtA, i, uint64(size)) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + func (m *Topology) MarshalVTStrict() (dAtA []byte, err error) { if m == nil { return nil, nil @@ -2254,6 +2377,20 @@ func (m *MembersLeave) SizeVT() (n int) { return n } +func (m *Handshake) SizeVT() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Member != nil { + l = m.Member.SizeVT() + n += 1 + l + sov(uint64(l)) + } + n += len(m.unknownFields) + return n +} + func (m *Topology) SizeVT() (n int) { if m == nil { return 0 @@ -3049,6 +3186,93 @@ func (m *MembersLeave) UnmarshalVT(dAtA []byte) error { } return nil } +func (m *Handshake) UnmarshalVT(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: Handshake: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: Handshake: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Member", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLength + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLength + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Member == nil { + m.Member = &Member{} + } + if err := m.Member.UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skip(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLength + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.unknownFields = append(m.unknownFields, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func (m *Topology) UnmarshalVT(dAtA []byte) error { l := len(dAtA) iNdEx := 0 diff --git a/cluster/selfmanaged.go b/cluster/selfmanaged.go index 0763fd1..66eb166 100644 --- a/cluster/selfmanaged.go +++ b/cluster/selfmanaged.go @@ -1,44 +1,79 @@ package cluster import ( + "context" + "fmt" + "log" + "log/slog" + "net" + "reflect" + "strconv" "time" "github.com/anthdm/hollywood/actor" + "github.com/grandcat/zeroconf" ) -const memberPingInterval = time.Second * 5 +const ( + serviceName = "_actor.hollywood_" + domain = "local." + memberPingInterval = time.Second * 2 +) +// MemberAddr represents a reachable node in the cluster. type MemberAddr struct { ListenAddr string ID string } -type memberLeave struct { - ListenAddr string +type ( + memberLeave struct { + ListenAddr string + } + memberPing struct{} +) + +type SelfManagedConfig struct { + bootstrapMembers []MemberAddr } -type memberPing struct{} +func NewSelfManagedConfig() SelfManagedConfig { + return SelfManagedConfig{ + bootstrapMembers: make([]MemberAddr, 0), + } +} + +func (c SelfManagedConfig) WithBootstrapMember(member MemberAddr) SelfManagedConfig { + c.bootstrapMembers = append(c.bootstrapMembers, member) + return c +} type SelfManaged struct { - cluster *Cluster - bootstrapAddrs []MemberAddr - members *MemberSet - memberPinger actor.SendRepeater - eventSubPID *actor.PID + config SelfManagedConfig + cluster *Cluster + members *MemberSet + memberPinger actor.SendRepeater + eventSubPID *actor.PID pid *actor.PID membersAlive *MemberSet + + resolver *zeroconf.Resolver + announcer *zeroconf.Server + + ctx context.Context + cancel context.CancelFunc } -func NewSelfManagedProvider(addrs ...MemberAddr) Producer { +func NewSelfManagedProvider(config SelfManagedConfig) Producer { return func(c *Cluster) actor.Producer { return func() actor.Receiver { return &SelfManaged{ - cluster: c, - bootstrapAddrs: addrs, - members: NewMemberSet(), - membersAlive: NewMemberSet(), + config: config, + cluster: c, + members: NewMemberSet(), + membersAlive: NewMemberSet(), } } } @@ -47,71 +82,69 @@ func NewSelfManagedProvider(addrs ...MemberAddr) Producer { func (s *SelfManaged) Receive(c *actor.Context) { switch msg := c.Message().(type) { case actor.Started: + s.ctx, s.cancel = context.WithCancel(context.Background()) s.pid = c.PID() + s.members.Add(s.cluster.Member()) - members := &Members{ - Members: s.members.Slice(), - } - s.cluster.engine.Send(s.cluster.PID(), members) + s.sendMembersToAgent() + s.memberPinger = c.SendRepeat(c.PID(), memberPing{}, memberPingInterval) s.start(c) case actor.Stopped: s.memberPinger.Stop() s.cluster.engine.Unsubscribe(s.eventSubPID) - case *MembersJoin: - for _, member := range msg.Members { - s.addMember(member) - } - ourMembers := &Members{ - Members: s.members.Slice(), - } - s.members.ForEach(func(member *Member) bool { - s.cluster.engine.Send(memberToProviderPID(member), ourMembers) - return true + s.announcer.Shutdown() + s.cancel() + case *Handshake: + s.addMembers(msg.Member) + members := s.members.Slice() + s.cluster.engine.Send(c.Sender(), &Members{ + Members: members, }) case *Members: - for _, member := range msg.Members { - s.addMember(member) - } - if s.members.Len() > 0 { - members := &Members{ - Members: s.members.Slice(), - } - s.cluster.engine.Send(s.cluster.PID(), members) - } + s.addMembers(msg.Members...) case memberPing: - s.members.ForEach(func(member *Member) bool { - if member.Host != s.cluster.agentPID.Address { - ping := &actor.Ping{ - From: c.PID(), - } - c.Send(memberToProviderPID(member), ping) - } - return true - }) + s.handleMemberPing(c) case memberLeave: member := s.members.GetByHost(msg.ListenAddr) s.removeMember(member) + case *actor.Ping: + _ = msg + default: + slog.Warn("received unhandled message", "msg", msg, "t", reflect.TypeOf(msg)) } } -// If we receive members from another node in the cluster -// we respond with all the members we know of, and ofcourse -// add the new one. -func (s *SelfManaged) addMember(member *Member) { - if !s.members.Contains(member) { - s.members.Add(member) +func (s *SelfManaged) handleMemberPing(c *actor.Context) { + s.members.ForEach(func(member *Member) bool { + if member.Host != s.cluster.agentPID.Address { + ping := &actor.Ping{ + From: c.PID(), + } + c.Send(memberToProviderPID(member), ping) + } + return true + }) +} + +func (s *SelfManaged) addMembers(members ...*Member) { + for _, member := range members { + if !s.members.Contains(member) { + s.members.Add(member) + } } + s.sendMembersToAgent() } func (s *SelfManaged) removeMember(member *Member) { if s.members.Contains(member) { s.members.Remove(member) } - s.updateCluster() + s.sendMembersToAgent() } -func (s *SelfManaged) updateCluster() { +// send all the current members to the local cluster agent. +func (s *SelfManaged) sendMembersToAgent() { members := &Members{ Members: s.members.Slice(), } @@ -119,20 +152,79 @@ func (s *SelfManaged) updateCluster() { } func (s *SelfManaged) start(c *actor.Context) { - s.eventSubPID = c.SpawnChildFunc(func(ctx *actor.Context) { - switch msg := ctx.Message().(type) { - case actor.RemoteUnreachableEvent: - ctx.Send(s.pid, memberLeave{ListenAddr: msg.ListenAddr}) - } - }, "event") - + s.eventSubPID = c.SpawnChildFunc(s.handleEventStream, "event") s.cluster.engine.Subscribe(s.eventSubPID) - members := &MembersJoin{ - Members: s.members.Slice(), + // send handshake to all bootstrap members if any. + for _, member := range s.config.bootstrapMembers { + memberPID := actor.NewPID(member.ListenAddr, "provider/"+member.ID) + s.cluster.engine.SendWithSender(memberPID, &Handshake{ + Member: s.cluster.Member(), + }, c.PID()) + } + + s.initAutoDiscovery() + s.startAutoDiscovery() +} + +func (s *SelfManaged) initAutoDiscovery() { + resolver, err := zeroconf.NewResolver() + if err != nil { + log.Fatal(err) + } + s.resolver = resolver + + host, portstr, err := net.SplitHostPort(s.cluster.agentPID.Address) + if err != nil { + log.Fatal(err) + } + port, err := strconv.Atoi(portstr) + if err != nil { + log.Fatal(err) } - for _, ma := range s.bootstrapAddrs { - memberPID := actor.NewPID(ma.ListenAddr, "provider/"+ma.ID) - s.cluster.engine.Send(memberPID, members) + + server, err := zeroconf.RegisterProxy( + s.cluster.ID(), + serviceName, + domain, + port, + fmt.Sprintf("member_%s", s.cluster.ID()), + []string{host}, + []string{"txtv=0", "lo=1", "la=2"}, nil) + if err != nil { + log.Fatal(err) + } + s.announcer = server +} + +func (s *SelfManaged) startAutoDiscovery() { + entries := make(chan *zeroconf.ServiceEntry) + go func(results <-chan *zeroconf.ServiceEntry) { + for entry := range results { + if entry.Instance != s.cluster.ID() { + host := fmt.Sprintf("%s:%d", entry.AddrIPv4[0], entry.Port) + hs := &Handshake{ + Member: s.cluster.Member(), + } + // create the reachable PID for this member. + memberPID := actor.NewPID(host, "provider/"+entry.Instance) + self := actor.NewPID(s.cluster.agentPID.Address, "provider/"+s.cluster.ID()) + s.cluster.engine.SendWithSender(memberPID, hs, self) + } + } + slog.Info("[CLUSTER] stopping discovery", "id", s.cluster.ID()) + }(entries) + + err := s.resolver.Browse(s.ctx, serviceName, domain, entries) + if err != nil { + slog.Error("[CLUSTER] discovery failed", "err", err) + panic(err) + } +} + +func (s *SelfManaged) handleEventStream(c *actor.Context) { + switch msg := c.Message().(type) { + case actor.RemoteUnreachableEvent: + c.Send(s.pid, memberLeave{ListenAddr: msg.ListenAddr}) } } diff --git a/examples/cluster/member_1/main.go b/examples/cluster/member_1/main.go index cc29fb3..ecad953 100644 --- a/examples/cluster/member_1/main.go +++ b/examples/cluster/member_1/main.go @@ -2,7 +2,6 @@ package main import ( "log" - "time" "github.com/anthdm/hollywood/actor" "github.com/anthdm/hollywood/cluster" @@ -12,19 +11,11 @@ import ( // Member 1 of the cluster func main() { - r := remote.New("127.0.0.1:3000", nil) - e, err := actor.NewEngine(&actor.EngineConfig{Remote: r}) - if err != nil { - log.Fatal(err) - } - c, err := cluster.New(cluster.Config{ - ID: "A", - Engine: e, - Region: "eu-west", - ClusterProvider: cluster.NewSelfManagedProvider(), - ActivationStrategy: shared.RegionBasedActivationStrategy("eu-west"), - RequestTimeout: time.Second, - }) + config := cluster.NewConfig(). + WithID("A"). + WithListenAddr("127.0.0.1:3000"). + WithRegion("eu-west") + c, err := cluster.New(config) if err != nil { log.Fatal(err) } @@ -34,12 +25,12 @@ func main() { switch msg := ctx.Message().(type) { case cluster.MemberJoinEvent: if msg.Member.ID == "B" { - msg := &cluster.ActivationConfig{ - ID: "bob", - Region: "us-west", - } - playerPID := c.Activate("playerSession", msg) - ctx.Send(playerPID, &remote.TestMessage{Data: []byte("hello from member 1")}) + config := cluster.NewActivationConfig(). + WithID("bob"). + WithRegion("us-west") + playerPID := c.Activate("playerSession", config) + msg := &remote.TestMessage{Data: []byte("hello from member 1")} + ctx.Send(playerPID, msg) } } }, "event") diff --git a/examples/cluster/member_2/main.go b/examples/cluster/member_2/main.go index 91609a5..8d36030 100644 --- a/examples/cluster/member_2/main.go +++ b/examples/cluster/member_2/main.go @@ -2,33 +2,18 @@ package main import ( "log" - "time" - "github.com/anthdm/hollywood/actor" "github.com/anthdm/hollywood/cluster" "github.com/anthdm/hollywood/examples/cluster/shared" - "github.com/anthdm/hollywood/remote" ) // Member 2 of the cluster func main() { - bootstrapAddr := cluster.MemberAddr{ - ListenAddr: "127.0.0.1:3000", - ID: "A", - } - r := remote.New("127.0.0.1:3001", nil) - e, err := actor.NewEngine(&actor.EngineConfig{Remote: r}) - if err != nil { - log.Fatal(err) - } - cluster, err := cluster.New(cluster.Config{ - ID: "B", - Engine: e, - Region: "us-west", - ClusterProvider: cluster.NewSelfManagedProvider(bootstrapAddr), - ActivationStrategy: shared.RegionBasedActivationStrategy("eu-west"), - RequestTimeout: time.Second, - }) + config := cluster.NewConfig(). + WithID("B"). + WithListenAddr("127.0.0.1:3001"). + WithRegion("us-west") + cluster, err := cluster.New(config) if err != nil { log.Fatal(err) } diff --git a/remote/remote.go b/remote/remote.go index 25ee549..a9b1529 100644 --- a/remote/remote.go +++ b/remote/remote.go @@ -109,10 +109,8 @@ func (r *Remote) Stop() *sync.WaitGroup { slog.Warn("remote already stopped but stop was called", "state", r.state.Load()) return &sync.WaitGroup{} // return empty waitgroup so the caller can still wait without panicking. } - slog.Debug("stopping remote") r.state.Store(stateStopped) r.stopCh <- struct{}{} - slog.Debug("stop signal sent") return r.stopWg } From da5c2fd388a9ed1ba20d0a86dba146ccbcd2d5b1 Mon Sep 17 00:00:00 2001 From: Anthony De Meulemeester Date: Mon, 1 Jan 2024 19:57:55 +0100 Subject: [PATCH 4/4] updated readme (#142) --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 83b4b22..2f66999 100644 --- a/README.md +++ b/README.md @@ -34,7 +34,7 @@ large number of concurrent users and complex interactions. - High performance dRPC as the transport layer - Optimized proto buffers without reflection - Lightweight and highly customizable -- Cluster support with DNS auto discovery for nodes that are on the same network +- Cluster support for writing distributed self discovering actors # Benchmarks