Skip to content

Commit

Permalink
refactor: rename FlowTaskId to FlowId
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Apr 29, 2024
1 parent 247d1a9 commit 1e5c9d0
Show file tree
Hide file tree
Showing 8 changed files with 37 additions and 45 deletions.
4 changes: 2 additions & 2 deletions src/common/meta/src/ddl/create_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use crate::ddl::utils::handle_retry_error;
use crate::ddl::DdlContext;
use crate::error::Result;
use crate::key::flow::flow_info::FlowInfoValue;
use crate::key::FlowTaskId;
use crate::key::FlowId;
use crate::lock_key::{CatalogLock, FlowNameLock, TableNameLock};
use crate::peer::Peer;
use crate::rpc::ddl::CreateFlowTask;
Expand Down Expand Up @@ -184,7 +184,7 @@ pub struct CreateFlowTaskData {
pub(crate) cluster_id: ClusterId,
pub(crate) state: CreateFlowTaskState,
pub(crate) task: CreateFlowTask,
pub(crate) flow_id: Option<FlowTaskId>,
pub(crate) flow_id: Option<FlowId>,
pub(crate) peers: Vec<Peer>,
pub(crate) source_table_ids: Vec<TableId>,
}
Expand Down
8 changes: 4 additions & 4 deletions src/common/meta/src/ddl/flow_meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::sync::Arc;
use tonic::async_trait;

use crate::error::Result;
use crate::key::FlowTaskId;
use crate::key::FlowId;
use crate::peer::Peer;
use crate::sequence::SequenceRef;

Expand All @@ -43,13 +43,13 @@ impl FlowMetadataAllocator {
}

