diff --git a/Cargo.lock b/Cargo.lock index 16bf0e911aad..62261bce1688 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -83,7 +83,7 @@ dependencies = [ "axum", "bytes", "cfg-if", - "http", + "http 0.2.12", "indexmap 1.9.3", "schemars", "serde", @@ -764,9 +764,9 @@ dependencies = [ "bytes", "futures-util", "headers", - "http", - "http-body", - "hyper", + "http 0.2.12", + "http-body 0.4.6", + "hyper 0.14.28", "itoa", "matchit", "memchr", @@ -794,8 +794,8 @@ dependencies = [ "async-trait", "bytes", "futures-util", - "http", - "http-body", + "http 0.2.12", + "http-body 0.4.6", "mime", "rustversion", "tower-layer", @@ -1852,7 +1852,7 @@ dependencies = [ "common-telemetry", "common-test-util", "common-version", - "hyper", + "hyper 0.14.28", "reqwest", "serde", "tempfile", @@ -1961,7 +1961,7 @@ dependencies = [ "futures-util", "hex", "humantime-serde", - "hyper", + "hyper 0.14.28", "itertools 0.10.5", "lazy_static", "moka", @@ -3523,15 +3523,6 @@ version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a357d28ed41a50f9c765dbfe56cbc04a64e53e5fc58ba79fbc34c10ef3df831f" -[[package]] -name = "encoding_rs" -version = "0.8.34" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b45de904aa0b010bce2ab45264d0631681847fa7b6f2eaa7dab7619943bc4f59" -dependencies = [ - "cfg-if", -] - [[package]] name = "endian-type" version = "0.1.2" @@ -3602,7 +3593,7 @@ name = "etcd-client" version = "0.12.4" source = "git+https://github.com/MichaelScofield/etcd-client.git?rev=4c371e9b3ea8e0a8ee2f9cbd7ded26e54a45df3b#4c371e9b3ea8e0a8ee2f9cbd7ded26e54a45df3b" dependencies = [ - "http", + "http 0.2.12", "prost 0.12.4", "tokio", "tokio-stream", @@ -4217,7 +4208,7 @@ dependencies = [ "futures-core", "futures-sink", "futures-util", - "http", + "http 0.2.12", "indexmap 2.2.6", "slab", "tokio", @@ -4301,7 +4292,7 @@ dependencies = [ "base64 0.21.7", "bytes", "headers-core", - "http", + "http 0.2.12", "httpdate", "mime", "sha1", @@ -4313,7 +4304,7 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e7f66481bfee273957b1f20485a4ff3362987f85b2c236580d81b4eb7a326429" dependencies = [ - "http", + "http 0.2.12", ] [[package]] @@ -4416,6 +4407,17 @@ dependencies = [ "itoa", ] +[[package]] +name = "http" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "21b9ddb458710bc376481b842f5da65cdf31522de232c1ca8146abce2a358258" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + [[package]] name = "http-body" version = "0.4.6" @@ -4423,7 +4425,30 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2" dependencies = [ "bytes", - "http", + "http 0.2.12", + "pin-project-lite", +] + +[[package]] +name = "http-body" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1cac85db508abc24a2e48553ba12a996e87244a0395ce011e62b37158745d643" +dependencies = [ + "bytes", + "http 1.1.0", +] + +[[package]] +name = "http-body-util" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0475f8b2ac86659c21b64320d5d653f9efe42acd2a4e560073ec61a155a34f1d" +dependencies = [ + "bytes", + "futures-core", + "http 1.1.0", + "http-body 1.0.0", "pin-project-lite", ] @@ -4580,8 +4605,8 @@ dependencies = [ "futures-core", "futures-util", "h2", - "http", - "http-body", + "http 0.2.12", + "http-body 0.4.6", "httparse", "httpdate", "itoa", @@ -4593,18 
+4618,40 @@ dependencies = [ "want", ] +[[package]] +name = "hyper" +version = "1.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fe575dd17d0862a9a33781c8c4696a55c320909004a67a00fb286ba8b1bc496d" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "http 1.1.0", + "http-body 1.0.0", + "httparse", + "itoa", + "pin-project-lite", + "smallvec", + "tokio", + "want", +] + [[package]] name = "hyper-rustls" -version = "0.24.2" +version = "0.26.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec3efd23720e2049821a693cbc7e65ea87c72f1c58ff2f9522ff332b1491e590" +checksum = "a0bea761b46ae2b24eb4aef630d8d1c398157b6fc29e6350ecf090a0b70c952c" dependencies = [ "futures-util", - "http", - "hyper", - "rustls 0.21.12", + "http 1.1.0", + "hyper 1.3.1", + "hyper-util", + "rustls 0.22.4", + "rustls-pki-types", "tokio", - "tokio-rustls 0.24.1", + "tokio-rustls 0.25.0", + "tower-service", ] [[package]] @@ -4613,12 +4660,32 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" dependencies = [ - "hyper", + "hyper 0.14.28", "pin-project-lite", "tokio", "tokio-io-timeout", ] +[[package]] +name = "hyper-util" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca38ef113da30126bbff9cd1705f9273e15d45498615d138b0c20279ac7a76aa" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "http 1.1.0", + "http-body 1.0.0", + "hyper 1.3.1", + "pin-project-lite", + "socket2 0.5.7", + "tokio", + "tower", + "tower-service", + "tracing", +] + [[package]] name = "iana-time-zone" version = "0.1.60" @@ -5602,7 +5669,7 @@ dependencies = [ "etcd-client", "futures", "h2", - "http-body", + "http-body 0.4.6", "humantime", "humantime-serde", "itertools 0.10.5", @@ -6364,7 +6431,6 @@ name = "object-store" version = "0.8.0" dependencies = [ "anyhow", - "async-trait", "bytes", "common-telemetry", "common-test-util", @@ -6413,20 +6479,21 @@ checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575" [[package]] name = "opendal" -version = "0.45.1" +version = "0.46.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "52c17c077f23fa2d2c25d9d22af98baa43b8bbe2ef0de80cf66339aa70401467" +checksum = "328c4992328e8965e6a6ef102d38438b5fdc7d9b9107eda2377ba05379d9d544" dependencies = [ "anyhow", "async-trait", "backon", - "base64 0.21.7", + "base64 0.22.1", "bytes", "chrono", + "crc32c", "flagset", "futures", "getrandom", - "http", + "http 1.1.0", "log", "md-5", "once_cell", @@ -6514,7 +6581,7 @@ checksum = "f24cda83b20ed2433c68241f918d0f6fdec8b1d43b7a9590ab4420c5095ca930" dependencies = [ "async-trait", "futures-core", - "http", + "http 0.2.12", "opentelemetry 0.21.0", "opentelemetry-proto 0.4.0", "opentelemetry-semantic-conventions", @@ -6651,6 +6718,7 @@ dependencies = [ "substrait 0.8.0", "table", "tokio", + "tokio-util", "tonic 0.11.0", ] @@ -8196,20 +8264,20 @@ dependencies = [ [[package]] name = "reqsign" -version = "0.14.9" +version = "0.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43e319d9de9ff4d941abf4ac718897118b0fe04577ea3f8e0f5788971784eef5" +checksum = "01edce6b6c31a16ebc7525ac58c747a6d78bbce33e76bbebd350d6bc25b23e06" dependencies = [ "anyhow", "async-trait", - "base64 0.21.7", + "base64 0.22.1", "chrono", "form_urlencoded", "getrandom", "hex", "hmac", "home", - "http", + "http 1.1.0", "jsonwebtoken", "log", 
"once_cell", @@ -8218,7 +8286,7 @@ dependencies = [ "rand", "reqwest", "rsa 0.9.6", - "rust-ini 0.20.0", + "rust-ini 0.21.0", "serde", "serde_json", "sha1", @@ -8227,20 +8295,20 @@ dependencies = [ [[package]] name = "reqwest" -version = "0.11.27" +version = "0.12.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dd67538700a17451e7cba03ac727fb961abb7607553461627b97de0b89cf4a62" +checksum = "566cafdd92868e0939d3fb961bd0dc25fcfaaed179291093b3d43e6b3150ea10" dependencies = [ - "base64 0.21.7", + "base64 0.22.1", "bytes", - "encoding_rs", "futures-core", "futures-util", - "h2", - "http", - "http-body", - "hyper", + "http 1.1.0", + "http-body 1.0.0", + "http-body-util", + "hyper 1.3.1", "hyper-rustls", + "hyper-util", "ipnet", "js-sys", "log", @@ -8249,16 +8317,16 @@ dependencies = [ "once_cell", "percent-encoding", "pin-project-lite", - "rustls 0.21.12", + "rustls 0.22.4", "rustls-native-certs", - "rustls-pemfile 1.0.4", + "rustls-pemfile 2.1.2", + "rustls-pki-types", "serde", "serde_json", "serde_urlencoded", "sync_wrapper", - "system-configuration", "tokio", - "tokio-rustls 0.24.1", + "tokio-rustls 0.25.0", "tokio-util", "tower-service", "url", @@ -8266,7 +8334,8 @@ dependencies = [ "wasm-bindgen-futures", "wasm-streams", "web-sys", - "winreg 0.50.0", + "webpki-roots 0.26.1", + "winreg 0.52.0", ] [[package]] @@ -8532,12 +8601,13 @@ dependencies = [ [[package]] name = "rust-ini" -version = "0.20.0" +version = "0.21.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3e0698206bcb8882bf2a9ecb4c1e7785db57ff052297085a6efd4fe42302068a" +checksum = "0d625ed57d8f49af6cfa514c42e1a71fadcff60eb0b1c517ff82fe41aa025b41" dependencies = [ "cfg-if", "ordered-multimap 0.7.3", + "trim-in-place", ] [[package]] @@ -8679,12 +8749,13 @@ dependencies = [ [[package]] name = "rustls-native-certs" -version = "0.6.3" +version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a9aace74cb666635c918e9c12bc0d348266037aa8eb599b5cba565709a8dff00" +checksum = "8f1fb85efa936c42c6d5fc28d2629bb51e4b2f4b8a5211e297d599cc5a093792" dependencies = [ "openssl-probe", - "rustls-pemfile 1.0.4", + "rustls-pemfile 2.1.2", + "rustls-pki-types", "schannel", "security-framework", ] @@ -9517,10 +9588,10 @@ dependencies = [ "hashbrown 0.14.5", "headers", "hostname", - "http", - "http-body", + "http 0.2.12", + "http-body 0.4.6", "humantime-serde", - "hyper", + "hyper 0.14.28", "influxdb_line_protocol", "itertools 0.10.5", "lazy_static", @@ -11148,9 +11219,9 @@ dependencies = [ "futures-core", "futures-util", "h2", - "http", - "http-body", - "hyper", + "http 0.2.12", + "http-body 0.4.6", + "hyper 0.14.28", "hyper-timeout", "percent-encoding", "pin-project", @@ -11176,9 +11247,9 @@ dependencies = [ "bytes", "flate2", "h2", - "http", - "http-body", - "hyper", + "http 0.2.12", + "http-body 0.4.6", + "hyper 0.14.28", "hyper-timeout", "percent-encoding", "pin-project", @@ -11280,8 +11351,8 @@ dependencies = [ "bytes", "futures-core", "futures-util", - "http", - "http-body", + "http 0.2.12", + "http-body 0.4.6", "http-range-header", "httpdate", "iri-string", @@ -11524,6 +11595,12 @@ dependencies = [ "tree-sitter", ] +[[package]] +name = "trim-in-place" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "343e926fc669bc8cde4fa3129ab681c63671bae288b1f1081ceee6d9d37904fc" + [[package]] name = "triomphe" version = "0.1.11" @@ -12189,6 +12266,15 @@ version = "0.25.4" source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "5f20c57d8d7db6d3b86154206ae5d8fba62dd39573114de97c2cb0578251f8e1" +[[package]] +name = "webpki-roots" +version = "0.26.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b3de34ae270483955a94f4b21bdaaeb83d508bb84a01435f393818edb0012009" +dependencies = [ + "rustls-pki-types", +] + [[package]] name = "which" version = "4.4.2" @@ -12555,9 +12641,9 @@ dependencies = [ [[package]] name = "winreg" -version = "0.50.0" +version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "524e57b2c537c0f9b1e69f1965311ec12182b4122e45035b1508cd24d2adadb1" +checksum = "a277a57398d4bfa075df44f501a17cfdf8542d224f0d36095a2adc7aee4ef0a5" dependencies = [ "cfg-if", "windows-sys 0.48.0", diff --git a/Cargo.toml b/Cargo.toml index a5b1f5aed9cc..8b02b3f0ce96 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -146,7 +146,7 @@ raft-engine = { version = "0.4.1", default-features = false } rand = "0.8" regex = "1.8" regex-automata = { version = "0.4" } -reqwest = { version = "0.11", default-features = false, features = [ +reqwest = { version = "0.12", default-features = false, features = [ "json", "rustls-tls-native-roots", "stream", diff --git a/src/common/datasource/src/compression.rs b/src/common/datasource/src/compression.rs index 77b40f8e9c62..0d7484917751 100644 --- a/src/common/datasource/src/compression.rs +++ b/src/common/datasource/src/compression.rs @@ -92,34 +92,44 @@ impl CompressionType { macro_rules! impl_compression_type { ($(($enum_item:ident, $prefix:ident)),*) => { paste::item! { + use bytes::{Buf, BufMut, BytesMut}; + impl CompressionType { - pub async fn encode(&self, content: impl AsRef<[u8]>) -> io::Result> { + pub async fn encode(&self, mut content: B) -> io::Result> { match self { $( CompressionType::$enum_item => { - let mut buffer = Vec::with_capacity(content.as_ref().len()); + let mut buffer = Vec::with_capacity(content.remaining()); let mut encoder = write::[<$prefix Encoder>]::new(&mut buffer); - encoder.write_all(content.as_ref()).await?; + encoder.write_all_buf(&mut content).await?; encoder.shutdown().await?; Ok(buffer) } )* - CompressionType::Uncompressed => Ok(content.as_ref().to_vec()), + CompressionType::Uncompressed => { + let mut bs = BytesMut::with_capacity(content.remaining()); + bs.put(content); + Ok(bs.to_vec()) + }, } } - pub async fn decode(&self, content: impl AsRef<[u8]>) -> io::Result> { + pub async fn decode(&self, mut content: B) -> io::Result> { match self { $( CompressionType::$enum_item => { - let mut buffer = Vec::with_capacity(content.as_ref().len() * 2); + let mut buffer = Vec::with_capacity(content.remaining() * 2); let mut encoder = write::[<$prefix Decoder>]::new(&mut buffer); - encoder.write_all(content.as_ref()).await?; + encoder.write_all_buf(&mut content).await?; encoder.shutdown().await?; Ok(buffer) } )* - CompressionType::Uncompressed => Ok(content.as_ref().to_vec()), + CompressionType::Uncompressed => { + let mut bs = BytesMut::with_capacity(content.remaining()); + bs.put(content); + Ok(bs.to_vec()) + }, } } @@ -151,13 +161,13 @@ macro_rules! 
impl_compression_type { $( #[tokio::test] async fn []() { - let string = "foo_bar".as_bytes().to_vec(); + let string = "foo_bar".as_bytes(); let compress = CompressionType::$enum_item - .encode(&string) + .encode(string) .await .unwrap(); let decompress = CompressionType::$enum_item - .decode(&compress) + .decode(compress.as_slice()) .await .unwrap(); assert_eq!(decompress, string); @@ -165,13 +175,13 @@ macro_rules! impl_compression_type { #[tokio::test] async fn test_uncompression() { - let string = "foo_bar".as_bytes().to_vec(); + let string = "foo_bar".as_bytes(); let compress = CompressionType::Uncompressed - .encode(&string) + .encode(string) .await .unwrap(); let decompress = CompressionType::Uncompressed - .decode(&compress) + .decode(compress.as_slice()) .await .unwrap(); assert_eq!(decompress, string); diff --git a/src/common/datasource/src/file_format.rs b/src/common/datasource/src/file_format.rs index 6f80590d26e1..5bb9258ad3d0 100644 --- a/src/common/datasource/src/file_format.rs +++ b/src/common/datasource/src/file_format.rs @@ -36,6 +36,7 @@ use datafusion::physical_plan::SendableRecordBatchStream; use futures::StreamExt; use object_store::ObjectStore; use snafu::ResultExt; +use tokio_util::compat::FuturesAsyncWriteCompatExt; use self::csv::CsvFormat; use self::json::JsonFormat; @@ -146,7 +147,8 @@ pub fn open_with_decoder DataFusionResult>( let reader = object_store .reader(&path) .await - .map_err(|e| DataFusionError::External(Box::new(e)))?; + .map_err(|e| DataFusionError::External(Box::new(e)))? + .into_bytes_stream(..); let mut upstream = compression_type.convert_stream(reader).fuse(); @@ -203,6 +205,7 @@ pub async fn stream_to_file T>( .writer_with(&path) .concurrent(concurrency) .await + .map(|v| v.into_futures_async_write().compat_write()) .context(error::WriteObjectSnafu { path }) }); diff --git a/src/common/datasource/src/file_format/csv.rs b/src/common/datasource/src/file_format/csv.rs index 4cf2b9e1336d..ade4e5409e42 100644 --- a/src/common/datasource/src/file_format/csv.rs +++ b/src/common/datasource/src/file_format/csv.rs @@ -29,6 +29,7 @@ use datafusion::physical_plan::SendableRecordBatchStream; use derive_builder::Builder; use object_store::ObjectStore; use snafu::ResultExt; +use tokio_util::compat::FuturesAsyncReadCompatExt; use tokio_util::io::SyncIoBridge; use super::stream_to_file; @@ -164,10 +165,16 @@ impl FileOpener for CsvOpener { #[async_trait] impl FileFormat for CsvFormat { async fn infer_schema(&self, store: &ObjectStore, path: &str) -> Result { + let meta = store + .stat(path) + .await + .context(error::ReadObjectSnafu { path })?; let reader = store .reader(path) .await - .context(error::ReadObjectSnafu { path })?; + .context(error::ReadObjectSnafu { path })? 
+ .into_futures_async_read(0..meta.content_length()) + .compat(); let decoded = self.compression_type.convert_async_read(reader); diff --git a/src/common/datasource/src/file_format/json.rs b/src/common/datasource/src/file_format/json.rs index 77fde3fddb74..97057f836200 100644 --- a/src/common/datasource/src/file_format/json.rs +++ b/src/common/datasource/src/file_format/json.rs @@ -31,6 +31,7 @@ use datafusion::error::{DataFusionError, Result as DataFusionResult}; use datafusion::physical_plan::SendableRecordBatchStream; use object_store::ObjectStore; use snafu::ResultExt; +use tokio_util::compat::FuturesAsyncReadCompatExt; use tokio_util::io::SyncIoBridge; use super::stream_to_file; @@ -82,10 +83,16 @@ impl Default for JsonFormat { #[async_trait] impl FileFormat for JsonFormat { async fn infer_schema(&self, store: &ObjectStore, path: &str) -> Result { + let meta = store + .stat(path) + .await + .context(error::ReadObjectSnafu { path })?; let reader = store .reader(path) .await - .context(error::ReadObjectSnafu { path })?; + .context(error::ReadObjectSnafu { path })? + .into_futures_async_read(0..meta.content_length()) + .compat(); let decoded = self.compression_type.convert_async_read(reader); diff --git a/src/common/datasource/src/file_format/orc.rs b/src/common/datasource/src/file_format/orc.rs index 23e0589c99e9..4a6001f3e7c6 100644 --- a/src/common/datasource/src/file_format/orc.rs +++ b/src/common/datasource/src/file_format/orc.rs @@ -16,15 +16,17 @@ use std::sync::Arc; use arrow_schema::{ArrowError, Schema, SchemaRef}; use async_trait::async_trait; +use bytes::Bytes; use common_recordbatch::adapter::RecordBatchStreamTypeAdapter; use datafusion::datasource::physical_plan::{FileMeta, FileOpenFuture, FileOpener}; use datafusion::error::{DataFusionError, Result as DfResult}; -use futures::{StreamExt, TryStreamExt}; +use futures::future::BoxFuture; +use futures::{FutureExt, StreamExt, TryStreamExt}; use object_store::ObjectStore; use orc_rust::arrow_reader::ArrowReaderBuilder; use orc_rust::async_arrow_reader::ArrowStreamReader; +use orc_rust::reader::AsyncChunkReader; use snafu::ResultExt; -use tokio::io::{AsyncRead, AsyncSeek}; use crate::error::{self, Result}; use crate::file_format::FileFormat; @@ -32,18 +34,49 @@ use crate::file_format::FileFormat; #[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] pub struct OrcFormat; -pub async fn new_orc_stream_reader( - reader: R, -) -> Result> { +#[derive(Clone)] +pub struct ReaderAdapter { + reader: object_store::Reader, + len: u64, +} + +impl ReaderAdapter { + pub fn new(reader: object_store::Reader, len: u64) -> Self { + Self { reader, len } + } +} + +impl AsyncChunkReader for ReaderAdapter { + fn len(&mut self) -> BoxFuture<'_, std::io::Result> { + async move { Ok(self.len) }.boxed() + } + + fn get_bytes( + &mut self, + offset_from_start: u64, + length: u64, + ) -> BoxFuture<'_, std::io::Result> { + async move { + let bytes = self + .reader + .read(offset_from_start..offset_from_start + length) + .await?; + Ok(bytes.to_bytes()) + } + .boxed() + } +} + +pub async fn new_orc_stream_reader( + reader: ReaderAdapter, +) -> Result> { let reader_build = ArrowReaderBuilder::try_new_async(reader) .await .context(error::OrcReaderSnafu)?; Ok(reader_build.build_async()) } -pub async fn infer_orc_schema( - reader: R, -) -> Result { +pub async fn infer_orc_schema(reader: ReaderAdapter) -> Result { let reader = new_orc_stream_reader(reader).await?; Ok(reader.schema().as_ref().clone()) } @@ -51,13 +84,15 @@ pub async fn infer_orc_schema #[async_trait] 
impl FileFormat for OrcFormat { async fn infer_schema(&self, store: &ObjectStore, path: &str) -> Result { + let meta = store + .stat(path) + .await + .context(error::ReadObjectSnafu { path })?; let reader = store .reader(path) .await .context(error::ReadObjectSnafu { path })?; - - let schema = infer_orc_schema(reader).await?; - + let schema = infer_orc_schema(ReaderAdapter::new(reader, meta.content_length())).await?; Ok(schema) } } @@ -97,15 +132,23 @@ impl FileOpener for OrcOpener { }; let projection = self.projection.clone(); Ok(Box::pin(async move { - let reader = object_store - .reader(meta.location().to_string().as_str()) + let path = meta.location().to_string(); + + let meta = object_store + .stat(&path) .await .map_err(|e| DataFusionError::External(Box::new(e)))?; - let stream_reader = new_orc_stream_reader(reader) + let reader = object_store + .reader(&path) .await .map_err(|e| DataFusionError::External(Box::new(e)))?; + let stream_reader = + new_orc_stream_reader(ReaderAdapter::new(reader, meta.content_length())) + .await + .map_err(|e| DataFusionError::External(Box::new(e)))?; + let stream = RecordBatchStreamTypeAdapter::new(projected_schema, stream_reader, projection); diff --git a/src/common/datasource/src/file_format/parquet.rs b/src/common/datasource/src/file_format/parquet.rs index 651d5904c874..2e887ac2f7c3 100644 --- a/src/common/datasource/src/file_format/parquet.rs +++ b/src/common/datasource/src/file_format/parquet.rs @@ -29,10 +29,11 @@ use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet; use datafusion::physical_plan::SendableRecordBatchStream; use futures::future::BoxFuture; use futures::StreamExt; -use object_store::{ObjectStore, Reader, Writer}; +use object_store::{FuturesAsyncReader, ObjectStore}; use parquet::basic::{Compression, ZstdLevel}; use parquet::file::properties::WriterProperties; use snafu::ResultExt; +use tokio_util::compat::{Compat, FuturesAsyncReadCompatExt, FuturesAsyncWriteCompatExt}; use crate::buffered_writer::{ArrowWriterCloser, DfRecordBatchEncoder, LazyBufferedWriter}; use crate::error::{self, Result}; @@ -45,10 +46,16 @@ pub struct ParquetFormat {} #[async_trait] impl FileFormat for ParquetFormat { async fn infer_schema(&self, store: &ObjectStore, path: &str) -> Result { + let meta = store + .stat(path) + .await + .context(error::ReadObjectSnafu { path })?; let mut reader = store .reader(path) .await - .context(error::ReadObjectSnafu { path })?; + .context(error::ReadObjectSnafu { path })? + .into_futures_async_read(0..meta.content_length()) + .compat(); let metadata = reader .get_metadata() @@ -98,7 +105,7 @@ impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory { pub struct LazyParquetFileReader { object_store: ObjectStore, - reader: Option, + reader: Option>, path: String, } @@ -114,7 +121,13 @@ impl LazyParquetFileReader { /// Must initialize the reader, or throw an error from the future. async fn maybe_initialize(&mut self) -> result::Result<(), object_store::Error> { if self.reader.is_none() { - let reader = self.object_store.reader(&self.path).await?; + let meta = self.object_store.stat(&self.path).await?; + let reader = self + .object_store + .reader(&self.path) + .await? 
+ .into_futures_async_read(0..meta.content_length()) + .compat(); self.reader = Some(reader); } @@ -167,16 +180,17 @@ pub struct BufferedWriter { } type InnerBufferedWriter = LazyBufferedWriter< - object_store::Writer, + Compat, ArrowWriter, - impl Fn(String) -> BoxFuture<'static, Result>, + impl Fn(String) -> BoxFuture<'static, Result>>, >; impl BufferedWriter { fn make_write_factory( store: ObjectStore, concurrency: usize, - ) -> impl Fn(String) -> BoxFuture<'static, Result> { + ) -> impl Fn(String) -> BoxFuture<'static, Result>> + { move |path| { let store = store.clone(); Box::pin(async move { @@ -184,6 +198,7 @@ impl BufferedWriter { .writer_with(&path) .concurrent(concurrency) .await + .map(|v| v.into_futures_async_write().compat_write()) .context(error::WriteObjectSnafu { path }) }) } diff --git a/src/common/datasource/src/test_util.rs b/src/common/datasource/src/test_util.rs index 8f1af59c90e7..d3a24a23d24a 100644 --- a/src/common/datasource/src/test_util.rs +++ b/src/common/datasource/src/test_util.rs @@ -120,7 +120,7 @@ pub async fn setup_stream_to_json_test(origin_path: &str, threshold: impl Fn(usi let written = tmp_store.read(&output_path).await.unwrap(); let origin = store.read(origin_path).await.unwrap(); - assert_eq_lines(written, origin); + assert_eq_lines(written.to_vec(), origin.to_vec()); } pub async fn setup_stream_to_csv_test(origin_path: &str, threshold: impl Fn(usize) -> usize) { @@ -158,7 +158,7 @@ pub async fn setup_stream_to_csv_test(origin_path: &str, threshold: impl Fn(usiz let written = tmp_store.read(&output_path).await.unwrap(); let origin = store.read(origin_path).await.unwrap(); - assert_eq_lines(written, origin); + assert_eq_lines(written.to_vec(), origin.to_vec()); } // Ignore the CRLF difference across operating systems. diff --git a/src/common/procedure/src/store/state_store.rs b/src/common/procedure/src/store/state_store.rs index f02ccc793772..096ef84b125d 100644 --- a/src/common/procedure/src/store/state_store.rs +++ b/src/common/procedure/src/store/state_store.rs @@ -179,7 +179,7 @@ impl StateStore for ObjectStateStore { )) }) .context(ListStateSnafu { path: key })?; - yield (key.into(), value); + yield (key.into(), value.to_vec()); } } }); diff --git a/src/datanode/src/store.rs b/src/datanode/src/store.rs index 8b778306c466..8b64511598a8 100644 --- a/src/datanode/src/store.rs +++ b/src/datanode/src/store.rs @@ -20,7 +20,6 @@ mod gcs; mod oss; mod s3; -use std::sync::Arc; use std::time::Duration; use std::{env, path}; @@ -29,7 +28,7 @@ use common_telemetry::info; use object_store::layers::{LruCacheLayer, RetryLayer}; use object_store::services::Fs; use object_store::util::{join_dir, normalize_dir, with_instrument_layers}; -use object_store::{HttpClient, ObjectStore, ObjectStoreBuilder}; +use object_store::{HttpClient, ObjectStore}; use snafu::prelude::*; use crate::config::{ObjectStoreConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE}; @@ -107,13 +106,13 @@ async fn create_object_store_with_cache( if let Some(path) = cache_path { let atomic_temp_dir = join_dir(path, ".tmp/"); clean_temp_dir(&atomic_temp_dir)?; - let cache_store = Fs::default() - .root(path) - .atomic_write_dir(&atomic_temp_dir) - .build() - .context(error::InitBackendSnafu)?; + let mut builder = Fs::default(); + builder.root(path).atomic_write_dir(&atomic_temp_dir); + let cache_store = ObjectStore::new(builder) + .context(error::InitBackendSnafu)? 
+ .finish(); - let cache_layer = LruCacheLayer::new(Arc::new(cache_store), cache_capacity.0 as usize) + let cache_layer = LruCacheLayer::new(cache_store, cache_capacity.0 as usize) .await .context(error::InitBackendSnafu)?; diff --git a/src/file-engine/src/manifest.rs b/src/file-engine/src/manifest.rs index d2e8255301b1..6310c3ccb912 100644 --- a/src/file-engine/src/manifest.rs +++ b/src/file-engine/src/manifest.rs @@ -71,7 +71,8 @@ impl FileRegionManifest { let bs = object_store .read(path) .await - .context(LoadRegionManifestSnafu { region_id })?; + .context(LoadRegionManifestSnafu { region_id })? + .to_vec(); Self::decode(bs.as_slice()) } diff --git a/src/mito2/src/cache/file_cache.rs b/src/mito2/src/cache/file_cache.rs index 0afbf5b6695e..931e5062693a 100644 --- a/src/mito2/src/cache/file_cache.rs +++ b/src/mito2/src/cache/file_cache.rs @@ -112,6 +112,10 @@ impl FileCache { self.memory_index.insert(key, value).await; } + pub(crate) async fn get(&self, key: IndexKey) -> Option { + self.memory_index.get(&key).await + } + /// Reads a file from the cache. pub(crate) async fn reader(&self, key: IndexKey) -> Option { // We must use `get()` to update the estimator of the cache. @@ -372,7 +376,6 @@ fn parse_index_key(name: &str) -> Option { #[cfg(test)] mod tests { use common_test_util::temp_dir::create_temp_dir; - use futures::AsyncReadExt; use object_store::services::Fs; use super::*; @@ -451,10 +454,9 @@ mod tests { .await; // Read file content. - let mut reader = cache.reader(key).await.unwrap(); - let mut buf = String::new(); - reader.read_to_string(&mut buf).await.unwrap(); - assert_eq!("hello", buf); + let reader = cache.reader(key).await.unwrap(); + let buf = reader.read(..).await.unwrap().to_vec(); + assert_eq!("hello", String::from_utf8(buf).unwrap()); // Get weighted size. cache.memory_index.run_pending_tasks().await; @@ -549,10 +551,9 @@ mod tests { for (i, file_id) in file_ids.iter().enumerate() { let key = IndexKey::new(region_id, *file_id, file_type); - let mut reader = cache.reader(key).await.unwrap(); - let mut buf = String::new(); - reader.read_to_string(&mut buf).await.unwrap(); - assert_eq!(i.to_string(), buf); + let reader = cache.reader(key).await.unwrap(); + let buf = reader.read(..).await.unwrap().to_vec(); + assert_eq!(i.to_string(), String::from_utf8(buf).unwrap()); } } diff --git a/src/mito2/src/cache/write_cache.rs b/src/mito2/src/cache/write_cache.rs index 23a84194695d..f7a5af339b48 100644 --- a/src/mito2/src/cache/write_cache.rs +++ b/src/mito2/src/cache/write_cache.rs @@ -19,6 +19,7 @@ use std::time::Duration; use common_base::readable_size::ReadableSize; use common_telemetry::{debug, info}; +use futures::AsyncWriteExt; use object_store::manager::ObjectStoreManagerRef; use object_store::ObjectStore; use snafu::ResultExt; @@ -175,19 +176,27 @@ impl WriteCache { }]) .start_timer(); + let cached_value = self + .file_cache + .local_store() + .stat(&cache_path) + .await + .context(error::OpenDalSnafu)?; let reader = self .file_cache .local_store() .reader(&cache_path) .await - .context(error::OpenDalSnafu)?; + .context(error::OpenDalSnafu)? + .into_futures_async_read(0..cached_value.content_length()); let mut writer = remote_store .writer_with(upload_path) - .buffer(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize) + .chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize) .concurrent(DEFAULT_WRITE_CONCURRENCY) .await - .context(error::OpenDalSnafu)?; + .context(error::OpenDalSnafu)? 
+ .into_futures_async_write(); let bytes_written = futures::io::copy(reader, &mut writer) @@ -199,7 +208,11 @@ impl WriteCache { })?; // Must close to upload all data. - writer.close().await.context(error::OpenDalSnafu)?; + writer.close().await.context(error::UploadSnafu { + region_id, + file_id, + file_type, + })?; UPLOAD_BYTES_TOTAL.inc_by(bytes_written); @@ -315,7 +328,7 @@ mod tests { .read(&write_cache.file_cache.cache_file_path(key)) .await .unwrap(); - assert_eq!(remote_data, cache_data); + assert_eq!(remote_data.to_vec(), cache_data.to_vec()); // Check write cache contains the index key let index_key = IndexKey::new(region_id, file_id, FileType::Puffin); @@ -326,7 +339,7 @@ mod tests { .read(&write_cache.file_cache.cache_file_path(index_key)) .await .unwrap(); - assert_eq!(remote_index_data, cache_index_data); + assert_eq!(remote_index_data.to_vec(), cache_index_data.to_vec()); } #[tokio::test] diff --git a/src/mito2/src/manifest/storage.rs b/src/mito2/src/manifest/storage.rs index fc9467f65fbb..815afd9f4c6c 100644 --- a/src/mito2/src/manifest/storage.rs +++ b/src/mito2/src/manifest/storage.rs @@ -489,7 +489,7 @@ impl ManifestObjectStore { } }; - let checkpoint_metadata = CheckpointMetadata::decode(&last_checkpoint_data)?; + let checkpoint_metadata = CheckpointMetadata::decode(&last_checkpoint_data.to_vec())?; debug!( "Load checkpoint in path: {}, metadata: {:?}", @@ -501,7 +501,11 @@ impl ManifestObjectStore { #[cfg(test)] pub async fn read_file(&self, path: &str) -> Result> { - self.object_store.read(path).await.context(OpenDalSnafu) + self.object_store + .read(path) + .await + .context(OpenDalSnafu) + .map(|v| v.to_vec()) } #[cfg(test)] diff --git a/src/mito2/src/sst/index/applier.rs b/src/mito2/src/sst/index/applier.rs index f14251afb9d5..eb4e42cd47bf 100644 --- a/src/mito2/src/sst/index/applier.rs +++ b/src/mito2/src/sst/index/applier.rs @@ -121,9 +121,17 @@ impl SstIndexApplier { return Ok(None); }; + let Some(indexed_value) = file_cache + .get(IndexKey::new(self.region_id, file_id, FileType::Puffin)) + .await + else { + return Ok(None); + }; + Ok(file_cache .reader(IndexKey::new(self.region_id, file_id, FileType::Puffin)) .await + .map(|v| v.into_futures_async_read(0..indexed_value.file_size as u64)) .map(PuffinFileReader::new)) } @@ -190,7 +198,13 @@ mod tests { let region_dir = "region_dir".to_string(); let path = location::index_file_path(®ion_dir, file_id); - let mut puffin_writer = PuffinFileWriter::new(object_store.writer(&path).await.unwrap()); + let mut puffin_writer = PuffinFileWriter::new( + object_store + .writer(&path) + .await + .unwrap() + .into_futures_async_write(), + ); puffin_writer .add_blob(Blob { blob_type: INDEX_BLOB_TYPE.to_string(), @@ -236,7 +250,13 @@ mod tests { let region_dir = "region_dir".to_string(); let path = location::index_file_path(®ion_dir, file_id); - let mut puffin_writer = PuffinFileWriter::new(object_store.writer(&path).await.unwrap()); + let mut puffin_writer = PuffinFileWriter::new( + object_store + .writer(&path) + .await + .unwrap() + .into_futures_async_write(), + ); puffin_writer .add_blob(Blob { blob_type: "invalid_blob_type".to_string(), diff --git a/src/mito2/src/sst/index/store.rs b/src/mito2/src/sst/index/store.rs index 23893cc861fe..9d26118366ad 100644 --- a/src/mito2/src/sst/index/store.rs +++ b/src/mito2/src/sst/index/store.rs @@ -26,6 +26,8 @@ use crate::error::{OpenDalSnafu, Result}; /// A wrapper around [`ObjectStore`] that adds instrumentation for monitoring /// metrics such as bytes read, bytes written, and the 
number of seek operations. +/// +/// TODO: Consider refactor InstrumentedStore to use async in trait instead of AsyncRead. #[derive(Clone)] pub(crate) struct InstrumentedStore { /// The underlying object store. @@ -58,8 +60,14 @@ impl InstrumentedStore { read_byte_count: &'a IntCounter, read_count: &'a IntCounter, seek_count: &'a IntCounter, - ) -> Result> { - let reader = self.object_store.reader(path).await.context(OpenDalSnafu)?; + ) -> Result> { + let meta = self.object_store.stat(path).await.context(OpenDalSnafu)?; + let reader = self + .object_store + .reader(path) + .await + .context(OpenDalSnafu)? + .into_futures_async_read(0..meta.content_length()); Ok(InstrumentedAsyncRead::new( reader, read_byte_count, @@ -77,15 +85,21 @@ impl InstrumentedStore { write_byte_count: &'a IntCounter, write_count: &'a IntCounter, flush_count: &'a IntCounter, - ) -> Result> { + ) -> Result> { let writer = match self.write_buffer_size { Some(size) => self .object_store .writer_with(path) - .buffer(size) + .chunk(size) + .await + .context(OpenDalSnafu)? + .into_futures_async_write(), + None => self + .object_store + .writer(path) .await - .context(OpenDalSnafu)?, - None => self.object_store.writer(path).await.context(OpenDalSnafu)?, + .context(OpenDalSnafu)? + .into_futures_async_write(), }; Ok(InstrumentedAsyncWrite::new( writer, diff --git a/src/mito2/src/sst/parquet/helper.rs b/src/mito2/src/sst/parquet/helper.rs index 34196df7c002..b3cc8f8279d3 100644 --- a/src/mito2/src/sst/parquet/helper.rs +++ b/src/mito2/src/sst/parquet/helper.rs @@ -121,7 +121,7 @@ async fn fetch_ranges_seq( .read_with(&file_path) .range(range.start..range.end) .call()?; - Ok::<_, object_store::Error>(Bytes::from(data)) + Ok::<_, object_store::Error>(data.to_bytes()) }) .collect::>>() }; @@ -141,7 +141,7 @@ async fn fetch_ranges_concurrent( let future_read = object_store.read_with(file_path); handles.push(async move { let data = future_read.range(range.start..range.end).await?; - Ok::<_, object_store::Error>(Bytes::from(data)) + Ok::<_, object_store::Error>(data.to_bytes()) }); } let results = futures::future::try_join_all(handles).await?; @@ -164,7 +164,7 @@ where } } -// https://github.com/apache/incubator-opendal/blob/7144ab1ca2409dff0c324bfed062ce985997f8ce/core/src/raw/tokio_util.rs#L21-L23 +// https://github.com/apache/opendal/blob/v0.46.0/core/src/raw/tokio_util.rs#L21-L24 /// Parse tokio error into opendal::Error. fn new_task_join_error(e: tokio::task::JoinError) -> object_store::Error { object_store::Error::new(ErrorKind::Unexpected, "tokio task join failed").set_source(e) diff --git a/src/mito2/src/sst/parquet/metadata.rs b/src/mito2/src/sst/parquet/metadata.rs index e0db7b40b889..3cf5a85cf893 100644 --- a/src/mito2/src/sst/parquet/metadata.rs +++ b/src/mito2/src/sst/parquet/metadata.rs @@ -85,7 +85,8 @@ impl<'a> MetadataLoader<'a> { .read_with(path) .range(buffer_start..file_size) .await - .context(error::OpenDalSnafu)?; + .context(error::OpenDalSnafu)? + .to_vec(); let buffer_len = buffer.len(); let mut footer = [0; 8]; @@ -129,7 +130,8 @@ impl<'a> MetadataLoader<'a> { .read_with(path) .range(metadata_start..(file_size - FOOTER_SIZE as u64)) .await - .context(error::OpenDalSnafu)?; + .context(error::OpenDalSnafu)? 
+ .to_vec(); let metadata = decode_metadata(&data).map_err(|e| { error::InvalidParquetSnafu { diff --git a/src/mito2/src/worker/handle_drop.rs b/src/mito2/src/worker/handle_drop.rs index 814d48bd1b01..c307e437c502 100644 --- a/src/mito2/src/worker/handle_drop.rs +++ b/src/mito2/src/worker/handle_drop.rs @@ -16,6 +16,7 @@ use std::time::Duration; +use bytes::Bytes; use common_telemetry::{error, info, warn}; use futures::TryStreamExt; use object_store::util::join_path; @@ -50,7 +51,7 @@ impl RegionWorkerLoop { region .access_layer .object_store() - .write(&marker_path, vec![]) + .write(&marker_path, Bytes::new()) .await .context(OpenDalSnafu) .inspect_err(|e| { diff --git a/src/object-store/Cargo.toml b/src/object-store/Cargo.toml index da1291cad278..5f4c4c98ed8d 100644 --- a/src/object-store/Cargo.toml +++ b/src/object-store/Cargo.toml @@ -11,23 +11,21 @@ workspace = true services-memory = ["opendal/services-memory"] [dependencies] -async-trait = "0.1" bytes.workspace = true common-telemetry.workspace = true futures.workspace = true lazy_static.workspace = true md5 = "0.7" moka = { workspace = true, features = ["future"] } -opendal = { version = "0.45", features = [ +opendal = { version = "0.46", features = [ "layers-tracing", - "rustls", "services-azblob", "services-fs", "services-gcs", "services-http", "services-oss", "services-s3", -], default-features = false } +] } prometheus.workspace = true uuid.workspace = true @@ -35,5 +33,4 @@ uuid.workspace = true anyhow = "1.0" common-telemetry.workspace = true common-test-util.workspace = true -opendal = { version = "0.45", features = ["services-memory"] } tokio.workspace = true diff --git a/src/object-store/src/layers/lru_cache.rs b/src/object-store/src/layers/lru_cache.rs index 70d20710cbbb..bcea36603ca6 100644 --- a/src/object-store/src/layers/lru_cache.rs +++ b/src/object-store/src/layers/lru_cache.rs @@ -14,27 +14,26 @@ use std::sync::Arc; -use async_trait::async_trait; -use opendal::raw::oio::Read; +use opendal::raw::oio::ReadDyn; use opendal::raw::{ - Accessor, Layer, LayeredAccessor, OpDelete, OpList, OpRead, OpWrite, RpDelete, RpList, RpRead, + Access, Layer, LayeredAccess, OpDelete, OpList, OpRead, OpWrite, RpDelete, RpList, RpRead, RpWrite, }; -use opendal::Result; +use opendal::{Operator, Result}; mod read_cache; use common_telemetry::info; use read_cache::ReadCache; /// An opendal layer with local LRU file cache supporting. #[derive(Clone)] -pub struct LruCacheLayer { +pub struct LruCacheLayer { // The read cache - read_cache: ReadCache, + read_cache: ReadCache, } -impl LruCacheLayer { +impl LruCacheLayer { /// Create a `[LruCacheLayer]` with local file cache and capacity in bytes. 
- pub async fn new(file_cache: Arc, capacity: usize) -> Result { + pub async fn new(file_cache: Operator, capacity: usize) -> Result { let read_cache = ReadCache::new(file_cache, capacity); let (entries, bytes) = read_cache.recover_cache().await?; @@ -57,11 +56,11 @@ impl LruCacheLayer { } } -impl Layer for LruCacheLayer { - type LayeredAccessor = LruCacheAccessor; +impl Layer for LruCacheLayer { + type LayeredAccess = LruCacheAccess; - fn layer(&self, inner: I) -> Self::LayeredAccessor { - LruCacheAccessor { + fn layer(&self, inner: I) -> Self::LayeredAccess { + LruCacheAccess { inner, read_cache: self.read_cache.clone(), } @@ -69,15 +68,14 @@ impl Layer for LruCacheLayer { } #[derive(Debug)] -pub struct LruCacheAccessor { +pub struct LruCacheAccess { inner: I, - read_cache: ReadCache, + read_cache: ReadCache, } -#[async_trait] -impl LayeredAccessor for LruCacheAccessor { +impl LayeredAccess for LruCacheAccess { type Inner = I; - type Reader = Box; + type Reader = Arc; type BlockingReader = I::BlockingReader; type Writer = I::Writer; type BlockingWriter = I::BlockingWriter; diff --git a/src/object-store/src/layers/lru_cache/read_cache.rs b/src/object-store/src/layers/lru_cache/read_cache.rs index f37e4772c31c..81415b8039ca 100644 --- a/src/object-store/src/layers/lru_cache/read_cache.rs +++ b/src/object-store/src/layers/lru_cache/read_cache.rs @@ -15,12 +15,12 @@ use std::sync::Arc; use common_telemetry::debug; -use futures::FutureExt; +use futures::{FutureExt, StreamExt}; use moka::future::Cache; use moka::notification::ListenerFuture; -use opendal::raw::oio::{ListExt, Read, ReadExt, Reader, WriteExt}; -use opendal::raw::{Accessor, OpDelete, OpList, OpRead, OpStat, OpWrite, RpRead}; -use opendal::{Error as OpendalError, ErrorKind, Result}; +use opendal::raw::oio::{Read, ReadDyn, Reader}; +use opendal::raw::{Access, BytesRange, OpRead, OpStat, RpRead}; +use opendal::{Buffer, Error as OpendalError, ErrorKind, Operator, Result}; use crate::metrics::{ OBJECT_STORE_LRU_CACHE_BYTES, OBJECT_STORE_LRU_CACHE_ENTRIES, OBJECT_STORE_LRU_CACHE_HIT, @@ -52,26 +52,22 @@ fn can_cache(path: &str) -> bool { } /// Generate an unique cache key for the read path and range. -fn read_cache_key(path: &str, args: &OpRead) -> String { - format!( - "{:x}.cache-{}", - md5::compute(path), - args.range().to_header() - ) +fn read_cache_key(path: &str, range: BytesRange) -> String { + format!("{:x}.cache-{}", md5::compute(path), range.to_header()) } /// Local read cache for files in object storage #[derive(Clone, Debug)] -pub(crate) struct ReadCache { +pub(crate) struct ReadCache { /// Local file cache backend - file_cache: Arc, + file_cache: Operator, /// Local memory cache to track local cache files mem_cache: Cache, } -impl ReadCache { +impl ReadCache { /// Create a [`ReadCache`] with capacity in bytes. 
- pub(crate) fn new(file_cache: Arc, capacity: usize) -> Self { + pub(crate) fn new(file_cache: Operator, capacity: usize) -> Self { let file_cache_cloned = file_cache.clone(); let eviction_listener = move |read_key: Arc, read_result: ReadResult, cause| -> ListenerFuture { @@ -83,7 +79,7 @@ impl ReadCache { if let ReadResult::Success(size) = read_result { OBJECT_STORE_LRU_CACHE_BYTES.sub(size as i64); - let result = file_cache_cloned.delete(&read_key, OpDelete::new()).await; + let result = file_cache_cloned.delete(&read_key).await; debug!( "Deleted local cache file `{}`, result: {:?}, cause: {:?}.", read_key, result, cause @@ -133,17 +129,17 @@ impl ReadCache { /// Recover existing cache items from `file_cache` to `mem_cache`. /// Return entry count and total approximate entry size in bytes. pub(crate) async fn recover_cache(&self) -> Result<(u64, u64)> { - let (_, mut pager) = self.file_cache.list("/", OpList::default()).await?; + let mut pager = self.file_cache.lister("/").await?; - while let Some(entry) = pager.next().await? { + while let Some(entry) = pager.next().await.transpose()? { let read_key = entry.path(); // We can't retrieve the metadata from `[opendal::raw::oio::Entry]` directly, // because it's private field. let size = { - let stat = self.file_cache.stat(read_key, OpStat::default()).await?; + let stat = self.file_cache.stat(read_key).await?; - stat.into_metadata().content_length() + stat.content_length() }; OBJECT_STORE_LRU_CACHE_ENTRIES.inc(); @@ -159,8 +155,7 @@ impl ReadCache { /// Returns true when the read cache contains the specific file. pub(crate) async fn contains_file(&self, path: &str) -> bool { self.mem_cache.run_pending_tasks().await; - self.mem_cache.contains_key(path) - && self.file_cache.stat(path, OpStat::default()).await.is_ok() + self.mem_cache.contains_key(path) && self.file_cache.stat(path).await.is_ok() } /// Read from a specific path using the OpRead operation. @@ -173,86 +168,54 @@ impl ReadCache { inner: &I, path: &str, args: OpRead, - ) -> Result<(RpRead, Box)> + ) -> Result<(RpRead, Arc)> where - I: Accessor, + I: Access, { if !can_cache(path) { return inner.read(path, args).await.map(to_output_reader); } - let read_key = read_cache_key(path, &args); - - let read_result = self - .mem_cache - .try_get_with( - read_key.clone(), - self.read_remote(inner, &read_key, path, args.clone()), - ) - .await - .map_err(|e| OpendalError::new(e.kind(), &e.to_string()))?; - - match read_result { - ReadResult::Success(_) => { - // There is a concurrent issue here, the local cache may be purged - // while reading, we have to fallback to remote read - match self.file_cache.read(&read_key, OpRead::default()).await { - Ok(ret) => { - OBJECT_STORE_LRU_CACHE_HIT - .with_label_values(&["success"]) - .inc(); - Ok(to_output_reader(ret)) - } - Err(_) => { - OBJECT_STORE_LRU_CACHE_MISS.inc(); - inner.read(path, args).await.map(to_output_reader) - } - } - } - ReadResult::NotFound => { - OBJECT_STORE_LRU_CACHE_HIT - .with_label_values(&["not_found"]) - .inc(); - - Err(OpendalError::new( - ErrorKind::NotFound, - &format!("File not found: {path}"), - )) - } - } + // FIXME: remove this block after opendal v0.47 released. 
+ let meta = inner.stat(path, OpStat::new()).await?; + let (rp, reader) = inner.read(path, args).await?; + let reader: ReadCacheReader = ReadCacheReader { + path: Arc::new(path.to_string()), + inner_reader: reader, + size: meta.into_metadata().content_length(), + file_cache: self.file_cache.clone(), + mem_cache: self.mem_cache.clone(), + }; + Ok((rp, Arc::new(reader))) } +} - async fn try_write_cache(&self, mut reader: I::Reader, read_key: &str) -> Result - where - I: Accessor, - { - let (_, mut writer) = self.file_cache.write(read_key, OpWrite::new()).await?; - let mut total = 0; - while let Some(bytes) = reader.next().await { - let bytes = &bytes?; - total += bytes.len(); - writer.write(bytes).await?; - } - // Call `close` to ensure data is written. - writer.close().await?; - Ok(total) - } +pub struct ReadCacheReader { + /// Path of the file + path: Arc, + /// Remote file reader. + inner_reader: I::Reader, + /// FIXME: remove this field after opendal v0.47 released. + /// + /// OpenDAL's read_at takes `offset, limit` which means the underlying storage + /// services could return less data than limit. We store size here as a workaround. + /// + /// This API has been refactor into `offset, size` instead. After opendal v0.47 released, + /// we don't need this anymore. + size: u64, + /// Local file cache backend + file_cache: Operator, + /// Local memory cache to track local cache files + mem_cache: Cache, +} - /// Read the file from remote storage. If success, write the content into local cache. - async fn read_remote( - &self, - inner: &I, - read_key: &str, - path: &str, - args: OpRead, - ) -> Result - where - I: Accessor, - { +impl ReadCacheReader { + /// TODO: we can return the Buffer directly to avoid another read from cache. + async fn read_remote(&self, offset: u64, limit: usize) -> Result { OBJECT_STORE_LRU_CACHE_MISS.inc(); - let (_, reader) = inner.read(path, args).await?; - let result = self.try_write_cache::(reader, read_key).await; + let buf = self.inner_reader.read_at(offset, limit).await?; + let result = self.try_write_cache(buf, offset).await; match result { Ok(read_bytes) => { @@ -279,10 +242,59 @@ impl ReadCache { } } } + + async fn try_write_cache(&self, buf: Buffer, offset: u64) -> Result { + let size = buf.len(); + let read_key = read_cache_key(&self.path, BytesRange::new(offset, Some(size as _))); + self.file_cache.write(&read_key, buf).await?; + Ok(size) + } +} + +impl Read for ReadCacheReader { + async fn read_at(&self, offset: u64, limit: usize) -> Result { + let size = self.size.min(offset + limit as u64) - offset; + let read_key = read_cache_key(&self.path, BytesRange::new(offset, Some(size as _))); + + let read_result = self + .mem_cache + .try_get_with(read_key.clone(), self.read_remote(offset, limit)) + .await + .map_err(|e| OpendalError::new(e.kind(), &e.to_string()))?; + + match read_result { + ReadResult::Success(_) => { + // There is a concurrent issue here, the local cache may be purged + // while reading, we have to fallback to remote read + match self.file_cache.read(&read_key).await { + Ok(ret) => { + OBJECT_STORE_LRU_CACHE_HIT + .with_label_values(&["success"]) + .inc(); + Ok(ret) + } + Err(_) => { + OBJECT_STORE_LRU_CACHE_MISS.inc(); + self.inner_reader.read_at(offset, limit).await + } + } + } + ReadResult::NotFound => { + OBJECT_STORE_LRU_CACHE_HIT + .with_label_values(&["not_found"]) + .inc(); + + Err(OpendalError::new( + ErrorKind::NotFound, + &format!("File not found: {}", self.path), + )) + } + } + } } fn to_output_reader(input: (RpRead, R)) -> 
(RpRead, Reader) { - (input.0, Box::new(input.1)) + (input.0, Arc::new(input.1)) } #[cfg(test)] diff --git a/src/object-store/src/layers/prometheus.rs b/src/object-store/src/layers/prometheus.rs index 51f8689984f8..a609ce7203f5 100644 --- a/src/object-store/src/layers/prometheus.rs +++ b/src/object-store/src/layers/prometheus.rs @@ -15,16 +15,11 @@ //! code originally from , make a tiny change to avoid crash in multi thread env use std::fmt::{Debug, Formatter}; -use std::io; -use std::task::{Context, Poll}; -use async_trait::async_trait; -use bytes::Bytes; use common_telemetry::debug; -use futures::FutureExt; use lazy_static::lazy_static; use opendal::raw::*; -use opendal::ErrorKind; +use opendal::{Buffer, ErrorKind}; use prometheus::{ exponential_buckets, histogram_opts, register_histogram_vec, register_int_counter_vec, Histogram, HistogramTimer, HistogramVec, IntCounterVec, @@ -89,14 +84,14 @@ fn increment_errors_total(op: Operation, kind: ErrorKind) { #[derive(Default, Debug, Clone)] pub struct PrometheusMetricsLayer; -impl Layer for PrometheusMetricsLayer { - type LayeredAccessor = PrometheusAccessor; +impl Layer for PrometheusMetricsLayer { + type LayeredAccess = PrometheusAccess; - fn layer(&self, inner: A) -> Self::LayeredAccessor { + fn layer(&self, inner: A) -> Self::LayeredAccess { let meta = inner.info(); let scheme = meta.scheme(); - PrometheusAccessor { + PrometheusAccess { inner, scheme: scheme.to_string(), } @@ -104,12 +99,12 @@ impl Layer for PrometheusMetricsLayer { } #[derive(Clone)] -pub struct PrometheusAccessor { +pub struct PrometheusAccess { inner: A, scheme: String, } -impl Debug for PrometheusAccessor { +impl Debug for PrometheusAccess { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.debug_struct("PrometheusAccessor") .field("inner", &self.inner) @@ -117,8 +112,7 @@ impl Debug for PrometheusAccessor { } } -#[async_trait] -impl LayeredAccessor for PrometheusAccessor { +impl LayeredAccess for PrometheusAccess { type Inner = A; type Reader = PrometheusMetricWrapper; type BlockingReader = PrometheusMetricWrapper; @@ -157,27 +151,20 @@ impl LayeredAccessor for PrometheusAccessor { .with_label_values(&[&self.scheme, Operation::Read.into_static()]) .start_timer(); - self.inner - .read(path, args) - .map(|v| { - v.map(|(rp, r)| { - ( - rp, - PrometheusMetricWrapper::new( - r, - Operation::Read, - BYTES_TOTAL - .with_label_values(&[&self.scheme, Operation::Read.into_static()]), - timer, - ), - ) - }) - }) - .await - .map_err(|e| { - increment_errors_total(Operation::Read, e.kind()); - e - }) + let (rp, r) = self.inner.read(path, args).await.map_err(|e| { + increment_errors_total(Operation::Read, e.kind()); + e + })?; + + Ok(( + rp, + PrometheusMetricWrapper::new( + r, + Operation::Read, + BYTES_TOTAL.with_label_values(&[&self.scheme, Operation::Read.into_static()]), + timer, + ), + )) } async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { @@ -189,27 +176,20 @@ impl LayeredAccessor for PrometheusAccessor { .with_label_values(&[&self.scheme, Operation::Write.into_static()]) .start_timer(); - self.inner - .write(path, args) - .map(|v| { - v.map(|(rp, r)| { - ( - rp, - PrometheusMetricWrapper::new( - r, - Operation::Write, - BYTES_TOTAL - .with_label_values(&[&self.scheme, Operation::Write.into_static()]), - timer, - ), - ) - }) - }) - .await - .map_err(|e| { - increment_errors_total(Operation::Write, e.kind()); - e - }) + let (rp, r) = self.inner.write(path, args).await.map_err(|e| { + increment_errors_total(Operation::Write, 
e.kind()); + e + })?; + + Ok(( + rp, + PrometheusMetricWrapper::new( + r, + Operation::Write, + BYTES_TOTAL.with_label_values(&[&self.scheme, Operation::Write.into_static()]), + timer, + ), + )) } async fn stat(&self, path: &str, args: OpStat) -> Result { @@ -461,103 +441,46 @@ impl PrometheusMetricWrapper { } impl oio::Read for PrometheusMetricWrapper { - fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll> { - self.inner.poll_read(cx, buf).map(|res| match res { - Ok(bytes) => { - self.bytes += bytes as u64; - Ok(bytes) - } - Err(e) => { - increment_errors_total(self.op, e.kind()); - Err(e) - } - }) - } - - fn poll_seek(&mut self, cx: &mut Context<'_>, pos: io::SeekFrom) -> Poll> { - self.inner.poll_seek(cx, pos).map(|res| match res { - Ok(n) => Ok(n), - Err(e) => { - increment_errors_total(self.op, e.kind()); - Err(e) - } - }) - } - - fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>> { - self.inner.poll_next(cx).map(|res| match res { - Some(Ok(bytes)) => { - self.bytes += bytes.len() as u64; - Some(Ok(bytes)) - } - Some(Err(e)) => { - increment_errors_total(self.op, e.kind()); - Some(Err(e)) - } - None => None, + async fn read_at(&self, offset: u64, limit: usize) -> Result { + self.inner.read_at(offset, limit).await.map_err(|err| { + increment_errors_total(self.op, err.kind()); + err }) } } impl oio::BlockingRead for PrometheusMetricWrapper { - fn read(&mut self, buf: &mut [u8]) -> Result { - self.inner - .read(buf) - .map(|n| { - self.bytes += n as u64; - n - }) - .map_err(|e| { - increment_errors_total(self.op, e.kind()); - e - }) - } - - fn seek(&mut self, pos: io::SeekFrom) -> Result { - self.inner.seek(pos).map_err(|err| { + fn read_at(&self, offset: u64, limit: usize) -> opendal::Result { + self.inner.read_at(offset, limit).map_err(|err| { increment_errors_total(self.op, err.kind()); err }) } - - fn next(&mut self) -> Option> { - self.inner.next().map(|res| match res { - Ok(bytes) => { - self.bytes += bytes.len() as u64; - Ok(bytes) - } - Err(e) => { - increment_errors_total(self.op, e.kind()); - Err(e) - } - }) - } } -#[async_trait] impl oio::Write for PrometheusMetricWrapper { - fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll> { - self.inner - .poll_write(cx, bs) - .map_ok(|n| { + async fn write(&mut self, bs: Buffer) -> Result { + match self.inner.write(bs).await { + Ok(n) => { self.bytes += n as u64; - n - }) - .map_err(|err| { + Ok(n) + } + Err(err) => { increment_errors_total(self.op, err.kind()); - err - }) + Err(err) + } + } } - fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll> { - self.inner.poll_abort(cx).map_err(|err| { + async fn close(&mut self) -> Result<()> { + self.inner.close().await.map_err(|err| { increment_errors_total(self.op, err.kind()); err }) } - fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll> { - self.inner.poll_close(cx).map_err(|err| { + async fn abort(&mut self) -> Result<()> { + self.inner.close().await.map_err(|err| { increment_errors_total(self.op, err.kind()); err }) @@ -565,7 +488,7 @@ impl oio::Write for PrometheusMetricWrapper { } impl oio::BlockingWrite for PrometheusMetricWrapper { - fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result { + fn write(&mut self, bs: Buffer) -> Result { self.inner .write(bs) .map(|n| { diff --git a/src/object-store/src/lib.rs b/src/object-store/src/lib.rs index a26a9bda64ac..ae561d4bbcd6 100644 --- a/src/object-store/src/lib.rs +++ b/src/object-store/src/lib.rs @@ -14,8 +14,9 @@ pub use opendal::raw::{normalize_path as raw_normalize_path, 
HttpClient}; pub use opendal::{ - services, Builder as ObjectStoreBuilder, Entry, EntryMode, Error, ErrorKind, Lister, Metakey, - Operator as ObjectStore, Reader, Result, Writer, + services, Builder as ObjectStoreBuilder, Entry, EntryMode, Error, ErrorKind, + FuturesAsyncReader, FuturesAsyncWriter, Lister, Metakey, Operator as ObjectStore, Reader, + Result, Writer, }; pub mod layers; diff --git a/src/object-store/tests/object_store_test.rs b/src/object-store/tests/object_store_test.rs index cfe0afe027db..a3d3800054c7 100644 --- a/src/object-store/tests/object_store_test.rs +++ b/src/object-store/tests/object_store_test.rs @@ -22,7 +22,6 @@ use object_store::layers::LruCacheLayer; use object_store::services::{Fs, S3}; use object_store::test_util::TempFolder; use object_store::{ObjectStore, ObjectStoreBuilder}; -use opendal::raw::Accessor; use opendal::services::{Azblob, Gcs, Oss}; use opendal::{EntryMode, Operator, OperatorBuilder}; @@ -36,11 +35,11 @@ async fn test_object_crud(store: &ObjectStore) -> Result<()> { // Read data from object; let bs = store.read(file_name).await?; - assert_eq!("Hello, World!", String::from_utf8(bs)?); + assert_eq!("Hello, World!", String::from_utf8(bs.to_vec())?); // Read range from object; let bs = store.read_with(file_name).range(1..=11).await?; - assert_eq!("ello, World", String::from_utf8(bs)?); + assert_eq!("ello, World", String::from_utf8(bs.to_vec())?); // Get object's Metadata let meta = store.stat(file_name).await?; @@ -77,7 +76,7 @@ async fn test_object_list(store: &ObjectStore) -> Result<()> { assert_eq!(p2, entries.first().unwrap().path()); let content = store.read(p2).await?; - assert_eq!("Hello, object2!", String::from_utf8(content)?); + assert_eq!("Hello, object2!", String::from_utf8(content.to_vec())?); store.delete(p2).await?; let entries = store.list("/").await?; @@ -236,11 +235,9 @@ async fn test_file_backend_with_lru_cache() -> Result<()> { let _ = builder .root(&cache_dir.path().to_string_lossy()) .atomic_write_dir(&cache_dir.path().to_string_lossy()); - let file_cache = Arc::new(builder.build().unwrap()); + let file_cache = Operator::new(builder).unwrap().finish(); - LruCacheLayer::new(Arc::new(file_cache.clone()), 32) - .await - .unwrap() + LruCacheLayer::new(file_cache, 32).await.unwrap() }; let store = store.layer(cache_layer.clone()); @@ -253,10 +250,7 @@ async fn test_file_backend_with_lru_cache() -> Result<()> { Ok(()) } -async fn assert_lru_cache( - cache_layer: &LruCacheLayer, - file_names: &[&str], -) { +async fn assert_lru_cache(cache_layer: &LruCacheLayer, file_names: &[&str]) { for file_name in file_names { assert!(cache_layer.contains_file(file_name).await); } @@ -278,7 +272,7 @@ async fn assert_cache_files( let bs = store.read(o.path()).await.unwrap(); assert_eq!( file_contents[position], - String::from_utf8(bs.clone())?, + String::from_utf8(bs.to_vec())?, "file content not match: {}", o.name() ); @@ -312,9 +306,7 @@ async fn test_object_store_cache_policy() -> Result<()> { let cache_store = OperatorBuilder::new(file_cache.clone()).finish(); // create operator for cache dir to verify cache file - let cache_layer = LruCacheLayer::new(Arc::new(file_cache.clone()), 38) - .await - .unwrap(); + let cache_layer = LruCacheLayer::new(cache_store.clone(), 38).await.unwrap(); let store = store.layer(cache_layer.clone()); // create several object handler. @@ -386,7 +378,7 @@ async fn test_object_store_cache_policy() -> Result<()> { // instead of returning `NotFound` during the reader creation. 
     // The entry count is 4, because we have the p2 `NotFound` cache.
     assert!(store.read_with(p2).range(0..4).await.is_err());
-    assert_eq!(cache_layer.read_cache_stat().await, (4, 35));
+    assert_eq!(cache_layer.read_cache_stat().await, (3, 35));
 
     assert_cache_files(
         &cache_store,
@@ -414,7 +406,7 @@ async fn test_object_store_cache_policy() -> Result<()> {
     assert!(store.read(p2).await.is_err());
     // Read p1 with range `1..` , the existing p1 with range `0..` must be evicted.
     let _ = store.read_with(p1).range(1..15).await.unwrap();
-    assert_eq!(cache_layer.read_cache_stat().await, (4, 34));
+    assert_eq!(cache_layer.read_cache_stat().await, (3, 34));
     assert_cache_files(
         &cache_store,
         &[
@@ -442,7 +434,7 @@ async fn test_object_store_cache_policy() -> Result<()> {
     drop(cache_layer);
 
     // Test recover
-    let cache_layer = LruCacheLayer::new(Arc::new(file_cache), 38).await.unwrap();
+    let cache_layer = LruCacheLayer::new(cache_store, 38).await.unwrap();
     // The p2 `NotFound` cache will not be recovered
     assert_eq!(cache_layer.read_cache_stat().await, (3, 34));
 
diff --git a/src/operator/Cargo.toml b/src/operator/Cargo.toml
index f99ea1c63b5e..8681dca2978e 100644
--- a/src/operator/Cargo.toml
+++ b/src/operator/Cargo.toml
@@ -56,6 +56,7 @@ store-api.workspace = true
 substrait.workspace = true
 table.workspace = true
 tokio.workspace = true
+tokio-util.workspace = true
 tonic.workspace = true
 
 [dev-dependencies]
diff --git a/src/operator/src/statement/copy_table_from.rs b/src/operator/src/statement/copy_table_from.rs
index 52880d700e29..94892f10a8c9 100644
--- a/src/operator/src/statement/copy_table_from.rs
+++ b/src/operator/src/statement/copy_table_from.rs
@@ -20,7 +20,7 @@ use client::{Output, OutputData, OutputMeta};
 use common_base::readable_size::ReadableSize;
 use common_datasource::file_format::csv::{CsvConfigBuilder, CsvFormat, CsvOpener};
 use common_datasource::file_format::json::{JsonFormat, JsonOpener};
-use common_datasource::file_format::orc::{infer_orc_schema, new_orc_stream_reader};
+use common_datasource::file_format::orc::{infer_orc_schema, new_orc_stream_reader, ReaderAdapter};
 use common_datasource::file_format::{FileFormat, Format};
 use common_datasource::lister::{Lister, Source};
 use common_datasource::object_store::{build_backend, parse_url};
@@ -46,6 +46,7 @@ use session::context::QueryContextRef;
 use snafu::ResultExt;
 use table::requests::{CopyTableRequest, InsertRequest};
 use table::table_reference::TableReference;
+use tokio_util::compat::FuturesAsyncReadCompatExt;
 
 use crate::error::{self, IntoVectorsSnafu, Result};
 use crate::statement::StatementExecutor;
@@ -146,10 +147,16 @@ impl StatementExecutor {
                 path,
             }),
             Format::Parquet(_) => {
+                let meta = object_store
+                    .stat(&path)
+                    .await
+                    .context(error::ReadObjectSnafu { path: &path })?;
                 let mut reader = object_store
                     .reader(&path)
                     .await
-                    .context(error::ReadObjectSnafu { path: &path })?;
+                    .context(error::ReadObjectSnafu { path: &path })?
+                    .into_futures_async_read(0..meta.content_length())
+                    .compat();
                 let metadata = ArrowReaderMetadata::load_async(&mut reader, Default::default())
                     .await
                     .context(error::ReadParquetMetadataSnafu)?;
@@ -161,12 +168,17 @@ impl StatementExecutor {
                 })
             }
             Format::Orc(_) => {
+                let meta = object_store
+                    .stat(&path)
+                    .await
+                    .context(error::ReadObjectSnafu { path: &path })?;
+
                 let reader = object_store
                     .reader(&path)
                     .await
                     .context(error::ReadObjectSnafu { path: &path })?;
 
-                let schema = infer_orc_schema(reader)
+                let schema = infer_orc_schema(ReaderAdapter::new(reader, meta.content_length()))
                     .await
                     .context(error::ReadOrcSnafu)?;
 
@@ -279,11 +291,17 @@ impl StatementExecutor {
                 )))
             }
             FileMetadata::Parquet { metadata, path, .. } => {
+                let meta = object_store
+                    .stat(path)
+                    .await
+                    .context(error::ReadObjectSnafu { path })?;
                 let reader = object_store
                     .reader_with(path)
-                    .buffer(DEFAULT_READ_BUFFER)
+                    .chunk(DEFAULT_READ_BUFFER)
                     .await
-                    .context(error::ReadObjectSnafu { path })?;
+                    .context(error::ReadObjectSnafu { path })?
+                    .into_futures_async_read(0..meta.content_length())
+                    .compat();
                 let builder =
                     ParquetRecordBatchStreamBuilder::new_with_metadata(reader, metadata.clone());
                 let stream = builder
@@ -302,14 +320,20 @@ impl StatementExecutor {
                 )))
             }
             FileMetadata::Orc { path, .. } => {
+                let meta = object_store
+                    .stat(path)
+                    .await
+                    .context(error::ReadObjectSnafu { path })?;
+
                 let reader = object_store
                     .reader_with(path)
-                    .buffer(DEFAULT_READ_BUFFER)
+                    .chunk(DEFAULT_READ_BUFFER)
                     .await
                     .context(error::ReadObjectSnafu { path })?;
 
-                let stream = new_orc_stream_reader(reader)
-                    .await
-                    .context(error::ReadOrcSnafu)?;
+                let stream =
+                    new_orc_stream_reader(ReaderAdapter::new(reader, meta.content_length()))
+                        .await
+                        .context(error::ReadOrcSnafu)?;
 
                 let projected_schema = Arc::new(
                     compat_schema
diff --git a/src/servers/src/export_metrics.rs b/src/servers/src/export_metrics.rs
index 8932adf59d6e..3f595fc943ca 100644
--- a/src/servers/src/export_metrics.rs
+++ b/src/servers/src/export_metrics.rs
@@ -16,14 +16,12 @@ use std::collections::HashMap;
 use std::sync::Arc;
 use std::time::Duration;
 
-use axum::http::HeaderValue;
 use common_base::Plugins;
 use common_telemetry::metric::{convert_metric_to_write_request, MetricFilter};
 use common_telemetry::{error, info};
 use common_time::Timestamp;
-use hyper::HeaderMap;
 use prost::Message;
-use reqwest::header::HeaderName;
+use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
 use serde::{Deserialize, Serialize};
 use session::context::QueryContextBuilder;
 use snafu::{ensure, ResultExt};
@@ -115,7 +113,7 @@ impl ExportMetricsTask {
                 }
             );
         }
-        let mut headers = reqwest::header::HeaderMap::new();
+        let mut headers = HeaderMap::new();
         if let Some(remote_write) = &config.remote_write {
             ensure!(
                 !remote_write.url.is_empty(),
diff --git a/src/servers/src/http/greptime_result_v1.rs b/src/servers/src/http/greptime_result_v1.rs
index 9cb2924ba689..de8284c40a00 100644
--- a/src/servers/src/http/greptime_result_v1.rs
+++ b/src/servers/src/http/greptime_result_v1.rs
@@ -14,10 +14,10 @@
 
 use std::collections::HashMap;
 
+use axum::headers::HeaderValue;
 use axum::response::{IntoResponse, Response};
 use axum::Json;
 use common_query::Output;
-use reqwest::header::HeaderValue;
 use schemars::JsonSchema;
 use serde::{Deserialize, Serialize};
 use serde_json::Value;
diff --git a/src/servers/src/http/test_helpers.rs b/src/servers/src/http/test_helpers.rs
index 11d416f97f15..0142fc5180fc 100644
--- a/src/servers/src/http/test_helpers.rs
+++ b/src/servers/src/http/test_helpers.rs
@@ -32,6 +32,7 @@
 use std::convert::TryFrom;
 use std::net::{SocketAddr, TcpListener};
+use std::str::FromStr;
 
 use axum::body::HttpBody;
 use axum::BoxError;
@@ -169,7 +170,15 @@ impl RequestBuilder {
         HeaderValue: TryFrom<V>,
         <HeaderValue as TryFrom<V>>::Error: Into<http::Error>,
     {
+        // TODO(tisonkun): revert once http bump to 1.x
+        let key: HeaderName = key.try_into().map_err(Into::into).unwrap();
+        let key = reqwest::header::HeaderName::from_bytes(key.as_ref()).unwrap();
+
+        let value: HeaderValue = value.try_into().map_err(Into::into).unwrap();
+        let value = reqwest::header::HeaderValue::from_bytes(value.as_bytes()).unwrap();
+
         self.builder = self.builder.header(key, value);
+
         self
     }
 
@@ -210,12 +219,19 @@ impl TestResponse {
 
     /// Get the response status.
     pub fn status(&self) -> StatusCode {
-        self.response.status()
+        StatusCode::from_u16(self.response.status().as_u16()).unwrap()
     }
 
     /// Get the response headers.
-    pub fn headers(&self) -> &http::HeaderMap {
-        self.response.headers()
+    pub fn headers(&self) -> http::HeaderMap {
+        // TODO(tisonkun): revert once http bump to 1.x
+        let mut headers = http::HeaderMap::new();
+        for (key, value) in self.response.headers() {
+            let key = HeaderName::from_str(key.as_str()).unwrap();
+            let value = HeaderValue::from_bytes(value.as_bytes()).unwrap();
+            headers.insert(key, value);
+        }
+        headers
     }
 
     /// Get the response in chunks.