Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: clean up parts of the codebase #981

Merged
merged 26 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
06f093c
refactor: use serde
de-sh Nov 20, 2024
cde8edf
perf: `to_string` is an indirection
de-sh Dec 7, 2024
0204248
refactor: `Error` replaces `String`
de-sh Nov 20, 2024
6916673
refactor: serde renames
de-sh Dec 7, 2024
a0c4cb1
refactor: makes no sense to be `Option`al
de-sh Dec 7, 2024
4da90f8
test: fix sec = 1000 millis
de-sh Nov 25, 2024
2915486
refactor: serde `Compression`
de-sh Dec 7, 2024
19c3e01
ci: suppress deepsource
de-sh Dec 7, 2024
54dc85f
fix: case expectation
de-sh Dec 7, 2024
c9da598
rename error type
de-sh Dec 7, 2024
6372269
fix: serde compilation intricacies
de-sh Dec 7, 2024
2ca1557
refactor: construct once
de-sh Dec 8, 2024
6b90426
refactor: as methods
de-sh Dec 8, 2024
8a7cbf8
Merge branch 'main' into refactor
de-sh Dec 8, 2024
bcc8669
refactor: `partitioned_files` as a method
de-sh Dec 8, 2024
0b2e19e
refactor: in-place extraction of hot-tier manifest files
de-sh Dec 8, 2024
e95523f
refactor: `execution_plans`
de-sh Dec 8, 2024
91cf19d
Merge branch 'main' into refactor
de-sh Dec 12, 2024
ab4e869
Merge remote-tracking branch 'origin/main' into refactor
de-sh Dec 17, 2024
63f0343
feat: don't load sys, refactor `DiskUtil`
de-sh Dec 5, 2024
8de3326
Merge branch 'main' into refactor
de-sh Dec 17, 2024
37ccdb2
Merge branch 'main' into refactor
de-sh Dec 17, 2024
fcd6988
Merge branch 'main' into refactor
de-sh Dec 17, 2024
8dee91a
feat: don't clone on events
de-sh Dec 18, 2024
3564be4
clone less during ingestion
de-sh Dec 18, 2024
f6e0d47
Merge branch 'main' into refactor
de-sh Dec 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/analytics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ pub struct Report {
memory_total_bytes: u64,
platform: String,
storage_mode: String,
server_mode: String,
server_mode: Mode,
version: String,
commit_hash: String,
active_ingestors: u64,
Expand Down Expand Up @@ -111,7 +111,7 @@ impl Report {
memory_total_bytes: mem_total,
platform: platform().to_string(),
storage_mode: CONFIG.get_storage_mode_string().to_string(),
server_mode: CONFIG.parseable.mode.to_string(),
server_mode: CONFIG.parseable.mode,
version: current().released_version.to_string(),
commit_hash: current().commit_hash,
active_ingestors: ingestor_metrics.0,
Expand Down
22 changes: 7 additions & 15 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ impl Cli {
.arg(
// RowGroupSize controls the number of rows present in one row group
// More rows = better compression but HIGHER Memory consumption during read/write
// 1048576 is the default value for DataFusion
// 1048576 is the default value for DataFusion
Arg::new(Self::ROW_GROUP_SIZE)
.long(Self::ROW_GROUP_SIZE)
.env("P_PARQUET_ROW_GROUP_SIZE")
Expand Down Expand Up @@ -591,20 +591,12 @@ impl FromArgMatches for Cli {
.get_one::<usize>(Self::ROW_GROUP_SIZE)
.cloned()
.expect("default for row_group size");
self.parquet_compression = match m
.get_one::<String>(Self::PARQUET_COMPRESSION_ALGO)
.expect("default for compression algo")
.as_str()
{
"uncompressed" => Compression::UNCOMPRESSED,
"snappy" => Compression::SNAPPY,
"gzip" => Compression::GZIP,
"lzo" => Compression::LZO,
"brotli" => Compression::BROTLI,
"lz4" => Compression::LZ4,
"zstd" => Compression::ZSTD,
_ => unreachable!(),
};
self.parquet_compression = serde_json::from_str(
m.get_one::<String>(Self::PARQUET_COMPRESSION_ALGO)
.expect("default for compression algo")
.as_str(),
)
.expect("unexpected compression algo");

let openid_client_id = m.get_one::<String>(Self::OPENID_CLIENT_ID).cloned();
let openid_client_secret = m.get_one::<String>(Self::OPENID_CLIENT_SECRET).cloned();
Expand Down
4 changes: 1 addition & 3 deletions src/handlers/http/kinesis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,14 @@ use std::collections::BTreeMap;
use std::str;

/// Top-level request payload for this handler module.
///
/// `rename_all = "camelCase"` maps every snake_case field to its camelCase
/// JSON key, so the wire keys are `records`, `requestId` and `timestamp`.
/// The container-level `rename = "..."` attribute must not be used for this:
/// it renames the struct itself and leaves the field names untouched.
#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
struct Message {
    /// Entries carried by this message.
    records: Vec<Data>,
    /// Serialized as `requestId`.
    request_id: String,
    // NOTE(review): unit/epoch of `timestamp` is not visible here — confirm against the sender.
    timestamp: u64,
}
/// One element of `Message::records`; holds a single string under the JSON key `data`.
#[derive(Serialize, Deserialize, Debug)]
struct Data {
// The explicit rename matches the field's own name, so the JSON key is `data` either way.
#[serde(rename = "data")]
data: String,
}

Expand Down
8 changes: 4 additions & 4 deletions src/handlers/http/logstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -651,8 +651,8 @@ pub async fn put_stream_hot_tier(
let existing_hot_tier_used_size = hot_tier_manager
.validate_hot_tier_size(&stream_name, &hottier.size)
.await?;
hottier.used_size = Some(existing_hot_tier_used_size.to_string());
hottier.available_size = Some(hottier.size.clone());
hottier.used_size = existing_hot_tier_used_size.to_string();
hottier.available_size = hottier.size.to_string();
hottier.version = Some(CURRENT_HOT_TIER_VERSION.to_string());
hot_tier_manager
.put_hot_tier(&stream_name, &mut hottier)
Expand Down Expand Up @@ -695,8 +695,8 @@ pub async fn get_stream_hot_tier(req: HttpRequest) -> Result<impl Responder, Str
if let Some(hot_tier_manager) = HotTierManager::global() {
let mut hot_tier = hot_tier_manager.get_hot_tier(&stream_name).await?;
hot_tier.size = format!("{} {}", hot_tier.size, "Bytes");
hot_tier.used_size = Some(format!("{} {}", hot_tier.used_size.unwrap(), "Bytes"));
hot_tier.available_size = Some(format!("{} {}", hot_tier.available_size.unwrap(), "Bytes"));
hot_tier.used_size = format!("{} Bytes", hot_tier.used_size);
hot_tier.available_size = format!("{} Bytes", hot_tier.available_size);
Ok((web::Json(hot_tier), StatusCode::OK))
} else {
Err(StreamError::Custom {
Expand Down
2 changes: 1 addition & 1 deletion src/handlers/http/middleware.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ use crate::{
use serde::{Deserialize, Serialize};

/// Envelope that wraps a `CommonAttributes` value.
///
/// `rename_all = "camelCase"` maps the snake_case field to its camelCase JSON
/// key, so `common_attributes` is read and written as `commonAttributes`.
/// The container-level `rename = "..."` attribute would only rename the struct
/// itself and leave the field key as `common_attributes`.
#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
struct Message {
    /// Serialized as `commonAttributes`.
    common_attributes: CommonAttributes,
}

Expand Down
9 changes: 2 additions & 7 deletions src/handlers/http/modal/ingest_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ use crate::storage::PARSEABLE_ROOT_DIRECTORY;
use crate::sync;

use crate::{handlers::http::base_path, option::CONFIG};
use actix_web::body::MessageBody;
use actix_web::web;
use actix_web::web::resource;
use actix_web::Scope;
Expand Down Expand Up @@ -323,9 +322,7 @@ impl IngestServer {
.clone_from(&INGESTOR_META.domain_name);
store_data.port.clone_from(&INGESTOR_META.port);

let resource = serde_json::to_string(&store_data)?
.try_into_bytes()
.map_err(|err| anyhow!(err))?;
let resource = Bytes::from(serde_json::to_vec(&store_data)?);

// if pushing to object store fails propagate the error
return store
Expand All @@ -334,9 +331,7 @@ impl IngestServer {
.map_err(|err| anyhow!(err));
}
} else {
let resource = serde_json::to_string(&resource)?
.try_into_bytes()
.map_err(|err| anyhow!(err))?;
let resource = Bytes::from(serde_json::to_vec(&resource)?);

store.put_object(&path, resource).await?;
}
Expand Down
6 changes: 2 additions & 4 deletions src/handlers/http/modal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ impl IngestorMetadata {
#[cfg(test)]
mod test {
use actix_web::body::MessageBody;
use bytes::Bytes;
use rstest::rstest;

use super::{IngestorMetadata, DEFAULT_VERSION};
Expand Down Expand Up @@ -255,10 +256,7 @@ mod test {
"8002".to_string(),
);

let lhs = serde_json::to_string(&im)
.unwrap()
.try_into_bytes()
.unwrap();
let lhs = Bytes::from(serde_json::to_vec(&im).unwrap());
let rhs = br#"{"version":"v3","port":"8000","domain_name":"https://localhost:8000","bucket_name":"somebucket","token":"Basic YWRtaW46YWRtaW4=","ingestor_id":"ingestor_id","flight_port":"8002"}"#
.try_into_bytes()
.unwrap();
Expand Down
94 changes: 22 additions & 72 deletions src/hottier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,13 @@ pub const MIN_STREAM_HOT_TIER_SIZE_BYTES: u64 = 10737418240; // 10 GiB
const HOT_TIER_SYNC_DURATION: Interval = clokwerk::Interval::Minutes(1);
pub const INTERNAL_STREAM_HOT_TIER_SIZE_BYTES: u64 = 10485760; //10 MiB
pub const CURRENT_HOT_TIER_VERSION: &str = "v2";

#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub struct StreamHotTier {
pub version: Option<String>,
#[serde(rename = "size")]
pub size: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub used_size: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub available_size: Option<String>,
pub used_size: String,
de-sh marked this conversation as resolved.
Show resolved Hide resolved
pub available_size: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub oldest_date_time_entry: Option<String>,
}
Expand Down Expand Up @@ -98,12 +96,7 @@ impl HotTierManager {
if self.check_stream_hot_tier_exists(&stream) && stream != current_stream {
let stream_hot_tier = self.get_hot_tier(&stream).await?;
total_hot_tier_size += &stream_hot_tier.size.parse::<u64>().unwrap();
total_hot_tier_used_size += &stream_hot_tier
.used_size
.clone()
.unwrap()
.parse::<u64>()
.unwrap();
total_hot_tier_used_size += stream_hot_tier.used_size.parse::<u64>().unwrap();
}
}
Ok((total_hot_tier_size, total_hot_tier_used_size))
Expand All @@ -123,8 +116,7 @@ impl HotTierManager {
if self.check_stream_hot_tier_exists(stream) {
//delete existing hot tier if its size is less than the updated hot tier size else return error
let existing_hot_tier = self.get_hot_tier(stream).await?;
existing_hot_tier_used_size =
existing_hot_tier.used_size.unwrap().parse::<u64>().unwrap();
existing_hot_tier_used_size = existing_hot_tier.used_size.parse::<u64>().unwrap();

if stream_hot_tier_size < existing_hot_tier_used_size {
return Err(HotTierError::ObjectStorageError(ObjectStorageError::Custom(format!(
Expand Down Expand Up @@ -260,12 +252,7 @@ impl HotTierManager {
/// delete the files from the hot tier directory if the available date range is outside the hot tier range
async fn process_stream(&self, stream: String) -> Result<(), HotTierError> {
let stream_hot_tier = self.get_hot_tier(&stream).await?;
let mut parquet_file_size = stream_hot_tier
.used_size
.as_ref()
.unwrap()
.parse::<u64>()
.unwrap();
let mut parquet_file_size = stream_hot_tier.used_size.parse::<u64>().unwrap();

let object_store = CONFIG.storage().get_object_store();
let mut s3_manifest_file_list = object_store.list_manifest_files(&stream).await?;
Expand Down Expand Up @@ -357,13 +344,7 @@ impl HotTierManager {
let mut file_processed = false;
let mut stream_hot_tier = self.get_hot_tier(stream).await?;
if !self.is_disk_available(parquet_file.file_size).await?
|| stream_hot_tier
.available_size
.as_ref()
.unwrap()
.parse::<u64>()
.unwrap()
<= parquet_file.file_size
|| stream_hot_tier.available_size.parse::<u64>().unwrap() <= parquet_file.file_size
{
if !self
.cleanup_hot_tier_old_data(
Expand All @@ -376,12 +357,7 @@ impl HotTierManager {
{
return Ok(file_processed);
}
*parquet_file_size = stream_hot_tier
.used_size
.as_ref()
.unwrap()
.parse::<u64>()
.unwrap();
*parquet_file_size = stream_hot_tier.used_size.parse::<u64>().unwrap();
}
let parquet_file_path = RelativePathBuf::from(parquet_file.file_path.clone());
fs::create_dir_all(parquet_path.parent().unwrap()).await?;
Expand All @@ -393,18 +369,11 @@ impl HotTierManager {
.await?;
file.write_all(&parquet_data).await?;
*parquet_file_size += parquet_file.file_size;
stream_hot_tier.used_size = Some(parquet_file_size.to_string());

stream_hot_tier.available_size = Some(
(stream_hot_tier
.available_size
.as_ref()
.unwrap()
.parse::<u64>()
.unwrap()
- parquet_file.file_size)
.to_string(),
);
stream_hot_tier.used_size = parquet_file_size.to_string();

stream_hot_tier.available_size = (stream_hot_tier.available_size.parse::<u64>().unwrap()
- parquet_file.file_size)
.to_string();
self.put_hot_tier(stream, &mut stream_hot_tier).await?;
file_processed = true;
let mut hot_tier_manifest = self
Expand Down Expand Up @@ -614,35 +583,16 @@ impl HotTierManager {
fs::remove_dir_all(path_to_delete.parent().unwrap()).await?;
delete_empty_directory_hot_tier(path_to_delete.parent().unwrap()).await?;

stream_hot_tier.used_size = Some(
(stream_hot_tier
.used_size
.as_ref()
.unwrap()
.parse::<u64>()
.unwrap()
- file_size)
.to_string(),
);
stream_hot_tier.available_size = Some(
(stream_hot_tier
.available_size
.as_ref()
.unwrap()
.parse::<u64>()
.unwrap()
+ file_size)
.to_string(),
);
stream_hot_tier.used_size =
(stream_hot_tier.used_size.parse::<u64>().unwrap() - file_size)
.to_string();
stream_hot_tier.available_size =
(stream_hot_tier.available_size.parse::<u64>().unwrap() + file_size)
.to_string();
self.put_hot_tier(stream, stream_hot_tier).await?;
delete_successful = true;

if stream_hot_tier
.available_size
.as_ref()
.unwrap()
.parse::<u64>()
.unwrap()
if stream_hot_tier.available_size.parse::<u64>().unwrap()
<= parquet_file_size
{
continue 'loop_files;
Expand Down Expand Up @@ -740,8 +690,8 @@ impl HotTierManager {
let mut stream_hot_tier = StreamHotTier {
version: Some(CURRENT_HOT_TIER_VERSION.to_string()),
size: INTERNAL_STREAM_HOT_TIER_SIZE_BYTES.to_string(),
used_size: Some("0".to_string()),
available_size: Some(INTERNAL_STREAM_HOT_TIER_SIZE_BYTES.to_string()),
used_size: "0".to_string(),
available_size: INTERNAL_STREAM_HOT_TIER_SIZE_BYTES.to_string(),
oldest_date_time_entry: None,
};
self.put_hot_tier(INTERNAL_STREAM_NAME, &mut stream_hot_tier)
Expand Down
Loading
Loading