Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

patch: v2.5.5 #620

Merged
merged 2 commits into from
Jun 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 22 additions & 24 deletions acceptor_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,18 @@ import (
func (el *eventloop) accept0(fd int, _ netpoll.IOEvent, _ netpoll.IOFlags) error {
for {
nfd, sa, err := socket.Accept(fd)
if err != nil {
switch err {
case unix.EAGAIN: // the Accept queue has been drained out, we can return now
return nil
case unix.EINTR, unix.ECONNRESET, unix.ECONNABORTED:
// ECONNRESET or ECONNABORTED could indicate that a socket
// in the Accept queue was closed before we Accept()ed it.
// It's a silly error, let's retry it.
continue
default:
el.getLogger().Errorf("Accept() failed due to error: %v", err)
return errors.ErrAcceptSocket
}
switch err {
case nil:
case unix.EAGAIN: // the Accept queue has been drained out, we can return now
return nil
case unix.EINTR, unix.ECONNRESET, unix.ECONNABORTED:
// ECONNRESET or ECONNABORTED could indicate that a socket
// in the Accept queue was closed before we Accept()ed it.
// It's a silly error, let's retry it.
continue
default:
el.getLogger().Errorf("Accept() failed due to error: %v", err)
return errors.ErrAcceptSocket
}

remoteAddr := socket.SockaddrToTCPOrUnixAddr(sa)
Expand Down Expand Up @@ -71,17 +70,16 @@ func (el *eventloop) accept(fd int, ev netpoll.IOEvent, flags netpoll.IOFlags) e
}

nfd, sa, err := socket.Accept(fd)
if err != nil {
switch err {
case unix.EINTR, unix.EAGAIN, unix.ECONNRESET, unix.ECONNABORTED:
// ECONNRESET or ECONNABORTED could indicate that a socket
// in the Accept queue was closed before we Accept()ed it.
// It's a silly error, let's retry it.
return nil
default:
el.getLogger().Errorf("Accept() failed due to error: %v", err)
return errors.ErrAcceptSocket
}
switch err {
case nil:
case unix.EINTR, unix.EAGAIN, unix.ECONNRESET, unix.ECONNABORTED:
// ECONNRESET or ECONNABORTED could indicate that a socket
// in the Accept queue was closed before we Accept()ed it.
// It's a silly error, let's retry it.
return nil
default:
el.getLogger().Errorf("Accept() failed due to error: %v", err)
return errors.ErrAcceptSocket
}

remoteAddr := socket.SockaddrToTCPOrUnixAddr(sa)
Expand Down
33 changes: 29 additions & 4 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@ import (
"io"
"math/rand"
"net"
"path/filepath"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"

errorx "github.com/panjf2000/gnet/v2/pkg/errors"
"github.com/panjf2000/gnet/v2/pkg/logging"
Expand Down Expand Up @@ -437,7 +439,7 @@ func runClient(t *testing.T, network, addr string, et, reuseport, multicore, asy
clientEV := &clientEvents{tester: t, packetLen: streamLen, svr: ts}
ts.client, err = NewClient(
clientEV,
WithLogLevel(logging.DebugLevel),
WithTCPNoDelay(TCPNoDelay),
WithLockOSThread(true),
WithTicker(true),
)
Expand All @@ -455,7 +457,6 @@ func runClient(t *testing.T, network, addr string, et, reuseport, multicore, asy
WithReusePort(reuseport),
WithTicker(true),
WithTCPKeepAlive(time.Minute*1),
WithTCPNoDelay(TCPDelay),
WithLoadBalancing(lb))
assert.NoError(t, err)
}
Expand Down Expand Up @@ -599,9 +600,22 @@ func testConnWakeImmediately(t *testing.T, client *Client, clientEV *clientEvent
}

func TestWakeConnImmediately(t *testing.T) {
currentLogger, currentFlusher := logging.GetDefaultLogger(), logging.GetDefaultFlusher()
t.Cleanup(func() {
logging.SetDefaultLoggerAndFlusher(currentLogger, currentFlusher) // restore
})

clientEV := &clientEventsForWake{tester: t}
client, err := NewClient(clientEV, WithLogLevel(logging.DebugLevel))
logPath := filepath.Join(t.TempDir(), "gnet-test-wake-conn-immediately.log")
client, err := NewClient(clientEV,
WithSocketRecvBuffer(4*1024),
WithSocketSendBuffer(4*1024),
WithLogPath(logPath),
WithLogLevel(logging.WarnLevel),
WithReadBufferCap(512),
WithWriteBufferCap(512))
assert.NoError(t, err)
logging.Cleanup()

err = client.Start()
assert.NoError(t, err)
Expand All @@ -614,6 +628,11 @@ func TestWakeConnImmediately(t *testing.T) {
}

func TestClientReadOnEOF(t *testing.T) {
currentLogger, currentFlusher := logging.GetDefaultLogger(), logging.GetDefaultFlusher()
t.Cleanup(func() {
logging.SetDefaultLoggerAndFlusher(currentLogger, currentFlusher) // restore
})

ln, err := net.Listen("tcp", "127.0.0.1:9999")
assert.NoError(t, err)
defer ln.Close()
Expand All @@ -635,7 +654,13 @@ func TestClientReadOnEOF(t *testing.T) {
}, 1),
data: []byte("test"),
}
cli, err := NewClient(ev)
cli, err := NewClient(ev,
WithSocketRecvBuffer(4*1024),
WithSocketSendBuffer(4*1024),
WithTCPKeepAlive(time.Minute),
WithLogger(zap.NewExample().Sugar()),
WithReadBufferCap(32*1024),
WithWriteBufferCap(32*1024))
assert.NoError(t, err)
defer cli.Stop() //nolint:errcheck

Expand Down
13 changes: 8 additions & 5 deletions client_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,15 +200,16 @@ func (cli *Client) EnrollContext(c net.Conn, ctx interface{}) (Conn, error) {
)
switch c.(type) {
case *net.UnixConn:
if sockAddr, _, _, err = socket.GetUnixSockAddr(c.RemoteAddr().Network(), c.RemoteAddr().String()); err != nil {
sockAddr, _, _, err = socket.GetUnixSockAddr(c.RemoteAddr().Network(), c.RemoteAddr().String())
if err != nil {
return nil, err
}
ua := c.LocalAddr().(*net.UnixAddr)
ua.Name = c.RemoteAddr().String() + "." + strconv.Itoa(dupFD)
gc = newTCPConn(dupFD, cli.el, sockAddr, c.LocalAddr(), c.RemoteAddr())
case *net.TCPConn:
if cli.opts.TCPNoDelay == TCPDelay {
if err = socket.SetNoDelay(dupFD, 0); err != nil {
if cli.opts.TCPNoDelay == TCPNoDelay {
if err = socket.SetNoDelay(dupFD, 1); err != nil {
return nil, err
}
}
Expand All @@ -217,12 +218,14 @@ func (cli *Client) EnrollContext(c net.Conn, ctx interface{}) (Conn, error) {
return nil, err
}
}
if sockAddr, _, _, _, err = socket.GetTCPSockAddr(c.RemoteAddr().Network(), c.RemoteAddr().String()); err != nil {
sockAddr, _, _, _, err = socket.GetTCPSockAddr(c.RemoteAddr().Network(), c.RemoteAddr().String())
if err != nil {
return nil, err
}
gc = newTCPConn(dupFD, cli.el, sockAddr, c.LocalAddr(), c.RemoteAddr())
case *net.UDPConn:
if sockAddr, _, _, _, err = socket.GetUDPSockAddr(c.RemoteAddr().Network(), c.RemoteAddr().String()); err != nil {
sockAddr, _, _, _, err = socket.GetUDPSockAddr(c.RemoteAddr().Network(), c.RemoteAddr().String())
if err != nil {
return nil, err
}
gc = newUDPConn(dupFD, cli.el, c.LocalAddr(), sockAddr, true)
Expand Down
16 changes: 14 additions & 2 deletions gnet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"io"
"math/rand"
"net"
"path/filepath"
"runtime"
"strings"
"sync/atomic"
Expand Down Expand Up @@ -642,7 +643,7 @@ func runServer(t *testing.T, addrs []string, et, reuseport, multicore, async, wr
WithReusePort(reuseport),
WithTicker(true),
WithTCPKeepAlive(time.Minute),
WithTCPNoDelay(TCPDelay),
WithTCPNoDelay(TCPNoDelay),
WithLoadBalancing(lb))
} else {
err = Run(ts,
Expand Down Expand Up @@ -883,8 +884,19 @@ func (t *testShutdownServer) OnTick() (delay time.Duration, action Action) {
}

func testShutdown(t *testing.T, network, addr string) {
currentLogger, currentFlusher := logging.GetDefaultLogger(), logging.GetDefaultFlusher()
t.Cleanup(func() {
logging.SetDefaultLoggerAndFlusher(currentLogger, currentFlusher) // restore
})

events := &testShutdownServer{tester: t, network: network, addr: addr, N: 100}
err := Run(events, network+"://"+addr, WithTicker(true), WithReadBufferCap(512), WithWriteBufferCap(512))
logPath := filepath.Join(t.TempDir(), "gnet-test-shutdown.log")
err := Run(events, network+"://"+addr,
WithLogPath(logPath),
WithLogLevel(logging.WarnLevel),
WithTicker(true),
WithReadBufferCap(512),
WithWriteBufferCap(512))
assert.NoError(t, err)
require.Equal(t, 0, int(events.clients), "did not close all clients")
}
Expand Down
7 changes: 5 additions & 2 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,12 @@ type Options struct {

// TCPNoDelay controls whether the operating system should delay
// packet transmission in hopes of sending fewer packets (Nagle's algorithm).
// When this option is assign to TCPNoDelay, TCP_NODELAY socket option will
// be turned on, on the contrary, if it is assigned to TCPDelay, the socket
// option will be turned off.
//
// The default is true (no delay), meaning that data is sent
// as soon as possible after a write operation.
// The default is TCPNoDelay, meaning that TCP_NODELAY is turned on and data
// will not be buffered but sent as soon as possible after a write operation.
TCPNoDelay TCPSocketOpt

// SocketRecvBuffer sets the maximum socket receive buffer in bytes.
Expand Down
Loading