Abstract
For a quick setup and to start learning Apache Airflow, we will deploy Airflow using docker-compose running on an AWS EC2 instance.
Introduction
The docker-compose.yaml contains several service definitions:
- airflow-scheduler - The scheduler monitors all tasks and DAGs, then triggers the task instances once their dependencies are complete.
- airflow-webserver - The webserver is available at http://localhost:8080.
- airflow-worker - The worker that executes the tasks given by the scheduler.
- airflow-init - The initialization service.
- flower - The flower app for monitoring the environment. It is available at http://localhost:5555.
- postgres - The database.
- redis - The redis broker that forwards messages from scheduler to worker.
Some directories in the container are mounted, which means that their contents are synchronized between the services and persisted.
- ./dags - you can put your DAG files here.
- ./logs - contains logs from task execution and scheduler.
- ./plugins - you can put your custom plugins here.
Additional PIP requirements
The airflow image contains most of the PIP packages needed to operate, but we still need to install extra packages such as clickhouse-driver, pandahouse, and apache-airflow-providers-slack. From version 2.1.1, Airflow supports the _PIP_ADDITIONAL_REQUIREMENTS environment variable to add additional requirements when starting all containers:
_PIP_ADDITIONAL_REQUIREMENTS: 'pandahouse==0.2.7 clickhouse-driver==0.2.1 apache-airflow-providers-slack'
- It's not recommended to use this approach in production; instead, we should build our own image containing all the necessary pip packages and push it to AWS ECR (see the push sketch after the build example below).
How to build customised airflow image
Build your own image.
requirements.txt
pandahouse
clickhouse-driver
Dockerfile
FROM apache/airflow:2.1.2-python3.9
COPY requirements.txt .
RUN pip install -r requirements.txt
docker build -t my-airflow .
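If you then want to push this image to AWS ECR as mentioned above, a rough sketch of the flow looks like this (the account ID 123456789012, region, and repository name are placeholders, not real values):
# create the ECR repository once, if it doesn't exist yet
aws ecr create-repository --repository-name my-airflow
# authenticate Docker to the registry (account ID and region are placeholders)
aws ecr get-login-password --region ap-southeast-1 | docker login --username AWS --password-stdin 123456789012.dkr.ecr.ap-southeast-1.amazonaws.com
# tag and push the image built above
docker tag my-airflow:latest 123456789012.dkr.ecr.ap-southeast-1.amazonaws.com/my-airflow:latest
docker push 123456789012.dkr.ecr.ap-southeast-1.amazonaws.com/my-airflow:latest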
Persistent airflow logs, dags, and plugins
Not only persist these folders but also share them between the scheduler, worker, and webserver:
volumes:
- /mnt/airflow/dags:/opt/airflow/dags
- /mnt/airflow/logs:/opt/airflow/logs
- /mnt/airflow/plugins:/opt/airflow/plugins
- /mnt/airflow/data:/opt/airflow/data
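A small sketch of preparing the host side of these bind mounts on the EC2 instance (it assumes the containers run as the AIRFLOW_UID written to .env in the run steps below):
# the bind-mount source directories must exist on the host before the stack starts
sudo mkdir -p /mnt/airflow/dags /mnt/airflow/logs /mnt/airflow/plugins /mnt/airflow/data
# make them writable by the UID the containers run as (AIRFLOW_UID, group 0)
sudo chown -R "$(id -u)":0 /mnt/airflow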
Using git-sync to keep DAGs up to date
The git-sync service polls the registered repository every GIT_SYNC_WAIT seconds (60s in the config below) and syncs new commits into /dags.
We use the HTTPS method with a username and access token (GIT_SYNC_USERNAME / GIT_SYNC_PASSWORD) to give the container permission to clone the repository.
af-gitsync:
container_name: af-gitsync
image: k8s.gcr.io/git-sync/git-sync:v3.2.2
environment:
- GIT_SYNC_REV=HEAD
- GIT_SYNC_DEPTH=1
- GIT_SYNC_USERNAME=airflow
- GIT_SYNC_MAX_FAILURES=0
- GIT_KNOWN_HOSTS=false
- GIT_SYNC_DEST=repo
- GIT_SYNC_REPO=https://cloudopz.co/devops/airflow-dags.git
- GIT_SYNC_WAIT=60
- GIT_SYNC_TIMEOUT=120
- GIT_SYNC_ADD_USER=true
- GIT_SYNC_PASSWORD=
- GIT_SYNC_ROOT=/dags
- GIT_SYNC_BRANCH=master
volumes:
- /mnt/airflow/dags:/dags
How to run
- Initialize the environment
mkdir ./dags ./logs ./plugins
echo -e "AIRFLOW_UID=$(id -u)\nAIRFLOW_GID=0" > .env
- Prepare docker-compose.yaml
- Run airflow-init to set up the airflow database
docker-compose up airflow-init
- Run airflow
docker-compose up -d
- Read more about Extension fields to understand the docker-compose.yaml contents (see the sketch below).
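For orientation, the official compose file defines a shared x-airflow-common extension field that each service merges in via a YAML anchor. The fragment below is an illustrative sketch only (not the full file), with this setup's volumes and the _PIP_ADDITIONAL_REQUIREMENTS variable slotted in:
# illustrative fragment, not the complete docker-compose.yaml
x-airflow-common: &airflow-common
  image: apache/airflow:2.1.2-python3.9   # or the customised image built above
  environment: &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
  volumes:
    - /mnt/airflow/dags:/opt/airflow/dags
    - /mnt/airflow/logs:/opt/airflow/logs
    - /mnt/airflow/plugins:/opt/airflow/plugins

services:
  airflow-scheduler:
    <<: *airflow-common          # the extension field is merged into each service
    command: scheduler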
Add airflow connectors
- Add slack connection
airflow connections add 'airflow-slack' \
--conn-type 'http' \
--conn-password '/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX' \
--conn-host 'https://hooks.slack.com/services'
- We can add a connection in the UI
- We can add a connection through the env file (e.g. .env) used in docker-compose:
AIRFLOW_CONN_MY_POSTGRES_CONN: 'postgresql://airflow:airflow@postgres-pg:5432/airflow'
AIRFLOW_CONN_AIRFLOW_SLACK_CONN: 'https://hooks.slack.com/services/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX'
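Connections defined this way are resolved by connection id at runtime and will not appear in the metadata database or the UI list. A minimal sketch of consuming the postgres connection defined above (the conn id matches the AIRFLOW_CONN_ suffix, case-insensitively):
# minimal sketch: AIRFLOW_CONN_MY_POSTGRES_CONN is exposed under the conn id "my_postgres_conn"
from airflow.hooks.base import BaseHook

conn = BaseHook.get_connection("my_postgres_conn")
print(conn.host, conn.schema, conn.login)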
Understand airflow parameters in airflow.models
- on_failure_callback (TaskStateChangeCallback) - a function to be called when a task instance of this task fails. A context dictionary is passed as a single parameter to this function. Context contains references to related objects to the task instance and is documented under the macros section of the API.
- on_execute_callback (TaskStateChangeCallback) - much like the on_failure_callback except that it is executed right before the task is executed.
- on_retry_callback (TaskStateChangeCallback) - much like the on_failure_callback except that it is executed when retries occur.
- on_success_callback (TaskStateChangeCallback) - much like the on_failure_callback except that it is executed when the task succeeds.
- trigger_rule (str) - defines the rule by which dependencies are applied for the task to get triggered. Options are: { all_success | all_failed | all_done | one_success | one_failed | none_failed | none_failed_or_skipped | none_skipped | dummy }, default is all_success. Options can be set as string or using the constants defined in the static class airflow.utils.TriggerRule.
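A minimal sketch of wiring these parameters into tasks (DAG and task names below are illustrative; the failure callback could just as well post to the airflow-slack connection added earlier):
# illustrative only: a failure callback plus an explicit trigger_rule
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.trigger_rule import TriggerRule


def notify_failure(context):
    # Airflow passes the context dictionary as the single argument
    ti = context["task_instance"]
    print(f"Task {ti.task_id} in DAG {ti.dag_id} failed at {context['ts']}")


with DAG("callback_demo", start_date=datetime(2021, 7, 1), schedule_interval=None) as dag:
    extract = PythonOperator(
        task_id="extract",
        python_callable=lambda: print("extracting"),
        on_failure_callback=notify_failure,
    )
    cleanup = PythonOperator(
        task_id="cleanup",
        python_callable=lambda: print("cleaning up"),
        trigger_rule=TriggerRule.ALL_DONE,  # runs whether "extract" succeeded or failed
    )
    extract >> cleanup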