Scheduling Tasks with Airflow

As hosting data and servers in the cloud has become more popular and data analysis has become more important to businesses, the tools and ecosystem that support these tasks have become more sophisticated and robust as well.  In this post, we look at Apache Airflow, a workflow scheduler that can be used to schedule and run tasks.

Problem Statement

A business process oftentimes has a series of tasks that need to be run, some in parallel and some serially, with some tasks requiring other tasks to finish before they can start.  As the number of tasks increases, running the tasks in the right order, handling failures, and allowing the end user to intervene all become more difficult.  Workflow schedulers are designed to solve this problem.  One example of a workflow scheduler is Apache Airflow.

Advantages and Disadvantages

+ Out-of-the-box scheduler, so you don’t have to build that functionality yourself

+ Handles failures and retries automatically

+ Can run on Kubernetes, which can scale automatically, allowing computing resources to be scaled up and down as necessary

+ Dependencies between tasks are specified in Python code

+ Comes with out of the box solutions to interface with various databases

+ Open Source

– Can only use Python

– UI is not nearly as nice as some other workflow schedulers

– We have found it to be buggier than some other workflow schedulers, though less so as time has gone on

– Open Source, so if there are bugs, you may be the one digging through the Airflow code to find them

Airflow DAGs

Airflow has the concept of a DAG (directed acyclic graph), which is essentially a group of tasks that must be run, with or without dependencies between them.  Below is an example of a “demo” DAG we created (shown alongside the default Airflow sample DAGs), as seen from the web UI:

Airflow UI – Main Screen

Each DAG can be scheduled to run at different times and with different intervals (e.g., daily at 12:00 pm, monthly at 1:00 pm, etc.). The schedule is specified in code using standard cron syntax.
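As a quick illustration (the variable names below are just labels for this sketch, not Airflow settings), the cron strings for those example schedules look like this:

# Cron-style strings that could be passed as a DAG's schedule_interval:
daily_at_noon = "0 12 * * *"      # every day at 12:00 pm
monthly_at_1pm = "0 13 1 * *"     # 1:00 pm on the 1st of every month
weekday_midnight = "0 0 * * 1-5"  # midnight, Monday through Friday (used by the demo DAG below)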

Clicking on the “demo” DAG brings you to a page showing the DAG itself.

The “demo” DAG

Here, we have created 5 dummy tasks. “print_a” runs first, followed by “print_b” and “wait_ten_min”, followed by “print_c” and finally “print_d.”

“print_b” and “wait_ten_min” can run concurrently, and both must finish before “print_c” can run. Note that Airflow can be set up to run on a Kubernetes cluster and scale your compute resources automatically. This makes managing large pipelines with many concurrent tasks much easier.

Python Code

Let’s take a look at the Python code for this dummy example.

 
import datetime
import time
from pathlib import Path
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

cwd = Path(__file__).parent

def print_a():
    print("a")

def print_b():
    print("b")

def print_c():
    print("c")

def wait_ten_min():
    time.sleep(600)  # simulate a long-running task (10 minutes)

def print_d():
    print("d")

demo_dag = DAG(
    dag_id="demo",
    start_date=datetime.datetime(2019, 1, 1),
    schedule_interval="0 0 * * 1-5",  # midnight, Monday through Friday
    catchup=False,
    template_searchpath="{}/".format(cwd.joinpath("sql"))
)

print_a_op = PythonOperator(
    task_id="print_a",
    python_callable=print_a,
    dag=demo_dag)

print_b_op = PythonOperator(
    task_id="print_b",
    python_callable=print_b,
    dag=demo_dag)

print_c_op = PythonOperator(
    task_id="print_c",
    python_callable=print_c,
    dag=demo_dag)

wait_ten_min_op = PythonOperator(
    task_id="wait_ten_min",
    python_callable=wait_ten_min,
    dag=demo_dag)

print_d_op = PythonOperator(
    task_id="print_d",
    python_callable=print_d,
    dag=demo_dag)

print_a_op >> print_b_op
print_a_op >> wait_ten_min_op

print_b_op >> print_c_op
wait_ten_min_op >> print_c_op

print_c_op >> print_d_op

Here, each task is a PythonOperator, which lets us call a Python function. There are many other operators that are useful in the real world, including ones that integrate with the large cloud providers. For example, the BigQueryOperator runs a BigQuery SQL statement and writes the result to a BigQuery table. If you are running an ETL pipeline, your DAG could include a large number of BigQueryOperators to transform your data.
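As a rough sketch (assuming the Airflow 1.x contrib import path used elsewhere in this post; the project, dataset, and table names are made up for illustration), a BigQuery transform step could look something like this:

from airflow.contrib.operators.bigquery_operator import BigQueryOperator

# Hypothetical transform step: aggregate a raw table into a reporting table.
transform_orders_op = BigQueryOperator(
    task_id="transform_orders",
    sql="""
        SELECT customer_id, SUM(amount) AS total_spend
        FROM `my_project.raw.orders`
        GROUP BY customer_id
    """,
    destination_dataset_table="my_project.analytics.order_totals",
    write_disposition="WRITE_TRUNCATE",  # overwrite the destination table each run
    use_legacy_sql=False,
    dag=demo_dag)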

At the bottom of the sample code is where we specify dependencies. We use the “>>” and “<<” operators to declare which tasks depend on which. When the Airflow scheduler parses your Python DAG files, it builds the dependency graph for each DAG and runs the tasks in the right order.
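Depending on your Airflow version, you can also pass a list of tasks on either side of “>>”, so the demo DAG’s dependencies could be written more compactly, for example:

# Equivalent, more compact declaration of the demo DAG's dependencies:
print_a_op >> [print_b_op, wait_ten_min_op] >> print_c_op >> print_d_op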

Putting it Together

We have shown a trivial example of tasks being scheduled and run. In real-world applications, each step could take a long time, failures could occur at many different points, DAGs are generally much larger, and the individual tasks are more complicated. Among other things, Airflow makes it much easier to schedule and run tasks and to debug any issues that occur.

We have barely scratched the surface of Airflow.  Here, we only have one thread running all the tasks.  In a production environment where Airflow is heavily used, it will likely be running on Kubernetes, scaling machines up and down as necessary and running many tasks in parallel.  If running on one of the cloud service providers, such as GCP, machines can be placed in different zones for greater redundancy.  Airflow can also be configured to write log files to Amazon S3 or Google Cloud Storage automatically, to retry failed tasks, and to send notifications by email or Slack.
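As a small illustration of that last point, retry and email-notification behavior can be configured through a DAG’s default_args; the values and address below are placeholders, not recommendations:

# A sketch of retry and email settings via default_args (values are placeholders).
import datetime
from airflow import DAG

default_args = {
    "retries": 3,                                  # retry each failed task up to 3 times
    "retry_delay": datetime.timedelta(minutes=5),  # wait 5 minutes between attempts
    "email": ["data-team@example.com"],            # placeholder address
    "email_on_failure": True,
}

retry_demo_dag = DAG(
    dag_id="demo_with_retries",
    start_date=datetime.datetime(2019, 1, 1),
    schedule_interval="0 0 * * 1-5",
    catchup=False,
    default_args=default_args)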
