Skip to content

Commit

Permalink
improve transport performance with bufio (#321)
Browse files Browse the repository at this point in the history
* improve transport performance with bufio

* fix issue of unit test

* fix issue

* optimize code
  • Loading branch information
x20080406 authored Oct 22, 2023
1 parent 4c0c862 commit 8e52e49
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 2 deletions.
17 changes: 15 additions & 2 deletions clients.go
Original file line number Diff line number Diff line change
Expand Up @@ -584,12 +584,25 @@ func (cl *Client) WritePacket(pk packets.Packet) error {
return packets.ErrPacketTooLarge // [MQTT-3.1.2-24] [MQTT-3.1.2-25]
}

nb := net.Buffers{buf.Bytes()}
n, err := func() (int64, error) {
cl.Lock()
defer cl.Unlock()
return nb.WriteTo(cl.Net.Conn)

length, err := cl.Net.bconn.Write(buf.Bytes())
if nil != err {
return 0, err
}

// immediate flush if no packets in channel and write buffer is not empty
if len(cl.State.outbound) == 0 && cl.Net.bconn.Writer.Buffered() > 0 {
if err = cl.Net.bconn.Flush(); nil != err {
return 0, err
}
}

return int64(length), nil
}()

if err != nil {
return err
}
Expand Down
9 changes: 9 additions & 0 deletions server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"sync/atomic"
"testing"
"time"
"bufio"

"github.com/mochi-mqtt/server/v2/hooks/storage"
"github.com/mochi-mqtt/server/v2/listeners"
Expand Down Expand Up @@ -2468,6 +2469,10 @@ func TestServerProcessInboundQos2Flow(t *testing.T) {
t.Run("qos step"+strconv.Itoa(i), func(t *testing.T) {
r, w = net.Pipe()
cl.Net.Conn = w
cl.Net.bconn = bufio.NewReadWriter(
bufio.NewReaderSize(w, cl.ops.options.ClientNetReadBufferSize),
bufio.NewWriterSize(w, cl.ops.options.ClientNetWriteBufferSize),
)

recv := make(chan []byte)
go func() { // receive the ack
Expand Down Expand Up @@ -2543,6 +2548,10 @@ func TestServerProcessOutboundQos2Flow(t *testing.T) {
r, w := net.Pipe()
time.Sleep(time.Millisecond)
cl.Net.Conn = w
cl.Net.bconn = bufio.NewReadWriter(
bufio.NewReaderSize(w, cl.ops.options.ClientNetReadBufferSize),
bufio.NewWriterSize(w, cl.ops.options.ClientNetWriteBufferSize),
)

recv := make(chan []byte)
go func() { // receive the ack
Expand Down

0 comments on commit 8e52e49

Please sign in to comment.