diff --git a/actor/deadletter.go b/actor/deadletter.go index b54d3dd..bb8b3c7 100644 --- a/actor/deadletter.go +++ b/actor/deadletter.go @@ -2,6 +2,7 @@ package actor import ( "github.com/anthdm/hollywood/log" + "reflect" ) // @@ -22,28 +23,19 @@ func newDeadLetter() Receiver { } // Receive implements the Receiver interface, handling the deadletter messages. -// Todo: this will grow and grow. Maybe we want a limit on this? func (d *deadLetter) Receive(ctx *Context) { switch msg := ctx.Message().(type) { case Started: - // intialize logger on deadletter startup. this should be sanity checked + // intialize logger on deadletter startup. is this a sane approach? I'm not sure how the get to the logger otherwise. d.logger = ctx.Engine().logger.SubLogger("[deadletter]") d.logger.Debugw("default deadletter actor started") case Stopped: d.logger.Debugw("default deadletter actor stopped") case Initialized: d.logger.Debugw("default deadletter actor initialized") - case *DeadLetterFlush: - d.logger.Debugw("deadletter queue flushed", "msgs", len(d.msgs), "sender", ctx.Sender()) - d.msgs = make([]*DeadLetterEvent, 0) - case *DeadLetterFetch: - d.logger.Debugw("deadletter fetch", "msgs", len(d.msgs), "sender", ctx.Sender(), "flush", msg.Flush) - ctx.Respond(d.msgs) // this is a sync request. - if msg.Flush { - d.msgs = d.msgs[:0] - } case *DeadLetterEvent: - d.logger.Warnw("deadletter arrived", "msg", msg, "sender", ctx.Sender()) + d.logger.Warnw("deadletter arrived", "msg-type", reflect.TypeOf(msg), + "sender", msg.Sender, "target", msg.Target, "msg", msg.Message) d.msgs = append(d.msgs, msg) default: d.logger.Errorw("unknown message arrived", "msg", msg) diff --git a/actor/deadletter_test.go b/actor/deadletter_test.go index b1d843a..7428041 100644 --- a/actor/deadletter_test.go +++ b/actor/deadletter_test.go @@ -1,6 +1,7 @@ package actor import ( + "bytes" "fmt" "github.com/anthdm/hollywood/log" "github.com/stretchr/testify/assert" @@ -14,7 +15,8 @@ import ( // It will spawn a new actor, kill it, send a message to it and then check if the deadletter // received the message. func TestDeadLetterDefault(t *testing.T) { - lh := log.NewHandler(os.Stdout, log.TextFormat, slog.LevelDebug) + logBuffer := bytes.Buffer{} + lh := log.NewHandler(&logBuffer, log.TextFormat, slog.LevelDebug) e := NewEngine(EngineOptLogger(log.NewLogger("[engine]", lh))) a1 := e.Spawn(newTestActor, "a1") assert.NotNil(t, a1) @@ -23,16 +25,9 @@ func TestDeadLetterDefault(t *testing.T) { e.Poison(a1).Wait() // poison the a1 actor e.Send(a1, testMessage{"bar"}) // should end up the deadletter queue time.Sleep(time.Millisecond) // a flush would be nice here - // flushes the deadletter queue, and returns the messages: - resp, err := e.Request(dl.PID(), &DeadLetterFetch{Flush: true}, time.Millisecond*10).Result() - assert.Nil(t, err) - assert.NotNil(t, resp) - respDeadLetters, ok := resp.([]*DeadLetterEvent) - assert.True(t, ok) // should be a slice of deadletter events - assert.Equal(t, 1, len(respDeadLetters)) // should be one deadletter event - ev, ok := respDeadLetters[0].Message.(testMessage) - assert.True(t, ok) // should be a test message - assert.Equal(t, "bar", ev.data) + + // check the log buffer for the deadletter + assert.Contains(t, logBuffer.String(), "deadletter arrived") } @@ -55,7 +50,7 @@ func TestDeadLetterCustom(t *testing.T) { fmt.Println("==== sending message via a1 to deadletter ====") e.Send(a1, testMessage{"bar"}) time.Sleep(time.Millisecond) // a flush would be nice here :-) - resp, err := e.Request(dl.PID(), &DeadLetterFetch{Flush: true}, time.Millisecond*10).Result() + resp, err := e.Request(dl.PID(), &customDeadLetterFetch{flush: true}, time.Millisecond*10).Result() assert.Nil(t, err) // no error from the request assert.NotNil(t, resp) // we should get a response to our request respDeadLetters, ok := resp.([]*DeadLetterEvent) @@ -74,10 +69,12 @@ type testMessage struct { func newTestActor() Receiver { return testActor{} } -func (t testActor) Receive(ctx *Context) { +func (t testActor) Receive(_ *Context) { // do nothing } +type customDeadLetterFetch struct{ flush bool } + // customDeadLetter is a custom deadletter actor / receiver type customDeadLetter struct { deadLetters []*DeadLetterEvent @@ -93,10 +90,11 @@ func newCustomDeadLetter() Receiver { // that deals with deadletters. It will store the deadletters in a slice. func (c *customDeadLetter) Receive(ctx *Context) { switch ctx.Message().(type) { - case *DeadLetterFlush: - c.deadLetters = c.deadLetters[:0] - case *DeadLetterFetch: + case *customDeadLetterFetch: ctx.Respond(c.deadLetters) + if ctx.Message().(*customDeadLetterFetch).flush { + c.deadLetters = make([]*DeadLetterEvent, 0) + } case *DeadLetterEvent: slog.Warn("received deadletter event") msg, ok := ctx.Message().(*DeadLetterEvent) diff --git a/actor/types.go b/actor/types.go index 47e9bbb..e0a964f 100644 --- a/actor/types.go +++ b/actor/types.go @@ -10,13 +10,6 @@ type DeadLetterEvent struct { Sender *PID } -// DeadLetterFlush is used to flush the DeadLetter queue. -type DeadLetterFlush struct{} - -type DeadLetterFetch struct { - Flush bool -} - // ActivationEvent is broadcasted over the EventStream each time // a Receiver is spawned and activated. This mean at the point of // receiving this event the Receiver is ready to process messages.