Skip to content

Commit

Permalink
Merge branch 'master' into fix/updateGoMod
Browse files Browse the repository at this point in the history
  • Loading branch information
ValentinMontmirail authored Jan 2, 2024
2 parents c591fbd + da5c2fd commit 1158ad8
Show file tree
Hide file tree
Showing 22 changed files with 935 additions and 398 deletions.
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ build:
go build -o bin/metrics examples/metrics/main.go
go build -o bin/chatserver examples/chat/server/main.go
go build -o bin/chatclient examples/chat/client/main.go
go build -o bin/cluster examples/cluster/member_1/main.go
go build -o bin/cluster examples/cluster/member_2/main.go
go build -o bin/cluster_member_1 examples/cluster/member_1/main.go
go build -o bin/cluster_member_2 examples/cluster/member_2/main.go

bench:
go run ./_bench/.
Expand Down
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ large number of concurrent users and complex interactions.

## Features

- guaranteed message delivery on actor failure (buffer mechanism)
- fire & forget or request & response messaging, or both.
- Guaranteed message delivery on actor failure (buffer mechanism)
- Fire & forget or request & response messaging, or both
- High performance dRPC as the transport layer
- Optimized proto buffers without reflection
- lightweight and highly customizable
- cluster support [wip]
- Lightweight and highly customizable
- Cluster support for writing distributed, self-discovering actors

# Benchmarks

Expand Down
4 changes: 2 additions & 2 deletions actor/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ type Context struct {
engine *Engine
receiver Receiver
message any
// the context of the parent, if this is the context of a child.
// we need this so we can remove the child from the parent Context
// the context of the parent if we are a child.
// we need this parentCtx, so we can remove the child from the parent Context
// when the child dies.
parentCtx *Context
children *safemap.SafeMap[string, *PID]
Expand Down
19 changes: 16 additions & 3 deletions actor/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,21 @@ import (
"github.com/stretchr/testify/require"
)

// TestChildEventNoRaceCondition spawns a parent actor that, on Started, spawns
// a child and subscribes that child through the engine, then poisons the
// parent and waits for the returned WaitGroup.
// It makes no assertions beyond engine creation; presumably it is only
// meaningful when run under the race detector (go test -race) — confirm.
func TestChildEventNoRaceCondition(t *testing.T) {
e, err := NewEngine(nil)
assert.Nil(t, err)

parentPID := e.SpawnFunc(func(c *Context) {
switch c.Message().(type) {
case Started:
// The child receiver is intentionally empty; only its lifecycle matters here.
child := c.SpawnChildFunc(func(childctx *Context) {
}, "child")
// Subscribe the freshly spawned child PID via the engine.
c.engine.Subscribe(child)
}
}, "parent")
// Poison the parent and block until the returned WaitGroup is released.
e.Poison(parentPID).Wait()
}

func TestContextSendRepeat(t *testing.T) {
var (
wg = &sync.WaitGroup{}
Expand Down Expand Up @@ -145,9 +160,7 @@ func TestSpawnChild(t *testing.T) {
}, "parent", WithMaxRestarts(0))

wg.Wait()
stopwg := &sync.WaitGroup{}
e.Poison(pid, stopwg)
stopwg.Wait()
e.Poison(pid).Wait()

assert.Nil(t, e.Registry.get(NewPID("local", "child")))
assert.Nil(t, e.Registry.get(pid))
Expand Down
3 changes: 2 additions & 1 deletion actor/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type Engine struct {
eventStream *PID
}

// EngineConfig holds the configuration of the engine.
type EngineConfig struct {
Remote Remoter
}
Expand Down Expand Up @@ -71,6 +72,7 @@ func (e *Engine) Spawn(p Producer, kind string, opts ...OptFunc) *PID {
return e.SpawnProc(proc)
}

// SpawnFunc spawns the given function as a stateless receiver/actor.
// It wraps f with newFuncReceiver and delegates to Spawn, forwarding kind and
// opts unchanged. The returned PID is the one produced by Spawn.
func (e *Engine) SpawnFunc(f func(*Context), kind string, opts ...OptFunc) *PID {
return e.Spawn(newFuncReceiver(f), kind, opts...)
}
Expand Down Expand Up @@ -193,7 +195,6 @@ func (e *Engine) SendRepeat(pid *PID, msg any, interval time.Duration) SendRepea

// Stop will send a non-graceful poisonPill message to the process that is associated with the given PID.
// The process will shut down immediately, once it has processed the poisonPill message.
// Any WaitGroup passed in wg is forwarded to sendPoisonPill, and the WaitGroup
// returned by sendPoisonPill is handed back to the caller. Callers can Wait on
// it, e.g. e.Stop(pid).Wait(), presumably to block until the process has
// completely shut down — confirm against sendPoisonPill.
func (e *Engine) Stop(pid *PID, wg ...*sync.WaitGroup) *sync.WaitGroup {
return e.sendPoisonPill(pid, false, wg...)
}
Expand Down
20 changes: 6 additions & 14 deletions actor/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func TestRestarts(t *testing.T) {
if msg.data != 10 {
panic("I failed to process this message")
} else {
fmt.Println("finally processed all my messsages after borking.", msg.data)
fmt.Println("finally processed all my messages after borking", msg.data)
wg.Done()
}
}
Expand Down Expand Up @@ -234,9 +234,7 @@ func TestStopWaitGroup(t *testing.T) {
}, "foo")
wg.Wait()

pwg := &sync.WaitGroup{}
e.Stop(pid, pwg)
pwg.Wait()
e.Stop(pid).Wait()
assert.Equal(t, int32(1), atomic.LoadInt32(&x))
}

