From a0050b4d5de23c27671d2a614c390de0b2fe286d Mon Sep 17 00:00:00 2001 From: andyl Date: Wed, 13 Mar 2024 17:08:43 +0800 Subject: [PATCH 1/8] opt: client dial with context --- client_test.go | 58 ++++++++++++++++------------------------------- client_unix.go | 11 ++++++++- client_windows.go | 14 +++++++++++- 3 files changed, 42 insertions(+), 41 deletions(-) diff --git a/client_test.go b/client_test.go index 91f226e03..ba657e5c8 100644 --- a/client_test.go +++ b/client_test.go @@ -7,7 +7,6 @@ import ( "io" "math/rand" "net" - "sync" "sync/atomic" "testing" "time" @@ -21,12 +20,17 @@ import ( goPool "github.com/panjf2000/gnet/v2/pkg/pool/goroutine" ) +type connHandler struct { + network string + rspCh chan []byte + data []byte +} + type clientEvents struct { *BuiltinEventEngine tester *testing.T svr *testClientServer packetLen int - rspChMap sync.Map } func (ev *clientEvents) OnBoot(e Engine) Action { @@ -38,9 +42,6 @@ func (ev *clientEvents) OnBoot(e Engine) Action { } func (ev *clientEvents) OnOpen(c Conn) ([]byte, Action) { - c.SetContext([]byte{}) - rspCh := make(chan []byte, 1) - ev.rspChMap.Store(c.LocalAddr().String(), rspCh) return nil, None } @@ -54,24 +55,18 @@ func (ev *clientEvents) OnClose(Conn, error) Action { } func (ev *clientEvents) OnTraffic(c Conn) (action Action) { - ctx := c.Context() - var p []byte - if ctx != nil { - p = ctx.([]byte) - } else { // UDP + handler := c.Context().(*connHandler) + if handler.network == "udp" { ev.packetLen = 1024 } buf, err := c.Next(-1) assert.NoError(ev.tester, err) - p = append(p, buf...) - if len(p) < ev.packetLen { - c.SetContext(p) + handler.data = append(handler.data, buf...) + if len(handler.data) < ev.packetLen { return } - v, _ := ev.rspChMap.Load(c.LocalAddr().String()) - rspCh := v.(chan []byte) - rspCh <- p - c.SetContext([]byte{}) + handler.rspCh <- handler.data + handler.data = handler.data[:0] return } @@ -330,38 +325,23 @@ func startGnetClient(t *testing.T, cli *Client, ev *clientEvents, network, addr c Conn err error ) + var handler = &connHandler{ + network: network, + rspCh: make(chan []byte, 1), + } if netDial { var netConn net.Conn netConn, err = NetDial(network, addr) require.NoError(t, err) - c, err = cli.Enroll(netConn) + c, err = cli.EnrollWithContext(netConn, handler) } else { - c, err = cli.Dial(network, addr) + c, err = cli.DialWithContext(network, addr, handler) } require.NoError(t, err) defer c.Close() err = c.Wake(nil) require.NoError(t, err) - var rspCh chan []byte - if network == "udp" { - rspCh = make(chan []byte, 1) - ev.rspChMap.Store(c.LocalAddr().String(), rspCh) - } else { - var ( - v interface{} - ok bool - ) - start := time.Now() - for time.Since(start) < time.Second { - v, ok = ev.rspChMap.Load(c.LocalAddr().String()) - if ok { - break - } - time.Sleep(10 * time.Millisecond) - } - require.True(t, ok) - rspCh = v.(chan []byte) - } + var rspCh = handler.rspCh duration := time.Duration((rand.Float64()*2+1)*float64(time.Second)) / 2 t.Logf("test duration: %dms", duration/time.Millisecond) start := time.Now() diff --git a/client_unix.go b/client_unix.go index 70bf37c7c..ee808f069 100644 --- a/client_unix.go +++ b/client_unix.go @@ -140,15 +140,23 @@ func (cli *Client) Stop() (err error) { // Dial is like net.Dial(). func (cli *Client) Dial(network, address string) (Conn, error) { + return cli.DialWithContext(network, address, nil) +} + +func (cli *Client) DialWithContext(network, address string, ctx interface{}) (Conn, error) { c, err := net.Dial(network, address) if err != nil { return nil, err } - return cli.Enroll(c) + return cli.EnrollWithContext(c, ctx) } // Enroll converts a net.Conn to gnet.Conn and then adds it into Client. func (cli *Client) Enroll(c net.Conn) (Conn, error) { + return cli.EnrollWithContext(c, nil) +} + +func (cli *Client) EnrollWithContext(c net.Conn, ctx interface{}) (Conn, error) { defer c.Close() sc, ok := c.(syscall.Conn) @@ -222,5 +230,6 @@ func (cli *Client) Enroll(c net.Conn) (Conn, error) { gc.Close() return nil, err } + gc.SetContext(ctx) return gc, nil } diff --git a/client_windows.go b/client_windows.go index d90951b53..ae9baae2a 100644 --- a/client_windows.go +++ b/client_windows.go @@ -118,6 +118,10 @@ func unixAddr(addr string) string { } func (cli *Client) Dial(network, addr string) (Conn, error) { + return cli.DialWithContext(network, addr, nil) +} + +func (cli *Client) DialWithContext(network, addr string, ctx interface{}) (Conn, error) { var ( c net.Conn err error @@ -135,10 +139,14 @@ func (cli *Client) Dial(network, addr string) (Conn, error) { return nil, err } } - return cli.Enroll(c) + return cli.EnrollWithContext(c, ctx) } func (cli *Client) Enroll(nc net.Conn) (gc Conn, err error) { + return cli.EnrollWithContext(nc, nil) +} + +func (cli *Client) EnrollWithContext(nc net.Conn, ctx interface{}) (gc Conn, err error) { switch v := nc.(type) { case *net.TCPConn: if cli.opts.TCPNoDelay == TCPNoDelay { @@ -156,6 +164,7 @@ func (cli *Client) Enroll(nc net.Conn) (gc Conn, err error) { } c := newTCPConn(nc, cli.el) + c.SetContext(ctx) cli.el.ch <- c go func(c *conn, tc net.Conn, el *eventloop) { var buffer [0x10000]byte @@ -171,6 +180,7 @@ func (cli *Client) Enroll(nc net.Conn) (gc Conn, err error) { gc = c case *net.UnixConn: c := newTCPConn(nc, cli.el) + c.SetContext(ctx) cli.el.ch <- c go func(c *conn, uc net.Conn, el *eventloop) { var buffer [0x10000]byte @@ -192,6 +202,7 @@ func (cli *Client) Enroll(nc net.Conn) (gc Conn, err error) { gc = c case *net.UDPConn: c := newUDPConn(cli.el, nc.LocalAddr(), nc.RemoteAddr()) + c.SetContext(ctx) c.rawConn = nc go func(uc net.Conn, el *eventloop) { var buffer [0x10000]byte @@ -201,6 +212,7 @@ func (cli *Client) Enroll(nc net.Conn) (gc Conn, err error) { return } c := newUDPConn(cli.el, uc.LocalAddr(), uc.RemoteAddr()) + c.SetContext(ctx) c.rawConn = uc el.ch <- packUDPConn(c, buffer[:n]) } From 5e4d33f08536bb9e0756a3d598552d7bab5bec20 Mon Sep 17 00:00:00 2001 From: andyl Date: Thu, 14 Mar 2024 14:24:17 +0800 Subject: [PATCH 2/8] chore: add some comments on DialContext --- client_test.go | 6 +++--- client_unix.go | 12 +++++++----- client_windows.go | 10 +++++----- 3 files changed, 15 insertions(+), 13 deletions(-) diff --git a/client_test.go b/client_test.go index ba657e5c8..57e96f2ea 100644 --- a/client_test.go +++ b/client_test.go @@ -333,15 +333,15 @@ func startGnetClient(t *testing.T, cli *Client, ev *clientEvents, network, addr var netConn net.Conn netConn, err = NetDial(network, addr) require.NoError(t, err) - c, err = cli.EnrollWithContext(netConn, handler) + c, err = cli.EnrollContext(netConn, handler) } else { - c, err = cli.DialWithContext(network, addr, handler) + c, err = cli.DialContext(network, addr, handler) } require.NoError(t, err) defer c.Close() err = c.Wake(nil) require.NoError(t, err) - var rspCh = handler.rspCh + rspCh := handler.rspCh duration := time.Duration((rand.Float64()*2+1)*float64(time.Second)) / 2 t.Logf("test duration: %dms", duration/time.Millisecond) start := time.Now() diff --git a/client_unix.go b/client_unix.go index ee808f069..0995c6d6a 100644 --- a/client_unix.go +++ b/client_unix.go @@ -140,23 +140,25 @@ func (cli *Client) Stop() (err error) { // Dial is like net.Dial(). func (cli *Client) Dial(network, address string) (Conn, error) { - return cli.DialWithContext(network, address, nil) + return cli.DialContext(network, address, nil) } -func (cli *Client) DialWithContext(network, address string, ctx interface{}) (Conn, error) { +// DialContext is like Dial but also accepts an empty interface ctx that can be obtained later via Conn.Context. +func (cli *Client) DialContext(network, address string, ctx interface{}) (Conn, error) { c, err := net.Dial(network, address) if err != nil { return nil, err } - return cli.EnrollWithContext(c, ctx) + return cli.EnrollContext(c, ctx) } // Enroll converts a net.Conn to gnet.Conn and then adds it into Client. func (cli *Client) Enroll(c net.Conn) (Conn, error) { - return cli.EnrollWithContext(c, nil) + return cli.EnrollContext(c, nil) } -func (cli *Client) EnrollWithContext(c net.Conn, ctx interface{}) (Conn, error) { +// EnrollContext is like Enroll but also accepts an empty interface ctx that can be obtained later via Conn.Context. +func (cli *Client) EnrollContext(c net.Conn, ctx interface{}) (Conn, error) { defer c.Close() sc, ok := c.(syscall.Conn) diff --git a/client_windows.go b/client_windows.go index ae9baae2a..b856bf7c3 100644 --- a/client_windows.go +++ b/client_windows.go @@ -118,10 +118,10 @@ func unixAddr(addr string) string { } func (cli *Client) Dial(network, addr string) (Conn, error) { - return cli.DialWithContext(network, addr, nil) + return cli.DialContext(network, addr, nil) } -func (cli *Client) DialWithContext(network, addr string, ctx interface{}) (Conn, error) { +func (cli *Client) DialContext(network, addr string, ctx interface{}) (Conn, error) { var ( c net.Conn err error @@ -139,14 +139,14 @@ func (cli *Client) DialWithContext(network, addr string, ctx interface{}) (Conn, return nil, err } } - return cli.EnrollWithContext(c, ctx) + return cli.EnrollContext(c, ctx) } func (cli *Client) Enroll(nc net.Conn) (gc Conn, err error) { - return cli.EnrollWithContext(nc, nil) + return cli.EnrollContext(nc, nil) } -func (cli *Client) EnrollWithContext(nc net.Conn, ctx interface{}) (gc Conn, err error) { +func (cli *Client) EnrollContext(nc net.Conn, ctx interface{}) (gc Conn, err error) { switch v := nc.(type) { case *net.TCPConn: if cli.opts.TCPNoDelay == TCPNoDelay { From 3992ceab92cde785f8ffcd0611267936fdfb1efe Mon Sep 17 00:00:00 2001 From: andyl Date: Thu, 14 Mar 2024 21:26:52 +0800 Subject: [PATCH 3/8] chore: gofumpt client_test.go --- client_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client_test.go b/client_test.go index 57e96f2ea..a01f168ff 100644 --- a/client_test.go +++ b/client_test.go @@ -325,7 +325,7 @@ func startGnetClient(t *testing.T, cli *Client, ev *clientEvents, network, addr c Conn err error ) - var handler = &connHandler{ + handler := &connHandler{ network: network, rspCh: make(chan []byte, 1), } From ae751f183df5b1efed493a71f31596b9925534db Mon Sep 17 00:00:00 2001 From: andyl Date: Mon, 18 Mar 2024 10:03:50 +0800 Subject: [PATCH 4/8] chore: client_test.go code format --- client_test.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/client_test.go b/client_test.go index a01f168ff..66ff0f8a1 100644 --- a/client_test.go +++ b/client_test.go @@ -42,6 +42,7 @@ func (ev *clientEvents) OnBoot(e Engine) Action { } func (ev *clientEvents) OnOpen(c Conn) ([]byte, Action) { + _ = c return nil, None } @@ -272,7 +273,7 @@ func (s *testClientServer) OnTick() (delay time.Duration, action Action) { if i%2 == 0 { netConn = true } - go startGnetClient(s.tester, s.client, s.clientEV, s.network, s.addr, s.multicore, s.async, netConn) + go startGnetClient(s.tester, s.client, s.network, s.addr, s.multicore, s.async, netConn) } } if s.network == "udp" && atomic.LoadInt32(&s.clientActive) == 0 { @@ -319,7 +320,7 @@ func testServeWithGnetClient(t *testing.T, network, addr string, reuseport, reus assert.NoError(t, err) } -func startGnetClient(t *testing.T, cli *Client, ev *clientEvents, network, addr string, multicore, async, netDial bool) { +func startGnetClient(t *testing.T, cli *Client, network, addr string, multicore, async, netDial bool) { rand.Seed(time.Now().UnixNano()) var ( c Conn From d58b4c03ad7436c227081c02dbe23aa8af6ba25b Mon Sep 17 00:00:00 2001 From: andyl Date: Mon, 18 Mar 2024 10:16:32 +0800 Subject: [PATCH 5/8] chore: client_test.go code format --- client_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/client_test.go b/client_test.go index 66ff0f8a1..6b8145a34 100644 --- a/client_test.go +++ b/client_test.go @@ -41,8 +41,7 @@ func (ev *clientEvents) OnBoot(e Engine) Action { return None } -func (ev *clientEvents) OnOpen(c Conn) ([]byte, Action) { - _ = c +func (ev *clientEvents) OnOpen(_ Conn) ([]byte, Action) { return nil, None } From 7f6cfcf045b0a6bc7e7b4d72ee733939b497bb6d Mon Sep 17 00:00:00 2001 From: andyl Date: Mon, 18 Mar 2024 11:05:23 +0800 Subject: [PATCH 6/8] chore: client_test.go code format --- client_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client_test.go b/client_test.go index 6b8145a34..b981e7ae9 100644 --- a/client_test.go +++ b/client_test.go @@ -41,7 +41,7 @@ func (ev *clientEvents) OnBoot(e Engine) Action { return None } -func (ev *clientEvents) OnOpen(_ Conn) ([]byte, Action) { +func (ev *clientEvents) OnOpen(Conn) ([]byte, Action) { return nil, None } From 5e3f1cf85e9e81509899c638d4fca23be217ef20 Mon Sep 17 00:00:00 2001 From: andyl Date: Mon, 18 Mar 2024 16:20:27 +0800 Subject: [PATCH 7/8] bug: fix SetContext and clientEvents OnTraffic data race --- client_test.go | 2 +- client_unix.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/client_test.go b/client_test.go index b981e7ae9..ce26d1004 100644 --- a/client_test.go +++ b/client_test.go @@ -66,7 +66,7 @@ func (ev *clientEvents) OnTraffic(c Conn) (action Action) { return } handler.rspCh <- handler.data - handler.data = handler.data[:0] + handler.data = nil return } diff --git a/client_unix.go b/client_unix.go index 0995c6d6a..fc4470b3f 100644 --- a/client_unix.go +++ b/client_unix.go @@ -227,11 +227,11 @@ func (cli *Client) EnrollContext(c net.Conn, ctx interface{}) (Conn, error) { default: return nil, errorx.ErrUnsupportedProtocol } + gc.SetContext(ctx) err = cli.el.poller.UrgentTrigger(cli.el.register, gc) if err != nil { gc.Close() return nil, err } - gc.SetContext(ctx) return gc, nil } From fa1e739a80f8e52079797aa8cfd3d984f6a3514a Mon Sep 17 00:00:00 2001 From: andyl Date: Tue, 19 Mar 2024 09:51:34 +0800 Subject: [PATCH 8/8] chore: delete client_test.go unused code --- client_test.go | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/client_test.go b/client_test.go index ce26d1004..f6b4fd6e8 100644 --- a/client_test.go +++ b/client_test.go @@ -41,10 +41,6 @@ func (ev *clientEvents) OnBoot(e Engine) Action { return None } -func (ev *clientEvents) OnOpen(Conn) ([]byte, Action) { - return nil, None -} - func (ev *clientEvents) OnClose(Conn, error) Action { if ev.svr != nil { if atomic.AddInt32(&ev.svr.clientActive, -1) == 0 { @@ -195,7 +191,6 @@ func TestServeWithGnetClient(t *testing.T) { type testClientServer struct { *BuiltinEventEngine client *Client - clientEV *clientEvents tester *testing.T eng Engine network string @@ -293,9 +288,9 @@ func testServeWithGnetClient(t *testing.T, network, addr string, reuseport, reus workerPool: goPool.Default(), } var err error - ts.clientEV = &clientEvents{tester: t, packetLen: streamLen, svr: ts} + clientEV := &clientEvents{tester: t, packetLen: streamLen, svr: ts} ts.client, err = NewClient( - ts.clientEV, + clientEV, WithLogLevel(logging.DebugLevel), WithLockOSThread(true), WithTicker(true),