diff --git a/server/src/event/writer.rs b/server/src/event/writer.rs
index 5ccc91e34..0090a47f2 100644
--- a/server/src/event/writer.rs
+++ b/server/src/event/writer.rs
@@ -26,6 +26,7 @@ use std::{
 };
 
 use crate::{
+    handlers::http::cluster::INTERNAL_STREAM_NAME,
     option::{Mode, CONFIG},
     utils,
 };
@@ -132,7 +133,7 @@ impl WriterTable {
         parsed_timestamp: NaiveDateTime,
         custom_partition_values: &HashMap<String, String>,
     ) -> Result<(), StreamWriterError> {
-        if CONFIG.parseable.mode != Mode::Query {
+        if CONFIG.parseable.mode != Mode::Query || stream_name == INTERNAL_STREAM_NAME {
             stream_writer.lock().unwrap().push(
                 stream_name,
                 schema_key,
@@ -161,7 +162,7 @@ impl WriterTable {
     ) -> Result<(), StreamWriterError> {
         match map.get(stream_name) {
             Some(writer) => {
-                if CONFIG.parseable.mode != Mode::Query {
+                if CONFIG.parseable.mode != Mode::Query || stream_name == INTERNAL_STREAM_NAME {
                     writer.lock().unwrap().push(
                         stream_name,
                         schema_key,
@@ -174,7 +175,7 @@ impl WriterTable {
                 }
             }
             None => {
-                if CONFIG.parseable.mode != Mode::Query {
+                if CONFIG.parseable.mode != Mode::Query || stream_name == INTERNAL_STREAM_NAME {
                     let mut writer = Writer::default();
                     writer.push(
                         stream_name,
diff --git a/server/src/handlers/http/cluster/mod.rs b/server/src/handlers/http/cluster/mod.rs
index 505d555b8..4afce9b64 100644
--- a/server/src/handlers/http/cluster/mod.rs
+++ b/server/src/handlers/http/cluster/mod.rs
@@ -21,7 +21,7 @@ pub mod utils;
 use crate::handlers::http::cluster::utils::{
     check_liveness, to_url_string, IngestionStats, QueriedStats,
 };
-use crate::handlers::http::ingest::PostError;
+use crate::handlers::http::ingest::{ingest_internal_stream, PostError};
 use crate::handlers::http::logstream::error::StreamError;
 use crate::handlers::{STATIC_SCHEMA_FLAG, TIME_PARTITION_KEY};
 use crate::option::CONFIG;
@@ -46,8 +46,13 @@ type IngestorMetadataArr = Vec<IngestorMetadata>;
 use self::utils::StorageStats;
 use super::base_path_without_preceding_slash;
+use std::time::Duration;
 use super::modal::IngestorMetadata;
+use clokwerk::{AsyncScheduler, Interval};
+pub const INTERNAL_STREAM_NAME: &str = "meta";
+
+const CLUSTER_METRICS_INTERVAL_SECONDS: Interval = clokwerk::Interval::Minutes(1);
 
 pub async fn sync_cache_with_ingestors(
     url: &str,
@@ -432,50 +437,11 @@ pub async fn get_cluster_info() -> Result<impl Responder, StreamError> {
 }
 
 pub async fn get_cluster_metrics() -> Result<impl Responder, PostError> {
-    let ingestor_metadata = get_ingestor_info().await.map_err(|err| {
-        log::error!("Fatal: failed to get ingestor info: {:?}", err);
-        PostError::Invalid(err)
+    let dresses = fetch_cluster_metrics().await.map_err(|err| {
+        log::error!("Fatal: failed to fetch cluster metrics: {:?}", err);
+        PostError::Invalid(err.into())
     })?;
 
-    let mut dresses = vec![];
-
-    for ingestor in ingestor_metadata {
-        let uri = Url::parse(&format!(
-            "{}{}/metrics",
-            &ingestor.domain_name,
-            base_path_without_preceding_slash()
-        ))
-        .map_err(|err| {
-            PostError::Invalid(anyhow::anyhow!("Invalid URL in Ingestor Metadata: {}", err))
-        })?;
-
-        let res = reqwest::Client::new()
-            .get(uri)
-            .header(header::CONTENT_TYPE, "application/json")
-            .send()
-            .await;
-
-        if let Ok(res) = res {
-            let text = res.text().await.map_err(PostError::NetworkError)?;
-            let lines: Vec<Result<String, std::io::Error>> =
-                text.lines().map(|line| Ok(line.to_owned())).collect_vec();
-
-            let sample = prometheus_parse::Scrape::parse(lines.into_iter())
-                .map_err(|err| PostError::CustomError(err.to_string()))?
-                .samples;
-
-            dresses.push(Metrics::from_prometheus_samples(
-                sample,
-                ingestor.domain_name,
-            ));
-        } else {
-            log::warn!(
-                "Failed to fetch metrics from ingestor: {}\n",
-                ingestor.domain_name,
-            );
-        }
-    }
-
     Ok(actix_web::HttpResponse::Ok().json(dresses))
 }
 
@@ -545,3 +511,107 @@ pub async fn remove_ingestor(req: HttpRequest) -> Result<impl Responder, PostError> {
+
+async fn fetch_cluster_metrics() -> Result<Vec<Metrics>, PostError> {
+    let ingestor_metadata = get_ingestor_info().await.map_err(|err| {
+        log::error!("Fatal: failed to get ingestor info: {:?}", err);
+        PostError::Invalid(err)
+    })?;
+
+    let mut dresses = vec![];
+
+    for ingestor in ingestor_metadata {
+        let uri = Url::parse(&format!(
+            "{}{}/metrics",
+            &ingestor.domain_name,
+            base_path_without_preceding_slash()
+        ))
+        .map_err(|err| {
+            PostError::Invalid(anyhow::anyhow!("Invalid URL in Ingestor Metadata: {}", err))
+        })?;
+
+        let res = reqwest::Client::new()
+            .get(uri)
+            .header(header::CONTENT_TYPE, "application/json")
+            .send()
+            .await;
+
+        if let Ok(res) = res {
+            let text = res.text().await.map_err(PostError::NetworkError)?;
+            let lines: Vec<Result<String, std::io::Error>> =
+                text.lines().map(|line| Ok(line.to_owned())).collect_vec();
+
+            let sample = prometheus_parse::Scrape::parse(lines.into_iter())
+                .map_err(|err| PostError::CustomError(err.to_string()))?
+                .samples;
+            let ingestor_metrics = Metrics::from_prometheus_samples(sample, &ingestor)
+                .await
+                .map_err(|err| {
+                    log::error!("Fatal: failed to get ingestor metrics: {:?}", err);
+                    PostError::Invalid(err.into())
+                })?;
+            dresses.push(ingestor_metrics);
+        } else {
+            log::warn!(
+                "Failed to fetch metrics from ingestor: {}\n",
+                &ingestor.domain_name,
+            );
+        }
+    }
+    Ok(dresses)
+}
+
+pub fn init_cluster_metrics_schedular() -> Result<(), PostError> {
+    log::info!("Setting up schedular for cluster metrics ingestion");
+
+    let mut scheduler = AsyncScheduler::new();
+    scheduler
+        .every(CLUSTER_METRICS_INTERVAL_SECONDS)
+        .run(move || async {
+            let result: Result<(), PostError> = async {
+                let cluster_metrics = fetch_cluster_metrics().await;
+                if let Ok(metrics) = cluster_metrics {
+                    if !metrics.is_empty() {
+                        log::info!("Cluster metrics fetched successfully from all ingestors");
+                        if let Ok(metrics_bytes) = serde_json::to_vec(&metrics) {
+                            let stream_name = INTERNAL_STREAM_NAME;
+
+                            if matches!(
+                                ingest_internal_stream(
+                                    stream_name.to_string(),
+                                    bytes::Bytes::from(metrics_bytes),
+                                )
+                                .await,
+                                Ok(())
+                            ) {
+                                log::info!(
+                                    "Cluster metrics successfully ingested into internal stream"
+                                );
+                            } else {
+                                log::error!(
+                                    "Failed to ingest cluster metrics into internal stream"
+                                );
+                            }
+                        } else {
+                            log::error!("Failed to serialize cluster metrics");
+                        }
+                    }
+                }
+                Ok(())
+            }
+            .await;
+
+            if let Err(err) = result {
+                log::error!("Error in cluster metrics scheduler: {:?}", err);
+            }
+        });
+
+    tokio::spawn(async move {
+        loop {
+            scheduler.run_pending().await;
+            tokio::time::sleep(Duration::from_secs(10)).await;
+        }
+    });
+
+    Ok(())
+}
diff --git a/server/src/handlers/http/ingest.rs b/server/src/handlers/http/ingest.rs
index f03b45652..a5157f34c 100644
--- a/server/src/handlers/http/ingest.rs
+++ b/server/src/handlers/http/ingest.rs
@@ -16,6 +16,7 @@
  *
  */
 
+use super::cluster::INTERNAL_STREAM_NAME;
 use super::logstream::error::CreateStreamError;
 use super::{kinesis, otel};
 use crate::event::{
@@ -52,6 +53,12 @@ pub async fn ingest(req: HttpRequest, body: Bytes) -> Result<HttpResponse, PostError> {
+pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<(), PostError> {
+    create_stream_if_not_exists(&stream_name).await?;
+    let size: usize = body.len();
+    let parsed_timestamp = Utc::now().naive_utc();
+    let (rb, is_first) = {
+        let body_val: Value = serde_json::from_slice(&body)?;
+        let hash_map = STREAM_INFO.read().unwrap();
+        let schema = hash_map
+            .get(&stream_name)
+            .ok_or(PostError::StreamNotFound(stream_name.clone()))?
+            .schema
+            .clone();
+        let event = format::json::Event {
+            data: body_val,
+            tags: String::default(),
+            metadata: String::default(),
+        };
+        event.into_recordbatch(schema, None, None)?
+    };
+    event::Event {
+        rb,
+        stream_name,
+        origin_format: "json",
+        origin_size: size as u64,
+        is_first_event: is_first,
+        parsed_timestamp,
+        time_partition: None,
+        custom_partition_values: HashMap::new(),
+    }
+    .process()
+    .await?;
+    Ok(())
+}
+
 async fn flatten_and_push_logs(
     req: HttpRequest,
     body: Bytes,
@@ -93,7 +134,12 @@ async fn flatten_and_push_logs(
 // fails if the logstream does not exist
 pub async fn post_event(req: HttpRequest, body: Bytes) -> Result<HttpResponse, PostError> {
     let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
-
+    if stream_name.eq(INTERNAL_STREAM_NAME) {
+        return Err(PostError::Invalid(anyhow::anyhow!(
+            "Stream {} is an internal stream and cannot be ingested into",
+            stream_name
+        )));
+    }
     flatten_and_push_logs(req, body, stream_name).await?;
     Ok(HttpResponse::Ok().finish())
 }
diff --git a/server/src/handlers/http/logstream.rs b/server/src/handlers/http/logstream.rs
index da4f00ad3..15e343747 100644
--- a/server/src/handlers/http/logstream.rs
+++ b/server/src/handlers/http/logstream.rs
@@ -18,8 +18,8 @@
 
 use self::error::{CreateStreamError, StreamError};
 use super::base_path_without_preceding_slash;
-use super::cluster::fetch_stats_from_ingestors;
 use super::cluster::utils::{merge_quried_stats, IngestionStats, QueriedStats, StorageStats};
+use super::cluster::{fetch_stats_from_ingestors, INTERNAL_STREAM_NAME};
 use crate::alerts::Alerts;
 use crate::handlers::{
     CUSTOM_PARTITION_KEY, STATIC_SCHEMA_FLAG, TIME_PARTITION_KEY, TIME_PARTITION_LIMIT_KEY,
@@ -591,7 +591,9 @@ pub async fn create_stream(
     schema: Arc<Schema>,
 ) -> Result<(), CreateStreamError> {
     // fail to proceed if invalid stream name
-    validator::stream_name(&stream_name)?;
+    if stream_name.ne(INTERNAL_STREAM_NAME) {
+        validator::stream_name(&stream_name)?;
+    }
 
     // Proceed to create log stream if it doesn't exist
     let storage = CONFIG.storage().get_object_store();
diff --git a/server/src/handlers/http/modal/ingest_server.rs b/server/src/handlers/http/modal/ingest_server.rs
index 6630da786..26ed25d8f 100644
--- a/server/src/handlers/http/modal/ingest_server.rs
+++ b/server/src/handlers/http/modal/ingest_server.rs
@@ -24,6 +24,7 @@ use crate::handlers::http::middleware::RouteExt;
 use crate::localcache::LocalCacheManager;
 use crate::metadata;
 use crate::metrics;
+use crate::migration;
 use crate::migration::metadata_migration::migrate_ingester_metadata;
 use crate::rbac;
 use crate::rbac::role::Action;
@@ -328,6 +329,8 @@ impl IngestServer {
         let prometheus = metrics::build_metrics_handler();
         CONFIG.storage().register_store_metrics(&prometheus);
 
+        migration::run_migration(&CONFIG).await?;
+
         let storage = CONFIG.storage().get_object_store();
         if let Err(err) = metadata::STREAM_INFO.load(&*storage).await {
             log::warn!("could not populate local metadata. {:?}", err);
{:?}", err); diff --git a/server/src/handlers/http/modal/query_server.rs b/server/src/handlers/http/modal/query_server.rs index 3558ec3df..0abfc7ff0 100644 --- a/server/src/handlers/http/modal/query_server.rs +++ b/server/src/handlers/http/modal/query_server.rs @@ -17,11 +17,12 @@ */ use crate::handlers::airplane; -use crate::handlers::http::cluster; +use crate::handlers::http::cluster::{self, init_cluster_metrics_schedular}; use crate::handlers::http::middleware::RouteExt; use crate::handlers::http::{base_path, cross_origin_config, API_BASE_PATH, API_VERSION}; use crate::rbac::role::Action; +use crate::sync; use crate::{analytics, banner, metadata, metrics, migration, rbac, storage}; use actix_web::web; use actix_web::web::ServiceConfig; @@ -185,11 +186,39 @@ impl QueryServer { analytics::init_analytics_scheduler()?; } - tokio::spawn(airplane::server()); - - self.start(prometheus, CONFIG.parseable.openid.clone()) - .await?; + if matches!(init_cluster_metrics_schedular(), Ok(())) { + log::info!("Cluster metrics scheduler started successfully"); + } + let (localsync_handler, mut localsync_outbox, localsync_inbox) = sync::run_local_sync(); + let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) = + sync::object_store_sync(); - Ok(()) + tokio::spawn(airplane::server()); + let app = self.start(prometheus, CONFIG.parseable.openid.clone()); + + tokio::pin!(app); + loop { + tokio::select! { + e = &mut app => { + // actix server finished .. stop other threads and stop the server + remote_sync_inbox.send(()).unwrap_or(()); + localsync_inbox.send(()).unwrap_or(()); + localsync_handler.join().unwrap_or(()); + remote_sync_handler.join().unwrap_or(()); + return e + }, + _ = &mut localsync_outbox => { + // crash the server if localsync fails for any reason + // panic!("Local Sync thread died. Server will fail now!") + return Err(anyhow::Error::msg("Failed to sync local data to drive. 
+                },
+                _ = &mut remote_sync_outbox => {
+                    // remote_sync failed, this is recoverable by just starting remote_sync thread again
+                    remote_sync_handler.join().unwrap_or(());
+                    (remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = sync::object_store_sync();
+                }
+
+            };
+        }
     }
 }
diff --git a/server/src/metrics/prom_utils.rs b/server/src/metrics/prom_utils.rs
index 21e27c03f..cd01fbd76 100644
--- a/server/src/metrics/prom_utils.rs
+++ b/server/src/metrics/prom_utils.rs
@@ -1,17 +1,36 @@
+use crate::handlers::http::base_path_without_preceding_slash;
+use crate::handlers::http::ingest::PostError;
+use crate::handlers::http::modal::IngestorMetadata;
 use crate::utils::get_url;
+use actix_web::http::header;
+use chrono::NaiveDateTime;
+use chrono::Utc;
 use prometheus_parse::Sample as PromSample;
 use prometheus_parse::Value as PromValue;
 use serde::Serialize;
 use serde_json::Error as JsonError;
 use serde_json::Value as JsonValue;
+use url::Url;
 
 #[derive(Debug, Serialize, Clone)]
 pub struct Metrics {
     address: String,
     parseable_events_ingested: f64, // all streams
+    parseable_events_ingested_size: f64,
+    parseable_lifetime_events_ingested: f64, // all streams
+    parseable_lifetime_events_ingested_size: f64,
+    parseable_deleted_events_ingested: f64, // all streams
+    parseable_deleted_events_ingested_size: f64,
     parseable_staging_files: f64,
     process_resident_memory_bytes: f64,
     parseable_storage_size: StorageMetrics,
+    parseable_lifetime_storage_size: StorageMetrics,
+    parseable_deleted_storage_size: StorageMetrics,
+    event_type: String,
+    event_time: NaiveDateTime,
+    commit: String,
+    staging: String,
+    cache: String,
 }
 
 #[derive(Debug, Serialize, Default, Clone)]
@@ -32,9 +51,21 @@ impl Default for Metrics {
         Metrics {
             address,
             parseable_events_ingested: 0.0,
+            parseable_events_ingested_size: 0.0,
             parseable_staging_files: 0.0,
             process_resident_memory_bytes: 0.0,
             parseable_storage_size: StorageMetrics::default(),
+            parseable_lifetime_events_ingested: 0.0,
+            parseable_lifetime_events_ingested_size: 0.0,
+            parseable_deleted_events_ingested: 0.0,
+            parseable_deleted_events_ingested_size: 0.0,
+            parseable_deleted_storage_size: StorageMetrics::default(),
+            parseable_lifetime_storage_size: StorageMetrics::default(),
+            event_type: "cluster-metrics".to_string(),
+            event_time: Utc::now().naive_utc(),
+            commit: "".to_string(),
+            staging: "".to_string(),
+            cache: "".to_string(),
         }
     }
 }
@@ -44,44 +75,138 @@ impl Metrics {
         Metrics {
             address,
             parseable_events_ingested: 0.0,
+            parseable_events_ingested_size: 0.0,
             parseable_staging_files: 0.0,
             process_resident_memory_bytes: 0.0,
             parseable_storage_size: StorageMetrics::default(),
+            parseable_lifetime_events_ingested: 0.0,
+            parseable_lifetime_events_ingested_size: 0.0,
+            parseable_deleted_events_ingested: 0.0,
+            parseable_deleted_events_ingested_size: 0.0,
+            parseable_deleted_storage_size: StorageMetrics::default(),
+            parseable_lifetime_storage_size: StorageMetrics::default(),
+            event_type: "cluster-metrics".to_string(),
+            event_time: Utc::now().naive_utc(),
+            commit: "".to_string(),
+            staging: "".to_string(),
+            cache: "".to_string(),
         }
     }
 }
 
 impl Metrics {
-    pub fn from_prometheus_samples(samples: Vec<PromSample>, address: String) -> Self {
-        let mut prom_dress = Metrics::new(address);
-
+    pub async fn from_prometheus_samples(
+        samples: Vec<PromSample>,
+        ingestor_metadata: &IngestorMetadata,
+    ) -> Result<Self, PostError> {
+        let mut prom_dress = Metrics::new(ingestor_metadata.domain_name.to_string());
         for sample in samples {
-            if &sample.metric == "parseable_events_ingested" {
-                if let PromValue::Counter(val) = sample.value {
-                    prom_dress.parseable_events_ingested += val;
-                }
-            } else if sample.metric == "parseable_staging_files" {
-                if let PromValue::Gauge(val) = sample.value {
-                    prom_dress.parseable_staging_files += val;
-                }
-            } else if sample.metric == "process_resident_memory_bytes" {
-                if let PromValue::Gauge(val) = sample.value {
-                    prom_dress.process_resident_memory_bytes += val;
-                }
-            } else if sample.metric == "parseable_storage_size" {
-                if sample.labels.get("type").expect("type is present") == "data" {
-                    if let PromValue::Gauge(val) = sample.value {
-                        prom_dress.parseable_storage_size.data += val;
+            if let PromValue::Gauge(val) = sample.value {
+                match sample.metric.as_str() {
+                    "parseable_events_ingested" => prom_dress.parseable_events_ingested += val,
+                    "parseable_events_ingested_size" => {
+                        prom_dress.parseable_events_ingested_size += val
+                    }
+                    "parseable_lifetime_events_ingested" => {
+                        prom_dress.parseable_lifetime_events_ingested += val
+                    }
+                    "parseable_lifetime_events_ingested_size" => {
+                        prom_dress.parseable_lifetime_events_ingested_size += val
+                    }
+                    "parseable_deleted_events_ingested" => {
+                        prom_dress.parseable_deleted_events_ingested += val
+                    }
+                    "parseable_deleted_events_ingested_size" => {
+                        prom_dress.parseable_deleted_events_ingested_size += val
                     }
-                } else if sample.labels.get("type").expect("type is present") == "staging" {
-                    if let PromValue::Gauge(val) = sample.value {
-                        prom_dress.parseable_storage_size.staging += val;
+                    "parseable_staging_files" => prom_dress.parseable_staging_files += val,
+                    "process_resident_memory_bytes" => {
+                        prom_dress.process_resident_memory_bytes += val
                     }
+                    "parseable_storage_size" => {
+                        if sample.labels.get("type").expect("type is present") == "staging" {
+                            prom_dress.parseable_storage_size.staging += val;
+                        }
+                        if sample.labels.get("type").expect("type is present") == "data" {
+                            prom_dress.parseable_storage_size.data += val;
+                        }
+                    }
+                    "parseable_lifetime_events_storage_size" => {
+                        if sample.labels.get("type").expect("type is present") == "data" {
+                            prom_dress.parseable_lifetime_storage_size.data += val;
+                        }
+                    }
+                    "parseable_deleted_events_storage_size" => {
+                        if sample.labels.get("type").expect("type is present") == "data" {
+                            prom_dress.parseable_deleted_storage_size.data += val;
+                        }
+                    }
+                    _ => {}
                 }
             }
         }
+        let (commit_id, staging, cache) = Self::from_about_api_response(ingestor_metadata.clone())
+            .await
+            .map_err(|err| {
+                log::error!("Fatal: failed to get ingestor info: {:?}", err);
+                PostError::Invalid(err.into())
+            })?;
+
+        prom_dress.commit = commit_id;
+        prom_dress.staging = staging;
+        prom_dress.cache = cache;
 
-        prom_dress
+        Ok(prom_dress)
+    }
+
+    pub async fn from_about_api_response(
+        ingestor_metadata: IngestorMetadata,
+    ) -> Result<(String, String, String), PostError> {
+        let uri = Url::parse(&format!(
+            "{}{}/about",
+            &ingestor_metadata.domain_name,
+            base_path_without_preceding_slash()
+        ))
+        .map_err(|err| {
+            PostError::Invalid(anyhow::anyhow!("Invalid URL in Ingestor Metadata: {}", err))
+        })?;
+        let res = reqwest::Client::new()
+            .get(uri)
+            .header(header::CONTENT_TYPE, "application/json")
+            .header(header::AUTHORIZATION, ingestor_metadata.token)
+            .send()
+            .await;
+        if let Ok(res) = res {
+            let about_api_json = res.text().await.map_err(PostError::NetworkError)?;
+            let about_api_json: serde_json::Value =
+                serde_json::from_str(&about_api_json).map_err(PostError::SerdeError)?;
+            let commit_id = about_api_json
+                .get("commit")
+                .and_then(|x| x.as_str())
+                .unwrap_or_default();
+            let staging = about_api_json
+                .get("staging")
+                .and_then(|x| x.as_str())
+                .unwrap_or_default();
+            let cache = about_api_json
+                .get("cache")
+                .and_then(|x| x.as_str())
+                .unwrap_or_default();
+            Ok((
+                commit_id.to_string(),
+                staging.to_string(),
+                cache.to_string(),
+            ))
+        } else {
+            log::warn!(
+                "Failed to fetch about API response from ingestor: {}\n",
+                &ingestor_metadata.domain_name,
+            );
+            Err(PostError::Invalid(anyhow::anyhow!(
+                "Failed to fetch about API response from ingestor: {}\n",
+                &ingestor_metadata.domain_name
+            )))
+        }
     }
 
     #[allow(unused)]
diff --git a/server/src/migration.rs b/server/src/migration.rs
index ae053b9db..2c543516e 100644
--- a/server/src/migration.rs
+++ b/server/src/migration.rs
@@ -106,7 +106,10 @@ pub async fn run_migration(config: &Config) -> anyhow::Result<()> {
 
 async fn migration_stream(stream: &str, storage: &dyn ObjectStorage) -> anyhow::Result<()> {
     let path = stream_json_path(stream);
-    let stream_metadata = storage.get_object(&path).await?;
+    let stream_metadata = storage.get_object(&path).await.unwrap_or_default();
+    if stream_metadata.is_empty() {
+        return Ok(());
+    }
     let stream_metadata: serde_json::Value =
         serde_json::from_slice(&stream_metadata).expect("stream.json is valid json");
 
diff --git a/server/src/validator.rs b/server/src/validator.rs
index 6281354d8..9a6d15b49 100644
--- a/server/src/validator.rs
+++ b/server/src/validator.rs
@@ -24,7 +24,7 @@ use self::error::{AlertValidationError, StreamNameValidationError, UsernameValid
 
 // Add more sql keywords here in lower case
 const DENIED_NAMES: &[&str] = &[
-    "select", "from", "where", "group", "by", "order", "limit", "offset", "join", "and",
+    "select", "from", "where", "group", "by", "order", "limit", "offset", "join", "and", "meta",
 ];
 
 pub fn alert(alerts: &Alerts) -> Result<(), AlertValidationError> {