LogoDuyệtSr. Data Engineer
HomeAboutPhotosInsightsCV

Footer

Logo

Resources

  • Rust Tiếng Việt
  • /archives
  • /series
  • /tags
  • Status

me@duyet.net

  • About
  • LinkedIn
  • Resume
  • Projects

© 2026 duyet.net | Sr. Data Engineer

Airflow - một số ghi chép

Note: This post is over 7 years old. The information may be outdated.

Một số ghi chép, tips & tricks của mình trong quá trình sử dụng Apache Airflow.

Note (2025): Bài viết này viết cho Airflow 1.x. Một số API và best practices có thể đã thay đổi trong Airflow 2.x, nhưng các khái niệm cơ bản vẫn áp dụng được.

  • Viết các functions (tasks) luôn cho mọi kết quả giống nhau với các input giống nhau (stateless).

    • Tránh sử dụng global variables, random values, hardware timers.
  • Một số tính năng nên biết

    • depends_on_past sử dụng khi viết DAGs để chắc chắn mọi task instance trước đó đều success.
    • LatestOnlyOperator để skip một số bước phía sau nếu một số task bị trễ.
    • BranchPythonOperator cho phép rẽ nhánh workflow tùy vào điều kiện được định nghĩa.
  • Sử dụng airflow test <dag-id> <task-id> ... để test task instance trên local khi code.

  • Sử dụng Docker Compose để thiết lập môi trường local cho dễ.

  • Để test DAG với scheduler, hãy set schedule_interval=@once, chạy thử, để chạy lại thì chỉ cần clear DagRuns trên UI hoặc bằng lệnh airflow clear

  • Khi DAG đã được chạy, airflow chứa các task instance trong DB. Nếu bạn thay đổi start_date hoặc interval, scheduler có thể sẽ gặp lỗi. Nên đổi tên dag_id nếu muốn thay đổi start_date hoặc interval.

  • Sử dụng Bitshift thay vì set_upstream() and set_downstream() để code dễ nhìn hơn, ví dụ

    op1 >> op2
    # tương đương: op1.set_downstream(op2)
    
    op1 >> op2 >> op3 << op4
    # tương đương:
    #    op1.set_downstream(op2)
    #    op2.set_downstream(op3)
    #    op3.set_upstream(op4)
    
    op1 >> [op2, op3] >> op4
    # tương đương
    #    op1 >> op2
    #    op1 >> op3
    #    op2 >> op4
    #    op3 >> op4
    # hoặc tương đương
    #    op1.set_downstream([op2, op3])
    #    op2.set_downstream(op4)
    #    op3.set_downstream(op4)
    
  • Sử dụng Variables để lưu trữ params của DAGs (Admin -> Variables)

    from airflow.models import Variable
    foo = Variable.get("foo")
    bar = Variable.get("bar", deserialize_json=True)
    baz = Variable.get("baz", default_var=None)
    

    hoặc sử dụng variable trong jinja template:

    echo {{ var.value.<variable_name> }}
    
  • Sử dụng Slack để nhận thông báo lỗi

  • Sử dụng default arguments để tránh lặp lại các tham số

    default_args = {
        'owner': 'airflow',
        'depends_on_past': False,
        'params': { 'foo': 'baz' }
    }
    
    with DAG(dag_id='airflow', default_args=default_args):
        op1 = BigQueryOperator(task_id='query_1', sql='SELECT 1')
        op2 = BigQueryOperator(task_id='query_2', sql='SELECT 2')
        op1 >> op2
    
  • Lưu password, token trong Connections

    from airflow.hooks.base_hook import BaseHook
    aws_token = BaseHook.get_connection('aws_token').password
    
  • Có thể generate DAG một cách tự động, ví dụ

    def create_dag(id):
        dag = DAG(f'dag_job_{id}', default_args)
        op1 = BigQueryOperator(task_id='query_1', sql='SELECT 1', dag=dag)
        ...
        return dag
    
    for i in range(100):
        globals()[f'dag_job_{id}'] = create_dag(id)
    
Aug 27, 2019·6 years ago
|Data Engineering|
Apache AirflowDataData Engineering
|Edit|