Commit

Python code for chap 9: learning how to test an Airflow DAG
ssupecial committed Oct 12, 2024
1 parent 1690a73 commit 612c19e
Showing 5 changed files with 284 additions and 0 deletions.
11 changes: 11 additions & 0 deletions airflow/chap9/dags/bash_command.py
@@ -0,0 +1,11 @@
from airflow import DAG
from airflow.operators.bash import BashOperator
import datetime as dt

with DAG(
    dag_id="bash_command",
    start_date=dt.datetime(2024, 10, 11),
    schedule_interval="@once",
):
    BashOperator(
        task_id="this_should_fail"  # bash_command is deliberately omitted, so this file should fail the DAG integrity test
    )
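
For contrast, a BashOperator that does supply the required bash_command argument imports cleanly and would pass the integrity test added later in this commit. A minimal sketch; the dag_id, task_id, and echo command are illustrative and not part of this commit:

from airflow import DAG
from airflow.operators.bash import BashOperator
import datetime as dt

with DAG(
    dag_id="bash_command_ok",  # illustrative id, not in this commit
    start_date=dt.datetime(2024, 10, 11),
    schedule_interval="@once",
):
    BashOperator(
        task_id="this_should_pass",
        bash_command="echo hello",  # supplying the required argument lets the file import
    )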
14 changes: 14 additions & 0 deletions airflow/chap9/dags/cycle_dag.py
@@ -0,0 +1,14 @@
from airflow.operators.dummy import DummyOperator
from airflow import DAG
import airflow.utils.dates

with DAG(
    dag_id="cycle_dag",
    start_date=airflow.utils.dates.days_ago(1),
    schedule_interval="@once",
) as dag:
    t1 = DummyOperator(task_id="t1")
    t2 = DummyOperator(task_id="t2")
    t3 = DummyOperator(task_id="t3")

    # Intentional cycle: the integrity test's check_cycle call should reject this DAG
    t1 >> t2 >> t3 >> t1
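
As a rough illustration of what the integrity test at the end of this commit does with this file, check_cycle raises AirflowDagCycleException once it walks the loop back to t1. A sketch, assuming Airflow is installed and cycle_dag.py is importable (for example, run from the dags directory); it is not part of the commit:

from airflow.exceptions import AirflowDagCycleException
from airflow.utils.dag_cycle_tester import check_cycle

from cycle_dag import dag  # the DAG object defined above

try:
    check_cycle(dag)
except AirflowDagCycleException as exc:
    print(f"Cycle detected: {exc}")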
212 changes: 212 additions & 0 deletions airflow/chap9/docker-compose.yml
@@ -0,0 +1,212 @@
x-airflow-common:
  &airflow-common
  build:
    context: ./docker/airflow
    dockerfile: Dockerfile
    args:
      AIRFLOW_UID: ${AIRFLOW_UID}
  environment:
    &airflow-environment
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
    AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session'
    AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'false'
    AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
    AIRFLOW__CORE__STORE_DAG_CODE: "true"
    AIRFLOW__CORE__STORE_SERIALIZED_DAGS: "true"
    AIRFLOW__WEBSERVER__EXPOSE_CONFIG: "true"
    AIRFLOW_CONN_MY_POSTGRES: postgresql://airflow:airflow@wiki_results:5432/airflow
    AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK: "true"
    AIRFLOW_CONFIG: '/opt/airflow/config/airflow.cfg'
    _PIP_ADDITIONAL_REQUIREMENTS: apache-airflow-providers-postgres apache-airflow-providers-docker pytest
  volumes:
    &airflow-volumes
    - ${AIRFLOW_PROJ_DIR:-.}/dags/:/opt/airflow/dags
    - ${AIRFLOW_PROJ_DIR:-.}/config/:/opt/airflow/config
    - ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs
    - ${AIRFLOW_PROJ_DIR:-.}/plugins:/opt/airflow/plugins
    - ${AIRFLOW_PROJ_DIR:-.}/tests:/opt/airflow/tests
    - ${AIRFLOW_PROJ_DIR:-.}/mypackage:/opt/airflow/mypackage
  user: "${AIRFLOW_UID}:0"
  depends_on:
    &airflow-common-depends-on
    redis:
      condition: service_healthy
    postgres:
      condition: service_healthy
  networks:
    &airflow-networks
    - airflow

services:
  postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - postgres-db-volume:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "airflow"]
      interval: 10s
      retries: 5
      start_period: 5s
    restart: always
    networks: *airflow-networks

  redis:
    image: redis:7.2-bookworm
    expose:
      - 6379
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 30s
      retries: 50
      start_period: 30s
    restart: always
    networks: *airflow-networks

  init:
    <<: *airflow-common
    entrypoint: /bin/bash
    command:
      - -c
      - |
        if [[ -z "${AIRFLOW_UID}" ]]; then
          echo
          echo -e "\033[1;33mWARNING!!!: AIRFLOW_UID not set!\e[0m"
          echo "If you are on Linux, you SHOULD follow the instructions below to set "
          echo "AIRFLOW_UID environment variable, otherwise files will be owned by root."
          echo "For other operating systems you can get rid of the warning with manually created .env file:"
          echo "    See: https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#setting-the-right-airflow-user"
          echo
        fi
        one_meg=1048576
        mem_available=$$(($$(getconf _PHYS_PAGES) * $$(getconf PAGE_SIZE) / one_meg))
        cpus_available=$$(grep -cE 'cpu[0-9]+' /proc/stat)
        disk_available=$$(df / | tail -1 | awk '{print $$4}')
        warning_resources="false"
        if (( mem_available < 4000 )) ; then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough memory available for Docker.\e[0m"
          echo "At least 4GB of memory required. You have $$(numfmt --to iec $$((mem_available * one_meg)))"
          echo
          warning_resources="true"
        fi
        if (( cpus_available < 2 )); then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough CPUS available for Docker.\e[0m"
          echo "At least 2 CPUs recommended. You have $${cpus_available}"
          echo
          warning_resources="true"
        fi
        if (( disk_available < one_meg * 10 )); then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough Disk space available for Docker.\e[0m"
          echo "At least 10 GBs recommended. You have $$(numfmt --to iec $$((disk_available * 1024 )))"
          echo
          warning_resources="true"
        fi
        if [[ $${warning_resources} == "true" ]]; then
          echo
          echo -e "\033[1;33mWARNING!!!: You have not enough resources to run Airflow (see above)!\e[0m"
          echo "Please follow the instructions to increase amount of resources available:"
          echo "   https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#before-you-begin"
          echo
        fi
        mkdir -p /sources/logs /sources/dags /sources/plugins
        chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins}
        exec /entrypoint airflow version
    environment:
      <<: *airflow-environment
      _AIRFLOW_DB_MIGRATE: 'true'
      _AIRFLOW_WWW_USER_CREATE: 'true'
      _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
      _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
      _PIP_ADDITIONAL_REQUIREMENTS: ''
    user: "0:0"
    volumes:
      - .:/sources

  scheduler:
    <<: *airflow-common
    command: scheduler
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8974/health"]
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      init:
        condition: service_completed_successfully

  triggerer:
    <<: *airflow-common
    command: triggerer
    healthcheck:
      test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"']
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      init:
        condition: service_completed_successfully

  worker:
    <<: *airflow-common
    command: celery worker
    healthcheck:
      # yamllint disable rule:line-length
      test:
        - "CMD-SHELL"
        - 'celery --app airflow.providers.celery.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}" || celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    environment:
      <<: *airflow-environment
      # Required to handle warm shutdown of the celery workers properly
      # See https://airflow.apache.org/docs/docker-stack/entrypoint.html#signal-propagation
      DUMB_INIT_SETSID: "0"
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      init:
        condition: service_completed_successfully

  webserver:
    <<: *airflow-common
    command: webserver
    ports:
      - 8080:8080
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      init:
        condition: service_completed_successfully

volumes:
  postgres-db-volume:

networks:
  airflow:
    name: airflow
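
The scheduler and webserver healthchecks above poll HTTP health endpoints. A quick way to inspect the webserver's /health payload from the host, assuming the stack is up and port 8080 is published as in the webserver service; this snippet is illustrative and not part of the commit:

import json
import urllib.request

with urllib.request.urlopen("http://localhost:8080/health") as resp:
    print(json.dumps(json.load(resp), indent=2))  # reports metadatabase and scheduler status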
20 changes: 20 additions & 0 deletions airflow/chap9/docker/airflow/Dockerfile
@@ -0,0 +1,20 @@
# Base image (Airflow)
FROM apache/airflow:2.10.2

# Build argument for the airflow user's UID (defaults to 50000 if not provided)
ARG AIRFLOW_UID=50000

# Set environment variables (if needed)
ENV AIRFLOW_HOME=/opt/airflow

USER root

# Remap the airflow user to the provided UID and keep it in the root group (GID 0)
RUN usermod -u $AIRFLOW_UID -g 0 airflow

# Create required directories with appropriate permissions
RUN mkdir -p /opt/airflow/logs /opt/airflow/dags /opt/airflow/config /opt/airflow/plugins && \
chown -R airflow: /opt/airflow/logs /opt/airflow/dags /opt/airflow/config /opt/airflow/plugins

# Switch back to the airflow user (now running with the remapped UID)
USER airflow
27 changes: 27 additions & 0 deletions airflow/chap9/tests/dags/test_dag_integrity.py
@@ -0,0 +1,27 @@
import glob
import importlib.util
import os

import pytest
from airflow.models import DAG
from airflow.utils.dag_cycle_tester import check_cycle

DAG_PATH = os.path.join(
    os.path.dirname(__file__), "..", "..", "dags/**/*.py"
)
DAG_FILES = glob.glob(DAG_PATH, recursive=True)


@pytest.mark.parametrize("dag_file", DAG_FILES)
def test_dag_integrity(dag_file):
    """Import every DAG file and check that it defines at least one DAG with no cycles."""
    module_name, _ = os.path.splitext(dag_file)
    # dag_file from glob is normally already an absolute path here, so the join resolves to it
    module_path = os.path.join(DAG_PATH, dag_file)
    mod_spec = importlib.util.spec_from_file_location(module_name, module_path)
    module = importlib.util.module_from_spec(mod_spec)
    mod_spec.loader.exec_module(module)

    # Collect every DAG object defined at module level
    dag_objects = [var for var in vars(module).values() if isinstance(var, DAG)]
    assert dag_objects

    for dag in dag_objects:
        # Test for cycles in the task graph
        check_cycle(dag)
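
With the compose file above mounting ./tests to /opt/airflow/tests and installing pytest via _PIP_ADDITIONAL_REQUIREMENTS, this test can be run with pytest inside any of the Airflow containers. A minimal sketch of invoking it programmatically instead of via the pytest CLI; the path assumes the repository layout in this commit:

import sys

import pytest

if __name__ == "__main__":
    # Equivalent to: pytest tests/dags/test_dag_integrity.py -v
    sys.exit(pytest.main(["tests/dags/test_dag_integrity.py", "-v"]))

Given the two deliberately broken DAG files in this commit, a run is expected to report failures for bash_command.py (missing bash_command) and cycle_dag.py (task cycle).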
