diff --git a/Cargo.lock b/Cargo.lock
index 595145e..bff2ff5 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2926,6 +2926,29 @@ dependencies = [
  "unicode-ident",
 ]
 
+[[package]]
+name = "prost"
+version = "0.12.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "146c289cda302b98a28d40c8b3b90498d6e526dd24ac2ecea73e4e491685b94a"
+dependencies = [
+ "bytes",
+ "prost-derive",
+]
+
+[[package]]
+name = "prost-derive"
+version = "0.12.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "efb6c9a1dd1def8e2124d17e83a20af56f1570d6c2d2bd9e266ccb768df3840e"
+dependencies = [
+ "anyhow",
+ "itertools 0.11.0",
+ "proc-macro2",
+ "quote",
+ "syn 2.0.55",
+]
+
 [[package]]
 name = "ptr_meta"
 version = "0.1.4"
@@ -3266,12 +3289,14 @@ dependencies = [
  "num-traits",
  "once_cell",
  "polars",
+ "prost",
  "regex",
  "rust_decimal",
  "serde",
  "serde_json",
  "sindit-senml",
  "smallvec 1.13.2",
+ "snap",
  "sqlx",
  "tokio",
  "tokio-stream",
@@ -3496,6 +3521,12 @@ dependencies = [
  "syn 1.0.109",
 ]
 
+[[package]]
+name = "snap"
+version = "1.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1b6b67fb9a61334225b5b790716f609cd58395f895b3fe8b328786812a40bc3b"
+
 [[package]]
 name = "socket2"
 version = "0.5.6"
diff --git a/Cargo.toml b/Cargo.toml
index d8dd7f1..130b6e7 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -50,6 +50,8 @@ config = "0.14"
 serde = "1.0"
 confique = "0.2"
 byte-unit = "5.1"
+prost = "0.12"
+snap = "1.1"
 hex = "0.4"
 blake3 = "1.5"
 regex = "1.10"
diff --git a/src/datamodel/sensapp_datetime.rs b/src/datamodel/sensapp_datetime.rs
index b7de905..bf5d046 100644
--- a/src/datamodel/sensapp_datetime.rs
+++ b/src/datamodel/sensapp_datetime.rs
@@ -1,6 +1,29 @@
 pub type SensAppDateTime = hifitime::Epoch;
 use anyhow::Result;
 
+pub trait SensAppDateTimeExt {
+    fn from_unix_nanoseconds_i64(timestamp: i64) -> Self;
+    fn from_unix_microseconds_i64(timestamp: i64) -> Self;
+    fn from_unix_milliseconds_i64(timestamp: i64) -> Self;
+    fn from_unix_seconds_i64(timestamp: i64) -> Self;
+}
+
+impl SensAppDateTimeExt for SensAppDateTime {
+    fn from_unix_nanoseconds_i64(timestamp: i64) -> Self {
+        Self::from_unix_duration(hifitime::Duration::from_truncated_nanoseconds(timestamp))
+    }
+    fn from_unix_microseconds_i64(timestamp: i64) -> Self {
+        Self::from_utc_duration(UNIX_REF_EPOCH.to_utc_duration() + timestamp * Unit::Microsecond)
+    }
+    fn from_unix_milliseconds_i64(timestamp: i64) -> Self {
+        Self::from_utc_duration(UNIX_REF_EPOCH.to_utc_duration() + timestamp * Unit::Millisecond)
+    }
+    fn from_unix_seconds_i64(timestamp: i64) -> Self {
+        Self::from_utc_duration(UNIX_REF_EPOCH.to_utc_duration() + timestamp * Unit::Second)
+    }
+}
+
+use hifitime::{Unit, UNIX_REF_EPOCH};
 use sqlx::types::time::OffsetDateTime;
 pub fn sensapp_datetime_to_offset_datetime(datetime: &SensAppDateTime) -> Result<OffsetDateTime> {
     let unix_timestamp = datetime.to_unix_seconds().floor() as i128;
diff --git a/src/http/influxdb.rs b/src/http/influxdb.rs
index 6e492f2..a3650d8 100644
--- a/src/http/influxdb.rs
+++ b/src/http/influxdb.rs
@@ -1,6 +1,7 @@
 use super::{app_error::AppError, state::HttpServerState};
 use crate::datamodel::{
-    batch_builder::BatchBuilder, SensAppDateTime, Sensor, SensorType, TypedSamples,
+    batch_builder::BatchBuilder, sensapp_datetime::SensAppDateTimeExt, SensAppDateTime, Sensor,
+    SensorType, TypedSamples,
 };
 use anyhow::Result;
 use axum::{
@@ -122,13 +123,13 @@
     }): Query,
     bytes: Bytes,
 ) -> Result {
-    println!("InfluxDB publish");
println!("bucket: {}", bucket); - println!("org: {:?}", org); - println!("org_id: {:?}", org_id); - println!("precision: {:?}", precision); - //println!("bytes: {:?}", bytes); - println!("headers: {:?}", headers); + // println!("InfluxDB publish"); + // println!("bucket: {}", bucket); + // println!("org: {:?}", org); + // println!("org_id: {:?}", org_id); + // println!("precision: {:?}", precision); + // println!("bytes: {:?}", bytes); + // println!("headers: {:?}", headers); // Requires org or org_id if org.is_none() && org_id.is_none() { @@ -183,16 +184,16 @@ pub async fn publish_influxdb( let datetime = match line.timestamp { Some(timestamp) => match precision_enum { - Precision::Nanoseconds => SensAppDateTime::from_unix_duration( - hifitime::Duration::from_truncated_nanoseconds(timestamp), - ), - Precision::Microseconds => SensAppDateTime::from_unix_duration( - hifitime::Duration::from_microseconds(timestamp as f64), - ), + Precision::Nanoseconds => { + SensAppDateTime::from_unix_nanoseconds_i64(timestamp) + } + Precision::Microseconds => { + SensAppDateTime::from_unix_microseconds_i64(timestamp) + } Precision::Milliseconds => { - SensAppDateTime::from_unix_milliseconds(timestamp as f64) + SensAppDateTime::from_unix_milliseconds_i64(timestamp) } - Precision::Seconds => SensAppDateTime::from_unix_seconds(timestamp as f64), + Precision::Seconds => SensAppDateTime::from_unix_seconds_i64(timestamp), }, None => match SensAppDateTime::now() { Ok(datetime) => datetime, diff --git a/src/http/mod.rs b/src/http/mod.rs index dd67d5c..30bd13e 100644 --- a/src/http/mod.rs +++ b/src/http/mod.rs @@ -1,4 +1,5 @@ pub mod app_error; pub mod influxdb; +pub mod prometheus; pub mod server; pub mod state; diff --git a/src/http/prometheus.rs b/src/http/prometheus.rs new file mode 100644 index 0000000..48cb5db --- /dev/null +++ b/src/http/prometheus.rs @@ -0,0 +1,162 @@ +use std::sync::Arc; + +use crate::{ + datamodel::{ + batch_builder::BatchBuilder, sensapp_datetime::SensAppDateTimeExt, + sensapp_vec::SensAppLabels, unit::Unit, Sample, SensAppDateTime, Sensor, SensorType, + TypedSamples, + }, + parsing::prometheus::remote_write_parser::parse_remote_write_request, +}; + +use super::{app_error::AppError, state::HttpServerState}; +use anyhow::Result; +use axum::{ + debug_handler, + extract::State, + http::{HeaderMap, StatusCode}, +}; +use tokio_util::bytes::Bytes; + +fn verify_headers(headers: &HeaderMap) -> Result<(), AppError> { + // Check that we have the right content encoding, that must be snappy + match headers.get("content-encoding") { + Some(content_encoding) => match content_encoding.to_str() { + Ok("snappy") | Ok("SNAPPY") => {} + _ => { + return Err(AppError::BadRequest(anyhow::anyhow!( + "Unsupported content-encoding, must be snappy" + ))); + } + }, + None => { + return Err(AppError::BadRequest(anyhow::anyhow!( + "Missing content-encoding header" + ))); + } + } + + // Check that the content type is protocol buffer + match headers.get("content-type") { + Some(content_type) => match content_type.to_str() { + Ok("application/x-protobuf") | Ok("APPLICATION/X-PROTOBUF") => {} + _ => { + return Err(AppError::BadRequest(anyhow::anyhow!( + "Unsupported content-type, must be application/x-protobuf" + ))); + } + }, + None => { + return Err(AppError::BadRequest(anyhow::anyhow!( + "Missing content-type header" + ))); + } + } + + // Check that the remote write version is supported + match headers.get("x-prometheus-remote-write-version") { + Some(version) => match version.to_str() { + Ok("0.1.0") => {} + _ => { 
+                return Err(AppError::BadRequest(anyhow::anyhow!(
+                    "Unsupported x-prometheus-remote-write-version, must be 0.1.0"
+                )));
+            }
+        },
+        None => {
+            return Err(AppError::BadRequest(anyhow::anyhow!(
+                "Missing x-prometheus-remote-write-version header"
+            )));
+        }
+    }
+
+    Ok(())
+}
+
+#[debug_handler]
+pub async fn publish_prometheus(
+    State(state): State<HttpServerState>,
+    headers: HeaderMap,
+    bytes: Bytes,
+) -> Result<StatusCode, AppError> {
+    // println!("InfluxDB publish");
+    // println!("bucket: {}", bucket);
+    // println!("org: {:?}", org);
+    // println!("org_id: {:?}", org_id);
+    // println!("precision: {:?}", precision);
+    // println!("bytes: {:?}", bytes);
+
+    println!("Received {} bytes", bytes.len());
+
+    // Verify headers
+    verify_headers(&headers)?;
+
+    // Parse the content
+    let write_request = parse_remote_write_request(&bytes)?;
+
+    // Prometheus regularly sends metadata on the undocumented reserved field,
+    // so we stop immediately when the request contains no timeseries.
+    if write_request.timeseries.is_empty() {
+        return Ok(StatusCode::NO_CONTENT);
+    }
+
+    println!("Received {} timeseries", write_request.timeseries.len());
+
+    let mut batch_builder = BatchBuilder::new()?;
+    for time_serie in write_request.timeseries {
+        let mut labels = SensAppLabels::with_capacity(time_serie.labels.len());
+        let mut name: Option<String> = None;
+        let mut unit: Option<Unit> = None;
+        for label in time_serie.labels {
+            match label.name.as_str() {
+                "__name__" => {
+                    name = Some(label.value.clone());
+                }
+                "unit" => {
+                    unit = Some(Unit::new(label.value.clone(), None));
+                }
+                _ => {}
+            }
+            labels.push((label.name, label.value));
+        }
+        let name = match name {
+            Some(name) => name,
+            None => {
+                return Err(AppError::BadRequest(anyhow::anyhow!(
+                    "A time series is missing its __name__ label"
+                )));
+            }
+        };
+
+        // Prometheus has a very simple model: every sample value is a float.
+        let sensor = Sensor::new_without_uuid(name, SensorType::Float, unit, Some(labels))?;
+
+        // We can now add the samples
+        let samples = TypedSamples::Float(
+            time_serie
+                .samples
+                .into_iter()
+                .map(|sample| Sample {
+                    datetime: SensAppDateTime::from_unix_milliseconds_i64(sample.timestamp),
+                    value: sample.value,
+                })
+                .collect(),
+        );
+
+        batch_builder.add(Arc::new(sensor), samples).await?;
+        // batch_builder.send_if_batch_full(event_bus.clone()).await?;
+    }
+
+    match batch_builder.send_what_is_left(state.event_bus).await {
+        Ok(Some(mut receiver)) => {
+            receiver.wait().await?;
+        }
+        Ok(None) => {}
+        Err(error) => {
+            return Err(AppError::InternalServerError(anyhow::anyhow!(error)));
+        }
+    }
+
+    // OK no content
+    Ok(StatusCode::NO_CONTENT)
+}
diff --git a/src/http/server.rs b/src/http/server.rs
index 163d606..40e0570 100644
--- a/src/http/server.rs
+++ b/src/http/server.rs
@@ -1,5 +1,6 @@
 use super::app_error::AppError;
 use super::influxdb::publish_influxdb;
+use super::prometheus::publish_prometheus;
 use super::state::HttpServerState;
 use crate::config;
 use crate::importers::csv::publish_csv_async;
@@ -16,7 +17,6 @@ use axum::Json;
 use axum::Router;
 use futures::TryStreamExt;
 use polars::prelude::*;
-use sindit_senml::time;
 use std::io;
 use std::io::Cursor;
 use std::net::SocketAddr;
@@ -70,11 +70,16 @@ pub async fn run_http_server(state: HttpServerState, address: SocketAddr) -> Res
             "/sensors/:sensor_name_or_uuid/publish_multipart",
             post(publish_multipart).layer(max_body_layer.clone()),
         )
-        // InfluxDB compatibility
+        // InfluxDB Write API
         .route(
             "/api/v2/write",
             post(publish_influxdb).layer(max_body_layer.clone()),
         )
+        // Prometheus Remote Write API
+        .route(
+            "/api/v1/prometheus_remote_write",
+            post(publish_prometheus).layer(max_body_layer.clone()),
+        )
         .layer(middleware)
         .with_state(state);
diff --git a/src/main.rs b/src/main.rs
index a1252d2..3d70d69 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -20,6 +20,7 @@ mod http;
 mod importers;
 mod infer;
 mod name_to_uuid;
+mod parsing;
 mod storage;
 
 #[tokio::main]
diff --git a/src/parsing/mod.rs b/src/parsing/mod.rs
index e69de29..a243279 100644
--- a/src/parsing/mod.rs
+++ b/src/parsing/mod.rs
@@ -0,0 +1 @@
+pub mod prometheus;
diff --git a/src/parsing/prometheus/mod.rs b/src/parsing/prometheus/mod.rs
new file mode 100644
index 0000000..c2ec303
--- /dev/null
+++ b/src/parsing/prometheus/mod.rs
@@ -0,0 +1,2 @@
+pub mod remote_write_models;
+pub mod remote_write_parser;
diff --git a/src/parsing/prometheus/prometheus_remote_write.proto b/src/parsing/prometheus/prometheus_remote_write.proto
new file mode 100644
index 0000000..26596c5
--- /dev/null
+++ b/src/parsing/prometheus/prometheus_remote_write.proto
@@ -0,0 +1,32 @@
+// https://prometheus.io/docs/concepts/remote_write_spec/
+// Version: 1.0
+// Status: Published
+// Date: April 2023
+
+syntax = "proto3";
+
+message WriteRequest {
+  repeated TimeSeries timeseries = 1;
+  // Cortex uses this field to determine the source of the write request.
+  // We reserve it to avoid any compatibility issues.
+  reserved 2;
+
+  // Prometheus uses this field to send metadata, but this is
+  // omitted from v1 of the spec as it is experimental.
+  reserved 3;
+}
+
+message TimeSeries {
+  repeated Label labels = 1;
+  repeated Sample samples = 2;
+}
+
+message Label {
+  string name = 1;
+  string value = 2;
+}
+
+message Sample {
+  double value = 1;
+  int64 timestamp = 2;
+}
diff --git a/src/parsing/prometheus/remote_write_models.rs b/src/parsing/prometheus/remote_write_models.rs
new file mode 100644
index 0000000..bf7ac3a
--- /dev/null
+++ b/src/parsing/prometheus/remote_write_models.rs
@@ -0,0 +1,42 @@
+// This file is manually edited because setting up an automatic
+// compilation of the protocol buffer to Rust code seemed cumbersome
+// for such a simple structure.
+//
+// The code is inspired by prom-write.
+// https://github.com/theduke/prom-write/blob/b434cb64c305044b78bb772115217026512b7b9b/lib/src/lib.rs
+// Licensed under Apache 2.0 and MIT licenses.
+//
+// The code uses the crate PROST, which has nothing to do with Alain Prost.
+//
+// Check the prometheus_remote_write.proto file and https://prometheus.io/docs/concepts/remote_write_spec/
+// for more information.
+
+#[derive(prost::Message)]
+pub struct WriteRequest {
+    #[prost(message, repeated, tag = "1")]
+    pub timeseries: Vec<TimeSeries>,
+}
+
+#[derive(prost::Message)]
+pub struct TimeSeries {
+    #[prost(message, repeated, tag = "1")]
+    pub labels: Vec<Label>,
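
Note: the handler imports parse_remote_write_request from src/parsing/prometheus/remote_write_parser.rs, but that file's diff is not included in this excerpt. The sketch below is a minimal, hypothetical version of such a parser, assuming it only snappy-decompresses the request body with the snap crate and decodes the protobuf with prost (both added to Cargo.toml above); the signature and error handling are assumptions, and the actual implementation in the patch may differ.

```rust
// Hypothetical sketch of remote_write_parser.rs (not shown in this excerpt).
use anyhow::{Context, Result};
use prost::Message;

use super::remote_write_models::WriteRequest;

/// Decompresses a snappy-encoded Prometheus remote write payload and
/// decodes the protocol buffer into a WriteRequest.
pub fn parse_remote_write_request(bytes: &[u8]) -> Result<WriteRequest> {
    // The remote write spec mandates snappy block (raw) compression,
    // not the framed format, hence snap::raw rather than snap::read.
    let decompressed = snap::raw::Decoder::new()
        .decompress_vec(bytes)
        .context("Failed to decompress snappy payload")?;

    // Decode the message defined in prometheus_remote_write.proto,
    // mirrored manually in remote_write_models.rs.
    WriteRequest::decode(decompressed.as_slice()).context("Failed to decode WriteRequest")
}
```

Taking &[u8] keeps the parser independent of axum: the handler can pass &bytes directly since Bytes dereferences to a byte slice.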