diff --git a/src/common/meta/src/ddl/create_flow.rs b/src/common/meta/src/ddl/create_flow.rs index 1b8f0abd6bce..93dbdfa99cf7 100644 --- a/src/common/meta/src/ddl/create_flow.rs +++ b/src/common/meta/src/ddl/create_flow.rs @@ -37,7 +37,7 @@ use crate::ddl::utils::handle_retry_error; use crate::ddl::DdlContext; use crate::error::Result; use crate::key::flow::flow_info::FlowInfoValue; -use crate::key::FlowTaskId; +use crate::key::FlowId; use crate::lock_key::{CatalogLock, FlowNameLock, TableNameLock}; use crate::peer::Peer; use crate::rpc::ddl::CreateFlowTask; @@ -184,7 +184,7 @@ pub struct CreateFlowTaskData { pub(crate) cluster_id: ClusterId, pub(crate) state: CreateFlowTaskState, pub(crate) task: CreateFlowTask, - pub(crate) flow_id: Option, + pub(crate) flow_id: Option, pub(crate) peers: Vec, pub(crate) source_table_ids: Vec, } diff --git a/src/common/meta/src/ddl/flow_meta.rs b/src/common/meta/src/ddl/flow_meta.rs index 8cc3fe0aa80f..4b7000f60a5d 100644 --- a/src/common/meta/src/ddl/flow_meta.rs +++ b/src/common/meta/src/ddl/flow_meta.rs @@ -17,7 +17,7 @@ use std::sync::Arc; use tonic::async_trait; use crate::error::Result; -use crate::key::FlowTaskId; +use crate::key::FlowId; use crate::peer::Peer; use crate::sequence::SequenceRef; @@ -43,13 +43,13 @@ impl FlowMetadataAllocator { } /// Allocates a the [FlowTaskId]. - pub(crate) async fn allocate_flow_id(&self) -> Result { - let flow_id = self.flow_id_sequence.next().await? as FlowTaskId; + pub(crate) async fn allocate_flow_id(&self) -> Result { + let flow_id = self.flow_id_sequence.next().await? as FlowId; Ok(flow_id) } /// Allocates the [FlowTaskId] and [Peer]s. - pub async fn create(&self, partitions: usize) -> Result<(FlowTaskId, Vec)> { + pub async fn create(&self, partitions: usize) -> Result<(FlowId, Vec)> { let flow_id = self.allocate_flow_id().await?; let peers = self.partition_peer_allocator.alloc(partitions).await?; diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index d5db00c75910..7cc007c0d373 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -158,7 +158,7 @@ pub const CACHE_KEY_PREFIXES: [&str; 4] = [ pub type RegionDistribution = BTreeMap>; /// The id of flow. -pub type FlowTaskId = u32; +pub type FlowId = u32; /// The partition of flow. pub type FlowPartitionId = u32; diff --git a/src/common/meta/src/key/flow.rs b/src/common/meta/src/key/flow.rs index c839d0cde2e9..6d4ec1cd2bba 100644 --- a/src/common/meta/src/key/flow.rs +++ b/src/common/meta/src/key/flow.rs @@ -32,7 +32,7 @@ use crate::key::flow::flownode_flow::FlownodeFlowManager; use crate::key::flow::table_flow::TableFlowManager; use crate::key::scope::MetaKey; use crate::key::txn_helper::TxnOpGetResponseSet; -use crate::key::FlowTaskId; +use crate::key::FlowId; use crate::kv_backend::txn::Txn; use crate::kv_backend::KvBackendRef; @@ -132,7 +132,7 @@ impl FlowMetadataManager { /// Creates metadata for flow and returns an error if different metadata exists. pub async fn create_flow_metadata( &self, - flow_id: FlowTaskId, + flow_id: FlowId, flow_value: FlowInfoValue, ) -> Result<()> { let (create_flow_flow_name_txn, on_create_flow_flow_name_failure) = self diff --git a/src/common/meta/src/key/flow/flow_info.rs b/src/common/meta/src/key/flow/flow_info.rs index 4c92dcb595c7..50dab7ea2ab7 100644 --- a/src/common/meta/src/key/flow/flow_info.rs +++ b/src/common/meta/src/key/flow/flow_info.rs @@ -24,9 +24,7 @@ use crate::error::{self, Result}; use crate::key::flow::FlowScoped; use crate::key::scope::{CatalogScoped, MetaKey}; use crate::key::txn_helper::TxnOpGetResponseSet; -use crate::key::{ - txn_helper, DeserializedValueWithBytes, FlowPartitionId, FlowTaskId, TableMetaValue, -}; +use crate::key::{txn_helper, DeserializedValueWithBytes, FlowId, FlowPartitionId, TableMetaValue}; use crate::kv_backend::txn::Txn; use crate::kv_backend::KvBackendRef; use crate::table_name::TableName; @@ -58,7 +56,7 @@ impl MetaKey for FlowInfoKey { impl FlowInfoKey { /// Returns the [FlowInfoKey]. - pub fn new(catalog: String, flow_id: FlowTaskId) -> FlowInfoKey { + pub fn new(catalog: String, flow_id: FlowId) -> FlowInfoKey { let inner = FlowInfoKeyInner::new(flow_id); FlowInfoKey(FlowScoped::new(CatalogScoped::new(catalog, inner))) } @@ -69,7 +67,7 @@ impl FlowInfoKey { } /// Returns the [FlowTaskId]. - pub fn flow_id(&self) -> FlowTaskId { + pub fn flow_id(&self) -> FlowId { self.0.flow_id } } @@ -77,12 +75,12 @@ impl FlowInfoKey { /// The key of flow metadata. #[derive(Debug, Clone, Copy, PartialEq)] struct FlowInfoKeyInner { - flow_id: FlowTaskId, + flow_id: FlowId, } impl FlowInfoKeyInner { /// Returns a [FlowInfoKey] with the specified `flow_id`. - pub fn new(flow_id: FlowTaskId) -> FlowInfoKeyInner { + pub fn new(flow_id: FlowId) -> FlowInfoKeyInner { FlowInfoKeyInner { flow_id } } } @@ -109,7 +107,7 @@ impl MetaKey for FlowInfoKeyInner { err_msg: format!("Invalid FlowInfoKeyInner '{key}'"), })?; // Safety: pass the regex check above - let flow_id = captures[1].parse::().unwrap(); + let flow_id = captures[1].parse::().unwrap(); Ok(FlowInfoKeyInner { flow_id }) } } @@ -161,7 +159,7 @@ impl FlowInfoManager { } /// Returns the [FlowTaskValue] of specified `flow_id`. - pub async fn get(&self, catalog: &str, flow_id: FlowTaskId) -> Result> { + pub async fn get(&self, catalog: &str, flow_id: FlowId) -> Result> { let key = FlowInfoKey::new(catalog.to_string(), flow_id).to_bytes(); self.kv_backend .get(&key) @@ -176,7 +174,7 @@ impl FlowInfoManager { pub(crate) fn build_create_txn( &self, catalog: &str, - flow_id: FlowTaskId, + flow_id: FlowId, flow_value: &FlowInfoValue, ) -> Result<( Txn, diff --git a/src/common/meta/src/key/flow/flow_name.rs b/src/common/meta/src/key/flow/flow_name.rs index 136fbcc30066..18fcac3686c3 100644 --- a/src/common/meta/src/key/flow/flow_name.rs +++ b/src/common/meta/src/key/flow/flow_name.rs @@ -21,9 +21,7 @@ use crate::error::{self, Result}; use crate::key::flow::FlowScoped; use crate::key::scope::{CatalogScoped, MetaKey}; use crate::key::txn_helper::TxnOpGetResponseSet; -use crate::key::{ - txn_helper, DeserializedValueWithBytes, FlowTaskId, TableMetaValue, NAME_PATTERN, -}; +use crate::key::{txn_helper, DeserializedValueWithBytes, FlowId, TableMetaValue, NAME_PATTERN}; use crate::kv_backend::txn::Txn; use crate::kv_backend::KvBackendRef; @@ -112,17 +110,17 @@ impl FlowNameKeyInner { /// The value of [FlowNameKey]. #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] pub struct FlowNameValue { - flow_id: FlowTaskId, + flow_id: FlowId, } impl FlowNameValue { /// Returns a [FlowNameValue] with specified [FlowTaskId]. - pub fn new(flow_id: FlowTaskId) -> Self { + pub fn new(flow_id: FlowId) -> Self { Self { flow_id } } /// Returns the [FlowTaskId] - pub fn flow_id(&self) -> FlowTaskId { + pub fn flow_id(&self) -> FlowId { self.flow_id } } @@ -163,7 +161,7 @@ impl FlowNameManager { &self, catalog: &str, name: &str, - flow_id: FlowTaskId, + flow_id: FlowId, ) -> Result<( Txn, impl FnOnce( diff --git a/src/common/meta/src/key/flow/flownode_flow.rs b/src/common/meta/src/key/flow/flownode_flow.rs index adf25a08e8d0..203c14302e83 100644 --- a/src/common/meta/src/key/flow/flownode_flow.rs +++ b/src/common/meta/src/key/flow/flownode_flow.rs @@ -23,7 +23,7 @@ use snafu::OptionExt; use crate::error::{self, Result}; use crate::key::flow::FlowScoped; use crate::key::scope::{BytesAdapter, CatalogScoped, MetaKey}; -use crate::key::{FlowPartitionId, FlowTaskId}; +use crate::key::{FlowId, FlowPartitionId}; use crate::kv_backend::txn::{Txn, TxnOp}; use crate::kv_backend::KvBackendRef; use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE}; @@ -62,7 +62,7 @@ impl FlownodeFlowKey { pub fn new( catalog: String, flownode_id: FlownodeId, - flow_id: FlowTaskId, + flow_id: FlowId, partition_id: FlowPartitionId, ) -> FlownodeFlowKey { let inner = FlownodeFlowKeyInner::new(flownode_id, flow_id, partition_id); @@ -85,7 +85,7 @@ impl FlownodeFlowKey { } /// Returns the [FlowTaskId]. - pub fn flow_id(&self) -> FlowTaskId { + pub fn flow_id(&self) -> FlowId { self.0.flow_id } @@ -103,17 +103,13 @@ impl FlownodeFlowKey { /// The key of mapping [FlownodeId] to [FlowTaskId]. pub struct FlownodeFlowKeyInner { flownode_id: FlownodeId, - flow_id: FlowTaskId, + flow_id: FlowId, partition_id: FlowPartitionId, } impl FlownodeFlowKeyInner { /// Returns a [FlownodeFlowKey] with the specified `flownode_id`, `flow_id` and `partition_id`. - pub fn new( - flownode_id: FlownodeId, - flow_id: FlowTaskId, - partition_id: FlowPartitionId, - ) -> Self { + pub fn new(flownode_id: FlownodeId, flow_id: FlowId, partition_id: FlowPartitionId) -> Self { Self { flownode_id, flow_id, @@ -158,7 +154,7 @@ impl MetaKey for FlownodeFlowKeyInner { })?; // Safety: pass the regex check above let flownode_id = captures[1].parse::().unwrap(); - let flow_id = captures[2].parse::().unwrap(); + let flow_id = captures[2].parse::().unwrap(); let partition_id = captures[3].parse::().unwrap(); Ok(FlownodeFlowKeyInner { @@ -190,7 +186,7 @@ impl FlownodeFlowManager { &self, catalog: &str, flownode_id: FlownodeId, - ) -> BoxStream<'static, Result<(FlowTaskId, FlowPartitionId)>> { + ) -> BoxStream<'static, Result<(FlowId, FlowPartitionId)>> { let start_key = FlownodeFlowKey::range_start_key(catalog.to_string(), flownode_id); let req = RangeRequest::new().with_prefix(start_key); @@ -210,7 +206,7 @@ impl FlownodeFlowManager { pub(crate) fn build_create_txn>( &self, catalog: &str, - flow_id: FlowTaskId, + flow_id: FlowId, flownode_ids: I, ) -> Txn { let txns = flownode_ids diff --git a/src/common/meta/src/key/flow/table_flow.rs b/src/common/meta/src/key/flow/table_flow.rs index ff3db2a896c6..5a4bf474cdb9 100644 --- a/src/common/meta/src/key/flow/table_flow.rs +++ b/src/common/meta/src/key/flow/table_flow.rs @@ -23,7 +23,7 @@ use table::metadata::TableId; use crate::error::{self, Result}; use crate::key::flow::FlowScoped; use crate::key::scope::{BytesAdapter, CatalogScoped, MetaKey}; -use crate::key::{FlowPartitionId, FlowTaskId}; +use crate::key::{FlowId, FlowPartitionId}; use crate::kv_backend::txn::{Txn, TxnOp}; use crate::kv_backend::KvBackendRef; use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE}; @@ -45,7 +45,7 @@ lazy_static! { struct TableFlowKeyInner { table_id: TableId, flownode_id: FlownodeId, - flow_id: FlowTaskId, + flow_id: FlowId, partition_id: FlowPartitionId, } @@ -73,7 +73,7 @@ impl TableFlowKey { catalog: String, table_id: TableId, flownode_id: FlownodeId, - flow_id: FlowTaskId, + flow_id: FlowId, partition_id: FlowPartitionId, ) -> TableFlowKey { let inner = TableFlowKeyInner::new(table_id, flownode_id, flow_id, partition_id); @@ -101,7 +101,7 @@ impl TableFlowKey { } /// Returns the [FlowTaskId]. - pub fn flow_id(&self) -> FlowTaskId { + pub fn flow_id(&self) -> FlowId { self.0.flow_id } @@ -121,7 +121,7 @@ impl TableFlowKeyInner { fn new( table_id: TableId, flownode_id: FlownodeId, - flow_id: FlowTaskId, + flow_id: FlowId, partition_id: FlowPartitionId, ) -> TableFlowKeyInner { Self { @@ -170,7 +170,7 @@ impl MetaKey for TableFlowKeyInner { // Safety: pass the regex check above let table_id = captures[1].parse::().unwrap(); let flownode_id = captures[2].parse::().unwrap(); - let flow_id = captures[3].parse::().unwrap(); + let flow_id = captures[3].parse::().unwrap(); let partition_id = captures[4].parse::().unwrap(); Ok(TableFlowKeyInner::new( table_id, @@ -221,7 +221,7 @@ impl TableFlowManager { pub fn build_create_txn>( &self, catalog: &str, - flow_id: FlowTaskId, + flow_id: FlowId, flownode_ids: I, source_table_ids: &[TableId], ) -> Txn {