From e8d8f4ce02dbc3976566cfd599abf2ecb7bb9b46 Mon Sep 17 00:00:00 2001 From: David Raznick Date: Sat, 2 Dec 2023 21:06:42 +0000 Subject: [PATCH] update deps and add low memory feature --- Cargo.toml | 18 +++++++++--------- src/lib.rs | 27 +++++++++++++++++++++++++-- 2 files changed, 34 insertions(+), 11 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index b60b9e2..82978c0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "libflatterer" -version = "0.19.9" +version = "0.19.10" authors = ["David Raznick "] edition = "2021" description = "Lib to make JSON flatterer" @@ -14,7 +14,7 @@ homepage = "https://github.com/kindly/libflatterer" yajlish = { version = "0.4.0" } #yajlish = { path = "../yajlish" } serde_json = { version = "1", features = ["preserve_order"] } -itertools = "0.10" +itertools = "0.12.0" csv = "1" serde = {version = "1", features = ["derive"] } smallvec="1.6.1" @@ -22,10 +22,10 @@ regex="1" slug="0.1" smartstring={version = "1.0.0", features = ["serde"] } snafu = {version = "0.7.0"} -indexmap = {version = "1", features = ["serde"] } +indexmap = {version = "2.1.0", features = ["serde"] } log = "0.4" lazy_static = "1" -typed-builder = "0.10.0" +typed-builder = "0.18.0" num_cpus = "1.13.1" flate2 = "1.0.24" csvs_convert = "0.8.8" @@ -39,7 +39,7 @@ jsonpath-rust = "0.3.0" [target.'cfg(not(target_family = "wasm"))'.dependencies] jsonref= "0.4" -object_store = { version = "0.6.1", features = ["aws", "http"]} +object_store = { version = "0.8.0", features = ["aws", "http"]} #object_store = { git = "https://github.com/apache/arrow-rs.git", features = ["aws", "http"]} crossbeam-channel="0.5" @@ -49,12 +49,12 @@ nanoid = "0.4.0" tokio = { version = "1.27.0", features = ["rt"] } csv-async = { version = "1.2.6", features = ["tokio"] } futures-util = { version = "0.3.27" } -parquet = {version = "38.0.0", features = ["async"] } +parquet = {version = "49.0.0", features = ["async"] } tokio-util = { version = "0.7.7", features = ["io", "io-util"] } futures = {version = "0.3.28"} -arrow-array = "38.0.0" -arrow-schema = "38.0.0" -async-compression = { version = "0.3.15", features = ["gzip", "tokio"] } +arrow-array = "49.0.0" +arrow-schema = "49.0.0" +async-compression = { version = "0.4.5", features = ["gzip", "tokio"] } env_logger = "0.10.0" [dev-dependencies] diff --git a/src/lib.rs b/src/lib.rs index 327ce53..4f90e30 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -443,6 +443,8 @@ pub struct Options { #[builder(default)] pub low_disk: bool, #[builder(default)] + pub low_memory: bool, + #[builder(default)] pub gzip_input: bool, #[builder(default)] pub json_path_selector: String, @@ -3232,7 +3234,12 @@ pub fn flatten_single( input: String, mut flat_files: FlatFiles, ) -> Result { - let (item_sender, item_receiver): (Sender, Receiver) = bounded(1000); + let channel_size = if flat_files.options.low_memory { + flat_files.options.threads + 10 + } else { + 1000 + }; + let (item_sender, item_receiver): (Sender, Receiver) = bounded(channel_size); let (stop_sender, stop_receiver) = bounded(1); @@ -3426,8 +3433,10 @@ pub fn flatten(input: Box, output: String, mut options: Options) -> let mut output_paths = vec![]; let mut join_handlers = vec![]; + let channel_size = if options.low_memory {options.threads + 10 } else { 1000 }; + let (item_sender, item_receiver_initial): (Sender, Receiver) = - bounded(1000); + bounded(channel_size); let mut stop_senders = vec![]; @@ -3932,6 +3941,11 @@ mod tests { name.push_str("-low_disk") } + if let Some(low_memory) = options["low_memory"].as_bool() { + flatten_options.low_memory = low_memory; + name.push_str("-low_memory") + } + if let Some(arrays_new_table) = options["arrays_new_table"].as_bool() { flatten_options.arrays_new_table = arrays_new_table; name.push_str("-arrow_new_table") @@ -4318,6 +4332,15 @@ mod tests { ) } + #[test] + fn test_low_memory() { + test_output( + "fixtures/basic.json", + vec![], + json!({"low_memory": true}), + ) + } + #[test] fn test_json_path() { test_output(