update files in distributed mode to use hash #761

Merged · 10 commits · Apr 19, 2024
9 changes: 5 additions & 4 deletions Cargo.lock

(generated file; diff not shown)

1 change: 1 addition & 0 deletions server/Cargo.toml
@@ -105,6 +105,7 @@ hashlru = { version = "0.11.0", features = ["serde"] }
 path-clean = "1.0.1"
 prost = "0.12.3"
 prometheus-parse = "0.2.5"
+sha2 = "0.10.8"
Member: why not use ulid that we use for deployment ids?

Contributor Author: I am hashing the timestamp, so I needed sha2
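For readers following along, a minimal sketch of what the author describes — deriving an id by hashing a timestamp with sha2. The helper name, the RFC 3339 formatting, and the truncation length are illustrative assumptions, not code from this PR:

```rust
use sha2::{Digest, Sha256};

// Hypothetical helper: hash the current timestamp to get a
// filesystem-safe identifier for this ingestor process.
fn hash_based_id() -> String {
    let now = chrono::Utc::now().to_rfc3339();
    let digest = Sha256::digest(now.as_bytes());
    // hex-encode the 32-byte digest, then keep a short prefix
    let hex: String = digest.iter().map(|b| format!("{b:02x}")).collect();
    hex[..15].to_string()
}
```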


 [build-dependencies]
 cargo_toml = "0.15"
33 changes: 22 additions & 11 deletions server/src/analytics.rs
@@ -74,7 +74,7 @@ pub struct Report {
 }

 impl Report {
-    pub async fn new() -> Self {
+    pub async fn new() -> anyhow::Result<Self> {
         let mut upt: f64 = 0.0;
         if let Ok(uptime) = uptime_lib::get() {
             upt = uptime.as_secs_f64();
@@ -91,9 +91,9 @@ impl Report {
             cpu_count = info.cpus().len();
             mem_total = info.total_memory();
         }
-        let ingestor_metrics = fetch_ingestors_metrics().await;
+        let ingestor_metrics = fetch_ingestors_metrics().await?;

-        Self {
+        Ok(Self {
             deployment_id: storage::StorageMetadata::global().deployment_id,
             uptime: upt,
             report_created_at: Utc::now(),
@@ -113,7 +113,7 @@ impl Report {
             total_json_bytes: ingestor_metrics.4,
             total_parquet_bytes: ingestor_metrics.5,
             metrics: build_metrics().await,
-        }
+        })
     }

     pub async fn send(&self) {
@@ -148,7 +148,7 @@ fn total_event_stats() -> (u64, u64, u64) {
     (total_events, total_json_bytes, total_parquet_bytes)
 }

-async fn fetch_ingestors_metrics() -> (u64, u64, usize, u64, u64, u64) {
+async fn fetch_ingestors_metrics() -> anyhow::Result<(u64, u64, usize, u64, u64, u64)> {
     let event_stats = total_event_stats();
     let mut node_metrics =
         NodeMetrics::new(total_streams(), event_stats.0, event_stats.1, event_stats.2);
@@ -181,24 +181,24 @@ async fn fetch_ingestors_metrics() -> (u64, u64, usize, u64, u64, u64) {
                .header(header::CONTENT_TYPE, "application/json")
                .send()
                .await
-                .unwrap(); // should respond
+                .expect("should respond");

-            let data = serde_json::from_slice::<NodeMetrics>(&resp.bytes().await.unwrap()).unwrap();
+            let data = serde_json::from_slice::<NodeMetrics>(&resp.bytes().await?)?;
             vec.push(data);
             active_ingestors += 1;
         }

         node_metrics.accumulate(&mut vec);
     }

-    (
+    Ok((
         active_ingestors,
         offline_ingestors,
         node_metrics.stream_count,
         node_metrics.total_events_count,
         node_metrics.total_json_bytes,
         node_metrics.total_parquet_bytes,
-    )
+    ))
 }

 async fn build_metrics() -> HashMap<String, Value> {
@@ -220,14 +220,23 @@ async fn build_metrics() -> HashMap<String, Value> {
     metrics
 }

-pub fn init_analytics_scheduler() {
+pub fn init_analytics_scheduler() -> anyhow::Result<()> {
     log::info!("Setting up scheduler for anonymous user analytics");

     let mut scheduler = AsyncScheduler::new();
     scheduler
         .every(ANALYTICS_SEND_INTERVAL_SECONDS)
         .run(move || async {
-            Report::new().await.send().await;
+            Report::new()
+                .await
+                .unwrap_or_else(|err| {
+                    // panicking because this runs on a separate thread
+                    // TODO: a better way to handle this
+                    log::error!("Error while sending analytics: {}", err.to_string());
+                    panic!("{}", err.to_string());
+                })
+                .send()
+                .await;
         });

     tokio::spawn(async move {
@@ -236,6 +245,8 @@ pub fn init_analytics_scheduler() {
             tokio::time::sleep(Duration::from_secs(10)).await;
         }
     });
+
+    Ok(())
 }

 #[derive(Serialize, Deserialize, Default, Debug)]
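Both `Report::new` and `init_analytics_scheduler` are now fallible. A hypothetical call site, not part of this diff, showing how the new signature is meant to be consumed:

```rust
// Sketch: the server's startup path can now propagate scheduler-setup
// failures with `?` instead of unwrapping inside analytics code.
fn start_background_jobs() -> anyhow::Result<()> {
    init_analytics_scheduler()?;
    Ok(())
}
```

Note that errors inside the scheduled closure itself still panic (see the TODO above); only setup errors are propagated.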
28 changes: 14 additions & 14 deletions server/src/catalog.rs
@@ -16,17 +16,16 @@
  *
  */

-use std::sync::Arc;
-
-use chrono::{DateTime, Local, NaiveDateTime, NaiveTime, Utc};
-use relative_path::RelativePathBuf;
+use std::{io::ErrorKind, sync::Arc};

 use crate::{
     catalog::manifest::Manifest,
     query::PartialTimeFilter,
-    storage::{ObjectStorage, ObjectStorageError, MANIFEST_FILE},
-    utils::get_address,
+    storage::{object_storage::manifest_path, ObjectStorage, ObjectStorageError},
 };
+use chrono::{DateTime, Local, NaiveDateTime, NaiveTime, Utc};
+use relative_path::RelativePathBuf;
+use std::io::Error as IOError;

 use self::{column::Column, snapshot::ManifestItem};

@@ -117,8 +116,7 @@ pub async fn update_snapshot(

     let mut ch = false;
     for m in manifests.iter() {
-        let s = get_address();
-        let p = format!("{}.{}.{}", s.ip(), s.port(), MANIFEST_FILE);
+        let p = manifest_path("").to_string();
         if m.manifest_path.contains(&p) {
             ch = true;
         }
@@ -142,7 +140,11 @@
                     23 * 3600 + 59 * 60 + 59,
                     999_999_999,
                 )
-                .unwrap(),
+                .ok_or(IOError::new(
+                    ErrorKind::Other,
+                    "Failed to create upper bound for manifest",
+                ))
+                .map_err(ObjectStorageError::IoError)?,
             )
             .and_utc();

@@ -151,12 +153,11 @@
             ..Manifest::default()
         };

-        let addr = get_address();
-        let manifest_file_name = format!("{}.{}.{}", addr.ip(), addr.port(), MANIFEST_FILE);
+        let manifest_file_name = manifest_path("").to_string();
         let path =
             partition_path(stream_name, lower_bound, upper_bound).join(&manifest_file_name);
         storage
-            .put_object(&path, serde_json::to_vec(&manifest).unwrap().into())
+            .put_object(&path, serde_json::to_vec(&manifest)?.into())
             .await?;
         let path = storage.absolute_url(&path);
         let new_snapshot_entry = snapshot::ManifestItem {
@@ -185,8 +186,7 @@ pub async fn update_snapshot(
             ..Manifest::default()
         };

-        let addr = get_address();
-        let manifest_file_name = format!("{}.{}.{}", addr.ip(), addr.port(), MANIFEST_FILE);
+        let manifest_file_name = manifest_path("").to_string();
         let path = partition_path(stream_name, lower_bound, upper_bound).join(&manifest_file_name);
         storage
             .put_object(&path, serde_json::to_vec(&manifest).unwrap().into())
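Both call sites above now delegate filename construction to `manifest_path`. Its real implementation lives in `server/src/storage/object_storage.rs` and is not shown in this diff; the sketch below is only a plausible reconstruction, with the mode flag and ingestor id lifted into parameters to keep it self-contained (the `MANIFEST_FILE` value is likewise an assumption):

```rust
use relative_path::RelativePathBuf;

const MANIFEST_FILE: &str = "manifest.json";

// Plausible shape of the helper: in distributed (ingest) mode the
// manifest filename carries the ingestor's hash-derived id, so several
// ingestors can write under the same partition path without clobbering
// one another. The real helper reads mode and id from global state.
fn manifest_path(prefix: &str, distributed: bool, ingestor_id: &str) -> RelativePathBuf {
    let file_name = if distributed {
        format!("ingestor.{ingestor_id}.{MANIFEST_FILE}")
    } else {
        MANIFEST_FILE.to_string()
    };
    let mut path = RelativePathBuf::from(prefix);
    path.push(file_name);
    path
}
```

This is also why the snapshot-matching loop earlier calls `manifest_path("")` and checks `m.manifest_path.contains(&p)`: the empty prefix yields just the filename, which is then searched for inside the stored manifest URLs.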
14 changes: 7 additions & 7 deletions server/src/cli.rs
@@ -88,7 +88,7 @@ pub struct Cli {
     pub mode: Mode,

     /// public address for the parseable server ingestor
-    pub ingestor_url: String,
+    pub ingestor_endpoint: String,
 }

 impl Cli {
@@ -115,7 +115,7 @@ impl Cli {
     pub const ROW_GROUP_SIZE: &'static str = "row-group-size";
     pub const PARQUET_COMPRESSION_ALGO: &'static str = "compression-algo";
     pub const MODE: &'static str = "mode";
-    pub const INGESTOR_URL: &'static str = "ingestor-url";
+    pub const INGESTOR_ENDPOINT: &'static str = "ingestor-endpoint";
     pub const DEFAULT_USERNAME: &'static str = "admin";
     pub const DEFAULT_PASSWORD: &'static str = "admin";

@@ -317,9 +317,9 @@ impl Cli {
                     .help("Mode of operation"),
             )
             .arg(
-                Arg::new(Self::INGESTOR_URL)
-                    .long(Self::INGESTOR_URL)
-                    .env("P_INGESTOR_URL")
+                Arg::new(Self::INGESTOR_ENDPOINT)
+                    .long(Self::INGESTOR_ENDPOINT)
+                    .env("P_INGESTOR_ENDPOINT")
                     .value_name("URL")
                     .required(false)
                     .help("URL to connect to this specific ingestor. Default is the address of the server.")
@@ -367,8 +367,8 @@ impl FromArgMatches for Cli {
             .cloned()
             .expect("default value for address");

-        self.ingestor_url = m
-            .get_one::<String>(Self::INGESTOR_URL)
+        self.ingestor_endpoint = m
+            .get_one::<String>(Self::INGESTOR_ENDPOINT)
             .cloned()
             .unwrap_or_else(String::default);

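Note the breaking rename for deployments: the flag is now `--ingestor-endpoint` and the environment variable `P_INGESTOR_ENDPOINT` (previously `--ingestor-url` / `P_INGESTOR_URL`). The value remains optional and, per the help text, falls back to the server's own address.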
4 changes: 2 additions & 2 deletions server/src/handlers/http.rs
@@ -85,7 +85,7 @@ pub async fn fetch_schema(stream_name: &str) -> anyhow::Result<arrow_schema::Sch
         .await?
         .iter()
         // we should be able to unwrap as we know the data is valid schema
-        .map(|byte_obj| serde_json::from_slice(byte_obj).unwrap())
+        .map(|byte_obj| serde_json::from_slice(byte_obj).expect("data is valid json"))
         .collect_vec();

     let new_schema = Schema::try_merge(res)?;
@@ -97,7 +97,7 @@ pub async fn fetch_schema(stream_name: &str) -> anyhow::Result<arrow_schema::Sch
 pub async fn send_query_request_to_ingestor(query: &Query) -> anyhow::Result<Vec<Value>> {
     // send the query request to the ingestor
     let mut res = vec![];
-    let ima = get_ingestor_info().await.unwrap();
+    let ima = get_ingestor_info().await?;

     for im in ima.iter() {
         let uri = format!(
41 changes: 31 additions & 10 deletions server/src/handlers/http/cluster/mod.rs
@@ -37,6 +37,8 @@ use chrono::Utc;
 use http::StatusCode;
 use itertools::Itertools;
 use relative_path::RelativePathBuf;
+use serde::de::Error;
+use serde_json::error::Error as SerdeError;
 use serde_json::Value as JsonValue;
 use url::Url;

@@ -262,9 +264,13 @@ pub async fn get_cluster_info() -> Result<impl Responder, StreamError> {
                     StreamError::SerdeError(err)
                 })?
                 .get("staging")
-                .unwrap()
+                .ok_or(StreamError::SerdeError(SerdeError::missing_field(
+                    "staging",
+                )))?
                 .as_str()
-                .unwrap()
+                .ok_or(StreamError::SerdeError(SerdeError::custom(
+                    "staging path not a string/ not provided",
+                )))?
                 .to_string();

             (true, sp, None, status)
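`SerdeError::missing_field` and `SerdeError::custom` are constructors from the `serde::de::Error` trait, which is why the two new imports at the top of this file are needed.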
@@ -304,7 +310,9 @@ pub async fn get_cluster_metrics() -> Result<impl Responder, PostError> {
             &ingestor.domain_name,
             base_path_without_preceding_slash()
         ))
-        .unwrap();
+        .map_err(|err| {
+            PostError::Invalid(anyhow::anyhow!("Invalid URL in Ingestor Metadata: {}", err))
+        })?;

         let res = reqwest::Client::new()
             .get(uri)
@@ -362,14 +370,27 @@ pub async fn remove_ingestor(req: HttpRequest) -> Result<impl Responder, PostErr
     if check_liveness(&domain_name).await {
         return Err(PostError::Invalid(anyhow::anyhow!("Node Online")));
     }
-
-    let url = Url::parse(&domain_name).unwrap();
-    let ingestor_meta_filename = ingestor_metadata_path(
-        url.host_str().unwrap().to_owned(),
-        url.port().unwrap().to_string(),
-    )
-    .to_string();
     let object_store = CONFIG.storage().get_object_store();
+
+    let ingestor_metadatas = object_store
+        .get_objects(
+            Some(&RelativePathBuf::from(PARSEABLE_ROOT_DIRECTORY)),
+            Box::new(|file_name| file_name.starts_with("ingestor")),
+        )
+        .await?;
+
+    let ingestor_metadata = ingestor_metadatas
+        .iter()
+        .map(|elem| serde_json::from_slice::<IngestorMetadata>(elem).unwrap_or_default())
+        .collect_vec();
+
+    let ingestor_metadata = ingestor_metadata
+        .iter()
+        .filter(|elem| elem.domain_name == domain_name)
+        .collect_vec();
+
+    let ingestor_meta_filename =
+        ingestor_metadata_path(Some(&ingestor_metadata[0].ingestor_id)).to_string();
     let msg = match object_store
         .try_delete_ingestor_meta(ingestor_meta_filename)
         .await
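Since the metadata filename is now derived from the hash-based ingestor id rather than from the node's host and port, the handler can no longer reconstruct it from the request URL: it lists the stored `ingestor*` objects and filters on `domain_name` instead. Note that `ingestor_metadata[0]` will panic if no stored entry matches the given domain name.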
6 changes: 3 additions & 3 deletions server/src/handlers/http/logstream.rs
@@ -169,7 +169,7 @@ pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result<impl Responder,
     }

     if !body.is_empty() && static_schema_flag == "true" {
-        let static_schema: StaticSchema = serde_json::from_slice(&body).unwrap();
+        let static_schema: StaticSchema = serde_json::from_slice(&body)?;
         let parsed_schema = convert_static_schema_to_arrow_schema(static_schema);
         if let Ok(parsed_schema) = parsed_schema {
             schema = parsed_schema;
@@ -357,7 +357,7 @@ pub async fn get_stats(req: HttpRequest) -> Result<impl Responder, StreamError>
         None
     };

-    let hash_map = STREAM_INFO.read().unwrap();
+    let hash_map = STREAM_INFO.read().expect("Readable");
     let stream_meta = &hash_map
         .get(&stream_name)
         .ok_or(StreamError::StreamNotFound(stream_name.clone()))?;
@@ -396,7 +396,7 @@ pub async fn get_stats(req: HttpRequest) -> Result<impl Responder, StreamError>
         stats
     };

-    let stats = serde_json::to_value(stats).unwrap();
+    let stats = serde_json::to_value(stats)?;

     Ok((web::Json(stats), StatusCode::OK))
 }