0.19 release
kindly committed May 10, 2023
1 parent 13b3cc3 commit ebc9bb3
Showing 8 changed files with 1,504 additions and 818 deletions.
2,201 changes: 1,437 additions & 764 deletions Cargo.lock

Large diffs are not rendered by default.

10 changes: 5 additions & 5 deletions Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "flatterer"
version = "0.18.2"
version = "0.19.1"
authors = ["David Raznick <[email protected]>"]
edition = "2021"
license = "MIT"
@@ -11,18 +11,18 @@ license = "MIT"
clap = "3.2.16"
crossbeam-channel= "0.5.6"
serde_json = { version = "1.0.83", features = ["preserve_order"] }
pyo3 = { version = "0.16.5", features = ["extension-module", "eyre"] }
pyo3 = { version = "0.18.3", features = ["extension-module", "eyre"] }
eyre= "0.6.8"
#libflatterer={path = "../libflatterer"}
libflatterer="0.18.2"
libflatterer="0.19.1"

flatterer-web="0.18.1"
flatterer-web="0.19.1"
#flatterer-web={path = "../flatterer-web"}

env_logger= "0.9.0"
log= "0.4.17"
ctrlc= "3.2.2"
csvs_convert = "0.7.0"
csvs_convert = "0.8.2"
num_cpus = "1.13.1"
multi_reader = "0.1.0"

6 changes: 3 additions & 3 deletions fixtures/basic_expected/fields.csv
@@ -1,13 +1,13 @@
table_name,field_name,field_type,field_title,count
-main,_link,integer,_link,2
+main,_link,text,_link,2
main,id,number,id,2
main,title,text,title,2
main,releaseDate,date,releaseDate,2
main,rating_code,text,rating_code,2
main,rating_name,text,rating_name,2
developer,_link,text,_link,2
-developer,_link_main,integer,_link_main,2
+developer,_link_main,text,_link_main,2
developer,name,text,name,2
platforms,_link,text,_link,3
-platforms,_link_main,integer,_link_main,3
+platforms,_link_main,text,_link_main,3
platforms,name,text,name,3
Binary file added fixtures/daily_16.json.gz
Binary file not shown.
6 changes: 3 additions & 3 deletions fixtures/pushdown_expected/fields.csv
@@ -1,17 +1,17 @@
table_name,field_name,field_type,field_title,count
-main,_link,integer,_link,2
+main,_link,text,_link,2
main,id,number,id,2
main,title,text,title,2
main,releaseDate,date,releaseDate,2
main,rating_code,text,rating_code,2
main,rating_name,text,rating_name,2
developer,_link,text,_link,2
-developer,_link_main,integer,_link_main,2
+developer,_link_main,text,_link_main,2
developer,name,text,name,2
developer,main_id,number,main_id,2
developer,main_title,text,main_title,2
platforms,_link,text,_link,3
-platforms,_link_main,integer,_link_main,3
+platforms,_link_main,text,_link_main,3
platforms,name,text,name,3
platforms,main_id,number,main_id,3
platforms,main_title,text,main_title,3
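
The pushdown fixture above differs from the basic one in that the main table's id and title are copied down into each child table as main_id and main_title. As a rough illustration, here is a sketch of how such output might be produced from Python; the input path and the exact pushdown values are assumptions, not part of this commit:

    import flatterer

    # Hypothetical invocation: push the main table's "id" and "title" down
    # so each child row carries main_id/main_title, as in the
    # pushdown_expected fixture above.
    output = flatterer.flatten(
        "fixtures/games.json",        # assumed input file
        "pushdown_output",
        pushdown=["id", "title"],     # assumed field names to copy into child tables
    )
    print(output["fields"])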
44 changes: 33 additions & 11 deletions flatterer/__init__.py
@@ -90,7 +90,11 @@ def flatten(
pushdown=[],
sql_scripts=False,
evolve=False,
-no_link=False
+no_link=False,
+stats=False,
+low_disk=False,
+gzip_input=False,
+json_path="",
):
global LOGGING_SETUP
if not LOGGING_SETUP:
@@ -128,6 +132,8 @@ def flatten(

if sqlite_path:
sqlite = True

+s3 = True if output_dir.startswith("s3://") else False

if method == 'flatten':
flatten_rs(input, output_dir, csv, xlsx, sqlite, parquet,
@@ -136,19 +142,27 @@
table_prefix, id_prefix, emit_obj, force,
schema, schema_titles, path, json_stream, ndjson,
sqlite_path, threads, log_error, postgres, postgres_schema,
-drop, pushdown, sql_scripts, evolve, no_link)
+drop, pushdown, sql_scripts, evolve, no_link, stats, low_disk, gzip_input, json_path)
elif method == 'iter':
if path:
raise AttributeError("path not allowed when supplying an iterator")
+if s3:
+raise AttributeError("s3 output not available when supplying an iterator")

iterator_flatten_rs(bytes_generator(input), output_dir, csv, xlsx, sqlite, parquet,
main_table_name, tables_csv, only_tables, fields_csv, only_fields,
inline_one_to_one, path_separator, preview,
table_prefix, id_prefix, emit_obj, force,
schema, schema_titles, sqlite_path, threads, log_error,
-postgres, postgres_schema, drop, pushdown, sql_scripts, evolve, no_link)
+postgres, postgres_schema, drop, pushdown, sql_scripts, evolve,
+no_link, stats, low_disk, gzip_input, json_path)
else:
raise AttributeError("input needs to be a string or a generator of strings, dicts or bytes")


+if s3:
+return PrettyDict()

output = PrettyDict(
fields=pandas.read_csv(os.path.join(output_dir, 'fields.csv')),
tables=pandas.read_csv(os.path.join(output_dir, 'tables.csv')),
@@ -218,14 +232,17 @@ def iterator_flatten(*args, **kw):
help='Only output this `preview` amount of lines in final results')
@click.option('--threads', default=1,
help='Number of threads, default 1, 0 means use number of CPUs')
+@click.option('--json-path', default="",
+help='JSON path within each object to use to filter which objects to select')
@click.option('--postgres-schema', default="", help='When loading to postgres, put all tables into this schema.')
@click.option('--evolve', is_flag=True, default=False, help='When loading to postgres or sqlite, evolve tables to fit data')
@click.option('--drop', is_flag=True, default=False, help='When loading to postgres or sqlite, drop table if already exists.')
@click.option('--id-prefix', default="", help='Prefix for all `_link` id fields')
-@click.argument('input_file', required=False)
-@click.argument('output_directory', required=False)
+@click.option('--stats', is_flag=True, default=False, help='Produce stats about the data in the datapackage.json file')
+@click.argument('inputs', required=False, nargs=-1)
+@click.argument('output_directory', required=True)
def cli(
-input_file,
+inputs,
output_directory,
web=False,
csv=True,
@@ -255,7 +272,9 @@ def cli(
drop=False,
pushdown=[],
id_prefix="",
-no_link=False
+no_link=False,
+stats=False,
+json_path="",
):
if web:
import pathlib
@@ -269,7 +288,7 @@ def cli(
web_rs()
return

-if not input_file:
+if not inputs:
click.echo("An input file is needed as first argument.")
raise click.Abort

@@ -281,14 +300,14 @@


if not main_table_name:
-main_table_name = input_file.split('/')[-1].split('.')[0]
+main_table_name = 'main'

path_list = []
if path:
path_list.append(path)

try:
-flatten(input_file,
+flatten(inputs,
output_directory,
csv=csv,
xlsx=xlsx,
@@ -318,6 +337,9 @@ def cli(
pushdown=pushdown,
id_prefix=id_prefix,
sqlite_path=sqlite_path,
-no_link=no_link)
+no_link=no_link,
+files=True,
+stats=stats,
+json_path=json_path)
except IOError:
pass
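
Taken together, the options threaded through flatten() above can be exercised as follows; a minimal sketch with hypothetical file names, where files=True mirrors what the CLI now passes so that a list of inputs is treated as file paths:

    import flatterer

    # Sketch of the new 0.19 options; the input files are hypothetical.
    output = flatterer.flatten(
        ["daily_16.json.gz", "daily_17.json.gz"],  # the CLI now accepts several inputs
        "games_output",
        files=True,        # treat the list entries as file paths
        gzip_input=True,   # new: read gzip-compressed input directly
        low_disk=True,     # new: low-disk mode
        stats=True,        # new: produce data stats in datapackage.json
    )
    # With an s3:// output directory, flatten() returns an empty PrettyDict,
    # since fields.csv/tables.csv cannot be read back locally.
    print(output["fields"])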
48 changes: 21 additions & 27 deletions src/lib.rs
@@ -1,16 +1,15 @@
use crossbeam_channel::{bounded, Sender, Receiver};
use csvs_convert::{merge_datapackage_with_options, datapackage_to_parquet_with_options, datapackage_to_sqlite_with_options};
-use eyre::{Result, WrapErr, eyre};
-use libflatterer::{flatten, Options, TERMINATE, FlatFiles};
+use eyre::{Result, eyre};
+use libflatterer::{flatten_all, Options, TERMINATE, FlatFiles};
use serde_json::Value;
use std::thread;
use std::path::PathBuf;

use env_logger::Env;
use pyo3::prelude::*;
use pyo3::types::PyIterator;
-use std::fs::{File, remove_dir_all};
-use std::io::BufReader;
+use std::fs::remove_dir_all;
use std::sync::atomic::Ordering;

#[pymodule]
@@ -73,6 +72,10 @@ fn flatterer(_py: Python, m: &PyModule) -> PyResult<()> {
sql_scripts: bool,
evolve: bool,
no_link: bool,
+stats: bool,
+low_disk:bool,
+gzip_input:bool,
+json_path_selector: String
) -> Result<()> {

let mut op = Options::default();
@@ -107,29 +110,12 @@ fn flatterer(_py: Python, m: &PyModule) -> PyResult<()> {
op.sql_scripts = sql_scripts;
op.evolve = evolve;
op.no_link = no_link;
+op.stats = stats;
+op.low_disk = low_disk;
+op.gzip_input = gzip_input;
+op.json_path_selector = json_path_selector;

-let mut readers = vec![];
-
-for path in input_files {
-match File::open(&path) {
-Ok(input) => {
-readers.push(input);
-//file = BufReader::new(input);
-}
-Err(err) => {
-if log_error {
-log::error!("Can not open file `{}`: {}", path, &err)
-};
-let result: Result<()> = Err(err.into());
-return result.wrap_err_with(|| format!("Can not open file `{}`", path));
-}
-};
-}
-let input = BufReader::new(multi_reader::MultiReader::new(readers.iter()));
-
-log::info!("Starting processing input.");
-
-if let Err(err) = flatten(input, output_dir, op) {
+if let Err(err) = flatten_all(input_files, output_dir, op) {
if log_error {
log::error!("{}", err)
};
@@ -172,7 +158,11 @@ fn flatterer(_py: Python, m: &PyModule) -> PyResult<()> {
pushdown: Vec<String>,
sql_scripts: bool,
evolve: bool,
-no_link: bool
+no_link: bool,
+stats: bool,
+low_disk:bool,
+gzip_input:bool,
+json_path_selector: String
) -> Result<()> {
let mut options = Options::default();

@@ -203,6 +193,10 @@ fn flatterer(_py: Python, m: &PyModule) -> PyResult<()> {
options.sql_scripts = sql_scripts;
options.evolve = evolve;
options.no_link = no_link;
+options.stats = stats;
+options.low_disk = low_disk;
+options.gzip_input = gzip_input;
+options.json_path_selector = json_path_selector;

let final_output_path = PathBuf::from(output_dir);
let parts_path = final_output_path.join("parts");
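
The iterator path gains the same knobs: iterator_flatten_rs now also takes stats, low_disk, gzip_input and json_path_selector. A minimal sketch with made-up records, assuming flatten() dispatches generator input to the iterator path as the error message above implies; note the new guard that rejects s3:// output when the input is an iterator:

    import flatterer

    def records():
        # made-up data; any generator of dicts, strings or bytes works
        yield {"id": 1, "title": "Halo"}
        yield {"id": 2, "title": "Portal"}

    # stats/low_disk are threaded through to iterator_flatten_rs as shown above.
    output = flatterer.flatten(records(), "games_output", stats=True, low_disk=True)
    print(output["tables"])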
7 changes: 2 additions & 5 deletions src/main.rs
@@ -1,9 +1,7 @@
use clap::{arg, Command};
use env_logger::Env;
use eyre::Result;
-use libflatterer::{flatten, Options, TERMINATE};
-use std::fs::File;
-use std::io::BufReader;
+use libflatterer::{flatten_all, Options, TERMINATE};
use std::path::PathBuf;
use std::sync::atomic::Ordering;

@@ -54,7 +52,6 @@ fn main() -> Result<()> {
eprintln!("Can not find file {}", input);
return Ok(());
}
-let input = BufReader::new(File::open(input)?);

let output_dir = matches.value_of("OUT_DIR").unwrap(); //ok as parser will detect

@@ -118,7 +115,7 @@ fn main() -> Result<()> {
options.tables_csv = tables.into();
};

-flatten(input, output_dir.into(), options)?;
+flatten_all(vec![input.into()], output_dir.into(), options)?;

log::info!("All finished with no errors!");

