Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

build(deps): upgrade opendal to 0.46 #4037

Merged
merged 20 commits into from
May 27, 2024
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
268 changes: 214 additions & 54 deletions Cargo.lock

Large diffs are not rendered by default.

38 changes: 24 additions & 14 deletions src/common/datasource/src/compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,34 +92,44 @@ impl CompressionType {
macro_rules! impl_compression_type {
($(($enum_item:ident, $prefix:ident)),*) => {
paste::item! {
use bytes::{Buf, BufMut, BytesMut};

impl CompressionType {
pub async fn encode(&self, content: impl AsRef<[u8]>) -> io::Result<Vec<u8>> {
pub async fn encode<B: Buf>(&self, mut content: B) -> io::Result<Vec<u8>> {
match self {
$(
CompressionType::$enum_item => {
let mut buffer = Vec::with_capacity(content.as_ref().len());
let mut buffer = Vec::with_capacity(content.remaining());
let mut encoder = write::[<$prefix Encoder>]::new(&mut buffer);
encoder.write_all(content.as_ref()).await?;
encoder.write_all_buf(&mut content).await?;
encoder.shutdown().await?;
Ok(buffer)
}
)*
CompressionType::Uncompressed => Ok(content.as_ref().to_vec()),
CompressionType::Uncompressed => {
let mut bs = BytesMut::with_capacity(content.remaining());
bs.put(content);
Ok(bs.to_vec())
},
}
}

pub async fn decode(&self, content: impl AsRef<[u8]>) -> io::Result<Vec<u8>> {
pub async fn decode<B: Buf>(&self, mut content: B) -> io::Result<Vec<u8>> {
match self {
$(
CompressionType::$enum_item => {
let mut buffer = Vec::with_capacity(content.as_ref().len() * 2);
let mut buffer = Vec::with_capacity(content.remaining() * 2);
let mut encoder = write::[<$prefix Decoder>]::new(&mut buffer);
encoder.write_all(content.as_ref()).await?;
encoder.write_all_buf(&mut content).await?;
encoder.shutdown().await?;
Ok(buffer)
}
)*
CompressionType::Uncompressed => Ok(content.as_ref().to_vec()),
CompressionType::Uncompressed => {
let mut bs = BytesMut::with_capacity(content.remaining());
bs.put(content);
Ok(bs.to_vec())
},
}
}

Expand Down Expand Up @@ -151,27 +161,27 @@ macro_rules! impl_compression_type {
$(
#[tokio::test]
async fn [<test_ $enum_item:lower _compression>]() {
let string = "foo_bar".as_bytes().to_vec();
let string = "foo_bar".as_bytes();
let compress = CompressionType::$enum_item
.encode(&string)
.encode(string)
.await
.unwrap();
let decompress = CompressionType::$enum_item
.decode(&compress)
.decode(compress.as_slice())
.await
.unwrap();
assert_eq!(decompress, string);
})*

#[tokio::test]
async fn test_uncompression() {
let string = "foo_bar".as_bytes().to_vec();
let string = "foo_bar".as_bytes();
let compress = CompressionType::Uncompressed
.encode(&string)
.encode(string)
.await
.unwrap();
let decompress = CompressionType::Uncompressed
.decode(&compress)
.decode(compress.as_slice())
.await
.unwrap();
assert_eq!(decompress, string);
Expand Down
5 changes: 4 additions & 1 deletion src/common/datasource/src/file_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use datafusion::physical_plan::SendableRecordBatchStream;
use futures::StreamExt;
use object_store::ObjectStore;
use snafu::ResultExt;
use tokio_util::compat::FuturesAsyncWriteCompatExt;

use self::csv::CsvFormat;
use self::json::JsonFormat;
Expand Down Expand Up @@ -146,7 +147,8 @@ pub fn open_with_decoder<T: ArrowDecoder, F: Fn() -> DataFusionResult<T>>(
let reader = object_store
.reader(&path)
.await
.map_err(|e| DataFusionError::External(Box::new(e)))?;
.map_err(|e| DataFusionError::External(Box::new(e)))?
.into_bytes_stream(..);

let mut upstream = compression_type.convert_stream(reader).fuse();

Expand Down Expand Up @@ -203,6 +205,7 @@ pub async fn stream_to_file<T: DfRecordBatchEncoder, U: Fn(SharedBuffer) -> T>(
.writer_with(&path)
.concurrent(concurrency)
.await
.map(|v| v.into_futures_async_write().compat_write())
.context(error::WriteObjectSnafu { path })
});

Expand Down
9 changes: 8 additions & 1 deletion src/common/datasource/src/file_format/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use datafusion::physical_plan::SendableRecordBatchStream;
use derive_builder::Builder;
use object_store::ObjectStore;
use snafu::ResultExt;
use tokio_util::compat::FuturesAsyncReadCompatExt;
use tokio_util::io::SyncIoBridge;

use super::stream_to_file;
Expand Down Expand Up @@ -164,10 +165,16 @@ impl FileOpener for CsvOpener {
#[async_trait]
impl FileFormat for CsvFormat {
async fn infer_schema(&self, store: &ObjectStore, path: &str) -> Result<Schema> {
let meta = store
.stat(path)
.await
.context(error::ReadObjectSnafu { path })?;
let reader = store
.reader(path)
.await
.context(error::ReadObjectSnafu { path })?;
.context(error::ReadObjectSnafu { path })?
.into_futures_async_read(0..meta.content_length())
.compat();

let decoded = self.compression_type.convert_async_read(reader);

Expand Down
9 changes: 8 additions & 1 deletion src/common/datasource/src/file_format/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use datafusion::error::{DataFusionError, Result as DataFusionResult};
use datafusion::physical_plan::SendableRecordBatchStream;
use object_store::ObjectStore;
use snafu::ResultExt;
use tokio_util::compat::FuturesAsyncReadCompatExt;
use tokio_util::io::SyncIoBridge;

use super::stream_to_file;
Expand Down Expand Up @@ -82,10 +83,16 @@ impl Default for JsonFormat {
#[async_trait]
impl FileFormat for JsonFormat {
async fn infer_schema(&self, store: &ObjectStore, path: &str) -> Result<Schema> {
let meta = store
.stat(path)
.await
.context(error::ReadObjectSnafu { path })?;
let reader = store
.reader(path)
.await
.context(error::ReadObjectSnafu { path })?;
.context(error::ReadObjectSnafu { path })?
.into_futures_async_read(0..meta.content_length())
.compat();

let decoded = self.compression_type.convert_async_read(reader);

Expand Down
71 changes: 57 additions & 14 deletions src/common/datasource/src/file_format/orc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,48 +16,83 @@ use std::sync::Arc;

use arrow_schema::{ArrowError, Schema, SchemaRef};
use async_trait::async_trait;
use bytes::Bytes;
use common_recordbatch::adapter::RecordBatchStreamTypeAdapter;
use datafusion::datasource::physical_plan::{FileMeta, FileOpenFuture, FileOpener};
use datafusion::error::{DataFusionError, Result as DfResult};
use futures::{StreamExt, TryStreamExt};
use futures::future::BoxFuture;
use futures::{FutureExt, StreamExt, TryStreamExt};
use object_store::ObjectStore;
use orc_rust::arrow_reader::ArrowReaderBuilder;
use orc_rust::async_arrow_reader::ArrowStreamReader;
use orc_rust::reader::AsyncChunkReader;
use snafu::ResultExt;
use tokio::io::{AsyncRead, AsyncSeek};

use crate::error::{self, Result};
use crate::file_format::FileFormat;

#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct OrcFormat;

pub async fn new_orc_stream_reader<R: AsyncRead + AsyncSeek + Unpin + Send + 'static>(
reader: R,
) -> Result<ArrowStreamReader<R>> {
#[derive(Clone)]
pub struct ReaderAdapter {
reader: object_store::Reader,
len: u64,
}

impl ReaderAdapter {
pub fn new(reader: object_store::Reader, len: u64) -> Self {
Self { reader, len }
}
}

impl AsyncChunkReader for ReaderAdapter {
fn len(&mut self) -> BoxFuture<'_, std::io::Result<u64>> {
async move { Ok(self.len) }.boxed()
}

fn get_bytes(
&mut self,
offset_from_start: u64,
length: u64,
) -> BoxFuture<'_, std::io::Result<Bytes>> {
async move {
let bytes = self
.reader
.read(offset_from_start..offset_from_start + length)
.await?;
Ok(bytes.to_bytes())
}
.boxed()
}
}

pub async fn new_orc_stream_reader(
reader: ReaderAdapter,
) -> Result<ArrowStreamReader<ReaderAdapter>> {
let reader_build = ArrowReaderBuilder::try_new_async(reader)
.await
.context(error::OrcReaderSnafu)?;
Ok(reader_build.build_async())
}

pub async fn infer_orc_schema<R: AsyncRead + AsyncSeek + Unpin + Send + 'static>(
reader: R,
) -> Result<Schema> {
pub async fn infer_orc_schema(reader: ReaderAdapter) -> Result<Schema> {
let reader = new_orc_stream_reader(reader).await?;
Ok(reader.schema().as_ref().clone())
}

#[async_trait]
impl FileFormat for OrcFormat {
async fn infer_schema(&self, store: &ObjectStore, path: &str) -> Result<Schema> {
let meta = store
.stat(path)
.await
.context(error::ReadObjectSnafu { path })?;
let reader = store
.reader(path)
.await
.context(error::ReadObjectSnafu { path })?;

let schema = infer_orc_schema(reader).await?;

let schema = infer_orc_schema(ReaderAdapter::new(reader, meta.content_length())).await?;
Ok(schema)
}
}
Expand Down Expand Up @@ -97,15 +132,23 @@ impl FileOpener for OrcOpener {
};
let projection = self.projection.clone();
Ok(Box::pin(async move {
let reader = object_store
.reader(meta.location().to_string().as_str())
let path = meta.location().to_string();

let meta = object_store
.stat(&path)
.await
.map_err(|e| DataFusionError::External(Box::new(e)))?;

let stream_reader = new_orc_stream_reader(reader)
let reader = object_store
.reader(&path)
.await
.map_err(|e| DataFusionError::External(Box::new(e)))?;

let stream_reader =
new_orc_stream_reader(ReaderAdapter::new(reader, meta.content_length()))
.await
.map_err(|e| DataFusionError::External(Box::new(e)))?;

let stream =
RecordBatchStreamTypeAdapter::new(projected_schema, stream_reader, projection);

Expand Down
29 changes: 22 additions & 7 deletions src/common/datasource/src/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,11 @@ use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion::physical_plan::SendableRecordBatchStream;
use futures::future::BoxFuture;
use futures::StreamExt;
use object_store::{ObjectStore, Reader, Writer};
use object_store::{FuturesAsyncReader, ObjectStore};
use parquet::basic::{Compression, ZstdLevel};
use parquet::file::properties::WriterProperties;
use snafu::ResultExt;
use tokio_util::compat::{Compat, FuturesAsyncReadCompatExt, FuturesAsyncWriteCompatExt};

use crate::buffered_writer::{ArrowWriterCloser, DfRecordBatchEncoder, LazyBufferedWriter};
use crate::error::{self, Result};
Expand All @@ -45,10 +46,16 @@ pub struct ParquetFormat {}
#[async_trait]
impl FileFormat for ParquetFormat {
async fn infer_schema(&self, store: &ObjectStore, path: &str) -> Result<Schema> {
let meta = store
.stat(path)
.await
.context(error::ReadObjectSnafu { path })?;
let mut reader = store
.reader(path)
.await
.context(error::ReadObjectSnafu { path })?;
.context(error::ReadObjectSnafu { path })?
.into_futures_async_read(0..meta.content_length())
.compat();

let metadata = reader
.get_metadata()
Expand Down Expand Up @@ -98,7 +105,7 @@ impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {

pub struct LazyParquetFileReader {
object_store: ObjectStore,
reader: Option<Reader>,
reader: Option<Compat<FuturesAsyncReader>>,
path: String,
}

Expand All @@ -114,7 +121,13 @@ impl LazyParquetFileReader {
/// Must initialize the reader, or throw an error from the future.
async fn maybe_initialize(&mut self) -> result::Result<(), object_store::Error> {
if self.reader.is_none() {
let reader = self.object_store.reader(&self.path).await?;
let meta = self.object_store.stat(&self.path).await?;
let reader = self
.object_store
.reader(&self.path)
.await?
.into_futures_async_read(0..meta.content_length())
.compat();
self.reader = Some(reader);
}

Expand Down Expand Up @@ -167,23 +180,25 @@ pub struct BufferedWriter {
}

type InnerBufferedWriter = LazyBufferedWriter<
object_store::Writer,
Compat<object_store::FuturesAsyncWriter>,
ArrowWriter<SharedBuffer>,
impl Fn(String) -> BoxFuture<'static, Result<Writer>>,
impl Fn(String) -> BoxFuture<'static, Result<Compat<object_store::FuturesAsyncWriter>>>,
>;

impl BufferedWriter {
fn make_write_factory(
store: ObjectStore,
concurrency: usize,
) -> impl Fn(String) -> BoxFuture<'static, Result<Writer>> {
) -> impl Fn(String) -> BoxFuture<'static, Result<Compat<object_store::FuturesAsyncWriter>>>
{
move |path| {
let store = store.clone();
Box::pin(async move {
store
.writer_with(&path)
.concurrent(concurrency)
.await
.map(|v| v.into_futures_async_write().compat_write())
.context(error::WriteObjectSnafu { path })
})
}
Expand Down
4 changes: 2 additions & 2 deletions src/common/datasource/src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ pub async fn setup_stream_to_json_test(origin_path: &str, threshold: impl Fn(usi

let written = tmp_store.read(&output_path).await.unwrap();
let origin = store.read(origin_path).await.unwrap();
assert_eq_lines(written, origin);
assert_eq_lines(written.to_vec(), origin.to_vec());
}

pub async fn setup_stream_to_csv_test(origin_path: &str, threshold: impl Fn(usize) -> usize) {
Expand Down Expand Up @@ -158,7 +158,7 @@ pub async fn setup_stream_to_csv_test(origin_path: &str, threshold: impl Fn(usiz

let written = tmp_store.read(&output_path).await.unwrap();
let origin = store.read(origin_path).await.unwrap();
assert_eq_lines(written, origin);
assert_eq_lines(written.to_vec(), origin.to_vec());
}

// Ignore the CRLF difference across operating systems.
Expand Down
2 changes: 1 addition & 1 deletion src/common/procedure/src/store/state_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ impl StateStore for ObjectStateStore {
))
})
.context(ListStateSnafu { path: key })?;
yield (key.into(), value);
yield (key.into(), value.to_vec());
}
}
});
Expand Down
Loading
Loading