Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

websocket-example #29

Closed
wants to merge 10 commits into from
65 changes: 65 additions & 0 deletions examples/websocket/handle.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package main

import (
"fmt"
"math/rand"
"strconv"
"time"

"github.com/anthdm/hollywood/actor"
"golang.org/x/net/websocket"
)

const (
MIN = 1
MAX = 300
)

type handleWithPid func(ws *websocket.Conn) (*actor.PID, *chan struct{})

func HandleFunc(f handleWithPid) websocket.Handler {
return func(c *websocket.Conn) {
defer fmt.Println("web socket is deleted")
// If we can't write to the socket later and we get-
// a "write error", we break this (websocket) scope.
//
// Generating a new goblin-process for websocket.
// Getting a quitChanel to break websocket handle scope.

pid, quitCh := f(c)
fmt.Println("new goblin-process is spawned: ", pid.ID)

for {
// Waiting for the "break call" from goblin-proces.
<-*quitCh

// Kill the websocket handling scope..
break
}
}
}

func GenerateProcessForWs(ws *websocket.Conn) (*actor.PID, *chan struct{}) {
// Create unique pid with salting for new goblins-processes.
now := time.Now()
rand.Seed(now.UnixNano())

randNum := rand.Intn(MAX-MIN+1) + MIN
uniquePid := strconv.Itoa(randNum + now.Nanosecond())

// Spawn a new goblin-process for incoming websocket.
pid := engine.Spawn(newGoblin, uniquePid)

// Create a channel to break the websocket handling scope.
quitCh := make(chan struct{})

// Send datas the created goblin-process.
// Than goblin-process will hold (the websoc-
// ket connection) and (the quitCh pointer).
engine.Send(pid, &initValues{
ws: ws,
quitCh: &quitCh,
})

return pid, &quitCh
}
206 changes: 206 additions & 0 deletions examples/websocket/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
package main

import (
"fmt"
"io"
"log"
"net/http"
"sync"

"github.com/anthdm/hollywood/actor"
"golang.org/x/net/websocket"
)

var (
engine *actor.Engine
hodorProcessId *actor.PID
)

// It is storing goblin-process and coressponding websocket identities.
// Also it broadcasts messages to all goblin-processes.
func (f *hodorStorage) Receive(ctx *actor.Context) {
switch msg := ctx.Message().(type) {
case actor.Started:
fmt.Println("[HODOR] storage has started, id:", ctx.PID().ID)

//----------------------------
//-----CAUSES RACE COND.------
//----------------------------
// case *letterToHodor:
// fmt.Println("[HODOR] message has received from:", ctx.PID())
//----------------------------
//-----CAUSES RACE COND.------
//----------------------------

// Delete the incoming websocket value.
case *deleteFromHodor:

msg.deleteWsCh <- msg.ws

go func(f *hodorStorage, msg *deleteFromHodor) {
for wsocket := range msg.deleteWsCh {
delete(f.storage, wsocket)
close(msg.deleteWsCh)
break
}
}(f, msg)

// Add a new websocket and goblin-process pair.
case *addToHodor:

// Prepare a new map to send over channel.
newMap := make(map[*websocket.Conn]*actor.PID)
newMap[msg.ws] = msg.pid

msg.addWsCh <- addToHodor{
ws: msg.ws,
pid: msg.pid,
}

go func(f *hodorStorage, msg *addToHodor) {
for wsMap := range msg.addWsCh {
f.storage[wsMap.ws] = wsMap.pid
close(msg.addWsCh)
break
}
}(f, msg)

case *broadcastMessage:
go func(msg *broadcastMessage) {
for _, pid := range f.storage {
// Send data to its own goblin-processor for send message its own websocket.
// So goblin-process will write own message to own websocket.
engine.Send(pid, msg)
}
}(msg)

}
}

// Goblin-processes corresponding to websockets live here..
func (f *websocketGoblin) Receive(ctx *actor.Context) {
switch msg := ctx.Message().(type) {
case actor.Started:
fmt.Println("[WEBSOCKET] foo has started:", ctx.PID().ID)

case *initValues:
// If exist, make sure you have one websocket.
if f.exist {
break
}

// Change states.
f.ws = msg.ws
f.exist = true
f.quitCh = msg.quitCh

// Add websocket and corresponding goblin-processId pair to hodor-storage.
engine.Send(hodorProcessId, &addToHodor{
ws: msg.ws,
pid: ctx.PID(),
addWsCh: make(chan addToHodor),
})

ourWs := msg.ws

// Generate a reading loop. Read incoming messages.
go func(ourWs *websocket.Conn) {
buf := make([]byte, 1024)
for {
n, err := ourWs.Read(buf)
if err != nil {
if err == io.EOF {
break
}

fmt.Println("read error: ", err)
continue
}

msg := buf[:n]
msgStr := string(msg)
message := fmt.Sprintf("%s:%s", ourWs.RemoteAddr().String(), msgStr)

// If you want, show that in console.
fmt.Println("message.....:", message)

// Send message to the HODOR for broadcasting.
// HODOR will broadcast messages to all goblins.
engine.Send(hodorProcessId, &broadcastMessage{
data: message,
})

// If not exist, break the loop.
// No need to read from the client anymore.
if !f.exist {
break
}
}

}(ourWs)

// Broadcast messages to all goblin-processes.
case *broadcastMessage:
_, err := f.ws.Write([]byte(msg.data))
if err != nil {
engine.Send(ctx.PID(), &closeWebSocket{
ws: f.ws,
})
}

// Close the specific goblin-process and websocket pair.
case *closeWebSocket:
// Changing the f.exist value to false causing-
// the "reader loop" to break. So we don't need read anymore.
f.exist = false

// Close the channel. So that will break the websocket scope.
*f.quitCh <- struct{}{}

// Delete the websocket and goblin-process pair in the hodor-storage.
engine.Send(hodorProcessId, &deleteFromHodor{
ws: msg.ws,
deleteWsCh: make(chan *websocket.Conn),
})

// Poison the goblin-process.
wg := &sync.WaitGroup{}
ctx.Engine().Poison(ctx.PID(), wg)

case actor.Stopped:
fmt.Println("goblin-process is stopped:", ctx.PID().ID)

}
}

