From 8ec1e42754f76f69f0334cafcc10c784be5eb5c5 Mon Sep 17 00:00:00 2001
From: Wei <15172118655@163.com>
Date: Thu, 11 Jan 2024 20:06:28 +0800
Subject: [PATCH] feat: read data from write cache (#3128)

* feat: read from write cache

* chore: add read ranges test

* fix: use get instead of contains_key

* chore: clippy

* chore: cr comment

Co-authored-by: Yingwen

* fix: with_label_values

---------

Co-authored-by: Yingwen
---
 src/mito2/src/cache/file_cache.rs      |  66 +++++++++++
 src/mito2/src/cache/write_cache.rs     |   8 ++
 src/mito2/src/sst/parquet.rs           |   2 +-
 src/mito2/src/sst/parquet/helper.rs    |  85 ++++++++++++++
 src/mito2/src/sst/parquet/row_group.rs | 151 +++++++++----------------
 5 files changed, 212 insertions(+), 100 deletions(-)

diff --git a/src/mito2/src/cache/file_cache.rs b/src/mito2/src/cache/file_cache.rs
index 94a8518dd1c0..35dc5ef59525 100644
--- a/src/mito2/src/cache/file_cache.rs
+++ b/src/mito2/src/cache/file_cache.rs
@@ -14,9 +14,11 @@
 //! A cache for files.
 
+use std::ops::{Range, RangeBounds};
 use std::sync::Arc;
 use std::time::Instant;
 
+use bytes::Bytes;
 use common_base::readable_size::ReadableSize;
 use common_telemetry::{info, warn};
 use futures::{FutureExt, TryStreamExt};
@@ -31,6 +33,7 @@ use crate::cache::FILE_TYPE;
 use crate::error::{OpenDalSnafu, Result};
 use crate::metrics::{CACHE_BYTES, CACHE_HIT, CACHE_MISS};
 use crate::sst::file::FileId;
+use crate::sst::parquet::helper::fetch_byte_ranges;
 
 /// Subdirectory of cached files.
 const FILE_DIR: &str = "files/";
@@ -129,6 +132,39 @@ impl FileCache {
         None
     }
 
+    /// Reads ranges from the cache.
+    pub(crate) async fn read_ranges(
+        &self,
+        key: IndexKey,
+        ranges: &[Range<u64>],
+    ) -> Option<Vec<Bytes>> {
+        if self.memory_index.get(&key).await.is_none() {
+            CACHE_MISS.with_label_values(&[FILE_TYPE]).inc();
+            return None;
+        }
+
+        let file_path = self.cache_file_path(key);
+        // In most cases this is a blocking read, because the FileCache is normally
+        // backed by a local file system, which supports blocking reads.
+        let bytes_result = fetch_byte_ranges(&file_path, self.local_store.clone(), ranges).await;
+        match bytes_result {
+            Ok(bytes) => {
+                CACHE_HIT.with_label_values(&[FILE_TYPE]).inc();
+                Some(bytes)
+            }
+            Err(e) => {
+                if e.kind() != ErrorKind::NotFound {
+                    warn!("Failed to get file for key {:?}, err: {}", key, e);
+                }
+
+                // We remove the file from the index.
+                self.memory_index.remove(&key).await;
+                CACHE_MISS.with_label_values(&[FILE_TYPE]).inc();
+                None
+            }
+        }
+    }
+
     /// Removes a file from the cache explicitly.
     pub(crate) async fn remove(&self, key: IndexKey) {
         let file_path = self.cache_file_path(key);
@@ -372,6 +408,36 @@ mod tests {
         }
     }
 
+    #[tokio::test]
+    async fn test_file_cache_read_ranges() {
+        let dir = create_temp_dir("");
+        let local_store = new_fs_store(dir.path().to_str().unwrap());
+        let file_cache = FileCache::new(local_store.clone(), ReadableSize::mb(10));
+        let region_id = RegionId::new(2000, 0);
+        let file_id = FileId::random();
+        let key = (region_id, file_id);
+        let file_path = file_cache.cache_file_path(key);
+        // Write a file.
+        let data = b"hello greptime database";
+        local_store
+            .write(&file_path, data.as_slice())
+            .await
+            .unwrap();
+        // Add to the cache.
+        file_cache
+            .put((region_id, file_id), IndexValue { file_size: 5 })
+            .await;
+        // Ranges
+        let ranges = vec![0..5, 6..10, 15..19, 0..data.len() as u64];
+        let bytes = file_cache.read_ranges(key, &ranges).await.unwrap();
+
+        assert_eq!(4, bytes.len());
+        assert_eq!(b"hello", bytes[0].as_ref());
+        assert_eq!(b"grep", bytes[1].as_ref());
+        assert_eq!(b"data", bytes[2].as_ref());
+        assert_eq!(data, bytes[3].as_ref());
+    }
+
     #[test]
     fn test_cache_file_path() {
         let file_id = FileId::parse_str("3368731b-a556-42b8-a5df-9c31ce155095").unwrap();
diff --git a/src/mito2/src/cache/write_cache.rs b/src/mito2/src/cache/write_cache.rs
index f0cf58de9140..c450ef6b735c 100644
--- a/src/mito2/src/cache/write_cache.rs
+++ b/src/mito2/src/cache/write_cache.rs
@@ -14,9 +14,11 @@
 //! A write-through cache for remote object stores.
 
+use std::ops::Range;
 use std::sync::Arc;
 
 use api::v1::region;
+use bytes::Bytes;
 use common_base::readable_size::ReadableSize;
 use common_telemetry::{debug, info};
 use object_store::manager::ObjectStoreManagerRef;
@@ -24,6 +26,7 @@ use object_store::ObjectStore;
 use snafu::ResultExt;
 use store_api::metadata::RegionMetadataRef;
 
+use super::file_cache::IndexKey;
 use crate::access_layer::new_fs_object_store;
 use crate::cache::file_cache::{FileCache, FileCacheRef, IndexValue};
 use crate::error::{self, Result};
@@ -150,6 +153,11 @@ impl WriteCache {
 
         Ok(Some(sst_info))
     }
+
+    /// Returns the file cache of the write cache.
+    pub(crate) fn file_cache(&self) -> FileCacheRef {
+        self.file_cache.clone()
+    }
 }
 
 /// Request to write and upload a SST.
diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs
index a6e78ca94835..6c9ec3e9802f 100644
--- a/src/mito2/src/sst/parquet.rs
+++ b/src/mito2/src/sst/parquet.rs
@@ -15,7 +15,7 @@
 //! SST in parquet format.
 
 mod format;
-mod helper;
+pub(crate) mod helper;
 mod page_reader;
 pub mod reader;
 pub mod row_group;
diff --git a/src/mito2/src/sst/parquet/helper.rs b/src/mito2/src/sst/parquet/helper.rs
index 6e059bd963e5..34196df7c002 100644
--- a/src/mito2/src/sst/parquet/helper.rs
+++ b/src/mito2/src/sst/parquet/helper.rs
@@ -12,8 +12,11 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::ops::Range;
 use std::sync::Arc;
 
+use bytes::Bytes;
+use object_store::{ErrorKind, ObjectStore};
 use parquet::basic::ColumnOrder;
 use parquet::file::metadata::{FileMetaData, ParquetMetaData, RowGroupMetaData};
 use parquet::format;
@@ -84,3 +87,85 @@ fn parse_column_orders(
         None => None,
     }
 }
+
+/// Fetches data from object store.
+/// If the object store supports blocking, use sequential blocking reads.
+/// Otherwise, use concurrent reads.
+pub async fn fetch_byte_ranges(
+    file_path: &str,
+    object_store: ObjectStore,
+    ranges: &[Range<u64>],
+) -> object_store::Result<Vec<Bytes>> {
+    if object_store.info().full_capability().blocking {
+        fetch_ranges_seq(file_path, object_store, ranges).await
+    } else {
+        fetch_ranges_concurrent(file_path, object_store, ranges).await
+    }
+}
+
+/// Fetches data from object store sequentially.
+async fn fetch_ranges_seq(
+    file_path: &str,
+    object_store: ObjectStore,
+    ranges: &[Range<u64>],
+) -> object_store::Result<Vec<Bytes>> {
+    let block_object_store = object_store.blocking();
+    let file_path = file_path.to_string();
+    let ranges = ranges.to_vec();
+
+    let f = move || -> object_store::Result<Vec<Bytes>> {
+        ranges
+            .into_iter()
+            .map(|range| {
+                let data = block_object_store
+                    .read_with(&file_path)
+                    .range(range.start..range.end)
+                    .call()?;
+                Ok::<_, object_store::Error>(Bytes::from(data))
+            })
+            .collect::<object_store::Result<Vec<_>>>()
+    };
+
+    maybe_spawn_blocking(f).await
+}
+
+/// Fetches data from object store concurrently.
+async fn fetch_ranges_concurrent(
+    file_path: &str,
+    object_store: ObjectStore,
+    ranges: &[Range<u64>],
+) -> object_store::Result<Vec<Bytes>> {
+    // TODO(QuenKar): may merge small ranges into a bigger range to optimize.
+    let mut handles = Vec::with_capacity(ranges.len());
+    for range in ranges {
+        let future_read = object_store.read_with(file_path);
+        handles.push(async move {
+            let data = future_read.range(range.start..range.end).await?;
+            Ok::<_, object_store::Error>(Bytes::from(data))
+        });
+    }
+    let results = futures::future::try_join_all(handles).await?;
+    Ok(results)
+}
+
+// Ported from https://github.com/apache/arrow-rs/blob/802ed428f87051fdca31180430ddb0ecb2f60e8b/object_store/src/util.rs#L74-L83
+/// Takes a function and spawns it to a tokio blocking pool if available.
+async fn maybe_spawn_blocking<F, T>(f: F) -> object_store::Result<T>
+where
+    F: FnOnce() -> object_store::Result<T> + Send + 'static,
+    T: Send + 'static,
+{
+    match tokio::runtime::Handle::try_current() {
+        Ok(runtime) => runtime
+            .spawn_blocking(f)
+            .await
+            .map_err(new_task_join_error)?,
+        Err(_) => f(),
+    }
+}
+
+// https://github.com/apache/incubator-opendal/blob/7144ab1ca2409dff0c324bfed062ce985997f8ce/core/src/raw/tokio_util.rs#L21-L23
+/// Parses a tokio task join error into an opendal::Error.
+fn new_task_join_error(e: tokio::task::JoinError) -> object_store::Error {
+    object_store::Error::new(ErrorKind::Unexpected, "tokio task join failed").set_source(e)
+}
diff --git a/src/mito2/src/sst/parquet/row_group.rs b/src/mito2/src/sst/parquet/row_group.rs
index b24413e43f69..64bd4d4cd590 100644
--- a/src/mito2/src/sst/parquet/row_group.rs
+++ b/src/mito2/src/sst/parquet/row_group.rs
@@ -29,7 +29,10 @@ use parquet::file::serialized_reader::SerializedPageReader;
 use parquet::format::PageLocation;
 use store_api::storage::RegionId;
 
+use super::helper::fetch_byte_ranges;
+use crate::cache::file_cache::IndexKey;
 use crate::cache::{CacheManagerRef, PageKey, PageValue};
+use crate::metrics::READ_STAGE_ELAPSED;
 use crate::sst::file::FileId;
 use crate::sst::parquet::page_reader::CachedPageReader;
 
@@ -100,7 +103,7 @@ impl<'a> InMemoryRowGroup<'a> {
             // `RowSelection`
             let mut page_start_offsets: Vec<Vec<usize>> = vec![];
 
-            let fetch_ranges: Vec<_> = self
+            let fetch_ranges = self
                 .column_chunks
                 .iter()
                 .zip(self.metadata.columns())
@@ -115,21 +118,25 @@
                     let (start, _len) = chunk_meta.byte_range();
                     match page_locations[idx].first() {
                         Some(first) if first.offset as u64 != start => {
-                            ranges.push(start as usize..first.offset as usize);
+                            ranges.push(start..first.offset as u64);
                         }
                         _ => (),
                     }
 
-                    ranges.extend(selection.scan_ranges(&page_locations[idx]));
-                    page_start_offsets.push(ranges.iter().map(|range| range.start).collect());
+                    ranges.extend(
+                        selection
+                            .scan_ranges(&page_locations[idx])
+                            .iter()
+                            .map(|range| range.start as u64..range.end as u64),
+                    );
+                    page_start_offsets
+                        .push(ranges.iter().map(|range| range.start as usize).collect());
 
                     ranges
                 })
-                .collect();
+                .collect::<Vec<_>>();
 
-            let mut chunk_data =
-                fetch_byte_ranges(self.file_path, self.object_store.clone(), fetch_ranges)
-                    .await?
-                    .into_iter();
+            let mut chunk_data = self.fetch_bytes(&fetch_ranges).await?.into_iter();
 
             let mut page_start_offsets = page_start_offsets.into_iter();
 
@@ -154,7 +161,7 @@
             // Now we only use cache in dense chunk data.
             self.fetch_pages_from_cache(projection);
 
-            let fetch_ranges: Vec<_> = self
+            let fetch_ranges = self
                 .column_chunks
                 .iter()
                 .zip(&self.column_cached_pages)
@@ -166,19 +173,16 @@
                 .map(|(idx, (_chunk, _cached_pages))| {
                     let column = self.metadata.column(idx);
                     let (start, length) = column.byte_range();
-                    start as usize..(start + length) as usize
+                    start..(start + length)
                })
-                .collect();
+                .collect::<Vec<_>>();
 
            if fetch_ranges.is_empty() {
                // Nothing to fetch.
                return Ok(());
            }
 
-            let mut chunk_data =
-                fetch_byte_ranges(self.file_path, self.object_store.clone(), fetch_ranges)
-                    .await?
-                    .into_iter();
+            let mut chunk_data = self.fetch_bytes(&fetch_ranges).await?.into_iter();
 
            for (idx, (chunk, cached_pages)) in self
                .column_chunks
@@ -221,6 +225,38 @@
         });
     }
 
+    /// Tries to fetch data from the write cache;
+    /// if the data is not cached, fetches it from the object store directly.
+    async fn fetch_bytes(&self, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
+        let key = (self.region_id, self.file_id);
+        match self.fetch_ranges_from_write_cache(key, ranges).await {
+            Some(data) => Ok(data),
+            None => {
+                // Fetch data from object store.
+                let _timer = READ_STAGE_ELAPSED
+                    .with_label_values(&["cache_miss_read"])
+                    .start_timer();
+                let data = fetch_byte_ranges(self.file_path, self.object_store.clone(), ranges)
+                    .await
+                    .map_err(|e| ParquetError::External(Box::new(e)))?;
+                Ok(data)
+            }
+        }
+    }
+
+    /// Fetches data from write cache.
+    /// Returns `None` if the data is not in the cache.
+    async fn fetch_ranges_from_write_cache(
+        &self,
+        key: IndexKey,
+        ranges: &[Range<u64>],
+    ) -> Option<Vec<Bytes>> {
+        if let Some(cache) = self.cache_manager.as_ref()?.write_cache() {
+            return cache.file_cache().read_ranges(key, ranges).await;
+        }
+        None
+    }
+
     /// Creates a page reader to read column at `i`.
     fn column_page_reader(&self, i: usize) -> Result<Box<dyn PageReader>> {
         if let Some(cached_pages) = &self.column_cached_pages[i] {
@@ -349,86 +385,3 @@ impl Iterator for ColumnChunkIterator {
 }
 
 impl PageIterator for ColumnChunkIterator {}
-
-/// Fetches data from object store.
-/// If the object store supports blocking, use sequence blocking read.
-/// Otherwise, use concurrent read.
-async fn fetch_byte_ranges(
-    file_path: &str,
-    object_store: ObjectStore,
-    ranges: Vec<Range<usize>>,
-) -> Result<Vec<Bytes>> {
-    let ranges: Vec<_> = ranges
-        .iter()
-        .map(|range| range.start as u64..range.end as u64)
-        .collect();
-    if object_store.info().full_capability().blocking {
-        fetch_ranges_seq(file_path, object_store, ranges).await
-    } else {
-        fetch_ranges_concurrent(file_path, object_store, ranges).await
-    }
-}
-
-/// Fetches data from object store sequentially
-async fn fetch_ranges_seq(
-    file_path: &str,
-    object_store: ObjectStore,
-    ranges: Vec<Range<u64>>,
-) -> Result<Vec<Bytes>> {
-    let block_object_store = object_store.blocking();
-    let file_path = file_path.to_string();
-
-    let f = move || -> Result<Vec<Bytes>> {
-        ranges
-            .into_iter()
-            .map(|range| {
-                let data = block_object_store
-                    .read_with(&file_path)
-                    .range(range)
-                    .call()
-                    .map_err(|e| ParquetError::External(Box::new(e)))?;
-                Ok::<_, ParquetError>(Bytes::from(data))
-            })
-            .collect::<Result<Vec<_>>>()
-    };
-
-    maybe_spawn_blocking(f).await
-}
-
-/// Fetches data from object store concurrently.
-async fn fetch_ranges_concurrent(
-    file_path: &str,
-    object_store: ObjectStore,
-    ranges: Vec<Range<u64>>,
-) -> Result<Vec<Bytes>> {
-    // TODO(QuenKar): may merge small ranges to a bigger range to optimize.
-    let mut handles = Vec::with_capacity(ranges.len());
-    for range in ranges {
-        let future_read = object_store.read_with(file_path);
-        handles.push(async move {
-            let data = future_read
-                .range(range.start..range.end)
-                .await
-                .map_err(|e| ParquetError::External(Box::new(e)))?;
-            Ok::<_, ParquetError>(Bytes::from(data))
-        });
-    }
-    let results = futures::future::try_join_all(handles).await?;
-    Ok(results)
-}
-
-// Port from https://github.com/apache/arrow-rs/blob/802ed428f87051fdca31180430ddb0ecb2f60e8b/object_store/src/util.rs#L74-L83
-/// Takes a function and spawns it to a tokio blocking pool if available
-pub async fn maybe_spawn_blocking<F, T>(f: F) -> Result<T>
-where
-    F: FnOnce() -> Result<T> + Send + 'static,
-    T: Send + 'static,
-{
-    match tokio::runtime::Handle::try_current() {
-        Ok(runtime) => runtime
-            .spawn_blocking(f)
-            .await
-            .map_err(|e| ParquetError::External(Box::new(e)))?,
-        Err(_) => f(),
-    }
-}
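To illustrate the range-read semantics the new test exercises, here is a minimal standalone sketch (not part of the patch). It assumes only the bytes crate; read_ranges below is a hypothetical stand-in that slices an in-memory buffer the way FileCache::read_ranges serves byte ranges from a cached file, with None loosely mirroring a cache miss.

use std::ops::Range;

use bytes::Bytes;

// Hypothetical stand-in for `FileCache::read_ranges`: slices `data` at each
// requested range; returns `None` if any range is out of bounds, loosely
// mirroring the cache-miss behavior of the real method.
fn read_ranges(data: &Bytes, ranges: &[Range<u64>]) -> Option<Vec<Bytes>> {
    ranges
        .iter()
        .map(|r| {
            let (start, end) = (r.start as usize, r.end as usize);
            (start <= end && end <= data.len()).then(|| data.slice(start..end))
        })
        .collect()
}

fn main() {
    let data = Bytes::from_static(b"hello greptime database");
    // The same ranges the new test_file_cache_read_ranges uses.
    let ranges = vec![0..5, 6..10, 15..19, 0..data.len() as u64];
    let bytes = read_ranges(&data, &ranges).unwrap();

    assert_eq!(b"hello", bytes[0].as_ref());
    assert_eq!(b"grep", bytes[1].as_ref());
    assert_eq!(b"data", bytes[2].as_ref());
    assert_eq!(data, bytes[3]);
}

Running it reproduces the assertions from the test above; the real implementation differs only in that the slices come from a file in the local store via fetch_byte_ranges rather than from memory.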