Commit
Merge pull request #7 from mrpowers-io/delta-log-generation
Implement DELTA in python
SemyonSinchenko authored Aug 28, 2024
2 parents caf4c54 + ac81ecb commit 8c665ec
Showing 3 changed files with 119 additions and 20 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -11,7 +11,7 @@ classifiers = [
"Programming Language :: Python :: Implementation :: PyPy",
]
dynamic = ["version"]
-dependencies = ["pyarrow", "typer", "deltalake"]
+dependencies = ["pyarrow", "typer"]

[project.optional-dependencies]
dev = ["ruff", "ipython"]
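Dropping deltalake from the runtime dependencies works because this commit writes the Delta transaction log by hand (see python/falsa/utils.py below). The generated tables are still ordinary Delta tables, so consumers can open them with any Delta reader. A minimal sketch of the reading side, assuming deltalake is installed there and using a hypothetical output path:

    from deltalake import DeltaTable

    dt = DeltaTable("./h2o-data/groupby.delta")  # hypothetical output path
    print(dt.to_pyarrow_table().num_rows)
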
64 changes: 45 additions & 19 deletions python/falsa/app.py
@@ -1,6 +1,9 @@
import json
import random
+import shutil
from enum import Enum
from pathlib import Path
from uuid import uuid4

import pyarrow as pa
import typer
@@ -19,6 +22,8 @@
JoinSmallGenerator,
)

+from .utils import generate_delta_log

help_str = """
[bold][green]H2O db-like-benchmark data generation.[/green][/bold]\n
[italic][red]This implementation is unofficial![/red][/italic]
@@ -61,12 +66,8 @@ class Format(str, Enum):

def pprint(self):
print(f"An output format is [green]{self.value}[/green]")
-        if self is Format.DELTA:
-            print("\n[red]Warning![/red]Batch writes are not supported for Delta!")
-            print("The whole dataset will be materialized first!")
-        else:
-            print("\nBatch mode is supported.")
-            print("In case of memory problems you can try to reduce a [green]batch_size[/green].")
+        print("\nBatch mode is supported.")
+        print("In case of memory problems you can try to reduce a [green]batch_size[/green].")
print()


@@ -131,9 +132,10 @@ def _create_filename(ds_type: str, n: int, k: int, nas: int, fmt: Format) -> str
def _clear_prev_if_exists(fp: Path, fmt: Format) -> None:
if fp.exists():
# All is file, delta is directory
-        # Delta delete dir by itself.
if fmt is not Format.DELTA:
fp.unlink()
+        else:
+            shutil.rmtree(fp, ignore_errors=True)


@app.command(help="Create H2O GroupBy Dataset")
@@ -143,12 +145,10 @@ def groupby(
k: Annotated[int, typer.Option(help="An amount of keys (groups)")] = 100,
nas: Annotated[int, typer.Option(min=0, max=100, help="A percentage of NULLS")] = 0,
seed: Annotated[int, typer.Option(min=0, help="A seed of the generation")] = 42,
-    batch_size: Annotated[
-        int, typer.Option(min=0, help="A batch-size (in rows)")
-    ] = 5_000_000,
+    batch_size: Annotated[int, typer.Option(min=0, help="A batch-size (in rows)")] = 5_000_000,
data_format: Annotated[
Format,
-        typer.Option(help="An output format for generated data. DELTA requires materialization of the whole data!"),
+        typer.Option(help="An output format for generated data."),
] = Format.CSV,
):
gb = GroupByGenerator(size._to(), k, nas, seed, batch_size)
@@ -196,7 +196,14 @@ def groupby(
writer.close()

if data_format is Format.DELTA:
-        write_deltalake(output_filepath, data=gb.iter_batches(), schema=schema)
+        output_filepath.mkdir(parents=True)
+        delta_file_pq = output_filepath.joinpath("data.parquet")
+        writer = parquet.ParquetWriter(where=delta_file_pq, schema=schema)
+        for batch in track(gb.iter_batches(), total=len(gb.batches)):
+            writer.write_batch(batch)
+
+        writer.close()
+        generate_delta_log(output_filepath, schema)


@app.command(help="Create three H2O join datasets")
@@ -206,12 +213,10 @@ def join(
k: Annotated[int, typer.Option(help="An amount of keys (groups)")] = 10,
nas: Annotated[int, typer.Option(min=0, max=100, help="A percentage of NULLS")] = 0,
seed: Annotated[int, typer.Option(min=0, help="A seed of the generation")] = 42,
-    batch_size: Annotated[
-        int, typer.Option(min=0, help="A batch-size (in rows)")
-    ] = 5_000_000,
+    batch_size: Annotated[int, typer.Option(min=0, help="A batch-size (in rows)")] = 5_000_000,
data_format: Annotated[
Format,
-        typer.Option(help="An output format for generated data. DELTA requires materialization of the whole data!"),
+        typer.Option(help="An output format for generated data."),
] = Format.CSV,
):
random.seed(seed)
@@ -298,7 +303,14 @@ def join(
writer.close()

if data_format is Format.DELTA:
-        write_deltalake(output_small, data=join_small.iter_batches(), schema=schema_small)
+        output_small.mkdir(parents=True)
+        delta_file_pq = output_small.joinpath("data.parquet")
+        writer = parquet.ParquetWriter(where=delta_file_pq, schema=schema_small)
+        for batch in track(join_small.iter_batches(), total=len(join_small.batches)):
+            writer.write_batch(batch)
+
+        writer.close()
+        generate_delta_log(output_small, schema_small)

print()
print("An [bold]MEDIUM[/bold] data [green]schema[/green] is the following:")
@@ -320,7 +332,14 @@ def join(
writer.close()

if data_format is Format.DELTA:
-        write_deltalake(output_medium, data=join_medium.iter_batches(), schema=schema_medium)
+        output_medium.mkdir(parents=True)
+        delta_file_pq = output_medium.joinpath("data.parquet")
+        writer = parquet.ParquetWriter(where=delta_file_pq, schema=schema_medium)
+        for batch in track(join_medium.iter_batches(), total=len(join_medium.batches)):
+            writer.write_batch(batch)
+
+        writer.close()
+        generate_delta_log(output_medium, schema_medium)

print()
print("An [bold]BIG[/bold] data [green]schema[/green] is the following:")
@@ -342,7 +361,14 @@ def join(
writer.close()

if data_format is Format.DELTA:
-        write_deltalake(output_big, data=join_big.iter_batches(), schema=schema_big)
+        output_big.mkdir(parents=True)
+        delta_file_pq = output_big.joinpath("data.parquet")
+        writer = parquet.ParquetWriter(where=delta_file_pq, schema=schema_big)
+        for batch in track(join_big.iter_batches(), total=len(join_big.batches)):
+            writer.write_batch(batch)
+
+        writer.close()
+        generate_delta_log(output_big, schema_big)


def entry_point() -> None:
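All four DELTA branches above share one pattern: create the table directory, stream batches into a single data.parquet through a ParquetWriter, then call generate_delta_log to write the _delta_log next to it. A quick way to inspect the result, as a minimal sketch against a hypothetical output directory:

    import json
    from pathlib import Path

    # The log name is 20 zero-padded digits, matching file_len = 20 in utils.py.
    log = Path("h2o_groupby.delta/_delta_log/00000000000000000000.json")
    for line in log.read_text().splitlines():
        action = json.loads(line)
        print(next(iter(action)))  # metaData, one add per parquet file, protocol
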
73 changes: 73 additions & 0 deletions python/falsa/utils.py
@@ -0,0 +1,73 @@
from __future__ import annotations

import json
import time
from pathlib import Path
from uuid import uuid4

from pyarrow import Schema

PA_2_DELTA_DTYPES = {
"int32": "integer",
"int64": "long",
}


def generate_delta_log(output_filepath: Path, schema: Schema) -> None:
"""Generate a delta-log from existing parquet files and the given schema."""
file_len = 20
delta_dir = output_filepath.joinpath("_delta_log")
delta_dir.mkdir(exist_ok=True)
add_meta_log = "0" * file_len + ".json"

with open(delta_dir.joinpath(add_meta_log), "w") as meta_log:
jsons = []
# Generate "metaData"
jsons.append(
json.dumps(
{
"metaData": {
"id": uuid4().__str__(),
"format": {
"provider": "parquet",
"options": {},
},
"schemaString": json.dumps(
{
"type": "struct",
"fields": [
{
"name": field.name,
"type": PA_2_DELTA_DTYPES.get(field.type.__str__(), field.type.__str__()),
"nullable": field.nullable,
"metadata": {},
}
for field in schema
],
}
),
"configuration": {},
"partitionColumns": [],
}
}
)
)
# Generate "add"
for pp in output_filepath.glob("*.parquet"):
jsons.append(
json.dumps(
{
"add": {
"path": pp.relative_to(output_filepath).__str__(),
"partitionValues": {},
"size": pp.stat().st_size,
"modificationTime": int(time.time() * 1000),
"dataChange": True,
}
}
)
)

# Generate "protocol"
jsons.append(json.dumps({"protocol": {"minReaderVersion": 1, "minWriterVersion": 2}}))
meta_log.write("\n".join(jsons))
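
A minimal usage sketch of generate_delta_log with a throwaway path and schema (both hypothetical, not part of this commit): write one Parquet file into a fresh directory, then emit the matching single-commit log. The id column exercises the int64 -> long mapping from PA_2_DELTA_DTYPES and v1 the int32 -> integer one:

    import pyarrow as pa
    from pathlib import Path
    from pyarrow import parquet

    from falsa.utils import generate_delta_log

    # Hypothetical schema: one int64 key column, one int32 value column.
    schema = pa.schema([pa.field("id", pa.int64()), pa.field("v1", pa.int32())])
    out = Path("example.delta")
    out.mkdir(parents=True, exist_ok=True)
    table = pa.table({"id": [1, 2], "v1": [10, 20]}, schema=schema)
    parquet.write_table(table, out.joinpath("data.parquet"))
    generate_delta_log(out, schema)  # writes _delta_log/00000000000000000000.json

Pinning the protocol to minReaderVersion 1 and minWriterVersion 2 keeps the table on the oldest widely supported feature set, so the hand-written log stays readable by common engines such as Spark and delta-rs.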
