diff --git a/etl-pipeline/custom/construct_send_telegram_message.py b/etl-pipeline/custom/construct_send_telegram_message.py new file mode 100644 index 0000000..90a8b87 --- /dev/null +++ b/etl-pipeline/custom/construct_send_telegram_message.py @@ -0,0 +1,84 @@ +import telebot +from mage_ai.data_preparation.shared.secrets import get_secret_value +if 'custom' not in globals(): + from mage_ai.data_preparation.decorators import custom +if 'test' not in globals(): + from mage_ai.data_preparation.decorators import test + +AQI_LEVELS = [ + (0, 50, "Bueno 😁"), # Good + (51, 100, "Moderado 🙂"), # Moderate + (101, 150, "Insalubre para grupos sensibles 😷"), # Unhealthy for sensitive groups + (151, 200, "Insalubre 😶‍🌫️"), # Unhealthy + (201, 300, "Muy insalubre 😰"), # Very Unhealthy + (301, 500, "Peligroso 💀") # Hazardous +] + +AQI_MESSAGES = [ + (0, 50,"🟢 ¡Hoy es un excelente día para estar al aire libre! 🏃‍♂️🍃"), # Good + (51, 100, "🟡 Hoy es un buen día para estar al aire libre para la mayoría de las personas. 👍🏻"), # Moderate + (101, 150, "🟠 Las personas sensibles deben prestar atención a síntomas 😷"), # Unhealthy for sensitive groups + (151, 200, "🔴 Hoy es un mal día para estar al aire libre.\n Se recomienda el uso de tapabocas para la población general💨😷"), # Unhealthy + (201, 300, "🟣 Hoy es un muy mal día para estar al aire libre. \n Se recomienda el uso de tapabocas para la población general🔥😷"), # Very Unhealthy + (301, 500, "⚫ Evite la exposición al aire libre. 
💀😷\nAnte la aparición de síntomas, consulte a su médico.") # Hazardous + +] + +def get_aqi_label(aqi_value): + """Returns the AQI level label for a given AQI value.""" + for min_val, max_val, label in AQI_LEVELS: + if min_val <= aqi_value <= max_val: + return label + return "Fuera del AQI 💀🤯" + +def get_aqi_message(aqi_value): + for min_val, max_val, message in AQI_MESSAGES: + if min_val <= aqi_value <= max_val: + return message + return " " + +def get_latest_aqi_summary(df): + avg_value = int(df['forecast_avg'].mean()) + max_value = int(df['forecast_avg'].max()) + min_value = int(df['forecast_avg'].min()) + + return avg_value, max_value, min_value + +def construct_message(avg_aqi, max_aqi, min_aqi): + avg_label = get_aqi_label(avg_aqi) + max_label = get_aqi_label(max_aqi) + min_label = get_aqi_label(min_aqi) + aqi_message = get_aqi_message(avg_aqi) + + message = ( + f"📊 **Reporte de Calidad del Aire para Gran Asunción - Pronóstico de 12 horas**\n" + f"🔹 **AQI Promedio:** {avg_aqi} ({avg_label})\n" + f"🔺 **AQI Máximo:** {max_aqi} ({max_label})\n" + f"🔻 **AQI Mínimo:** {min_aqi} ({min_label})\n\n" + f"{aqi_message}\n\n" + "🔗 Podés acceder al pronóstico actualizado en tu zona ingresando a www.proyectorespira.net" + ) + return message + +def send_message(token, chat_id, msg=''): + bot = telebot.TeleBot(token) + return bot.send_message(chat_id=chat_id, text=msg, parse_mode='Markdown') + +@custom +def transform_custom(data, *args, **kwargs): + telegram_token = get_secret_value('TELEGRAM_BOT_TOKEN') + telegram_chat_id = get_secret_value('TELEGRAM_CHAT_ID') + aqi_summary = get_latest_aqi_summary(data) + if aqi_summary: + avg_aqi, max_aqi, min_aqi = aqi_summary + message = construct_message(avg_aqi, max_aqi, min_aqi) + send_message(token=telegram_token, chat_id=telegram_chat_id, msg=message) + return data + + +@test +def test_output(output, *args) -> None: + """ + Template code for testing the output of the block. 
+ """ + assert output is not None, 'The output is undefined' diff --git a/etl-pipeline/custom/construct_send_twitter_message.py b/etl-pipeline/custom/construct_send_twitter_message.py new file mode 100644 index 0000000..7d96092 --- /dev/null +++ b/etl-pipeline/custom/construct_send_twitter_message.py @@ -0,0 +1,68 @@ +import tweepy +from mage_ai.data_preparation.shared.secrets import get_secret_value + +if 'custom' not in globals(): + from mage_ai.data_preparation.decorators import custom +if 'test' not in globals(): + from mage_ai.data_preparation.decorators import test + +AQI_LEVELS = [ + (0, 50, "🟢 Bueno 😁"), # Good + (51, 100, "🟡 Moderado 🙂"), # Moderate + (101, 150, "🟠 Insalubre para grupos sensibles 😷"), # Unhealthy for sensitive groups + (151, 200, "🔴 Insalubre 😶‍🌫️"), # Unhealthy + (201, 300, "🟣 Muy insalubre 😰"), # Very Unhealthy + (301, 500, "⚫ Peligroso 💀") # Hazardous +] + +def get_aqi_label(aqi_value): + for min_val, max_val, label in AQI_LEVELS: + if min_val <= aqi_value <= max_val: + return label + return "Fuera del AQI 💀🤯" + +def get_latest_aqi_summary(df): + avg_value = int(df['forecast_avg'].mean()) + max_value = int(df['forecast_avg'].max()) + min_value = int(df['forecast_avg'].min()) + + return avg_value, max_value, min_value + +def construct_message(avg_aqi, max_aqi, min_aqi): + avg_label = get_aqi_label(avg_aqi) + max_label = get_aqi_label(max_aqi) + min_label = get_aqi_label(min_aqi) + + message = ( + f"📊 Calidad del Aire para Gran Asunción - Próximas 12 hs\n" + f"🔹 AQI Promedio: {avg_aqi} ({avg_label})\n" + f"🔺 AQI Máximo: {max_aqi} ({max_label})\n" + f"🔻 AQI Mínimo: {min_aqi} ({min_label})\n\n" + "🔗 ¡Pronto podrás acceder al pronóstico en tu zona! 
-- Docs: https://docs.mage.ai/guides/sql-blocks
-- Load the per-station 12-hour forecasts belonging to the most recent
-- inference run whose run_date falls in a window around the pipeline's
-- execution date (2 hours before to 1 hour after), tolerating scheduler
-- jitter. At most one run id qualifies (LIMIT 1), so the join yields
-- exactly the rows of that run, or no rows when no run matches.
SELECT res.inference_run, res.station, res.forecasts_12h
FROM inference_results AS res
JOIN (
    SELECT id
    FROM inference_runs
    WHERE run_date BETWEEN ('{{ execution_date }}'::timestamp - INTERVAL '2 hours')
                       AND ('{{ execution_date }}'::timestamp + INTERVAL '1 hours')
    ORDER BY run_date DESC
    LIMIT 1
) AS latest_run ON latest_run.id = res.inference_run;
export_write_policy: append + file_path: data_loaders/get_latest_forecast_12h.sql + file_source: + path: data_loaders/get_latest_forecast_12h.sql + limit: 1000 + use_raw_sql: true + downstream_blocks: + - calculate_regional_forecast + executor_config: null + executor_type: local_python + has_callback: false + language: sql + name: get_latest_forecast_12h + retry_config: null + status: executed + timeout: null + type: data_loader + upstream_blocks: [] + uuid: get_latest_forecast_12h +- all_upstream_blocks_executed: true + color: null + configuration: + data_provider: postgres + data_provider_profile: default + export_write_policy: append + downstream_blocks: + - construct_send_telegram_message + executor_config: null + executor_type: local_python + has_callback: false + language: python + name: calculate_regional_forecast + retry_config: null + status: updated + timeout: null + type: transformer + upstream_blocks: + - get_latest_forecast_12h + uuid: calculate_regional_forecast +- all_upstream_blocks_executed: false + color: null + configuration: {} + downstream_blocks: [] + executor_config: null + executor_type: local_python + has_callback: false + language: python + name: construct_send_telegram_message + retry_config: null + status: executed + timeout: null + type: custom + upstream_blocks: + - calculate_regional_forecast + uuid: construct_send_telegram_message cache_block_output_in_memory: false callbacks: [] concurrency_config: {} @@ -22,5 +84,5 @@ tags: - bots type: python uuid: telegram_bot -variables_dir: /app/.mage_data/ +variables_dir: /app/.mage_data widgets: [] diff --git a/etl-pipeline/pipelines/twitter_bot/metadata.yaml b/etl-pipeline/pipelines/twitter_bot/metadata.yaml index 96c4134..5b5b945 100755 --- a/etl-pipeline/pipelines/twitter_bot/metadata.yaml +++ b/etl-pipeline/pipelines/twitter_bot/metadata.yaml @@ -1,4 +1,66 @@ -blocks: [] +blocks: +- all_upstream_blocks_executed: true + color: null + configuration: + data_provider: postgres + 
data_provider_profile: default + dbt: {} + disable_query_preprocessing: false + export_write_policy: append + file_path: data_loaders/get_latest_forecast_12h.sql + file_source: + path: data_loaders/get_latest_forecast_12h.sql + limit: 1000 + use_raw_sql: true + downstream_blocks: + - calculate_regional_forecast + executor_config: null + executor_type: local_python + has_callback: false + language: sql + name: get_latest_forecast_12h + retry_config: null + status: executed + timeout: null + type: data_loader + upstream_blocks: [] + uuid: get_latest_forecast_12h +- all_upstream_blocks_executed: true + color: null + configuration: + file_path: transformers/calculate_regional_forecast.py + file_source: + path: transformers/calculate_regional_forecast.py + downstream_blocks: + - construct_send_twitter_message + executor_config: null + executor_type: local_python + has_callback: false + language: python + name: calculate_regional_forecast + retry_config: null + status: executed + timeout: null + type: transformer + upstream_blocks: + - get_latest_forecast_12h + uuid: calculate_regional_forecast +- all_upstream_blocks_executed: true + color: null + configuration: {} + downstream_blocks: [] + executor_config: null + executor_type: local_python + has_callback: false + language: python + name: construct_send_twitter_message + retry_config: null + status: updated + timeout: null + type: custom + upstream_blocks: + - calculate_regional_forecast + uuid: construct_send_twitter_message cache_block_output_in_memory: false callbacks: [] concurrency_config: {} @@ -22,5 +84,5 @@ tags: - bots type: python uuid: twitter_bot -variables_dir: /app/.mage_data/ +variables_dir: /app/.mage_data widgets: [] diff --git a/etl-pipeline/transformers/calculate_regional_forecast.py b/etl-pipeline/transformers/calculate_regional_forecast.py new file mode 100644 index 0000000..4770222 --- /dev/null +++ b/etl-pipeline/transformers/calculate_regional_forecast.py @@ -0,0 +1,34 @@ +import pandas as pd +if 
'transformer' not in globals(): + from mage_ai.data_preparation.decorators import transformer +if 'test' not in globals(): + from mage_ai.data_preparation.decorators import test + + +@transformer +def transform(data, *args, **kwargs): + klogger = kwargs.get('logger') + try: + if data.empty: + klogger.exception('No available forecasts') + df_exploded = data.explode('forecasts_12h').reset_index(drop=True) + df_exploded[['timestamp', 'value']] = pd.json_normalize(df_exploded['forecasts_12h']) + df_transformed = df_exploded.drop(columns=['forecasts_12h']) + df_transformed['timestamp'] = pd.to_datetime(df_transformed['timestamp'], utc=True) + + df_grouped = ( + df_transformed.groupby(['timestamp'], as_index=False)['value'] + .mean().round() + .rename(columns={'value': 'forecast_avg'}) + ) + return df_grouped + except Exception as e: + klogger.exception(f'An error occurred: {e}') + + +@test +def test_output(output, *args) -> None: + """ + Template code for testing the output of the block. + """ + assert output is not None, 'The output is undefined' \ No newline at end of file