From d80775fabb6171cdbbe8243548a5e0e524ad0edf Mon Sep 17 00:00:00 2001
From: Mees Delzenne
Date: Thu, 28 Mar 2024 10:34:43 +0100
Subject: [PATCH] Introduce a new, more efficient scheduler

---
 Cargo.toml                           |   2 +
 core/Cargo.toml                      |   8 +-
 core/src/class.rs                    |   2 +-
 core/src/runtime.rs                  |   2 +
 core/src/runtime/raw.rs              |   4 +-
 core/src/runtime/schedular.rs        | 293 +++++++++++++++++++++++++++
 core/src/runtime/schedular/queue.rs  | 112 ++++++++++
 core/src/runtime/schedular/vtable.rs |  41 ++++
 core/src/runtime/schedular/waker.rs  |  62 ++++++
 core/src/runtime/spawner.rs          |  61 ++----
 10 files changed, 536 insertions(+), 51 deletions(-)
 create mode 100644 core/src/runtime/schedular.rs
 create mode 100644 core/src/runtime/schedular/queue.rs
 create mode 100644 core/src/runtime/schedular/vtable.rs
 create mode 100644 core/src/runtime/schedular/waker.rs

diff --git a/Cargo.toml b/Cargo.toml
index 914e3373..bc453fdd 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -30,6 +30,7 @@ package = "either"
 version = "1"
 optional = true
 
+
 [workspace]
 members = [
   "sys",
@@ -128,3 +129,4 @@ trybuild = "1.0.82"
 
 [package.metadata.docs.rs]
 features = ["full-async", "parallel", "doc-cfg"]
+
diff --git a/core/Cargo.toml b/core/Cargo.toml
index ca9e38b0..9c2ac1d1 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -42,6 +42,11 @@ optional = true
 version = "1.3"
 optional = true
 
+[dependencies.atomic-waker]
+version = "1.1.2"
+optional = true
+
+
 [features]
 default = []
 
@@ -84,7 +89,7 @@ properties = []
 array-buffer = []
 
 # Enable interop between Rust futures and JS Promises
-futures = ["async-lock"]
+futures = ["dep:async-lock","dep:atomic-waker"]
 
 # Allows transferring objects between different contexts of the same runtime.
 multi-ctx = []
@@ -128,3 +133,4 @@ trybuild = "1.0.23"
 
 [package.metadata.docs.rs]
 features = ["full-async", "doc-cfg"]
+
diff --git a/core/src/class.rs b/core/src/class.rs
index 83be1701..df6db141 100644
--- a/core/src/class.rs
+++ b/core/src/class.rs
@@ -348,7 +348,7 @@ impl<'js> Object<'js> {
         if self.instance_of::() {
             Ok(Class(self.clone(), PhantomData))
         } else {
-            Err(&self)
+            Err(self)
         }
     }
 
diff --git a/core/src/runtime.rs b/core/src/runtime.rs
index 730aced2..6e490c14 100644
--- a/core/src/runtime.rs
+++ b/core/src/runtime.rs
@@ -19,6 +19,8 @@ pub(crate) use r#async::InnerRuntime;
 #[cfg(feature = "futures")]
 pub use r#async::{AsyncRuntime, AsyncWeakRuntime};
 #[cfg(feature = "futures")]
+mod schedular;
+#[cfg(feature = "futures")]
 mod spawner;
 
 /// A struct with information about the runtimes memory usage.
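The new schedular module is only built with the `futures` feature, which now enables `atomic-waker` next to `async-lock` through the `dep:` syntax. For orientation, a rough sketch of the user-facing flow this machinery ultimately drives follows; the API names (`AsyncRuntime::new`, `AsyncContext::full`, `Ctx::spawn`, `AsyncRuntime::idle`) are quoted from memory of the crate's futures API and may differ between versions:

    use rquickjs::{AsyncContext, AsyncRuntime};

    async fn run() -> Result<(), Box<dyn std::error::Error>> {
        let rt = AsyncRuntime::new()?;
        let ctx = AsyncContext::full(&rt).await?;

        ctx.with(|ctx| {
            // Futures spawned here are the ones the Spawner/Schedular below has to drive.
            ctx.spawn(async {
                // async work tied to this JS context
            });
        })
        .await;

        // Wait until all spawned futures and pending jobs have finished.
        rt.idle().await;
        Ok(())
    }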
diff --git a/core/src/runtime/raw.rs b/core/src/runtime/raw.rs
index daa45340..ebbc5ac4 100644
--- a/core/src/runtime/raw.rs
+++ b/core/src/runtime/raw.rs
@@ -22,7 +22,7 @@ pub(crate) struct Opaque<'js> {
     pub interrupt_handler: Option,
 
     #[cfg(feature = "futures")]
-    pub spawner: Option>,
+    pub spawner: Option,
 
     _marker: PhantomData<&'js ()>,
 }
@@ -50,7 +50,7 @@ impl<'js> Opaque<'js> {
     }
 
     #[cfg(feature = "futures")]
-    pub fn spawner(&mut self) -> &mut Spawner<'js> {
+    pub fn spawner(&mut self) -> &mut Spawner {
         self.spawner
             .as_mut()
             .expect("tried to use async function in non async runtime")
diff --git a/core/src/runtime/schedular.rs b/core/src/runtime/schedular.rs
new file mode 100644
index 00000000..76bbc0e3
--- /dev/null
+++ b/core/src/runtime/schedular.rs
@@ -0,0 +1,293 @@
+use std::{
+    cell::{Cell, UnsafeCell},
+    future::Future,
+    mem::{offset_of, ManuallyDrop},
+    pin::Pin,
+    ptr::NonNull,
+    sync::{
+        atomic::{AtomicBool, Ordering},
+        Arc, Weak,
+    },
+    task::{Context, Poll},
+};
+
+mod queue;
+use queue::Queue;
+
+mod vtable;
+use vtable::VTable;
+
+mod waker;
+
+use self::queue::NodeHeader;
+
+use std::ops::{Deref, DerefMut};
+
+pub struct Defer {
+    value: ManuallyDrop,
+    f: Option,
+}
+
+impl Defer {
+    pub fn new(value: T, func: F) -> Self {
+        Defer {
+            value: ManuallyDrop::new(value),
+            f: Some(func),
+        }
+    }
+
+    pub fn take(mut self) -> T {
+        self.f = None;
+        unsafe { ManuallyDrop::take(&mut self.value) }
+    }
+}
+
+impl Deref for Defer {
+    type Target = T;
+
+    fn deref(&self) -> &Self::Target {
+        &self.value
+    }
+}
+
+impl DerefMut for Defer {
+    fn deref_mut(&mut self) -> &mut Self::Target {
+        &mut self.value
+    }
+}
+
+impl Drop for Defer
+where
+    F: FnOnce(&mut T),
+{
+    fn drop(&mut self) {
+        if let Some(x) = self.f.take() {
+            (x)(&mut *self.value);
+            unsafe { ManuallyDrop::drop(&mut self.value) }
+        }
+    }
+}
+
+#[repr(C)]
+struct Task {
+    head: NodeHeader,
+    body: TaskBody,
+    future: UnsafeCell,
+}
+
+// Separate struct so that not everything has to be repr(C).
+struct TaskBody {
+    queue: Weak,
+    vtable: &'static VTable,
+    // The doubly linked list of tasks.
+    next: Cell>>>,
+    prev: Cell>>>,
+    // Whether the task is currently in the queue to be re-polled.
+    queued: AtomicBool,
+    done: Cell,
+}
+
+pub struct Schedular {
+    len: Cell,
+    should_poll: Arc,
+    all_next: Cell>>>,
+    all_prev: Cell>>>,
+}
+
+impl Schedular {
+    pub fn new() -> Self {
+        let queue = Arc::new(Queue::new());
+        unsafe {
+            Pin::new_unchecked(&*queue).init();
+        }
+        Schedular {
+            len: Cell::new(0),
+            should_poll: queue,
+            all_prev: Cell::new(None),
+            all_next: Cell::new(None),
+        }
+    }
+
+    pub fn is_empty(&self) -> bool {
+        self.all_next.get().is_none()
+    }
+
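The `Defer` type defined at the top of this file is a drop guard: it owns a value and a cleanup closure, runs the closure when dropped (including during a panic unwind), and `take()` disarms it on the success path. A minimal standalone sketch of the same pattern, with hypothetical names rather than the exact type above:

    struct Guard<F: FnOnce()>(Option<F>);

    impl<F: FnOnce()> Guard<F> {
        fn new(f: F) -> Self {
            Guard(Some(f))
        }

        // Success path: disarm the guard so the cleanup never runs.
        fn disarm(mut self) {
            self.0 = None;
        }
    }

    impl<F: FnOnce()> Drop for Guard<F> {
        fn drop(&mut self) {
            // Runs on early return, panic, or normal drop, unless disarmed.
            if let Some(f) = self.0.take() {
                f()
            }
        }
    }

    fn main() {
        let cleanup = Guard::new(|| println!("cleaning up"));
        // ... work that may panic or return early ...
        cleanup.disarm();
    }

The poll loop further down uses this to guarantee a task is unlinked from the task list even if polling it panics.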
+    /// # Safety
+    /// This function erases any lifetime associated with the future.
+    /// The caller must ensure that the future either completes or is dropped before that lifetime ends.
+    pub unsafe fn push(&self, f: F)
+    where
+        F: Future,
+    {
+        let queue = Arc::downgrade(&self.should_poll);
+
+        debug_assert_eq!(offset_of!(Task, future), offset_of!(Task, future));
+
+        let task = Arc::new(Task {
+            head: NodeHeader::new(),
+            body: TaskBody {
+                queue,
+                vtable: VTable::get::(),
+                next: Cell::new(None),
+                prev: Cell::new(None),
+                queued: AtomicBool::new(true),
+                done: Cell::new(false),
+            },
+            future: UnsafeCell::new(ManuallyDrop::new(f)),
+        });
+
+        // One count for the all list and one for the should_poll list.
+        let task = NonNull::new_unchecked(Arc::into_raw(task) as *mut Task).cast::>();
+        Arc::increment_strong_count(task.as_ptr());
+
+        self.push_task_to_all(task);
+
+        Pin::new_unchecked(&*self.should_poll).push(task.cast());
+        self.len.set(self.len.get() + 1);
+    }
+
+    unsafe fn push_task_to_all(&self, task: NonNull>) {
+        task.as_ref().body.next.set(self.all_next.get());
+
+        if let Some(x) = self.all_next.get() {
+            x.as_ref().body.prev.set(Some(task));
+        }
+        self.all_next.set(Some(task));
+        if self.all_prev.get().is_none() {
+            self.all_prev.set(Some(task));
+        }
+    }
+
+    unsafe fn pop_task_all(&self, task: NonNull>) {
+        task.as_ref().body.queued.store(true, Ordering::Release);
+        task.as_ref().body.done.set(true);
+
+        // Detach the task from the all list.
+        if let Some(next) = task.as_ref().body.next.get() {
+            next.as_ref().body.prev.set(task.as_ref().body.prev.get())
+        } else {
+            self.all_prev.set(task.as_ref().body.prev.get());
+        }
+        if let Some(prev) = task.as_ref().body.prev.get() {
+            prev.as_ref().body.next.set(task.as_ref().body.next.get())
+        } else {
+            self.all_next.set(task.as_ref().body.next.get());
+        }
+
+        // Drop the ownership held by the all list.
+        // The task is now either dropped or owned only by wakers or the should_poll queue.
+        Self::drop_task(task);
+        self.len.set(self.len.get() - 1);
+    }
+
+    unsafe fn drop_task(ptr: NonNull>) {
+        (ptr.as_ref().body.vtable.task_drop)(ptr)
+    }
+
+    unsafe fn drive_task(ptr: NonNull>, ctx: &mut Context) -> Poll<()> {
+        (ptr.as_ref().body.vtable.task_drive)(ptr, ctx)
+    }
+
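Ownership in the poll loop below is tracked purely through Arc strong counts: every list that holds a raw task pointer also holds one strong count, and moving a pointer between lists moves that count with it. A minimal standalone sketch of this idiom using plain std and illustrative values:

    use std::sync::Arc;

    fn demo() {
        let task = Arc::new(42u32);

        // Convert to a raw pointer: the existing strong count now "belongs" to list A.
        let ptr = Arc::into_raw(task);

        // Give list B its own count before sharing the same pointer with it.
        unsafe { Arc::increment_strong_count(ptr) };

        // A list dropping its ownership is an explicit decrement...
        unsafe { Arc::decrement_strong_count(ptr) };

        // ...and the last owner can rebuild the Arc and drop it normally.
        let last = unsafe { Arc::from_raw(ptr) };
        assert_eq!(*last, 42);
    }

`drop_task` and the waker vtable are this module's equivalents of the explicit decrement.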
+    pub unsafe fn poll(&self, cx: &mut Context) -> Poll {
+        // During polling, ownership is upheld by making sure Arc counts are properly transferred.
+        // Both queues, should_poll and all, own one Arc count each.
+        // Whenever a task is pushed onto the should_poll queue, ownership of a count is transferred with it.
+        // When the task was pushed into the schedular, ownership of a count was transferred into the all
+        // list.
+
+        if self.is_empty() {
+            // No tasks, nothing to be done.
+            return Poll::Ready(false);
+        }
+
+        self.should_poll.waker().register(cx.waker());
+
+        let mut iteration = 0;
+        let mut yielded = 0;
+        let mut pending = false;
+
+        loop {
+            // Popped a task, taking ownership of its count from the queue.
+            let cur = match Pin::new_unchecked(&*self.should_poll).pop() {
+                queue::Pop::Empty => {
+                    if pending {
+                        return Poll::Pending;
+                    } else {
+                        return Poll::Ready(iteration > 0);
+                    }
+                }
+                queue::Pop::Value(x) => x,
+                queue::Pop::Inconsistant => {
+                    cx.waker().wake_by_ref();
+                    return Poll::Pending;
+                }
+            };
+
+            let cur = cur.cast::>();
+
+            if cur.as_ref().body.done.get() {
+                // The task was already done, so we can drop the ownership we got from the queue.
+                Self::drop_task(cur);
+                continue;
+            }
+
+            let prev = cur.as_ref().body.queued.swap(false, Ordering::AcqRel);
+            assert!(prev);
+
+            // Ownership is transferred into the waker, which won't be dropped until this iteration completes.
+            let waker = waker::get(cur);
+            // If drive_task panics we still want to remove the task from the list,
+            // so handle the removal with a drop guard.
+            let remove = Defer::new(self, |this| (*this).pop_task_all(cur));
+            let mut ctx = Context::from_waker(&waker);
+
+            iteration += 1;
+
+            match Self::drive_task(cur, &mut ctx) {
+                Poll::Ready(_) => {
+                    // Nothing to do; the defer guard will remove the task from the list.
+                }
+                Poll::Pending => {
+                    // Don't remove the task from the list.
+                    remove.take();
+                    pending = true;
+                    yielded += cur.as_ref().body.queued.load(Ordering::Relaxed) as usize;
+                    if yielded > 2 || iteration > self.len.get() {
+                        cx.waker().wake_by_ref();
+                        return Poll::Pending;
+                    }
+                }
+            }
+        }
+    }
+
+    pub fn clear(&self) {
+        // Clear all pending futures from the all list.
+        let mut cur = self.all_next.get();
+        while let Some(c) = cur {
+            unsafe {
+                cur = c.as_ref().body.next.get();
+                self.pop_task_all(c)
+            }
+        }
+
+        loop {
+            let cur = match unsafe { Pin::new_unchecked(&*self.should_poll).pop() } {
+                queue::Pop::Empty => break,
+                queue::Pop::Value(x) => x,
+                queue::Pop::Inconsistant => {
+                    std::thread::yield_now();
+                    continue;
+                }
+            };
+
+            unsafe { Self::drop_task(cur.cast()) };
+        }
+    }
+}
+
+impl Drop for Schedular {
+    fn drop(&mut self) {
+        self.clear()
+    }
+}
diff --git a/core/src/runtime/schedular/queue.rs b/core/src/runtime/schedular/queue.rs
new file mode 100644
index 00000000..9b02063e
--- /dev/null
+++ b/core/src/runtime/schedular/queue.rs
@@ -0,0 +1,112 @@
+use std::{
+    cell::Cell,
+    pin::Pin,
+    ptr::{self, NonNull},
+    sync::atomic::{AtomicPtr, Ordering},
+};
+
+use atomic_waker::AtomicWaker;
+
+pub struct NodeHeader {
+    next: AtomicPtr,
+}
+
+impl NodeHeader {
+    pub fn new() -> NodeHeader {
+        NodeHeader {
+            next: AtomicPtr::new(ptr::null_mut()),
+        }
+    }
+}
+
+pub struct Queue {
+    waker: AtomicWaker,
+    head: AtomicPtr,
+    tail: Cell<*const NodeHeader>,
+    stub: NodeHeader,
+}
+
+unsafe impl Send for Queue {}
+unsafe impl Sync for Queue {}
+
+#[derive(Debug)]
+pub enum Pop {
+    Empty,
+    Value(NonNull),
+    Inconsistant,
+}
+
+/// Intrusive MPSC queue from the 1024cores blog.
/// Similar to the one used in the FuturesUnordered implementation.
+impl Queue {
+    pub fn new() -> Self {
+        Queue {
+            waker: AtomicWaker::new(),
+            head: AtomicPtr::new(ptr::null_mut()),
+            tail: Cell::new(ptr::null_mut()),
+            stub: NodeHeader {
+                next: AtomicPtr::new(ptr::null_mut()),
+            },
+        }
+    }
+
+    pub fn waker(&self) -> &AtomicWaker {
+        &self.waker
+    }
+
+    pub unsafe fn init(self: Pin<&Self>) {
+        let ptr = &self.stub as *const _ as *mut _;
+        self.head.store(ptr, Ordering::Release);
+        self.tail.set(ptr);
+    }
+
+    /// # Safety
+    /// - node must be a valid pointer.
+    /// - The queue must have been properly initialized.
+    pub unsafe fn push(self: Pin<&Self>, node: NonNull) {
+        node.as_ref().next.store(ptr::null_mut(), Ordering::Release);
+
+        let prev = self.get_ref().head.swap(node.as_ptr(), Ordering::AcqRel);
+
+        (*prev).next.store(node.as_ptr(), Ordering::Release);
+    }
+
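As the doc comment above notes, this is the intrusive MPSC wake queue design from the 1024cores articles, the same one FuturesUnordered builds on. The safe-Rust counterpart of what Schedular plus this queue provide is futures::stream::FuturesUnordered, minus the support for non-'static futures; a small sketch assuming the futures crate is available:

    use futures::future::ready;
    use futures::stream::{FuturesUnordered, StreamExt};

    async fn demo() -> i32 {
        let tasks = FuturesUnordered::new();
        tasks.push(ready(1));
        tasks.push(ready(2));

        // Futures are polled as their wakers fire; results arrive in completion order.
        let results: Vec<i32> = tasks.collect().await;
        results.into_iter().sum()
    }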
+    /// # Safety
+    /// - The queue must have been properly initialized.
+    /// - Can only be called from a single thread.
+    pub unsafe fn pop(self: Pin<&Self>) -> Pop {
+        let mut tail = self.tail.get();
+        let mut next = (*tail).next.load(Ordering::Acquire);
+
+        if std::ptr::eq(tail, &self.get_ref().stub) {
+            if next.is_null() {
+                return Pop::Empty;
+            }
+
+            self.tail.set(next);
+            tail = next;
+            next = (*next).next.load(std::sync::atomic::Ordering::Acquire);
+        }
+
+        if !next.is_null() {
+            self.tail.set(next);
+            return Pop::Value(NonNull::new_unchecked(tail as *mut NodeHeader));
+        }
+
+        let head = self.head.load(Ordering::Acquire);
+        if !std::ptr::eq(head, tail) {
+            return Pop::Inconsistant;
+        }
+
+        self.push(NonNull::from(&self.get_ref().stub));
+
+        next = (*tail).next.load(Ordering::Acquire);
+
+        if !next.is_null() {
+            self.tail.set(next);
+            return Pop::Value(NonNull::new_unchecked(tail as *mut NodeHeader));
+        }
+
+        Pop::Empty
+    }
+}
diff --git a/core/src/runtime/schedular/vtable.rs b/core/src/runtime/schedular/vtable.rs
new file mode 100644
index 00000000..72b10dbe
--- /dev/null
+++ b/core/src/runtime/schedular/vtable.rs
@@ -0,0 +1,41 @@
+use std::{
+    future::Future,
+    pin::Pin,
+    ptr::NonNull,
+    sync::Arc,
+    task::{Context, Poll},
+};
+
+use super::Task;
+
+#[derive(Debug, Clone)]
+pub(crate) struct VTable {
+    pub(crate) task_drive: unsafe fn(NonNull>, cx: &mut Context) -> Poll<()>,
+    pub(crate) task_drop: unsafe fn(NonNull>),
+}
+
+impl VTable {
+    pub const fn get>() -> &'static VTable {
+        trait HasVTable {
+            const V_TABLE: VTable;
+        }
+
+        impl> HasVTable for F {
+            const V_TABLE: VTable = VTable {
+                task_drop: VTable::drop::,
+                task_drive: VTable::drive::,
+            };
+        }
+
+        &::V_TABLE
+    }
+
+    unsafe fn drop>(ptr: NonNull>) {
+        Arc::decrement_strong_count(ptr.cast::>().as_ptr())
+    }
+
+    unsafe fn drive>(ptr: NonNull>, cx: &mut Context) -> Poll<()> {
+        let ptr = ptr.cast::>();
+        Pin::new_unchecked(&mut (*ptr.as_ref().future.get())).poll(cx)
+    }
+}
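VTable::get in vtable.rs works by taking a 'static reference to an associated constant, which yields one statically promoted vtable per concrete future type without any allocation. A standalone sketch of that pattern with an illustrative single-entry vtable (hypothetical names, not the crate's types):

    struct VTable {
        type_name: fn() -> &'static str,
    }

    impl VTable {
        fn get<T: 'static>() -> &'static VTable {
            trait HasVTable {
                const V_TABLE: VTable;
            }

            // One constant per type T; the reference below is promoted to 'static.
            impl<T: 'static> HasVTable for T {
                const V_TABLE: VTable = VTable {
                    type_name: std::any::type_name::<T>,
                };
            }

            &<T as HasVTable>::V_TABLE
        }
    }

    fn main() {
        let vt = VTable::get::<Vec<u32>>();
        assert!((vt.type_name)().contains("Vec"));
    }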
diff --git a/core/src/runtime/schedular/waker.rs b/core/src/runtime/schedular/waker.rs
new file mode 100644
index 00000000..1ed9c731
--- /dev/null
+++ b/core/src/runtime/schedular/waker.rs
@@ -0,0 +1,62 @@
+use std::{
+    pin::Pin,
+    ptr::NonNull,
+    sync::{atomic::Ordering, Arc},
+    task::{RawWaker, RawWakerVTable, Waker},
+};
+
+use super::Task;
+
+unsafe fn schedular_clone(ptr: *const ()) -> RawWaker {
+    Arc::increment_strong_count(ptr.cast::>());
+    RawWaker::new(ptr.cast(), &SCHEDULAR_WAKER_V_TABLE)
+}
+
+unsafe fn schedular_wake(ptr: *const ()) {
+    let task = NonNull::new_unchecked(ptr as *mut ()).cast::>();
+
+    if task
+        .as_ref()
+        .body
+        .queued
+        .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
+        .is_err()
+    {
+        // Already awoken, skip!
+        schedular_drop(ptr);
+        return;
+    }
+
+    // Retrieve the queue; if it was already dropped, just return as there is nothing to wake.
+    let Some(queue) = task.as_ref().body.queue.upgrade() else {
+        schedular_drop(ptr);
+        return;
+    };
+
+    // Push to the queue.
+    Pin::new_unchecked(&*queue).push(task.cast());
+
+    // Wake up the schedular.
+    queue.waker().wake()
+}
+
+unsafe fn schedular_wake_ref(ptr: *const ()) {
+    Arc::increment_strong_count(ptr.cast::>());
+    schedular_wake(ptr)
+}
+
+unsafe fn schedular_drop(ptr: *const ()) {
+    let ptr = ptr.cast::>();
+    ((*ptr).body.vtable.task_drop)(NonNull::new_unchecked(ptr as *mut _))
+}
+
+static SCHEDULAR_WAKER_V_TABLE: RawWakerVTable = RawWakerVTable::new(
+    schedular_clone,
+    schedular_wake,
+    schedular_wake_ref,
+    schedular_drop,
+);
+
+pub unsafe fn get(ptr: NonNull>) -> Waker {
+    unsafe { Waker::from_raw(RawWaker::new(ptr.as_ptr().cast(), &SCHEDULAR_WAKER_V_TABLE)) }
+}
diff --git a/core/src/runtime/spawner.rs b/core/src/runtime/spawner.rs
index ae75a17f..c445b227 100644
--- a/core/src/runtime/spawner.rs
+++ b/core/src/runtime/spawner.rs
@@ -1,41 +1,37 @@
 use std::{
-    cell::RefCell,
     future::Future,
     pin::{pin, Pin},
-    task::ready,
-    task::{Poll, Waker},
+    task::{ready, Poll, Waker},
 };
 
 use async_lock::futures::LockArc;
 
 use crate::AsyncRuntime;
 
-use super::{AsyncWeakRuntime, InnerRuntime};
-
-type FuturesVec = RefCell>>;
+use super::{schedular::Schedular, AsyncWeakRuntime, InnerRuntime};
 
 /// A structure to hold futures spawned inside the runtime.
 ///
 /// TODO: change future lookup in poll from O(n) to O(1).
-pub struct Spawner<'js> {
-    futures: FuturesVec + 'js>>>,
+pub struct Spawner {
+    schedular: Schedular,
     wakeup: Vec,
 }
 
-impl<'js> Spawner<'js> {
+impl Spawner {
     pub fn new() -> Self {
         Spawner {
-            futures: RefCell::new(Vec::new()),
+            schedular: Schedular::new(),
             wakeup: Vec::new(),
         }
     }
 
-    pub fn push(&mut self, f: F)
+    pub unsafe fn push(&mut self, f: F)
     where
-        F: Future + 'js,
+        F: Future,
     {
+        unsafe { self.schedular.push(f) };
         self.wakeup.drain(..).for_each(Waker::wake);
-        self.futures.borrow_mut().push(Some(Box::pin(f)))
     }
 
     pub fn listen(&mut self, wake: Waker) {
@@ -43,51 +39,22 @@ impl<'js> Spawner<'js> {
     }
 
     // Drives the runtime futures forward, returns false if their where no futures
-    pub fn drive<'a>(&'a self) -> SpawnFuture<'a, 'js> {
+    pub fn drive<'a>(&'a self) -> SpawnFuture<'a> {
         SpawnFuture(self)
     }
 
     pub fn is_empty(&mut self) -> bool {
-        self.futures.borrow().is_empty()
-    }
-}
-
-impl Drop for Spawner<'_> {
-    fn drop(&mut self) {
-        self.wakeup.drain(..).for_each(Waker::wake)
+        self.schedular.is_empty()
     }
 }
 
-pub struct SpawnFuture<'a, 'js>(&'a Spawner<'js>);
+pub struct SpawnFuture<'a>(&'a Spawner);
 
-impl<'a, 'js> Future for SpawnFuture<'a, 'js> {
+impl<'a> Future for SpawnFuture<'a> {
     type Output = bool;
 
     fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll {
-        if self.0.futures.borrow().is_empty() {
-            return Poll::Ready(false);
-        }
-
-        let mut i = 0;
-        let mut did_complete = false;
-        while i < self.0.futures.borrow().len() {
-            let mut borrow = self.0.futures.borrow_mut()[i].take().unwrap();
-            if borrow.as_mut().poll(cx).is_pending() {
-                // put back.
-                self.0.futures.borrow_mut()[i] = Some(borrow);
-            } else {
-                did_complete = true;
-            }
-            i += 1;
-        }
-
-        self.0.futures.borrow_mut().retain_mut(|f| f.is_some());
-
-        if did_complete {
-            Poll::Ready(true)
-        } else {
-            Poll::Pending
-        }
+        unsafe { self.0.schedular.poll(cx) }
     }
 }
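With the Schedular doing the bookkeeping, SpawnFuture is reduced to a thin poll adapter: a future that only forwards poll() to another object instead of scanning a Vec of boxed futures. A standalone sketch of that shape, using hypothetical types rather than the crate's own:

    use std::{
        future::Future,
        pin::Pin,
        task::{Context, Poll},
    };

    // Something that can be "driven": polling it makes internal progress.
    struct Steps(u32);

    impl Steps {
        fn drive(&mut self, cx: &mut Context<'_>) -> Poll<bool> {
            self.0 += 1;
            if self.0 >= 3 {
                Poll::Ready(true)
            } else {
                // Re-schedule ourselves so the executor polls again.
                cx.waker().wake_by_ref();
                Poll::Pending
            }
        }
    }

    // The adapter: a Future that only forwards poll() to the borrowed driver.
    struct Drive<'a>(&'a mut Steps);

    impl Future for Drive<'_> {
        type Output = bool;

        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<bool> {
            // Drive<'_> is Unpin, so get_mut is fine here.
            self.get_mut().0.drive(cx)
        }
    }

drive() returns the same kind of adapter: the borrow ties its lifetime to the Spawner, and all scheduling state lives behind that borrow.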