Skip to content

Commit

Permalink
test_orc_opener
Browse files: browse the repository at this point in the history
Signed-off-by: tison <[email protected]>
  • Loading branch information
tisonkun committed May 25, 2024
1 parent ce41aa7 commit 2e17a5f
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 35 deletions.
77 changes: 55 additions & 22 deletions src/common/datasource/src/file_format/orc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,36 +16,67 @@ use std::sync::Arc;

use arrow_schema::{ArrowError, Schema, SchemaRef};
use async_trait::async_trait;
use bytes::Bytes;
use common_recordbatch::adapter::RecordBatchStreamTypeAdapter;
use datafusion::datasource::physical_plan::{FileMeta, FileOpenFuture, FileOpener};
use datafusion::error::{DataFusionError, Result as DfResult};
use futures::{StreamExt, TryStreamExt};
use futures::future::BoxFuture;
use futures::{FutureExt, StreamExt, TryStreamExt};
use object_store::ObjectStore;
use orc_rust::arrow_reader::ArrowReaderBuilder;
use orc_rust::async_arrow_reader::ArrowStreamReader;
use orc_rust::reader::AsyncChunkReader;
use snafu::ResultExt;
use tokio::io::{AsyncRead, AsyncSeek};
use tokio_util::compat::FuturesAsyncReadCompatExt;

use crate::error::{self, Result};
use crate::file_format::FileFormat;

#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct OrcFormat;

/// TODO: it's better to avoid AsyncRead + AsyncSeek, use range based read instead.
pub async fn new_orc_stream_reader<R: AsyncRead + AsyncSeek + Unpin + Send + 'static>(
reader: R,
) -> Result<ArrowStreamReader<R>> {
/// Pairs an [`object_store::Reader`] with a caller-supplied object length so
/// the pair can be handed to the ORC reader through its `AsyncChunkReader`
/// implementation.
#[derive(Clone)]
pub struct ReaderAdapter {
// Underlying reader; `get_bytes` issues ranged `read` calls against it.
reader: object_store::Reader,
// Value returned verbatim by `len()`. Presumably the object's size in bytes;
// callers visible in this change pass `meta.content_length()`.
len: u64,
}

impl ReaderAdapter {
pub fn new(reader: object_store::Reader, len: u64) -> Self {
Self { reader, len }
}
}

// Range-based chunk access for the ORC reader: each request is translated
// into a single ranged `read` on the wrapped object store reader.
impl AsyncChunkReader for ReaderAdapter {
    /// Returns the length supplied to [`ReaderAdapter::new`].
    ///
    /// The value is returned as stored; this method never yields an error.
    fn len(&mut self) -> BoxFuture<'_, std::io::Result<u64>> {
        async move { Ok(self.len) }.boxed()
    }

    /// Reads the byte range `offset_from_start..offset_from_start + length`
    /// from the wrapped reader and returns it as [`Bytes`].
    ///
    /// # Errors
    ///
    /// * `std::io::ErrorKind::InvalidInput` if `offset_from_start + length`
    ///   does not fit in a `u64`.
    /// * Any error returned by the underlying `read`, converted into
    ///   `std::io::Error` by `?`.
    fn get_bytes(
        &mut self,
        offset_from_start: u64,
        length: u64,
    ) -> BoxFuture<'_, std::io::Result<Bytes>> {
        async move {
            // Offsets and lengths are supplied by the ORC reader. A plain `+`
            // would panic in debug builds and silently wrap in release builds,
            // yielding a bogus range, so reject overflow explicitly instead.
            let end = offset_from_start.checked_add(length).ok_or_else(|| {
                std::io::Error::new(
                    std::io::ErrorKind::InvalidInput,
                    "ORC read range overflows u64",
                )
            })?;

            let buffer = self.reader.read(offset_from_start..end).await?;
            Ok(buffer.to_bytes())
        }
        .boxed()
    }
}

/// Builds an asynchronous ORC stream reader on top of `reader`.
///
/// # Errors
///
/// Returns the builder's failure wrapped with `error::OrcReaderSnafu` context
/// if `ArrowReaderBuilder::try_new_async` fails.
pub async fn new_orc_stream_reader(
    reader: ReaderAdapter,
) -> Result<ArrowStreamReader<ReaderAdapter>> {
    let builder = ArrowReaderBuilder::try_new_async(reader)
        .await
        .context(error::OrcReaderSnafu)?;

    let stream_reader = builder.build_async();
    Ok(stream_reader)
}