Expand All @@ -258,9 +256,7 @@ func TestStop(t *testing.T) {
}, "foo", WithID(tag))

wg.Wait()
stopwg := &sync.WaitGroup{}
e.Stop(pid, stopwg)
stopwg.Wait()
e.Stop(pid).Wait()
// When a process is poisoned it should be removed from the registry.
// Hence, we should get nil when looking it up in the registry.
assert.Nil(t, e.Registry.get(pid))
Expand All @@ -286,9 +282,7 @@ func TestPoisonWaitGroup(t *testing.T) {
}, "foo")
wg.Wait()

pwg := &sync.WaitGroup{}
e.Poison(pid, pwg)
pwg.Wait()
e.Poison(pid).Wait()
assert.Equal(t, int32(1), atomic.LoadInt32(&x))
}

Expand All @@ -310,9 +304,7 @@ func TestPoison(t *testing.T) {
}, "foo", WithID(tag))

wg.Wait()
stopwg := &sync.WaitGroup{}
e.Poison(pid, stopwg)
stopwg.Wait()
e.Poison(pid).Wait()
// When a process is poisoned it should be removed from the registry.
// Hence, we should get NIL when we try to get it.
assert.Nil(t, e.Registry.get(pid))
Expand Down Expand Up @@ -379,7 +371,7 @@ func TestPoisonPillPrivate(t *testing.T) {
}
}

