Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix for daily stats for querier #811

Merged
merged 1 commit into from
Jun 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions server/src/handlers/http/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use crate::handlers::{STATIC_SCHEMA_FLAG, TIME_PARTITION_KEY};
use crate::option::CONFIG;

use crate::metrics::prom_utils::Metrics;
use crate::stats::Stats;
use crate::storage::object_storage::ingestor_metadata_path;
use crate::storage::PARSEABLE_ROOT_DIRECTORY;
use crate::storage::{ObjectStorageError, STREAM_ROOT_DIRECTORY};
Expand Down Expand Up @@ -156,6 +157,67 @@ pub async fn sync_streams_with_ingestors(
Ok(())
}

pub async fn fetch_daily_stats_from_ingestors(
stream_name: &str,
date: &str,
) -> Result<Stats, StreamError> {
let mut total_events_ingested: u64 = 0;
let mut total_ingestion_size: u64 = 0;
let mut total_storage_size: u64 = 0;

let ingestor_infos = get_ingestor_info().await.map_err(|err| {
log::error!("Fatal: failed to get ingestor info: {:?}", err);
StreamError::Anyhow(err)
})?;
for ingestor in ingestor_infos.iter() {
let uri = Url::parse(&format!(
"{}{}/metrics",
&ingestor.domain_name,
base_path_without_preceding_slash()
))
.map_err(|err| {
StreamError::Anyhow(anyhow::anyhow!("Invalid URL in Ingestor Metadata: {}", err))
})?;

let res = reqwest::Client::new()
.get(uri)
.header(header::CONTENT_TYPE, "application/json")
.send()
.await;

if let Ok(res) = res {
let text = res
.text()
.await
.map_err(|err| StreamError::Anyhow(anyhow::anyhow!("Request failed: {}", err)))?;
let lines: Vec<Result<String, std::io::Error>> =
text.lines().map(|line| Ok(line.to_owned())).collect_vec();

let sample = prometheus_parse::Scrape::parse(lines.into_iter())
.map_err(|err| {
StreamError::Anyhow(anyhow::anyhow!(
"Invalid URL in Ingestor Metadata: {}",
err
))
})?
.samples;

let (events_ingested, ingestion_size, storage_size) =
Metrics::get_daily_stats_from_samples(sample, stream_name, date);
total_events_ingested += events_ingested;
total_ingestion_size += ingestion_size;
total_storage_size += storage_size;
}
}

let stats = Stats {
events: total_events_ingested,
ingestion: total_ingestion_size,
storage: total_storage_size,
};
Ok(stats)
}

/// get the cumulative stats from all ingestors
pub async fn fetch_stats_from_ingestors(
stream_name: &str,
Expand Down
26 changes: 21 additions & 5 deletions server/src/handlers/http/logstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
use self::error::{CreateStreamError, StreamError};
use super::base_path_without_preceding_slash;
use super::cluster::utils::{merge_quried_stats, IngestionStats, QueriedStats, StorageStats};
use super::cluster::{fetch_stats_from_ingestors, INTERNAL_STREAM_NAME};
use super::cluster::{
fetch_daily_stats_from_ingestors, fetch_stats_from_ingestors, INTERNAL_STREAM_NAME,
};
use crate::alerts::Alerts;
use crate::handlers::{
CUSTOM_PARTITION_KEY, STATIC_SCHEMA_FLAG, TIME_PARTITION_KEY, TIME_PARTITION_LIMIT_KEY,
Expand Down Expand Up @@ -527,10 +529,24 @@ pub async fn get_stats(req: HttpRequest) -> Result<impl Responder, StreamError>
}

if !date_value.is_empty() {
let stats = get_stats_date(&stream_name, date_value).await?;
let stats = serde_json::to_value(stats)?;

return Ok((web::Json(stats), StatusCode::OK));
if CONFIG.parseable.mode == Mode::Query {
let querier_stats = get_stats_date(&stream_name, date_value).await?;
let ingestor_stats =
fetch_daily_stats_from_ingestors(&stream_name, date_value).await?;
let total_stats = Stats {
events: querier_stats.events + ingestor_stats.events,
ingestion: querier_stats.ingestion + ingestor_stats.ingestion,
storage: querier_stats.storage + ingestor_stats.storage,
};
let stats = serde_json::to_value(total_stats)?;

return Ok((web::Json(stats), StatusCode::OK));
} else {
let stats = get_stats_date(&stream_name, date_value).await?;
let stats = serde_json::to_value(stats)?;

return Ok((web::Json(stats), StatusCode::OK));
}
}
}

Expand Down
41 changes: 41 additions & 0 deletions server/src/metrics/prom_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,47 @@ impl Metrics {
}

impl Metrics {
pub fn get_daily_stats_from_samples(
samples: Vec<PromSample>,
stream_name: &str,
date: &str,
) -> (u64, u64, u64) {
let mut events_ingested: u64 = 0;
let mut ingestion_size: u64 = 0;
let mut storage_size: u64 = 0;
for sample in samples {
if let PromValue::Gauge(val) = sample.value {
match sample.metric.as_str() {
"parseable_events_ingested_date" => {
if sample.labels.get("stream").expect("stream name is present")
== stream_name
&& sample.labels.get("date").expect("date is present") == date
{
events_ingested = val as u64;
}
}
"parseable_events_ingested_size_date" => {
if sample.labels.get("stream").expect("stream name is present")
== stream_name
&& sample.labels.get("date").expect("date is present") == date
{
ingestion_size = val as u64;
}
}
"parseable_events_storage_size_date" => {
if sample.labels.get("stream").expect("stream name is present")
== stream_name
&& sample.labels.get("date").expect("date is present") == date
{
storage_size = val as u64;
}
}
_ => {}
}
}
}
(events_ingested, ingestion_size, storage_size)
}
pub async fn from_prometheus_samples(
samples: Vec<PromSample>,
ingestor_metadata: &IngestorMetadata,
Expand Down
Loading