From 6569c6948a00d6352656372ca4809e12308317de Mon Sep 17 00:00:00 2001 From: lxzan Date: Thu, 25 Jan 2024 11:26:46 +0800 Subject: [PATCH] Add WritevAsync Method --- compress_test.go | 2 +- writer.go | 26 ++++++++++++++++++-------- writer_test.go | 36 ++++++++++++++++++++++++++++++++---- 3 files changed, 51 insertions(+), 13 deletions(-) diff --git a/compress_test.go b/compress_test.go index 8135d962..97e57335 100644 --- a/compress_test.go +++ b/compress_test.go @@ -214,7 +214,7 @@ func TestPermessageNegotiation(t *testing.T) { assert.Equal(t, string(client.cpsWindow.dict), "he") _ = client.WriteString("llo") assert.Equal(t, string(client.cpsWindow.dict), "hello") - _ = client.WriteV(OpcodeText, []byte(", "), []byte("world!")) + _ = client.Writev(OpcodeText, []byte(", "), []byte("world!")) assert.Equal(t, string(client.cpsWindow.dict), "hello, world!") }) diff --git a/writer.go b/writer.go index 6f3fc019..9ac248ff 100644 --- a/writer.go +++ b/writer.go @@ -49,14 +49,6 @@ func (c *Conn) WriteMessage(opcode Opcode, payload []byte) error { return err } -// WriteV 批量写入文本/二进制消息, 文本消息应该使用UTF8编码 -// writes batch text/binary messages, text messages should be encoded in UTF8. -func (c *Conn) WriteV(opcode Opcode, payloads ...[]byte) error { - var err = c.doWrite(opcode, internal.Buffers(payloads)) - c.emitError(err) - return err -} - // WriteAsync 异步写 // 异步非阻塞地将消息写入到任务队列, 收到回调后才允许回收payload内存 // Asynchronously and non-blockingly write the message to the task queue, allowing the payload memory to be reclaimed only after a callback is received. @@ -68,6 +60,24 @@ func (c *Conn) WriteAsync(opcode Opcode, payload []byte, callback func(error)) { }) } +// Writev 类似WriteMessage, 区别是可以一次写入多个切片 +// Similar to WriteMessage, except that you can write multiple slices at once. +func (c *Conn) Writev(opcode Opcode, payloads ...[]byte) error { + var err = c.doWrite(opcode, internal.Buffers(payloads)) + c.emitError(err) + return err +} + +// WritevAsync 类似WriteAsync, 区别是可以一次写入多个切片 +// Similar to WriteAsync, except that you can write multiple slices at once. +func (c *Conn) WritevAsync(opcode Opcode, payloads [][]byte, callback func(error)) { + c.writeQueue.Push(func() { + if err := c.Writev(opcode, payloads...); callback != nil { + callback(err) + } + }) +} + // 执行写入逻辑, 注意妥善维护压缩字典 func (c *Conn) doWrite(opcode Opcode, payload internal.Payload) error { c.mu.Lock() diff --git a/writer_test.go b/writer_test.go index 31ecf40a..47237987 100644 --- a/writer_test.go +++ b/writer_test.go @@ -329,7 +329,7 @@ func TestRecovery(t *testing.T) { time.Sleep(100 * time.Millisecond) } -func TestConn_WriteV(t *testing.T) { +func TestConn_Writev(t *testing.T) { t.Run("", func(t *testing.T) { var serverHandler = new(webSocketMocker) var clientHandler = new(webSocketMocker) @@ -348,7 +348,7 @@ func TestConn_WriteV(t *testing.T) { go server.ReadLoop() go client.ReadLoop() - var err = client.WriteV(OpcodeText, [][]byte{ + var err = client.Writev(OpcodeText, [][]byte{ []byte("he"), []byte("llo"), []byte(", world!"), @@ -357,6 +357,34 @@ func TestConn_WriteV(t *testing.T) { wg.Wait() }) + t.Run("", func(t *testing.T) { + var serverHandler = new(webSocketMocker) + var clientHandler = new(webSocketMocker) + var serverOption = &ServerOption{} + var clientOption = &ClientOption{} + var wg = &sync.WaitGroup{} + wg.Add(1) + + serverHandler.onMessage = func(socket *Conn, message *Message) { + if bytes.Equal(message.Bytes(), []byte("hello, world!")) { + wg.Done() + } + } + + server, client := newPeer(serverHandler, serverOption, clientHandler, clientOption) + go server.ReadLoop() + go client.ReadLoop() + + client.WritevAsync(OpcodeText, [][]byte{ + []byte("he"), + []byte("llo"), + []byte(", world!"), + }, func(err error) { + assert.NoError(t, err) + }) + wg.Wait() + }) + t.Run("", func(t *testing.T) { var serverHandler = new(webSocketMocker) var clientHandler = new(webSocketMocker) @@ -389,7 +417,7 @@ func TestConn_WriteV(t *testing.T) { go server.ReadLoop() go client.ReadLoop() - var err = client.WriteV(OpcodeText, [][]byte{ + var err = client.Writev(OpcodeText, [][]byte{ []byte("he"), []byte("llo"), []byte(", world!"), @@ -423,7 +451,7 @@ func TestConn_WriteV(t *testing.T) { go server.ReadLoop() go client.ReadLoop() - var err = client.WriteV(OpcodeText, [][]byte{ + var err = client.Writev(OpcodeText, [][]byte{ []byte("山高月小"), []byte("水落石出")[2:], }...)