diff --git a/_bench/main.go b/_bench/main.go index fabf001..71d723b 100644 --- a/_bench/main.go +++ b/_bench/main.go @@ -12,9 +12,8 @@ import ( ) func makeRemoteEngine(addr string) *actor.Engine { - e := actor.NewEngine() - r := remote.New(e, remote.Config{ListenAddr: addr}) - e.WithRemote(r) + r := remote.New(remote.Config{ListenAddr: addr}) + e := actor.NewEngine(actor.EngineOptRemote(r)) return e } @@ -22,7 +21,7 @@ func benchmarkRemote() { var ( a = makeRemoteEngine("127.0.0.1:3000") b = makeRemoteEngine("127.0.0.1:3001") - pidB = b.SpawnFunc(func(c *actor.Context) {}, "bench", actor.WithInboxSize(1024*8)) + pidB = b.SpawnFunc(func(c *actor.Context) {}, "bench", actor.WithInboxSize(1024*8), actor.WithMaxRestarts(0)) ) its := []int{ 1_000_000, @@ -39,7 +38,7 @@ func benchmarkRemote() { func benchmarkLocal() { e := actor.NewEngine() - pid := e.SpawnFunc(func(c *actor.Context) {}, "bench", actor.WithInboxSize(1024*8)) + pid := e.SpawnFunc(func(c *actor.Context) {}, "bench", actor.WithInboxSize(1024*8), actor.WithMaxRestarts(0)) its := []int{ 1_000_000, 10_000_000, diff --git a/actor/deadletter.go b/actor/deadletter.go index 2a1d27a..b5fda1d 100644 --- a/actor/deadletter.go +++ b/actor/deadletter.go @@ -1,8 +1,10 @@ package actor import ( - "github.com/anthdm/hollywood/log" + fmt "fmt" "reflect" + + "github.com/anthdm/hollywood/log" ) // @@ -34,6 +36,7 @@ func (d *deadLetter) Receive(ctx *Context) { case Initialized: d.logger.Debugw("default deadletter actor initialized") case *DeadLetterEvent: + fmt.Println("received deadletter", msg) d.logger.Warnw("deadletter arrived", "msg-type", reflect.TypeOf(msg), "sender", msg.Sender, "target", msg.Target, "msg", msg.Message) default: diff --git a/actor/deadletter_test.go b/actor/deadletter_test.go index da2fba7..7d93382 100644 --- a/actor/deadletter_test.go +++ b/actor/deadletter_test.go @@ -3,13 +3,14 @@ package actor import ( "bytes" "fmt" - "github.com/anthdm/hollywood/log" - "github.com/stretchr/testify/assert" "log/slog" "os" "sync" "testing" "time" + + "github.com/anthdm/hollywood/log" + "github.com/stretchr/testify/assert" ) // TestDeadLetterDefault tests the default deadletter handling. @@ -49,6 +50,8 @@ func TestDeadLetterCustom(t *testing.T) { e.Poison(a1).Wait() // poison the a1 actor // should be in deadletter fmt.Println("==== sending message via a1 to deadletter ====") + fmt.Println(e.Registry) + fmt.Println("ID=> ", dl.PID()) e.Send(a1, testMessage{"bar"}) time.Sleep(time.Millisecond) // a flush would be nice here :-) resp, err := e.Request(dl.PID(), &customDeadLetterFetch{flush: true}, time.Millisecond*10).Result() diff --git a/actor/engine.go b/actor/engine.go index bce14f1..3d244f4 100644 --- a/actor/engine.go +++ b/actor/engine.go @@ -1,6 +1,8 @@ package actor import ( + "log/slog" + reflect "reflect" "sync" "time" @@ -10,7 +12,7 @@ import ( type Remoter interface { Address() string Send(*PID, any, *PID) - Start() + Start(*Engine) } // Producer is any function that can return a Receiver @@ -23,26 +25,29 @@ type Receiver interface { // Engine represents the actor engine. type Engine struct { - EventStream *EventStream - Registry *Registry + Registry *Registry - address string - remote Remoter - deadLetter *PID - logger log.Logger + address string + remote Remoter + deadLetter *PID + eventStream *PID + logger log.Logger } // NewEngine returns a new actor Engine. // You can pass an optional logger through func NewEngine(opts ...func(*Engine)) *Engine { e := &Engine{} + e.Registry = newRegistry(e) // need to init the registry in case we want a custom deadletter e.address = LocalLookupAddr - e.Registry = newRegistry(e) // need to init the registry in case we want a custom deadletter - e.EventStream = NewEventStream() // for _, o := range opts { o(e) } + if e.remote != nil { + e.address = e.remote.Address() + } + e.eventStream = e.Spawn(NewEventStream(), "eventstream") // if no deadletter is registered, we will register the default deadletter from deadletter.go if e.deadLetter == nil { e.logger.Debugw("no deadletter receiver set, registering default") @@ -51,15 +56,24 @@ func NewEngine(opts ...func(*Engine)) *Engine { return e } +// TODO: Doc func EngineOptLogger(logger log.Logger) func(*Engine) { return func(e *Engine) { e.logger = logger - // This is a bit hacky, but we need to set the logger for the eventstream - // which cannot be set in the constructor since the logger is not set yet. - e.EventStream.logger = logger.SubLogger("[eventStream]") } } +// TODO: Doc +func EngineOptRemote(r Remoter) func(*Engine) { + return func(e *Engine) { + e.remote = r + e.address = r.Address() + // TODO: potential error not handled here + r.Start(e) + } +} + +// TODO: Doc func EngineOptPidSeparator(sep string) func(*Engine) { // This looks weird because the separator is a global variable. return func(e *Engine) { @@ -67,20 +81,13 @@ func EngineOptPidSeparator(sep string) func(*Engine) { } } +// TODO: Doc func EngineOptDeadletter(d Producer) func(*Engine) { return func(e *Engine) { e.deadLetter = e.Spawn(d, "deadletter") } } -// WithRemote returns a new actor Engine with the given Remoter, -// and will call its Start function -func (e *Engine) WithRemote(r Remoter) { - e.remote = r - e.address = r.Address() - r.Start() -} - // Spawn spawns a process that will producer by the given Producer and // can be configured with the given opts. func (e *Engine) Spawn(p Producer, name string, opts ...OptFunc) *PID { @@ -138,19 +145,32 @@ func (e *Engine) Send(pid *PID, msg any) { e.send(pid, msg, nil) } +// BroadcastEvent will broadcast the given message over the eventstream, notifying all +// actors that are subscribed. +func (e *Engine) BroadcastEvent(msg any) { + if e.eventStream != nil { + e.send(e.eventStream, msg, nil) + } +} + func (e *Engine) send(pid *PID, msg any, sender *PID) { if e.isLocalMessage(pid) { e.SendLocal(pid, msg, sender) return } if e.remote == nil { - e.logger.Errorw("failed sending messsage", - "err", "engine has no remote configured") + slog.Error("failed sending messsage", + "err", "engine has no remote configured", + "to", pid, + "type", reflect.TypeOf(msg), + "msg", msg, + ) return } e.remote.Send(pid, msg, sender) } +// TODO: documentation type SendRepeater struct { engine *Engine self *PID @@ -254,6 +274,16 @@ func (e *Engine) SendLocal(pid *PID, msg any, sender *PID) { proc.Send(pid, msg, sender) } +// Subscribe will subscribe the given PID to the event stream. +func (e *Engine) Subscribe(pid *PID) { + e.Send(e.eventStream, EventSub{pid: pid}) +} + +// Unsubscribe will un subscribe the given PID from the event stream. +func (e *Engine) Unsubscribe(pid *PID) { + e.Send(e.eventStream, EventUnsub{pid: pid}) +} + func (e *Engine) isLocalMessage(pid *PID) bool { return e.address == pid.Address } diff --git a/actor/event_stream.go b/actor/event_stream.go index 7d0b8b4..e2c761f 100644 --- a/actor/event_stream.go +++ b/actor/event_stream.go @@ -1,70 +1,36 @@ package actor -import ( - "math" - "math/rand" - "sync" - - "github.com/anthdm/hollywood/log" -) - +// EventSub is the message that will be send to subscribe to the event stream. type EventSub struct { - id uint32 -} - -type EventStreamFunc func(event any) - -type EventStream struct { - mu sync.RWMutex - subs map[*EventSub]EventStreamFunc - logger log.Logger + pid *PID } -func NewEventStream() *EventStream { - return &EventStream{ - subs: make(map[*EventSub]EventStreamFunc), - } +// EventUnSub is the message that will be send to unsubscribe from the event stream. +type EventUnsub struct { + pid *PID } -func (e *EventStream) Unsubscribe(sub *EventSub) { - e.mu.Lock() - defer e.mu.Unlock() - - delete(e.subs, sub) - - e.logger.Debugw("unsubscribe", - "subs", len(e.subs), - "id", sub.id, - ) +type EventStream struct { + subs map[*PID]bool } -func (e *EventStream) Subscribe(f EventStreamFunc) *EventSub { - e.mu.Lock() - defer e.mu.Unlock() - - sub := &EventSub{ - id: uint32(rand.Intn(math.MaxUint32)), +func NewEventStream() Producer { + return func() Receiver { + return &EventStream{ + subs: make(map[*PID]bool), + } } - e.subs[sub] = f - - e.logger.Debugw("subscribe", - "subs", len(e.subs), - "id", sub.id, - ) - - return sub } -func (e *EventStream) Publish(msg any) { - e.mu.RLock() - defer e.mu.RUnlock() - for _, f := range e.subs { - go f(msg) +func (e *EventStream) Receive(c *Context) { + switch msg := c.Message().(type) { + case EventSub: + e.subs[msg.pid] = true + case EventUnsub: + delete(e.subs, msg.pid) + default: + for sub := range e.subs { + c.Forward(sub) + } } } - -func (e *EventStream) Len() int { - e.mu.RLock() - defer e.mu.RUnlock() - return len(e.subs) -} diff --git a/actor/event_stream_test.go b/actor/event_stream_test.go index 23df18a..3b06c48 100644 --- a/actor/event_stream_test.go +++ b/actor/event_stream_test.go @@ -1,41 +1,48 @@ package actor import ( - sync "sync" + fmt "fmt" + "sync" "testing" - - "github.com/stretchr/testify/assert" ) -func TestEventStream(t *testing.T) { +type CustomEvent struct { + msg string +} + +func TestEventStreamLocal(t *testing.T) { e := NewEngine() wg := sync.WaitGroup{} - subs := []*EventSub{} - var mu sync.RWMutex - - for i := 0; i < 10; i++ { - wg.Add(1) - go func(i int) { - sub := e.EventStream.Subscribe(func(event any) { - s, ok := event.(string) - assert.True(t, ok) - assert.Equal(t, "foo", s) - }) - - e.EventStream.Publish("foo") - mu.Lock() - subs = append(subs, sub) - mu.Unlock() + wg.Add(2) + e.SpawnFunc(func(c *Context) { + switch c.Message().(type) { + case Started: + c.Engine().Subscribe(c.PID()) + case CustomEvent: + fmt.Println("actor a received event") wg.Done() - }(i) - } - + } + }, "actor_a") + + e.SpawnFunc(func(c *Context) { + switch c.Message().(type) { + case Started: + c.Engine().Subscribe(c.PID()) + case CustomEvent: + fmt.Println("actor b received event") + wg.Done() + } + }, "actor_b") + e.BroadcastEvent(CustomEvent{msg: "foo"}) + // make sure both actors have received the event. + // If so, the test has passed. wg.Wait() - assert.Equal(t, 10, e.EventStream.Len()) +} + +func TestEventStreamActorStartedEvent(t *testing.T) { + +} - for _, sub := range subs { - e.EventStream.Unsubscribe(sub) - } +func TestEventStreamActorStoppedEvent(t *testing.T) { - assert.Equal(t, 0, e.EventStream.Len()) } diff --git a/actor/process.go b/actor/process.go index c5b0670..ed9b274 100644 --- a/actor/process.go +++ b/actor/process.go @@ -128,7 +128,7 @@ func (p *process) Start() { p.context.message = Started{} applyMiddleware(recv.Receive, p.Opts.Middleware...)(p.context) - p.context.engine.EventStream.Publish(&ActivationEvent{PID: p.pid}) + p.context.engine.BroadcastEvent(&ActorStartedEvent{PID: p.pid}) p.logger.Debugw("actor started", "pid", p.pid) // If we have messages in our buffer, invoke them. if len(p.mbuffer) > 0 { @@ -197,8 +197,7 @@ func (p *process) cleanup(wg *sync.WaitGroup) { } } p.logger.Debugw("shutdown", "pid", p.pid) - // Send TerminationEvent to the eventstream - p.context.engine.EventStream.Publish(&TerminationEvent{PID: p.pid}) + p.context.engine.BroadcastEvent(&ActorStoppedEvent{PID: p.pid}) if wg != nil { wg.Done() } diff --git a/actor/types.go b/actor/types.go index d4836a7..4a3126b 100644 --- a/actor/types.go +++ b/actor/types.go @@ -10,16 +10,17 @@ type DeadLetterEvent struct { Sender *PID } -// ActivationEvent is broadcasted over the EventStream each time -// a Receiver is spawned and activated. This mean at the point of -// receiving this event the Receiver is ready to process messages. -type ActivationEvent struct { +// ActorStartedEvent is broadcasted over the EventStream each time +// a Receiver (Actor) is spawned and activated. This means, that at +// the point of receiving this event the Receiver (Actor) is ready +// to process messages. +type ActorStartedEvent struct { PID *PID } -// TerminationEvent is broadcasted over the EventStream each time +// ActorStoppedEvent is broadcasted over the EventStream each time // a process is terminated. -type TerminationEvent struct { +type ActorStoppedEvent struct { PID *PID } diff --git a/examples/chat/client/main.go b/examples/chat/client/main.go index d49f727..3de6a42 100644 --- a/examples/chat/client/main.go +++ b/examples/chat/client/main.go @@ -4,10 +4,11 @@ import ( "bufio" "flag" "fmt" - "github.com/anthdm/hollywood/log" "log/slog" "os" + "github.com/anthdm/hollywood/log" + "github.com/anthdm/hollywood/actor" "github.com/anthdm/hollywood/examples/chat/types" "github.com/anthdm/hollywood/remote" @@ -47,12 +48,11 @@ func main() { ) flag.Parse() - e := actor.NewEngine(actor.EngineOptLogger(log.Default())) - rem := remote.New(e, remote.Config{ + rem := remote.New(remote.Config{ ListenAddr: *listenAt, Logger: log.NewLogger("[remote]", log.NewHandler(os.Stdout, log.TextFormat, slog.LevelDebug)), }) - e.WithRemote(rem) + e := actor.NewEngine(actor.EngineOptLogger(log.Default()), actor.EngineOptRemote(rem)) var ( // the process ID of the server diff --git a/examples/chat/server/main.go b/examples/chat/server/main.go index 036dabd..855ae51 100644 --- a/examples/chat/server/main.go +++ b/examples/chat/server/main.go @@ -57,11 +57,11 @@ func main() { listenAt = flag.String("listen", "127.0.0.1:4000", "") ) flag.Parse() - e := actor.NewEngine() - rem := remote.New(e, remote.Config{ + rem := remote.New(remote.Config{ ListenAddr: *listenAt, }) - e.WithRemote(rem) + e := actor.NewEngine(actor.EngineOptRemote(rem)) + e.Spawn(newServer, "server") select {} diff --git a/examples/eventstream/main.go b/examples/eventstream/main.go index 4916196..976ab0f 100644 --- a/examples/eventstream/main.go +++ b/examples/eventstream/main.go @@ -2,51 +2,49 @@ package main import ( "fmt" - "github.com/anthdm/hollywood/log" - "log/slog" - "os" - "sync" + "time" "github.com/anthdm/hollywood/actor" ) -func main() { - lh := log.NewHandler(os.Stdout, log.TextFormat, slog.LevelDebug) - e := actor.NewEngine(actor.EngineOptLogger(log.NewLogger("[engine]", lh))) - wg := sync.WaitGroup{} - wg.Add(1) +// Custom event type that will be send over the event stream. +type MyCustomEvent struct { + msg string +} - // Subscribe to a various list of events that are being broadcast by - // the engine, but also published by you. - eventSub := e.EventStream.Subscribe(func(event any) { - switch evt := event.(type) { - case *actor.DeadLetterEvent: - fmt.Printf("deadletter event to [%s] msg: %s\n", evt.Target, evt.Message) - case *actor.ActivationEvent: - fmt.Println("process is active", evt.PID) - case *actor.TerminationEvent: - fmt.Println("process terminated:", evt.PID) - wg.Done() - default: - fmt.Println("received event", evt) +// Spawn 2 actors and subscribe them to the event stream. +// When we call engine.PublishEvent both actors will be notified. +func main() { + e := actor.NewEngine() + actorA := e.SpawnFunc(func(c *actor.Context) { + switch msg := c.Message().(type) { + case actor.Started: + fmt.Println("actor A started") + case MyCustomEvent: + fmt.Printf("actorA: event => %+v\n", msg) } - }) + }, "actor_a") + // Subscribe the actor to the event stream from outside of the actor itself. + e.Subscribe(actorA) - pid := e.SpawnFunc(func(c *actor.Context) { + actorB := e.SpawnFunc(func(c *actor.Context) { switch msg := c.Message().(type) { case actor.Started: - fmt.Println("started") - _ = msg + fmt.Println("actor B started") + // Subscribe the actor to the event stream from inside the actor itself. + c.Engine().Subscribe(c.PID()) + case MyCustomEvent: + fmt.Printf("actorB: event => %+v\n", msg) } - }, "foo") + }, "actor_b") - deadPID := actor.NewPID("local", "bar") - e.Send(deadPID, "hello") - // Publish anything to the stream. - e.EventStream.Publish([]byte("some dirty bytes")) - e.Poison(pid) + // Unsubscribing both actors from the event stream. + defer func() { + e.Unsubscribe(actorA) + e.Unsubscribe(actorB) + }() - // Unsubscribe from the event stream - defer e.EventStream.Unsubscribe(eventSub) - wg.Wait() + time.Sleep(time.Millisecond) + e.BroadcastEvent(MyCustomEvent{msg: "Hello World!"}) + time.Sleep(time.Millisecond) } diff --git a/examples/remote/client/main.go b/examples/remote/client/main.go index f04b0c5..c8a3323 100644 --- a/examples/remote/client/main.go +++ b/examples/remote/client/main.go @@ -9,9 +9,8 @@ import ( ) func main() { - e := actor.NewEngine() - r := remote.New(e, remote.Config{ListenAddr: "127.0.0.1:3000"}) - e.WithRemote(r) + r := remote.New(remote.Config{ListenAddr: "127.0.0.1:3000"}) + e := actor.NewEngine(actor.EngineOptRemote(r)) pid := actor.NewPID("127.0.0.1:4000", "server") for { diff --git a/examples/remote/server/main.go b/examples/remote/server/main.go index e2d4af2..8741c9a 100644 --- a/examples/remote/server/main.go +++ b/examples/remote/server/main.go @@ -26,9 +26,8 @@ func (f *server) Receive(ctx *actor.Context) { } func main() { - e := actor.NewEngine() - r := remote.New(e, remote.Config{ListenAddr: "127.0.0.1:4000"}) - e.WithRemote(r) + r := remote.New(remote.Config{ListenAddr: "127.0.0.1:4000"}) + e := actor.NewEngine(actor.EngineOptRemote(r)) e.Spawn(newServer, "server") select {} diff --git a/remote/remote.go b/remote/remote.go index b2ef0b7..565ce85 100644 --- a/remote/remote.go +++ b/remote/remote.go @@ -10,6 +10,7 @@ import ( "storj.io/drpc/drpcserver" ) +// Config holds the remote configuration. type Config struct { ListenAddr string Logger log.Logger @@ -23,10 +24,9 @@ type Remote struct { logger log.Logger } -// New creates a new "Remote" object given an engine and a Config. -func New(e *actor.Engine, cfg Config) *Remote { +// New creates a new "Remote" object given a Config. +func New(cfg Config) *Remote { r := &Remote{ - engine: e, config: cfg, logger: cfg.Logger, } @@ -34,7 +34,8 @@ func New(e *actor.Engine, cfg Config) *Remote { return r } -func (r *Remote) Start() { +func (r *Remote) Start(e *actor.Engine) { + r.engine = e ln, err := net.Listen("tcp", r.config.ListenAddr) if err != nil { panic("failed to listen: " + err.Error()) diff --git a/remote/remote_test.go b/remote/remote_test.go index 9273320..e3bccf3 100644 --- a/remote/remote_test.go +++ b/remote/remote_test.go @@ -93,10 +93,42 @@ func TestRequestResponse(t *testing.T) { assert.Equal(t, resp.(*TestMessage).Data, []byte("foo")) } +func TestEventStream(t *testing.T) { + // Events should work over the wire from the get go. + // Which is just insane, huh? + var ( + engine = makeRemoteEngine(getRandomLocalhostAddr()) + wg = sync.WaitGroup{} + ) + wg.Add(2) + + engine.SpawnFunc(func(c *actor.Context) { + switch c.Message().(type) { + case actor.Started: + c.Engine().Subscribe(c.PID()) + case *TestMessage: + fmt.Println("actor (a) received event") + wg.Done() + } + }, "actor_a") + + engine.SpawnFunc(func(c *actor.Context) { + switch c.Message().(type) { + case actor.Started: + c.Engine().Subscribe(c.PID()) + case *TestMessage: + fmt.Println("actor (b) received event") + wg.Done() + } + }, "actor_b") + time.Sleep(time.Millisecond) + engine.BroadcastEvent(&TestMessage{Data: []byte("testevent")}) + wg.Wait() +} + func makeRemoteEngine(listenAddr string) *actor.Engine { - e := actor.NewEngine() - r := New(e, Config{ListenAddr: listenAddr}) - e.WithRemote(r) + r := New(Config{ListenAddr: listenAddr}) + e := actor.NewEngine(actor.EngineOptRemote(r)) return e }