Skip to content

Commit

Permalink
feat: added internal stream "meta" (#801)
Browse files Browse the repository at this point in the history
This PR adds an internal stream, "meta", that is used
in distributed mode only. The query node creates this
internal stream. It fetches cluster metrics
from all ingestion nodes, adds the metrics data to an
event, and ingests the event into this internal stream.
This data will be used by the console on the cluster page to
show the stats per ingestor.
  • Loading branch information
nikhilsinhaparseable authored May 22, 2024
1 parent ad7f67e commit 0da46fa
Show file tree
Hide file tree
Showing 9 changed files with 359 additions and 80 deletions.
7 changes: 4 additions & 3 deletions server/src/event/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use std::{
};

use crate::{
handlers::http::cluster::INTERNAL_STREAM_NAME,
option::{Mode, CONFIG},
utils,
};
Expand Down Expand Up @@ -132,7 +133,7 @@ impl WriterTable {
parsed_timestamp: NaiveDateTime,
custom_partition_values: &HashMap<String, String>,
) -> Result<(), StreamWriterError> {
if CONFIG.parseable.mode != Mode::Query {
if CONFIG.parseable.mode != Mode::Query || stream_name == INTERNAL_STREAM_NAME {
stream_writer.lock().unwrap().push(
stream_name,
schema_key,
Expand Down Expand Up @@ -161,7 +162,7 @@ impl WriterTable {
) -> Result<(), StreamWriterError> {
match map.get(stream_name) {
Some(writer) => {
if CONFIG.parseable.mode != Mode::Query {
if CONFIG.parseable.mode != Mode::Query || stream_name == INTERNAL_STREAM_NAME {
writer.lock().unwrap().push(
stream_name,
schema_key,
Expand All @@ -174,7 +175,7 @@ impl WriterTable {
}
}
None => {
if CONFIG.parseable.mode != Mode::Query {
if CONFIG.parseable.mode != Mode::Query || stream_name == INTERNAL_STREAM_NAME {
let mut writer = Writer::default();
writer.push(
stream_name,
Expand Down
156 changes: 113 additions & 43 deletions server/src/handlers/http/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub mod utils;
use crate::handlers::http::cluster::utils::{
check_liveness, to_url_string, IngestionStats, QueriedStats,
};
use crate::handlers::http::ingest::PostError;
use crate::handlers::http::ingest::{ingest_internal_stream, PostError};
use crate::handlers::http::logstream::error::StreamError;
use crate::handlers::{STATIC_SCHEMA_FLAG, TIME_PARTITION_KEY};
use crate::option::CONFIG;
Expand All @@ -46,8 +46,13 @@ type IngestorMetadataArr = Vec<IngestorMetadata>;
use self::utils::StorageStats;

use super::base_path_without_preceding_slash;
use std::time::Duration;

use super::modal::IngestorMetadata;
use clokwerk::{AsyncScheduler, Interval};
/// Name of the internal stream that the cluster metrics scheduler ingests into.
pub const INTERNAL_STREAM_NAME: &str = "meta";

/// Interval at which the cluster metrics job is scheduled.
/// NOTE(review): despite the `_SECONDS` suffix, the value is one minute.
const CLUSTER_METRICS_INTERVAL_SECONDS: Interval = clokwerk::Interval::Minutes(1);

pub async fn sync_cache_with_ingestors(
url: &str,
Expand Down Expand Up @@ -432,50 +437,11 @@ pub async fn get_cluster_info() -> Result<impl Responder, StreamError> {
}

/// HTTP handler that responds with the metrics gathered from the cluster's
/// ingestors, serialized as a JSON body.
///
/// Failures while gathering the metrics are logged and returned to the caller
/// as a `PostError`.
pub async fn get_cluster_metrics() -> Result<impl Responder, PostError> {
let ingestor_metadata = get_ingestor_info().await.map_err(|err| {
log::error!("Fatal: failed to get ingestor info: {:?}", err);
PostError::Invalid(err)
let dresses = fetch_cluster_metrics().await.map_err(|err| {
log::error!("Fatal: failed to fetch cluster metrics: {:?}", err);
PostError::Invalid(err.into())
})?;

let mut dresses = vec![];

for ingestor in ingestor_metadata {
let uri = Url::parse(&format!(
"{}{}/metrics",
&ingestor.domain_name,
base_path_without_preceding_slash()
))
.map_err(|err| {
PostError::Invalid(anyhow::anyhow!("Invalid URL in Ingestor Metadata: {}", err))
})?;

let res = reqwest::Client::new()
.get(uri)
.header(header::CONTENT_TYPE, "application/json")
.send()
.await;

if let Ok(res) = res {
let text = res.text().await.map_err(PostError::NetworkError)?;
let lines: Vec<Result<String, std::io::Error>> =
text.lines().map(|line| Ok(line.to_owned())).collect_vec();

let sample = prometheus_parse::Scrape::parse(lines.into_iter())
.map_err(|err| PostError::CustomError(err.to_string()))?
.samples;

dresses.push(Metrics::from_prometheus_samples(
sample,
ingestor.domain_name,
));
} else {
log::warn!(
"Failed to fetch metrics from ingestor: {}\n",
ingestor.domain_name,
);
}
}

Ok(actix_web::HttpResponse::Ok().json(dresses))
}

Expand Down Expand Up @@ -545,3 +511,107 @@ pub async fn remove_ingestor(req: HttpRequest) -> Result<impl Responder, PostErr
log::info!("{}", &msg);
Ok((msg, StatusCode::OK))
}

/// Collects metrics from every ingestor known to the cluster.
///
/// For each ingestor returned by `get_ingestor_info`, this requests
/// `{domain_name}{base_path}/metrics`, parses the response body with
/// `prometheus_parse::Scrape::parse` and converts the samples into a
/// `Metrics` entry. An ingestor whose request cannot be sent is skipped with
/// a warning, so the returned vector may hold fewer entries than there are
/// ingestors.
///
/// # Errors
///
/// * `PostError::Invalid` - the ingestor list could not be loaded, an
///   ingestor URL could not be parsed, or the samples could not be converted
///   into `Metrics`.
/// * `PostError::NetworkError` - a response body could not be read.
/// * `PostError::CustomError` - a response body could not be parsed as a
///   Prometheus scrape.
async fn fetch_cluster_metrics() -> Result<Vec<Metrics>, PostError> {
    let ingestor_metadata = get_ingestor_info().await.map_err(|err| {
        log::error!("Fatal: failed to get ingestor info: {:?}", err);
        PostError::Invalid(err)
    })?;

    // A reqwest client owns a connection pool; build it once and reuse it for
    // every ingestor instead of constructing a new one per iteration.
    let client = reqwest::Client::new();
    let mut dresses = vec![];

    for ingestor in ingestor_metadata {
        let uri = Url::parse(&format!(
            "{}{}/metrics",
            &ingestor.domain_name,
            base_path_without_preceding_slash()
        ))
        .map_err(|err| {
            PostError::Invalid(anyhow::anyhow!("Invalid URL in Ingestor Metadata: {}", err))
        })?;

        let res = client
            .get(uri)
            .header(header::CONTENT_TYPE, "application/json")
            .send()
            .await;

        if let Ok(res) = res {
            let text = res.text().await.map_err(PostError::NetworkError)?;
            // `Scrape::parse` consumes an iterator of `io::Result<String>` lines.
            let lines: Vec<Result<String, std::io::Error>> =
                text.lines().map(|line| Ok(line.to_owned())).collect_vec();

            let sample = prometheus_parse::Scrape::parse(lines.into_iter())
                .map_err(|err| PostError::CustomError(err.to_string()))?
                .samples;
            let ingestor_metrics = Metrics::from_prometheus_samples(sample, &ingestor)
                .await
                .map_err(|err| {
                    log::error!("Fatal: failed to get ingestor metrics: {:?}", err);
                    PostError::Invalid(err.into())
                })?;
            dresses.push(ingestor_metrics);
        } else {
            // An unreachable ingestor is not fatal: skip it and keep going.
            log::warn!(
                "Failed to fetch metrics from ingestor: {}\n",
                &ingestor.domain_name,
            );
        }
    }
    Ok(dresses)
}

/// Registers the recurring cluster-metrics job and starts polling it.
///
/// Every `CLUSTER_METRICS_INTERVAL_SECONDS` the job calls
/// `fetch_cluster_metrics`, serializes a non-empty result to JSON and ingests
/// it into the `INTERNAL_STREAM_NAME` stream via `ingest_internal_stream`.
/// Any failure in that pipeline (fetching, serializing or ingesting) is logged
/// together with the underlying error; the job then runs again at the next
/// interval.
///
/// The scheduler itself is driven by a task started with `tokio::spawn` that
/// checks for pending jobs every 10 seconds, so this presumably has to be
/// called from within a Tokio runtime.
///
/// # Errors
///
/// The visible code never returns `Err`; the `Result` return type is kept so
/// existing callers continue to compile.
pub fn init_cluster_metrics_schedular() -> Result<(), PostError> {
    log::info!("Setting up schedular for cluster metrics ingestion");

    let mut scheduler = AsyncScheduler::new();
    scheduler
        .every(CLUSTER_METRICS_INTERVAL_SECONDS)
        .run(move || async {
            // Run the pipeline in an inner async block so every step can use
            // `?` and all failures funnel into the single error log below.
            let result: Result<(), PostError> = async {
                let metrics = fetch_cluster_metrics().await?;
                if metrics.is_empty() {
                    // Nothing to record for this tick.
                    return Ok(());
                }
                log::info!("Cluster metrics fetched successfully from all ingestors");

                let metrics_bytes = serde_json::to_vec(&metrics)?;
                ingest_internal_stream(
                    INTERNAL_STREAM_NAME.to_string(),
                    bytes::Bytes::from(metrics_bytes),
                )
                .await?;
                log::info!("Cluster metrics successfully ingested into internal stream");
                Ok(())
            }
            .await;

            if let Err(err) = result {
                log::error!("Error in cluster metrics scheduler: {:?}", err);
            }
        });

    // Poll the scheduler in the background; jobs only run when polled.
    tokio::spawn(async move {
        loop {
            scheduler.run_pending().await;
            tokio::time::sleep(Duration::from_secs(10)).await;
        }
    });

    Ok(())
}
48 changes: 47 additions & 1 deletion server/src/handlers/http/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*
*/

use super::cluster::INTERNAL_STREAM_NAME;
use super::logstream::error::CreateStreamError;
use super::{kinesis, otel};
use crate::event::{
Expand Down Expand Up @@ -52,6 +53,12 @@ pub async fn ingest(req: HttpRequest, body: Bytes) -> Result<HttpResponse, PostE
.find(|&(key, _)| key == STREAM_NAME_HEADER_KEY)
{
let stream_name = stream_name.to_str().unwrap().to_owned();
if stream_name.eq(INTERNAL_STREAM_NAME) {
return Err(PostError::Invalid(anyhow::anyhow!(
"Stream {} is an internal stream and cannot be ingested into",
stream_name
)));
}
create_stream_if_not_exists(&stream_name).await?;

flatten_and_push_logs(req, body, stream_name).await?;
Expand All @@ -61,6 +68,40 @@ pub async fn ingest(req: HttpRequest, body: Bytes) -> Result<HttpResponse, PostE
}
}

/// Ingests a JSON payload into an internal stream, bypassing the HTTP
/// ingestion handlers.
///
/// The stream is created if it does not exist yet, the body is parsed as
/// JSON and converted into a record batch against the stream's current
/// schema, and the resulting event is processed with the current UTC time as
/// its parsed timestamp. No time partition or custom partition values are set.
///
/// # Arguments
///
/// * `stream_name` - name of the stream to ingest into.
/// * `body` - JSON-encoded payload; its byte length is recorded as the
///   event's origin size.
///
/// # Errors
///
/// Returns a `PostError` if the stream cannot be created, the body is not
/// valid JSON, the stream is missing from `STREAM_INFO`
/// (`PostError::StreamNotFound`), the record batch cannot be built, or
/// processing the event fails.
///
/// # Panics
///
/// Panics if the `STREAM_INFO` lock is poisoned.
pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<(), PostError> {
    create_stream_if_not_exists(&stream_name).await?;
    let size: usize = body.len();
    let parsed_timestamp = Utc::now().naive_utc();
    let (rb, is_first) = {
        let body_val: Value = serde_json::from_slice(&body)?;
        // The read guard lives only inside this block, so it is released
        // before the `.await` below.
        let hash_map = STREAM_INFO.read().unwrap();
        let schema = hash_map
            .get(&stream_name)
            // Build the error lazily so the name is cloned only on failure.
            .ok_or_else(|| PostError::StreamNotFound(stream_name.clone()))?
            .schema
            .clone();
        let event = format::json::Event {
            data: body_val,
            tags: String::default(),
            metadata: String::default(),
        };
        event.into_recordbatch(schema, None, None)?
    };
    event::Event {
        rb,
        stream_name,
        origin_format: "json",
        origin_size: size as u64,
        is_first_event: is_first,
        parsed_timestamp,
        time_partition: None,
        custom_partition_values: HashMap::new(),
    }
    .process()
    .await?;
    Ok(())
}

async fn flatten_and_push_logs(
req: HttpRequest,
body: Bytes,
Expand Down Expand Up @@ -93,7 +134,12 @@ async fn flatten_and_push_logs(
// fails if the logstream does not exist
pub async fn post_event(req: HttpRequest, body: Bytes) -> Result<HttpResponse, PostError> {
    // The target stream comes from the `logstream` path parameter.
    let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();

    // Any stream other than the internal one takes the regular ingestion path.
    if stream_name != INTERNAL_STREAM_NAME {
        flatten_and_push_logs(req, body, stream_name).await?;
        return Ok(HttpResponse::Ok().finish());
    }

    // The internal stream is written only by the server itself; reject the request.
    Err(PostError::Invalid(anyhow::anyhow!(
        "Stream {} is an internal stream and cannot be ingested into",
        stream_name
    )))
}
Expand Down
6 changes: 4 additions & 2 deletions server/src/handlers/http/logstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@

use self::error::{CreateStreamError, StreamError};
use super::base_path_without_preceding_slash;
use super::cluster::fetch_stats_from_ingestors;
use super::cluster::utils::{merge_quried_stats, IngestionStats, QueriedStats, StorageStats};
use super::cluster::{fetch_stats_from_ingestors, INTERNAL_STREAM_NAME};
use crate::alerts::Alerts;
use crate::handlers::{
CUSTOM_PARTITION_KEY, STATIC_SCHEMA_FLAG, TIME_PARTITION_KEY, TIME_PARTITION_LIMIT_KEY,
Expand Down Expand Up @@ -591,7 +591,9 @@ pub async fn create_stream(
schema: Arc<Schema>,
) -> Result<(), CreateStreamError> {
// fail to proceed if invalid stream name
validator::stream_name(&stream_name)?;
if stream_name.ne(INTERNAL_STREAM_NAME) {
validator::stream_name(&stream_name)?;
}

// Proceed to create log stream if it doesn't exist
let storage = CONFIG.storage().get_object_store();
Expand Down
3 changes: 3 additions & 0 deletions server/src/handlers/http/modal/ingest_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use crate::handlers::http::middleware::RouteExt;
use crate::localcache::LocalCacheManager;
use crate::metadata;
use crate::metrics;
use crate::migration;
use crate::migration::metadata_migration::migrate_ingester_metadata;
use crate::rbac;
use crate::rbac::role::Action;
Expand Down Expand Up @@ -328,6 +329,8 @@ impl IngestServer {
let prometheus = metrics::build_metrics_handler();
CONFIG.storage().register_store_metrics(&prometheus);

migration::run_migration(&CONFIG).await?;

let storage = CONFIG.storage().get_object_store();
if let Err(err) = metadata::STREAM_INFO.load(&*storage).await {
log::warn!("could not populate local metadata. {:?}", err);
Expand Down
41 changes: 35 additions & 6 deletions server/src/handlers/http/modal/query_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@
*/

use crate::handlers::airplane;
use crate::handlers::http::cluster;
use crate::handlers::http::cluster::{self, init_cluster_metrics_schedular};
use crate::handlers::http::middleware::RouteExt;
use crate::handlers::http::{base_path, cross_origin_config, API_BASE_PATH, API_VERSION};

use crate::rbac::role::Action;
use crate::sync;
use crate::{analytics, banner, metadata, metrics, migration, rbac, storage};
use actix_web::web;
use actix_web::web::ServiceConfig;
Expand Down Expand Up @@ -185,11 +186,39 @@ impl QueryServer {
analytics::init_analytics_scheduler()?;
}

tokio::spawn(airplane::server());

self.start(prometheus, CONFIG.parseable.openid.clone())
.await?;
if matches!(init_cluster_metrics_schedular(), Ok(())) {
log::info!("Cluster metrics scheduler started successfully");
}
let (localsync_handler, mut localsync_outbox, localsync_inbox) = sync::run_local_sync();
let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) =
sync::object_store_sync();

Ok(())
tokio::spawn(airplane::server());
let app = self.start(prometheus, CONFIG.parseable.openid.clone());

tokio::pin!(app);
loop {
tokio::select! {
e = &mut app => {
// actix server finished .. stop other threads and stop the server
remote_sync_inbox.send(()).unwrap_or(());
localsync_inbox.send(()).unwrap_or(());
localsync_handler.join().unwrap_or(());
remote_sync_handler.join().unwrap_or(());
return e
},
_ = &mut localsync_outbox => {
// crash the server if localsync fails for any reason
// panic!("Local Sync thread died. Server will fail now!")
return Err(anyhow::Error::msg("Failed to sync local data to drive. Please restart the Parseable server.\n\nJoin us on Parseable Slack if the issue persists after restart : https://launchpass.com/parseable"))
},
_ = &mut remote_sync_outbox => {
// remote_sync failed, this is recoverable by just starting remote_sync thread again
remote_sync_handler.join().unwrap_or(());
(remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = sync::object_store_sync();
}

};
}
}
}
Loading

0 comments on commit 0da46fa

Please sign in to comment.