Skip to content

Commit

Permalink
make the links a bit nicer.
Browse files Browse the repository at this point in the history
document how to pass arguments to actor creation.

closes: #96
  • Loading branch information
perbu committed Dec 9, 2023
2 parents 555a0f1 + edb94a8 commit 8d96f46
Show file tree
Hide file tree
Showing 6 changed files with 189 additions and 23 deletions.
43 changes: 29 additions & 14 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@ advertising brokers, trading engines, etc... It can handle **10 million messages
The Actor Model is a computational model used to build highly concurrent and distributed systems. It was introduced by
Carl Hewitt in 1973 as a way to handle complex systems in a more scalable and fault-tolerant manner.

In the Actor Model, the basic building block is an actor, called receiver in Hollywood, which is an independent unit of
computation that communicates with other actors by exchanging messages. Each actor has its own state and behavior, and
In the Actor Model, the basic building block is an actor, sometimes referred to as a receiver in Hollywood,
which is an independent unit of computation that communicates with other actors by exchanging messages.
Each actor has its own state and behavior, and
can only communicate with other actors by sending messages. This message-passing paradigm allows for a highly
decentralized and fault-tolerant system, as actors can continue to operate independently even if other actors fail or
become unavailable.
Expand All @@ -28,7 +29,7 @@ large number of concurrent users and complex interactions.

## Features

- guaranteed message delivery on receiver failure (buffer mechanism)
- guaranteed message delivery on actor failure (buffer mechanism)
- fire & forget or request & response messaging, or both.
- High performance dRPC as the transport layer
- Optimized proto buffers without reflection
Expand Down Expand Up @@ -62,14 +63,14 @@ compiler.
## Hello world.

Let's go through a Hello world message. The complete example is available in the
***[hello world](examples/helloworld)*** folder. Let's start in main:
[hello world](examples/helloworld) folder. Let's start in main:
```go
engine, err := actor.NewEngine()
```
This creates a new engine. The engine is the core of Hollywood. It's responsible for spawning actors, sending messages
and handling the lifecycle of actors. If Hollywood fails to create the engine it'll return an error.

Next we'll need to create an actor. These are some times referred to as Receivers after the interface they must
Next we'll need to create an actor. These are some times referred to as `Receivers` after the interface they must
implement. Let's create a new actor that will print a message when it receives a message.

```go
Expand Down Expand Up @@ -136,16 +137,15 @@ The **[examples](https://examples)** folder is the best place to learn and
explore Hollywood further.


## Spawning receivers (actors)
## Spawning actors

When you spawn an actor you'll need to provide a function that returns a new actor. As the actor is spawn there are a few
tunable options you can provide.

### With default configuration
```go
e.Spawn(newFoo, "myactorname")
```
The options should be pretty self explanatory. You can set the maximum number of restarts, which tells the engine
how many times the given actor should be restarted in case of panic, the size of the inbox, which sets a limit on how
and unprocessed messages the inbox can hold before it will start to block, and finally you can set a list of tags.
Tags are used to filter actors when you want to send a message to a group of actors.

### Passing arguments to the constructor

Expand Down Expand Up @@ -182,6 +182,11 @@ pid := engine.Spawn(newCustomNameResponder("anthony"), "name-responder")
)
)
```
The options should be pretty self explanatory. You can set the maximum number of restarts, which tells the engine
how many times the given actor should be restarted in case of panic, the size of the inbox, which sets a limit on how
and unprocessed messages the inbox can hold before it will start to block, and finally you can set a list of tags.
Tags are used to filter actors when you want to send a message to a group of actors.

