From c5433f44f84ec91c7d92d6afa93ac523fb1e6319 Mon Sep 17 00:00:00 2001 From: Per Buer Date: Wed, 6 Dec 2023 17:52:43 +0100 Subject: [PATCH] cleanup. --- actor/deadletter.go | 2 -- actor/deadletter_test.go | 26 ++++++++++------------ actor/engine.go | 37 +------------------------------ actor/event_stream.go | 7 ++++-- examples/middleware/hooks/main.go | 2 +- 5 files changed, 19 insertions(+), 55 deletions(-) diff --git a/actor/deadletter.go b/actor/deadletter.go index 44e9859..b0c3169 100644 --- a/actor/deadletter.go +++ b/actor/deadletter.go @@ -28,12 +28,10 @@ func newDeadLetter() Receiver { func (d *deadLetter) Receive(ctx *Context) { switch msg := ctx.Message().(type) { case Started: - // Subscribe to deadletters ctx.engine.BroadcastEvent(DeadletterSub{pid: d.pid}) case Stopped: ctx.engine.BroadcastEvent(DeadletterUnSub{pid: d.pid}) case Initialized: - slog.Debug("default deadletter actor initializing") case DeadLetterEvent: slog.Warn("deadletter arrived", "msg-type", reflect.TypeOf(msg), "sender", msg.Sender, "target", msg.Target, "msg", msg.Message) diff --git a/actor/deadletter_test.go b/actor/deadletter_test.go index 7d47fd2..30d5164 100644 --- a/actor/deadletter_test.go +++ b/actor/deadletter_test.go @@ -14,8 +14,8 @@ import ( ) // TestDeadLetterDefault tests the default deadletter handling. -// It will spawn a new actor, kill it, send a message to it and then check if the deadletter -// received the message. +// It will spawn a new actor, kill it, send a message to it and then check if there is a message +// logged to the default logger func TestDeadLetterDefault(t *testing.T) { logBuffer := SafeBuffer{} logger := slog.New(slog.NewTextHandler(&logBuffer, &slog.HandlerOptions{Level: slog.LevelDebug})) @@ -25,8 +25,6 @@ func TestDeadLetterDefault(t *testing.T) { time.Sleep(10 * time.Millisecond) a1 := e.Spawn(newTestActor, "a1") assert.NotNil(t, a1) - dl := e.Registry.getByID("deadletter") - assert.NotNil(t, dl) // should be registered by default e.Poison(a1).Wait() // poison the a1 actor e.Send(a1, testMessage{"bar"}) // should end up the deadletter queue time.Sleep(time.Millisecond * 100) // a flush would be nice here @@ -45,14 +43,12 @@ func TestDeadLetterDefault(t *testing.T) { func TestDeadLetterCustom(t *testing.T) { debuglogger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug})) slog.SetDefault(debuglogger) - e, err := NewEngine( - EngineOptDeadletter(newCustomDeadLetter)) + e, err := NewEngine() assert.NoError(t, err) - time.Sleep(10 * time.Millisecond) + dl := e.Spawn(newCustomDeadLetter, "deadletter") + assert.NotNil(t, dl) a1 := e.Spawn(newTestActor, "a1") assert.NotNil(t, a1) - dl := e.Registry.getByID("deadletter") - assert.NotNil(t, dl) es := e.Registry.getByID("eventstream") assert.NotNil(t, es) @@ -61,13 +57,15 @@ func TestDeadLetterCustom(t *testing.T) { fmt.Println("==== sending message via a1 to deadletter ====") e.Send(a1, testMessage{"bar"}) time.Sleep(time.Millisecond * 100) // a flush would be nice here :-) - resp, err := e.Request(dl.PID(), customDeadLetterFetch{flush: true}, time.Millisecond*10).Result() + resp, err := e.Request(dl, customDeadLetterFetch{flush: true}, time.Millisecond*10).Result() assert.Nil(t, err) // no error from the request assert.NotNil(t, resp) // we should get a response to our request respDeadLetters, ok := resp.([]DeadLetterEvent) assert.True(t, ok) // got a slice of deadletter events - return - assert.Equal(t, 1, len(respDeadLetters)) // one deadletter event + // stop the tests if we don't have any deadletters + if len(respDeadLetters) != 1 { + t.Fatal("expected 1 deadletters, got", len(respDeadLetters)) + } ev, ok := respDeadLetters[0].Message.(testMessage) assert.True(t, ok) // should be our test message assert.Equal(t, "bar", ev.data) @@ -103,12 +101,12 @@ func newCustomDeadLetter() Receiver { func (c *customDeadLetter) Receive(ctx *Context) { es := ctx.engine.Registry.getByID("eventstream") if es == nil { - slog.Error("custom deadletter; no eventstream found") + fmt.Println("custom deadletter; no eventstream found") } switch ctx.Message().(type) { case Started: slog.Debug("custom deadletter starting", "action", "subscribing") - ctx.Engine().BroadcastEvent(DeadletterSub{pid: ctx.pid}) + ctx.engine.BroadcastEvent(DeadletterSub{pid: ctx.pid}) time.Sleep(time.Millisecond * 10) case Stopped: slog.Debug("custom deadletter stopping", "action", "unsubscribing") diff --git a/actor/engine.go b/actor/engine.go index f19d7bf..a4aa1a2 100644 --- a/actor/engine.go +++ b/actor/engine.go @@ -1,7 +1,6 @@ package actor import ( - "fmt" "sync" "time" ) @@ -22,11 +21,9 @@ type Receiver interface { // Engine represents the actor engine. type Engine struct { - Registry *Registry - + Registry *Registry address string remote Remoter - deadLetter *PID eventStream *PID initErrors []error } @@ -48,12 +45,7 @@ func NewEngine(opts ...func(*Engine)) (*Engine, error) { if e.remote != nil { e.address = e.remote.Address() } - e.eventStream = e.Spawn(NewEventStream(), "eventstream") - // if no deadletter is registered, we will register the default deadletter from deadletter.go - if e.deadLetter == nil { - e.deadLetter = e.Spawn(newDeadLetter, "deadletter") - } return e, nil } @@ -67,31 +59,6 @@ func EngineOptRemote(r Remoter) func(*Engine) { } } -// TODO: Doc -// Todo: make the pid separator a struct variable -func EngineOptPidSeparator(sep string) func(*Engine) { - // This looks weird because the separator is a global variable. - return func(e *Engine) { - pidSeparator = sep - } -} - -// EngineOptDeadletter takes an actor and configures the engine to use it for dead letter handling -// This allows you to customize how deadletters are handled. -func EngineOptDeadletter(d Producer) func(*Engine) { - return func(e *Engine) { - e.deadLetter = e.Spawn(d, "deadletter") - } -} - -// WithRemote returns a new actor Engine with the given Remoter, -// and will call its Start function -func (e *Engine) WithRemote(r Remoter) { - e.remote = r - e.address = r.Address() - r.Start(e) -} - // Spawn spawns a process that will producer by the given Producer and // can be configured with the given opts. func (e *Engine) Spawn(p Producer, name string, opts ...OptFunc) *PID { @@ -154,8 +121,6 @@ func (e *Engine) Send(pid *PID, msg any) { func (e *Engine) BroadcastEvent(msg any) { if e.eventStream != nil { e.send(e.eventStream, msg, nil) - } else { - fmt.Println("Brain damage: event stream is nil") } } diff --git a/actor/event_stream.go b/actor/event_stream.go index 502a185..3707eb7 100644 --- a/actor/event_stream.go +++ b/actor/event_stream.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "log/slog" - "reflect" ) type EventStream struct { @@ -25,7 +24,6 @@ func NewEventStream() Producer { // Some events are specially handled, such as EventSub, EventUnSub (for subscribing to events), // DeadletterSub, DeadletterUnSub, for subscribing to DeadLetterEvent func (e *EventStream) Receive(c *Context) { - fmt.Printf("EventStream.Receive: %v\n", reflect.TypeOf(c.Message())) switch msg := c.Message().(type) { case EventSub: e.subs[msg.pid] = true @@ -44,6 +42,11 @@ func (e *EventStream) Receive(c *Context) { c.engine.BroadcastEvent(DeadLetterLoopEvent{}) break } + if len(e.dlsubs) == 0 { + slog.Warn("deadletter arrived, but no subscribers", + "sender", msg.Sender, "target", msg.Target, "msg", msg.Message) + break + } for sub := range e.dlsubs { c.Forward(sub) } diff --git a/examples/middleware/hooks/main.go b/examples/middleware/hooks/main.go index 92345e4..f69f877 100644 --- a/examples/middleware/hooks/main.go +++ b/examples/middleware/hooks/main.go @@ -44,7 +44,7 @@ func WithHooks() func(actor.ReceiveFunc) actor.ReceiveFunc { func main() { // Create a new engine - e, err := actor.NewEngine(actor.EngineOptPidSeparator("→")) + e, err := actor.NewEngine() if err != nil { panic(err) }