diff --git a/actor/deadletter.go b/actor/deadletter.go index 0905eb9..068430c 100644 --- a/actor/deadletter.go +++ b/actor/deadletter.go @@ -2,41 +2,51 @@ package actor import ( "github.com/anthdm/hollywood/log" - "sync" ) -// TODO: The deadLetter is implemented as a plain Processer, but -// can actually be implemented as a Receiver. This is a good first issue. +// type deadLetter struct { logger log.Logger pid *PID - msgs []any + msgs []*DeadLetterEvent } -func newDeadLetter(logger log.Logger) Receiver { +func newDeadLetter() Receiver { + pid := NewPID(LocalLookupAddr, "deadLetter") + msgs := make([]*DeadLetterEvent, 0) return &deadLetter{ - msgs: make([]any, 0), - logger: logger.SubLogger("[deadLetter]"), - pid: NewPID(LocalLookupAddr, "deadLetter"), + msgs: msgs, + pid: pid, } } func (d *deadLetter) Receive(ctx *Context) { switch msg := ctx.Message().(type) { - case *DeadLetterEvent: - d.msgs = append(d.msgs, msg) + case Started: + // intialize logger on deadletter startup. this should be sanity checked + d.logger = ctx.Engine().logger.SubLogger("[deadletter]") + d.logger.Debugw("default deadletter actor started") + case Stopped: + d.logger.Debugw("default deadletter actor stopped") + case Initialized: + d.logger.Debugw("default deadletter actor initialized") case *DeadLetterFlush: - d.msgs = make([]any, 0) + d.logger.Debugw("deadletter queue flushed", "msgs", len(d.msgs), "sender", ctx.Sender()) + d.msgs = make([]*DeadLetterEvent, 0) case *DeadLetterFetch: - ctx.Respond(d.msgs) + d.logger.Debugw("deadletter fetch", "msgs", len(d.msgs), "sender", ctx.Sender(), "flush", msg.Flush) + ctx.Respond(d.msgs) // this is a sync request. if msg.Flush { - d.msgs = make([]any, 0) + d.msgs = d.msgs[:0] } + default: + d.logger.Warnw("deadletter arrived", "msg", msg, "sender", ctx.Sender()) + dl := DeadLetterEvent{ + Target: nil, // todo: how to get the target? + Message: msg, + Sender: ctx.Sender(), + } + d.msgs = append(d.msgs, &dl) } } - -func (d *deadLetter) PID() *PID { return d.pid } -func (d *deadLetter) Shutdown(_ *sync.WaitGroup) {} -func (d *deadLetter) Start() {} -func (d *deadLetter) Invoke([]Envelope) {} diff --git a/actor/deadletter_test.go b/actor/deadletter_test.go new file mode 100644 index 0000000..9cd20bf --- /dev/null +++ b/actor/deadletter_test.go @@ -0,0 +1,46 @@ +package actor + +import ( + "fmt" + "github.com/anthdm/hollywood/log" + "github.com/stretchr/testify/assert" + "log/slog" + "os" + "testing" + "time" +) + +type testActor struct { +} +type testMessage struct { + data string +} + +func newTestActor() Receiver { + return testActor{} +} +func (t testActor) Receive(ctx *Context) { + // do nothing +} + +func TestDeadLetterDefault(t *testing.T) { + lh := log.NewHandler(os.Stdout, log.TextFormat, slog.LevelDebug) + e := NewEngine(EngineOptLogger(log.NewLogger("[engine]", lh))) + a1 := e.Spawn(newTestActor, "a1") + assert.NotNil(t, a1) + dl := e.Registry.getByID("deadletter") + assert.NotNil(t, dl) // should be registered by default + // kill a1 actor. + e.Poison(a1).Wait() // poison the a1 actor + // should be in deadletter + fmt.Println("==== sending message via a1 to deadletter ====") + e.Send(a1, testMessage{"bar"}) + time.Sleep(time.Millisecond * 50) + resp, err := e.Request(dl.PID(), &DeadLetterFetch{Flush: true}, time.Millisecond*1000).Result() + assert.Nil(t, err) + assert.NotNil(t, resp) + resp2, ok := resp.([]*DeadLetterEvent) + assert.True(t, ok) + assert.Equal(t, 1, len(resp2)) + +} diff --git a/actor/engine.go b/actor/engine.go index 534aa70..bb3481f 100644 --- a/actor/engine.go +++ b/actor/engine.go @@ -28,7 +28,7 @@ type Engine struct { address string remote Remoter - deadLetter Receiver + deadLetter *PID logger log.Logger } @@ -42,7 +42,11 @@ func NewEngine(opts ...func(*Engine)) *Engine { e.EventStream = NewEventStream(e.logger) e.address = LocalLookupAddr e.Registry = newRegistry(e) - e.deadLetter = newDeadLetter(e.logger) + // if no deadletter is registered, we will register the default deadletter from deadletter.go + if e.deadLetter == nil { + e.logger.Debugw("no deadletter receiver set, registering default") + e.deadLetter = e.Spawn(newDeadLetter, "deadletter") + } return e } @@ -59,6 +63,12 @@ func EngineOptPidSeparator(sep string) func(*Engine) { } } +func EngineOptDeadletter(deadletter *PID) func(*Engine) { + return func(e *Engine) { + e.deadLetter = deadletter + } +} + // WithRemote returns a new actor Engine with the given Remoter, // and will call its Start function func (e *Engine) WithRemote(r Remoter) { @@ -199,11 +209,16 @@ func (e *Engine) Poison(pid *PID, wg ...*sync.WaitGroup) *sync.WaitGroup { return _wg } +// SendLocal will send the given message to the given PID. If the recipient is not found in the +// registry, the message will be sent to the DeadLetter process instead. If there is no deadletter +// process registered, the function will panic. func (e *Engine) SendLocal(pid *PID, msg any, sender *PID) { proc := e.Registry.get(pid) if proc != nil { proc.Send(pid, msg, sender) + return } + panic("no way to handle message (didn't find deadletter)") } func (e *Engine) isLocalMessage(pid *PID) bool { diff --git a/actor/opts.go b/actor/opts.go index 0249ebd..d604066 100644 --- a/actor/opts.go +++ b/actor/opts.go @@ -1,6 +1,8 @@ package actor -import "time" +import ( + "time" +) const ( defaultInboxSize = 1024 diff --git a/actor/pid.go b/actor/pid.go index 143ad11..f1dc6e6 100644 --- a/actor/pid.go +++ b/actor/pid.go @@ -38,7 +38,7 @@ func (pid *PID) Child(id string, tags ...string) *PID { } func (pid *PID) HasTag(tag string) bool { - panic("TODO") + return strings.Contains(pid.ID, pidSeparator+tag+pidSeparator) } func (pid *PID) LookupKey() uint64 { diff --git a/actor/process.go b/actor/process.go index e242926..8aeba20 100644 --- a/actor/process.go +++ b/actor/process.go @@ -112,7 +112,7 @@ func (p *process) Start() { p.context.message = Started{} applyMiddleware(recv.Receive, p.Opts.Middleware...)(p.context) p.context.engine.EventStream.Publish(&ActivationEvent{PID: p.pid}) - p.logger.Debugw("started", "pid", p.pid) + p.logger.Debugw("actor started", "pid", p.pid) // If we have messages in our buffer, invoke them. if len(p.mbuffer) > 0 { p.Invoke(p.mbuffer) diff --git a/actor/registry.go b/actor/registry.go index 482f5cb..56a1c4a 100644 --- a/actor/registry.go +++ b/actor/registry.go @@ -35,8 +35,10 @@ func (r *Registry) get(pid *PID) Processer { if proc, ok := r.lookup[pid.ID]; ok { return proc } - - return r.engine.deadLetter + if proc, ok := r.lookup["deadletter"]; ok { + return proc + } + panic("no deadletter registered") } func (r *Registry) getByID(id string) Processer { diff --git a/remote/remote.go b/remote/remote.go index 37dc8d9..b2ef0b7 100644 --- a/remote/remote.go +++ b/remote/remote.go @@ -41,13 +41,19 @@ func (r *Remote) Start() { } mux := drpcmux.New() - DRPCRegisterRemote(mux, r.streamReader) + err = DRPCRegisterRemote(mux, r.streamReader) + if err != nil { + r.logger.Errorw("failed to register remote", "err", err) + } s := drpcserver.New(mux) r.streamRouterPID = r.engine.Spawn(newStreamRouter(r.engine, r.logger), "router", actor.WithInboxSize(1024*1024)) r.logger.Infow("server started", "listenAddr", r.config.ListenAddr) ctx := context.Background() - go s.Serve(ctx, ln) + go func() { + err := s.Serve(ctx, ln) + r.logger.Errorw("drpcserver stopped", "err", err) + }() } // Send sends the given message to the process with the given pid over the network.