XÂY DỰNG WORKFLOW ĐẦU TIÊN VỚI AIRFLOW 3.1.0
Chào mừng bạn đến với thế giới đầy tiềm năng của Apache Airflow! Trong hướng dẫn này, chúng ta sẽ cùng nhau khám phá những khái niệm cốt lõi của Airflow, từng bước xây dựng workflow đầu tiên của bạn. Dù bạn đã quen thuộc với Python hay mới bắt đầu hành trình này, chúng tôi cam kết sẽ biến trải nghiệm học tập của bạn trở nên thú vị và dễ dàng.
Workflow là gì? (DAG - Directed Acyclic Graph)
Về cơ bản, một workflow (trong Airflow gọi là DAG – Directed Acyclic Graph) là một tập hợp các tác vụ được sắp xếp theo một cách đặc biệt, phản ánh mối quan hệ và sự phụ thuộc giữa chúng. Hãy hình dung nó như một bản đồ chi tiết cho quy trình làm việc của bạn, nơi mỗi tác vụ được kết nối với các tác vụ khác. Đừng lo lắng nếu khái niệm này nghe có vẻ phức tạp; chúng ta sẽ cùng nhau làm rõ từng bước một.
Định nghĩa Pipeline mẫu
Hãy bắt đầu với một ví dụ pipeline đơn giản. Ban đầu, nó có thể trông hơi choáng ngợp, nhưng chúng ta sẽ đi sâu vào giải thích từng dòng code một.
airflow/example_dags/tutorial.py
import textwrap
from datetime import datetime, timedelta
# Operators; we need this to operate!
from airflow.providers.standard.operators.bash import BashOperator
# The DAG object; we'll need this to instantiate a DAG
from airflow.sdk import DAG
with DAG(
"tutorial",
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args={
"depends_on_past": False,
"retries": 1,
"retry_delay": timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
# 'wait_for_downstream': False,
# 'execution_timeout': timedelta(seconds=300),
# 'on_failure_callback': some_function, # or list of functions
# 'on_success_callback': some_other_function, # or list of functions
# 'on_retry_callback': another_function, # or list of functions
# 'sla_miss_callback': yet_another_function, # or list of functions
# 'on_skipped_callback': another_function, #or list of functions
# 'trigger_rule': 'all_success'
},
description="A simple tutorial DAG",
schedule=timedelta(days=1),
start_date=datetime(2021, 1, 1),
catchup=False,
tags=["example"],
) as dag:
# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
task_id="print_date",
bash_command="date",
)
t2 = BashOperator(
task_id="sleep",
depends_on_past=False,
bash_command="sleep 5",
retries=3,
)
t1.doc_md = textwrap.dedent(
"""\
#### Task Documentation
You can document your task using the attributes `doc_md` (markdown),
`doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
rendered in the UI's Task Instance Details page.

**Image Credit:** Randall Munroe, [XKCD](https://xkcd.com/license.html)
"""
)
dag.doc_md = __doc__ # providing that you have a docstring at the beginning of the DAG; OR
dag.doc_md = """
This is a documentation placed anywhere
""" # otherwise, type it like this
templated_command = textwrap.dedent(
"""
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
{% endfor %}
"""
)
t3 = BashOperator(
task_id="templated",
depends_on_past=False,
bash_command=templated_command,
)
t1 >> [t2, t3]
Hiểu về file định nghĩa DAG
Hãy xem script Python của Airflow như một file cấu hình, nơi bạn phác thảo cấu trúc của DAG bằng code. Các tác vụ thực tế mà bạn định nghĩa ở đây sẽ chạy trong một môi trường khác, có nghĩa là script này không dành cho việc xử lý dữ liệu. Nhiệm vụ chính của nó là định nghĩa đối tượng DAG, và nó cần được đánh giá nhanh chóng vì bộ xử lý file DAG kiểm tra nó thường xuyên để tìm bất kỳ thay đổi nào.
Nhập các module cần thiết
Để bắt đầu, chúng ta cần nhập các thư viện cần thiết. Đây là bước đầu tiên điển hình trong bất kỳ script Python nào.
airflow/example_dags/tutorial.py
import textwrap
from datetime import datetime, timedelta
# Operators; we need this to operate!
from airflow.providers.standard.operators.bash import BashOperator
# The DAG object; we'll need this to instantiate a DAG
from airflow.sdk import DAG
Để biết thêm chi tiết về cách Python và Airflow xử lý các module, hãy tìm hiểu thêm về Modules Management.
Thiết lập đối số mặc định
Khi tạo một DAG và các tác vụ của nó, bạn có thể truyền các đối số trực tiếp cho từng tác vụ hoặc định nghĩa một tập hợp các tham số mặc định trong một dictionary. Cách tiếp cận thứ hai thường hiệu quả và gọn gàng hơn.
airflow/example_dags/tutorial.py
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args={
"depends_on_past": False,
"retries": 1,
"retry_delay": timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
# 'wait_for_downstream': False,
# 'execution_timeout': timedelta(seconds=300),
# 'on_failure_callback': some_function, # or list of functions
# 'on_success_callback': some_other_function, # or list of functions
# 'on_retry_callback': another_function, # or list of functions
# 'sla_miss_callback': yet_another_function, # or list of functions
# 'on_skipped_callback': another_function, #or list of functions
# 'trigger_rule': 'all_success'
},
Nếu bạn muốn tìm hiểu sâu hơn về các tham số của BaseOperator, hãy tham khảo tài liệu về BaseOperator.
Tạo một DAG
Tiếp theo, chúng ta cần tạo một đối tượng DAG để chứa các tác vụ của mình. Chúng ta sẽ cung cấp một định danh duy nhất cho DAG, được gọi là `dag_id`, và chỉ định các đối số mặc định mà chúng ta vừa định nghĩa. Chúng ta cũng sẽ thiết lập một lịch trình để DAG của chúng ta chạy hàng ngày.
airflow/example_dags/tutorial.py
with DAG(
"tutorial",
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args={
"depends_on_past": False,
"retries": 1,
"retry_delay": timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
# 'wait_for_downstream': False,
# 'execution_timeout': timedelta(seconds=300),
# 'on_failure_callback': some_function, # or list of functions
# 'on_success_callback': some_other_function, # or list of functions
# 'on_retry_callback': another_function, # or list of functions
# 'sla_miss_callback': yet_another_function, # or list of functions
# 'on_skipped_callback': another_function, #or list of functions
# 'trigger_rule': 'all_success'
},
description="A simple tutorial DAG",
schedule=timedelta(days=1),
start_date=datetime(2021, 1, 1),
catchup=False,
tags=["example"],
) as dag:
Hiểu về Operators
Một operator đại diện cho một đơn vị công việc trong Airflow. Chúng là những khối xây dựng cơ bản của các workflow của bạn, cho phép bạn định nghĩa các tác vụ sẽ được thực thi. Mặc dù chúng ta có thể sử dụng các operator cho nhiều tác vụ, Airflow cũng cung cấp TaskFlow API cho một cách tiếp cận Pythonic hơn để định nghĩa các workflow, mà chúng ta sẽ đề cập sau.
Tất cả các operator đều bắt nguồn từ BaseOperator, bao gồm các đối số cần thiết để chạy các tác vụ trong Airflow. Một số operator phổ biến bao gồm PythonOperator, BashOperator, và KubernetesPodOperator. Trong hướng dẫn này, chúng ta sẽ tập trung vào BashOperator để thực hiện một số lệnh bash đơn giản.
Định nghĩa các tác vụ
Để sử dụng một operator, bạn phải khởi tạo nó như một tác vụ. Các tác vụ quy định cách operator sẽ thực hiện công việc của nó trong ngữ cảnh của DAG. Trong ví dụ dưới đây, chúng ta khởi tạo BashOperator hai lần để chạy hai script bash khác nhau. `task_id` đóng vai trò là một định danh duy nhất cho mỗi tác vụ.
airflow/example_dags/tutorial.py
t1 = BashOperator(
task_id="print_date",
bash_command="date",
)
t2 = BashOperator(
task_id="sleep",
depends_on_past=False,
bash_command="sleep 5",
retries=3,
)
Bạn sẽ nhận thấy cách chúng ta kết hợp các đối số cụ thể của operator (như `bash_command`) với các đối số chung (như `retries`) được thừa hưởng từ BaseOperator. Cách tiếp cận này giúp đơn giản hóa code của chúng ta. Trong tác vụ thứ hai, chúng ta thậm chí còn ghi đè tham số `retries` để đặt nó thành 3.
Thứ tự ưu tiên cho các đối số tác vụ như sau:
- Các đối số được truyền trực tiếp
- Các giá trị từ dictionary `default_args`
- Các giá trị mặc định của operator, nếu có
Lưu ý: Hãy nhớ, mỗi tác vụ phải bao gồm hoặc thừa hưởng các đối số `task_id` và `owner`. Nếu không, Airflow sẽ báo lỗi. May mắn thay, một cài đặt Airflow mới sẽ đặt `owner` mặc định là `airflow`, vì vậy bạn chủ yếu cần đảm bảo `task_id` được thiết lập.
Sử dụng Jinja cho Templating
Airflow khai thác sức mạnh của Jinja Templating, mang đến cho bạn quyền truy cập vào các tham số và macro tích hợp để nâng cao workflow của bạn. Phần này sẽ giới thiệu cho bạn những điều cơ bản về templating trong Airflow, tập trung vào biến template thường được sử dụng: `{{ ds }}`, đại diện cho dấu thời gian của ngày hiện tại.
airflow/example_dags/tutorial.py
templated_command = textwrap.dedent(
"""
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
{% endfor %}
"""
)
t3 = BashOperator(
task_id="templated",
depends_on_past=False,
bash_command=templated_command,
)
Bạn sẽ thấy rằng `templated_command` bao gồm logic trong các khối `{% %}` và tham chiếu các tham số như `{{ ds }}`. Bạn cũng có thể truyền các file vào `bash_command`, chẳng hạn như `bash_command='templated_command.sh'`, cho phép tổ chức code tốt hơn. Bạn thậm chí có thể định nghĩa `user_defined_macros` và `user_defined_filters` để tạo các biến và bộ lọc riêng của mình để sử dụng trong các template. Để biết thêm về các bộ lọc tùy chỉnh, hãy tham khảo Jinja Documentation.
Để biết thêm thông tin về các biến và macro có thể được tham chiếu trong các template, vui lòng đọc qua tài liệu Templates reference.
Thêm tài liệu cho DAG và Tasks
Bạn có thể thêm tài liệu cho DAG hoặc các tác vụ riêng lẻ của mình. Trong khi tài liệu DAG hiện hỗ trợ markdown, tài liệu tác vụ có thể ở dạng văn bản thuần túy, markdown reStructuredText, JSON hoặc YAML. Việc đưa tài liệu vào đầu file DAG là một thực hành tốt.
airflow/example_dags/tutorial.py
t1.doc_md = textwrap.dedent(
"""\
#### Task Documentation
You can document your task using the attributes `doc_md` (markdown),
`doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
rendered in the UI's Task Instance Details page.

**Image Credit:** Randall Munroe, [XKCD](https://xkcd.com/license.html)
"""
)
dag.doc_md = __doc__ # providing that you have a docstring at the beginning of the DAG; OR
dag.doc_md = """
This is a documentation placed anywhere
""" # otherwise, type it like this


