From 271255fefa089db786d69e9380eafe899a81036e Mon Sep 17 00:00:00 2001 From: Abdullah BIYIK Date: Fri, 24 Mar 2023 16:35:58 +0300 Subject: [PATCH 1/9] websocket-example Build a simple websocket with hollywood actors. --- examples/websocket/handle.go | 44 +++++++++++++ examples/websocket/main.go | 122 +++++++++++++++++++++++++++++++++++ examples/websocket/type.go | 30 +++++++++ 3 files changed, 196 insertions(+) create mode 100644 examples/websocket/handle.go create mode 100644 examples/websocket/main.go create mode 100644 examples/websocket/type.go diff --git a/examples/websocket/handle.go b/examples/websocket/handle.go new file mode 100644 index 0000000..f3ef318 --- /dev/null +++ b/examples/websocket/handle.go @@ -0,0 +1,44 @@ +package main + +import ( + "fmt" + "time" + + "github.com/anthdm/hollywood/actor" + "golang.org/x/net/websocket" +) + +type handleWithPid func(ws *websocket.Conn) *actor.PID + +func HandleFunc(f handleWithPid) websocket.Handler { + return func(c *websocket.Conn) { + defer fmt.Println("web socket is deleted") + + pid := f(c) + fmt.Println("new websocket is spawned: ", pid.ID) + + for { + time.Sleep(time.Second * 3) + // Is client alive? + _, err := c.Write([]byte("[IFEXIST] are u alive?")) + if err != nil { + engine.Send(pid, &closeWsMsg{ + ws: c, + }) + // Kill the websocket. + break + } + + } + + } +} + +func GenerateProcessForWs(ws *websocket.Conn) *actor.PID { + pid := engine.Spawn(webSocketFoo, ws.RemoteAddr().String()) + defer engine.Send(pid, &setWsVal{ + pid: pid, + ws: ws, + }) + return pid +} diff --git a/examples/websocket/main.go b/examples/websocket/main.go new file mode 100644 index 0000000..1dc3d55 --- /dev/null +++ b/examples/websocket/main.go @@ -0,0 +1,122 @@ +package main + +import ( + "fmt" + "io" + "log" + "net/http" + + "github.com/anthdm/hollywood/actor" + "golang.org/x/net/websocket" +) + +var ( + engine *actor.Engine + storageProcessId *actor.PID +) + +func webSocketStorage() actor.Receiver { + return &wsPidStore{ + storage: make(map[*websocket.Conn]*actor.PID), + } +} + +func webSocketFoo() actor.Receiver { + return &wsFoo{ + ws: &websocket.Conn{}, + exist: false, + } +} + +// It is just one process. So only store socket data. +func (f *wsPidStore) Receive(ctx *actor.Context) { + switch msg := ctx.Message().(type) { + case actor.Started: + fmt.Println("[wsPidStore] storage has started") + case *sendStorageMsg: + fmt.Println("[wsPidStore] message has received", msg.ws) + // Delete incoming socket value. + if !msg.drop { + f.storage[msg.ws] = msg.pid + } else { + delete(f.storage, msg.ws) + } + } +} + +// Accepted (web)sockets. +func (f *wsFoo) Receive(ctx *actor.Context) { + switch msg := ctx.Message().(type) { + case actor.Started: + fmt.Println("[WEBSOCKET] foo has started") + + case *setWsVal: + // If exist, no need to assign. + if f.exist { + break + } + // Change state. + f.ws = msg.ws + f.exist = true + + // Send values to storage. + engine.Send(storageProcessId, &sendStorageMsg{ + ws: f.ws, + drop: false, + }) + + ourWs := msg.ws + // Generate a reading loop. Read incoming messages. + go func(ourWs *websocket.Conn) { + buf := make([]byte, 1024) + for { + n, err := ourWs.Read(buf) + if err != nil { + if err == io.EOF { + break + } + fmt.Println("read error: ", err) + continue + } + msg := buf[:n] + fmt.Println("message:", string(msg)) // convert value to string because we are human. + // If not exist, break the loop. No more need read from client. + if !f.exist { + break + } + } + + }(ourWs) + case *closeWsMsg: + // In handle func, we are checking the client which is alive or not. + // Then send a *closeWsMsg message to relevant socket. + // Relevant socket is this scope. + // + // Change exist state to false. + f.exist = false + engine.Send(storageProcessId, &sendStorageMsg{ + ws: f.ws, + drop: true, + }) + + fmt.Println("socket processor is deleted.") + // Break a process. + return + } +} + +func main() { + + // Create new engine. + engine = actor.NewEngine() + + // Spawn a storage process. + storageProcessId = engine.Spawn(webSocketStorage, "storer") + + // Handle WebSockets. + http.Handle("/ws", websocket.Handler(HandleFunc(GenerateProcessForWs))) + if err := http.ListenAndServe(":3000", nil); err != nil { + log.Fatal(err) + } + +} diff --git a/examples/websocket/type.go b/examples/websocket/type.go new file mode 100644 index 0000000..d3368f7 --- /dev/null +++ b/examples/websocket/type.go @@ -0,0 +1,30 @@ +package main + +import ( + "github.com/anthdm/hollywood/actor" + "golang.org/x/net/websocket" +) + +type wsFoo struct { + ws *websocket.Conn + exist bool +} + +type sendStorageMsg struct { + pid *actor.PID + ws *websocket.Conn + drop bool +} + +type wsPidStore struct { + storage map[*websocket.Conn]*actor.PID +} + +type setWsVal struct { + pid *actor.PID + ws *websocket.Conn +} + +type closeWsMsg struct { + ws *websocket.Conn +} From 36a738df5975fff8e945096b0dbed48916af5245 Mon Sep 17 00:00:00 2001 From: Abdullah BIYIK Date: Sat, 25 Mar 2023 16:06:58 +0300 Subject: [PATCH 2/9] quitchannel-added-for-break-after-writer-error If any websocket writer process dont write, it means the client is disconnected. So we must break the corresponding socket and processor pair. --- examples/websocket/handle.go | 44 +++++++++-------- examples/websocket/main.go | 91 +++++++++++++++++++++++++----------- examples/websocket/type.go | 14 ++++-- 3 files changed, 98 insertions(+), 51 deletions(-) diff --git a/examples/websocket/handle.go b/examples/websocket/handle.go index f3ef318..3333925 100644 --- a/examples/websocket/handle.go +++ b/examples/websocket/handle.go @@ -2,43 +2,47 @@ package main import ( "fmt" - "time" "github.com/anthdm/hollywood/actor" "golang.org/x/net/websocket" ) -type handleWithPid func(ws *websocket.Conn) *actor.PID +type handleWithPid func(ws *websocket.Conn) (*actor.PID, *chan bool) func HandleFunc(f handleWithPid) websocket.Handler { return func(c *websocket.Conn) { defer fmt.Println("web socket is deleted") - pid := f(c) + // We get QuitChannel. If we can't write later and we get a write error, we break this scope (websocket). + pid, quitCh := f(c) fmt.Println("new websocket is spawned: ", pid.ID) for { - time.Sleep(time.Second * 3) - // Is client alive? - _, err := c.Write([]byte("[IFEXIST] are u alive?")) - if err != nil { - engine.Send(pid, &closeWsMsg{ - ws: c, - }) - // Kill the websocket. - break - } - + // Waiting for break call. + <-*quitCh + engine.Send(pid, &closeWsMsg{ + ws: c, + }) + + // Kill the websocket. + break } - } } -func GenerateProcessForWs(ws *websocket.Conn) *actor.PID { +func GenerateProcessForWs(ws *websocket.Conn) (*actor.PID, *chan bool) { + + // Spawn new pid for new socket. pid := engine.Spawn(webSocketFoo, ws.RemoteAddr().String()) - defer engine.Send(pid, &setWsVal{ - pid: pid, - ws: ws, + + // Create a channel to break the socket. + quitCh := make(chan bool) + + // Send datas which is init values. + engine.Send(pid, &setWsVal{ + ws: ws, + quitCh: &quitCh, }) - return pid + + return pid, &quitCh } diff --git a/examples/websocket/main.go b/examples/websocket/main.go index 1dc3d55..0c3c89e 100644 --- a/examples/websocket/main.go +++ b/examples/websocket/main.go @@ -15,32 +15,29 @@ var ( storageProcessId *actor.PID ) -func webSocketStorage() actor.Receiver { - return &wsPidStore{ - storage: make(map[*websocket.Conn]*actor.PID), - } -} - -func webSocketFoo() actor.Receiver { - return &wsFoo{ - ws: &websocket.Conn{}, - exist: false, - } -} - -// It is just one process. So only store socket data. +// It is storing ProcessId and WebSocket identities. +// Also it broadcasts messages to all processes. func (f *wsPidStore) Receive(ctx *actor.Context) { switch msg := ctx.Message().(type) { case actor.Started: fmt.Println("[wsPidStore] storage has started") + case *sendStorageMsg: fmt.Println("[wsPidStore] message has received", msg.ws) // Delete incoming socket value. - if !msg.drop { - f.storage[msg.ws] = msg.pid - } else { + if msg.drop { delete(f.storage, msg.ws) + } else { + f.storage[msg.ws] = msg.pid } + + case *broadcastMsg: + go func(msg *broadcastMsg) { + for _, pid := range f.storage { + engine.Send(pid, msg) + } + }(msg) + } } @@ -51,16 +48,18 @@ func (f *wsFoo) Receive(ctx *actor.Context) { fmt.Println("[WEBSOCKET] foo has started") case *setWsVal: - // If exist, no need to assign. + // If exist, make sure you have one socket. if f.exist { break } - // Change state. + // Change states. f.ws = msg.ws f.exist = true + f.quitCh = msg.quitCh - // Send values to storage. + // Add socket and procesId pair to storage. engine.Send(storageProcessId, &sendStorageMsg{ + pid: ctx.PID(), ws: f.ws, drop: false, }) @@ -75,25 +74,50 @@ func (f *wsFoo) Receive(ctx *actor.Context) { if err == io.EOF { break } + fmt.Println("read error: ", err) continue } msg := buf[:n] - fmt.Println("message:", string(msg)) // convert value to string because we are human. - // If not exist, break the loop. No more need read from client. + msgStr := string(msg) + + message := fmt.Sprintf("%s:%s", ourWs.RemoteAddr().String(), msgStr) + // If u want show in console. + fmt.Println("message:", message) + + // Send message to storage for broadcasting. + // Storage will broadcast messages to all pids. + engine.Send(storageProcessId, &broadcastMsg{ + data: message, + }) + + // If not exist, break the loop. No need to read from the client anymore. if !f.exist { break } } }(ourWs) + + // Broadcast messages to all pids. + case *broadcastMsg: + _, err := f.ws.Write([]byte(msg.data)) + if err != nil { + engine.Send(ctx.PID(), &closeWsMsg{ + ws: f.ws, + }) + } + + // Close the specific socket and pid pair. case *closeWsMsg: - // In handle func, we are checking the client which is alive or not. - // Then send a *closeWsMsg message to relevant socket. - // Relevant socket is this scope. - // - // Change exist state to false. + // Changing the existing state to false causes the reader loop to break. + // We don't need read anymore. f.exist = false + + // Close the channel. Break the (web)socket whic is in handler func. + *f.quitCh <- true + + // Delete the web socket in the repository. engine.Send(storageProcessId, &sendStorageMsg{ ws: f.ws, drop: true, @@ -105,6 +129,19 @@ func (f *wsFoo) Receive(ctx *actor.Context) { } } +func webSocketStorage() actor.Receiver { + return &wsPidStore{ + storage: make(map[*websocket.Conn]*actor.PID), + } +} + +func webSocketFoo() actor.Receiver { + return &wsFoo{ + ws: &websocket.Conn{}, + exist: false, + } +} + func main() { // Create new engine. diff --git a/examples/websocket/type.go b/examples/websocket/type.go index d3368f7..f540208 100644 --- a/examples/websocket/type.go +++ b/examples/websocket/type.go @@ -6,8 +6,9 @@ import ( ) type wsFoo struct { - ws *websocket.Conn - exist bool + ws *websocket.Conn + exist bool + quitCh *chan bool } type sendStorageMsg struct { @@ -16,13 +17,18 @@ type sendStorageMsg struct { drop bool } +type broadcastMsg struct { + data string +} + type wsPidStore struct { storage map[*websocket.Conn]*actor.PID } type setWsVal struct { - pid *actor.PID - ws *websocket.Conn + pid *actor.PID + ws *websocket.Conn + quitCh *chan bool } type closeWsMsg struct { From d1548e531656404644148813bc0dc319f729315b Mon Sep 17 00:00:00 2001 From: Abdullah BIYIK Date: Mon, 27 Mar 2023 03:05:51 +0300 Subject: [PATCH 3/9] quitchannel-bool-is-changed-to-struct-no-buffer no-buffer-structure is more low-overhead than other. --- examples/websocket/handle.go | 6 +++--- examples/websocket/main.go | 2 +- examples/websocket/type.go | 5 ++--- 3 files changed, 6 insertions(+), 7 deletions(-) diff --git a/examples/websocket/handle.go b/examples/websocket/handle.go index 3333925..508d297 100644 --- a/examples/websocket/handle.go +++ b/examples/websocket/handle.go @@ -7,7 +7,7 @@ import ( "golang.org/x/net/websocket" ) -type handleWithPid func(ws *websocket.Conn) (*actor.PID, *chan bool) +type handleWithPid func(ws *websocket.Conn) (*actor.PID, *chan struct{}) func HandleFunc(f handleWithPid) websocket.Handler { return func(c *websocket.Conn) { @@ -30,13 +30,13 @@ func HandleFunc(f handleWithPid) websocket.Handler { } } -func GenerateProcessForWs(ws *websocket.Conn) (*actor.PID, *chan bool) { +func GenerateProcessForWs(ws *websocket.Conn) (*actor.PID, *chan struct{}) { // Spawn new pid for new socket. pid := engine.Spawn(webSocketFoo, ws.RemoteAddr().String()) // Create a channel to break the socket. - quitCh := make(chan bool) + quitCh := make(chan struct{}) // Send datas which is init values. engine.Send(pid, &setWsVal{ diff --git a/examples/websocket/main.go b/examples/websocket/main.go index 0c3c89e..a6ac39d 100644 --- a/examples/websocket/main.go +++ b/examples/websocket/main.go @@ -115,7 +115,7 @@ func (f *wsFoo) Receive(ctx *actor.Context) { f.exist = false // Close the channel. Break the (web)socket whic is in handler func. - *f.quitCh <- true + *f.quitCh <- struct{}{} // Delete the web socket in the repository. engine.Send(storageProcessId, &sendStorageMsg{ diff --git a/examples/websocket/type.go b/examples/websocket/type.go index f540208..ad0b7b6 100644 --- a/examples/websocket/type.go +++ b/examples/websocket/type.go @@ -8,7 +8,7 @@ import ( type wsFoo struct { ws *websocket.Conn exist bool - quitCh *chan bool + quitCh *chan struct{} } type sendStorageMsg struct { @@ -26,9 +26,8 @@ type wsPidStore struct { } type setWsVal struct { - pid *actor.PID ws *websocket.Conn - quitCh *chan bool + quitCh *chan struct{} } type closeWsMsg struct { From d20472dff24ef9909cfc521799a9d9ec4eb85a2b Mon Sep 17 00:00:00 2001 From: Abdullah BIYIK Date: Mon, 27 Mar 2023 20:58:35 +0300 Subject: [PATCH 4/9] process-id-is-salted-and-poison-is-added Process id is salted with time and we are breaking websocketprocess with hollywood poison. --- examples/websocket/handle.go | 20 +++++++++++++++----- examples/websocket/main.go | 16 +++++++++++----- 2 files changed, 26 insertions(+), 10 deletions(-) diff --git a/examples/websocket/handle.go b/examples/websocket/handle.go index 508d297..13c5f4a 100644 --- a/examples/websocket/handle.go +++ b/examples/websocket/handle.go @@ -2,11 +2,19 @@ package main import ( "fmt" + "math/rand" + "strconv" + "time" "github.com/anthdm/hollywood/actor" "golang.org/x/net/websocket" ) +const ( + min = 1 + max = 300 +) + type handleWithPid func(ws *websocket.Conn) (*actor.PID, *chan struct{}) func HandleFunc(f handleWithPid) websocket.Handler { @@ -15,14 +23,11 @@ func HandleFunc(f handleWithPid) websocket.Handler { // We get QuitChannel. If we can't write later and we get a write error, we break this scope (websocket). pid, quitCh := f(c) - fmt.Println("new websocket is spawned: ", pid.ID) + fmt.Println("new websocket process is spawned: ", pid.ID) for { // Waiting for break call. <-*quitCh - engine.Send(pid, &closeWsMsg{ - ws: c, - }) // Kill the websocket. break @@ -31,9 +36,14 @@ func HandleFunc(f handleWithPid) websocket.Handler { } func GenerateProcessForWs(ws *websocket.Conn) (*actor.PID, *chan struct{}) { + // Generate unique process id for new process. + now := time.Now() + rand.Seed(now.UnixNano()) + randNum := rand.Intn(max-min+1) + min + salt := strconv.Itoa(randNum + now.Nanosecond()) // Spawn new pid for new socket. - pid := engine.Spawn(webSocketFoo, ws.RemoteAddr().String()) + pid := engine.Spawn(webSocketFoo, salt) // Create a channel to break the socket. quitCh := make(chan struct{}) diff --git a/examples/websocket/main.go b/examples/websocket/main.go index a6ac39d..1ebc598 100644 --- a/examples/websocket/main.go +++ b/examples/websocket/main.go @@ -5,6 +5,7 @@ import ( "io" "log" "net/http" + "sync" "github.com/anthdm/hollywood/actor" "golang.org/x/net/websocket" @@ -20,10 +21,10 @@ var ( func (f *wsPidStore) Receive(ctx *actor.Context) { switch msg := ctx.Message().(type) { case actor.Started: - fmt.Println("[wsPidStore] storage has started") + fmt.Println("[wsPidStore] storage has started:", ctx.PID().ID) case *sendStorageMsg: - fmt.Println("[wsPidStore] message has received", msg.ws) + fmt.Println("[wsPidStore] message has received from", ctx.PID()) // Delete incoming socket value. if msg.drop { delete(f.storage, msg.ws) @@ -35,6 +36,7 @@ func (f *wsPidStore) Receive(ctx *actor.Context) { go func(msg *broadcastMsg) { for _, pid := range f.storage { engine.Send(pid, msg) + fmt.Println("\n pid:", pid.ID) } }(msg) @@ -45,7 +47,7 @@ func (f *wsPidStore) Receive(ctx *actor.Context) { func (f *wsFoo) Receive(ctx *actor.Context) { switch msg := ctx.Message().(type) { case actor.Started: - fmt.Println("[WEBSOCKET] foo has started") + fmt.Println("[WEBSOCKET] foo has started:", ctx.PID().ID) case *setWsVal: // If exist, make sure you have one socket. @@ -123,9 +125,13 @@ func (f *wsFoo) Receive(ctx *actor.Context) { drop: true, }) - fmt.Println("socket processor is deleted.") // Break a process. - return + wg := &sync.WaitGroup{} + ctx.Engine().Poison(ctx.PID(), wg) + + case actor.Stopped: + fmt.Println("socket processor is stopped:", ctx.PID().ID) + } } From 14ee20c79b355d8c8c9876952a4e9820de599d75 Mon Sep 17 00:00:00 2001 From: Abdullah BIYIK Date: Tue, 28 Mar 2023 01:47:04 +0300 Subject: [PATCH 5/9] imagination-added-goblins-and-hodor make it easy to understand with hodor and goblins. --- examples/websocket/handle.go | 23 +++++---- examples/websocket/main.go | 95 ++++++++++++++++++++---------------- examples/websocket/type.go | 32 +++++++----- 3 files changed, 85 insertions(+), 65 deletions(-) diff --git a/examples/websocket/handle.go b/examples/websocket/handle.go index 13c5f4a..03e1785 100644 --- a/examples/websocket/handle.go +++ b/examples/websocket/handle.go @@ -20,13 +20,16 @@ type handleWithPid func(ws *websocket.Conn) (*actor.PID, *chan struct{}) func HandleFunc(f handleWithPid) websocket.Handler { return func(c *websocket.Conn) { defer fmt.Println("web socket is deleted") - - // We get QuitChannel. If we can't write later and we get a write error, we break this scope (websocket). + // If we can't write to the socket later and we get- + // a "write error", we break this (websocket) scope. + // + // Generating a new process for websocket. + // Getting a quitChanel to break websocket scope. pid, quitCh := f(c) fmt.Println("new websocket process is spawned: ", pid.ID) for { - // Waiting for break call. + // Waiting for the "break call" from processes. <-*quitCh // Kill the websocket. @@ -36,20 +39,20 @@ func HandleFunc(f handleWithPid) websocket.Handler { } func GenerateProcessForWs(ws *websocket.Conn) (*actor.PID, *chan struct{}) { - // Generate unique process id for new process. + // Create unique pid with salting for new process. now := time.Now() rand.Seed(now.UnixNano()) randNum := rand.Intn(max-min+1) + min - salt := strconv.Itoa(randNum + now.Nanosecond()) + uniquePid := strconv.Itoa(randNum + now.Nanosecond()) - // Spawn new pid for new socket. - pid := engine.Spawn(webSocketFoo, salt) + // Spawn a new process for the new socket. + pid := engine.Spawn(newGoblin, uniquePid) - // Create a channel to break the socket. + // Create a channel to break the socket. quitCh := make(chan struct{}) - // Send datas which is init values. - engine.Send(pid, &setWsVal{ + // Send datas which is init values. + engine.Send(pid, &initValues{ ws: ws, quitCh: &quitCh, }) diff --git a/examples/websocket/main.go b/examples/websocket/main.go index 1ebc598..32dbb95 100644 --- a/examples/websocket/main.go +++ b/examples/websocket/main.go @@ -12,45 +12,49 @@ import ( ) var ( - engine *actor.Engine - storageProcessId *actor.PID + engine *actor.Engine + hodorProcessId *actor.PID ) -// It is storing ProcessId and WebSocket identities. -// Also it broadcasts messages to all processes. -func (f *wsPidStore) Receive(ctx *actor.Context) { +// It is storing goblin-process and coressponding websocket identities. +// Also it broadcasts messages to all goblin-processes. +func (f *hodorStorage) Receive(ctx *actor.Context) { switch msg := ctx.Message().(type) { case actor.Started: - fmt.Println("[wsPidStore] storage has started:", ctx.PID().ID) + fmt.Println("[HODOR] storage has started, id:", ctx.PID().ID) - case *sendStorageMsg: - fmt.Println("[wsPidStore] message has received from", ctx.PID()) - // Delete incoming socket value. + case *letterToHodor: + fmt.Println("[HODOR] message has received from:", ctx.PID()) + + // HODOR will do these. + // Delete the incoming websocket value - + // or add a new websocket and goblin-process pair. if msg.drop { delete(f.storage, msg.ws) } else { f.storage[msg.ws] = msg.pid } - case *broadcastMsg: - go func(msg *broadcastMsg) { + case *broadcastMessage: + go func(msg *broadcastMessage) { for _, pid := range f.storage { + // Send data to its own goblin-processor for send message its own websocket. + // So goblin-process will write own message to own websocket. engine.Send(pid, msg) - fmt.Println("\n pid:", pid.ID) } }(msg) } } -// Accepted (web)sockets. -func (f *wsFoo) Receive(ctx *actor.Context) { +// Goblin-processes corresponding to websockets live here.. +func (f *websocketGoblin) Receive(ctx *actor.Context) { switch msg := ctx.Message().(type) { case actor.Started: fmt.Println("[WEBSOCKET] foo has started:", ctx.PID().ID) - case *setWsVal: - // If exist, make sure you have one socket. + case *initValues: + // If exist, make sure you have one websocket. if f.exist { break } @@ -59,8 +63,8 @@ func (f *wsFoo) Receive(ctx *actor.Context) { f.exist = true f.quitCh = msg.quitCh - // Add socket and procesId pair to storage. - engine.Send(storageProcessId, &sendStorageMsg{ + // Add websocket and corresponding goblin-processId pair to hodor-storage. + engine.Send(hodorProcessId, &letterToHodor{ pid: ctx.PID(), ws: f.ws, drop: false, @@ -84,16 +88,18 @@ func (f *wsFoo) Receive(ctx *actor.Context) { msgStr := string(msg) message := fmt.Sprintf("%s:%s", ourWs.RemoteAddr().String(), msgStr) - // If u want show in console. + + // If you want, show that in console. fmt.Println("message:", message) - // Send message to storage for broadcasting. - // Storage will broadcast messages to all pids. - engine.Send(storageProcessId, &broadcastMsg{ + // Send message to the HODOR for broadcasting. + // HODOR will broadcast messages to all goblins. + engine.Send(hodorProcessId, &broadcastMessage{ data: message, }) - // If not exist, break the loop. No need to read from the client anymore. + // If not exist, break the loop. + // No need to read from the client anymore. if !f.exist { break } @@ -101,48 +107,48 @@ func (f *wsFoo) Receive(ctx *actor.Context) { }(ourWs) - // Broadcast messages to all pids. - case *broadcastMsg: + // Broadcast messages to all goblin-processes. + case *broadcastMessage: _, err := f.ws.Write([]byte(msg.data)) if err != nil { - engine.Send(ctx.PID(), &closeWsMsg{ + engine.Send(ctx.PID(), &closeWebSocket{ ws: f.ws, }) } - // Close the specific socket and pid pair. - case *closeWsMsg: - // Changing the existing state to false causes the reader loop to break. - // We don't need read anymore. + // Close the specific goblin-process and websocket pair. + case *closeWebSocket: + // Changing the f.exist value to false causing - + // the "reader loop" to break. So we don't need read anymore. f.exist = false - // Close the channel. Break the (web)socket whic is in handler func. + // Close the channel. Break the websocket scope. *f.quitCh <- struct{}{} - // Delete the web socket in the repository. - engine.Send(storageProcessId, &sendStorageMsg{ + // Delete the websocket and goblin-process pair in the hodor-storage. + engine.Send(hodorProcessId, &letterToHodor{ ws: f.ws, drop: true, }) - // Break a process. + // Poison the goblin process. wg := &sync.WaitGroup{} ctx.Engine().Poison(ctx.PID(), wg) case actor.Stopped: - fmt.Println("socket processor is stopped:", ctx.PID().ID) + fmt.Println("goblin-process is stopped:", ctx.PID().ID) } } -func webSocketStorage() actor.Receiver { - return &wsPidStore{ +func newHodor() actor.Receiver { + return &hodorStorage{ storage: make(map[*websocket.Conn]*actor.PID), } } -func webSocketFoo() actor.Receiver { - return &wsFoo{ +func newGoblin() actor.Receiver { + return &websocketGoblin{ ws: &websocket.Conn{}, exist: false, } @@ -150,13 +156,16 @@ func webSocketFoo() actor.Receiver { func main() { - // Create new engine. + // Create a new engine. + // Ash nazg durbatulûk, ash nazg gimbatul, + // ash nazg thrakatulûk agh burzum-ishi krimpatul. engine = actor.NewEngine() - // Spawn a storage process. - storageProcessId = engine.Spawn(webSocketStorage, "storer") + // Spawn a HODOR(holder-of-the-storage). + hodorProcessId = engine.Spawn(newHodor, "HODOR_STORAGE") - // Handle WebSockets. + // Handle websocket connections. Then we will create a new process - + // corresponding websocket. Goblin-process is carrying websockets. http.Handle("/ws", websocket.Handler(HandleFunc(GenerateProcessForWs))) if err := http.ListenAndServe(":3000", nil); err != nil { log.Fatal(err) diff --git a/examples/websocket/type.go b/examples/websocket/type.go index ad0b7b6..30a3666 100644 --- a/examples/websocket/type.go +++ b/examples/websocket/type.go @@ -5,31 +5,39 @@ import ( "golang.org/x/net/websocket" ) -type wsFoo struct { +// We need faithful reliable friend who is - +// HODOR(holder-of-the-storage) to bridle goblins. +type hodorStorage struct { + storage map[*websocket.Conn]*actor.PID +} + +// We have goblin(holder-of-the-websocket)-process. +type websocketGoblin struct { ws *websocket.Conn exist bool quitCh *chan struct{} } -type sendStorageMsg struct { +// We can easily kill goblin-process with chan struct. +type closeWebSocket struct { + ws *websocket.Conn +} + +// We can create or delete goblin data in - +// hodor-storage. +type letterToHodor struct { pid *actor.PID ws *websocket.Conn drop bool } -type broadcastMsg struct { +// We can pass the message of a goblin-pprocess. +type broadcastMessage struct { data string } -type wsPidStore struct { - storage map[*websocket.Conn]*actor.PID -} - -type setWsVal struct { +// Goblins will need armor and spears. +type initValues struct { ws *websocket.Conn quitCh *chan struct{} } - -type closeWsMsg struct { - ws *websocket.Conn -} From 30e26953749877f5139a8ec73769c6c1aeeec029 Mon Sep 17 00:00:00 2001 From: Abdullah BIYIK Date: Tue, 28 Mar 2023 18:13:21 +0300 Subject: [PATCH 6/9] lowercase-const-is-changed-to-uppercase-const rules is changing. --- examples/websocket/handle.go | 26 ++++++++++++++------------ examples/websocket/main.go | 4 ++-- 2 files changed, 16 insertions(+), 14 deletions(-) diff --git a/examples/websocket/handle.go b/examples/websocket/handle.go index 03e1785..0796a08 100644 --- a/examples/websocket/handle.go +++ b/examples/websocket/handle.go @@ -11,8 +11,8 @@ import ( ) const ( - min = 1 - max = 300 + MIN = 1 + MAX = 300 ) type handleWithPid func(ws *websocket.Conn) (*actor.PID, *chan struct{}) @@ -23,35 +23,37 @@ func HandleFunc(f handleWithPid) websocket.Handler { // If we can't write to the socket later and we get- // a "write error", we break this (websocket) scope. // - // Generating a new process for websocket. - // Getting a quitChanel to break websocket scope. + // Generating a new goblin-process for websocket. + // Getting a quitChanel to break websocket handle scope. pid, quitCh := f(c) - fmt.Println("new websocket process is spawned: ", pid.ID) + fmt.Println("new goblin-process which is holding incoming websocket is spawned: ", pid.ID) for { - // Waiting for the "break call" from processes. + // Waiting for the "break call" from goblins-processes. <-*quitCh - // Kill the websocket. + // Kill the websocket handling scope. break } } } func GenerateProcessForWs(ws *websocket.Conn) (*actor.PID, *chan struct{}) { - // Create unique pid with salting for new process. + // Create unique pid with salting for new goblins-processes. now := time.Now() rand.Seed(now.UnixNano()) - randNum := rand.Intn(max-min+1) + min + + randNum := rand.Intn(MAX-MIN+1) + MIN uniquePid := strconv.Itoa(randNum + now.Nanosecond()) - // Spawn a new process for the new socket. + // Spawn a new goblin-process for incoming websocket. pid := engine.Spawn(newGoblin, uniquePid) - // Create a channel to break the socket. + // Create a channel to break the websocket handling scope. quitCh := make(chan struct{}) - // Send datas which is init values. + // Send datas the created goblin-process. + // Than goblin-process will hold (the websocket connection) and (the quitCh pointer). engine.Send(pid, &initValues{ ws: ws, quitCh: &quitCh, diff --git a/examples/websocket/main.go b/examples/websocket/main.go index 32dbb95..5286c38 100644 --- a/examples/websocket/main.go +++ b/examples/websocket/main.go @@ -122,7 +122,7 @@ func (f *websocketGoblin) Receive(ctx *actor.Context) { // the "reader loop" to break. So we don't need read anymore. f.exist = false - // Close the channel. Break the websocket scope. + // Close the channel. So that will break the websocket scope. *f.quitCh <- struct{}{} // Delete the websocket and goblin-process pair in the hodor-storage. @@ -131,7 +131,7 @@ func (f *websocketGoblin) Receive(ctx *actor.Context) { drop: true, }) - // Poison the goblin process. + // Poison the goblin-process. wg := &sync.WaitGroup{} ctx.Engine().Poison(ctx.PID(), wg) From b648c0ffbce2ac91a0c369d476f926fc2c2a0a6b Mon Sep 17 00:00:00 2001 From: Abdullah BIYIK Date: Thu, 30 Mar 2023 17:37:37 +0300 Subject: [PATCH 7/9] solved-race-condition-storage-add-delete-statement The add(add map) and delete(from map) statements should be split into two cases in storage-process. --- examples/websocket/main.go | 30 ++++++++++++------------------ examples/websocket/type.go | 11 +++++++---- 2 files changed, 19 insertions(+), 22 deletions(-) diff --git a/examples/websocket/main.go b/examples/websocket/main.go index 5286c38..6b6f5f2 100644 --- a/examples/websocket/main.go +++ b/examples/websocket/main.go @@ -23,17 +23,13 @@ func (f *hodorStorage) Receive(ctx *actor.Context) { case actor.Started: fmt.Println("[HODOR] storage has started, id:", ctx.PID().ID) - case *letterToHodor: - fmt.Println("[HODOR] message has received from:", ctx.PID()) - - // HODOR will do these. - // Delete the incoming websocket value - - // or add a new websocket and goblin-process pair. - if msg.drop { - delete(f.storage, msg.ws) - } else { - f.storage[msg.ws] = msg.pid - } + // Delete the incoming websocket value. + case *deleteFromHodor: + delete(f.storage, msg.ws) + + // Add a new websocket and goblin-process pair. + case addToHodor: + f.storage[msg.ws] = msg.pid case *broadcastMessage: go func(msg *broadcastMessage) { @@ -64,10 +60,9 @@ func (f *websocketGoblin) Receive(ctx *actor.Context) { f.quitCh = msg.quitCh // Add websocket and corresponding goblin-processId pair to hodor-storage. - engine.Send(hodorProcessId, &letterToHodor{ - pid: ctx.PID(), - ws: f.ws, - drop: false, + engine.Send(hodorProcessId, &addToHodor{ + pid: ctx.PID(), + ws: f.ws, }) ourWs := msg.ws @@ -126,9 +121,8 @@ func (f *websocketGoblin) Receive(ctx *actor.Context) { *f.quitCh <- struct{}{} // Delete the websocket and goblin-process pair in the hodor-storage. - engine.Send(hodorProcessId, &letterToHodor{ - ws: f.ws, - drop: true, + engine.Send(hodorProcessId, &deleteFromHodor{ + ws: f.ws, }) // Poison the goblin-process. diff --git a/examples/websocket/type.go b/examples/websocket/type.go index 30a3666..64d7111 100644 --- a/examples/websocket/type.go +++ b/examples/websocket/type.go @@ -25,10 +25,13 @@ type closeWebSocket struct { // We can create or delete goblin data in - // hodor-storage. -type letterToHodor struct { - pid *actor.PID - ws *websocket.Conn - drop bool +type deleteFromHodor struct { + ws *websocket.Conn +} + +type addToHodor struct { + pid *actor.PID + ws *websocket.Conn } // We can pass the message of a goblin-pprocess. From 3d9758e5b277b50efcefd64cbd71457efd427516 Mon Sep 17 00:00:00 2001 From: Abdullah BIYIK Date: Thu, 30 Mar 2023 17:38:08 +0300 Subject: [PATCH 8/9] add-socket-handle-test --- examples/websocket/main_test.go | 49 +++++++++++++++++++++++++++++++++ 1 file changed, 49 insertions(+) create mode 100644 examples/websocket/main_test.go diff --git a/examples/websocket/main_test.go b/examples/websocket/main_test.go new file mode 100644 index 0000000..2c99416 --- /dev/null +++ b/examples/websocket/main_test.go @@ -0,0 +1,49 @@ +package main + +import ( + "net/http/httptest" + "strings" + "testing" + + "github.com/anthdm/hollywood/actor" + "golang.org/x/net/websocket" +) + +func TestHandleWebsocket(t *testing.T) { + + // Create a new engine to spawn Hodor-Storage.. + engine = actor.NewEngine() + hodorProcessId = engine.Spawn(newHodor, "HODOR_STORAGE") + + s := httptest.NewServer(HandleFunc(GenerateProcessForWs)) + defer s.Close() + + u := "ws" + strings.TrimPrefix(s.URL, "http") + println(u) + + // Connect to the server + ws, err := websocket.Dial(u, "", u) + if err != nil { + t.Fatalf("%v", err) + } + defer ws.Close() + + go func(ws *websocket.Conn) { + for { + buf := make([]byte, 1024) + _, err = ws.Read(buf) + if err != nil { + continue + } + // println(string(buf)) + } + }(ws) + + for i := 0; i < 10; i++ { + _, err := ws.Write([]byte("Some text.")) + if err != nil { + t.Fatalf("WS Write Error: %v", err) + } + } + +} From 7f534a6ded4fbf06698b86ee727904fb87d433c6 Mon Sep 17 00:00:00 2001 From: Abdullah BIYIK Date: Fri, 31 Mar 2023 20:09:09 +0300 Subject: [PATCH 9/9] race-cond-so-i-have-to-solve-this-with-channels I solved race-condition temporarily with channels. --- examples/websocket/handle.go | 16 +++++---- examples/websocket/main.go | 70 +++++++++++++++++++++++++++--------- examples/websocket/type.go | 33 ++++++++++++----- 3 files changed, 88 insertions(+), 31 deletions(-) diff --git a/examples/websocket/handle.go b/examples/websocket/handle.go index 0796a08..f66e223 100644 --- a/examples/websocket/handle.go +++ b/examples/websocket/handle.go @@ -25,14 +25,15 @@ func HandleFunc(f handleWithPid) websocket.Handler { // // Generating a new goblin-process for websocket. // Getting a quitChanel to break websocket handle scope. + pid, quitCh := f(c) - fmt.Println("new goblin-process which is holding incoming websocket is spawned: ", pid.ID) + fmt.Println("new goblin-process is spawned: ", pid.ID) for { - // Waiting for the "break call" from goblins-processes. + // Waiting for the "break call" from goblin-proces. <-*quitCh - // Kill the websocket handling scope. + // Kill the websocket handling scope.. break } } @@ -46,14 +47,15 @@ func GenerateProcessForWs(ws *websocket.Conn) (*actor.PID, *chan struct{}) { randNum := rand.Intn(MAX-MIN+1) + MIN uniquePid := strconv.Itoa(randNum + now.Nanosecond()) - // Spawn a new goblin-process for incoming websocket. + // Spawn a new goblin-process for incoming websocket. pid := engine.Spawn(newGoblin, uniquePid) - // Create a channel to break the websocket handling scope. + // Create a channel to break the websocket handling scope. quitCh := make(chan struct{}) - // Send datas the created goblin-process. - // Than goblin-process will hold (the websocket connection) and (the quitCh pointer). + // Send datas the created goblin-process. + // Than goblin-process will hold (the websoc- + // ket connection) and (the quitCh pointer). engine.Send(pid, &initValues{ ws: ws, quitCh: &quitCh, diff --git a/examples/websocket/main.go b/examples/websocket/main.go index 6b6f5f2..20e4ddc 100644 --- a/examples/websocket/main.go +++ b/examples/websocket/main.go @@ -23,13 +23,47 @@ func (f *hodorStorage) Receive(ctx *actor.Context) { case actor.Started: fmt.Println("[HODOR] storage has started, id:", ctx.PID().ID) + //---------------------------- + //-----CAUSES RACE COND.------ + //---------------------------- + // case *letterToHodor: + // fmt.Println("[HODOR] message has received from:", ctx.PID()) + //---------------------------- + //-----CAUSES RACE COND.------ + //---------------------------- + // Delete the incoming websocket value. case *deleteFromHodor: - delete(f.storage, msg.ws) - // Add a new websocket and goblin-process pair. - case addToHodor: - f.storage[msg.ws] = msg.pid + msg.deleteWsCh <- msg.ws + + go func(f *hodorStorage, msg *deleteFromHodor) { + for wsocket := range msg.deleteWsCh { + delete(f.storage, wsocket) + close(msg.deleteWsCh) + break + } + }(f, msg) + + // Add a new websocket and goblin-process pair. + case *addToHodor: + + // Prepare a new map to send over channel. + newMap := make(map[*websocket.Conn]*actor.PID) + newMap[msg.ws] = msg.pid + + msg.addWsCh <- addToHodor{ + ws: msg.ws, + pid: msg.pid, + } + + go func(f *hodorStorage, msg *addToHodor) { + for wsMap := range msg.addWsCh { + f.storage[wsMap.ws] = wsMap.pid + close(msg.addWsCh) + break + } + }(f, msg) case *broadcastMessage: go func(msg *broadcastMessage) { @@ -54,6 +88,7 @@ func (f *websocketGoblin) Receive(ctx *actor.Context) { if f.exist { break } + // Change states. f.ws = msg.ws f.exist = true @@ -61,11 +96,13 @@ func (f *websocketGoblin) Receive(ctx *actor.Context) { // Add websocket and corresponding goblin-processId pair to hodor-storage. engine.Send(hodorProcessId, &addToHodor{ - pid: ctx.PID(), - ws: f.ws, + ws: msg.ws, + pid: ctx.PID(), + addWsCh: make(chan addToHodor), }) ourWs := msg.ws + // Generate a reading loop. Read incoming messages. go func(ourWs *websocket.Conn) { buf := make([]byte, 1024) @@ -79,15 +116,15 @@ func (f *websocketGoblin) Receive(ctx *actor.Context) { fmt.Println("read error: ", err) continue } + msg := buf[:n] msgStr := string(msg) - message := fmt.Sprintf("%s:%s", ourWs.RemoteAddr().String(), msgStr) // If you want, show that in console. - fmt.Println("message:", message) + fmt.Println("message.....:", message) - // Send message to the HODOR for broadcasting. + // Send message to the HODOR for broadcasting. // HODOR will broadcast messages to all goblins. engine.Send(hodorProcessId, &broadcastMessage{ data: message, @@ -102,7 +139,7 @@ func (f *websocketGoblin) Receive(ctx *actor.Context) { }(ourWs) - // Broadcast messages to all goblin-processes. + // Broadcast messages to all goblin-processes. case *broadcastMessage: _, err := f.ws.Write([]byte(msg.data)) if err != nil { @@ -113,7 +150,7 @@ func (f *websocketGoblin) Receive(ctx *actor.Context) { // Close the specific goblin-process and websocket pair. case *closeWebSocket: - // Changing the f.exist value to false causing - + // Changing the f.exist value to false causing- // the "reader loop" to break. So we don't need read anymore. f.exist = false @@ -122,7 +159,8 @@ func (f *websocketGoblin) Receive(ctx *actor.Context) { // Delete the websocket and goblin-process pair in the hodor-storage. engine.Send(hodorProcessId, &deleteFromHodor{ - ws: f.ws, + ws: msg.ws, + deleteWsCh: make(chan *websocket.Conn), }) // Poison the goblin-process. @@ -150,16 +188,16 @@ func newGoblin() actor.Receiver { func main() { - // Create a new engine. - // Ash nazg durbatulûk, ash nazg gimbatul, + // Create a new engine. + // Ash nazg durbatulûk , ash nazg gimbatul, // ash nazg thrakatulûk agh burzum-ishi krimpatul. engine = actor.NewEngine() // Spawn a HODOR(holder-of-the-storage). hodorProcessId = engine.Spawn(newHodor, "HODOR_STORAGE") - // Handle websocket connections. Then we will create a new process - - // corresponding websocket. Goblin-process is carrying websockets. + // Handle websocket connections. Then we will create a new process- + // corresponding websocket. Goblin-process is carrying websockets. http.Handle("/ws", websocket.Handler(HandleFunc(GenerateProcessForWs))) if err := http.ListenAndServe(":3000", nil); err != nil { log.Fatal(err) diff --git a/examples/websocket/type.go b/examples/websocket/type.go index 64d7111..c6a606e 100644 --- a/examples/websocket/type.go +++ b/examples/websocket/type.go @@ -5,13 +5,13 @@ import ( "golang.org/x/net/websocket" ) -// We need faithful reliable friend who is - -// HODOR(holder-of-the-storage) to bridle goblins. +// We need faithful reliable friend who is- +// HODOR(holder-of-the-storage) to bridle goblins. type hodorStorage struct { storage map[*websocket.Conn]*actor.PID } -// We have goblin(holder-of-the-websocket)-process. +// We have goblin (holder-of-the-websocket)-process. type websocketGoblin struct { ws *websocket.Conn exist bool @@ -23,15 +23,16 @@ type closeWebSocket struct { ws *websocket.Conn } -// We can create or delete goblin data in - -// hodor-storage. +// We can create or delete goblin data in hodor-storage. type deleteFromHodor struct { - ws *websocket.Conn + ws *websocket.Conn + deleteWsCh chan *websocket.Conn } type addToHodor struct { - pid *actor.PID - ws *websocket.Conn + ws *websocket.Conn + pid *actor.PID + addWsCh chan addToHodor } // We can pass the message of a goblin-pprocess. @@ -44,3 +45,19 @@ type initValues struct { ws *websocket.Conn quitCh *chan struct{} } + +//////////////////////////////////////////////// +// SPLITTED TO ADDTOHODOR AND DELETEFROMHODOR // +//////////////////////////////////////////////// +// +// We can easily kill goblin-process with that. +// type letterToHodor struct { +// pid *actor.PID +// ws *websocket.Conn +// dropBool uint +// dropCh chan *websocket.Conn +// } +// +//////////////////////////////////////////////// +// SPLITTED TO ADDTOHODOR AND DELETEFROMHODOR // +////////////////////////////////////////////////