func newHodor() actor.Receiver {
return &hodorStorage{
storage: make(map[*websocket.Conn]*actor.PID),
}
}

func newGoblin() actor.Receiver {
return &websocketGoblin{
ws: &websocket.Conn{},
exist: false,
}
}

func main() {

// Create a new engine.
// Ash nazg durbatulûk , ash nazg gimbatul,
// ash nazg thrakatulûk agh burzum-ishi krimpatul.
engine = actor.NewEngine()

Check failure on line 194 in examples/websocket/main.go

View workflow job for this annotation

GitHub Actions / build

assignment mismatch: 1 variable but actor.NewEngine returns 2 values

// Spawn a HODOR(holder-of-the-storage).
hodorProcessId = engine.Spawn(newHodor, "HODOR_STORAGE")

// Handle websocket connections. Then we will create a new process-
// corresponding websocket. Goblin-process is carrying websockets.
http.Handle("/ws", websocket.Handler(HandleFunc(GenerateProcessForWs)))
if err := http.ListenAndServe(":3000", nil); err != nil {
log.Fatal(err)
}

}
49 changes: 49 additions & 0 deletions examples/websocket/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package main

import (
"net/http/httptest"
"strings"
"testing"

"github.com/anthdm/hollywood/actor"
"golang.org/x/net/websocket"
)

func TestHandleWebsocket(t *testing.T) {

// Create a new engine to spawn Hodor-Storage..
engine = actor.NewEngine()
hodorProcessId = engine.Spawn(newHodor, "HODOR_STORAGE")

s := httptest.NewServer(HandleFunc(GenerateProcessForWs))
defer s.Close()

u := "ws" + strings.TrimPrefix(s.URL, "http")
println(u)

// Connect to the server
ws, err := websocket.Dial(u, "", u)
if err != nil {
t.Fatalf("%v", err)
}
defer ws.Close()

go func(ws *websocket.Conn) {
for {
buf := make([]byte, 1024)
_, err = ws.Read(buf)
if err != nil {
continue
}
// println(string(buf))
}
}(ws)

for i := 0; i < 10; i++ {
_, err := ws.Write([]byte("Some text."))
if err != nil {
t.Fatalf("WS Write Error: %v", err)
}
}

}
63 changes: 63 additions & 0 deletions examples/websocket/type.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package main

import (
"github.com/anthdm/hollywood/actor"
"golang.org/x/net/websocket"
)

// We need faithful reliable friend who is-
// HODOR(holder-of-the-storage) to bridle goblins.
type hodorStorage struct {
storage map[*websocket.Conn]*actor.PID
}

// We have goblin (holder-of-the-websocket)-process.
type websocketGoblin struct {
ws *websocket.Conn
exist bool
quitCh *chan struct{}
}

// We can easily kill goblin-process with chan struct.
type closeWebSocket struct {
ws *websocket.Conn
}

// We can create or delete goblin data in hodor-storage.
type deleteFromHodor struct {
ws *websocket.Conn
deleteWsCh chan *websocket.Conn
}

type addToHodor struct {
ws *websocket.Conn
pid *actor.PID
addWsCh chan addToHodor
}

// We can pass the message of a goblin-pprocess.
type broadcastMessage struct {
data string
}

// Goblins will need armor and spears.
type initValues struct {
ws *websocket.Conn
quitCh *chan struct{}
}

////////////////////////////////////////////////
// SPLITTED TO ADDTOHODOR AND DELETEFROMHODOR //
////////////////////////////////////////////////
//
// We can easily kill goblin-process with that.
// type letterToHodor struct {
// pid *actor.PID
// ws *websocket.Conn
// dropBool uint
// dropCh chan *websocket.Conn
// }
//
////////////////////////////////////////////////
// SPLITTED TO ADDTOHODOR AND DELETEFROMHODOR //
////////////////////////////////////////////////
Loading