Skip to content

Commit

Permalink
improved errors
Browse files Browse the repository at this point in the history
  • Loading branch information
TroyKomodo committed Nov 12, 2024
1 parent 9c47cee commit ab389dc
Show file tree
Hide file tree
Showing 31 changed files with 781 additions and 1,394 deletions.
14 changes: 0 additions & 14 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 14 additions & 8 deletions crates/h3-webtransport/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ where
C: quic::Connection<B>,
B: Buf,
{
#[allow(clippy::type_complexity)]
pub(crate) complete_upgrade: Arc<Mutex<Option<OnUpgrade<C::BidiStream, B>>>>,
}

Expand All @@ -61,6 +62,7 @@ where
B: Buf,
{
/// Completes the upgrade to a WebTransport session
#[allow(clippy::type_complexity)]
pub fn upgrade(
&self,
stream: RequestStream<C::BidiStream, B>,
Expand Down Expand Up @@ -139,6 +141,7 @@ where
B: Buf,
{
webtransport_session_map: HashMap<SessionId, WebTransportSession<C, B>>,
#[allow(clippy::type_complexity)]
request_sender: mpsc::Sender<(Request<()>, RequestStream<C::BidiStream, B>)>,
webtransport_request_rx: mpsc::Receiver<WebTransportRequest<C, B>>,
webtransport_request_tx: mpsc::Sender<WebTransportRequest<C, B>>,
Expand Down Expand Up @@ -364,6 +367,7 @@ where
C: quic::Connection<B>,
B: Buf,
{
#[allow(clippy::type_complexity)]
recv: mpsc::Receiver<(Request<()>, RequestStream<C::BidiStream, B>)>,
}

Expand Down Expand Up @@ -408,14 +412,15 @@ where
&mut self.driver
}

/// Accepts an incoming request
/// Internally this method will drive the server until an incoming request is available
/// And returns the request and a request stream.
pub async fn accept(&mut self) -> Result<Option<(Request<()>, RequestStream<C::BidiStream, B>)>, h3::Error> {
match futures_util::future::select(std::pin::pin!(self.incoming.accept()), std::pin::pin!(self.driver.drive())).await {
Either::Left((accept, _)) => Ok(accept),
Either::Right((drive, _)) => drive.map(|_| None),
}
/// Accepts an incoming request
/// Internally this method will drive the server until an incoming request
/// is available And returns the request and a request stream.
pub async fn accept(&mut self) -> Result<Option<(Request<()>, RequestStream<C::BidiStream, B>)>, h3::Error> {
match futures_util::future::select(std::pin::pin!(self.incoming.accept()), std::pin::pin!(self.driver.drive())).await
{
Either::Left((accept, _)) => Ok(accept),
Either::Right((drive, _)) => drive.map(|_| None),
}
}
}

Expand All @@ -441,6 +446,7 @@ where
}

/// Poll the request acceptor
#[allow(clippy::type_complexity)]
pub fn poll_accept(&mut self, cx: &mut Context<'_>) -> Poll<Option<(Request<()>, RequestStream<C::BidiStream, B>)>> {
self.recv.poll_recv(cx)
}
Expand Down
49 changes: 35 additions & 14 deletions crates/h3-webtransport/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,23 +183,45 @@ where
}

