Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Events #91

Merged
merged 7 commits into from
Dec 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 13 additions & 11 deletions actor/deadletter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,36 +5,38 @@ import (
"reflect"
)

//
// DeadLetterEvent is delivered to the deadletter actor when a message can't be delivered to it's recipient
type DeadLetterEvent struct {
Target *PID
Message any
Sender *PID
}

type deadLetter struct {
pid *PID
}

func newDeadLetter() Receiver {
pid := NewPID(LocalLookupAddr, "deadLetter")
pid := NewPID(LocalLookupAddr, "deadletter")
return &deadLetter{
pid: pid,
}
}

// Receive implements the Receiver interface, handling the deadletter messages.
// It will log the deadletter message if a logger is set. If not, it will silently
// ignore the message. Any production system should either have a logger set or provide a custom
// deadletter actor.
// Any production system should provide a custom deadletter handler.
func (d *deadLetter) Receive(ctx *Context) {
switch msg := ctx.Message().(type) {
case Started:
// intialize logger on deadletter startup. is this a sane approach? I'm not sure how the get to the logger otherwise.
slog.Debug("default deadletter actor started")
ctx.engine.BroadcastEvent(EventSub{pid: d.pid})
case Stopped:
slog.Debug("default deadletter actor stopped")
ctx.engine.BroadcastEvent(EventUnsub{pid: d.pid})
case Initialized:
slog.Debug("default deadletter actor initialized")
case *DeadLetterEvent:
case DeadLetterEvent:
slog.Warn("deadletter arrived", "msg-type", reflect.TypeOf(msg),
"sender", msg.Sender, "target", msg.Target, "msg", msg.Message)
default:
slog.Error("unknown message arrived at deadletter", "msg", msg)
slog.Error("unknown message arrived at deadletter", "msg", msg,
"msg-type", reflect.TypeOf(msg))
}
}
120 changes: 21 additions & 99 deletions actor/deadletter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,114 +2,31 @@ package actor

