Skip to content

Commit

Permalink
chore: apply suggestions from CR
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Apr 29, 2024
1 parent 6c286be commit f9c02f6
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 26 deletions.
40 changes: 19 additions & 21 deletions src/common/meta/src/ddl/create_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,13 @@ impl CreateFlowProcedure {
Ok(Status::executing(true))
}

async fn on_flownode_create_flow(&mut self) -> Result<Status> {
async fn on_flownode_create_flows(&mut self) -> Result<Status> {
// Safety: must be allocated.
let mut create_flow_task = Vec::with_capacity(self.data.peers.len());
for peer in &self.data.peers {
let requester = self.context.node_manager.flownode(peer).await;
let request = FlowRequest {
body: Some(PbFlowRequest::Create(self.data.to_create_flow_request())),
body: Some(PbFlowRequest::Create((&self.data).into())),
};
create_flow_task.push(async move {
requester
Expand Down Expand Up @@ -117,7 +117,7 @@ impl CreateFlowProcedure {
// TODO(weny): Support `or_replace`.
self.context
.flow_task_metadata_manager
.create_flow_task_metadata(flow_task_id, self.data.to_flow_task_info_value())
.create_flow_task_metadata(flow_task_id, (&self.data).into())
.await?;
info!("Created flow task metadata for flow {flow_task_id}");
Ok(Status::done_with_output(flow_task_id))
Expand All @@ -139,7 +139,7 @@ impl Procedure for CreateFlowProcedure {

match state {
CreateFlowTaskState::Prepare => self.on_prepare().await,
CreateFlowTaskState::CreateFlows => self.on_flownode_create_flow().await,
CreateFlowTaskState::CreateFlows => self.on_flownode_create_flows().await,
CreateFlowTaskState::CreateMetadata => self.on_create_metadata().await,
}
.map_err(handle_retry_error)
Expand Down Expand Up @@ -182,32 +182,30 @@ pub struct CreateFlowTaskData {
pub(crate) source_table_ids: Vec<TableId>,
}

impl CreateFlowTaskData {
/// Converts to [CreateRequest]
/// # Panic
/// Panic if the `flow_task_id` is None.
fn to_create_flow_request(&self) -> CreateRequest {
let flow_task_id = self.flow_task_id.unwrap();
let source_table_ids = &self.source_table_ids;
impl From<&CreateFlowTaskData> for CreateRequest {
fn from(value: &CreateFlowTaskData) -> Self {
let flow_task_id = value.flow_task_id.unwrap();
let source_table_ids = &value.source_table_ids;

CreateRequest {
task_id: Some(api::v1::flow::TaskId { id: flow_task_id }),
source_table_ids: source_table_ids
.iter()
.map(|table_id| api::v1::TableId { id: *table_id })
.collect_vec(),
sink_table_name: Some(self.task.sink_table_name.clone().into()),
sink_table_name: Some(value.task.sink_table_name.clone().into()),
// Always be true
create_if_not_exists: true,
expire_when: self.task.expire_when.clone(),
comment: self.task.comment.clone(),
sql: self.task.sql.clone(),
task_options: self.task.options.clone(),
expire_when: value.task.expire_when.clone(),
comment: value.task.comment.clone(),
sql: value.task.sql.clone(),
task_options: value.task.options.clone(),
}
}
}

/// Converts to [FlowTaskInfoValue].
fn to_flow_task_info_value(&self) -> FlowTaskInfoValue {
impl From<&CreateFlowTaskData> for FlowTaskInfoValue {
fn from(value: &CreateFlowTaskData) -> Self {
let CreateFlowTask {
catalog_name,
task_name,
Expand All @@ -217,17 +215,17 @@ impl CreateFlowTaskData {
sql,
options,
..
} = self.task.clone();
} = value.task.clone();

let flownode_ids = self
let flownode_ids = value
.peers
.iter()
.enumerate()
.map(|(idx, peer)| (idx as u32, peer.id))
.collect::<BTreeMap<_, _>>();

FlowTaskInfoValue {
source_table_ids: self.source_table_ids.clone(),
source_table_ids: value.source_table_ids.clone(),
sink_table_name,
flownode_ids,
catalog_name,
Expand Down
1 change: 0 additions & 1 deletion src/common/meta/src/ddl/create_flow/check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use crate::error::{self, Result};
impl CreateFlowProcedure {
/// Checks:
/// - The new task name doesn't exist.
/// - The source tables exist.
pub(crate) async fn check_creation(&self) -> Result<()> {
let catalog_name = &self.data.task.catalog_name;
let task_name = &self.data.task.task_name;
Expand Down
5 changes: 2 additions & 3 deletions src/common/meta/src/ddl/create_flow/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::key::table_name::TableNameKey;
impl CreateFlowProcedure {
/// Allocates the [FlowTaskId].
pub(crate) async fn allocate_flow_task_id(&mut self) -> Result<()> {
//TODO(weny, ruihang): We doesn't support the partitions. It's always be 1, now.
// TODO(weny, ruihang): We don't support the partitions. It's always be 1, now.
let partitions = 1;
let (flow_task_id, peers) = self
.context
Expand All @@ -34,10 +34,9 @@ impl CreateFlowProcedure {
Ok(())
}

/// Collects source table ids
/// Ensures all source tables exist and collects source table ids
pub(crate) async fn collect_source_tables(&mut self) -> Result<()> {
// Ensures all source tables exist.

let keys = self
.data
.task
Expand Down
2 changes: 1 addition & 1 deletion src/common/meta/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ pub enum Error {
location: Location,
},

#[snafu(display("Task already exists, task: {}", task_name,))]
#[snafu(display("Task already exists: {}", task_name))]
TaskAlreadyExists {
task_name: String,
location: Location,
Expand Down

0 comments on commit f9c02f6

Please sign in to comment.