diff --git a/Cargo.lock b/Cargo.lock index 0085294272e2..f677ee269d4e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2446,6 +2446,8 @@ dependencies = [ "chrono-tz 0.8.6", "common-error", "common-macro", + "humantime", + "humantime-serde", "once_cell", "rand", "serde", @@ -6592,6 +6594,7 @@ dependencies = [ "aquamarine", "async-trait", "base64 0.21.7", + "common-base", "common-error", "common-macro", "common-query", @@ -7656,6 +7659,7 @@ dependencies = [ name = "operator" version = "0.11.0" dependencies = [ + "ahash 0.8.11", "api", "async-stream", "async-trait", diff --git a/src/common/meta/src/ddl/alter_database.rs b/src/common/meta/src/ddl/alter_database.rs index 5e992a7d4e81..68f0f5428e08 100644 --- a/src/common/meta/src/ddl/alter_database.rs +++ b/src/common/meta/src/ddl/alter_database.rs @@ -46,11 +46,7 @@ fn build_new_schema_value( for option in options.0.iter() { match option { SetDatabaseOption::Ttl(ttl) => { - if ttl.is_zero() { - value.ttl = None; - } else { - value.ttl = Some(*ttl); - } + value.ttl = Some(*ttl); } } } @@ -230,12 +226,12 @@ mod tests { #[test] fn test_build_new_schema_value() { let set_ttl = AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions(vec![ - SetDatabaseOption::Ttl(Duration::from_secs(10)), + SetDatabaseOption::Ttl(Duration::from_secs(10).into()), ])); let current_schema_value = SchemaNameValue::default(); let new_schema_value = build_new_schema_value(current_schema_value.clone(), &set_ttl).unwrap(); - assert_eq!(new_schema_value.ttl, Some(Duration::from_secs(10))); + assert_eq!(new_schema_value.ttl, Some(Duration::from_secs(10).into())); let unset_ttl_alter_kind = AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions(vec![ diff --git a/src/common/meta/src/key/schema_name.rs b/src/common/meta/src/key/schema_name.rs index 1ec8c17eb5a1..35413433a445 100644 --- a/src/common/meta/src/key/schema_name.rs +++ b/src/common/meta/src/key/schema_name.rs @@ -15,9 +15,9 @@ use std::collections::HashMap; use std::fmt::Display; use std::sync::Arc; -use std::time::Duration; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; +use common_time::DatabaseTimeToLive; use futures::stream::BoxStream; use humantime_serde::re::humantime; use serde::{Deserialize, Serialize}; @@ -57,15 +57,13 @@ impl Default for SchemaNameKey<'_> { #[derive(Debug, Default, Clone, PartialEq, Serialize, Deserialize)] pub struct SchemaNameValue { #[serde(default)] - #[serde(with = "humantime_serde")] - pub ttl: Option, + pub ttl: Option, } impl Display for SchemaNameValue { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - if let Some(ttl) = self.ttl { - let ttl = humantime::format_duration(ttl); - write!(f, "ttl='{ttl}'")?; + if let Some(ttl) = self.ttl.map(|i| i.to_string()) { + write!(f, "ttl='{}'", ttl)?; } Ok(()) @@ -96,11 +94,8 @@ impl TryFrom<&HashMap> for SchemaNameValue { impl From for HashMap { fn from(value: SchemaNameValue) -> Self { let mut opts = HashMap::new(); - if let Some(ttl) = value.ttl { - opts.insert( - OPT_KEY_TTL.to_string(), - format!("{}", humantime::format_duration(ttl)), - ); + if let Some(ttl) = value.ttl.map(|ttl| ttl.to_string()) { + opts.insert(OPT_KEY_TTL.to_string(), ttl); } opts } @@ -313,6 +308,7 @@ impl<'a> From<&'a SchemaName> for SchemaNameKey<'a> { #[cfg(test)] mod tests { + use std::time::Duration; use super::*; use crate::kv_backend::memory::MemoryKvBackend; @@ -323,9 +319,14 @@ mod tests { assert_eq!("", schema_value.to_string()); let schema_value = SchemaNameValue { - ttl: Some(Duration::from_secs(9)), + ttl: Some(Duration::from_secs(9).into()), }; assert_eq!("ttl='9s'", schema_value.to_string()); + + let schema_value = SchemaNameValue { + ttl: Some(Duration::from_secs(0).into()), + }; + assert_eq!("ttl='forever'", schema_value.to_string()); } #[test] @@ -338,17 +339,36 @@ mod tests { assert_eq!(key, parsed); let value = SchemaNameValue { - ttl: Some(Duration::from_secs(10)), + ttl: Some(Duration::from_secs(10).into()), }; let mut opts: HashMap = HashMap::new(); opts.insert("ttl".to_string(), "10s".to_string()); let from_value = SchemaNameValue::try_from(&opts).unwrap(); assert_eq!(value, from_value); - let parsed = SchemaNameValue::try_from_raw_value("{\"ttl\":\"10s\"}".as_bytes()).unwrap(); + let parsed = SchemaNameValue::try_from_raw_value( + serde_json::json!({"ttl": "10s"}).to_string().as_bytes(), + ) + .unwrap(); assert_eq!(Some(value), parsed); + + let forever = SchemaNameValue { + ttl: Some(Default::default()), + }; + let parsed = SchemaNameValue::try_from_raw_value( + serde_json::json!({"ttl": "forever"}).to_string().as_bytes(), + ) + .unwrap(); + assert_eq!(Some(forever), parsed); + + let instant_err = SchemaNameValue::try_from_raw_value( + serde_json::json!({"ttl": "instant"}).to_string().as_bytes(), + ); + assert!(instant_err.is_err()); + let none = SchemaNameValue::try_from_raw_value("null".as_bytes()).unwrap(); assert!(none.is_none()); + let err_empty = SchemaNameValue::try_from_raw_value("".as_bytes()); assert!(err_empty.is_err()); } @@ -374,7 +394,7 @@ mod tests { let current_schema_value = manager.get(schema_key).await.unwrap().unwrap(); let new_schema_value = SchemaNameValue { - ttl: Some(Duration::from_secs(10)), + ttl: Some(Duration::from_secs(10).into()), }; manager .update(schema_key, ¤t_schema_value, &new_schema_value) @@ -388,10 +408,10 @@ mod tests { .unwrap(); let new_schema_value = SchemaNameValue { - ttl: Some(Duration::from_secs(40)), + ttl: Some(Duration::from_secs(40).into()), }; let incorrect_schema_value = SchemaNameValue { - ttl: Some(Duration::from_secs(20)), + ttl: Some(Duration::from_secs(20).into()), } .try_as_raw_value() .unwrap(); @@ -402,5 +422,15 @@ mod tests { .update(schema_key, &incorrect_schema_value, &new_schema_value) .await .unwrap_err(); + + let current_schema_value = manager.get(schema_key).await.unwrap().unwrap(); + let new_schema_value = SchemaNameValue { ttl: None }; + manager + .update(schema_key, ¤t_schema_value, &new_schema_value) + .await + .unwrap(); + + let current_schema_value = manager.get(schema_key).await.unwrap().unwrap(); + assert_eq!(new_schema_value, *current_schema_value); } } diff --git a/src/common/meta/src/rpc/ddl.rs b/src/common/meta/src/rpc/ddl.rs index 562ecb8ee660..bec12796e791 100644 --- a/src/common/meta/src/rpc/ddl.rs +++ b/src/common/meta/src/rpc/ddl.rs @@ -14,7 +14,6 @@ use std::collections::{HashMap, HashSet}; use std::result; -use std::time::Duration; use api::v1::alter_database_expr::Kind as PbAlterDatabaseKind; use api::v1::meta::ddl_task_request::Task; @@ -36,7 +35,7 @@ use api::v1::{ }; use base64::engine::general_purpose; use base64::Engine as _; -use humantime_serde::re::humantime; +use common_time::DatabaseTimeToLive; use prost::Message; use serde::{Deserialize, Serialize}; use serde_with::{serde_as, DefaultOnNull}; @@ -1009,12 +1008,8 @@ impl TryFrom for SetDatabaseOption { fn try_from(PbOption { key, value }: PbOption) -> Result { match key.to_ascii_lowercase().as_str() { TTL_KEY => { - let ttl = if value.is_empty() { - Duration::from_secs(0) - } else { - humantime::parse_duration(&value) - .map_err(|_| InvalidSetDatabaseOptionSnafu { key, value }.build())? - }; + let ttl = DatabaseTimeToLive::from_humantime_or_str(&value) + .map_err(|_| InvalidSetDatabaseOptionSnafu { key, value }.build())?; Ok(SetDatabaseOption::Ttl(ttl)) } @@ -1025,7 +1020,7 @@ impl TryFrom for SetDatabaseOption { #[derive(Debug, PartialEq, Clone, Serialize, Deserialize)] pub enum SetDatabaseOption { - Ttl(Duration), + Ttl(DatabaseTimeToLive), } #[derive(Debug, PartialEq, Clone, Serialize, Deserialize)] diff --git a/src/common/time/Cargo.toml b/src/common/time/Cargo.toml index fdd06140f187..28ff40ff90c8 100644 --- a/src/common/time/Cargo.toml +++ b/src/common/time/Cargo.toml @@ -13,6 +13,8 @@ chrono.workspace = true chrono-tz = "0.8" common-error.workspace = true common-macro.workspace = true +humantime.workspace = true +humantime-serde.workspace = true once_cell.workspace = true serde = { version = "1.0", features = ["derive"] } serde_json.workspace = true diff --git a/src/common/time/src/error.rs b/src/common/time/src/error.rs index 45d94a782885..0f6b5bdeb999 100644 --- a/src/common/time/src/error.rs +++ b/src/common/time/src/error.rs @@ -93,12 +93,28 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Failed to parse duration"))] + ParseDuration { + #[snafu(source)] + error: humantime::DurationError, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Database's TTL can't be `instant`"))] + InvalidDatabaseTtl { + #[snafu(implicit)] + location: Location, + }, } impl ErrorExt for Error { fn status_code(&self) -> StatusCode { match self { Error::ParseDateStr { .. } + | Error::ParseDuration { .. } + | Error::InvalidDatabaseTtl { .. } | Error::ParseTimestamp { .. } | Error::InvalidTimezoneOffset { .. } | Error::Format { .. } diff --git a/src/common/time/src/lib.rs b/src/common/time/src/lib.rs index fa025bf661c2..feb19cf9a191 100644 --- a/src/common/time/src/lib.rs +++ b/src/common/time/src/lib.rs @@ -22,6 +22,7 @@ pub mod time; pub mod timestamp; pub mod timestamp_millis; pub mod timezone; +pub mod ttl; pub mod util; pub use date::Date; @@ -32,3 +33,4 @@ pub use range::RangeMillis; pub use timestamp::Timestamp; pub use timestamp_millis::TimestampMillis; pub use timezone::Timezone; +pub use ttl::{DatabaseTimeToLive, TimeToLive, FOREVER, INSTANT}; diff --git a/src/common/time/src/ttl.rs b/src/common/time/src/ttl.rs new file mode 100644 index 000000000000..0544cfb0d198 --- /dev/null +++ b/src/common/time/src/ttl.rs @@ -0,0 +1,266 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::fmt::Display; +use std::time::Duration; + +use serde::{Deserialize, Serialize}; +use snafu::ResultExt; + +use crate::error::{Error, InvalidDatabaseTtlSnafu, ParseDurationSnafu}; +use crate::Timestamp; + +pub const INSTANT: &str = "instant"; +pub const FOREVER: &str = "forever"; + +/// Time To Live for database, which can be `Forever`, or a `Duration`, but can't be `Instant`. +/// +/// unlike `TimeToLive` which can be `Instant`, `Forever`, or a `Duration` +#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Default, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum DatabaseTimeToLive { + /// Keep the data forever + #[default] + Forever, + /// Duration to keep the data, this duration should be non-zero + #[serde(untagged, with = "humantime_serde")] + Duration(Duration), +} + +impl Display for DatabaseTimeToLive { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + DatabaseTimeToLive::Forever => write!(f, "{}", FOREVER), + DatabaseTimeToLive::Duration(d) => write!(f, "{}", humantime::Duration::from(*d)), + } + } +} + +impl DatabaseTimeToLive { + /// Parse a string that is either `forever`, or a duration to `TimeToLive` + /// + /// note that an empty string or a zero duration(a duration that spans no time) is treat as `forever` too + pub fn from_humantime_or_str(s: &str) -> Result { + let ttl = match s.to_lowercase().as_ref() { + INSTANT => InvalidDatabaseTtlSnafu.fail()?, + FOREVER | "" => Self::Forever, + _ => { + let d = humantime::parse_duration(s).context(ParseDurationSnafu)?; + Self::from(d) + } + }; + Ok(ttl) + } +} + +impl TryFrom for DatabaseTimeToLive { + type Error = Error; + fn try_from(value: TimeToLive) -> Result { + match value { + TimeToLive::Instant => InvalidDatabaseTtlSnafu.fail()?, + TimeToLive::Forever => Ok(Self::Forever), + TimeToLive::Duration(d) => Ok(Self::from(d)), + } + } +} + +impl From for TimeToLive { + fn from(value: DatabaseTimeToLive) -> Self { + match value { + DatabaseTimeToLive::Forever => TimeToLive::Forever, + DatabaseTimeToLive::Duration(d) => TimeToLive::from(d), + } + } +} + +impl From for DatabaseTimeToLive { + fn from(duration: Duration) -> Self { + if duration.is_zero() { + Self::Forever + } else { + Self::Duration(duration) + } + } +} + +impl From for DatabaseTimeToLive { + fn from(duration: humantime::Duration) -> Self { + Self::from(*duration) + } +} + +/// Time To Live +#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Default, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum TimeToLive { + /// Instantly discard upon insert + Instant, + /// Keep the data forever + #[default] + Forever, + /// Duration to keep the data, this duration should be non-zero + #[serde(untagged, with = "humantime_serde")] + Duration(Duration), +} + +impl Display for TimeToLive { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + TimeToLive::Instant => write!(f, "{}", INSTANT), + TimeToLive::Duration(d) => write!(f, "{}", humantime::Duration::from(*d)), + TimeToLive::Forever => write!(f, "{}", FOREVER), + } + } +} + +impl TimeToLive { + /// Parse a string that is either `instant`, `forever`, or a duration to `TimeToLive` + /// + /// note that an empty string or a zero duration(a duration that spans no time) is treat as `forever` too + pub fn from_humantime_or_str(s: &str) -> Result { + match s.to_lowercase().as_ref() { + INSTANT => Ok(TimeToLive::Instant), + FOREVER | "" => Ok(TimeToLive::Forever), + _ => { + let d = humantime::parse_duration(s).context(ParseDurationSnafu)?; + Ok(TimeToLive::from(d)) + } + } + } + + /// Check if the TimeToLive is expired + /// with the given `created_at` and `now` timestamp + pub fn is_expired( + &self, + created_at: &Timestamp, + now: &Timestamp, + ) -> crate::error::Result { + Ok(match self { + TimeToLive::Instant => true, + TimeToLive::Forever => false, + TimeToLive::Duration(d) => now.sub_duration(*d)? > *created_at, + }) + } + + /// is instant variant + pub fn is_instant(&self) -> bool { + matches!(self, TimeToLive::Instant) + } + + /// Is the default value, which is `Forever` + pub fn is_forever(&self) -> bool { + matches!(self, TimeToLive::Forever) + } +} + +impl From for TimeToLive { + fn from(duration: Duration) -> Self { + if duration.is_zero() { + // compatibility with old code, and inline with cassandra's behavior when ttl set to 0 + TimeToLive::Forever + } else { + TimeToLive::Duration(duration) + } + } +} + +impl From for TimeToLive { + fn from(duration: humantime::Duration) -> Self { + Self::from(*duration) + } +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn test_db_ttl_table_ttl() { + // test from ttl to db ttl + let ttl = TimeToLive::from(Duration::from_secs(10)); + let db_ttl: DatabaseTimeToLive = ttl.try_into().unwrap(); + assert_eq!(db_ttl, DatabaseTimeToLive::from(Duration::from_secs(10))); + assert_eq!(TimeToLive::from(db_ttl), ttl); + + let ttl = TimeToLive::from(Duration::from_secs(0)); + let db_ttl: DatabaseTimeToLive = ttl.try_into().unwrap(); + assert_eq!(db_ttl, DatabaseTimeToLive::Forever); + assert_eq!(TimeToLive::from(db_ttl), ttl); + + let ttl = TimeToLive::Instant; + let err_instant = DatabaseTimeToLive::try_from(ttl); + assert!(err_instant.is_err()); + + // test 0 duration + let ttl = Duration::from_secs(0); + let db_ttl: DatabaseTimeToLive = ttl.into(); + assert_eq!(db_ttl, DatabaseTimeToLive::Forever); + + let ttl = Duration::from_secs(10); + let db_ttl: DatabaseTimeToLive = ttl.into(); + assert_eq!( + db_ttl, + DatabaseTimeToLive::Duration(Duration::from_secs(10)) + ); + + let ttl = DatabaseTimeToLive::from_humantime_or_str("10s").unwrap(); + let ttl: TimeToLive = ttl.into(); + assert_eq!(ttl, TimeToLive::from(Duration::from_secs(10))); + + let ttl = DatabaseTimeToLive::from_humantime_or_str("forever").unwrap(); + let ttl: TimeToLive = ttl.into(); + assert_eq!(ttl, TimeToLive::Forever); + + assert!(DatabaseTimeToLive::from_humantime_or_str("instant").is_err()); + + // test 0s + let ttl = DatabaseTimeToLive::from_humantime_or_str("0s").unwrap(); + let ttl: TimeToLive = ttl.into(); + assert_eq!(ttl, TimeToLive::Forever); + } + + #[test] + fn test_serde() { + let cases = vec![ + ("\"instant\"", TimeToLive::Instant), + ("\"forever\"", TimeToLive::Forever), + ("\"10d\"", Duration::from_secs(86400 * 10).into()), + ( + "\"10000 years\"", + humantime::parse_duration("10000 years").unwrap().into(), + ), + ]; + + for (s, expected) in cases { + let serialized = serde_json::to_string(&expected).unwrap(); + let deserialized: TimeToLive = serde_json::from_str(&serialized).unwrap(); + assert_eq!(deserialized, expected); + + let deserialized: TimeToLive = serde_json::from_str(s).unwrap_or_else(|err| { + panic!("Actual serialized: {}, s=`{s}`, err: {:?}", serialized, err) + }); + assert_eq!(deserialized, expected); + + // test db ttl too + if s == "\"instant\"" { + assert!(serde_json::from_str::(s).is_err()); + continue; + } + + let db_ttl: DatabaseTimeToLive = serde_json::from_str(s).unwrap(); + let re_serialized = serde_json::to_string(&db_ttl).unwrap(); + assert_eq!(re_serialized, serialized); + } + } +} diff --git a/src/metric-engine/Cargo.toml b/src/metric-engine/Cargo.toml index 13aa59fe8b30..85aa371594e8 100644 --- a/src/metric-engine/Cargo.toml +++ b/src/metric-engine/Cargo.toml @@ -12,6 +12,7 @@ api.workspace = true aquamarine.workspace = true async-trait.workspace = true base64.workspace = true +common-base.workspace = true common-error.workspace = true common-macro.workspace = true common-query.workspace = true diff --git a/src/metric-engine/src/engine/alter.rs b/src/metric-engine/src/engine/alter.rs index b6108f133a38..0a961d498833 100644 --- a/src/metric-engine/src/engine/alter.rs +++ b/src/metric-engine/src/engine/alter.rs @@ -207,7 +207,7 @@ mod test { let alter_region_option_request = RegionAlterRequest { schema_version: 0, kind: AlterKind::SetRegionOptions { - options: vec![SetRegionOption::TTL(Duration::from_secs(500))], + options: vec![SetRegionOption::Ttl(Some(Duration::from_secs(500).into()))], }, }; let result = engine_inner diff --git a/src/metric-engine/src/engine/create.rs b/src/metric-engine/src/engine/create.rs index d897640cc529..b87682d4599e 100644 --- a/src/metric-engine/src/engine/create.rs +++ b/src/metric-engine/src/engine/create.rs @@ -17,7 +17,7 @@ use std::collections::{HashMap, HashSet}; use api::v1::SemanticType; use common_error::ext::BoxedError; use common_telemetry::{info, warn}; -use common_time::Timestamp; +use common_time::{Timestamp, FOREVER}; use datatypes::data_type::ConcreteDataType; use datatypes::schema::ColumnSchema; use datatypes::value::Value; @@ -540,7 +540,7 @@ pub(crate) fn region_options_for_metadata_region( mut original: HashMap, ) -> HashMap { original.remove(APPEND_MODE_KEY); - original.insert(TTL_KEY.to_string(), "10000 years".to_string()); + original.insert(TTL_KEY.to_string(), FOREVER.to_string()); original } @@ -731,7 +731,7 @@ mod test { ); assert_eq!( metadata_region_request.options.get("ttl").unwrap(), - "10000 years" + "forever" ); } } diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs index 44aa03a67df8..31e1b0674f72 100644 --- a/src/mito2/src/compaction.rs +++ b/src/mito2/src/compaction.rs @@ -24,7 +24,7 @@ mod window; use std::collections::HashMap; use std::sync::Arc; -use std::time::{Duration, Instant}; +use std::time::Instant; use api::v1::region::compact_request; use common_base::Plugins; @@ -32,7 +32,7 @@ use common_meta::key::SchemaMetadataManagerRef; use common_telemetry::{debug, error, info, warn}; use common_time::range::TimestampRange; use common_time::timestamp::TimeUnit; -use common_time::Timestamp; +use common_time::{TimeToLive, Timestamp}; use datafusion_common::ScalarValue; use datafusion_expr::Expr; use serde::{Deserialize, Serialize}; @@ -273,7 +273,7 @@ impl CompactionScheduler { .await .unwrap_or_else(|e| { warn!(e; "Failed to get ttl for region: {}", region_id); - None + TimeToLive::default() }); debug!( @@ -292,7 +292,7 @@ impl CompactionScheduler { access_layer: access_layer.clone(), manifest_ctx: manifest_ctx.clone(), file_purger: None, - ttl, + ttl: Some(ttl), }; let picker_output = { @@ -437,18 +437,21 @@ impl PendingCompaction { /// Finds TTL of table by first examine table options then database options. async fn find_ttl( table_id: TableId, - table_ttl: Option, + table_ttl: Option, schema_metadata_manager: &SchemaMetadataManagerRef, -) -> Result> { +) -> Result { + // If table TTL is set, we use it. if let Some(table_ttl) = table_ttl { - return Ok(Some(table_ttl)); + return Ok(table_ttl); } let ttl = schema_metadata_manager .get_schema_options_by_table_id(table_id) .await .context(GetSchemaMetadataSnafu)? - .and_then(|options| options.ttl); + .and_then(|options| options.ttl) + .unwrap_or_default() + .into(); Ok(ttl) } @@ -656,24 +659,16 @@ fn ts_to_lit(ts: Timestamp, ts_col_unit: TimeUnit) -> Result { /// Finds all expired SSTs across levels. fn get_expired_ssts( levels: &[LevelMeta], - ttl: Option, + ttl: Option, now: Timestamp, ) -> Vec { let Some(ttl) = ttl else { return vec![]; }; - let expire_time = match now.sub_duration(ttl) { - Ok(expire_time) => expire_time, - Err(e) => { - error!(e; "Failed to calculate region TTL expire time"); - return vec![]; - } - }; - levels .iter() - .flat_map(|l| l.get_expired_files(&expire_time).into_iter()) + .flat_map(|l| l.get_expired_files(&now, &ttl).into_iter()) .collect() } diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index 3e0228a4b2a4..792634b2e4a2 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -18,6 +18,7 @@ use std::time::Duration; use api::v1::region::compact_request; use common_meta::key::SchemaMetadataManagerRef; use common_telemetry::{info, warn}; +use common_time::TimeToLive; use object_store::manager::ObjectStoreManagerRef; use serde::{Deserialize, Serialize}; use smallvec::SmallVec; @@ -63,7 +64,7 @@ pub struct CompactionRegion { pub(crate) manifest_ctx: Arc, pub(crate) current_version: VersionRef, pub(crate) file_purger: Option>, - pub(crate) ttl: Option, + pub(crate) ttl: Option, } /// OpenCompactionRegionRequest represents the request to open a compaction region. @@ -180,7 +181,7 @@ pub async fn open_compaction_region( .await .unwrap_or_else(|e| { warn!(e; "Failed to get ttl for region: {}", region_metadata.region_id); - None + TimeToLive::default() }); Ok(CompactionRegion { region_id: req.region_id, @@ -193,7 +194,7 @@ pub async fn open_compaction_region( manifest_ctx, current_version, file_purger: Some(file_purger), - ttl, + ttl: Some(ttl), }) } diff --git a/src/mito2/src/compaction/window.rs b/src/mito2/src/compaction/window.rs index 9d6207bba298..f16b8e4c95d3 100644 --- a/src/mito2/src/compaction/window.rs +++ b/src/mito2/src/compaction/window.rs @@ -253,7 +253,7 @@ mod tests { truncated_entry_id: None, compaction_time_window: None, options: RegionOptions { - ttl, + ttl: ttl.map(|t| t.into()), compaction: Default::default(), storage: None, append_mode: false, diff --git a/src/mito2/src/engine/alter_test.rs b/src/mito2/src/engine/alter_test.rs index 069d64fb5a87..b774dd8a05cf 100644 --- a/src/mito2/src/engine/alter_test.rs +++ b/src/mito2/src/engine/alter_test.rs @@ -604,7 +604,7 @@ async fn test_alter_region_ttl_options() { let alter_ttl_request = RegionAlterRequest { schema_version: 0, kind: AlterKind::SetRegionOptions { - options: vec![SetRegionOption::TTL(Duration::from_secs(500))], + options: vec![SetRegionOption::Ttl(Some(Duration::from_secs(500).into()))], }, }; let alter_job = tokio::spawn(async move { @@ -617,14 +617,8 @@ async fn test_alter_region_ttl_options() { alter_job.await.unwrap(); let check_ttl = |engine: &MitoEngine, expected: &Duration| { - let current_ttl = engine - .get_region(region_id) - .unwrap() - .version() - .options - .ttl - .unwrap(); - assert_eq!(*expected, current_ttl); + let current_ttl = engine.get_region(region_id).unwrap().version().options.ttl; + assert_eq!(current_ttl, Some((*expected).into())); }; // Verify the ttl. check_ttl(&engine, &Duration::from_secs(500)); diff --git a/src/mito2/src/engine/create_test.rs b/src/mito2/src/engine/create_test.rs index 9ce3c53b7661..48b04dc86d91 100644 --- a/src/mito2/src/engine/create_test.rs +++ b/src/mito2/src/engine/create_test.rs @@ -165,8 +165,8 @@ async fn test_engine_create_with_options() { assert!(engine.is_region_exists(region_id)); let region = engine.get_region(region_id).unwrap(); assert_eq!( - Duration::from_secs(3600 * 24 * 10), - region.version().options.ttl.unwrap() + region.version().options.ttl, + Some(Duration::from_secs(3600 * 24 * 10).into()) ); } diff --git a/src/mito2/src/engine/open_test.rs b/src/mito2/src/engine/open_test.rs index 8fd084a24ffa..6752bbd04b12 100644 --- a/src/mito2/src/engine/open_test.rs +++ b/src/mito2/src/engine/open_test.rs @@ -180,8 +180,8 @@ async fn test_engine_region_open_with_options() { let region = engine.get_region(region_id).unwrap(); assert_eq!( - Duration::from_secs(3600 * 24 * 4), - region.version().options.ttl.unwrap() + region.version().options.ttl, + Some(Duration::from_secs(3600 * 24 * 4).into()) ); } diff --git a/src/mito2/src/region/options.rs b/src/mito2/src/region/options.rs index 4abc5925b705..4514137cc335 100644 --- a/src/mito2/src/region/options.rs +++ b/src/mito2/src/region/options.rs @@ -20,6 +20,7 @@ use std::collections::HashMap; use std::time::Duration; use common_base::readable_size::ReadableSize; +use common_time::TimeToLive; use common_wal::options::{WalOptions, WAL_OPTIONS_KEY}; use serde::de::Error as _; use serde::{Deserialize, Deserializer, Serialize}; @@ -55,8 +56,7 @@ pub enum MergeMode { #[serde(default)] pub struct RegionOptions { /// Region SST files TTL. - #[serde(with = "humantime_serde")] - pub ttl: Option, + pub ttl: Option, /// Compaction options. pub compaction: CompactionOptions, /// Custom storage. Uses default storage if it is `None`. @@ -252,8 +252,7 @@ impl Default for TwcsOptions { #[serde(default)] struct RegionOptionsWithoutEnum { /// Region SST files TTL. - #[serde(with = "humantime_serde")] - ttl: Option, + ttl: Option, storage: Option, #[serde_as(as = "DisplayFromStr")] append_mode: bool, @@ -458,7 +457,7 @@ mod tests { let map = make_map(&[("ttl", "7d")]); let options = RegionOptions::try_from(&map).unwrap(); let expect = RegionOptions { - ttl: Some(Duration::from_secs(3600 * 24 * 7)), + ttl: Some(Duration::from_secs(3600 * 24 * 7).into()), ..Default::default() }; assert_eq!(expect, options); @@ -621,7 +620,7 @@ mod tests { ]); let options = RegionOptions::try_from(&map).unwrap(); let expect = RegionOptions { - ttl: Some(Duration::from_secs(3600 * 24 * 7)), + ttl: Some(Duration::from_secs(3600 * 24 * 7).into()), compaction: CompactionOptions::Twcs(TwcsOptions { max_active_window_runs: 8, max_active_window_files: 11, @@ -654,7 +653,7 @@ mod tests { #[test] fn test_region_options_serde() { let options = RegionOptions { - ttl: Some(Duration::from_secs(3600 * 24 * 7)), + ttl: Some(Duration::from_secs(3600 * 24 * 7).into()), compaction: CompactionOptions::Twcs(TwcsOptions { max_active_window_runs: 8, max_active_window_files: usize::MAX, @@ -722,7 +721,7 @@ mod tests { }"#; let got: RegionOptions = serde_json::from_str(region_options_json_str).unwrap(); let options = RegionOptions { - ttl: Some(Duration::from_secs(3600 * 24 * 7)), + ttl: Some(Duration::from_secs(3600 * 24 * 7).into()), compaction: CompactionOptions::Twcs(TwcsOptions { max_active_window_runs: 8, max_active_window_files: 11, diff --git a/src/mito2/src/sst/version.rs b/src/mito2/src/sst/version.rs index c677a9541344..891ded08f6bd 100644 --- a/src/mito2/src/sst/version.rs +++ b/src/mito2/src/sst/version.rs @@ -17,7 +17,7 @@ use std::collections::HashMap; use std::fmt; use std::sync::Arc; -use common_time::Timestamp; +use common_time::{TimeToLive, Timestamp}; use crate::sst::file::{FileHandle, FileId, FileMeta, Level, MAX_LEVEL}; use crate::sst::file_purger::FilePurgerRef; @@ -160,12 +160,19 @@ impl LevelMeta { } /// Returns expired SSTs from current level. - pub fn get_expired_files(&self, expire_time: &Timestamp) -> Vec { + pub fn get_expired_files(&self, now: &Timestamp, ttl: &TimeToLive) -> Vec { self.files .values() .filter(|v| { let (_, end) = v.time_range(); - &end < expire_time + + match ttl.is_expired(&end, now) { + Ok(expired) => expired, + Err(e) => { + common_telemetry::error!(e; "Failed to calculate region TTL expire time"); + false + } + } }) .cloned() .collect() diff --git a/src/mito2/src/worker/handle_alter.rs b/src/mito2/src/worker/handle_alter.rs index 3908cee98be0..10d87e2940c2 100644 --- a/src/mito2/src/worker/handle_alter.rs +++ b/src/mito2/src/worker/handle_alter.rs @@ -184,16 +184,12 @@ impl RegionWorkerLoop { let mut current_options = version.options.clone(); for option in options { match option { - SetRegionOption::TTL(new_ttl) => { + SetRegionOption::Ttl(new_ttl) => { info!( "Update region ttl: {}, previous: {:?} new: {:?}", region.region_id, current_options.ttl, new_ttl ); - if new_ttl.is_zero() { - current_options.ttl = None; - } else { - current_options.ttl = Some(new_ttl); - } + current_options.ttl = new_ttl; } SetRegionOption::Twsc(key, value) => { let Twcs(options) = &mut current_options.compaction; diff --git a/src/operator/Cargo.toml b/src/operator/Cargo.toml index d20034155f1b..cd26458fad68 100644 --- a/src/operator/Cargo.toml +++ b/src/operator/Cargo.toml @@ -11,6 +11,7 @@ testing = [] workspace = true [dependencies] +ahash.workspace = true api.workspace = true async-stream.workspace = true async-trait = "0.1" diff --git a/src/operator/src/insert.rs b/src/operator/src/insert.rs index 4637f7fd10bb..ec01b329457f 100644 --- a/src/operator/src/insert.rs +++ b/src/operator/src/insert.rs @@ -12,11 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; use std::sync::Arc; +use ahash::{HashMap, HashMapExt, HashSet, HashSetExt}; use api::v1::alter_table_expr::Kind; -use api::v1::region::{InsertRequests as RegionInsertRequests, RegionRequestHeader}; +use api::v1::region::{ + InsertRequest as RegionInsertRequest, InsertRequests as RegionInsertRequests, + RegionRequestHeader, +}; use api::v1::{ AlterTableExpr, ColumnDataType, ColumnSchema, CreateTableExpr, InsertRequests, RowInsertRequest, RowInsertRequests, SemanticType, @@ -91,6 +94,20 @@ impl AutoCreateTableType { } } +/// Split insert requests into normal and instant requests. +/// +/// Where instant requests are requests with ttl=instant, +/// and normal requests are requests with ttl set to other values. +/// +/// This is used to split requests for different processing. +#[derive(Clone)] +pub struct InstantAndNormalInsertRequests { + /// Requests with normal ttl. + pub normal_requests: RegionInsertRequests, + /// Requests with ttl=instant. + pub instant_requests: RegionInsertRequests, +} + impl Inserter { pub fn new( catalog_manager: CatalogManagerRef, @@ -183,12 +200,16 @@ impl Inserter { }); validate_column_count_match(&requests)?; - let table_name_to_ids = self + let (table_name_to_ids, instant_table_ids) = self .create_or_alter_tables_on_demand(&requests, &ctx, create_type, statement_executor) .await?; - let inserts = RowToRegion::new(table_name_to_ids, self.partition_manager.as_ref()) - .convert(requests) - .await?; + let inserts = RowToRegion::new( + table_name_to_ids, + instant_table_ids, + self.partition_manager.as_ref(), + ) + .convert(requests) + .await?; self.do_request(inserts, &ctx).await } @@ -215,7 +236,7 @@ impl Inserter { .await?; // check and create logical tables - let table_name_to_ids = self + let (table_name_to_ids, instant_table_ids) = self .create_or_alter_tables_on_demand( &requests, &ctx, @@ -223,9 +244,13 @@ impl Inserter { statement_executor, ) .await?; - let inserts = RowToRegion::new(table_name_to_ids, &self.partition_manager) - .convert(requests) - .await?; + let inserts = RowToRegion::new( + table_name_to_ids, + instant_table_ids, + &self.partition_manager, + ) + .convert(requests) + .await?; self.do_request(inserts, &ctx).await } @@ -268,7 +293,7 @@ impl Inserter { impl Inserter { async fn do_request( &self, - requests: RegionInsertRequests, + requests: InstantAndNormalInsertRequests, ctx: &QueryContextRef, ) -> Result { let write_cost = write_meter!( @@ -283,8 +308,21 @@ impl Inserter { ..Default::default() }); + let InstantAndNormalInsertRequests { + normal_requests, + instant_requests, + } = requests; + // Mirror requests for source table to flownode - match self.mirror_flow_node_requests(&requests).await { + match self + .mirror_flow_node_requests( + normal_requests + .requests + .iter() + .chain(instant_requests.requests.iter()), + ) + .await + { Ok(flow_requests) => { let node_manager = self.node_manager.clone(); let flow_tasks = flow_requests.into_iter().map(|(peer, inserts)| { @@ -320,7 +358,7 @@ impl Inserter { } let write_tasks = self - .group_requests_by_peer(requests) + .group_requests_by_peer(normal_requests) .await? .into_iter() .map(|(peer, inserts)| { @@ -350,14 +388,14 @@ impl Inserter { } /// Mirror requests for source table to flownode - async fn mirror_flow_node_requests( - &self, - requests: &RegionInsertRequests, + async fn mirror_flow_node_requests<'it, 'zelf: 'it>( + &'zelf self, + requests: impl Iterator, ) -> Result> { // store partial source table requests used by flow node(only store what's used) let mut src_table_reqs: HashMap, RegionInsertRequests)>> = HashMap::new(); - for req in &requests.requests { + for req in requests { let table_id = RegionId::from_u64(req.region_id).table_id(); match src_table_reqs.get_mut(&table_id) { Some(Some((_peers, reqs))) => reqs.requests.push(req.clone()), @@ -422,7 +460,6 @@ impl Inserter { // group by region ids first to reduce repeatedly call `find_region_leader` // TODO(discord9): determine if a addition clone is worth it let mut requests_per_region: HashMap = HashMap::new(); - for req in requests.requests { let region_id = RegionId::from_u64(req.region_id); requests_per_region @@ -462,7 +499,7 @@ impl Inserter { ctx: &QueryContextRef, auto_create_table_type: AutoCreateTableType, statement_executor: &StatementExecutor, - ) -> Result> { + ) -> Result<(HashMap, HashSet)> { let _timer = crate::metrics::CREATE_ALTER_ON_DEMAND .with_label_values(&[auto_create_table_type.as_str()]) .start_timer(); @@ -483,6 +520,7 @@ impl Inserter { })? .unwrap_or(true); if !auto_create_table_hint { + let mut instant_table_ids = HashSet::new(); for req in &requests.inserts { let table = self .get_table(catalog, &schema, &req.table_name) @@ -494,17 +532,25 @@ impl Inserter { ), })?; let table_info = table.table_info(); + if table_info.is_ttl_instant_table() { + instant_table_ids.insert(table_info.table_id()); + } table_name_to_ids.insert(table_info.name.clone(), table_info.table_id()); } - return Ok(table_name_to_ids); + return Ok((table_name_to_ids, instant_table_ids)); } let mut create_tables = vec![]; let mut alter_tables = vec![]; + let mut instant_table_ids = HashSet::new(); + for req in &requests.inserts { match self.get_table(catalog, &schema, &req.table_name).await? { Some(table) => { let table_info = table.table_info(); + if table_info.is_ttl_instant_table() { + instant_table_ids.insert(table_info.table_id()); + } table_name_to_ids.insert(table_info.name.clone(), table_info.table_id()); if let Some(alter_expr) = self.get_alter_table_expr_on_demand(req, &table, ctx)? @@ -543,6 +589,8 @@ impl Inserter { AutoCreateTableType::Physical | AutoCreateTableType::Log | AutoCreateTableType::LastNonNull => { + // note that auto create table shouldn't be ttl instant table + // for it's a very unexpected behavior and should be set by user explicitly for create_table in create_tables { let table = self .create_physical_table(create_table, ctx, statement_executor) @@ -558,7 +606,7 @@ impl Inserter { } } - Ok(table_name_to_ids) + Ok((table_name_to_ids, instant_table_ids)) } async fn create_physical_table_on_demand( diff --git a/src/operator/src/req_convert/insert/row_to_region.rs b/src/operator/src/req_convert/insert/row_to_region.rs index a33a1329026d..125910ba455f 100644 --- a/src/operator/src/req_convert/insert/row_to_region.rs +++ b/src/operator/src/req_convert/insert/row_to_region.rs @@ -12,8 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; - +use ahash::{HashMap, HashSet}; use api::v1::region::InsertRequests as RegionInsertRequests; use api::v1::RowInsertRequests; use partition::manager::PartitionRuleManager; @@ -21,37 +20,53 @@ use snafu::OptionExt; use table::metadata::TableId; use crate::error::{Result, TableNotFoundSnafu}; +use crate::insert::InstantAndNormalInsertRequests; use crate::req_convert::common::partitioner::Partitioner; pub struct RowToRegion<'a> { table_name_to_ids: HashMap, + instant_table_ids: HashSet, partition_manager: &'a PartitionRuleManager, } impl<'a> RowToRegion<'a> { pub fn new( table_name_to_ids: HashMap, + instant_table_ids: HashSet, partition_manager: &'a PartitionRuleManager, ) -> Self { Self { table_name_to_ids, + instant_table_ids, partition_manager, } } - pub async fn convert(&self, requests: RowInsertRequests) -> Result { + pub async fn convert( + &self, + requests: RowInsertRequests, + ) -> Result { let mut region_request = Vec::with_capacity(requests.inserts.len()); + let mut instant_request = Vec::with_capacity(requests.inserts.len()); for request in requests.inserts { let table_id = self.get_table_id(&request.table_name)?; let requests = Partitioner::new(self.partition_manager) .partition_insert_requests(table_id, request.rows.unwrap_or_default()) .await?; - - region_request.extend(requests); + if self.instant_table_ids.contains(&table_id) { + instant_request.extend(requests); + } else { + region_request.extend(requests); + } } - Ok(RegionInsertRequests { - requests: region_request, + Ok(InstantAndNormalInsertRequests { + normal_requests: RegionInsertRequests { + requests: region_request, + }, + instant_requests: RegionInsertRequests { + requests: instant_request, + }, }) } diff --git a/src/operator/src/req_convert/insert/stmt_to_region.rs b/src/operator/src/req_convert/insert/stmt_to_region.rs index 8124edc19514..cd48b4fca54e 100644 --- a/src/operator/src/req_convert/insert/stmt_to_region.rs +++ b/src/operator/src/req_convert/insert/stmt_to_region.rs @@ -32,6 +32,7 @@ use crate::error::{ ColumnNotFoundSnafu, InvalidSqlSnafu, MissingInsertBodySnafu, ParseSqlSnafu, Result, SchemaReadOnlySnafu, TableNotFoundSnafu, }; +use crate::insert::InstantAndNormalInsertRequests; use crate::req_convert::common::partitioner::Partitioner; use crate::req_convert::insert::semantic_type; @@ -60,7 +61,7 @@ impl<'a> StatementToRegion<'a> { &self, stmt: &Insert, query_ctx: &QueryContextRef, - ) -> Result { + ) -> Result { let (catalog, schema, table_name) = self.get_full_name(stmt.table_name())?; let table = self.get_table(&catalog, &schema, &table_name).await?; let table_schema = table.schema(); @@ -134,7 +135,18 @@ impl<'a> StatementToRegion<'a> { let requests = Partitioner::new(self.partition_manager) .partition_insert_requests(table_info.table_id(), Rows { schema, rows }) .await?; - Ok(RegionInsertRequests { requests }) + let requests = RegionInsertRequests { requests }; + if table_info.is_ttl_instant_table() { + Ok(InstantAndNormalInsertRequests { + normal_requests: Default::default(), + instant_requests: requests, + }) + } else { + Ok(InstantAndNormalInsertRequests { + normal_requests: requests, + instant_requests: Default::default(), + }) + } } async fn get_table(&self, catalog: &str, schema: &str, table: &str) -> Result { diff --git a/src/operator/src/req_convert/insert/table_to_region.rs b/src/operator/src/req_convert/insert/table_to_region.rs index 729355cf0159..ac79cce503ed 100644 --- a/src/operator/src/req_convert/insert/table_to_region.rs +++ b/src/operator/src/req_convert/insert/table_to_region.rs @@ -19,6 +19,7 @@ use table::metadata::TableInfo; use table::requests::InsertRequest as TableInsertRequest; use crate::error::Result; +use crate::insert::InstantAndNormalInsertRequests; use crate::req_convert::common::partitioner::Partitioner; use crate::req_convert::common::{column_schema, row_count}; @@ -35,7 +36,10 @@ impl<'a> TableToRegion<'a> { } } - pub async fn convert(&self, request: TableInsertRequest) -> Result { + pub async fn convert( + &self, + request: TableInsertRequest, + ) -> Result { let row_count = row_count(&request.columns_values)?; let schema = column_schema(self.table_info, &request.columns_values)?; let rows = api::helper::vectors_to_rows(request.columns_values.values(), row_count); @@ -44,7 +48,19 @@ impl<'a> TableToRegion<'a> { let requests = Partitioner::new(self.partition_manager) .partition_insert_requests(self.table_info.table_id(), rows) .await?; - Ok(RegionInsertRequests { requests }) + + let requests = RegionInsertRequests { requests }; + if self.table_info.is_ttl_instant_table() { + Ok(InstantAndNormalInsertRequests { + normal_requests: Default::default(), + instant_requests: requests, + }) + } else { + Ok(InstantAndNormalInsertRequests { + normal_requests: requests, + instant_requests: Default::default(), + }) + } } } @@ -112,6 +128,7 @@ mod tests { let region_requests = converter.convert(table_request).await.unwrap(); let mut region_id_to_region_requests = region_requests + .normal_requests .requests .into_iter() .map(|r| (r.region_id, r)) diff --git a/src/query/src/sql/show_create_table.rs b/src/query/src/sql/show_create_table.rs index 68be43785703..17f1dcdd394b 100644 --- a/src/query/src/sql/show_create_table.rs +++ b/src/query/src/sql/show_create_table.rs @@ -21,7 +21,6 @@ use datatypes::schema::{ ColumnDefaultConstraint, ColumnSchema, SchemaRef, COLUMN_FULLTEXT_OPT_KEY_ANALYZER, COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE, COMMENT_KEY, }; -use humantime::format_duration; use snafu::ResultExt; use sql::ast::{ColumnDef, ColumnOption, ColumnOptionDef, Expr, Ident, ObjectName}; use sql::dialect::GreptimeDbDialect; @@ -46,13 +45,13 @@ fn create_sql_options(table_meta: &TableMeta, schema_options: Option for ModifyColumnType { #[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)] pub enum SetRegionOption { - TTL(Duration), + Ttl(Option), // Modifying TwscOptions with values as (option name, new value). Twsc(String, String), } @@ -758,13 +758,10 @@ impl TryFrom<&PbOption> for SetRegionOption { let PbOption { key, value } = value; match key.as_str() { TTL_KEY => { - let ttl = if value.is_empty() { - Duration::from_secs(0) - } else { - humantime::parse_duration(value) - .map_err(|_| InvalidSetRegionOptionRequestSnafu { key, value }.build())? - }; - Ok(Self::TTL(ttl)) + let ttl = TimeToLive::from_humantime_or_str(value) + .map_err(|_| InvalidSetRegionOptionRequestSnafu { key, value }.build())?; + + Ok(Self::Ttl(Some(ttl))) } TWCS_MAX_ACTIVE_WINDOW_RUNS | TWCS_MAX_ACTIVE_WINDOW_FILES @@ -798,7 +795,7 @@ impl From<&UnsetRegionOption> for SetRegionOption { UnsetRegionOption::TwcsTimeWindow => { SetRegionOption::Twsc(unset_option.to_string(), String::new()) } - UnsetRegionOption::Ttl => SetRegionOption::TTL(Duration::default()), + UnsetRegionOption::Ttl => SetRegionOption::Ttl(Default::default()), } } } diff --git a/src/table/src/metadata.rs b/src/table/src/metadata.rs index 7f4ddb409acd..6dfc47314a36 100644 --- a/src/table/src/metadata.rs +++ b/src/table/src/metadata.rs @@ -224,12 +224,8 @@ impl TableMeta { for request in requests { match request { - SetRegionOption::TTL(new_ttl) => { - if new_ttl.is_zero() { - new_options.ttl = None; - } else { - new_options.ttl = Some(*new_ttl); - } + SetRegionOption::Ttl(new_ttl) => { + new_options.ttl = *new_ttl; } SetRegionOption::Twsc(key, value) => { if !value.is_empty() { @@ -826,6 +822,15 @@ impl TableInfo { .extra_options .contains_key(PHYSICAL_TABLE_METADATA_KEY) } + + /// Return true if the table's TTL is `instant`. + pub fn is_ttl_instant_table(&self) -> bool { + self.meta + .options + .ttl + .map(|t| t.is_instant()) + .unwrap_or(false) + } } impl TableInfoBuilder { diff --git a/src/table/src/requests.rs b/src/table/src/requests.rs index 4c273e03b785..74554631c62d 100644 --- a/src/table/src/requests.rs +++ b/src/table/src/requests.rs @@ -17,12 +17,12 @@ use std::collections::HashMap; use std::fmt; use std::str::FromStr; -use std::time::Duration; use common_base::readable_size::ReadableSize; use common_datasource::object_store::s3::is_supported_in_s3; use common_query::AddColumnLocation; use common_time::range::TimestampRange; +use common_time::TimeToLive; use datatypes::data_type::ConcreteDataType; use datatypes::prelude::VectorRef; use datatypes::schema::{ColumnSchema, FulltextOptions}; @@ -74,8 +74,7 @@ pub struct TableOptions { /// Memtable size of memtable. pub write_buffer_size: Option, /// Time-to-live of table. Expired data will be automatically purged. - #[serde(with = "humantime_serde")] - pub ttl: Option, + pub ttl: Option, /// Extra options that may not applicable to all table engines. pub extra_options: HashMap, } @@ -109,16 +108,13 @@ impl TableOptions { } if let Some(ttl) = kvs.get(TTL_KEY) { - let ttl_value = ttl - .parse::() - .map_err(|_| { - ParseTableOptionSnafu { - key: TTL_KEY, - value: ttl, - } - .build() - })? - .into(); + let ttl_value = TimeToLive::from_humantime_or_str(ttl).map_err(|_| { + ParseTableOptionSnafu { + key: TTL_KEY, + value: ttl, + } + .build() + })?; options.ttl = Some(ttl_value); } @@ -138,8 +134,8 @@ impl fmt::Display for TableOptions { key_vals.push(format!("{}={}", WRITE_BUFFER_SIZE_KEY, size)); } - if let Some(ttl) = self.ttl { - key_vals.push(format!("{}={}", TTL_KEY, humantime::Duration::from(ttl))); + if let Some(ttl) = self.ttl.map(|ttl| ttl.to_string()) { + key_vals.push(format!("{}={}", TTL_KEY, ttl)); } for (k, v) in &self.extra_options { @@ -159,8 +155,7 @@ impl From<&TableOptions> for HashMap { write_buffer_size.to_string(), ); } - if let Some(ttl) = opts.ttl { - let ttl_str = humantime::format_duration(ttl).to_string(); + if let Some(ttl_str) = opts.ttl.map(|ttl| ttl.to_string()) { let _ = res.insert(TTL_KEY.to_string(), ttl_str); } res.extend( @@ -326,6 +321,8 @@ pub struct CopyDatabaseRequest { #[cfg(test)] mod tests { + use std::time::Duration; + use super::*; #[test] @@ -343,7 +340,7 @@ mod tests { fn test_serialize_table_options() { let options = TableOptions { write_buffer_size: None, - ttl: Some(Duration::from_secs(1000)), + ttl: Some(Duration::from_secs(1000).into()), extra_options: HashMap::new(), }; let serialized = serde_json::to_string(&options).unwrap(); @@ -355,7 +352,7 @@ mod tests { fn test_convert_hashmap_between_table_options() { let options = TableOptions { write_buffer_size: Some(ReadableSize::mb(128)), - ttl: Some(Duration::from_secs(1000)), + ttl: Some(Duration::from_secs(1000).into()), extra_options: HashMap::new(), }; let serialized_map = HashMap::from(&options); @@ -364,7 +361,7 @@ mod tests { let options = TableOptions { write_buffer_size: None, - ttl: None, + ttl: Default::default(), extra_options: HashMap::new(), }; let serialized_map = HashMap::from(&options); @@ -373,7 +370,7 @@ mod tests { let options = TableOptions { write_buffer_size: Some(ReadableSize::mb(128)), - ttl: Some(Duration::from_secs(1000)), + ttl: Some(Duration::from_secs(1000).into()), extra_options: HashMap::from([("a".to_string(), "A".to_string())]), }; let serialized_map = HashMap::from(&options); @@ -385,7 +382,7 @@ mod tests { fn test_table_options_to_string() { let options = TableOptions { write_buffer_size: Some(ReadableSize::mb(128)), - ttl: Some(Duration::from_secs(1000)), + ttl: Some(Duration::from_secs(1000).into()), extra_options: HashMap::new(), }; @@ -396,7 +393,7 @@ mod tests { let options = TableOptions { write_buffer_size: Some(ReadableSize::mb(128)), - ttl: Some(Duration::from_secs(1000)), + ttl: Some(Duration::from_secs(1000).into()), extra_options: HashMap::from([("a".to_string(), "A".to_string())]), }; diff --git a/tests/cases/standalone/common/alter/alter_database.result b/tests/cases/standalone/common/alter/alter_database.result index a98d48323659..8ff458989e4c 100644 --- a/tests/cases/standalone/common/alter/alter_database.result +++ b/tests/cases/standalone/common/alter/alter_database.result @@ -62,6 +62,9 @@ SHOW CREATE DATABASE alter_database; | Database | Create Database | +----------------+----------------------------------------------+ | alter_database | CREATE DATABASE IF NOT EXISTS alter_database | +| | WITH( | +| | ttl = 'forever' | +| | ) | +----------------+----------------------------------------------+ ALTER DATABASE alter_database SET 'ttl'='😁'; diff --git a/tests/cases/standalone/common/alter/alter_table_options.result b/tests/cases/standalone/common/alter/alter_table_options.result index 8fa08eefea6b..b38a99d8465e 100644 --- a/tests/cases/standalone/common/alter/alter_table_options.result +++ b/tests/cases/standalone/common/alter/alter_table_options.result @@ -103,7 +103,9 @@ SHOW CREATE TABLE ato; | | ) | | | | | | ENGINE=mito | -| | | +| | WITH( | +| | ttl = 'forever' | +| | ) | +-------+------------------------------------+ ALTER TABLE ato SET 'ttl'='1s'; diff --git a/tests/cases/standalone/common/flow/flow_advance_ttl.result b/tests/cases/standalone/common/flow/flow_advance_ttl.result new file mode 100644 index 000000000000..38d14d6b3155 --- /dev/null +++ b/tests/cases/standalone/common/flow/flow_advance_ttl.result @@ -0,0 +1,101 @@ +-- test ttl = instant +CREATE TABLE distinct_basic ( + number INT, + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY(number), + TIME INDEX(ts) +)WITH ('ttl' = 'instant'); + +Affected Rows: 0 + +CREATE FLOW test_distinct_basic SINK TO out_distinct_basic AS +SELECT + DISTINCT number as dis +FROM + distinct_basic; + +Affected Rows: 0 + +-- SQLNESS ARG restart=true +INSERT INTO + distinct_basic +VALUES + (20, "2021-07-01 00:00:00.200"), + (20, "2021-07-01 00:00:00.200"), + (22, "2021-07-01 00:00:00.600"); + +Affected Rows: 0 + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('test_distinct_basic'); + ++-----------------------------------------+ +| ADMIN FLUSH_FLOW('test_distinct_basic') | ++-----------------------------------------+ +| FLOW_FLUSHED | ++-----------------------------------------+ + +SHOW CREATE TABLE distinct_basic; + ++----------------+-----------------------------------------------------------+ +| Table | Create Table | ++----------------+-----------------------------------------------------------+ +| distinct_basic | CREATE TABLE IF NOT EXISTS "distinct_basic" ( | +| | "number" INT NULL, | +| | "ts" TIMESTAMP(3) NOT NULL DEFAULT current_timestamp(), | +| | TIME INDEX ("ts"), | +| | PRIMARY KEY ("number") | +| | ) | +| | | +| | ENGINE=mito | +| | WITH( | +| | ttl = 'instant' | +| | ) | ++----------------+-----------------------------------------------------------+ + +SHOW CREATE TABLE out_distinct_basic; + ++--------------------+---------------------------------------------------+ +| Table | Create Table | ++--------------------+---------------------------------------------------+ +| out_distinct_basic | CREATE TABLE IF NOT EXISTS "out_distinct_basic" ( | +| | "dis" INT NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | "__ts_placeholder" TIMESTAMP(3) NOT NULL, | +| | TIME INDEX ("__ts_placeholder"), | +| | PRIMARY KEY ("dis") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++--------------------+---------------------------------------------------+ + +SELECT + dis +FROM + out_distinct_basic; + ++-----+ +| dis | ++-----+ +| 20 | +| 22 | ++-----+ + +SELECT number FROM distinct_basic; + +++ +++ + +DROP FLOW test_distinct_basic; + +Affected Rows: 0 + +DROP TABLE distinct_basic; + +Affected Rows: 0 + +DROP TABLE out_distinct_basic; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/flow/flow_advance_ttl.sql b/tests/cases/standalone/common/flow/flow_advance_ttl.sql new file mode 100644 index 000000000000..18dfea25db04 --- /dev/null +++ b/tests/cases/standalone/common/flow/flow_advance_ttl.sql @@ -0,0 +1,39 @@ +-- test ttl = instant +CREATE TABLE distinct_basic ( + number INT, + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY(number), + TIME INDEX(ts) +)WITH ('ttl' = 'instant'); + +CREATE FLOW test_distinct_basic SINK TO out_distinct_basic AS +SELECT + DISTINCT number as dis +FROM + distinct_basic; + +-- SQLNESS ARG restart=true +INSERT INTO + distinct_basic +VALUES + (20, "2021-07-01 00:00:00.200"), + (20, "2021-07-01 00:00:00.200"), + (22, "2021-07-01 00:00:00.600"); + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('test_distinct_basic'); + +SHOW CREATE TABLE distinct_basic; + +SHOW CREATE TABLE out_distinct_basic; + +SELECT + dis +FROM + out_distinct_basic; + +SELECT number FROM distinct_basic; + +DROP FLOW test_distinct_basic; +DROP TABLE distinct_basic; +DROP TABLE out_distinct_basic; \ No newline at end of file diff --git a/tests/cases/standalone/common/flow/flow_basic.result b/tests/cases/standalone/common/flow/flow_basic.result index cc9b4e038b0f..8ee6a90c83bf 100644 --- a/tests/cases/standalone/common/flow/flow_basic.result +++ b/tests/cases/standalone/common/flow/flow_basic.result @@ -227,6 +227,23 @@ ADMIN FLUSH_FLOW('test_distinct_basic'); | FLOW_FLUSHED | +-----------------------------------------+ +SHOW CREATE TABLE out_distinct_basic; + ++--------------------+---------------------------------------------------+ +| Table | Create Table | ++--------------------+---------------------------------------------------+ +| out_distinct_basic | CREATE TABLE IF NOT EXISTS "out_distinct_basic" ( | +| | "dis" INT NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | "__ts_placeholder" TIMESTAMP(3) NOT NULL, | +| | TIME INDEX ("__ts_placeholder"), | +| | PRIMARY KEY ("dis") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++--------------------+---------------------------------------------------+ + SELECT dis FROM @@ -478,6 +495,23 @@ ADMIN FLUSH_FLOW('calc_ngx_country'); | FLOW_FLUSHED | +--------------------------------------+ +SHOW CREATE TABLE ngx_country; + ++-------------+---------------------------------------------+ +| Table | Create Table | ++-------------+---------------------------------------------+ +| ngx_country | CREATE TABLE IF NOT EXISTS "ngx_country" ( | +| | "ngx_access_log.country" STRING NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | "__ts_placeholder" TIMESTAMP(3) NOT NULL, | +| | TIME INDEX ("__ts_placeholder"), | +| | PRIMARY KEY ("ngx_access_log.country") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++-------------+---------------------------------------------+ + SELECT "ngx_access_log.country" FROM @@ -594,6 +628,24 @@ ADMIN FLUSH_FLOW('calc_ngx_country'); | FLOW_FLUSHED | +--------------------------------------+ +SHOW CREATE TABLE ngx_country; + ++-------------+---------------------------------------------------------+ +| Table | Create Table | ++-------------+---------------------------------------------------------+ +| ngx_country | CREATE TABLE IF NOT EXISTS "ngx_country" ( | +| | "ngx_access_log.country" STRING NULL, | +| | "time_window" TIMESTAMP(3) NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | "__ts_placeholder" TIMESTAMP(3) NOT NULL, | +| | TIME INDEX ("__ts_placeholder"), | +| | PRIMARY KEY ("ngx_access_log.country", "time_window") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++-------------+---------------------------------------------------------+ + SELECT "ngx_access_log.country", time_window diff --git a/tests/cases/standalone/common/flow/flow_basic.sql b/tests/cases/standalone/common/flow/flow_basic.sql index 70d7b14157c2..43a42de4dd5f 100644 --- a/tests/cases/standalone/common/flow/flow_basic.sql +++ b/tests/cases/standalone/common/flow/flow_basic.sql @@ -128,6 +128,8 @@ VALUES -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | ADMIN FLUSH_FLOW('test_distinct_basic'); +SHOW CREATE TABLE out_distinct_basic; + SELECT dis FROM @@ -270,6 +272,8 @@ VALUES -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | ADMIN FLUSH_FLOW('calc_ngx_country'); +SHOW CREATE TABLE ngx_country; + SELECT "ngx_access_log.country" FROM @@ -333,6 +337,8 @@ VALUES -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | ADMIN FLUSH_FLOW('calc_ngx_country'); +SHOW CREATE TABLE ngx_country; + SELECT "ngx_access_log.country", time_window diff --git a/tests/cases/standalone/common/ttl/show_ttl.result b/tests/cases/standalone/common/ttl/show_ttl.result new file mode 100644 index 000000000000..d98c1b612bca --- /dev/null +++ b/tests/cases/standalone/common/ttl/show_ttl.result @@ -0,0 +1,374 @@ +CREATE DATABASE test_ttl_db WITH (ttl = '1 second'); + +Affected Rows: 1 + +USE test_ttl_db; + +Affected Rows: 0 + +CREATE TABLE test_ttl(ts TIMESTAMP TIME INDEX, val INT); + +Affected Rows: 0 + +SHOW CREATE TABLE test_ttl; + ++----------+-----------------------------------------+ +| Table | Create Table | ++----------+-----------------------------------------+ +| test_ttl | CREATE TABLE IF NOT EXISTS "test_ttl" ( | +| | "ts" TIMESTAMP(3) NOT NULL, | +| | "val" INT NULL, | +| | TIME INDEX ("ts") | +| | ) | +| | | +| | ENGINE=mito | +| | WITH( | +| | ttl = '1s' | +| | ) | ++----------+-----------------------------------------+ + +SHOW CREATE DATABASE test_ttl_db; + ++-------------+-------------------------------------------+ +| Database | Create Database | ++-------------+-------------------------------------------+ +| test_ttl_db | CREATE DATABASE IF NOT EXISTS test_ttl_db | +| | WITH( | +| | ttl = '1s' | +| | ) | ++-------------+-------------------------------------------+ + +ALTER DATABASE test_ttl_db SET ttl = '1 day'; + +Affected Rows: 0 + +SHOW CREATE TABLE test_ttl; + ++----------+-----------------------------------------+ +| Table | Create Table | ++----------+-----------------------------------------+ +| test_ttl | CREATE TABLE IF NOT EXISTS "test_ttl" ( | +| | "ts" TIMESTAMP(3) NOT NULL, | +| | "val" INT NULL, | +| | TIME INDEX ("ts") | +| | ) | +| | | +| | ENGINE=mito | +| | WITH( | +| | ttl = '1day' | +| | ) | ++----------+-----------------------------------------+ + +SHOW CREATE DATABASE test_ttl_db; + ++-------------+-------------------------------------------+ +| Database | Create Database | ++-------------+-------------------------------------------+ +| test_ttl_db | CREATE DATABASE IF NOT EXISTS test_ttl_db | +| | WITH( | +| | ttl = '1day' | +| | ) | ++-------------+-------------------------------------------+ + +ALTER TABLE test_ttl SET 'ttl' = '6 hours'; + +Affected Rows: 0 + +SHOW CREATE TABLE test_ttl; + ++----------+-----------------------------------------+ +| Table | Create Table | ++----------+-----------------------------------------+ +| test_ttl | CREATE TABLE IF NOT EXISTS "test_ttl" ( | +| | "ts" TIMESTAMP(3) NOT NULL, | +| | "val" INT NULL, | +| | TIME INDEX ("ts") | +| | ) | +| | | +| | ENGINE=mito | +| | WITH( | +| | ttl = '6h' | +| | ) | ++----------+-----------------------------------------+ + +ALTER TABLE test_ttl SET 'ttl' = 'instant'; + +Affected Rows: 0 + +SHOW CREATE TABLE test_ttl; + ++----------+-----------------------------------------+ +| Table | Create Table | ++----------+-----------------------------------------+ +| test_ttl | CREATE TABLE IF NOT EXISTS "test_ttl" ( | +| | "ts" TIMESTAMP(3) NOT NULL, | +| | "val" INT NULL, | +| | TIME INDEX ("ts") | +| | ) | +| | | +| | ENGINE=mito | +| | WITH( | +| | ttl = 'instant' | +| | ) | ++----------+-----------------------------------------+ + +ALTER TABLE test_ttl SET 'ttl' = '0s'; + +Affected Rows: 0 + +SHOW CREATE TABLE test_ttl; + ++----------+-----------------------------------------+ +| Table | Create Table | ++----------+-----------------------------------------+ +| test_ttl | CREATE TABLE IF NOT EXISTS "test_ttl" ( | +| | "ts" TIMESTAMP(3) NOT NULL, | +| | "val" INT NULL, | +| | TIME INDEX ("ts") | +| | ) | +| | | +| | ENGINE=mito | +| | WITH( | +| | ttl = 'forever' | +| | ) | ++----------+-----------------------------------------+ + +ALTER TABLE test_ttl SET 'ttl' = 'forever'; + +Affected Rows: 0 + +SHOW CREATE TABLE test_ttl; + ++----------+-----------------------------------------+ +| Table | Create Table | ++----------+-----------------------------------------+ +| test_ttl | CREATE TABLE IF NOT EXISTS "test_ttl" ( | +| | "ts" TIMESTAMP(3) NOT NULL, | +| | "val" INT NULL, | +| | TIME INDEX ("ts") | +| | ) | +| | | +| | ENGINE=mito | +| | WITH( | +| | ttl = 'forever' | +| | ) | ++----------+-----------------------------------------+ + +SHOW CREATE DATABASE test_ttl_db; + ++-------------+-------------------------------------------+ +| Database | Create Database | ++-------------+-------------------------------------------+ +| test_ttl_db | CREATE DATABASE IF NOT EXISTS test_ttl_db | +| | WITH( | +| | ttl = '1day' | +| | ) | ++-------------+-------------------------------------------+ + +ALTER TABLE test_ttl UNSET 'ttl'; + +Affected Rows: 0 + +SHOW CREATE TABLE test_ttl; + ++----------+-----------------------------------------+ +| Table | Create Table | ++----------+-----------------------------------------+ +| test_ttl | CREATE TABLE IF NOT EXISTS "test_ttl" ( | +| | "ts" TIMESTAMP(3) NOT NULL, | +| | "val" INT NULL, | +| | TIME INDEX ("ts") | +| | ) | +| | | +| | ENGINE=mito | +| | WITH( | +| | ttl = '1day' | +| | ) | ++----------+-----------------------------------------+ + +SHOW CREATE DATABASE test_ttl_db; + ++-------------+-------------------------------------------+ +| Database | Create Database | ++-------------+-------------------------------------------+ +| test_ttl_db | CREATE DATABASE IF NOT EXISTS test_ttl_db | +| | WITH( | +| | ttl = '1day' | +| | ) | ++-------------+-------------------------------------------+ + +ALTER DATABASE test_ttl_db SET 'ttl' = 'forever'; + +Affected Rows: 0 + +SHOW CREATE TABLE test_ttl; + ++----------+-----------------------------------------+ +| Table | Create Table | ++----------+-----------------------------------------+ +| test_ttl | CREATE TABLE IF NOT EXISTS "test_ttl" ( | +| | "ts" TIMESTAMP(3) NOT NULL, | +| | "val" INT NULL, | +| | TIME INDEX ("ts") | +| | ) | +| | | +| | ENGINE=mito | +| | WITH( | +| | ttl = 'forever' | +| | ) | ++----------+-----------------------------------------+ + +SHOW CREATE DATABASE test_ttl_db; + ++-------------+-------------------------------------------+ +| Database | Create Database | ++-------------+-------------------------------------------+ +| test_ttl_db | CREATE DATABASE IF NOT EXISTS test_ttl_db | +| | WITH( | +| | ttl = 'forever' | +| | ) | ++-------------+-------------------------------------------+ + +ALTER DATABASE test_ttl_db SET 'ttl' = '0s'; + +Affected Rows: 0 + +SHOW CREATE TABLE test_ttl; + ++----------+-----------------------------------------+ +| Table | Create Table | ++----------+-----------------------------------------+ +| test_ttl | CREATE TABLE IF NOT EXISTS "test_ttl" ( | +| | "ts" TIMESTAMP(3) NOT NULL, | +| | "val" INT NULL, | +| | TIME INDEX ("ts") | +| | ) | +| | | +| | ENGINE=mito | +| | WITH( | +| | ttl = 'forever' | +| | ) | ++----------+-----------------------------------------+ + +SHOW CREATE DATABASE test_ttl_db; + ++-------------+-------------------------------------------+ +| Database | Create Database | ++-------------+-------------------------------------------+ +| test_ttl_db | CREATE DATABASE IF NOT EXISTS test_ttl_db | +| | WITH( | +| | ttl = 'forever' | +| | ) | ++-------------+-------------------------------------------+ + +ALTER DATABASE test_ttl_db SET 'ttl' = 'instant'; + +Error: 1004(InvalidArguments), Invalid set database option, key: ttl, value: instant + +SHOW CREATE TABLE test_ttl; + ++----------+-----------------------------------------+ +| Table | Create Table | ++----------+-----------------------------------------+ +| test_ttl | CREATE TABLE IF NOT EXISTS "test_ttl" ( | +| | "ts" TIMESTAMP(3) NOT NULL, | +| | "val" INT NULL, | +| | TIME INDEX ("ts") | +| | ) | +| | | +| | ENGINE=mito | +| | WITH( | +| | ttl = 'forever' | +| | ) | ++----------+-----------------------------------------+ + +SHOW CREATE DATABASE test_ttl_db; + ++-------------+-------------------------------------------+ +| Database | Create Database | ++-------------+-------------------------------------------+ +| test_ttl_db | CREATE DATABASE IF NOT EXISTS test_ttl_db | +| | WITH( | +| | ttl = 'forever' | +| | ) | ++-------------+-------------------------------------------+ + +ALTER DATABASE test_ttl_db UNSET 'ttl'; + +Affected Rows: 0 + +SHOW CREATE TABLE test_ttl; + ++----------+-----------------------------------------+ +| Table | Create Table | ++----------+-----------------------------------------+ +| test_ttl | CREATE TABLE IF NOT EXISTS "test_ttl" ( | +| | "ts" TIMESTAMP(3) NOT NULL, | +| | "val" INT NULL, | +| | TIME INDEX ("ts") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++----------+-----------------------------------------+ + +SHOW CREATE DATABASE test_ttl_db; + ++-------------+-------------------------------------------+ +| Database | Create Database | ++-------------+-------------------------------------------+ +| test_ttl_db | CREATE DATABASE IF NOT EXISTS test_ttl_db | ++-------------+-------------------------------------------+ + +ALTER TABLE test_ttl UNSET 'ttl'; + +Affected Rows: 0 + +SHOW CREATE TABLE test_ttl; + ++----------+-----------------------------------------+ +| Table | Create Table | ++----------+-----------------------------------------+ +| test_ttl | CREATE TABLE IF NOT EXISTS "test_ttl" ( | +| | "ts" TIMESTAMP(3) NOT NULL, | +| | "val" INT NULL, | +| | TIME INDEX ("ts") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++----------+-----------------------------------------+ + +SHOW CREATE DATABASE test_ttl_db; + ++-------------+-------------------------------------------+ +| Database | Create Database | ++-------------+-------------------------------------------+ +| test_ttl_db | CREATE DATABASE IF NOT EXISTS test_ttl_db | ++-------------+-------------------------------------------+ + +DROP TABLE test_ttl; + +Affected Rows: 0 + +USE public; + +Affected Rows: 0 + +DROP DATABASE test_ttl_db; + +Affected Rows: 0 + +-- test both set database to instant and alter ttl to instant for a database is forbidden +CREATE DATABASE test_ttl_db WITH (ttl = 'instant'); + +Error: 1002(Unexpected), Failed to parse value instant into key ttl + +CREATE DATABASE test_ttl_db_2 WITH (ttl = '1s'); + +Affected Rows: 1 + +ALTER DATABASE test_ttl_db_2 SET 'ttl' = 'instant'; + +Error: 1004(InvalidArguments), Invalid set database option, key: ttl, value: instant + diff --git a/tests/cases/standalone/common/ttl/show_ttl.sql b/tests/cases/standalone/common/ttl/show_ttl.sql new file mode 100644 index 000000000000..d226b96211d5 --- /dev/null +++ b/tests/cases/standalone/common/ttl/show_ttl.sql @@ -0,0 +1,82 @@ +CREATE DATABASE test_ttl_db WITH (ttl = '1 second'); + +USE test_ttl_db; + +CREATE TABLE test_ttl(ts TIMESTAMP TIME INDEX, val INT); + +SHOW CREATE TABLE test_ttl; + +SHOW CREATE DATABASE test_ttl_db; + +ALTER DATABASE test_ttl_db SET ttl = '1 day'; + +SHOW CREATE TABLE test_ttl; + +SHOW CREATE DATABASE test_ttl_db; + +ALTER TABLE test_ttl SET 'ttl' = '6 hours'; + +SHOW CREATE TABLE test_ttl; + +ALTER TABLE test_ttl SET 'ttl' = 'instant'; + +SHOW CREATE TABLE test_ttl; + +ALTER TABLE test_ttl SET 'ttl' = '0s'; + +SHOW CREATE TABLE test_ttl; + +ALTER TABLE test_ttl SET 'ttl' = 'forever'; + +SHOW CREATE TABLE test_ttl; + +SHOW CREATE DATABASE test_ttl_db; + +ALTER TABLE test_ttl UNSET 'ttl'; + +SHOW CREATE TABLE test_ttl; + +SHOW CREATE DATABASE test_ttl_db; + +ALTER DATABASE test_ttl_db SET 'ttl' = 'forever'; + +SHOW CREATE TABLE test_ttl; + +SHOW CREATE DATABASE test_ttl_db; + +ALTER DATABASE test_ttl_db SET 'ttl' = '0s'; + +SHOW CREATE TABLE test_ttl; + +SHOW CREATE DATABASE test_ttl_db; + +ALTER DATABASE test_ttl_db SET 'ttl' = 'instant'; + +SHOW CREATE TABLE test_ttl; + +SHOW CREATE DATABASE test_ttl_db; + +ALTER DATABASE test_ttl_db UNSET 'ttl'; + +SHOW CREATE TABLE test_ttl; + +SHOW CREATE DATABASE test_ttl_db; + +ALTER TABLE test_ttl UNSET 'ttl'; + +SHOW CREATE TABLE test_ttl; + +SHOW CREATE DATABASE test_ttl_db; + +DROP TABLE test_ttl; + +USE public; + +DROP DATABASE test_ttl_db; + +-- test both set database to instant and alter ttl to instant for a database is forbidden +CREATE DATABASE test_ttl_db WITH (ttl = 'instant'); + +CREATE DATABASE test_ttl_db_2 WITH (ttl = '1s'); + +ALTER DATABASE test_ttl_db_2 SET 'ttl' = 'instant'; diff --git a/tests/cases/standalone/common/ttl/ttl_instant.result b/tests/cases/standalone/common/ttl/ttl_instant.result new file mode 100644 index 000000000000..57f12e01ddc7 --- /dev/null +++ b/tests/cases/standalone/common/ttl/ttl_instant.result @@ -0,0 +1,340 @@ +CREATE TABLE test_ttl( + ts TIMESTAMP TIME INDEX, + val INT, + PRIMARY KEY (`val`) +) WITH (ttl = 'instant'); + +Affected Rows: 0 + +SHOW CREATE TABLE test_ttl; + ++----------+-----------------------------------------+ +| Table | Create Table | ++----------+-----------------------------------------+ +| test_ttl | CREATE TABLE IF NOT EXISTS "test_ttl" ( | +| | "ts" TIMESTAMP(3) NOT NULL, | +| | "val" INT NULL, | +| | TIME INDEX ("ts"), | +| | PRIMARY KEY ("val") | +| | ) | +| | | +| | ENGINE=mito | +| | WITH( | +| | ttl = 'instant' | +| | ) | ++----------+-----------------------------------------+ + +INSERT INTO + test_ttl +VALUES + (now(), 1), + (now(), 2), + (now(), 3); + +Affected Rows: 0 + +SELECT + val +from + test_ttl +ORDER BY + val; + +++ +++ + +-- SQLNESS SLEEP 2s +ADMIN flush_table('test_ttl'); + ++-------------------------------+ +| ADMIN flush_table('test_ttl') | ++-------------------------------+ +| 0 | ++-------------------------------+ + +ADMIN compact_table('test_ttl'); + ++---------------------------------+ +| ADMIN compact_table('test_ttl') | ++---------------------------------+ +| 0 | ++---------------------------------+ + +SELECT + val +from + test_ttl +ORDER BY + val; + +++ +++ + +ALTER TABLE + test_ttl UNSET 'ttl'; + +Affected Rows: 0 + +INSERT INTO + test_ttl +VALUES + (now(), 1), + (now(), 2), + (now(), 3); + +Affected Rows: 3 + +SELECT + val +from + test_ttl +ORDER BY + val; + ++-----+ +| val | ++-----+ +| 1 | +| 2 | +| 3 | ++-----+ + +DROP TABLE test_ttl; + +Affected Rows: 0 + +CREATE TABLE test_ttl( + ts TIMESTAMP TIME INDEX, + val INT, + PRIMARY KEY (`val`) +) WITH (ttl = '1s'); + +Affected Rows: 0 + +SHOW CREATE TABLE test_ttl; + ++----------+-----------------------------------------+ +| Table | Create Table | ++----------+-----------------------------------------+ +| test_ttl | CREATE TABLE IF NOT EXISTS "test_ttl" ( | +| | "ts" TIMESTAMP(3) NOT NULL, | +| | "val" INT NULL, | +| | TIME INDEX ("ts"), | +| | PRIMARY KEY ("val") | +| | ) | +| | | +| | ENGINE=mito | +| | WITH( | +| | ttl = '1s' | +| | ) | ++----------+-----------------------------------------+ + +INSERT INTO + test_ttl +VALUES + (now(), 1), + (now(), 2), + (now(), 3); + +Affected Rows: 3 + +SELECT + val +from + test_ttl +ORDER BY + val; + ++-----+ +| val | ++-----+ +| 1 | +| 2 | +| 3 | ++-----+ + +ADMIN flush_table('test_ttl'); + ++-------------------------------+ +| ADMIN flush_table('test_ttl') | ++-------------------------------+ +| 0 | ++-------------------------------+ + +ADMIN compact_table('test_ttl'); + ++---------------------------------+ +| ADMIN compact_table('test_ttl') | ++---------------------------------+ +| 0 | ++---------------------------------+ + +SELECT + val +from + test_ttl +ORDER BY + val; + ++-----+ +| val | ++-----+ +| 1 | +| 2 | +| 3 | ++-----+ + +-- SQLNESS SLEEP 2s +ADMIN flush_table('test_ttl'); + ++-------------------------------+ +| ADMIN flush_table('test_ttl') | ++-------------------------------+ +| 0 | ++-------------------------------+ + +ADMIN compact_table('test_ttl'); + ++---------------------------------+ +| ADMIN compact_table('test_ttl') | ++---------------------------------+ +| 0 | ++---------------------------------+ + +SELECT + val +from + test_ttl +ORDER BY + val; + +++ +++ + +ALTER TABLE + test_ttl +SET + ttl = '1d'; + +Affected Rows: 0 + +INSERT INTO + test_ttl +VALUES + (now(), 1), + (now(), 2), + (now(), 3); + +Affected Rows: 3 + +SELECT + val +from + test_ttl +ORDER BY + val; + ++-----+ +| val | ++-----+ +| 1 | +| 2 | +| 3 | ++-----+ + +ALTER TABLE + test_ttl +SET + ttl = 'instant'; + +Affected Rows: 0 + +ADMIN flush_table('test_ttl'); + ++-------------------------------+ +| ADMIN flush_table('test_ttl') | ++-------------------------------+ +| 0 | ++-------------------------------+ + +ADMIN compact_table('test_ttl'); + ++---------------------------------+ +| ADMIN compact_table('test_ttl') | ++---------------------------------+ +| 0 | ++---------------------------------+ + +SELECT + val +from + test_ttl +ORDER BY + val; + +++ +++ + +-- to make sure alter back and forth from duration to/from instant wouldn't break anything +ALTER TABLE + test_ttl +SET + ttl = '1s'; + +Affected Rows: 0 + +INSERT INTO + test_ttl +VALUES + (now(), 1), + (now(), 2), + (now(), 3); + +Affected Rows: 3 + +SELECT + val +from + test_ttl +ORDER BY + val; + ++-----+ +| val | ++-----+ +| 1 | +| 2 | +| 3 | ++-----+ + +-- SQLNESS SLEEP 2s +ADMIN flush_table('test_ttl'); + ++-------------------------------+ +| ADMIN flush_table('test_ttl') | ++-------------------------------+ +| 0 | ++-------------------------------+ + +ADMIN compact_table('test_ttl'); + ++---------------------------------+ +| ADMIN compact_table('test_ttl') | ++---------------------------------+ +| 0 | ++---------------------------------+ + +SELECT + val +from + test_ttl +ORDER BY + val; + +++ +++ + +DROP TABLE test_ttl; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/ttl/ttl_instant.sql b/tests/cases/standalone/common/ttl/ttl_instant.sql new file mode 100644 index 000000000000..b76128ccdf0b --- /dev/null +++ b/tests/cases/standalone/common/ttl/ttl_instant.sql @@ -0,0 +1,166 @@ +CREATE TABLE test_ttl( + ts TIMESTAMP TIME INDEX, + val INT, + PRIMARY KEY (`val`) +) WITH (ttl = 'instant'); + +SHOW CREATE TABLE test_ttl; + +INSERT INTO + test_ttl +VALUES + (now(), 1), + (now(), 2), + (now(), 3); + +SELECT + val +from + test_ttl +ORDER BY + val; + +-- SQLNESS SLEEP 2s +ADMIN flush_table('test_ttl'); + +ADMIN compact_table('test_ttl'); + +SELECT + val +from + test_ttl +ORDER BY + val; + +ALTER TABLE + test_ttl UNSET 'ttl'; + +INSERT INTO + test_ttl +VALUES + (now(), 1), + (now(), 2), + (now(), 3); + +SELECT + val +from + test_ttl +ORDER BY + val; + +DROP TABLE test_ttl; + +CREATE TABLE test_ttl( + ts TIMESTAMP TIME INDEX, + val INT, + PRIMARY KEY (`val`) +) WITH (ttl = '1s'); + +SHOW CREATE TABLE test_ttl; + +INSERT INTO + test_ttl +VALUES + (now(), 1), + (now(), 2), + (now(), 3); + +SELECT + val +from + test_ttl +ORDER BY + val; + +ADMIN flush_table('test_ttl'); + +ADMIN compact_table('test_ttl'); + +SELECT + val +from + test_ttl +ORDER BY + val; + +-- SQLNESS SLEEP 2s +ADMIN flush_table('test_ttl'); + +ADMIN compact_table('test_ttl'); + +SELECT + val +from + test_ttl +ORDER BY + val; + +ALTER TABLE + test_ttl +SET + ttl = '1d'; + +INSERT INTO + test_ttl +VALUES + (now(), 1), + (now(), 2), + (now(), 3); + +SELECT + val +from + test_ttl +ORDER BY + val; + +ALTER TABLE + test_ttl +SET + ttl = 'instant'; + +ADMIN flush_table('test_ttl'); + +ADMIN compact_table('test_ttl'); + +SELECT + val +from + test_ttl +ORDER BY + val; + +-- to make sure alter back and forth from duration to/from instant wouldn't break anything +ALTER TABLE + test_ttl +SET + ttl = '1s'; + +INSERT INTO + test_ttl +VALUES + (now(), 1), + (now(), 2), + (now(), 3); + +SELECT + val +from + test_ttl +ORDER BY + val; + +-- SQLNESS SLEEP 2s +ADMIN flush_table('test_ttl'); + +ADMIN compact_table('test_ttl'); + +SELECT + val +from + test_ttl +ORDER BY + val; + +DROP TABLE test_ttl;