import (
"bytes"
"fmt"
"log/slog"
"github.com/stretchr/testify/assert"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

// TestDeadLetterDefault tests the default deadletter handling.
// It will spawn a new actor, kill it, send a message to it and then check if the deadletter
// received the message.
func TestDeadLetterDefault(t *testing.T) {
logBuffer := SafeBuffer{}
lh := slog.NewTextHandler(&logBuffer, nil)
logger := slog.New(lh)
slog.SetDefault(logger)
e, err := NewEngine()
assert.NoError(t, err)
a1 := e.Spawn(newTestActor, "a1")
assert.NotNil(t, a1)
dl := e.Registry.getByID("deadletter")
assert.NotNil(t, dl) // should be registered by default
e.Poison(a1).Wait() // poison the a1 actor
e.Send(a1, testMessage{"bar"}) // should end up the deadletter queue
time.Sleep(time.Millisecond) // a flush would be nice here

// check the log buffer for the deadletter
assert.Contains(t, logBuffer.String(), "deadletter arrived")

}

// TestDeadLetterCustom tests the custom deadletter handling.
// It will spawn a new actor, kill it, send a message to it and then check if the deadletter
// received the message.
// It is using the custom deadletter receiver below.
// It is using the custom deadletter receiver defined inline.
func TestDeadLetterCustom(t *testing.T) {
e, err := NewEngine(
EngineOptDeadletter(newCustomDeadLetter))
e, err := NewEngine()
assert.NoError(t, err)
a1 := e.Spawn(newTestActor, "a1")
assert.NotNil(t, a1)
dl := e.Registry.getByID("deadletter")
assert.NotNil(t, dl) // should be registered by default
// kill a1 actor.
e.Poison(a1).Wait() // poison the a1 actor
// should be in deadletter
fmt.Println("==== sending message via a1 to deadletter ====")
fmt.Println(e.Registry)
fmt.Println("ID=> ", dl.PID())
e.Send(a1, testMessage{"bar"})
time.Sleep(time.Millisecond) // a flush would be nice here :-)
resp, err := e.Request(dl.PID(), &customDeadLetterFetch{flush: true}, time.Millisecond*10).Result()
assert.Nil(t, err) // no error from the request
assert.NotNil(t, resp) // we should get a response to our request
respDeadLetters, ok := resp.([]*DeadLetterEvent)
assert.True(t, ok) // got a slice of deadletter events
assert.Equal(t, 1, len(respDeadLetters)) // one deadletter event
ev, ok := respDeadLetters[0].Message.(testMessage)
assert.True(t, ok) // should be our test message
assert.Equal(t, "bar", ev.data)
}

type testActor struct{}
type testMessage struct {
data string
}

func newTestActor() Receiver {
return testActor{}
}
func (t testActor) Receive(_ *Context) {
// do nothing
}

type customDeadLetterFetch struct{ flush bool }

// customDeadLetter is a custom deadletter actor / receiver
type customDeadLetter struct {
deadLetters []*DeadLetterEvent
}

func newCustomDeadLetter() Receiver {
return &customDeadLetter{
deadLetters: make([]*DeadLetterEvent, 0),
}
}

// Receive implements the Receiver interface. This is a OK example of an actor that
// that deals with deadletters. It will store the deadletters in a slice.
func (c *customDeadLetter) Receive(ctx *Context) {
switch ctx.Message().(type) {
case *customDeadLetterFetch:
ctx.Respond(c.deadLetters)
if ctx.Message().(*customDeadLetterFetch).flush {
c.deadLetters = make([]*DeadLetterEvent, 0)
}
case *DeadLetterEvent:
slog.Warn("received deadletter event")
msg, ok := ctx.Message().(*DeadLetterEvent)
if !ok {
slog.Error("failed to cast deadletter event")
return
wg := &sync.WaitGroup{}
wg.Add(1)
e.SpawnFunc(func(c *Context) {
switch c.Message().(type) {
case Initialized:
c.engine.BroadcastEvent(EventSub{c.pid})
case DeadLetterEvent:
wg.Done()
}
c.deadLetters = append(c.deadLetters, msg)
}
}, "deadletter")
e.SendLocal(invalidPid(), "bar", nil)
wg.Wait()
}

// SafeBuffer is a threadsafe buffer, used for testing the that the deadletters are logged.
type SafeBuffer struct {
buf bytes.Buffer
mu sync.Mutex
Expand All @@ -127,4 +44,9 @@ func (sb *SafeBuffer) String() string {
return sb.buf.String()
}

// Usage in goroutines...
func invalidPid() *PID {
return &PID{
Address: "local",
ID: "squirrel",
}
}
51 changes: 6 additions & 45 deletions actor/engine.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package actor

import (
"log/slog"
reflect "reflect"
"sync"
"time"
)
Expand All @@ -23,11 +21,9 @@ type Receiver interface {

// Engine represents the actor engine.
type Engine struct {
Registry *Registry

Registry *Registry
address string
remote Remoter
deadLetter *PID
eventStream *PID
initErrors []error
}
Expand All @@ -49,12 +45,7 @@ func NewEngine(opts ...func(*Engine)) (*Engine, error) {
if e.remote != nil {
e.address = e.remote.Address()
}

e.eventStream = e.Spawn(NewEventStream(), "eventstream")
// if no deadletter is registered, we will register the default deadletter from deadletter.go
if e.deadLetter == nil {
e.deadLetter = e.Spawn(newDeadLetter, "deadletter")
}
return e, nil
}

Expand All @@ -68,31 +59,6 @@ func EngineOptRemote(r Remoter) func(*Engine) {
}
}

// TODO: Doc
// Todo: make the pid separator a struct variable
func EngineOptPidSeparator(sep string) func(*Engine) {
// This looks weird because the separator is a global variable.
return func(e *Engine) {
pidSeparator = sep
}
}

// EngineOptDeadletter takes an actor and configures the engine to use it for dead letter handling
// This allows you to customize how deadletters are handled.
func EngineOptDeadletter(d Producer) func(*Engine) {
return func(e *Engine) {
e.deadLetter = e.Spawn(d, "deadletter")
}
}

// WithRemote returns a new actor Engine with the given Remoter,
// and will call its Start function
func (e *Engine) WithRemote(r Remoter) {
e.remote = r
e.address = r.Address()
r.Start(e)
}

// Spawn spawns a process that will producer by the given Producer and
// can be configured with the given opts.
func (e *Engine) Spawn(p Producer, name string, opts ...OptFunc) *PID {
Expand Down Expand Up @@ -164,12 +130,7 @@ func (e *Engine) send(pid *PID, msg any, sender *PID) {
return
}
if e.remote == nil {
slog.Error("failed sending messsage",
"err", "engine has no remote configured",
"to", pid,
"type", reflect.TypeOf(msg),
"msg", msg,
)
e.BroadcastEvent(EngineRemoteMissingEvent{Target: pid, Sender: sender, Message: msg})
return
}
e.remote.Send(pid, msg, sender)
Expand Down Expand Up @@ -243,9 +204,9 @@ func (e *Engine) sendPoisonPill(pid *PID, graceful bool, wg ...*sync.WaitGroup)
}
_wg.Add(1)
proc := e.Registry.get(pid)
// deadletter - if we didn't find a process, we will send a deadletter message
// deadletter - if we didn't find a process, we will broadcast a DeadletterEvent
if proc == nil {
e.Send(e.deadLetter, &DeadLetterEvent{
e.BroadcastEvent(DeadLetterEvent{
Target: pid,
Message: poisonPill{_wg, graceful},
Sender: nil,
Expand All @@ -268,8 +229,8 @@ func (e *Engine) sendPoisonPill(pid *PID, graceful bool, wg ...*sync.WaitGroup)
func (e *Engine) SendLocal(pid *PID, msg any, sender *PID) {
proc := e.Registry.get(pid)
if proc == nil {
// send a deadletter message
e.Send(e.deadLetter, &DeadLetterEvent{
// broadcast a deadLetter message
e.BroadcastEvent(DeadLetterEvent{
Target: pid,
Message: msg,
Sender: sender,
Expand Down
20 changes: 20 additions & 0 deletions actor/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,3 +337,23 @@ func BenchmarkSendWithSenderMessageLocal(b *testing.B) {
e.SendWithSender(pid, pid, pid)
}
}

type TestReceiveFunc func(*testing.T, *Context)

type TestReceiver struct {
OnReceive TestReceiveFunc
t *testing.T
}

func NewTestProducer(t *testing.T, f TestReceiveFunc) Producer {
return func() Receiver {
return &TestReceiver{
OnReceive: f,
t: t,
}
}
}

func (r *TestReceiver) Receive(ctx *Context) {
r.OnReceive(r.t, ctx)
}
22 changes: 13 additions & 9 deletions actor/event_stream.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,9 @@
package actor

// EventSub is the message that will be send to subscribe to the event stream.
type EventSub struct {
pid *PID
}

// EventUnSub is the message that will be send to unsubscribe from the event stream.
type EventUnsub struct {
pid *PID
}
import (
"context"
"log/slog"
)

type EventStream struct {
subs map[*PID]bool
Expand All @@ -22,13 +17,22 @@ func NewEventStream() Producer {
}
}

// Receive for the event stream. All system-wide events are sent here.
// Some events are specially handled, such as EventSub, EventUnSub (for subscribing to events),
// DeadletterSub, DeadletterUnSub, for subscribing to DeadLetterEvent
func (e *EventStream) Receive(c *Context) {
switch msg := c.Message().(type) {
case EventSub:
e.subs[msg.pid] = true
case EventUnsub:
delete(e.subs, msg.pid)
default:
// check if we should log the event, if so, log it with the relevant level, message and attributes
logMsg, ok := c.Message().(eventLog)
if ok {
level, msg, attr := logMsg.log()
slog.Log(context.Background(), level, msg, attr...)
}
for sub := range e.subs {
c.Forward(sub)
}
Expand Down
Loading