Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(puffin): apply range reader #4928

Merged
merged 9 commits into from
Nov 12, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions src/common/base/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@ common-error.workspace = true
common-macro.workspace = true
futures.workspace = true
paste = "1.0"
pin-project.workspace = true
serde = { version = "1.0", features = ["derive"] }
snafu.workspace = true
tokio.workspace = true
killme2008 marked this conversation as resolved.
Show resolved Hide resolved
zeroize = { version = "1.6", default-features = false, features = ["alloc"] }

[dev-dependencies]
Expand Down
176 changes: 160 additions & 16 deletions src/common/base/src/range_read.rs
zhongzc marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,20 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::future::Future;
use std::io;
use std::ops::Range;
use std::path::Path;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use async_trait::async_trait;
use bytes::{BufMut, Bytes};
use futures::{AsyncReadExt, AsyncSeekExt};
use futures::AsyncRead;
use pin_project::pin_project;
use tokio::io::{AsyncReadExt as _, AsyncSeekExt as _};
use tokio::sync::Mutex;

/// `Metadata` contains the metadata of a source.
pub struct Metadata {
Expand Down Expand Up @@ -61,7 +69,7 @@ pub trait RangeReader: Send + Unpin {
}

#[async_trait]
impl<R: RangeReader + Send + Unpin> RangeReader for &mut R {
impl<R: ?Sized + RangeReader> RangeReader for &mut R {
async fn metadata(&mut self) -> io::Result<Metadata> {
(*self).metadata().await
}
Expand All @@ -80,26 +88,162 @@ impl<R: RangeReader + Send + Unpin> RangeReader for &mut R {
}
}

/// `RangeReaderAdapter` bridges `RangeReader` and `AsyncRead + AsyncSeek`.
pub struct RangeReaderAdapter<R>(pub R);
/// `AsyncReadAdapter` adapts a `RangeReader` to an `AsyncRead`.
#[pin_project]
pub struct AsyncReadAdapter<R> {
/// The inner `RangeReader`.
/// Use `Mutex` to get rid of the borrow checker issue.
inner: Arc<Mutex<R>>,

/// The current position from the view of the reader.
position: u64,

/// The buffer for the read bytes.
buffer: Vec<u8>,

/// The length of the content.
content_length: u64,

/// The future for reading the next bytes.
#[pin]
read_fut: Option<Pin<Box<dyn Future<Output = io::Result<Bytes>> + Send>>>,
}

impl<R: RangeReader + 'static> AsyncReadAdapter<R> {
pub async fn new(inner: R) -> io::Result<Self> {
let mut inner = inner;
let metadata = inner.metadata().await?;
Ok(AsyncReadAdapter {
inner: Arc::new(Mutex::new(inner)),
position: 0,
buffer: Vec::new(),
content_length: metadata.content_length,
read_fut: None,
})
}
}

impl<R: RangeReader + 'static> AsyncRead for AsyncReadAdapter<R> {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
let mut this = self.as_mut().project();

if *this.position >= *this.content_length {
return Poll::Ready(Ok(0));
}

if !this.buffer.is_empty() {
let to_read = this.buffer.len().min(buf.len());
buf[..to_read].copy_from_slice(&this.buffer[..to_read]);
this.buffer.drain(..to_read);
*this.position += to_read as u64;
return Poll::Ready(Ok(to_read));
}

if this.read_fut.is_none() {
let range = *this.position..*this.content_length;
let inner = this.inner.clone();
let fut = async move {
let mut inner = inner.lock().await;
inner.read(range).await
};
killme2008 marked this conversation as resolved.
Show resolved Hide resolved

*this.read_fut = Some(Box::pin(fut));
}

match this
.read_fut
.as_mut()
.as_pin_mut()
.expect("checked above")
.poll(cx)
{
Poll::Pending => Poll::Pending,
Poll::Ready(Ok(bytes)) => {
*this.read_fut = None;

if !bytes.is_empty() {
this.buffer.extend_from_slice(&bytes);
self.poll_read(cx, buf)
} else {
Poll::Ready(Ok(0))
}
}
Poll::Ready(Err(e)) => {
*this.read_fut = None;
Poll::Ready(Err(e))
}
}
}
}

