February 13, 2019
Add sentry to your airflow
Since this pull request in airflow github repository, it is possible to send errors to sentry. This way you get a sentry alert every time an error or a SIGTERM happens.
My configuration
dags/notify_sentry.py
import os
import sentry_sdk
from sentry_sdk import capture_exception, configure_scope
# requires sentry-sdk==0.7.0
def sentry_capture_exception(context):
"""
Sends Error to Sentry.
op = BashOperator(
dag=my_dag,
task_id='my_task',
python_callable=my_python_job,
on_failure_callback=sentry_capture_exception
)
"""
dsn = os.getenv('SENTRY_DSN')
sentry_sdk.init(dsn)
error = context.get('exception')
with configure_scope() as scope:
scope.set_extra('dag', context.get('dag'))
scope.set_extra('dag_run', context.get('dag_run'))
scope.set_extra('ds', context.get('ds'))
scope.set_extra('end_date', context.get('end_date'))
scope.set_extra('execution_date', context.get('execution_date'))
scope.set_extra('latest_date', context.get('latest_date'))
scope.set_extra('params', context.get('params'))
scope.set_extra('run_id', context.get('run_id'))
scope.set_extra('task', context.get('task'))
scope.set_extra('task_instance', context.get('task_instance'))
scope.set_extra('ti', context.get('ti'))
scope.set_extra('ts', context.get('ts'))
scope.set_extra('yesterday_ds', context.get('yesterday_ds'))
capture_exception(error)
then in your dag:
dags/my_dag.py
from notify_sentry import sentry_capture_exception
...
op = BashOperator(
dag=my_dag,
task_id='my_task',
python_callable=my_python_job,
on_failure_callback=sentry_capture_exception
)
Sometimes, you might need to add provide_context=True
to the operator parameters.