chore: better column schema check for flow (#4855)
* chore: better column schema check for flow

* chore: better msg

* tests: clean up after tests

* chore: better msg

* chore: per review

* tests: sqlness
discord9 authored Oct 24, 2024
1 parent aa9a265 commit ff38abd
Showing 3 changed files with 154 additions and 9 deletions.
26 changes: 22 additions & 4 deletions src/flow/src/adapter.rs
@@ -271,10 +271,17 @@ impl FlowWorkerManager {
let rows_proto: Vec<v1::Row> = insert
.into_iter()
.map(|(mut row, _ts)| {
// `update_at` col
row.extend([Value::from(common_time::Timestamp::new_millisecond(
now,
))]);
// extend `update_at` col if needed
// if the schema expects a millisecond timestamp at this position and the result row doesn't have it, add it
if row.len() < proto_schema.len()
&& proto_schema[row.len()].datatype
== greptime_proto::v1::ColumnDataType::TimestampMillisecond
as i32
{
row.extend([Value::from(
common_time::Timestamp::new_millisecond(now),
)]);
}
// ts col, if auto create
if is_ts_placeholder {
ensure!(
@@ -291,6 +298,17 @@ impl FlowWorkerManager {
common_time::Timestamp::new_millisecond(0),
)]);
}
if row.len() != proto_schema.len() {
InternalSnafu {
reason: format!(
"Flow output row length mismatch, expect {} got {}, the columns in schema are: {:?}",
proto_schema.len(),
row.len(),
proto_schema.iter().map(|c|&c.column_name).collect_vec()
),
}
.fail()?;
}
Ok(row.into())
})
.collect::<Result<Vec<_>, Error>>()?;
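The hunks above pad a short flow output row with an `update_at` millisecond timestamp and then verify the row length against the sink table's schema before converting it to a proto row. A minimal standalone sketch of that logic follows; the types `ColumnDataType`, `ColumnSchema`, and `Value` here are simplified stand-ins for illustration, not the actual greptime_proto / GreptimeDB definitions.

// Simplified stand-in types; the real code uses greptime_proto's ColumnSchema
// and GreptimeDB's Value / Timestamp types.
#[derive(Debug, Clone, Copy, PartialEq)]
enum ColumnDataType {
    String,
    Int64,
    TimestampMillisecond,
}

#[derive(Debug)]
struct ColumnSchema {
    column_name: String,
    datatype: ColumnDataType,
}

#[derive(Debug, Clone, PartialEq)]
enum Value {
    String(String),
    Int64(i64),
    TimestampMillisecond(i64),
}

/// Pad a flow output row with an `update_at` timestamp when the sink schema
/// expects one more millisecond-timestamp column than the row provides, then
/// verify that the row length matches the schema.
fn pad_and_check_row(
    mut row: Vec<Value>,
    schema: &[ColumnSchema],
    now_ms: i64,
) -> Result<Vec<Value>, String> {
    // Extend the `update_at` column only if the next expected column is a
    // millisecond timestamp and the row does not already supply it.
    if row.len() < schema.len()
        && schema[row.len()].datatype == ColumnDataType::TimestampMillisecond
    {
        row.push(Value::TimestampMillisecond(now_ms));
    }
    // If the lengths still disagree, report which columns the schema expects.
    if row.len() != schema.len() {
        let columns: Vec<&String> = schema.iter().map(|c| &c.column_name).collect();
        return Err(format!(
            "Flow output row length mismatch, expect {} got {}, the columns in schema are: {:?}",
            schema.len(),
            row.len(),
            columns
        ));
    }
    Ok(row)
}

fn main() {
    let schema = vec![
        ColumnSchema { column_name: "service_name".into(), datatype: ColumnDataType::String },
        ColumnSchema { column_name: "val".into(), datatype: ColumnDataType::Int64 },
        ColumnSchema { column_name: "update_at".into(), datatype: ColumnDataType::TimestampMillisecond },
    ];

    // Row missing `update_at`: it gets padded with the current timestamp.
    let row = vec![Value::String("svc1".into()), Value::Int64(100)];
    println!("{:?}", pad_and_check_row(row, &schema, 1_729_000_000_000));

    // Row that is still too short after padding: the check reports a mismatch.
    let short = vec![Value::String("svc1".into())];
    println!("{:?}", pad_and_check_row(short, &schema, 1_729_000_000_000));
}

The real implementation runs inside the row-mapping closure shown in the hunk above and reports the mismatch through an InternalSnafu error rather than a String.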
85 changes: 82 additions & 3 deletions tests/cases/standalone/common/flow/flow_basic.result
@@ -140,10 +140,11 @@ admin flush_flow('test_distinct_basic');
INSERT INTO
distinct_basic
VALUES
(20, "2021-07-01 00:00:00.200"),
(20, "2021-07-01 00:00:00.200"),
(22, "2021-07-01 00:00:00.600");

Affected Rows: 2
Affected Rows: 3

admin flush_flow('test_distinct_basic');

@@ -739,7 +740,8 @@ CREATE TABLE ngx_distribution (
bucket_size INT,
total_logs BIGINT,
time_window TIMESTAMP TIME INDEX,
update_at TIMESTAMP, -- auto generated column by flow engine
-- auto generated column by flow engine
update_at TIMESTAMP,
PRIMARY KEY(stat, bucket_size)
);

@@ -748,7 +750,7 @@
CREATE FLOW calc_ngx_distribution SINK TO ngx_distribution AS
SELECT
stat,
trunc(size, -1)::INT as bucket_size,
trunc(size, -1) :: INT as bucket_size,
count(client) AS total_logs,
date_bin(INTERVAL '1 minutes', access_time) as time_window,
FROM
@@ -837,3 +839,80 @@ DROP TABLE ngx_distribution;

Affected Rows: 0

CREATE TABLE requests (
service_name STRING,
service_ip STRING,
val INT,
ts TIMESTAMP TIME INDEX
);

Affected Rows: 0

CREATE TABLE requests_without_ip (
service_name STRING,
val INT,
ts TIMESTAMP TIME INDEX,
);

Affected Rows: 0

CREATE FLOW requests_long_term SINK TO requests_without_ip AS
SELECT
service_name,
val,
ts
FROM
requests;

Affected Rows: 0

INSERT INTO
requests
VALUES
("svc1", "10.0.0.1", 100, "2024-10-18 19:00:00"),
("svc1", "10.0.0.2", 100, "2024-10-18 19:00:00"),
("svc1", "10.0.0.1", 200, "2024-10-18 19:00:30"),
("svc1", "10.0.0.2", 200, "2024-10-18 19:00:30"),
("svc1", "10.0.0.1", 300, "2024-10-18 19:01:00"),
("svc1", "10.0.0.2", 100, "2024-10-18 19:01:01"),
("svc1", "10.0.0.1", 400, "2024-10-18 19:01:30"),
("svc1", "10.0.0.2", 200, "2024-10-18 19:01:31");

Affected Rows: 8

admin flush_flow('requests_long_term');

+----------------------------------------+
| ADMIN flush_flow('requests_long_term') |
+----------------------------------------+
| 1 |
+----------------------------------------+

SELECT
*
FROM
requests_without_ip;

+--------------+-----+---------------------+
| service_name | val | ts |
+--------------+-----+---------------------+
| svc1 | 100 | 2024-10-18T19:00:00 |
| svc1 | 200 | 2024-10-18T19:00:30 |
| svc1 | 300 | 2024-10-18T19:01:00 |
| svc1 | 100 | 2024-10-18T19:01:01 |
| svc1 | 400 | 2024-10-18T19:01:30 |
| svc1 | 200 | 2024-10-18T19:01:31 |
+--------------+-----+---------------------+

DROP FLOW requests_long_term;

Affected Rows: 0

DROP TABLE requests_without_ip;

Affected Rows: 0

DROP TABLE requests;

Affected Rows: 0

52 changes: 50 additions & 2 deletions tests/cases/standalone/common/flow/flow_basic.sql
@@ -79,6 +79,7 @@ admin flush_flow('test_distinct_basic');
INSERT INTO
distinct_basic
VALUES
(20, "2021-07-01 00:00:00.200"),
(20, "2021-07-01 00:00:00.200"),
(22, "2021-07-01 00:00:00.600");

@@ -405,14 +406,15 @@ CREATE TABLE ngx_distribution (
bucket_size INT,
total_logs BIGINT,
time_window TIMESTAMP TIME INDEX,
update_at TIMESTAMP, -- auto generated column by flow engine
-- auto generated column by flow engine
update_at TIMESTAMP,
PRIMARY KEY(stat, bucket_size)
);

CREATE FLOW calc_ngx_distribution SINK TO ngx_distribution AS
SELECT
stat,
trunc(size, -1)::INT as bucket_size,
trunc(size, -1) :: INT as bucket_size,
count(client) AS total_logs,
date_bin(INTERVAL '1 minutes', access_time) as time_window,
FROM
@@ -461,3 +463,49 @@ DROP FLOW calc_ngx_distribution;
DROP TABLE ngx_access_log;

DROP TABLE ngx_distribution;

CREATE TABLE requests (
service_name STRING,
service_ip STRING,
val INT,
ts TIMESTAMP TIME INDEX
);

CREATE TABLE requests_without_ip (
service_name STRING,
val INT,
ts TIMESTAMP TIME INDEX,
);

CREATE FLOW requests_long_term SINK TO requests_without_ip AS
SELECT
service_name,
val,
ts
FROM
requests;

INSERT INTO
requests
VALUES
("svc1", "10.0.0.1", 100, "2024-10-18 19:00:00"),
("svc1", "10.0.0.2", 100, "2024-10-18 19:00:00"),
("svc1", "10.0.0.1", 200, "2024-10-18 19:00:30"),
("svc1", "10.0.0.2", 200, "2024-10-18 19:00:30"),
("svc1", "10.0.0.1", 300, "2024-10-18 19:01:00"),
("svc1", "10.0.0.2", 100, "2024-10-18 19:01:01"),
("svc1", "10.0.0.1", 400, "2024-10-18 19:01:30"),
("svc1", "10.0.0.2", 200, "2024-10-18 19:01:31");

admin flush_flow('requests_long_term');

SELECT
*
FROM
requests_without_ip;

DROP FLOW requests_long_term;

DROP TABLE requests_without_ip;

DROP TABLE requests;
