Skip to content

Commit

Permalink
feat: don't update manifest if manager is stopped
Browse files Browse the repository at this point in the history
  • Loading branch information
evenyag committed Apr 10, 2024
1 parent a0022dd commit 6d57942
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 8 deletions.
7 changes: 7 additions & 0 deletions src/mito2/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,12 @@ pub enum Error {

#[snafu(display("checksum mismatch (actual: {}, expected: {})", actual, expected))]
ChecksumMismatch { actual: u32, expected: u32 },

#[snafu(display("Region {} is stopped", region_id))]
RegionStopped {
region_id: RegionId,
location: Location,
},
}

pub type Result<T, E = Error> = std::result::Result<T, E>;
Expand Down Expand Up @@ -684,6 +690,7 @@ impl ErrorExt for Error {
BiError { .. } => StatusCode::Internal,
EncodeMemtable { .. } | ReadDataPart { .. } => StatusCode::Internal,
ChecksumMismatch { .. } => StatusCode::Unexpected,
RegionStopped { .. } => StatusCode::Internal,
}
}

Expand Down
15 changes: 13 additions & 2 deletions src/mito2/src/manifest/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ use common_datasource::compression::CompressionType;
use common_telemetry::{debug, info};
use futures::TryStreamExt;
use object_store::ObjectStore;
use snafu::{OptionExt, ResultExt};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::manifest::{ManifestVersion, MAX_VERSION, MIN_VERSION};
use store_api::metadata::RegionMetadataRef;

use crate::error::{self, Result};
use crate::error::{self, RegionStoppedSnafu, Result};
use crate::manifest::action::{
RegionChange, RegionCheckpoint, RegionManifest, RegionManifestBuilder, RegionMetaAction,
RegionMetaActionList,
Expand Down Expand Up @@ -116,6 +116,7 @@ pub struct RegionManifestManager {
/// The last version included in checkpoint file.
last_checkpoint_version: ManifestVersion,
manifest: Arc<RegionManifest>,
stopped: bool,
}

impl RegionManifestManager {
Expand Down Expand Up @@ -160,6 +161,7 @@ impl RegionManifestManager {
last_version: version,
last_checkpoint_version: MIN_VERSION,
manifest: Arc::new(manifest),
stopped: false,
})
}

Expand Down Expand Up @@ -250,11 +252,13 @@ impl RegionManifestManager {
last_version: version,
last_checkpoint_version,
manifest: Arc::new(manifest),
stopped: false,
}))
}

/// Stops the manager.
pub async fn stop(&mut self) -> Result<()> {
self.stopped = true;
Ok(())
}

Expand All @@ -264,6 +268,13 @@ impl RegionManifestManager {
.with_label_values(&["update"])
.start_timer();

ensure!(
!self.stopped,
RegionStoppedSnafu {
region_id: self.manifest.metadata.region_id,
}
);

let version = self.increase_version();
self.store.save(version, &action_list.encode()?).await?;

Expand Down
9 changes: 3 additions & 6 deletions src/mito2/src/manifest/tests/checkpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ async fn manager_with_checkpoint_distance_1() {
async fn test_corrupted_data_causing_checksum_error() {
// Initialize manager
common_telemetry::init_default_ut_logging();
let (_env, manager) = build_manager(1, CompressionType::Uncompressed).await;
let (_env, mut manager) = build_manager(1, CompressionType::Uncompressed).await;

// Apply actions
for _ in 0..10 {
Expand All @@ -171,7 +171,6 @@ async fn test_corrupted_data_causing_checksum_error() {
// Check if there is a checkpoint
assert!(manager
.store()
.await
.load_last_checkpoint()
.await
.unwrap()
Expand All @@ -180,22 +179,20 @@ async fn test_corrupted_data_causing_checksum_error() {
// Corrupt the last checkpoint data
let mut corrupted_bytes = manager
.store()
.await
.read_file(&manager.store().await.last_checkpoint_path())
.read_file(&manager.store().last_checkpoint_path())
.await
.unwrap();
corrupted_bytes[0] ^= 1;

// Overwrite the latest checkpoint data
manager
.store()
.await
.write_last_checkpoint(9, &corrupted_bytes)
.await
.unwrap();

// Attempt to load the corrupted checkpoint
let load_corrupted_result = manager.store().await.load_last_checkpoint().await;
let load_corrupted_result = manager.store().load_last_checkpoint().await;

// Check if the result is an error and if it's of type VerifyChecksum
assert_matches!(load_corrupted_result, Err(ChecksumMismatch { .. }));
Expand Down

0 comments on commit 6d57942

Please sign in to comment.