Skip to content

Commit

Permalink
opt: use worker pool for asynchronous tasks on Windows
Browse files Browse the repository at this point in the history
  • Loading branch information
panjf2000 committed Nov 13, 2024
1 parent 2f1bf9b commit 5f2d2a4
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 16 deletions.
45 changes: 30 additions & 15 deletions connection_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@ import (
"syscall"
"time"

"github.com/panjf2000/ants/v2"
"golang.org/x/sys/windows"

"github.com/panjf2000/gnet/v2/pkg/buffer/elastic"
errorx "github.com/panjf2000/gnet/v2/pkg/errors"
bbPool "github.com/panjf2000/gnet/v2/pkg/pool/bytebuffer"
goPool "github.com/panjf2000/gnet/v2/pkg/pool/goroutine"
)

type netErr struct {
Expand Down Expand Up @@ -404,6 +406,19 @@ func (c *conn) SetKeepAlivePeriod(d time.Duration) error {
return nil
}

type nonBlockingPool struct {
*goPool.Pool
}

func (np *nonBlockingPool) Go(task func()) (err error) {
if err = np.Submit(task); err == ants.ErrPoolOverload {
go task()
}

Check warning on line 416 in connection_windows.go

View check run for this annotation

Codecov / codecov/patch

connection_windows.go#L415-L416

Added lines #L415 - L416 were not covered by tests
return
}

var workerPool = nonBlockingPool{Pool: goPool.Default()}

// Gfd return an uninitialized GFD which is not valid,
// this method is only implemented for compatibility, don't use it on Windows.
// func (c *conn) Gfd() gfd.GFD { return gfd.GFD{} }
Expand All @@ -422,12 +437,12 @@ func (c *conn) AsyncWrite(buf []byte, cb AsyncCallback) error {
case c.loop.ch <- callback:
default:
// If the event-loop channel is full, asynchronize this operation to avoid blocking the eventloop.
go func() {
err = workerPool.Go(func() {
c.loop.ch <- callback
}()
})
}

return nil
return err
}

func (c *conn) AsyncWritev(bs [][]byte, cb AsyncCallback) error {
Expand All @@ -444,7 +459,7 @@ func (c *conn) AsyncWritev(bs [][]byte, cb AsyncCallback) error {
})
}

func (c *conn) Wake(cb AsyncCallback) error {
func (c *conn) Wake(cb AsyncCallback) (err error) {
wakeFn := func() (err error) {
err = c.loop.wake(c)
if cb != nil {
Expand All @@ -457,15 +472,15 @@ func (c *conn) Wake(cb AsyncCallback) error {
case c.loop.ch <- wakeFn:
default:
// If the event-loop channel is full, asynchronize this operation to avoid blocking the eventloop.
go func() {
err = workerPool.Go(func() {

Check warning on line 475 in connection_windows.go

View check run for this annotation

Codecov / codecov/patch

connection_windows.go#L475

Added line #L475 was not covered by tests
c.loop.ch <- wakeFn
}()
})

Check warning on line 477 in connection_windows.go

View check run for this annotation

Codecov / codecov/patch

connection_windows.go#L477

Added line #L477 was not covered by tests
}

return nil
return
}

func (c *conn) Close() error {
func (c *conn) Close() (err error) {
closeFn := func() error {
return c.loop.close(c, nil)
}
Expand All @@ -474,15 +489,15 @@ func (c *conn) Close() error {
case c.loop.ch <- closeFn:
default:
// If the event-loop channel is full, asynchronize this operation to avoid blocking the eventloop.
go func() {
err = workerPool.Go(func() {

Check warning on line 492 in connection_windows.go

View check run for this annotation

Codecov / codecov/patch

connection_windows.go#L492

Added line #L492 was not covered by tests
c.loop.ch <- closeFn
}()
})

Check warning on line 494 in connection_windows.go

View check run for this annotation

Codecov / codecov/patch

connection_windows.go#L494

Added line #L494 was not covered by tests
}

return nil
return
}

func (c *conn) CloseWithCallback(cb AsyncCallback) error {
func (c *conn) CloseWithCallback(cb AsyncCallback) (err error) {
closeFn := func() (err error) {
err = c.loop.close(c, nil)
if cb != nil {
Expand All @@ -495,12 +510,12 @@ func (c *conn) CloseWithCallback(cb AsyncCallback) error {
case c.loop.ch <- closeFn:
default:
// If the event-loop channel is full, asynchronize this operation to avoid blocking the eventloop.
go func() {
err = workerPool.Go(func() {

Check warning on line 513 in connection_windows.go

View check run for this annotation

Codecov / codecov/patch

connection_windows.go#L513

Added line #L513 was not covered by tests
c.loop.ch <- closeFn
}()
})

Check warning on line 515 in connection_windows.go

View check run for this annotation

Codecov / codecov/patch

connection_windows.go#L515

Added line #L515 was not covered by tests
}

return nil
return
}

func (*conn) SetDeadline(_ time.Time) error {
Expand Down
3 changes: 2 additions & 1 deletion gnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,8 @@ type Writer interface {
//
// Note that the parameter gnet.Conn might have been already released when it's UDP protocol,
// thus it shouldn't be accessed.
// This callback must not block, otherwise, it blocks the event-loop.
// This callback will be executed in event-loop, thus it must not block, otherwise,
// it blocks the event-loop.
type AsyncCallback func(c Conn, err error) error

// Socket is a set of functions which manipulate the underlying file descriptor of a connection.
Expand Down

0 comments on commit 5f2d2a4

Please sign in to comment.