Skip to content

Commit

Permalink
proper engine config
Browse files Browse the repository at this point in the history
  • Loading branch information
anthdm committed Jan 9, 2024
1 parent da5c2fd commit 7c72c5f
Show file tree
Hide file tree
Showing 30 changed files with 95 additions and 78 deletions.
14 changes: 7 additions & 7 deletions actor/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
)

func TestChildEventNoRaceCondition(t *testing.T) {
e, err := NewEngine(nil)
e, err := NewEngine(NewEngineConfig())
assert.Nil(t, err)

parentPID := e.SpawnFunc(func(c *Context) {
Expand All @@ -30,7 +30,7 @@ func TestContextSendRepeat(t *testing.T) {
mu sync.Mutex
sr SendRepeater
)
e, err := NewEngine(nil)
e, err := NewEngine(NewEngineConfig())
require.NoError(t, err)
wg.Add(1)

Expand All @@ -57,7 +57,7 @@ func TestSpawnChildPID(t *testing.T) {
childfn = func(c *Context) {}
expectedPID = NewPID(LocalLookupAddr, "parent/1/child/1")
)
e, err := NewEngine(nil)
e, err := NewEngine(NewEngineConfig())
require.NoError(t, err)
wg.Add(1)
e.SpawnFunc(func(c *Context) {
Expand All @@ -77,7 +77,7 @@ func TestChild(t *testing.T) {
var (
wg = sync.WaitGroup{}
)
e, err := NewEngine(nil)
e, err := NewEngine(NewEngineConfig())
require.NoError(t, err)
wg.Add(1)
e.SpawnFunc(func(c *Context) {
Expand All @@ -99,7 +99,7 @@ func TestParent(t *testing.T) {
wg = sync.WaitGroup{}
parent = NewPID(LocalLookupAddr, "foo/bar/baz")
)
e, err := NewEngine(nil)
e, err := NewEngine(NewEngineConfig())
require.NoError(t, err)
wg.Add(1)

Expand All @@ -123,7 +123,7 @@ func TestParent(t *testing.T) {
}

func TestGetPID(t *testing.T) {
e, err := NewEngine(nil)
e, err := NewEngine(NewEngineConfig())
require.NoError(t, err)
wg := sync.WaitGroup{}
wg.Add(1)
Expand All @@ -142,7 +142,7 @@ func TestSpawnChild(t *testing.T) {
var (
wg = sync.WaitGroup{}
)
e, err := NewEngine(nil)
e, err := NewEngine(NewEngineConfig())
require.NoError(t, err)
wg.Add(1)
childFunc := func(c *Context) {
Expand Down
2 changes: 1 addition & 1 deletion actor/deadletter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
// TestDeadLetterCustom tests the custom deadletter handling.
// It is using the custom deadletter receiver defined inline.
func TestDeadLetterCustom(t *testing.T) {
e, err := NewEngine(nil)
e, err := NewEngine(NewEngineConfig())
assert.NoError(t, err)
wg := &sync.WaitGroup{}
wg.Add(1)
Expand Down
28 changes: 20 additions & 8 deletions actor/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"
)

// Remoter is an interface that abstract a remote that is tied to an engine.
type Remoter interface {
Address() string
Send(*PID, any, *PID)
Expand All @@ -34,19 +35,30 @@ type Engine struct {

// EngineConfig holds the configuration of the engine.
type EngineConfig struct {
Remote Remoter
remote Remoter
}

// NewEngine returns a new actor Engine.
// No mandatory arguments, but you can pass in a EngineConfig struct to configure the engine
func NewEngine(opts *EngineConfig) (*Engine, error) {
// NewEngineConfig returns a new default EngineConfig.
func NewEngineConfig() EngineConfig {
return EngineConfig{}
}

// WithRemote sets the remote which will configure the engine so its capable
// to send and receive messages over the network.
func (config EngineConfig) WithRemote(remote Remoter) EngineConfig {
config.remote = remote
return config
}

// NewEngine returns a new actor Engine given an EngineConfig.
func NewEngine(config EngineConfig) (*Engine, error) {
e := &Engine{}
e.Registry = newRegistry(e) // need to init the registry in case we want a custom deadletter
e.address = LocalLookupAddr
if opts != nil && opts.Remote != nil {
e.remote = opts.Remote
e.address = opts.Remote.Address()
err := opts.Remote.Start(e)
if config.remote != nil {
e.remote = config.remote
e.address = config.remote.Address()
err := config.remote.Start(e)
if err != nil {
return nil, fmt.Errorf("failed to start remote: %w", err)
}
Expand Down
36 changes: 18 additions & 18 deletions actor/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func newTickReceiver(wg *sync.WaitGroup) Producer {
}

func TestRegistryGetPID(t *testing.T) {
e, _ := NewEngine(nil)
e, _ := NewEngine(NewEngineConfig())
expectedPID1 := e.SpawnFunc(func(c *Context) {}, "foo", WithID("1"))
expectedPID2 := e.SpawnFunc(func(c *Context) {}, "foo", WithID("2"))
pid := e.Registry.GetPID("foo", "1")
Expand All @@ -47,15 +47,15 @@ func TestRegistryGetPID(t *testing.T) {
}

func TestSendToNilPID(t *testing.T) {
e, _ := NewEngine(nil)
e, _ := NewEngine(NewEngineConfig())
e.Send(nil, "foo")
}

func TestSendRepeat(t *testing.T) {
var (
wg = &sync.WaitGroup{}
)
e, err := NewEngine(nil)
e, err := NewEngine(NewEngineConfig())
require.NoError(t, err)
wg.Add(1)
pid := e.Spawn(newTickReceiver(wg), "test")
Expand All @@ -65,7 +65,7 @@ func TestSendRepeat(t *testing.T) {
}

func TestRestartsMaxRestarts(t *testing.T) {
e, err := NewEngine(nil)
e, err := NewEngine(NewEngineConfig())
require.NoError(t, err)
restarts := 2
type payload struct {
Expand Down Expand Up @@ -94,7 +94,7 @@ func TestProcessInitStartOrder(t *testing.T) {
wg = sync.WaitGroup{}
started, init bool
)
e, err := NewEngine(nil)
e, err := NewEngine(NewEngineConfig())
require.NoError(t, err)
pid := e.SpawnFunc(func(c *Context) {
switch c.Message().(type) {
Expand All @@ -117,7 +117,7 @@ func TestProcessInitStartOrder(t *testing.T) {
}

func TestRestarts(t *testing.T) {
e, err := NewEngine(nil)
e, err := NewEngine(NewEngineConfig())
require.NoError(t, err)
wg := sync.WaitGroup{}
type payload struct {
Expand Down Expand Up @@ -151,7 +151,7 @@ func TestSendWithSender(t *testing.T) {
sender = NewPID("local", "sender")
wg = sync.WaitGroup{}
)
e, err := NewEngine(nil)
e, err := NewEngine(NewEngineConfig())
require.NoError(t, err)
wg.Add(1)

Expand All @@ -167,7 +167,7 @@ func TestSendWithSender(t *testing.T) {
}

func TestSendMsgRaceCon(t *testing.T) {
e, err := NewEngine(nil)
e, err := NewEngine(NewEngineConfig())
require.NoError(t, err)
wg := sync.WaitGroup{}

Expand All @@ -187,7 +187,7 @@ func TestSendMsgRaceCon(t *testing.T) {
}

func TestSpawn(t *testing.T) {
e, err := NewEngine(nil)
e, err := NewEngine(NewEngineConfig())
require.NoError(t, err)
wg := sync.WaitGroup{}

Expand All @@ -205,7 +205,7 @@ func TestSpawn(t *testing.T) {
}

func TestSpawnDuplicateId(t *testing.T) {
e, err := NewEngine(nil)
e, err := NewEngine(NewEngineConfig())
require.NoError(t, err)
wg := sync.WaitGroup{}
pid1 := e.Spawn(NewTestProducer(t, func(t *testing.T, ctx *Context) {}), "dummy")
Expand All @@ -220,7 +220,7 @@ func TestStopWaitGroup(t *testing.T) {
wg = sync.WaitGroup{}
x = int32(0)
)
e, err := NewEngine(nil)
e, err := NewEngine(NewEngineConfig())
require.NoError(t, err)
wg.Add(1)

Expand All @@ -242,7 +242,7 @@ func TestStop(t *testing.T) {
var (
wg = sync.WaitGroup{}
)
e, err := NewEngine(nil)
e, err := NewEngine(NewEngineConfig())
require.NoError(t, err)
for i := 0; i < 4; i++ {
wg.Add(1)
Expand All @@ -268,7 +268,7 @@ func TestPoisonWaitGroup(t *testing.T) {
wg = sync.WaitGroup{}
x = int32(0)
)
e, err := NewEngine(nil)
e, err := NewEngine(NewEngineConfig())
require.NoError(t, err)
wg.Add(1)

Expand All @@ -290,7 +290,7 @@ func TestPoison(t *testing.T) {
var (
wg = sync.WaitGroup{}
)
e, err := NewEngine(nil)
e, err := NewEngine(NewEngineConfig())
require.NoError(t, err)
for i := 0; i < 4; i++ {
wg.Add(1)
Expand All @@ -316,7 +316,7 @@ func TestRequestResponse(t *testing.T) {
type responseEvent struct {
d time.Duration
}
e, err := NewEngine(nil)
e, err := NewEngine(NewEngineConfig())
assert.NoError(t, err)
a := e.SpawnFunc(func(c *Context) {
switch c.Message().(type) {
Expand Down Expand Up @@ -346,7 +346,7 @@ func TestRequestResponse(t *testing.T) {
}

func TestPoisonPillPrivate(t *testing.T) {
e, err := NewEngine(nil)
e, err := NewEngine(NewEngineConfig())
require.NoError(t, err)
successCh := make(chan struct{}, 1)
failCh := make(chan struct{}, 1)
Expand All @@ -373,7 +373,7 @@ func TestPoisonPillPrivate(t *testing.T) {

// 45.84 ns/op 25 B/op => 13th Gen Intel(R) Core(TM) i9-13900KF
func BenchmarkSendMessageLocal(b *testing.B) {
e, err := NewEngine(nil)
e, err := NewEngine(NewEngineConfig())
require.NoError(b, err)
pid := e.SpawnFunc(func(_ *Context) {}, "bench", WithInboxSize(128))

Expand All @@ -386,7 +386,7 @@ func BenchmarkSendMessageLocal(b *testing.B) {
}

func BenchmarkSendWithSenderMessageLocal(b *testing.B) {
e, err := NewEngine(nil)
e, err := NewEngine(NewEngineConfig())
require.NoError(b, err)
p := NewTestProducer(nil, func(_ *testing.T, _ *Context) {})
pid := e.Spawn(p, "bench", WithInboxSize(1024*8))
Expand Down
6 changes: 3 additions & 3 deletions actor/event_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ type CustomEvent struct {
}

func TestEventStreamLocal(t *testing.T) {
e, err := NewEngine(nil)
e, err := NewEngine(NewEngineConfig())
assert.NoError(t, err)
wg := sync.WaitGroup{}
wg.Add(2)
Expand Down Expand Up @@ -43,7 +43,7 @@ func TestEventStreamLocal(t *testing.T) {
}

func TestEventStreamActorStartedEvent(t *testing.T) {
e, _ := NewEngine(nil)
e, _ := NewEngine(NewEngineConfig())
wg := sync.WaitGroup{}

wg.Add(1)
Expand All @@ -61,7 +61,7 @@ func TestEventStreamActorStartedEvent(t *testing.T) {
}

func TestEventStreamActorStoppedEvent(t *testing.T) {
e, _ := NewEngine(nil)
e, _ := NewEngine(NewEngineConfig())
wg := sync.WaitGroup{}

wg.Add(1)
Expand Down
5 changes: 3 additions & 2 deletions actor/event_test.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package actor

import (
"github.com/stretchr/testify/assert"
"sync"
"testing"

"github.com/stretchr/testify/assert"
)

func TestDuplicateIdEvent(t *testing.T) {
e, err := NewEngine(nil)
e, err := NewEngine(NewEngineConfig())
assert.NoError(t, err)
wg := sync.WaitGroup{}
wg.Add(1)
Expand Down
5 changes: 3 additions & 2 deletions actor/process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,16 @@ package actor
import (
"bytes"
"fmt"
"github.com/stretchr/testify/require"
"testing"
"time"

"github.com/stretchr/testify/require"
)

// Test_CleanTrace tests that the stack trace is cleaned up correctly and that the function
// which triggers the panic is at the top of the stack trace.
func Test_CleanTrace(t *testing.T) {
e, err := NewEngine(nil)
e, err := NewEngine(NewEngineConfig())
require.NoError(t, err)
type triggerPanic struct {
data int
Expand Down
2 changes: 1 addition & 1 deletion actor/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func (p fooProc) Invoke([]Envelope) {}
func (p fooProc) Shutdown(*sync.WaitGroup) {}

func TestGetRemoveAdd(t *testing.T) {
e, _ := NewEngine(nil)
e, _ := NewEngine(NewEngineConfig())
reg := newRegistry(e)
eproc := fooProc{}
reg.add(eproc)
Expand Down
13 changes: 6 additions & 7 deletions cluster/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,13 @@ type (
getActive struct{ id string }
)

// Agent is an actor/receiver that is responsible for managing the state
// of the cluster.
type Agent struct {
members *MemberSet
cluster *Cluster

kinds map[string]bool

members *MemberSet
cluster *Cluster
kinds map[string]bool
localKinds map[string]kind

// All the actors that are available cluster wide.
activated map[string]*actor.PID
}
Expand Down Expand Up @@ -142,7 +141,7 @@ func (a *Agent) activate(kind, id, region string) *actor.PID {
activationResp = a.handleActivationRequest(req)
} else {
// Remote activation

//
// TODO: topology hash
resp, err := a.cluster.engine.Request(activatorPID, req, a.cluster.config.requestTimeout).Result()
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ type Cluster struct {
func New(config Config) (*Cluster, error) {
if config.engine == nil {
remote := remote.New(config.listenAddr, nil)
e, err := actor.NewEngine(&actor.EngineConfig{Remote: remote})
e, err := actor.NewEngine(actor.NewEngineConfig().WithRemote(remote))
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func TestMemberLeave(t *testing.T) {
c2Addr := getRandomLocalhostAddr()

remote := remote.New(c2Addr, nil)
e, err := actor.NewEngine(&actor.EngineConfig{Remote: remote})
e, err := actor.NewEngine(actor.NewEngineConfig().WithRemote(remote))
if err != nil {
log.Fatal(err)
}
Expand Down
Loading

0 comments on commit 7c72c5f

Please sign in to comment.