Skip to content

Commit

Permalink
feat: 🌈 list_sensors also for BigQuery
Browse files Browse the repository at this point in the history
  • Loading branch information
fungiboletus committed Aug 20, 2024
1 parent b126dc1 commit 190f052
Show file tree
Hide file tree
Showing 3 changed files with 153 additions and 19 deletions.
140 changes: 140 additions & 0 deletions src/storage/bigquery/bigquery_crud.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
use std::collections::BTreeMap;

use crate::{
crud::{list_cursor::ListCursor, viewmodel::sensor_viewmodel::SensorViewModel},
datamodel::sensor,
};
use anyhow::{anyhow, Result};
use gcp_bigquery_client::model::{
query_parameter::QueryParameter, query_parameter_type::QueryParameterType,
query_parameter_value::QueryParameterValue, query_request::QueryRequest,
};
use time::OffsetDateTime;

use super::BigQueryStorage;

pub async fn list_sensors(
bqs: &BigQueryStorage,
cursor: ListCursor,
limit: usize,
) -> Result<(Vec<SensorViewModel>, Option<ListCursor>)> {
// We fetch the limit + 1 to know if there is a next page
let query_limit = limit + 1;

let mut query_request = QueryRequest::new(
r#"
SELECT uuid, name, UNIX_MICROS(created_at) as created_at, type, unit, labels
FROM `{dataset_id}.sensor_labels_view`
WHERE created_at > TIMESTAMP_MICROS(@created_at)
OR (created_at = TIMESTAMP_MICROS(@created_at) AND uuid >= @uuid)
ORDER BY created_at ASC, uuid ASC
LIMIT @limit
"#
.replace("{dataset_id}", bqs.dataset_id()),
);
// BigQuery doesn't support the >= operator on STRUCT:
// Greater than is not defined for arguments of type STRUCT<TIMESTAMP, STRING>
// WHERE (created_at, uuid) >= (TIMESTAMP_MILLIS(@created_at), @uuid)

let limit_query_parameter = QueryParameter {
name: Some("limit".to_string()),
parameter_type: Some(QueryParameterType {
r#type: "INT64".to_string(),
struct_types: None,
array_type: None,
}),
parameter_value: Some(QueryParameterValue {
value: Some(query_limit.to_string()),
struct_values: None,
array_values: None,
}),
};
let created_at_query_parameter = QueryParameter {
name: Some("created_at".to_string()),
parameter_type: Some(QueryParameterType {
r#type: "INT64".to_string(),
struct_types: None,
array_type: None,
}),
parameter_value: Some(QueryParameterValue {
value: Some(cursor.next_created_at),
struct_values: None,
array_values: None,
}),
};
let uuid_query_parameter = QueryParameter {
name: Some("uuid".to_string()),
parameter_type: Some(QueryParameterType {
r#type: "STRING".to_string(),
struct_types: None,
array_type: None,
}),
parameter_value: Some(QueryParameterValue {
value: Some(cursor.next_uuid),
struct_values: None,
array_values: None,
}),
};

query_request.query_parameters = Some(vec![
limit_query_parameter,
created_at_query_parameter,
uuid_query_parameter,
]);

let mut result = bqs
.client()
.read()
.await
.job()
.query(bqs.project_id(), query_request)
.await?;

let mut sensors_views: Vec<SensorViewModel> = Vec::with_capacity(result.row_count());
let mut cursor: Option<ListCursor> = None;

while result.next_row() {
let uuid_string: String = result
.get_string(0)?
.ok_or_else(|| anyhow!("uuid is null"))?;
let uuid = uuid::Uuid::parse_str(&uuid_string)?;
let name: String = result
.get_string(1)?
.ok_or_else(|| anyhow!("name is null"))?;
let created_at: i64 = result
.get_i64(2)?
.ok_or_else(|| anyhow!("created_at is null"))?;

let created_at_offset_datetime =
OffsetDateTime::from_unix_timestamp_nanos((created_at as i128) * 1_000)?;
let created_at_rfc3339: String =
created_at_offset_datetime.format(&::time::format_description::well_known::Rfc3339)?;

// If we reached the limit, we use the value as a cursor
if sensors_views.len() == limit {
cursor = Some(ListCursor::new(created_at.to_string(), uuid_string));
break;
}

let sensor_type: String = result
.get_string(3)?
.ok_or_else(|| anyhow!("type is null"))?;
let unit: Option<String> = result.get_string(4)?;
let labels_json: String = result
.get_string(5)?
.ok_or_else(|| anyhow!("labels is null"))?;
let labels: BTreeMap<String, String> = serde_json::from_str(&labels_json)?;

let sensor_view_model = SensorViewModel {
uuid,
name,
created_at: Some(created_at_rfc3339),
sensor_type,
unit,
labels,
};
sensors_views.push(sensor_view_model);
}

Ok((sensors_views, cursor))
}
28 changes: 10 additions & 18 deletions src/storage/bigquery/migrations/20240223133248_init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -132,21 +132,13 @@ ON
s.sensor_id = nv.sensor_id;

