Skip to content

Commit

Permalink
Add [Body::into_data_stream]
Browse files Browse the repository at this point in the history
  • Loading branch information
davidpdrsn committed Nov 20, 2023
1 parent 770ec7d commit 207f346
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 7 deletions.
45 changes: 43 additions & 2 deletions axum-core/src/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,16 @@ impl Body {
stream: SyncWrapper::new(stream),
})
}

/// Convert the body into a [`Stream`] of data frames.
///
/// Non-data frames (such as trailers) will be discarded. Use [`http_body_util::BodyStream`] if
/// you need a [`Stream`] of all frame types.
///
/// [`http_body_util::BodyStream`]: https://docs.rs/http-body-util/latest/http_body_util/struct.BodyStream.html
pub fn into_data_stream(self) -> BodyDataStream {
BodyDataStream { inner: self }
}
}

impl Default for Body {
Expand Down Expand Up @@ -117,13 +127,21 @@ impl http_body::Body for Body {
}
}

impl Stream for Body {
/// A stream of data frames.
///
/// Created with [`Body::into_data_stream`].
#[derive(Debug)]
pub struct BodyDataStream {
inner: Body,
}

impl Stream for BodyDataStream {
type Item = Result<Bytes, Error>;

#[inline]
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
match futures_util::ready!(self.as_mut().poll_frame(cx)?) {
match futures_util::ready!(Pin::new(&mut self.inner).poll_frame(cx)?) {
Some(frame) => match frame.into_data() {
Ok(data) => return Poll::Ready(Some(Ok(data))),
Err(_frame) => {}
Expand All @@ -134,6 +152,29 @@ impl Stream for Body {
}
}

impl http_body::Body for BodyDataStream {
type Data = Bytes;
type Error = Error;

#[inline]
fn poll_frame(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
Pin::new(&mut self.inner).poll_frame(cx)
}

#[inline]
fn is_end_stream(&self) -> bool {
self.inner.is_end_stream()
}

#[inline]
fn size_hint(&self) -> http_body::SizeHint {
self.inner.size_hint()
}
}

pin_project! {
struct StreamBody<S> {
#[pin]
Expand Down
2 changes: 1 addition & 1 deletion axum-extra/src/extract/multipart.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ where
async fn from_request(req: Request<Body>, _state: &S) -> Result<Self, Self::Rejection> {
let boundary = parse_boundary(req.headers()).ok_or(InvalidBoundary)?;
let stream = req.with_limited_body().into_body();
let multipart = multer::Multipart::new(stream, boundary);
let multipart = multer::Multipart::new(stream.into_data_stream(), boundary);
Ok(Self { inner: multipart })
}
}
Expand Down
4 changes: 2 additions & 2 deletions axum-extra/src/json_lines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@ where
// `Stream::lines` isn't a thing so we have to convert it into an `AsyncRead`
// so we can call `AsyncRead::lines` and then convert it back to a `Stream`
let body = req.into_body();

let stream = TryStreamExt::map_err(body, |err| io::Error::new(io::ErrorKind::Other, err));
let stream = body.into_data_stream();
let stream = stream.map_err(|err| io::Error::new(io::ErrorKind::Other, err));
let read = StreamReader::new(stream);
let lines_stream = LinesStream::new(read.lines());

Expand Down
2 changes: 1 addition & 1 deletion axum/src/extract/multipart.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ where
async fn from_request(req: Request, _state: &S) -> Result<Self, Self::Rejection> {
let boundary = parse_boundary(req.headers()).ok_or(InvalidBoundary)?;
let stream = req.with_limited_body().into_body();
let multipart = multer::Multipart::new(stream, boundary);
let multipart = multer::Multipart::new(stream.into_data_stream(), boundary);
Ok(Self { inner: multipart })
}
}
Expand Down
2 changes: 1 addition & 1 deletion examples/stream-to-file/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ async fn save_request_body(
Path(file_name): Path<String>,
request: Request,
) -> Result<(), (StatusCode, String)> {
stream_to_file(&file_name, request.into_body()).await
stream_to_file(&file_name, request.into_body().into_data_stream()).await
}

// Handler that returns HTML for a multipart form.
Expand Down

0 comments on commit 207f346

Please sign in to comment.