Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remote unreachable event when message fails to be sent remotely #172

Merged
merged 2 commits into from
Nov 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions remote/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ type Remote struct {
addr string
engine *actor.Engine
config Config
streamReader *streamReader
streamRouterPID *actor.PID
stopCh chan struct{} // Stop closes this channel to signal the remote to stop listening.
stopWg *sync.WaitGroup
Expand All @@ -57,7 +56,6 @@ func New(addr string, config Config) *Remote {
config: config,
}
r.state.Store(stateInitialized)
r.streamReader = newStreamReader(r)
return r
}

Expand All @@ -81,7 +79,7 @@ func (r *Remote) Start(e *actor.Engine) error {
}
slog.Debug("listening", "addr", r.addr)
mux := drpcmux.New()
err = DRPCRegisterRemote(mux, r.streamReader)
err = DRPCRegisterRemote(mux, newStreamReader(r))
if err != nil {
return fmt.Errorf("failed to register remote: %w", err)
}
Expand Down
31 changes: 31 additions & 0 deletions remote/remote_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,37 @@ func TestWeird(t *testing.T) {
wg.Wait() // wait for the actor to stop.
}

func TestStreamWriterRemoteUnreachableEvent(t *testing.T) {
a, _, err := makeRemoteEngine(getRandomLocalhostAddr())
assert.NoError(t, err)
b, rb, err := makeRemoteEngine(getRandomLocalhostAddr())
assert.NoError(t, err)

wg := sync.WaitGroup{}
wg.Add(1)

a.SpawnFunc(func(c *actor.Context) {
switch c.Message().(type) {
case actor.Started:
c.Engine().Subscribe(c.PID())
case actor.RemoteUnreachableEvent:
wg.Done()
}
}, "listener")

bPID := b.SpawnFunc(func(c *actor.Context) {
switch c.Message().(type) {
case actor.Started:
c.Engine().Subscribe(c.PID())
case *TestMessage:
rb.Stop()
}
}, "listener")

a.Send(bPID, &TestMessage{Data: []byte("test")})
wg.Wait()
}

func makeRemoteEngine(listenAddr string) (*actor.Engine, *Remote, error) {
var e *actor.Engine
r := New(listenAddr, NewConfig())
Expand Down
16 changes: 6 additions & 10 deletions remote/stream_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,6 @@ type streamDeliver struct {
msg any
}

type terminateStream struct {
address string
}

type streamRouter struct {
engine *actor.Engine
// streams is a map of remote address to stream writer pid.
Expand All @@ -41,16 +37,16 @@ func (s *streamRouter) Receive(ctx *actor.Context) {
s.pid = ctx.PID()
case *streamDeliver:
s.deliverStream(msg)
case terminateStream:
case actor.RemoteUnreachableEvent:
s.handleTerminateStream(msg)
}
}

func (s *streamRouter) handleTerminateStream(msg terminateStream) {
streamWriterPID := s.streams[msg.address]
delete(s.streams, msg.address)
slog.Debug("terminating stream",
"remote", msg.address,
func (s *streamRouter) handleTerminateStream(msg actor.RemoteUnreachableEvent) {
streamWriterPID := s.streams[msg.ListenAddr]
delete(s.streams, msg.ListenAddr)
slog.Debug("stream terminated",
"remote", msg.ListenAddr,
"pid", streamWriterPID,
)
}
Expand Down
11 changes: 5 additions & 6 deletions remote/stream_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ func (s *streamWriter) init() {
)
for i := 0; i < maxRetries; i++ {
// Here we try to connect to the remote address.
// Todo: can we make an Event here in case of failure?
switch s.tlsConfig {
case nil:
rawconn, err = net.Dial("tcp", s.writeToAddr)
Expand All @@ -142,10 +141,6 @@ func (s *streamWriter) init() {
// We could not reach the remote after retrying N times. Hence, shutdown the stream writer.
// and notify RemoteUnreachableEvent.
if rawconn == nil {
evt := actor.RemoteUnreachableEvent{
ListenAddr: s.writeToAddr,
}
s.engine.BroadcastEvent(evt)
s.Shutdown(nil)
return
}
Expand Down Expand Up @@ -183,8 +178,12 @@ func (s *streamWriter) init() {
}()
}

// TODO: is there a way that stream router can listen to event stream
// instead of sending the event itself?
func (s *streamWriter) Shutdown(wg *sync.WaitGroup) {
s.engine.Send(s.routerPID, terminateStream{address: s.writeToAddr})
evt := actor.RemoteUnreachableEvent{ListenAddr: s.writeToAddr}
s.engine.Send(s.routerPID, evt)
s.engine.BroadcastEvent(evt)
if s.stream != nil {
s.stream.Close()
}
Expand Down
Loading