Skip to content

Commit

Permalink
fix context mode
Browse files Browse the repository at this point in the history
  • Loading branch information
wXwcoder committed Aug 12, 2024
1 parent fb3ef08 commit db7c9a8
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 8 deletions.
17 changes: 16 additions & 1 deletion actor/actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type Server interface {
Activate(invokeName string) error
// Deactivate called before actor removed by actor manager
Deactivate() error

WithContext() ServerContext
}

Expand All @@ -68,6 +68,12 @@ type ServerContext interface {
// SaveState is impl by ServerImplBase, It saves the state cache of this actor instance to state store component by calling api of daprd.
// Save state is called at two places: 1. On invocation of this actor instance. 2. When new actor starts.
SaveState(context.Context) error

Activate(invokeName string) error
// Deactivate called before actor removed by actor manager
Deactivate() error

WithContext() ServerContext
}

type ReminderCallee interface {
Expand All @@ -93,6 +99,10 @@ type ServerImplBaseCtx struct {
lock sync.RWMutex
}

func (s *ServerImplBaseCtx) WithContext() ServerContext {
return nil
}

// Deprecated: Use ServerImplBaseCtx instead.
func (b *ServerImplBase) SetStateManager(stateManager StateManager) {
b.lock.Lock()
Expand Down Expand Up @@ -183,6 +193,11 @@ func (b *ServerImplBaseCtx) SaveState(ctx context.Context) error {
return nil
}

// Deprecated: Use ServerImplBaseCtx instead.
/*func (b *ServerImplBaseCtx) WithContext() ServerContext {
return b.WithContext()
}*/

// Deprecated: StateManager is deprecated in favour of StateManagerContext.
type StateManager interface {
// Add is to add new state store with @stateName and @value
Expand Down
6 changes: 6 additions & 0 deletions actor/manager/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type ActorContainer interface {
type ActorContainerContext interface {
Invoke(ctx context.Context, methodName string, param []byte) ([]reflect.Value, actorErr.ActorErr)
GetActor() actor.ServerContext
Deactivate() error
}

// DefaultActorContainer contains actor instance and methods type info
Expand All @@ -55,6 +56,11 @@ type DefaultActorContainerContext struct {
serializer codec.Codec
}

func (d *DefaultActorContainerContext) Deactivate() error {
//TODO implement me
return d.actor.Deactivate()
}

// NewDefaultActorContainer creates a new ActorContainer with provider impl actor and serializer.
// Deprecated: use NewDefaultActorContainerContext instead.
//
Expand Down
57 changes: 51 additions & 6 deletions actor/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ type ActorManagerContext interface {
DeactivateActor(ctx context.Context, actorID string) actorErr.ActorErr
InvokeReminder(ctx context.Context, actorID, reminderName string, params []byte) actorErr.ActorErr
InvokeTimer(ctx context.Context, actorID, timerName string, params []byte) actorErr.ActorErr
InvokeActors(methodName string, request []byte) actorErr.ActorErr
KillAllActors() actorErr.ActorErr
}

// DefaultActorManagerContext is to manage one type of actor.
Expand All @@ -80,6 +82,8 @@ type DefaultActorManagerContext struct {
// Deprecated: use DefaultActorManagerContext instead.
type DefaultActorManager struct {
ctx ActorManagerContext
// activeActors stores the map actorID -> ActorContainer
activeActors sync.Map
}

// Deprecated: use DefaultActorManagerContext instead.
Expand Down Expand Up @@ -113,6 +117,47 @@ func (m *DefaultActorManager) InvokeTimer(actorID, timerName string, params []by
return m.ctx.InvokeTimer(context.Background(), actorID, timerName, params)
}

func (m *DefaultActorManager) InvokeActors(methodName string, request []byte) actorErr.ActorErr {
m.activeActors.Range(func(key, value interface{}) bool {
return func() bool {
go func() {
defer func() {
if err := recover(); err != nil {
log.Printf("InvokeActors recover, methodName:%s, request:%s", methodName, string(request))
}
}()
out, err := m.InvokeMethod(key.(string), methodName, request)
if err != actorErr.Success {
log.Printf("InvokeActors, methodName:%s, request:%s, out:%s, err:%v", methodName, string(request), string(out), err)
}
}()
return true
}()
})
return actorErr.Success
}

func (m *DefaultActorManager) KillAllActors() actorErr.ActorErr {
var actorIds []string
m.activeActors.Range(func(key, value interface{}) bool {
return func() bool {
actorIds = append(actorIds, key.(string))
return true
}()
})
for _, actorId := range actorIds {
func() {
defer func() {
if err := recover(); err != nil {
log.Printf("KillAllActors recover, actorId:%s", actorId)
}
}()
m.DeactivateActor(actorId)
}()
}
return actorErr.Success
}

func NewDefaultActorManagerContext(serializerType string) (ActorManagerContext, actorErr.ActorErr) {
serializer, err := codec.GetActorCodec(serializerType)
if err != nil {
Expand Down Expand Up @@ -141,7 +186,7 @@ func (m *DefaultActorManagerContext) getAndCreateActorContainerIfNotExist(ctx co
return nil, actorErr.ErrSaveStateFailed
}
// save state of this actor
err = newContainer.GetActor().SaveState()
err = newContainer.GetActor().SaveState(ctx)
if err != nil {
return nil, actorErr.ErrSaveStateFailed
}
Expand Down Expand Up @@ -198,7 +243,7 @@ func (m *DefaultActorManagerContext) DeactivateActor(_ context.Context, actorID
if !ok {
return actorErr.ErrActorIDNotFound
}
actor.(ActorContainer).Deactivate()
//actor.(ActorContainerContext).Deactivate()
m.activeActors.Delete(actorID)
return actorErr.Success
}
Expand Down Expand Up @@ -244,7 +289,7 @@ func (m *DefaultActorManagerContext) InvokeTimer(ctx context.Context, actorID, t
return aerr
}

func (m *DefaultActorManager) InvokeActors(methodName string, request []byte) actorErr.ActorErr {
func (m *DefaultActorManagerContext) InvokeActors(methodName string, request []byte) actorErr.ActorErr {
m.activeActors.Range(func(key, value interface{}) bool {
return func() bool {
go func() {
Expand All @@ -253,7 +298,7 @@ func (m *DefaultActorManager) InvokeActors(methodName string, request []byte) ac
log.Printf("InvokeActors recover, methodName:%s, request:%s", methodName, string(request))
}
}()
out, err := m.InvokeMethod(key.(string), methodName, request)
out, err := m.InvokeMethod(context.Background(), key.(string), methodName, request)
if err != actorErr.Success {
log.Printf("InvokeActors, methodName:%s, request:%s, out:%s, err:%v", methodName, string(request), string(out), err)
}
Expand All @@ -264,7 +309,7 @@ func (m *DefaultActorManager) InvokeActors(methodName string, request []byte) ac
return actorErr.Success
}

func (m *DefaultActorManager) KillAllActors() actorErr.ActorErr {
func (m *DefaultActorManagerContext) KillAllActors() actorErr.ActorErr {
var actorIds []string
m.activeActors.Range(func(key, value interface{}) bool {
return func() bool {
Expand All @@ -279,7 +324,7 @@ func (m *DefaultActorManager) KillAllActors() actorErr.ActorErr {
log.Printf("KillAllActors recover, actorId:%s", actorId)
}
}()
m.DeactivateActor(actorId)
m.DeactivateActor(context.Background(), actorId)
}()
}
return actorErr.Success
Expand Down
12 changes: 11 additions & 1 deletion actor/runtime/actor_runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ import (

// Deprecated: use ActorRunTimeContext instead.
type ActorRunTime struct {
ctx *ActorRunTimeContext
ctx *ActorRunTimeContext
actorManagers sync.Map
}

type ActorRunTimeContext struct {
Expand Down Expand Up @@ -130,6 +131,15 @@ func (r *ActorRunTimeContext) InvokeTimer(ctx context.Context, actorTypeName, ac
return mng.InvokeTimer(ctx, actorID, timerName, params)
}

func (r *ActorRunTimeContext) KillAllActors(actorTypeName string) actorErr.ActorErr {
fmt.Println("KillAllActors:", actorTypeName)
targetManager, ok := r.actorManagers.Load(actorTypeName)
if !ok {
return actorErr.ErrActorTypeNotFound
}
return targetManager.(manager.ActorManagerContext).KillAllActors()
}

// Deprecated: use ActorRunTimeContext instead.
func (r *ActorRunTime) RegisterActorFactory(f actor.Factory, opt ...config.Option) {
r.ctx.RegisterActorFactory(func() actor.ServerContext { return f().WithContext() }, opt...)
Expand Down

0 comments on commit db7c9a8

Please sign in to comment.