Skip to content

Commit

Permalink
refactor: per review
Browse files Browse the repository at this point in the history
  • Loading branch information
discord9 committed Nov 19, 2024
1 parent 8a94c5b commit 5a73a95
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 1 deletion.
2 changes: 1 addition & 1 deletion src/common/meta/src/key/flow/flow_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ impl FlowInfoManager {
Compare::new(key.clone(), CompareOp::NotEqual, None),
Compare::new(key.clone(), CompareOp::Equal, Some(prev_value)),
])
.and_then(vec![TxnOp::Put(key.clone(), raw_value.clone())])
.and_then(vec![TxnOp::Put(key.clone(), raw_value)])
.or_else(vec![TxnOp::Get(key.clone())]);

Ok((
Expand Down
2 changes: 2 additions & 0 deletions src/flow/src/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -676,6 +676,7 @@ impl FlowWorkerManager {
}
}

/// The arguments to create a flow in [`FlowWorkerManager`].
#[derive(Debug, Clone)]
pub struct CreateFlowArgs {
pub flow_id: FlowId,
Expand Down Expand Up @@ -732,6 +733,7 @@ impl FlowWorkerManager {
for handle in self.worker_handles.iter() {
if handle.lock().await.contains_flow(flow_id).await? {
flag = true;
break;
}
}
flag
Expand Down
3 changes: 3 additions & 0 deletions src/flow/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,9 @@ impl FlownodeBuilder {
flow_id: flow_id as _,
sink_table_name,
source_table_ids: info.source_table_ids().to_vec(),
// because recover should only happen on restart the `create_if_not_exists` and `or_replace` can be arbitrary value(since flow doesn't exist)
// but for the sake of consistency and to make sure recover of flow actually happen, we set both to true
// (which is also fine since checks for not allow both to be true is on metasrv and we already pass that)
create_if_not_exists: true,
or_replace: true,
expire_after: info.expire_after(),
Expand Down
49 changes: 49 additions & 0 deletions tests/cases/standalone/common/flow/show_create_flow.result
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,55 @@ SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS W
| filter_numbers_show | greptime | SELECT number, ts FROM numbers_input_show WHERE number > -3 |
+---------------------+---------------+-------------------------------------------------------------+

-- makesure after recover should be the same
-- SQLNESS ARG restart=true
SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show';

+---------------------+---------------+-------------------------------------------------------------+
| flow_name | table_catalog | flow_definition |
+---------------------+---------------+-------------------------------------------------------------+
| filter_numbers_show | greptime | SELECT number, ts FROM numbers_input_show WHERE number > -3 |
+---------------------+---------------+-------------------------------------------------------------+

SELECT * FROM out_num_cnt_show;

+--------+-------------------------+
| number | ts |
+--------+-------------------------+
| 4 | 1970-01-01T00:00:00.002 |
| 10 | 1970-01-01T00:00:00.003 |
| 11 | 1970-01-01T00:00:00.004 |
| 15 | 1970-01-01T00:00:00.001 |
| 16 | 1970-01-01T00:00:00.002 |
+--------+-------------------------+

INSERT INTO numbers_input_show VALUES(-4,0), (-3,1), (-2,2), (-1,3);

Affected Rows: 4

-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('filter_numbers_show');

+-----------------------------------------+
| ADMIN FLUSH_FLOW('filter_numbers_show') |
+-----------------------------------------+
| FLOW_FLUSHED |
+-----------------------------------------+

SELECT * FROM out_num_cnt_show;

+--------+-------------------------+
| number | ts |
+--------+-------------------------+
| -2 | 1970-01-01T00:00:00.002 |
| -1 | 1970-01-01T00:00:00.003 |
| 4 | 1970-01-01T00:00:00.002 |
| 10 | 1970-01-01T00:00:00.003 |
| 11 | 1970-01-01T00:00:00.004 |
| 15 | 1970-01-01T00:00:00.001 |
| 16 | 1970-01-01T00:00:00.002 |
+--------+-------------------------+

DROP FLOW filter_numbers_show;

Affected Rows: 0
Expand Down
14 changes: 14 additions & 0 deletions tests/cases/standalone/common/flow/show_create_flow.sql
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,20 @@ CREATE OR REPLACE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT nu

SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show';

-- makesure after recover should be the same
-- SQLNESS ARG restart=true

SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show';

SELECT * FROM out_num_cnt_show;

INSERT INTO numbers_input_show VALUES(-4,0), (-3,1), (-2,2), (-1,3);

-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('filter_numbers_show');

SELECT * FROM out_num_cnt_show;

DROP FLOW filter_numbers_show;

drop table out_num_cnt_show;
Expand Down

0 comments on commit 5a73a95

Please sign in to comment.