// 56 ns/op
// 45.84 ns/op 25 B/op => 13th Gen Intel(R) Core(TM) i9-13900KF
func BenchmarkSendMessageLocal(b *testing.B) {
e, err := NewEngine(nil)
require.NoError(b, err)
Expand Down
4 changes: 2 additions & 2 deletions actor/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type ActorStartedEvent struct {
}

func (e ActorStartedEvent) Log() (slog.Level, string, []any) {
return slog.LevelInfo, "Actor started", []any{"pid", e.PID}
return slog.LevelDebug, "Actor started", []any{"pid", e.PID}
}

// ActorInitializedEvent is broadcasted over the eventStream before an actor
Expand All @@ -45,7 +45,7 @@ type ActorStoppedEvent struct {
}

func (e ActorStoppedEvent) Log() (slog.Level, string, []any) {
return slog.LevelInfo, "Actor stopped", []any{"pid", e.PID}
return slog.LevelDebug, "Actor stopped", []any{"pid", e.PID}
}

// ActorRestartedEvent is broadcasted when an actor crashes and gets restarted
Expand Down
52 changes: 36 additions & 16 deletions actor/process.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package actor

import (
"bytes"
"fmt"
"github.com/DataDog/gostackparse"
"log/slog"
"runtime/debug"
"sync"
Expand All @@ -22,6 +24,11 @@ type Processer interface {
Shutdown(*sync.WaitGroup)
}

const (
procStateRunning int32 = iota
procStateStopped
)

type process struct {
Opts

Expand Down Expand Up @@ -150,8 +157,7 @@ func (p *process) tryRestart(v any) {
p.Start()
return
}
stackTrace := debug.Stack()
fmt.Println(string(stackTrace))
stackTrace := cleanTrace(debug.Stack())
// If we reach the max restarts, we shutdown the inbox and clean
// everything up.
if p.restarts == p.MaxRestarts {
Expand All @@ -177,29 +183,22 @@ func (p *process) tryRestart(v any) {
}

func (p *process) cleanup(wg *sync.WaitGroup) {
p.inbox.Stop()
p.context.engine.Registry.Remove(p.pid)
p.context.message = Stopped{}
applyMiddleware(p.context.receiver.Receive, p.Opts.Middleware...)(p.context)

// We are a child if the parent context is not nil
// No need for a mutex here, cause this is getting called inside the
// the parents children foreach loop, which already locks.
if p.context.parentCtx != nil {
p.context.parentCtx.children.Delete(p.Kind)
}

// We are a parent if we have children running, shutdown all the children.
if p.context.children.Len() > 0 {
children := p.context.Children()
for _, pid := range children {
if wg != nil {
wg.Add(1)
}
proc := p.context.engine.Registry.get(pid)
proc.Shutdown(wg)
p.context.engine.Poison(pid).Wait()
}
}

p.inbox.Stop()
p.context.engine.Registry.Remove(p.pid)
p.context.message = Stopped{}
applyMiddleware(p.context.receiver.Receive, p.Opts.Middleware...)(p.context)

p.context.engine.BroadcastEvent(ActorStoppedEvent{PID: p.pid, Timestamp: time.Now()})
if wg != nil {
wg.Done()
Expand All @@ -211,3 +210,24 @@ func (p *process) Send(_ *PID, msg any, sender *PID) {
p.inbox.Send(Envelope{Msg: msg, Sender: sender})
}
func (p *process) Shutdown(wg *sync.WaitGroup) { p.cleanup(wg) }

// cleanTrace re-renders a raw stack trace (as returned by debug.Stack) in a
// compact form with the leading frames removed.
//
// The input is parsed with gostackparse. The first skipFrames frames of the
// single expected goroutine are dropped, and the rest is written as a
// "goroutine <id> [<state>]" header followed by one "<func>" line and one
// "\t<file>:<line>" line per frame.
//
// The original stack is returned unchanged when it cannot be parsed, when it
// does not contain exactly one goroutine, or when it has fewer frames than
// would be skipped. The last case previously caused a slice-bounds panic.
func cleanTrace(stack []byte) []byte {
	// Number of leading frames to drop. Presumably these are the
	// debug.Stack/recover/panic frames sitting above the code that actually
	// panicked — confirm if the recovery call chain changes.
	const skipFrames = 4

	goros, err := gostackparse.Parse(bytes.NewReader(stack))
	if err != nil {
		slog.Error("failed to parse stacktrace", "err", err)
		return stack
	}
	if len(goros) != 1 {
		slog.Error("expected only one goroutine", "goroutines", len(goros))
		return stack
	}

	goro := goros[0]
	// Guard the re-slice below: a trace shorter than skipFrames cannot be
	// trimmed, so hand back the raw trace rather than panicking.
	if len(goro.Stack) < skipFrames {
		return stack
	}
	frames := goro.Stack[skipFrames:]

	var buf bytes.Buffer
	// Writes to a bytes.Buffer cannot fail, so the errors are ignored.
	_, _ = fmt.Fprintf(&buf, "goroutine %d [%s]\n", goro.ID, goro.State)
	for _, frame := range frames {
		_, _ = fmt.Fprintf(&buf, "%s\n", frame.Func)
		_, _ = fmt.Fprintf(&buf, "\t%s:%d\n", frame.File, frame.Line)
	}
	return buf.Bytes()
}
49 changes: 49 additions & 0 deletions actor/process_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package actor

import (
"bytes"
"fmt"
"github.com/stretchr/testify/require"
"testing"
"time"
)

// Test_CleanTrace tests that the stack trace is cleaned up correctly and that the function
// which triggers the panic is at the top of the stack trace.
//
// The actor subscribes itself on Started, panics via panicWrapper when it
// receives triggerPanic, and inspects the Stacktrace of the resulting
// ActorRestartedEvent. The test passes once the second line of that trace
// mentions panicWrapper, and fails if that has not happened within one second.
func Test_CleanTrace(t *testing.T) {
	e, err := NewEngine(nil)
	require.NoError(t, err)
	type triggerPanic struct {
		data int
	}
	// Buffered, and written with a non-blocking send below, so the actor can
	// never block on the signal — not even after the test has timed out and
	// stopped receiving, or if more than one restart event arrives.
	stopCh := make(chan struct{}, 1)
	pid := e.SpawnFunc(func(c *Context) {
		fmt.Printf("Got message type %T\n", c.Message())
		switch msg := c.Message().(type) {
		case Started:
			c.Engine().Subscribe(c.pid)
		case triggerPanic:
			panicWrapper()
		case ActorRestartedEvent:
			// split the panic into lines:
			lines := bytes.Split(msg.Stacktrace, []byte("\n"))
			// check that the second line is the panicWrapper function; the
			// length guard prevents an index panic on a trace that is too short:
			if len(lines) > 1 && bytes.Contains(lines[1], []byte("panicWrapper")) {
				fmt.Println("stack trace contains panicWrapper at the right line")
				select {
				case stopCh <- struct{}{}:
				default: // already signalled
				}
			}
		}
	}, "foo", WithMaxRestarts(1))
	e.Send(pid, triggerPanic{1})
	select {
	case <-stopCh:
		fmt.Println("test passed")
	case <-time.After(time.Second):
		t.Error("test timed out. stack trace likely did not contain panicWrapper at the right line")
	}
}

// panicWrapper always panics with the value "foo". Test_CleanTrace calls it so
// that the panic originates in a function with a recognisable name it can look
// for in the stack trace.
func panicWrapper() {
panic("foo")
}
4 changes: 2 additions & 2 deletions cluster/activator.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ type ActivationDetails struct {

type defaultActivationStrategy struct{}

// DefaultActivationStrategy selects a random member in the cluster.
func DefaultActivationStrategy() defaultActivationStrategy {
// NewDefaultActivationStrategy selects a random member in the cluster.
func NewDefaultActivationStrategy() defaultActivationStrategy {
return defaultActivationStrategy{}
}

Expand Down
Loading

0 comments on commit 1158ad8

Please sign in to comment.