Skip to content

Commit

Permalink
Update example-reqwest-response (#2430)
Browse files Browse the repository at this point in the history
  • Loading branch information
gerardcl authored Dec 17, 2023
1 parent 584c328 commit 12676aa
Showing 1 changed file with 69 additions and 67 deletions.
136 changes: 69 additions & 67 deletions examples/reqwest-response/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,80 +4,82 @@
//! cargo run -p example-reqwest-response
//! ```
fn main() {
// this examples requires reqwest to be updated to hyper and http 1.0
}

// use std::{convert::Infallible, time::Duration};
use std::{convert::Infallible, time::Duration};

// use axum::{
// body::{Body, Bytes},
// extract::State,
// response::{IntoResponse, Response},
// routing::get,
// Router,
// };
// use reqwest::{Client, StatusCode};
// use tokio_stream::StreamExt;
// use tower_http::trace::TraceLayer;
// use tracing::Span;
// use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
use axum::http::{HeaderMap, StatusCode};
use axum::{
body::{Body, Bytes},
extract::State,
http::{HeaderName, HeaderValue},
response::{IntoResponse, Response},
routing::get,
Router,
};
use reqwest::Client;
use tokio_stream::StreamExt;
use tower_http::trace::TraceLayer;
use tracing::Span;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

// #[tokio::main]
// async fn main() {
// tracing_subscriber::registry()
// .with(
// tracing_subscriber::EnvFilter::try_from_default_env()
// .unwrap_or_else(|_| "example_reqwest_response=debug,tower_http=debug".into()),
// )
// .with(tracing_subscriber::fmt::layer())
// .init();
#[tokio::main]
async fn main() {
tracing_subscriber::registry()
.with(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| "example_reqwest_response=debug,tower_http=debug".into()),
)
.with(tracing_subscriber::fmt::layer())
.init();

// let client = Client::new();
let client = Client::new();

// let app = Router::new()
// .route("/", get(proxy_via_reqwest))
// .route("/stream", get(stream_some_data))
// // Add some logging so we can see the streams going through
// .layer(TraceLayer::new_for_http().on_body_chunk(
// |chunk: &Bytes, _latency: Duration, _span: &Span| {
// tracing::debug!("streaming {} bytes", chunk.len());
// },
// ))
// .with_state(client);
let app = Router::new()
.route("/", get(proxy_via_reqwest))
.route("/stream", get(stream_some_data))
// Add some logging so we can see the streams going through
.layer(TraceLayer::new_for_http().on_body_chunk(
|chunk: &Bytes, _latency: Duration, _span: &Span| {
tracing::debug!("streaming {} bytes", chunk.len());
},
))
.with_state(client);

// let listener = tokio::net::TcpListener::bind("127.0.0.1:3000")
// .await
// .unwrap();
// tracing::debug!("listening on {}", listener.local_addr().unwrap());
// axum::serve(listener, app).await.unwrap();
// }
let listener = tokio::net::TcpListener::bind("127.0.0.1:3000")
.await
.unwrap();
tracing::debug!("listening on {}", listener.local_addr().unwrap());
axum::serve(listener, app).await.unwrap();
}

// async fn proxy_via_reqwest(State(client): State<Client>) -> Response {
// let reqwest_response = match client.get("http://127.0.0.1:3000/stream").send().await {
// Ok(res) => res,
// Err(err) => {
// tracing::error!(%err, "request failed");
// return StatusCode::BAD_GATEWAY.into_response();
// }
// };
async fn proxy_via_reqwest(State(client): State<Client>) -> Response {
let reqwest_response = match client.get("http://127.0.0.1:3000/stream").send().await {
Ok(res) => res,
Err(err) => {
tracing::error!(%err, "request failed");
return (StatusCode::BAD_REQUEST, Body::empty()).into_response();
}
};

// let mut response_builder = Response::builder().status(reqwest_response.status());
let response_builder = Response::builder().status(reqwest_response.status().as_u16());

// // This unwrap is fine because we haven't insert any headers yet so there can't be any invalid
// // headers
// *response_builder.headers_mut().unwrap() = reqwest_response.headers().clone();
// here the mapping of headers is required due to reqwest and axum differ on http crate versions
let mut headers = HeaderMap::with_capacity(reqwest_response.headers().len());
headers.extend(reqwest_response.headers().into_iter().map(|(name, value)| {
let name = HeaderName::from_bytes(name.as_ref()).unwrap();
let value = HeaderValue::from_bytes(value.as_ref()).unwrap();
(name, value)
}));

// response_builder
// .body(Body::from_stream(reqwest_response.bytes_stream()))
// // Same goes for this unwrap
// .unwrap()
// }
response_builder
.body(Body::from_stream(reqwest_response.bytes_stream()))
// Same goes for this unwrap
.unwrap()
}

// async fn stream_some_data() -> Body {
// let stream = tokio_stream::iter(0..5)
// .throttle(Duration::from_secs(1))
// .map(|n| n.to_string())
// .map(Ok::<_, Infallible>);
// Body::from_stream(stream)
// }
async fn stream_some_data() -> Body {
let stream = tokio_stream::iter(0..5)
.throttle(Duration::from_secs(1))
.map(|n| n.to_string())
.map(Ok::<_, Infallible>);
Body::from_stream(stream)
}

0 comments on commit 12676aa

Please sign in to comment.