/// Implements `RangeReader` for a type that implements `AsyncRead + AsyncSeek`.
///
/// TODO(zhongzc): It's a temporary solution for porting the codebase from `AsyncRead + AsyncSeek` to `RangeReader`.
/// Until the codebase is fully ported to `RangeReader`, remove this implementation.
#[async_trait]
impl<R: futures::AsyncRead + futures::AsyncSeek + Send + Unpin> RangeReader
for RangeReaderAdapter<R>
{
impl RangeReader for Vec<u8> {
async fn metadata(&mut self) -> io::Result<Metadata> {
let content_length = self.0.seek(io::SeekFrom::End(0)).await?;
Ok(Metadata { content_length })
Ok(Metadata {
content_length: self.len() as u64,
})
}

async fn read(&mut self, range: Range<u64>) -> io::Result<Bytes> {
async fn read(&mut self, mut range: Range<u64>) -> io::Result<Bytes> {
range.end = range.end.min(self.len() as u64);

let mut buf = vec![0; (range.end - range.start) as usize];
self.0.seek(io::SeekFrom::Start(range.start)).await?;
self.0.read_exact(&mut buf).await?;
buf.copy_from_slice(&self[range.start as usize..range.end as usize]);
Ok(Bytes::from(buf))
killme2008 marked this conversation as resolved.
Show resolved Hide resolved
}
}

/// `FileReader` is a `RangeReader` for reading a file.
pub struct FileReader {
content_length: u64,
position: u64,
file: tokio::fs::File,
}

impl FileReader {
/// Creates a new `FileReader` for the file at the given path.
pub async fn new(path: impl AsRef<Path>) -> io::Result<Self> {
let file = tokio::fs::File::open(path).await?;
let metadata = file.metadata().await?;
Ok(FileReader {
content_length: metadata.len(),
position: 0,
file,
})
}
}

#[async_trait]
impl RangeReader for FileReader {
async fn metadata(&mut self) -> io::Result<Metadata> {
Ok(Metadata {
content_length: self.content_length,
})
}

async fn read(&mut self, mut range: Range<u64>) -> io::Result<Bytes> {
if range.start != self.position {
self.file.seek(io::SeekFrom::Start(range.start)).await?;
self.position = range.start;
}

range.end = range.end.min(self.content_length);
if range.end <= self.position {
return Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
"Start of range is out of bounds",
));
}

let mut buf = vec![0; (range.end - range.start) as usize];

self.file.read_exact(&mut buf).await?;
self.position = range.end;

Ok(Bytes::from(buf))
}
}
11 changes: 3 additions & 8 deletions src/index/src/inverted_index/format/reader/blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,7 @@ impl<R: RangeReader> InvertedIndexReader for InvertedIndexBlobReader<R> {
#[cfg(test)]
mod tests {
use common_base::bit_vec::prelude::*;
use common_base::range_read::RangeReaderAdapter;
use fst::MapBuilder;
use futures::io::Cursor;
use greptime_proto::v1::index::{InvertedIndexMeta, InvertedIndexMetas};
use prost::Message;

Expand Down Expand Up @@ -163,8 +161,7 @@ mod tests {
#[tokio::test]
async fn test_inverted_index_blob_reader_metadata() {
let blob = create_inverted_index_blob();
let cursor = RangeReaderAdapter(Cursor::new(blob));
let mut blob_reader = InvertedIndexBlobReader::new(cursor);
let mut blob_reader = InvertedIndexBlobReader::new(blob);

let metas = blob_reader.metadata().await.unwrap();
assert_eq!(metas.metas.len(), 2);
Expand All @@ -191,8 +188,7 @@ mod tests {
#[tokio::test]
async fn test_inverted_index_blob_reader_fst() {
let blob = create_inverted_index_blob();
let cursor = RangeReaderAdapter(Cursor::new(blob));
let mut blob_reader = InvertedIndexBlobReader::new(cursor);
let mut blob_reader = InvertedIndexBlobReader::new(blob);

let metas = blob_reader.metadata().await.unwrap();
let meta = metas.metas.get("tag0").unwrap();
Expand Down Expand Up @@ -224,8 +220,7 @@ mod tests {
#[tokio::test]
async fn test_inverted_index_blob_reader_bitmap() {
let blob = create_inverted_index_blob();
let cursor = RangeReaderAdapter(Cursor::new(blob));
let mut blob_reader = InvertedIndexBlobReader::new(cursor);
let mut blob_reader = InvertedIndexBlobReader::new(blob);

let metas = blob_reader.metadata().await.unwrap();
let meta = metas.metas.get("tag0").unwrap();
Expand Down
15 changes: 5 additions & 10 deletions src/index/src/inverted_index/format/reader/footer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,6 @@ impl<R: RangeReader> InvertedIndeFooterReader<R> {

#[cfg(test)]
mod tests {
use common_base::range_read::RangeReaderAdapter;
use futures::io::Cursor;
use prost::Message;

use super::*;
Expand All @@ -141,10 +139,9 @@ mod tests {
..Default::default()
};

let payload_buf = create_test_payload(meta);
let mut payload_buf = create_test_payload(meta);
let blob_size = payload_buf.len() as u64;
let cursor = RangeReaderAdapter(Cursor::new(payload_buf));
let mut reader = InvertedIndeFooterReader::new(cursor, blob_size);
let mut reader = InvertedIndeFooterReader::new(&mut payload_buf, blob_size);

let payload_size = reader.read_payload_size().await.unwrap();
let metas = reader.read_payload(payload_size).await.unwrap();
Expand All @@ -164,8 +161,7 @@ mod tests {
let mut payload_buf = create_test_payload(meta);
payload_buf.push(0xff); // Add an extra byte to corrupt the footer
let blob_size = payload_buf.len() as u64;
let cursor = RangeReaderAdapter(Cursor::new(payload_buf));
let mut reader = InvertedIndeFooterReader::new(cursor, blob_size);
let mut reader = InvertedIndeFooterReader::new(&mut payload_buf, blob_size);

let payload_size_result = reader.read_payload_size().await;
assert!(payload_size_result.is_err());
Expand All @@ -180,10 +176,9 @@ mod tests {
..Default::default()
};

let payload_buf = create_test_payload(meta);
let mut payload_buf = create_test_payload(meta);
let blob_size = payload_buf.len() as u64;
let cursor = RangeReaderAdapter(Cursor::new(payload_buf));
let mut reader = InvertedIndeFooterReader::new(cursor, blob_size);
let mut reader = InvertedIndeFooterReader::new(&mut payload_buf, blob_size);

let payload_size = reader.read_payload_size().await.unwrap();
let payload_result = reader.read_payload(payload_size).await;
Expand Down
8 changes: 2 additions & 6 deletions src/index/src/inverted_index/format/writer/blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,6 @@ impl<W: AsyncWrite + Send + Unpin> InvertedIndexBlobWriter<W> {

#[cfg(test)]
mod tests {
use common_base::range_read::RangeReaderAdapter;
use futures::io::Cursor;
use futures::stream;

use super::*;
Expand All @@ -120,8 +118,7 @@ mod tests {
.await
.unwrap();

let cursor = RangeReaderAdapter(Cursor::new(blob));
let mut reader = InvertedIndexBlobReader::new(cursor);
let mut reader = InvertedIndexBlobReader::new(blob);
let metadata = reader.metadata().await.unwrap();
assert_eq!(metadata.total_row_count, 8);
assert_eq!(metadata.segment_row_count, 1);
Expand Down Expand Up @@ -161,8 +158,7 @@ mod tests {
.await
.unwrap();

let cursor = RangeReaderAdapter(Cursor::new(blob));
let mut reader = InvertedIndexBlobReader::new(cursor);
let mut reader = InvertedIndexBlobReader::new(blob);
let metadata = reader.metadata().await.unwrap();
assert_eq!(metadata.total_row_count, 8);
assert_eq!(metadata.segment_row_count, 1);
Expand Down
2 changes: 0 additions & 2 deletions src/mito2/src/sst/index/inverted_index/applier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ pub mod builder;

use std::sync::Arc;

use common_base::range_read::RangeReaderAdapter;
use common_telemetry::warn;
use index::inverted_index::format::reader::InvertedIndexBlobReader;
use index::inverted_index::search::index_apply::{
Expand Down Expand Up @@ -109,7 +108,6 @@ impl InvertedIndexApplier {
self.remote_blob_reader(file_id).await?
}
};
let blob = RangeReaderAdapter(blob);

if let Some(index_cache) = &self.inverted_index_cache {
let mut index_reader = CachedInvertedIndexBlobReader::new(
Expand Down
Loading