Skip to content

Commit

Permalink
chore: apply suggestions from CR
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Apr 29, 2024
1 parent 4a66a12 commit dec1360
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 30 deletions.
8 changes: 4 additions & 4 deletions src/common/meta/src/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,10 @@
//! 7. Flow name key: `__flow/{catalog}/name/{flow_name}`
//! - Mapping {catalog}/{flow_name} to {flow_id}
//!
//! 8. Flownode task key: `__flow/{catalog}/flownode/{flownode_id}/{flow_id}/{partition_id}`
//! 8. Flownode flow key: `__flow/{catalog}/flownode/{flownode_id}/{flow_id}/{partition_id}`
//! - Mapping {flownode_id} to {flow_id}
//!
//! 9. Table task key: `__table_task/{catalog}/source_table/{table_id}/{flownode_id}/{flow_id}/{partition_id}`
//! 9. Table flow key: `__table_flow/{catalog}/source_table/{table_id}/{flownode_id}/{flow_id}/{partition_id}`
//! - Mapping source table's {table_id} to {flownode_id}
//! - Used in `Flownode` booting.
//!
Expand All @@ -69,13 +69,13 @@
//!
//! flownode/
//! {flownode_id}/
//! {task_id}/
//! {flow_id}/
//! {partition_id}
//!
//! source_table/
//! {table_id}/
//! {flownode_id}/
//! {task_id}/
//! {flow_id}/
//! {partition_id}
pub mod catalog_name;
Expand Down
52 changes: 26 additions & 26 deletions src/common/meta/src/key/flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,9 @@ impl<T: MetaKey<T>> MetaKey<FlowScoped<T>> for FlowScoped<T> {
pub type FlowMetadataManagerRef = Arc<FlowMetadataManager>;

/// The manager of metadata, provides ability to:
/// - Create metadata of the task.
/// - Retrieve metadata of the task.
/// - Delete metadata of the task.
/// - Create metadata of the flow.
/// - Retrieve metadata of the flow.
/// - Delete metadata of the flow.
pub struct FlowMetadataManager {
flow_info_manager: FlowInfoManager,
flownode_flow_manager: FlownodeFlowManager,
Expand Down Expand Up @@ -145,13 +145,13 @@ impl FlowMetadataManager {
&flow_value,
)?;

let create_flownode_task_txn = self.flownode_flow_manager.build_create_txn(
let create_flownode_flow_txn = self.flownode_flow_manager.build_create_txn(
&flow_value.catalog_name,
flow_id,
flow_value.flownode_ids().clone(),
);

let create_table_task_txn = self.table_flow_manager.build_create_txn(
let create_table_flow_txn = self.table_flow_manager.build_create_txn(
&flow_value.catalog_name,
flow_id,
flow_value.flownode_ids().clone(),
Expand All @@ -161,8 +161,8 @@ impl FlowMetadataManager {
let txn = Txn::merge_all(vec![
create_flow_flow_name_txn,
create_flow_txn,
create_flownode_task_txn,
create_table_task_txn,
create_flownode_flow_txn,
create_table_flow_txn,
]);
info!(
"Creating flow {}.{}({}), with {} txn operations",
Expand Down Expand Up @@ -273,7 +273,7 @@ mod tests {
async fn test_create_flow_metadata() {
let mem_kv = Arc::new(MemoryKvBackend::default());
let flow_metadata_manager = FlowMetadataManager::new(mem_kv.clone());
let task_id = 10;
let flow_id = 10;
let catalog_name = "greptime";
let sink_table_name = TableName {
catalog_name: catalog_name.to_string(),
Expand All @@ -282,7 +282,7 @@ mod tests {
};
let flow_value = FlowInfoValue {
catalog_name: catalog_name.to_string(),
flow_name: "task".to_string(),
flow_name: "flow".to_string(),
source_table_ids: vec![1024, 1025, 1026],
sink_table_name,
flownode_ids: [(0, 1u64)].into(),
Expand All @@ -292,28 +292,28 @@ mod tests {
options: Default::default(),
};
flow_metadata_manager
.create_flow_metadata(task_id, flow_value.clone())
.create_flow_metadata(flow_id, flow_value.clone())
.await
.unwrap();
// Creates again.
flow_metadata_manager
.create_flow_metadata(task_id, flow_value.clone())
.create_flow_metadata(flow_id, flow_value.clone())
.await
.unwrap();
let got = flow_metadata_manager
.flow_info_manager()
.get(catalog_name, task_id)
.get(catalog_name, flow_id)
.await
.unwrap()
.unwrap();
assert_eq!(got, flow_value);
let tasks = flow_metadata_manager
let flows = flow_metadata_manager
.flownode_flow_manager()
.flows(catalog_name, 1)
.try_collect::<Vec<_>>()
.await
.unwrap();
assert_eq!(tasks, vec![(task_id, 0)]);
assert_eq!(flows, vec![(flow_id, 0)]);
for table_id in [1024, 1025, 1026] {
let nodes = flow_metadata_manager
.table_flow_manager()
Expand All @@ -327,18 +327,18 @@ mod tests {
catalog_name.to_string(),
table_id,
1,
task_id,
flow_id,
0
)]
);
}
}

#[tokio::test]
async fn test_create_table_metadata_task_exists_err() {
async fn test_create_table_metadata_flow_exists_err() {
let mem_kv = Arc::new(MemoryKvBackend::default());
let flow_metadata_manager = FlowMetadataManager::new(mem_kv);
let task_id = 10;
let flow_id = 10;
let catalog_name = "greptime";
let sink_table_name = TableName {
catalog_name: catalog_name.to_string(),
Expand All @@ -347,7 +347,7 @@ mod tests {
};
let flow_value = FlowInfoValue {
catalog_name: "greptime".to_string(),
flow_name: "task".to_string(),
flow_name: "flow".to_string(),
source_table_ids: vec![1024, 1025, 1026],
sink_table_name: sink_table_name.clone(),
flownode_ids: [(0, 1u64)].into(),
Expand All @@ -357,13 +357,13 @@ mod tests {
options: Default::default(),
};
flow_metadata_manager
.create_flow_metadata(task_id, flow_value.clone())
.create_flow_metadata(flow_id, flow_value.clone())
.await
.unwrap();
// Creates again.
let flow_value = FlowInfoValue {
catalog_name: catalog_name.to_string(),
flow_name: "task".to_string(),
flow_name: "flow".to_string(),
source_table_ids: vec![1024, 1025, 1026],
sink_table_name,
flownode_ids: [(0, 1u64)].into(),
Expand All @@ -373,7 +373,7 @@ mod tests {
options: Default::default(),
};
let err = flow_metadata_manager
.create_flow_metadata(task_id + 1, flow_value)
.create_flow_metadata(flow_id + 1, flow_value)
.await
.unwrap_err();
assert_matches!(err, error::Error::FlowAlreadyExists { .. });
Expand All @@ -383,7 +383,7 @@ mod tests {
async fn test_create_table_metadata_unexpected_err() {
let mem_kv = Arc::new(MemoryKvBackend::default());
let flow_metadata_manager = FlowMetadataManager::new(mem_kv);
let task_id = 10;
let flow_id = 10;
let catalog_name = "greptime";
let sink_table_name = TableName {
catalog_name: catalog_name.to_string(),
Expand All @@ -392,7 +392,7 @@ mod tests {
};
let flow_value = FlowInfoValue {
catalog_name: "greptime".to_string(),
flow_name: "task".to_string(),
flow_name: "flow".to_string(),
source_table_ids: vec![1024, 1025, 1026],
sink_table_name: sink_table_name.clone(),
flownode_ids: [(0, 1u64)].into(),
Expand All @@ -402,7 +402,7 @@ mod tests {
options: Default::default(),
};
flow_metadata_manager
.create_flow_metadata(task_id, flow_value.clone())
.create_flow_metadata(flow_id, flow_value.clone())
.await
.unwrap();
// Creates again.
Expand All @@ -413,7 +413,7 @@ mod tests {
};
let flow_value = FlowInfoValue {
catalog_name: "greptime".to_string(),
flow_name: "task".to_string(),
flow_name: "flow".to_string(),
source_table_ids: vec![1024, 1025, 1026],
sink_table_name: another_sink_table_name,
flownode_ids: [(0, 1u64)].into(),
Expand All @@ -423,7 +423,7 @@ mod tests {
options: Default::default(),
};
let err = flow_metadata_manager
.create_flow_metadata(task_id, flow_value)
.create_flow_metadata(flow_id, flow_value)
.await
.unwrap_err();
assert!(err.to_string().contains("Reads the different value"));
Expand Down

0 comments on commit dec1360

Please sign in to comment.