Slack là một công cụ khá phổ biến trong các Team, slack giúp tập hợp mọi thông tin về Slack (như Jira alert, ETL pipelines, CI/CD status, deployments, ...) một cách thống nhất và dễ dàng theo dõi. Bài viết này mình hướng dẫn gửi mọi báo lỗi của Airflow đến Slack.
Note (2025): Bài viết này viết cho Airflow 1.x. Trong Airflow 2.x, import path đã thay đổi:
airflow.contrib.operators.slack_webhook_operator→airflow.providers.slack.operators.slack_webhook. Khái niệm và cách setup vẫn tương tự.
1. Slack Incoming Webhooks và Airflow Connection
Truy cập Slack App Directory tìm Incoming Webhooks: https://<workspace>.slack.com/apps/A0F7XDUAZ-incoming-webhooks

Ở mục Post to Channel chọn Channel, sau đó bấm Add Incoming Webhooks integration

Sau đó bạn sẽ nhận được 1 URL có dạng: https://hooks.slack.com/services/T00000000/B0000000/hssA66nupi72KAFy9ttv5fr2

Vào Airflow > Admin > Connections để thêm một connection mới
- Conn Id:
Slack - Conn Type:
HTTP - Host:
https://hooks.slack.com/services - Password:
/T00000000/B0000000/hssA66nupi72KAFy9ttv5fr2

2. Slack alert Utils
Tạo file utils chứa function alert, ví dụ: /dags/utils/slack_alert.py
from airflow.hooks.base_hook import BaseHook
from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator
SLACK_CONN_ID = 'slack'
def task_fail_slack_alert(context):
slack_webhook_token = BaseHook.get_connection(SLACK_CONN_ID).password
slack_msg = """
Task Failed.
*DAG*: {dag_id}
*Task*: {task}
*Dag*: {dag}
*Execution Time*: {exec_date}
*Log Url*: {log_url}
""".format(
dag_id=context.get('dag').dag_id,
task=context.get('task_instance').task_id,
dag=context.get('task_instance').dag_id,
ti=context.get('task_instance'),
exec_date=context.get('execution_date'),
log_url=context.get('task_instance').log_url,
)
failed_alert = SlackWebhookOperator(
task_id='slack_alert',
http_conn_id=SLACK_CONN_ID,
webhook_token=slack_webhook_token,
message=slack_msg,
username='airflow')
return failed_alert.execute(context=context)
3. Config Slack alert cho từng DAG
Với mỗi DAG muốn alert, ta thêm thuộc tính on_failure_callback cho mỗi DAG. Ví dụ như dưới dây:
example_dag.py
from airflow import DAG
...
from utils.slack_alert import task_fail_slack_alert
default_args = {
**params['default_args'],
'owner': DAG_OWNER,
'on_failure_callback': task_fail_slack_alert,
...
}
dag = DAG('dag_id', default_args=default_args)
...
Kết quả:

Tham khảo
- https://medium.com/datareply/integrating-slack-alerts-in-airflow-c9dcd155105
- airflow.operators.slack_operator: https://airflow.apache.org/_modules/airflow/operators/slack_operator.html
Chúc các bạn thành công.