
Airflow & SLA Management

Introduction

I recently came across this interesting feature for managing SLAs natively in Airflow. Failures can happen not just when a task/pipeline actually fails, but also when it runs slowly. A slow-running task/pipeline may cause downstream tasks or DAGs that depend on it to fail. This thought got me searching, and I decided to write a post about Airflow and SLA management. What I found was a simple mechanism that can help manage failures and delays.

Before you jump into managing SLAs in Airflow, make sure you are familiar with how to create Airflow DAGs.

An SLA is a time duration defined on a task. Airflow monitors the performance of the task/DAG when SLAs are enabled. When an SLA is breached, Airflow can trigger the following:

  • An email notification
  • An action via a callback function call
Note 1: SLA checks occur only for scheduled runs, not manually triggered ones. Strange!
Note 2: SLA monitoring is measured from the scheduled time at which the DAG is due to be triggered, not from when the task actually starts.
Note 3: There can be only one callback function, defined at the DAG level; it is shared by all tasks in the DAG.


Defining SLAs in Airflow is done in three simple steps:

  • Step 1 – Define a callback method
  • Step 2 – Pass the callback method to DAG
  • Step 3 – Define the SLA duration on task(s)


Define a callback method

Here is an example of a simple callback function. Keep in mind to use the same argument signature as shown.

import logging

logger = logging.getLogger(__name__)

def cw_sla_missed_take_action(*args, **kwargs):
    # Logs to the scheduler logs, not the web UI.
    # Location: $AIRFLOW_HOME/logs/scheduler/
    logger.info("************************************* SLA missed! ***************************************")
    logger.info(args)

This function simply writes a log entry to the scheduler log, but your callback may do a lot more, like logging a ticket with ServiceNow, sending a Slack notification, or something much more complex.
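As an aside, not covered in the example above: per the Airflow 2.x documentation, the callback is actually invoked with five positional arguments (`dag`, `task_list`, `blocked_task_list`, `slas`, `blocked_tis`), so you can name them instead of catching everything with `*args`. The sketch below posts a Slack message through an incoming webhook using only the standard library; the webhook URL is a placeholder and `build_slack_payload` is a hypothetical helper, both assumptions for illustration.

```python
import json
import urllib.request

# Placeholder webhook URL (assumption) - replace with your own.
SLACK_WEBHOOK_URL = "https://hooks.slack.com/services/XXX/YYY/ZZZ"

def build_slack_payload(dag_id, task_list):
    """Build the JSON body Slack incoming webhooks expect (hypothetical helper)."""
    return {"text": f"SLA missed in DAG `{dag_id}`:\n{task_list}"}

def cw_sla_missed_notify_slack(dag, task_list, blocked_task_list, slas, blocked_tis):
    # dag: the DAG whose SLA was missed; task_list: newline-joined task names
    # slas / blocked_tis: the SlaMiss records and the task instances they block
    payload = build_slack_payload(dag.dag_id, task_list)
    req = urllib.request.Request(
        SLACK_WEBHOOK_URL,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
    )
    urllib.request.urlopen(req)  # fire-and-forget notification
```

You would pass `cw_sla_missed_notify_slack` to the DAG exactly as shown in the next section.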

Pass the callback method to the DAG object

Here is an example

dag = DAG('hello_sla',
          schedule_interval='* * * * *',
          default_args=default_args,
          catchup=False,
          sla_miss_callback=cw_sla_missed_take_action  # callback function
          )


The last argument, sla_miss_callback, is where the reference to the callback method is passed to the DAG object. Nothing fancy at all so far!
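One more shortcut, as an assumption based on how default_args works rather than something this post demonstrates: because keys in default_args are forwarded to every operator, an 'sla' entry there applies one SLA to all tasks in the DAG (a task-level sla still overrides it). A minimal sketch:

```python
from datetime import datetime, timedelta

# Putting 'sla' in default_args forwards it to every operator in the DAG,
# so all tasks share the same 30-minute SLA unless a task overrides it.
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2020, 12, 14),
    'sla': timedelta(minutes=30),  # applies to every task by default
}
```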

Define SLA on task(s)

Finally, we come to the point where you define the SLA on a task. Keep in mind that defining an SLA is optional for a task, so in a DAG with multiple tasks, some tasks may have SLAs and some may not. See the complete DAG definition below.

# Filename: hello_SLA_tasks.py
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
import logging

logger = logging.getLogger(__name__)

def cw_sla_missed_take_action(*args, **kwargs):
    # Logs to the scheduler logs, not the web UI.
    # Location: $AIRFLOW_HOME/logs/scheduler/
    logger.info("************************************* SLA missed! ***************************************")
    logger.info(args)

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2020, 12, 14),
    'email': ['vipin.chadha@gmail.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG('hello_sla_tasks',
          schedule_interval='* * * * *',
          default_args=default_args,
          catchup=False,
          sla_miss_callback=cw_sla_missed_take_action  # callback function
          )

next_command = 'echo   Awake now!'

t1 = BashOperator(
    task_id='sleep_for_10s',  # Causes the task to miss the SLA
    bash_command='sleep 10',
    sla=timedelta(seconds=5),  # SLA for the task
    dag=dag
)

t2 = BashOperator(
    task_id='next_task',
    bash_command=next_command,
    dag=dag
)

t2.set_upstream(t1)


The first task has an SLA defined, but the second task does not.

When this DAG runs on its schedule, task t1 sleeps for 10 seconds and misses its 5-second SLA. This causes an entry to be created in the scheduler log and generates an email notification to the DAG owner.
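To make Note 2 concrete, here is a small arithmetic sketch (my own illustration, not Airflow code) of why t1 misses: the SLA deadline is measured from the scheduled run time, not from when the task actually starts.

```python
from datetime import datetime, timedelta

# SLA is measured from the scheduled run time, not the actual start time.
scheduled_time = datetime(2020, 12, 14, 0, 0, 0)  # when the run was due
sla = timedelta(seconds=5)
deadline = scheduled_time + sla                    # 00:00:05

actual_finish = scheduled_time + timedelta(seconds=10)  # task slept 10s
sla_missed = actual_finish > deadline
print(sla_missed)  # True: the 10s task overran the 5s SLA
```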

Additionally, I got an email notification, as I had set up SMTP in airflow.cfg.
