Skip to content

Commit

Permalink
make deadletter an actor.
Browse files Browse the repository at this point in the history
  • Loading branch information
perbu committed Nov 26, 2023
1 parent e6e3b6c commit 7d3f853
Show file tree
Hide file tree
Showing 8 changed files with 108 additions and 27 deletions.
46 changes: 28 additions & 18 deletions actor/deadletter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,41 +2,51 @@ package actor

import (
"github.com/anthdm/hollywood/log"
"sync"
)

// TODO: The deadLetter is implemented as a plain Processer, but
// can actually be implemented as a Receiver. This is a good first issue.
//

type deadLetter struct {
logger log.Logger
pid *PID
msgs []any
msgs []*DeadLetterEvent
}

func newDeadLetter(logger log.Logger) Receiver {
func newDeadLetter() Receiver {
pid := NewPID(LocalLookupAddr, "deadLetter")
msgs := make([]*DeadLetterEvent, 0)
return &deadLetter{
msgs: make([]any, 0),
logger: logger.SubLogger("[deadLetter]"),
pid: NewPID(LocalLookupAddr, "deadLetter"),
msgs: msgs,
pid: pid,
}
}

func (d *deadLetter) Receive(ctx *Context) {
switch msg := ctx.Message().(type) {
case *DeadLetterEvent:
d.msgs = append(d.msgs, msg)
case Started:
// intialize logger on deadletter startup. this should be sanity checked
d.logger = ctx.Engine().logger.SubLogger("[deadletter]")
d.logger.Debugw("default deadletter actor started")
case Stopped:
d.logger.Debugw("default deadletter actor stopped")
case Initialized:
d.logger.Debugw("default deadletter actor initialized")
case *DeadLetterFlush:
d.msgs = make([]any, 0)
d.logger.Debugw("deadletter queue flushed", "msgs", len(d.msgs), "sender", ctx.Sender())
d.msgs = make([]*DeadLetterEvent, 0)
case *DeadLetterFetch:
ctx.Respond(d.msgs)
d.logger.Debugw("deadletter fetch", "msgs", len(d.msgs), "sender", ctx.Sender(), "flush", msg.Flush)
ctx.Respond(d.msgs) // this is a sync request.
if msg.Flush {
d.msgs = make([]any, 0)
d.msgs = d.msgs[:0]
}
default:
d.logger.Warnw("deadletter arrived", "msg", msg, "sender", ctx.Sender())
dl := DeadLetterEvent{
Target: nil, // todo: how to get the target?
Message: msg,
Sender: ctx.Sender(),
}
d.msgs = append(d.msgs, &dl)
}
}

func (d *deadLetter) PID() *PID { return d.pid }
func (d *deadLetter) Shutdown(_ *sync.WaitGroup) {}
func (d *deadLetter) Start() {}
func (d *deadLetter) Invoke([]Envelope) {}
46 changes: 46 additions & 0 deletions actor/deadletter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package actor

import (
"fmt"
"github.com/anthdm/hollywood/log"
"github.com/stretchr/testify/assert"
"log/slog"
"os"
"testing"
"time"
)

type testActor struct {
}
type testMessage struct {
data string
}

func newTestActor() Receiver {
return testActor{}
}
func (t testActor) Receive(ctx *Context) {
// do nothing
}

func TestDeadLetterDefault(t *testing.T) {
lh := log.NewHandler(os.Stdout, log.TextFormat, slog.LevelDebug)
e := NewEngine(EngineOptLogger(log.NewLogger("[engine]", lh)))
a1 := e.Spawn(newTestActor, "a1")
assert.NotNil(t, a1)
dl := e.Registry.getByID("deadletter")
assert.NotNil(t, dl) // should be registered by default
// kill a1 actor.
e.Poison(a1).Wait() // poison the a1 actor
// should be in deadletter
fmt.Println("==== sending message via a1 to deadletter ====")
e.Send(a1, testMessage{"bar"})
time.Sleep(time.Millisecond * 50)
resp, err := e.Request(dl.PID(), &DeadLetterFetch{Flush: true}, time.Millisecond*1000).Result()
assert.Nil(t, err)
assert.NotNil(t, resp)
resp2, ok := resp.([]*DeadLetterEvent)
assert.True(t, ok)
assert.Equal(t, 1, len(resp2))

}
19 changes: 17 additions & 2 deletions actor/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type Engine struct {

address string
remote Remoter
deadLetter Receiver
deadLetter *PID
logger log.Logger
}

Expand All @@ -42,7 +42,11 @@ func NewEngine(opts ...func(*Engine)) *Engine {
e.EventStream = NewEventStream(e.logger)
e.address = LocalLookupAddr
e.Registry = newRegistry(e)
e.deadLetter = newDeadLetter(e.logger)
// if no deadletter is registered, we will register the default deadletter from deadletter.go
if e.deadLetter == nil {
e.logger.Debugw("no deadletter receiver set, registering default")
e.deadLetter = e.Spawn(newDeadLetter, "deadletter")
}
return e
}

Expand All @@ -59,6 +63,12 @@ func EngineOptPidSeparator(sep string) func(*Engine) {
}
}

