Skip to content

Commit

Permalink
Merge branch 'master' into fix/updateGoMod
Browse files Browse the repository at this point in the history
  • Loading branch information
ValentinMontmirail authored Dec 31, 2023
2 parents 9b48e14 + 54133c9 commit c591fbd
Show file tree
Hide file tree
Showing 9 changed files with 92 additions and 46 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ Any event that fulfills the `actor.LogEvent` interface will be logged to the def
message and the attributes of the event set by the `actor.LogEvent` `log()` method.

### List of internal system events
* `actor.ActorInitializedEvent`, an actor has been initialized but did not processed its `actor.Started message`
* `actor.ActorStartedEvent`, an actor has started
* `actor.ActorStoppedEvent`, an actor has stopped
* `actor.DeadLetterEvent`, a message was not delivered to an actor
Expand Down
10 changes: 10 additions & 0 deletions actor/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,16 @@ func newTickReceiver(wg *sync.WaitGroup) Producer {
}
}

func TestRegistryGetPID(t *testing.T) {
e, _ := NewEngine(nil)
expectedPID1 := e.SpawnFunc(func(c *Context) {}, "foo", WithID("1"))
expectedPID2 := e.SpawnFunc(func(c *Context) {}, "foo", WithID("2"))
pid := e.Registry.GetPID("foo", "1")
assert.True(t, pid.Equals(expectedPID1))
pid = e.Registry.GetPID("foo", "2")
assert.True(t, pid.Equals(expectedPID2))
}

