diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs
index 3a8acd60e75a..d564e1dd0971 100644
--- a/src/flow/src/adapter.rs
+++ b/src/flow/src/adapter.rs
@@ -271,10 +271,17 @@ impl FlowWorkerManager {
         let rows_proto: Vec<v1::Row> = insert
             .into_iter()
             .map(|(mut row, _ts)| {
-                // `update_at` col
-                row.extend([Value::from(common_time::Timestamp::new_millisecond(
-                    now,
-                ))]);
+                // extend the `update_at` col if needed: if the schema expects a
+                // millisecond timestamp here and the result row doesn't have it, append it
+                if row.len() < proto_schema.len()
+                    && proto_schema[row.len()].datatype
+                        == greptime_proto::v1::ColumnDataType::TimestampMillisecond
+                            as i32
+                {
+                    row.extend([Value::from(
+                        common_time::Timestamp::new_millisecond(now),
+                    )]);
+                }
                 // ts col, if auto create
                 if is_ts_placeholder {
                     ensure!(
@@ -291,6 +298,17 @@ impl FlowWorkerManager {
                         common_time::Timestamp::new_millisecond(0),
                     )]);
                 }
+                if row.len() != proto_schema.len() {
+                    InternalSnafu {
+                        reason: format!(
+                            "Flow output row length mismatch, expected {}, got {}; the columns in the schema are: {:?}",
+                            proto_schema.len(),
+                            row.len(),
+                            proto_schema.iter().map(|c| &c.column_name).collect_vec()
+                        ),
+                    }
+                    .fail()?;
+                }
                 Ok(row.into())
             })
             .collect::<Result<Vec<_>, Error>>()?;
diff --git a/tests/cases/standalone/common/flow/flow_basic.result b/tests/cases/standalone/common/flow/flow_basic.result
index 6aaa2c74be5b..33d4ddacba90 100644
--- a/tests/cases/standalone/common/flow/flow_basic.result
+++ b/tests/cases/standalone/common/flow/flow_basic.result
@@ -140,10 +140,11 @@ admin flush_flow('test_distinct_basic');
 INSERT INTO
     distinct_basic
 VALUES
+    (20, "2021-07-01 00:00:00.200"),
     (20, "2021-07-01 00:00:00.200"),
     (22, "2021-07-01 00:00:00.600");
 
-Affected Rows: 2
+Affected Rows: 3
 
 admin flush_flow('test_distinct_basic');
 
@@ -739,7 +740,8 @@ CREATE TABLE ngx_distribution (
     bucket_size INT,
     total_logs BIGINT,
     time_window TIMESTAMP TIME INDEX,
-    update_at TIMESTAMP, -- auto generated column by flow engine
+    -- auto generated column by flow engine
+    update_at TIMESTAMP,
     PRIMARY KEY(stat, bucket_size)
 );
 
@@ -748,7 +750,7 @@ Affected Rows: 0
 CREATE FLOW calc_ngx_distribution SINK TO ngx_distribution AS
 SELECT
     stat,
-    trunc(size, -1)::INT as bucket_size,
+    trunc(size, -1) :: INT as bucket_size,
     count(client) AS total_logs,
     date_bin(INTERVAL '1 minutes', access_time) as time_window,
 FROM
@@ -837,3 +839,80 @@ DROP TABLE ngx_distribution;
 
 Affected Rows: 0
 
+CREATE TABLE requests (
+    service_name STRING,
+    service_ip STRING,
+    val INT,
+    ts TIMESTAMP TIME INDEX
+);
+
+Affected Rows: 0
+
+CREATE TABLE requests_without_ip (
+    service_name STRING,
+    val INT,
+    ts TIMESTAMP TIME INDEX,
+);
+
+Affected Rows: 0
+
+CREATE FLOW requests_long_term SINK TO requests_without_ip AS
+SELECT
+    service_name,
+    val,
+    ts
+FROM
+    requests;
+
+Affected Rows: 0
+
+INSERT INTO
+    requests
+VALUES
+    ("svc1", "10.0.0.1", 100, "2024-10-18 19:00:00"),
+    ("svc1", "10.0.0.2", 100, "2024-10-18 19:00:00"),
+    ("svc1", "10.0.0.1", 200, "2024-10-18 19:00:30"),
+    ("svc1", "10.0.0.2", 200, "2024-10-18 19:00:30"),
+    ("svc1", "10.0.0.1", 300, "2024-10-18 19:01:00"),
+    ("svc1", "10.0.0.2", 100, "2024-10-18 19:01:01"),
+    ("svc1", "10.0.0.1", 400, "2024-10-18 19:01:30"),
+    ("svc1", "10.0.0.2", 200, "2024-10-18 19:01:31");
+
+Affected Rows: 8
+
+admin flush_flow('requests_long_term');
+
++----------------------------------------+
+| ADMIN flush_flow('requests_long_term') |
++----------------------------------------+
+| 1                                      |
++----------------------------------------+
+
+SELECT
+    *
+FROM
+    requests_without_ip;
+
++--------------+-----+---------------------+
+| service_name | val | ts                  |
++--------------+-----+---------------------+
+| svc1         | 100 | 2024-10-18T19:00:00 |
+| svc1         | 200 | 2024-10-18T19:00:30 |
+| svc1         | 300 | 2024-10-18T19:01:00 |
+| svc1         | 100 | 2024-10-18T19:01:01 |
+| svc1         | 400 | 2024-10-18T19:01:30 |
+| svc1         | 200 | 2024-10-18T19:01:31 |
++--------------+-----+---------------------+
+
+DROP FLOW requests_long_term;
+
+Affected Rows: 0
+
+DROP TABLE requests_without_ip;
+
+Affected Rows: 0
+
+DROP TABLE requests;
+
+Affected Rows: 0
+
diff --git a/tests/cases/standalone/common/flow/flow_basic.sql b/tests/cases/standalone/common/flow/flow_basic.sql
index 9356508aba4f..179a19195d8f 100644
--- a/tests/cases/standalone/common/flow/flow_basic.sql
+++ b/tests/cases/standalone/common/flow/flow_basic.sql
@@ -79,6 +79,7 @@ admin flush_flow('test_distinct_basic');
 INSERT INTO
     distinct_basic
 VALUES
+    (20, "2021-07-01 00:00:00.200"),
     (20, "2021-07-01 00:00:00.200"),
     (22, "2021-07-01 00:00:00.600");
 
@@ -405,14 +406,15 @@ CREATE TABLE ngx_distribution (
     bucket_size INT,
     total_logs BIGINT,
     time_window TIMESTAMP TIME INDEX,
-    update_at TIMESTAMP, -- auto generated column by flow engine
+    -- auto generated column by flow engine
+    update_at TIMESTAMP,
     PRIMARY KEY(stat, bucket_size)
 );
 
 CREATE FLOW calc_ngx_distribution SINK TO ngx_distribution AS
 SELECT
     stat,
-    trunc(size, -1)::INT as bucket_size,
+    trunc(size, -1) :: INT as bucket_size,
     count(client) AS total_logs,
     date_bin(INTERVAL '1 minutes', access_time) as time_window,
 FROM
@@ -461,3 +463,49 @@ DROP FLOW calc_ngx_distribution;
 DROP TABLE ngx_access_log;
 
 DROP TABLE ngx_distribution;
+
+CREATE TABLE requests (
+    service_name STRING,
+    service_ip STRING,
+    val INT,
+    ts TIMESTAMP TIME INDEX
+);
+
+CREATE TABLE requests_without_ip (
+    service_name STRING,
+    val INT,
+    ts TIMESTAMP TIME INDEX,
+);
+
+CREATE FLOW requests_long_term SINK TO requests_without_ip AS
+SELECT
+    service_name,
+    val,
+    ts
+FROM
+    requests;
+
+INSERT INTO
+    requests
+VALUES
+    ("svc1", "10.0.0.1", 100, "2024-10-18 19:00:00"),
+    ("svc1", "10.0.0.2", 100, "2024-10-18 19:00:00"),
+    ("svc1", "10.0.0.1", 200, "2024-10-18 19:00:30"),
+    ("svc1", "10.0.0.2", 200, "2024-10-18 19:00:30"),
+    ("svc1", "10.0.0.1", 300, "2024-10-18 19:01:00"),
+    ("svc1", "10.0.0.2", 100, "2024-10-18 19:01:01"),
+    ("svc1", "10.0.0.1", 400, "2024-10-18 19:01:30"),
+    ("svc1", "10.0.0.2", 200, "2024-10-18 19:01:31");
+
+admin flush_flow('requests_long_term');
+
+SELECT
+    *
+FROM
+    requests_without_ip;
+
+DROP FLOW requests_long_term;
+
+DROP TABLE requests_without_ip;
+
+DROP TABLE requests;
\ No newline at end of file
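
Note on the adapter.rs change: the old code unconditionally appended an `update_at` timestamp to every output row, which broke flows whose sink table has no such trailing column (like `requests_without_ip` above). The new code pads only when the sink schema actually expects a millisecond timestamp at the next position, then fails fast on any remaining arity mismatch instead of writing a malformed row. Below is a minimal standalone sketch of that guard logic; `ColumnType`, `Value`, and `pad_row` are simplified stand-ins invented for illustration, not GreptimeDB's actual proto schema or snafu error types.

// Standalone sketch of the pad-then-verify logic from adapter.rs.
#[derive(Debug, Clone, Copy, PartialEq)]
enum ColumnType {
    String,
    Int64,
    TimestampMillisecond,
}

#[derive(Debug, Clone, PartialEq)]
enum Value {
    String(String),
    Int64(i64),
    TimestampMillisecond(i64),
}

/// Append an `update_at` timestamp only when the sink schema expects a
/// millisecond timestamp at the row's next position, then verify the
/// final row arity against the schema.
fn pad_row(
    mut row: Vec<Value>,
    schema: &[(&str, ColumnType)],
    now_ms: i64,
) -> Result<Vec<Value>, String> {
    // Pad only if the row is short and the next column is a ms timestamp.
    if row.len() < schema.len() && schema[row.len()].1 == ColumnType::TimestampMillisecond {
        row.push(Value::TimestampMillisecond(now_ms));
    }
    // Fail loudly on a length mismatch rather than emitting a bad row.
    if row.len() != schema.len() {
        return Err(format!(
            "row length mismatch: expected {}, got {}; schema columns: {:?}",
            schema.len(),
            row.len(),
            schema.iter().map(|(name, _)| *name).collect::<Vec<_>>()
        ));
    }
    Ok(row)
}

fn main() {
    let schema = [
        ("service_name", ColumnType::String),
        ("val", ColumnType::Int64),
        ("update_at", ColumnType::TimestampMillisecond),
    ];
    // A row missing `update_at` gets the current timestamp appended;
    // a row that already fills the schema passes through unchanged.
    let row = vec![Value::String("svc1".into()), Value::Int64(100)];
    println!("{:?}", pad_row(row, &schema, 1_729_278_000_000));
}

With a sink schema that ends in no timestamp column (as with `requests_without_ip`, whose last column is the user-declared `ts` already produced by the flow's SELECT), the padding branch is skipped and a correctly sized row passes the arity check untouched.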