diff --git a/remote/remote.go b/remote/remote.go index e4711f9..8521071 100644 --- a/remote/remote.go +++ b/remote/remote.go @@ -36,7 +36,6 @@ type Remote struct { addr string engine *actor.Engine config Config - streamReader *streamReader streamRouterPID *actor.PID stopCh chan struct{} // Stop closes this channel to signal the remote to stop listening. stopWg *sync.WaitGroup @@ -57,7 +56,6 @@ func New(addr string, config Config) *Remote { config: config, } r.state.Store(stateInitialized) - r.streamReader = newStreamReader(r) return r } @@ -81,7 +79,7 @@ func (r *Remote) Start(e *actor.Engine) error { } slog.Debug("listening", "addr", r.addr) mux := drpcmux.New() - err = DRPCRegisterRemote(mux, r.streamReader) + err = DRPCRegisterRemote(mux, newStreamReader(r)) if err != nil { return fmt.Errorf("failed to register remote: %w", err) } diff --git a/remote/remote_test.go b/remote/remote_test.go index 0564c8d..aabb59b 100644 --- a/remote/remote_test.go +++ b/remote/remote_test.go @@ -215,6 +215,37 @@ func TestWeird(t *testing.T) { wg.Wait() // wait for the actor to stop. } +func TestStreamWriterRemoteUnreachableEvent(t *testing.T) { + a, _, err := makeRemoteEngine(getRandomLocalhostAddr()) + assert.NoError(t, err) + b, rb, err := makeRemoteEngine(getRandomLocalhostAddr()) + assert.NoError(t, err) + + wg := sync.WaitGroup{} + wg.Add(1) + + a.SpawnFunc(func(c *actor.Context) { + switch c.Message().(type) { + case actor.Started: + c.Engine().Subscribe(c.PID()) + case actor.RemoteUnreachableEvent: + wg.Done() + } + }, "listener") + + bPID := b.SpawnFunc(func(c *actor.Context) { + switch c.Message().(type) { + case actor.Started: + c.Engine().Subscribe(c.PID()) + case *TestMessage: + rb.Stop() + } + }, "listener") + + a.Send(bPID, &TestMessage{Data: []byte("test")}) + wg.Wait() +} + func makeRemoteEngine(listenAddr string) (*actor.Engine, *Remote, error) { var e *actor.Engine r := New(listenAddr, NewConfig()) diff --git a/remote/stream_router.go b/remote/stream_router.go index 62883b9..00f9f4a 100644 --- a/remote/stream_router.go +++ b/remote/stream_router.go @@ -13,10 +13,6 @@ type streamDeliver struct { msg any } -type terminateStream struct { - address string -} - type streamRouter struct { engine *actor.Engine // streams is a map of remote address to stream writer pid. @@ -41,16 +37,16 @@ func (s *streamRouter) Receive(ctx *actor.Context) { s.pid = ctx.PID() case *streamDeliver: s.deliverStream(msg) - case terminateStream: + case actor.RemoteUnreachableEvent: s.handleTerminateStream(msg) } } -func (s *streamRouter) handleTerminateStream(msg terminateStream) { - streamWriterPID := s.streams[msg.address] - delete(s.streams, msg.address) - slog.Debug("terminating stream", - "remote", msg.address, +func (s *streamRouter) handleTerminateStream(msg actor.RemoteUnreachableEvent) { + streamWriterPID := s.streams[msg.ListenAddr] + delete(s.streams, msg.ListenAddr) + slog.Debug("stream terminated", + "remote", msg.ListenAddr, "pid", streamWriterPID, ) } diff --git a/remote/stream_writer.go b/remote/stream_writer.go index eb84410..ab237da 100644 --- a/remote/stream_writer.go +++ b/remote/stream_writer.go @@ -117,7 +117,6 @@ func (s *streamWriter) init() { ) for i := 0; i < maxRetries; i++ { // Here we try to connect to the remote address. - // Todo: can we make an Event here in case of failure? switch s.tlsConfig { case nil: rawconn, err = net.Dial("tcp", s.writeToAddr) @@ -142,10 +141,6 @@ func (s *streamWriter) init() { // We could not reach the remote after retrying N times. Hence, shutdown the stream writer. // and notify RemoteUnreachableEvent. if rawconn == nil { - evt := actor.RemoteUnreachableEvent{ - ListenAddr: s.writeToAddr, - } - s.engine.BroadcastEvent(evt) s.Shutdown(nil) return } @@ -183,8 +178,12 @@ func (s *streamWriter) init() { }() } +// TODO: is there a way that stream router can listen to event stream +// instead of sending the event itself? func (s *streamWriter) Shutdown(wg *sync.WaitGroup) { - s.engine.Send(s.routerPID, terminateStream{address: s.writeToAddr}) + evt := actor.RemoteUnreachableEvent{ListenAddr: s.writeToAddr} + s.engine.Send(s.routerPID, evt) + s.engine.BroadcastEvent(evt) if s.stream != nil { s.stream.Close() }