From dec136073d76e40dca5373f7ecb003bbe91ad1e4 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Mon, 29 Apr 2024 13:48:19 +0000 Subject: [PATCH] chore: apply suggestions from CR --- src/common/meta/src/key.rs | 8 ++--- src/common/meta/src/key/flow.rs | 52 ++++++++++++++++----------------- 2 files changed, 30 insertions(+), 30 deletions(-) diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index 862d4ac0433f..30455181dcc3 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -42,10 +42,10 @@ //! 7. Flow name key: `__flow/{catalog}/name/{flow_name}` //! - Mapping {catalog}/{flow_name} to {flow_id} //! -//! 8. Flownode task key: `__flow/{catalog}/flownode/{flownode_id}/{flow_id}/{partition_id}` +//! 8. Flownode flow key: `__flow/{catalog}/flownode/{flownode_id}/{flow_id}/{partition_id}` //! - Mapping {flownode_id} to {flow_id} //! -//! 9. Table task key: `__table_task/{catalog}/source_table/{table_id}/{flownode_id}/{flow_id}/{partition_id}` +//! 9. Table flow key: `__table_flow/{catalog}/source_table/{table_id}/{flownode_id}/{flow_id}/{partition_id}` //! - Mapping source table's {table_id} to {flownode_id} //! - Used in `Flownode` booting. //! @@ -69,13 +69,13 @@ //! //! flownode/ //! {flownode_id}/ -//! {task_id}/ +//! {flow_id}/ //! {partition_id} //! //! source_table/ //! {table_id}/ //! {flownode_id}/ -//! {task_id}/ +//! {flow_id}/ //! {partition_id} pub mod catalog_name; diff --git a/src/common/meta/src/key/flow.rs b/src/common/meta/src/key/flow.rs index d6ee6944f9bf..cbda6aa88276 100644 --- a/src/common/meta/src/key/flow.rs +++ b/src/common/meta/src/key/flow.rs @@ -86,9 +86,9 @@ impl> MetaKey> for FlowScoped { pub type FlowMetadataManagerRef = Arc; /// The manager of metadata, provides ability to: -/// - Create metadata of the task. -/// - Retrieve metadata of the task. -/// - Delete metadata of the task. +/// - Create metadata of the flow. +/// - Retrieve metadata of the flow. +/// - Delete metadata of the flow. pub struct FlowMetadataManager { flow_info_manager: FlowInfoManager, flownode_flow_manager: FlownodeFlowManager, @@ -145,13 +145,13 @@ impl FlowMetadataManager { &flow_value, )?; - let create_flownode_task_txn = self.flownode_flow_manager.build_create_txn( + let create_flownode_flow_txn = self.flownode_flow_manager.build_create_txn( &flow_value.catalog_name, flow_id, flow_value.flownode_ids().clone(), ); - let create_table_task_txn = self.table_flow_manager.build_create_txn( + let create_table_flow_txn = self.table_flow_manager.build_create_txn( &flow_value.catalog_name, flow_id, flow_value.flownode_ids().clone(), @@ -161,8 +161,8 @@ impl FlowMetadataManager { let txn = Txn::merge_all(vec![ create_flow_flow_name_txn, create_flow_txn, - create_flownode_task_txn, - create_table_task_txn, + create_flownode_flow_txn, + create_table_flow_txn, ]); info!( "Creating flow {}.{}({}), with {} txn operations", @@ -273,7 +273,7 @@ mod tests { async fn test_create_flow_metadata() { let mem_kv = Arc::new(MemoryKvBackend::default()); let flow_metadata_manager = FlowMetadataManager::new(mem_kv.clone()); - let task_id = 10; + let flow_id = 10; let catalog_name = "greptime"; let sink_table_name = TableName { catalog_name: catalog_name.to_string(), @@ -282,7 +282,7 @@ mod tests { }; let flow_value = FlowInfoValue { catalog_name: catalog_name.to_string(), - flow_name: "task".to_string(), + flow_name: "flow".to_string(), source_table_ids: vec![1024, 1025, 1026], sink_table_name, flownode_ids: [(0, 1u64)].into(), @@ -292,28 +292,28 @@ mod tests { options: Default::default(), }; flow_metadata_manager - .create_flow_metadata(task_id, flow_value.clone()) + .create_flow_metadata(flow_id, flow_value.clone()) .await .unwrap(); // Creates again. flow_metadata_manager - .create_flow_metadata(task_id, flow_value.clone()) + .create_flow_metadata(flow_id, flow_value.clone()) .await .unwrap(); let got = flow_metadata_manager .flow_info_manager() - .get(catalog_name, task_id) + .get(catalog_name, flow_id) .await .unwrap() .unwrap(); assert_eq!(got, flow_value); - let tasks = flow_metadata_manager + let flows = flow_metadata_manager .flownode_flow_manager() .flows(catalog_name, 1) .try_collect::>() .await .unwrap(); - assert_eq!(tasks, vec![(task_id, 0)]); + assert_eq!(flows, vec![(flow_id, 0)]); for table_id in [1024, 1025, 1026] { let nodes = flow_metadata_manager .table_flow_manager() @@ -327,7 +327,7 @@ mod tests { catalog_name.to_string(), table_id, 1, - task_id, + flow_id, 0 )] ); @@ -335,10 +335,10 @@ mod tests { } #[tokio::test] - async fn test_create_table_metadata_task_exists_err() { + async fn test_create_table_metadata_flow_exists_err() { let mem_kv = Arc::new(MemoryKvBackend::default()); let flow_metadata_manager = FlowMetadataManager::new(mem_kv); - let task_id = 10; + let flow_id = 10; let catalog_name = "greptime"; let sink_table_name = TableName { catalog_name: catalog_name.to_string(), @@ -347,7 +347,7 @@ mod tests { }; let flow_value = FlowInfoValue { catalog_name: "greptime".to_string(), - flow_name: "task".to_string(), + flow_name: "flow".to_string(), source_table_ids: vec![1024, 1025, 1026], sink_table_name: sink_table_name.clone(), flownode_ids: [(0, 1u64)].into(), @@ -357,13 +357,13 @@ mod tests { options: Default::default(), }; flow_metadata_manager - .create_flow_metadata(task_id, flow_value.clone()) + .create_flow_metadata(flow_id, flow_value.clone()) .await .unwrap(); // Creates again. let flow_value = FlowInfoValue { catalog_name: catalog_name.to_string(), - flow_name: "task".to_string(), + flow_name: "flow".to_string(), source_table_ids: vec![1024, 1025, 1026], sink_table_name, flownode_ids: [(0, 1u64)].into(), @@ -373,7 +373,7 @@ mod tests { options: Default::default(), }; let err = flow_metadata_manager - .create_flow_metadata(task_id + 1, flow_value) + .create_flow_metadata(flow_id + 1, flow_value) .await .unwrap_err(); assert_matches!(err, error::Error::FlowAlreadyExists { .. }); @@ -383,7 +383,7 @@ mod tests { async fn test_create_table_metadata_unexpected_err() { let mem_kv = Arc::new(MemoryKvBackend::default()); let flow_metadata_manager = FlowMetadataManager::new(mem_kv); - let task_id = 10; + let flow_id = 10; let catalog_name = "greptime"; let sink_table_name = TableName { catalog_name: catalog_name.to_string(), @@ -392,7 +392,7 @@ mod tests { }; let flow_value = FlowInfoValue { catalog_name: "greptime".to_string(), - flow_name: "task".to_string(), + flow_name: "flow".to_string(), source_table_ids: vec![1024, 1025, 1026], sink_table_name: sink_table_name.clone(), flownode_ids: [(0, 1u64)].into(), @@ -402,7 +402,7 @@ mod tests { options: Default::default(), }; flow_metadata_manager - .create_flow_metadata(task_id, flow_value.clone()) + .create_flow_metadata(flow_id, flow_value.clone()) .await .unwrap(); // Creates again. @@ -413,7 +413,7 @@ mod tests { }; let flow_value = FlowInfoValue { catalog_name: "greptime".to_string(), - flow_name: "task".to_string(), + flow_name: "flow".to_string(), source_table_ids: vec![1024, 1025, 1026], sink_table_name: another_sink_table_name, flownode_ids: [(0, 1u64)].into(), @@ -423,7 +423,7 @@ mod tests { options: Default::default(), }; let err = flow_metadata_manager - .create_flow_metadata(task_id, flow_value) + .create_flow_metadata(flow_id, flow_value) .await .unwrap_err(); assert!(err.to_string().contains("Reads the different value"));