Skip to content

Commit

Permalink
Revert "improve transport performance with bufio (#321)" (#323)
Browse files Browse the repository at this point in the history
This reverts commit 8e52e49.
  • Loading branch information
mochi-co authored Oct 24, 2023
1 parent 8e52e49 commit 99e50ae
Show file tree
Hide file tree
Showing 2 changed files with 2 additions and 24 deletions.
17 changes: 2 additions & 15 deletions clients.go
Original file line number Diff line number Diff line change
Expand Up @@ -584,25 +584,12 @@ func (cl *Client) WritePacket(pk packets.Packet) error {
return packets.ErrPacketTooLarge // [MQTT-3.1.2-24] [MQTT-3.1.2-25]
}

nb := net.Buffers{buf.Bytes()}
n, err := func() (int64, error) {
cl.Lock()
defer cl.Unlock()

length, err := cl.Net.bconn.Write(buf.Bytes())
if nil != err {
return 0, err
}

// immediate flush if no packets in channel and write buffer is not empty
if len(cl.State.outbound) == 0 && cl.Net.bconn.Writer.Buffered() > 0 {
if err = cl.Net.bconn.Flush(); nil != err {
return 0, err
}
}

return int64(length), nil
return nb.WriteTo(cl.Net.Conn)
}()

if err != nil {
return err
}
Expand Down
9 changes: 0 additions & 9 deletions server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"sync/atomic"
"testing"
"time"
"bufio"

"github.com/mochi-mqtt/server/v2/hooks/storage"
"github.com/mochi-mqtt/server/v2/listeners"
Expand Down Expand Up @@ -2469,10 +2468,6 @@ func TestServerProcessInboundQos2Flow(t *testing.T) {
t.Run("qos step"+strconv.Itoa(i), func(t *testing.T) {
r, w = net.Pipe()
cl.Net.Conn = w
cl.Net.bconn = bufio.NewReadWriter(
bufio.NewReaderSize(w, cl.ops.options.ClientNetReadBufferSize),
bufio.NewWriterSize(w, cl.ops.options.ClientNetWriteBufferSize),
)

recv := make(chan []byte)
go func() { // receive the ack
Expand Down Expand Up @@ -2548,10 +2543,6 @@ func TestServerProcessOutboundQos2Flow(t *testing.T) {
r, w := net.Pipe()
time.Sleep(time.Millisecond)
cl.Net.Conn = w
cl.Net.bconn = bufio.NewReadWriter(
bufio.NewReaderSize(w, cl.ops.options.ClientNetReadBufferSize),
bufio.NewWriterSize(w, cl.ops.options.ClientNetWriteBufferSize),
)

recv := make(chan []byte)
go func() { // receive the ack
Expand Down

0 comments on commit 99e50ae

Please sign in to comment.