Thiết lập Dependencies
Trong Airflow, các tác vụ có thể phụ thuộc vào nhau. Ví dụ, nếu bạn có các tác vụ `t1`, `t2` và `t3`, bạn có thể định nghĩa sự phụ thuộc của chúng theo một số cách:
t1.set_downstream(t2)
# Điều này có nghĩa là t2 sẽ phụ thuộc vào t1
# chạy thành công để chạy.
# Nó tương đương với:
t2.set_upstream(t1)
# Toán tử dịch bit cũng có thể được
# sử dụng để xâu chuỗi các hoạt động:
t1 >> t2
# Và sự phụ thuộc upstream với
# toán tử dịch bit:
t2 << t1
# Việc xâu chuỗi nhiều phụ thuộc trở nên
# ngắn gọn với toán tử dịch bit:
t1 >> t2 >> t3
# Một danh sách các tác vụ cũng có thể được thiết lập làm
# dependencies. Các hoạt động này
# đều có cùng hiệu quả:
t1.set_downstream([t2, t3])
t1 >> [t2, t3]
[t2, t3] << t1
Hãy lưu ý rằng Airflow sẽ báo lỗi nếu phát hiện các chu trình trong DAG của bạn hoặc nếu một phụ thuộc được tham chiếu nhiều lần.
Làm việc với múi giờ
Tạo một DAG nhận biết múi giờ rất đơn giản. Chỉ cần đảm bảo bạn sử dụng các ngày nhận biết múi giờ với pendulum. Tránh sử dụng thư viện chuẩn datetime.timezone vì chúng có những hạn chế đã biết.
Tổng kết
Xin chúc mừng! Đến bây giờ, bạn đã có kiến thức cơ bản về cách tạo một DAG, định nghĩa các tác vụ và sự phụ thuộc của chúng, cũng như sử dụng templating trong Airflow. Code của bạn sẽ trông tương tự như sau:
airflow/example_dags/tutorial.py
import textwrap
from datetime import datetime, timedelta
# Operators; we need this to operate!
from airflow.providers.standard.operators.bash import BashOperator
# The DAG object; we'll need this to instantiate a DAG
from airflow.sdk import DAG
with DAG(
"tutorial",
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args={
"depends_on_past": False,
"retries": 1,
"retry_delay": timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
# 'wait_for_downstream': False,
# 'execution_timeout': timedelta(seconds=300),
# 'on_failure_callback': some_function, # or list of functions
# 'on_success_callback': some_other_function, # or list of functions
# 'on_retry_callback': another_function, # or list of functions
# 'sla_miss_callback': yet_another_function, # or list of functions
# 'on_skipped_callback': another_function, #or list of functions
# 'trigger_rule': 'all_success'
},
description="A simple tutorial DAG",
schedule=timedelta(days=1),
start_date=datetime(2021, 1, 1),
catchup=False,
tags=["example"],
) as dag:
# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
task_id="print_date",
bash_command="date",
)
t2 = BashOperator(
task_id="sleep",
depends_on_past=False,
bash_command="sleep 5",
retries=3,
)
t1.doc_md = textwrap.dedent(
"""\
#### Task Documentation
You can document your task using the attributes `doc_md` (markdown),
`doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
rendered in the UI's Task Instance Details page.

**Image Credit:** Randall Munroe, [XKCD](https://xkcd.com/license.html)
"""
)
dag.doc_md = __doc__ # providing that you have a docstring at the beginning of the DAG; OR
dag.doc_md = """
This is a documentation placed anywhere
""" # otherwise, type it like this
templated_command = textwrap.dedent(
"""
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
{% endfor %}
"""
)
t3 = BashOperator(
task_id="templated",
depends_on_past=False,
bash_command=templated_command,
)
t1 >> [t2, t3]
Kiểm thử Pipeline của bạn
Đã đến lúc kiểm thử pipeline của bạn! Đầu tiên, hãy đảm bảo rằng script của bạn được phân tích cú pháp thành công. Nếu bạn đã lưu code của mình vào `tutorial.py` trong thư mục Dags được chỉ định trong `airflow.cfg` của bạn, bạn có thể chạy:
python ~/airflow/dags/tutorial.py
Nếu script chạy mà không có lỗi, xin chúc mừng! DAG của bạn đã được thiết lập chính xác.
Kiểm tra siêu dữ liệu từ dòng lệnh
Hãy xác thực script của bạn thêm bằng cách chạy một vài lệnh:
# khởi tạo các bảng cơ sở dữ liệu
airflow db migrate
# in danh sách các DAG đang hoạt động
airflow dags list
# in danh sách các tác vụ trong DAG "tutorial"
airflow tasks list tutorial
# in biểu diễn graphviz của DAG "tutorial"
airflow dags show tutorial
Kiểm thử các phiên bản tác vụ và chạy DAG
Bạn có thể kiểm thử các phiên bản tác vụ cụ thể cho một ngày hợp lý được chỉ định. Điều này mô phỏng trình lập lịch chạy tác vụ của bạn cho một ngày và giờ cụ thể.
Lưu ý: Hãy nhớ rằng trình lập lịch chạy tác vụ của bạn cho một ngày và giờ cụ thể, chứ không nhất thiết vào ngày hoặc giờ đó. Ngày hợp lý là dấu thời gian mà một lần chạy DAG được đặt tên theo, và nó thường tương ứng với kết thúc của khoảng thời gian mà workflow của bạn đang hoạt động — hoặc thời điểm mà lần chạy DAG được kích hoạt thủ công. Airflow sử dụng ngày hợp lý này để tổ chức và theo dõi mỗi lần chạy; đó là cách bạn tham chiếu đến một lần thực thi cụ thể trong giao diện người dùng, nhật ký và code. Khi kích hoạt DAG qua giao diện người dùng hoặc API, bạn có thể cung cấp ngày hợp lý của riêng mình để chạy workflow kể từ một thời điểm cụ thể.
# bố cục lệnh: command subcommand [dag_id] [task_id] [(tùy chọn) date]
# kiểm thử print_date
airflow tasks test tutorial print_date 2015-06-01
# kiểm thử sleep
airflow tasks test tutorial sleep 2015-06-01
Bạn cũng có thể xem các template của mình được hiển thị như thế nào bằng cách chạy:
# kiểm thử templated
airflow tasks test tutorial templated 2015-06-01
Lệnh này sẽ cung cấp nhật ký chi tiết và thực thi lệnh bash của bạn.
Hãy nhớ rằng lệnh `airflow tasks test` chạy các phiên bản tác vụ cục bộ, xuất nhật ký của chúng ra stdout và không theo dõi trạng thái trong cơ sở dữ liệu. Đây là một cách tiện lợi để kiểm thử từng phiên bản tác vụ riêng lẻ.
Tương tự, `airflow dags test` chạy một lần chạy DAG mà không đăng ký bất kỳ trạng thái nào trong cơ sở dữ liệu, điều này hữu ích cho việc kiểm thử toàn bộ DAG của bạn cục bộ.
Bước tiếp theo của bạn là gì?
Vậy là bạn đã hoàn thành! Bạn đã viết và kiểm thử thành công pipeline Airflow đầu tiên của mình. Khi tiếp tục hành trình, hãy cân nhắc việc hợp nhất code của bạn vào một kho lưu trữ với một Scheduler đang chạy chống lại nó, điều này sẽ cho phép DAG của bạn được kích hoạt và thực thi hàng ngày.
-
Tiếp tục với bước tiếp theo của hướng dẫn: Pythonic Dags với TaskFlow API
-
Khám phá phần Core Concepts để biết giải thích chi tiết về các khái niệm Airflow như Dags, Tasks, Operators và nhiều hơn nữa.
MagicFlow | TechData.AI