Skip to content

Commit

Permalink
Merge pull request #52 from lxzan/testing
Browse files Browse the repository at this point in the history
v1.6.11
  • Loading branch information
lxzan authored Sep 19, 2023
2 parents 3a247a3 + cef3243 commit 26f29d4
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 23 deletions.
1 change: 1 addition & 0 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func (c *Conn) ReadLoop() {

// 回收资源
if c.isServer {
c.br.Reset(nil)
c.config.readerPool.Put(c.br)
c.br = nil
}
Expand Down
3 changes: 1 addition & 2 deletions reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package gws

import (
"bytes"
"errors"
"fmt"
"github.com/lxzan/gws/internal"
)
Expand Down Expand Up @@ -51,7 +50,7 @@ func (c *Conn) readControl() error {
case OpcodeCloseConnection:
return c.emitClose(bytes.NewBuffer(payload))
default:
var err = errors.New(fmt.Sprintf("unexpected opcode: %d", opcode))
var err = fmt.Errorf("gws: unexpected opcode %d", opcode)
return internal.NewError(internal.CloseProtocolError, err)
}
}
Expand Down
16 changes: 7 additions & 9 deletions writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"github.com/lxzan/gws/internal"
"math"
"sync"
"sync/atomic"
)

Expand Down Expand Up @@ -142,6 +143,7 @@ type (
}

broadcastMessageWrapper struct {
once sync.Once
err error
index int
frame *bytes.Buffer
Expand All @@ -155,23 +157,19 @@ func NewBroadcaster(opcode Opcode, payload []byte) *Broadcaster {
c := &Broadcaster{
opcode: opcode,
payload: payload,
msgs: [2]*broadcastMessageWrapper{},
msgs: [2]*broadcastMessageWrapper{{}, {}},
state: int64(math.MaxInt32),
}
return c
}

// Broadcast 广播
// 向单个客户端发送广播消息. 注意: 不要并行调用Broadcast方法
// Send a broadcast message to a single client. Note: Do not call the Broadcast method in parallel.
// 向客户端发送广播消息
// Send a broadcast message to a client.
func (c *Broadcaster) Broadcast(socket *Conn) error {
var idx = internal.SelectValue(socket.compressEnabled, 1, 0)
var msg = c.msgs[idx]
if msg == nil {
msg = &broadcastMessageWrapper{}
msg.frame, msg.index, msg.err = socket.genFrame(c.opcode, c.payload)
c.msgs[idx] = msg
}
msg.once.Do(func() { msg.frame, msg.index, msg.err = socket.genFrame(c.opcode, c.payload) })
if msg.err != nil {
return msg.err
}
Expand All @@ -197,7 +195,7 @@ func (c *Broadcaster) doClose() {
}

// Release 释放资源
// 在完成所有Broadcast之后调用Release方法释放资源.
// 在完成所有Broadcast调用之后执行Release方法释放资源.
// Call the Release method after all the Broadcasts have been completed to release the resources.
func (c *Broadcaster) Release() {
if atomic.AddInt64(&c.state, -1*math.MaxInt32) == 0 {
Expand Down
13 changes: 1 addition & 12 deletions writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,22 +242,11 @@ func TestNewBroadcaster(t *testing.T) {
}

type broadcastHandler struct {
BuiltinEventHandler
wg *sync.WaitGroup
sockets *sync.Map
}

func (b broadcastHandler) OnOpen(socket *Conn) {
}

func (b broadcastHandler) OnClose(socket *Conn, err error) {
}

func (b broadcastHandler) OnPing(socket *Conn, payload []byte) {
}

func (b broadcastHandler) OnPong(socket *Conn, payload []byte) {
}

func (b broadcastHandler) OnMessage(socket *Conn, message *Message) {
defer message.Close()
b.wg.Done()
Expand Down

0 comments on commit 26f29d4

Please sign in to comment.