diff --git a/src/common/meta/src/ddl/create_flow.rs b/src/common/meta/src/ddl/create_flow.rs index 15f3e0237863..c9b95d3c13cd 100644 --- a/src/common/meta/src/ddl/create_flow.rs +++ b/src/common/meta/src/ddl/create_flow.rs @@ -37,7 +37,7 @@ use crate::ddl::utils::handle_retry_error; use crate::ddl::DdlContext; use crate::error::Result; use crate::key::flow::flow_info::FlowInfoValue; -use crate::key::FlowTaskId; +use crate::key::FlowId; use crate::lock_key::{CatalogLock, FlowNameLock, SchemaLock, TableNameLock}; use crate::peer::Peer; use crate::rpc::ddl::CreateFlowTask; @@ -195,7 +195,7 @@ pub struct CreateFlowTaskData { pub(crate) cluster_id: ClusterId, pub(crate) state: CreateFlowTaskState, pub(crate) task: CreateFlowTask, - pub(crate) flow_id: Option, + pub(crate) flow_id: Option, pub(crate) peers: Vec, pub(crate) source_table_ids: Vec, } diff --git a/src/common/meta/src/ddl/flow_meta.rs b/src/common/meta/src/ddl/flow_meta.rs index 8cc3fe0aa80f..4b7000f60a5d 100644 --- a/src/common/meta/src/ddl/flow_meta.rs +++ b/src/common/meta/src/ddl/flow_meta.rs @@ -17,7 +17,7 @@ use std::sync::Arc; use tonic::async_trait; use crate::error::Result; -use crate::key::FlowTaskId; +use crate::key::FlowId; use crate::peer::Peer; use crate::sequence::SequenceRef; @@ -43,13 +43,13 @@ impl FlowMetadataAllocator { } /// Allocates a the [FlowTaskId]. - pub(crate) async fn allocate_flow_id(&self) -> Result { - let flow_id = self.flow_id_sequence.next().await? as FlowTaskId; + pub(crate) async fn allocate_flow_id(&self) -> Result { + let flow_id = self.flow_id_sequence.next().await? as FlowId; Ok(flow_id) } /// Allocates the [FlowTaskId] and [Peer]s. - pub async fn create(&self, partitions: usize) -> Result<(FlowTaskId, Vec)> { + pub async fn create(&self, partitions: usize) -> Result<(FlowId, Vec)> { let flow_id = self.allocate_flow_id().await?; let peers = self.partition_peer_allocator.alloc(partitions).await?; diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index da30bcf7f2c6..85048f25f48e 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -157,7 +157,7 @@ pub const CACHE_KEY_PREFIXES: [&str; 4] = [ pub type RegionDistribution = BTreeMap>; /// The id of flow. -pub type FlowTaskId = u32; +pub type FlowId = u32; /// The partition of flow. pub type FlowPartitionId = u32; diff --git a/src/common/meta/src/key/flow.rs b/src/common/meta/src/key/flow.rs index c9b8925a56a7..1848af42c09f 100644 --- a/src/common/meta/src/key/flow.rs +++ b/src/common/meta/src/key/flow.rs @@ -31,7 +31,7 @@ use crate::key::flow::flow_name::FlowNameManager; use crate::key::flow::flownode_flow::FlownodeFlowManager; use crate::key::flow::table_flow::TableFlowManager; use crate::key::txn_helper::TxnOpGetResponseSet; -use crate::key::{FlowTaskId, MetaKey}; +use crate::key::{FlowId, MetaKey}; use crate::kv_backend::txn::Txn; use crate::kv_backend::KvBackendRef; @@ -131,7 +131,7 @@ impl FlowMetadataManager { /// Creates metadata for flow and returns an error if different metadata exists. pub async fn create_flow_metadata( &self, - flow_id: FlowTaskId, + flow_id: FlowId, flow_value: FlowInfoValue, ) -> Result<()> { let (create_flow_flow_name_txn, on_create_flow_flow_name_failure) = self diff --git a/src/common/meta/src/key/flow/flow_info.rs b/src/common/meta/src/key/flow/flow_info.rs index aca73e8a973b..ab91b9a3e6e0 100644 --- a/src/common/meta/src/key/flow/flow_info.rs +++ b/src/common/meta/src/key/flow/flow_info.rs @@ -24,7 +24,7 @@ use crate::error::{self, Result}; use crate::key::flow::FlowScoped; use crate::key::txn_helper::TxnOpGetResponseSet; use crate::key::{ - txn_helper, DeserializedValueWithBytes, FlowPartitionId, FlowTaskId, MetaKey, TableMetaValue, + txn_helper, DeserializedValueWithBytes, FlowId, FlowPartitionId, MetaKey, TableMetaValue, }; use crate::kv_backend::txn::Txn; use crate::kv_backend::KvBackendRef; @@ -57,13 +57,13 @@ impl MetaKey for FlowInfoKey { impl FlowInfoKey { /// Returns the [FlowInfoKey]. - pub fn new(flow_id: FlowTaskId) -> FlowInfoKey { + pub fn new(flow_id: FlowId) -> FlowInfoKey { let inner = FlowInfoKeyInner::new(flow_id); FlowInfoKey(FlowScoped::new(inner)) } /// Returns the [FlowTaskId]. - pub fn flow_id(&self) -> FlowTaskId { + pub fn flow_id(&self) -> FlowId { self.0.flow_id } } @@ -71,12 +71,12 @@ impl FlowInfoKey { /// The key of flow metadata. #[derive(Debug, Clone, Copy, PartialEq)] struct FlowInfoKeyInner { - flow_id: FlowTaskId, + flow_id: FlowId, } impl FlowInfoKeyInner { /// Returns a [FlowInfoKey] with the specified `flow_id`. - pub fn new(flow_id: FlowTaskId) -> FlowInfoKeyInner { + pub fn new(flow_id: FlowId) -> FlowInfoKeyInner { FlowInfoKeyInner { flow_id } } } @@ -103,7 +103,7 @@ impl MetaKey for FlowInfoKeyInner { err_msg: format!("Invalid FlowInfoKeyInner '{key}'"), })?; // Safety: pass the regex check above - let flow_id = captures[1].parse::().unwrap(); + let flow_id = captures[1].parse::().unwrap(); Ok(FlowInfoKeyInner { flow_id }) } } @@ -155,7 +155,7 @@ impl FlowInfoManager { } /// Returns the [FlowTaskValue] of specified `flow_id`. - pub async fn get(&self, flow_id: FlowTaskId) -> Result> { + pub async fn get(&self, flow_id: FlowId) -> Result> { let key = FlowInfoKey::new(flow_id).to_bytes(); self.kv_backend .get(&key) @@ -169,7 +169,7 @@ impl FlowInfoManager { /// Otherwise, the transaction will retrieve existing value. pub(crate) fn build_create_txn( &self, - flow_id: FlowTaskId, + flow_id: FlowId, flow_value: &FlowInfoValue, ) -> Result<( Txn, diff --git a/src/common/meta/src/key/flow/flow_name.rs b/src/common/meta/src/key/flow/flow_name.rs index 14db164d862d..619d926bf62d 100644 --- a/src/common/meta/src/key/flow/flow_name.rs +++ b/src/common/meta/src/key/flow/flow_name.rs @@ -21,7 +21,7 @@ use crate::error::{self, Result}; use crate::key::flow::FlowScoped; use crate::key::txn_helper::TxnOpGetResponseSet; use crate::key::{ - txn_helper, DeserializedValueWithBytes, FlowTaskId, MetaKey, TableMetaValue, NAME_PATTERN, + txn_helper, DeserializedValueWithBytes, FlowId, MetaKey, TableMetaValue, NAME_PATTERN, }; use crate::kv_backend::txn::Txn; use crate::kv_backend::KvBackendRef; @@ -125,17 +125,17 @@ impl FlowNameKeyInner { /// The value of [FlowNameKey]. #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] pub struct FlowNameValue { - flow_id: FlowTaskId, + flow_id: FlowId, } impl FlowNameValue { /// Returns a [FlowNameValue] with specified [FlowTaskId]. - pub fn new(flow_id: FlowTaskId) -> Self { + pub fn new(flow_id: FlowId) -> Self { Self { flow_id } } /// Returns the [FlowTaskId] - pub fn flow_id(&self) -> FlowTaskId { + pub fn flow_id(&self) -> FlowId { self.flow_id } } @@ -176,7 +176,7 @@ impl FlowNameManager { &self, catalog_name: &str, flow_name: &str, - flow_id: FlowTaskId, + flow_id: FlowId, ) -> Result<( Txn, impl FnOnce( diff --git a/src/common/meta/src/key/flow/flownode_flow.rs b/src/common/meta/src/key/flow/flownode_flow.rs index ad6388dbb915..bcf7d9d09493 100644 --- a/src/common/meta/src/key/flow/flownode_flow.rs +++ b/src/common/meta/src/key/flow/flownode_flow.rs @@ -22,7 +22,7 @@ use snafu::OptionExt; use crate::error::{self, Result}; use crate::key::flow::FlowScoped; -use crate::key::{BytesAdapter, FlowPartitionId, FlowTaskId, MetaKey}; +use crate::key::{BytesAdapter, FlowId, FlowPartitionId, MetaKey}; use crate::kv_backend::txn::{Txn, TxnOp}; use crate::kv_backend::KvBackendRef; use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE}; @@ -60,7 +60,7 @@ impl FlownodeFlowKey { /// Returns a new [FlownodeFlowKey]. pub fn new( flownode_id: FlownodeId, - flow_id: FlowTaskId, + flow_id: FlowId, partition_id: FlowPartitionId, ) -> FlownodeFlowKey { let inner = FlownodeFlowKeyInner::new(flownode_id, flow_id, partition_id); @@ -76,7 +76,7 @@ impl FlownodeFlowKey { } /// Returns the [FlowTaskId]. - pub fn flow_id(&self) -> FlowTaskId { + pub fn flow_id(&self) -> FlowId { self.0.flow_id } @@ -94,17 +94,13 @@ impl FlownodeFlowKey { /// The key of mapping [FlownodeId] to [FlowTaskId]. pub struct FlownodeFlowKeyInner { flownode_id: FlownodeId, - flow_id: FlowTaskId, + flow_id: FlowId, partition_id: FlowPartitionId, } impl FlownodeFlowKeyInner { /// Returns a [FlownodeFlowKey] with the specified `flownode_id`, `flow_id` and `partition_id`. - pub fn new( - flownode_id: FlownodeId, - flow_id: FlowTaskId, - partition_id: FlowPartitionId, - ) -> Self { + pub fn new(flownode_id: FlownodeId, flow_id: FlowId, partition_id: FlowPartitionId) -> Self { Self { flownode_id, flow_id, @@ -149,7 +145,7 @@ impl MetaKey for FlownodeFlowKeyInner { })?; // Safety: pass the regex check above let flownode_id = captures[1].parse::().unwrap(); - let flow_id = captures[2].parse::().unwrap(); + let flow_id = captures[2].parse::().unwrap(); let partition_id = captures[3].parse::().unwrap(); Ok(FlownodeFlowKeyInner { @@ -180,7 +176,7 @@ impl FlownodeFlowManager { pub fn tasks( &self, flownode_id: FlownodeId, - ) -> BoxStream<'static, Result<(FlowTaskId, FlowPartitionId)>> { + ) -> BoxStream<'static, Result<(FlowId, FlowPartitionId)>> { let start_key = FlownodeFlowKey::range_start_key(flownode_id); let req = RangeRequest::new().with_prefix(start_key); @@ -199,7 +195,7 @@ impl FlownodeFlowManager { /// Puts `__flownode_task/{flownode_id}/{flow_id}/{partition_id}` keys. pub(crate) fn build_create_txn>( &self, - flow_id: FlowTaskId, + flow_id: FlowId, flownode_ids: I, ) -> Txn { let txns = flownode_ids diff --git a/src/common/meta/src/key/flow/table_flow.rs b/src/common/meta/src/key/flow/table_flow.rs index 437bb062e29d..f3ac9e98d449 100644 --- a/src/common/meta/src/key/flow/table_flow.rs +++ b/src/common/meta/src/key/flow/table_flow.rs @@ -22,7 +22,7 @@ use table::metadata::TableId; use crate::error::{self, Result}; use crate::key::flow::FlowScoped; -use crate::key::{BytesAdapter, FlowPartitionId, FlowTaskId, MetaKey}; +use crate::key::{BytesAdapter, FlowId, FlowPartitionId, MetaKey}; use crate::kv_backend::txn::{Txn, TxnOp}; use crate::kv_backend::KvBackendRef; use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE}; @@ -44,7 +44,7 @@ lazy_static! { struct TableFlowKeyInner { table_id: TableId, flownode_id: FlownodeId, - flow_id: FlowTaskId, + flow_id: FlowId, partition_id: FlowPartitionId, } @@ -71,7 +71,7 @@ impl TableFlowKey { pub fn new( table_id: TableId, flownode_id: FlownodeId, - flow_id: FlowTaskId, + flow_id: FlowId, partition_id: FlowPartitionId, ) -> TableFlowKey { let inner = TableFlowKeyInner::new(table_id, flownode_id, flow_id, partition_id); @@ -91,7 +91,7 @@ impl TableFlowKey { } /// Returns the [FlowTaskId]. - pub fn flow_id(&self) -> FlowTaskId { + pub fn flow_id(&self) -> FlowId { self.0.flow_id } @@ -111,7 +111,7 @@ impl TableFlowKeyInner { fn new( table_id: TableId, flownode_id: FlownodeId, - flow_id: FlowTaskId, + flow_id: FlowId, partition_id: FlowPartitionId, ) -> TableFlowKeyInner { Self { @@ -160,7 +160,7 @@ impl MetaKey for TableFlowKeyInner { // Safety: pass the regex check above let table_id = captures[1].parse::().unwrap(); let flownode_id = captures[2].parse::().unwrap(); - let flow_id = captures[3].parse::().unwrap(); + let flow_id = captures[3].parse::().unwrap(); let partition_id = captures[4].parse::().unwrap(); Ok(TableFlowKeyInner::new( table_id, @@ -206,7 +206,7 @@ impl TableFlowManager { /// Puts `__table_task/{table_id}/{node_id}/{partition_id}` keys. pub fn build_create_txn>( &self, - flow_id: FlowTaskId, + flow_id: FlowId, flownode_ids: I, source_table_ids: &[TableId], ) -> Txn {