We have all been there: tens or hundreds of cron jobs running, and you (or your boss) are pulling your hair out trying to figure out why a critical job didn’t run last night. You’re puzzled and confused. Did it run at all? Did it fail? Why could it have failed?
At AMPATH, our reliance on cron to schedule ETL jobs was becoming increasingly untenable. Our ETL process first denormalizes and flattens data, then uses the denormalized data to build calculated tables. The calculated tables in turn feed real-time reports and decision support systems for the Ministry of Health and clinicians respectively. In the beginning, cron was a simple and effective way to execute these jobs.
The cron workflow was as follows:
- Run the denormalizing jobs at the second minute of every hour.
- Run the calculated-table jobs at the fifth minute, hoping the previous jobs would take no more than three minutes to complete (roughly the crontab sketch after this list).
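In crontab terms, the setup looked roughly like this (the script paths below are placeholders for illustration, not our actual scripts):

# minute 2 of every hour: denormalize and flatten the data
2 * * * * /opt/etl/run_denormalization.sh
# minute 5 of every hour: build the calculated tables
5 * * * * /opt/etl/build_calculated_tables.sh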
Over time, as data volumes and the demand for more reports grew, it became difficult to estimate how long a particular job would take, and more often than not we ran into lock wait timeouts and deadlocks. Cron had reached its limit. We needed a way to:
- Handle complex relationships between jobs.
- Manage all the jobs centrally through a well-defined user interface.
- Report and alert on errors.
- View and analyze job run times.
- Secure database credentials.
Enter Apache Airflow
While searching for ways to improve or replace our ETL workflow, I stumbled upon an open-source tool, Apache Airflow. Airflow’s Directed Acyclic Graph (DAG) concept offered a way to build and schedule complex, dynamic data pipelines in an extremely simple, testable and scalable way. And I freaked out! This was exactly what we had been looking for.
We quickly set up Airflow using docker-compose like this, and started transforming some of our main ETL jobs into Python code for Airflow. Here is the GitHub repo for the scripts.
Airflow Concepts
We are going to cover some of the basic concepts to get you started with Airflow. For more detailed documentation, head over to the official docs site here.
DAGs and Operators
In Airflow, all workflows are modelled as DAGs. You can think of a DAG as a set of tasks with some sort of relationship between them. This is what a DAG looks like in the Airflow Graph View:
A DAG usually has a schedule, a start time and a unique ID. The tasks inside a DAG are made of operators. Operators define what actually runs for a particular task. Examples of operators in Airflow include:
- BashOperator: executes shell commands/scripts.
- PythonOperator: executes Python code (a sketch follows the Hello World DAG below).
You can define a simple DAG that prints ‘Hello World!’ every 10 minutes like this:
from datetime import datetime

from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator

# arguments applied to every task in the DAG
default_args = {
    'owner': 'airflow',
    'email': ['fali@ampath.or.ke'],
    'email_on_failure': True,
    'email_on_retry': True,
    'email_on_success': False,
    'start_date': datetime(2019, 5, 31)
}

# schedule_interval uses cron syntax: run every 10 minutes
dag = DAG(
    dag_id='hello_world_dag',
    default_args=default_args,
    schedule_interval='*/10 * * * *')

task1 = BashOperator(
    task_id="echo_hello_world",
    bash_command='echo Hello World!',
    dag=dag)
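The PythonOperator works much the same way. Here is a minimal sketch (the say_hello function is just an illustration, not part of our pipeline):

from airflow.operators.python_operator import PythonOperator

# any Python callable can be wired into a task
def say_hello():
    print('Hello World from Python!')

python_task = PythonOperator(
    task_id='python_hello_world',
    python_callable=say_hello,
    dag=dag)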
Extending Operators
What’s even more exciting is that you can extend or create custom operators if none of the built-in operators matches your needs. Here’s an example of how I extended the built-in MySQL operator to return the results of a statement after execution:
from airflow.hooks.mysql_hook import MySqlHook
from airflow.operators.mysql_operator import MySqlOperator

class CustomMySqlOperator(MySqlOperator):
    def execute(self, context):
        self.log.info('Executing: %s', self.sql)
        hook = MySqlHook(
            mysql_conn_id=self.mysql_conn_id,
            schema=self.database)
        # return the query results instead of discarding them
        return hook.get_records(self.sql, parameters=self.parameters)

...
# Use the custom operator in one of your tasks
task = CustomMySqlOperator(
    task_id='custom_mysql_task',
    sql='select * from person;',
    mysql_conn_id='mysql_conn',
    database='etl',
    dag=dag)
Defining relationships between tasks
Often, you find that some of your tasks need to execute one after another. In Airflow you can define such relationships like this:
task1 = BashOperator(
    task_id="echo_first",
    bash_command='echo I will execute first!',
    dag=dag)

task2 = BashOperator(
    task_id="echo_second",
    bash_command='echo I will execute second!',
    dag=dag)

# There are multiple ways you can define this relationship,
# using the >> / << operators or the set_* methods

# Option 1
task1 >> task2

# Option 2
task2 << task1

# Option 3
task1.set_downstream(task2)

# Option 4
task2.set_upstream(task1)
Branching
Sometimes you want to execute tasks only when certain conditions hold. That is very possible using Airflow; here’s an example:
from datetime import datetime

from pytz import timezone
from airflow.operators.python_operator import BranchPythonOperator

...

run_task = BashOperator(
    task_id="run_task",
    bash_command='echo Hello World!',
    dag=dag)

sleep = BashOperator(
    task_id="sleep",
    bash_command='sleep 1m',
    dag=dag)

# decides which task to run depending on the time by returning its task_id
def decide_path():
    now = datetime.now(timezone('Africa/Nairobi'))
    if now.hour >= 19:
        return "run_task"
    else:
        return "sleep"

branch = BranchPythonOperator(
    task_id='check_time',
    python_callable=decide_path,
    dag=dag)

...
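One detail worth noting: the branch task has to sit upstream of both candidate tasks, because BranchPythonOperator skips every immediate downstream task whose task_id was not returned. The wiring would look roughly like this:

# whichever task_id decide_path() returns gets scheduled; the other is skipped
branch >> run_task
branch >> sleep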
Connections
One of the best things about Airflow is security. Airflow handles and encrypts your credentials for you, so you never have to manage them yourself or include them in your code.
Once you add and save credentials under Connections, you can access them simply by referencing the connection id in your operator when defining your task.
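For example, having saved a MySQL connection under the id mysql_conn (the same id used in the custom operator above), a hook only ever references that id; the credentials themselves stay in Airflow’s metadata database rather than in your code. A minimal sketch:

from airflow.hooks.mysql_hook import MySqlHook

# Airflow resolves the host, schema, user and password from the
# connection saved under this id; nothing sensitive lives in the code
hook = MySqlHook(mysql_conn_id='mysql_conn')
records = hook.get_records('select count(*) from person;')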
Conclusion
Airflow is a game changer when it comes to scheduling and monitoring workflows. In this article, I have only covered the basic concepts that helped me get started, but there’s a lot more I haven’t touched on. Feel free to dig deeper and reach out.