From bb32a8cd19160580292916f033de70baffd94a8e Mon Sep 17 00:00:00 2001 From: Andy Pan Date: Fri, 29 Mar 2024 21:50:57 +0800 Subject: [PATCH 1/3] opt: mitigate the latency issue by prioritizing asynchronous writes Fixes #423 --- acceptor_unix.go | 3 +- client_unix.go | 5 ++- connection_unix.go | 11 ++--- engine_unix.go | 5 ++- eventloop_unix.go | 5 ++- internal/netpoll/epoll_default_poller.go | 45 +++++++++------------ internal/netpoll/epoll_optimized_poller.go | 45 +++++++++------------ internal/netpoll/kqueue_default_poller.go | 41 ++++++++----------- internal/netpoll/kqueue_optimized_poller.go | 41 ++++++++----------- internal/queue/lock_free_queue.go | 4 ++ internal/queue/queue.go | 13 ++++++ 11 files changed, 103 insertions(+), 115 deletions(-) diff --git a/acceptor_unix.go b/acceptor_unix.go index cb752c60b..7297463ed 100644 --- a/acceptor_unix.go +++ b/acceptor_unix.go @@ -23,6 +23,7 @@ import ( "golang.org/x/sys/unix" "github.com/panjf2000/gnet/v2/internal/netpoll" + "github.com/panjf2000/gnet/v2/internal/queue" "github.com/panjf2000/gnet/v2/internal/socket" "github.com/panjf2000/gnet/v2/pkg/errors" "github.com/panjf2000/gnet/v2/pkg/logging" @@ -51,7 +52,7 @@ func (eng *engine) accept1(fd int, _ netpoll.IOEvent, _ netpoll.IOFlags) error { el := eng.eventLoops.next(remoteAddr) c := newTCPConn(nfd, el, sa, el.ln.addr, remoteAddr) - err = el.poller.UrgentTrigger(el.register, c) + err = el.poller.Trigger(queue.HighPriority, el.register, c) if err != nil { eng.opts.Logger.Errorf("UrgentTrigger() failed due to error: %v", err) _ = unix.Close(nfd) diff --git a/client_unix.go b/client_unix.go index 1381829e9..7c50d0608 100644 --- a/client_unix.go +++ b/client_unix.go @@ -30,6 +30,7 @@ import ( "github.com/panjf2000/gnet/v2/internal/math" "github.com/panjf2000/gnet/v2/internal/netpoll" + "github.com/panjf2000/gnet/v2/internal/queue" "github.com/panjf2000/gnet/v2/internal/socket" "github.com/panjf2000/gnet/v2/pkg/buffer/ring" errorx "github.com/panjf2000/gnet/v2/pkg/errors" @@ -126,7 +127,7 @@ func (cli *Client) Start() error { // Stop stops the client event-loop. func (cli *Client) Stop() (err error) { - logging.Error(cli.el.poller.UrgentTrigger(func(_ interface{}) error { return errorx.ErrEngineShutdown }, nil)) + logging.Error(cli.el.poller.Trigger(queue.HighPriority, func(_ interface{}) error { return errorx.ErrEngineShutdown }, nil)) // Stop the ticker. if cli.opts.Ticker { cli.el.engine.ticker.cancel() @@ -233,7 +234,7 @@ func (cli *Client) EnrollContext(c net.Conn, ctx interface{}) (Conn, error) { ccb := &connWithCallback{c: gc, cb: func() { close(connOpened) }} - err = cli.el.poller.UrgentTrigger(cli.el.register, ccb) + err = cli.el.poller.Trigger(queue.HighPriority, cli.el.register, ccb) if err != nil { gc.Close() return nil, err diff --git a/connection_unix.go b/connection_unix.go index ba7cffa77..18a33d2fe 100644 --- a/connection_unix.go +++ b/connection_unix.go @@ -29,6 +29,7 @@ import ( "github.com/panjf2000/gnet/v2/internal/gfd" gio "github.com/panjf2000/gnet/v2/internal/io" "github.com/panjf2000/gnet/v2/internal/netpoll" + "github.com/panjf2000/gnet/v2/internal/queue" "github.com/panjf2000/gnet/v2/internal/socket" "github.com/panjf2000/gnet/v2/pkg/buffer/elastic" errorx "github.com/panjf2000/gnet/v2/pkg/errors" @@ -442,18 +443,18 @@ func (c *conn) AsyncWrite(buf []byte, callback AsyncCallback) error { } return err } - return c.loop.poller.Trigger(c.asyncWrite, &asyncWriteHook{callback, buf}) + return c.loop.poller.Trigger(queue.HighPriority, c.asyncWrite, &asyncWriteHook{callback, buf}) } func (c *conn) AsyncWritev(bs [][]byte, callback AsyncCallback) error { if c.isDatagram { return errorx.ErrUnsupportedOp } - return c.loop.poller.Trigger(c.asyncWritev, &asyncWritevHook{callback, bs}) + return c.loop.poller.Trigger(queue.HighPriority, c.asyncWritev, &asyncWritevHook{callback, bs}) } func (c *conn) Wake(callback AsyncCallback) error { - return c.loop.poller.UrgentTrigger(func(_ interface{}) (err error) { + return c.loop.poller.Trigger(queue.LowPriority, func(_ interface{}) (err error) { err = c.loop.wake(c) if callback != nil { _ = callback(c, err) @@ -463,7 +464,7 @@ func (c *conn) Wake(callback AsyncCallback) error { } func (c *conn) CloseWithCallback(callback AsyncCallback) error { - return c.loop.poller.Trigger(func(_ interface{}) (err error) { + return c.loop.poller.Trigger(queue.LowPriority, func(_ interface{}) (err error) { err = c.loop.close(c, nil) if callback != nil { _ = callback(c, err) @@ -473,7 +474,7 @@ func (c *conn) CloseWithCallback(callback AsyncCallback) error { } func (c *conn) Close() error { - return c.loop.poller.Trigger(func(_ interface{}) (err error) { + return c.loop.poller.Trigger(queue.LowPriority, func(_ interface{}) (err error) { err = c.loop.close(c, nil) return }, nil) diff --git a/engine_unix.go b/engine_unix.go index 3d2fc9cf2..69e5d4ab9 100644 --- a/engine_unix.go +++ b/engine_unix.go @@ -27,6 +27,7 @@ import ( "github.com/panjf2000/gnet/v2/internal/gfd" "github.com/panjf2000/gnet/v2/internal/netpoll" + "github.com/panjf2000/gnet/v2/internal/queue" "github.com/panjf2000/gnet/v2/pkg/errors" ) @@ -203,14 +204,14 @@ func (eng *engine) stop(s Engine) { // Notify all event-loops to exit. eng.eventLoops.iterate(func(_ int, el *eventloop) bool { - err := el.poller.UrgentTrigger(func(_ interface{}) error { return errors.ErrEngineShutdown }, nil) + err := el.poller.Trigger(queue.HighPriority, func(_ interface{}) error { return errors.ErrEngineShutdown }, nil) if err != nil { eng.opts.Logger.Errorf("failed to call UrgentTrigger on sub event-loop when stopping engine: %v", err) } return true }) if eng.acceptor != nil { - err := eng.acceptor.poller.UrgentTrigger(func(_ interface{}) error { return errors.ErrEngineShutdown }, nil) + err := eng.acceptor.poller.Trigger(queue.HighPriority, func(_ interface{}) error { return errors.ErrEngineShutdown }, nil) if err != nil { eng.opts.Logger.Errorf("failed to call UrgentTrigger on main event-loop when stopping engine: %v", err) } diff --git a/eventloop_unix.go b/eventloop_unix.go index c06df0fe0..245b7ca3f 100644 --- a/eventloop_unix.go +++ b/eventloop_unix.go @@ -30,6 +30,7 @@ import ( "github.com/panjf2000/gnet/v2/internal/io" "github.com/panjf2000/gnet/v2/internal/netpoll" + "github.com/panjf2000/gnet/v2/internal/queue" errorx "github.com/panjf2000/gnet/v2/pkg/errors" "github.com/panjf2000/gnet/v2/pkg/logging" ) @@ -253,7 +254,9 @@ func (el *eventloop) ticker(ctx context.Context) { switch action { case None: case Shutdown: - err := el.poller.UrgentTrigger(func(_ interface{}) error { return errorx.ErrEngineShutdown }, nil) + // It seems reasonable to mark this as low-priority, waiting for some tasks like asynchronous writes + // to finish up before shutting down the service. + err := el.poller.Trigger(queue.LowPriority, func(_ interface{}) error { return errorx.ErrEngineShutdown }, nil) el.getLogger().Debugf("stopping ticker in event-loop(%d) from OnTick(), UrgentTrigger:%v", el.idx, err) } if timer == nil { diff --git a/internal/netpoll/epoll_default_poller.go b/internal/netpoll/epoll_default_poller.go index a83613aaf..aa98d0b87 100644 --- a/internal/netpoll/epoll_default_poller.go +++ b/internal/netpoll/epoll_default_poller.go @@ -33,12 +33,13 @@ import ( // Poller represents a poller which is in charge of monitoring file-descriptors. type Poller struct { - fd int // epoll fd - efd int // eventfd - efdBuf []byte // efd buffer to read an 8-byte integer - wakeupCall int32 - asyncTaskQueue queue.AsyncTaskQueue // queue with low priority - urgentAsyncTaskQueue queue.AsyncTaskQueue // queue with high priority + fd int // epoll fd + efd int // eventfd + efdBuf []byte // efd buffer to read an 8-byte integer + wakeupCall int32 + asyncTaskQueue queue.AsyncTaskQueue // queue with low priority + urgentAsyncTaskQueue queue.AsyncTaskQueue // queue with high priority + highPriorityEventsThreshold int32 // threshold of high-priority events } // OpenPoller instantiates a poller. @@ -63,6 +64,7 @@ func OpenPoller() (poller *Poller, err error) { } poller.asyncTaskQueue = queue.NewLockFreeQueue() poller.urgentAsyncTaskQueue = queue.NewLockFreeQueue() + poller.highPriorityEventsThreshold = MaxPollEventsCap return } @@ -81,31 +83,20 @@ var ( b = (*(*[8]byte)(unsafe.Pointer(&u)))[:] ) -// UrgentTrigger puts task into urgentAsyncTaskQueue and wakes up the poller which is waiting for network-events, -// then the poller will get tasks from urgentAsyncTaskQueue and run them. +// Trigger enqueues task and wakes up the poller to process pending tasks. +// By default, any incoming task will enqueued into urgentAsyncTaskQueue +// before the threshold of high-priority events is reached. When it happens, +// any asks other than high-priority tasks will be shunted to asyncTaskQueue. // -// Note that urgentAsyncTaskQueue is a queue with high-priority and its size is expected to be small, -// so only those urgent tasks should be put into this queue. -func (p *Poller) UrgentTrigger(fn queue.TaskFunc, arg interface{}) (err error) { +// Note that asyncTaskQueue is a queue of low-priority whose size may grow large and tasks in it may backlog. +func (p *Poller) Trigger(priority queue.EventPriority, fn queue.TaskFunc, arg interface{}) (err error) { task := queue.GetTask() task.Run, task.Arg = fn, arg - p.urgentAsyncTaskQueue.Enqueue(task) - if atomic.CompareAndSwapInt32(&p.wakeupCall, 0, 1) { - if _, err = unix.Write(p.efd, b); err == unix.EAGAIN { - err = nil - } + if priority > queue.HighPriority && p.urgentAsyncTaskQueue.Length() >= p.highPriorityEventsThreshold { + p.asyncTaskQueue.Enqueue(task) + } else { + p.urgentAsyncTaskQueue.Enqueue(task) } - return os.NewSyscallError("write", err) -} - -// Trigger is like UrgentTrigger but it puts task into asyncTaskQueue, -// call this method when the task is not so urgent, for instance writing data back to the peer. -// -// Note that asyncTaskQueue is a queue with low-priority whose size may grow large and tasks in it may backlog. -func (p *Poller) Trigger(fn queue.TaskFunc, arg interface{}) (err error) { - task := queue.GetTask() - task.Run, task.Arg = fn, arg - p.asyncTaskQueue.Enqueue(task) if atomic.CompareAndSwapInt32(&p.wakeupCall, 0, 1) { if _, err = unix.Write(p.efd, b); err == unix.EAGAIN { err = nil diff --git a/internal/netpoll/epoll_optimized_poller.go b/internal/netpoll/epoll_optimized_poller.go index c4fb008ea..1aa02cf9e 100644 --- a/internal/netpoll/epoll_optimized_poller.go +++ b/internal/netpoll/epoll_optimized_poller.go @@ -32,12 +32,13 @@ import ( // Poller represents a poller which is in charge of monitoring file-descriptors. type Poller struct { - fd int // epoll fd - epa *PollAttachment // PollAttachment for waking events - efdBuf []byte // efd buffer to read an 8-byte integer - wakeupCall int32 - asyncTaskQueue queue.AsyncTaskQueue // queue with low priority - urgentAsyncTaskQueue queue.AsyncTaskQueue // queue with high priority + fd int // epoll fd + epa *PollAttachment // PollAttachment for waking events + efdBuf []byte // efd buffer to read an 8-byte integer + wakeupCall int32 + asyncTaskQueue queue.AsyncTaskQueue // queue with low priority + urgentAsyncTaskQueue queue.AsyncTaskQueue // queue with high priority + highPriorityEventsThreshold int32 // threshold of high-priority events } // OpenPoller instantiates a poller. @@ -64,6 +65,7 @@ func OpenPoller() (poller *Poller, err error) { } poller.asyncTaskQueue = queue.NewLockFreeQueue() poller.urgentAsyncTaskQueue = queue.NewLockFreeQueue() + poller.highPriorityEventsThreshold = MaxPollEventsCap return } @@ -82,31 +84,20 @@ var ( b = (*(*[8]byte)(unsafe.Pointer(&u)))[:] ) -// UrgentTrigger puts task into urgentAsyncTaskQueue and wakes up the poller which is waiting for network-events, -// then the poller will get tasks from urgentAsyncTaskQueue and run them. +// Trigger enqueues task and wakes up the poller to process pending tasks. +// By default, any incoming task will enqueued into urgentAsyncTaskQueue +// before the threshold of high-priority events is reached. When it happens, +// any asks other than high-priority tasks will be shunted to asyncTaskQueue. // -// Note that urgentAsyncTaskQueue is a queue with high-priority and its size is expected to be small, -// so only those urgent tasks should be put into this queue. -func (p *Poller) UrgentTrigger(fn queue.TaskFunc, arg interface{}) (err error) { +// Note that asyncTaskQueue is a queue of low-priority whose size may grow large and tasks in it may backlog. +func (p *Poller) Trigger(priority queue.EventPriority, fn queue.TaskFunc, arg interface{}) (err error) { task := queue.GetTask() task.Run, task.Arg = fn, arg - p.urgentAsyncTaskQueue.Enqueue(task) - if atomic.CompareAndSwapInt32(&p.wakeupCall, 0, 1) { - if _, err = unix.Write(p.epa.FD, b); err == unix.EAGAIN { - err = nil - } + if priority > queue.HighPriority && p.urgentAsyncTaskQueue.Length() >= p.highPriorityEventsThreshold { + p.asyncTaskQueue.Enqueue(task) + } else { + p.urgentAsyncTaskQueue.Enqueue(task) } - return os.NewSyscallError("write", err) -} - -// Trigger is like UrgentTrigger but it puts task into asyncTaskQueue, -// call this method when the task is not so urgent, for instance writing data back to the peer. -// -// Note that asyncTaskQueue is a queue with low-priority whose size may grow large and tasks in it may backlog. -func (p *Poller) Trigger(fn queue.TaskFunc, arg interface{}) (err error) { - task := queue.GetTask() - task.Run, task.Arg = fn, arg - p.asyncTaskQueue.Enqueue(task) if atomic.CompareAndSwapInt32(&p.wakeupCall, 0, 1) { if _, err = unix.Write(p.epa.FD, b); err == unix.EAGAIN { err = nil diff --git a/internal/netpoll/kqueue_default_poller.go b/internal/netpoll/kqueue_default_poller.go index 3431c4286..6791971ac 100644 --- a/internal/netpoll/kqueue_default_poller.go +++ b/internal/netpoll/kqueue_default_poller.go @@ -32,10 +32,11 @@ import ( // Poller represents a poller which is in charge of monitoring file-descriptors. type Poller struct { - fd int - wakeupCall int32 - asyncTaskQueue queue.AsyncTaskQueue // queue with low priority - urgentAsyncTaskQueue queue.AsyncTaskQueue // queue with high priority + fd int + wakeupCall int32 + asyncTaskQueue queue.AsyncTaskQueue // queue with low priority + urgentAsyncTaskQueue queue.AsyncTaskQueue // queue with high priority + highPriorityEventsThreshold int32 // threshold of high-priority events } // OpenPoller instantiates a poller. @@ -58,6 +59,7 @@ func OpenPoller() (poller *Poller, err error) { } poller.asyncTaskQueue = queue.NewLockFreeQueue() poller.urgentAsyncTaskQueue = queue.NewLockFreeQueue() + poller.highPriorityEventsThreshold = MaxPollEventsCap return } @@ -72,31 +74,20 @@ var note = []unix.Kevent_t{{ Fflags: unix.NOTE_TRIGGER, }} -// UrgentTrigger puts task into urgentAsyncTaskQueue and wakes up the poller which is waiting for network-events, -// then the poller will get tasks from urgentAsyncTaskQueue and run them. +// Trigger enqueues task and wakes up the poller to process pending tasks. +// By default, any incoming task will enqueued into urgentAsyncTaskQueue +// before the threshold of high-priority events is reached. When it happens, +// any asks other than high-priority tasks will be shunted to asyncTaskQueue. // -// Note that urgentAsyncTaskQueue is a queue with high-priority and its size is expected to be small, -// so only those urgent tasks should be put into this queue. -func (p *Poller) UrgentTrigger(fn queue.TaskFunc, arg interface{}) (err error) { +// Note that asyncTaskQueue is a queue of low-priority whose size may grow large and tasks in it may backlog. +func (p *Poller) Trigger(priority queue.EventPriority, fn queue.TaskFunc, arg interface{}) (err error) { task := queue.GetTask() task.Run, task.Arg = fn, arg - p.urgentAsyncTaskQueue.Enqueue(task) - if atomic.CompareAndSwapInt32(&p.wakeupCall, 0, 1) { - if _, err = unix.Kevent(p.fd, note, nil, nil); err == unix.EAGAIN { - err = nil - } + if priority > queue.HighPriority && p.urgentAsyncTaskQueue.Length() >= p.highPriorityEventsThreshold { + p.asyncTaskQueue.Enqueue(task) + } else { + p.urgentAsyncTaskQueue.Enqueue(task) } - return os.NewSyscallError("kevent trigger", err) -} - -// Trigger is like UrgentTrigger but it puts task into asyncTaskQueue, -// call this method when the task is not so urgent, for instance writing data back to the peer. -// -// Note that asyncTaskQueue is a queue with low-priority whose size may grow large and tasks in it may backlog. -func (p *Poller) Trigger(fn queue.TaskFunc, arg interface{}) (err error) { - task := queue.GetTask() - task.Run, task.Arg = fn, arg - p.asyncTaskQueue.Enqueue(task) if atomic.CompareAndSwapInt32(&p.wakeupCall, 0, 1) { if _, err = unix.Kevent(p.fd, note, nil, nil); err == unix.EAGAIN { err = nil diff --git a/internal/netpoll/kqueue_optimized_poller.go b/internal/netpoll/kqueue_optimized_poller.go index 5b56bcd2e..18ce168a7 100644 --- a/internal/netpoll/kqueue_optimized_poller.go +++ b/internal/netpoll/kqueue_optimized_poller.go @@ -33,10 +33,11 @@ import ( // Poller represents a poller which is in charge of monitoring file-descriptors. type Poller struct { - fd int - wakeupCall int32 - asyncTaskQueue queue.AsyncTaskQueue // queue with low priority - urgentAsyncTaskQueue queue.AsyncTaskQueue // queue with high priority + fd int + wakeupCall int32 + asyncTaskQueue queue.AsyncTaskQueue // queue with low priority + urgentAsyncTaskQueue queue.AsyncTaskQueue // queue with high priority + highPriorityEventsThreshold int32 // threshold of high-priority events } // OpenPoller instantiates a poller. @@ -59,6 +60,7 @@ func OpenPoller() (poller *Poller, err error) { } poller.asyncTaskQueue = queue.NewLockFreeQueue() poller.urgentAsyncTaskQueue = queue.NewLockFreeQueue() + poller.highPriorityEventsThreshold = MaxPollEventsCap return } @@ -73,31 +75,20 @@ var note = []unix.Kevent_t{{ Fflags: unix.NOTE_TRIGGER, }} -// UrgentTrigger puts task into urgentAsyncTaskQueue and wakes up the poller which is waiting for network-events, -// then the poller will get tasks from urgentAsyncTaskQueue and run them. +// Trigger enqueues task and wakes up the poller to process pending tasks. +// By default, any incoming task will enqueued into urgentAsyncTaskQueue +// before the threshold of high-priority events is reached. When it happens, +// any asks other than high-priority tasks will be shunted to asyncTaskQueue. // -// Note that urgentAsyncTaskQueue is a queue with high-priority and its size is expected to be small, -// so only those urgent tasks should be put into this queue. -func (p *Poller) UrgentTrigger(fn queue.TaskFunc, arg interface{}) (err error) { +// Note that asyncTaskQueue is a queue of low-priority whose size may grow large and tasks in it may backlog. +func (p *Poller) Trigger(priority queue.EventPriority, fn queue.TaskFunc, arg interface{}) (err error) { task := queue.GetTask() task.Run, task.Arg = fn, arg - p.urgentAsyncTaskQueue.Enqueue(task) - if atomic.CompareAndSwapInt32(&p.wakeupCall, 0, 1) { - if _, err = unix.Kevent(p.fd, note, nil, nil); err == unix.EAGAIN { - err = nil - } + if priority > queue.HighPriority && p.urgentAsyncTaskQueue.Length() >= p.highPriorityEventsThreshold { + p.asyncTaskQueue.Enqueue(task) + } else { + p.urgentAsyncTaskQueue.Enqueue(task) } - return os.NewSyscallError("kevent trigger", err) -} - -// Trigger is like UrgentTrigger but it puts task into asyncTaskQueue, -// call this method when the task is not so urgent, for instance writing data back to the peer. -// -// Note that asyncTaskQueue is a queue with low-priority whose size may grow large and tasks in it may backlog. -func (p *Poller) Trigger(fn queue.TaskFunc, arg interface{}) (err error) { - task := queue.GetTask() - task.Run, task.Arg = fn, arg - p.asyncTaskQueue.Enqueue(task) if atomic.CompareAndSwapInt32(&p.wakeupCall, 0, 1) { if _, err = unix.Kevent(p.fd, note, nil, nil); err == unix.EAGAIN { err = nil diff --git a/internal/queue/lock_free_queue.go b/internal/queue/lock_free_queue.go index a089ddbde..16099fecd 100644 --- a/internal/queue/lock_free_queue.go +++ b/internal/queue/lock_free_queue.go @@ -157,6 +157,10 @@ func (q *lockFreeQueue) IsEmpty() bool { return atomic.LoadInt32(&q.length) == 0 } +func (q *lockFreeQueue) Length() int32 { + return atomic.LoadInt32(&q.length) +} + func load(p *unsafe.Pointer) (n *node) { return (*node)(atomic.LoadPointer(p)) } diff --git a/internal/queue/queue.go b/internal/queue/queue.go index 194799a53..826f1843f 100644 --- a/internal/queue/queue.go +++ b/internal/queue/queue.go @@ -43,4 +43,17 @@ type AsyncTaskQueue interface { Enqueue(*Task) Dequeue() *Task IsEmpty() bool + Length() int32 } + +// EventPriority is the priority of an event. +type EventPriority int + +const ( + // HighPriority is for the tasks expected to be executed + // as soon as possible. + HighPriority EventPriority = iota + // LowPriority is for the tasks that won't matter much + // even if they are deferred a little bit. + LowPriority +) From eef395021a08a3f76fc3ff87e9e1094bea4515a7 Mon Sep 17 00:00:00 2001 From: Andy Pan Date: Sat, 30 Mar 2024 18:41:21 +0800 Subject: [PATCH 2/3] chore: update some logs --- acceptor_unix.go | 2 +- engine_unix.go | 8 ++++---- eventloop_unix.go | 2 +- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/acceptor_unix.go b/acceptor_unix.go index 7297463ed..13177a8b8 100644 --- a/acceptor_unix.go +++ b/acceptor_unix.go @@ -54,7 +54,7 @@ func (eng *engine) accept1(fd int, _ netpoll.IOEvent, _ netpoll.IOFlags) error { c := newTCPConn(nfd, el, sa, el.ln.addr, remoteAddr) err = el.poller.Trigger(queue.HighPriority, el.register, c) if err != nil { - eng.opts.Logger.Errorf("UrgentTrigger() failed due to error: %v", err) + eng.opts.Logger.Errorf("failed to enqueue accepted socket of high-priority: %v", err) _ = unix.Close(nfd) c.release() } diff --git a/engine_unix.go b/engine_unix.go index 69e5d4ab9..5040a3fe1 100644 --- a/engine_unix.go +++ b/engine_unix.go @@ -203,17 +203,17 @@ func (eng *engine) stop(s Engine) { eng.eventHandler.OnShutdown(s) // Notify all event-loops to exit. - eng.eventLoops.iterate(func(_ int, el *eventloop) bool { + eng.eventLoops.iterate(func(i int, el *eventloop) bool { err := el.poller.Trigger(queue.HighPriority, func(_ interface{}) error { return errors.ErrEngineShutdown }, nil) if err != nil { - eng.opts.Logger.Errorf("failed to call UrgentTrigger on sub event-loop when stopping engine: %v", err) + eng.opts.Logger.Errorf("failed to enqueue shutdown signal of high-priority for event-loop(%d): %v", i, err) } return true }) if eng.acceptor != nil { err := eng.acceptor.poller.Trigger(queue.HighPriority, func(_ interface{}) error { return errors.ErrEngineShutdown }, nil) if err != nil { - eng.opts.Logger.Errorf("failed to call UrgentTrigger on main event-loop when stopping engine: %v", err) + eng.opts.Logger.Errorf("failed to enqueue shutdown signal of high-priority for main event-loop: %v", err) } } @@ -300,7 +300,7 @@ func (eng *engine) sendCmd(cmd *asyncCmd, urgent bool) error { return errors.ErrInvalidConn } if urgent { - return el.poller.UrgentTrigger(el.execCmd, cmd) + return el.poller.Trigger(queue.LowPriority, el.execCmd, cmd) } return el.poller.Trigger(el.execCmd, cmd) } diff --git a/eventloop_unix.go b/eventloop_unix.go index 245b7ca3f..f3082f14f 100644 --- a/eventloop_unix.go +++ b/eventloop_unix.go @@ -257,7 +257,7 @@ func (el *eventloop) ticker(ctx context.Context) { // It seems reasonable to mark this as low-priority, waiting for some tasks like asynchronous writes // to finish up before shutting down the service. err := el.poller.Trigger(queue.LowPriority, func(_ interface{}) error { return errorx.ErrEngineShutdown }, nil) - el.getLogger().Debugf("stopping ticker in event-loop(%d) from OnTick(), UrgentTrigger:%v", el.idx, err) + el.getLogger().Debugf("failed to enqueue shutdown signal of high-priority for event-loop(%d): %v", el.idx, err) } if timer == nil { timer = time.NewTimer(delay) From efb0d94c26264efec698d578faaa8265f69a3229 Mon Sep 17 00:00:00 2001 From: Andy Pan Date: Sat, 30 Mar 2024 19:03:28 +0800 Subject: [PATCH 3/3] chore: add a few comments --- internal/netpoll/epoll_default_poller.go | 2 ++ internal/netpoll/epoll_optimized_poller.go | 2 ++ internal/netpoll/kqueue_default_poller.go | 2 ++ internal/netpoll/kqueue_optimized_poller.go | 2 ++ 4 files changed, 8 insertions(+) diff --git a/internal/netpoll/epoll_default_poller.go b/internal/netpoll/epoll_default_poller.go index aa98d0b87..b8e686192 100644 --- a/internal/netpoll/epoll_default_poller.go +++ b/internal/netpoll/epoll_default_poller.go @@ -95,6 +95,8 @@ func (p *Poller) Trigger(priority queue.EventPriority, fn queue.TaskFunc, arg in if priority > queue.HighPriority && p.urgentAsyncTaskQueue.Length() >= p.highPriorityEventsThreshold { p.asyncTaskQueue.Enqueue(task) } else { + // There might be some low-priority tasks overflowing into urgentAsyncTaskQueue in a flash, + // but that's tolerable because it ought to be a rare case. p.urgentAsyncTaskQueue.Enqueue(task) } if atomic.CompareAndSwapInt32(&p.wakeupCall, 0, 1) { diff --git a/internal/netpoll/epoll_optimized_poller.go b/internal/netpoll/epoll_optimized_poller.go index 1aa02cf9e..2c5db7353 100644 --- a/internal/netpoll/epoll_optimized_poller.go +++ b/internal/netpoll/epoll_optimized_poller.go @@ -96,6 +96,8 @@ func (p *Poller) Trigger(priority queue.EventPriority, fn queue.TaskFunc, arg in if priority > queue.HighPriority && p.urgentAsyncTaskQueue.Length() >= p.highPriorityEventsThreshold { p.asyncTaskQueue.Enqueue(task) } else { + // There might be some low-priority tasks overflowing into urgentAsyncTaskQueue in a flash, + // but that's tolerable because it ought to be a rare case. p.urgentAsyncTaskQueue.Enqueue(task) } if atomic.CompareAndSwapInt32(&p.wakeupCall, 0, 1) { diff --git a/internal/netpoll/kqueue_default_poller.go b/internal/netpoll/kqueue_default_poller.go index 6791971ac..12f29b443 100644 --- a/internal/netpoll/kqueue_default_poller.go +++ b/internal/netpoll/kqueue_default_poller.go @@ -86,6 +86,8 @@ func (p *Poller) Trigger(priority queue.EventPriority, fn queue.TaskFunc, arg in if priority > queue.HighPriority && p.urgentAsyncTaskQueue.Length() >= p.highPriorityEventsThreshold { p.asyncTaskQueue.Enqueue(task) } else { + // There might be some low-priority tasks overflowing into urgentAsyncTaskQueue in a flash, + // but that's tolerable because it ought to be a rare case. p.urgentAsyncTaskQueue.Enqueue(task) } if atomic.CompareAndSwapInt32(&p.wakeupCall, 0, 1) { diff --git a/internal/netpoll/kqueue_optimized_poller.go b/internal/netpoll/kqueue_optimized_poller.go index 18ce168a7..1b5b69e7b 100644 --- a/internal/netpoll/kqueue_optimized_poller.go +++ b/internal/netpoll/kqueue_optimized_poller.go @@ -87,6 +87,8 @@ func (p *Poller) Trigger(priority queue.EventPriority, fn queue.TaskFunc, arg in if priority > queue.HighPriority && p.urgentAsyncTaskQueue.Length() >= p.highPriorityEventsThreshold { p.asyncTaskQueue.Enqueue(task) } else { + // There might be some low-priority tasks overflowing into urgentAsyncTaskQueue in a flash, + // but that's tolerable because it ought to be a rare case. p.urgentAsyncTaskQueue.Enqueue(task) } if atomic.CompareAndSwapInt32(&p.wakeupCall, 0, 1) {