Skip to content

Commit

Permalink
update deps and add low memory feature
Browse files Browse the repository at this point in the history
  • Loading branch information
kindly committed Dec 2, 2023
1 parent 270057a commit e8d8f4c
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 11 deletions.
18 changes: 9 additions & 9 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "libflatterer"
version = "0.19.9"
version = "0.19.10"
authors = ["David Raznick <[email protected]>"]
edition = "2021"
description = "Lib to make JSON flatterer"
Expand All @@ -14,18 +14,18 @@ homepage = "https://github.com/kindly/libflatterer"
yajlish = { version = "0.4.0" }
#yajlish = { path = "../yajlish" }
serde_json = { version = "1", features = ["preserve_order"] }
itertools = "0.10"
itertools = "0.12.0"
csv = "1"
serde = {version = "1", features = ["derive"] }
smallvec="1.6.1"
regex="1"
slug="0.1"
smartstring={version = "1.0.0", features = ["serde"] }
snafu = {version = "0.7.0"}
indexmap = {version = "1", features = ["serde"] }
indexmap = {version = "2.1.0", features = ["serde"] }
log = "0.4"
lazy_static = "1"
typed-builder = "0.10.0"
typed-builder = "0.18.0"
num_cpus = "1.13.1"
flate2 = "1.0.24"
csvs_convert = "0.8.8"
Expand All @@ -39,7 +39,7 @@ jsonpath-rust = "0.3.0"

[target.'cfg(not(target_family = "wasm"))'.dependencies]
jsonref= "0.4"
object_store = { version = "0.6.1", features = ["aws", "http"]}
object_store = { version = "0.8.0", features = ["aws", "http"]}
#object_store = { git = "https://github.com/apache/arrow-rs.git", features = ["aws", "http"]}

crossbeam-channel="0.5"
Expand All @@ -49,12 +49,12 @@ nanoid = "0.4.0"
tokio = { version = "1.27.0", features = ["rt"] }
csv-async = { version = "1.2.6", features = ["tokio"] }
futures-util = { version = "0.3.27" }
parquet = {version = "38.0.0", features = ["async"] }
parquet = {version = "49.0.0", features = ["async"] }
tokio-util = { version = "0.7.7", features = ["io", "io-util"] }
futures = {version = "0.3.28"}
arrow-array = "38.0.0"
arrow-schema = "38.0.0"
async-compression = { version = "0.3.15", features = ["gzip", "tokio"] }
arrow-array = "49.0.0"
arrow-schema = "49.0.0"
async-compression = { version = "0.4.5", features = ["gzip", "tokio"] }
env_logger = "0.10.0"

[dev-dependencies]
Expand Down
27 changes: 25 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,8 @@ pub struct Options {
#[builder(default)]
pub low_disk: bool,
#[builder(default)]
pub low_memory: bool,
#[builder(default)]
pub gzip_input: bool,
#[builder(default)]
pub json_path_selector: String,
Expand Down Expand Up @@ -3232,7 +3234,12 @@ pub fn flatten_single(
input: String,
mut flat_files: FlatFiles,
) -> Result<FlatFiles> {
let (item_sender, item_receiver): (Sender<Item>, Receiver<yajlparser::Item>) = bounded(1000);
let channel_size = if flat_files.options.low_memory {
flat_files.options.threads + 10
} else {
1000
};
let (item_sender, item_receiver): (Sender<Item>, Receiver<yajlparser::Item>) = bounded(channel_size);

let (stop_sender, stop_receiver) = bounded(1);

Expand Down Expand Up @@ -3426,8 +3433,10 @@ pub fn flatten(input: Box<dyn BufRead>, output: String, mut options: Options) ->
let mut output_paths = vec![];
let mut join_handlers = vec![];

let channel_size = if options.low_memory {options.threads + 10 } else { 1000 };

let (item_sender, item_receiver_initial): (Sender<Item>, Receiver<yajlparser::Item>) =
bounded(1000);
bounded(channel_size);

let mut stop_senders = vec![];

Expand Down Expand Up @@ -3932,6 +3941,11 @@ mod tests {
name.push_str("-low_disk")
}

if let Some(low_memory) = options["low_memory"].as_bool() {
flatten_options.low_memory = low_memory;
name.push_str("-low_memory")
}

if let Some(arrays_new_table) = options["arrays_new_table"].as_bool() {
flatten_options.arrays_new_table = arrays_new_table;
name.push_str("-arrow_new_table")
Expand Down Expand Up @@ -4318,6 +4332,15 @@ mod tests {
)
}

#[test]
fn test_low_memory() {
test_output(
"fixtures/basic.json",
vec![],
json!({"low_memory": true}),
)
}

#[test]
fn test_json_path() {
test_output(
Expand Down

0 comments on commit e8d8f4c

Please sign in to comment.