diff --git a/connection_windows.go b/connection_windows.go index 92b2666c0..443404c4a 100644 --- a/connection_windows.go +++ b/connection_windows.go @@ -21,11 +21,13 @@ import ( "syscall" "time" + "github.com/panjf2000/ants/v2" "golang.org/x/sys/windows" "github.com/panjf2000/gnet/v2/pkg/buffer/elastic" errorx "github.com/panjf2000/gnet/v2/pkg/errors" bbPool "github.com/panjf2000/gnet/v2/pkg/pool/bytebuffer" + goPool "github.com/panjf2000/gnet/v2/pkg/pool/goroutine" ) type netErr struct { @@ -404,6 +406,19 @@ func (c *conn) SetKeepAlivePeriod(d time.Duration) error { return nil } +type nonBlockingPool struct { + *goPool.Pool +} + +func (np *nonBlockingPool) Go(task func()) (err error) { + if err = np.Submit(task); err == ants.ErrPoolOverload { + go task() + } + return +} + +var workerPool = nonBlockingPool{Pool: goPool.Default()} + // Gfd return an uninitialized GFD which is not valid, // this method is only implemented for compatibility, don't use it on Windows. // func (c *conn) Gfd() gfd.GFD { return gfd.GFD{} } @@ -422,12 +437,12 @@ func (c *conn) AsyncWrite(buf []byte, cb AsyncCallback) error { case c.loop.ch <- callback: default: // If the event-loop channel is full, asynchronize this operation to avoid blocking the eventloop. - go func() { + err = workerPool.Go(func() { c.loop.ch <- callback - }() + }) } - return nil + return err } func (c *conn) AsyncWritev(bs [][]byte, cb AsyncCallback) error { @@ -444,7 +459,7 @@ func (c *conn) AsyncWritev(bs [][]byte, cb AsyncCallback) error { }) } -func (c *conn) Wake(cb AsyncCallback) error { +func (c *conn) Wake(cb AsyncCallback) (err error) { wakeFn := func() (err error) { err = c.loop.wake(c) if cb != nil { @@ -457,15 +472,15 @@ func (c *conn) Wake(cb AsyncCallback) error { case c.loop.ch <- wakeFn: default: // If the event-loop channel is full, asynchronize this operation to avoid blocking the eventloop. - go func() { + err = workerPool.Go(func() { c.loop.ch <- wakeFn - }() + }) } - return nil + return } -func (c *conn) Close() error { +func (c *conn) Close() (err error) { closeFn := func() error { return c.loop.close(c, nil) } @@ -474,15 +489,15 @@ func (c *conn) Close() error { case c.loop.ch <- closeFn: default: // If the event-loop channel is full, asynchronize this operation to avoid blocking the eventloop. - go func() { + err = workerPool.Go(func() { c.loop.ch <- closeFn - }() + }) } - return nil + return } -func (c *conn) CloseWithCallback(cb AsyncCallback) error { +func (c *conn) CloseWithCallback(cb AsyncCallback) (err error) { closeFn := func() (err error) { err = c.loop.close(c, nil) if cb != nil { @@ -495,12 +510,12 @@ func (c *conn) CloseWithCallback(cb AsyncCallback) error { case c.loop.ch <- closeFn: default: // If the event-loop channel is full, asynchronize this operation to avoid blocking the eventloop. - go func() { + err = workerPool.Go(func() { c.loop.ch <- closeFn - }() + }) } - return nil + return } func (*conn) SetDeadline(_ time.Time) error { diff --git a/gnet.go b/gnet.go index 4d934f737..b2572932f 100644 --- a/gnet.go +++ b/gnet.go @@ -241,7 +241,8 @@ type Writer interface { // // Note that the parameter gnet.Conn might have been already released when it's UDP protocol, // thus it shouldn't be accessed. -// This callback must not block, otherwise, it blocks the event-loop. +// This callback will be executed in event-loop, thus it must not block, otherwise, +// it blocks the event-loop. type AsyncCallback func(c Conn, err error) error // Socket is a set of functions which manipulate the underlying file descriptor of a connection.