Skip to content

Commit

Permalink
allow physical region alter region options
Browse files Browse the repository at this point in the history
  • Loading branch information
lyang24 committed Nov 26, 2024
1 parent a6571d3 commit 5faf2f4
Show file tree
Hide file tree
Showing 6 changed files with 188 additions and 13 deletions.
29 changes: 27 additions & 2 deletions src/metric-engine/src/data_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@ use store_api::storage::consts::ReservedColumnId;
use store_api::storage::{ConcreteDataType, RegionId};

use crate::error::{
ColumnTypeMismatchSnafu, MitoReadOperationSnafu, MitoWriteOperationSnafu, Result,
ColumnTypeMismatchSnafu, ForbiddenPhysicalAlterSnafu, MitoReadOperationSnafu,
MitoWriteOperationSnafu, Result,
};
use crate::metrics::MITO_DDL_DURATION;
use crate::metrics::{FORBIDDEN_OPERATION_COUNT, MITO_DDL_DURATION};
use crate::utils;

const MAX_RETRIES: usize = 5;
Expand Down Expand Up @@ -186,6 +187,30 @@ impl DataRegion {
.context(MitoReadOperationSnafu)?;
Ok(metadata.column_metadatas.clone())
}

pub async fn alter_region_options(
&self,
region_id: RegionId,
request: RegionAlterRequest,
) -> Result<AffectedRows> {
match request.kind {
AlterKind::SetRegionOptions { options: _ }
| AlterKind::UnsetRegionOptions { keys: _ } => {
let region_id = utils::to_data_region_id(region_id);
self.mito
.handle_request(region_id, RegionRequest::Alter(request))
.await
.context(MitoWriteOperationSnafu)
.map(|result| result.affected_rows)
}
_ => {
info!("Metric region received alter request {request:?} on physical region {region_id:?}");
FORBIDDEN_OPERATION_COUNT.inc();

ForbiddenPhysicalAlterSnafu.fail()
}
}
}
}

#[cfg(test)]
Expand Down
3 changes: 2 additions & 1 deletion src/metric-engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,10 @@ use crate::utils;
/// | Read | ✅ | ✅ |
/// | Close | ✅ | ✅ |
/// | Open | ✅ | ✅ |
/// | Alter | ✅ | |
/// | Alter | ✅ | ❓* |
///
/// *: Physical region can be dropped only when all related logical regions are dropped.
/// *: Alter: Physical regions only support altering region options.
///
/// ## Internal Columns
///
Expand Down
31 changes: 21 additions & 10 deletions src/metric-engine/src/engine/alter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,15 @@

use std::collections::HashMap;

use common_telemetry::{error, info};
use common_telemetry::error;
use snafu::{OptionExt, ResultExt};
use store_api::metadata::ColumnMetadata;
use store_api::metric_engine_consts::ALTER_PHYSICAL_EXTENSION_KEY;
use store_api::region_request::{AffectedRows, AlterKind, RegionAlterRequest};
use store_api::storage::RegionId;

use crate::engine::MetricEngineInner;
use crate::error::{
ForbiddenPhysicalAlterSnafu, LogicalRegionNotFoundSnafu, Result, SerializeColumnMetadataSnafu,
};
use crate::metrics::FORBIDDEN_OPERATION_COUNT;
use crate::error::{LogicalRegionNotFoundSnafu, Result, SerializeColumnMetadataSnafu};
use crate::utils::{to_data_region_id, to_metadata_region_id};

impl MetricEngineInner {
Expand Down Expand Up @@ -150,20 +147,22 @@ impl MetricEngineInner {
region_id: RegionId,
request: RegionAlterRequest,
) -> Result<()> {
info!("Metric region received alter request {request:?} on physical region {region_id:?}");
FORBIDDEN_OPERATION_COUNT.inc();

ForbiddenPhysicalAlterSnafu.fail()
self.data_region
.alter_region_options(region_id, request)
.await?;
Ok(())
}
}