### As a stateless function
Actors without state can be spawned as a function, because its quick and simple.
```go
Expand All @@ -198,10 +203,14 @@ e.SpawnFunc(func(c *actor.Context) {
Actors can communicate with each other over the network with the Remote package.
This works the same as local actors but "over the wire". Hollywood supports serialization with protobuf.

***[Remote actor examples](examples/remote)***
Look at the [Remote actor examples](examples/remote) and the [Chat client & Server](examples/chat) for more information.

## Eventstream

In a production system thing will eventually go wrong. Actors will crash, machines will fail, messages will end up in
the deadletter queue. You can build software that can handle these events in a graceful and predictable way by using
the event stream.

The Eventstream is a powerful abstraction that allows you to build flexible and pluggable systems without dependencies.

1. Subscribe any actor to a various list of system events
Expand All @@ -214,15 +223,21 @@ deliver a message to an actor it will send a `DeadLetterEvent` to the event stre
Any event that fulfills the `actor.LogEvent` interface will be logged to the default logger, with the severity level,
message and the attributes of the event set by the `actor.LogEvent` `log()` method.

You can find more in-depth information on how to use the Eventstream in your application in the Eventstream ***[examples](examples/eventstream)***

### List of internal system events
* `ActorStartedEvent`, an actor has started
* `ActorStoppedEvent`, an actor has stopped
* `DeadLetterEvent`, a message was not delivered to an actor
* `ActorRestartedEvent`, an actor has restarted after a crash/panic.

Have a look at the `actor/events.go` file for more information on the events.
### Eventstream example

There is a [eventstream monitoring example](examples/eventstream-monitor) which shows you how to use the event stream.
It features two actors, one is unstable and will crash every second. The other actor is subscribed to the event stream
and maintains a few counters for different events such as crashes, etc.

The application will run for a few seconds and the poison the unstable actor. It'll then query the monitor with a
request. As actors are floating around inside the engine, this is the way you interact with them. main will then print
the result of the query and the application will exit.

## Customizing the Engine

Expand Down
2 changes: 1 addition & 1 deletion actor/deadletter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func TestDeadLetterCustom(t *testing.T) {
e.SpawnFunc(func(c *Context) {
switch c.Message().(type) {
case Initialized:
c.engine.BroadcastEvent(EventSub{c.pid})
c.engine.Subscribe(c.PID())
case DeadLetterEvent:
wg.Done()
}
Expand Down
4 changes: 2 additions & 2 deletions actor/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,12 +242,12 @@ func (e *Engine) SendLocal(pid *PID, msg any, sender *PID) {

// Subscribe will subscribe the given PID to the event stream.
func (e *Engine) Subscribe(pid *PID) {
e.Send(e.eventStream, EventSub{pid: pid})
e.Send(e.eventStream, eventSub{pid: pid})
}

// Unsubscribe will un subscribe the given PID from the event stream.
func (e *Engine) Unsubscribe(pid *PID) {
e.Send(e.eventStream, EventUnsub{pid: pid})
e.Send(e.eventStream, eventUnsub{pid: pid})
}

func (e *Engine) isLocalMessage(pid *PID) bool {
Expand Down
12 changes: 6 additions & 6 deletions actor/event_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ import (
"log/slog"
)

// EventSub is the message that will be send to subscribe to the event stream.
type EventSub struct {
// eventSub is the message that will be send to subscribe to the event stream.
type eventSub struct {
pid *PID
}

// EventUnSub is the message that will be send to unsubscribe from the event stream.
type EventUnsub struct {
type eventUnsub struct {
pid *PID
}

Expand All @@ -28,13 +28,13 @@ func NewEventStream() Producer {
}

// Receive for the event stream. All system-wide events are sent here.
// Some events are specially handled, such as EventSub, EventUnSub (for subscribing to events),
// Some events are specially handled, such as eventSub, EventUnSub (for subscribing to events),
// DeadletterSub, DeadletterUnSub, for subscribing to DeadLetterEvent
func (e *EventStream) Receive(c *Context) {
switch msg := c.Message().(type) {
case EventSub:
case eventSub:
e.subs[msg.pid] = true
case EventUnsub:
case eventUnsub:
delete(e.subs, msg.pid)
default:
// check if we should log the event, if so, log it with the relevant level, message and attributes
Expand Down
56 changes: 56 additions & 0 deletions actor/inbox_test.go
Original file line number Diff line number Diff line change
@@ -1 +1,57 @@
package actor

import (
"runtime"
"sync"
"sync/atomic"
"testing"
"time"
)

func TestScheduler(t *testing.T) {
var executed atomic.Bool
scheduler := NewScheduler(10)
scheduler.Schedule(func() {
executed.Store(true)
})
runtime.Gosched()
if !executed.Load() {
t.Errorf("Expected the function to be executed")
}
}

func TestInboxSendAndProcess(t *testing.T) {
inbox := NewInbox(10)
processedMessages := make(chan Envelope, 10)
mockProc := MockProcesser{
processFunc: func(envelopes []Envelope) {
for _, e := range envelopes {
processedMessages <- e
}
},
}
inbox.Start(mockProc)
msg := Envelope{}
inbox.Send(msg)
select {
case <-processedMessages: // Message processed
case <-time.After(time.Millisecond):
t.Errorf("Message was not processed in time")
}

inbox.Stop()
}

type MockProcesser struct {
processFunc func([]Envelope)
}

func (m MockProcesser) Start() {}
func (m MockProcesser) PID() *PID {
return nil
}
func (m MockProcesser) Send(*PID, any, *PID) {}
func (m MockProcesser) Invoke(envelopes []Envelope) {
m.processFunc(envelopes)
}
func (m MockProcesser) Shutdown(_ *sync.WaitGroup) {}
95 changes: 95 additions & 0 deletions examples/eventstream-monitor/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package main

import (
"fmt"
"time"

"github.com/anthdm/hollywood/actor"
)

type monitor struct {
crashes int
starts int
stops int
deadletters int
msg string
}

type query struct {
crashes int
starts int
stops int
deadletters int
}

func newMonitor() actor.Receiver {
return &monitor{}

}
func (m *monitor) Receive(c *actor.Context) {
switch c.Message().(type) {
case actor.Initialized:
c.Engine().Subscribe(c.PID())
case actor.ActorRestartedEvent:
m.crashes++
case actor.ActorStartedEvent:
m.starts++
case actor.ActorStoppedEvent:
m.stops++
case actor.DeadLetterEvent:
m.deadletters++
case query:
c.Respond(query{
crashes: m.crashes,
starts: m.starts,
stops: m.stops,
deadletters: m.deadletters,
})
}

}

type customMessage struct{}
type unstableActor struct {
restarts int
spawnTime time.Time
}

func newUnstableActor() actor.Receiver {
return &unstableActor{}
}
func (m *unstableActor) Receive(c *actor.Context) {
switch c.Message().(type) {
case actor.Initialized:
m.spawnTime = time.Now()
case actor.Started:
fmt.Println("actor started")
case customMessage:
if time.Since(m.spawnTime) > time.Second { // We should crash once per second.
panic("my time has come")
}

}
}

func main() {
e, _ := actor.NewEngine()
// Spawn a monitor actor and then an unstable actor.
monitor := e.Spawn(newMonitor, "monitor")
ua := e.Spawn(newUnstableActor, "unstable_actor", actor.WithMaxRestarts(10000))
repeater := e.SendRepeat(ua, customMessage{}, time.Millisecond*20)
time.Sleep(time.Second * 5)
repeater.Stop()
e.Poison(ua).Wait()
res, err := e.Request(monitor, query{}, time.Second).Result()
if err != nil {
fmt.Println("Query", err)
}
q := res.(query)
fmt.Printf("Observed %d crashes\n", q.crashes)
fmt.Printf("Observed %d starts\n", q.starts)
fmt.Printf("Observed %d stops\n", q.stops)
fmt.Printf("Observed %d deadletters\n", q.deadletters)
e.Poison(monitor).Wait() // the monitor will output stats on stop.
fmt.Println("done")
}

0 comments on commit 8d96f46

Please sign in to comment.