/// Opens a bidi stream
pub async fn open_bi(&self) -> Result<crate::stream::BidiStream<C::BidiStream, B>, <C::OpenStreams as OpenStreams<B>>::OpenError> {
pub async fn open_bi(
&self,
) -> Result<crate::stream::BidiStream<C::BidiStream, B>, <C::OpenStreams as OpenStreams<B>>::OpenError> {
poll_fn(|cx| self.poll_open_bi(cx)).await
}

/// Polls to open a bidi stream
pub fn poll_open_bi(&self, cx: &mut Context<'_>) -> Poll<Result<crate::stream::BidiStream<C::BidiStream, B>, <C::OpenStreams as OpenStreams<B>>::OpenError>> {
self.inner.lock().unwrap().opener.poll_open_bidi(cx).map(|res| res.map(|stream| crate::stream::BidiStream::new(BufRecvStream::new(stream))))
#[allow(clippy::type_complexity)]
pub fn poll_open_bi(
&self,
cx: &mut Context<'_>,
) -> Poll<Result<crate::stream::BidiStream<C::BidiStream, B>, <C::OpenStreams as OpenStreams<B>>::OpenError>> {
self.inner
.lock()
.unwrap()
.opener
.poll_open_bidi(cx)
.map(|res| res.map(|stream| crate::stream::BidiStream::new(BufRecvStream::new(stream))))
}

/// Opens a uni stream
pub async fn open_uni(&self) -> Result<crate::stream::SendStream<C::SendStream, B>, <C::OpenStreams as OpenStreams<B>>::OpenError> {
pub async fn open_uni(
&self,
) -> Result<crate::stream::SendStream<C::SendStream, B>, <C::OpenStreams as OpenStreams<B>>::OpenError> {
poll_fn(|cx| self.poll_open_uni(cx)).await
}

/// Polls to open a uni stream
pub fn poll_open_uni(&self, cx: &mut Context<'_>) -> Poll<Result<crate::stream::SendStream<C::SendStream, B>, <C::OpenStreams as OpenStreams<B>>::OpenError>> {
self.inner.lock().unwrap().opener.poll_open_send(cx).map(|res| res.map(|stream| crate::stream::SendStream::new(BufRecvStream::new(stream))))
#[allow(clippy::type_complexity)]
pub fn poll_open_uni(
&self,
cx: &mut Context<'_>,
) -> Poll<Result<crate::stream::SendStream<C::SendStream, B>, <C::OpenStreams as OpenStreams<B>>::OpenError>> {
self.inner
.lock()
.unwrap()
.opener
.poll_open_send(cx)
.map(|res| res.map(|stream| crate::stream::SendStream::new(BufRecvStream::new(stream))))
}
}

Expand All @@ -226,7 +248,8 @@ where
.extension(WebTransportUpgradePending::<C, B> {
complete_upgrade: Arc::new(Mutex::new(Some(Box::new(move |stream| {
Box::pin(async move {
let Some(session) = WebTransportSession::new(stream, can_upgrade.webtransport_request_tx).await? else {
let Some(session) = WebTransportSession::new(stream, can_upgrade.webtransport_request_tx).await?
else {
return Ok(());
};

Expand All @@ -243,10 +266,11 @@ where
}

/// Completes the WebTransport upgrade
#[allow(clippy::type_complexity)]
pub fn complete(
response: &mut Response<B>,
stream: RequestStream<C::BidiStream, B>,
) -> Result<BoxFuture<'static, Result<(), h3::Error>>, RequestStream<C::BidiStream, B>>
) -> Result<BoxFuture<'static, Result<(), h3::Error>>, RequestStream<C::BidiStream, B>>
where
C: quic::Connection<B> + 'static,
B: Buf + 'static + Send + Sync,
Expand Down Expand Up @@ -274,12 +298,9 @@ where
E2: Into<h3::Error>,
{
let Some(can_upgrade) = request.extensions_mut().remove::<WebTransportCanUpgrade<C, B>>() else {
stream.send_response(
http::Response::builder()
.status(StatusCode::BAD_REQUEST)
.body(())
.unwrap(),
).await?;
stream
.send_response(http::Response::builder().status(StatusCode::BAD_REQUEST).body(()).unwrap())
.await?;
stream.finish().await?;
return Ok(None);
};
Expand Down
33 changes: 10 additions & 23 deletions crates/http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,12 @@ libc = { version = "0.2" }
httpdate = { version = "1" }
itoa = { version = "1" }
smallvec = { version = "1" }
hex = { version = "0.4" }
spin = { version = "0.9" }
async-trait = { version = "0.1" }

# For extra services features
tower-async-service = { version = "0.2", optional = true }
tower-service = { version = "0.3", optional = true }
axum-core = { version = "0.4", optional = true }
async-trait = { version = "0.1", optional = true }

# for tls-rustls features
rustls = { version = "0.23", optional = true }
Expand All @@ -41,7 +40,6 @@ tracing = { version = "0.1", optional = true }
# For http3 features
h3 = { git = "https://github.com/hyperium/h3.git", optional = true }
scuffle-h3-webtransport = { path = "../h3-webtransport", optional = true }
spin = { version = "0.9", optional = true }

# For http3-quinn features
h3-quinn = { git = "https://github.com/hyperium/h3.git", optional = true }
Expand All @@ -61,6 +59,8 @@ tokio-stream = { version = "0.1" }
_tcp = []
_quic = []

error-backtrace = []

http1 = [
"_tcp",
"httparse",
Expand All @@ -73,7 +73,6 @@ http2 = [

http3 = [
"h3",
"spin",
]

http3-webtransport = [
Expand Down Expand Up @@ -106,37 +105,25 @@ tracing = [
"dep:tracing",
]

tower-async = [
"dep:tower-async-service",
]

tower = [
"dep:tower-service",
]

axum = [
"dep:axum-core",
"dep:tower-service",
"tower",
]

boxed-service = [
"async-trait",
http3-default = [
"http3",
"quic-quinn",
"tls-rustls",
]

full = [
default = [
"http1",
"http2",
# "http3-webtransport",
"quic-quinn",
"tls-rustls",
"tls-rustls-pem",
"tracing",
"tower",
"tower-async",
"axum",
"boxed-service",
]

default = [
"full",
]
30 changes: 24 additions & 6 deletions crates/http/examples/quinn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,19 @@ use std::net::SocketAddr;
use std::pin::Pin;
use std::task::{Context, Poll};

#[cfg(feature = "axum")]
use axum_core::body::Body;
use bytes::Bytes;
use http::{HeaderMap, Response};
#[cfg(feature = "http3-webtransport")]
use scuffle_h3_webtransport::session::WebTransportSession;
#[cfg(feature = "quic-quinn")]
use scuffle_http::backend::quic::quinn::{QuinnServer, QuinnServerConfig};
#[cfg(feature = "http3-webtransport")]
use scuffle_http::backend::quic::web_transport::upgrade_webtransport;
use scuffle_http::backend::tcp::config::Http1Builder;
use scuffle_http::backend::tcp::{TcpServer, TcpServerConfig};
use scuffle_http::backend::HttpServer;
use scuffle_http::body::BoxError;
use scuffle_http::svc::function_service;

const CERT: &[u8] = include_bytes!("./fullchain.pem");
Expand All @@ -33,6 +34,7 @@ async fn serve_webtransport(web_transport: WebTransportSession<h3_quinn::Connect
async fn main() {
tracing_subscriber::fmt().init();

#[cfg(feature = "quic-quinn")]
let quic_server = QuinnServer::new(
QuinnServerConfig::builder()
.with_bind(SocketAddr::from(([0, 0, 0, 0], 4433)))
Expand All @@ -49,6 +51,7 @@ async fn main() {
.build(),
);

#[cfg(feature = "_tcp")]
let tcp_server = TcpServer::new(
TcpServerConfig::builder()
.with_bind(SocketAddr::from(([0, 0, 0, 0], 4433)))
Expand All @@ -59,6 +62,7 @@ async fn main() {
.build(),
);

#[cfg(feature = "quic-quinn")]
quic_server
.start(
function_service(|req| async move {
Expand All @@ -71,7 +75,7 @@ async fn main() {

drop(req);

Ok::<_, BoxError>(Response::builder().body(Body::from("hi quic")).unwrap())
Ok::<_, scuffle_http::Error>(Response::builder().body(Body::from("hi quic")).unwrap())
}),
1,
)
Expand Down Expand Up @@ -107,11 +111,12 @@ async fn main() {
}
}

#[cfg(feature = "_tcp")]
tcp_server
.start(
function_service(|req| async move {
tracing::info!("new request!: {}", req.uri().path());
Ok::<_, BoxError>(
Ok::<_, scuffle_http::Error>(
Response::builder()
.header(http::header::ALT_SVC, "h3=\":4433\"; ma=3600")
.header(http::header::TRAILER, "hii")
Expand All @@ -132,10 +137,23 @@ async fn main() {
.await
.unwrap();

tracing::info!("Server started on {:?}", quic_server.local_addr().unwrap());
#[cfg(feature = "quic-quinn")]
tracing::info!("Server started on quic {:?}", quic_server.local_addr().unwrap());
#[cfg(feature = "_tcp")]
tracing::info!("Server started on tcp {:?}", tcp_server.local_addr().unwrap());

#[cfg(feature = "quic-quinn")]
let quic_server_wait = quic_server.wait();
#[cfg(feature = "_tcp")]
let tcp_server_wait = tcp_server.wait();

#[cfg(not(feature = "quic-quinn"))]
let quic_server_wait = std::future::pending::<()>();
#[cfg(not(feature = "_tcp"))]
let tcp_server_wait = std::future::pending::<()>();

tokio::select! {
_ = quic_server.wait() => println!("quic server closed"),
_ = tcp_server.wait() => println!("tcp server closed"),
_ = quic_server_wait => println!("quic server closed"),
_ = tcp_server_wait => println!("tcp server closed"),
}
}
Loading

0 comments on commit ab389dc

Please sign in to comment.