new export dataset api
CamDavidsonPilon committed Nov 21, 2024
1 parent a28b183 commit 4fa41ee
Showing 1 changed file with 72 additions and 34 deletions.
106 changes: 72 additions & 34 deletions pioreactor/actions/leader/export_experiment_data.py
@@ -12,34 +12,19 @@
import click
from msgspec import DecodeError
from msgspec import ValidationError
from msgspec.json import decode as json_decode
from msgspec.yaml import decode as yaml_decode

from pioreactor.config import config
from pioreactor.logging import create_logger
from pioreactor.structs import Dataset


def is_valid_table_name(table_name: str) -> bool:
import re

return bool(re.fullmatch(r"^[a-zA-Z]\w*$", table_name))
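
# Example (illustrative): is_valid_table_name("od_readings") is True;
# is_valid_table_name("1table") and is_valid_table_name("od-readings") are False.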


def source_exists(cursor, table_name_to_check: str) -> bool:
query = "SELECT 1 FROM sqlite_master WHERE (type='table' or type='view') and name = ?"
return cursor.execute(query, (table_name_to_check,)).fetchone() is not None
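
# Example (illustrative): source_exists(cursor, "od_readings") is True when a
# table or view named "od_readings" exists in sqlite_master.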


def get_column_names(cursor, table_name: str) -> list[str]:
query = f"PRAGMA table_info({table_name})"
return [row[1] for row in cursor.execute(query).fetchall()]


def filter_to_timestamp_columns(column_names: list[str]) -> list[str]:
# We use a standard here: `timestamp` or ends in `_at`
return [c for c in column_names if (c == "timestamp") or c.endswith("_at")]
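
# Example (illustrative): filter_to_timestamp_columns(["timestamp", "od_reading", "ended_at"])
# returns ["timestamp", "ended_at"].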


def generate_timestamp_to_localtimestamp_clause(timestamp_columns) -> str:
if not timestamp_columns:
return ""
@@ -50,8 +35,8 @@ def generate_timestamp_to_localtimestamp_clause(timestamp_columns) -> str:


def load_exportable_datasets() -> dict[str, Dataset]:
builtins = sorted(Path("/home/pioreactor/.pioreactor/exportable_datasets").glob("*.y*ml"))
plugins = sorted(Path("/home/pioreactor/.pioreactor/plugins/exportable_datasets").glob("*.y*ml"))
builtins = sorted(Path(".pioreactor/exportable_datasets").glob("*.y*ml"))
plugins = sorted(Path(".pioreactor/plugins/exportable_datasets").glob("*.y*ml"))
parsed_yaml = {}
for file in builtins + plugins:
try:
@@ -63,6 +48,10 @@ def load_exportable_datasets() -> dict[str, Dataset]:
return parsed_yaml


def decode_base64(string: str) -> str:
    # registered below as the sqlite BASE64 function, so it must actually base64-decode its input
    from base64 import b64decode
    return b64decode(string).decode("utf-8")


def validate_dataset_information(dataset: Dataset, cursor):
if not (dataset.table or dataset.query):
raise ValueError("query or table must be defined.")
@@ -81,11 +70,30 @@ def create_experiment_clause(
else:
quoted_experiments = ", ".join(f":experiment{i}" for i in range(len(experiments)))
existing_placeholders = existing_placeholders | {
f":experiment{i}": experiment for i, experiment in enumerate(experiments)
f"experiment{i}": experiment for i, experiment in enumerate(experiments)
}
return f"experiment IN ({quoted_experiments})", existing_placeholders


def create_timespan_clause(
start_time: str | None, end_time: str | None, time_column: str, existing_placeholders: dict[str, str]
) -> tuple[str, dict[str, str]]:
if start_time is not None and end_time is not None:
existing_placeholders["start_time"] = start_time
existing_placeholders["end_time"] = end_time
return f"{time_column} >= :start_time AND {time_column} <= :end_time", existing_placeholders

elif start_time is not None:
existing_placeholders["start_time"] = start_time
return f"{time_column} >= :start_time", existing_placeholders

elif end_time is not None:
existing_placeholders["end_time"] = end_time
return f"{time_column} <= :end_time", existing_placeholders
else:
raise ValueError("either start_time or end_time must be provided.")
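
# Example (illustrative): create_timespan_clause("2024-11-01T00:00:00Z", None, "timestamp", {})
# returns ("timestamp >= :start_time", {"start_time": "2024-11-01T00:00:00Z"}).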


def create_sql_query(
selects: list[str],
table_or_subquery: str,
@@ -97,7 +105,7 @@
Constructs an SQL query with SELECT, FROM, WHERE, and ORDER BY clauses.
"""
# Base SELECT and FROM clause
query = f"SELECT {', '.join(selects)} FROM {table_or_subquery}"
query = f"SELECT {', '.join(selects)} FROM ({table_or_subquery})"

# Add WHERE clause if provided
if where_clauses:
@@ -115,7 +123,10 @@ def export_experiment_data(
experiments: list[str],
dataset_names: list[str],
output: str,
start_time: str | None,
end_time: str | None,
partition_by_unit: bool = False,
partition_by_experiment: bool = True,
) -> None:
"""
Set an experiment, else it defaults to the entire table.
@@ -138,21 +149,30 @@
available_datasets = load_exportable_datasets()

with zipfile.ZipFile(output, mode="w", compression=zipfile.ZIP_DEFLATED) as zf, closing(
sqlite3.connect(config["storage"]["database"])
sqlite3.connect(config.get("storage", "database"))
) as con:
con.create_function(
"BASE64", 1, decode_base64
) # TODO: until next OS release which implements a native sqlite3 base64 function
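
# The set_trace_callback(print) call below echoes every executed SQL statement to stdout (a debugging aid).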

con.set_trace_callback(print)

cursor = con.cursor()

for dataset_name in dataset_names:
try:
dataset = available_datasets[dataset_name]
- except IndexError:
- click.echo(
- f"Dataset `{dataset_name}` is not found as an available exportable dataset. A yaml file needs to be added to ~/.pioreactor/exportable_datasets."
+ except KeyError:
+ click.secho(
+ f"Dataset `{dataset_name}` is not found as an available exportable dataset. A yaml file needs to be added to ~/.pioreactor/exportable_datasets. Skipping. Available datasets are {list(available_datasets.keys())}",
+ fg="red",
)
continue

validate_dataset_information(dataset, cursor)

_partition_by_unit = dataset.has_unit and (partition_by_unit or dataset.always_partition_by_unit)
_partition_by_experiment = dataset.has_experiment and partition_by_experiment
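# Rows are routed below into one CSV per (experiment, pioreactor_unit) partition,
# but only along the dimensions this dataset actually exposes.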
filenames: list[str] = []
placeholders: dict[str, str] = {}
timestamp_to_localtimestamp_clause = generate_timestamp_to_localtimestamp_clause(
@@ -169,6 +189,13 @@ def export_experiment_data(
experiment_clause, placeholders = create_experiment_clause(experiments, placeholders)
where_clauses.append(experiment_clause)

if dataset.timestamp_columns and (start_time or end_time):
assert dataset.default_order_by is not None
timespan_clause, placeholders = create_timespan_clause(
start_time, end_time, dataset.default_order_by, placeholders
)
where_clauses.append(timespan_clause)

query, placeholders = create_sql_query(
selects, table_or_subquery, placeholders, where_clauses, order_by
)
@@ -177,15 +204,18 @@

headers = [_[0] for _ in cursor.description]

- try:
- iloc_experiment = headers.index("experiment")
- except IndexError:
+ if _partition_by_experiment:
+ try:
+ iloc_experiment = headers.index("experiment")
+ except ValueError:
iloc_experiment = None
+ else:
+ iloc_experiment = None

if _partition_by_unit:
try:
iloc_unit = headers.index("pioreactor_unit")
- except IndexError:
+ except ValueError:
iloc_unit = None
else:
iloc_unit = None
@@ -195,12 +225,13 @@
with ExitStack() as stack:
for row in cursor:
rows_partition = (
- row[iloc_experiment] if iloc_experiment else "all_experiments",
- row[iloc_unit] if iloc_unit else "all_units",
+ row[iloc_experiment] if iloc_experiment is not None else "all_experiments",
+ row[iloc_unit] if iloc_unit is not None else "all_units",
)

if rows_partition not in parition_to_writer_map:
filename = "-".join(rows_partition) + f"-{dataset}-{time}.csv"
filename = f"{dataset_name}-" + "-".join(rows_partition) + f"-{time}.csv"
filename = filename.replace(" ", "_")
filenames.append(filename)
path_to_file = Path(Path(output).parent / filename)
parition_to_writer_map[rows_partition] = csv.writer(
@@ -223,9 +254,16 @@
@click.option("--experiment", multiple=True, default=[])
@click.option("--output", default="./output.zip")
@click.option("--partition-by-unit", is_flag=True)
@click.option("--partition-by-experiment", is_flag=True)
@click.option("--dataset-name", multiple=True, default=[])
- def click_export_experiment_data(experiment, output, partition_by_unit, dataset_name):
+ @click.option("--start-time", help="iso8601")
+ @click.option("--end-time", help="iso8601")
+ def click_export_experiment_data(
+ experiment, output, partition_by_unit, partition_by_experiment, dataset_name, start_time, end_time
+ ):
"""
- (leader only) Export tables from db.
+ (leader only) Export datasets from db.
"""
- export_experiment_data(experiment, dataset_name, output, partition_by_unit)
+ export_experiment_data(
+ experiment, dataset_name, output, start_time, end_time, partition_by_unit, partition_by_experiment
+ )
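
# Example invocation on the leader (illustrative; assumes the usual `pio run` entrypoint,
# and the experiment/dataset names are placeholders):
#   pio run export_experiment_data --experiment my_exp --dataset-name od_readings \
#     --start-time 2024-11-01T00:00:00Z --output ./my_exp.zip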
