Commit 01767dc

drop local file support, run change detection prior to upload to s3

smnorris committed Jan 4, 2025 · 1 parent 8eb3ac9

Showing 6 changed files with 67 additions and 54 deletions.
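In outline, the new flow is: download and clean the source layer, check whether the zipped .gdb is already present on S3, diff the fresh data against that previous version, and upload only when something changed. A minimal sketch of that logic (assuming, as the new code below suggests, that fit_changedetector's gdf_diff returns a dict of GeoDataFrames keyed by change type; the bucket, key, and primary key names here are hypothetical):

    import os

    import boto3
    import fit_changedetector as fcd
    import geopandas
    from botocore.exceptions import ClientError

    bucket = os.environ["BUCKET"]
    s3_key = "Change_Detection/example/roads.gdb.zip"  # hypothetical key
    df = geopandas.read_file("roads.gdb")  # fresh, cleaned download

    s3 = boto3.client("s3")
    write = True
    try:
        s3.head_object(Bucket=bucket, Key=s3_key)
        # a previous version exists on s3: compare before uploading
        previous = geopandas.read_file(f"s3://{bucket}/{s3_key}")
        diff = fcd.gdf_diff(previous, df, primary_key="road_id", return_type="gdf")
        if len(diff["UNCHANGED"]) == len(previous) == len(df):
            write = False  # identical data, skip the upload
    except ClientError:
        # sketch only: assume the key is absent (the commit's s3_key_exists
        # inspects the error code properly)
        pass

    if write:
        s3.upload_file("roads.gdb.zip", bucket, s3_key)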
2 changes: 1 addition & 1 deletion .github/workflows/download-daily.yaml

@@ -47,4 +47,4 @@ jobs:
           python -m pip install .
       - name: Process source
         run: |
-          fit_downloader process -v --out-path s3://$BUCKET/Change_Detection/${{ matrix.arg }} -s D sources/${{ matrix.arg }}.json
+          fit_downloader process -v --prefix s3://$BUCKET/Change_Detection/${{ matrix.arg }} -s D sources/${{ matrix.arg }}.json
2 changes: 1 addition & 1 deletion .github/workflows/download-monthly.yaml

@@ -47,4 +47,4 @@ jobs:
           python -m pip install .
       - name: Process source
         run: |
-          fit_downloader process -v --out-path s3://$BUCKET/Change_Detection/${{ matrix.arg }} -s M sources/${{ matrix.arg }}.json
+          fit_downloader process -v --prefix s3://$BUCKET/Change_Detection/${{ matrix.arg }} -s M sources/${{ matrix.arg }}.json
2 changes: 1 addition & 1 deletion .github/workflows/download-quarterly.yaml

@@ -47,4 +47,4 @@ jobs:
           python -m pip install .
       - name: Process source
         run: |
-          fit_downloader process -v --out-path s3://$BUCKET/Change_Detection/${{ matrix.arg }} -s Q sources/${{ matrix.arg }}.json
+          fit_downloader process -v --prefix s3://$BUCKET/Change_Detection/${{ matrix.arg }} -s Q sources/${{ matrix.arg }}.json
2 changes: 1 addition & 1 deletion .github/workflows/download-weekly.yaml

@@ -47,4 +47,4 @@ jobs:
           python -m pip install .
      - name: Process source
         run: |
-          fit_downloader process -v --out-path s3://$BUCKET/Change_Detection/${{ matrix.arg }} -s W sources/${{ matrix.arg }}.json
+          fit_downloader process -v --prefix s3://$BUCKET/Change_Detection/${{ matrix.arg }} -s W sources/${{ matrix.arg }}.json
8 changes: 5 additions & 3 deletions README.md

@@ -32,7 +32,7 @@ Usage: fit_downloader process [OPTIONS] CONFIG_FILE
 Options:
   -l, --layer TEXT            Layer to process in provided config.
-  -o, --out-path PATH         Output path or s3 prefix.
+  -p, --prefix                S3 prefix.
   -f, --force                 Force download to out-path without running
                               change detection.
   -s, --schedule [D|W|M|Q|A]  Process only sources with given schedule tag.
@@ -49,9 +49,11 @@ Examples:
 
     fit_downloader process -vV example_config.json
 
-2. Download and process layers defined in `example_config.json` configuration file, saving to `/my/output/path` on the local filesystem:
+2. Process data defined in `sources/CAPRD/victoria.json` configuration file, saving to `s3://$BUCKET/CAPRD/victoria`:
 
-    fit_downloader process -o my/output/path example_config.json
+    fit_downloader process -v \
+        --prefix s3://$BUCKET/Change_Detection/CAPRD/victoria \
+        sources/CAPRD/victoria.json
 
 
 ## Configuration
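The new --prefix value is a full s3:// URL, but as the loader code below shows, only its path portion becomes the object key; the bucket name is read from the $BUCKET environment variable, so the netloc in the prefix is effectively ignored. A quick stdlib illustration with a hypothetical prefix:

    from urllib.parse import urlparse

    # hypothetical value passed via --prefix
    prefix = "s3://my-bucket/Change_Detection/CAPRD/victoria"

    parsed = urlparse(prefix, allow_fragments=False)
    print(parsed.netloc)            # my-bucket (ignored; bucket comes from $BUCKET)
    print(parsed.path.lstrip("/"))  # Change_Detection/CAPRD/victoria

    # the loader appends the zipped gdb name to build the full key
    s3_key = parsed.path.lstrip("/") + "/" + "victoria.gdb" + ".zip"
    print(s3_key)                   # Change_Detection/CAPRD/victoria/victoria.gdb.zip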
105 changes: 58 additions & 47 deletions src/fit_opendatadownloader/fit_downloader.py

@@ -2,7 +2,6 @@
 import json
 import logging
 import os
-import re
 import shutil
 import sys
 import zipfile
@@ -11,6 +10,9 @@
 
 import boto3
 import click
+import fit_changedetector as fcd
+import geopandas
+from botocore.exceptions import ClientError
 from cligj import quiet_opt, verbose_opt
 
 import fit_opendatadownloader as fdl
@@ -46,6 +48,20 @@ def zip_gdb(folder_path, zip_path):
             zipf.write(file_path, relative_path)
 
 
+def s3_key_exists(s3_client, s3_key):
+    """Return True if s3 key exists, False if it does not"""
+    try:
+        s3_client.head_object(Bucket=os.environ.get("BUCKET"), Key=s3_key)
+        return True  # If no exception, the object exists
+    except ClientError as e:
+        # head_object reports a missing key with error code "404"
+        if e.response["Error"]["Code"] == "404":
+            return False
+        else:
+            LOG.error(f"Error checking object: {e}")
+            return False
 
 
 @click.group()
 @click.version_option(version=fdl.__version__, message="%(version)s")
 def cli():
@@ -87,17 +103,9 @@ def list_configs(path, schedule, verbose, quiet):
     help="Layer to process in provided config.",
 )
 @click.option(
-    "--out-path",
-    "-o",
-    type=click.Path(),
-    default=".",
-    help="Output path or s3 prefix.",
-)
-@click.option(
-    "--force",
-    "-f",
-    is_flag=True,
-    help="Force download to out-path without running change detection.",
+    "--prefix",
+    "-p",
+    help="S3 prefix.",
 )
 @click.option(
     "--schedule",
@@ -116,8 +124,7 @@ def list_configs(path, schedule, verbose, quiet):
 def process(
     config_file,
     layer,
-    out_path,
-    force,
+    prefix,
     schedule,
     validate,
     verbose,
@@ -152,52 +159,56 @@
             df,
             fields=layer.fields,
             primary_key=layer.primary_key,
-            fdl_primary_key="fdl_load_id",
             hash_fields=layer.hash_fields,
             precision=0.1,
         )
+        # if no primary key provided, use the default "fdl_load_id"
+        if layer.primary_key:
+            primary_key = layer.primary_key
+        else:
+            primary_key = "fdl_load_id"
 
         # process and dump to file if "validate" option is not set
         if not validate:
-            # write to gdb in cwd
+            # write download to zipped gdb in cwd
             out_file = layer.out_layer + ".gdb"
             df.to_file(out_file, driver="OpenFileGDB", layer=layer.out_layer)
 
-            # run change detection unless otherwise specified
-            # if not force:
-            # - get previous version (if present)
-            # - compare to previous version
-            # - if changes detected, modify output path to include <fcd_YYYYMMDD> prefix,
-            # - write diffs / reports
-
-            # then write data
-
             # zip and write to target location
             zip_gdb(out_file, out_file + ".zip")
 
-            # copy to s3 if out_path prefix is s3://
-            if bool(re.compile(r"^s3://").match(out_path)):
-                s3_key = urlparse(out_path, allow_fragments=False).path.lstrip("/")
-                s3_client = boto3.client("s3")
+            # derive output path
+            s3_key = (
+                urlparse(prefix, allow_fragments=False).path.lstrip("/") + "/" + out_file + ".zip"
+            )
+            # create s3 client
+            s3_client = boto3.client("s3")
+
+            # default to writing
+            write = True
+
+            # run change detection if the s3 key already exists
+            if s3_key_exists(s3_client, s3_key):
+                # read from existing file on s3
+                df2 = geopandas.read_file(os.path.join("s3://", os.environ.get("BUCKET"), s3_key))
+                # run change detection
+                diff = fcd.gdf_diff(df2, df, primary_key=primary_key, return_type="gdf")
+                # do not write new data if nothing has changed
+                if len(diff["UNCHANGED"]) == len(df2) == len(df):
+                    LOG.info(f"Data unchanged {s3_key}")
+                    write = False
+                else:
+                    LOG.info("Changes found")
+
+                    # todo // write changes to log
+
+                    # todo // alert users that new data is available
+
+            if write:
+                LOG.info(f"Writing {layer.out_layer} to {s3_key}")
                 s3_client.upload_file(out_file + ".zip", os.environ.get("BUCKET"), s3_key)
-                LOG.info(f"layer {layer.out_layer} saved to {s3_key}")
-                os.unlink(out_file + ".zip")
-
-            # alternatively, move to local path
-            elif out_path != ".":
-                Path(out_path).mkdir(parents=True, exist_ok=True)
-                destination = os.path.join(
-                    out_path,
-                    out_file + ".zip",
-                )
-                os.rename(out_file + ".zip", destination)
-                LOG.info(f"layer {layer.out_layer} saved to {destination}")
-
-            # do nothing if out_path is empty
-            elif out_path == ".":
-                LOG.info(f"layer {layer.out_layer} saved to {out_file}.zip")
 
             # cleanup
             shutil.rmtree(out_file)
+            os.unlink(out_file + ".zip")
 
 
 if __name__ == "__main__":
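A boto3 subtlety behind the "404" check in s3_key_exists above: head_object responses carry no body, so S3 reports a missing key as a bare HTTP 404 and botocore surfaces the error code as the string "404" (the "NoSuchKey" code only appears on calls like get_object, an easy mix-up). A standalone sketch of the pattern, with a hypothetical bucket and key:

    import os

    import boto3
    from botocore.exceptions import ClientError


    def s3_key_exists(s3_client, bucket, s3_key):
        """Return True if the key exists, False if it does not."""
        try:
            s3_client.head_object(Bucket=bucket, Key=s3_key)
            return True
        except ClientError as e:
            # a missing key surfaces as error code "404" on head_object
            if e.response["Error"]["Code"] == "404":
                return False
            # anything else (permissions, throttling) should not read as "absent"
            raise


    s3 = boto3.client("s3")
    print(s3_key_exists(s3, os.environ["BUCKET"], "Change_Detection/example.gdb.zip"))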