Skip to content

Commit

Permalink
feat: 🌈 Prometheus remote write support
Browse files Browse the repository at this point in the history
  • Loading branch information
fungiboletus committed Mar 29, 2024
1 parent 1a752dd commit 5c043c0
Show file tree
Hide file tree
Showing 13 changed files with 348 additions and 18 deletions.
31 changes: 31 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ config = "0.14"
serde = "1.0"
confique = "0.2"
byte-unit = "5.1"
prost = "0.12"
snap = "1.1"
hex = "0.4"
blake3 = "1.5"
regex = "1.10"
Expand Down
23 changes: 23 additions & 0 deletions src/datamodel/sensapp_datetime.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,29 @@
pub type SensAppDateTime = hifitime::Epoch;
use anyhow::Result;

/// Integer-timestamp constructors for [`SensAppDateTime`].
///
/// The call sites replaced by this trait previously cast `i64` timestamps to
/// `f64` before handing them to hifitime — presumably these `i64` variants
/// exist to avoid that float conversion; confirm against hifitime's own
/// constructors.
pub trait SensAppDateTimeExt {
    /// Builds a datetime from a unix timestamp in nanoseconds.
    fn from_unix_nanoseconds_i64(timestamp: i64) -> Self;
    /// Builds a datetime from a unix timestamp in microseconds.
    fn from_unix_microseconds_i64(timestamp: i64) -> Self;
    /// Builds a datetime from a unix timestamp in milliseconds.
    fn from_unix_milliseconds_i64(timestamp: i64) -> Self;
    /// Builds a datetime from a unix timestamp in seconds.
    fn from_unix_seconds_i64(timestamp: i64) -> Self;
}

impl SensAppDateTimeExt for SensAppDateTime {
    // Nanoseconds go through `Duration::from_truncated_nanoseconds`, which
    // accepts the full i64 range directly.
    fn from_unix_nanoseconds_i64(timestamp: i64) -> Self {
        Self::from_unix_duration(hifitime::Duration::from_truncated_nanoseconds(timestamp))
    }
    // NOTE(review): the three variants below anchor on
    // `from_utc_duration(UNIX_REF_EPOCH.to_utc_duration() + …)` while the
    // nanosecond variant uses `from_unix_duration` — confirm both paths
    // resolve to the same epoch in hifitime before unifying them.
    fn from_unix_microseconds_i64(timestamp: i64) -> Self {
        Self::from_utc_duration(UNIX_REF_EPOCH.to_utc_duration() + timestamp * Unit::Microsecond)
    }
    fn from_unix_milliseconds_i64(timestamp: i64) -> Self {
        Self::from_utc_duration(UNIX_REF_EPOCH.to_utc_duration() + timestamp * Unit::Millisecond)
    }
    fn from_unix_seconds_i64(timestamp: i64) -> Self {
        Self::from_utc_duration(UNIX_REF_EPOCH.to_utc_duration() + timestamp * Unit::Second)
    }
}