/// Allocates a the [FlowTaskId].
pub(crate) async fn allocate_flow_id(&self) -> Result<FlowTaskId> {
let flow_id = self.flow_id_sequence.next().await? as FlowTaskId;
pub(crate) async fn allocate_flow_id(&self) -> Result<FlowId> {
let flow_id = self.flow_id_sequence.next().await? as FlowId;
Ok(flow_id)
}

/// Allocates the [FlowTaskId] and [Peer]s.
pub async fn create(&self, partitions: usize) -> Result<(FlowTaskId, Vec<Peer>)> {
pub async fn create(&self, partitions: usize) -> Result<(FlowId, Vec<Peer>)> {
let flow_id = self.allocate_flow_id().await?;
let peers = self.partition_peer_allocator.alloc(partitions).await?;

Expand Down
2 changes: 1 addition & 1 deletion src/common/meta/src/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ pub const CACHE_KEY_PREFIXES: [&str; 4] = [
pub type RegionDistribution = BTreeMap<DatanodeId, Vec<RegionNumber>>;

/// The id of flow.
pub type FlowTaskId = u32;
pub type FlowId = u32;
/// The partition of flow.
pub type FlowPartitionId = u32;

Expand Down
4 changes: 2 additions & 2 deletions src/common/meta/src/key/flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use crate::key::flow::flownode_flow::FlownodeFlowManager;
use crate::key::flow::table_flow::TableFlowManager;
use crate::key::scope::MetaKey;
use crate::key::txn_helper::TxnOpGetResponseSet;
use crate::key::FlowTaskId;
use crate::key::FlowId;
use crate::kv_backend::txn::Txn;
use crate::kv_backend::KvBackendRef;

Expand Down Expand Up @@ -132,7 +132,7 @@ impl FlowMetadataManager {
/// Creates metadata for flow and returns an error if different metadata exists.
pub async fn create_flow_metadata(
&self,
flow_id: FlowTaskId,
flow_id: FlowId,
flow_value: FlowInfoValue,
) -> Result<()> {
let (create_flow_flow_name_txn, on_create_flow_flow_name_failure) = self
Expand Down
18 changes: 8 additions & 10 deletions src/common/meta/src/key/flow/flow_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@ use crate::error::{self, Result};
use crate::key::flow::FlowScoped;
use crate::key::scope::{CatalogScoped, MetaKey};
use crate::key::txn_helper::TxnOpGetResponseSet;
use crate::key::{
txn_helper, DeserializedValueWithBytes, FlowPartitionId, FlowTaskId, TableMetaValue,
};
use crate::key::{txn_helper, DeserializedValueWithBytes, FlowId, FlowPartitionId, TableMetaValue};
use crate::kv_backend::txn::Txn;
use crate::kv_backend::KvBackendRef;
use crate::table_name::TableName;
Expand Down Expand Up @@ -58,7 +56,7 @@ impl MetaKey<FlowInfoKey> for FlowInfoKey {

impl FlowInfoKey {
/// Returns the [FlowInfoKey].
pub fn new(catalog: String, flow_id: FlowTaskId) -> FlowInfoKey {
pub fn new(catalog: String, flow_id: FlowId) -> FlowInfoKey {
let inner = FlowInfoKeyInner::new(flow_id);
FlowInfoKey(FlowScoped::new(CatalogScoped::new(catalog, inner)))
}
Expand All @@ -69,20 +67,20 @@ impl FlowInfoKey {
}

/// Returns the [FlowTaskId].
pub fn flow_id(&self) -> FlowTaskId {
pub fn flow_id(&self) -> FlowId {
self.0.flow_id
}
}

/// The key of flow metadata.
#[derive(Debug, Clone, Copy, PartialEq)]
struct FlowInfoKeyInner {
flow_id: FlowTaskId,
flow_id: FlowId,
}

impl FlowInfoKeyInner {
/// Returns a [FlowInfoKey] with the specified `flow_id`.
pub fn new(flow_id: FlowTaskId) -> FlowInfoKeyInner {
pub fn new(flow_id: FlowId) -> FlowInfoKeyInner {
FlowInfoKeyInner { flow_id }
}
}
Expand All @@ -109,7 +107,7 @@ impl MetaKey<FlowInfoKeyInner> for FlowInfoKeyInner {
err_msg: format!("Invalid FlowInfoKeyInner '{key}'"),
})?;
// Safety: pass the regex check above
let flow_id = captures[1].parse::<FlowTaskId>().unwrap();
let flow_id = captures[1].parse::<FlowId>().unwrap();
Ok(FlowInfoKeyInner { flow_id })
}
}
Expand Down Expand Up @@ -161,7 +159,7 @@ impl FlowInfoManager {
}

/// Returns the [FlowTaskValue] of specified `flow_id`.
pub async fn get(&self, catalog: &str, flow_id: FlowTaskId) -> Result<Option<FlowInfoValue>> {
pub async fn get(&self, catalog: &str, flow_id: FlowId) -> Result<Option<FlowInfoValue>> {
let key = FlowInfoKey::new(catalog.to_string(), flow_id).to_bytes();
self.kv_backend
.get(&key)
Expand All @@ -176,7 +174,7 @@ impl FlowInfoManager {
pub(crate) fn build_create_txn(
&self,
catalog: &str,
flow_id: FlowTaskId,
flow_id: FlowId,
flow_value: &FlowInfoValue,
) -> Result<(
Txn,
Expand Down
12 changes: 5 additions & 7 deletions src/common/meta/src/key/flow/flow_name.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@ use crate::error::{self, Result};
use crate::key::flow::FlowScoped;
use crate::key::scope::{CatalogScoped, MetaKey};
use crate::key::txn_helper::TxnOpGetResponseSet;
use crate::key::{
txn_helper, DeserializedValueWithBytes, FlowTaskId, TableMetaValue, NAME_PATTERN,
};
use crate::key::{txn_helper, DeserializedValueWithBytes, FlowId, TableMetaValue, NAME_PATTERN};
use crate::kv_backend::txn::Txn;
use crate::kv_backend::KvBackendRef;

Expand Down Expand Up @@ -112,17 +110,17 @@ impl FlowNameKeyInner {
/// The value of [FlowNameKey].
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
pub struct FlowNameValue {
flow_id: FlowTaskId,
flow_id: FlowId,
}

impl FlowNameValue {
/// Returns a [FlowNameValue] with specified [FlowTaskId].
pub fn new(flow_id: FlowTaskId) -> Self {
pub fn new(flow_id: FlowId) -> Self {
Self { flow_id }
}

/// Returns the [FlowTaskId]
pub fn flow_id(&self) -> FlowTaskId {
pub fn flow_id(&self) -> FlowId {
self.flow_id
}
}
Expand Down Expand Up @@ -163,7 +161,7 @@ impl FlowNameManager {
&self,
catalog: &str,
name: &str,
flow_id: FlowTaskId,
flow_id: FlowId,
) -> Result<(
Txn,
impl FnOnce(
Expand Down
20 changes: 8 additions & 12 deletions src/common/meta/src/key/flow/flownode_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use snafu::OptionExt;
use crate::error::{self, Result};
use crate::key::flow::FlowScoped;
use crate::key::scope::{BytesAdapter, CatalogScoped, MetaKey};
use crate::key::{FlowPartitionId, FlowTaskId};
use crate::key::{FlowId, FlowPartitionId};
use crate::kv_backend::txn::{Txn, TxnOp};
use crate::kv_backend::KvBackendRef;
use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
Expand Down Expand Up @@ -62,7 +62,7 @@ impl FlownodeFlowKey {
pub fn new(
catalog: String,
flownode_id: FlownodeId,
flow_id: FlowTaskId,
flow_id: FlowId,
partition_id: FlowPartitionId,
) -> FlownodeFlowKey {
let inner = FlownodeFlowKeyInner::new(flownode_id, flow_id, partition_id);
Expand All @@ -85,7 +85,7 @@ impl FlownodeFlowKey {
}

/// Returns the [FlowTaskId].
pub fn flow_id(&self) -> FlowTaskId {
pub fn flow_id(&self) -> FlowId {
self.0.flow_id
}

Expand All @@ -103,17 +103,13 @@ impl FlownodeFlowKey {
/// The key of mapping [FlownodeId] to [FlowTaskId].
pub struct FlownodeFlowKeyInner {
flownode_id: FlownodeId,
flow_id: FlowTaskId,
flow_id: FlowId,
partition_id: FlowPartitionId,
}

impl FlownodeFlowKeyInner {
/// Returns a [FlownodeFlowKey] with the specified `flownode_id`, `flow_id` and `partition_id`.
pub fn new(
flownode_id: FlownodeId,
flow_id: FlowTaskId,
partition_id: FlowPartitionId,
) -> Self {
pub fn new(flownode_id: FlownodeId, flow_id: FlowId, partition_id: FlowPartitionId) -> Self {
Self {
flownode_id,
flow_id,
Expand Down Expand Up @@ -158,7 +154,7 @@ impl MetaKey<FlownodeFlowKeyInner> for FlownodeFlowKeyInner {
})?;
// Safety: pass the regex check above
let flownode_id = captures[1].parse::<FlownodeId>().unwrap();
let flow_id = captures[2].parse::<FlowTaskId>().unwrap();
let flow_id = captures[2].parse::<FlowId>().unwrap();
let partition_id = captures[3].parse::<FlowPartitionId>().unwrap();

Ok(FlownodeFlowKeyInner {
Expand Down Expand Up @@ -190,7 +186,7 @@ impl FlownodeFlowManager {
&self,
catalog: &str,
flownode_id: FlownodeId,
) -> BoxStream<'static, Result<(FlowTaskId, FlowPartitionId)>> {
) -> BoxStream<'static, Result<(FlowId, FlowPartitionId)>> {
let start_key = FlownodeFlowKey::range_start_key(catalog.to_string(), flownode_id);
let req = RangeRequest::new().with_prefix(start_key);

Expand All @@ -210,7 +206,7 @@ impl FlownodeFlowManager {
pub(crate) fn build_create_txn<I: IntoIterator<Item = (FlowPartitionId, FlownodeId)>>(
&self,
catalog: &str,
flow_id: FlowTaskId,
flow_id: FlowId,
flownode_ids: I,
) -> Txn {
let txns = flownode_ids
Expand Down
14 changes: 7 additions & 7 deletions src/common/meta/src/key/flow/table_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use table::metadata::TableId;
use crate::error::{self, Result};
use crate::key::flow::FlowScoped;
use crate::key::scope::{BytesAdapter, CatalogScoped, MetaKey};
use crate::key::{FlowPartitionId, FlowTaskId};
use crate::key::{FlowId, FlowPartitionId};
use crate::kv_backend::txn::{Txn, TxnOp};
use crate::kv_backend::KvBackendRef;
use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
Expand All @@ -45,7 +45,7 @@ lazy_static! {
struct TableFlowKeyInner {
table_id: TableId,
flownode_id: FlownodeId,
flow_id: FlowTaskId,
flow_id: FlowId,
partition_id: FlowPartitionId,
}

Expand Down Expand Up @@ -73,7 +73,7 @@ impl TableFlowKey {
catalog: String,
table_id: TableId,
flownode_id: FlownodeId,
flow_id: FlowTaskId,
flow_id: FlowId,
partition_id: FlowPartitionId,
) -> TableFlowKey {
let inner = TableFlowKeyInner::new(table_id, flownode_id, flow_id, partition_id);
Expand Down Expand Up @@ -101,7 +101,7 @@ impl TableFlowKey {
}

/// Returns the [FlowTaskId].
pub fn flow_id(&self) -> FlowTaskId {
pub fn flow_id(&self) -> FlowId {
self.0.flow_id
}

Expand All @@ -121,7 +121,7 @@ impl TableFlowKeyInner {
fn new(
table_id: TableId,
flownode_id: FlownodeId,
flow_id: FlowTaskId,
flow_id: FlowId,
partition_id: FlowPartitionId,
) -> TableFlowKeyInner {
Self {
Expand Down Expand Up @@ -170,7 +170,7 @@ impl MetaKey<TableFlowKeyInner> for TableFlowKeyInner {
// Safety: pass the regex check above
let table_id = captures[1].parse::<TableId>().unwrap();
let flownode_id = captures[2].parse::<FlownodeId>().unwrap();
let flow_id = captures[3].parse::<FlowTaskId>().unwrap();
let flow_id = captures[3].parse::<FlowId>().unwrap();
let partition_id = captures[4].parse::<FlowPartitionId>().unwrap();
Ok(TableFlowKeyInner::new(
table_id,
Expand Down Expand Up @@ -221,7 +221,7 @@ impl TableFlowManager {
pub fn build_create_txn<I: IntoIterator<Item = (FlowPartitionId, FlownodeId)>>(
&self,
catalog: &str,
flow_id: FlowTaskId,
flow_id: FlowId,
flownode_ids: I,
source_table_ids: &[TableId],
) -> Txn {
Expand Down

0 comments on commit 1e5c9d0

Please sign in to comment.