Skip to content

Commit

Permalink
Merge pull request #141 from Berrysoft/refactor/pub-runtime
Browse files Browse the repository at this point in the history
  • Loading branch information
George-Miao authored Nov 17, 2023
2 parents 3414f00 + 5d90270 commit 7274ee5
Show file tree
Hide file tree
Showing 36 changed files with 572 additions and 341 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ repository = "https://github.com/compio-rs/compio"
[workspace.dependencies]
compio-buf = { path = "./compio-buf", version = "0.2.0" }
compio-driver = { path = "./compio-driver", version = "0.2.0", default-features = false }
compio-runtime = { path = "./compio-runtime", version = "0.1.1" }
compio-runtime = { path = "./compio-runtime", version = "0.2.0" }
compio-macros = { path = "./compio-macros", version = "0.1.1" }
compio-fs = { path = "./compio-fs", version = "0.2.0" }
compio-io = { path = "./compio-io", version = "0.1.0" }
Expand All @@ -40,6 +40,7 @@ compio-tls = { path = "./compio-tls", version = "0.1.0" }


cfg-if = "1.0.0"
criterion = "0.5.1"
crossbeam-channel = "0.5.8"
crossbeam-queue = "0.3.8"
futures-channel = "0.3.29"
Expand Down
19 changes: 11 additions & 8 deletions compio-dispatcher/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ use std::{
};

use compio_driver::{AsyncifyPool, ProactorBuilder};
use compio_runtime::event::{Event, EventHandle};
use compio_runtime::{
event::{Event, EventHandle},
Runtime,
};
use crossbeam_channel::{unbounded, Sender};
use futures_util::{future::LocalBoxFuture, FutureExt};

Expand Down Expand Up @@ -51,14 +54,14 @@ impl Dispatcher {
};

