diff --git a/Cargo.lock b/Cargo.lock index eee95568ccd1..f0ea63351da4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3883,7 +3883,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=aba235025ac5643c12bfdcefd656af11ad58ea8e#aba235025ac5643c12bfdcefd656af11ad58ea8e" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=b5412f72257c18410fdccbb893fa5d245b846141#b5412f72257c18410fdccbb893fa5d245b846141" dependencies = [ "prost 0.12.4", "serde", diff --git a/Cargo.toml b/Cargo.toml index 061e0e8b186f..0f6f7b6fad25 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -116,7 +116,7 @@ etcd-client = { git = "https://github.com/MichaelScofield/etcd-client.git", rev fst = "0.4.7" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "aba235025ac5643c12bfdcefd656af11ad58ea8e" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "b5412f72257c18410fdccbb893fa5d245b846141" } humantime = "2.1" humantime-serde = "1.1" itertools = "0.10" diff --git a/src/api/src/helper.rs b/src/api/src/helper.rs index 60af91742d19..bf7786bcaf66 100644 --- a/src/api/src/helper.rs +++ b/src/api/src/helper.rs @@ -478,8 +478,8 @@ fn ddl_request_type(request: &DdlRequest) -> &'static str { Some(Expr::Alter(_)) => "ddl.alter", Some(Expr::DropTable(_)) => "ddl.drop_table", Some(Expr::TruncateTable(_)) => "ddl.truncate_table", - Some(Expr::CreateFlowTask(_)) => "ddl.create_flow", - Some(Expr::DropFlowTask(_)) => "ddl.drop_flow_task", + Some(Expr::CreateFlow(_)) => "ddl.create_flow", + Some(Expr::DropFlow(_)) => "ddl.drop_flow", None => "ddl.empty", } } diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index 2304d49a70b9..212adf562698 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -18,14 +18,14 @@ use std::{fs, path}; use async_trait::async_trait; use catalog::kvbackend::KvBackendCatalogManager; use clap::Parser; -use common_catalog::consts::{MIN_USER_FLOW_TASK_ID, MIN_USER_TABLE_ID}; +use common_catalog::consts::{MIN_USER_FLOW_ID, MIN_USER_TABLE_ID}; use common_config::{metadata_store_dir, KvBackendConfig}; use common_meta::cache_invalidator::{CacheInvalidatorRef, MultiCacheInvalidator}; +use common_meta::ddl::flow_meta::{FlowMetadataAllocator, FlowMetadataAllocatorRef}; use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocatorRef}; -use common_meta::ddl::task_meta::{FlowTaskMetadataAllocator, FlowTaskMetadataAllocatorRef}; use common_meta::ddl::{DdlContext, ProcedureExecutorRef}; use common_meta::ddl_manager::DdlManager; -use common_meta::key::flow_task::{FlowTaskMetadataManager, FlowTaskMetadataManagerRef}; +use common_meta::key::flow::{FlowMetadataManager, FlowMetadataManagerRef}; use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; use common_meta::kv_backend::KvBackendRef; use common_meta::node_manager::NodeManagerRef; @@ -47,7 +47,7 @@ use frontend::server::Services; use frontend::service_config::{ GrpcOptions, InfluxdbOptions, MysqlOptions, OpentsdbOptions, PostgresOptions, PromStoreOptions, }; -use meta_srv::metasrv::{FLOW_TASK_ID_SEQ, TABLE_ID_SEQ}; +use meta_srv::metasrv::{FLOW_ID_SEQ, TABLE_ID_SEQ}; use mito2::config::MitoConfig; use serde::{Deserialize, Serialize}; use servers::export_metrics::ExportMetricsOption; @@ -419,9 +419,9 @@ impl StartCommand { .step(10) .build(), ); - let flow_task_id_sequence = Arc::new( - SequenceBuilder::new(FLOW_TASK_ID_SEQ, kv_backend.clone()) - .initial(MIN_USER_FLOW_TASK_ID as u64) + let flow_id_sequence = Arc::new( + SequenceBuilder::new(FLOW_ID_SEQ, kv_backend.clone()) + .initial(MIN_USER_FLOW_ID as u64) .step(10) .build(), ); @@ -431,14 +431,14 @@ impl StartCommand { )); let table_metadata_manager = Self::create_table_metadata_manager(kv_backend.clone()).await?; - let flow_task_metadata_manager = Arc::new(FlowTaskMetadataManager::new(kv_backend.clone())); + let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone())); let table_meta_allocator = Arc::new(TableMetadataAllocator::new( table_id_sequence, wal_options_allocator.clone(), )); - let flow_task_meta_allocator = Arc::new( - FlowTaskMetadataAllocator::with_noop_peer_allocator(flow_task_id_sequence), - ); + let flow_meta_allocator = Arc::new(FlowMetadataAllocator::with_noop_peer_allocator( + flow_id_sequence, + )); let ddl_task_executor = Self::create_ddl_task_executor( procedure_manager.clone(), @@ -446,8 +446,8 @@ impl StartCommand { multi_cache_invalidator, table_metadata_manager, table_meta_allocator, - flow_task_metadata_manager, - flow_task_meta_allocator, + flow_metadata_manager, + flow_meta_allocator, ) .await?; @@ -480,8 +480,8 @@ impl StartCommand { cache_invalidator: CacheInvalidatorRef, table_metadata_manager: TableMetadataManagerRef, table_metadata_allocator: TableMetadataAllocatorRef, - flow_task_metadata_manager: FlowTaskMetadataManagerRef, - flow_task_metadata_allocator: FlowTaskMetadataAllocatorRef, + flow_metadata_manager: FlowMetadataManagerRef, + flow_metadata_allocator: FlowMetadataAllocatorRef, ) -> Result { let procedure_executor: ProcedureExecutorRef = Arc::new( DdlManager::try_new( @@ -491,8 +491,8 @@ impl StartCommand { memory_region_keeper: Arc::new(MemoryRegionKeeper::default()), table_metadata_manager, table_metadata_allocator, - flow_task_metadata_manager, - flow_task_metadata_allocator, + flow_metadata_manager, + flow_metadata_allocator, }, procedure_manager, true, diff --git a/src/common/catalog/src/consts.rs b/src/common/catalog/src/consts.rs index 175435d89842..30024b03fa41 100644 --- a/src/common/catalog/src/consts.rs +++ b/src/common/catalog/src/consts.rs @@ -19,9 +19,9 @@ pub const DEFAULT_CATALOG_NAME: &str = "greptime"; pub const DEFAULT_SCHEMA_NAME: &str = "public"; pub const DEFAULT_PRIVATE_SCHEMA_NAME: &str = "greptime_private"; -/// Reserves [0,MIN_USER_FLOW_TASK_ID) for internal usage. +/// Reserves [0,MIN_USER_FLOW_ID) for internal usage. /// User defined table id starts from this value. -pub const MIN_USER_FLOW_TASK_ID: u32 = 1024; +pub const MIN_USER_FLOW_ID: u32 = 1024; /// Reserves [0,MIN_USER_TABLE_ID) for internal usage. /// User defined table id starts from this value. pub const MIN_USER_TABLE_ID: u32 = 1024; diff --git a/src/common/meta/src/ddl.rs b/src/common/meta/src/ddl.rs index 5bdcb1f68e2b..bc4563b2f567 100644 --- a/src/common/meta/src/ddl.rs +++ b/src/common/meta/src/ddl.rs @@ -19,10 +19,10 @@ use common_telemetry::tracing_context::W3cTrace; use store_api::storage::{RegionNumber, TableId}; use crate::cache_invalidator::CacheInvalidatorRef; +use crate::ddl::flow_meta::FlowMetadataAllocatorRef; use crate::ddl::table_meta::TableMetadataAllocatorRef; -use crate::ddl::task_meta::FlowTaskMetadataAllocatorRef; use crate::error::Result; -use crate::key::flow_task::FlowTaskMetadataManagerRef; +use crate::key::flow::FlowMetadataManagerRef; use crate::key::table_route::PhysicalTableRouteValue; use crate::key::TableMetadataManagerRef; use crate::node_manager::NodeManagerRef; @@ -39,9 +39,9 @@ pub mod create_table; mod create_table_template; pub mod drop_database; pub mod drop_table; +pub mod flow_meta; mod physical_table_metadata; pub mod table_meta; -pub mod task_meta; #[cfg(any(test, feature = "testing"))] pub mod test_util; #[cfg(test)] @@ -110,8 +110,8 @@ pub struct DdlContext { pub table_metadata_manager: TableMetadataManagerRef, /// Allocator for table metadata. pub table_metadata_allocator: TableMetadataAllocatorRef, - /// Flow task metadata manager. - pub flow_task_metadata_manager: FlowTaskMetadataManagerRef, - /// Allocator for flow task metadata. - pub flow_task_metadata_allocator: FlowTaskMetadataAllocatorRef, + /// Flow metadata manager. + pub flow_metadata_manager: FlowMetadataManagerRef, + /// Allocator for flow metadata. + pub flow_metadata_allocator: FlowMetadataAllocatorRef, } diff --git a/src/common/meta/src/ddl/create_flow.rs b/src/common/meta/src/ddl/create_flow.rs index 018c7dc84276..195bb3a5679b 100644 --- a/src/common/meta/src/ddl/create_flow.rs +++ b/src/common/meta/src/ddl/create_flow.rs @@ -36,17 +36,17 @@ use super::utils::add_peer_context_if_needed; use crate::ddl::utils::handle_retry_error; use crate::ddl::DdlContext; use crate::error::Result; -use crate::key::flow_task::flow_task_info::FlowTaskInfoValue; -use crate::key::FlowTaskId; -use crate::lock_key::{CatalogLock, FlowTaskNameLock, TableNameLock}; +use crate::key::flow::flow_info::FlowInfoValue; +use crate::key::FlowId; +use crate::lock_key::{CatalogLock, FlowNameLock, TableNameLock}; use crate::peer::Peer; use crate::rpc::ddl::CreateFlowTask; use crate::{metrics, ClusterId}; -/// The procedure of flow task creation. +/// The procedure of flow creation. pub struct CreateFlowProcedure { pub context: DdlContext, - pub data: CreateFlowTaskData, + pub data: CreateFlowData, } impl CreateFlowProcedure { @@ -56,13 +56,13 @@ impl CreateFlowProcedure { pub fn new(cluster_id: ClusterId, task: CreateFlowTask, context: DdlContext) -> Self { Self { context, - data: CreateFlowTaskData { + data: CreateFlowData { cluster_id, task, - flow_task_id: None, + flow_id: None, peers: vec![], source_table_ids: vec![], - state: CreateFlowTaskState::CreateMetadata, + state: CreateFlowState::CreateMetadata, }, } } @@ -76,21 +76,21 @@ impl CreateFlowProcedure { async fn on_prepare(&mut self) -> Result { self.check_creation().await?; self.collect_source_tables().await?; - self.allocate_flow_task_id().await?; - self.data.state = CreateFlowTaskState::CreateFlows; + self.allocate_flow_id().await?; + self.data.state = CreateFlowState::CreateFlows; Ok(Status::executing(true)) } async fn on_flownode_create_flows(&mut self) -> Result { // Safety: must be allocated. - let mut create_flow_task = Vec::with_capacity(self.data.peers.len()); + let mut create_flow = Vec::with_capacity(self.data.peers.len()); for peer in &self.data.peers { let requester = self.context.node_manager.flownode(peer).await; let request = FlowRequest { body: Some(PbFlowRequest::Create((&self.data).into())), }; - create_flow_task.push(async move { + create_flow.push(async move { requester .handle(request) .await @@ -98,29 +98,29 @@ impl CreateFlowProcedure { }); } - join_all(create_flow_task) + join_all(create_flow) .await .into_iter() .collect::>>()?; - self.data.state = CreateFlowTaskState::CreateMetadata; + self.data.state = CreateFlowState::CreateMetadata; Ok(Status::executing(true)) } - /// Creates flow task metadata. + /// Creates flow metadata. /// /// Abort(not-retry): /// - Failed to create table metadata. async fn on_create_metadata(&mut self) -> Result { - // Safety: The flow task id must be allocated. - let flow_task_id = self.data.flow_task_id.unwrap(); + // Safety: The flow id must be allocated. + let flow_id = self.data.flow_id.unwrap(); // TODO(weny): Support `or_replace`. self.context - .flow_task_metadata_manager - .create_flow_task_metadata(flow_task_id, (&self.data).into()) + .flow_metadata_manager + .create_flow_metadata(flow_id, (&self.data).into()) .await?; - info!("Created flow task metadata for flow {flow_task_id}"); - Ok(Status::done_with_output(flow_task_id)) + info!("Created flow metadata for flow {flow_id}"); + Ok(Status::done_with_output(flow_id)) } } @@ -133,14 +133,14 @@ impl Procedure for CreateFlowProcedure { async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult { let state = &self.data.state; - let _timer = metrics::METRIC_META_PROCEDURE_CREATE_FLOW_TASK + let _timer = metrics::METRIC_META_PROCEDURE_CREATE_FLOW .with_label_values(&[state.as_ref()]) .start_timer(); match state { - CreateFlowTaskState::Prepare => self.on_prepare().await, - CreateFlowTaskState::CreateFlows => self.on_flownode_create_flows().await, - CreateFlowTaskState::CreateMetadata => self.on_create_metadata().await, + CreateFlowState::Prepare => self.on_prepare().await, + CreateFlowState::CreateFlows => self.on_flownode_create_flows().await, + CreateFlowState::CreateMetadata => self.on_create_metadata().await, } .map_err(handle_retry_error) } @@ -151,7 +151,7 @@ impl Procedure for CreateFlowProcedure { fn lock_key(&self) -> LockKey { let catalog_name = &self.data.task.catalog_name; - let task_name = &self.data.task.task_name; + let flow_name = &self.data.task.flow_name; let sink_table_name = &self.data.task.sink_table_name; LockKey::new(vec![ @@ -162,14 +162,14 @@ impl Procedure for CreateFlowProcedure { &sink_table_name.catalog_name, ) .into(), - FlowTaskNameLock::new(catalog_name, task_name).into(), + FlowNameLock::new(catalog_name, flow_name).into(), ]) } } -/// The state of [CreateFlowTaskProcedure]. +/// The state of [CreateFlowProcedure]. #[derive(Debug, Clone, Serialize, Deserialize, AsRefStr, PartialEq)] -pub enum CreateFlowTaskState { +pub enum CreateFlowState { /// Prepares to create the flow. Prepare, /// Creates flows on the flownode. @@ -180,22 +180,22 @@ pub enum CreateFlowTaskState { /// The serializable data. #[derive(Debug, Serialize, Deserialize)] -pub struct CreateFlowTaskData { +pub struct CreateFlowData { pub(crate) cluster_id: ClusterId, - pub(crate) state: CreateFlowTaskState, + pub(crate) state: CreateFlowState, pub(crate) task: CreateFlowTask, - pub(crate) flow_task_id: Option, + pub(crate) flow_id: Option, pub(crate) peers: Vec, pub(crate) source_table_ids: Vec, } -impl From<&CreateFlowTaskData> for CreateRequest { - fn from(value: &CreateFlowTaskData) -> Self { - let flow_task_id = value.flow_task_id.unwrap(); +impl From<&CreateFlowData> for CreateRequest { + fn from(value: &CreateFlowData) -> Self { + let flow_id = value.flow_id.unwrap(); let source_table_ids = &value.source_table_ids; CreateRequest { - task_id: Some(api::v1::flow::TaskId { id: flow_task_id }), + flow_id: Some(api::v1::flow::TaskId { id: flow_id }), source_table_ids: source_table_ids .iter() .map(|table_id| api::v1::TableId { id: *table_id }) @@ -206,21 +206,21 @@ impl From<&CreateFlowTaskData> for CreateRequest { expire_when: value.task.expire_when.clone(), comment: value.task.comment.clone(), sql: value.task.sql.clone(), - task_options: value.task.options.clone(), + flow_options: value.task.flow_options.clone(), } } } -impl From<&CreateFlowTaskData> for FlowTaskInfoValue { - fn from(value: &CreateFlowTaskData) -> Self { +impl From<&CreateFlowData> for FlowInfoValue { + fn from(value: &CreateFlowData) -> Self { let CreateFlowTask { catalog_name, - task_name, + flow_name, sink_table_name, expire_when, comment, sql, - options, + flow_options: options, .. } = value.task.clone(); @@ -231,12 +231,12 @@ impl From<&CreateFlowTaskData> for FlowTaskInfoValue { .map(|(idx, peer)| (idx as u32, peer.id)) .collect::>(); - FlowTaskInfoValue { + FlowInfoValue { source_table_ids: value.source_table_ids.clone(), sink_table_name, flownode_ids, catalog_name, - task_name, + flow_name, raw_sql: sql, expire_when, comment, diff --git a/src/common/meta/src/ddl/create_flow/check.rs b/src/common/meta/src/ddl/create_flow/check.rs index 6aa1ecb3ed00..27d8107991e0 100644 --- a/src/common/meta/src/ddl/create_flow/check.rs +++ b/src/common/meta/src/ddl/create_flow/check.rs @@ -24,20 +24,20 @@ impl CreateFlowProcedure { /// - The sink table doesn't exist. pub(crate) async fn check_creation(&self) -> Result<()> { let catalog_name = &self.data.task.catalog_name; - let task_name = &self.data.task.task_name; + let flow_name = &self.data.task.flow_name; let sink_table_name = &self.data.task.sink_table_name; // Ensures the task name doesn't exist. let exists = self .context - .flow_task_metadata_manager - .flow_task_name_manager() - .exists(catalog_name, task_name) + .flow_metadata_manager + .flow_name_manager() + .exists(catalog_name, flow_name) .await?; ensure!( !exists, - error::TaskAlreadyExistsSnafu { - task_name: format!("{}.{}", catalog_name, task_name), + error::FlowAlreadyExistsSnafu { + flow_name: format!("{}.{}", catalog_name, flow_name), } ); diff --git a/src/common/meta/src/ddl/create_flow/metadata.rs b/src/common/meta/src/ddl/create_flow/metadata.rs index ce35ae91ca98..1681479d9173 100644 --- a/src/common/meta/src/ddl/create_flow/metadata.rs +++ b/src/common/meta/src/ddl/create_flow/metadata.rs @@ -19,16 +19,16 @@ use crate::error::{self, Result}; use crate::key::table_name::TableNameKey; impl CreateFlowProcedure { - /// Allocates the [FlowTaskId]. - pub(crate) async fn allocate_flow_task_id(&mut self) -> Result<()> { - // TODO(weny, ruihang): We don't support the partitions. It's always be 1, now. + /// Allocates the [FlowId]. + pub(crate) async fn allocate_flow_id(&mut self) -> Result<()> { + //TODO(weny, ruihang): We doesn't support the partitions. It's always be 1, now. let partitions = 1; - let (flow_task_id, peers) = self + let (flow_id, peers) = self .context - .flow_task_metadata_allocator + .flow_metadata_allocator .create(partitions) .await?; - self.data.flow_task_id = Some(flow_task_id); + self.data.flow_id = Some(flow_id); self.data.peers = peers; Ok(()) diff --git a/src/common/meta/src/ddl/task_meta.rs b/src/common/meta/src/ddl/flow_meta.rs similarity index 64% rename from src/common/meta/src/ddl/task_meta.rs rename to src/common/meta/src/ddl/flow_meta.rs index 3e8a4fb36cfc..d7aca9b84eaf 100644 --- a/src/common/meta/src/ddl/task_meta.rs +++ b/src/common/meta/src/ddl/flow_meta.rs @@ -17,43 +17,43 @@ use std::sync::Arc; use tonic::async_trait; use crate::error::Result; -use crate::key::FlowTaskId; +use crate::key::FlowId; use crate::peer::Peer; use crate::sequence::SequenceRef; -/// The reference of [FlowTaskMetadataAllocator]. -pub type FlowTaskMetadataAllocatorRef = Arc; +/// The reference of [FlowMetadataAllocator]. +pub type FlowMetadataAllocatorRef = Arc; -/// [FlowTaskMetadataAllocator] provides the ability of: -/// - [FlowTaskId] Allocation. +/// [FlowMetadataAllocator] provides the ability of: +/// - [FlowId] Allocation. /// - [FlownodeId] Selection. #[derive(Clone)] -pub struct FlowTaskMetadataAllocator { - flow_task_id_sequence: SequenceRef, +pub struct FlowMetadataAllocator { + flow_id_sequence: SequenceRef, partition_peer_allocator: PartitionPeerAllocatorRef, } -impl FlowTaskMetadataAllocator { - /// Returns the [FlowTaskMetadataAllocator] with [NoopPartitionPeerAllocator]. - pub fn with_noop_peer_allocator(flow_task_id_sequence: SequenceRef) -> Self { +impl FlowMetadataAllocator { + /// Returns the [FlowMetadataAllocator] with [NoopPartitionPeerAllocator]. + pub fn with_noop_peer_allocator(flow_id_sequence: SequenceRef) -> Self { Self { - flow_task_id_sequence, + flow_id_sequence, partition_peer_allocator: Arc::new(NoopPartitionPeerAllocator), } } - /// Allocates a the [FlowTaskId]. - pub(crate) async fn allocate_flow_task_id(&self) -> Result { - let flow_task_id = self.flow_task_id_sequence.next().await? as FlowTaskId; - Ok(flow_task_id) + /// Allocates a the [FlowId]. + pub(crate) async fn allocate_flow_id(&self) -> Result { + let flow_id = self.flow_id_sequence.next().await? as FlowId; + Ok(flow_id) } - /// Allocates the [FlowTaskId] and [Peer]s. - pub async fn create(&self, partitions: usize) -> Result<(FlowTaskId, Vec)> { - let flow_task_id = self.allocate_flow_task_id().await?; + /// Allocates the [FlowId] and [Peer]s. + pub async fn create(&self, partitions: usize) -> Result<(FlowId, Vec)> { + let flow_id = self.allocate_flow_id().await?; let peers = self.partition_peer_allocator.alloc(partitions).await?; - Ok((flow_task_id, peers)) + Ok((flow_id, peers)) } } diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs index 2f3d3a4eb45c..8af7211210c9 100644 --- a/src/common/meta/src/ddl_manager.rs +++ b/src/common/meta/src/ddl_manager.rs @@ -616,7 +616,7 @@ async fn handle_create_flow_task( })?); info!( "Flow {}.{}({flow_id}) is created via procedure_id {id:?}", - create_flow_task.catalog_name, create_flow_task.task_name, + create_flow_task.catalog_name, create_flow_task.flow_name, ); Ok(SubmitDdlTaskResponse { @@ -756,11 +756,11 @@ mod tests { use crate::ddl::alter_table::AlterTableProcedure; use crate::ddl::create_table::CreateTableProcedure; use crate::ddl::drop_table::DropTableProcedure; + use crate::ddl::flow_meta::FlowMetadataAllocator; use crate::ddl::table_meta::TableMetadataAllocator; - use crate::ddl::task_meta::FlowTaskMetadataAllocator; use crate::ddl::truncate_table::TruncateTableProcedure; use crate::ddl::DdlContext; - use crate::key::flow_task::FlowTaskMetadataManager; + use crate::key::flow::FlowMetadataManager; use crate::key::TableMetadataManager; use crate::kv_backend::memory::MemoryKvBackend; use crate::node_manager::{DatanodeRef, FlownodeRef, NodeManager}; @@ -792,11 +792,10 @@ mod tests { Arc::new(SequenceBuilder::new("test", kv_backend.clone()).build()), Arc::new(WalOptionsAllocator::default()), )); - let flow_task_metadata_manager = Arc::new(FlowTaskMetadataManager::new(kv_backend.clone())); - let flow_task_metadata_allocator = - Arc::new(FlowTaskMetadataAllocator::with_noop_peer_allocator( - Arc::new(SequenceBuilder::new("flow-test", kv_backend.clone()).build()), - )); + let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone())); + let flow_metadata_allocator = Arc::new(FlowMetadataAllocator::with_noop_peer_allocator( + Arc::new(SequenceBuilder::new("flow-test", kv_backend.clone()).build()), + )); let state_store = Arc::new(KvStateStore::new(kv_backend.clone())); let procedure_manager = Arc::new(LocalManager::new(Default::default(), state_store)); @@ -807,8 +806,8 @@ mod tests { cache_invalidator: Arc::new(DummyCacheInvalidator), table_metadata_manager, table_metadata_allocator, - flow_task_metadata_manager, - flow_task_metadata_allocator, + flow_metadata_manager, + flow_metadata_allocator, memory_region_keeper: Arc::new(MemoryRegionKeeper::default()), }, procedure_manager.clone(), diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index 40d5070c3bb3..cd7583967188 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -241,9 +241,9 @@ pub enum Error { location: Location, }, - #[snafu(display("Task already exists: {}", task_name))] - TaskAlreadyExists { - task_name: String, + #[snafu(display("Flow already exists: {}", flow_name))] + FlowAlreadyExists { + flow_name: String, location: Location, }, @@ -511,7 +511,7 @@ impl ErrorExt for Error { | InvalidEngineType { .. } | AlterLogicalTablesInvalidArguments { .. } | CreateLogicalTablesInvalidArguments { .. } - | TaskAlreadyExists { .. } + | FlowAlreadyExists { .. } | MismatchPrefix { .. } | DelimiterNotFound { .. } => StatusCode::InvalidArguments, diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index 618b092c3f3b..30455181dcc3 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -36,16 +36,16 @@ //! - The value is a [TableNameValue] struct; it contains the table id. //! - Used in the table name to table id lookup. //! -//! 6. Flow task info key: `__flow_task/{catalog}/info/{flow_task_id}` -//! - Stores metadata of the flow task. +//! 6. Flow info key: `__flow/{catalog}/info/{flow_id}` +//! - Stores metadata of the flow. //! -//! 7. Flow task name key: `__flow_task/{catalog}/name/{task_name}` -//! - Mapping {catalog}/{task_name} to {flow_task_id} +//! 7. Flow name key: `__flow/{catalog}/name/{flow_name}` +//! - Mapping {catalog}/{flow_name} to {flow_id} //! -//! 8. Flownode task key: `__flow_task/{catalog}/flownode/{flownode_id}/{flow_task_id}/{partition_id}` -//! - Mapping {flownode_id} to {flow_task_id} +//! 8. Flownode flow key: `__flow/{catalog}/flownode/{flownode_id}/{flow_id}/{partition_id}` +//! - Mapping {flownode_id} to {flow_id} //! -//! 9. Table task key: `__table_task/{catalog}/source_table/{table_id}/{flownode_id}/{flow_task_id}/{partition_id}` +//! 9. Table flow key: `__table_flow/{catalog}/source_table/{table_id}/{flownode_id}/{flow_id}/{partition_id}` //! - Mapping source table's {table_id} to {flownode_id} //! - Used in `Flownode` booting. //! @@ -54,37 +54,35 @@ //! //! To simplify the managers used in struct fields and function parameters, we define "unify" //! table metadata manager: [TableMetadataManager] -//! and flow task metadata manager: [FlowTaskMetadataManager](crate::key::flow_task::FlowTaskMetadataManager). +//! and flow metadata manager: [FlowMetadataManager](crate::key::flow::FlowMetadataManager). //! It contains all the managers defined above. It's recommended to just use this manager only. //! -//! The whole picture of flow task keys will be like this: +//! The whole picture of flow keys will be like this: //! -//! __flow_task/ +//! __flow/ //! {catalog}/ //! info/ //! {tsak_id} //! //! name/ -//! {task_name} +//! {flow_name} //! //! flownode/ -//! flownode_id/ -//! {flownode_id}/ -//! {task_id}/ -//! {partition_id} +//! {flownode_id}/ +//! {flow_id}/ +//! {partition_id} //! //! source_table/ -//! flow_task/ -//! {table_id}/ -//! {flownode_id}/ -//! {task_id}/ -//! {partition_id} +//! {table_id}/ +//! {flownode_id}/ +//! {flow_id}/ +//! {partition_id} pub mod catalog_name; pub mod datanode_table; /// TODO(weny):removes id. #[allow(unused)] -pub mod flow_task; +pub mod flow; pub mod schema_name; pub mod scope; pub mod table_info; @@ -123,8 +121,8 @@ use table_name::{TableNameKey, TableNameManager, TableNameValue}; use self::catalog_name::{CatalogManager, CatalogNameKey, CatalogNameValue}; use self::datanode_table::RegionInfo; -use self::flow_task::flow_task_info::FlowTaskInfoValue; -use self::flow_task::flow_task_name::FlowTaskNameValue; +use self::flow::flow_info::FlowInfoValue; +use self::flow::flow_name::FlowNameValue; use self::schema_name::{SchemaManager, SchemaNameKey, SchemaNameValue}; use self::table_route::{TableRouteManager, TableRouteValue}; use self::tombstone::TombstoneManager; @@ -159,10 +157,10 @@ pub const CACHE_KEY_PREFIXES: [&str; 4] = [ pub type RegionDistribution = BTreeMap>; -/// The id of flow task. -pub type FlowTaskId = u32; -/// The partition of flow task. -pub type FlowTaskPartitionId = u32; +/// The id of flow. +pub type FlowId = u32; +/// The partition of flow. +pub type FlowPartitionId = u32; lazy_static! { static ref DATANODE_TABLE_KEY_PATTERN: Regex = @@ -1054,8 +1052,8 @@ impl_table_meta_value! { TableNameValue, TableInfoValue, DatanodeTableValue, - FlowTaskInfoValue, - FlowTaskNameValue + FlowInfoValue, + FlowNameValue } impl_optional_meta_value! { diff --git a/src/common/meta/src/key/flow_task.rs b/src/common/meta/src/key/flow.rs similarity index 52% rename from src/common/meta/src/key/flow_task.rs rename to src/common/meta/src/key/flow.rs index 5e1c2e427ab6..cbda6aa88276 100644 --- a/src/common/meta/src/key/flow_task.rs +++ b/src/common/meta/src/key/flow.rs @@ -12,10 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -pub(crate) mod flow_task_info; -pub(crate) mod flow_task_name; -pub(crate) mod flownode_task; -pub(crate) mod table_task; +pub(crate) mod flow_info; +pub(crate) mod flow_name; +pub(crate) mod flownode_flow; +pub(crate) mod table_flow; use std::ops::Deref; use std::sync::Arc; @@ -23,26 +23,26 @@ use std::sync::Arc; use common_telemetry::info; use snafu::{ensure, OptionExt}; -use self::flow_task_info::FlowTaskInfoValue; +use self::flow_info::FlowInfoValue; use crate::ensure_values; use crate::error::{self, Result}; -use crate::key::flow_task::flow_task_info::FlowTaskInfoManager; -use crate::key::flow_task::flow_task_name::FlowTaskNameManager; -use crate::key::flow_task::flownode_task::FlownodeTaskManager; -use crate::key::flow_task::table_task::TableTaskManager; +use crate::key::flow::flow_info::FlowInfoManager; +use crate::key::flow::flow_name::FlowNameManager; +use crate::key::flow::flownode_flow::FlownodeFlowManager; +use crate::key::flow::table_flow::TableFlowManager; use crate::key::scope::MetaKey; use crate::key::txn_helper::TxnOpGetResponseSet; -use crate::key::FlowTaskId; +use crate::key::FlowId; use crate::kv_backend::txn::Txn; use crate::kv_backend::KvBackendRef; -/// The key of `__flow_task/` scope. +/// The key of `__flow/` scope. #[derive(Debug, PartialEq)] -pub struct FlowTaskScoped { +pub struct FlowScoped { inner: T, } -impl Deref for FlowTaskScoped { +impl Deref for FlowScoped { type Target = T; fn deref(&self) -> &Self::Target { @@ -50,18 +50,18 @@ impl Deref for FlowTaskScoped { } } -impl FlowTaskScoped { - const PREFIX: &'static str = "__flow_task/"; +impl FlowScoped { + const PREFIX: &'static str = "__flow/"; - /// Returns a new [FlowTaskScoped] key. - pub fn new(inner: T) -> FlowTaskScoped { + /// Returns a new [FlowScoped] key. + pub fn new(inner: T) -> FlowScoped { Self { inner } } } -impl> MetaKey> for FlowTaskScoped { +impl> MetaKey> for FlowScoped { fn to_bytes(&self) -> Vec { - let prefix = FlowTaskScoped::::PREFIX.as_bytes(); + let prefix = FlowScoped::::PREFIX.as_bytes(); let inner = self.inner.to_bytes(); let mut bytes = Vec::with_capacity(prefix.len() + inner.len()); bytes.extend(prefix); @@ -69,8 +69,8 @@ impl> MetaKey> for FlowTaskScoped { bytes } - fn from_bytes(bytes: &[u8]) -> Result> { - let prefix = FlowTaskScoped::::PREFIX.as_bytes(); + fn from_bytes(bytes: &[u8]) -> Result> { + let prefix = FlowScoped::::PREFIX.as_bytes(); ensure!( bytes.starts_with(prefix), error::MismatchPrefixSnafu { @@ -79,140 +79,134 @@ impl> MetaKey> for FlowTaskScoped { } ); let inner = T::from_bytes(&bytes[prefix.len()..])?; - Ok(FlowTaskScoped { inner }) + Ok(FlowScoped { inner }) } } -pub type FlowTaskMetadataManagerRef = Arc; +pub type FlowMetadataManagerRef = Arc; /// The manager of metadata, provides ability to: -/// - Create metadata of the task. -/// - Retrieve metadata of the task. -/// - Delete metadata of the task. -pub struct FlowTaskMetadataManager { - flow_task_info_manager: FlowTaskInfoManager, - flownode_task_manager: FlownodeTaskManager, - table_task_manager: TableTaskManager, - flow_task_name_manager: FlowTaskNameManager, +/// - Create metadata of the flow. +/// - Retrieve metadata of the flow. +/// - Delete metadata of the flow. +pub struct FlowMetadataManager { + flow_info_manager: FlowInfoManager, + flownode_flow_manager: FlownodeFlowManager, + table_flow_manager: TableFlowManager, + flow_name_manager: FlowNameManager, kv_backend: KvBackendRef, } -impl FlowTaskMetadataManager { - /// Returns a new [FlowTaskMetadataManager]. +impl FlowMetadataManager { + /// Returns a new [FlowMetadataManager]. pub fn new(kv_backend: KvBackendRef) -> Self { Self { - flow_task_info_manager: FlowTaskInfoManager::new(kv_backend.clone()), - flow_task_name_manager: FlowTaskNameManager::new(kv_backend.clone()), - flownode_task_manager: FlownodeTaskManager::new(kv_backend.clone()), - table_task_manager: TableTaskManager::new(kv_backend.clone()), + flow_info_manager: FlowInfoManager::new(kv_backend.clone()), + flow_name_manager: FlowNameManager::new(kv_backend.clone()), + flownode_flow_manager: FlownodeFlowManager::new(kv_backend.clone()), + table_flow_manager: TableFlowManager::new(kv_backend.clone()), kv_backend, } } - /// Returns the [FlowTaskNameManager]. - pub fn flow_task_name_manager(&self) -> &FlowTaskNameManager { - &self.flow_task_name_manager + /// Returns the [FlowNameManager]. + pub fn flow_name_manager(&self) -> &FlowNameManager { + &self.flow_name_manager } - /// Returns the [FlowTaskInfoManager]. - pub fn flow_task_info_manager(&self) -> &FlowTaskInfoManager { - &self.flow_task_info_manager + /// Returns the [FlowManager]. + pub fn flow_info_manager(&self) -> &FlowInfoManager { + &self.flow_info_manager } - /// Returns the [FlownodeTaskManager]. - pub fn flownode_task_manager(&self) -> &FlownodeTaskManager { - &self.flownode_task_manager + /// Returns the [FlownodeFlowManager]. + pub fn flownode_flow_manager(&self) -> &FlownodeFlowManager { + &self.flownode_flow_manager } - /// Returns the [TableTaskManager]. - pub fn table_task_manager(&self) -> &TableTaskManager { - &self.table_task_manager + /// Returns the [TableFlowManager]. + pub fn table_flow_manager(&self) -> &TableFlowManager { + &self.table_flow_manager } - /// Creates metadata for task and returns an error if different metadata exists. - pub async fn create_flow_task_metadata( + /// Creates metadata for flow and returns an error if different metadata exists. + pub async fn create_flow_metadata( &self, - flow_task_id: FlowTaskId, - flow_task_value: FlowTaskInfoValue, + flow_id: FlowId, + flow_value: FlowInfoValue, ) -> Result<()> { - let (create_flow_task_name_txn, on_create_flow_task_name_failure) = - self.flow_task_name_manager.build_create_txn( - &flow_task_value.catalog_name, - &flow_task_value.task_name, - flow_task_id, - )?; - - let (create_flow_task_txn, on_create_flow_task_failure) = - self.flow_task_info_manager.build_create_txn( - &flow_task_value.catalog_name, - flow_task_id, - &flow_task_value, - )?; - - let create_flownode_task_txn = self.flownode_task_manager.build_create_txn( - &flow_task_value.catalog_name, - flow_task_id, - flow_task_value.flownode_ids().clone(), + let (create_flow_flow_name_txn, on_create_flow_flow_name_failure) = self + .flow_name_manager + .build_create_txn(&flow_value.catalog_name, &flow_value.flow_name, flow_id)?; + + let (create_flow_txn, on_create_flow_failure) = self.flow_info_manager.build_create_txn( + &flow_value.catalog_name, + flow_id, + &flow_value, + )?; + + let create_flownode_flow_txn = self.flownode_flow_manager.build_create_txn( + &flow_value.catalog_name, + flow_id, + flow_value.flownode_ids().clone(), ); - let create_table_task_txn = self.table_task_manager.build_create_txn( - &flow_task_value.catalog_name, - flow_task_id, - flow_task_value.flownode_ids().clone(), - flow_task_value.source_table_ids(), + let create_table_flow_txn = self.table_flow_manager.build_create_txn( + &flow_value.catalog_name, + flow_id, + flow_value.flownode_ids().clone(), + flow_value.source_table_ids(), ); let txn = Txn::merge_all(vec![ - create_flow_task_name_txn, - create_flow_task_txn, - create_flownode_task_txn, - create_table_task_txn, + create_flow_flow_name_txn, + create_flow_txn, + create_flownode_flow_txn, + create_table_flow_txn, ]); info!( - "Creating flow task {}.{}({}), with {} txn operations", - flow_task_value.catalog_name, - flow_task_value.task_name, - flow_task_id, + "Creating flow {}.{}({}), with {} txn operations", + flow_value.catalog_name, + flow_value.flow_name, + flow_id, txn.max_operations() ); let mut resp = self.kv_backend.txn(txn).await?; if !resp.succeeded { let mut set = TxnOpGetResponseSet::from(&mut resp.responses); - let remote_flow_task_name = on_create_flow_task_name_failure(&mut set)? - .with_context(||error::UnexpectedSnafu { - err_msg: format!( - "Reads the empty flow task name during the creating flow task, flow_task_id: {flow_task_id}" + let remote_flow_flow_name = + on_create_flow_flow_name_failure(&mut set)?.with_context(|| { + error::UnexpectedSnafu { + err_msg: format!( + "Reads the empty flow name during the creating flow, flow_id: {flow_id}" ), + } })?; - if remote_flow_task_name.flow_task_id() != flow_task_id { + if remote_flow_flow_name.flow_id() != flow_id { info!( - "Trying to create flow task {}.{}({}), but flow task({}) already exists", - flow_task_value.catalog_name, - flow_task_value.task_name, - flow_task_id, - remote_flow_task_name.flow_task_id() + "Trying to create flow {}.{}({}), but flow({}) already exists", + flow_value.catalog_name, + flow_value.flow_name, + flow_id, + remote_flow_flow_name.flow_id() ); - return error::TaskAlreadyExistsSnafu { - task_name: format!( - "{}.{}", - flow_task_value.catalog_name, flow_task_value.task_name - ), + return error::FlowAlreadyExistsSnafu { + flow_name: format!("{}.{}", flow_value.catalog_name, flow_value.flow_name), } .fail(); } - let remote_flow_task = on_create_flow_task_failure(&mut set)?.with_context(|| { - error::UnexpectedSnafu { + let remote_flow = + on_create_flow_failure(&mut set)?.with_context(|| error::UnexpectedSnafu { err_msg: format!( - "Reads the empty flow task during the creating flow task, flow_task_id: {flow_task_id}" - ), - } - })?; - let op_name = "creating flow task"; - ensure_values!(*remote_flow_task, flow_task_value, op_name); + "Reads the empty flow during the creating flow, flow_id: {flow_id}" + ), + })?; + let op_name = "creating flow"; + ensure_values!(*remote_flow, flow_value, op_name); } Ok(()) @@ -227,7 +221,7 @@ mod tests { use futures::TryStreamExt; use super::*; - use crate::key::flow_task::table_task::TableTaskKey; + use crate::key::flow::table_flow::TableFlowKey; use crate::key::scope::CatalogScoped; use crate::kv_backend::memory::MemoryKvBackend; use crate::table_name::TableName; @@ -251,19 +245,19 @@ mod tests { #[test] fn test_flow_scoped_to_bytes() { - let key = FlowTaskScoped::new(CatalogScoped::new( + let key = FlowScoped::new(CatalogScoped::new( "my_catalog".to_string(), MockKey { inner: b"hi".to_vec(), }, )); - assert_eq!(b"__flow_task/my_catalog/hi".to_vec(), key.to_bytes()); + assert_eq!(b"__flow/my_catalog/hi".to_vec(), key.to_bytes()); } #[test] fn test_flow_scoped_from_bytes() { - let bytes = b"__flow_task/my_catalog/hi"; - let key = FlowTaskScoped::>::from_bytes(bytes).unwrap(); + let bytes = b"__flow/my_catalog/hi"; + let key = FlowScoped::>::from_bytes(bytes).unwrap(); assert_eq!(key.catalog(), "my_catalog"); assert_eq!(key.inner.inner, b"hi".to_vec()); } @@ -271,24 +265,24 @@ mod tests { #[test] fn test_flow_scoped_from_bytes_mismatch() { let bytes = b"__table/my_catalog/hi"; - let err = FlowTaskScoped::>::from_bytes(bytes).unwrap_err(); + let err = FlowScoped::>::from_bytes(bytes).unwrap_err(); assert_matches!(err, error::Error::MismatchPrefix { .. }); } #[tokio::test] async fn test_create_flow_metadata() { let mem_kv = Arc::new(MemoryKvBackend::default()); - let flow_metadata_manager = FlowTaskMetadataManager::new(mem_kv.clone()); - let task_id = 10; + let flow_metadata_manager = FlowMetadataManager::new(mem_kv.clone()); + let flow_id = 10; let catalog_name = "greptime"; let sink_table_name = TableName { catalog_name: catalog_name.to_string(), schema_name: "my_schema".to_string(), table_name: "sink_table".to_string(), }; - let flow_task_value = FlowTaskInfoValue { + let flow_value = FlowInfoValue { catalog_name: catalog_name.to_string(), - task_name: "task".to_string(), + flow_name: "flow".to_string(), source_table_ids: vec![1024, 1025, 1026], sink_table_name, flownode_ids: [(0, 1u64)].into(), @@ -298,42 +292,42 @@ mod tests { options: Default::default(), }; flow_metadata_manager - .create_flow_task_metadata(task_id, flow_task_value.clone()) + .create_flow_metadata(flow_id, flow_value.clone()) .await .unwrap(); // Creates again. flow_metadata_manager - .create_flow_task_metadata(task_id, flow_task_value.clone()) + .create_flow_metadata(flow_id, flow_value.clone()) .await .unwrap(); let got = flow_metadata_manager - .flow_task_info_manager() - .get(catalog_name, task_id) + .flow_info_manager() + .get(catalog_name, flow_id) .await .unwrap() .unwrap(); - assert_eq!(got, flow_task_value); - let tasks = flow_metadata_manager - .flownode_task_manager() - .tasks(catalog_name, 1) + assert_eq!(got, flow_value); + let flows = flow_metadata_manager + .flownode_flow_manager() + .flows(catalog_name, 1) .try_collect::>() .await .unwrap(); - assert_eq!(tasks, vec![(task_id, 0)]); + assert_eq!(flows, vec![(flow_id, 0)]); for table_id in [1024, 1025, 1026] { let nodes = flow_metadata_manager - .table_task_manager() + .table_flow_manager() .nodes(catalog_name, table_id) .try_collect::>() .await .unwrap(); assert_eq!( nodes, - vec![TableTaskKey::new( + vec![TableFlowKey::new( catalog_name.to_string(), table_id, 1, - task_id, + flow_id, 0 )] ); @@ -341,19 +335,19 @@ mod tests { } #[tokio::test] - async fn test_create_table_metadata_task_exists_err() { + async fn test_create_table_metadata_flow_exists_err() { let mem_kv = Arc::new(MemoryKvBackend::default()); - let flow_metadata_manager = FlowTaskMetadataManager::new(mem_kv); - let task_id = 10; + let flow_metadata_manager = FlowMetadataManager::new(mem_kv); + let flow_id = 10; let catalog_name = "greptime"; let sink_table_name = TableName { catalog_name: catalog_name.to_string(), schema_name: "my_schema".to_string(), table_name: "sink_table".to_string(), }; - let flow_task_value = FlowTaskInfoValue { + let flow_value = FlowInfoValue { catalog_name: "greptime".to_string(), - task_name: "task".to_string(), + flow_name: "flow".to_string(), source_table_ids: vec![1024, 1025, 1026], sink_table_name: sink_table_name.clone(), flownode_ids: [(0, 1u64)].into(), @@ -363,13 +357,13 @@ mod tests { options: Default::default(), }; flow_metadata_manager - .create_flow_task_metadata(task_id, flow_task_value.clone()) + .create_flow_metadata(flow_id, flow_value.clone()) .await .unwrap(); // Creates again. - let flow_task_value = FlowTaskInfoValue { + let flow_value = FlowInfoValue { catalog_name: catalog_name.to_string(), - task_name: "task".to_string(), + flow_name: "flow".to_string(), source_table_ids: vec![1024, 1025, 1026], sink_table_name, flownode_ids: [(0, 1u64)].into(), @@ -379,26 +373,26 @@ mod tests { options: Default::default(), }; let err = flow_metadata_manager - .create_flow_task_metadata(task_id + 1, flow_task_value) + .create_flow_metadata(flow_id + 1, flow_value) .await .unwrap_err(); - assert_matches!(err, error::Error::TaskAlreadyExists { .. }); + assert_matches!(err, error::Error::FlowAlreadyExists { .. }); } #[tokio::test] async fn test_create_table_metadata_unexpected_err() { let mem_kv = Arc::new(MemoryKvBackend::default()); - let flow_metadata_manager = FlowTaskMetadataManager::new(mem_kv); - let task_id = 10; + let flow_metadata_manager = FlowMetadataManager::new(mem_kv); + let flow_id = 10; let catalog_name = "greptime"; let sink_table_name = TableName { catalog_name: catalog_name.to_string(), schema_name: "my_schema".to_string(), table_name: "sink_table".to_string(), }; - let flow_task_value = FlowTaskInfoValue { + let flow_value = FlowInfoValue { catalog_name: "greptime".to_string(), - task_name: "task".to_string(), + flow_name: "flow".to_string(), source_table_ids: vec![1024, 1025, 1026], sink_table_name: sink_table_name.clone(), flownode_ids: [(0, 1u64)].into(), @@ -408,7 +402,7 @@ mod tests { options: Default::default(), }; flow_metadata_manager - .create_flow_task_metadata(task_id, flow_task_value.clone()) + .create_flow_metadata(flow_id, flow_value.clone()) .await .unwrap(); // Creates again. @@ -417,9 +411,9 @@ mod tests { schema_name: "my_schema".to_string(), table_name: "another_sink_table".to_string(), }; - let flow_task_value = FlowTaskInfoValue { + let flow_value = FlowInfoValue { catalog_name: "greptime".to_string(), - task_name: "task".to_string(), + flow_name: "flow".to_string(), source_table_ids: vec![1024, 1025, 1026], sink_table_name: another_sink_table_name, flownode_ids: [(0, 1u64)].into(), @@ -429,7 +423,7 @@ mod tests { options: Default::default(), }; let err = flow_metadata_manager - .create_flow_task_metadata(task_id, flow_task_value) + .create_flow_metadata(flow_id, flow_value) .await .unwrap_err(); assert!(err.to_string().contains("Reads the different value")); diff --git a/src/common/meta/src/key/flow/flow_info.rs b/src/common/meta/src/key/flow/flow_info.rs new file mode 100644 index 000000000000..f9b9ae4b259d --- /dev/null +++ b/src/common/meta/src/key/flow/flow_info.rs @@ -0,0 +1,212 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::{BTreeMap, HashMap}; + +use lazy_static::lazy_static; +use regex::Regex; +use serde::{Deserialize, Serialize}; +use snafu::OptionExt; +use table::metadata::TableId; + +use crate::error::{self, Result}; +use crate::key::flow::FlowScoped; +use crate::key::scope::{CatalogScoped, MetaKey}; +use crate::key::txn_helper::TxnOpGetResponseSet; +use crate::key::{txn_helper, DeserializedValueWithBytes, FlowId, FlowPartitionId, TableMetaValue}; +use crate::kv_backend::txn::Txn; +use crate::kv_backend::KvBackendRef; +use crate::table_name::TableName; +use crate::FlownodeId; + +const FLOW_INFO_KEY_PREFIX: &str = "info"; + +lazy_static! { + static ref FLOW_INFO_KEY_PATTERN: Regex = + Regex::new(&format!("^{FLOW_INFO_KEY_PREFIX}/([0-9]+)$")).unwrap(); +} + +/// The key stores the metadata of the flow. +/// +/// The layout: `__flow/{catalog}/info/{flow_id}`. +pub struct FlowInfoKey(FlowScoped>); + +impl MetaKey for FlowInfoKey { + fn to_bytes(&self) -> Vec { + self.0.to_bytes() + } + + fn from_bytes(bytes: &[u8]) -> Result { + Ok(FlowInfoKey( + FlowScoped::>::from_bytes(bytes)?, + )) + } +} + +impl FlowInfoKey { + /// Returns the [FlowInfoKey]. + pub fn new(catalog: String, flow_id: FlowId) -> FlowInfoKey { + let inner = FlowInfoKeyInner::new(flow_id); + FlowInfoKey(FlowScoped::new(CatalogScoped::new(catalog, inner))) + } + + /// Returns the catalog. + pub fn catalog(&self) -> &str { + self.0.catalog() + } + + /// Returns the [FlowId]. + pub fn flow_id(&self) -> FlowId { + self.0.flow_id + } +} + +/// The key of flow metadata. +#[derive(Debug, Clone, Copy, PartialEq)] +struct FlowInfoKeyInner { + flow_id: FlowId, +} + +impl FlowInfoKeyInner { + /// Returns a [FlowInfoKey] with the specified `flow_id`. + pub fn new(flow_id: FlowId) -> FlowInfoKeyInner { + FlowInfoKeyInner { flow_id } + } +} + +impl MetaKey for FlowInfoKeyInner { + fn to_bytes(&self) -> Vec { + format!("{FLOW_INFO_KEY_PREFIX}/{}", self.flow_id).into_bytes() + } + + fn from_bytes(bytes: &[u8]) -> Result { + let key = std::str::from_utf8(bytes).map_err(|e| { + error::InvalidTableMetadataSnafu { + err_msg: format!( + "FlowInfoKeyInner '{}' is not a valid UTF8 string: {e}", + String::from_utf8_lossy(bytes) + ), + } + .build() + })?; + let captures = + FLOW_INFO_KEY_PATTERN + .captures(key) + .context(error::InvalidTableMetadataSnafu { + err_msg: format!("Invalid FlowInfoKeyInner '{key}'"), + })?; + // Safety: pass the regex check above + let flow_id = captures[1].parse::().unwrap(); + Ok(FlowInfoKeyInner { flow_id }) + } +} + +// The metadata of the flow. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct FlowInfoValue { + /// The source tables used by the flow. + pub(crate) source_table_ids: Vec, + /// The sink table used by the flow. + pub(crate) sink_table_name: TableName, + /// Which flow nodes this flow is running on. + pub(crate) flownode_ids: BTreeMap, + /// The catalog name. + pub(crate) catalog_name: String, + /// The flow name. + pub(crate) flow_name: String, + /// The raw sql. + pub(crate) raw_sql: String, + /// The expr of expire. + pub(crate) expire_when: String, + /// The comment. + pub(crate) comment: String, + /// The options. + pub(crate) options: HashMap, +} + +impl FlowInfoValue { + /// Returns the `flownode_id`. + pub fn flownode_ids(&self) -> &BTreeMap { + &self.flownode_ids + } + + /// Returns the `source_table`. + pub fn source_table_ids(&self) -> &[TableId] { + &self.source_table_ids + } +} + +/// The manager of [FlowInfoKey]. +pub struct FlowInfoManager { + kv_backend: KvBackendRef, +} + +impl FlowInfoManager { + /// Returns a new [FlowInfoManager]. + pub fn new(kv_backend: KvBackendRef) -> Self { + Self { kv_backend } + } + + /// Returns the [FlowInfoValue] of specified `flow_id`. + pub async fn get(&self, catalog: &str, flow_id: FlowId) -> Result> { + let key = FlowInfoKey::new(catalog.to_string(), flow_id).to_bytes(); + self.kv_backend + .get(&key) + .await? + .map(|x| FlowInfoValue::try_from_raw_value(&x.value)) + .transpose() + } + + /// Builds a create flow transaction. + /// It is expected that the `__flow/{catalog}/info/{flow_id}` wasn't occupied. + /// Otherwise, the transaction will retrieve existing value. + pub(crate) fn build_create_txn( + &self, + catalog: &str, + flow_id: FlowId, + flow_value: &FlowInfoValue, + ) -> Result<( + Txn, + impl FnOnce( + &mut TxnOpGetResponseSet, + ) -> Result>>, + )> { + let key = FlowInfoKey::new(catalog.to_string(), flow_id).to_bytes(); + let txn = txn_helper::build_put_if_absent_txn(key.clone(), flow_value.try_as_raw_value()?); + + Ok(( + txn, + TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(key)), + )) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_key_serialization() { + let flow_info = FlowInfoKey::new("my_catalog".to_string(), 2); + assert_eq!(b"__flow/my_catalog/info/2".to_vec(), flow_info.to_bytes()); + } + + #[test] + fn test_key_deserialization() { + let bytes = b"__flow/my_catalog/info/2".to_vec(); + let key = FlowInfoKey::from_bytes(&bytes).unwrap(); + assert_eq!(key.catalog(), "my_catalog"); + assert_eq!(key.flow_id(), 2); + } +} diff --git a/src/common/meta/src/key/flow/flow_name.rs b/src/common/meta/src/key/flow/flow_name.rs new file mode 100644 index 000000000000..dbb6d81c35b1 --- /dev/null +++ b/src/common/meta/src/key/flow/flow_name.rs @@ -0,0 +1,203 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use lazy_static::lazy_static; +use regex::Regex; +use serde::{Deserialize, Serialize}; +use snafu::OptionExt; + +use crate::error::{self, Result}; +use crate::key::flow::FlowScoped; +use crate::key::scope::{CatalogScoped, MetaKey}; +use crate::key::txn_helper::TxnOpGetResponseSet; +use crate::key::{txn_helper, DeserializedValueWithBytes, FlowId, TableMetaValue, NAME_PATTERN}; +use crate::kv_backend::txn::Txn; +use crate::kv_backend::KvBackendRef; + +const FLOW_NAME_KEY_PREFIX: &str = "name"; + +lazy_static! { + static ref FLOW_NAME_KEY_PATTERN: Regex = + Regex::new(&format!("^{FLOW_NAME_KEY_PREFIX}/({NAME_PATTERN})$")).unwrap(); +} + +/// The key of mapping {flow_name} to [FlowId]. +/// +/// The layout: `__flow/{catalog}/name/{flow_name}`. +pub struct FlowNameKey(FlowScoped>); + +impl FlowNameKey { + /// Returns the [FlowNameKey] + pub fn new(catalog: String, flow_name: String) -> FlowNameKey { + let inner = FlowNameKeyInner::new(flow_name); + FlowNameKey(FlowScoped::new(CatalogScoped::new(catalog, inner))) + } + + /// Returns the catalog. + pub fn catalog(&self) -> &str { + self.0.catalog() + } + + /// Return the `flow_name` + pub fn flow_name(&self) -> &str { + &self.0.flow_name + } +} + +impl MetaKey for FlowNameKey { + fn to_bytes(&self) -> Vec { + self.0.to_bytes() + } + + fn from_bytes(bytes: &[u8]) -> Result { + Ok(FlowNameKey( + FlowScoped::>::from_bytes(bytes)?, + )) + } +} + +/// The key of mapping name to [FlowId] +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct FlowNameKeyInner { + pub flow_name: String, +} + +impl MetaKey for FlowNameKeyInner { + fn to_bytes(&self) -> Vec { + format!("{FLOW_NAME_KEY_PREFIX}/{}", self.flow_name).into_bytes() + } + + fn from_bytes(bytes: &[u8]) -> Result { + let key = std::str::from_utf8(bytes).map_err(|e| { + error::InvalidTableMetadataSnafu { + err_msg: format!( + "FlowNameKeyInner '{}' is not a valid UTF8 string: {e}", + String::from_utf8_lossy(bytes) + ), + } + .build() + })?; + let captures = + FLOW_NAME_KEY_PATTERN + .captures(key) + .context(error::InvalidTableMetadataSnafu { + err_msg: format!("Invalid FlowNameKeyInner '{key}'"), + })?; + // Safety: pass the regex check above + let flow_name = captures[1].to_string(); + Ok(FlowNameKeyInner { flow_name }) + } +} + +impl FlowNameKeyInner { + /// Returns a [FlowNameKeyInner]. + pub fn new(flow_name: String) -> Self { + Self { flow_name } + } +} + +/// The value of [FlowNameKey]. +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] +pub struct FlowNameValue { + flow_id: FlowId, +} + +impl FlowNameValue { + /// Returns a [FlowNameValue] with specified [FlowId]. + pub fn new(flow_id: FlowId) -> Self { + Self { flow_id } + } + + /// Returns the [FlowId] + pub fn flow_id(&self) -> FlowId { + self.flow_id + } +} + +/// The manager of [FlowNameKey]. +pub struct FlowNameManager { + kv_backend: KvBackendRef, +} + +impl FlowNameManager { + /// Returns a new [FlowNameManager]. + pub fn new(kv_backend: KvBackendRef) -> Self { + Self { kv_backend } + } + + /// Returns the [FlowNameValue] of specified `catalog.flow`. + pub async fn get(&self, catalog: &str, flow: &str) -> Result> { + let key = FlowNameKey::new(catalog.to_string(), flow.to_string()); + let raw_key = key.to_bytes(); + self.kv_backend + .get(&raw_key) + .await? + .map(|x| FlowNameValue::try_from_raw_value(&x.value)) + .transpose() + } + + /// Returns true if the `flow` exists. + pub async fn exists(&self, catalog: &str, flow: &str) -> Result { + let key = FlowNameKey::new(catalog.to_string(), flow.to_string()); + let raw_key = key.to_bytes(); + self.kv_backend.exists(&raw_key).await + } + + /// Builds a create flow name transaction. + /// It's expected that the `__flow/{catalog}/name/{flow_name}` wasn't occupied. + /// Otherwise, the transaction will retrieve existing value. + pub fn build_create_txn( + &self, + catalog: &str, + name: &str, + flow_id: FlowId, + ) -> Result<( + Txn, + impl FnOnce( + &mut TxnOpGetResponseSet, + ) -> Result>>, + )> { + let key = FlowNameKey::new(catalog.to_string(), name.to_string()); + let raw_key = key.to_bytes(); + let flow_flow_name_value = FlowNameValue::new(flow_id); + let txn = txn_helper::build_put_if_absent_txn( + raw_key.clone(), + flow_flow_name_value.try_as_raw_value()?, + ); + + Ok(( + txn, + TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(raw_key)), + )) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_key_serialization() { + let key = FlowNameKey::new("my_catalog".to_string(), "my_task".to_string()); + assert_eq!(b"__flow/my_catalog/name/my_task".to_vec(), key.to_bytes(),); + } + + #[test] + fn test_key_deserialization() { + let bytes = b"__flow/my_catalog/name/my_task".to_vec(); + let key = FlowNameKey::from_bytes(&bytes).unwrap(); + assert_eq!(key.catalog(), "my_catalog"); + assert_eq!(key.flow_name(), "my_task"); + } +} diff --git a/src/common/meta/src/key/flow/flownode_flow.rs b/src/common/meta/src/key/flow/flownode_flow.rs new file mode 100644 index 000000000000..360b96b0f56f --- /dev/null +++ b/src/common/meta/src/key/flow/flownode_flow.rs @@ -0,0 +1,251 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use futures::stream::BoxStream; +use futures::TryStreamExt; +use lazy_static::lazy_static; +use regex::Regex; +use snafu::OptionExt; + +use crate::error::{self, Result}; +use crate::key::flow::FlowScoped; +use crate::key::scope::{BytesAdapter, CatalogScoped, MetaKey}; +use crate::key::{FlowId, FlowPartitionId}; +use crate::kv_backend::txn::{Txn, TxnOp}; +use crate::kv_backend::KvBackendRef; +use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE}; +use crate::rpc::store::RangeRequest; +use crate::rpc::KeyValue; +use crate::FlownodeId; + +lazy_static! { + static ref FLOWNODE_FLOW_KEY_PATTERN: Regex = Regex::new(&format!( + "^{FLOWNODE_FLOW_KEY_PREFIX}/([0-9]+)/([0-9]+)/([0-9]+)$" + )) + .unwrap(); +} + +const FLOWNODE_FLOW_KEY_PREFIX: &str = "flownode"; + +/// The key of mapping [FlownodeId] to [FlowId]. +/// +/// The layout `__flow/{catalog}/flownode/{flownode_id}/{flow_id}/{partition_id}` +pub struct FlownodeFlowKey(FlowScoped>); + +impl MetaKey for FlownodeFlowKey { + fn to_bytes(&self) -> Vec { + self.0.to_bytes() + } + + fn from_bytes(bytes: &[u8]) -> Result { + Ok(FlownodeFlowKey(FlowScoped::< + CatalogScoped, + >::from_bytes(bytes)?)) + } +} + +impl FlownodeFlowKey { + /// Returns a new [FlownodeFlowKey]. + pub fn new( + catalog: String, + flownode_id: FlownodeId, + flow_id: FlowId, + partition_id: FlowPartitionId, + ) -> FlownodeFlowKey { + let inner = FlownodeFlowKeyInner::new(flownode_id, flow_id, partition_id); + FlownodeFlowKey(FlowScoped::new(CatalogScoped::new(catalog, inner))) + } + + /// The prefix used to retrieve all [FlownodeFlowKey]s with the specified `flownode_id`. + pub fn range_start_key(catalog: String, flownode_id: FlownodeId) -> Vec { + let catalog_scoped_key = CatalogScoped::new( + catalog, + BytesAdapter::from(FlownodeFlowKeyInner::range_start_key(flownode_id).into_bytes()), + ); + + FlowScoped::new(catalog_scoped_key).to_bytes() + } + + /// Returns the catalog. + pub fn catalog(&self) -> &str { + self.0.catalog() + } + + /// Returns the [FlowId]. + pub fn flow_id(&self) -> FlowId { + self.0.flow_id + } + + /// Returns the [FlownodeId]. + pub fn flownode_id(&self) -> FlownodeId { + self.0.flownode_id + } + + /// Returns the [PartitionId]. + pub fn partition_id(&self) -> FlowPartitionId { + self.0.partition_id + } +} + +/// The key of mapping [FlownodeId] to [FlowId]. +pub struct FlownodeFlowKeyInner { + flownode_id: FlownodeId, + flow_id: FlowId, + partition_id: FlowPartitionId, +} + +impl FlownodeFlowKeyInner { + /// Returns a [FlownodeFlowKey] with the specified `flownode_id`, `flow_id` and `partition_id`. + pub fn new(flownode_id: FlownodeId, flow_id: FlowId, partition_id: FlowPartitionId) -> Self { + Self { + flownode_id, + flow_id, + partition_id, + } + } + + fn prefix(flownode_id: FlownodeId) -> String { + format!("{}/{flownode_id}", FLOWNODE_FLOW_KEY_PREFIX) + } + + /// The prefix used to retrieve all [FlownodeFlowKey]s with the specified `flownode_id`. + fn range_start_key(flownode_id: FlownodeId) -> String { + format!("{}/", Self::prefix(flownode_id)) + } +} + +impl MetaKey for FlownodeFlowKeyInner { + fn to_bytes(&self) -> Vec { + format!( + "{FLOWNODE_FLOW_KEY_PREFIX}/{}/{}/{}", + self.flownode_id, self.flow_id, self.partition_id, + ) + .into_bytes() + } + + fn from_bytes(bytes: &[u8]) -> Result { + let key = std::str::from_utf8(bytes).map_err(|e| { + error::InvalidTableMetadataSnafu { + err_msg: format!( + "FlownodeFlowKeyInner '{}' is not a valid UTF8 string: {e}", + String::from_utf8_lossy(bytes) + ), + } + .build() + })?; + let captures = + FLOWNODE_FLOW_KEY_PATTERN + .captures(key) + .context(error::InvalidTableMetadataSnafu { + err_msg: format!("Invalid FlownodeFlowKeyInner '{key}'"), + })?; + // Safety: pass the regex check above + let flownode_id = captures[1].parse::().unwrap(); + let flow_id = captures[2].parse::().unwrap(); + let partition_id = captures[3].parse::().unwrap(); + + Ok(FlownodeFlowKeyInner { + flownode_id, + flow_id, + partition_id, + }) + } +} + +/// The manager of [FlownodeFlowKey]. +pub struct FlownodeFlowManager { + kv_backend: KvBackendRef, +} + +/// Decodes `KeyValue` to [FlownodeFlowKey]. +pub fn flownode_flow_key_decoder(kv: KeyValue) -> Result { + FlownodeFlowKey::from_bytes(&kv.key) +} + +impl FlownodeFlowManager { + /// Returns a new [FlownodeFlowManager]. + pub fn new(kv_backend: KvBackendRef) -> Self { + Self { kv_backend } + } + + /// Retrieves all [FlowId] and [FlowPartitionId]s of the specified `flownode_id`. + pub fn flows( + &self, + catalog: &str, + flownode_id: FlownodeId, + ) -> BoxStream<'static, Result<(FlowId, FlowPartitionId)>> { + let start_key = FlownodeFlowKey::range_start_key(catalog.to_string(), flownode_id); + let req = RangeRequest::new().with_prefix(start_key); + + let stream = PaginationStream::new( + self.kv_backend.clone(), + req, + DEFAULT_PAGE_SIZE, + Arc::new(flownode_flow_key_decoder), + ); + + Box::pin(stream.map_ok(|key| (key.flow_id(), key.partition_id()))) + } + + /// Builds a create flownode flow transaction. + /// + /// Puts `__flownode_flow/{flownode_id}/{flow_id}/{partition_id}` keys. + pub(crate) fn build_create_txn>( + &self, + catalog: &str, + flow_id: FlowId, + flownode_ids: I, + ) -> Txn { + let txns = flownode_ids + .into_iter() + .map(|(partition_id, flownode_id)| { + let key = + FlownodeFlowKey::new(catalog.to_string(), flownode_id, flow_id, partition_id) + .to_bytes(); + TxnOp::Put(key, vec![]) + }) + .collect::>(); + + Txn::new().and_then(txns) + } +} + +#[cfg(test)] +mod tests { + use crate::key::flow::flownode_flow::FlownodeFlowKey; + use crate::key::scope::MetaKey; + + #[test] + fn test_key_serialization() { + let flownode_flow = FlownodeFlowKey::new("my_catalog".to_string(), 1, 2, 0); + assert_eq!( + b"__flow/my_catalog/flownode/1/2/0".to_vec(), + flownode_flow.to_bytes() + ); + let prefix = FlownodeFlowKey::range_start_key("my_catalog".to_string(), 1); + assert_eq!(b"__flow/my_catalog/flownode/1/".to_vec(), prefix); + } + + #[test] + fn test_key_deserialization() { + let bytes = b"__flow/my_catalog/flownode/1/2/0".to_vec(); + let key = FlownodeFlowKey::from_bytes(&bytes).unwrap(); + assert_eq!(key.catalog(), "my_catalog"); + assert_eq!(key.flownode_id(), 1); + assert_eq!(key.flow_id(), 2); + assert_eq!(key.partition_id(), 0); + } +} diff --git a/src/common/meta/src/key/flow_task/table_task.rs b/src/common/meta/src/key/flow/table_flow.rs similarity index 56% rename from src/common/meta/src/key/flow_task/table_task.rs rename to src/common/meta/src/key/flow/table_flow.rs index dd0d34adcfba..d3cabd86f276 100644 --- a/src/common/meta/src/key/flow_task/table_task.rs +++ b/src/common/meta/src/key/flow/table_flow.rs @@ -21,9 +21,9 @@ use snafu::OptionExt; use table::metadata::TableId; use crate::error::{self, Result}; -use crate::key::flow_task::FlowTaskScoped; +use crate::key::flow::FlowScoped; use crate::key::scope::{BytesAdapter, CatalogScoped, MetaKey}; -use crate::key::{FlowTaskId, FlowTaskPartitionId}; +use crate::key::{FlowId, FlowPartitionId}; use crate::kv_backend::txn::{Txn, TxnOp}; use crate::kv_backend::KvBackendRef; use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE}; @@ -31,63 +31,63 @@ use crate::rpc::store::RangeRequest; use crate::rpc::KeyValue; use crate::FlownodeId; -const TABLE_TASK_KEY_PREFIX: &str = "source_table"; +const TABLE_FLOW_KEY_PREFIX: &str = "source_table"; lazy_static! { - static ref TABLE_TASK_KEY_PATTERN: Regex = Regex::new(&format!( - "^{TABLE_TASK_KEY_PREFIX}/([0-9]+)/([0-9]+)/([0-9]+)/([0-9]+)$" + static ref TABLE_FLOW_KEY_PATTERN: Regex = Regex::new(&format!( + "^{TABLE_FLOW_KEY_PREFIX}/([0-9]+)/([0-9]+)/([0-9]+)/([0-9]+)$" )) .unwrap(); } -/// The key of mapping [TableId] to [FlownodeId] and [FlowTaskId]. +/// The key of mapping [TableId] to [FlownodeId] and [FlowId]. #[derive(Debug, Clone, Copy, PartialEq, Eq)] -struct TableTaskKeyInner { +struct TableFlowKeyInner { table_id: TableId, flownode_id: FlownodeId, - flow_task_id: FlowTaskId, - partition_id: FlowTaskPartitionId, + flow_id: FlowId, + partition_id: FlowPartitionId, } -/// The key of mapping [TableId] to [FlownodeId] and [FlowTaskId]. +/// The key of mapping [TableId] to [FlownodeId] and [FlowId]. /// -/// The layout: `__flow_task/{catalog}/table/{table_id}/{flownode_id}/{flow_task_id}/{partition_id}`. +/// The layout: `__flow/{catalog}/table/{table_id}/{flownode_id}/{flow_id}/{partition_id}`. #[derive(Debug, PartialEq)] -pub struct TableTaskKey(FlowTaskScoped>); +pub struct TableFlowKey(FlowScoped>); -impl MetaKey for TableTaskKey { +impl MetaKey for TableFlowKey { fn to_bytes(&self) -> Vec { self.0.to_bytes() } - fn from_bytes(bytes: &[u8]) -> Result { - Ok(TableTaskKey(FlowTaskScoped::< - CatalogScoped, - >::from_bytes(bytes)?)) + fn from_bytes(bytes: &[u8]) -> Result { + Ok(TableFlowKey( + FlowScoped::>::from_bytes(bytes)?, + )) } } -impl TableTaskKey { - /// Returns a new [TableTaskKey]. +impl TableFlowKey { + /// Returns a new [TableFlowKey]. pub fn new( catalog: String, table_id: TableId, flownode_id: FlownodeId, - flow_task_id: FlowTaskId, - partition_id: FlowTaskPartitionId, - ) -> TableTaskKey { - let inner = TableTaskKeyInner::new(table_id, flownode_id, flow_task_id, partition_id); - TableTaskKey(FlowTaskScoped::new(CatalogScoped::new(catalog, inner))) + flow_id: FlowId, + partition_id: FlowPartitionId, + ) -> TableFlowKey { + let inner = TableFlowKeyInner::new(table_id, flownode_id, flow_id, partition_id); + TableFlowKey(FlowScoped::new(CatalogScoped::new(catalog, inner))) } - /// The prefix used to retrieve all [TableTaskKey]s with the specified `table_id`. + /// The prefix used to retrieve all [TableFlowKey]s with the specified `table_id`. pub fn range_start_key(catalog: String, table_id: TableId) -> Vec { let catalog_scoped_key = CatalogScoped::new( catalog, - BytesAdapter::from(TableTaskKeyInner::range_start_key(table_id).into_bytes()), + BytesAdapter::from(TableFlowKeyInner::range_start_key(table_id).into_bytes()), ); - FlowTaskScoped::new(catalog_scoped_key).to_bytes() + FlowScoped::new(catalog_scoped_key).to_bytes() } /// Returns the catalog. @@ -100,9 +100,9 @@ impl TableTaskKey { self.0.table_id } - /// Returns the [FlowTaskId]. - pub fn flow_task_id(&self) -> FlowTaskId { - self.0.flow_task_id + /// Returns the [FlowId]. + pub fn flow_id(&self) -> FlowId { + self.0.flow_id } /// Returns the [FlownodeId]. @@ -111,117 +111,117 @@ impl TableTaskKey { } /// Returns the [PartitionId]. - pub fn partition_id(&self) -> FlowTaskPartitionId { + pub fn partition_id(&self) -> FlowPartitionId { self.0.partition_id } } -impl TableTaskKeyInner { - /// Returns a new [TableTaskKey]. +impl TableFlowKeyInner { + /// Returns a new [TableFlowKey]. fn new( table_id: TableId, flownode_id: FlownodeId, - flow_task_id: FlowTaskId, - partition_id: FlowTaskPartitionId, - ) -> TableTaskKeyInner { + flow_id: FlowId, + partition_id: FlowPartitionId, + ) -> TableFlowKeyInner { Self { table_id, flownode_id, - flow_task_id, + flow_id, partition_id, } } fn prefix(table_id: TableId) -> String { - format!("{}/{table_id}", TABLE_TASK_KEY_PREFIX) + format!("{}/{table_id}", TABLE_FLOW_KEY_PREFIX) } - /// The prefix used to retrieve all [TableTaskKey]s with the specified `table_id`. + /// The prefix used to retrieve all [TableFlowKey]s with the specified `table_id`. fn range_start_key(table_id: TableId) -> String { format!("{}/", Self::prefix(table_id)) } } -impl MetaKey for TableTaskKeyInner { +impl MetaKey for TableFlowKeyInner { fn to_bytes(&self) -> Vec { format!( - "{TABLE_TASK_KEY_PREFIX}/{}/{}/{}/{}", - self.table_id, self.flownode_id, self.flow_task_id, self.partition_id + "{TABLE_FLOW_KEY_PREFIX}/{}/{}/{}/{}", + self.table_id, self.flownode_id, self.flow_id, self.partition_id ) .into_bytes() } - fn from_bytes(bytes: &[u8]) -> Result { + fn from_bytes(bytes: &[u8]) -> Result { let key = std::str::from_utf8(bytes).map_err(|e| { error::InvalidTableMetadataSnafu { err_msg: format!( - "TableTaskKeyInner '{}' is not a valid UTF8 string: {e}", + "TableFlowKeyInner '{}' is not a valid UTF8 string: {e}", String::from_utf8_lossy(bytes) ), } .build() })?; let captures = - TABLE_TASK_KEY_PATTERN + TABLE_FLOW_KEY_PATTERN .captures(key) .context(error::InvalidTableMetadataSnafu { - err_msg: format!("Invalid TableTaskKeyInner '{key}'"), + err_msg: format!("Invalid TableFlowKeyInner '{key}'"), })?; // Safety: pass the regex check above let table_id = captures[1].parse::().unwrap(); let flownode_id = captures[2].parse::().unwrap(); - let flow_task_id = captures[3].parse::().unwrap(); - let partition_id = captures[4].parse::().unwrap(); - Ok(TableTaskKeyInner::new( + let flow_id = captures[3].parse::().unwrap(); + let partition_id = captures[4].parse::().unwrap(); + Ok(TableFlowKeyInner::new( table_id, flownode_id, - flow_task_id, + flow_id, partition_id, )) } } -/// Decodes `KeyValue` to [TableTaskKey]. -pub fn table_task_decoder(kv: KeyValue) -> Result { - TableTaskKey::from_bytes(&kv.key) +/// Decodes `KeyValue` to [TableFlowKey]. +pub fn table_flow_decoder(kv: KeyValue) -> Result { + TableFlowKey::from_bytes(&kv.key) } -/// The manager of [TableTaskKey]. -pub struct TableTaskManager { +/// The manager of [TableFlowKey]. +pub struct TableFlowManager { kv_backend: KvBackendRef, } -impl TableTaskManager { - /// Returns a new [TableTaskManager]. +impl TableFlowManager { + /// Returns a new [TableFlowManager]. pub fn new(kv_backend: KvBackendRef) -> Self { Self { kv_backend } } - /// Retrieves all [TableTaskKey]s of the specified `table_id`. + /// Retrieves all [TableFlowKey]s of the specified `table_id`. pub fn nodes( &self, catalog: &str, table_id: TableId, - ) -> BoxStream<'static, Result> { - let start_key = TableTaskKey::range_start_key(catalog.to_string(), table_id); + ) -> BoxStream<'static, Result> { + let start_key = TableFlowKey::range_start_key(catalog.to_string(), table_id); let req = RangeRequest::new().with_prefix(start_key); let stream = PaginationStream::new( self.kv_backend.clone(), req, DEFAULT_PAGE_SIZE, - Arc::new(table_task_decoder), + Arc::new(table_flow_decoder), ); Box::pin(stream) } - /// Builds a create table task transaction. + /// Builds a create table flow transaction. /// - /// Puts `__table_task/{table_id}/{node_id}/{partition_id}` keys. - pub fn build_create_txn>( + /// Puts `__table_flow/{table_id}/{node_id}/{partition_id}` keys. + pub fn build_create_txn>( &self, catalog: &str, - flow_task_id: FlowTaskId, + flow_id: FlowId, flownode_ids: I, source_table_ids: &[TableId], ) -> Txn { @@ -230,11 +230,11 @@ impl TableTaskManager { .flat_map(|(partition_id, flownode_id)| { source_table_ids.iter().map(move |table_id| { TxnOp::Put( - TableTaskKey::new( + TableFlowKey::new( catalog.to_string(), *table_id, flownode_id, - flow_task_id, + flow_id, partition_id, ) .to_bytes(), @@ -254,26 +254,23 @@ mod tests { #[test] fn test_key_serialization() { - let table_task_key = TableTaskKey::new("my_catalog".to_string(), 1024, 1, 2, 0); - assert_eq!( - b"__flow_task/my_catalog/source_table/1024/1/2/0".to_vec(), - table_task_key.to_bytes(), - ); - let prefix = TableTaskKey::range_start_key("my_catalog".to_string(), 1024); + let table_flow_key = TableFlowKey::new("my_catalog".to_string(), 1024, 1, 2, 0); assert_eq!( - b"__flow_task/my_catalog/source_table/1024/".to_vec(), - prefix + b"__flow/my_catalog/source_table/1024/1/2/0".to_vec(), + table_flow_key.to_bytes(), ); + let prefix = TableFlowKey::range_start_key("my_catalog".to_string(), 1024); + assert_eq!(b"__flow/my_catalog/source_table/1024/".to_vec(), prefix); } #[test] fn test_key_deserialization() { - let bytes = b"__flow_task/my_catalog/source_table/1024/1/2/0".to_vec(); - let key = TableTaskKey::from_bytes(&bytes).unwrap(); + let bytes = b"__flow/my_catalog/source_table/1024/1/2/0".to_vec(); + let key = TableFlowKey::from_bytes(&bytes).unwrap(); assert_eq!(key.catalog(), "my_catalog"); assert_eq!(key.source_table_id(), 1024); assert_eq!(key.flownode_id(), 1); - assert_eq!(key.flow_task_id(), 2); + assert_eq!(key.flow_id(), 2); assert_eq!(key.partition_id(), 0); } } diff --git a/src/common/meta/src/key/flow_task/flow_task_info.rs b/src/common/meta/src/key/flow_task/flow_task_info.rs deleted file mode 100644 index 371ab96a1e41..000000000000 --- a/src/common/meta/src/key/flow_task/flow_task_info.rs +++ /dev/null @@ -1,222 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::collections::{BTreeMap, HashMap}; - -use lazy_static::lazy_static; -use regex::Regex; -use serde::{Deserialize, Serialize}; -use snafu::OptionExt; -use table::metadata::TableId; - -use crate::error::{self, Result}; -use crate::key::flow_task::FlowTaskScoped; -use crate::key::scope::{CatalogScoped, MetaKey}; -use crate::key::txn_helper::TxnOpGetResponseSet; -use crate::key::{ - txn_helper, DeserializedValueWithBytes, FlowTaskId, FlowTaskPartitionId, TableMetaValue, -}; -use crate::kv_backend::txn::Txn; -use crate::kv_backend::KvBackendRef; -use crate::table_name::TableName; -use crate::FlownodeId; - -const FLOW_TASK_INFO_KEY_PREFIX: &str = "info"; - -lazy_static! { - static ref FLOW_TASK_INFO_KEY_PATTERN: Regex = - Regex::new(&format!("^{FLOW_TASK_INFO_KEY_PREFIX}/([0-9]+)$")).unwrap(); -} - -/// The key stores the metadata of the task. -/// -/// The layout: `__flow_task/{catalog}/info/{flow_task_id}`. -pub struct FlowTaskInfoKey(FlowTaskScoped>); - -impl MetaKey for FlowTaskInfoKey { - fn to_bytes(&self) -> Vec { - self.0.to_bytes() - } - - fn from_bytes(bytes: &[u8]) -> Result { - Ok(FlowTaskInfoKey(FlowTaskScoped::< - CatalogScoped, - >::from_bytes(bytes)?)) - } -} - -impl FlowTaskInfoKey { - /// Returns the [FlowTaskInfoKey]. - pub fn new(catalog: String, flow_task_id: FlowTaskId) -> FlowTaskInfoKey { - let inner = FlowTaskInfoKeyInner::new(flow_task_id); - FlowTaskInfoKey(FlowTaskScoped::new(CatalogScoped::new(catalog, inner))) - } - - /// Returns the catalog. - pub fn catalog(&self) -> &str { - self.0.catalog() - } - - /// Returns the [FlowTaskId]. - pub fn flow_task_id(&self) -> FlowTaskId { - self.0.flow_task_id - } -} - -/// The key of flow task metadata. -#[derive(Debug, Clone, Copy, PartialEq)] -struct FlowTaskInfoKeyInner { - flow_task_id: FlowTaskId, -} - -impl FlowTaskInfoKeyInner { - /// Returns a [FlowTaskInfoKey] with the specified `flow_task_id`. - pub fn new(flow_task_id: FlowTaskId) -> FlowTaskInfoKeyInner { - FlowTaskInfoKeyInner { flow_task_id } - } -} - -impl MetaKey for FlowTaskInfoKeyInner { - fn to_bytes(&self) -> Vec { - format!("{FLOW_TASK_INFO_KEY_PREFIX}/{}", self.flow_task_id).into_bytes() - } - - fn from_bytes(bytes: &[u8]) -> Result { - let key = std::str::from_utf8(bytes).map_err(|e| { - error::InvalidTableMetadataSnafu { - err_msg: format!( - "FlowTaskInfoKeyInner '{}' is not a valid UTF8 string: {e}", - String::from_utf8_lossy(bytes) - ), - } - .build() - })?; - let captures = - FLOW_TASK_INFO_KEY_PATTERN - .captures(key) - .context(error::InvalidTableMetadataSnafu { - err_msg: format!("Invalid FlowTaskInfoKeyInner '{key}'"), - })?; - // Safety: pass the regex check above - let flow_task_id = captures[1].parse::().unwrap(); - Ok(FlowTaskInfoKeyInner { flow_task_id }) - } -} - -// The metadata of the flow task. -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] -pub struct FlowTaskInfoValue { - /// The source tables used by the task. - pub(crate) source_table_ids: Vec, - /// The sink table used by the task. - pub(crate) sink_table_name: TableName, - /// Which flow nodes this task is running on. - pub(crate) flownode_ids: BTreeMap, - /// The catalog name. - pub(crate) catalog_name: String, - /// The task name. - pub(crate) task_name: String, - /// The raw sql. - pub(crate) raw_sql: String, - /// The expr of expire. - pub(crate) expire_when: String, - /// The comment. - pub(crate) comment: String, - /// The options. - pub(crate) options: HashMap, -} - -impl FlowTaskInfoValue { - /// Returns the `flownode_id`. - pub fn flownode_ids(&self) -> &BTreeMap { - &self.flownode_ids - } - - /// Returns the `source_table`. - pub fn source_table_ids(&self) -> &[TableId] { - &self.source_table_ids - } -} - -/// The manager of [FlowTaskInfoKey]. -pub struct FlowTaskInfoManager { - kv_backend: KvBackendRef, -} - -impl FlowTaskInfoManager { - /// Returns a new [FlowTaskInfoManager]. - pub fn new(kv_backend: KvBackendRef) -> Self { - Self { kv_backend } - } - - /// Returns the [FlowTaskInfoValue] of specified `flow_task_id`. - pub async fn get( - &self, - catalog: &str, - flow_task_id: FlowTaskId, - ) -> Result> { - let key = FlowTaskInfoKey::new(catalog.to_string(), flow_task_id).to_bytes(); - self.kv_backend - .get(&key) - .await? - .map(|x| FlowTaskInfoValue::try_from_raw_value(&x.value)) - .transpose() - } - - /// Builds a create flow task transaction. - /// It is expected that the `__flow_task/{catalog}/info/{flow_task_id}` wasn't occupied. - /// Otherwise, the transaction will retrieve existing value. - pub(crate) fn build_create_txn( - &self, - catalog: &str, - flow_task_id: FlowTaskId, - flow_task_value: &FlowTaskInfoValue, - ) -> Result<( - Txn, - impl FnOnce( - &mut TxnOpGetResponseSet, - ) -> Result>>, - )> { - let key = FlowTaskInfoKey::new(catalog.to_string(), flow_task_id).to_bytes(); - let txn = - txn_helper::build_put_if_absent_txn(key.clone(), flow_task_value.try_as_raw_value()?); - - Ok(( - txn, - TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(key)), - )) - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_key_serialization() { - let flow_task = FlowTaskInfoKey::new("my_catalog".to_string(), 2); - assert_eq!( - b"__flow_task/my_catalog/info/2".to_vec(), - flow_task.to_bytes() - ); - } - - #[test] - fn test_key_deserialization() { - let bytes = b"__flow_task/my_catalog/info/2".to_vec(); - let key = FlowTaskInfoKey::from_bytes(&bytes).unwrap(); - assert_eq!(key.catalog(), "my_catalog"); - assert_eq!(key.flow_task_id(), 2); - } -} diff --git a/src/common/meta/src/key/flow_task/flow_task_name.rs b/src/common/meta/src/key/flow_task/flow_task_name.rs deleted file mode 100644 index eaf6da5ae848..000000000000 --- a/src/common/meta/src/key/flow_task/flow_task_name.rs +++ /dev/null @@ -1,208 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use lazy_static::lazy_static; -use regex::Regex; -use serde::{Deserialize, Serialize}; -use snafu::OptionExt; - -use crate::error::{self, Result}; -use crate::key::flow_task::FlowTaskScoped; -use crate::key::scope::{CatalogScoped, MetaKey}; -use crate::key::txn_helper::TxnOpGetResponseSet; -use crate::key::{ - txn_helper, DeserializedValueWithBytes, FlowTaskId, TableMetaValue, NAME_PATTERN, -}; -use crate::kv_backend::txn::Txn; -use crate::kv_backend::KvBackendRef; - -const FLOW_TASK_NAME_KEY_PREFIX: &str = "name"; - -lazy_static! { - static ref FLOW_TASK_NAME_KEY_PATTERN: Regex = - Regex::new(&format!("^{FLOW_TASK_NAME_KEY_PREFIX}/({NAME_PATTERN})$")).unwrap(); -} - -/// The key of mapping {task_name} to [FlowTaskId]. -/// -/// The layout: `__flow_task/{catalog}/name/{task_name}`. -pub struct FlowTaskNameKey(FlowTaskScoped>); - -impl FlowTaskNameKey { - /// Returns the [FlowTaskNameKey] - pub fn new(catalog: String, task_name: String) -> FlowTaskNameKey { - let inner = FlowTaskNameKeyInner::new(task_name); - FlowTaskNameKey(FlowTaskScoped::new(CatalogScoped::new(catalog, inner))) - } - - /// Returns the catalog. - pub fn catalog(&self) -> &str { - self.0.catalog() - } - - /// Return the `task_name` - pub fn task_name(&self) -> &str { - &self.0.task_name - } -} - -impl MetaKey for FlowTaskNameKey { - fn to_bytes(&self) -> Vec { - self.0.to_bytes() - } - - fn from_bytes(bytes: &[u8]) -> Result { - Ok(FlowTaskNameKey(FlowTaskScoped::< - CatalogScoped, - >::from_bytes(bytes)?)) - } -} - -/// The key of mapping name to [FlowTaskId] -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct FlowTaskNameKeyInner { - pub task_name: String, -} - -impl MetaKey for FlowTaskNameKeyInner { - fn to_bytes(&self) -> Vec { - format!("{FLOW_TASK_NAME_KEY_PREFIX}/{}", self.task_name).into_bytes() - } - - fn from_bytes(bytes: &[u8]) -> Result { - let key = std::str::from_utf8(bytes).map_err(|e| { - error::InvalidTableMetadataSnafu { - err_msg: format!( - "FlowTaskNameKeyInner '{}' is not a valid UTF8 string: {e}", - String::from_utf8_lossy(bytes) - ), - } - .build() - })?; - let captures = - FLOW_TASK_NAME_KEY_PATTERN - .captures(key) - .context(error::InvalidTableMetadataSnafu { - err_msg: format!("Invalid FlowTaskNameKeyInner '{key}'"), - })?; - // Safety: pass the regex check above - let task = captures[1].to_string(); - Ok(FlowTaskNameKeyInner { task_name: task }) - } -} - -impl FlowTaskNameKeyInner { - /// Returns a [FlowTaskNameKeyInner]. - pub fn new(task: String) -> Self { - Self { task_name: task } - } -} - -/// The value of [FlowTaskNameKey]. -#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] -pub struct FlowTaskNameValue { - flow_task_id: FlowTaskId, -} - -impl FlowTaskNameValue { - /// Returns a [FlowTaskNameValue] with specified [FlowTaskId]. - pub fn new(flow_task_id: FlowTaskId) -> Self { - Self { flow_task_id } - } - - /// Returns the [FlowTaskId] - pub fn flow_task_id(&self) -> FlowTaskId { - self.flow_task_id - } -} - -/// The manager of [FlowTaskNameKey]. -pub struct FlowTaskNameManager { - kv_backend: KvBackendRef, -} - -impl FlowTaskNameManager { - /// Returns a new [FlowTaskNameManager]. - pub fn new(kv_backend: KvBackendRef) -> Self { - Self { kv_backend } - } - - /// Returns the [FlowTaskNameValue] of specified `catalog.task`. - pub async fn get(&self, catalog: &str, task: &str) -> Result> { - let key = FlowTaskNameKey::new(catalog.to_string(), task.to_string()); - let raw_key = key.to_bytes(); - self.kv_backend - .get(&raw_key) - .await? - .map(|x| FlowTaskNameValue::try_from_raw_value(&x.value)) - .transpose() - } - - /// Returns true if the `task` exists. - pub async fn exists(&self, catalog: &str, task: &str) -> Result { - let key = FlowTaskNameKey::new(catalog.to_string(), task.to_string()); - let raw_key = key.to_bytes(); - self.kv_backend.exists(&raw_key).await - } - - /// Builds a create flow task name transaction. - /// It's expected that the `__flow_task/{catalog}/name/{task_name}` wasn't occupied. - /// Otherwise, the transaction will retrieve existing value. - pub fn build_create_txn( - &self, - catalog: &str, - name: &str, - flow_task_id: FlowTaskId, - ) -> Result<( - Txn, - impl FnOnce( - &mut TxnOpGetResponseSet, - ) -> Result>>, - )> { - let key = FlowTaskNameKey::new(catalog.to_string(), name.to_string()); - let raw_key = key.to_bytes(); - let flow_task_name_value = FlowTaskNameValue::new(flow_task_id); - let txn = txn_helper::build_put_if_absent_txn( - raw_key.clone(), - flow_task_name_value.try_as_raw_value()?, - ); - - Ok(( - txn, - TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(raw_key)), - )) - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_key_serialization() { - let table_task_key = FlowTaskNameKey::new("my_catalog".to_string(), "my_task".to_string()); - assert_eq!( - b"__flow_task/my_catalog/name/my_task".to_vec(), - table_task_key.to_bytes(), - ); - } - - #[test] - fn test_key_deserialization() { - let bytes = b"__flow_task/my_catalog/name/my_task".to_vec(); - let key = FlowTaskNameKey::from_bytes(&bytes).unwrap(); - assert_eq!(key.catalog(), "my_catalog"); - assert_eq!(key.task_name(), "my_task"); - } -} diff --git a/src/common/meta/src/key/flow_task/flownode_task.rs b/src/common/meta/src/key/flow_task/flownode_task.rs deleted file mode 100644 index bacff5326e08..000000000000 --- a/src/common/meta/src/key/flow_task/flownode_task.rs +++ /dev/null @@ -1,259 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::sync::Arc; - -use futures::stream::BoxStream; -use futures::TryStreamExt; -use lazy_static::lazy_static; -use regex::Regex; -use snafu::OptionExt; - -use crate::error::{self, Result}; -use crate::key::flow_task::FlowTaskScoped; -use crate::key::scope::{BytesAdapter, CatalogScoped, MetaKey}; -use crate::key::{FlowTaskId, FlowTaskPartitionId}; -use crate::kv_backend::txn::{Txn, TxnOp}; -use crate::kv_backend::KvBackendRef; -use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE}; -use crate::rpc::store::RangeRequest; -use crate::rpc::KeyValue; -use crate::FlownodeId; - -lazy_static! { - static ref FLOWNODE_TASK_KEY_PATTERN: Regex = Regex::new(&format!( - "^{FLOWNODE_TASK_KEY_PREFIX}/([0-9]+)/([0-9]+)/([0-9]+)$" - )) - .unwrap(); -} - -const FLOWNODE_TASK_KEY_PREFIX: &str = "flownode"; - -/// The key of mapping [FlownodeId] to [FlowTaskId]. -/// -/// The layout `__flow_task/{catalog}/flownode/{flownode_id}/{flow_task_id}/{partition_id}` -pub struct FlownodeTaskKey(FlowTaskScoped>); - -impl MetaKey for FlownodeTaskKey { - fn to_bytes(&self) -> Vec { - self.0.to_bytes() - } - - fn from_bytes(bytes: &[u8]) -> Result { - Ok(FlownodeTaskKey(FlowTaskScoped::< - CatalogScoped, - >::from_bytes(bytes)?)) - } -} - -impl FlownodeTaskKey { - /// Returns a new [FlownodeTaskKey]. - pub fn new( - catalog: String, - flownode_id: FlownodeId, - flow_task_id: FlowTaskId, - partition_id: FlowTaskPartitionId, - ) -> FlownodeTaskKey { - let inner = FlownodeTaskKeyInner::new(flownode_id, flow_task_id, partition_id); - FlownodeTaskKey(FlowTaskScoped::new(CatalogScoped::new(catalog, inner))) - } - - /// The prefix used to retrieve all [FlownodeTaskKey]s with the specified `flownode_id`. - pub fn range_start_key(catalog: String, flownode_id: FlownodeId) -> Vec { - let catalog_scoped_key = CatalogScoped::new( - catalog, - BytesAdapter::from(FlownodeTaskKeyInner::range_start_key(flownode_id).into_bytes()), - ); - - FlowTaskScoped::new(catalog_scoped_key).to_bytes() - } - - /// Returns the catalog. - pub fn catalog(&self) -> &str { - self.0.catalog() - } - - /// Returns the [FlowTaskId]. - pub fn flow_task_id(&self) -> FlowTaskId { - self.0.flow_task_id - } - - /// Returns the [FlownodeId]. - pub fn flownode_id(&self) -> FlownodeId { - self.0.flownode_id - } - - /// Returns the [PartitionId]. - pub fn partition_id(&self) -> FlowTaskPartitionId { - self.0.partition_id - } -} - -/// The key of mapping [FlownodeId] to [FlowTaskId]. -pub struct FlownodeTaskKeyInner { - flownode_id: FlownodeId, - flow_task_id: FlowTaskId, - partition_id: FlowTaskPartitionId, -} - -impl FlownodeTaskKeyInner { - /// Returns a [FlownodeTaskKey] with the specified `flownode_id`, `flow_task_id` and `partition_id`. - pub fn new( - flownode_id: FlownodeId, - flow_task_id: FlowTaskId, - partition_id: FlowTaskPartitionId, - ) -> Self { - Self { - flownode_id, - flow_task_id, - partition_id, - } - } - - fn prefix(flownode_id: FlownodeId) -> String { - format!("{}/{flownode_id}", FLOWNODE_TASK_KEY_PREFIX) - } - - /// The prefix used to retrieve all [FlownodeTaskKey]s with the specified `flownode_id`. - fn range_start_key(flownode_id: FlownodeId) -> String { - format!("{}/", Self::prefix(flownode_id)) - } -} - -impl MetaKey for FlownodeTaskKeyInner { - fn to_bytes(&self) -> Vec { - format!( - "{FLOWNODE_TASK_KEY_PREFIX}/{}/{}/{}", - self.flownode_id, self.flow_task_id, self.partition_id, - ) - .into_bytes() - } - - fn from_bytes(bytes: &[u8]) -> Result { - let key = std::str::from_utf8(bytes).map_err(|e| { - error::InvalidTableMetadataSnafu { - err_msg: format!( - "FlownodeTaskKeyInner '{}' is not a valid UTF8 string: {e}", - String::from_utf8_lossy(bytes) - ), - } - .build() - })?; - let captures = - FLOWNODE_TASK_KEY_PATTERN - .captures(key) - .context(error::InvalidTableMetadataSnafu { - err_msg: format!("Invalid FlownodeTaskKeyInner '{key}'"), - })?; - // Safety: pass the regex check above - let flownode_id = captures[1].parse::().unwrap(); - let flow_task_id = captures[2].parse::().unwrap(); - let partition_id = captures[3].parse::().unwrap(); - - Ok(FlownodeTaskKeyInner { - flownode_id, - flow_task_id, - partition_id, - }) - } -} - -/// The manager of [FlownodeTaskKey]. -pub struct FlownodeTaskManager { - kv_backend: KvBackendRef, -} - -/// Decodes `KeyValue` to [FlownodeTaskKey]. -pub fn flownode_task_key_decoder(kv: KeyValue) -> Result { - FlownodeTaskKey::from_bytes(&kv.key) -} - -impl FlownodeTaskManager { - /// Returns a new [FlownodeTaskManager]. - pub fn new(kv_backend: KvBackendRef) -> Self { - Self { kv_backend } - } - - /// Retrieves all [FlowTaskId] and [PartitionId]s of the specified `flownode_id`. - pub fn tasks( - &self, - catalog: &str, - flownode_id: FlownodeId, - ) -> BoxStream<'static, Result<(FlowTaskId, FlowTaskPartitionId)>> { - let start_key = FlownodeTaskKey::range_start_key(catalog.to_string(), flownode_id); - let req = RangeRequest::new().with_prefix(start_key); - - let stream = PaginationStream::new( - self.kv_backend.clone(), - req, - DEFAULT_PAGE_SIZE, - Arc::new(flownode_task_key_decoder), - ); - - Box::pin(stream.map_ok(|key| (key.flow_task_id(), key.partition_id()))) - } - - /// Builds a create flownode task transaction. - /// - /// Puts `__flownode_task/{flownode_id}/{flow_task_id}/{partition_id}` keys. - pub(crate) fn build_create_txn>( - &self, - catalog: &str, - flow_task_id: FlowTaskId, - flownode_ids: I, - ) -> Txn { - let txns = flownode_ids - .into_iter() - .map(|(partition_id, flownode_id)| { - let key = FlownodeTaskKey::new( - catalog.to_string(), - flownode_id, - flow_task_id, - partition_id, - ) - .to_bytes(); - TxnOp::Put(key, vec![]) - }) - .collect::>(); - - Txn::new().and_then(txns) - } -} - -#[cfg(test)] -mod tests { - use crate::key::flow_task::flownode_task::FlownodeTaskKey; - use crate::key::scope::MetaKey; - - #[test] - fn test_key_serialization() { - let flownode_task = FlownodeTaskKey::new("my_catalog".to_string(), 1, 2, 0); - assert_eq!( - b"__flow_task/my_catalog/flownode/1/2/0".to_vec(), - flownode_task.to_bytes() - ); - let prefix = FlownodeTaskKey::range_start_key("my_catalog".to_string(), 1); - assert_eq!(b"__flow_task/my_catalog/flownode/1/".to_vec(), prefix); - } - - #[test] - fn test_key_deserialization() { - let bytes = b"__flow_task/my_catalog/flownode/1/2/0".to_vec(); - let key = FlownodeTaskKey::from_bytes(&bytes).unwrap(); - assert_eq!(key.catalog(), "my_catalog"); - assert_eq!(key.flownode_id(), 1); - assert_eq!(key.flow_task_id(), 2); - assert_eq!(key.partition_id(), 0); - } -} diff --git a/src/common/meta/src/lock_key.rs b/src/common/meta/src/lock_key.rs index 456d1ccffad7..7fbc07655ea7 100644 --- a/src/common/meta/src/lock_key.rs +++ b/src/common/meta/src/lock_key.rs @@ -22,7 +22,7 @@ const CATALOG_LOCK_PREFIX: &str = "__catalog_lock"; const SCHEMA_LOCK_PREFIX: &str = "__schema_lock"; const TABLE_LOCK_PREFIX: &str = "__table_lock"; const TABLE_NAME_LOCK_PREFIX: &str = "__table_name_lock"; -const FLOW_TASK_NAME_LOCK_PREFIX: &str = "__flow_task_name_lock"; +const FLOW_NAME_LOCK_PREFIX: &str = "__flow_name_lock"; const REGION_LOCK_PREFIX: &str = "__region_lock"; /// [CatalogLock] acquires the lock on the tenant level. @@ -111,28 +111,28 @@ impl From for StringKey { } } -/// [FlowTaskNameLock] prevents any procedures trying to create a flow task named it. -pub enum FlowTaskNameLock { +/// [FlowNameLock] prevents any procedures trying to create a flow named it. +pub enum FlowNameLock { Write(String), } -impl FlowTaskNameLock { +impl FlowNameLock { pub fn new(catalog: &str, table: &str) -> Self { Self::Write(format!("{catalog}.{table}")) } } -impl Display for FlowTaskNameLock { +impl Display for FlowNameLock { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let FlowTaskNameLock::Write(name) = self; - write!(f, "{}/{}", FLOW_TASK_NAME_LOCK_PREFIX, name) + let FlowNameLock::Write(name) = self; + write!(f, "{}/{}", FLOW_NAME_LOCK_PREFIX, name) } } -impl From for StringKey { - fn from(value: FlowTaskNameLock) -> Self { +impl From for StringKey { + fn from(value: FlowNameLock) -> Self { match value { - FlowTaskNameLock::Write(_) => StringKey::Exclusive(value.to_string()), + FlowNameLock::Write(_) => StringKey::Exclusive(value.to_string()), } } } diff --git a/src/common/meta/src/metrics.rs b/src/common/meta/src/metrics.rs index 0a47b1de1463..34bb95dc0cb7 100644 --- a/src/common/meta/src/metrics.rs +++ b/src/common/meta/src/metrics.rs @@ -39,9 +39,9 @@ lazy_static! { &["step"] ) .unwrap(); - pub static ref METRIC_META_PROCEDURE_CREATE_FLOW_TASK: HistogramVec = register_histogram_vec!( - "greptime_meta_procedure_create_flow_task", - "meta procedure create flow task", + pub static ref METRIC_META_PROCEDURE_CREATE_FLOW: HistogramVec = register_histogram_vec!( + "greptime_meta_procedure_create_flow", + "meta procedure create flow", &["step"] ) .unwrap(); diff --git a/src/common/meta/src/rpc/ddl.rs b/src/common/meta/src/rpc/ddl.rs index 911cab18df4c..9b75bd6c3963 100644 --- a/src/common/meta/src/rpc/ddl.rs +++ b/src/common/meta/src/rpc/ddl.rs @@ -26,8 +26,8 @@ use api::v1::meta::{ TruncateTableTask as PbTruncateTableTask, }; use api::v1::{ - AlterExpr, CreateDatabaseExpr, CreateFlowTaskExpr, CreateTableExpr, DropDatabaseExpr, - DropFlowTaskExpr, DropTableExpr, TruncateTableExpr, + AlterExpr, CreateDatabaseExpr, CreateFlowExpr, CreateTableExpr, DropDatabaseExpr, DropFlowExpr, + DropTableExpr, TruncateTableExpr, }; use base64::engine::general_purpose; use base64::Engine as _; @@ -729,11 +729,11 @@ impl TryFrom for PbDropDatabaseTask { } } -/// Create flow task +/// Create flow #[derive(Debug, Clone, Serialize, Deserialize)] pub struct CreateFlowTask { pub catalog_name: String, - pub task_name: String, + pub flow_name: String, pub source_table_names: Vec, pub sink_table_name: TableName, pub or_replace: bool, @@ -741,16 +741,16 @@ pub struct CreateFlowTask { pub expire_when: String, pub comment: String, pub sql: String, - pub options: HashMap, + pub flow_options: HashMap, } impl TryFrom for CreateFlowTask { type Error = error::Error; fn try_from(pb: PbCreateFlowTask) -> Result { - let CreateFlowTaskExpr { + let CreateFlowExpr { catalog_name, - task_name, + flow_name, source_table_names, sink_table_name, or_replace, @@ -758,14 +758,14 @@ impl TryFrom for CreateFlowTask { expire_when, comment, sql, - task_options, - } = pb.create_flow_task.context(error::InvalidProtoMsgSnafu { - err_msg: "expected create_flow_task", + flow_options, + } = pb.create_flow.context(error::InvalidProtoMsgSnafu { + err_msg: "expected create_flow", })?; Ok(CreateFlowTask { catalog_name, - task_name, + flow_name, source_table_names: source_table_names.into_iter().map(Into::into).collect(), sink_table_name: sink_table_name .context(error::InvalidProtoMsgSnafu { @@ -777,7 +777,7 @@ impl TryFrom for CreateFlowTask { expire_when, comment, sql, - options: task_options, + flow_options, }) } } @@ -786,7 +786,7 @@ impl From for PbCreateFlowTask { fn from( CreateFlowTask { catalog_name, - task_name, + flow_name, source_table_names, sink_table_name, or_replace, @@ -794,13 +794,13 @@ impl From for PbCreateFlowTask { expire_when, comment, sql, - options, + flow_options, }: CreateFlowTask, ) -> Self { PbCreateFlowTask { - create_flow_task: Some(CreateFlowTaskExpr { + create_flow: Some(CreateFlowExpr { catalog_name, - task_name, + flow_name, source_table_names: source_table_names.into_iter().map(Into::into).collect(), sink_table_name: Some(sink_table_name.into()), or_replace, @@ -808,31 +808,31 @@ impl From for PbCreateFlowTask { expire_when, comment, sql, - task_options: options, + flow_options, }), } } } -/// Drop flow task +/// Drop flow pub struct DropFlowTask { pub catalog_name: String, - pub task_name: String, + pub flow_name: String, } impl TryFrom for DropFlowTask { type Error = error::Error; fn try_from(pb: PbDropFlowTask) -> Result { - let DropFlowTaskExpr { + let DropFlowExpr { catalog_name, - task_name, - } = pb.drop_flow_task.context(error::InvalidProtoMsgSnafu { + flow_name, + } = pb.drop_flow.context(error::InvalidProtoMsgSnafu { err_msg: "expected sink_table_name", })?; Ok(DropFlowTask { catalog_name, - task_name, + flow_name, }) } } @@ -841,13 +841,13 @@ impl From for PbDropFlowTask { fn from( DropFlowTask { catalog_name, - task_name, + flow_name, }: DropFlowTask, ) -> Self { PbDropFlowTask { - drop_flow_task: Some(DropFlowTaskExpr { + drop_flow: Some(DropFlowExpr { catalog_name, - task_name, + flow_name, }), } } diff --git a/src/common/meta/src/test_util.rs b/src/common/meta/src/test_util.rs index fe5bf0c439be..7c9ed13f5e88 100644 --- a/src/common/meta/src/test_util.rs +++ b/src/common/meta/src/test_util.rs @@ -20,11 +20,11 @@ pub use common_base::AffectedRows; use common_recordbatch::SendableRecordBatchStream; use crate::cache_invalidator::DummyCacheInvalidator; +use crate::ddl::flow_meta::FlowMetadataAllocator; use crate::ddl::table_meta::TableMetadataAllocator; -use crate::ddl::task_meta::FlowTaskMetadataAllocator; use crate::ddl::DdlContext; use crate::error::Result; -use crate::key::flow_task::FlowTaskMetadataManager; +use crate::key::flow::FlowMetadataManager; use crate::key::TableMetadataManager; use crate::kv_backend::memory::MemoryKvBackend; use crate::kv_backend::KvBackendRef; @@ -109,21 +109,20 @@ pub fn new_ddl_context_with_kv_backend( ), Arc::new(WalOptionsAllocator::default()), )); - let flow_task_metadata_manager = Arc::new(FlowTaskMetadataManager::new(kv_backend.clone())); - let flow_task_metadata_allocator = Arc::new( - FlowTaskMetadataAllocator::with_noop_peer_allocator(Arc::new( + let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone())); + let flow_metadata_allocator = + Arc::new(FlowMetadataAllocator::with_noop_peer_allocator(Arc::new( SequenceBuilder::new("flow-test", kv_backend) .initial(1024) .build(), - )), - ); + ))); DdlContext { node_manager, cache_invalidator: Arc::new(DummyCacheInvalidator), memory_region_keeper: Arc::new(MemoryRegionKeeper::new()), table_metadata_allocator, table_metadata_manager, - flow_task_metadata_allocator, - flow_task_metadata_manager, + flow_metadata_allocator, + flow_metadata_manager, } } diff --git a/src/frontend/src/instance/grpc.rs b/src/frontend/src/instance/grpc.rs index 551a7da85d31..2009bc56381c 100644 --- a/src/frontend/src/instance/grpc.rs +++ b/src/frontend/src/instance/grpc.rs @@ -137,10 +137,10 @@ impl GrpcQueryHandler for Instance { TableName::new(&expr.catalog_name, &expr.schema_name, &expr.table_name); self.statement_executor.truncate_table(table_name).await? } - DdlExpr::CreateFlowTask(_) => { + DdlExpr::CreateFlow(_) => { unimplemented!() } - DdlExpr::DropFlowTask(_) => { + DdlExpr::DropFlow(_) => { unimplemented!() } } @@ -181,12 +181,12 @@ fn fill_catalog_and_schema_from_context(ddl_expr: &mut DdlExpr, ctx: &QueryConte Expr::TruncateTable(expr) => { check_and_fill!(expr); } - Expr::CreateFlowTask(expr) => { + Expr::CreateFlow(expr) => { if expr.catalog_name.is_empty() { expr.catalog_name = catalog.to_string(); } } - Expr::DropFlowTask(expr) => { + Expr::DropFlow(expr) => { if expr.catalog_name.is_empty() { expr.catalog_name = catalog.to_string(); } diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index 308d08a20ae0..bb0e0255f1db 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -59,7 +59,7 @@ use crate::service::store::cached_kv::LeaderCachedKvBackend; use crate::state::{become_follower, become_leader, StateRef}; pub const TABLE_ID_SEQ: &str = "table_id"; -pub const FLOW_TASK_ID_SEQ: &str = "flow_id"; +pub const FLOW_ID_SEQ: &str = "flow_id"; pub const METASRV_HOME: &str = "/tmp/metasrv"; #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index ab17088745c5..a2cd95e67b2c 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -18,14 +18,14 @@ use std::time::Duration; use client::client_manager::DatanodeClients; use common_base::Plugins; -use common_catalog::consts::{MIN_USER_FLOW_TASK_ID, MIN_USER_TABLE_ID}; +use common_catalog::consts::{MIN_USER_FLOW_ID, MIN_USER_TABLE_ID}; use common_grpc::channel_manager::ChannelConfig; +use common_meta::ddl::flow_meta::FlowMetadataAllocator; use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocatorRef}; -use common_meta::ddl::task_meta::FlowTaskMetadataAllocator; use common_meta::ddl::DdlContext; use common_meta::ddl_manager::DdlManager; use common_meta::distributed_time_constants; -use common_meta::key::flow_task::FlowTaskMetadataManager; +use common_meta::key::flow::FlowMetadataManager; use common_meta::key::TableMetadataManager; use common_meta::kv_backend::memory::MemoryKvBackend; use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef}; @@ -38,7 +38,7 @@ use common_procedure::local::{LocalManager, ManagerConfig}; use common_procedure::ProcedureManagerRef; use snafu::ResultExt; -use super::FLOW_TASK_ID_SEQ; +use super::FLOW_ID_SEQ; use crate::cache_invalidator::MetasrvCacheInvalidator; use crate::cluster::{MetaPeerClientBuilder, MetaPeerClientRef}; use crate::error::{self, Result}; @@ -205,7 +205,7 @@ impl MetasrvBuilder { let table_metadata_manager = Arc::new(TableMetadataManager::new( leader_cached_kv_backend.clone() as _, )); - let flow_task_metadata_manager = Arc::new(FlowTaskMetadataManager::new( + let flow_metadata_manager = Arc::new(FlowMetadataManager::new( leader_cached_kv_backend.clone() as _, )); let lock = lock.unwrap_or_else(|| Arc::new(MemLock::default())); @@ -239,14 +239,13 @@ impl MetasrvBuilder { )) }); // TODO(weny): use the real allocator. - let flow_task_metadata_allocator = Arc::new( - FlowTaskMetadataAllocator::with_noop_peer_allocator(Arc::new( - SequenceBuilder::new(FLOW_TASK_ID_SEQ, kv_backend.clone()) - .initial(MIN_USER_FLOW_TASK_ID as u64) + let flow_metadata_allocator = + Arc::new(FlowMetadataAllocator::with_noop_peer_allocator(Arc::new( + SequenceBuilder::new(FLOW_ID_SEQ, kv_backend.clone()) + .initial(MIN_USER_FLOW_ID as u64) .step(10) .build(), - )), - ); + ))); let memory_region_keeper = Arc::new(MemoryRegionKeeper::default()); let node_manager = node_manager.unwrap_or_else(|| { let datanode_client_channel_config = ChannelConfig::new() @@ -273,8 +272,8 @@ impl MetasrvBuilder { memory_region_keeper: memory_region_keeper.clone(), table_metadata_manager: table_metadata_manager.clone(), table_metadata_allocator: table_metadata_allocator.clone(), - flow_task_metadata_manager: flow_task_metadata_manager.clone(), - flow_task_metadata_allocator: flow_task_metadata_allocator.clone(), + flow_metadata_manager: flow_metadata_manager.clone(), + flow_metadata_allocator: flow_metadata_allocator.clone(), }, procedure_manager.clone(), true, diff --git a/src/meta-srv/src/procedure/utils.rs b/src/meta-srv/src/procedure/utils.rs index 55a9119db3ed..042c31a78741 100644 --- a/src/meta-srv/src/procedure/utils.rs +++ b/src/meta-srv/src/procedure/utils.rs @@ -105,10 +105,10 @@ pub mod test_data { use chrono::DateTime; use common_catalog::consts::MITO2_ENGINE; + use common_meta::ddl::flow_meta::FlowMetadataAllocator; use common_meta::ddl::table_meta::TableMetadataAllocator; - use common_meta::ddl::task_meta::FlowTaskMetadataAllocator; use common_meta::ddl::DdlContext; - use common_meta::key::flow_task::FlowTaskMetadataManager; + use common_meta::key::flow::FlowMetadataManager; use common_meta::key::TableMetadataManager; use common_meta::kv_backend::memory::MemoryKvBackend; use common_meta::node_manager::NodeManagerRef; @@ -201,11 +201,10 @@ pub mod test_data { Arc::new(SequenceBuilder::new("test", kv_backend.clone()).build()), Arc::new(WalOptionsAllocator::default()), )); - let flow_task_metadata_manager = Arc::new(FlowTaskMetadataManager::new(kv_backend.clone())); - let flow_task_metadata_allocator = - Arc::new(FlowTaskMetadataAllocator::with_noop_peer_allocator( - Arc::new(SequenceBuilder::new("test", kv_backend).build()), - )); + let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone())); + let flow_metadata_allocator = Arc::new(FlowMetadataAllocator::with_noop_peer_allocator( + Arc::new(SequenceBuilder::new("test", kv_backend).build()), + )); DdlContext { node_manager, cache_invalidator: Arc::new(MetasrvCacheInvalidator::new( @@ -216,8 +215,8 @@ pub mod test_data { )), table_metadata_manager, table_metadata_allocator, - flow_task_metadata_manager, - flow_task_metadata_allocator, + flow_metadata_manager, + flow_metadata_allocator, memory_region_keeper: Arc::new(MemoryRegionKeeper::new()), } } diff --git a/src/operator/src/expr_factory.rs b/src/operator/src/expr_factory.rs index 4e7aef084fb2..f0305ed981a5 100644 --- a/src/operator/src/expr_factory.rs +++ b/src/operator/src/expr_factory.rs @@ -541,7 +541,7 @@ pub fn to_create_flow_task_expr( Ok(CreateFlowTask { catalog_name: query_ctx.current_catalog().to_string(), - task_name: create_flow.flow_name.to_string(), + flow_name: create_flow.flow_name.to_string(), source_table_names, sink_table_name, or_replace: create_flow.or_replace, @@ -552,7 +552,7 @@ pub fn to_create_flow_task_expr( .unwrap_or_default(), comment: create_flow.comment.unwrap_or_default(), sql: create_flow.query.to_string(), - options: HashMap::new(), + flow_options: HashMap::new(), }) } diff --git a/src/sql/src/parsers/create_parser.rs b/src/sql/src/parsers/create_parser.rs index 1d333c0069ef..4876931eea56 100644 --- a/src/sql/src/parsers/create_parser.rs +++ b/src/sql/src/parsers/create_parser.rs @@ -175,7 +175,7 @@ impl<'a> ParserContext<'a> { fn parse_create_flow(&mut self, or_replace: bool) -> Result { let if_not_exists = self.parse_if_not_exist()?; - let task_name = self.intern_parse_table_name()?; + let flow_name = self.intern_parse_table_name()?; self.parser .expect_token(&Token::make_keyword(SINK)) @@ -219,7 +219,7 @@ impl<'a> ParserContext<'a> { let query = Box::new(self.parser.parse_query().context(error::SyntaxSnafu)?); Ok(Statement::CreateFlow(CreateFlow { - flow_name: task_name, + flow_name, sink_table_name: output_table_name, or_replace, if_not_exists, diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs index 0931e87d05d4..2748291957bc 100644 --- a/tests-integration/src/standalone.rs +++ b/tests-integration/src/standalone.rs @@ -17,14 +17,14 @@ use std::sync::Arc; use catalog::kvbackend::KvBackendCatalogManager; use cmd::options::MixOptions; use common_base::Plugins; -use common_catalog::consts::{MIN_USER_FLOW_TASK_ID, MIN_USER_TABLE_ID}; +use common_catalog::consts::{MIN_USER_FLOW_ID, MIN_USER_TABLE_ID}; use common_config::KvBackendConfig; use common_meta::cache_invalidator::MultiCacheInvalidator; +use common_meta::ddl::flow_meta::FlowMetadataAllocator; use common_meta::ddl::table_meta::TableMetadataAllocator; -use common_meta::ddl::task_meta::FlowTaskMetadataAllocator; use common_meta::ddl::DdlContext; use common_meta::ddl_manager::DdlManager; -use common_meta::key::flow_task::FlowTaskMetadataManager; +use common_meta::key::flow::FlowMetadataManager; use common_meta::key::TableMetadataManager; use common_meta::kv_backend::KvBackendRef; use common_meta::region_keeper::MemoryRegionKeeper; @@ -38,7 +38,7 @@ use datanode::datanode::DatanodeBuilder; use frontend::frontend::FrontendOptions; use frontend::instance::builder::FrontendBuilder; use frontend::instance::{FrontendInstance, Instance, StandaloneDatanodeManager}; -use meta_srv::metasrv::{FLOW_TASK_ID_SEQ, TABLE_ID_SEQ}; +use meta_srv::metasrv::{FLOW_ID_SEQ, TABLE_ID_SEQ}; use servers::Mode; use crate::test_util::{self, create_tmp_dir_and_datanode_opts, StorageType, TestGuard}; @@ -129,7 +129,7 @@ impl GreptimeDbStandaloneBuilder { let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone())); table_metadata_manager.init().await.unwrap(); - let flow_task_metadata_manager = Arc::new(FlowTaskMetadataManager::new(kv_backend.clone())); + let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone())); let multi_cache_invalidator = Arc::new(MultiCacheInvalidator::default()); let catalog_manager = KvBackendCatalogManager::new(kv_backend.clone(), multi_cache_invalidator.clone()).await; @@ -142,9 +142,9 @@ impl GreptimeDbStandaloneBuilder { .step(10) .build(), ); - let flow_task_id_sequence = Arc::new( - SequenceBuilder::new(FLOW_TASK_ID_SEQ, kv_backend.clone()) - .initial(MIN_USER_FLOW_TASK_ID as u64) + let flow_id_sequence = Arc::new( + SequenceBuilder::new(FLOW_ID_SEQ, kv_backend.clone()) + .initial(MIN_USER_FLOW_ID as u64) .step(10) .build(), ); @@ -156,9 +156,9 @@ impl GreptimeDbStandaloneBuilder { table_id_sequence, wal_options_allocator.clone(), )); - let flow_task_metadata_allocator = Arc::new( - FlowTaskMetadataAllocator::with_noop_peer_allocator(flow_task_id_sequence), - ); + let flow_metadata_allocator = Arc::new(FlowMetadataAllocator::with_noop_peer_allocator( + flow_id_sequence, + )); let ddl_task_executor = Arc::new( DdlManager::try_new( @@ -168,8 +168,8 @@ impl GreptimeDbStandaloneBuilder { memory_region_keeper: Arc::new(MemoryRegionKeeper::default()), table_metadata_manager, table_metadata_allocator, - flow_task_metadata_manager, - flow_task_metadata_allocator, + flow_metadata_manager, + flow_metadata_allocator, }, procedure_manager.clone(), register_procedure_loaders,