diff --git a/clients.go b/clients.go index 591a646b..f5fafd74 100644 --- a/clients.go +++ b/clients.go @@ -584,25 +584,12 @@ func (cl *Client) WritePacket(pk packets.Packet) error { return packets.ErrPacketTooLarge // [MQTT-3.1.2-24] [MQTT-3.1.2-25] } + nb := net.Buffers{buf.Bytes()} n, err := func() (int64, error) { cl.Lock() defer cl.Unlock() - - length, err := cl.Net.bconn.Write(buf.Bytes()) - if nil != err { - return 0, err - } - - // immediate flush if no packets in channel and write buffer is not empty - if len(cl.State.outbound) == 0 && cl.Net.bconn.Writer.Buffered() > 0 { - if err = cl.Net.bconn.Flush(); nil != err { - return 0, err - } - } - - return int64(length), nil + return nb.WriteTo(cl.Net.Conn) }() - if err != nil { return err } diff --git a/server_test.go b/server_test.go index bc87a473..604aaae6 100644 --- a/server_test.go +++ b/server_test.go @@ -15,7 +15,6 @@ import ( "sync/atomic" "testing" "time" - "bufio" "github.com/mochi-mqtt/server/v2/hooks/storage" "github.com/mochi-mqtt/server/v2/listeners" @@ -2469,10 +2468,6 @@ func TestServerProcessInboundQos2Flow(t *testing.T) { t.Run("qos step"+strconv.Itoa(i), func(t *testing.T) { r, w = net.Pipe() cl.Net.Conn = w - cl.Net.bconn = bufio.NewReadWriter( - bufio.NewReaderSize(w, cl.ops.options.ClientNetReadBufferSize), - bufio.NewWriterSize(w, cl.ops.options.ClientNetWriteBufferSize), - ) recv := make(chan []byte) go func() { // receive the ack @@ -2548,10 +2543,6 @@ func TestServerProcessOutboundQos2Flow(t *testing.T) { r, w := net.Pipe() time.Sleep(time.Millisecond) cl.Net.Conn = w - cl.Net.bconn = bufio.NewReadWriter( - bufio.NewReaderSize(w, cl.ops.options.ClientNetReadBufferSize), - bufio.NewWriterSize(w, cl.ops.options.ClientNetWriteBufferSize), - ) recv := make(chan []byte) go func() { // receive the ack