From 876fcb7fea9d3abbde0a77ba3ef32e9b6293106f Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Thu, 11 Apr 2024 11:53:17 +0530 Subject: [PATCH 01/10] update get_address to use url instead of sockaddr --- server/src/catalog.rs | 21 ++++++++++++++++--- .../src/handlers/http/modal/ingest_server.rs | 15 ++++++++++--- server/src/metrics/prom_utils.rs | 6 +++++- server/src/storage/object_storage.rs | 15 ++++++++----- server/src/storage/staging.rs | 4 ++-- server/src/utils.rs | 11 +++++----- 6 files changed, 52 insertions(+), 20 deletions(-) diff --git a/server/src/catalog.rs b/server/src/catalog.rs index 1d0270a88..3498c8183 100644 --- a/server/src/catalog.rs +++ b/server/src/catalog.rs @@ -118,7 +118,12 @@ pub async fn update_snapshot( let mut ch = false; for m in manifests.iter() { let s = get_address(); - let p = format!("{}.{}.{}", s.ip(), s.port(), MANIFEST_FILE); + let p = format!( + "{}.{}.{}", + s.domain().unwrap(), + s.port().unwrap_or_default(), + MANIFEST_FILE + ); if m.manifest_path.contains(&p) { ch = true; } @@ -152,7 +157,12 @@ pub async fn update_snapshot( }; let addr = get_address(); - let mainfest_file_name = format!("{}.{}.{}", addr.ip(), addr.port(), MANIFEST_FILE); + let mainfest_file_name = format!( + "{}.{}.{}", + addr.domain().unwrap(), + addr.port().unwrap_or_default(), + MANIFEST_FILE + ); let path = partition_path(stream_name, lower_bound, upper_bound).join(&mainfest_file_name); storage @@ -186,7 +196,12 @@ pub async fn update_snapshot( }; let addr = get_address(); - let mainfest_file_name = format!("{}.{}.{}", addr.ip(), addr.port(), MANIFEST_FILE); + let mainfest_file_name = format!( + "{}.{}.{}", + addr.domain().unwrap(), + addr.port().unwrap(), + MANIFEST_FILE + ); let path = partition_path(stream_name, lower_bound, upper_bound).join(&mainfest_file_name); storage .put_object(&path, serde_json::to_vec(&manifest).unwrap().into()) diff --git a/server/src/handlers/http/modal/ingest_server.rs b/server/src/handlers/http/modal/ingest_server.rs index ac92c7ca1..04dc98a3b 100644 --- a/server/src/handlers/http/modal/ingest_server.rs +++ b/server/src/handlers/http/modal/ingest_server.rs @@ -189,7 +189,10 @@ impl IngestServer { let store = CONFIG.storage().get_object_store(); let sock = get_address(); - let path = ingestor_metadata_path(sock.ip().to_string(), sock.port().to_string()); + let path = ingestor_metadata_path( + sock.domain().unwrap().to_string(), + sock.port().unwrap_or_default().to_string(), + ); if store.get_object(&path).await.is_ok() { println!("ingestor metadata already exists"); @@ -198,13 +201,19 @@ impl IngestServer { let scheme = CONFIG.parseable.get_scheme(); let resource = IngestorMetadata::new( - sock.port().to_string(), + sock.port().unwrap_or_default().to_string(), CONFIG .parseable .domain_address .clone() .unwrap_or_else(|| { - Url::parse(&format!("{}://{}:{}", scheme, sock.ip(), sock.port())).unwrap() + Url::parse(&format!( + "{}://{}:{}", + scheme, + sock.domain().unwrap(), + sock.port().unwrap_or_default() + )) + .unwrap() }) .to_string(), DEFAULT_VERSION.to_string(), diff --git a/server/src/metrics/prom_utils.rs b/server/src/metrics/prom_utils.rs index 6fb99405d..37a0445bf 100644 --- a/server/src/metrics/prom_utils.rs +++ b/server/src/metrics/prom_utils.rs @@ -23,7 +23,11 @@ struct StorageMetrics { impl Default for Metrics { fn default() -> Self { let socket = get_address(); - let address = format!("http://{}:{}", socket.ip(), socket.port()); + let address = format!( + "http://{}:{}", + socket.domain().unwrap(), + 
socket.port().unwrap_or_default() + ); Metrics { address, parseable_events_ingested: 0.0, diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs index c3acec8fa..573b60bb4 100644 --- a/server/src/storage/object_storage.rs +++ b/server/src/storage/object_storage.rs @@ -541,8 +541,8 @@ fn schema_path(stream_name: &str) -> RelativePathBuf { let addr = get_address(); let file_name = format!( ".ingestor.{}.{}{}", - addr.ip(), - addr.port(), + addr.domain().unwrap(), + addr.port().unwrap_or_default(), SCHEMA_FILE_NAME ); @@ -561,8 +561,8 @@ pub fn stream_json_path(stream_name: &str) -> RelativePathBuf { let addr = get_address(); let file_name = format!( ".ingestor.{}.{}{}", - addr.ip(), - addr.port(), + addr.domain().unwrap(), + addr.port().unwrap_or_default(), STREAM_METADATA_FILE_NAME ); RelativePathBuf::from_iter([stream_name, STREAM_ROOT_DIRECTORY, &file_name]) @@ -589,7 +589,12 @@ fn alert_json_path(stream_name: &str) -> RelativePathBuf { #[inline(always)] fn manifest_path(prefix: &str) -> RelativePathBuf { let addr = get_address(); - let mainfest_file_name = format!("{}.{}.{}", addr.ip(), addr.port(), MANIFEST_FILE); + let mainfest_file_name = format!( + "{}.{}.{}", + addr.domain().unwrap(), + addr.port().unwrap_or_default(), + MANIFEST_FILE + ); RelativePathBuf::from_iter([prefix, &mainfest_file_name]) } diff --git a/server/src/storage/staging.rs b/server/src/storage/staging.rs index f2f42a901..b3637ce68 100644 --- a/server/src/storage/staging.rs +++ b/server/src/storage/staging.rs @@ -64,8 +64,8 @@ impl StorageDir { + &utils::minute_to_prefix(time.minute(), OBJECT_STORE_DATA_GRANULARITY).unwrap(); let local_uri = str::replace(&uri, "/", "."); let sock = get_address(); - let ip = sock.ip(); - let port = sock.port(); + let ip = sock.domain().unwrap(); + let port = sock.port().unwrap_or_default(); format!("{local_uri}{ip}.{port}.{extention}") } diff --git a/server/src/utils.rs b/server/src/utils.rs index d1cf6a155..015553b1a 100644 --- a/server/src/utils.rs +++ b/server/src/utils.rs @@ -25,7 +25,9 @@ pub mod update; use crate::option::CONFIG; use chrono::{DateTime, NaiveDate, Timelike, Utc}; use std::env; +#[allow(unused_imports)] use std::net::SocketAddr; +use url::Url; #[allow(dead_code)] pub fn hostname() -> Option { @@ -224,10 +226,9 @@ impl TimePeriod { } } -#[inline(always)] -pub fn get_address() -> SocketAddr { +pub fn get_address() -> Url { if CONFIG.parseable.ingestor_url.is_empty() { - CONFIG.parseable.address.parse::().unwrap() + CONFIG.parseable.address.parse::().unwrap() } else { let addr_from_env = CONFIG .parseable @@ -245,9 +246,7 @@ pub fn get_address() -> SocketAddr { let var_port = port[1..].to_string(); port = get_from_env(&var_port); } - format!("{}:{}", hostname, port) - .parse::() - .unwrap() + format!("{}:{}", hostname, port).parse::().unwrap() } } fn get_from_env(var_to_fetch: &str) -> String { From 87452c1b9bfef52ec2a3a4b2d8412d1a4f83776b Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Thu, 11 Apr 2024 12:11:58 +0530 Subject: [PATCH 02/10] fix relative url with base --- server/src/utils.rs | 45 +++++++++++++++++++++++++++------------------ 1 file changed, 27 insertions(+), 18 deletions(-) diff --git a/server/src/utils.rs b/server/src/utils.rs index 015553b1a..316ece654 100644 --- a/server/src/utils.rs +++ b/server/src/utils.rs @@ -226,29 +226,38 @@ impl TimePeriod { } } +// TODO: CLEAN UP pub fn get_address() -> Url { if CONFIG.parseable.ingestor_url.is_empty() { - CONFIG.parseable.address.parse::().unwrap() - } else { - let 
addr_from_env = CONFIG - .parseable - .ingestor_url - .split(':') - .collect::>(); - - let mut hostname = addr_from_env[0].to_string(); - let mut port = addr_from_env[1].to_string(); - if hostname.starts_with('$') { - let var_hostname = hostname[1..].to_string(); - hostname = get_from_env(&var_hostname); - } - if port.starts_with('$') { - let var_port = port[1..].to_string(); - port = get_from_env(&var_port); + let url = format!( + "{}://{}", + CONFIG.parseable.get_scheme(), + CONFIG.parseable.address + ); + return url.parse::().unwrap(); + } + let addr_from_env = CONFIG + .parseable + .ingestor_url + .split(':') + .collect::>(); + + let mut hostname = addr_from_env[0].to_string(); + let mut port = addr_from_env[1].to_string(); + if hostname.starts_with('$') { + let var_hostname = hostname[1..].to_string(); + hostname = get_from_env(&var_hostname); + if !hostname.starts_with("http") { + hostname = format!("{}://{}", CONFIG.parseable.get_scheme(), hostname); } - format!("{}:{}", hostname, port).parse::().unwrap() } + if port.starts_with('$') { + let var_port = port[1..].to_string(); + port = get_from_env(&var_port); + } + format!("{}:{}", hostname, port).parse::().unwrap() } + fn get_from_env(var_to_fetch: &str) -> String { env::var(var_to_fetch).unwrap_or_else(|_| "".to_string()) } From e647567a15c939f5eb5a8471870f19ba82f5ec6d Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Thu, 11 Apr 2024 12:49:57 +0530 Subject: [PATCH 03/10] fix to check scheme for hostname --- server/src/utils.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/server/src/utils.rs b/server/src/utils.rs index 316ece654..5dd0bad36 100644 --- a/server/src/utils.rs +++ b/server/src/utils.rs @@ -247,10 +247,11 @@ pub fn get_address() -> Url { if hostname.starts_with('$') { let var_hostname = hostname[1..].to_string(); hostname = get_from_env(&var_hostname); - if !hostname.starts_with("http") { - hostname = format!("{}://{}", CONFIG.parseable.get_scheme(), hostname); - } } + if !hostname.starts_with("http") { + hostname = format!("{}://{}", CONFIG.parseable.get_scheme(), hostname); + } + if port.starts_with('$') { let var_port = port[1..].to_string(); port = get_from_env(&var_port); From bb5790f4657d055ae956bee446cade63cfc07f1c Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Mon, 15 Apr 2024 13:34:54 +0530 Subject: [PATCH 04/10] rename ingestor_url to ingestor_endpoint --- server/src/cli.rs | 14 +++++++------- server/src/utils.rs | 16 ++++++++++------ 2 files changed, 17 insertions(+), 13 deletions(-) diff --git a/server/src/cli.rs b/server/src/cli.rs index 41e03b39f..2ad9899cd 100644 --- a/server/src/cli.rs +++ b/server/src/cli.rs @@ -88,7 +88,7 @@ pub struct Cli { pub mode: Mode, /// public address for the parseable server ingestor - pub ingestor_url: String, + pub ingestor_endpoint: String, } impl Cli { @@ -115,7 +115,7 @@ impl Cli { pub const ROW_GROUP_SIZE: &'static str = "row-group-size"; pub const PARQUET_COMPRESSION_ALGO: &'static str = "compression-algo"; pub const MODE: &'static str = "mode"; - pub const INGESTOR_URL: &'static str = "ingestor-url"; + pub const INGESTOR_ENDPOINT: &'static str = "ingestor-endpoint"; pub const DEFAULT_USERNAME: &'static str = "admin"; pub const DEFAULT_PASSWORD: &'static str = "admin"; @@ -317,9 +317,9 @@ impl Cli { .help("Mode of operation"), ) .arg( - Arg::new(Self::INGESTOR_URL) - .long(Self::INGESTOR_URL) - .env("P_INGESTOR_URL") + Arg::new(Self::INGESTOR_ENDPOINT) + .long(Self::INGESTOR_ENDPOINT) + .env("P_INGESTOR_ENDPOINT") .value_name("URL") 
.required(false) .help("URL to connect to this specific ingestor. Default is the address of the server.") @@ -367,8 +367,8 @@ impl FromArgMatches for Cli { .cloned() .expect("default value for address"); - self.ingestor_url = m - .get_one::(Self::INGESTOR_URL) + self.ingestor_endpoint = m + .get_one::(Self::INGESTOR_ENDPOINT) .cloned() .unwrap_or_else(String::default); diff --git a/server/src/utils.rs b/server/src/utils.rs index 5dd0bad36..987e41262 100644 --- a/server/src/utils.rs +++ b/server/src/utils.rs @@ -226,24 +226,27 @@ impl TimePeriod { } } -// TODO: CLEAN UP pub fn get_address() -> Url { - if CONFIG.parseable.ingestor_url.is_empty() { - let url = format!( + if CONFIG.parseable.ingestor_endpoint.is_empty() { + return format!( "{}://{}", CONFIG.parseable.get_scheme(), CONFIG.parseable.address - ); - return url.parse::().unwrap(); + ) + .parse::() // if the value was improperly set, this will panic before hand + .unwrap(); } let addr_from_env = CONFIG .parseable - .ingestor_url + .ingestor_endpoint .split(':') .collect::>(); let mut hostname = addr_from_env[0].to_string(); let mut port = addr_from_env[1].to_string(); + + // if the env var value fits the pattern $VAR_NAME:$VAR_NAME + // fetch the value from the specified env vars if hostname.starts_with('$') { let var_hostname = hostname[1..].to_string(); hostname = get_from_env(&var_hostname); @@ -259,6 +262,7 @@ pub fn get_address() -> Url { format!("{}:{}", hostname, port).parse::().unwrap() } +/// util fuction to fetch value from an env var fn get_from_env(var_to_fetch: &str) -> String { env::var(var_to_fetch).unwrap_or_else(|_| "".to_string()) } From dd5bae6c9f61301aef8a286e0666f1b115e8a7a3 Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Tue, 16 Apr 2024 15:33:15 +0530 Subject: [PATCH 05/10] update files in distributed mode to use hash --- Cargo.lock | 9 +-- server/Cargo.toml | 1 + server/src/catalog.rs | 8 +-- server/src/handlers/http/cluster/mod.rs | 27 ++++++-- .../src/handlers/http/modal/ingest_server.rs | 47 ++++--------- server/src/handlers/http/modal/mod.rs | 13 +++- .../src/handlers/http/modal/query_server.rs | 4 +- server/src/metrics/prom_utils.rs | 4 +- server/src/storage/object_storage.rs | 31 +++++---- server/src/storage/staging.rs | 66 +++++++++++++++++-- server/src/storage/store_metadata.rs | 10 +-- server/src/utils.rs | 12 +++- 12 files changed, 152 insertions(+), 80 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d7840d5e9..61117c66a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1490,9 +1490,9 @@ dependencies = [ [[package]] name = "digest" -version = "0.10.6" +version = "0.10.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8168378f4e5023e7218c89c891c0fd8ecdb5e5e4f18cb78f38cf245dd021e76f" +checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" dependencies = [ "block-buffer", "crypto-common", @@ -2758,6 +2758,7 @@ dependencies = [ "serde_json", "serde_repr", "sha1_smol", + "sha2", "static-files", "sysinfo", "thiserror", @@ -3529,9 +3530,9 @@ checksum = "ae1a47186c03a32177042e55dbc5fd5aee900b8e0069a8d70fba96a9375cd012" [[package]] name = "sha2" -version = "0.10.6" +version = "0.10.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "82e6b795fe2e3b1e845bafcb27aa35405c4d47cdfc92af5fc8d3002f76cebdc0" +checksum = "793db75ad2bcafc3ffa7c68b215fee268f537982cd901d132f89c6343f3a3dc8" dependencies = [ "cfg-if", "cpufeatures", diff --git a/server/Cargo.toml b/server/Cargo.toml index ce1581cd9..99eb9722a 100644 --- 
a/server/Cargo.toml +++ b/server/Cargo.toml @@ -105,6 +105,7 @@ hashlru = { version = "0.11.0", features = ["serde"] } path-clean = "1.0.1" prost = "0.12.3" prometheus-parse = "0.2.5" +sha2 = "0.10.8" [build-dependencies] cargo_toml = "0.15" diff --git a/server/src/catalog.rs b/server/src/catalog.rs index 3498c8183..941f2baa0 100644 --- a/server/src/catalog.rs +++ b/server/src/catalog.rs @@ -25,7 +25,7 @@ use crate::{ catalog::manifest::Manifest, query::PartialTimeFilter, storage::{ObjectStorage, ObjectStorageError, MANIFEST_FILE}, - utils::get_address, + utils::get_url, }; use self::{column::Column, snapshot::ManifestItem}; @@ -117,7 +117,7 @@ pub async fn update_snapshot( let mut ch = false; for m in manifests.iter() { - let s = get_address(); + let s = get_url(); let p = format!( "{}.{}.{}", s.domain().unwrap(), @@ -156,7 +156,7 @@ pub async fn update_snapshot( ..Manifest::default() }; - let addr = get_address(); + let addr = get_url(); let mainfest_file_name = format!( "{}.{}.{}", addr.domain().unwrap(), @@ -195,7 +195,7 @@ pub async fn update_snapshot( ..Manifest::default() }; - let addr = get_address(); + let addr = get_url(); let mainfest_file_name = format!( "{}.{}.{}", addr.domain().unwrap(), diff --git a/server/src/handlers/http/cluster/mod.rs b/server/src/handlers/http/cluster/mod.rs index a4720506f..7786dc396 100644 --- a/server/src/handlers/http/cluster/mod.rs +++ b/server/src/handlers/http/cluster/mod.rs @@ -362,14 +362,27 @@ pub async fn remove_ingestor(req: HttpRequest) -> Result(elem).unwrap_or_default()) + .collect_vec(); + + let ingestor_metadata = ingestor_metadata + .iter() + .filter(|elem| elem.domain_name == domain_name) + .collect_vec(); + + let ingestor_meta_filename = + ingestor_metadata_path(Some(&ingestor_metadata[0].ingestor_id)).to_string(); let msg = match object_store .try_delete_ingestor_meta(ingestor_meta_filename) .await diff --git a/server/src/handlers/http/modal/ingest_server.rs b/server/src/handlers/http/modal/ingest_server.rs index 04dc98a3b..ec4643d3f 100644 --- a/server/src/handlers/http/modal/ingest_server.rs +++ b/server/src/handlers/http/modal/ingest_server.rs @@ -28,6 +28,7 @@ use crate::rbac::role::Action; use crate::storage; use crate::storage::object_storage::ingestor_metadata_path; use crate::storage::object_storage::parseable_json_path; +use crate::storage::staging; use crate::storage::ObjectStorageError; use crate::sync; @@ -36,9 +37,7 @@ use super::ssl_acceptor::get_ssl_acceptor; use super::IngestorMetadata; use super::OpenIdClient; use super::ParseableServer; -use super::DEFAULT_VERSION; -use crate::utils::get_address; use actix_web::body::MessageBody; use actix_web::Scope; use actix_web::{web, App, HttpServer}; @@ -46,14 +45,17 @@ use actix_web_prometheus::PrometheusMetrics; use async_trait::async_trait; use base64::Engine; use itertools::Itertools; +use once_cell::sync::Lazy; use relative_path::RelativePathBuf; -use url::Url; use crate::{ handlers::http::{base_path, cross_origin_config}, option::CONFIG, }; +pub static INGESTOR_META: Lazy = + Lazy::new(|| staging::get_ingestor_info().expect("dir is readable and writeable")); + #[derive(Default)] pub struct IngestServer; @@ -102,6 +104,7 @@ impl ParseableServer for IngestServer { /// implement the init method will just invoke the initialize method async fn init(&self) -> anyhow::Result<()> { self.validate()?; + // check for querier state. 
Is it there, or was it there in the past self.check_querier_state().await?; // to get the .parseable.json file in staging @@ -188,46 +191,23 @@ impl IngestServer { async fn set_ingestor_metadata(&self) -> anyhow::Result<()> { let store = CONFIG.storage().get_object_store(); - let sock = get_address(); - let path = ingestor_metadata_path( - sock.domain().unwrap().to_string(), - sock.port().unwrap_or_default().to_string(), - ); + // find the meta file in staging if not generate new metadata + let resource = INGESTOR_META.clone(); + // use the id that was generated/found in the staging and + // generate the path for the object store + let path = ingestor_metadata_path(None); if store.get_object(&path).await.is_ok() { - println!("ingestor metadata already exists"); + log::info!("ingestor metadata already exists"); return Ok(()); }; - let scheme = CONFIG.parseable.get_scheme(); - let resource = IngestorMetadata::new( - sock.port().unwrap_or_default().to_string(), - CONFIG - .parseable - .domain_address - .clone() - .unwrap_or_else(|| { - Url::parse(&format!( - "{}://{}:{}", - scheme, - sock.domain().unwrap(), - sock.port().unwrap_or_default() - )) - .unwrap() - }) - .to_string(), - DEFAULT_VERSION.to_string(), - store.get_bucket_name(), - &CONFIG.parseable.username, - &CONFIG.parseable.password, - ); - let resource = serde_json::to_string(&resource) .unwrap() .try_into_bytes() .unwrap(); - store.put_object(&path, resource).await?; + store.put_object(&path, resource.clone()).await?; Ok(()) } @@ -286,6 +266,7 @@ impl IngestServer { } async fn initialize(&self) -> anyhow::Result<()> { + // ! Undefined and Untested behaviour if let Some(cache_manager) = LocalCacheManager::global() { cache_manager .validate(CONFIG.parseable.local_cache_size) diff --git a/server/src/handlers/http/modal/mod.rs b/server/src/handlers/http/modal/mod.rs index 29ef214ff..edd7bd3c3 100644 --- a/server/src/handlers/http/modal/mod.rs +++ b/server/src/handlers/http/modal/mod.rs @@ -61,6 +61,7 @@ pub struct IngestorMetadata { pub domain_name: String, pub bucket_name: String, pub token: String, + pub ingestor_id: String, } impl IngestorMetadata { @@ -71,6 +72,7 @@ impl IngestorMetadata { bucket_name: String, username: &str, password: &str, + ingestor_id: String, ) -> Self { let token = base64::prelude::BASE64_STANDARD.encode(format!("{}:{}", username, password)); @@ -82,8 +84,13 @@ impl IngestorMetadata { version, bucket_name, token, + ingestor_id, } } + + pub fn get_ingestor_id(&self) -> String { + self.ingestor_id.clone() + } } #[cfg(test)] @@ -102,9 +109,10 @@ mod test { "somebucket".to_string(), "admin", "admin", + "ingestor_id".to_string(), ); - let rhs = serde_json::from_slice::(br#"{"version":"v3","port":"8000","domain_name":"https://localhost:8000","bucket_name":"somebucket","token":"Basic YWRtaW46YWRtaW4="}"#).unwrap(); + let rhs = serde_json::from_slice::(br#"{"version":"v3","port":"8000","domain_name":"https://localhost:8000","bucket_name":"somebucket","token":"Basic YWRtaW46YWRtaW4=", "ingestor_id": "ingestor_id"}"#).unwrap(); assert_eq!(rhs, lhs); } @@ -118,13 +126,14 @@ mod test { "somebucket".to_string(), "admin", "admin", + "ingestor_id".to_string(), ); let lhs = serde_json::to_string(&im) .unwrap() .try_into_bytes() .unwrap(); - let rhs = br#"{"version":"v3","port":"8000","domain_name":"https://localhost:8000","bucket_name":"somebucket","token":"Basic YWRtaW46YWRtaW4="}"# + let rhs = br#"{"version":"v3","port":"8000","domain_name":"https://localhost:8000","bucket_name":"somebucket","token":"Basic 
YWRtaW46YWRtaW4=","ingestor_id":"ingestor_id"}"# .try_into_bytes() .unwrap(); diff --git a/server/src/handlers/http/modal/query_server.rs b/server/src/handlers/http/modal/query_server.rs index 8611e9910..1cdf7866a 100644 --- a/server/src/handlers/http/modal/query_server.rs +++ b/server/src/handlers/http/modal/query_server.rs @@ -125,12 +125,12 @@ impl QueryServer { .service(Server::get_llm_webscope()) .service(Server::get_oauth_webscope(oidc_client)) .service(Server::get_user_role_webscope()) - .service(Self::get_cluster_info_web_scope()), + .service(Self::get_cluster_web_scope()), ) .service(Server::get_generated()); } - fn get_cluster_info_web_scope() -> actix_web::Scope { + fn get_cluster_web_scope() -> actix_web::Scope { web::scope("/cluster") .service( // GET "/cluster/info" ==> Get info of the cluster diff --git a/server/src/metrics/prom_utils.rs b/server/src/metrics/prom_utils.rs index 37a0445bf..d30edc367 100644 --- a/server/src/metrics/prom_utils.rs +++ b/server/src/metrics/prom_utils.rs @@ -1,4 +1,4 @@ -use crate::utils::get_address; +use crate::utils::get_url; use prometheus_parse::Sample as PromSample; use prometheus_parse::Value as PromValue; use serde::Serialize; @@ -22,7 +22,7 @@ struct StorageMetrics { impl Default for Metrics { fn default() -> Self { - let socket = get_address(); + let socket = get_url(); let address = format!( "http://{}:{}", socket.domain().unwrap(), diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs index 573b60bb4..23b02c512 100644 --- a/server/src/storage/object_storage.rs +++ b/server/src/storage/object_storage.rs @@ -25,8 +25,9 @@ use super::{ SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY, }; +use crate::handlers::http::modal::ingest_server::INGESTOR_META; use crate::option::Mode; -use crate::utils::get_address; +use crate::utils::get_url; use crate::{ alerts::Alerts, catalog::{self, manifest::Manifest, snapshot::Snapshot}, @@ -257,7 +258,7 @@ pub trait ObjectStorage: Sync + 'static { let stream_metadata = match self.get_object(&stream_json_path(stream_name)).await { Ok(data) => data, Err(_) => { - // ! 
this is hard coded for now + // get the base stream metadata let bytes = self .get_object(&RelativePathBuf::from_iter([ stream_name, @@ -538,11 +539,9 @@ fn to_bytes(any: &(impl ?Sized + serde::Serialize)) -> Bytes { fn schema_path(stream_name: &str) -> RelativePathBuf { match CONFIG.parseable.mode { Mode::Ingest => { - let addr = get_address(); let file_name = format!( - ".ingestor.{}.{}{}", - addr.domain().unwrap(), - addr.port().unwrap_or_default(), + ".ingestor.{}{}", + INGESTOR_META.ingestor_id.clone(), SCHEMA_FILE_NAME ); @@ -558,11 +557,9 @@ fn schema_path(stream_name: &str) -> RelativePathBuf { pub fn stream_json_path(stream_name: &str) -> RelativePathBuf { match &CONFIG.parseable.mode { Mode::Ingest => { - let addr = get_address(); let file_name = format!( - ".ingestor.{}.{}{}", - addr.domain().unwrap(), - addr.port().unwrap_or_default(), + ".ingestor.{}{}", + INGESTOR_META.get_ingestor_id(), STREAM_METADATA_FILE_NAME ); RelativePathBuf::from_iter([stream_name, STREAM_ROOT_DIRECTORY, &file_name]) @@ -581,6 +578,7 @@ pub fn parseable_json_path() -> RelativePathBuf { RelativePathBuf::from_iter([PARSEABLE_ROOT_DIRECTORY, PARSEABLE_METADATA_FILE_NAME]) } +/// TODO: Needs to be updated for distributed mode #[inline(always)] fn alert_json_path(stream_name: &str) -> RelativePathBuf { RelativePathBuf::from_iter([stream_name, ALERT_FILE_NAME]) @@ -588,7 +586,7 @@ fn alert_json_path(stream_name: &str) -> RelativePathBuf { #[inline(always)] fn manifest_path(prefix: &str) -> RelativePathBuf { - let addr = get_address(); + let addr = get_url(); let mainfest_file_name = format!( "{}.{}.{}", addr.domain().unwrap(), @@ -599,9 +597,16 @@ fn manifest_path(prefix: &str) -> RelativePathBuf { } #[inline(always)] -pub fn ingestor_metadata_path(ip: String, port: String) -> RelativePathBuf { +pub fn ingestor_metadata_path(id: Option<&str>) -> RelativePathBuf { + if let Some(id) = id { + return RelativePathBuf::from_iter([ + PARSEABLE_ROOT_DIRECTORY, + &format!("ingestor.{}.json", id), + ]); + } + RelativePathBuf::from_iter([ PARSEABLE_ROOT_DIRECTORY, - &format!("ingestor.{}.{}.json", ip, port), + &format!("ingestor.{}.json", INGESTOR_META.get_ingestor_id()), ]) } diff --git a/server/src/storage/staging.rs b/server/src/storage/staging.rs index b3637ce68..23f37fb70 100644 --- a/server/src/storage/staging.rs +++ b/server/src/storage/staging.rs @@ -27,10 +27,11 @@ use std::{ use crate::{ event::DEFAULT_TIMESTAMP_KEY, + handlers::http::modal::{ingest_server::INGESTOR_META, IngestorMetadata, DEFAULT_VERSION}, metrics, - option::CONFIG, + option::{Mode, CONFIG}, storage::OBJECT_STORE_DATA_GRANULARITY, - utils::{self, arrow::merged_reader::MergedReverseRecordReader, get_address}, + utils::{self, arrow::merged_reader::MergedReverseRecordReader, get_ingestor_id, get_url}, }; use arrow_schema::{ArrowError, Schema}; use chrono::{NaiveDateTime, Timelike, Utc}; @@ -63,10 +64,12 @@ impl StorageDir { + &utils::hour_to_prefix(time.hour()) + &utils::minute_to_prefix(time.minute(), OBJECT_STORE_DATA_GRANULARITY).unwrap(); let local_uri = str::replace(&uri, "/", "."); - let sock = get_address(); - let ip = sock.domain().unwrap(); - let port = sock.port().unwrap_or_default(); - format!("{local_uri}{ip}.{port}.{extention}") + if CONFIG.parseable.mode == Mode::Ingest { + let id = INGESTOR_META.get_ingestor_id(); + format!("{local_uri}{id}.{extention}") + } else { + format!("{local_uri}.{extention}") + } } fn filename_by_time(stream_hash: &str, time: NaiveDateTime) -> String { @@ -160,9 +163,9 @@ impl StorageDir { fn 
arrow_path_to_parquet(path: &Path) -> PathBuf { let filename = path.file_name().unwrap().to_str().unwrap(); let (_, filename) = filename.split_once('.').unwrap(); - let filename = filename.rsplit_once('.').unwrap(); let filename = format!("{}.{}", filename.0, filename.1); + /* let file_stem = path.file_stem().unwrap().to_str().unwrap(); let random_string = @@ -288,6 +291,55 @@ fn parquet_writer_props( } } +pub fn get_ingestor_info() -> anyhow::Result { + let path = PathBuf::from(&CONFIG.parseable.local_staging_path); + + // all the files should be in the staging directory root + let entries = std::fs::read_dir(path)?; + + for entry in entries { + // cause the staging directory will have only one file with ingestor in the name + // so the JSON Parse should not error unless the file is corrupted + let path = entry?.path(); + let flag = path + .file_name() + .unwrap_or_default() + .to_str() + .unwrap_or_default() + .contains("ingestor"); + + if flag { + return Ok(serde_json::from_slice(&std::fs::read(path)?)?); + } + } + + let store = CONFIG.storage().get_object_store(); + let url = get_url(); + let out = IngestorMetadata::new( + url.port().unwrap().to_string(), // here port should be defined + url.to_string(), + DEFAULT_VERSION.to_string(), + store.get_bucket_name(), + &CONFIG.parseable.username, + &CONFIG.parseable.password, + get_ingestor_id(), + ); + + put_ingestor_info(out.clone())?; + + Ok(out) +} + +fn put_ingestor_info(info: IngestorMetadata) -> anyhow::Result<()> { + let path = PathBuf::from(&CONFIG.parseable.local_staging_path); + let file_name = format!("ingestor.{}.json", info.ingestor_id); + let file_path = path.join(file_name); + + std::fs::write(file_path, serde_json::to_string(&info)?)?; + + Ok(()) +} + #[derive(Debug, thiserror::Error)] pub enum MoveDataError { #[error("Unable to create recordbatch stream")] diff --git a/server/src/storage/store_metadata.rs b/server/src/storage/store_metadata.rs index 18a3efaeb..3c58982d2 100644 --- a/server/src/storage/store_metadata.rs +++ b/server/src/storage/store_metadata.rs @@ -115,7 +115,7 @@ pub async fn resolve_parseable_metadata() -> Result Result { - standalone_when_distributed(Mode::from_string(&metadata.server_mode).expect("mode should be valid at here")) + standalone_after_distributed(Mode::from_string(&metadata.server_mode).expect("mode should be valid at here")) .map_err(|err| { ObjectStorageError::Custom(err.to_string()) })?; @@ -232,9 +232,9 @@ pub enum EnvChange { CreateBoth, } -fn standalone_when_distributed(remote_server_mode: Mode) -> Result<(), MetadataError> { - // mode::all -> mode::query | mode::ingest allowed - // but mode::query | mode::ingest -> mode::all not allowed +fn standalone_after_distributed(remote_server_mode: Mode) -> Result<(), MetadataError> { + // standalone -> query | ingest allowed + // but query | ingest -> standalone not allowed if remote_server_mode == Mode::Query { return Err(MetadataError::StandaloneWithDistributed("Starting Standalone Mode is not permitted when Distributed Mode is enabled. 
Please restart the server with Distributed Mode enabled.".to_string())); } diff --git a/server/src/utils.rs b/server/src/utils.rs index 987e41262..0cd2bfb64 100644 --- a/server/src/utils.rs +++ b/server/src/utils.rs @@ -24,6 +24,7 @@ pub mod uid; pub mod update; use crate::option::CONFIG; use chrono::{DateTime, NaiveDate, Timelike, Utc}; +use sha2::{Digest, Sha256}; use std::env; #[allow(unused_imports)] use std::net::SocketAddr; @@ -226,7 +227,7 @@ impl TimePeriod { } } -pub fn get_address() -> Url { +pub fn get_url() -> Url { if CONFIG.parseable.ingestor_endpoint.is_empty() { return format!( "{}://{}", @@ -267,6 +268,15 @@ fn get_from_env(var_to_fetch: &str) -> String { env::var(var_to_fetch).unwrap_or_else(|_| "".to_string()) } +pub fn get_ingestor_id() -> String { + let now = Utc::now().to_rfc3339().to_string(); + let mut hasher = Sha256::new(); + hasher.update(now); + let result = format!("{:x}", hasher.finalize()); + log::debug!("Ingestor ID: {}", &result); + result +} + #[cfg(test)] mod tests { use chrono::DateTime; From 8c0595859a2e3a487b0381bf490dc7da75a3644a Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Wed, 17 Apr 2024 14:02:30 +0530 Subject: [PATCH 06/10] fix: use hash for distributed mode --- server/src/catalog.rs | 43 +++++++++------------------- server/src/migration.rs | 4 +-- server/src/option.rs | 2 +- server/src/static_schema.rs | 1 - server/src/storage/object_storage.rs | 22 +++++++------- server/src/storage/staging.rs | 14 +++++---- server/src/sync.rs | 4 +-- server/src/utils.rs | 13 ++++----- 8 files changed, 45 insertions(+), 58 deletions(-) diff --git a/server/src/catalog.rs b/server/src/catalog.rs index 941f2baa0..6f70d0a41 100644 --- a/server/src/catalog.rs +++ b/server/src/catalog.rs @@ -16,17 +16,16 @@ * */ -use std::sync::Arc; - -use chrono::{DateTime, Local, NaiveDateTime, NaiveTime, Utc}; -use relative_path::RelativePathBuf; +use std::{io::ErrorKind, sync::Arc}; use crate::{ catalog::manifest::Manifest, query::PartialTimeFilter, - storage::{ObjectStorage, ObjectStorageError, MANIFEST_FILE}, - utils::get_url, + storage::{object_storage::manifest_path, ObjectStorage, ObjectStorageError}, }; +use chrono::{DateTime, Local, NaiveDateTime, NaiveTime, Utc}; +use relative_path::RelativePathBuf; +use std::io::Error as IOError; use self::{column::Column, snapshot::ManifestItem}; @@ -117,13 +116,7 @@ pub async fn update_snapshot( let mut ch = false; for m in manifests.iter() { - let s = get_url(); - let p = format!( - "{}.{}.{}", - s.domain().unwrap(), - s.port().unwrap_or_default(), - MANIFEST_FILE - ); + let p = manifest_path("").to_string(); if m.manifest_path.contains(&p) { ch = true; } @@ -147,7 +140,11 @@ pub async fn update_snapshot( 23 * 3600 + 59 * 60 + 59, 999_999_999, ) - .unwrap(), + .ok_or(IOError::new( + ErrorKind::Other, + "Failed to create upper bound for manifest", + )) + .map_err(ObjectStorageError::IoError)?, ) .and_utc(); @@ -156,17 +153,11 @@ pub async fn update_snapshot( ..Manifest::default() }; - let addr = get_url(); - let mainfest_file_name = format!( - "{}.{}.{}", - addr.domain().unwrap(), - addr.port().unwrap_or_default(), - MANIFEST_FILE - ); + let mainfest_file_name = manifest_path("").to_string(); let path = partition_path(stream_name, lower_bound, upper_bound).join(&mainfest_file_name); storage - .put_object(&path, serde_json::to_vec(&manifest).unwrap().into()) + .put_object(&path, serde_json::to_vec(&manifest)?.into()) .await?; let path = storage.absolute_url(&path); let new_snapshot_entriy = snapshot::ManifestItem { @@ -195,13 
+186,7 @@ pub async fn update_snapshot( ..Manifest::default() }; - let addr = get_url(); - let mainfest_file_name = format!( - "{}.{}.{}", - addr.domain().unwrap(), - addr.port().unwrap(), - MANIFEST_FILE - ); + let mainfest_file_name = manifest_path("").to_string(); let path = partition_path(stream_name, lower_bound, upper_bound).join(&mainfest_file_name); storage .put_object(&path, serde_json::to_vec(&manifest).unwrap().into()) diff --git a/server/src/migration.rs b/server/src/migration.rs index 3b8bd6db2..9e7e9a3db 100644 --- a/server/src/migration.rs +++ b/server/src/migration.rs @@ -244,7 +244,7 @@ async fn run_meta_file_migration( // we can unwrap here because we know the file exists let new_path = RelativePathBuf::from_iter([ PARSEABLE_ROOT_DIRECTORY, - file.file_name().unwrap(), + file.file_name().expect("should have a file name"), ]); object_store.put_object(&new_path, bytes).await?; object_store.delete_object(&file).await?; @@ -281,7 +281,7 @@ async fn run_stream_files_migration( let new_path = RelativePathBuf::from_iter([ stream.as_str(), STREAM_ROOT_DIRECTORY, - path.file_name().unwrap(), + path.file_name().expect("should have a file name"), ]); object_store.put_object(&new_path, bytes).await?; object_store.delete_object(&path).await?; diff --git a/server/src/option.rs b/server/src/option.rs index 1983fdd07..0fb12f7f4 100644 --- a/server/src/option.rs +++ b/server/src/option.rs @@ -229,7 +229,7 @@ impl Mode { "Query" => Ok(Mode::Query), "Ingest" => Ok(Mode::Ingest), "All" => Ok(Mode::All), - x => Err(format!("Invalid mode: {}", x)), + x => Err(format!("Trying to Parse Invalid mode: {}", x)), } } } diff --git a/server/src/static_schema.rs b/server/src/static_schema.rs index 8e95b9b1e..cbf48b7ae 100644 --- a/server/src/static_schema.rs +++ b/server/src/static_schema.rs @@ -12,7 +12,6 @@ pub struct StaticSchema { } #[derive(Serialize, Deserialize, Debug)] - pub struct SchemaFields { name: String, data_type: String, diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs index 23b02c512..d2e4fa02c 100644 --- a/server/src/storage/object_storage.rs +++ b/server/src/storage/object_storage.rs @@ -27,7 +27,7 @@ use super::{ use crate::handlers::http::modal::ingest_server::INGESTOR_META; use crate::option::Mode; -use crate::utils::get_url; + use crate::{ alerts::Alerts, catalog::{self, manifest::Manifest, snapshot::Snapshot}, @@ -585,15 +585,17 @@ fn alert_json_path(stream_name: &str) -> RelativePathBuf { } #[inline(always)] -fn manifest_path(prefix: &str) -> RelativePathBuf { - let addr = get_url(); - let mainfest_file_name = format!( - "{}.{}.{}", - addr.domain().unwrap(), - addr.port().unwrap_or_default(), - MANIFEST_FILE - ); - RelativePathBuf::from_iter([prefix, &mainfest_file_name]) +pub fn manifest_path(prefix: &str) -> RelativePathBuf { + if CONFIG.parseable.mode == Mode::Ingest { + let manifest_file_name = format!( + "ingestor.{}.{}", + INGESTOR_META.get_ingestor_id(), + MANIFEST_FILE + ); + RelativePathBuf::from_iter([prefix, &manifest_file_name]) + } else { + RelativePathBuf::from_iter([MANIFEST_FILE]) + } } #[inline(always)] diff --git a/server/src/storage/staging.rs b/server/src/storage/staging.rs index 23f37fb70..5830dfe72 100644 --- a/server/src/storage/staging.rs +++ b/server/src/storage/staging.rs @@ -31,7 +31,10 @@ use crate::{ metrics, option::{Mode, CONFIG}, storage::OBJECT_STORE_DATA_GRANULARITY, - utils::{self, arrow::merged_reader::MergedReverseRecordReader, get_ingestor_id, get_url}, + utils::{ + self, 
arrow::merged_reader::MergedReverseRecordReader, get_ingestor_id, get_url, + hostname_unchecked, + }, }; use arrow_schema::{ArrowError, Schema}; use chrono::{NaiveDateTime, Timelike, Utc}; @@ -64,11 +67,12 @@ impl StorageDir { + &utils::hour_to_prefix(time.hour()) + &utils::minute_to_prefix(time.minute(), OBJECT_STORE_DATA_GRANULARITY).unwrap(); let local_uri = str::replace(&uri, "/", "."); + let hostname = hostname_unchecked(); if CONFIG.parseable.mode == Mode::Ingest { let id = INGESTOR_META.get_ingestor_id(); - format!("{local_uri}{id}.{extention}") + format!("{local_uri}{hostname}{id}.{extention}") } else { - format!("{local_uri}.{extention}") + format!("{local_uri}{hostname}.{extention}") } } @@ -163,7 +167,7 @@ impl StorageDir { fn arrow_path_to_parquet(path: &Path) -> PathBuf { let filename = path.file_name().unwrap().to_str().unwrap(); let (_, filename) = filename.split_once('.').unwrap(); - let filename = filename.rsplit_once('.').unwrap(); + let filename = filename.rsplit_once('.').expect("contains the delim `.`"); let filename = format!("{}.{}", filename.0, filename.1); /* @@ -316,7 +320,7 @@ pub fn get_ingestor_info() -> anyhow::Result { let store = CONFIG.storage().get_object_store(); let url = get_url(); let out = IngestorMetadata::new( - url.port().unwrap().to_string(), // here port should be defined + url.port().expect("here port should be defined").to_string(), url.to_string(), DEFAULT_VERSION.to_string(), store.get_bucket_name(), diff --git a/server/src/sync.rs b/server/src/sync.rs index d7eb5d2d7..b44dcde13 100644 --- a/server/src/sync.rs +++ b/server/src/sync.rs @@ -28,7 +28,7 @@ use std::time::Duration; use crate::option::CONFIG; use crate::{storage, STORAGE_UPLOAD_INTERVAL}; -pub(crate) fn object_store_sync() -> (JoinHandle<()>, oneshot::Receiver<()>, oneshot::Sender<()>) { +pub fn object_store_sync() -> (JoinHandle<()>, oneshot::Receiver<()>, oneshot::Sender<()>) { let (outbox_tx, outbox_rx) = oneshot::channel::<()>(); let (inbox_tx, inbox_rx) = oneshot::channel::<()>(); let mut inbox_rx = AssertUnwindSafe(inbox_rx); @@ -70,7 +70,7 @@ pub(crate) fn object_store_sync() -> (JoinHandle<()>, oneshot::Receiver<()>, one (handle, outbox_rx, inbox_tx) } -pub(crate) fn run_local_sync() -> (JoinHandle<()>, oneshot::Receiver<()>, oneshot::Sender<()>) { +pub fn run_local_sync() -> (JoinHandle<()>, oneshot::Receiver<()>, oneshot::Sender<()>) { let (outbox_tx, outbox_rx) = oneshot::channel::<()>(); let (inbox_tx, inbox_rx) = oneshot::channel::<()>(); let mut inbox_rx = AssertUnwindSafe(inbox_rx); diff --git a/server/src/utils.rs b/server/src/utils.rs index 0cd2bfb64..e599e38cb 100644 --- a/server/src/utils.rs +++ b/server/src/utils.rs @@ -36,16 +36,11 @@ pub fn hostname() -> Option { .ok() .and_then(|hostname| hostname.into_string().ok()) } -#[allow(dead_code)] + pub fn hostname_unchecked() -> String { hostname::get().unwrap().into_string().unwrap() } -#[allow(dead_code)] -pub fn capitalize_ascii(s: &str) -> String { - s[0..1].to_uppercase() + &s[1..] -} - /// Convert minutes to a slot range /// e.g. 
given minute = 15 and OBJECT_STORE_DATA_GRANULARITY = 10 returns "10-19" pub fn minute_to_slot(minute: u32, data_granularity: u32) -> Option { @@ -235,7 +230,7 @@ pub fn get_url() -> Url { CONFIG.parseable.address ) .parse::() // if the value was improperly set, this will panic before hand - .unwrap(); + .expect("Valid URL"); } let addr_from_env = CONFIG .parseable @@ -260,7 +255,9 @@ pub fn get_url() -> Url { let var_port = port[1..].to_string(); port = get_from_env(&var_port); } - format!("{}:{}", hostname, port).parse::().unwrap() + format!("{}:{}", hostname, port) + .parse::() + .expect("Valid URL") } /// util fuction to fetch value from an env var From 7bd8102067ff32cf22222518b2047458df26fab6 Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Wed, 17 Apr 2024 14:04:51 +0530 Subject: [PATCH 07/10] removed unwraps in favor of error propagation --- server/src/analytics.rs | 33 ++++++++++++------- server/src/handlers/http.rs | 4 +-- server/src/handlers/http/cluster/mod.rs | 14 ++++++-- server/src/handlers/http/logstream.rs | 6 ++-- .../src/handlers/http/modal/ingest_server.rs | 8 ++--- .../src/handlers/http/modal/query_server.rs | 2 +- server/src/handlers/http/modal/server.rs | 2 +- server/src/handlers/http/query.rs | 9 ++++- server/src/handlers/livetail.rs | 2 +- server/src/metrics/prom_utils.rs | 11 ++++--- server/src/storage/localfs.rs | 18 ++++++---- server/src/storage/object_storage.rs | 4 +-- server/src/storage/store_metadata.rs | 5 +-- 13 files changed, 75 insertions(+), 43 deletions(-) diff --git a/server/src/analytics.rs b/server/src/analytics.rs index 9a76d0db7..d77e004f5 100644 --- a/server/src/analytics.rs +++ b/server/src/analytics.rs @@ -74,7 +74,7 @@ pub struct Report { } impl Report { - pub async fn new() -> Self { + pub async fn new() -> anyhow::Result { let mut upt: f64 = 0.0; if let Ok(uptime) = uptime_lib::get() { upt = uptime.as_secs_f64(); @@ -91,9 +91,9 @@ impl Report { cpu_count = info.cpus().len(); mem_total = info.total_memory(); } - let ingestor_metrics = fetch_ingestors_metrics().await; + let ingestor_metrics = fetch_ingestors_metrics().await?; - Self { + Ok(Self { deployment_id: storage::StorageMetadata::global().deployment_id, uptime: upt, report_created_at: Utc::now(), @@ -113,7 +113,7 @@ impl Report { total_json_bytes: ingestor_metrics.4, total_parquet_bytes: ingestor_metrics.5, metrics: build_metrics().await, - } + }) } pub async fn send(&self) { @@ -148,7 +148,7 @@ fn total_event_stats() -> (u64, u64, u64) { (total_events, total_json_bytes, total_parquet_bytes) } -async fn fetch_ingestors_metrics() -> (u64, u64, usize, u64, u64, u64) { +async fn fetch_ingestors_metrics() -> anyhow::Result<(u64, u64, usize, u64, u64, u64)> { let event_stats = total_event_stats(); let mut node_metrics = NodeMetrics::new(total_streams(), event_stats.0, event_stats.1, event_stats.2); @@ -181,9 +181,9 @@ async fn fetch_ingestors_metrics() -> (u64, u64, usize, u64, u64, u64) { .header(header::CONTENT_TYPE, "application/json") .send() .await - .unwrap(); // should respond + .expect("should respond"); - let data = serde_json::from_slice::(&resp.bytes().await.unwrap()).unwrap(); + let data = serde_json::from_slice::(&resp.bytes().await?)?; vec.push(data); active_ingestors += 1; } @@ -191,14 +191,14 @@ async fn fetch_ingestors_metrics() -> (u64, u64, usize, u64, u64, u64) { node_metrics.accumulate(&mut vec); } - ( + Ok(( active_ingestors, offline_ingestors, node_metrics.stream_count, node_metrics.total_events_count, node_metrics.total_json_bytes, 
node_metrics.total_parquet_bytes, - ) + )) } async fn build_metrics() -> HashMap { @@ -220,14 +220,23 @@ async fn build_metrics() -> HashMap { metrics } -pub fn init_analytics_scheduler() { +pub fn init_analytics_scheduler() -> anyhow::Result<()> { log::info!("Setting up schedular for anonymous user analytics"); let mut scheduler = AsyncScheduler::new(); scheduler .every(ANALYTICS_SEND_INTERVAL_SECONDS) .run(move || async { - Report::new().await.send().await; + Report::new() + .await + .unwrap_or_else(|err| { + // panicing because seperate thread + // TODO: a better way to handle this + log::error!("Error while sending analytics: {}", err.to_string()); + panic!("{}", err.to_string()); + }) + .send() + .await; }); tokio::spawn(async move { @@ -236,6 +245,8 @@ pub fn init_analytics_scheduler() { tokio::time::sleep(Duration::from_secs(10)).await; } }); + + Ok(()) } #[derive(Serialize, Deserialize, Default, Debug)] diff --git a/server/src/handlers/http.rs b/server/src/handlers/http.rs index a1f506b29..6044b74ac 100644 --- a/server/src/handlers/http.rs +++ b/server/src/handlers/http.rs @@ -85,7 +85,7 @@ pub async fn fetch_schema(stream_name: &str) -> anyhow::Result anyhow::Result anyhow::Result> { // send the query request to the ingestor let mut res = vec![]; - let ima = get_ingestor_info().await.unwrap(); + let ima = get_ingestor_info().await?; for im in ima.iter() { let uri = format!( diff --git a/server/src/handlers/http/cluster/mod.rs b/server/src/handlers/http/cluster/mod.rs index 7786dc396..0e091004b 100644 --- a/server/src/handlers/http/cluster/mod.rs +++ b/server/src/handlers/http/cluster/mod.rs @@ -37,6 +37,8 @@ use chrono::Utc; use http::StatusCode; use itertools::Itertools; use relative_path::RelativePathBuf; +use serde::de::Error; +use serde_json::error::Error as SerdeError; use serde_json::Value as JsonValue; use url::Url; @@ -262,9 +264,13 @@ pub async fn get_cluster_info() -> Result { StreamError::SerdeError(err) })? .get("staging") - .unwrap() + .ok_or(StreamError::SerdeError(SerdeError::missing_field( + "staging", + )))? .as_str() - .unwrap() + .ok_or(StreamError::SerdeError(SerdeError::custom( + "staging path not a string/ not provided", + )))? 
.to_string(); (true, sp, None, status) @@ -304,7 +310,9 @@ pub async fn get_cluster_metrics() -> Result { &ingestor.domain_name, base_path_without_preceding_slash() )) - .unwrap(); + .map_err(|err| { + PostError::Invalid(anyhow::anyhow!("Invalid URL in Ingestor Metadata: {}", err)) + })?; let res = reqwest::Client::new() .get(uri) diff --git a/server/src/handlers/http/logstream.rs b/server/src/handlers/http/logstream.rs index e244b1d02..f857bd455 100644 --- a/server/src/handlers/http/logstream.rs +++ b/server/src/handlers/http/logstream.rs @@ -169,7 +169,7 @@ pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result Result None }; - let hash_map = STREAM_INFO.read().unwrap(); + let hash_map = STREAM_INFO.read().expect("Readable"); let stream_meta = &hash_map .get(&stream_name) .ok_or(StreamError::StreamNotFound(stream_name.clone()))?; @@ -396,7 +396,7 @@ pub async fn get_stats(req: HttpRequest) -> Result stats }; - let stats = serde_json::to_value(stats).unwrap(); + let stats = serde_json::to_value(stats)?; Ok((web::Json(stats), StatusCode::OK)) } diff --git a/server/src/handlers/http/modal/ingest_server.rs b/server/src/handlers/http/modal/ingest_server.rs index ec4643d3f..59744c24c 100644 --- a/server/src/handlers/http/modal/ingest_server.rs +++ b/server/src/handlers/http/modal/ingest_server.rs @@ -42,6 +42,7 @@ use actix_web::body::MessageBody; use actix_web::Scope; use actix_web::{web, App, HttpServer}; use actix_web_prometheus::PrometheusMetrics; +use anyhow::anyhow; use async_trait::async_trait; use base64::Engine; use itertools::Itertools; @@ -202,12 +203,11 @@ impl IngestServer { return Ok(()); }; - let resource = serde_json::to_string(&resource) - .unwrap() + let resource = serde_json::to_string(&resource)? .try_into_bytes() - .unwrap(); + .map_err(|err| anyhow!(err))?; - store.put_object(&path, resource.clone()).await?; + store.put_object(&path, resource).await?; Ok(()) } diff --git a/server/src/handlers/http/modal/query_server.rs b/server/src/handlers/http/modal/query_server.rs index 1cdf7866a..ee258c0d3 100644 --- a/server/src/handlers/http/modal/query_server.rs +++ b/server/src/handlers/http/modal/query_server.rs @@ -180,7 +180,7 @@ impl QueryServer { // all internal data structures populated now. 
// start the analytics scheduler if enabled if CONFIG.parseable.send_analytics { - analytics::init_analytics_scheduler(); + analytics::init_analytics_scheduler()?; } self.start(prometheus, CONFIG.parseable.openid.clone()) diff --git a/server/src/handlers/http/modal/server.rs b/server/src/handlers/http/modal/server.rs index 4d72bd9af..d7508f5bc 100644 --- a/server/src/handlers/http/modal/server.rs +++ b/server/src/handlers/http/modal/server.rs @@ -443,7 +443,7 @@ impl Server { sync::object_store_sync(); if CONFIG.parseable.send_analytics { - analytics::init_analytics_scheduler(); + analytics::init_analytics_scheduler()?; } tokio::spawn(handlers::livetail::server()); diff --git a/server/src/handlers/http/query.rs b/server/src/handlers/http/query.rs index 39d22f2a0..26f29b592 100644 --- a/server/src/handlers/http/query.rs +++ b/server/src/handlers/http/query.rs @@ -70,7 +70,12 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result Option { .iter() .filter_map(|value| value.to_str().ok()) .flat_map(Cookie::split_parse) - .map(|value| value.unwrap()) + .map(|value| value.expect("cookie is parseable")) .collect(); cookies diff --git a/server/src/metrics/prom_utils.rs b/server/src/metrics/prom_utils.rs index d30edc367..21e27c03f 100644 --- a/server/src/metrics/prom_utils.rs +++ b/server/src/metrics/prom_utils.rs @@ -22,11 +22,12 @@ struct StorageMetrics { impl Default for Metrics { fn default() -> Self { - let socket = get_url(); + let url = get_url(); let address = format!( "http://{}:{}", - socket.domain().unwrap(), - socket.port().unwrap_or_default() + url.domain() + .unwrap_or(url.host_str().expect("should have a host")), + url.port().unwrap_or_default() ); Metrics { address, @@ -68,11 +69,11 @@ impl Metrics { prom_dress.process_resident_memory_bytes += val; } } else if sample.metric == "parseable_storage_size" { - if sample.labels.get("type").unwrap() == "data" { + if sample.labels.get("type").expect("type is present") == "data" { if let PromValue::Gauge(val) = sample.value { prom_dress.parseable_storage_size.data += val; } - } else if sample.labels.get("type").unwrap() == "staging" { + } else if sample.labels.get("type").expect("type is present") == "staging" { if let PromValue::Gauge(val) = sample.value { prom_dress.parseable_storage_size.staging += val; } diff --git a/server/src/storage/localfs.rs b/server/src/storage/localfs.rs index f480fa33a..fc7bd7f23 100644 --- a/server/src/storage/localfs.rs +++ b/server/src/storage/localfs.rs @@ -162,15 +162,17 @@ impl ObjectStorage for LocalFS { let flag = entry .path() .file_name() - .unwrap_or_default() + .ok_or(ObjectStorageError::NoSuchKey( + "Dir Entry Suggests no file present".to_string(), + ))? .to_str() - .unwrap_or_default() + .expect("file name is parseable to str") .contains("ingestor"); if flag { path_arr.push(RelativePathBuf::from_iter([ stream_name, - entry.path().file_name().unwrap().to_str().unwrap(), + entry.path().file_name().unwrap().to_str().unwrap(), // checking the error before hand ])); } } @@ -209,9 +211,11 @@ impl ObjectStorage for LocalFS { let path = entry .path() .file_name() - .unwrap() + .ok_or(ObjectStorageError::NoSuchKey( + "Dir Entry suggests no file present".to_string(), + ))? 
.to_str() - .unwrap() + .expect("file name is parseable to str") .to_owned(); let ingestor_file = filter_func(path); @@ -390,9 +394,9 @@ impl ObjectStorage for LocalFS { self.root .iter() .last() - .unwrap() + .expect("can be unwrapped without checking as the path is absolute") .to_str() - .unwrap() + .expect("valid unicode") .to_string() } } diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs index d2e4fa02c..a149f48f3 100644 --- a/server/src/storage/object_storage.rs +++ b/server/src/storage/object_storage.rs @@ -220,7 +220,7 @@ pub trait ObjectStorage: Sync + 'static { ]); let data = self.get_object(&schema_path).await?; // schema was not found in store, so it needs to be placed - self.put_schema(stream_name, &serde_json::from_slice(&data).unwrap()) + self.put_schema(stream_name, &serde_json::from_slice(&data)?) .await?; data @@ -331,7 +331,7 @@ pub trait ObjectStorage: Sync + 'static { .get("retention") .cloned(); if let Some(retention) = retention { - Ok(serde_json::from_value(retention).unwrap()) + Ok(serde_json::from_value(retention)?) } else { Ok(Retention::default()) } diff --git a/server/src/storage/store_metadata.rs b/server/src/storage/store_metadata.rs index 3c58982d2..44ae55868 100644 --- a/server/src/storage/store_metadata.rs +++ b/server/src/storage/store_metadata.rs @@ -128,7 +128,7 @@ pub async fn resolve_parseable_metadata() -> Result { // if server is started in ingest mode,we need to make sure that query mode has been started // i.e the metadata is updated to reflect the server mode = Query - if Mode::from_string(&metadata.server_mode).unwrap() == Mode::All && CONFIG.parseable.mode == Mode::Ingest { + if Mode::from_string(&metadata.server_mode).map_err(ObjectStorageError::Custom)? == Mode::All && CONFIG.parseable.mode == Mode::Ingest { Err("Starting Ingest Mode is not allowed, Since Query Server has not been started yet") } else { create_dir_all(CONFIG.staging_dir())?; @@ -201,7 +201,8 @@ fn determine_environment( // if both staging and remote have same deployment id if staging.deployment_id == remote.deployment_id { EnvChange::None(remote) - } else if Mode::from_string(&remote.server_mode).unwrap() == Mode::All + } else if Mode::from_string(&remote.server_mode).expect("server mode is valid here") + == Mode::All && (CONFIG.parseable.mode == Mode::Query || CONFIG.parseable.mode == Mode::Ingest) { // if you are switching to distributed mode from standalone mode From 4dec3d5fdf6fc81f5e5581aa4b22d65e0c993566 Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Thu, 18 Apr 2024 12:44:53 +0530 Subject: [PATCH 08/10] update trim the id to be 15 chars long --- .../src/handlers/http/modal/ingest_server.rs | 26 ++++++++++++++++--- server/src/storage/staging.rs | 1 - server/src/utils.rs | 6 ++--- 3 files changed, 25 insertions(+), 8 deletions(-) diff --git a/server/src/handlers/http/modal/ingest_server.rs b/server/src/handlers/http/modal/ingest_server.rs index 59744c24c..f9d391978 100644 --- a/server/src/handlers/http/modal/ingest_server.rs +++ b/server/src/handlers/http/modal/ingest_server.rs @@ -198,10 +198,28 @@ impl IngestServer { // generate the path for the object store let path = ingestor_metadata_path(None); - if store.get_object(&path).await.is_ok() { - log::info!("ingestor metadata already exists"); - return Ok(()); - }; + // we are considering that we can always get from object store + if let Ok(store_meta) = store.get_object(&path).await { + log::info!("Ingestor Metadata is present. 
Checking for updates"); + let mut store_data = serde_json::from_slice::(&store_meta) + .map_err(|_| anyhow!("IngestorMetadata was not parseable as valid json"))?; + + if store_data.domain_name != INGESTOR_META.domain_name { + log::info!("Ingestor Metadata update needed."); + store_data.domain_name = INGESTOR_META.domain_name.clone(); + store_data.port = INGESTOR_META.port.clone(); + + let resource = serde_json::to_string(&store_data)? + .try_into_bytes() + .map_err(|err| anyhow!(err))?; + + // if pushing to object store fails propagate the error + return store + .put_object(&path, resource) + .await + .map_err(|err| anyhow!(err)); + } + } let resource = serde_json::to_string(&resource)? .try_into_bytes() diff --git a/server/src/storage/staging.rs b/server/src/storage/staging.rs index 5830dfe72..9a578881a 100644 --- a/server/src/storage/staging.rs +++ b/server/src/storage/staging.rs @@ -330,7 +330,6 @@ pub fn get_ingestor_info() -> anyhow::Result { ); put_ingestor_info(out.clone())?; - Ok(out) } diff --git a/server/src/utils.rs b/server/src/utils.rs index e599e38cb..ec60f115a 100644 --- a/server/src/utils.rs +++ b/server/src/utils.rs @@ -25,9 +25,8 @@ pub mod update; use crate::option::CONFIG; use chrono::{DateTime, NaiveDate, Timelike, Utc}; use sha2::{Digest, Sha256}; + use std::env; -#[allow(unused_imports)] -use std::net::SocketAddr; use url::Url; #[allow(dead_code)] @@ -270,8 +269,9 @@ pub fn get_ingestor_id() -> String { let mut hasher = Sha256::new(); hasher.update(now); let result = format!("{:x}", hasher.finalize()); + let result = result.split_at(15).0.to_string(); log::debug!("Ingestor ID: {}", &result); - result + result.to_string() } #[cfg(test)] From 866308ac0d17ee0b654b412d48cf97973fc259f1 Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Thu, 18 Apr 2024 14:51:55 +0530 Subject: [PATCH 09/10] fix: update information if domain, port, or token changes --- .../src/handlers/http/modal/ingest_server.rs | 12 ++++- server/src/storage/staging.rs | 53 +++++++++++++++++-- 2 files changed, 59 insertions(+), 6 deletions(-) diff --git a/server/src/handlers/http/modal/ingest_server.rs b/server/src/handlers/http/modal/ingest_server.rs index f9d391978..1499913a7 100644 --- a/server/src/handlers/http/modal/ingest_server.rs +++ b/server/src/handlers/http/modal/ingest_server.rs @@ -54,6 +54,7 @@ use crate::{ option::CONFIG, }; +/// ! have to use a guard before using it pub static INGESTOR_META: Lazy = Lazy::new(|| staging::get_ingestor_info().expect("dir is readable and writeable")); @@ -206,8 +207,15 @@ impl IngestServer { if store_data.domain_name != INGESTOR_META.domain_name { log::info!("Ingestor Metadata update needed."); - store_data.domain_name = INGESTOR_META.domain_name.clone(); - store_data.port = INGESTOR_META.port.clone(); + log::info!( + "Old Domain Name: {}, New Domain Name: {}", + store_data.domain_name, + INGESTOR_META.domain_name + ); + store_data + .domain_name + .clone_from(&INGESTOR_META.domain_name); + store_data.port.clone_from(&INGESTOR_META.port); let resource = serde_json::to_string(&store_data)? 
.try_into_bytes() diff --git a/server/src/storage/staging.rs b/server/src/storage/staging.rs index 9a578881a..512a9c4c0 100644 --- a/server/src/storage/staging.rs +++ b/server/src/storage/staging.rs @@ -37,6 +37,7 @@ use crate::{ }, }; use arrow_schema::{ArrowError, Schema}; +use base64::Engine; use chrono::{NaiveDateTime, Timelike, Utc}; use parquet::{ arrow::ArrowWriter, @@ -300,6 +301,9 @@ pub fn get_ingestor_info() -> anyhow::Result { // all the files should be in the staging directory root let entries = std::fs::read_dir(path)?; + let url = get_url(); + let port = url.port().expect("here port should be defined").to_string(); + let url = url.to_string(); for entry in entries { // cause the staging directory will have only one file with ingestor in the name @@ -313,15 +317,50 @@ pub fn get_ingestor_info() -> anyhow::Result { .contains("ingestor"); if flag { - return Ok(serde_json::from_slice(&std::fs::read(path)?)?); + // get the ingestor metadata from staging + let mut meta: IngestorMetadata = serde_json::from_slice(&std::fs::read(path)?)?; + + // compare url endpoint and port + if meta.domain_name != url { + log::info!( + "Domain Name was Updated. Old: {} New: {}", + meta.domain_name, + url + ); + meta.domain_name = url; + } + + if meta.port != port { + log::info!("Port was Updated. Old: {} New: {}", meta.port, port); + meta.port = port; + } + + let token = base64::prelude::BASE64_STANDARD.encode(format!( + "{}:{}", + CONFIG.parseable.username, CONFIG.parseable.password + )); + + let token = format!("Basic {}", token); + + if meta.token != token { + // TODO: Update the message to be more informative with username and password + log::info!( + "Credentials were Updated. Old: {} New: {}", + meta.token, + token + ); + meta.token = token; + } + + put_ingestor_info(meta.clone())?; + return Ok(meta); } } let store = CONFIG.storage().get_object_store(); - let url = get_url(); let out = IngestorMetadata::new( - url.port().expect("here port should be defined").to_string(), - url.to_string(), + port, + url, DEFAULT_VERSION.to_string(), store.get_bucket_name(), &CONFIG.parseable.username, @@ -333,6 +372,12 @@ pub fn get_ingestor_info() -> anyhow::Result { Ok(out) } +/// Puts the ingestor info into the staging. +/// +/// This function takes the ingestor info as a parameter and stores it in staging. +/// # Parameters +/// +/// * `ingestor_info`: The ingestor info to be stored. fn put_ingestor_info(info: IngestorMetadata) -> anyhow::Result<()> { let path = PathBuf::from(&CONFIG.parseable.local_staging_path); let file_name = format!("ingestor.{}.json", info.ingestor_id); From 23db756b5572a92bc518bbbe14256b3d2b570a83 Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Fri, 19 Apr 2024 11:08:20 +0530 Subject: [PATCH 10/10] fix bug in object store sync in standalone mode --- server/src/storage/object_storage.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs index a149f48f3..77eb9f20d 100644 --- a/server/src/storage/object_storage.rs +++ b/server/src/storage/object_storage.rs @@ -594,7 +594,7 @@ pub fn manifest_path(prefix: &str) -> RelativePathBuf { ); RelativePathBuf::from_iter([prefix, &manifest_file_name]) } else { - RelativePathBuf::from_iter([MANIFEST_FILE]) + RelativePathBuf::from_iter([prefix, MANIFEST_FILE]) } }
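
Editor's note: the ingestor id introduced in PATCH 05 and trimmed in PATCH 08 is a SHA-256 hash of the current RFC 3339 timestamp, hex-encoded and cut down to its first 15 characters. A minimal standalone sketch of that scheme follows, using the same sha2 0.10 and chrono crates the server already depends on; the function name demo_ingestor_id is illustrative only and is not part of the patches (the real code is utils::get_ingestor_id).

use chrono::Utc;
use sha2::{Digest, Sha256};

// Sketch of the id scheme: hash the current timestamp, keep the first 15 hex chars.
fn demo_ingestor_id() -> String {
    let now = Utc::now().to_rfc3339();
    let mut hasher = Sha256::new();
    hasher.update(now);
    let hash = format!("{:x}", hasher.finalize());
    // mirrors `result.split_at(15).0` from the patch
    hash.split_at(15).0.to_string()
}

fn main() {
    let id = demo_ingestor_id();
    // a SHA-256 hex digest is 64 chars long, so the trim always yields exactly 15
    assert_eq!(id.len(), 15);
    println!("ingestor id: {id}");
}

The resulting id is what ends up in the staging file name ingestor.<id>.json and in the per-ingestor schema, stream-metadata and manifest object paths replacing the earlier <ip>.<port> naming.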
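
Editor's note: the P_INGESTOR_ENDPOINT flag added in PATCH 04 accepts either a literal host:port pair or $VAR references that are resolved from the environment at startup, with the configured scheme prepended when the hostname does not already carry one (PATCH 02/03/06). The sketch below illustrates that resolution under stated assumptions: resolve_ingestor_endpoint and its arguments are illustrative names, not the server's API, and the real implementation lives in utils::get_url.

use std::env;
use url::Url;

fn resolve_ingestor_endpoint(endpoint: &str, scheme: &str) -> Url {
    let parts: Vec<&str> = endpoint.split(':').collect();
    let mut hostname = parts[0].to_string();
    let mut port = parts[1].to_string();

    // a leading '$' means "look the real value up in this environment variable"
    if let Some(var) = hostname.strip_prefix('$') {
        hostname = env::var(var).unwrap_or_default();
    }
    if let Some(var) = port.strip_prefix('$') {
        port = env::var(var).unwrap_or_default();
    }
    // prepend the scheme only when the hostname is not already a full URL
    if !hostname.starts_with("http") {
        hostname = format!("{scheme}://{hostname}");
    }
    format!("{hostname}:{port}").parse().expect("valid ingestor URL")
}

fn main() {
    let url = resolve_ingestor_endpoint("localhost:8000", "http");
    assert_eq!(url.as_str(), "http://localhost:8000/");
}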