func TestSendToNilPID(t *testing.T) {
e, _ := NewEngine(nil)
e.Send(nil, "foo")
Expand Down
11 changes: 11 additions & 0 deletions actor/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,17 @@ func (e ActorStartedEvent) Log() (slog.Level, string, []any) {
return slog.LevelInfo, "Actor started", []any{"pid", e.PID}
}

// ActorInitializedEvent is broadcasted over the eventStream before an actor
// received and processed its started event.
type ActorInitializedEvent struct {
PID *PID
Timestamp time.Time
}

func (e ActorInitializedEvent) Log() (slog.Level, string, []any) {
return slog.LevelDebug, "Actor initialized", []any{"pid", e.PID}
}

// ActorStoppedEvent is broadcasted over the eventStream each time
// a process is terminated.
type ActorStoppedEvent struct {
Expand Down
1 change: 1 addition & 0 deletions actor/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ func (p *process) Start() {
}()
p.context.message = Initialized{}
applyMiddleware(recv.Receive, p.Opts.Middleware...)(p.context)
p.context.engine.BroadcastEvent(ActorInitializedEvent{PID: p.pid, Timestamp: time.Now()})

p.context.message = Started{}
applyMiddleware(recv.Receive, p.Opts.Middleware...)(p.context)
Expand Down
10 changes: 10 additions & 0 deletions actor/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,16 @@ func newRegistry(e *Engine) *Registry {
}
}

// GetPID returns the process id associated for the given kind and its id.
// GetPID returns nil if the process was not found.
func (r *Registry) GetPID(kind, id string) *PID {
proc := r.getByID(kind + pidSeparator + id)
if proc != nil {
return proc.PID()
}
return nil
}

// Remove removes the given PID from the registry.
func (r *Registry) Remove(pid *PID) {
r.mu.Lock()
Expand Down
56 changes: 36 additions & 20 deletions examples/persistance/main.go
Original file line number Diff line number Diff line change
@@ -1,23 +1,24 @@
package main

import (
"context"
"encoding/json"
"fmt"
"log"
"os"
"path"
"regexp"
"sync"
"time"

"github.com/anthdm/hollywood/actor"
"github.com/redis/go-redis/v9"
)

type Storer interface {
Store(key string, data []byte) error
Load(key string) ([]byte, error)
}

func WithPersistance(store Storer) func(actor.ReceiveFunc) actor.ReceiveFunc {
func WithPersistence(store Storer) func(actor.ReceiveFunc) actor.ReceiveFunc {
return func(next actor.ReceiveFunc) actor.ReceiveFunc {
return func(c *actor.Context) {
switch c.Message().(type) {
Expand Down Expand Up @@ -114,23 +115,31 @@ func (p *PlayerState) State() ([]byte, error) {
return json.Marshal(state)
}

type RedisStore struct {
client *redis.Client
type fileStore struct {
path string
}

func newRedisStore(c *redis.Client) *RedisStore {
return &RedisStore{
client: c,
func newFileStore() *fileStore {
// make a tmp dir:
tmpdir := "/tmp/persistenceexample"
err := os.Mkdir(tmpdir, 0755)
if err != nil && !os.IsExist(err) {
log.Fatal(err)
}
return &fileStore{
path: tmpdir,
}
}

func (r *RedisStore) Store(key string, state []byte) error {
return r.client.Set(context.TODO(), key, state, 0).Err()
// Store the state in a file name key
func (r *fileStore) Store(key string, state []byte) error {
key = safeFileName(key)
return os.WriteFile(path.Join(r.path, key), state, 0755)
}

func (r *RedisStore) Load(key string) ([]byte, error) {
val, err := r.client.Get(context.TODO(), key).Result()
return []byte(val), err
func (r *fileStore) Load(key string) ([]byte, error) {
key = safeFileName(key)
return os.ReadFile(path.Join(r.path, key))
}

func main() {
Expand All @@ -139,13 +148,12 @@ func main() {
log.Fatal(err)
}
var (
redisClient = redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "", // no password set
DB: 0, // use default DB
})
store = newRedisStore(redisClient)
pid = e.Spawn(newPlayerState(100, "James"), "playerState", actor.WithMiddleware(WithPersistance(store)))
store = newFileStore()
pid = e.Spawn(
newPlayerState(100, "James"),
"playerState",
actor.WithMiddleware(WithPersistence(store)),
actor.WithID("james"))
)
time.Sleep(time.Second * 1)
e.Send(pid, TakeDamage{Amount: 9})
Expand All @@ -154,3 +162,11 @@ func main() {
e.Poison(pid, wg)
wg.Wait()
}

var safeRx = regexp.MustCompile(`[^a-zA-Z0-9]`)

// safeFileName replaces all characters azAZ09 with _
func safeFileName(s string) string {
res := safeRx.ReplaceAllString(s, "_")
return res
}
2 changes: 0 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ require (
github.com/grandcat/zeroconf v1.0.0
github.com/planetscale/vtprotobuf v0.5.0
github.com/prometheus/client_golang v1.18.0
github.com/redis/go-redis/v9 v9.3.1
github.com/stretchr/testify v1.8.4
google.golang.org/grpc v1.60.1
google.golang.org/protobuf v1.32.0
Expand All @@ -17,7 +16,6 @@ require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cenkalti/backoff v2.2.1+incompatible // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/klauspost/cpuid/v2 v2.0.9 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect
Expand Down
6 changes: 2 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.5/go.mod h1:6O5/vntMXwX2lRkT1hjjk0nAC1IDOTvTlVgjlRvqsdk=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
Expand Down Expand Up @@ -48,8 +48,6 @@ github.com/prometheus/common v0.45.0 h1:2BGz0eBc2hdMDLnO/8n0jeB3oPrt2D08CekT0lne
github.com/prometheus/common v0.45.0/go.mod h1:YJmSTw9BoKxJplESWWxlbyttQR4uaEcGyv9MZjVOJsY=
github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo=
github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo=
github.com/redis/go-redis/v9 v9.3.1 h1:KqdY8U+3X6z+iACvumCNxnoluToB+9Me+TvyFa21Mds=
github.com/redis/go-redis/v9 v9.3.1/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M=
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
Expand Down
41 changes: 21 additions & 20 deletions safemap/safemap.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,44 +3,45 @@ package safemap
import "sync"

type SafeMap[K comparable, V any] struct {
data sync.Map
mu sync.RWMutex
data map[K]V
}

func New[K comparable, V any]() *SafeMap[K, V] {
return &SafeMap[K, V]{
data: sync.Map{},
data: make(map[K]V),
}
}

func (s *SafeMap[K, V]) Set(k K, v V) {
s.data.Store(k, v)
s.mu.Lock()
defer s.mu.Unlock()
s.data[k] = v
}

func (s *SafeMap[K, V]) Get(k K) (V, bool) {
val, ok := s.data.Load(k)
var zero V
if !ok {
return zero, false
}
return val.(V), ok
s.mu.RLock()
defer s.mu.RUnlock()
val, ok := s.data[k]
return val, ok
}

func (s *SafeMap[K, V]) Delete(k K) {
s.data.Delete(k)
s.mu.Lock()
defer s.mu.Unlock()
delete(s.data, k)
}

func (s *SafeMap[K, V]) Len() int {
count := 0
s.data.Range(func(_, _ interface{}) bool {
count++
return true
})
return count
s.mu.RLock()
defer s.mu.RUnlock()
return len(s.data)
}

func (s *SafeMap[K, V]) ForEach(f func(K, V)) {
s.data.Range(func(key, value interface{}) bool {
f(key.(K), value.(V))
return true
})
s.mu.RLock()
defer s.mu.RUnlock()
for k, v := range s.data {
f(k, v)
}
}

0 comments on commit c591fbd

Please sign in to comment.