From 1993ba37c2f25f6e900f36ffdddf1525276c9ac9 Mon Sep 17 00:00:00 2001 From: Per Buer Date: Wed, 6 Dec 2023 13:37:27 +0100 Subject: [PATCH 1/7] wip; architecture in place. some weird loop is happening, --- actor/deadletter.go | 26 ++++++---- actor/deadletter_test.go | 71 +++++++++++++++++--------- actor/engine.go | 17 ++----- actor/engine_test.go | 20 ++++++++ actor/event_stream.go | 47 +++++++++++++---- actor/events.go | 107 +++++++++++++++++++++++++++++++++++++++ actor/process.go | 29 ++++++----- actor/registry.go | 5 +- actor/thelpers.go | 25 --------- actor/types.go | 22 -------- 10 files changed, 247 insertions(+), 122 deletions(-) create mode 100644 actor/events.go delete mode 100644 actor/thelpers.go diff --git a/actor/deadletter.go b/actor/deadletter.go index 8e034e6..44e9859 100644 --- a/actor/deadletter.go +++ b/actor/deadletter.go @@ -5,36 +5,40 @@ import ( "reflect" ) -// +// DeadLetterEvent is delivered to the deadletter actor when a message can't be delivered to it's recipient +type DeadLetterEvent struct { + Target *PID + Message any + Sender *PID +} type deadLetter struct { pid *PID } func newDeadLetter() Receiver { - pid := NewPID(LocalLookupAddr, "deadLetter") + pid := NewPID(LocalLookupAddr, "deadletter") return &deadLetter{ pid: pid, } } // Receive implements the Receiver interface, handling the deadletter messages. -// It will log the deadletter message if a logger is set. If not, it will silently -// ignore the message. Any production system should either have a logger set or provide a custom -// deadletter actor. +// Any production system should provide a custom deadletter handler. func (d *deadLetter) Receive(ctx *Context) { switch msg := ctx.Message().(type) { case Started: - // intialize logger on deadletter startup. is this a sane approach? I'm not sure how the get to the logger otherwise. - slog.Debug("default deadletter actor started") + // Subscribe to deadletters + ctx.engine.BroadcastEvent(DeadletterSub{pid: d.pid}) case Stopped: - slog.Debug("default deadletter actor stopped") + ctx.engine.BroadcastEvent(DeadletterUnSub{pid: d.pid}) case Initialized: - slog.Debug("default deadletter actor initialized") - case *DeadLetterEvent: + slog.Debug("default deadletter actor initializing") + case DeadLetterEvent: slog.Warn("deadletter arrived", "msg-type", reflect.TypeOf(msg), "sender", msg.Sender, "target", msg.Target, "msg", msg.Message) default: - slog.Error("unknown message arrived at deadletter", "msg", msg) + slog.Error("unknown message arrived at deadletter", "msg", msg, + "msg-type", reflect.TypeOf(msg)) } } diff --git a/actor/deadletter_test.go b/actor/deadletter_test.go index 59ec3f4..d635f24 100644 --- a/actor/deadletter_test.go +++ b/actor/deadletter_test.go @@ -4,6 +4,8 @@ import ( "bytes" "fmt" "log/slog" + "os" + "reflect" "sync" "testing" "time" @@ -16,21 +18,23 @@ import ( // received the message. func TestDeadLetterDefault(t *testing.T) { logBuffer := SafeBuffer{} - lh := slog.NewTextHandler(&logBuffer, nil) - logger := slog.New(lh) + logger := slog.New(slog.NewTextHandler(&logBuffer, &slog.HandlerOptions{Level: slog.LevelDebug})) slog.SetDefault(logger) e, err := NewEngine() assert.NoError(t, err) + time.Sleep(10 * time.Millisecond) a1 := e.Spawn(newTestActor, "a1") assert.NotNil(t, a1) dl := e.Registry.getByID("deadletter") - assert.NotNil(t, dl) // should be registered by default - e.Poison(a1).Wait() // poison the a1 actor - e.Send(a1, testMessage{"bar"}) // should end up the deadletter queue - time.Sleep(time.Millisecond) // a flush would be nice here + assert.NotNil(t, dl) // should be registered by default + e.Poison(a1).Wait() // poison the a1 actor + e.Send(a1, testMessage{"bar"}) // should end up the deadletter queue + time.Sleep(time.Millisecond * 100) // a flush would be nice here // check the log buffer for the deadletter - assert.Contains(t, logBuffer.String(), "deadletter arrived") + logStr := logBuffer.String() + fmt.Println(logStr) + assert.Contains(t, logStr, "deadletter arrived") } @@ -39,25 +43,28 @@ func TestDeadLetterDefault(t *testing.T) { // received the message. // It is using the custom deadletter receiver below. func TestDeadLetterCustom(t *testing.T) { + debuglogger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug})) + slog.SetDefault(debuglogger) e, err := NewEngine( EngineOptDeadletter(newCustomDeadLetter)) assert.NoError(t, err) + time.Sleep(10 * time.Millisecond) a1 := e.Spawn(newTestActor, "a1") assert.NotNil(t, a1) dl := e.Registry.getByID("deadletter") - assert.NotNil(t, dl) // should be registered by default + assert.NotNil(t, dl) + es := e.Registry.getByID("eventstream") + assert.NotNil(t, es) + // kill a1 actor. e.Poison(a1).Wait() // poison the a1 actor - // should be in deadletter fmt.Println("==== sending message via a1 to deadletter ====") - fmt.Println(e.Registry) - fmt.Println("ID=> ", dl.PID()) e.Send(a1, testMessage{"bar"}) - time.Sleep(time.Millisecond) // a flush would be nice here :-) - resp, err := e.Request(dl.PID(), &customDeadLetterFetch{flush: true}, time.Millisecond*10).Result() + time.Sleep(time.Millisecond * 100) // a flush would be nice here :-) + resp, err := e.Request(dl.PID(), customDeadLetterFetch{flush: true}, time.Millisecond*10).Result() assert.Nil(t, err) // no error from the request assert.NotNil(t, resp) // we should get a response to our request - respDeadLetters, ok := resp.([]*DeadLetterEvent) + respDeadLetters, ok := resp.([]DeadLetterEvent) assert.True(t, ok) // got a slice of deadletter events assert.Equal(t, 1, len(respDeadLetters)) // one deadletter event ev, ok := respDeadLetters[0].Message.(testMessage) @@ -81,32 +88,50 @@ type customDeadLetterFetch struct{ flush bool } // customDeadLetter is a custom deadletter actor / receiver type customDeadLetter struct { - deadLetters []*DeadLetterEvent + deadLetters []DeadLetterEvent } func newCustomDeadLetter() Receiver { return &customDeadLetter{ - deadLetters: make([]*DeadLetterEvent, 0), + deadLetters: make([]DeadLetterEvent, 0), } } // Receive implements the Receiver interface. This is a OK example of an actor that // that deals with deadletters. It will store the deadletters in a slice. func (c *customDeadLetter) Receive(ctx *Context) { + es := ctx.engine.Registry.getByID("eventstream") + if es == nil { + slog.Error("custom deadletter; no eventstream found") + } switch ctx.Message().(type) { - case *customDeadLetterFetch: + case Started: + slog.Debug("custom deadletter starting", "action", "subscribing") + ctx.engine.BroadcastEvent(DeadletterSub{pid: ctx.pid}) + time.Sleep(time.Millisecond * 10) + case Stopped: + slog.Debug("custom deadletter stopping", "action", "unsubscribing") + ctx.engine.BroadcastEvent(DeadletterUnSub{pid: ctx.pid}) + case customDeadLetterFetch: + flush := ctx.Message().(customDeadLetterFetch).flush + slog.Debug("custom deadletter; received fetch request", + "flush", flush, + "messages", len(c.deadLetters)) ctx.Respond(c.deadLetters) - if ctx.Message().(*customDeadLetterFetch).flush { - c.deadLetters = make([]*DeadLetterEvent, 0) + if ctx.Message().(customDeadLetterFetch).flush { + c.deadLetters = c.deadLetters[:0] } - case *DeadLetterEvent: - slog.Warn("received deadletter event") - msg, ok := ctx.Message().(*DeadLetterEvent) + case DeadLetterEvent: + slog.Warn("custom deadletter; received deadletter event") + msg, ok := ctx.Message().(DeadLetterEvent) if !ok { - slog.Error("failed to cast deadletter event") + slog.Error("should never happen. brain damaged.") return } c.deadLetters = append(c.deadLetters, msg) + default: + slog.Error("custom deadletter; received unknown message", + "msg", ctx.Message(), "msg-type", reflect.TypeOf(ctx.Message())) } } diff --git a/actor/engine.go b/actor/engine.go index 872ef21..d7a55ad 100644 --- a/actor/engine.go +++ b/actor/engine.go @@ -1,8 +1,6 @@ package actor import ( - "log/slog" - reflect "reflect" "sync" "time" ) @@ -164,12 +162,7 @@ func (e *Engine) send(pid *PID, msg any, sender *PID) { return } if e.remote == nil { - slog.Error("failed sending messsage", - "err", "engine has no remote configured", - "to", pid, - "type", reflect.TypeOf(msg), - "msg", msg, - ) + e.BroadcastEvent(EngineRemoteMissingEvent{Target: pid, Sender: sender, Message: msg}) return } e.remote.Send(pid, msg, sender) @@ -243,9 +236,9 @@ func (e *Engine) sendPoisonPill(pid *PID, graceful bool, wg ...*sync.WaitGroup) } _wg.Add(1) proc := e.Registry.get(pid) - // deadletter - if we didn't find a process, we will send a deadletter message + // deadletter - if we didn't find a process, we will broadcast a DeadletterEvent if proc == nil { - e.Send(e.deadLetter, &DeadLetterEvent{ + e.BroadcastEvent(DeadLetterEvent{ Target: pid, Message: poisonPill{_wg, graceful}, Sender: nil, @@ -268,8 +261,8 @@ func (e *Engine) sendPoisonPill(pid *PID, graceful bool, wg ...*sync.WaitGroup) func (e *Engine) SendLocal(pid *PID, msg any, sender *PID) { proc := e.Registry.get(pid) if proc == nil { - // send a deadletter message - e.Send(e.deadLetter, &DeadLetterEvent{ + // broadcast a deadLetter message + e.BroadcastEvent(DeadLetterEvent{ Target: pid, Message: msg, Sender: sender, diff --git a/actor/engine_test.go b/actor/engine_test.go index 58accaa..a440c0c 100644 --- a/actor/engine_test.go +++ b/actor/engine_test.go @@ -337,3 +337,23 @@ func BenchmarkSendWithSenderMessageLocal(b *testing.B) { e.SendWithSender(pid, pid, pid) } } + +type TestReceiveFunc func(*testing.T, *Context) + +type TestReceiver struct { + OnReceive TestReceiveFunc + t *testing.T +} + +func NewTestProducer(t *testing.T, f TestReceiveFunc) Producer { + return func() Receiver { + return &TestReceiver{ + OnReceive: f, + t: t, + } + } +} + +func (r *TestReceiver) Receive(ctx *Context) { + r.OnReceive(r.t, ctx) +} diff --git a/actor/event_stream.go b/actor/event_stream.go index e2c761f..502a185 100644 --- a/actor/event_stream.go +++ b/actor/event_stream.go @@ -1,34 +1,59 @@ package actor -// EventSub is the message that will be send to subscribe to the event stream. -type EventSub struct { - pid *PID -} - -// EventUnSub is the message that will be send to unsubscribe from the event stream. -type EventUnsub struct { - pid *PID -} +import ( + "context" + "fmt" + "log/slog" + "reflect" +) type EventStream struct { - subs map[*PID]bool + subs map[*PID]bool + dlsubs map[*PID]bool } func NewEventStream() Producer { return func() Receiver { return &EventStream{ - subs: make(map[*PID]bool), + subs: make(map[*PID]bool), + dlsubs: make(map[*PID]bool), } } } +// Receive for the event stream. All system-wide events are sent here. +// Some events are specially handled, such as EventSub, EventUnSub (for subscribing to events), +// DeadletterSub, DeadletterUnSub, for subscribing to DeadLetterEvent func (e *EventStream) Receive(c *Context) { + fmt.Printf("EventStream.Receive: %v\n", reflect.TypeOf(c.Message())) switch msg := c.Message().(type) { case EventSub: e.subs[msg.pid] = true + fmt.Println("EventStream.Receive: EventSub") case EventUnsub: delete(e.subs, msg.pid) + fmt.Println("EventStream.Receive: EventUnsub") + case DeadletterSub: + e.dlsubs[msg.pid] = true + case DeadletterUnSub: + delete(e.subs, msg.pid) + case DeadLetterEvent: + // to avoid a loop, check that the message isn't a deadletter. + _, ok := msg.Message.(DeadLetterEvent) + if ok { + c.engine.BroadcastEvent(DeadLetterLoopEvent{}) + break + } + for sub := range e.dlsubs { + c.Forward(sub) + } default: + // check if we should log the event, if so, log it with the relevant level, message and attributes + logMsg, ok := c.Message().(eventLog) + if ok { + level, msg, attr := logMsg.log() + slog.Log(context.Background(), level, msg, attr...) + } for sub := range e.subs { c.Forward(sub) } diff --git a/actor/events.go b/actor/events.go new file mode 100644 index 0000000..e08ffc2 --- /dev/null +++ b/actor/events.go @@ -0,0 +1,107 @@ +package actor + +import ( + "log/slog" + "time" +) + +// Here the events are defined. + +// eventLog is an interface that the various Events can choose to implement. If they do, the event stream +// will log these events to slog. +type eventLog interface { + log() (slog.Level, string, []any) +} + +type DeadletterSub struct { + pid *PID +} + +type DeadletterUnSub struct { + pid *PID +} + +// EventSub is the message that will be send to subscribe to the event stream. +type EventSub struct { + pid *PID +} + +// EventUnSub is the message that will be send to unsubscribe from the event stream. +type EventUnsub struct { + pid *PID +} + +// ActorStartedEvent is broadcasted over the EventStream each time +// a Receiver (Actor) is spawned and activated. This means, that at +// the point of receiving this event the Receiver (Actor) is ready +// to process messages. +type ActorStartedEvent struct { + PID *PID + Timestamp time.Time +} + +func (e ActorStartedEvent) log() (slog.Level, string, []any) { + return slog.LevelInfo, "Actor started", []any{"pid", e.PID.GetID()} +} + +// ActorStoppedEvent is broadcasted over the EventStream each time +// a process is terminated. +type ActorStoppedEvent struct { + PID *PID + Timestamp time.Time +} + +func (e ActorStoppedEvent) log() (slog.Level, string, []any) { + return slog.LevelInfo, "Actor stopped", []any{"pid", e.PID.GetID()} +} + +// ActorRestartedEvent is broadcasted when an actor crashes and gets restarted +type ActorRestartedEvent struct { + PID *PID + Timestamp time.Time + Stacktrace []byte + Reason any + Restarts int32 +} + +func (e ActorRestartedEvent) log() (slog.Level, string, []any) { + return slog.LevelError, "Actor crashed and restarted", + []any{"pid", e.PID.GetID(), "stack", string(e.Stacktrace), + "reason", e.Reason, "restarts", e.Restarts} +} + +// ActorMaxRestartsExceededEvent gets created if an actor crashes too many times +type ActorMaxRestartsExceededEvent struct { + PID *PID + Timestamp time.Time +} + +func (e ActorMaxRestartsExceededEvent) log() (slog.Level, string, []any) { + return slog.LevelError, "Actor crashed too many times", []any{"pid", e.PID.GetID()} +} + +// ActorDuplicateIdEvent gets published if we try to register the same name twice. +// Todo: Make a test for this. +type ActorDuplicateIdEvent struct { + PID *PID +} + +func (e ActorDuplicateIdEvent) log() (slog.Level, string, []any) { + return slog.LevelError, "Actor name already claimed", []any{"pid", e.PID.GetID()} +} + +type EngineRemoteMissingEvent struct { + Target *PID + Sender *PID + Message any +} + +func (e EngineRemoteMissingEvent) log() (slog.Level, string, []any) { + return slog.LevelError, "Engine has no remote", []any{"sender", e.Target.GetID()} +} + +type DeadLetterLoopEvent struct{} + +func (e DeadLetterLoopEvent) log() (slog.Level, string, []any) { + return slog.LevelError, "Deadletter loop detected", []any{} +} diff --git a/actor/process.go b/actor/process.go index af176b8..4617636 100644 --- a/actor/process.go +++ b/actor/process.go @@ -125,8 +125,7 @@ func (p *process) Start() { p.context.message = Started{} applyMiddleware(recv.Receive, p.Opts.Middleware...)(p.context) - p.context.engine.BroadcastEvent(ActorStartedEvent{PID: p.pid}) - slog.Debug("actor started", "pid", p.pid) + p.context.engine.BroadcastEvent(ActorStartedEvent{PID: p.pid, Timestamp: time.Now()}) // If we have messages in our buffer, invoke them. if len(p.mbuffer) > 0 { p.Invoke(p.mbuffer) @@ -146,25 +145,28 @@ func (p *process) tryRestart(v any) { p.Start() return } - - fmt.Println(string(debug.Stack())) + stackTrace := debug.Stack() + fmt.Println(string(stackTrace)) // If we reach the max restarts, we shutdown the inbox and clean // everything up. if p.restarts == p.MaxRestarts { - slog.Error("max restarts exceeded, shutting down...", - "pid", p.pid, "restarts", p.restarts) + p.context.engine.BroadcastEvent(ActorMaxRestartsExceededEvent{ + PID: p.pid, + Timestamp: time.Now(), + }) p.cleanup(nil) return } p.restarts++ // Restart the process after its restartDelay - slog.Error("actor restarting", - "n", p.restarts, - "maxRestarts", p.MaxRestarts, - "pid", p.pid, - "reason", v, - ) + p.context.engine.BroadcastEvent(ActorRestartedEvent{ + PID: p.pid, + Timestamp: time.Now(), + Stacktrace: stackTrace, + Reason: v, + Restarts: p.restarts, + }) time.Sleep(p.Opts.RestartDelay) p.Start() } @@ -193,8 +195,7 @@ func (p *process) cleanup(wg *sync.WaitGroup) { proc.Shutdown(wg) } } - slog.Debug("shutdown", "pid", p.pid) - p.context.engine.BroadcastEvent(ActorStoppedEvent{PID: p.pid}) + p.context.engine.BroadcastEvent(ActorStoppedEvent{PID: p.pid, Timestamp: time.Now()}) if wg != nil { wg.Done() } diff --git a/actor/registry.go b/actor/registry.go index 04f9b1e..62da219 100644 --- a/actor/registry.go +++ b/actor/registry.go @@ -1,7 +1,6 @@ package actor import ( - "log/slog" "sync" ) @@ -53,9 +52,7 @@ func (r *Registry) add(proc Processer) { defer r.mu.Unlock() id := proc.PID().ID if _, ok := r.lookup[id]; ok { - slog.Warn("process already registered", - "pid", proc.PID(), - ) + r.engine.BroadcastEvent(ActorDuplicateIdEvent{PID: proc.PID()}) return } r.lookup[id] = proc diff --git a/actor/thelpers.go b/actor/thelpers.go deleted file mode 100644 index 3222423..0000000 --- a/actor/thelpers.go +++ /dev/null @@ -1,25 +0,0 @@ -package actor - -import "testing" - -// Todo: move this to the test files. It's not used anywhere else. - -type TestReceiveFunc func(*testing.T, *Context) - -type TestReceiver struct { - OnReceive TestReceiveFunc - t *testing.T -} - -func NewTestProducer(t *testing.T, f TestReceiveFunc) Producer { - return func() Receiver { - return &TestReceiver{ - OnReceive: f, - t: t, - } - } -} - -func (r *TestReceiver) Receive(ctx *Context) { - r.OnReceive(r.t, ctx) -} diff --git a/actor/types.go b/actor/types.go index 4a3126b..259a014 100644 --- a/actor/types.go +++ b/actor/types.go @@ -2,28 +2,6 @@ package actor import "sync" -// DeadLetterEvent is broadcasted over the EventStream each time -// a message cannot be delivered to the target PID. -type DeadLetterEvent struct { - Target *PID - Message any - Sender *PID -} - -// ActorStartedEvent is broadcasted over the EventStream each time -// a Receiver (Actor) is spawned and activated. This means, that at -// the point of receiving this event the Receiver (Actor) is ready -// to process messages. -type ActorStartedEvent struct { - PID *PID -} - -// ActorStoppedEvent is broadcasted over the EventStream each time -// a process is terminated. -type ActorStoppedEvent struct { - PID *PID -} - type InternalError struct { From string Err error From c93d7e99e231aa441f00ff00d5a4a4ef71c6f25d Mon Sep 17 00:00:00 2001 From: Per Buer Date: Wed, 6 Dec 2023 14:15:35 +0100 Subject: [PATCH 2/7] weird --- actor/deadletter_test.go | 5 +++-- actor/engine.go | 3 +++ 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/actor/deadletter_test.go b/actor/deadletter_test.go index d635f24..7d47fd2 100644 --- a/actor/deadletter_test.go +++ b/actor/deadletter_test.go @@ -65,7 +65,8 @@ func TestDeadLetterCustom(t *testing.T) { assert.Nil(t, err) // no error from the request assert.NotNil(t, resp) // we should get a response to our request respDeadLetters, ok := resp.([]DeadLetterEvent) - assert.True(t, ok) // got a slice of deadletter events + assert.True(t, ok) // got a slice of deadletter events + return assert.Equal(t, 1, len(respDeadLetters)) // one deadletter event ev, ok := respDeadLetters[0].Message.(testMessage) assert.True(t, ok) // should be our test message @@ -107,7 +108,7 @@ func (c *customDeadLetter) Receive(ctx *Context) { switch ctx.Message().(type) { case Started: slog.Debug("custom deadletter starting", "action", "subscribing") - ctx.engine.BroadcastEvent(DeadletterSub{pid: ctx.pid}) + ctx.Engine().BroadcastEvent(DeadletterSub{pid: ctx.pid}) time.Sleep(time.Millisecond * 10) case Stopped: slog.Debug("custom deadletter stopping", "action", "unsubscribing") diff --git a/actor/engine.go b/actor/engine.go index d7a55ad..f19d7bf 100644 --- a/actor/engine.go +++ b/actor/engine.go @@ -1,6 +1,7 @@ package actor import ( + "fmt" "sync" "time" ) @@ -153,6 +154,8 @@ func (e *Engine) Send(pid *PID, msg any) { func (e *Engine) BroadcastEvent(msg any) { if e.eventStream != nil { e.send(e.eventStream, msg, nil) + } else { + fmt.Println("Brain damage: event stream is nil") } } From c5433f44f84ec91c7d92d6afa93ac523fb1e6319 Mon Sep 17 00:00:00 2001 From: Per Buer Date: Wed, 6 Dec 2023 17:52:43 +0100 Subject: [PATCH 3/7] cleanup. --- actor/deadletter.go | 2 -- actor/deadletter_test.go | 26 ++++++++++------------ actor/engine.go | 37 +------------------------------ actor/event_stream.go | 7 ++++-- examples/middleware/hooks/main.go | 2 +- 5 files changed, 19 insertions(+), 55 deletions(-) diff --git a/actor/deadletter.go b/actor/deadletter.go index 44e9859..b0c3169 100644 --- a/actor/deadletter.go +++ b/actor/deadletter.go @@ -28,12 +28,10 @@ func newDeadLetter() Receiver { func (d *deadLetter) Receive(ctx *Context) { switch msg := ctx.Message().(type) { case Started: - // Subscribe to deadletters ctx.engine.BroadcastEvent(DeadletterSub{pid: d.pid}) case Stopped: ctx.engine.BroadcastEvent(DeadletterUnSub{pid: d.pid}) case Initialized: - slog.Debug("default deadletter actor initializing") case DeadLetterEvent: slog.Warn("deadletter arrived", "msg-type", reflect.TypeOf(msg), "sender", msg.Sender, "target", msg.Target, "msg", msg.Message) diff --git a/actor/deadletter_test.go b/actor/deadletter_test.go index 7d47fd2..30d5164 100644 --- a/actor/deadletter_test.go +++ b/actor/deadletter_test.go @@ -14,8 +14,8 @@ import ( ) // TestDeadLetterDefault tests the default deadletter handling. -// It will spawn a new actor, kill it, send a message to it and then check if the deadletter -// received the message. +// It will spawn a new actor, kill it, send a message to it and then check if there is a message +// logged to the default logger func TestDeadLetterDefault(t *testing.T) { logBuffer := SafeBuffer{} logger := slog.New(slog.NewTextHandler(&logBuffer, &slog.HandlerOptions{Level: slog.LevelDebug})) @@ -25,8 +25,6 @@ func TestDeadLetterDefault(t *testing.T) { time.Sleep(10 * time.Millisecond) a1 := e.Spawn(newTestActor, "a1") assert.NotNil(t, a1) - dl := e.Registry.getByID("deadletter") - assert.NotNil(t, dl) // should be registered by default e.Poison(a1).Wait() // poison the a1 actor e.Send(a1, testMessage{"bar"}) // should end up the deadletter queue time.Sleep(time.Millisecond * 100) // a flush would be nice here @@ -45,14 +43,12 @@ func TestDeadLetterDefault(t *testing.T) { func TestDeadLetterCustom(t *testing.T) { debuglogger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug})) slog.SetDefault(debuglogger) - e, err := NewEngine( - EngineOptDeadletter(newCustomDeadLetter)) + e, err := NewEngine() assert.NoError(t, err) - time.Sleep(10 * time.Millisecond) + dl := e.Spawn(newCustomDeadLetter, "deadletter") + assert.NotNil(t, dl) a1 := e.Spawn(newTestActor, "a1") assert.NotNil(t, a1) - dl := e.Registry.getByID("deadletter") - assert.NotNil(t, dl) es := e.Registry.getByID("eventstream") assert.NotNil(t, es) @@ -61,13 +57,15 @@ func TestDeadLetterCustom(t *testing.T) { fmt.Println("==== sending message via a1 to deadletter ====") e.Send(a1, testMessage{"bar"}) time.Sleep(time.Millisecond * 100) // a flush would be nice here :-) - resp, err := e.Request(dl.PID(), customDeadLetterFetch{flush: true}, time.Millisecond*10).Result() + resp, err := e.Request(dl, customDeadLetterFetch{flush: true}, time.Millisecond*10).Result() assert.Nil(t, err) // no error from the request assert.NotNil(t, resp) // we should get a response to our request respDeadLetters, ok := resp.([]DeadLetterEvent) assert.True(t, ok) // got a slice of deadletter events - return - assert.Equal(t, 1, len(respDeadLetters)) // one deadletter event + // stop the tests if we don't have any deadletters + if len(respDeadLetters) != 1 { + t.Fatal("expected 1 deadletters, got", len(respDeadLetters)) + } ev, ok := respDeadLetters[0].Message.(testMessage) assert.True(t, ok) // should be our test message assert.Equal(t, "bar", ev.data) @@ -103,12 +101,12 @@ func newCustomDeadLetter() Receiver { func (c *customDeadLetter) Receive(ctx *Context) { es := ctx.engine.Registry.getByID("eventstream") if es == nil { - slog.Error("custom deadletter; no eventstream found") + fmt.Println("custom deadletter; no eventstream found") } switch ctx.Message().(type) { case Started: slog.Debug("custom deadletter starting", "action", "subscribing") - ctx.Engine().BroadcastEvent(DeadletterSub{pid: ctx.pid}) + ctx.engine.BroadcastEvent(DeadletterSub{pid: ctx.pid}) time.Sleep(time.Millisecond * 10) case Stopped: slog.Debug("custom deadletter stopping", "action", "unsubscribing") diff --git a/actor/engine.go b/actor/engine.go index f19d7bf..a4aa1a2 100644 --- a/actor/engine.go +++ b/actor/engine.go @@ -1,7 +1,6 @@ package actor import ( - "fmt" "sync" "time" ) @@ -22,11 +21,9 @@ type Receiver interface { // Engine represents the actor engine. type Engine struct { - Registry *Registry - + Registry *Registry address string remote Remoter - deadLetter *PID eventStream *PID initErrors []error } @@ -48,12 +45,7 @@ func NewEngine(opts ...func(*Engine)) (*Engine, error) { if e.remote != nil { e.address = e.remote.Address() } - e.eventStream = e.Spawn(NewEventStream(), "eventstream") - // if no deadletter is registered, we will register the default deadletter from deadletter.go - if e.deadLetter == nil { - e.deadLetter = e.Spawn(newDeadLetter, "deadletter") - } return e, nil } @@ -67,31 +59,6 @@ func EngineOptRemote(r Remoter) func(*Engine) { } } -// TODO: Doc -// Todo: make the pid separator a struct variable -func EngineOptPidSeparator(sep string) func(*Engine) { - // This looks weird because the separator is a global variable. - return func(e *Engine) { - pidSeparator = sep - } -} - -// EngineOptDeadletter takes an actor and configures the engine to use it for dead letter handling -// This allows you to customize how deadletters are handled. -func EngineOptDeadletter(d Producer) func(*Engine) { - return func(e *Engine) { - e.deadLetter = e.Spawn(d, "deadletter") - } -} - -// WithRemote returns a new actor Engine with the given Remoter, -// and will call its Start function -func (e *Engine) WithRemote(r Remoter) { - e.remote = r - e.address = r.Address() - r.Start(e) -} - // Spawn spawns a process that will producer by the given Producer and // can be configured with the given opts. func (e *Engine) Spawn(p Producer, name string, opts ...OptFunc) *PID { @@ -154,8 +121,6 @@ func (e *Engine) Send(pid *PID, msg any) { func (e *Engine) BroadcastEvent(msg any) { if e.eventStream != nil { e.send(e.eventStream, msg, nil) - } else { - fmt.Println("Brain damage: event stream is nil") } } diff --git a/actor/event_stream.go b/actor/event_stream.go index 502a185..3707eb7 100644 --- a/actor/event_stream.go +++ b/actor/event_stream.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "log/slog" - "reflect" ) type EventStream struct { @@ -25,7 +24,6 @@ func NewEventStream() Producer { // Some events are specially handled, such as EventSub, EventUnSub (for subscribing to events), // DeadletterSub, DeadletterUnSub, for subscribing to DeadLetterEvent func (e *EventStream) Receive(c *Context) { - fmt.Printf("EventStream.Receive: %v\n", reflect.TypeOf(c.Message())) switch msg := c.Message().(type) { case EventSub: e.subs[msg.pid] = true @@ -44,6 +42,11 @@ func (e *EventStream) Receive(c *Context) { c.engine.BroadcastEvent(DeadLetterLoopEvent{}) break } + if len(e.dlsubs) == 0 { + slog.Warn("deadletter arrived, but no subscribers", + "sender", msg.Sender, "target", msg.Target, "msg", msg.Message) + break + } for sub := range e.dlsubs { c.Forward(sub) } diff --git a/examples/middleware/hooks/main.go b/examples/middleware/hooks/main.go index 92345e4..f69f877 100644 --- a/examples/middleware/hooks/main.go +++ b/examples/middleware/hooks/main.go @@ -44,7 +44,7 @@ func WithHooks() func(actor.ReceiveFunc) actor.ReceiveFunc { func main() { // Create a new engine - e, err := actor.NewEngine(actor.EngineOptPidSeparator("→")) + e, err := actor.NewEngine() if err != nil { panic(err) } From c87a2d1e956242ef17ed5ca48e1f255e32e40413 Mon Sep 17 00:00:00 2001 From: Per Buer Date: Wed, 6 Dec 2023 18:37:13 +0100 Subject: [PATCH 4/7] cleanup the deadletter tests. --- actor/deadletter_test.go | 131 +++++++-------------------------------- 1 file changed, 23 insertions(+), 108 deletions(-) diff --git a/actor/deadletter_test.go b/actor/deadletter_test.go index 30d5164..6738ea4 100644 --- a/actor/deadletter_test.go +++ b/actor/deadletter_test.go @@ -2,15 +2,11 @@ package actor import ( "bytes" - "fmt" + "github.com/stretchr/testify/assert" "log/slog" - "os" - "reflect" "sync" "testing" "time" - - "github.com/stretchr/testify/assert" ) // TestDeadLetterDefault tests the default deadletter handling. @@ -22,118 +18,32 @@ func TestDeadLetterDefault(t *testing.T) { slog.SetDefault(logger) e, err := NewEngine() assert.NoError(t, err) - time.Sleep(10 * time.Millisecond) - a1 := e.Spawn(newTestActor, "a1") - assert.NotNil(t, a1) - e.Poison(a1).Wait() // poison the a1 actor - e.Send(a1, testMessage{"bar"}) // should end up the deadletter queue - time.Sleep(time.Millisecond * 100) // a flush would be nice here - + e.Send(invalidPid(), "bar") // should end up the deadletter queue + time.Sleep(time.Millisecond) // wait for the deadletter to be processed // check the log buffer for the deadletter - logStr := logBuffer.String() - fmt.Println(logStr) - assert.Contains(t, logStr, "deadletter arrived") - + assert.Contains(t, logBuffer.String(), "deadletter arrived") } // TestDeadLetterCustom tests the custom deadletter handling. -// It will spawn a new actor, kill it, send a message to it and then check if the deadletter -// received the message. -// It is using the custom deadletter receiver below. +// It is using the custom deadletter receiver defined inline. func TestDeadLetterCustom(t *testing.T) { - debuglogger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug})) - slog.SetDefault(debuglogger) e, err := NewEngine() assert.NoError(t, err) - dl := e.Spawn(newCustomDeadLetter, "deadletter") - assert.NotNil(t, dl) - a1 := e.Spawn(newTestActor, "a1") - assert.NotNil(t, a1) - es := e.Registry.getByID("eventstream") - assert.NotNil(t, es) - - // kill a1 actor. - e.Poison(a1).Wait() // poison the a1 actor - fmt.Println("==== sending message via a1 to deadletter ====") - e.Send(a1, testMessage{"bar"}) - time.Sleep(time.Millisecond * 100) // a flush would be nice here :-) - resp, err := e.Request(dl, customDeadLetterFetch{flush: true}, time.Millisecond*10).Result() - assert.Nil(t, err) // no error from the request - assert.NotNil(t, resp) // we should get a response to our request - respDeadLetters, ok := resp.([]DeadLetterEvent) - assert.True(t, ok) // got a slice of deadletter events - // stop the tests if we don't have any deadletters - if len(respDeadLetters) != 1 { - t.Fatal("expected 1 deadletters, got", len(respDeadLetters)) - } - ev, ok := respDeadLetters[0].Message.(testMessage) - assert.True(t, ok) // should be our test message - assert.Equal(t, "bar", ev.data) -} - -type testActor struct{} -type testMessage struct { - data string -} - -func newTestActor() Receiver { - return testActor{} -} -func (t testActor) Receive(_ *Context) { - // do nothing -} - -type customDeadLetterFetch struct{ flush bool } - -// customDeadLetter is a custom deadletter actor / receiver -type customDeadLetter struct { - deadLetters []DeadLetterEvent -} - -func newCustomDeadLetter() Receiver { - return &customDeadLetter{ - deadLetters: make([]DeadLetterEvent, 0), - } -} - -// Receive implements the Receiver interface. This is a OK example of an actor that -// that deals with deadletters. It will store the deadletters in a slice. -func (c *customDeadLetter) Receive(ctx *Context) { - es := ctx.engine.Registry.getByID("eventstream") - if es == nil { - fmt.Println("custom deadletter; no eventstream found") - } - switch ctx.Message().(type) { - case Started: - slog.Debug("custom deadletter starting", "action", "subscribing") - ctx.engine.BroadcastEvent(DeadletterSub{pid: ctx.pid}) - time.Sleep(time.Millisecond * 10) - case Stopped: - slog.Debug("custom deadletter stopping", "action", "unsubscribing") - ctx.engine.BroadcastEvent(DeadletterUnSub{pid: ctx.pid}) - case customDeadLetterFetch: - flush := ctx.Message().(customDeadLetterFetch).flush - slog.Debug("custom deadletter; received fetch request", - "flush", flush, - "messages", len(c.deadLetters)) - ctx.Respond(c.deadLetters) - if ctx.Message().(customDeadLetterFetch).flush { - c.deadLetters = c.deadLetters[:0] - } - case DeadLetterEvent: - slog.Warn("custom deadletter; received deadletter event") - msg, ok := ctx.Message().(DeadLetterEvent) - if !ok { - slog.Error("should never happen. brain damaged.") - return + wg := &sync.WaitGroup{} + wg.Add(1) + e.SpawnFunc(func(c *Context) { + switch c.Message().(type) { + case Initialized: + c.engine.BroadcastEvent(DeadletterSub{c.pid}) + case DeadLetterEvent: + wg.Done() } - c.deadLetters = append(c.deadLetters, msg) - default: - slog.Error("custom deadletter; received unknown message", - "msg", ctx.Message(), "msg-type", reflect.TypeOf(ctx.Message())) - } + }, "deadletter") + e.SendLocal(invalidPid(), "bar", nil) + wg.Wait() } +// SafeBuffer is a threadsafe buffer, used for testing the that the deadletters are logged. type SafeBuffer struct { buf bytes.Buffer mu sync.Mutex @@ -151,4 +61,9 @@ func (sb *SafeBuffer) String() string { return sb.buf.String() } -// Usage in goroutines... +func invalidPid() *PID { + return &PID{ + Address: "local", + ID: "squirrel", + } +} From 7f1b362341fd435d684cf507937773bdad689116 Mon Sep 17 00:00:00 2001 From: Per Buer Date: Wed, 6 Dec 2023 18:49:35 +0100 Subject: [PATCH 5/7] simplify dead letter handling even more. --- actor/deadletter.go | 4 ++-- actor/deadletter_test.go | 2 +- actor/event_stream.go | 16 +++++----------- actor/events.go | 8 -------- 4 files changed, 8 insertions(+), 22 deletions(-) diff --git a/actor/deadletter.go b/actor/deadletter.go index b0c3169..5e68264 100644 --- a/actor/deadletter.go +++ b/actor/deadletter.go @@ -28,9 +28,9 @@ func newDeadLetter() Receiver { func (d *deadLetter) Receive(ctx *Context) { switch msg := ctx.Message().(type) { case Started: - ctx.engine.BroadcastEvent(DeadletterSub{pid: d.pid}) + ctx.engine.BroadcastEvent(EventSub{pid: d.pid}) case Stopped: - ctx.engine.BroadcastEvent(DeadletterUnSub{pid: d.pid}) + ctx.engine.BroadcastEvent(EventUnsub{pid: d.pid}) case Initialized: case DeadLetterEvent: slog.Warn("deadletter arrived", "msg-type", reflect.TypeOf(msg), diff --git a/actor/deadletter_test.go b/actor/deadletter_test.go index 6738ea4..1c6e72e 100644 --- a/actor/deadletter_test.go +++ b/actor/deadletter_test.go @@ -34,7 +34,7 @@ func TestDeadLetterCustom(t *testing.T) { e.SpawnFunc(func(c *Context) { switch c.Message().(type) { case Initialized: - c.engine.BroadcastEvent(DeadletterSub{c.pid}) + c.engine.BroadcastEvent(EventSub{c.pid}) case DeadLetterEvent: wg.Done() } diff --git a/actor/event_stream.go b/actor/event_stream.go index 3707eb7..97243ad 100644 --- a/actor/event_stream.go +++ b/actor/event_stream.go @@ -7,15 +7,13 @@ import ( ) type EventStream struct { - subs map[*PID]bool - dlsubs map[*PID]bool + subs map[*PID]bool } func NewEventStream() Producer { return func() Receiver { return &EventStream{ - subs: make(map[*PID]bool), - dlsubs: make(map[*PID]bool), + subs: make(map[*PID]bool), } } } @@ -31,10 +29,6 @@ func (e *EventStream) Receive(c *Context) { case EventUnsub: delete(e.subs, msg.pid) fmt.Println("EventStream.Receive: EventUnsub") - case DeadletterSub: - e.dlsubs[msg.pid] = true - case DeadletterUnSub: - delete(e.subs, msg.pid) case DeadLetterEvent: // to avoid a loop, check that the message isn't a deadletter. _, ok := msg.Message.(DeadLetterEvent) @@ -42,12 +36,12 @@ func (e *EventStream) Receive(c *Context) { c.engine.BroadcastEvent(DeadLetterLoopEvent{}) break } - if len(e.dlsubs) == 0 { - slog.Warn("deadletter arrived, but no subscribers", + if len(e.subs) == 0 { + slog.Warn("deadletter arrived, but no subscribers to event stream", "sender", msg.Sender, "target", msg.Target, "msg", msg.Message) break } - for sub := range e.dlsubs { + for sub := range e.subs { c.Forward(sub) } default: diff --git a/actor/events.go b/actor/events.go index e08ffc2..ace3bea 100644 --- a/actor/events.go +++ b/actor/events.go @@ -13,14 +13,6 @@ type eventLog interface { log() (slog.Level, string, []any) } -type DeadletterSub struct { - pid *PID -} - -type DeadletterUnSub struct { - pid *PID -} - // EventSub is the message that will be send to subscribe to the event stream. type EventSub struct { pid *PID From 1c43960c8ed0b30348997c3aa2a4e58a755b9ec9 Mon Sep 17 00:00:00 2001 From: Per Buer Date: Wed, 6 Dec 2023 18:55:53 +0100 Subject: [PATCH 6/7] simplify the event stream handling. remove special handling of deadletters. --- actor/event_stream.go | 18 ------------------ 1 file changed, 18 deletions(-) diff --git a/actor/event_stream.go b/actor/event_stream.go index 97243ad..50ccef5 100644 --- a/actor/event_stream.go +++ b/actor/event_stream.go @@ -2,7 +2,6 @@ package actor import ( "context" - "fmt" "log/slog" ) @@ -25,25 +24,8 @@ func (e *EventStream) Receive(c *Context) { switch msg := c.Message().(type) { case EventSub: e.subs[msg.pid] = true - fmt.Println("EventStream.Receive: EventSub") case EventUnsub: delete(e.subs, msg.pid) - fmt.Println("EventStream.Receive: EventUnsub") - case DeadLetterEvent: - // to avoid a loop, check that the message isn't a deadletter. - _, ok := msg.Message.(DeadLetterEvent) - if ok { - c.engine.BroadcastEvent(DeadLetterLoopEvent{}) - break - } - if len(e.subs) == 0 { - slog.Warn("deadletter arrived, but no subscribers to event stream", - "sender", msg.Sender, "target", msg.Target, "msg", msg.Message) - break - } - for sub := range e.subs { - c.Forward(sub) - } default: // check if we should log the event, if so, log it with the relevant level, message and attributes logMsg, ok := c.Message().(eventLog) From ba84b41c5f975f7c408cfcf810ac66f3d51a701f Mon Sep 17 00:00:00 2001 From: Per Buer Date: Wed, 6 Dec 2023 18:57:04 +0100 Subject: [PATCH 7/7] we cannot test the default deadletter handling as there is no handling of it in the default. --- actor/deadletter_test.go | 17 ----------------- 1 file changed, 17 deletions(-) diff --git a/actor/deadletter_test.go b/actor/deadletter_test.go index 1c6e72e..71e1332 100644 --- a/actor/deadletter_test.go +++ b/actor/deadletter_test.go @@ -3,27 +3,10 @@ package actor import ( "bytes" "github.com/stretchr/testify/assert" - "log/slog" "sync" "testing" - "time" ) -// TestDeadLetterDefault tests the default deadletter handling. -// It will spawn a new actor, kill it, send a message to it and then check if there is a message -// logged to the default logger -func TestDeadLetterDefault(t *testing.T) { - logBuffer := SafeBuffer{} - logger := slog.New(slog.NewTextHandler(&logBuffer, &slog.HandlerOptions{Level: slog.LevelDebug})) - slog.SetDefault(logger) - e, err := NewEngine() - assert.NoError(t, err) - e.Send(invalidPid(), "bar") // should end up the deadletter queue - time.Sleep(time.Millisecond) // wait for the deadletter to be processed - // check the log buffer for the deadletter - assert.Contains(t, logBuffer.String(), "deadletter arrived") -} - // TestDeadLetterCustom tests the custom deadletter handling. // It is using the custom deadletter receiver defined inline. func TestDeadLetterCustom(t *testing.T) {