Skip to content

Store latest datasets cronjob #831

Store latest datasets cronjob

Store latest datasets cronjob #831

# Workflow that collects the direct-download and "latest" URLs of the GTFS
# schedule sources in the catalog, stores the downloaded datasets in the
# mdb-latest bucket and then validates the stored copies.
name: Store latest datasets cronjob
on:
  # Allows the workflow to be started manually.
  workflow_dispatch:
  schedule:
    # Once a day at midnight (GitHub evaluates cron expressions in UTC).
    - cron: "0 0 * * *"
env:
  # Common prefix of every "latest" URL; the scripts strip it when building
  # the job matrix and prepend it again when validating the stored datasets.
  URL_STORAGE_PREFIX: "https://storage.googleapis.com/storage/v1/b/mdb-latest/o"
jobs:
  # Reads every GTFS schedule source file of the catalog and builds the job
  # matrix (one entry per parallel job) consumed by the two jobs below.
  get-urls:
    runs-on: ubuntu-latest
    outputs:
      matrix: ${{ steps.set-matrix.outputs.matrix }}
    steps:
      - uses: actions/checkout@v3
      - name: Set up Python 3.9
        uses: actions/setup-python@v3
        with:
          # Quoted so YAML keeps the version a string rather than a float.
          python-version: "3.9"
      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install wheel numpy
      - name: Create URLs matrix with the direct download and latest URLs
        shell: python
        run: |
          import hashlib
          import json
          import os

          import numpy as np

          # OS constants
          ROOT = os.getcwd()
          GTFS_SCHEDULE_CATALOG_PATH_FROM_ROOT = "catalogs/sources/gtfs/schedule"
          # Trusted workflow-level constant, so templating it in is safe.
          URL_STORAGE_PREFIX = """${{ env.URL_STORAGE_PREFIX }}"""
          MATRIX_FILE = "urls_matrix.json"
          # File constants
          URLS = "urls"
          DIRECT_DOWNLOAD = "direct_download"
          LATEST = "latest"
          AUTHENTICATION_TYPE = "authentication_type"
          API_KEY_PARAMETER_NAME = "api_key_parameter_name"
          # Github constants
          MAX_JOB_NUMBER = 256
          # Matrix constants
          INCLUDE = "include"
          DATA = "data"
          BASE = "base"
          HASH = "hash"
          # Report constants
          GET_URLS_REPORT = "get_urls_report.txt"

          catalog_path = os.path.join(ROOT, GTFS_SCHEDULE_CATALOG_PATH_FROM_ROOT)
          urls = {}
          for file in os.listdir(catalog_path):
              base = os.path.splitext(os.path.basename(file))[0]
              with open(os.path.join(catalog_path, file), "r") as fp:
                  file_json = json.load(fp)
              file_urls = file_json.get(URLS, {})
              direct_download_url = file_urls.get(DIRECT_DOWNLOAD)
              latest_url = file_urls.get(LATEST)
              if direct_download_url is None or latest_url is None:
                  file_log = (
                      f"{base}: FAILURE! Direct download URL is {direct_download_url} and latest URL is {latest_url}. "
                      f"Both URLS must be defined to update the source latest URL.\n"
                  )
              else:
                  urls[base] = {
                      DIRECT_DOWNLOAD: direct_download_url,
                      LATEST: latest_url.replace(URL_STORAGE_PREFIX, ""),
                      AUTHENTICATION_TYPE: file_urls.get(AUTHENTICATION_TYPE),
                      API_KEY_PARAMETER_NAME: file_urls.get(API_KEY_PARAMETER_NAME),
                  }
                  file_log = (
                      f"{base}: SUCCESS! Both direct download and latest URLs were fetched.\n"
                  )
              # Appended per file so the report survives a later crash.
              with open(GET_URLS_REPORT, "a") as fp:
                  fp.write(file_log)

          bases = list(urls.keys())
          # np.array_split raises ValueError when asked for 0 sections, which
          # is what min(...) yields if no source qualified.
          if bases:
              jobs = np.array_split(bases, min(MAX_JOB_NUMBER, len(bases)))
          else:
              jobs = []

          urls_data = []
          for job in jobs:
              # Entries are serialized from the end of the chunk backwards;
              # tolist() turns the numpy strings back into plain str.
              entries = [
                  json.dumps(
                      {BASE: file_base, **urls[file_base]},
                      separators=(",", ":"),
                  )
                  for file_base in reversed(job.tolist())
              ]
              urls_data.append(
                  {
                      # Hash of the entries concatenated without separator.
                      HASH: hashlib.sha1("".join(entries).encode("utf-8")).hexdigest(),
                      # Joined directly with a space rather than patching
                      # "}{" afterwards, which could alter a string value.
                      DATA: " ".join(entries),
                  }
              )

          matrix_data = {INCLUDE: urls_data}
          with open(os.path.join(ROOT, MATRIX_FILE), "w") as fp:
              json.dump(matrix_data, fp)
      - name: Set URLs matrix
        id: set-matrix
        run: |
          DATASETS=$(jq -c . ./urls_matrix.json)
          # Quoted to prevent word splitting and glob expansion of the JSON.
          echo "$DATASETS"
          echo "matrix=$DATASETS" >> "$GITHUB_OUTPUT"
      - name: Persist URLs matrix artifact
        if: always()
        uses: actions/upload-artifact@v4
        with:
          name: urls_matrix
          path: ./urls_matrix.json
      - name: Persist Get URLS report artifact
        if: always()
        uses: actions/upload-artifact@v4
        with:
          name: get_urls_report
          path: ./get_urls_report.txt

  # Downloads the datasets of one matrix entry from their direct download
  # URLs and uploads the valid zip files to the mdb-latest bucket.
  store-datasets:
    needs: [get-urls]
    runs-on: ubuntu-latest
    strategy:
      matrix: ${{ fromJson(needs.get-urls.outputs.matrix) }}
    steps:
      - uses: actions/checkout@v3
      - name: Set up Python 3.9
        uses: actions/setup-python@v3
        with:
          python-version: "3.9"
      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install wheel requests
      - name: Validate and download the datasets
        shell: python
        env:
          API_SOURCE_SECRETS: ${{ secrets.API_SOURCE_SECRETS }}
          # Catalog-derived data is handed over through the environment
          # instead of being templated into the Python source: templating
          # would let a crafted catalog value inject code and would mangle
          # JSON backslash escapes.
          MATRIX_DATA: ${{ matrix.data }}
        run: |
          import json
          import os
          from zipfile import ZipFile

          import requests

          # OS constants
          ROOT = os.getcwd()
          DATASETS = "datasets"
          # Hex digest generated by the get-urls job.
          FOLDER = """${{ matrix.hash }}"""
          DOWNLOAD_REPORTS = "download_reports"
          # Jobs constants
          BASE = "base"
          DIRECT_DOWNLOAD = "direct_download"
          AUTHENTICATION_TYPE = "authentication_type"
          API_KEY_PARAMETER_NAME = "api_key_parameter_name"
          # Environment constants
          API_SOURCE_SECRETS = "API_SOURCE_SECRETS"
          MATRIX_DATA = "MATRIX_DATA"
          # (connect, read) timeouts in seconds so a stalled server cannot
          # block the job indefinitely.
          REQUEST_TIMEOUT = (30, 300)


          def iter_json_objects(text):
              """Yield each JSON document of a whitespace-separated string."""
              decoder = json.JSONDecoder()
              position = 0
              while True:
                  # raw_decode does not accept leading whitespace.
                  while position < len(text) and text[position].isspace():
                      position += 1
                  if position >= len(text):
                      return
                  document, position = decoder.raw_decode(text, position)
                  yield document


          # Load API source secrets
          api_source_secrets = json.loads(os.environ[API_SOURCE_SECRETS])

          for job_json in iter_json_objects(os.environ[MATRIX_DATA]):
              base = job_json[BASE]
              url = job_json[DIRECT_DOWNLOAD]
              authentication_type = job_json[AUTHENTICATION_TYPE]
              api_key_parameter_name = job_json[API_KEY_PARAMETER_NAME]
              api_key_parameter_value = api_source_secrets.get(base)

              # Download the dataset
              zip_path = os.path.join(ROOT, DATASETS, FOLDER, f"{base}.zip")
              os.makedirs(os.path.dirname(zip_path), exist_ok=True)

              # Type 1 sends the key as a query parameter, type 2 as a header.
              params = {}
              headers = {}
              if authentication_type == 1:
                  params[api_key_parameter_name] = api_key_parameter_value
              elif authentication_type == 2:
                  headers[api_key_parameter_name] = api_key_parameter_value

              try:
                  zip_file_req = requests.get(
                      url,
                      params=params,
                      headers=headers,
                      allow_redirects=True,
                      timeout=REQUEST_TIMEOUT,
                  )
                  zip_file_req.raise_for_status()
              except Exception as e:
                  file_log = (
                      f"{base}: FAILURE! Exception {e} occurred when downloading URL {url}.\n"
                  )
              else:
                  with open(zip_path, "wb") as f:
                      f.write(zip_file_req.content)
                  # Make sure that the download file is a zip file
                  try:
                      # Opened only for validation; closed right away.
                      with ZipFile(zip_path, "r"):
                          pass
                      file_log = (
                          f"{base}: SUCCESS! A GTFS dataset zip file was downloaded.\n"
                      )
                  except Exception as e:
                      os.remove(zip_path)
                      file_log = (
                          f"{base}: FAILURE! Exception {e} occurred when loading the zip file.\n"
                      )

              report_path = os.path.join(ROOT, DOWNLOAD_REPORTS, f"{base}.txt")
              os.makedirs(os.path.dirname(report_path), exist_ok=True)
              with open(report_path, "w") as fp:
                  fp.write(file_log)
      - name: Set up and authorize Cloud
        uses: google-github-actions/auth@v2
        with:
          credentials_json: ${{ secrets.ARCHIVE_DATASET_SA_KEY }}
      - name: Upload datasets to Google Cloud Storage
        id: upload-datasets
        # NOTE(review): the action reference was garbled in this copy of the
        # file ("[email protected]"); v2 is assumed to pair with auth@v2 —
        # confirm the exact version to pin.
        uses: google-github-actions/upload-cloud-storage@v2
        with:
          path: datasets/${{ matrix.hash }}
          destination: mdb-latest
          parent: false
      - name: Persist Download Reports artifact
        if: always()
        uses: actions/upload-artifact@v4
        with:
          # upload-artifact v4 rejects two uploads with the same name in one
          # run, so each matrix job gets its own artifact.
          name: download_reports-${{ matrix.hash }}
          path: download_reports

  # Downloads each stored dataset back from its "latest" URL and checks that
  # it is a readable zip file.
  validate-latest:
    needs: [get-urls, store-datasets]
    runs-on: ubuntu-latest
    strategy:
      matrix: ${{ fromJson(needs.get-urls.outputs.matrix) }}
    steps:
      - uses: actions/checkout@v3
      - name: Set up Python 3.9
        uses: actions/setup-python@v3
        with:
          python-version: "3.9"
      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install wheel requests
      - name: Validate the latest datasets
        shell: python
        env:
          # Passed through the environment rather than templated into the
          # script, see the store-datasets job.
          MATRIX_DATA: ${{ matrix.data }}
        run: |
          import json
          import os
          from zipfile import ZipFile

          import requests

          # OS constants
          ROOT = os.getcwd()
          DATASETS = "datasets"
          # Hex digest generated by the get-urls job.
          FOLDER = """${{ matrix.hash }}"""
          LATEST_REPORTS = "latest_reports"
          # Trusted workflow-level constant.
          URL_STORAGE_PREFIX = """${{ env.URL_STORAGE_PREFIX }}"""
          # Jobs constants
          BASE = "base"
          LATEST = "latest"
          # Environment constants
          MATRIX_DATA = "MATRIX_DATA"
          # (connect, read) timeouts in seconds so a stalled server cannot
          # block the job indefinitely.
          REQUEST_TIMEOUT = (30, 300)


          def iter_json_objects(text):
              """Yield each JSON document of a whitespace-separated string."""
              decoder = json.JSONDecoder()
              position = 0
              while True:
                  # raw_decode does not accept leading whitespace.
                  while position < len(text) and text[position].isspace():
                      position += 1
                  if position >= len(text):
                      return
                  document, position = decoder.raw_decode(text, position)
                  yield document


          for job_json in iter_json_objects(os.environ[MATRIX_DATA]):
              base = job_json[BASE]
              # The matrix stores the latest URL without its common prefix.
              url = URL_STORAGE_PREFIX + job_json[LATEST]

              # Download the dataset
              zip_path = os.path.join(ROOT, DATASETS, FOLDER, f"{base}.zip")
              os.makedirs(os.path.dirname(zip_path), exist_ok=True)

              try:
                  zip_file_req = requests.get(
                      url, allow_redirects=True, timeout=REQUEST_TIMEOUT
                  )
                  zip_file_req.raise_for_status()
              except Exception as e:
                  file_log = (
                      f"{base}: FAILURE! Exception {e} occurred when downloading URL {url}.\n"
                  )
              else:
                  with open(zip_path, "wb") as f:
                      f.write(zip_file_req.content)
                  # Make sure that the download file is a zip file
                  try:
                      # Opened only for validation; closed right away.
                      with ZipFile(zip_path, "r"):
                          pass
                      file_log = (
                          f"{base}: SUCCESS! A GTFS dataset zip file was downloaded.\n"
                      )
                  except Exception as e:
                      os.remove(zip_path)
                      file_log = (
                          f"{base}: FAILURE! Exception {e} occurred when loading the zip file.\n"
                      )

              report_path = os.path.join(ROOT, LATEST_REPORTS, f"{base}.txt")
              os.makedirs(os.path.dirname(report_path), exist_ok=True)
              with open(report_path, "w") as fp:
                  fp.write(file_log)
      - name: Persist Latest Reports artifact
        if: always()
        uses: actions/upload-artifact@v4
        with:
          # Unique per matrix job, as required by upload-artifact v4.
          name: latest_reports-${{ matrix.hash }}
          path: latest_reports