#[cfg(test)]
mod test {
use std::time::Duration;

use api::v1::SemanticType;
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use store_api::metadata::ColumnMetadata;
use store_api::region_request::AddColumn;
use store_api::region_request::{AddColumn, SetRegionOption};

use super::*;
use crate::test_util::TestEnv;
Expand Down Expand Up @@ -204,6 +203,18 @@ mod test {
"Alter request to physical region is forbidden".to_string()
);

// alter physical region's option should work
let alter_region_option_request = RegionAlterRequest {
schema_version: 0,
kind: AlterKind::SetRegionOptions {
options: vec![SetRegionOption::TTL(Duration::from_secs(500))],
},
};
let result = engine_inner
.alter_physical_region(physical_region_id, alter_region_option_request.clone())
.await;
assert!(result.is_ok());

// alter logical region
let metadata_region = env.metadata_region();
let logical_region_id = env.default_logical_region_id();
Expand Down
75 changes: 75 additions & 0 deletions src/mito2/src/engine/alter_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use store_api::metadata::ColumnMetadata;
use store_api::region_engine::{RegionEngine, RegionRole};
use store_api::region_request::{
AddColumn, AddColumnLocation, AlterKind, RegionAlterRequest, RegionOpenRequest, RegionRequest,
SetRegionOption,
};
use store_api::storage::{RegionId, ScanRequest};

Expand Down Expand Up @@ -573,6 +574,80 @@ async fn test_alter_column_fulltext_options() {
check_region_version(&engine, region_id, 1, 3, 1, 3);
}

#[tokio::test]
async fn test_alter_region_ttl_options() {
common_telemetry::init_default_ut_logging();

let mut env = TestEnv::new();
let listener = Arc::new(AlterFlushListener::default());
let engine = env
.create_engine_with(MitoConfig::default(), None, Some(listener.clone()))
.await;

let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();

env.get_schema_metadata_manager()
.register_region_table_info(
region_id.table_id(),
"test_table",
"test_catalog",
"test_schema",
None,
)
.await;
let region_dir = request.region_dir.clone();
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
let engine_cloned = engine.clone();
let alter_ttl_request = RegionAlterRequest {
schema_version: 0,
kind: AlterKind::SetRegionOptions {
options: vec![SetRegionOption::TTL(Duration::from_secs(500))],
},
};
let alter_job = tokio::spawn(async move {
engine_cloned
.handle_request(region_id, RegionRequest::Alter(alter_ttl_request))
.await
.unwrap();
});

alter_job.await.unwrap();

let check_ttl = |engine: &MitoEngine, expected: &Duration| {
let current_ttl = engine
.get_region(region_id)
.unwrap()
.version()
.options
.ttl
.unwrap();
assert_eq!(*expected, current_ttl);
};
// Verify the ttl.
check_ttl(&engine, &Duration::from_secs(500));

// Reopen region.
let engine = env.reopen_engine(engine, MitoConfig::default()).await;
engine
.handle_request(
region_id,
RegionRequest::Open(RegionOpenRequest {
engine: String::new(),
region_dir,
options: HashMap::default(),
skip_wal_replay: false,
}),
)
.await
.unwrap();
// Verify the ttl again after engine restart.
// check_ttl(&engine, &Duration::from_secs(500));
}

#[tokio::test]
async fn test_write_stall_on_altering() {
common_telemetry::init_default_ut_logging();
Expand Down
51 changes: 51 additions & 0 deletions tests/cases/standalone/common/alter/alter_table_options.result
Original file line number Diff line number Diff line change
Expand Up @@ -281,3 +281,54 @@ DROP TABLE ato;

Affected Rows: 0

CREATE TABLE phy (ts timestamp time index, val double) engine=metric with ("physical_metric_table" = "");

Affected Rows: 0

ALTER TABLE phy set ttl='2years';

Affected Rows: 0

SHOW CREATE TABLE phy;

+-------+------------------------------------+
| Table | Create Table |
+-------+------------------------------------+
| phy | CREATE TABLE IF NOT EXISTS "phy" ( |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | "val" DOUBLE NULL, |
| | TIME INDEX ("ts") |
| | ) |
| | |
| | ENGINE=metric |
| | WITH( |
| | physical_metric_table = '', |
| | ttl = '2years' |
| | ) |
+-------+------------------------------------+

ALTER TABLE phy UNSET 'ttl';

Affected Rows: 0

SHOW CREATE TABLE phy;

+-------+------------------------------------+
| Table | Create Table |
+-------+------------------------------------+
| phy | CREATE TABLE IF NOT EXISTS "phy" ( |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | "val" DOUBLE NULL, |
| | TIME INDEX ("ts") |
| | ) |
| | |
| | ENGINE=metric |
| | WITH( |
| | physical_metric_table = '' |
| | ) |
+-------+------------------------------------+

DROP TABLE phy;

Affected Rows: 0

12 changes: 12 additions & 0 deletions tests/cases/standalone/common/alter/alter_table_options.sql
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,15 @@ SHOW CREATE TABLE ato;
SHOW CREATE TABLE ato;

DROP TABLE ato;

CREATE TABLE phy (ts timestamp time index, val double) engine=metric with ("physical_metric_table" = "");

ALTER TABLE phy set ttl='2years';

SHOW CREATE TABLE phy;

ALTER TABLE phy UNSET 'ttl';

SHOW CREATE TABLE phy;

DROP TABLE phy;

0 comments on commit 5faf2f4

Please sign in to comment.