Skip to content

Commit

Permalink
Fixchat take 2 (#83)
Browse files Browse the repository at this point in the history
* clean up the chat example.

* tweaks.

* wait for the poison to take effect before quitting.

* tracking clients and users in separate maps
  • Loading branch information
perbu authored Dec 4, 2023
1 parent 1142617 commit 2338c21
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 22 deletions.
30 changes: 20 additions & 10 deletions examples/chat/client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,50 +4,54 @@ import (
"bufio"
"flag"
"fmt"
"log/slog"
"os"

"github.com/anthdm/hollywood/log"

"github.com/anthdm/hollywood/actor"
"github.com/anthdm/hollywood/examples/chat/types"
"github.com/anthdm/hollywood/log"
"github.com/anthdm/hollywood/remote"
"log/slog"
"math/rand"
"os"
)

type client struct {
username string
serverPID *actor.PID
logger *slog.Logger
}

func newClient(username string, serverPID *actor.PID) actor.Producer {
return func() actor.Receiver {
return &client{
username: username,
serverPID: serverPID,
logger: slog.Default(),
}
}
}

func (c *client) Receive(ctx *actor.Context) {
switch msg := ctx.Message().(type) {
case *types.Message:
fmt.Printf("username: %s :: %s\n", msg.Username, msg.Msg)
fmt.Printf("%s: %s\n", msg.Username, msg.Msg)
case actor.Started:
ctx.Send(c.serverPID, &types.Connect{
Username: c.username,
})
case actor.Stopped:
c.logger.Info("client stopped")
}
}

func main() {
var (
listenAt = flag.String("listen", "127.0.0.1:3000", "")
connectTo = flag.String("connect", "127.0.0.1:4000", "")
username = flag.String("username", "", "")
listenAt = flag.String("listen", "", "specify address to listen to, will pick a random port if not specified")
connectTo = flag.String("connect", "127.0.0.1:4000", "the address of the server to connect to")
username = flag.String("username", os.Getenv("USER"), "")
)
flag.Parse()

if *listenAt == "" {
*listenAt = fmt.Sprintf("127.0.0.1:%d", rand.Int31n(50000)+10000)
}
rem := remote.New(remote.Config{
ListenAddr: *listenAt,
})
Expand All @@ -64,13 +68,17 @@ func main() {
clientPID = e.Spawn(newClient(*username, serverPID), "client")
scanner = bufio.NewScanner(os.Stdin)
)
fmt.Println("Type 'quit' and press return to exit.")
for scanner.Scan() {
msg := &types.Message{
Msg: scanner.Text(),
Username: *username,
}
// We use SendWithSender here so the server knows who
// is sending the message.
if msg.Msg == "quit" {
break
}
e.SendWithSender(serverPID, msg, clientPID)
}
if err := scanner.Err(); err != nil {
Expand All @@ -80,4 +88,6 @@ func main() {
// When breaked out of the loop on error let the server know
// we need to disconnect.
e.SendWithSender(serverPID, &types.Disconnect{}, clientPID)
e.Poison(clientPID).Wait()
slog.Info("client disconnected")
}
48 changes: 36 additions & 12 deletions examples/chat/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,44 +9,68 @@ import (
"github.com/anthdm/hollywood/remote"
)

type clientMap map[string]*actor.PID
type userMap map[string]string

type server struct {
clients map[*actor.PID]string
clients clientMap // key: address value: *pid
users userMap // key: address value: username
logger *slog.Logger
}

func newServer() actor.Receiver {
return &server{
clients: make(map[*actor.PID]string),
clients: make(clientMap),
users: make(userMap),
logger: slog.Default(),
}
}

func (s *server) Receive(ctx *actor.Context) {
switch msg := ctx.Message().(type) {
case *types.Message:
s.logger.Info("message received", "msg", msg.Msg, "from", ctx.Sender())
s.handleMessage(ctx)
case *types.Disconnect:
username, ok := s.clients[ctx.Sender()]
cAddr := ctx.Sender().GetAddress()
pid, ok := s.clients[cAddr]
if !ok {
s.logger.Warn("unknown client disconnected", "client", pid.Address)
return
}
username, ok := s.users[cAddr]
if !ok {
// ignore a non existing client
s.logger.Warn("unknown user disconnected", "client", pid.Address)
return
}
delete(s.clients, ctx.Sender())
slog.Info("client disconnected",
"pid", ctx.Sender(),
"username", username)
s.logger.Info("client disconnected", "username", username)
delete(s.clients, cAddr)
delete(s.users, username)
case *types.Connect:
s.clients[ctx.Sender()] = msg.Username
cAddr := ctx.Sender().GetAddress()
if _, ok := s.clients[cAddr]; ok {
s.logger.Warn("client already connected", "client", ctx.Sender().GetID())
return
}
if _, ok := s.users[cAddr]; ok {
s.logger.Warn("user already connected", "client", ctx.Sender().GetID())
return
}
s.clients[cAddr] = ctx.Sender()
s.users[cAddr] = msg.Username
slog.Info("new client connected",
"pid", ctx.Sender(),
"id", ctx.Sender().GetID(), "addr", ctx.Sender().GetAddress(), "sender", ctx.Sender(),
"username", msg.Username,
)
}
}

// handle the incoming message by broadcasting it to all connected clients.
func (s *server) handleMessage(ctx *actor.Context) {
for pid := range s.clients {
// dont send message to ourselves
for _, pid := range s.clients {
// dont send message to the place where it came from.
if !pid.Equals(ctx.Sender()) {
s.logger.Info("forwarding message", "pid", pid.ID, "addr", pid.Address, "msg", ctx.Message())
ctx.Forward(pid)
}
}
Expand Down

0 comments on commit 2338c21

Please sign in to comment.