Moving from Cron to Apache Airflow

Fatma Ali
Sep 6, 2019
It’s time to say goodbye, Cron

We have all been there: tens or hundreds of cron jobs running, and you (or your boss) are pulling your hair out trying to figure out why a critical job didn’t run last night. You’re puzzled and confused. Did it run at all? Did it fail? Why could it have failed?

At AMPATH, our reliance on cron jobs to schedule ETL jobs was becoming increasingly untenable. Our ETL process first denormalizes and flattens data, then uses the denormalized data to build calculated tables. The calculated tables in turn power real-time reports for the Ministry of Health and decision support for clinicians. In the beginning, cron jobs were a simple and effective way to execute these jobs.

The cron workflow was as follows:

  1. Run the denormalizing jobs at the second minute of every hour.
  2. Run the calculated-table jobs at the fifth minute, hoping that the denormalizing jobs take no more than three minutes to complete (see the crontab sketch after this list).
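In crontab form, that schedule looked roughly like the sketch below; the script paths are illustrative, not our actual ones:

# minute 2 of every hour: denormalize and flatten the data
2 * * * * /opt/etl/run_denormalization.sh
# minute 5 of every hour: build the calculated tables
5 * * * * /opt/etl/build_calculated_tables.sh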

Over time, as data volumes and the demand for more reports grew, it became difficult to estimate how long a particular job would take, and more often than not we ran into lock wait timeouts and deadlocks. Cron had reached its limit. We needed a way to:

  1. Handle complex relationships between jobs.
  2. Manage all the jobs centrally through a well-defined user interface.
  3. Report errors and send alerts.
  4. View and analyze job run times.
  5. Protect credentials, such as database passwords.

Enter Apache Airflow

Upon searching for ways to improve or replace our ETL workflow, I stumbled upon an open-source tool, Apache Airflow. Airflow’s Directed Acyclic Graph (DAG) concept offered a way to build and schedule complex, dynamic data pipelines in an extremely simple, testable and scalable way. And I freaked out! This was exactly what we had been looking for.

We quickly set up Airflow using docker-compose and started transforming some of our main ETL jobs into Python code for Airflow. The scripts are available in our GitHub repo.
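As a rough illustration only (not the exact compose file we used), a minimal Airflow setup with docker-compose might look something like the sketch below; the image, credentials and paths are assumptions you would adapt to your own environment:

version: '3'
services:
  postgres:
    image: postgres:11
    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow
  webserver:
    image: apache/airflow  # illustrative; pin a tag matching your Airflow version
    command: webserver
    ports:
      - "8080:8080"
    environment:
      - AIRFLOW__CORE__EXECUTOR=LocalExecutor
      - AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@postgres/airflow
    volumes:
      - ./dags:/opt/airflow/dags
    depends_on:
      - postgres
  scheduler:
    image: apache/airflow
    command: scheduler
    environment:
      - AIRFLOW__CORE__EXECUTOR=LocalExecutor
      - AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@postgres/airflow
    volumes:
      - ./dags:/opt/airflow/dags
    depends_on:
      - postgres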

Airflow Concepts

We are going to cover some of the basic concepts to get you started with Airflow. For more detailed documentation, head over to the official docs site.

DAGs and Operators

In Airflow, every workflow is a DAG. You can think of a DAG as a set of tasks with some sort of relationship between them. This is what a DAG looks like in the Airflow Graph View:

An example DAG

A DAG usually has a schedule, a start time and a unique ID. The tasks inside a DAG are made up of operators. Operators define what actually runs for a particular task. Examples of operators in Airflow include:

BashOperator: To execute shell commands/scripts

PythonOperator: To execute Python code.

You can define a simple DAG that prints out ‘Hello World!’ every 10 minutes like this:

from datetime import datetime

from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'airflow',
    'email': ['fali@ampath.or.ke'],
    'email_on_failure': True,
    'email_on_retry': True,
    'start_date': datetime(2019, 5, 31)
}

dag = DAG(
    dag_id='hello_world_dag',
    default_args=default_args,
    schedule_interval='*/10 * * * *')

task1 = BashOperator(
    task_id='echo_hello_world',
    bash_command='echo Hello World!',
    dag=dag)
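For comparison, here is a minimal sketch of the same task written with the PythonOperator instead, reusing the dag object defined above (the say_hello function name is just an illustration):

from airflow.operators.python_operator import PythonOperator

# a plain Python function for the operator to call
def say_hello():
    print('Hello World!')

task1 = PythonOperator(
    task_id='say_hello_world',
    python_callable=say_hello,
    dag=dag)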

Extending Operators

What’s even more exciting is that you can extend existing operators or create your own if none of the built-in operators match your needs. Here’s an example of how I extended the built-in MySqlOperator to return the results of a statement after execution:

from airflow.hooks.mysql_hook import MySqlHook
from airflow.operators.mysql_operator import MySqlOperator


class CustomMySqlOperator(MySqlOperator):

    def execute(self, context):
        self.log.info('Executing: %s', self.sql)
        hook = MySqlHook(
            mysql_conn_id=self.mysql_conn_id,
            schema=self.database)
        return hook.get_records(self.sql, parameters=self.parameters)

...

# Use the custom operator in one of your tasks
task = CustomMySqlOperator(
    task_id='custom_mysql_task',
    sql='select * from person;',
    mysql_conn_id='mysql_conn',
    database='etl',
    dag=dag)

Defining relationships between tasks

Often, you find that some of your tasks need to execute one after another. In Airflow, you can define such relationships like this:

task1 = BashOperator(
    task_id='execute_first',
    bash_command='echo I will execute first!',
    dag=dag)

task2 = BashOperator(
    task_id='execute_second',
    bash_command='echo I will execute second!',
    dag=dag)

# There are multiple ways to define this relationship,
# using either the bitshift operators or the set_* methods
# Option 1
task1 >> task2
# Option 2
task2 << task1
# Option 3
task1.set_downstream(task2)
# Option 4
task2.set_upstream(task1)

Branching

Sometimes you want to execute tasks only under certain conditions. That is possible in Airflow using the BranchPythonOperator. Here’s an example:

from datetime import datetime

from pytz import timezone

from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import BranchPythonOperator
...
run_task = BashOperator(
    task_id='run_task',
    bash_command='echo Hello World!',
    dag=dag)

sleep = BashOperator(
    task_id='sleep',
    bash_command='sleep 1m',
    dag=dag)

# this function decides which task to run, depending on the time,
# by returning the task_id of the task to follow
def decide_path():
    now = datetime.now(timezone('Africa/Nairobi'))
    if now.hour >= 19:
        return 'run_task'
    else:
        return 'sleep'

branch = BranchPythonOperator(
    task_id='check_time',
    python_callable=decide_path,
    dag=dag)
...
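For the branch to take effect, the branching task also has to sit upstream of both possible paths. That wiring is not shown in the snippet above, but a minimal sketch of it would be:

# the BranchPythonOperator follows the task whose task_id decide_path returns;
# the other downstream task is skipped
branch >> [run_task, sleep]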

Connections

One of the best things about Airflow is security. Airflow stores and encrypts your credentials for you, so you never have to manage them yourself or hard-code them in your scripts.

Airflow connections tab

Once you add and save the credentials under Connections, you can access them simply by passing the connection ID to your operator when defining a task.
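For example, a task can reference a saved connection by its ID alone; in the sketch below, 'mysql_conn' assumes a MySQL connection saved with that ID in the UI, and the task name and SQL are purely illustrative:

from airflow.operators.mysql_operator import MySqlOperator

# no host, user or password in code: only the connection ID saved in the UI
check_task = MySqlOperator(
    task_id='check_etl_database',
    sql='select 1;',
    mysql_conn_id='mysql_conn',
    database='etl',
    dag=dag)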

Conclusion

Airflow is a game changer when it comes to scheduling and monitoring workflows. In this article, I have only covered the basic concepts that helped me get started, but there’s a lot more that I haven’t covered. Feel free to dig deeper and reach out.

Fatma Ali

Software Engineer, passionate about code, food & art. Bollywood dancer sometimes💃 Find me on GitHub https://github.com/fatmali