Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Reworking the event stream so its also an actor #79

Merged
merged 5 commits into from
Dec 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions _bench/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,16 @@ import (
)

func makeRemoteEngine(addr string) *actor.Engine {
e := actor.NewEngine()
r := remote.New(e, remote.Config{ListenAddr: addr})
e.WithRemote(r)
r := remote.New(remote.Config{ListenAddr: addr})
e := actor.NewEngine(actor.EngineOptRemote(r))
return e
}

func benchmarkRemote() {
var (
a = makeRemoteEngine("127.0.0.1:3000")
b = makeRemoteEngine("127.0.0.1:3001")
pidB = b.SpawnFunc(func(c *actor.Context) {}, "bench", actor.WithInboxSize(1024*8))
pidB = b.SpawnFunc(func(c *actor.Context) {}, "bench", actor.WithInboxSize(1024*8), actor.WithMaxRestarts(0))
)
its := []int{
1_000_000,
Expand All @@ -39,7 +38,7 @@ func benchmarkRemote() {

func benchmarkLocal() {
e := actor.NewEngine()
pid := e.SpawnFunc(func(c *actor.Context) {}, "bench", actor.WithInboxSize(1024*8))
pid := e.SpawnFunc(func(c *actor.Context) {}, "bench", actor.WithInboxSize(1024*8), actor.WithMaxRestarts(0))
its := []int{
1_000_000,
10_000_000,
Expand Down
5 changes: 4 additions & 1 deletion actor/deadletter.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package actor

import (
"github.com/anthdm/hollywood/log"
fmt "fmt"
"reflect"

"github.com/anthdm/hollywood/log"
)

//
Expand Down Expand Up @@ -34,6 +36,7 @@ func (d *deadLetter) Receive(ctx *Context) {
case Initialized:
d.logger.Debugw("default deadletter actor initialized")
case *DeadLetterEvent:
fmt.Println("received deadletter", msg)
d.logger.Warnw("deadletter arrived", "msg-type", reflect.TypeOf(msg),
"sender", msg.Sender, "target", msg.Target, "msg", msg.Message)
default:
Expand Down
7 changes: 5 additions & 2 deletions actor/deadletter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ package actor
import (
"bytes"
"fmt"
"github.com/anthdm/hollywood/log"
"github.com/stretchr/testify/assert"
"log/slog"
"os"
"sync"
"testing"
"time"

"github.com/anthdm/hollywood/log"
"github.com/stretchr/testify/assert"
)

// TestDeadLetterDefault tests the default deadletter handling.
Expand Down Expand Up @@ -49,6 +50,8 @@ func TestDeadLetterCustom(t *testing.T) {
e.Poison(a1).Wait() // poison the a1 actor
// should be in deadletter
fmt.Println("==== sending message via a1 to deadletter ====")
fmt.Println(e.Registry)
fmt.Println("ID=> ", dl.PID())
e.Send(a1, testMessage{"bar"})
time.Sleep(time.Millisecond) // a flush would be nice here :-)
resp, err := e.Request(dl.PID(), &customDeadLetterFetch{flush: true}, time.Millisecond*10).Result()
Expand Down
74 changes: 52 additions & 22 deletions actor/engine.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package actor

import (
"log/slog"
reflect "reflect"
"sync"
"time"

Expand All @@ -10,7 +12,7 @@ import (
type Remoter interface {
Address() string
Send(*PID, any, *PID)
Start()
Start(*Engine)
}

// Producer is any function that can return a Receiver
Expand All @@ -23,26 +25,29 @@ type Receiver interface {

// Engine represents the actor engine.
type Engine struct {
EventStream *EventStream
Registry *Registry
Registry *Registry

address string
remote Remoter
deadLetter *PID
logger log.Logger
address string
remote Remoter
deadLetter *PID
eventStream *PID
logger log.Logger
}

// NewEngine returns a new actor Engine.
// You can pass an optional logger through
func NewEngine(opts ...func(*Engine)) *Engine {
e := &Engine{}
e.Registry = newRegistry(e) // need to init the registry in case we want a custom deadletter
e.address = LocalLookupAddr
e.Registry = newRegistry(e) // need to init the registry in case we want a custom deadletter
e.EventStream = NewEventStream() //
for _, o := range opts {
o(e)
}
if e.remote != nil {
e.address = e.remote.Address()
}

e.eventStream = e.Spawn(NewEventStream(), "eventstream")
// if no deadletter is registered, we will register the default deadletter from deadletter.go
if e.deadLetter == nil {
e.logger.Debugw("no deadletter receiver set, registering default")
Expand All @@ -51,36 +56,38 @@ func NewEngine(opts ...func(*Engine)) *Engine {
return e
}

// TODO: Doc
func EngineOptLogger(logger log.Logger) func(*Engine) {
return func(e *Engine) {
e.logger = logger
// This is a bit hacky, but we need to set the logger for the eventstream
// which cannot be set in the constructor since the logger is not set yet.
e.EventStream.logger = logger.SubLogger("[eventStream]")
}
}

// TODO: Doc
func EngineOptRemote(r Remoter) func(*Engine) {
return func(e *Engine) {
e.remote = r
e.address = r.Address()
// TODO: potential error not handled here
r.Start(e)
}
}

// TODO: Doc
func EngineOptPidSeparator(sep string) func(*Engine) {
// This looks weird because the separator is a global variable.
return func(e *Engine) {
pidSeparator = sep
}
}

// TODO: Doc
func EngineOptDeadletter(d Producer) func(*Engine) {
return func(e *Engine) {
e.deadLetter = e.Spawn(d, "deadletter")
}
}

// WithRemote returns a new actor Engine with the given Remoter,
// and will call its Start function
func (e *Engine) WithRemote(r Remoter) {
e.remote = r
e.address = r.Address()
r.Start()
}

// Spawn spawns a process that will producer by the given Producer and
// can be configured with the given opts.
func (e *Engine) Spawn(p Producer, name string, opts ...OptFunc) *PID {
Expand Down Expand Up @@ -138,19 +145,32 @@ func (e *Engine) Send(pid *PID, msg any) {
e.send(pid, msg, nil)
}

// BroadcastEvent will broadcast the given message over the eventstream, notifying all
// actors that are subscribed.
func (e *Engine) BroadcastEvent(msg any) {
if e.eventStream != nil {
e.send(e.eventStream, msg, nil)
}
}

func (e *Engine) send(pid *PID, msg any, sender *PID) {
if e.isLocalMessage(pid) {
e.SendLocal(pid, msg, sender)
return
}
if e.remote == nil {
e.logger.Errorw("failed sending messsage",
"err", "engine has no remote configured")
slog.Error("failed sending messsage",
"err", "engine has no remote configured",
"to", pid,
"type", reflect.TypeOf(msg),
"msg", msg,
)
return
}
e.remote.Send(pid, msg, sender)
}

// TODO: documentation
type SendRepeater struct {
engine *Engine
self *PID
Expand Down Expand Up @@ -254,6 +274,16 @@ func (e *Engine) SendLocal(pid *PID, msg any, sender *PID) {
proc.Send(pid, msg, sender)
}

// Subscribe will subscribe the given PID to the event stream.
func (e *Engine) Subscribe(pid *PID) {
e.Send(e.eventStream, EventSub{pid: pid})
}

// Unsubscribe will un subscribe the given PID from the event stream.
func (e *Engine) Unsubscribe(pid *PID) {
e.Send(e.eventStream, EventUnsub{pid: pid})
}

func (e *Engine) isLocalMessage(pid *PID) bool {
return e.address == pid.Address
}
Expand Down
78 changes: 22 additions & 56 deletions actor/event_stream.go
Original file line number Diff line number Diff line change
@@ -1,70 +1,36 @@
package actor

import (
"math"
"math/rand"
"sync"

"github.com/anthdm/hollywood/log"
)

// EventSub is the message that will be send to subscribe to the event stream.
type EventSub struct {
id uint32
}

type EventStreamFunc func(event any)

type EventStream struct {
mu sync.RWMutex
subs map[*EventSub]EventStreamFunc
logger log.Logger
pid *PID
}

func NewEventStream() *EventStream {
return &EventStream{
subs: make(map[*EventSub]EventStreamFunc),
}
// EventUnSub is the message that will be send to unsubscribe from the event stream.
type EventUnsub struct {
pid *PID
}

func (e *EventStream) Unsubscribe(sub *EventSub) {
e.mu.Lock()
defer e.mu.Unlock()

delete(e.subs, sub)

e.logger.Debugw("unsubscribe",
"subs", len(e.subs),
"id", sub.id,
)
type EventStream struct {
subs map[*PID]bool
}

func (e *EventStream) Subscribe(f EventStreamFunc) *EventSub {
e.mu.Lock()
defer e.mu.Unlock()

sub := &EventSub{
id: uint32(rand.Intn(math.MaxUint32)),
func NewEventStream() Producer {
return func() Receiver {
return &EventStream{
subs: make(map[*PID]bool),
}
}
e.subs[sub] = f

e.logger.Debugw("subscribe",
"subs", len(e.subs),
"id", sub.id,
)

return sub
}

func (e *EventStream) Publish(msg any) {
e.mu.RLock()
defer e.mu.RUnlock()
for _, f := range e.subs {
go f(msg)
func (e *EventStream) Receive(c *Context) {
switch msg := c.Message().(type) {
case EventSub:
e.subs[msg.pid] = true
case EventUnsub:
delete(e.subs, msg.pid)
default:
for sub := range e.subs {
c.Forward(sub)
}
}
}

func (e *EventStream) Len() int {
e.mu.RLock()
defer e.mu.RUnlock()
return len(e.subs)
}
Loading