diff --git a/Cargo.toml b/Cargo.toml index 776ce290..16f2a0bb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,7 +27,7 @@ repository = "https://github.com/compio-rs/compio" [workspace.dependencies] compio-buf = { path = "./compio-buf", version = "0.2.0" } compio-driver = { path = "./compio-driver", version = "0.2.0", default-features = false } -compio-runtime = { path = "./compio-runtime", version = "0.1.1" } +compio-runtime = { path = "./compio-runtime", version = "0.2.0" } compio-macros = { path = "./compio-macros", version = "0.1.1" } compio-fs = { path = "./compio-fs", version = "0.2.0" } compio-io = { path = "./compio-io", version = "0.1.0" } @@ -40,6 +40,7 @@ compio-tls = { path = "./compio-tls", version = "0.1.0" } cfg-if = "1.0.0" +criterion = "0.5.1" crossbeam-channel = "0.5.8" crossbeam-queue = "0.3.8" futures-channel = "0.3.29" diff --git a/compio-dispatcher/src/lib.rs b/compio-dispatcher/src/lib.rs index 408be77f..d325c2ed 100644 --- a/compio-dispatcher/src/lib.rs +++ b/compio-dispatcher/src/lib.rs @@ -12,7 +12,10 @@ use std::{ }; use compio_driver::{AsyncifyPool, ProactorBuilder}; -use compio_runtime::event::{Event, EventHandle}; +use compio_runtime::{ + event::{Event, EventHandle}, + Runtime, +}; use crossbeam_channel::{unbounded, Sender}; use futures_util::{future::LocalBoxFuture, FutureExt}; @@ -51,14 +54,14 @@ impl Dispatcher { }; thread_builder.spawn(move || { - let succeeded = compio_runtime::config_proactor(proactor_builder); - debug_assert!( - succeeded, - "the runtime should not be created before proactor builder set" - ); + let runtime = Runtime::builder() + .with_proactor(proactor_builder) + .build() + .expect("cannot create compio runtime"); + let _guard = runtime.enter(); while let Ok(f) = receiver.recv() { - *f.result.lock().unwrap() = Some(std::panic::catch_unwind(move || { - compio_runtime::block_on((f.func)()); + *f.result.lock().unwrap() = Some(std::panic::catch_unwind(|| { + Runtime::current().block_on((f.func)()); })); f.handle.notify().ok(); } diff --git a/compio-fs/src/file.rs b/compio-fs/src/file.rs index ae68d431..0737c585 100644 --- a/compio-fs/src/file.rs +++ b/compio-fs/src/file.rs @@ -7,7 +7,7 @@ use { compio_buf::{buf_try, BufResult, IntoInner, IoBuf, IoBufMut}, compio_driver::op::{BufResultExt, CloseFile, ReadAt, Sync, WriteAt}, compio_io::{AsyncReadAt, AsyncWriteAt}, - compio_runtime::{submit, Attachable, Attacher}, + compio_runtime::{Attachable, Attacher, Runtime}, std::{future::Future, mem::ManuallyDrop, path::Path}, }; #[cfg(all(feature = "runtime", unix))] @@ -64,7 +64,7 @@ impl File { let this = ManuallyDrop::new(self); async move { let op = CloseFile::new(this.as_raw_fd()); - submit(op).await.0?; + Runtime::current().submit(op).await.0?; Ok(()) } } @@ -91,7 +91,7 @@ impl File { async fn sync_impl(&self, datasync: bool) -> io::Result<()> { self.attach()?; let op = Sync::new(self.as_raw_fd(), datasync); - submit(op).await.0?; + Runtime::current().submit(op).await.0?; Ok(()) } @@ -126,7 +126,11 @@ impl AsyncReadAt for File { async fn read_at(&self, buffer: T, pos: u64) -> BufResult { let ((), buffer) = buf_try!(self.attach(), buffer); let op = ReadAt::new(self.as_raw_fd(), pos, buffer); - submit(op).await.into_inner().map_advanced() + Runtime::current() + .submit(op) + .await + .into_inner() + .map_advanced() } #[cfg(unix)] @@ -137,7 +141,11 @@ impl AsyncReadAt for File { ) -> BufResult { let ((), buffer) = buf_try!(self.attach(), buffer); let op = ReadVectoredAt::new(self.as_raw_fd(), pos, buffer); - submit(op).await.into_inner().map_advanced() + Runtime::current() + .submit(op) + .await + .into_inner() + .map_advanced() } } @@ -164,7 +172,7 @@ impl AsyncWriteAt for &File { async fn write_at(&mut self, buffer: T, pos: u64) -> BufResult { let ((), buffer) = buf_try!(self.attach(), buffer); let op = WriteAt::new(self.as_raw_fd(), pos, buffer); - submit(op).await.into_inner() + Runtime::current().submit(op).await.into_inner() } #[cfg(unix)] @@ -175,7 +183,7 @@ impl AsyncWriteAt for &File { ) -> BufResult { let ((), buffer) = buf_try!(self.attach(), buffer); let op = WriteVectoredAt::new(self.as_raw_fd(), pos, buffer); - submit(op).await.into_inner() + Runtime::current().submit(op).await.into_inner() } } diff --git a/compio-fs/src/named_pipe.rs b/compio-fs/src/named_pipe.rs index cc0c83be..180e3b29 100644 --- a/compio-fs/src/named_pipe.rs +++ b/compio-fs/src/named_pipe.rs @@ -29,7 +29,7 @@ use { compio_buf::{BufResult, IoBuf, IoBufMut}, compio_driver::op::ConnectNamedPipe, compio_io::{AsyncRead, AsyncReadAt, AsyncWrite, AsyncWriteAt}, - compio_runtime::{impl_attachable, submit, Attachable}, + compio_runtime::{impl_attachable, Attachable, Runtime}, }; use crate::File; @@ -68,30 +68,27 @@ use crate::File; /// .create(PIPE_NAME)?; /// /// // Spawn the server loop. -/// let server = compio_runtime::block_on(async move { -/// loop { -/// // Wait for a client to connect. -/// let connected = server.connect().await?; +/// # compio_runtime::Runtime::new().unwrap().block_on(async move { +/// loop { +/// // Wait for a client to connect. +/// let connected = server.connect().await?; /// -/// // Construct the next server to be connected before sending the one -/// // we already have of onto a task. This ensures that the server -/// // isn't closed (after it's done in the task) before a new one is -/// // available. Otherwise the client might error with -/// // `io::ErrorKind::NotFound`. -/// server = ServerOptions::new().create(PIPE_NAME)?; +/// // Construct the next server to be connected before sending the one +/// // we already have of onto a task. This ensures that the server +/// // isn't closed (after it's done in the task) before a new one is +/// // available. Otherwise the client might error with +/// // `io::ErrorKind::NotFound`. +/// server = ServerOptions::new().create(PIPE_NAME)?; /// -/// let client = compio_runtime::spawn(async move { -/// // use the connected client -/// # Ok::<_, std::io::Error>(()) -/// }); -/// # if true { break } // needed for type inference to work -/// } -/// -/// Ok::<_, io::Error>(()) -/// }); -/// -/// // do something else not server related here -/// # Ok(()) } +/// let client = compio_runtime::spawn(async move { +/// // use the connected client +/// # Ok::<_, std::io::Error>(()) +/// }); +/// # if true { break } // needed for type inference to work +/// } +/// # Ok::<_, io::Error>(()) +/// # }) +/// # } /// ``` /// /// [Windows named pipe]: https://docs.microsoft.com/en-us/windows/win32/ipc/named-pipes @@ -118,7 +115,7 @@ impl NamedPipeServer { /// /// const PIPE_NAME: &str = r"\\.\pipe\compio-named-pipe-server-info"; /// - /// # compio_runtime::block_on(async move { + /// # compio_runtime::Runtime::new().unwrap().block_on(async move { /// let server = ServerOptions::new() /// .pipe_mode(PipeMode::Message) /// .max_instances(5) @@ -149,7 +146,7 @@ impl NamedPipeServer { /// /// const PIPE_NAME: &str = r"\\.\pipe\mynamedpipe"; /// - /// # compio_runtime::block_on(async move { + /// # compio_runtime::Runtime::new().unwrap().block_on(async move { /// let pipe = ServerOptions::new().create(PIPE_NAME)?; /// /// // Wait for a client to connect. @@ -162,7 +159,7 @@ impl NamedPipeServer { pub async fn connect(&self) -> io::Result<()> { self.attach()?; let op = ConnectNamedPipe::new(self.as_raw_fd()); - submit(op).await.0?; + Runtime::current().submit(op).await.0?; Ok(()) } @@ -176,7 +173,7 @@ impl NamedPipeServer { /// /// const PIPE_NAME: &str = r"\\.\pipe\compio-named-pipe-disconnect"; /// - /// # compio_runtime::block_on(async move { + /// # compio_runtime::Runtime::new().unwrap().block_on(async move { /// let server = ServerOptions::new().create(PIPE_NAME).unwrap(); /// /// let mut client = ClientOptions::new().open(PIPE_NAME).await.unwrap(); @@ -280,7 +277,7 @@ impl_attachable!(NamedPipeServer, handle); /// /// const PIPE_NAME: &str = r"\\.\pipe\named-pipe-idiomatic-client"; /// -/// # compio_runtime::block_on(async move { +/// # compio_runtime::Runtime::new().unwrap().block_on(async move { /// let client = loop { /// match ClientOptions::new().open(PIPE_NAME).await { /// Ok(client) => break client, @@ -320,7 +317,7 @@ impl NamedPipeClient { /// /// const PIPE_NAME: &str = r"\\.\pipe\compio-named-pipe-client-info"; /// - /// # compio_runtime::block_on(async move { + /// # compio_runtime::Runtime::new().unwrap().block_on(async move { /// let client = ClientOptions::new().open(PIPE_NAME).await?; /// /// let client_info = client.info()?; @@ -481,7 +478,7 @@ impl ServerOptions { /// /// const PIPE_NAME: &str = r"\\.\pipe\compio-named-pipe-access-inbound-err1"; /// - /// # compio_runtime::block_on(async move { + /// # compio_runtime::Runtime::new().unwrap().block_on(async move { /// let _server = ServerOptions::new() /// .access_inbound(false) /// .create(PIPE_NAME) @@ -504,7 +501,7 @@ impl ServerOptions { /// /// const PIPE_NAME: &str = r"\\.\pipe\compio-named-pipe-access-inbound-err2"; /// - /// # compio_runtime::block_on(async move { + /// # compio_runtime::Runtime::new().unwrap().block_on(async move { /// let server = ServerOptions::new() /// .access_inbound(false) /// .create(PIPE_NAME) @@ -537,7 +534,7 @@ impl ServerOptions { /// /// const PIPE_NAME: &str = r"\\.\pipe\compio-named-pipe-access-inbound"; /// - /// # compio_runtime::block_on(async move { + /// # compio_runtime::Runtime::new().unwrap().block_on(async move { /// let mut server = ServerOptions::new() /// .access_inbound(false) /// .create(PIPE_NAME) @@ -588,7 +585,7 @@ impl ServerOptions { /// /// const PIPE_NAME: &str = r"\\.\pipe\compio-named-pipe-access-outbound-err1"; /// - /// # compio_runtime::block_on(async move { + /// # compio_runtime::Runtime::new().unwrap().block_on(async move { /// let server = ServerOptions::new() /// .access_outbound(false) /// .create(PIPE_NAME) @@ -611,7 +608,7 @@ impl ServerOptions { /// /// const PIPE_NAME: &str = r"\\.\pipe\compio-named-pipe-access-outbound-err2"; /// - /// # compio_runtime::block_on(async move { + /// # compio_runtime::Runtime::new().unwrap().block_on(async move { /// let server = ServerOptions::new() /// .access_outbound(false) /// .create(PIPE_NAME) @@ -643,7 +640,7 @@ impl ServerOptions { /// /// const PIPE_NAME: &str = r"\\.\pipe\compio-named-pipe-access-outbound"; /// - /// # compio_runtime::block_on(async move { + /// # compio_runtime::Runtime::new().unwrap().block_on(async move { /// let mut server = ServerOptions::new() /// .access_outbound(false) /// .create(PIPE_NAME) @@ -702,7 +699,7 @@ impl ServerOptions { /// /// const PIPE_NAME: &str = r"\\.\pipe\compio-named-pipe-first-instance-error"; /// - /// # compio_runtime::block_on(async move { + /// # compio_runtime::Runtime::new().unwrap().block_on(async move { /// let server1 = ServerOptions::new() /// .first_pipe_instance(true) /// .create(PIPE_NAME) @@ -727,7 +724,7 @@ impl ServerOptions { /// /// const PIPE_NAME: &str = r"\\.\pipe\compio-named-pipe-first-instance"; /// - /// # compio_runtime::block_on(async move { + /// # compio_runtime::Runtime::new().unwrap().block_on(async move { /// let mut builder = ServerOptions::new(); /// builder.first_pipe_instance(true); /// @@ -770,7 +767,7 @@ impl ServerOptions { /// /// const PIPE_NAME: &str = r"\\.\pipe\write_dac_pipe"; /// - /// # compio_runtime::block_on(async move { + /// # compio_runtime::Runtime::new().unwrap().block_on(async move { /// let mut pipe_template = ServerOptions::new(); /// pipe_template.write_dac(true); /// let pipe = pipe_template.create(PIPE_NAME).unwrap(); @@ -807,7 +804,7 @@ impl ServerOptions { /// /// const PIPE_NAME: &str = r"\\.\pipe\write_dac_pipe_fail"; /// - /// # compio_runtime::block_on(async move { + /// # compio_runtime::Runtime::new().unwrap().block_on(async move { /// let mut pipe_template = ServerOptions::new(); /// pipe_template.write_dac(false); /// let pipe = pipe_template.create(PIPE_NAME).unwrap(); @@ -890,7 +887,7 @@ impl ServerOptions { /// /// const PIPE_NAME: &str = r"\\.\pipe\compio-named-pipe-max-instances"; /// - /// # compio_runtime::block_on(async move { + /// # compio_runtime::Runtime::new().unwrap().block_on(async move { /// let mut server = ServerOptions::new(); /// server.max_instances(2); /// @@ -971,7 +968,7 @@ impl ServerOptions { /// /// const PIPE_NAME: &str = r"\\.\pipe\compio-named-pipe-create"; /// - /// # compio_runtime::block_on(async move { + /// # compio_runtime::Runtime::new().unwrap().block_on(async move { /// let server = ServerOptions::new().create(PIPE_NAME).unwrap(); /// # }) /// ``` @@ -1059,7 +1056,7 @@ impl ClientOptions { /// /// const PIPE_NAME: &str = r"\\.\pipe\compio-named-pipe-client-new"; /// - /// # compio_runtime::block_on(async move { + /// # compio_runtime::Runtime::new().unwrap().block_on(async move { /// // Server must be created in order for the client creation to succeed. /// let server = ServerOptions::new().create(PIPE_NAME).unwrap(); /// let client = ClientOptions::new().open(PIPE_NAME).await.unwrap(); @@ -1164,7 +1161,7 @@ impl ClientOptions { /// /// const PIPE_NAME: &str = r"\\.\pipe\mynamedpipe"; /// - /// # compio_runtime::block_on(async move { + /// # compio_runtime::Runtime::new().unwrap().block_on(async move { /// let client = loop { /// match ClientOptions::new().open(PIPE_NAME).await { /// Ok(client) => break client, diff --git a/compio-fs/src/open_options/mod.rs b/compio-fs/src/open_options/mod.rs index ab128022..829f76cb 100644 --- a/compio-fs/src/open_options/mod.rs +++ b/compio-fs/src/open_options/mod.rs @@ -33,7 +33,7 @@ use crate::File; /// ```no_run /// use compio_fs::OpenOptions; /// -/// # compio_runtime::block_on(async { +/// # compio_runtime::Runtime::new().unwrap().block_on(async { /// let file = OpenOptions::new().read(true).open("foo.txt").await.unwrap(); /// # }); /// ``` @@ -44,7 +44,7 @@ use crate::File; /// ```no_run /// use compio_fs::OpenOptions; /// -/// # compio_runtime::block_on(async { +/// # compio_runtime::Runtime::new().unwrap().block_on(async { /// let file = OpenOptions::new() /// .read(true) /// .write(true) diff --git a/compio-fs/src/open_options/unix.rs b/compio-fs/src/open_options/unix.rs index d96432fb..99c7d677 100644 --- a/compio-fs/src/open_options/unix.rs +++ b/compio-fs/src/open_options/unix.rs @@ -1,7 +1,7 @@ use std::{ffi::CString, io, os::unix::prelude::OsStrExt, path::Path}; use compio_driver::{op::OpenFile, FromRawFd, RawFd}; -use compio_runtime::submit; +use compio_runtime::Runtime; use crate::File; @@ -96,7 +96,7 @@ impl OpenOptions { ) })?; let op = OpenFile::new(p, flags, self.mode); - let fd = submit(op).await.0? as RawFd; + let fd = Runtime::current().submit(op).await.0? as RawFd; Ok(unsafe { File::from_raw_fd(fd) }) } } diff --git a/compio-fs/src/open_options/windows.rs b/compio-fs/src/open_options/windows.rs index 8eb95789..a8b98cd8 100644 --- a/compio-fs/src/open_options/windows.rs +++ b/compio-fs/src/open_options/windows.rs @@ -1,7 +1,7 @@ use std::{io, path::Path, ptr::null}; use compio_driver::{op::OpenFile, FromRawFd, RawFd}; -use compio_runtime::submit; +use compio_runtime::Runtime; use widestring::U16CString; use windows_sys::Win32::{ Foundation::{ERROR_INVALID_PARAMETER, GENERIC_READ, GENERIC_WRITE}, @@ -145,7 +145,7 @@ impl OpenOptions { self.get_creation_mode()?, self.get_flags_and_attributes(), ); - let fd = submit(op).await.0? as RawFd; + let fd = Runtime::current().submit(op).await.0? as RawFd; Ok(unsafe { File::from_raw_fd(fd) }) } } diff --git a/compio-fs/src/pipe.rs b/compio-fs/src/pipe.rs index 12a9fb92..875735a8 100644 --- a/compio-fs/src/pipe.rs +++ b/compio-fs/src/pipe.rs @@ -8,7 +8,7 @@ use { compio_buf::{buf_try, BufResult, IntoInner, IoBuf, IoBufMut, IoVectoredBuf, IoVectoredBufMut}, compio_driver::op::{BufResultExt, Recv, RecvVectored, Send, SendVectored}, compio_io::{AsyncRead, AsyncWrite}, - compio_runtime::{impl_attachable, submit, Attachable}, + compio_runtime::{impl_attachable, Attachable, Runtime}, std::{future::Future, path::Path}, }; @@ -20,7 +20,7 @@ use crate::File; /// use compio_fs::pipe::anonymous; /// use compio_io::{AsyncReadExt, AsyncWriteExt}; /// -/// # compio_runtime::block_on(async { +/// # compio_runtime::Runtime::new().unwrap().block_on(async { /// let (mut rx, mut tx) = anonymous().unwrap(); /// /// tx.write_all("Hello world!").await.unwrap(); @@ -114,7 +114,7 @@ impl OpenOptions { /// ``` /// use compio_fs::pipe; /// - /// # compio_runtime::block_on(async { + /// # compio_runtime::Runtime::new().unwrap().block_on(async { /// let tx = pipe::OpenOptions::new() /// .read_write(true) /// .open_sender("path/to/a/fifo") @@ -131,7 +131,7 @@ impl OpenOptions { /// ``` /// use compio_fs::pipe; /// - /// # compio_runtime::block_on(async { + /// # compio_runtime::Runtime::new().unwrap().block_on(async { /// let tx = pipe::OpenOptions::new() /// .read_write(true) /// .open_receiver("path/to/a/fifo") @@ -369,13 +369,13 @@ impl AsyncWrite for &Sender { async fn write(&mut self, buffer: T) -> BufResult { let ((), buffer) = buf_try!(self.attach(), buffer); let op = Send::new(self.as_raw_fd(), buffer); - submit(op).await.into_inner() + Runtime::current().submit(op).await.into_inner() } async fn write_vectored(&mut self, buffer: T) -> BufResult { let ((), buffer) = buf_try!(self.attach(), buffer); let op = SendVectored::new(self.as_raw_fd(), buffer); - submit(op).await.into_inner() + Runtime::current().submit(op).await.into_inner() } #[inline] @@ -498,13 +498,21 @@ impl AsyncRead for &Receiver { async fn read(&mut self, buffer: B) -> BufResult { let ((), buffer) = buf_try!(self.attach(), buffer); let op = Recv::new(self.as_raw_fd(), buffer); - submit(op).await.into_inner().map_advanced() + Runtime::current() + .submit(op) + .await + .into_inner() + .map_advanced() } async fn read_vectored(&mut self, buffer: V) -> BufResult { let ((), buffer) = buf_try!(self.attach(), buffer); let op = RecvVectored::new(self.as_raw_fd(), buffer); - submit(op).await.into_inner().map_advanced() + Runtime::current() + .submit(op) + .await + .into_inner() + .map_advanced() } } diff --git a/compio-io/src/lib.rs b/compio-io/src/lib.rs index f0139d09..b5f2225e 100644 --- a/compio-io/src/lib.rs +++ b/compio-io/src/lib.rs @@ -34,7 +34,7 @@ //! ``` //! use compio_buf::BufResult; //! use compio_io::AsyncRead; -//! # compio_runtime::block_on(async { +//! # compio_runtime::Runtime::new().unwrap().block_on(async { //! //! let mut reader = "Hello, world!".as_bytes(); //! let (res, buf) = reader.read(Vec::with_capacity(20)).await.unwrap(); @@ -55,7 +55,7 @@ //! //! use compio_buf::BufResult; //! use compio_io::AsyncWrite; -//! # compio_runtime::block_on(async { +//! # compio_runtime::Runtime::new().unwrap().block_on(async { //! //! let mut writer = Cursor::new([0; 6]); //! writer.set_position(2); @@ -72,7 +72,7 @@ //! ``` //! use compio_buf::BufResult; //! use compio_io::AsyncWrite; -//! # compio_runtime::block_on(async { +//! # compio_runtime::Runtime::new().unwrap().block_on(async { //! //! let mut writer = vec![1, 2, 3]; //! let (_, buf) = writer.write(vec![3, 2, 1]).await.unwrap(); diff --git a/compio-io/src/util/null.rs b/compio-io/src/util/null.rs index 4ab6783b..a708e59e 100644 --- a/compio-io/src/util/null.rs +++ b/compio-io/src/util/null.rs @@ -53,7 +53,7 @@ impl AsyncWrite for Null { /// ``` /// use compio_io::{null, AsyncRead, AsyncWrite}; /// -/// # compio_runtime::block_on(async { +/// # compio_runtime::Runtime::new().unwrap().block_on(async { /// let mut buf = Vec::with_capacity(10); /// let mut null = null(); /// diff --git a/compio-io/src/util/repeat.rs b/compio-io/src/util/repeat.rs index d5b6b3b8..2e672c68 100644 --- a/compio-io/src/util/repeat.rs +++ b/compio-io/src/util/repeat.rs @@ -12,7 +12,7 @@ use crate::{AsyncBufRead, AsyncRead, IoResult}; /// # Examples /// /// ```rust -/// # compio_runtime::block_on(async { +/// # compio_runtime::Runtime::new().unwrap().block_on(async { /// use compio_io::{self, AsyncRead, AsyncReadExt}; /// /// let (len, buffer) = compio_io::repeat(42) @@ -57,7 +57,7 @@ impl AsyncBufRead for Repeat { /// # Examples /// /// ```rust -/// # compio_runtime::block_on(async { +/// # compio_runtime::Runtime::new().unwrap().block_on(async { /// use compio_io::{self, AsyncRead, AsyncReadExt}; /// /// let (len, buffer) = compio_io::repeat(42) diff --git a/compio-macros/src/main_fn.rs b/compio-macros/src/main_fn.rs index 387cb8c9..8797ca04 100644 --- a/compio-macros/src/main_fn.rs +++ b/compio-macros/src/main_fn.rs @@ -54,7 +54,7 @@ impl ToTokens for CompioMain { let block = &self.0.body; let runtime_mod = self.0.crate_name().unwrap_or_else(retrieve_runtime_mod); tokens.append_all(quote!({ - #runtime_mod::block_on(async move #block) + #runtime_mod::Runtime::new().expect("cannot create runtime").block_on(async move #block) })); } } diff --git a/compio-macros/src/test_fn.rs b/compio-macros/src/test_fn.rs index 534eb36f..b8352c7a 100644 --- a/compio-macros/src/test_fn.rs +++ b/compio-macros/src/test_fn.rs @@ -48,7 +48,7 @@ impl ToTokens for CompioTest { let block = &self.0.body; let runtime_mod = self.0.crate_name().unwrap_or_else(retrieve_runtime_mod); tokens.append_all(quote!({ - #runtime_mod::block_on(async move #block) + #runtime_mod::Runtime::new().expect("cannot create runtime").block_on(async move #block) })); } } diff --git a/compio-net/src/socket.rs b/compio-net/src/socket.rs index f2a1f403..2b77e4f8 100644 --- a/compio-net/src/socket.rs +++ b/compio-net/src/socket.rs @@ -9,7 +9,7 @@ use { Accept, BufResultExt, CloseSocket, Connect, Recv, RecvFrom, RecvFromVectored, RecvResultExt, RecvVectored, Send, SendTo, SendToVectored, SendVectored, ShutdownSocket, }, - compio_runtime::{submit, Attachable, Attacher}, + compio_runtime::{Attachable, Attacher, Runtime}, std::{future::Future, mem::ManuallyDrop}, }; @@ -80,7 +80,7 @@ impl Socket { pub async fn connect_async(&self, addr: &SockAddr) -> io::Result<()> { self.attach()?; let op = Connect::new(self.as_raw_fd(), addr.clone()); - let BufResult(res, _op) = submit(op).await; + let BufResult(res, _op) = Runtime::current().submit(op).await; #[cfg(windows)] { res?; @@ -97,7 +97,7 @@ impl Socket { pub async fn accept(&self) -> io::Result<(Self, SockAddr)> { self.attach()?; let op = Accept::new(self.as_raw_fd()); - let BufResult(res, op) = submit(op).await; + let BufResult(res, op) = Runtime::current().submit(op).await; let accept_sock = unsafe { Socket2::from_raw_fd(res? as _) }; if cfg!(all( unix, @@ -120,7 +120,7 @@ impl Socket { self.socket.protocol()?, )?; let op = Accept::new(self.as_raw_fd(), accept_sock.as_raw_fd() as _); - let BufResult(res, op) = submit(op).await; + let BufResult(res, op) = Runtime::current().submit(op).await; res?; op.update_context()?; let addr = op.into_addr()?; @@ -135,7 +135,7 @@ impl Socket { let this = ManuallyDrop::new(self); async move { let op = CloseSocket::new(this.as_raw_fd()); - submit(op).await.0?; + Runtime::current().submit(op).await.0?; Ok(()) } } @@ -144,7 +144,7 @@ impl Socket { pub async fn shutdown(&self) -> io::Result<()> { self.attach()?; let op = ShutdownSocket::new(self.as_raw_fd(), std::net::Shutdown::Write); - submit(op).await.0?; + Runtime::current().submit(op).await.0?; Ok(()) } @@ -152,35 +152,48 @@ impl Socket { pub async fn recv(&self, buffer: B) -> BufResult { let ((), buffer) = buf_try!(self.attach(), buffer); let op = Recv::new(self.as_raw_fd(), buffer); - submit(op).await.into_inner().map_advanced() + Runtime::current() + .submit(op) + .await + .into_inner() + .map_advanced() } #[cfg(feature = "runtime")] pub async fn recv_vectored(&self, buffer: V) -> BufResult { let ((), buffer) = buf_try!(self.attach(), buffer); let op = RecvVectored::new(self.as_raw_fd(), buffer); - submit(op).await.into_inner().map_advanced() + Runtime::current() + .submit(op) + .await + .into_inner() + .map_advanced() } #[cfg(feature = "runtime")] pub async fn send(&self, buffer: T) -> BufResult { let ((), buffer) = buf_try!(self.attach(), buffer); let op = Send::new(self.as_raw_fd(), buffer); - submit(op).await.into_inner() + Runtime::current().submit(op).await.into_inner() } #[cfg(feature = "runtime")] pub async fn send_vectored(&self, buffer: T) -> BufResult { let ((), buffer) = buf_try!(self.attach(), buffer); let op = SendVectored::new(self.as_raw_fd(), buffer); - submit(op).await.into_inner() + Runtime::current().submit(op).await.into_inner() } #[cfg(feature = "runtime")] pub async fn recv_from(&self, buffer: T) -> BufResult<(usize, SockAddr), T> { let ((), buffer) = buf_try!(self.attach(), buffer); let op = RecvFrom::new(self.as_raw_fd(), buffer); - submit(op).await.into_inner().map_addr().map_advanced() + Runtime::current() + .submit(op) + .await + .into_inner() + .map_addr() + .map_advanced() } #[cfg(feature = "runtime")] @@ -190,14 +203,19 @@ impl Socket { ) -> BufResult<(usize, SockAddr), T> { let ((), buffer) = buf_try!(self.attach(), buffer); let op = RecvFromVectored::new(self.as_raw_fd(), buffer); - submit(op).await.into_inner().map_addr().map_advanced() + Runtime::current() + .submit(op) + .await + .into_inner() + .map_addr() + .map_advanced() } #[cfg(feature = "runtime")] pub async fn send_to(&self, buffer: T, addr: &SockAddr) -> BufResult { let ((), buffer) = buf_try!(self.attach(), buffer); let op = SendTo::new(self.as_raw_fd(), buffer, addr.clone()); - submit(op).await.into_inner() + Runtime::current().submit(op).await.into_inner() } #[cfg(feature = "runtime")] @@ -208,7 +226,7 @@ impl Socket { ) -> BufResult { let ((), buffer) = buf_try!(self.attach(), buffer); let op = SendToVectored::new(self.as_raw_fd(), buffer, addr.clone()); - submit(op).await.into_inner() + Runtime::current().submit(op).await.into_inner() } } diff --git a/compio-net/src/tcp.rs b/compio-net/src/tcp.rs index fa124e66..1326954c 100644 --- a/compio-net/src/tcp.rs +++ b/compio-net/src/tcp.rs @@ -27,23 +27,23 @@ use crate::Socket; /// use compio_net::{TcpListener, TcpStream}; /// use socket2::SockAddr; /// +/// # compio_runtime::Runtime::new().unwrap().block_on(async move { /// let addr = "127.0.0.1:2345".parse::().unwrap(); /// -/// compio_runtime::block_on(async move { -/// let listener = TcpListener::bind(&addr).await.unwrap(); +/// let listener = TcpListener::bind(&addr).await.unwrap(); /// -/// let tx_fut = TcpStream::connect(&addr); +/// let tx_fut = TcpStream::connect(&addr); /// -/// let rx_fut = listener.accept(); +/// let rx_fut = listener.accept(); /// -/// let (mut tx, (mut rx, _)) = futures_util::try_join!(tx_fut, rx_fut).unwrap(); +/// let (mut tx, (mut rx, _)) = futures_util::try_join!(tx_fut, rx_fut).unwrap(); /// -/// tx.write_all("test").await.0.unwrap(); +/// tx.write_all("test").await.0.unwrap(); /// -/// let (_, buf) = rx.read_exact(Vec::with_capacity(4)).await.unwrap(); +/// let (_, buf) = rx.read_exact(Vec::with_capacity(4)).await.unwrap(); /// -/// assert_eq!(buf, b"test"); -/// }); +/// assert_eq!(buf, b"test"); +/// # }); /// ``` #[derive(Debug)] pub struct TcpListener { @@ -109,7 +109,7 @@ impl TcpListener { /// use compio_net::TcpListener; /// use socket2::SockAddr; /// - /// # compio_runtime::block_on(async { + /// # compio_runtime::Runtime::new().unwrap().block_on(async { /// let listener = TcpListener::bind("127.0.0.1:8080").await.unwrap(); /// /// let addr = listener.local_addr().expect("Couldn't get local address"); @@ -144,13 +144,13 @@ impl_attachable!(TcpListener, inner); /// use compio_io::AsyncWrite; /// use compio_net::TcpStream; /// -/// compio_runtime::block_on(async { -/// // Connect to a peer -/// let mut stream = TcpStream::connect("127.0.0.1:8080").await.unwrap(); +/// # compio_runtime::Runtime::new().unwrap().block_on(async { +/// // Connect to a peer +/// let mut stream = TcpStream::connect("127.0.0.1:8080").await.unwrap(); /// -/// // Write some data. -/// stream.write("hello world!").await.unwrap(); -/// }) +/// // Write some data. +/// stream.write("hello world!").await.unwrap(); +/// # }) /// ``` #[derive(Debug)] pub struct TcpStream { diff --git a/compio-net/src/udp.rs b/compio-net/src/udp.rs index efa7af3d..7b119d73 100644 --- a/compio-net/src/udp.rs +++ b/compio-net/src/udp.rs @@ -34,31 +34,31 @@ use crate::Socket; /// /// use compio_net::UdpSocket; /// -/// compio_runtime::block_on(async { -/// let first_addr: SocketAddr = "127.0.0.1:0".parse().unwrap(); -/// let second_addr: SocketAddr = "127.0.0.1:0".parse().unwrap(); +/// # compio_runtime::Runtime::new().unwrap().block_on(async { +/// let first_addr: SocketAddr = "127.0.0.1:0".parse().unwrap(); +/// let second_addr: SocketAddr = "127.0.0.1:0".parse().unwrap(); /// -/// // bind sockets -/// let mut socket = UdpSocket::bind(first_addr).await.unwrap(); -/// let first_addr = socket.local_addr().unwrap(); -/// let mut other_socket = UdpSocket::bind(second_addr).await.unwrap(); -/// let second_addr = other_socket.local_addr().unwrap(); +/// // bind sockets +/// let mut socket = UdpSocket::bind(first_addr).await.unwrap(); +/// let first_addr = socket.local_addr().unwrap(); +/// let mut other_socket = UdpSocket::bind(second_addr).await.unwrap(); +/// let second_addr = other_socket.local_addr().unwrap(); /// -/// // connect sockets -/// socket.connect(second_addr).await.unwrap(); -/// other_socket.connect(first_addr).await.unwrap(); +/// // connect sockets +/// socket.connect(second_addr).await.unwrap(); +/// other_socket.connect(first_addr).await.unwrap(); /// -/// let buf = Vec::with_capacity(12); +/// let buf = Vec::with_capacity(12); /// -/// // write data -/// socket.send("Hello world!").await.unwrap(); +/// // write data +/// socket.send("Hello world!").await.unwrap(); /// -/// // read data -/// let (n_bytes, buf) = other_socket.recv(buf).await.unwrap(); +/// // read data +/// let (n_bytes, buf) = other_socket.recv(buf).await.unwrap(); /// -/// assert_eq!(n_bytes, buf.len()); -/// assert_eq!(buf, b"Hello world!"); -/// }); +/// assert_eq!(n_bytes, buf.len()); +/// assert_eq!(buf, b"Hello world!"); +/// # }); /// ``` /// Send and receive packets without connecting: /// @@ -68,28 +68,28 @@ use crate::Socket; /// use compio_net::UdpSocket; /// use socket2::SockAddr; /// -/// compio_runtime::block_on(async { -/// let first_addr: SocketAddr = "127.0.0.1:0".parse().unwrap(); -/// let second_addr: SocketAddr = "127.0.0.1:0".parse().unwrap(); +/// # compio_runtime::Runtime::new().unwrap().block_on(async { +/// let first_addr: SocketAddr = "127.0.0.1:0".parse().unwrap(); +/// let second_addr: SocketAddr = "127.0.0.1:0".parse().unwrap(); /// -/// // bind sockets -/// let mut socket = UdpSocket::bind(first_addr).await.unwrap(); -/// let first_addr = socket.local_addr().unwrap(); -/// let mut other_socket = UdpSocket::bind(second_addr).await.unwrap(); -/// let second_addr = other_socket.local_addr().unwrap(); +/// // bind sockets +/// let mut socket = UdpSocket::bind(first_addr).await.unwrap(); +/// let first_addr = socket.local_addr().unwrap(); +/// let mut other_socket = UdpSocket::bind(second_addr).await.unwrap(); +/// let second_addr = other_socket.local_addr().unwrap(); /// -/// let buf = Vec::with_capacity(32); +/// let buf = Vec::with_capacity(32); /// -/// // write data -/// socket.send_to("hello world", second_addr).await.unwrap(); +/// // write data +/// socket.send_to("hello world", second_addr).await.unwrap(); /// -/// // read data -/// let ((n_bytes, addr), buf) = other_socket.recv_from(buf).await.unwrap(); +/// // read data +/// let ((n_bytes, addr), buf) = other_socket.recv_from(buf).await.unwrap(); /// -/// assert_eq!(addr, first_addr); -/// assert_eq!(n_bytes, buf.len()); -/// assert_eq!(buf, b"hello world"); -/// }); +/// assert_eq!(addr, first_addr); +/// assert_eq!(n_bytes, buf.len()); +/// assert_eq!(buf, b"hello world"); +/// # }); /// ``` #[derive(Debug)] pub struct UdpSocket { @@ -150,7 +150,7 @@ impl UdpSocket { /// use compio_net::UdpSocket; /// use socket2::SockAddr; /// - /// # compio_runtime::block_on(async { + /// # compio_runtime::Runtime::new().unwrap().block_on(async { /// let socket = UdpSocket::bind("127.0.0.1:34254") /// .await /// .expect("couldn't bind to address"); @@ -180,7 +180,7 @@ impl UdpSocket { /// use compio_net::UdpSocket; /// use socket2::SockAddr; /// - /// # compio_runtime::block_on(async { + /// # compio_runtime::Runtime::new().unwrap().block_on(async { /// let addr: SocketAddr = "127.0.0.1:8080".parse().unwrap(); /// let sock = UdpSocket::bind(&addr).await.unwrap(); /// // the address the socket is bound to diff --git a/compio-net/src/unix.rs b/compio-net/src/unix.rs index f8f6268a..60b46be9 100644 --- a/compio-net/src/unix.rs +++ b/compio-net/src/unix.rs @@ -27,18 +27,18 @@ use crate::Socket; /// let dir = tempdir().unwrap(); /// let sock_file = dir.path().join("unix-server.sock"); /// -/// compio_runtime::block_on(async move { -/// let listener = UnixListener::bind(&sock_file).unwrap(); +/// # compio_runtime::Runtime::new().unwrap().block_on(async move { +/// let listener = UnixListener::bind(&sock_file).unwrap(); /// -/// let mut tx = UnixStream::connect(&sock_file).unwrap(); -/// let (mut rx, _) = listener.accept().await.unwrap(); +/// let mut tx = UnixStream::connect(&sock_file).unwrap(); +/// let (mut rx, _) = listener.accept().await.unwrap(); /// -/// tx.write_all("test").await.0.unwrap(); +/// tx.write_all("test").await.0.unwrap(); /// -/// let (_, buf) = rx.read_exact(Vec::with_capacity(4)).await.unwrap(); +/// let (_, buf) = rx.read_exact(Vec::with_capacity(4)).await.unwrap(); /// -/// assert_eq!(buf, b"test"); -/// }); +/// assert_eq!(buf, b"test"); +/// # }); /// ``` #[derive(Debug)] pub struct UnixListener { @@ -112,13 +112,13 @@ impl_attachable!(UnixListener, inner); /// use compio_io::AsyncWrite; /// use compio_net::UnixStream; /// -/// compio_runtime::block_on(async { -/// // Connect to a peer -/// let mut stream = UnixStream::connect("unix-server.sock").unwrap(); +/// # compio_runtime::Runtime::new().unwrap().block_on(async { +/// // Connect to a peer +/// let mut stream = UnixStream::connect("unix-server.sock").unwrap(); /// -/// // Write some data. -/// stream.write("hello world!").await.unwrap(); -/// }) +/// // Write some data. +/// stream.write("hello world!").await.unwrap(); +/// # }) /// ``` #[derive(Debug)] pub struct UnixStream { diff --git a/compio-runtime/Cargo.toml b/compio-runtime/Cargo.toml index e3e6a696..f343deb2 100644 --- a/compio-runtime/Cargo.toml +++ b/compio-runtime/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "compio-runtime" -version = "0.1.1" +version = "0.2.0" description = "high-level runtime for compio" categories = ["asynchronous"] keywords = ["async", "runtime"] @@ -35,6 +35,7 @@ compio-log = { workspace = true } async-task = "4.5.0" cfg-if = { workspace = true, optional = true } +criterion = { workspace = true, optional = true } futures-util = { workspace = true } once_cell = { workspace = true } send_wrapper = { workspace = true } diff --git a/compio-runtime/src/attacher.rs b/compio-runtime/src/attacher.rs index 9f702a61..8e509705 100644 --- a/compio-runtime/src/attacher.rs +++ b/compio-runtime/src/attacher.rs @@ -7,7 +7,7 @@ use compio_driver::AsRawFd; #[cfg(not(feature = "once_cell_try"))] use once_cell::sync::OnceCell as OnceLock; -use crate::attach; +use crate::Runtime; /// Attach a handle to the driver of current thread. /// @@ -17,7 +17,7 @@ use crate::attach; #[derive(Debug, Clone)] pub struct Attacher { // Make it thread safe. - once: OnceLock<()>, + once: OnceLock, // Make it !Send & !Sync. _p: PhantomData<*mut ()>, } @@ -33,9 +33,24 @@ impl Attacher { /// Attach the source. This method could be called many times, but if the /// action fails, the error will only return once. + /// + /// You should always call this method before accessing the runtime. It + /// ensures that the current runtime is the exact runtime attached before. pub fn attach(&self, source: &impl AsRawFd) -> io::Result<()> { - self.once.get_or_try_init(|| attach(source.as_raw_fd()))?; - Ok(()) + let r = Runtime::current(); + let inner = r.inner(); + let id = self.once.get_or_try_init(|| { + inner.attach(source.as_raw_fd())?; + io::Result::Ok(inner.id()) + })?; + if id != &inner.id() { + Err(io::Error::new( + io::ErrorKind::InvalidInput, + "the current runtime is not the attached runtime", + )) + } else { + Ok(()) + } } /// Check if [`attach`] has been called. diff --git a/compio-runtime/src/event/eventfd.rs b/compio-runtime/src/event/eventfd.rs index 1bf76aec..16bf3164 100644 --- a/compio-runtime/src/event/eventfd.rs +++ b/compio-runtime/src/event/eventfd.rs @@ -6,7 +6,7 @@ use std::{ use compio_buf::{arrayvec::ArrayVec, BufResult}; use compio_driver::{impl_raw_fd, op::Recv, syscall}; -use crate::{attacher::Attacher, submit}; +use crate::{attacher::Attacher, Runtime}; /// An event that won't wake until [`EventHandle::notify`] is called /// successfully. @@ -38,7 +38,7 @@ impl Event { let buffer = ArrayVec::::new(); // Trick: Recv uses readv which doesn't seek. let op = Recv::new(self.as_raw_fd(), buffer); - let BufResult(res, _) = submit(op).await; + let BufResult(res, _) = Runtime::current().submit(op).await; res?; Ok(()) } diff --git a/compio-runtime/src/event/iocp.rs b/compio-runtime/src/event/iocp.rs index 5f58d4f7..f1b3791c 100644 --- a/compio-runtime/src/event/iocp.rs +++ b/compio-runtime/src/event/iocp.rs @@ -3,7 +3,7 @@ use std::{io, pin::Pin, ptr::null_mut, task::Poll}; use compio_driver::{syscall, AsRawFd, OpCode, PushEntry, RawFd}; use windows_sys::Win32::System::IO::{PostQueuedCompletionStatus, OVERLAPPED}; -use crate::{key::Key, runtime::op::OpFuture, RUNTIME}; +use crate::{key::Key, runtime::op::OpFuture, Runtime}; /// An event that won't wake until [`EventHandle::notify`] is called /// successfully. @@ -15,7 +15,7 @@ pub struct Event { impl Event { /// Create [`Event`]. pub fn new() -> io::Result { - let user_data = RUNTIME.with(|runtime| runtime.submit_raw(NopPending::new())); + let user_data = Runtime::current().inner().submit_raw(NopPending::new()); let user_data = match user_data { PushEntry::Pending(user_data) => user_data, PushEntry::Ready(_) => unreachable!("NopPending always returns Pending"), @@ -48,7 +48,7 @@ unsafe impl Sync for EventHandle {} impl EventHandle { fn new(user_data: &Key) -> Self { - let handle = RUNTIME.with(|runtime| runtime.as_raw_fd()); + let handle = Runtime::current().as_raw_fd(); Self { user_data: **user_data, handle, diff --git a/compio-runtime/src/event/pipe.rs b/compio-runtime/src/event/pipe.rs index d5aa5c8e..f2151fc3 100644 --- a/compio-runtime/src/event/pipe.rs +++ b/compio-runtime/src/event/pipe.rs @@ -6,7 +6,7 @@ use std::{ use compio_buf::{arrayvec::ArrayVec, BufResult}; use compio_driver::{impl_raw_fd, op::Recv, syscall}; -use crate::{attacher::Attacher, submit}; +use crate::{attacher::Attacher, Runtime}; /// An event that won't wake until [`EventHandle::notify`] is called /// successfully. @@ -47,7 +47,7 @@ impl Event { let buffer = ArrayVec::::new(); // Trick: Recv uses readv which doesn't seek. let op = Recv::new(self.receiver.as_raw_fd(), buffer); - let BufResult(res, _) = submit(op).await; + let BufResult(res, _) = Runtime::current().submit(op).await; res?; Ok(()) } diff --git a/compio-runtime/src/lib.rs b/compio-runtime/src/lib.rs index c1cd9337..e8ad78fd 100644 --- a/compio-runtime/src/lib.rs +++ b/compio-runtime/src/lib.rs @@ -1,9 +1,7 @@ //! The runtime of compio. -//! We don't expose the runtime struct because there could be only one runtime -//! in each thread. //! //! ``` -//! let ans = compio_runtime::block_on(async { +//! let ans = compio_runtime::Runtime::new().unwrap().block_on(async { //! println!("Hello world!"); //! 42 //! }); @@ -11,87 +9,20 @@ //! ``` #![cfg_attr(feature = "once_cell_try", feature(once_cell_try))] -#![cfg_attr(feature = "lazy_cell", feature(lazy_cell))] #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] #![warn(missing_docs)] mod attacher; mod key; -pub(crate) mod runtime; +mod runtime; #[cfg(feature = "event")] pub mod event; #[cfg(feature = "time")] pub mod time; -use std::{cell::RefCell, future::Future, io, mem::ManuallyDrop}; - pub use async_task::Task; pub use attacher::*; use compio_buf::BufResult; -use compio_driver::{OpCode, ProactorBuilder, RawFd}; pub(crate) use key::Key; -// It cannot be replaced by std one because of lacking `get` method. -use once_cell::unsync::Lazy as LazyCell; -use runtime::Runtime; - -thread_local! { - pub(crate) static PROACTOR_BUILDER: RefCell = RefCell::new(ProactorBuilder::new()); - // Use ManuallyDrop here to avoid misc bugs on some platforms when - // trying to get current thread id after thread local resources dropped. - pub(crate) static RUNTIME: LazyCell> = LazyCell::new(|| { - ManuallyDrop::new( - PROACTOR_BUILDER - .with(|builder| Runtime::new(&builder.borrow())) - .expect("cannot create compio runtime") - ) - }); -} - -/// Config the inner proactor with a [`ProactorBuilder`]. Note that if any -/// runtime related method is called before, there will be no influence. -/// The return value indicates whether the builder will be used when -/// initializing the runtime. -pub fn config_proactor(new_builder: ProactorBuilder) -> bool { - PROACTOR_BUILDER.with(|builder| *builder.borrow_mut() = new_builder); - RUNTIME.with(|runtime| LazyCell::get(runtime).is_none()) -} - -/// Start a compio runtime and block on the future till it completes. -pub fn block_on(future: F) -> F::Output { - RUNTIME.with(|runtime| runtime.block_on(future)) -} - -/// Spawns a new asynchronous task, returning a [`Task`] for it. -/// -/// Spawning a task enables the task to execute concurrently to other tasks. -/// There is no guarantee that a spawned task will execute to completion. -/// -/// ``` -/// compio_runtime::block_on(async { -/// let task = compio_runtime::spawn(async { -/// println!("Hello from a spawned task!"); -/// 42 -/// }); -/// -/// assert_eq!(task.await, 42); -/// }) -/// ``` -pub fn spawn(future: F) -> Task { - RUNTIME.with(|runtime| runtime.spawn(future)) -} - -/// Attach a raw file descriptor/handle/socket to the runtime. -/// -/// You only need this when authoring your own high-level APIs. High-level -/// resources in this crate are attached automatically. -pub fn attach(fd: RawFd) -> io::Result<()> { - RUNTIME.with(|runtime| runtime.attach(fd)) -} - -/// Submit an operation to the runtime. -/// -/// You only need this when authoring your own [`OpCode`]. -pub fn submit(op: T) -> impl Future> { - RUNTIME.with(|runtime| runtime.submit(op)) -} +pub use runtime::{spawn, EnterGuard, Runtime, RuntimeBuilder}; diff --git a/compio-runtime/src/runtime/mod.rs b/compio-runtime/src/runtime/mod.rs index 4df31a24..244a3306 100644 --- a/compio-runtime/src/runtime/mod.rs +++ b/compio-runtime/src/runtime/mod.rs @@ -3,7 +3,8 @@ use std::{ collections::VecDeque, future::{ready, Future}, io, - rc::Rc, + rc::{Rc, Weak}, + sync::atomic::{AtomicUsize, Ordering}, task::{Context, Poll}, }; @@ -25,7 +26,10 @@ use crate::{ BufResult, Key, }; -pub(crate) struct Runtime { +static RUNTIME_COUNTER: AtomicUsize = AtomicUsize::new(0); + +pub(crate) struct RuntimeInner { + id: usize, driver: RefCell, runnables: Rc>>, op_runtime: RefCell, @@ -33,9 +37,10 @@ pub(crate) struct Runtime { timer_runtime: RefCell, } -impl Runtime { +impl RuntimeInner { pub fn new(builder: &ProactorBuilder) -> io::Result { Ok(Self { + id: RUNTIME_COUNTER.fetch_add(1, Ordering::AcqRel), driver: RefCell::new(builder.build()?), runnables: Rc::new(RefCell::default()), op_runtime: RefCell::default(), @@ -44,6 +49,10 @@ impl Runtime { }) } + pub fn id(&self) -> usize { + self.id + } + // Safety: the return runnable should be scheduled. unsafe fn spawn_unchecked(&self, future: F) -> Task { // clone is cheap because it is Rc; @@ -187,8 +196,270 @@ impl Runtime { } } -impl AsRawFd for Runtime { +impl AsRawFd for RuntimeInner { fn as_raw_fd(&self) -> RawFd { self.driver.borrow().as_raw_fd() } } + +struct RuntimeContext { + depth: usize, + ptr: Weak, +} + +impl RuntimeContext { + pub fn new() -> Self { + Self { + depth: 0, + ptr: Weak::new(), + } + } + + pub fn inc_depth(&mut self) -> usize { + let depth = self.depth; + self.depth += 1; + depth + } + + pub fn dec_depth(&mut self) -> usize { + self.depth -= 1; + self.depth + } + + pub fn set_runtime(&mut self, ptr: Weak) -> Weak { + std::mem::replace(&mut self.ptr, ptr) + } + + pub fn upgrade_runtime(&self) -> Option { + self.ptr.upgrade().map(|inner| Runtime { inner }) + } +} + +thread_local! { + static CURRENT_RUNTIME: RefCell = RefCell::new(RuntimeContext::new()); +} + +/// The async runtime of compio. It is a thread local runtime, and cannot be +/// sent to other threads. +#[derive(Clone)] +pub struct Runtime { + inner: Rc, +} + +impl Runtime { + /// Create [`Runtime`] with default config. + pub fn new() -> io::Result { + Self::builder().build() + } + + /// Create a builder for [`Runtime`]. + pub fn builder() -> RuntimeBuilder { + RuntimeBuilder::new() + } + + /// Get the current running [`Runtime`]. + pub fn try_current() -> Option { + CURRENT_RUNTIME.with_borrow(|r| r.upgrade_runtime()) + } + + /// Get the current running [`Runtime`]. + /// + /// ## Panics + /// + /// This method will panic if there are no running [`Runtime`]. + pub fn current() -> Self { + Self::try_current().expect("not in a compio runtime") + } + + pub(crate) fn inner(&self) -> &RuntimeInner { + &self.inner + } + + /// Enter the runtime context. This runtime will be set as the `current` + /// one. + /// + /// ## Panics + /// + /// When calling `Runtime::enter` multiple times, the returned guards + /// **must** be dropped in the reverse order that they were acquired. + /// Failure to do so will result in a panic and possible memory leaks. + /// + /// Do **not** do the following, this shows a scenario that will result in a + /// panic and possible memory leak. + /// + /// ```should_panic + /// use compio_runtime::Runtime; + /// + /// let rt1 = Runtime::new().unwrap(); + /// let rt2 = Runtime::new().unwrap(); + /// + /// let enter1 = rt1.enter(); + /// let enter2 = rt2.enter(); + /// + /// drop(enter1); + /// drop(enter2); + /// ``` + pub fn enter(&self) -> EnterGuard { + EnterGuard::new(self) + } + + /// Block on the future till it completes. + pub fn block_on(&self, future: F) -> F::Output { + let guard = self.enter(); + guard.block_on(future) + } + + /// Spawns a new asynchronous task, returning a [`Task`] for it. + /// + /// Spawning a task enables the task to execute concurrently to other tasks. + /// There is no guarantee that a spawned task will execute to completion. + pub fn spawn(&self, future: F) -> Task { + self.inner.spawn(future) + } + + /// Attach a raw file descriptor/handle/socket to the runtime. + /// + /// You only need this when authoring your own high-level APIs. High-level + /// resources in this crate are attached automatically. + pub fn attach(&self, fd: RawFd) -> io::Result<()> { + self.inner.attach(fd) + } + + /// Submit an operation to the runtime. + /// + /// You only need this when authoring your own [`OpCode`]. + pub fn submit(&self, op: T) -> impl Future> { + self.inner.submit(op) + } +} + +impl AsRawFd for Runtime { + fn as_raw_fd(&self) -> RawFd { + self.inner.as_raw_fd() + } +} + +#[cfg(feature = "criterion")] +impl criterion::async_executor::AsyncExecutor for Runtime { + fn block_on(&self, future: impl Future) -> T { + self.block_on(future) + } +} + +#[cfg(feature = "criterion")] +impl criterion::async_executor::AsyncExecutor for &Runtime { + fn block_on(&self, future: impl Future) -> T { + (**self).block_on(future) + } +} + +/// Builder for [`Runtime`]. +#[derive(Debug, Clone)] +pub struct RuntimeBuilder { + proactor_builder: ProactorBuilder, +} + +impl Default for RuntimeBuilder { + fn default() -> Self { + Self::new() + } +} + +impl RuntimeBuilder { + /// Create the builder with default config. + pub fn new() -> Self { + Self { + proactor_builder: ProactorBuilder::new(), + } + } + + /// Replace proactor builder. + pub fn with_proactor(&mut self, builder: ProactorBuilder) -> &mut Self { + self.proactor_builder = builder; + self + } + + /// Build [`Runtime`]. + pub fn build(&self) -> io::Result { + Ok(Runtime { + inner: Rc::new(RuntimeInner::new(&self.proactor_builder)?), + }) + } +} + +/// Runtime context guard. +/// +/// When the guard is dropped, exit the corresponding runtime context. +#[must_use] +pub struct EnterGuard<'a> { + runtime: &'a Runtime, + old_ptr: Weak, + depth: usize, +} + +impl<'a> EnterGuard<'a> { + fn new(runtime: &'a Runtime) -> Self { + let (old_ptr, depth) = CURRENT_RUNTIME.with_borrow_mut(|ctx| { + ( + ctx.set_runtime(Rc::downgrade(&runtime.inner)), + ctx.inc_depth(), + ) + }); + Self { + runtime, + old_ptr, + depth, + } + } + + /// Block on the future in the runtime backed of this guard. + pub fn block_on(&self, future: F) -> F::Output { + self.runtime.inner.block_on(future) + } +} + +#[cold] +fn panic_incorrent_drop_order() { + if !std::thread::panicking() { + panic!( + "`EnterGuard` values dropped out of order. Guards returned by `Runtime::enter()` must \ + be dropped in the reverse order as they were acquired." + ) + } +} + +impl Drop for EnterGuard<'_> { + fn drop(&mut self) { + let depth = CURRENT_RUNTIME.with_borrow_mut(|ctx| { + ctx.set_runtime(std::mem::take(&mut self.old_ptr)); + ctx.dec_depth() + }); + if depth != self.depth { + panic_incorrent_drop_order() + } + } +} + +/// Spawns a new asynchronous task, returning a [`Task`] for it. +/// +/// Spawning a task enables the task to execute concurrently to other tasks. +/// There is no guarantee that a spawned task will execute to completion. +/// +/// ``` +/// # compio_runtime::Runtime::new().unwrap().block_on(async { +/// let task = compio_runtime::spawn(async { +/// println!("Hello from a spawned task!"); +/// 42 +/// }); +/// +/// assert_eq!(task.await, 42); +/// # }) +/// ``` +/// +/// ## Panics +/// +/// This method doesn't create runtime. It tries to obtain the current runtime +/// by [`Runtime::current`]. +pub fn spawn(future: F) -> Task { + Runtime::current().spawn(future) +} diff --git a/compio-runtime/src/runtime/op.rs b/compio-runtime/src/runtime/op.rs index a7216e3f..da6385c2 100644 --- a/compio-runtime/src/runtime/op.rs +++ b/compio-runtime/src/runtime/op.rs @@ -8,7 +8,7 @@ use std::{ use compio_buf::BufResult; use compio_driver::{Entry, OpCode}; -use crate::key::Key; +use crate::{key::Key, Runtime}; #[derive(Default)] pub(crate) struct RegisteredOp { @@ -73,7 +73,7 @@ impl Future for OpFuture { type Output = BufResult; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let res = crate::RUNTIME.with(|runtime| runtime.poll_task(cx, self.user_data)); + let res = Runtime::current().inner().poll_task(cx, self.user_data); if res.is_ready() { self.get_mut().completed = true; } @@ -84,7 +84,7 @@ impl Future for OpFuture { impl Drop for OpFuture { fn drop(&mut self) { if !self.completed { - crate::RUNTIME.with(|runtime| runtime.cancel_op(self.user_data)) + Runtime::current().inner().cancel_op(self.user_data) } } } diff --git a/compio-runtime/src/runtime/time.rs b/compio-runtime/src/runtime/time.rs index e2d73291..4707d19c 100644 --- a/compio-runtime/src/runtime/time.rs +++ b/compio-runtime/src/runtime/time.rs @@ -8,6 +8,8 @@ use std::{ use slab::Slab; +use crate::Runtime; + #[derive(Debug)] struct TimerEntry { key: usize, @@ -121,7 +123,7 @@ impl Future for TimerFuture { type Output = (); fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let res = crate::RUNTIME.with(|runtime| runtime.poll_timer(cx, self.key)); + let res = Runtime::current().inner().poll_timer(cx, self.key); if res.is_ready() { self.get_mut().completed = true; } @@ -132,7 +134,7 @@ impl Future for TimerFuture { impl Drop for TimerFuture { fn drop(&mut self) { if !self.completed { - crate::RUNTIME.with(|runtime| runtime.cancel_timer(self.key)); + Runtime::current().inner().cancel_timer(self.key); } } } diff --git a/compio-runtime/src/time.rs b/compio-runtime/src/time.rs index db27ca99..c5a23cda 100644 --- a/compio-runtime/src/time.rs +++ b/compio-runtime/src/time.rs @@ -9,6 +9,8 @@ use std::{ use futures_util::{select, FutureExt}; +use crate::Runtime; + /// Waits until `duration` has elapsed. /// /// Equivalent to [`sleep_until(Instant::now() + duration)`](sleep_until). An @@ -25,15 +27,13 @@ use futures_util::{select, FutureExt}; /// /// use compio_runtime::time::sleep; /// -/// compio_runtime::block_on(async { -/// sleep(Duration::from_millis(100)).await; -/// println!("100 ms have elapsed"); -/// }) +/// # compio_runtime::Runtime::new().unwrap().block_on(async { +/// sleep(Duration::from_millis(100)).await; +/// println!("100 ms have elapsed"); +/// # }) /// ``` pub async fn sleep(duration: Duration) { - crate::RUNTIME - .with(|runtime| runtime.create_timer(duration)) - .await + Runtime::current().inner().create_timer(duration).await } /// Waits until `deadline` is reached. @@ -49,10 +49,10 @@ pub async fn sleep(duration: Duration) { /// /// use compio_runtime::time::sleep_until; /// -/// compio_runtime::block_on(async { -/// sleep_until(Instant::now() + Duration::from_millis(100)).await; -/// println!("100 ms have elapsed"); -/// }) +/// # compio_runtime::Runtime::new().unwrap().block_on(async { +/// sleep_until(Instant::now() + Duration::from_millis(100)).await; +/// println!("100 ms have elapsed"); +/// # }) /// ``` pub async fn sleep_until(deadline: Instant) { sleep(deadline - Instant::now()).await @@ -151,15 +151,15 @@ impl Interval { /// /// use compio_runtime::time::interval; /// -/// compio_runtime::block_on(async { -/// let mut interval = interval(Duration::from_millis(10)); +/// # compio_runtime::Runtime::new().unwrap().block_on(async { +/// let mut interval = interval(Duration::from_millis(10)); /// -/// interval.tick().await; // ticks immediately -/// interval.tick().await; // ticks after 10ms -/// interval.tick().await; // ticks after 10ms +/// interval.tick().await; // ticks immediately +/// interval.tick().await; // ticks after 10ms +/// interval.tick().await; // ticks after 10ms /// -/// // approximately 20ms have elapsed. -/// }) +/// // approximately 20ms have elapsed. +/// # }) /// ``` /// /// A simple example using [`interval`] to execute a task every two seconds. @@ -183,13 +183,13 @@ impl Interval { /// sleep(Duration::from_secs(1)).await /// } /// -/// compio_runtime::block_on(async { -/// let mut interval = interval(Duration::from_secs(2)); -/// for _i in 0..5 { -/// interval.tick().await; -/// task_that_takes_a_second().await; -/// } -/// }) +/// # compio_runtime::Runtime::new().unwrap().block_on(async { +/// let mut interval = interval(Duration::from_secs(2)); +/// for _i in 0..5 { +/// interval.tick().await; +/// task_that_takes_a_second().await; +/// } +/// # }) /// ``` /// /// [`sleep`]: crate::time::sleep() @@ -215,16 +215,16 @@ pub fn interval(period: Duration) -> Interval { /// /// use compio_runtime::time::interval_at; /// -/// compio_runtime::block_on(async { -/// let start = Instant::now() + Duration::from_millis(50); -/// let mut interval = interval_at(start, Duration::from_millis(10)); +/// # compio_runtime::Runtime::new().unwrap().block_on(async { +/// let start = Instant::now() + Duration::from_millis(50); +/// let mut interval = interval_at(start, Duration::from_millis(10)); /// -/// interval.tick().await; // ticks after 50ms -/// interval.tick().await; // ticks after 10ms -/// interval.tick().await; // ticks after 10ms +/// interval.tick().await; // ticks after 50ms +/// interval.tick().await; // ticks after 10ms +/// interval.tick().await; // ticks after 10ms /// -/// // approximately 70ms have elapsed. -/// }); +/// // approximately 70ms have elapsed. +/// # }); /// ``` pub fn interval_at(start: Instant, period: Duration) -> Interval { assert!(period > Duration::ZERO, "`period` must be non-zero."); diff --git a/compio-runtime/tests/event.rs b/compio-runtime/tests/event.rs index fbdabce4..92055db2 100644 --- a/compio-runtime/tests/event.rs +++ b/compio-runtime/tests/event.rs @@ -2,7 +2,7 @@ use compio_runtime::event::Event; #[test] fn event_handle() { - compio_runtime::block_on(async { + compio_runtime::Runtime::new().unwrap().block_on(async { let event = Event::new().unwrap(); let handle = event.handle().unwrap(); std::thread::spawn(move || { diff --git a/compio-signal/src/lib.rs b/compio-signal/src/lib.rs index cf6a60bc..1138e412 100644 --- a/compio-signal/src/lib.rs +++ b/compio-signal/src/lib.rs @@ -7,10 +7,10 @@ //! ```rust,no_run //! use compio_signal::ctrl_c; //! -//! compio_runtime::block_on(async { -//! ctrl_c().await.unwrap(); -//! println!("ctrl-c received!"); -//! }) +//! # compio_runtime::Runtime::new().unwrap().block_on(async { +//! ctrl_c().await.unwrap(); +//! println!("ctrl-c received!"); +//! # }) //! ``` #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] diff --git a/compio-signal/src/unix.rs b/compio-signal/src/unix.rs index fd7c4a01..9478e06f 100644 --- a/compio-signal/src/unix.rs +++ b/compio-signal/src/unix.rs @@ -15,8 +15,7 @@ thread_local! { } unsafe extern "C" fn signal_handler(sig: i32) { - HANDLER.with(|handler| { - let mut handler = handler.borrow_mut(); + HANDLER.with_borrow_mut(|handler| { if let Some(fds) = handler.get_mut(&sig) { if !fds.is_empty() { let fds = std::mem::take(fds); @@ -40,19 +39,12 @@ fn register(sig: i32, fd: &Event) -> io::Result<()> { unsafe { init(sig) }; let raw_fd = fd.as_raw_fd(); let handle = fd.handle()?; - HANDLER.with(|handler| { - handler - .borrow_mut() - .entry(sig) - .or_default() - .insert(raw_fd, handle) - }); + HANDLER.with_borrow_mut(|handler| handler.entry(sig).or_default().insert(raw_fd, handle)); Ok(()) } fn unregister(sig: i32, fd: RawFd) { - let need_uninit = HANDLER.with(|handler| { - let mut handler = handler.borrow_mut(); + let need_uninit = HANDLER.with_borrow_mut(|handler| { if let Some(fds) = handler.get_mut(&sig) { fds.remove(&fd); if !fds.is_empty() { diff --git a/compio/Cargo.toml b/compio/Cargo.toml index f6974fe2..203fe45b 100644 --- a/compio/Cargo.toml +++ b/compio/Cargo.toml @@ -44,9 +44,10 @@ compio-log = { workspace = true, optional = true } # Shared dev dependencies for all platforms [dev-dependencies] compio-buf = { workspace = true, features = ["bumpalo"] } +compio-runtime = { workspace = true, features = ["criterion"] } compio-macros = { workspace = true } -criterion = { version = "0.5.1", features = ["async_tokio"] } +criterion = { workspace = true, features = ["async_tokio"] } futures-util = { workspace = true } tempfile = { workspace = true } tokio = { workspace = true, features = [ @@ -84,6 +85,8 @@ time = ["compio-runtime/time", "runtime"] dispatcher = ["dep:compio-dispatcher", "runtime"] all = ["time", "macros", "signal", "dispatcher"] +criterion = ["compio-runtime?/criterion"] + # Nightly features allocator_api = ["compio-buf/allocator_api", "compio-io?/allocator_api"] lazy_cell = ["compio-signal?/lazy_cell"] diff --git a/compio/benches/fs.rs b/compio/benches/fs.rs index 41e91d7e..96dd78f0 100644 --- a/compio/benches/fs.rs +++ b/compio/benches/fs.rs @@ -1,17 +1,9 @@ -use criterion::{async_executor::AsyncExecutor, criterion_group, criterion_main, Criterion}; +use criterion::{criterion_group, criterion_main, Criterion}; use tempfile::NamedTempFile; criterion_group!(fs, read, write); criterion_main!(fs); -struct CompioRuntime; - -impl AsyncExecutor for CompioRuntime { - fn block_on(&self, future: impl std::future::Future) -> T { - compio::runtime::block_on(future) - } -} - fn read(c: &mut Criterion) { let mut group = c.benchmark_group("read"); @@ -42,7 +34,8 @@ fn read(c: &mut Criterion) { }); group.bench_function("compio", |b| { - b.to_async(CompioRuntime).iter(|| async { + let runtime = compio::runtime::Runtime::new().unwrap(); + b.to_async(&runtime).iter(|| async { use compio::io::AsyncReadAtExt; let file = compio::fs::File::open("Cargo.toml").await.unwrap(); @@ -85,8 +78,9 @@ fn write(c: &mut Criterion) { }); group.bench_function("compio", |b| { + let runtime = compio::runtime::Runtime::new().unwrap(); let temp_file = NamedTempFile::new().unwrap(); - b.to_async(CompioRuntime).iter(|| async { + b.to_async(&runtime).iter(|| async { use compio::io::AsyncWriteAtExt; let mut file = compio::fs::File::create(temp_file.path()).await.unwrap(); diff --git a/compio/benches/named_pipe.rs b/compio/benches/named_pipe.rs index 80438492..5f937313 100644 --- a/compio/benches/named_pipe.rs +++ b/compio/benches/named_pipe.rs @@ -1,16 +1,8 @@ -use criterion::{async_executor::AsyncExecutor, criterion_group, criterion_main, Criterion}; +use criterion::{criterion_group, criterion_main, Criterion}; criterion_group!(named_pipe, basic); criterion_main!(named_pipe); -struct CompioRuntime; - -impl AsyncExecutor for CompioRuntime { - fn block_on(&self, future: impl std::future::Future) -> T { - compio::runtime::block_on(future) - } -} - fn basic(c: &mut Criterion) { #[allow(dead_code)] const PACKET_LEN: usize = 1048576; @@ -51,7 +43,8 @@ fn basic(c: &mut Criterion) { }); group.bench_function("compio", |b| { - b.to_async(CompioRuntime).iter(|| async { + let runtime = compio::runtime::Runtime::new().unwrap(); + b.to_async(&runtime).iter(|| async { #[cfg(windows)] { use compio::{ diff --git a/compio/benches/net.rs b/compio/benches/net.rs index db01356b..4c769810 100644 --- a/compio/benches/net.rs +++ b/compio/benches/net.rs @@ -1,16 +1,8 @@ -use criterion::{async_executor::AsyncExecutor, criterion_group, criterion_main, Criterion}; +use criterion::{criterion_group, criterion_main, Criterion}; criterion_group!(net, tcp, udp); criterion_main!(net); -struct CompioRuntime; - -impl AsyncExecutor for CompioRuntime { - fn block_on(&self, future: impl std::future::Future) -> T { - compio::runtime::block_on(future) - } -} - fn tcp(c: &mut Criterion) { const PACKET_LEN: usize = 1048576; static PACKET: &[u8] = &[1u8; PACKET_LEN]; @@ -40,7 +32,8 @@ fn tcp(c: &mut Criterion) { }); group.bench_function("compio", |b| { - b.to_async(CompioRuntime).iter(|| async { + let runtime = compio::runtime::Runtime::new().unwrap(); + b.to_async(&runtime).iter(|| async { use compio::io::{AsyncReadExt, AsyncWriteExt}; let listener = compio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); @@ -105,7 +98,8 @@ fn udp(c: &mut Criterion) { }); group.bench_function("compio", |b| { - b.to_async(CompioRuntime).iter(|| async { + let runtime = compio::runtime::Runtime::new().unwrap(); + b.to_async(&runtime).iter(|| async { let rx = compio::net::UdpSocket::bind("127.0.0.1:0").await.unwrap(); let addr_rx = rx.local_addr().unwrap(); let tx = compio::net::UdpSocket::bind("127.0.0.1:0").await.unwrap(); diff --git a/compio/src/lib.rs b/compio/src/lib.rs index 3a222068..78df6585 100644 --- a/compio/src/lib.rs +++ b/compio/src/lib.rs @@ -5,7 +5,7 @@ //! //! ## Quick start //! ```rust -//! # compio::runtime::block_on(async { +//! # compio::runtime::Runtime::new().unwrap().block_on(async { //! use compio::{fs::File, io::AsyncReadAtExt}; //! //! let file = File::open("Cargo.toml").await.unwrap(); diff --git a/compio/tests/runtime.rs b/compio/tests/runtime.rs index ef3bbe56..cc84278b 100644 --- a/compio/tests/runtime.rs +++ b/compio/tests/runtime.rs @@ -26,7 +26,7 @@ async fn multi_threading() { let rx = Unattached::new(rx).unwrap(); if let Err(e) = std::thread::spawn(move || { let mut rx = rx.into_inner(); - compio::runtime::block_on(async { + compio::runtime::Runtime::new().unwrap().block_on(async { let buffer = Vec::with_capacity(DATA.len()); let (n, buffer) = rx.read_exact(buffer).await.unwrap(); assert_eq!(n, buffer.len()); @@ -55,7 +55,7 @@ async fn try_clone() { let rx = Unattached::new(rx.try_clone().unwrap()).unwrap(); if let Err(e) = std::thread::spawn(move || { let mut rx = rx.into_inner(); - compio::runtime::block_on(async { + compio::runtime::Runtime::new().unwrap().block_on(async { let buffer = Vec::with_capacity(DATA.len()); let (n, buffer) = rx.read_exact(buffer).await.unwrap(); assert_eq!(n, buffer.len());