telegram and twitter bots
vnbl committed Oct 20, 2024
1 parent 897d0ee commit 7d8675e
Showing 6 changed files with 326 additions and 4 deletions.
84 changes: 84 additions & 0 deletions etl-pipeline/custom/construct_send_telegram_message.py
@@ -0,0 +1,84 @@
import telebot
from mage_ai.data_preparation.shared.secrets import get_secret_value
if 'custom' not in globals():
from mage_ai.data_preparation.decorators import custom
if 'test' not in globals():
from mage_ai.data_preparation.decorators import test

AQI_LEVELS = [
(0, 50, "Bueno 😁"), # Good
(51, 100, "Moderado 🙂"), # Moderate
(101, 150, "Insalubre para grupos sensibles 😷"), # Unhealthy for sensitive groups
(151, 200, "Insalubre 😶‍🌫️"), # Unhealthy
(201, 300, "Muy insalubre 😰"), # Very Unhealthy
(301, 500, "Peligroso 💀") # Hazardous
]

AQI_MESSAGES = [
    (0, 50, "🟢 ¡Hoy es un excelente día para estar al aire libre! 🏃‍♂️🍃"), # Good
    (51, 100, "🟡 Hoy es un buen día para estar al aire libre para la mayoría de las personas. 👍🏻"), # Moderate
    (101, 150, "🟠 Las personas sensibles deben prestar atención a síntomas 😷"), # Unhealthy for sensitive groups
    (151, 200, "🔴 Hoy es un mal día para estar al aire libre.\n Se recomienda el uso de tapabocas para la población general💨😷"), # Unhealthy
    (201, 300, "🟣 Hoy es un muy mal día para estar al aire libre. \n Se recomienda el uso de tapabocas para la población general🔥😷"), # Very Unhealthy
    (301, 500, "⚫ Evite la exposición al aire libre. 💀😷\nAnte la aparición de síntomas, consulte a su médico.") # Hazardous
]

def get_aqi_label(aqi_value):
"""Returns the AQI level label for a given AQI value."""
for min_val, max_val, label in AQI_LEVELS:
if min_val <= aqi_value <= max_val:
return label
return "Fuera del AQI 💀🤯"

def get_aqi_message(aqi_value):
    """Returns the recommendation message for a given AQI value."""
for min_val, max_val, message in AQI_MESSAGES:
if min_val <= aqi_value <= max_val:
return message
return " "

def get_latest_aqi_summary(df):
    """Returns the (average, maximum, minimum) of the forecast_avg column as ints."""
avg_value = int(df['forecast_avg'].mean())
max_value = int(df['forecast_avg'].max())
min_value = int(df['forecast_avg'].min())

return avg_value, max_value, min_value

def construct_message(avg_aqi, max_aqi, min_aqi):
avg_label = get_aqi_label(avg_aqi)
max_label = get_aqi_label(max_aqi)
min_label = get_aqi_label(min_aqi)
aqi_message = get_aqi_message(avg_aqi)

message = (
f"📊 **Reporte de Calidad del Aire para Gran Asunción - Pronóstico de 12 horas**\n"
f"🔹 **AQI Promedio:** {avg_aqi} ({avg_label})\n"
f"🔺 **AQI Máximo:** {max_aqi} ({max_label})\n"
f"🔻 **AQI Mínimo:** {min_aqi} ({min_label})\n\n"
f"{aqi_message}\n\n"
"🔗 Podés acceder al pronóstico actualizado en tu zona ingresando a www.proyectorespira.net"
)
return message

def send_message(token, chat_id, msg=''):
bot = telebot.TeleBot(token)
return bot.send_message(chat_id=chat_id, text=msg, parse_mode='Markdown')

@custom
def transform_custom(data, *args, **kwargs):
telegram_token = get_secret_value('TELEGRAM_BOT_TOKEN')
telegram_chat_id = get_secret_value('TELEGRAM_CHAT_ID')
aqi_summary = get_latest_aqi_summary(data)
if aqi_summary:
avg_aqi, max_aqi, min_aqi = aqi_summary
message = construct_message(avg_aqi, max_aqi, min_aqi)
send_message(token=telegram_token, chat_id=telegram_chat_id, msg=message)
return data


@test
def test_output(output, *args) -> None:
"""
Template code for testing the output of the block.
"""
assert output is not None, 'The output is undefined'
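
A quick local sanity check of the summary logic — the hourly values below are made up and only stand in for the calculate_regional_forecast output that this block actually receives:

import pandas as pd

# Hypothetical 12-hour regional forecast, one averaged AQI value per hour.
df = pd.DataFrame({'forecast_avg': [42, 55, 61, 58, 49, 47, 52, 60, 63, 57, 50, 45]})

avg_value = int(df['forecast_avg'].mean())   # 53 -> "Moderado 🙂"
max_value = int(df['forecast_avg'].max())    # 63 -> "Moderado 🙂"
min_value = int(df['forecast_avg'].min())    # 42 -> "Bueno 😁"
print(avg_value, max_value, min_value)       # 53 63 42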
68 changes: 68 additions & 0 deletions etl-pipeline/custom/construct_send_twitter_message.py
@@ -0,0 +1,68 @@
import tweepy
from mage_ai.data_preparation.shared.secrets import get_secret_value

if 'custom' not in globals():
from mage_ai.data_preparation.decorators import custom
if 'test' not in globals():
from mage_ai.data_preparation.decorators import test

AQI_LEVELS = [
(0, 50, "🟢 Bueno 😁"), # Good
(51, 100, "🟡 Moderado 🙂"), # Moderate
(101, 150, "🟠 Insalubre para grupos sensibles 😷"), # Unhealthy for sensitive groups
(151, 200, "🔴 Insalubre 😶‍🌫️"), # Unhealthy
(201, 300, "🟣 Muy insalubre 😰"), # Very Unhealthy
(301, 500, "⚫ Peligroso 💀") # Hazardous
]

def get_aqi_label(aqi_value):
for min_val, max_val, label in AQI_LEVELS:
if min_val <= aqi_value <= max_val:
return label
return "Fuera del AQI 💀🤯"

def get_latest_aqi_summary(df):
avg_value = int(df['forecast_avg'].mean())
max_value = int(df['forecast_avg'].max())
min_value = int(df['forecast_avg'].min())

return avg_value, max_value, min_value

def construct_message(avg_aqi, max_aqi, min_aqi):
avg_label = get_aqi_label(avg_aqi)
max_label = get_aqi_label(max_aqi)
min_label = get_aqi_label(min_aqi)

message = (
f"📊 Calidad del Aire para Gran Asunción - Próximas 12 hs\n"
f"🔹 AQI Promedio: {avg_aqi} ({avg_label})\n"
f"🔺 AQI Máximo: {max_aqi} ({max_label})\n"
f"🔻 AQI Mínimo: {min_aqi} ({min_label})\n\n"
"🔗 ¡Pronto podrás acceder al pronóstico en tu zona! www.proyectorespira.net"
)
return message

def send_message(msg=''):
consumer_key = get_secret_value('TWITTER_CONSUMER_KEY')
consumer_secret = get_secret_value('TWITTER_CONSUMER_SECRET')
access_token = get_secret_value('TWITTER_ACCESS_TOKEN')
access_token_secret = get_secret_value('TWITTER_ACCESS_TOKEN_SECRET')

client = tweepy.Client(consumer_key=consumer_key,
consumer_secret=consumer_secret,
access_token=access_token,
access_token_secret=access_token_secret)
    return client.create_tweet(text=msg)

@custom
def transform_custom(data, *args, **kwargs):
    aqi_summary = get_latest_aqi_summary(data)
if aqi_summary:
avg_aqi, max_aqi, min_aqi = aqi_summary
message = construct_message(avg_aqi, max_aqi, min_aqi)
send_message(msg=message)
return data
12 changes: 12 additions & 0 deletions etl-pipeline/data_loaders/get_latest_forecast_12h.sql
@@ -0,0 +1,12 @@
-- Docs: https://docs.mage.ai/guides/sql-blocks
WITH inference_run_id AS (
SELECT id
FROM inference_runs
WHERE run_date BETWEEN ('{{ execution_date }}'::timestamp - INTERVAL '2 hours')
    AND ('{{ execution_date }}'::timestamp + INTERVAL '1 hour')
ORDER BY run_date DESC
LIMIT 1
)
SELECT inference_run, station, forecasts_12h
FROM inference_results
WHERE inference_run = (SELECT id FROM inference_run_id);
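
For intuition, a small Python sketch of the same selection logic — the datetimes and run ids below are invented; in the pipeline the window is built from the '{{ execution_date }}' variable and run_date comes from the inference_runs table:

from datetime import datetime, timedelta

execution_date = datetime(2024, 10, 20, 12, 0)        # hypothetical pipeline run time
window_start = execution_date - timedelta(hours=2)    # 10:00
window_end = execution_date + timedelta(hours=1)      # 13:00

# Hypothetical (id, run_date) rows from inference_runs.
runs = [(1, datetime(2024, 10, 20, 9, 30)),
        (2, datetime(2024, 10, 20, 10, 45)),
        (3, datetime(2024, 10, 20, 11, 55))]

in_window = [r for r in runs if window_start <= r[1] <= window_end]
latest_id = max(in_window, key=lambda r: r[1])[0]     # 3, like ORDER BY run_date DESC LIMIT 1
print(latest_id)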
66 changes: 64 additions & 2 deletions etl-pipeline/pipelines/telegram_bot/metadata.yaml
@@ -1,4 +1,66 @@
blocks: []
blocks:
- all_upstream_blocks_executed: true
color: null
configuration:
data_provider: postgres
data_provider_profile: default
dbt: {}
disable_query_preprocessing: false
export_write_policy: append
file_path: data_loaders/get_latest_forecast_12h.sql
file_source:
path: data_loaders/get_latest_forecast_12h.sql
limit: 1000
use_raw_sql: true
downstream_blocks:
- calculate_regional_forecast
executor_config: null
executor_type: local_python
has_callback: false
language: sql
name: get_latest_forecast_12h
retry_config: null
status: executed
timeout: null
type: data_loader
upstream_blocks: []
uuid: get_latest_forecast_12h
- all_upstream_blocks_executed: true
color: null
configuration:
data_provider: postgres
data_provider_profile: default
export_write_policy: append
downstream_blocks:
- construct_send_telegram_message
executor_config: null
executor_type: local_python
has_callback: false
language: python
name: calculate_regional_forecast
retry_config: null
status: updated
timeout: null
type: transformer
upstream_blocks:
- get_latest_forecast_12h
uuid: calculate_regional_forecast
- all_upstream_blocks_executed: false
color: null
configuration: {}
downstream_blocks: []
executor_config: null
executor_type: local_python
has_callback: false
language: python
name: construct_send_telegram_message
retry_config: null
status: executed
timeout: null
type: custom
upstream_blocks:
- calculate_regional_forecast
uuid: construct_send_telegram_message
cache_block_output_in_memory: false
callbacks: []
concurrency_config: {}
@@ -22,5 +84,5 @@ tags:
- bots
type: python
uuid: telegram_bot
variables_dir: /app/.mage_data/
variables_dir: /app/.mage_data
widgets: []
66 changes: 64 additions & 2 deletions etl-pipeline/pipelines/twitter_bot/metadata.yaml
@@ -1,4 +1,66 @@
blocks: []
blocks:
- all_upstream_blocks_executed: true
color: null
configuration:
data_provider: postgres
data_provider_profile: default
dbt: {}
disable_query_preprocessing: false
export_write_policy: append
file_path: data_loaders/get_latest_forecast_12h.sql
file_source:
path: data_loaders/get_latest_forecast_12h.sql
limit: 1000
use_raw_sql: true
downstream_blocks:
- calculate_regional_forecast
executor_config: null
executor_type: local_python
has_callback: false
language: sql
name: get_latest_forecast_12h
retry_config: null
status: executed
timeout: null
type: data_loader
upstream_blocks: []
uuid: get_latest_forecast_12h
- all_upstream_blocks_executed: true
color: null
configuration:
file_path: transformers/calculate_regional_forecast.py
file_source:
path: transformers/calculate_regional_forecast.py
downstream_blocks:
- construct_send_twitter_message
executor_config: null
executor_type: local_python
has_callback: false
language: python
name: calculate_regional_forecast
retry_config: null
status: executed
timeout: null
type: transformer
upstream_blocks:
- get_latest_forecast_12h
uuid: calculate_regional_forecast
- all_upstream_blocks_executed: true
color: null
configuration: {}
downstream_blocks: []
executor_config: null
executor_type: local_python
has_callback: false
language: python
name: construct_send_twitter_message
retry_config: null
status: updated
timeout: null
type: custom
upstream_blocks:
- calculate_regional_forecast
uuid: construct_send_twitter_message
cache_block_output_in_memory: false
callbacks: []
concurrency_config: {}
@@ -22,5 +84,5 @@ tags:
- bots
type: python
uuid: twitter_bot
variables_dir: /app/.mage_data/
variables_dir: /app/.mage_data
widgets: []
34 changes: 34 additions & 0 deletions etl-pipeline/transformers/calculate_regional_forecast.py
@@ -0,0 +1,34 @@
import pandas as pd
if 'transformer' not in globals():
from mage_ai.data_preparation.decorators import transformer
if 'test' not in globals():
from mage_ai.data_preparation.decorators import test


@transformer
def transform(data, *args, **kwargs):
klogger = kwargs.get('logger')
try:
        if data.empty:
            klogger.warning('No available forecasts')
            return data
df_exploded = data.explode('forecasts_12h').reset_index(drop=True)
df_exploded[['timestamp', 'value']] = pd.json_normalize(df_exploded['forecasts_12h'])
df_transformed = df_exploded.drop(columns=['forecasts_12h'])
df_transformed['timestamp'] = pd.to_datetime(df_transformed['timestamp'], utc=True)

df_grouped = (
df_transformed.groupby(['timestamp'], as_index=False)['value']
.mean().round()
.rename(columns={'value': 'forecast_avg'})
)
return df_grouped
except Exception as e:
klogger.exception(f'An error occurred: {e}')


@test
def test_output(output, *args) -> None:
"""
Template code for testing the output of the block.
"""
assert output is not None, 'The output is undefined'
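
To see what the explode/normalize step produces, here is a self-contained sketch with two invented stations and two forecast points each — the 'timestamp'/'value' keys are assumed to match what the inference step writes into forecasts_12h:

import pandas as pd

data = pd.DataFrame({
    'inference_run': [1, 1],
    'station': ['A', 'B'],
    'forecasts_12h': [
        [{'timestamp': '2024-10-20T13:00:00Z', 'value': 48.0},
         {'timestamp': '2024-10-20T14:00:00Z', 'value': 52.0}],
        [{'timestamp': '2024-10-20T13:00:00Z', 'value': 56.0},
         {'timestamp': '2024-10-20T14:00:00Z', 'value': 60.0}],
    ],
})

df_exploded = data.explode('forecasts_12h').reset_index(drop=True)
df_exploded[['timestamp', 'value']] = pd.json_normalize(df_exploded['forecasts_12h'])
df_exploded['timestamp'] = pd.to_datetime(df_exploded['timestamp'], utc=True)

# Average across stations per timestamp -> one regional value per hour.
df_grouped = (
    df_exploded.groupby('timestamp', as_index=False)['value']
    .mean().round()
    .rename(columns={'value': 'forecast_avg'})
)
print(df_grouped)   # 13:00 -> 52.0, 14:00 -> 56.0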
