Skip to content

Commit

Permalink
cleanup.
Browse files Browse the repository at this point in the history
  • Loading branch information
perbu committed Dec 6, 2023
1 parent c93d7e9 commit c5433f4
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 55 deletions.
2 changes: 0 additions & 2 deletions actor/deadletter.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,10 @@ func newDeadLetter() Receiver {
func (d *deadLetter) Receive(ctx *Context) {
switch msg := ctx.Message().(type) {
case Started:
// Subscribe to deadletters
ctx.engine.BroadcastEvent(DeadletterSub{pid: d.pid})
case Stopped:
ctx.engine.BroadcastEvent(DeadletterUnSub{pid: d.pid})
case Initialized:
slog.Debug("default deadletter actor initializing")
case DeadLetterEvent:
slog.Warn("deadletter arrived", "msg-type", reflect.TypeOf(msg),
"sender", msg.Sender, "target", msg.Target, "msg", msg.Message)
Expand Down
26 changes: 12 additions & 14 deletions actor/deadletter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import (
)

// TestDeadLetterDefault tests the default deadletter handling.
// It will spawn a new actor, kill it, send a message to it and then check if the deadletter
// received the message.
// It will spawn a new actor, kill it, send a message to it and then check if there is a message
// logged to the default logger
func TestDeadLetterDefault(t *testing.T) {
logBuffer := SafeBuffer{}
logger := slog.New(slog.NewTextHandler(&logBuffer, &slog.HandlerOptions{Level: slog.LevelDebug}))
Expand All @@ -25,8 +25,6 @@ func TestDeadLetterDefault(t *testing.T) {
time.Sleep(10 * time.Millisecond)
a1 := e.Spawn(newTestActor, "a1")
assert.NotNil(t, a1)
dl := e.Registry.getByID("deadletter")
assert.NotNil(t, dl) // should be registered by default
e.Poison(a1).Wait() // poison the a1 actor
e.Send(a1, testMessage{"bar"}) // should end up the deadletter queue
time.Sleep(time.Millisecond * 100) // a flush would be nice here
Expand All @@ -45,14 +43,12 @@ func TestDeadLetterDefault(t *testing.T) {
func TestDeadLetterCustom(t *testing.T) {
debuglogger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug}))
slog.SetDefault(debuglogger)
e, err := NewEngine(
EngineOptDeadletter(newCustomDeadLetter))
e, err := NewEngine()
assert.NoError(t, err)
time.Sleep(10 * time.Millisecond)
dl := e.Spawn(newCustomDeadLetter, "deadletter")
assert.NotNil(t, dl)
a1 := e.Spawn(newTestActor, "a1")
assert.NotNil(t, a1)
dl := e.Registry.getByID("deadletter")
assert.NotNil(t, dl)
es := e.Registry.getByID("eventstream")
assert.NotNil(t, es)

Expand All @@ -61,13 +57,15 @@ func TestDeadLetterCustom(t *testing.T) {
fmt.Println("==== sending message via a1 to deadletter ====")
e.Send(a1, testMessage{"bar"})
time.Sleep(time.Millisecond * 100) // a flush would be nice here :-)
resp, err := e.Request(dl.PID(), customDeadLetterFetch{flush: true}, time.Millisecond*10).Result()
resp, err := e.Request(dl, customDeadLetterFetch{flush: true}, time.Millisecond*10).Result()
assert.Nil(t, err) // no error from the request
assert.NotNil(t, resp) // we should get a response to our request
respDeadLetters, ok := resp.([]DeadLetterEvent)
assert.True(t, ok) // got a slice of deadletter events
return
assert.Equal(t, 1, len(respDeadLetters)) // one deadletter event
// stop the tests if we don't have any deadletters
if len(respDeadLetters) != 1 {
t.Fatal("expected 1 deadletters, got", len(respDeadLetters))
}
ev, ok := respDeadLetters[0].Message.(testMessage)
assert.True(t, ok) // should be our test message
assert.Equal(t, "bar", ev.data)
Expand Down Expand Up @@ -103,12 +101,12 @@ func newCustomDeadLetter() Receiver {
func (c *customDeadLetter) Receive(ctx *Context) {
es := ctx.engine.Registry.getByID("eventstream")
if es == nil {
slog.Error("custom deadletter; no eventstream found")
fmt.Println("custom deadletter; no eventstream found")
}
switch ctx.Message().(type) {
case Started:
slog.Debug("custom deadletter starting", "action", "subscribing")
ctx.Engine().BroadcastEvent(DeadletterSub{pid: ctx.pid})
ctx.engine.BroadcastEvent(DeadletterSub{pid: ctx.pid})
time.Sleep(time.Millisecond * 10)
case Stopped:
slog.Debug("custom deadletter stopping", "action", "unsubscribing")
Expand Down
37 changes: 1 addition & 36 deletions actor/engine.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package actor

import (
"fmt"
"sync"
"time"
)
Expand All @@ -22,11 +21,9 @@ type Receiver interface {

// Engine represents the actor engine.
type Engine struct {
Registry *Registry

Registry *Registry
address string
remote Remoter
deadLetter *PID
eventStream *PID
initErrors []error
}
Expand All @@ -48,12 +45,7 @@ func NewEngine(opts ...func(*Engine)) (*Engine, error) {
if e.remote != nil {
e.address = e.remote.Address()
}

e.eventStream = e.Spawn(NewEventStream(), "eventstream")
// if no deadletter is registered, we will register the default deadletter from deadletter.go
if e.deadLetter == nil {
e.deadLetter = e.Spawn(newDeadLetter, "deadletter")
}
return e, nil
}

Expand All @@ -67,31 +59,6 @@ func EngineOptRemote(r Remoter) func(*Engine) {
}
}

// TODO: Doc
// Todo: make the pid separator a struct variable
func EngineOptPidSeparator(sep string) func(*Engine) {
// This looks weird because the separator is a global variable.
return func(e *Engine) {
pidSeparator = sep
}
}

// EngineOptDeadletter takes an actor and configures the engine to use it for dead letter handling
// This allows you to customize how deadletters are handled.
func EngineOptDeadletter(d Producer) func(*Engine) {
return func(e *Engine) {
e.deadLetter = e.Spawn(d, "deadletter")
}
}

// WithRemote returns a new actor Engine with the given Remoter,
// and will call its Start function
func (e *Engine) WithRemote(r Remoter) {
e.remote = r
e.address = r.Address()
r.Start(e)
}

// Spawn spawns a process that will producer by the given Producer and
// can be configured with the given opts.
func (e *Engine) Spawn(p Producer, name string, opts ...OptFunc) *PID {
Expand Down Expand Up @@ -154,8 +121,6 @@ func (e *Engine) Send(pid *PID, msg any) {
func (e *Engine) BroadcastEvent(msg any) {
if e.eventStream != nil {
e.send(e.eventStream, msg, nil)
} else {
fmt.Println("Brain damage: event stream is nil")
}
}

Expand Down
7 changes: 5 additions & 2 deletions actor/event_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"fmt"
"log/slog"
"reflect"
)

type EventStream struct {
Expand All @@ -25,7 +24,6 @@ func NewEventStream() Producer {
// Some events are specially handled, such as EventSub, EventUnSub (for subscribing to events),
// DeadletterSub, DeadletterUnSub, for subscribing to DeadLetterEvent
func (e *EventStream) Receive(c *Context) {
fmt.Printf("EventStream.Receive: %v\n", reflect.TypeOf(c.Message()))
switch msg := c.Message().(type) {
case EventSub:
e.subs[msg.pid] = true
Expand All @@ -44,6 +42,11 @@ func (e *EventStream) Receive(c *Context) {
c.engine.BroadcastEvent(DeadLetterLoopEvent{})
break
}
if len(e.dlsubs) == 0 {
slog.Warn("deadletter arrived, but no subscribers",
"sender", msg.Sender, "target", msg.Target, "msg", msg.Message)
break
}
for sub := range e.dlsubs {
c.Forward(sub)
}
Expand Down
2 changes: 1 addition & 1 deletion examples/middleware/hooks/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func WithHooks() func(actor.ReceiveFunc) actor.ReceiveFunc {

func main() {
// Create a new engine
e, err := actor.NewEngine(actor.EngineOptPidSeparator("→"))
e, err := actor.NewEngine()
if err != nil {
panic(err)
}
Expand Down

0 comments on commit c5433f4

Please sign in to comment.