diff --git a/actor/context.go b/actor/context.go index 03bc749..4a54967 100644 --- a/actor/context.go +++ b/actor/context.go @@ -1,10 +1,10 @@ package actor import ( + "log/slog" "strings" "time" - "github.com/anthdm/hollywood/log" "github.com/anthdm/hollywood/safemap" ) @@ -19,7 +19,6 @@ type Context struct { // when the child dies. parentCtx *Context children *safemap.SafeMap[string, *PID] - logger log.Logger } func newContext(e *Engine, pid *PID) *Context { @@ -27,7 +26,6 @@ func newContext(e *Engine, pid *PID) *Context { engine: e, pid: pid, children: safemap.New[string, *PID](), - logger: e.logger.SubLogger("[context]"), } } @@ -45,7 +43,7 @@ func (c *Context) Request(pid *PID, msg any, timeout time.Duration) *Response { // Respond will sent the given message to the sender of the current received message. func (c *Context) Respond(msg any) { if c.sender == nil { - c.logger.Warnw("context got no sender", "func", "Respond", "pid", c.PID()) + slog.Warn("context got no sender", "func", "Respond", "pid", c.PID()) return } c.engine.Send(c.sender, msg) diff --git a/actor/deadletter.go b/actor/deadletter.go index b5fda1d..8e034e6 100644 --- a/actor/deadletter.go +++ b/actor/deadletter.go @@ -1,17 +1,14 @@ package actor import ( - fmt "fmt" + "log/slog" "reflect" - - "github.com/anthdm/hollywood/log" ) // type deadLetter struct { - logger log.Logger - pid *PID + pid *PID } func newDeadLetter() Receiver { @@ -29,17 +26,15 @@ func (d *deadLetter) Receive(ctx *Context) { switch msg := ctx.Message().(type) { case Started: // intialize logger on deadletter startup. is this a sane approach? I'm not sure how the get to the logger otherwise. - d.logger = ctx.Engine().logger.SubLogger("[deadletter]") - d.logger.Debugw("default deadletter actor started") + slog.Debug("default deadletter actor started") case Stopped: - d.logger.Debugw("default deadletter actor stopped") + slog.Debug("default deadletter actor stopped") case Initialized: - d.logger.Debugw("default deadletter actor initialized") + slog.Debug("default deadletter actor initialized") case *DeadLetterEvent: - fmt.Println("received deadletter", msg) - d.logger.Warnw("deadletter arrived", "msg-type", reflect.TypeOf(msg), + slog.Warn("deadletter arrived", "msg-type", reflect.TypeOf(msg), "sender", msg.Sender, "target", msg.Target, "msg", msg.Message) default: - d.logger.Errorw("unknown message arrived", "msg", msg) + slog.Error("unknown message arrived at deadletter", "msg", msg) } } diff --git a/actor/deadletter_test.go b/actor/deadletter_test.go index ab583b5..59ec3f4 100644 --- a/actor/deadletter_test.go +++ b/actor/deadletter_test.go @@ -4,12 +4,10 @@ import ( "bytes" "fmt" "log/slog" - "os" "sync" "testing" "time" - "github.com/anthdm/hollywood/log" "github.com/stretchr/testify/assert" ) @@ -18,8 +16,10 @@ import ( // received the message. func TestDeadLetterDefault(t *testing.T) { logBuffer := SafeBuffer{} - lh := log.NewHandler(&logBuffer, log.TextFormat, slog.LevelDebug) - e, err := NewEngine(EngineOptLogger(log.NewLogger("[engine]", lh))) + lh := slog.NewTextHandler(&logBuffer, nil) + logger := slog.New(lh) + slog.SetDefault(logger) + e, err := NewEngine() assert.NoError(t, err) a1 := e.Spawn(newTestActor, "a1") assert.NotNil(t, a1) @@ -39,9 +39,7 @@ func TestDeadLetterDefault(t *testing.T) { // received the message. // It is using the custom deadletter receiver below. func TestDeadLetterCustom(t *testing.T) { - lh := log.NewHandler(os.Stdout, log.TextFormat, slog.LevelDebug) e, err := NewEngine( - EngineOptLogger(log.NewLogger("[engine]", lh)), EngineOptDeadletter(newCustomDeadLetter)) assert.NoError(t, err) a1 := e.Spawn(newTestActor, "a1") diff --git a/actor/engine.go b/actor/engine.go index 2f1bd9e..872ef21 100644 --- a/actor/engine.go +++ b/actor/engine.go @@ -5,14 +5,12 @@ import ( reflect "reflect" "sync" "time" - - "github.com/anthdm/hollywood/log" ) type Remoter interface { Address() string Send(*PID, any, *PID) - Start(*Engine, log.Logger) error + Start(*Engine) error } // Producer is any function that can return a Receiver @@ -31,7 +29,6 @@ type Engine struct { remote Remoter deadLetter *PID eventStream *PID - logger log.Logger initErrors []error } @@ -56,26 +53,18 @@ func NewEngine(opts ...func(*Engine)) (*Engine, error) { e.eventStream = e.Spawn(NewEventStream(), "eventstream") // if no deadletter is registered, we will register the default deadletter from deadletter.go if e.deadLetter == nil { - e.logger.Debugw("no deadletter receiver set, registering default") e.deadLetter = e.Spawn(newDeadLetter, "deadletter") } return e, nil } -// EngineOptLogger configured the engine with a logger from the internal log package -func EngineOptLogger(logger log.Logger) func(*Engine) { - return func(e *Engine) { - e.logger = logger - } -} - // TODO: Doc func EngineOptRemote(r Remoter) func(*Engine) { return func(e *Engine) { e.remote = r e.address = r.Address() // TODO: potential error not handled here - r.Start(e, e.logger) + r.Start(e) } } @@ -101,7 +90,7 @@ func EngineOptDeadletter(d Producer) func(*Engine) { func (e *Engine) WithRemote(r Remoter) { e.remote = r e.address = r.Address() - r.Start(e, e.logger) + r.Start(e) } // Spawn spawns a process that will producer by the given Producer and diff --git a/actor/process.go b/actor/process.go index a92c8c7..af176b8 100644 --- a/actor/process.go +++ b/actor/process.go @@ -2,11 +2,10 @@ package actor import ( "fmt" + "log/slog" "runtime/debug" "sync" "time" - - "github.com/anthdm/hollywood/log" ) type Envelope struct { @@ -32,7 +31,6 @@ type process struct { restarts int32 mbuffer []Envelope - logger log.Logger } func newProcess(e *Engine, opts Opts) *process { @@ -44,7 +42,6 @@ func newProcess(e *Engine, opts Opts) *process { Opts: opts, context: ctx, mbuffer: nil, - logger: e.logger.SubLogger(opts.Name), } p.inbox.Start(p) return p @@ -129,7 +126,7 @@ func (p *process) Start() { p.context.message = Started{} applyMiddleware(recv.Receive, p.Opts.Middleware...)(p.context) p.context.engine.BroadcastEvent(ActorStartedEvent{PID: p.pid}) - p.logger.Debugw("actor started", "pid", p.pid) + slog.Debug("actor started", "pid", p.pid) // If we have messages in our buffer, invoke them. if len(p.mbuffer) > 0 { p.Invoke(p.mbuffer) @@ -144,7 +141,7 @@ func (p *process) tryRestart(v any) { // back up. NOTE: not sure if that is the best option. What if that // node never comes back up again? if msg, ok := v.(*InternalError); ok { - p.logger.Errorw(msg.From, "err", msg.Err) + slog.Error(msg.From, "err", msg.Err) time.Sleep(p.Opts.RestartDelay) p.Start() return @@ -154,7 +151,7 @@ func (p *process) tryRestart(v any) { // If we reach the max restarts, we shutdown the inbox and clean // everything up. if p.restarts == p.MaxRestarts { - p.logger.Errorw("max restarts exceeded, shutting down...", + slog.Error("max restarts exceeded, shutting down...", "pid", p.pid, "restarts", p.restarts) p.cleanup(nil) return @@ -162,7 +159,7 @@ func (p *process) tryRestart(v any) { p.restarts++ // Restart the process after its restartDelay - p.logger.Errorw("actor restarting", + slog.Error("actor restarting", "n", p.restarts, "maxRestarts", p.MaxRestarts, "pid", p.pid, @@ -196,7 +193,7 @@ func (p *process) cleanup(wg *sync.WaitGroup) { proc.Shutdown(wg) } } - p.logger.Debugw("shutdown", "pid", p.pid) + slog.Debug("shutdown", "pid", p.pid) p.context.engine.BroadcastEvent(ActorStoppedEvent{PID: p.pid}) if wg != nil { wg.Done() diff --git a/actor/registry.go b/actor/registry.go index 317b184..04f9b1e 100644 --- a/actor/registry.go +++ b/actor/registry.go @@ -1,9 +1,8 @@ package actor import ( + "log/slog" "sync" - - "github.com/anthdm/hollywood/log" ) const LocalLookupAddr = "local" @@ -12,14 +11,12 @@ type Registry struct { mu sync.RWMutex lookup map[string]Processer engine *Engine - logger log.Logger } func newRegistry(e *Engine) *Registry { return &Registry{ lookup: make(map[string]Processer, 1024), engine: e, - logger: e.logger.SubLogger("[registry]"), } } @@ -56,7 +53,7 @@ func (r *Registry) add(proc Processer) { defer r.mu.Unlock() id := proc.PID().ID if _, ok := r.lookup[id]; ok { - r.logger.Warnw("process already registered", + slog.Warn("process already registered", "pid", proc.PID(), ) return diff --git a/examples/chat/client/main.go b/examples/chat/client/main.go index 8a5747a..d4b8673 100644 --- a/examples/chat/client/main.go +++ b/examples/chat/client/main.go @@ -6,7 +6,6 @@ import ( "fmt" "github.com/anthdm/hollywood/actor" "github.com/anthdm/hollywood/examples/chat/types" - "github.com/anthdm/hollywood/log" "github.com/anthdm/hollywood/remote" "log/slog" "math/rand" @@ -55,7 +54,7 @@ func main() { rem := remote.New(remote.Config{ ListenAddr: *listenAt, }) - e, err := actor.NewEngine(actor.EngineOptLogger(log.Default()), actor.EngineOptRemote(rem)) + e, err := actor.NewEngine(actor.EngineOptRemote(rem)) if err != nil { slog.Error("failed to create engine", "err", err) os.Exit(1) diff --git a/examples/helloworld/main.go b/examples/helloworld/main.go index 281c54b..f468332 100644 --- a/examples/helloworld/main.go +++ b/examples/helloworld/main.go @@ -2,10 +2,6 @@ package main import ( "fmt" - "github.com/anthdm/hollywood/log" - "log/slog" - "os" - "github.com/anthdm/hollywood/actor" ) @@ -31,8 +27,8 @@ func (f *foo) Receive(ctx *actor.Context) { } func main() { - lh := log.NewHandler(os.Stdout, log.TextFormat, slog.LevelDebug) - engine, err := actor.NewEngine(actor.EngineOptLogger(log.NewLogger("[engine]", lh))) + + engine, err := actor.NewEngine() if err != nil { panic(err) } diff --git a/log/log.go b/log/log.go deleted file mode 100644 index d2fbffa..0000000 --- a/log/log.go +++ /dev/null @@ -1,97 +0,0 @@ -package log - -import ( - "io" - "log/slog" - "os" -) - -// Logger keeps track of the logger. The baselogger points to the original logger -// and slogger is a reference to the current logger -type Logger struct { - slogger *slog.Logger - baselogger *slog.Logger -} - -type LoggerFormat uint32 - -const ( - JsonFormat LoggerFormat = iota - TextFormat -) - -// NewLogger creates a new logger with the given name and handler -func NewLogger(name string, handler slog.Handler) Logger { - logger := slog.New(handler) - return Logger{ - slogger: logger.With("log", name), - baselogger: logger, - } -} - -// SubLogger returns a new logger with the given name as a sublogger -func (l Logger) SubLogger(name string) Logger { - if l.slogger == nil { // no-op logger - return Logger{} - } - return Logger{ - slogger: l.baselogger.With("log", name), - baselogger: l.baselogger, - } -} - -// Default returns a logger that logs to stdout with the -// TextFormat and log level Info. This is the recommended logger to use -// You can supply your own logger if you want to, using NewLogger and NewHandler -func Default() Logger { - return NewLogger("[engine]", NewHandler(os.Stdout, TextFormat, slog.LevelInfo)) -} - -// Debug returns a logger that logs to stdout with the -// TextFormat and log level Debug. This is the recommended logger to use when debugging. -func Debug() Logger { - return NewLogger("[engine]", NewHandler(os.Stdout, TextFormat, slog.LevelDebug)) -} - -func NewHandler(w io.Writer, format LoggerFormat, loglevel slog.Level) slog.Handler { - switch format { - case JsonFormat: - return slog.NewJSONHandler(w, &slog.HandlerOptions{ - Level: loglevel, - }) - case TextFormat: - return slog.NewTextHandler(w, &slog.HandlerOptions{ - Level: loglevel, - }) - default: - panic("unknown format") // can't happen - } -} - -func (l Logger) Infow(msg string, args ...any) { - if l.slogger == nil { - return - } - l.slogger.Info(msg, args...) -} - -func (l Logger) Debugw(msg string, args ...any) { - if l.slogger == nil { - return - } - l.slogger.Debug(msg, args...) -} - -func (l Logger) Warnw(msg string, args ...any) { - if l.slogger == nil { - return - } - l.slogger.Warn(msg, args...) -} - -func (l Logger) Errorw(msg string, args ...any) { - if l.slogger == nil { - return - } - l.slogger.Error(msg, args...) -} diff --git a/remote/remote.go b/remote/remote.go index 07934d2..3cd1e17 100644 --- a/remote/remote.go +++ b/remote/remote.go @@ -3,12 +3,12 @@ package remote import ( "context" "fmt" + "log/slog" "net" "sync" "sync/atomic" "github.com/anthdm/hollywood/actor" - "github.com/anthdm/hollywood/log" "storj.io/drpc/drpcmux" "storj.io/drpc/drpcserver" ) @@ -24,7 +24,6 @@ type Remote struct { config Config streamReader *streamReader streamRouterPID *actor.PID - logger log.Logger stopCh chan struct{} // Stop closes this channel to signal the remote to stop listening. stopWg *sync.WaitGroup state atomic.Uint32 @@ -47,18 +46,17 @@ func New(cfg Config) *Remote { return r } -func (r *Remote) Start(e *actor.Engine, logger log.Logger) error { +func (r *Remote) Start(e *actor.Engine) error { if r.state.Load() != stateInitialized { return fmt.Errorf("remote already started") } r.state.Store(stateRunning) - r.logger = logger r.engine = e ln, err := net.Listen("tcp", r.config.ListenAddr) if err != nil { panic("failed to listen: " + err.Error()) } - r.logger.Debugw("listening", "addr", r.config.ListenAddr) + slog.Debug("listening", "addr", r.config.ListenAddr) mux := drpcmux.New() err = DRPCRegisterRemote(mux, r.streamReader) if err != nil { @@ -66,8 +64,8 @@ func (r *Remote) Start(e *actor.Engine, logger log.Logger) error { } s := drpcserver.New(mux) - r.streamRouterPID = r.engine.Spawn(newStreamRouter(r.engine, r.logger), "router", actor.WithInboxSize(1024*1024)) - r.logger.Infow("server started", "listenAddr", r.config.ListenAddr) + r.streamRouterPID = r.engine.Spawn(newStreamRouter(r.engine), "router", actor.WithInboxSize(1024*1024)) + slog.Info("server started", "listenAddr", r.config.ListenAddr) r.stopWg = &sync.WaitGroup{} r.stopWg.Add(1) r.stopCh = make(chan struct{}) @@ -76,15 +74,14 @@ func (r *Remote) Start(e *actor.Engine, logger log.Logger) error { defer r.stopWg.Done() err := s.Serve(ctx, ln) if err != nil { - r.logger.Errorw("drpcserver", "err", err) + slog.Error("drpcserver", "err", err) } else { - r.logger.Infow("drpcserver stopped") + slog.Debug("drpcserver stopped") } }() // wait for stopCh to be closed go func() { <-r.stopCh - r.logger.Debugw("cancelling context") cancel() }() return nil @@ -93,13 +90,13 @@ func (r *Remote) Start(e *actor.Engine, logger log.Logger) error { // Stop will stop the remote from listening. func (r *Remote) Stop() *sync.WaitGroup { if r.state.Load() != stateRunning { - r.logger.Warnw("remote already stopped but stop was called", "state", r.state.Load()) + slog.Warn("remote already stopped but stop was called", "state", r.state.Load()) return &sync.WaitGroup{} // return empty waitgroup so the caller can still wait without panicking. } - r.logger.Debugw("stopping remote") + slog.Debug("stopping remote") r.state.Store(stateStopped) r.stopCh <- struct{}{} - r.logger.Debugw("stop signal sent") + slog.Debug("stop signal sent") return r.stopWg } diff --git a/remote/remote_test.go b/remote/remote_test.go index 2ed1a80..e273600 100644 --- a/remote/remote_test.go +++ b/remote/remote_test.go @@ -2,7 +2,6 @@ package remote import ( "fmt" - "github.com/anthdm/hollywood/log" "math/rand" "net" "sync" @@ -158,11 +157,11 @@ func TestWeird(t *testing.T) { } }, "weirdactor") // let's start the remote once more. this should do nothing. - err = ra.Start(a, log.Debug()) + err = ra.Start(a) assert.Error(t, err) - err = ra.Start(a, log.Debug()) + err = ra.Start(a) assert.Error(t, err) - err = ra.Start(a, log.Debug()) + err = ra.Start(a) assert.Error(t, err) // Now stop it a few times to make sure it doesn't freeze or panic: ra.Stop().Wait() @@ -180,7 +179,7 @@ func makeRemoteEngine(listenAddr string) (*actor.Engine, *Remote, error) { case false: e, err = actor.NewEngine(actor.EngineOptRemote(r)) case true: - e, err = actor.NewEngine(actor.EngineOptLogger(log.Debug()), actor.EngineOptRemote(r)) + e, err = actor.NewEngine(actor.EngineOptRemote(r)) } if err != nil { return nil, nil, fmt.Errorf("actor.NewEngine: %w", err) diff --git a/remote/stream_reader.go b/remote/stream_reader.go index 458c06d..8d2e412 100644 --- a/remote/stream_reader.go +++ b/remote/stream_reader.go @@ -3,9 +3,9 @@ package remote import ( "context" "errors" + "log/slog" "github.com/anthdm/hollywood/actor" - "github.com/anthdm/hollywood/log" ) type streamReader struct { @@ -13,21 +13,18 @@ type streamReader struct { remote *Remote deserializer Deserializer - logger log.Logger } func newStreamReader(r *Remote) *streamReader { return &streamReader{ remote: r, deserializer: ProtoSerializer{}, - logger: r.logger.SubLogger("[stream_reader]"), } } func (r *streamReader) Receive(stream DRPCRemote_ReceiveStream) error { - defer func() { - r.logger.Debugw("terminated") - }() + + defer slog.Debug("streamreader terminated") for { envelope, err := stream.Recv() @@ -35,7 +32,7 @@ func (r *streamReader) Receive(stream DRPCRemote_ReceiveStream) error { if errors.Is(err, context.Canceled) { break } - r.logger.Errorw("receive", "err", err) + slog.Error("streamReader receive", "err", err) return err } @@ -43,7 +40,7 @@ func (r *streamReader) Receive(stream DRPCRemote_ReceiveStream) error { tname := envelope.TypeNames[msg.TypeNameIndex] payload, err := r.deserializer.Deserialize(msg.Data, tname) if err != nil { - r.logger.Errorw("deserialize", "err", err) + slog.Error("streamReader deserialize", "err", err) return err } target := envelope.Targets[msg.TargetIndex] diff --git a/remote/stream_router.go b/remote/stream_router.go index c99f8c2..ddd7ec3 100644 --- a/remote/stream_router.go +++ b/remote/stream_router.go @@ -2,7 +2,7 @@ package remote import ( "github.com/anthdm/hollywood/actor" - "github.com/anthdm/hollywood/log" + "log/slog" ) type streamDeliver struct { @@ -20,15 +20,13 @@ type streamRouter struct { // streams is a map of remote address to stream writer pid. streams map[string]*actor.PID pid *actor.PID - logger log.Logger } -func newStreamRouter(e *actor.Engine, l log.Logger) actor.Producer { +func newStreamRouter(e *actor.Engine) actor.Producer { return func() actor.Receiver { return &streamRouter{ streams: make(map[string]*actor.PID), engine: e, - logger: l.SubLogger("[stream_router]"), } } } @@ -47,7 +45,7 @@ func (s *streamRouter) Receive(ctx *actor.Context) { func (s *streamRouter) handleTerminateStream(msg terminateStream) { streamWriterPID := s.streams[msg.address] delete(s.streams, msg.address) - s.logger.Debugw("terminating stream", + slog.Debug("terminating stream", "remote", msg.address, "pid", streamWriterPID, ) @@ -62,12 +60,8 @@ func (s *streamRouter) deliverStream(msg *streamDeliver) { swpid, ok = s.streams[address] if !ok { - swlogger := s.logger.SubLogger("[stream_writer]") - swpid = s.engine.SpawnProc(newStreamWriter(s.engine, s.pid, address, swlogger)) + swpid = s.engine.SpawnProc(newStreamWriter(s.engine, s.pid, address)) s.streams[address] = swpid - s.logger.Debugw("new stream route", - "pid", swpid, - ) } s.engine.Send(swpid, msg) } diff --git a/remote/stream_writer.go b/remote/stream_writer.go index 1d05624..68d4058 100644 --- a/remote/stream_writer.go +++ b/remote/stream_writer.go @@ -4,12 +4,12 @@ import ( "context" "errors" "io" + "log/slog" "net" "sync" "time" "github.com/anthdm/hollywood/actor" - "github.com/anthdm/hollywood/log" "storj.io/drpc/drpcconn" ) @@ -28,10 +28,9 @@ type streamWriter struct { pid *actor.PID inbox actor.Inboxer serializer Serializer - logger log.Logger } -func newStreamWriter(e *actor.Engine, rpid *actor.PID, address string, logger log.Logger) actor.Processer { +func newStreamWriter(e *actor.Engine, rpid *actor.PID, address string) actor.Processer { return &streamWriter{ writeToAddr: address, engine: e, @@ -39,7 +38,6 @@ func newStreamWriter(e *actor.Engine, rpid *actor.PID, address string, logger lo inbox: actor.NewInbox(streamWriterBatchSize), pid: actor.NewPID(e.Address(), "stream", address), serializer: ProtoSerializer{}, - logger: logger, } } @@ -72,7 +70,7 @@ func (s *streamWriter) Invoke(msgs []actor.Envelope) { b, err := s.serializer.Serialize(stream.msg) if err != nil { - s.logger.Errorw("serialize", "err", err) + slog.Error("serialize", "err", err) continue } @@ -96,7 +94,7 @@ func (s *streamWriter) Invoke(msgs []actor.Envelope) { _ = s.conn.Close() return } - s.logger.Errorw("failed sending message", + slog.Error("failed sending message", "err", err, ) } @@ -113,7 +111,7 @@ func (s *streamWriter) init() { for { rawconn, err = net.Dial("tcp", s.writeToAddr) if err != nil { - s.logger.Errorw("net.Dial", "err", err, "remote", s.writeToAddr) + slog.Error("net.Dial", "err", err, "remote", s.writeToAddr) time.Sleep(delay) continue } @@ -132,19 +130,19 @@ func (s *streamWriter) init() { stream, err := client.Receive(context.Background()) if err != nil { - s.logger.Errorw("receive", "err", err, "remote", s.writeToAddr) + slog.Error("receive", "err", err, "remote", s.writeToAddr) } s.stream = stream s.conn = conn - s.logger.Debugw("connected", + slog.Debug("connected", "remote", s.writeToAddr, ) go func() { <-s.conn.Closed() - s.logger.Debugw("lost connection", + slog.Debug("lost connection", "remote", s.writeToAddr, ) s.Shutdown(nil)