From 190f052ca2322955bdd54e8c28b39e397cc17926 Mon Sep 17 00:00:00 2001
From: Antoine Pultier <45740+fungiboletus@users.noreply.github.com>
Date: Tue, 20 Aug 2024 13:52:27 +0200
Subject: [PATCH] =?UTF-8?q?feat:=20=F0=9F=8C=88=20list=5Fsensors=20also=20?=
 =?UTF-8?q?for=20BigQuery?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 src/storage/bigquery/bigquery_crud.rs        | 140 ++++++++++++++++++
 .../migrations/20240223133248_init.sql       |  28 ++--
 src/storage/bigquery/mod.rs                  |   4 +-
 3 files changed, 153 insertions(+), 19 deletions(-)
 create mode 100644 src/storage/bigquery/bigquery_crud.rs

diff --git a/src/storage/bigquery/bigquery_crud.rs b/src/storage/bigquery/bigquery_crud.rs
new file mode 100644
index 0000000..6f86de8
--- /dev/null
+++ b/src/storage/bigquery/bigquery_crud.rs
@@ -0,0 +1,140 @@
+use std::collections::BTreeMap;
+
+use crate::{
+    crud::{list_cursor::ListCursor, viewmodel::sensor_viewmodel::SensorViewModel},
+    datamodel::sensor,
+};
+use anyhow::{anyhow, Result};
+use gcp_bigquery_client::model::{
+    query_parameter::QueryParameter, query_parameter_type::QueryParameterType,
+    query_parameter_value::QueryParameterValue, query_request::QueryRequest,
+};
+use time::OffsetDateTime;
+
+use super::BigQueryStorage;
+
+pub async fn list_sensors(
+    bqs: &BigQueryStorage,
+    cursor: ListCursor,
+    limit: usize,
+) -> Result<(Vec<SensorViewModel>, Option<ListCursor>)> {
+    // We fetch limit + 1 rows to know whether there is a next page
+    let query_limit = limit + 1;
+
+    let mut query_request = QueryRequest::new(
+        r#"
+        SELECT uuid, name, UNIX_MICROS(created_at) as created_at, type, unit, labels
+        FROM `{dataset_id}.sensor_labels_view`
+        WHERE created_at > TIMESTAMP_MICROS(@created_at)
+        OR (created_at = TIMESTAMP_MICROS(@created_at) AND uuid >= @uuid)
+        ORDER BY created_at ASC, uuid ASC
+        LIMIT @limit
+        "#
+        .replace("{dataset_id}", bqs.dataset_id()),
+    );
+    // BigQuery doesn't support the >= operator on STRUCT values
+    // ("Greater than is not defined for arguments of type STRUCT"),
+    // otherwise this would be: WHERE (created_at, uuid) >= (TIMESTAMP_MICROS(@created_at), @uuid)
+
+    let limit_query_parameter = QueryParameter {
+        name: Some("limit".to_string()),
+        parameter_type: Some(QueryParameterType {
+            r#type: "INT64".to_string(),
+            struct_types: None,
+            array_type: None,
+        }),
+        parameter_value: Some(QueryParameterValue {
+            value: Some(query_limit.to_string()),
+            struct_values: None,
+            array_values: None,
+        }),
+    };
+    let created_at_query_parameter = QueryParameter {
+        name: Some("created_at".to_string()),
+        parameter_type: Some(QueryParameterType {
+            r#type: "INT64".to_string(),
+            struct_types: None,
+            array_type: None,
+        }),
+        parameter_value: Some(QueryParameterValue {
+            value: Some(cursor.next_created_at),
+            struct_values: None,
+            array_values: None,
+        }),
+    };
+    let uuid_query_parameter = QueryParameter {
+        name: Some("uuid".to_string()),
+        parameter_type: Some(QueryParameterType {
+            r#type: "STRING".to_string(),
+            struct_types: None,
+            array_type: None,
+        }),
+        parameter_value: Some(QueryParameterValue {
+            value: Some(cursor.next_uuid),
+            struct_values: None,
+            array_values: None,
+        }),
+    };
+
+    query_request.query_parameters = Some(vec![
+        limit_query_parameter,
+        created_at_query_parameter,
+        uuid_query_parameter,
+    ]);
+
+    let mut result = bqs
+        .client()
+        .read()
+        .await
+        .job()
+        .query(bqs.project_id(), query_request)
+        .await?;
+
+    let mut sensors_views: Vec<SensorViewModel> = Vec::with_capacity(result.row_count());
+    let mut cursor: Option<ListCursor> = None;
+
+    while result.next_row() {
+        let uuid_string: String = result
+            .get_string(0)?
+            .ok_or_else(|| anyhow!("uuid is null"))?;
+        let uuid = uuid::Uuid::parse_str(&uuid_string)?;
+        let name: String = result
+            .get_string(1)?
+            .ok_or_else(|| anyhow!("name is null"))?;
+        let created_at: i64 = result
+            .get_i64(2)?
+            .ok_or_else(|| anyhow!("created_at is null"))?;
+
+        let created_at_offset_datetime =
+            OffsetDateTime::from_unix_timestamp_nanos((created_at as i128) * 1_000)?;
+        let created_at_rfc3339: String =
+            created_at_offset_datetime.format(&::time::format_description::well_known::Rfc3339)?;
+
+        // If we reached the limit, use this extra row as the cursor for the next page
+        if sensors_views.len() == limit {
+            cursor = Some(ListCursor::new(created_at.to_string(), uuid_string));
+            break;
+        }
+
+        let sensor_type: String = result
+            .get_string(3)?
+            .ok_or_else(|| anyhow!("type is null"))?;
+        let unit: Option<String> = result.get_string(4)?;
+        let labels_json: String = result
+            .get_string(5)?
+            .ok_or_else(|| anyhow!("labels is null"))?;
+        let labels: BTreeMap<String, String> = serde_json::from_str(&labels_json)?;
+
+        let sensor_view_model = SensorViewModel {
+            uuid,
+            name,
+            created_at: Some(created_at_rfc3339),
+            sensor_type,
+            unit,
+            labels,
+        };
+        sensors_views.push(sensor_view_model);
+    }
+
+    Ok((sensors_views, cursor))
+}
diff --git a/src/storage/bigquery/migrations/20240223133248_init.sql b/src/storage/bigquery/migrations/20240223133248_init.sql
index 9e2a277..f7a9812 100644
--- a/src/storage/bigquery/migrations/20240223133248_init.sql
+++ b/src/storage/bigquery/migrations/20240223133248_init.sql
@@ -132,21 +132,13 @@ ON s.sensor_id = nv.sensor_id;
 
 CREATE OR REPLACE VIEW `{dataset_id}.sensor_labels_view`
 AS
-SELECT
-    s.sensor_id,
-    s.uuid,
-    s.name AS sensor_name,
-    s.type,
-    s.unit,
-    (
-        SELECT JSON_OBJECT(
-            ARRAY_AGG(lnd.name),
-            ARRAY_AGG(ldd.description)
-        )
-        FROM `{dataset_id}.labels` l
-        LEFT JOIN `{dataset_id}.labels_name_dictionary` lnd ON l.name = lnd.id
-        LEFT JOIN `{dataset_id}.labels_description_dictionary` ldd ON l.description = ldd.id
-        WHERE l.sensor_id = s.sensor_id
-    ) AS labels
-FROM
-    `{dataset_id}.sensors` s;
+SELECT sensors.uuid, sensors.created_at, sensors.name, type, units.name as unit, JSON_OBJECT(
+    ARRAY_AGG(labels_name_dictionary.name), ARRAY_AGG(labels_description_dictionary.description)
+) AS labels
+FROM `{dataset_id}.sensors` as sensors
+LEFT JOIN `{dataset_id}.units` as units on sensors.unit = units.id
+LEFT JOIN `{dataset_id}.labels` as labels on sensors.sensor_id = labels.sensor_id
+LEFT JOIN `{dataset_id}.labels_name_dictionary` as labels_name_dictionary on labels.name = labels_name_dictionary.id
+LEFT JOIN `{dataset_id}.labels_description_dictionary` as labels_description_dictionary on labels.description = labels_description_dictionary.id
+GROUP BY sensors.sensor_id, sensors.uuid, sensors.created_at, sensors.name, type, units.name
+ORDER BY sensors.created_at ASC, sensors.uuid ASC;
diff --git a/src/storage/bigquery/mod.rs b/src/storage/bigquery/mod.rs
index 5b8ba1f..4205542 100644
--- a/src/storage/bigquery/mod.rs
+++ b/src/storage/bigquery/mod.rs
@@ -4,6 +4,7 @@ use crate::{
 };
 use anyhow::{bail, Result};
 use async_trait::async_trait;
+use bigquery_crud::list_sensors;
 use bigquery_publishers::{
     publish_blob_values, publish_boolean_values, publish_float_values, publish_integer_values,
     publish_json_values, publish_location_values, publish_numeric_values, publish_string_values,
@@ -21,6 +22,7 @@ use std::{future::Future, pin::Pin, sync::Arc, time::Duration};
 use tokio::{sync::RwLock, time::timeout};
 use url::Url;
 
+mod bigquery_crud;
 mod bigquery_labels_utilities;
 mod bigquery_prost_structs;
 mod bigquery_publishers;
@@ -241,6 +243,6 @@ impl StorageInstance for BigQueryStorage {
         cursor: ListCursor,
         limit: usize,
     ) -> Result<(Vec<SensorViewModel>, Option<ListCursor>)> {
-        unimplemented!();
+        list_sensors(self, cursor, limit).await
     }
 }
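
Note for reviewers: below is a minimal sketch, not part of the patch, of how a caller could drive the keyset pagination added here. The collect_all_sensors helper, the starting cursor values (epoch 0 microseconds and the nil UUID), and the page size of 1000 are assumptions for illustration; only ListCursor::new and the list_sensors signature come from the change itself.

    // Hypothetical helper, assumed to live next to list_sensors in bigquery_crud.rs.
    // It keeps requesting pages until list_sensors stops returning a cursor.
    async fn collect_all_sensors(bqs: &BigQueryStorage) -> Result<Vec<SensorViewModel>> {
        // Start before every row: created_at = 0 microseconds, and the nil UUID
        // sorts before any other UUID string.
        let mut cursor = ListCursor::new("0".to_string(), uuid::Uuid::nil().to_string());
        let mut sensors: Vec<SensorViewModel> = Vec::new();
        loop {
            // Each call returns at most 1000 sensors plus, when more rows exist,
            // a cursor pointing at the first row of the next page.
            let (page, next_cursor) = list_sensors(bqs, cursor, 1000).await?;
            sensors.extend(page);
            match next_cursor {
                Some(next) => cursor = next,
                None => break,
            }
        }
        Ok(sensors)
    }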