pub async fn infer_orc_schema<R: AsyncRead + AsyncSeek + Unpin + Send + 'static>(
reader: R,
) -> Result<Schema> {
/// Returns an owned copy of the Arrow schema reported by an ORC stream reader
/// built from `reader`.
///
/// # Errors
///
/// Propagates any error from [`new_orc_stream_reader`].
pub async fn infer_orc_schema(reader: ReaderAdapter) -> Result<Schema> {
    let stream_reader = new_orc_stream_reader(reader).await?;
    let schema = stream_reader.schema();
    Ok(schema.as_ref().clone())
}
Expand All @@ -60,12 +91,8 @@ impl FileFormat for OrcFormat {
let reader = store
.reader(path)
.await
.context(error::ReadObjectSnafu { path })?
.into_futures_async_read(0..meta.content_length())
.compat();

let schema = infer_orc_schema(reader).await?;

.context(error::ReadObjectSnafu { path })?;
let schema = infer_orc_schema(ReaderAdapter::new(reader, meta.content_length())).await?;
Ok(schema)
}
}
Expand Down Expand Up @@ -105,17 +132,23 @@ impl FileOpener for OrcOpener {
};
let projection = self.projection.clone();
Ok(Box::pin(async move {
let reader = object_store
.reader(meta.location().to_string().as_str())
let path = meta.location().to_string();

let meta = object_store
.stat(&path)
.await
.map_err(|e| DataFusionError::External(Box::new(e)))?
.into_futures_async_read(0..meta.object_meta.size as u64)
.compat();
.map_err(|e| DataFusionError::External(Box::new(e)))?;

let stream_reader = new_orc_stream_reader(reader)
let reader = object_store
.reader(&path)
.await
.map_err(|e| DataFusionError::External(Box::new(e)))?;

let stream_reader =
new_orc_stream_reader(ReaderAdapter::new(reader, meta.content_length()))
.await
.map_err(|e| DataFusionError::External(Box::new(e)))?;

let stream =
RecordBatchStreamTypeAdapter::new(projected_schema, stream_reader, projection);

Expand Down
25 changes: 12 additions & 13 deletions src/operator/src/statement/copy_table_from.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use client::{Output, OutputData, OutputMeta};
use common_base::readable_size::ReadableSize;
use common_datasource::file_format::csv::{CsvConfigBuilder, CsvFormat, CsvOpener};
use common_datasource::file_format::json::{JsonFormat, JsonOpener};
use common_datasource::file_format::orc::{infer_orc_schema, new_orc_stream_reader};
use common_datasource::file_format::orc::{infer_orc_schema, new_orc_stream_reader, ReaderAdapter};
use common_datasource::file_format::{FileFormat, Format};
use common_datasource::lister::{Lister, Source};
use common_datasource::object_store::{build_backend, parse_url};
Expand Down Expand Up @@ -172,14 +172,13 @@ impl StatementExecutor {
.stat(&path)
.await
.context(error::ReadObjectSnafu { path: &path })?;

let reader = object_store
.reader(&path)
.await
.context(error::ReadObjectSnafu { path: &path })?
.into_futures_async_read(0..meta.content_length())
.compat();
.context(error::ReadObjectSnafu { path: &path })?;

let schema = infer_orc_schema(reader)
let schema = infer_orc_schema(ReaderAdapter::new(reader, meta.content_length()))
.await
.context(error::ReadOrcSnafu)?;

Expand Down Expand Up @@ -322,19 +321,19 @@ impl StatementExecutor {
}
FileMetadata::Orc { path, .. } => {
let meta = object_store
.stat(path)
.stat(&path)
.await
.context(error::ReadObjectSnafu { path })?;
.context(error::ReadObjectSnafu { path: &path })?;

let reader = object_store
.reader_with(path)
.chunk(DEFAULT_READ_BUFFER)
.await
.context(error::ReadObjectSnafu { path })?
.into_futures_async_read(0..meta.content_length())
.compat();
let stream = new_orc_stream_reader(reader)
.await
.context(error::ReadOrcSnafu)?;
.context(error::ReadObjectSnafu { path })?;
let stream =
new_orc_stream_reader(ReaderAdapter::new(reader, meta.content_length()))
.await
.context(error::ReadOrcSnafu)?;

let projected_schema = Arc::new(
compat_schema
Expand Down

0 comments on commit 2e17a5f

Please sign in to comment.