func EngineOptDeadletter(deadletter *PID) func(*Engine) {
return func(e *Engine) {
e.deadLetter = deadletter
}
}

// WithRemote returns a new actor Engine with the given Remoter,
// and will call its Start function
func (e *Engine) WithRemote(r Remoter) {
Expand Down Expand Up @@ -199,11 +209,16 @@ func (e *Engine) Poison(pid *PID, wg ...*sync.WaitGroup) *sync.WaitGroup {
return _wg
}

// SendLocal will send the given message to the given PID. If the recipient is not found in the
// registry, the message will be sent to the DeadLetter process instead. If there is no deadletter
// process registered, the function will panic.
func (e *Engine) SendLocal(pid *PID, msg any, sender *PID) {
proc := e.Registry.get(pid)
if proc != nil {
proc.Send(pid, msg, sender)
return
}
panic("no way to handle message (didn't find deadletter)")
}

func (e *Engine) isLocalMessage(pid *PID) bool {
Expand Down
4 changes: 3 additions & 1 deletion actor/opts.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package actor

import "time"
import (
"time"
)

const (
defaultInboxSize = 1024
Expand Down
2 changes: 1 addition & 1 deletion actor/pid.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (pid *PID) Child(id string, tags ...string) *PID {
}

func (pid *PID) HasTag(tag string) bool {
panic("TODO")
return strings.Contains(pid.ID, pidSeparator+tag+pidSeparator)
}

func (pid *PID) LookupKey() uint64 {
Expand Down
2 changes: 1 addition & 1 deletion actor/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func (p *process) Start() {
p.context.message = Started{}
applyMiddleware(recv.Receive, p.Opts.Middleware...)(p.context)
p.context.engine.EventStream.Publish(&ActivationEvent{PID: p.pid})
p.logger.Debugw("started", "pid", p.pid)
p.logger.Debugw("actor started", "pid", p.pid)
// If we have messages in our buffer, invoke them.
if len(p.mbuffer) > 0 {
p.Invoke(p.mbuffer)
Expand Down
6 changes: 4 additions & 2 deletions actor/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@ func (r *Registry) get(pid *PID) Processer {
if proc, ok := r.lookup[pid.ID]; ok {
return proc
}

return r.engine.deadLetter
if proc, ok := r.lookup["deadletter"]; ok {
return proc
}
panic("no deadletter registered")
}

func (r *Registry) getByID(id string) Processer {
Expand Down
10 changes: 8 additions & 2 deletions remote/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,19 @@ func (r *Remote) Start() {
}

mux := drpcmux.New()
DRPCRegisterRemote(mux, r.streamReader)
err = DRPCRegisterRemote(mux, r.streamReader)
if err != nil {
r.logger.Errorw("failed to register remote", "err", err)
}
s := drpcserver.New(mux)

r.streamRouterPID = r.engine.Spawn(newStreamRouter(r.engine, r.logger), "router", actor.WithInboxSize(1024*1024))
r.logger.Infow("server started", "listenAddr", r.config.ListenAddr)
ctx := context.Background()
go s.Serve(ctx, ln)
go func() {
err := s.Serve(ctx, ln)
r.logger.Errorw("drpcserver stopped", "err", err)
}()
}

// Send sends the given message to the process with the given pid over the network.
Expand Down

0 comments on commit 7d3f853

Please sign in to comment.