From 0da46fa30c0b7e9ada8218ee7c5d3d0a2a7c0368 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha <131262146+nikhilsinhaparseable@users.noreply.github.com> Date: Wed, 22 May 2024 19:11:57 +0530 Subject: [PATCH] feat: added internal stream "meta" (#801) This PR adds internal stream "meta" to be used in distributed mode only. The query node creates this internal stream. It fetches cluster metrics from all ingestion nodes, adds the metrics data to an event and ingests the event to this internal stream. This data will be used by console in cluster page to show the stats per ingestor. --- server/src/event/writer.rs | 7 +- server/src/handlers/http/cluster/mod.rs | 156 +++++++++++----- server/src/handlers/http/ingest.rs | 48 ++++- server/src/handlers/http/logstream.rs | 6 +- .../src/handlers/http/modal/ingest_server.rs | 3 + .../src/handlers/http/modal/query_server.rs | 41 ++++- server/src/metrics/prom_utils.rs | 171 +++++++++++++++--- server/src/migration.rs | 5 +- server/src/validator.rs | 2 +- 9 files changed, 359 insertions(+), 80 deletions(-) diff --git a/server/src/event/writer.rs b/server/src/event/writer.rs index 5ccc91e34..0090a47f2 100644 --- a/server/src/event/writer.rs +++ b/server/src/event/writer.rs @@ -26,6 +26,7 @@ use std::{ }; use crate::{ + handlers::http::cluster::INTERNAL_STREAM_NAME, option::{Mode, CONFIG}, utils, }; @@ -132,7 +133,7 @@ impl WriterTable { parsed_timestamp: NaiveDateTime, custom_partition_values: &HashMap, ) -> Result<(), StreamWriterError> { - if CONFIG.parseable.mode != Mode::Query { + if CONFIG.parseable.mode != Mode::Query || stream_name == INTERNAL_STREAM_NAME { stream_writer.lock().unwrap().push( stream_name, schema_key, @@ -161,7 +162,7 @@ impl WriterTable { ) -> Result<(), StreamWriterError> { match map.get(stream_name) { Some(writer) => { - if CONFIG.parseable.mode != Mode::Query { + if CONFIG.parseable.mode != Mode::Query || stream_name == INTERNAL_STREAM_NAME { writer.lock().unwrap().push( stream_name, schema_key, @@ -174,7 +175,7 @@ impl WriterTable { } } None => { - if CONFIG.parseable.mode != Mode::Query { + if CONFIG.parseable.mode != Mode::Query || stream_name == INTERNAL_STREAM_NAME { let mut writer = Writer::default(); writer.push( stream_name, diff --git a/server/src/handlers/http/cluster/mod.rs b/server/src/handlers/http/cluster/mod.rs index 505d555b8..4afce9b64 100644 --- a/server/src/handlers/http/cluster/mod.rs +++ b/server/src/handlers/http/cluster/mod.rs @@ -21,7 +21,7 @@ pub mod utils; use crate::handlers::http::cluster::utils::{ check_liveness, to_url_string, IngestionStats, QueriedStats, }; -use crate::handlers::http::ingest::PostError; +use crate::handlers::http::ingest::{ingest_internal_stream, PostError}; use crate::handlers::http::logstream::error::StreamError; use crate::handlers::{STATIC_SCHEMA_FLAG, TIME_PARTITION_KEY}; use crate::option::CONFIG; @@ -46,8 +46,13 @@ type IngestorMetadataArr = Vec; use self::utils::StorageStats; use super::base_path_without_preceding_slash; +use std::time::Duration; use super::modal::IngestorMetadata; +use clokwerk::{AsyncScheduler, Interval}; +pub const INTERNAL_STREAM_NAME: &str = "meta"; + +const CLUSTER_METRICS_INTERVAL_SECONDS: Interval = clokwerk::Interval::Minutes(1); pub async fn sync_cache_with_ingestors( url: &str, @@ -432,50 +437,11 @@ pub async fn get_cluster_info() -> Result { } pub async fn get_cluster_metrics() -> Result { - let ingestor_metadata = get_ingestor_info().await.map_err(|err| { - log::error!("Fatal: failed to get ingestor info: {:?}", err); - PostError::Invalid(err) + let dresses = fetch_cluster_metrics().await.map_err(|err| { + log::error!("Fatal: failed to fetch cluster metrics: {:?}", err); + PostError::Invalid(err.into()) })?; - let mut dresses = vec![]; - - for ingestor in ingestor_metadata { - let uri = Url::parse(&format!( - "{}{}/metrics", - &ingestor.domain_name, - base_path_without_preceding_slash() - )) - .map_err(|err| { - PostError::Invalid(anyhow::anyhow!("Invalid URL in Ingestor Metadata: {}", err)) - })?; - - let res = reqwest::Client::new() - .get(uri) - .header(header::CONTENT_TYPE, "application/json") - .send() - .await; - - if let Ok(res) = res { - let text = res.text().await.map_err(PostError::NetworkError)?; - let lines: Vec> = - text.lines().map(|line| Ok(line.to_owned())).collect_vec(); - - let sample = prometheus_parse::Scrape::parse(lines.into_iter()) - .map_err(|err| PostError::CustomError(err.to_string()))? - .samples; - - dresses.push(Metrics::from_prometheus_samples( - sample, - ingestor.domain_name, - )); - } else { - log::warn!( - "Failed to fetch metrics from ingestor: {}\n", - ingestor.domain_name, - ); - } - } - Ok(actix_web::HttpResponse::Ok().json(dresses)) } @@ -545,3 +511,107 @@ pub async fn remove_ingestor(req: HttpRequest) -> Result Result, PostError> { + let ingestor_metadata = get_ingestor_info().await.map_err(|err| { + log::error!("Fatal: failed to get ingestor info: {:?}", err); + PostError::Invalid(err) + })?; + + let mut dresses = vec![]; + + for ingestor in ingestor_metadata { + let uri = Url::parse(&format!( + "{}{}/metrics", + &ingestor.domain_name, + base_path_without_preceding_slash() + )) + .map_err(|err| { + PostError::Invalid(anyhow::anyhow!("Invalid URL in Ingestor Metadata: {}", err)) + })?; + + let res = reqwest::Client::new() + .get(uri) + .header(header::CONTENT_TYPE, "application/json") + .send() + .await; + + if let Ok(res) = res { + let text = res.text().await.map_err(PostError::NetworkError)?; + let lines: Vec> = + text.lines().map(|line| Ok(line.to_owned())).collect_vec(); + + let sample = prometheus_parse::Scrape::parse(lines.into_iter()) + .map_err(|err| PostError::CustomError(err.to_string()))? + .samples; + let ingestor_metrics = Metrics::from_prometheus_samples(sample, &ingestor) + .await + .map_err(|err| { + log::error!("Fatal: failed to get ingestor metrics: {:?}", err); + PostError::Invalid(err.into()) + })?; + dresses.push(ingestor_metrics); + } else { + log::warn!( + "Failed to fetch metrics from ingestor: {}\n", + &ingestor.domain_name, + ); + } + } + Ok(dresses) +} + +pub fn init_cluster_metrics_schedular() -> Result<(), PostError> { + log::info!("Setting up schedular for cluster metrics ingestion"); + + let mut scheduler = AsyncScheduler::new(); + scheduler + .every(CLUSTER_METRICS_INTERVAL_SECONDS) + .run(move || async { + let result: Result<(), PostError> = async { + let cluster_metrics = fetch_cluster_metrics().await; + if let Ok(metrics) = cluster_metrics { + if !metrics.is_empty() { + log::info!("Cluster metrics fetched successfully from all ingestors"); + if let Ok(metrics_bytes) = serde_json::to_vec(&metrics) { + let stream_name = INTERNAL_STREAM_NAME; + + if matches!( + ingest_internal_stream( + stream_name.to_string(), + bytes::Bytes::from(metrics_bytes), + ) + .await, + Ok(()) + ) { + log::info!( + "Cluster metrics successfully ingested into internal stream" + ); + } else { + log::error!( + "Failed to ingest cluster metrics into internal stream" + ); + } + } else { + log::error!("Failed to serialize cluster metrics"); + } + } + } + Ok(()) + } + .await; + + if let Err(err) = result { + log::error!("Error in cluster metrics scheduler: {:?}", err); + } + }); + + tokio::spawn(async move { + loop { + scheduler.run_pending().await; + tokio::time::sleep(Duration::from_secs(10)).await; + } + }); + + Ok(()) +} diff --git a/server/src/handlers/http/ingest.rs b/server/src/handlers/http/ingest.rs index f03b45652..a5157f34c 100644 --- a/server/src/handlers/http/ingest.rs +++ b/server/src/handlers/http/ingest.rs @@ -16,6 +16,7 @@ * */ +use super::cluster::INTERNAL_STREAM_NAME; use super::logstream::error::CreateStreamError; use super::{kinesis, otel}; use crate::event::{ @@ -52,6 +53,12 @@ pub async fn ingest(req: HttpRequest, body: Bytes) -> Result Result Result<(), PostError> { + create_stream_if_not_exists(&stream_name).await?; + let size: usize = body.len(); + let parsed_timestamp = Utc::now().naive_utc(); + let (rb, is_first) = { + let body_val: Value = serde_json::from_slice(&body)?; + let hash_map = STREAM_INFO.read().unwrap(); + let schema = hash_map + .get(&stream_name) + .ok_or(PostError::StreamNotFound(stream_name.clone()))? + .schema + .clone(); + let event = format::json::Event { + data: body_val, + tags: String::default(), + metadata: String::default(), + }; + event.into_recordbatch(schema, None, None)? + }; + event::Event { + rb, + stream_name, + origin_format: "json", + origin_size: size as u64, + is_first_event: is_first, + parsed_timestamp, + time_partition: None, + custom_partition_values: HashMap::new(), + } + .process() + .await?; + Ok(()) +} + async fn flatten_and_push_logs( req: HttpRequest, body: Bytes, @@ -93,7 +134,12 @@ async fn flatten_and_push_logs( // fails if the logstream does not exist pub async fn post_event(req: HttpRequest, body: Bytes) -> Result { let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); - + if stream_name.eq(INTERNAL_STREAM_NAME) { + return Err(PostError::Invalid(anyhow::anyhow!( + "Stream {} is an internal stream and cannot be ingested into", + stream_name + ))); + } flatten_and_push_logs(req, body, stream_name).await?; Ok(HttpResponse::Ok().finish()) } diff --git a/server/src/handlers/http/logstream.rs b/server/src/handlers/http/logstream.rs index da4f00ad3..15e343747 100644 --- a/server/src/handlers/http/logstream.rs +++ b/server/src/handlers/http/logstream.rs @@ -18,8 +18,8 @@ use self::error::{CreateStreamError, StreamError}; use super::base_path_without_preceding_slash; -use super::cluster::fetch_stats_from_ingestors; use super::cluster::utils::{merge_quried_stats, IngestionStats, QueriedStats, StorageStats}; +use super::cluster::{fetch_stats_from_ingestors, INTERNAL_STREAM_NAME}; use crate::alerts::Alerts; use crate::handlers::{ CUSTOM_PARTITION_KEY, STATIC_SCHEMA_FLAG, TIME_PARTITION_KEY, TIME_PARTITION_LIMIT_KEY, @@ -591,7 +591,9 @@ pub async fn create_stream( schema: Arc, ) -> Result<(), CreateStreamError> { // fail to proceed if invalid stream name - validator::stream_name(&stream_name)?; + if stream_name.ne(INTERNAL_STREAM_NAME) { + validator::stream_name(&stream_name)?; + } // Proceed to create log stream if it doesn't exist let storage = CONFIG.storage().get_object_store(); diff --git a/server/src/handlers/http/modal/ingest_server.rs b/server/src/handlers/http/modal/ingest_server.rs index 6630da786..26ed25d8f 100644 --- a/server/src/handlers/http/modal/ingest_server.rs +++ b/server/src/handlers/http/modal/ingest_server.rs @@ -24,6 +24,7 @@ use crate::handlers::http::middleware::RouteExt; use crate::localcache::LocalCacheManager; use crate::metadata; use crate::metrics; +use crate::migration; use crate::migration::metadata_migration::migrate_ingester_metadata; use crate::rbac; use crate::rbac::role::Action; @@ -328,6 +329,8 @@ impl IngestServer { let prometheus = metrics::build_metrics_handler(); CONFIG.storage().register_store_metrics(&prometheus); + migration::run_migration(&CONFIG).await?; + let storage = CONFIG.storage().get_object_store(); if let Err(err) = metadata::STREAM_INFO.load(&*storage).await { log::warn!("could not populate local metadata. {:?}", err); diff --git a/server/src/handlers/http/modal/query_server.rs b/server/src/handlers/http/modal/query_server.rs index 3558ec3df..0abfc7ff0 100644 --- a/server/src/handlers/http/modal/query_server.rs +++ b/server/src/handlers/http/modal/query_server.rs @@ -17,11 +17,12 @@ */ use crate::handlers::airplane; -use crate::handlers::http::cluster; +use crate::handlers::http::cluster::{self, init_cluster_metrics_schedular}; use crate::handlers::http::middleware::RouteExt; use crate::handlers::http::{base_path, cross_origin_config, API_BASE_PATH, API_VERSION}; use crate::rbac::role::Action; +use crate::sync; use crate::{analytics, banner, metadata, metrics, migration, rbac, storage}; use actix_web::web; use actix_web::web::ServiceConfig; @@ -185,11 +186,39 @@ impl QueryServer { analytics::init_analytics_scheduler()?; } - tokio::spawn(airplane::server()); - - self.start(prometheus, CONFIG.parseable.openid.clone()) - .await?; + if matches!(init_cluster_metrics_schedular(), Ok(())) { + log::info!("Cluster metrics scheduler started successfully"); + } + let (localsync_handler, mut localsync_outbox, localsync_inbox) = sync::run_local_sync(); + let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) = + sync::object_store_sync(); - Ok(()) + tokio::spawn(airplane::server()); + let app = self.start(prometheus, CONFIG.parseable.openid.clone()); + + tokio::pin!(app); + loop { + tokio::select! { + e = &mut app => { + // actix server finished .. stop other threads and stop the server + remote_sync_inbox.send(()).unwrap_or(()); + localsync_inbox.send(()).unwrap_or(()); + localsync_handler.join().unwrap_or(()); + remote_sync_handler.join().unwrap_or(()); + return e + }, + _ = &mut localsync_outbox => { + // crash the server if localsync fails for any reason + // panic!("Local Sync thread died. Server will fail now!") + return Err(anyhow::Error::msg("Failed to sync local data to drive. Please restart the Parseable server.\n\nJoin us on Parseable Slack if the issue persists after restart : https://launchpass.com/parseable")) + }, + _ = &mut remote_sync_outbox => { + // remote_sync failed, this is recoverable by just starting remote_sync thread again + remote_sync_handler.join().unwrap_or(()); + (remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = sync::object_store_sync(); + } + + }; + } } } diff --git a/server/src/metrics/prom_utils.rs b/server/src/metrics/prom_utils.rs index 21e27c03f..cd01fbd76 100644 --- a/server/src/metrics/prom_utils.rs +++ b/server/src/metrics/prom_utils.rs @@ -1,17 +1,36 @@ +use crate::handlers::http::base_path_without_preceding_slash; +use crate::handlers::http::ingest::PostError; +use crate::handlers::http::modal::IngestorMetadata; use crate::utils::get_url; +use actix_web::http::header; +use chrono::NaiveDateTime; +use chrono::Utc; use prometheus_parse::Sample as PromSample; use prometheus_parse::Value as PromValue; use serde::Serialize; use serde_json::Error as JsonError; use serde_json::Value as JsonValue; +use url::Url; #[derive(Debug, Serialize, Clone)] pub struct Metrics { address: String, parseable_events_ingested: f64, // all streams + parseable_events_ingested_size: f64, + parseable_lifetime_events_ingested: f64, // all streams + parseable_lifetime_events_ingested_size: f64, + parseable_deleted_events_ingested: f64, // all streams + parseable_deleted_events_ingested_size: f64, parseable_staging_files: f64, process_resident_memory_bytes: f64, parseable_storage_size: StorageMetrics, + parseable_lifetime_storage_size: StorageMetrics, + parseable_deleted_storage_size: StorageMetrics, + event_type: String, + event_time: NaiveDateTime, + commit: String, + staging: String, + cache: String, } #[derive(Debug, Serialize, Default, Clone)] @@ -32,9 +51,21 @@ impl Default for Metrics { Metrics { address, parseable_events_ingested: 0.0, + parseable_events_ingested_size: 0.0, parseable_staging_files: 0.0, process_resident_memory_bytes: 0.0, parseable_storage_size: StorageMetrics::default(), + parseable_lifetime_events_ingested: 0.0, + parseable_lifetime_events_ingested_size: 0.0, + parseable_deleted_events_ingested: 0.0, + parseable_deleted_events_ingested_size: 0.0, + parseable_deleted_storage_size: StorageMetrics::default(), + parseable_lifetime_storage_size: StorageMetrics::default(), + event_type: "cluster-metrics".to_string(), + event_time: Utc::now().naive_utc(), + commit: "".to_string(), + staging: "".to_string(), + cache: "".to_string(), } } } @@ -44,44 +75,138 @@ impl Metrics { Metrics { address, parseable_events_ingested: 0.0, + parseable_events_ingested_size: 0.0, parseable_staging_files: 0.0, process_resident_memory_bytes: 0.0, parseable_storage_size: StorageMetrics::default(), + parseable_lifetime_events_ingested: 0.0, + parseable_lifetime_events_ingested_size: 0.0, + parseable_deleted_events_ingested: 0.0, + parseable_deleted_events_ingested_size: 0.0, + parseable_deleted_storage_size: StorageMetrics::default(), + parseable_lifetime_storage_size: StorageMetrics::default(), + event_type: "cluster-metrics".to_string(), + event_time: Utc::now().naive_utc(), + commit: "".to_string(), + staging: "".to_string(), + cache: "".to_string(), } } } impl Metrics { - pub fn from_prometheus_samples(samples: Vec, address: String) -> Self { - let mut prom_dress = Metrics::new(address); - + pub async fn from_prometheus_samples( + samples: Vec, + ingestor_metadata: &IngestorMetadata, + ) -> Result { + let mut prom_dress = Metrics::new(ingestor_metadata.domain_name.to_string()); for sample in samples { - if &sample.metric == "parseable_events_ingested" { - if let PromValue::Counter(val) = sample.value { - prom_dress.parseable_events_ingested += val; - } - } else if sample.metric == "parseable_staging_files" { - if let PromValue::Gauge(val) = sample.value { - prom_dress.parseable_staging_files += val; - } - } else if sample.metric == "process_resident_memory_bytes" { - if let PromValue::Gauge(val) = sample.value { - prom_dress.process_resident_memory_bytes += val; - } - } else if sample.metric == "parseable_storage_size" { - if sample.labels.get("type").expect("type is present") == "data" { - if let PromValue::Gauge(val) = sample.value { - prom_dress.parseable_storage_size.data += val; + if let PromValue::Gauge(val) = sample.value { + match sample.metric.as_str() { + "parseable_events_ingested" => prom_dress.parseable_events_ingested += val, + "parseable_events_ingested_size" => { + prom_dress.parseable_events_ingested_size += val + } + "parseable_lifetime_events_ingested" => { + prom_dress.parseable_lifetime_events_ingested += val + } + "parseable_lifetime_events_ingested_size" => { + prom_dress.parseable_lifetime_events_ingested_size += val + } + "parseable_deleted_events_ingested" => { + prom_dress.parseable_deleted_events_ingested += val + } + "parseable_deleted_events_ingested_size" => { + prom_dress.parseable_deleted_events_ingested_size += val } - } else if sample.labels.get("type").expect("type is present") == "staging" { - if let PromValue::Gauge(val) = sample.value { - prom_dress.parseable_storage_size.staging += val; + "parseable_staging_files" => prom_dress.parseable_staging_files += val, + "process_resident_memory_bytes" => { + prom_dress.process_resident_memory_bytes += val } + "parseable_storage_size" => { + if sample.labels.get("type").expect("type is present") == "staging" { + prom_dress.parseable_storage_size.staging += val; + } + if sample.labels.get("type").expect("type is present") == "data" { + prom_dress.parseable_storage_size.data += val; + } + } + "parseable_lifetime_events_storage_size" => { + if sample.labels.get("type").expect("type is present") == "data" { + prom_dress.parseable_lifetime_storage_size.data += val; + } + } + "parseable_deleted_events_storage_size" => { + if sample.labels.get("type").expect("type is present") == "data" { + prom_dress.parseable_deleted_storage_size.data += val; + } + } + _ => {} } } } + let (commit_id, staging, cache) = Self::from_about_api_response(ingestor_metadata.clone()) + .await + .map_err(|err| { + log::error!("Fatal: failed to get ingestor info: {:?}", err); + PostError::Invalid(err.into()) + })?; + + prom_dress.commit = commit_id; + prom_dress.staging = staging; + prom_dress.cache = cache; - prom_dress + Ok(prom_dress) + } + + pub async fn from_about_api_response( + ingestor_metadata: IngestorMetadata, + ) -> Result<(String, String, String), PostError> { + let uri = Url::parse(&format!( + "{}{}/about", + &ingestor_metadata.domain_name, + base_path_without_preceding_slash() + )) + .map_err(|err| { + PostError::Invalid(anyhow::anyhow!("Invalid URL in Ingestor Metadata: {}", err)) + })?; + let res = reqwest::Client::new() + .get(uri) + .header(header::CONTENT_TYPE, "application/json") + .header(header::AUTHORIZATION, ingestor_metadata.token) + .send() + .await; + if let Ok(res) = res { + let about_api_json = res.text().await.map_err(PostError::NetworkError)?; + let about_api_json: serde_json::Value = + serde_json::from_str(&about_api_json).map_err(PostError::SerdeError)?; + let commit_id = about_api_json + .get("commit") + .and_then(|x| x.as_str()) + .unwrap_or_default(); + let staging = about_api_json + .get("staging") + .and_then(|x| x.as_str()) + .unwrap_or_default(); + let cache = about_api_json + .get("cache") + .and_then(|x| x.as_str()) + .unwrap_or_default(); + Ok(( + commit_id.to_string(), + staging.to_string(), + cache.to_string(), + )) + } else { + log::warn!( + "Failed to fetch about API response from ingestor: {}\n", + &ingestor_metadata.domain_name, + ); + Err(PostError::Invalid(anyhow::anyhow!( + "Failed to fetch about API response from ingestor: {}\n", + &ingestor_metadata.domain_name + ))) + } } #[allow(unused)] diff --git a/server/src/migration.rs b/server/src/migration.rs index ae053b9db..2c543516e 100644 --- a/server/src/migration.rs +++ b/server/src/migration.rs @@ -106,7 +106,10 @@ pub async fn run_migration(config: &Config) -> anyhow::Result<()> { async fn migration_stream(stream: &str, storage: &dyn ObjectStorage) -> anyhow::Result<()> { let path = stream_json_path(stream); - let stream_metadata = storage.get_object(&path).await?; + let stream_metadata = storage.get_object(&path).await.unwrap_or_default(); + if stream_metadata.is_empty() { + return Ok(()); + } let stream_metadata: serde_json::Value = serde_json::from_slice(&stream_metadata).expect("stream.json is valid json"); diff --git a/server/src/validator.rs b/server/src/validator.rs index 6281354d8..9a6d15b49 100644 --- a/server/src/validator.rs +++ b/server/src/validator.rs @@ -24,7 +24,7 @@ use self::error::{AlertValidationError, StreamNameValidationError, UsernameValid // Add more sql keywords here in lower case const DENIED_NAMES: &[&str] = &[ - "select", "from", "where", "group", "by", "order", "limit", "offset", "join", "and", + "select", "from", "where", "group", "by", "order", "limit", "offset", "join", "and", "meta", ]; pub fn alert(alerts: &Alerts) -> Result<(), AlertValidationError> {