Skip to content

Commit

Permalink
ping pong TLS
Browse files Browse the repository at this point in the history
  • Loading branch information
anthdm committed Dec 12, 2023
1 parent 97f4558 commit 7fdf5af
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 14 deletions.
2 changes: 1 addition & 1 deletion actor/deadletter.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (d *deadLetter) Receive(ctx *Context) {
case Initialized:
slog.Debug("default deadletter actor initialized")
case *DeadLetterEvent:
slog.Warn("deadletter arrived", "msg-type", reflect.TypeOf(msg),
slog.Warn("[DEADLETTER]", "msg", reflect.TypeOf(msg.Message),
"sender", msg.Sender, "target", msg.Target, "msg", msg.Message)
default:
slog.Error("unknown message arrived at deadletter", "msg", msg)
Expand Down
3 changes: 2 additions & 1 deletion actor/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package actor

import (
"context"
"math"
"math/rand"
"strconv"
"sync"
Expand All @@ -20,7 +21,7 @@ func NewResponse(e *Engine, timeout time.Duration) *Response {
engine: e,
result: make(chan any, 1),
timeout: timeout,
pid: NewPID(e.address, "response", strconv.Itoa(rand.Intn(100000))),
pid: NewPID(e.address, "response", strconv.Itoa(rand.Intn(math.MaxInt))),
}
}

Expand Down
4 changes: 0 additions & 4 deletions cluster/agent.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package cluster

import (
fmt "fmt"
"log/slog"

"github.com/anthdm/hollywood/actor"
Expand Down Expand Up @@ -33,9 +32,6 @@ func (a *Agent) handleMembers(members []*Member) {
joined := NewMemberSet(members...).Except(a.members.Slice())
left := a.members.Except(members)

fmt.Println("joined", joined)
fmt.Println("left", left)

for _, member := range joined {
a.memberJoin(member)
}
Expand Down
15 changes: 7 additions & 8 deletions cluster/selfmanaged.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package cluster

import (
fmt "fmt"
"log/slog"
"time"

Expand Down Expand Up @@ -63,25 +62,25 @@ func (s *SelfManaged) Receive(c *actor.Context) {
s.cluster.engine.Send(s.cluster.PID(), members)
}
case memberPing:
fmt.Println("pinging all the members", s.members.Len())
// fmt.Println("pinging all the members", s.members.Len())
s.members.ForEach(func(member *Member) bool {
ping := &actor.Ping{
From: c.PID(),
}
_, err := c.Request(memberToProviderPID(member), ping, time.Millisecond*100).Result()
pong, err := c.Request(memberToProviderPID(member), ping, time.Millisecond*1000).Result()
if err != nil {
slog.Error("member ping failed", "err", err, "memberID", member.ID)
s.removeMember(member)
}
// TODO: Something is not quite right here!
// if _, ok := pong.(*actor.Pong); !ok {
// slog.Error("member ping failed", "err", err, "memberID", member.ID)
// s.removeMember(member)
// }
if _, ok := pong.(*actor.Pong); !ok {
slog.Error("member ping failed", "err", err, "memberID", member.ID)
s.removeMember(member)
}
return true
})
case *actor.Ping:
// c.Respond(&actor.Pong{From: c.PID()})
c.Respond(&actor.Pong{From: c.PID()})
}
}

Expand Down

0 comments on commit 7fdf5af

Please sign in to comment.