diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..e5ed15c --- /dev/null +++ b/.dockerignore @@ -0,0 +1,2 @@ +src/ +venv/ diff --git a/.env.example b/.env.example index 8957f34..834ad5f 100644 --- a/.env.example +++ b/.env.example @@ -1,11 +1,14 @@ -POSTGRES_USER='fer' -POSTGRES_PASSWORD='nanda' -POSTGRES_HOST='localhost' -POSTGRES_DATABASE='estaciones' +POSTGRES_USER='postgres' +POSTGRES_PASSWORD='password' +POSTGRES_HOST='your_host' +POSTGRES_DB='db_name' +POSTGRES_PORT='5432' -MYSQL_USER='fer' -MYSQL_PASSWORD='nanda' -MYSQL_HOST='localhost' -MYSQL_DATABASE='estaciones_remote' +MYSQL_USER='mysql_user' +MYSQL_PASSWORD='secret_pass' +MYSQL_HOST='mysql_host' +MYSQL_DB='mysql_db' +MYSQL_PORT='3306' -AIRNOW_API_KEY='your_secret_airnow_api_key' \ No newline at end of file +PIPELINE_HOST='localhost' +PIPELINE_PORT='6789' \ No newline at end of file diff --git a/.gitignore b/.gitignore index 7cee35e..9ef37ea 100644 --- a/.gitignore +++ b/.gitignore @@ -24,3 +24,6 @@ tmp* # mage drafts etl-pipeline/drafts/ + +#mage +.mage_data/ diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..0547e4b --- /dev/null +++ b/Dockerfile @@ -0,0 +1,36 @@ +ARG PYTHON_VERSION=3.10-slim-buster + +FROM python:${PYTHON_VERSION} + +ENV PYTHONDONTWRITEBYTECODE=1 +ENV PYTHONUNBUFFERED=1 + +# Set the working directory +WORKDIR /app + +RUN apt-get update && \ + apt-get install -y libgomp1 && \ + apt-get clean && \ + rm -rf /var/lib/apt/lists/* + +# Copy only requirements.txt to the container +COPY requirements.txt /app/ +COPY run_app.sh /app/ + +# Install dependencies and clean up cache +RUN set -ex && \ + pip install --upgrade pip && \ + pip install -r /app/requirements.txt && \ + rm -rf /root/.cache/ + +# Copy the rest of the application files into the container +COPY . /app/ + +# Ensure run_app.sh is executable +RUN chmod +x /app/run_app.sh + +# Expose the port for the app +EXPOSE 6789 + +# Define the default command +CMD ["/app/run_app.sh"] \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..356574c --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,54 @@ +services: + db: + image: postgres:16.4-alpine3.20 + restart: always + container_name: db_aire + env_file: + - .env + environment: + POSTGRES_USER: ${POSTGRES_USER} + POSTGRES_PASSWORD: ${POSTGRES_PASSWORD} + POSTGRES_DB: ${POSTGRES_DB} + ports: + - "${POSTGRES_PORT}:5432" + volumes: + - postgres_data:/var/lib/postgresql/data + healthcheck: + test: ["CMD-SHELL", "pg_isready -U ${POSTGRES_USER} -d ${POSTGRES_DB}"] + interval: 10s + timeout: 5s + retries: 5 + + pipeline: + build: + context: . 
+ dockerfile: Dockerfile + restart: always + environment: + MAGE_DATA_DIR: ../.mage_data + PIPELINE_POSTGRES_USER: ${POSTGRES_USER} + PIPELINE_POSTGRES_PASSWORD: ${POSTGRES_PASSWORD} + PIPELINE_POSTGRES_DB: ${POSTGRES_DB} + PIPELINE_POSTGRES_HOST: db + PIPELINE_POSTGRES_PORT: 5432 + PIPELINE_MYSQL_USER: ${MYSQL_USER} + PIPELINE_MYSQL_PASSWORD: ${MYSQL_PASSWORD} + PIPELINE_MYSQL_DB: ${MYSQL_DB} + PIPELINE_MYSQL_HOST: ${MYSQL_HOST} + PIPELINE_MYSQL_PORT: ${MYSQL_PORT} + ports: + - "${PIPELINE_PORT}:6789" + depends_on: + db: + condition: service_healthy + healthcheck: + test: ["CMD-SHELL", "curl -s -o /dev/null -w '%{http_code}' http://localhost:${PIPELINE_PORT} | grep -q 200"] + interval: 1m30s + timeout: 30s + retries: 5 + start_period: 10s + volumes: + - /home/vnbl/data_retriever:/app + +volumes: + postgres_data: diff --git a/etl-pipeline/custom/check_station_status.sql b/etl-pipeline/custom/check_station_status.sql new file mode 100644 index 0000000..0b03b12 --- /dev/null +++ b/etl-pipeline/custom/check_station_status.sql @@ -0,0 +1,20 @@ +INSERT INTO health_checks (run_date, station_id, last_reading_id, is_on) +SELECT + DATE_TRUNC('hour', '{{ execution_date }}'::timestamp) AS run_date, -- Truncate to the hour + s.id AS station_id, + sr.id AS last_reading_id, + CASE + WHEN sr.date_utc >= DATE_TRUNC('hour', '{{ execution_date }}'::timestamp) - INTERVAL '6 hours' THEN TRUE + ELSE FALSE + END AS is_on +FROM + stations s +LEFT JOIN LATERAL ( + SELECT sr.id, sr.date_utc + FROM station_readings_gold sr + WHERE sr.station = s.id + ORDER BY sr.date_utc DESC + LIMIT 1 +) sr ON TRUE +ORDER BY + s.id; \ No newline at end of file diff --git a/etl-pipeline/custom/create_insert_inference_run.sql b/etl-pipeline/custom/create_insert_inference_run.sql new file mode 100644 index 0000000..21c078c --- /dev/null +++ b/etl-pipeline/custom/create_insert_inference_run.sql @@ -0,0 +1,2 @@ +INSERT INTO inference_runs (run_date) +VALUES ('{{ execution_date }}'::timestamp); \ No newline at end of file diff --git a/etl-pipeline/custom/create_table_region_readings.sql b/etl-pipeline/custom/create_table_region_readings.sql index 7570c16..92f2e45 100644 --- a/etl-pipeline/custom/create_table_region_readings.sql +++ b/etl-pipeline/custom/create_table_region_readings.sql @@ -1,7 +1,7 @@ CREATE TABLE IF NOT EXISTS region_readings( id SERIAL PRIMARY KEY, date_utc TIMESTAMP WITH TIME ZONE, - region VARCHAR REFERENCES regions(region_code), + region INTEGER REFERENCES regions(id), pm2_5_region_avg FLOAT, pm2_5_region_max FLOAT, pm2_5_region_skew FLOAT, diff --git a/etl-pipeline/custom/create_table_stations.sql b/etl-pipeline/custom/create_table_stations.sql index 935f7c6..01ada9d 100644 --- a/etl-pipeline/custom/create_table_stations.sql +++ b/etl-pipeline/custom/create_table_stations.sql @@ -3,7 +3,7 @@ CREATE TABLE IF NOT EXISTS stations ( name VARCHAR, latitude FLOAT, longitude FLOAT, - region VARCHAR REFERENCES regions(region_code), + region INTEGER REFERENCES regions(id), is_station_on BOOLEAN, is_pattern_station BOOLEAN ); @@ -19,16 +19,16 @@ INSERT INTO stations ( is_station_on, is_pattern_station ) VALUES - (1, 'Campus de la UNA', '-25.33360102213910', '-57.5139365997165', 'GRAN_ASUNCION', TRUE, FALSE), - (2, 'Zona Multiplaza', '-25.32014521770180', '-57.56050041876730', 'GRAN_ASUNCION', TRUE, FALSE), - (3, 'Acceso Sur', '-25.34024024382230', '-57.58431466296320', 'GRAN_ASUNCION', TRUE, FALSE), - (4, 'Primero de Marzo y Perón', '-25.32836979255080', '-57.62706899084150', 'GRAN_ASUNCION', TRUE, FALSE), - (5, 'Villa 
Morra', '-25.29511316679420', '-57.57708610966800', 'GRAN_ASUNCION', TRUE, FALSE), - (6, 'Barrio Jara', '-25.28833455406130', '-57.60329900309440', 'GRAN_ASUNCION', TRUE, FALSE), - (7, 'San Roque', '-25.28936695307490', '-57.62515967711810', 'GRAN_ASUNCION', TRUE, FALSE), - (8, 'Centro de Asunción', '-25.28640403412280', '-57.64701121486720', 'GRAN_ASUNCION', TRUE, FALSE), - (9, 'Ñu Guasu', '-25.26458493433890', '-57.54793468862770', 'GRAN_ASUNCION', TRUE, FALSE), - (10, 'Botánico', '-25.24647398851810', '-57.54928501322870', 'GRAN_ASUNCION', TRUE, FALSE) + (1, 'Campus de la UNA', '-25.33360102213910', '-57.5139365997165', 1, TRUE, FALSE), + (2, 'Zona Multiplaza', '-25.32014521770180', '-57.56050041876730', 1, TRUE, FALSE), + (3, 'Acceso Sur', '-25.34024024382230', '-57.58431466296320', 1, TRUE, FALSE), + (4, 'Primero de Marzo y Perón', '-25.32836979255080', '-57.62706899084150', 1, TRUE, FALSE), + (5, 'Villa Morra', '-25.29511316679420', '-57.57708610966800', 1, TRUE, FALSE), + (6, 'Barrio Jara', '-25.28833455406130', '-57.60329900309440', 1, TRUE, FALSE), + (7, 'San Roque', '-25.28936695307490', '-57.62515967711810', 1, TRUE, FALSE), + (8, 'Centro de Asunción', '-25.28640403412280', '-57.64701121486720', 1, TRUE, FALSE), + (9, 'Ñu Guasu', '-25.26458493433890', '-57.54793468862770', 1, TRUE, FALSE), + (10, 'Botánico', '-25.24647398851810', '-57.54928501322870', 1, TRUE, FALSE) ; @@ -42,4 +42,4 @@ INSERT INTO stations ( is_station_on, is_pattern_station ) VALUES - (101, 'US Embassy - Asunción', '-25.296368', '-57.604671', 'GRAN_ASUNCION', TRUE, TRUE) + (101, 'US Embassy - Asunción', '-25.296368', '-57.604671', 1, TRUE, TRUE) diff --git a/etl-pipeline/custom/create_table_weather_stations.sql b/etl-pipeline/custom/create_table_weather_stations.sql index 5ebaf22..9444ba2 100644 --- a/etl-pipeline/custom/create_table_weather_stations.sql +++ b/etl-pipeline/custom/create_table_weather_stations.sql @@ -3,7 +3,7 @@ CREATE TABLE IF NOT EXISTS weather_stations ( name VARCHAR, latitude FLOAT, longitude FLOAT, - region VARCHAR REFERENCES regions(region_code) + region INTEGER REFERENCES regions(id) ); INSERT INTO weather_stations ( @@ -13,5 +13,5 @@ INSERT INTO weather_stations ( longitude, region ) VALUES - (1, 'Silvio Pettirossi Airport', '-25.2667', '-57.6333', 'GRAN_ASUNCION') + (1, 'Silvio Pettirossi Airport', '-25.2667', '-57.6333', 1) ; \ No newline at end of file diff --git a/etl-pipeline/custom/get_healthy_station_ids.sql b/etl-pipeline/custom/get_healthy_station_ids.sql new file mode 100644 index 0000000..92ab30c --- /dev/null +++ b/etl-pipeline/custom/get_healthy_station_ids.sql @@ -0,0 +1,15 @@ +-- Docs: https://docs.mage.ai/guides/sql-blocks +WITH execution_date AS ( + SELECT run_date + FROM inference_runs + ORDER BY id DESC + LIMIT 1 +) + +SELECT + station_id as id +FROM health_checks +WHERE + run_date = (SELECT date_trunc('hour', run_date) - INTERVAL '1 HOUR' FROM execution_date) + AND is_on = TRUE + AND station_id NOT IN (SELECT id FROM stations WHERE is_pattern_station = TRUE); \ No newline at end of file diff --git a/etl-pipeline/custom/get_last_airnow_reading_silver_and_bbox.sql b/etl-pipeline/custom/get_last_airnow_reading_silver_and_bbox.sql index afa1417..0ec35f5 100644 --- a/etl-pipeline/custom/get_last_airnow_reading_silver_and_bbox.sql +++ b/etl-pipeline/custom/get_last_airnow_reading_silver_and_bbox.sql @@ -1,6 +1,6 @@ SELECT s.id AS station_id, r.bbox FROM regions r -LEFT JOIN stations s ON s.region = r.region_code +LEFT JOIN stations s ON s.region = r.id WHERE 
     s.is_pattern_station = TRUE
 GROUP BY s.id, r.bbox;
\ No newline at end of file
diff --git a/etl-pipeline/custom/predict_aqi_6_12_hours.py b/etl-pipeline/custom/predict_aqi_6_12_hours.py
new file mode 100644
index 0000000..44c15e2
--- /dev/null
+++ b/etl-pipeline/custom/predict_aqi_6_12_hours.py
@@ -0,0 +1,135 @@
+import pandas as pd
+import json
+from glob import glob
+from datetime import datetime
+import os
+from darts import TimeSeries
+from darts.models import LightGBMModel
+
+if 'custom' not in globals():
+    from mage_ai.data_preparation.decorators import custom
+if 'test' not in globals():
+    from mage_ai.data_preparation.decorators import test
+
+def prepare_data(data):
+    data.drop(columns=['station', 'inference_run'], inplace=True)
+    run_date = pd.to_datetime(data['run_date'].iloc[0], utc=True).tz_convert(None)
+    data['date_utc'] = pd.to_datetime(data['date_utc'], utc=True).dt.tz_convert(None)
+    data = data.drop_duplicates(subset='date_utc').sort_values(by='date_utc')
+    min_date_utc = data['date_utc'].min()
+
+    if pd.api.types.is_datetime64tz_dtype(min_date_utc):
+        min_date_utc = min_date_utc.tz_convert(None)
+
+    full_range = pd.date_range(start=min_date_utc, end=run_date, freq='H')
+
+    data = data.set_index('date_utc').reindex(full_range).rename_axis('date_utc').reset_index()
+
+    data.fillna(method='ffill', inplace=True)
+    data.fillna(method='bfill', inplace=True)
+    data = data[data['date_utc'] <= run_date]
+    data.reset_index(drop=True, inplace=True)
+
+    if 'run_date' in data.columns:
+        data.drop(columns=['run_date'], inplace=True)
+    return data
+
+def get_latest_model_path(model_dir, model_name, klogger):
+    try:
+        all_files = os.listdir(model_dir)
+        model_files = []
+        for file_name in all_files:
+            file_name_no_ext = os.path.splitext(file_name)[0]
+            try:
+                # Format: <YYYY-MM-DD>_v<X.Y.Z>_<model-name>.pkl, e.g. 2024-09-26_v0.1.0_model-12h.pkl
+                date_part, version_part, model_part = file_name_no_ext.split('_')
+                datetime.strptime(date_part, "%Y-%m-%d")
+
+                if model_part == model_name and version_part.startswith('v'):
+                    version_numbers = version_part[1:].split('.')
+                    if all(num.isdigit() for num in version_numbers):
+                        model_files.append(file_name)
+            except (ValueError, IndexError):
+                klogger.warning(f'Invalid filename: {file_name}')
+                continue
+
+        if not model_files:
+            klogger.exception(f"No valid models found for {model_name} in directory {model_dir}")
+            return None
+        # Latest model first in array:
+        model_files.sort(reverse=True)
+        return os.path.join(model_dir, model_files[0])
+    except Exception as e:
+        klogger.exception(f"An error occurred while getting latest model path: {e}")
+        return None
+
+
+def load_models(klogger, model_dir='etl-pipeline/models/'):
+    try:
+        model_12h_path = get_latest_model_path(model_dir, 'model-12h', klogger)
+        model_6h_path = get_latest_model_path(model_dir, 'model-6h', klogger)
+
+        model_12h = LightGBMModel.load(model_12h_path)
+        model_6h = LightGBMModel.load(model_6h_path)
+
+        return model_6h, model_12h
+    except Exception as e:
+        klogger.exception(f'An error occurred while loading models: {e}')
+
+def predict_aqi(data, model, output_length, klogger):
+    try:
+        target = 'aqi_pm2_5'
+        covariates = list(data.columns.drop(['date_utc']))
+        ts = TimeSeries.from_dataframe(data, time_col='date_utc', value_cols=[target] + covariates, freq='h')
+        target_data = ts[target]
+        covariates_data = ts[covariates]
+
+        y_pred = model.predict(output_length, series=target_data, past_covariates=covariates_data)
+
+        y_pred_series = y_pred.pd_series().round(0)
+        result = [
+            {
+                "timestamp": timestamp.isoformat(),
+                "value": int(value)
+            }
+            for timestamp, value in y_pred_series.items()
+        ]
+        return result
+    except Exception as e:
+        klogger.exception(f'An error occurred while predicting aqi: {e}')
+
+@custom
+def transform_custom(data, *args, **kwargs):
+    klogger = kwargs.get('logger')
+    try:
+        station = data['station'].iloc[0]
+        inference_run = data['inference_run'].iloc[0]
+
+        pred_data = prepare_data(data)
+
+        aqi_df = pred_data[['date_utc','aqi_pm2_5']]
+        aqi_json_list = aqi_df.apply(lambda row: {"timestamp": row['date_utc'].isoformat(), "value": int(row['aqi_pm2_5'])}, axis=1).tolist()
+        aqi_json = json.dumps(aqi_json_list, indent=4)
+
+        model_6h, model_12h = load_models(klogger=klogger)
+        forecast_12h = predict_aqi(pred_data, model_12h,
+                                   output_length=12, klogger=klogger)
+        forecast_6h = predict_aqi(pred_data, model_6h,
+                                  output_length=6, klogger=klogger)
+        result_df = pd.DataFrame({
+            'inference_run': [inference_run],
+            'station': [station],
+            'aqi_input': [aqi_json],
+            'forecasts_6h': [forecast_6h],
+            'forecasts_12h': [forecast_12h]
+        })
+        return result_df
+    except Exception as e:
+        klogger.exception(e)
+
+@test
+def test_output(output, *args) -> None:
+    """
+    Template code for testing the output of the block.
+    """
+    assert output is not None, 'The output is undefined'
\ No newline at end of file
diff --git a/etl-pipeline/custom/trigger_inference.py b/etl-pipeline/custom/trigger_inference.py
new file mode 100644
index 0000000..03da526
--- /dev/null
+++ b/etl-pipeline/custom/trigger_inference.py
@@ -0,0 +1,23 @@
+from mage_ai.orchestration.triggers.api import trigger_pipeline
+if 'custom' not in globals():
+    from mage_ai.data_preparation.decorators import custom
+
+
+@custom
+def trigger(*args, **kwargs):
+    """
+    Trigger another pipeline to run.
+
+    Documentation: https://docs.mage.ai/orchestration/triggers/trigger-pipeline
+    """
+
+    trigger_pipeline(
+        'inference_results',  # Required: enter the UUID of the pipeline to trigger
+        variables={},  # Optional: runtime variables for the pipeline
+        check_status=False,  # Optional: poll and check the status of the triggered pipeline
+        error_on_failure=False,  # Optional: if triggered pipeline fails, raise an exception
+        poll_interval=60,  # Optional: check the status of triggered pipeline every N seconds
+        poll_timeout=None,  # Optional: raise an exception after N seconds
+        schedule_name='inference_from_API',
+        verbose=True,  # Optional: print status of triggered pipeline run
+    )
\ No newline at end of file
diff --git a/etl-pipeline/data_exporters/insert_inference_result.py b/etl-pipeline/data_exporters/insert_inference_result.py
new file mode 100644
index 0000000..3e74fc1
--- /dev/null
+++ b/etl-pipeline/data_exporters/insert_inference_result.py
@@ -0,0 +1,25 @@
+from mage_ai.settings.repo import get_repo_path
+from mage_ai.io.config import ConfigFileLoader
+from mage_ai.io.postgres import Postgres
+from pandas import DataFrame
+from os import path
+
+if 'data_exporter' not in globals():
+    from mage_ai.data_preparation.decorators import data_exporter
+
+
+@data_exporter
+def export_data_to_postgres(df: DataFrame, **kwargs) -> None:
+    schema_name = 'public'
+    table_name = 'inference_results'
+    config_path = path.join(get_repo_path(), 'io_config.yaml')
+    config_profile = 'default'
+
+    with Postgres.with_config(ConfigFileLoader(config_path, config_profile)) as loader:
+        loader.export(
+            df,
+            schema_name,
+            table_name,
+            index=False,
+            if_exists='append',
+        )
\ No newline at end of file
diff --git a/etl-pipeline/data_exporters/load_region_readings.py
b/etl-pipeline/data_exporters/load_region_readings.py index 291b186..c7d8848 100644 --- a/etl-pipeline/data_exporters/load_region_readings.py +++ b/etl-pipeline/data_exporters/load_region_readings.py @@ -21,13 +21,20 @@ def export_data_to_postgres(df: DataFrame, **kwargs) -> None: config_path = path.join(get_repo_path(), 'io_config.yaml') config_profile = 'default' - with Postgres.with_config(ConfigFileLoader(config_path, config_profile)) as loader: - loader.export( - df, - schema_name, - table_name, - index=False, # Specifies whether to include index in exported table - if_exists='append', # Specify resolution policy if table name already exists - unique_conflict_method = 'UPDATE', - unique_constraints = ['region','date_utc'] - ) \ No newline at end of file + klogger = kwargs.get('logger') + + try: + if df.empty: + klogger.exception('Dataframe is empty') + with Postgres.with_config(ConfigFileLoader(config_path, config_profile)) as loader: + loader.export( + df, + schema_name, + table_name, + index=False, # Specifies whether to include index in exported table + if_exists='append', # Specify resolution policy if table name already exists + unique_conflict_method = 'UPDATE', + unique_constraints = ['region','date_utc'] + ) + except Exception as e: + klogger.exception(e) \ No newline at end of file diff --git a/etl-pipeline/data_loaders/extract_data_for_inference.sql b/etl-pipeline/data_loaders/extract_data_for_inference.sql new file mode 100644 index 0000000..89d4bfe --- /dev/null +++ b/etl-pipeline/data_loaders/extract_data_for_inference.sql @@ -0,0 +1,59 @@ +WITH execution_date AS ( + SELECT id, run_date + FROM inference_runs + ORDER BY run_date DESC + LIMIT 1 +) + +SELECT + srg.date_utc, + srg.station, + srg.pm1, + srg.pm2_5, + srg.pm10, + srg.pm2_5_avg_6h, + srg.pm2_5_max_6h, + srg.pm2_5_skew_6h, + srg.pm2_5_std_6h, + srg.aqi_pm2_5, + srg.aqi_pm10, + srg.aqi_level, + srg.aqi_pm2_5_max_24h, + srg.aqi_pm2_5_skew_24h, + srg.aqi_pm2_5_std_24h, + rr.pm2_5_region_avg, + rr.pm2_5_region_max, + rr.pm2_5_region_skew, + rr.pm2_5_region_std, + rr.aqi_region_avg, + rr.aqi_region_max, + rr.aqi_region_skew, + rr.aqi_region_std, + rr.level_region_max, + wr.temperature, + wr.humidity, + wr.pressure, + wr.wind_speed, + wr.wind_dir_cos, + wr.wind_dir_sin, + execution_date.id AS inference_run, + execution_date.run_date AS run_date +FROM + station_readings_gold srg +INNER JOIN + region_readings rr + ON srg.date_utc = rr.date_utc +INNER JOIN + weather_readings_gold wr + ON srg.date_utc = wr.date_localtime +CROSS JOIN execution_date +WHERE + srg.station = {{ block_output(parse=lambda data, vars: data[0]["id"]) }} + AND srg.date_utc BETWEEN execution_date.run_date - INTERVAL '25 HOURS' + AND execution_date.run_date - INTERVAL '1 HOURS' + AND rr.date_utc BETWEEN execution_date.run_date - INTERVAL '25 HOURS' + AND execution_date.run_date - INTERVAL '1 HOURS' + AND wr.date_localtime BETWEEN execution_date.run_date - INTERVAL '25 HOURS' + AND execution_date.run_date - INTERVAL '1 HOURS' +ORDER BY + srg.date_utc ASC; diff --git a/etl-pipeline/data_loaders/extract_data_for_regional_stats.sql b/etl-pipeline/data_loaders/extract_data_for_regional_stats.sql index c220e7d..c520f1a 100644 --- a/etl-pipeline/data_loaders/extract_data_for_regional_stats.sql +++ b/etl-pipeline/data_loaders/extract_data_for_regional_stats.sql @@ -12,7 +12,7 @@ WITH unprocessed_readings AS ( JOIN stations s ON gold.station = s.id WHERE gold.processed_to_region = FALSE - AND s.region = 'GRAN_ASUNCION' + AND s.region = 1--'GRAN_ASUNCION' AND 
s.is_pattern_station = FALSE ), existing_readings AS ( @@ -29,7 +29,7 @@ existing_readings AS ( JOIN stations s ON gold.station = s.id WHERE gold.processed_to_region = TRUE - AND s.region = 'GRAN_ASUNCION' + AND s.region = 1--'GRAN_ASUNCION' AND s.is_pattern_station = FALSE AND gold.date_utc IN ( SELECT DISTINCT unprocessed.date_utc diff --git a/etl-pipeline/metadata.yaml b/etl-pipeline/metadata.yaml index de8dbdb..84dd7bd 100755 --- a/etl-pipeline/metadata.yaml +++ b/etl-pipeline/metadata.yaml @@ -1,6 +1,6 @@ project_type: standalone -variables_dir: ~/.mage_data +variables_dir: /app/.mage_data/ # remote_variables_dir: s3://bucket/path_prefix variables_retention_period: '90d' diff --git a/etl-pipeline/models/2024-09-26_v0.1.0_model-12h.pkl b/etl-pipeline/models/2024-09-26_v0.1.0_model-12h.pkl new file mode 100644 index 0000000..fe01662 Binary files /dev/null and b/etl-pipeline/models/2024-09-26_v0.1.0_model-12h.pkl differ diff --git a/etl-pipeline/models/2024-09-26_v0.1.0_model-6h.pkl b/etl-pipeline/models/2024-09-26_v0.1.0_model-6h.pkl new file mode 100644 index 0000000..818405b Binary files /dev/null and b/etl-pipeline/models/2024-09-26_v0.1.0_model-6h.pkl differ diff --git a/etl-pipeline/pipelines/calibration_factors/metadata.yaml b/etl-pipeline/pipelines/calibration_factors/metadata.yaml index e0a4c5c..1c20c27 100755 --- a/etl-pipeline/pipelines/calibration_factors/metadata.yaml +++ b/etl-pipeline/pipelines/calibration_factors/metadata.yaml @@ -114,8 +114,9 @@ run_pipeline_in_one_process: false settings: triggers: null spark_config: {} -tags: [] +tags: +- fiuna type: python uuid: calibration_factors -variables_dir: /app/.mage_data/ +variables_dir: /app/.mage_data widgets: [] diff --git a/etl-pipeline/pipelines/calibration_factors/triggers.yaml b/etl-pipeline/pipelines/calibration_factors/triggers.yaml index f5d122b..fe0a15d 100644 --- a/etl-pipeline/pipelines/calibration_factors/triggers.yaml +++ b/etl-pipeline/pipelines/calibration_factors/triggers.yaml @@ -1,7 +1,7 @@ triggers: - description: null envs: [] - last_enabled_at: 2024-10-02 03:26:29.651538+00:00 + last_enabled_at: 2024-10-16 18:06:47.165954 name: monthly calibration pipeline_uuid: calibration_factors schedule_interval: '@monthly' diff --git a/etl-pipeline/pipelines/etl_airnow_bronze/triggers.yaml b/etl-pipeline/pipelines/etl_airnow_bronze/triggers.yaml index 4c34710..b44cab2 100644 --- a/etl-pipeline/pipelines/etl_airnow_bronze/triggers.yaml +++ b/etl-pipeline/pipelines/etl_airnow_bronze/triggers.yaml @@ -1,7 +1,7 @@ triggers: - description: null envs: [] - last_enabled_at: 2024-10-04 19:57:56.794455 + last_enabled_at: 2024-10-08 23:36:24.949476 name: hourly airnow data pipeline_uuid: etl_airnow_bronze schedule_interval: '30 * * * * ' diff --git a/etl-pipeline/pipelines/etl_fiuna_bronze/metadata.yaml b/etl-pipeline/pipelines/etl_fiuna_bronze/metadata.yaml index 5f99c85..c092801 100755 --- a/etl-pipeline/pipelines/etl_fiuna_bronze/metadata.yaml +++ b/etl-pipeline/pipelines/etl_fiuna_bronze/metadata.yaml @@ -66,7 +66,7 @@ blocks: upstream_blocks: - extract_fiuna_data uuid: transform_fiuna_bronze -- all_upstream_blocks_executed: true +- all_upstream_blocks_executed: false color: null configuration: file_path: data_exporters/load_fiuna_bronze.py @@ -87,7 +87,7 @@ blocks: upstream_blocks: - transform_fiuna_bronze uuid: load_fiuna_bronze -- all_upstream_blocks_executed: true +- all_upstream_blocks_executed: false color: null configuration: file_path: custom/trigger_etl_fiuna_silver.py diff --git 
a/etl-pipeline/pipelines/etl_fiuna_bronze/triggers.yaml b/etl-pipeline/pipelines/etl_fiuna_bronze/triggers.yaml index cd00780..9eab980 100644 --- a/etl-pipeline/pipelines/etl_fiuna_bronze/triggers.yaml +++ b/etl-pipeline/pipelines/etl_fiuna_bronze/triggers.yaml @@ -1,7 +1,7 @@ triggers: - description: null envs: [] - last_enabled_at: 2024-10-02 03:17:24.474816 + last_enabled_at: 2024-10-08 23:36:26.078433 name: hourly fiuna data pipeline_uuid: etl_fiuna_bronze schedule_interval: '@hourly' diff --git a/etl-pipeline/pipelines/etl_meteostat_bronze/triggers.yaml b/etl-pipeline/pipelines/etl_meteostat_bronze/triggers.yaml index da552b2..23d40db 100644 --- a/etl-pipeline/pipelines/etl_meteostat_bronze/triggers.yaml +++ b/etl-pipeline/pipelines/etl_meteostat_bronze/triggers.yaml @@ -1,7 +1,7 @@ triggers: - description: null envs: [] - last_enabled_at: 2024-10-02 03:15:58.196779 + last_enabled_at: 2024-10-08 23:36:26.547508 name: hourly meteostat updates pipeline_uuid: etl_meteostat_bronze schedule_interval: '@hourly' diff --git a/etl-pipeline/pipelines/health_checks/metadata.yaml b/etl-pipeline/pipelines/health_checks/metadata.yaml index b3d8f9c..2788999 100755 --- a/etl-pipeline/pipelines/health_checks/metadata.yaml +++ b/etl-pipeline/pipelines/health_checks/metadata.yaml @@ -40,7 +40,8 @@ run_pipeline_in_one_process: false settings: triggers: null spark_config: {} -tags: [] +tags: +- fiuna type: python uuid: health_checks variables_dir: /app/.mage_data diff --git a/etl-pipeline/pipelines/health_checks/triggers.yaml b/etl-pipeline/pipelines/health_checks/triggers.yaml new file mode 100644 index 0000000..eaf0789 --- /dev/null +++ b/etl-pipeline/pipelines/health_checks/triggers.yaml @@ -0,0 +1,14 @@ +triggers: +- description: null + envs: [] + last_enabled_at: 2024-10-13 03:56:43.692839 + name: hourly_health_check + pipeline_uuid: health_checks + schedule_interval: '30 * * * * ' + schedule_type: time + settings: null + sla: null + start_time: 2024-10-06 01:22:00 + status: active + token: 5d0431e1d4da484b99510ea94b1a83b1 + variables: {} diff --git a/etl-pipeline/pipelines/inference_results/metadata.yaml b/etl-pipeline/pipelines/inference_results/metadata.yaml index 62fa363..2bf95e4 100755 --- a/etl-pipeline/pipelines/inference_results/metadata.yaml +++ b/etl-pipeline/pipelines/inference_results/metadata.yaml @@ -12,7 +12,7 @@ blocks: limit: 1000 use_raw_sql: true downstream_blocks: - - impressive_sound + - get_healthy_station_ids executor_config: null executor_type: local_python has_callback: false @@ -33,6 +33,9 @@ blocks: disable_query_preprocessing: false dynamic: true export_write_policy: append + file_path: custom/get_healthy_station_ids.sql + file_source: + path: custom/get_healthy_station_ids.sql limit: 1000 use_raw_sql: true downstream_blocks: @@ -41,14 +44,14 @@ blocks: executor_type: local_python has_callback: false language: sql - name: impressive sound + name: get_healthy_station_ids retry_config: null status: executed timeout: null type: custom upstream_blocks: - create_insert_inference_run - uuid: impressive_sound + uuid: get_healthy_station_ids - all_upstream_blocks_executed: true color: null configuration: @@ -71,7 +74,7 @@ blocks: timeout: null type: data_loader upstream_blocks: - - impressive_sound + - get_healthy_station_ids uuid: extract_data_for_inference - all_upstream_blocks_executed: true color: null @@ -87,13 +90,13 @@ blocks: language: python name: predict_aqi_6_12_hours retry_config: null - status: updated + status: executed timeout: null type: custom upstream_blocks: - 
extract_data_for_inference uuid: predict_aqi_6_12_hours -- all_upstream_blocks_executed: false +- all_upstream_blocks_executed: true color: null configuration: {} downstream_blocks: [] diff --git a/etl-pipeline/pipelines/initialize_database/metadata.yaml b/etl-pipeline/pipelines/initialize_database/metadata.yaml index 96de239..853b72b 100755 --- a/etl-pipeline/pipelines/initialize_database/metadata.yaml +++ b/etl-pipeline/pipelines/initialize_database/metadata.yaml @@ -276,5 +276,5 @@ tags: - init type: python uuid: initialize_database -variables_dir: /app/.mage_data/ +variables_dir: /app/.mage_data widgets: [] diff --git a/run_app.sh b/run_app.sh new file mode 100755 index 0000000..696d09f --- /dev/null +++ b/run_app.sh @@ -0,0 +1,2 @@ +#!/bin/bash +mage start etl-pipeline/ \ No newline at end of file
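
Reviewer note: the two .pkl artifacts added above follow the naming scheme that get_latest_model_path() in predict_aqi_6_12_hours.py parses (<YYYY-MM-DD>_v<X.Y.Z>_<model-name>.pkl). A minimal standalone sketch to check a filename against that scheme (the helper name and the __main__ block are illustrative only, not files in this change):

import os
from datetime import datetime

def is_valid_model_filename(file_name: str, model_name: str) -> bool:
    # Mirrors the parsing in get_latest_model_path(): date, 'v'-prefixed version, model name.
    base = os.path.splitext(file_name)[0]
    try:
        date_part, version_part, model_part = base.split('_')
        datetime.strptime(date_part, '%Y-%m-%d')
    except (ValueError, IndexError):
        return False
    if model_part != model_name or not version_part.startswith('v'):
        return False
    return all(num.isdigit() for num in version_part[1:].split('.'))

if __name__ == '__main__':
    # The two model artifacts committed in this change are accepted,
    assert is_valid_model_filename('2024-09-26_v0.1.0_model-6h.pkl', 'model-6h')
    assert is_valid_model_filename('2024-09-26_v0.1.0_model-12h.pkl', 'model-12h')
    # while a name outside the convention would be skipped with a warning.
    assert not is_valid_model_filename('model-6h-latest.pkl', 'model-6h')
    print('model filename convention checks passed')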
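
Reviewer note: the pipeline service healthcheck in docker-compose.yml curls the Mage UI and greps for HTTP 200. A quick way to run the same check from the host once docker compose up is running, assuming the default PIPELINE_PORT=6789 from .env.example and that the UI is reachable on localhost (the script below is illustrative, not part of this change):

import os
import urllib.request

# Same idea as the compose healthcheck: the Mage UI should answer with HTTP 200.
port = os.environ.get('PIPELINE_PORT', '6789')
url = f'http://localhost:{port}'
try:
    with urllib.request.urlopen(url, timeout=10) as response:
        print(f'{url} -> HTTP {response.status}')
except OSError as exc:
    print(f'Mage UI not reachable at {url}: {exc}')

Note that the compose healthcheck itself runs inside the pipeline container; if curl is not present in the python:3.10-slim base image, it may need to be added to the apt-get install step in the Dockerfile for that check to pass.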