use hifitime::{Unit, UNIX_REF_EPOCH};
use sqlx::types::time::OffsetDateTime;
pub fn sensapp_datetime_to_offset_datetime(datetime: &SensAppDateTime) -> Result<OffsetDateTime> {
let unix_timestamp = datetime.to_unix_seconds().floor() as i128;
Expand Down
33 changes: 17 additions & 16 deletions src/http/influxdb.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use super::{app_error::AppError, state::HttpServerState};
use crate::datamodel::{
batch_builder::BatchBuilder, SensAppDateTime, Sensor, SensorType, TypedSamples,
batch_builder::BatchBuilder, sensapp_datetime::SensAppDateTimeExt, SensAppDateTime, Sensor,
SensorType, TypedSamples,
};
use anyhow::Result;
use axum::{
Expand Down Expand Up @@ -122,13 +123,13 @@ pub async fn publish_influxdb(
}): Query<InfluxDBQueryParams>,
bytes: Bytes,
) -> Result<StatusCode, AppError> {
println!("InfluxDB publish");
println!("bucket: {}", bucket);
println!("org: {:?}", org);
println!("org_id: {:?}", org_id);
println!("precision: {:?}", precision);
//println!("bytes: {:?}", bytes);
println!("headers: {:?}", headers);
// println!("InfluxDB publish");
// println!("bucket: {}", bucket);
// println!("org: {:?}", org);
// println!("org_id: {:?}", org_id);
// println!("precision: {:?}", precision);
// println!("bytes: {:?}", bytes);
// println!("headers: {:?}", headers);

// Requires org or org_id
if org.is_none() && org_id.is_none() {
Expand Down Expand Up @@ -183,16 +184,16 @@ pub async fn publish_influxdb(

let datetime = match line.timestamp {
Some(timestamp) => match precision_enum {
Precision::Nanoseconds => SensAppDateTime::from_unix_duration(
hifitime::Duration::from_truncated_nanoseconds(timestamp),
),
Precision::Microseconds => SensAppDateTime::from_unix_duration(
hifitime::Duration::from_microseconds(timestamp as f64),
),
Precision::Nanoseconds => {
SensAppDateTime::from_unix_nanoseconds_i64(timestamp)
}
Precision::Microseconds => {
SensAppDateTime::from_unix_microseconds_i64(timestamp)
}
Precision::Milliseconds => {
SensAppDateTime::from_unix_milliseconds(timestamp as f64)
SensAppDateTime::from_unix_milliseconds_i64(timestamp)
}
Precision::Seconds => SensAppDateTime::from_unix_seconds(timestamp as f64),
Precision::Seconds => SensAppDateTime::from_unix_seconds_i64(timestamp),
},
None => match SensAppDateTime::now() {
Ok(datetime) => datetime,
Expand Down
1 change: 1 addition & 0 deletions src/http/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub mod app_error;
pub mod influxdb;
pub mod prometheus;
pub mod server;
pub mod state;
162 changes: 162 additions & 0 deletions src/http/prometheus.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
use std::sync::Arc;

use crate::{
datamodel::{
batch_builder::BatchBuilder, sensapp_datetime::SensAppDateTimeExt,
sensapp_vec::SensAppLabels, unit::Unit, Sample, SensAppDateTime, Sensor, SensorType,
TypedSamples,
},
parsing::prometheus::remote_write_parser::parse_remote_write_request,
};

use super::{app_error::AppError, state::HttpServerState};
use anyhow::Result;
use axum::{
debug_handler,
extract::State,
http::{HeaderMap, StatusCode},
};
use tokio_util::bytes::Bytes;

/// Validates the HTTP headers of a Prometheus remote write request.
///
/// The remote write 1.0 specification requires:
///   - `content-encoding: snappy`
///   - `content-type: application/x-protobuf`
///   - `x-prometheus-remote-write-version: 0.1.0`
///
/// Returns a `BadRequest` error describing the first violation found.
fn verify_headers(headers: &HeaderMap) -> Result<(), AppError> {
    // Checks that a header is present and matches the expected value.
    // The comparison ignores ASCII case: content-coding tokens and media
    // types are case-insensitive per RFC 9110, and the original exact-case
    // match ("snappy"/"SNAPPY" only) rejected valid spellings like "Snappy".
    fn require_header(headers: &HeaderMap, name: &str, expected: &str) -> Result<(), AppError> {
        let value = headers
            .get(name)
            .ok_or_else(|| AppError::BadRequest(anyhow::anyhow!("Missing {} header", name)))?;
        match value.to_str() {
            Ok(value) if value.eq_ignore_ascii_case(expected) => Ok(()),
            _ => Err(AppError::BadRequest(anyhow::anyhow!(
                "Unsupported {}, must be {}",
                name,
                expected
            ))),
        }
    }

    // The body must be snappy-compressed.
    require_header(headers, "content-encoding", "snappy")?;
    // The payload must be a protocol buffer.
    require_header(headers, "content-type", "application/x-protobuf")?;
    // Only version 0.1.0 of the remote write protocol is supported.
    require_header(headers, "x-prometheus-remote-write-version", "0.1.0")?;

    Ok(())
}

/// Axum handler for the Prometheus Remote Write API (spec 1.0).
///
/// Accepts a snappy-compressed protobuf `WriteRequest`, converts every time
/// series into a SensApp float sensor (the metric name comes from the
/// `__name__` label, an optional `unit` label becomes the sensor unit), and
/// forwards the samples to the event bus. Responds `204 No Content` on
/// success.
#[debug_handler]
pub async fn publish_prometheus(
    State(state): State<HttpServerState>,
    headers: HeaderMap,
    bytes: Bytes,
) -> Result<StatusCode, AppError> {
    // Reject anything that is not snappy + protobuf + remote write 0.1.0.
    verify_headers(&headers)?;

    // Decompress and decode the protobuf WriteRequest.
    let write_request = parse_remote_write_request(&bytes)?;

    // Regularly, prometheus sends metadata on the undocumented reserved field,
    // so we stop immediately when it happens.
    if write_request.timeseries.is_empty() {
        return Ok(StatusCode::NO_CONTENT);
    }

    let mut batch_builder = BatchBuilder::new()?;
    for time_series in write_request.timeseries {
        let mut labels = SensAppLabels::with_capacity(time_series.labels.len());
        let mut name: Option<String> = None;
        let mut unit: Option<Unit> = None;
        for label in time_series.labels {
            match label.name.as_str() {
                // Prometheus carries the metric name as the reserved
                // __name__ label.
                "__name__" => {
                    name = Some(label.value.clone());
                }
                // A "unit" label, when present, is promoted to the sensor unit.
                "unit" => {
                    unit = Some(Unit::new(label.value.clone(), None));
                }
                _ => {}
            }
            labels.push((label.name, label.value));
        }
        let name = match name {
            Some(name) => name,
            None => {
                return Err(AppError::BadRequest(anyhow::anyhow!(
                    "A time series is missing its __name__ label"
                )));
            }
        };

        // Prometheus has a very simple model, it's always a float.
        let sensor = Sensor::new_without_uuid(name, SensorType::Float, unit, Some(labels))?;

        // Remote write sample timestamps are unix milliseconds.
        let samples = TypedSamples::Float(
            time_series
                .samples
                .into_iter()
                .map(|sample| Sample {
                    datetime: SensAppDateTime::from_unix_milliseconds_i64(sample.timestamp),
                    value: sample.value,
                })
                .collect(),
        );

        batch_builder.add(Arc::new(sensor), samples).await?;
        // TODO: flush incrementally with send_if_batch_full once the event
        // bus is reachable from this loop.
    }

    // Flush whatever is buffered and wait for the storage layer to
    // acknowledge before answering the client.
    match batch_builder.send_what_is_left(state.event_bus).await {
        Ok(Some(mut receiver)) => {
            receiver.wait().await?;
        }
        Ok(None) => {}
        Err(error) => {
            return Err(AppError::InternalServerError(anyhow::anyhow!(error)));
        }
    }

    // OK no content
    Ok(StatusCode::NO_CONTENT)
}
9 changes: 7 additions & 2 deletions src/http/server.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use super::app_error::AppError;
use super::influxdb::publish_influxdb;
use super::prometheus::publish_prometheus;
use super::state::HttpServerState;
use crate::config;
use crate::importers::csv::publish_csv_async;
Expand All @@ -16,7 +17,6 @@ use axum::Json;
use axum::Router;
use futures::TryStreamExt;
use polars::prelude::*;
use sindit_senml::time;
use std::io;
use std::io::Cursor;
use std::net::SocketAddr;
Expand Down Expand Up @@ -70,11 +70,16 @@ pub async fn run_http_server(state: HttpServerState, address: SocketAddr) -> Res
"/sensors/:sensor_name_or_uuid/publish_multipart",
post(publish_multipart).layer(max_body_layer.clone()),
)
// InfluxDB compatibility
// InfluxDB Write API
.route(
"/api/v2/write",
post(publish_influxdb).layer(max_body_layer.clone()),
)
// Prometheus Remote Write API
.route(
"/api/v1/prometheus_remote_write",
post(publish_prometheus).layer(max_body_layer.clone()),
)
.layer(middleware)
.with_state(state);

Expand Down
1 change: 1 addition & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ mod http;
mod importers;
mod infer;
mod name_to_uuid;
mod parsing;
mod storage;

#[tokio::main]
Expand Down
1 change: 1 addition & 0 deletions src/parsing/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod prometheus;
2 changes: 2 additions & 0 deletions src/parsing/prometheus/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pub mod remote_write_models;
pub mod remote_write_parser;
32 changes: 32 additions & 0 deletions src/parsing/prometheus/prometheus_remote_write.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// https://prometheus.io/docs/concepts/remote_write_spec/
// Version: 1.0
// Status: Published
// Date: April 2023
//
// Minimal subset of the Prometheus remote write protocol: a WriteRequest is
// a batch of time series, each carrying its identifying labels and samples.
// Field numbers are part of the wire format and must not change.

syntax = "proto3";

message WriteRequest {
  repeated TimeSeries timeseries = 1;
  // Cortex uses this field to determine the source of the write request.
  // We reserve it to avoid any compatibility issues.
  reserved 2;

  // Prometheus uses this field to send metadata, but this is
  // omitted from v1 of the spec as it is experimental.
  reserved 3;
}

message TimeSeries {
  // Label set identifying the series; includes the reserved __name__ label.
  repeated Label labels = 1;
  repeated Sample samples = 2;
}

// A single name/value label pair.
message Label {
  string name = 1;
  string value = 2;
}

// One measurement point.
message Sample {
  double value = 1;
  // Unix timestamp in milliseconds.
  int64 timestamp = 2;
}
Loading

0 comments on commit 5c043c0

Please sign in to comment.