thread_builder.spawn(move || {
let succeeded = compio_runtime::config_proactor(proactor_builder);
debug_assert!(
succeeded,
"the runtime should not be created before proactor builder set"
);
let runtime = Runtime::builder()
.with_proactor(proactor_builder)
.build()
.expect("cannot create compio runtime");
let _guard = runtime.enter();
while let Ok(f) = receiver.recv() {
*f.result.lock().unwrap() = Some(std::panic::catch_unwind(move || {
compio_runtime::block_on((f.func)());
*f.result.lock().unwrap() = Some(std::panic::catch_unwind(|| {
Runtime::current().block_on((f.func)());
}));
f.handle.notify().ok();
}
Expand Down
22 changes: 15 additions & 7 deletions compio-fs/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use {
compio_buf::{buf_try, BufResult, IntoInner, IoBuf, IoBufMut},
compio_driver::op::{BufResultExt, CloseFile, ReadAt, Sync, WriteAt},
compio_io::{AsyncReadAt, AsyncWriteAt},
compio_runtime::{submit, Attachable, Attacher},
compio_runtime::{Attachable, Attacher, Runtime},
std::{future::Future, mem::ManuallyDrop, path::Path},
};
#[cfg(all(feature = "runtime", unix))]
Expand Down Expand Up @@ -64,7 +64,7 @@ impl File {
let this = ManuallyDrop::new(self);
async move {
let op = CloseFile::new(this.as_raw_fd());
submit(op).await.0?;
Runtime::current().submit(op).await.0?;
Ok(())
}
}
Expand All @@ -91,7 +91,7 @@ impl File {
async fn sync_impl(&self, datasync: bool) -> io::Result<()> {
self.attach()?;
let op = Sync::new(self.as_raw_fd(), datasync);
submit(op).await.0?;
Runtime::current().submit(op).await.0?;
Ok(())
}

Expand Down Expand Up @@ -126,7 +126,11 @@ impl AsyncReadAt for File {
async fn read_at<T: IoBufMut>(&self, buffer: T, pos: u64) -> BufResult<usize, T> {
let ((), buffer) = buf_try!(self.attach(), buffer);
let op = ReadAt::new(self.as_raw_fd(), pos, buffer);
submit(op).await.into_inner().map_advanced()
Runtime::current()
.submit(op)
.await
.into_inner()
.map_advanced()
}

#[cfg(unix)]
Expand All @@ -137,7 +141,11 @@ impl AsyncReadAt for File {
) -> BufResult<usize, T> {
let ((), buffer) = buf_try!(self.attach(), buffer);
let op = ReadVectoredAt::new(self.as_raw_fd(), pos, buffer);
submit(op).await.into_inner().map_advanced()
Runtime::current()
.submit(op)
.await
.into_inner()
.map_advanced()
}
}

Expand All @@ -164,7 +172,7 @@ impl AsyncWriteAt for &File {
async fn write_at<T: IoBuf>(&mut self, buffer: T, pos: u64) -> BufResult<usize, T> {
let ((), buffer) = buf_try!(self.attach(), buffer);
let op = WriteAt::new(self.as_raw_fd(), pos, buffer);
submit(op).await.into_inner()
Runtime::current().submit(op).await.into_inner()
}

#[cfg(unix)]
Expand All @@ -175,7 +183,7 @@ impl AsyncWriteAt for &File {
) -> BufResult<usize, T> {
let ((), buffer) = buf_try!(self.attach(), buffer);
let op = WriteVectoredAt::new(self.as_raw_fd(), pos, buffer);
submit(op).await.into_inner()
Runtime::current().submit(op).await.into_inner()
}
}

Expand Down
83 changes: 40 additions & 43 deletions compio-fs/src/named_pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use {
compio_buf::{BufResult, IoBuf, IoBufMut},
compio_driver::op::ConnectNamedPipe,
compio_io::{AsyncRead, AsyncReadAt, AsyncWrite, AsyncWriteAt},
compio_runtime::{impl_attachable, submit, Attachable},
compio_runtime::{impl_attachable, Attachable, Runtime},
};

use crate::File;
Expand Down Expand Up @@ -68,30 +68,27 @@ use crate::File;
/// .create(PIPE_NAME)?;
///
/// // Spawn the server loop.
/// let server = compio_runtime::block_on(async move {
/// loop {
/// // Wait for a client to connect.
/// let connected = server.connect().await?;
/// # compio_runtime::Runtime::new().unwrap().block_on(async move {
/// loop {
/// // Wait for a client to connect.
/// let connected = server.connect().await?;
///
/// // Construct the next server to be connected before sending the one
/// // we already have of onto a task. This ensures that the server
/// // isn't closed (after it's done in the task) before a new one is
/// // available. Otherwise the client might error with
/// // `io::ErrorKind::NotFound`.
/// server = ServerOptions::new().create(PIPE_NAME)?;
/// // Construct the next server to be connected before sending the one
/// // we already have of onto a task. This ensures that the server
/// // isn't closed (after it's done in the task) before a new one is
/// // available. Otherwise the client might error with
/// // `io::ErrorKind::NotFound`.
/// server = ServerOptions::new().create(PIPE_NAME)?;
///
/// let client = compio_runtime::spawn(async move {
/// // use the connected client
/// # Ok::<_, std::io::Error>(())
/// });
/// # if true { break } // needed for type inference to work
/// }
///
/// Ok::<_, io::Error>(())
/// });
///
/// // do something else not server related here
/// # Ok(()) }
/// let client = compio_runtime::spawn(async move {
/// // use the connected client
/// # Ok::<_, std::io::Error>(())
/// });
/// # if true { break } // needed for type inference to work
/// }
/// # Ok::<_, io::Error>(())
/// # })
/// # }
/// ```
///
/// [Windows named pipe]: https://docs.microsoft.com/en-us/windows/win32/ipc/named-pipes
Expand All @@ -118,7 +115,7 @@ impl NamedPipeServer {
///
/// const PIPE_NAME: &str = r"\\.\pipe\compio-named-pipe-server-info";
///
/// # compio_runtime::block_on(async move {
/// # compio_runtime::Runtime::new().unwrap().block_on(async move {
/// let server = ServerOptions::new()
/// .pipe_mode(PipeMode::Message)
/// .max_instances(5)
Expand Down Expand Up @@ -149,7 +146,7 @@ impl NamedPipeServer {
///
/// const PIPE_NAME: &str = r"\\.\pipe\mynamedpipe";
///
/// # compio_runtime::block_on(async move {
/// # compio_runtime::Runtime::new().unwrap().block_on(async move {
/// let pipe = ServerOptions::new().create(PIPE_NAME)?;
///
/// // Wait for a client to connect.
Expand All @@ -162,7 +159,7 @@ impl NamedPipeServer {
pub async fn connect(&self) -> io::Result<()> {
self.attach()?;
let op = ConnectNamedPipe::new(self.as_raw_fd());
submit(op).await.0?;
Runtime::current().submit(op).await.0?;
Ok(())
}

Expand All @@ -176,7 +173,7 @@ impl NamedPipeServer {
///
/// const PIPE_NAME: &str = r"\\.\pipe\compio-named-pipe-disconnect";
///
/// # compio_runtime::block_on(async move {
/// # compio_runtime::Runtime::new().unwrap().block_on(async move {
/// let server = ServerOptions::new().create(PIPE_NAME).unwrap();
///
/// let mut client = ClientOptions::new().open(PIPE_NAME).await.unwrap();
Expand Down Expand Up @@ -280,7 +277,7 @@ impl_attachable!(NamedPipeServer, handle);
///
/// const PIPE_NAME: &str = r"\\.\pipe\named-pipe-idiomatic-client";
///
/// # compio_runtime::block_on(async move {
/// # compio_runtime::Runtime::new().unwrap().block_on(async move {
/// let client = loop {
/// match ClientOptions::new().open(PIPE_NAME).await {
/// Ok(client) => break client,
Expand Down Expand Up @@ -320,7 +317,7 @@ impl NamedPipeClient {
///
/// const PIPE_NAME: &str = r"\\.\pipe\compio-named-pipe-client-info";
///
/// # compio_runtime::block_on(async move {
/// # compio_runtime::Runtime::new().unwrap().block_on(async move {
/// let client = ClientOptions::new().open(PIPE_NAME).await?;
///
/// let client_info = client.info()?;
Expand Down Expand Up @@ -481,7 +478,7 @@ impl ServerOptions {
///
/// const PIPE_NAME: &str = r"\\.\pipe\compio-named-pipe-access-inbound-err1";
///
/// # compio_runtime::block_on(async move {
/// # compio_runtime::Runtime::new().unwrap().block_on(async move {
/// let _server = ServerOptions::new()
/// .access_inbound(false)
/// .create(PIPE_NAME)
Expand All @@ -504,7 +501,7 @@ impl ServerOptions {
///
/// const PIPE_NAME: &str = r"\\.\pipe\compio-named-pipe-access-inbound-err2";
///
/// # compio_runtime::block_on(async move {
/// # compio_runtime::Runtime::new().unwrap().block_on(async move {
/// let server = ServerOptions::new()
/// .access_inbound(false)
/// .create(PIPE_NAME)
Expand Down Expand Up @@ -537,7 +534,7 @@ impl ServerOptions {
///
/// const PIPE_NAME: &str = r"\\.\pipe\compio-named-pipe-access-inbound";
///
/// # compio_runtime::block_on(async move {
/// # compio_runtime::Runtime::new().unwrap().block_on(async move {
/// let mut server = ServerOptions::new()
/// .access_inbound(false)
/// .create(PIPE_NAME)
Expand Down Expand Up @@ -588,7 +585,7 @@ impl ServerOptions {
///
/// const PIPE_NAME: &str = r"\\.\pipe\compio-named-pipe-access-outbound-err1";
///
/// # compio_runtime::block_on(async move {
/// # compio_runtime::Runtime::new().unwrap().block_on(async move {
/// let server = ServerOptions::new()
/// .access_outbound(false)
/// .create(PIPE_NAME)
Expand All @@ -611,7 +608,7 @@ impl ServerOptions {
///
/// const PIPE_NAME: &str = r"\\.\pipe\compio-named-pipe-access-outbound-err2";
///
/// # compio_runtime::block_on(async move {
/// # compio_runtime::Runtime::new().unwrap().block_on(async move {
/// let server = ServerOptions::new()
/// .access_outbound(false)
/// .create(PIPE_NAME)
Expand Down Expand Up @@ -643,7 +640,7 @@ impl ServerOptions {
///
/// const PIPE_NAME: &str = r"\\.\pipe\compio-named-pipe-access-outbound";
///
/// # compio_runtime::block_on(async move {
/// # compio_runtime::Runtime::new().unwrap().block_on(async move {
/// let mut server = ServerOptions::new()
/// .access_outbound(false)
/// .create(PIPE_NAME)
Expand Down Expand Up @@ -702,7 +699,7 @@ impl ServerOptions {
///
/// const PIPE_NAME: &str = r"\\.\pipe\compio-named-pipe-first-instance-error";
///
/// # compio_runtime::block_on(async move {
/// # compio_runtime::Runtime::new().unwrap().block_on(async move {
/// let server1 = ServerOptions::new()
/// .first_pipe_instance(true)
/// .create(PIPE_NAME)
Expand All @@ -727,7 +724,7 @@ impl ServerOptions {
///
/// const PIPE_NAME: &str = r"\\.\pipe\compio-named-pipe-first-instance";
///
/// # compio_runtime::block_on(async move {
/// # compio_runtime::Runtime::new().unwrap().block_on(async move {
/// let mut builder = ServerOptions::new();
/// builder.first_pipe_instance(true);
///
Expand Down Expand Up @@ -770,7 +767,7 @@ impl ServerOptions {
///
/// const PIPE_NAME: &str = r"\\.\pipe\write_dac_pipe";
///
/// # compio_runtime::block_on(async move {
/// # compio_runtime::Runtime::new().unwrap().block_on(async move {
/// let mut pipe_template = ServerOptions::new();
/// pipe_template.write_dac(true);
/// let pipe = pipe_template.create(PIPE_NAME).unwrap();
Expand Down Expand Up @@ -807,7 +804,7 @@ impl ServerOptions {
///
/// const PIPE_NAME: &str = r"\\.\pipe\write_dac_pipe_fail";
///
/// # compio_runtime::block_on(async move {
/// # compio_runtime::Runtime::new().unwrap().block_on(async move {
/// let mut pipe_template = ServerOptions::new();
/// pipe_template.write_dac(false);
/// let pipe = pipe_template.create(PIPE_NAME).unwrap();
Expand Down Expand Up @@ -890,7 +887,7 @@ impl ServerOptions {
///
/// const PIPE_NAME: &str = r"\\.\pipe\compio-named-pipe-max-instances";
///
/// # compio_runtime::block_on(async move {
/// # compio_runtime::Runtime::new().unwrap().block_on(async move {
/// let mut server = ServerOptions::new();
/// server.max_instances(2);
///
Expand Down Expand Up @@ -971,7 +968,7 @@ impl ServerOptions {
///
/// const PIPE_NAME: &str = r"\\.\pipe\compio-named-pipe-create";
///
/// # compio_runtime::block_on(async move {
/// # compio_runtime::Runtime::new().unwrap().block_on(async move {
/// let server = ServerOptions::new().create(PIPE_NAME).unwrap();
/// # })
/// ```
Expand Down Expand Up @@ -1059,7 +1056,7 @@ impl ClientOptions {
///
/// const PIPE_NAME: &str = r"\\.\pipe\compio-named-pipe-client-new";
///
/// # compio_runtime::block_on(async move {
/// # compio_runtime::Runtime::new().unwrap().block_on(async move {
/// // Server must be created in order for the client creation to succeed.
/// let server = ServerOptions::new().create(PIPE_NAME).unwrap();
/// let client = ClientOptions::new().open(PIPE_NAME).await.unwrap();
Expand Down Expand Up @@ -1164,7 +1161,7 @@ impl ClientOptions {
///
/// const PIPE_NAME: &str = r"\\.\pipe\mynamedpipe";
///
/// # compio_runtime::block_on(async move {
/// # compio_runtime::Runtime::new().unwrap().block_on(async move {
/// let client = loop {
/// match ClientOptions::new().open(PIPE_NAME).await {
/// Ok(client) => break client,
Expand Down
4 changes: 2 additions & 2 deletions compio-fs/src/open_options/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use crate::File;
/// ```no_run
/// use compio_fs::OpenOptions;
///
/// # compio_runtime::block_on(async {
/// # compio_runtime::Runtime::new().unwrap().block_on(async {
/// let file = OpenOptions::new().read(true).open("foo.txt").await.unwrap();
/// # });
/// ```
Expand All @@ -44,7 +44,7 @@ use crate::File;
/// ```no_run
/// use compio_fs::OpenOptions;
///
/// # compio_runtime::block_on(async {
/// # compio_runtime::Runtime::new().unwrap().block_on(async {
/// let file = OpenOptions::new()
/// .read(true)
/// .write(true)
Expand Down
4 changes: 2 additions & 2 deletions compio-fs/src/open_options/unix.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{ffi::CString, io, os::unix::prelude::OsStrExt, path::Path};

use compio_driver::{op::OpenFile, FromRawFd, RawFd};
use compio_runtime::submit;
use compio_runtime::Runtime;

use crate::File;

Expand Down Expand Up @@ -96,7 +96,7 @@ impl OpenOptions {
)
})?;
let op = OpenFile::new(p, flags, self.mode);
let fd = submit(op).await.0? as RawFd;
let fd = Runtime::current().submit(op).await.0? as RawFd;
Ok(unsafe { File::from_raw_fd(fd) })
}
}
Loading

0 comments on commit 7274ee5

Please sign in to comment.