fix(index): S3 EntityTooSmall error (#3192)
* fix(index): S3 `EntityTooSmall` error

Signed-off-by: Zhenchi <[email protected]>

* fix: config api

Signed-off-by: Zhenchi <[email protected]>

---------

Signed-off-by: Zhenchi <[email protected]>
zhongzc authored Jan 19, 2024
1 parent cde5a36 commit 2e4c48a
Showing 12 changed files with 133 additions and 22 deletions.
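Background for the fix, not part of the original commit message: S3 multipart uploads reject any non-final part smaller than the 5 MiB minimum with an `EntityTooSmall` error. The inverted-index creator emits many small writes, so a small OpenDAL write buffer could end up flushing undersized parts. This commit threads a configurable `write_buffer_size` (default 8 MiB) from `InvertedIndexConfig` down to the OpenDAL writer so each uploaded part stays above the minimum. A minimal sketch of the idea, assuming a plain OpenDAL `Operator` and the `writer_with(...).buffer(...)` / `write` / `close` API of the OpenDAL version used at the time; the function and variable names here are illustrative only:

```rust
use opendal::Operator;

// Sketch only: coalesce many small index writes into >= 8 MiB upload parts,
// keeping every non-final multipart part above S3's 5 MiB minimum.
async fn write_index_blob(op: &Operator, path: &str, chunks: &[Vec<u8>]) -> opendal::Result<()> {
    let mut writer = op
        .writer_with(path)
        .buffer(8 * 1024 * 1024) // same default as `inverted_index.write_buffer_size`
        .await?;
    for chunk in chunks {
        // Each chunk may be only a few KiB; the buffer batches them before uploading.
        writer.write(chunk.clone()).await?;
    }
    writer.close().await?;
    Ok(())
}
```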
3 changes: 3 additions & 0 deletions src/mito2/src/access_layer.rs
@@ -135,6 +135,7 @@ impl AccessLayer {
let indexer = IndexerBuilder {
create_inverted_index: request.create_inverted_index,
mem_threshold_index_create: request.mem_threshold_index_create,
write_buffer_size: request.index_write_buffer_size,
file_id,
file_path: index_file_path,
metadata: &request.metadata,
@@ -183,6 +184,8 @@ pub(crate) struct SstWriteRequest {
pub(crate) create_inverted_index: bool,
/// The threshold of memory size to create inverted index.
pub(crate) mem_threshold_index_create: Option<usize>,
/// The size of write buffer for index.
pub(crate) index_write_buffer_size: Option<usize>,
}

/// Creates a fs object store with atomic write dir.
2 changes: 2 additions & 0 deletions src/mito2/src/cache/write_cache.rs
@@ -110,6 +110,7 @@ impl WriteCache {
let indexer = IndexerBuilder {
create_inverted_index: write_request.create_inverted_index,
mem_threshold_index_create: write_request.mem_threshold_index_create,
write_buffer_size: write_request.index_write_buffer_size,
file_id,
file_path: self.file_cache.cache_file_path(puffin_key),
metadata: &write_request.metadata,
@@ -281,6 +282,7 @@ mod tests {
storage: None,
create_inverted_index: true,
mem_threshold_index_create: None,
index_write_buffer_size: None,
cache_manager: Default::default(),
};

7 changes: 7 additions & 0 deletions src/mito2/src/compaction/twcs.rs
@@ -314,6 +314,12 @@ impl TwcsCompactionTask {
.inverted_index
.mem_threshold_on_create
.map(|m| m.as_bytes() as _);
let index_write_buffer_size = Some(
self.engine_config
.inverted_index
.write_buffer_size
.as_bytes() as usize,
);

let metadata = self.metadata.clone();
let sst_layer = self.sst_layer.clone();
@@ -334,6 +340,7 @@ impl TwcsCompactionTask {
storage,
create_inverted_index,
mem_threshold_index_create,
index_write_buffer_size,
},
&write_opts,
)
3 changes: 3 additions & 0 deletions src/mito2/src/config.rs
@@ -252,6 +252,8 @@ pub struct InvertedIndexConfig {
pub create_on_compaction: Mode,
/// Whether to apply the index on query: automatically or never.
pub apply_on_query: Mode,
/// Write buffer size for creating the index.
pub write_buffer_size: ReadableSize,
/// Memory threshold for performing an external sort during index creation.
/// `None` means all sorting will happen in memory.
#[serde_as(as = "NoneAsEmptyString")]
@@ -266,6 +268,7 @@ impl Default for InvertedIndexConfig {
create_on_flush: Mode::Auto,
create_on_compaction: Mode::Auto,
apply_on_query: Mode::Auto,
write_buffer_size: ReadableSize::mb(8),
mem_threshold_on_create: Some(ReadableSize::mb(64)),
intermediate_path: String::new(),
}
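For reference, a minimal sketch of how the new option reaches the write path, assuming the usual crate paths (`common_base::readable_size::ReadableSize`, `mito2::config::InvertedIndexConfig`) and mirroring the byte conversion done in `twcs.rs` above and `flush.rs` below; the test name is illustrative only:

```rust
use common_base::readable_size::ReadableSize;
use mito2::config::InvertedIndexConfig;

#[test]
fn default_index_write_buffer_size() {
    // Sketch only: the default is 8 MiB and is handed to the SST write path in bytes.
    let config = InvertedIndexConfig::default();
    assert_eq!(config.write_buffer_size, ReadableSize::mb(8));

    // Same conversion as `index_write_buffer_size` in flush/compaction.
    let index_write_buffer_size = Some(config.write_buffer_size.as_bytes() as usize);
    assert_eq!(index_write_buffer_size, Some(8 * 1024 * 1024));
}
```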
7 changes: 7 additions & 0 deletions src/mito2/src/flush.rs
@@ -321,6 +321,12 @@ impl RegionFlushTask {
.inverted_index
.mem_threshold_on_create
.map(|m| m.as_bytes() as _);
let index_write_buffer_size = Some(
self.engine_config
.inverted_index
.write_buffer_size
.as_bytes() as usize,
);

// Flush to level 0.
let write_request = SstWriteRequest {
@@ -331,6 +337,7 @@
storage: version.options.storage.clone(),
create_inverted_index,
mem_threshold_index_create,
index_write_buffer_size,
};
let Some(sst_info) = self
.access_layer
53 changes: 40 additions & 13 deletions src/mito2/src/sst/index.rs
@@ -52,10 +52,17 @@ impl Indexer {
pub async fn update(&mut self, batch: &Batch) {
if let Some(creator) = self.inner.as_mut() {
if let Err(err) = creator.update(batch).await {
warn!(
err; "Failed to update index, skip creating index, region_id: {}, file_id: {}",
self.region_id, self.file_id,
);
if cfg!(any(test, feature = "test")) {
panic!(
"Failed to update index, region_id: {}, file_id: {}, err: {}",
self.region_id, self.file_id, err
);
} else {
warn!(
err; "Failed to update index, skip creating index, region_id: {}, file_id: {}",
self.region_id, self.file_id,
);
}

// Skip index creation if error occurs.
self.inner = None;
Expand All @@ -76,10 +83,17 @@ impl Indexer {
return Some(byte_count);
}
Err(err) => {
warn!(
err; "Failed to create index, region_id: {}, file_id: {}",
self.region_id, self.file_id,
);
if cfg!(any(test, feature = "test")) {
panic!(
"Failed to create index, region_id: {}, file_id: {}, err: {}",
self.region_id, self.file_id, err
);
} else {
warn!(
err; "Failed to create index, region_id: {}, file_id: {}",
self.region_id, self.file_id,
);
}
}
}
}
@@ -91,10 +105,17 @@
pub async fn abort(&mut self) {
if let Some(mut creator) = self.inner.take() {
if let Err(err) = creator.abort().await {
warn!(
err; "Failed to abort index, region_id: {}, file_id: {}",
self.region_id, self.file_id,
);
if cfg!(any(test, feature = "test")) {
panic!(
"Failed to abort index, region_id: {}, file_id: {}, err: {}",
self.region_id, self.file_id, err
);
} else {
warn!(
err; "Failed to abort index, region_id: {}, file_id: {}",
self.region_id, self.file_id,
);
}
}
}
}
@@ -103,6 +124,7 @@ impl Indexer {
pub(crate) struct IndexerBuilder<'a> {
pub(crate) create_inverted_index: bool,
pub(crate) mem_threshold_index_create: Option<usize>,
pub(crate) write_buffer_size: Option<usize>,
pub(crate) file_id: FileId,
pub(crate) file_path: String,
pub(crate) metadata: &'a RegionMetadataRef,
@@ -147,7 +169,8 @@ impl<'a> IndexerBuilder<'a> {
self.intermediate_manager,
self.mem_threshold_index_create,
row_group_size,
);
)
.with_buffer_size(self.write_buffer_size);

Indexer {
file_id: self.file_id,
@@ -236,6 +259,7 @@ mod tests {
let indexer = IndexerBuilder {
create_inverted_index: true,
mem_threshold_index_create: Some(1024),
write_buffer_size: None,
file_id: FileId::random(),
file_path: "test".to_string(),
metadata: &metadata,
@@ -254,6 +278,7 @@
let indexer = IndexerBuilder {
create_inverted_index: false,
mem_threshold_index_create: Some(1024),
write_buffer_size: None,
file_id: FileId::random(),
file_path: "test".to_string(),
metadata: &metadata,
@@ -272,6 +297,7 @@
let indexer = IndexerBuilder {
create_inverted_index: true,
mem_threshold_index_create: Some(1024),
write_buffer_size: None,
file_id: FileId::random(),
file_path: "test".to_string(),
metadata: &metadata,
@@ -290,6 +316,7 @@
let indexer = IndexerBuilder {
create_inverted_index: true,
mem_threshold_index_create: Some(1024),
write_buffer_size: None,
file_id: FileId::random(),
file_path: "test".to_string(),
metadata: &metadata,
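The error handling in this file (and in `creator.rs` and `reader.rs` below) now panics under `cfg!(any(test, feature = "test"))` instead of only logging, so index failures surface loudly in tests while staying non-fatal in production. A standalone sketch of that guard; `log_or_panic` is a hypothetical helper, not part of the commit, and `eprintln!` stands in for the crate's `warn!` macro:

```rust
use std::fmt::Display;

// Sketch only: fail fast in test builds, degrade gracefully elsewhere.
// `cfg!(test)` covers unit tests; the `test` cargo feature covers callers
// (such as integration tests) that cannot see `cfg(test)`.
fn log_or_panic(context: &str, err: impl Display) {
    if cfg!(any(test, feature = "test")) {
        panic!("{context}, err: {err}");
    } else {
        // The real code logs with `common_telemetry::warn!` here.
        eprintln!("WARN: {context}, err: {err}");
    }
}

fn main() {
    log_or_panic("Failed to update index, region_id: 1, file_id: demo", "demo error");
}
```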
24 changes: 22 additions & 2 deletions src/mito2/src/sst/index/creator.rs
@@ -113,6 +113,12 @@ impl SstIndexCreator {
}
}

/// Sets the write buffer size of the store.
pub fn with_buffer_size(mut self, write_buffer_size: Option<usize>) -> Self {
self.store = self.store.with_write_buffer_size(write_buffer_size);
self
}

/// Updates index with a batch of rows.
/// Garbage will be cleaned up if failed to update.
pub async fn update(&mut self, batch: &Batch) -> Result<()> {
@@ -125,7 +131,14 @@
if let Err(update_err) = self.do_update(batch).await {
// clean up garbage if failed to update
if let Err(err) = self.do_cleanup().await {
warn!(err; "Failed to clean up index creator, file_path: {}", self.file_path);
if cfg!(any(test, feature = "test")) {
panic!(
"Failed to clean up index creator, file_path: {}, err: {}",
self.file_path, err
);
} else {
warn!(err; "Failed to clean up index creator, file_path: {}", self.file_path);
}
}
return Err(update_err);
}
@@ -146,7 +159,14 @@
let finish_res = self.do_finish().await;
// clean up garbage no matter finish successfully or not
if let Err(err) = self.do_cleanup().await {
warn!(err; "Failed to clean up index creator, file_path: {}", self.file_path);
if cfg!(any(test, feature = "test")) {
panic!(
"Failed to clean up index creator, file_path: {}, err: {}",
self.file_path, err
);
} else {
warn!(err; "Failed to clean up index creator, file_path: {}", self.file_path);
}
}

finish_res.map(|_| (self.stats.row_count(), self.stats.byte_count()))
6 changes: 6 additions & 0 deletions src/mito2/src/sst/index/intermediate.rs
@@ -45,6 +45,12 @@ impl IntermediateManager {
Ok(Self { store })
}

/// Set the write buffer size for the store.
pub fn with_buffer_size(mut self, write_buffer_size: Option<usize>) -> Self {
self.store = self.store.with_write_buffer_size(write_buffer_size);
self
}

/// Returns the store to access to intermediate files.
pub(crate) fn store(&self) -> &InstrumentedStore {
&self.store
23 changes: 21 additions & 2 deletions src/mito2/src/sst/index/store.rs
@@ -30,12 +30,23 @@ use crate::error::{OpenDalSnafu, Result};
pub(crate) struct InstrumentedStore {
/// The underlying object store.
object_store: ObjectStore,
/// The size of the write buffer.
write_buffer_size: Option<usize>,
}

impl InstrumentedStore {
/// Create a new `InstrumentedStore`.
pub fn new(object_store: ObjectStore) -> Self {
Self { object_store }
Self {
object_store,
write_buffer_size: None,
}
}

/// Set the size of the write buffer.
pub fn with_write_buffer_size(mut self, write_buffer_size: Option<usize>) -> Self {
self.write_buffer_size = write_buffer_size.filter(|&size| size > 0);
self
}

/// Returns an [`InstrumentedAsyncRead`] for the given path.
@@ -67,7 +78,15 @@
write_count: &'a IntCounter,
flush_count: &'a IntCounter,
) -> Result<InstrumentedAsyncWrite<'a, object_store::Writer>> {
let writer = self.object_store.writer(path).await.context(OpenDalSnafu)?;
let writer = match self.write_buffer_size {
Some(size) => self
.object_store
.writer_with(path)
.buffer(size)
.await
.context(OpenDalSnafu)?,
None => self.object_store.writer(path).await.context(OpenDalSnafu)?,
};
Ok(InstrumentedAsyncWrite::new(
writer,
write_byte_count,
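A usage sketch for the builder above; `build_stores` and `object_store` are illustrative, and since `InstrumentedStore` is `pub(crate)` this would live inside the `mito2` crate. Because of the `filter(|&size| size > 0)` guard, `Some(0)` (or `None`) keeps OpenDAL's default buffering, while a positive size is forwarded to `writer_with(...).buffer(...)`:

```rust
use object_store::ObjectStore;

// Sketch only: constructing instrumented stores with and without a custom write buffer.
fn build_stores(object_store: ObjectStore) -> (InstrumentedStore, InstrumentedStore) {
    let buffered = InstrumentedStore::new(object_store.clone())
        .with_write_buffer_size(Some(8 * 1024 * 1024)); // 8 MiB upload parts
    let default_buffer = InstrumentedStore::new(object_store)
        .with_write_buffer_size(Some(0)); // 0 is filtered out: OpenDAL default applies
    (buffered, default_buffer)
}
```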
18 changes: 15 additions & 3 deletions src/mito2/src/sst/parquet/reader.rs
@@ -293,9 +293,21 @@ impl ParquetReaderBuilder {
if self.file_handle.meta().inverted_index_available() {
match index_applier.apply(self.file_handle.file_id()).await {
Ok(row_groups) => row_group_ids = row_groups,
Err(err) => warn!(
err; "Failed to apply index, region_id: {}, file_id: {}",
self.file_handle.region_id(), self.file_handle.file_id()),
Err(err) => {
if cfg!(any(test, feature = "test")) {
panic!(
"Failed to apply index, region_id: {}, file_id: {}, err: {}",
self.file_handle.region_id(),
self.file_handle.file_id(),
err
);
} else {
warn!(
err; "Failed to apply index, region_id: {}, file_id: {}",
self.file_handle.region_id(), self.file_handle.file_id()
);
}
}
}
}
}
8 changes: 6 additions & 2 deletions src/mito2/src/worker.rs
@@ -123,7 +123,9 @@ impl WorkerGroup {
config.global_write_buffer_size.as_bytes() as usize,
));
let intermediate_manager =
IntermediateManager::init_fs(&config.inverted_index.intermediate_path).await?;
IntermediateManager::init_fs(&config.inverted_index.intermediate_path)
.await?
.with_buffer_size(Some(config.inverted_index.write_buffer_size.as_bytes() as _));
let scheduler = Arc::new(LocalScheduler::new(config.max_background_jobs));
let write_cache = write_cache_from_config(
&config,
@@ -233,7 +235,9 @@ impl WorkerGroup {
});
let scheduler = Arc::new(LocalScheduler::new(config.max_background_jobs));
let intermediate_manager =
IntermediateManager::init_fs(&config.inverted_index.intermediate_path).await?;
IntermediateManager::init_fs(&config.inverted_index.intermediate_path)
.await?
.with_buffer_size(Some(config.inverted_index.write_buffer_size.as_bytes() as _));
let write_cache = write_cache_from_config(
&config,
object_store_manager.clone(),
1 change: 1 addition & 0 deletions tests-integration/tests/http.rs
@@ -777,6 +777,7 @@ allow_stale_entries = false
create_on_flush = "auto"
create_on_compaction = "auto"
apply_on_query = "auto"
write_buffer_size = "8MiB"
mem_threshold_on_create = "64.0MiB"
intermediate_path = ""