Skip to content

Commit

Permalink
add custom future type, ResponseFuture, for HTTP responses
Browse files Browse the repository at this point in the history
ResponseFuture allows us to simplify user experience while preserving support for lazy/raw responses
  • Loading branch information
analogrelay authored Oct 3, 2024
1 parent 048717f commit 185e065
Show file tree
Hide file tree
Showing 36 changed files with 740 additions and 1,059 deletions.
2 changes: 2 additions & 0 deletions eng/dict/rust-custom.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
bindgen
impl
impls
newtype
repr
rustc
Expand Down
4 changes: 2 additions & 2 deletions eng/test/mock_transport/src/mock_response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ impl From<MockResponse> for Response {
fn from(mock_response: MockResponse) -> Self {
let bytes_stream: azure_core::BytesStream = mock_response.body.into();

Self::new(
Self::from_stream(
mock_response.status,
mock_response.headers,
Box::pin(bytes_stream),
Expand All @@ -46,7 +46,7 @@ impl MockResponse {
"an error occurred fetching the next part of the byte stream",
)?;

let response = Response::new(
let response = Response::from_stream(
status_code,
header_map.clone(),
Box::pin(BytesStream::new(response_bytes.clone())),
Expand Down
1 change: 0 additions & 1 deletion sdk/core/azure_core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,5 +62,4 @@ features = [
"reqwest_rustls",
"hmac_rust",
"hmac_openssl",
"xml",
]
12 changes: 11 additions & 1 deletion sdk/core/azure_core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,14 @@
#![deny(missing_debug_implementations, nonstandard_style)]
// #![warn(missing_docs, future_incompatible, unreachable_pub)]

// Docs.rs build is done with the nightly compiler, so we can enable nightly features in that build.
// In this case we enable two features:
// - `doc_auto_cfg`: Automatically scans `cfg` attributes and uses them to show those required configurations in the generated documentation.
// - `doc_cfg_hide`: Ignore the `doc` configuration for `doc_auto_cfg`.
// See https://doc.rust-lang.org/rustdoc/unstable-features.html#doc_auto_cfg-automatically-generate-doccfg for more details.
#![cfg_attr(docsrs, feature(doc_auto_cfg))]
#![cfg_attr(docsrs, feature(doc_cfg_hide))]

#[macro_use]
mod macros;

Expand All @@ -38,7 +46,9 @@ pub use models::*;
pub use options::*;
pub use pipeline::*;
pub use policies::*;
pub use typespec_client_core::http::response::{Model, PinnedStream, Response, ResponseBody};
pub use typespec_client_core::http::{
LazyResponse, PinnedStream, Response, ResponseBody, ResponseFuture,
};

// Re-export typespec types that are not specific to Azure.
pub use typespec::{Error, Result};
Expand Down
12 changes: 4 additions & 8 deletions sdk/cosmos/azure_data_cosmos/examples/cosmos_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,12 @@ pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
let db_client = client.database_client(&args.database);
if let Some(container_name) = args.container {
let container_client = db_client.container_client(container_name);
let response = container_client
.read(None)
.await?
.deserialize_body()
.await?;
println!("{:?}", response);
let response = container_client.read(None).await?;
println!("{:?}", response.into_body());
return Ok(());
} else {
let response = db_client.read(None).await?.deserialize_body().await?;
println!("{:?}", response);
let response = db_client.read(None).await?;
println!("{:?}", response.into_body());
}
Ok(())
}
Expand Down
32 changes: 9 additions & 23 deletions sdk/cosmos/azure_data_cosmos/src/clients/container_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,14 @@ pub trait ContainerClientMethods {
/// # async fn doc() {
/// # use azure_data_cosmos::clients::{ContainerClient, ContainerClientMethods};
/// # let container_client: ContainerClient = panic!("this is a non-running example");
/// let response = container_client.read(None)
/// .await.unwrap()
/// .deserialize_body()
/// .await.unwrap();
/// let response = container_client.read(None).await.unwrap();
/// # }
/// ```
#[allow(async_fn_in_trait)] // REASON: See https://github.com/Azure/azure-sdk-for-rust/issues/1796 for detailed justification
async fn read(
fn read(
&self,
options: Option<ReadContainerOptions>,
) -> azure_core::Result<azure_core::Response<ContainerProperties>>;
) -> azure_core::ResponseFuture<ContainerProperties>;

/// Executes a single-partition query against items in the container.
///
Expand Down Expand Up @@ -129,17 +126,16 @@ impl ContainerClient {
}

impl ContainerClientMethods for ContainerClient {
async fn read(
fn read(
&self,

#[allow(unused_variables)]
// This is a documented public API so prefixing with '_' is undesirable.
options: Option<ReadContainerOptions>,
) -> azure_core::Result<azure_core::Response<ContainerProperties>> {
let mut req = Request::new(self.container_url.clone(), azure_core::Method::Get);
) -> azure_core::ResponseFuture<ContainerProperties> {
let req = Request::new(self.container_url.clone(), azure_core::Method::Get);
self.pipeline
.send(Context::new(), &mut req, ResourceType::Containers)
.await
.send(Context::new(), req, ResourceType::Containers)
}

fn query_items<T: DeserializeOwned + Send>(
Expand All @@ -159,16 +155,6 @@ impl ContainerClientMethods for ContainerClient {
documents: Vec<M>,
}

// We have to manually implement Model, because the derive macro doesn't support auto-inferring type and lifetime bounds.
// See https://github.com/Azure/azure-sdk-for-rust/issues/1803
impl<M: DeserializeOwned> azure_core::Model for QueryResponseModel<M> {
async fn from_response_body(
body: azure_core::ResponseBody,
) -> typespec_client_core::Result<Self> {
body.json().await
}
}

let mut url = self.container_url.clone();
url.append_path_segments(["docs"]);
let mut base_req = Request::new(url, azure_core::Method::Post);
Expand Down Expand Up @@ -196,7 +182,7 @@ impl ContainerClientMethods for ContainerClient {
}

let resp = pipeline
.send(Context::new(), &mut req, ResourceType::Items)
.send(Context::new(), req, ResourceType::Items)
.await?;

let query_metrics = resp
Expand All @@ -208,7 +194,7 @@ impl ContainerClientMethods for ContainerClient {
let continuation_token =
resp.headers().get_optional_string(&constants::CONTINUATION);

let query_response: QueryResponseModel<T> = resp.deserialize_body().await?;
let query_response: QueryResponseModel<T> = resp.into_body();

let query_results = QueryResults {
items: query_response.documents,
Expand Down
18 changes: 7 additions & 11 deletions sdk/cosmos/azure_data_cosmos/src/clients/database_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,14 @@ pub trait DatabaseClientMethods {
/// # async fn doc() {
/// # use azure_data_cosmos::clients::{DatabaseClient, DatabaseClientMethods};
/// # let database_client: DatabaseClient = panic!("this is a non-running example");
/// let response = database_client.read(None)
/// .await.unwrap()
/// .deserialize_body()
/// .await.unwrap();
/// let response = database_client.read(None).await.unwrap();
/// # }
/// ```
#[allow(async_fn_in_trait)] // REASON: See https://github.com/Azure/azure-sdk-for-rust/issues/1796 for detailed justification
async fn read(
fn read(
&self,
options: Option<ReadDatabaseOptions>,
) -> azure_core::Result<azure_core::Response<DatabaseProperties>>;
) -> azure_core::ResponseFuture<DatabaseProperties>;

/// Gets a [`ContainerClient`] that can be used to access the collection with the specified name.
///
Expand Down Expand Up @@ -70,17 +67,16 @@ impl DatabaseClient {
}

impl DatabaseClientMethods for DatabaseClient {
async fn read(
fn read(
&self,

#[allow(unused_variables)]
// This is a documented public API so prefixing with '_' is undesirable.
options: Option<ReadDatabaseOptions>,
) -> azure_core::Result<azure_core::Response<DatabaseProperties>> {
let mut req = Request::new(self.database_url.clone(), azure_core::Method::Get);
) -> azure_core::ResponseFuture<DatabaseProperties> {
let req = Request::new(self.database_url.clone(), azure_core::Method::Get);
self.pipeline
.send(Context::new(), &mut req, ResourceType::Databases)
.await
.send(Context::new(), req, ResourceType::Databases)
}

fn container_client(&self, name: impl AsRef<str>) -> ContainerClient {
Expand Down
6 changes: 3 additions & 3 deletions sdk/cosmos/azure_data_cosmos/src/models/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
use azure_core::{
date::{ComponentRange, OffsetDateTime},
Continuable, Model,
Continuable,
};
use serde::{Deserialize, Serialize};

Expand Down Expand Up @@ -72,7 +72,7 @@ pub struct SystemProperties {
/// Properties of a Cosmos DB database.
///
/// Returned by [`DatabaseClient::read()`](crate::clients::DatabaseClient::read()).
#[derive(Model, Debug, Deserialize)]
#[derive(Debug, Deserialize)]
pub struct DatabaseProperties {
/// The ID of the database.
pub id: String,
Expand All @@ -85,7 +85,7 @@ pub struct DatabaseProperties {
/// Properties of a Cosmos DB container.
///
/// Returned by [`ContainerClient::read()`](crate::clients::ContainerClient::read()).
#[derive(Model, Debug, Deserialize)]
#[derive(Debug, Deserialize)]
pub struct ContainerProperties {
/// The ID of the container.
pub id: String,
Expand Down
14 changes: 8 additions & 6 deletions sdk/cosmos/azure_data_cosmos/src/pipeline/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ mod authorization_policy;
use std::sync::Arc;

pub(crate) use authorization_policy::{AuthorizationPolicy, ResourceType};
use serde::de::DeserializeOwned;

/// Newtype that wraps an Azure Core pipeline to provide a Cosmos-specific pipeline which configures our authorization policy and enforces that a [`ResourceType`] is set on the context.
#[derive(Debug, Clone)]
Expand All @@ -25,13 +26,14 @@ impl CosmosPipeline {
))
}

pub async fn send<T>(
&self,
ctx: azure_core::Context<'_>,
request: &mut azure_core::Request,
pub fn send<'a, T: DeserializeOwned>(
&'a self,
ctx: azure_core::Context<'a>,
request: azure_core::Request,
resource_type: ResourceType,
) -> azure_core::Result<azure_core::Response<T>> {
) -> azure_core::ResponseFuture<'a, T> {
// We know all our APIs use JSON, so we can just create a wrapper that calls '.json' for us.
let ctx = ctx.with_value(resource_type);
self.0.send(&ctx, request).await
self.0.send(ctx, request).json()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use openssl::{
use serde::Deserialize;
use std::{str, sync::Arc, time::Duration};
use time::OffsetDateTime;
use typespec_client_core::http::Model;
use url::form_urlencoded;

/// Refresh time to use in seconds.
Expand Down Expand Up @@ -255,7 +254,7 @@ impl ClientCertificateCredential {
return Err(http_response_from_body(rsp_status, &rsp_body).into_error());
}

let response: AadTokenResponse = rsp.deserialize_body_into().await?;
let response: AadTokenResponse = rsp.into_body().json().await?;
Ok(AccessToken::new(
response.access_token,
OffsetDateTime::now_utc() + Duration::from_secs(response.expires_in),
Expand Down Expand Up @@ -326,7 +325,7 @@ impl ClientCertificateCredential {
}
}

#[derive(Model, Deserialize, Debug, Default)]
#[derive(Deserialize, Debug, Default)]
#[serde(default)]
struct AadTokenResponse {
token_type: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ pub async fn authorize(
let rsp_status = rsp.status();
debug!("rsp_status == {:?}", rsp_status);
if rsp_status.is_success() {
rsp.deserialize_body_into().await
rsp.into_body().json().await
} else {
let rsp_body = rsp.into_body().collect().await?;
let text = std::str::from_utf8(&rsp_body)?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
use azure_core::credentials::Secret;
use serde::{Deserialize, Deserializer};
use time::OffsetDateTime;
use typespec_client_core::Model;

#[derive(Debug, Clone, Deserialize)]
struct RawLoginResponse {
Expand All @@ -19,7 +18,7 @@ struct RawLoginResponse {
access_token: String,
}

#[derive(Model, Debug, Clone)]
#[derive(Debug, Clone)]
pub struct LoginResponse {
pub token_type: String,
pub expires_in: u64,
Expand Down
7 changes: 2 additions & 5 deletions sdk/identity/azure_identity/src/refresh_token.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use azure_core::{
use serde::Deserialize;
use std::fmt;
use std::sync::Arc;
use typespec_client_core::Model;
use url::form_urlencoded;

/// Exchange a refresh token for a new access token and refresh token.
Expand Down Expand Up @@ -54,9 +53,7 @@ pub async fn exchange(
let rsp_status = rsp.status();

if rsp_status.is_success() {
rsp.deserialize_body_into()
.await
.map_kind(ErrorKind::Credential)
rsp.into_body().json().await.map_kind(ErrorKind::Credential)
} else {
let rsp_body = rsp.into_body().collect().await?;
let token_error: RefreshTokenError =
Expand All @@ -67,7 +64,7 @@ pub async fn exchange(

/// A refresh token
#[allow(dead_code)]
#[derive(Model, Debug, Clone, Deserialize)]
#[derive(Debug, Clone, Deserialize)]
pub struct RefreshTokenResponse {
token_type: String,
#[serde(rename = "scope", deserialize_with = "deserialize::split")]
Expand Down
1 change: 0 additions & 1 deletion sdk/typespec/typespec_client_core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ tokio = { workspace = true, features = ["macros", "rt", "time"] }
[dev-dependencies]
once_cell.workspace = true
tokio.workspace = true
typespec_derive.workspace = true

[features]
default = [
Expand Down
2 changes: 1 addition & 1 deletion sdk/typespec/typespec_client_core/src/error/http_error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ impl HttpError {
/// Create an error from an HTTP response.
///
/// This does not check whether the response was successful and should only be used with unsuccessful responses.
pub async fn new(response: Response<()>) -> Self {
pub async fn new(response: Response) -> Self {
let status = response.status();
let headers: HashMap<String, String> = response
.headers()
Expand Down
9 changes: 6 additions & 3 deletions sdk/typespec/typespec_client_core/src/http/clients/reqwest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@
use crate::http::{
headers::{HeaderName, HeaderValue, Headers},
request::{Body, Request},
response::PinnedStream,
HttpClient, Method, Response, StatusCode,
HttpClient, Method, PinnedStream, Response, StatusCode,
};
use async_trait::async_trait;
use futures::TryStreamExt;
Expand Down Expand Up @@ -77,7 +76,11 @@ impl HttpClient for ::reqwest::Client {
)
}));

Ok(Response::new(try_from_status(status)?, headers, body))
Ok(Response::from_stream(
try_from_status(status)?,
headers,
body,
))
}
}

Expand Down
Loading

0 comments on commit 185e065

Please sign in to comment.