CREATE OR REPLACE VIEW `{dataset_id}.sensor_labels_view` AS
SELECT
s.sensor_id,
s.uuid,
s.name AS sensor_name,
s.type,
s.unit,
(
SELECT JSON_OBJECT(
ARRAY_AGG(lnd.name),
ARRAY_AGG(ldd.description)
)
FROM `{dataset_id}.labels` l
LEFT JOIN `{dataset_id}.labels_name_dictionary` lnd ON l.name = lnd.id
LEFT JOIN `{dataset_id}.labels_description_dictionary` ldd ON l.description = ldd.id
WHERE l.sensor_id = s.sensor_id
) AS labels
FROM
`{dataset_id}.sensors` s;
SELECT sensors.uuid, sensors.created_at, sensors.name, type, units.name as unit, JSON_OBJECT(
ARRAY_AGG(labels_name_dictionary.name), ARRAY_AGG(labels_description_dictionary.description)
) AS labels
FROM `{dataset_id}.sensors` as sensors
LEFT JOIN `{dataset_id}.units` as units on sensors.unit = units.id
LEFT JOIN `{dataset_id}.labels` as labels on sensors.sensor_id = labels.sensor_id
LEFT JOIN `{dataset_id}.labels_name_dictionary` as labels_name_dictionary on labels.name = labels_name_dictionary.id
LEFT JOIN `{dataset_id}.labels_description_dictionary` as labels_description_dictionary on labels.description = labels_description_dictionary.id
GROUP BY sensors.sensor_id, sensors.uuid, sensors.created_at, sensors.name, type, units.name
ORDER BY sensors.created_at ASC, sensors.uuid ASC;
4 changes: 3 additions & 1 deletion src/storage/bigquery/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::{
};
use anyhow::{bail, Result};
use async_trait::async_trait;
use bigquery_crud::list_sensors;
use bigquery_publishers::{
publish_blob_values, publish_boolean_values, publish_float_values, publish_integer_values,
publish_json_values, publish_location_values, publish_numeric_values, publish_string_values,
Expand All @@ -21,6 +22,7 @@ use std::{future::Future, pin::Pin, sync::Arc, time::Duration};
use tokio::{sync::RwLock, time::timeout};
use url::Url;

mod bigquery_crud;
mod bigquery_labels_utilities;
mod bigquery_prost_structs;
mod bigquery_publishers;
Expand Down Expand Up @@ -241,6 +243,6 @@ impl StorageInstance for BigQueryStorage {
cursor: ListCursor,
limit: usize,
) -> Result<(Vec<SensorViewModel>, Option<ListCursor>)> {
unimplemented!();
list_sensors(self, cursor, limit).await
}
}

0 comments on commit 190f052

Please sign in to comment.