
airflow 를 사용하면서 하나의 task 안에서 여러 개의 작업이 수행될 때 각 작업의 상태를 확인하거나 모든 작업에 동일한 값을 넘겨주고 싶은 경우가 있다. 이럴 때 airflow 제공하는 것이 airflow Dynamic Task Mapping 이라는 기능이다.
이 기능은 airflow 2.3.0 버전에서 새롭게 추가된 기능이라고 한다. 이 기능 외에도 다양한 기능이 추가된 것으로 알고 있다.
airflow 2.3.0 의 release note 는 아래에서 확인할 수 있다.
https://airflow.apache.org/docs/apache-airflow/stable/release_notes.html#airflow-2-3-0-2022-04-30
Release Notes — Airflow Documentation
airflow.apache.org
New Features 를 확인해보니 맨 위에 dynamic task mapping 이 새롭게 추가된 것을 확인할 수 있었다.

1. Airflow Dynamic Task Mapping
Dynamic Taks Mapping 은 이전 Task 의 결과를 가져와서 작업을 수행하게 된다.
Mapping 된 작업이 실행되기 전에 스케줄러는 각 입력에 대해서 하나씩 n 개의 작업 복사본을 생성한다고 한다.
일반적으로 MapReduce 방식을 생각할 수 있다.
그럼 Dynamic Task Mapping 기능을 사용할 때 다음과 같이 2개의 함수를 통해서 사용해볼 수 있다.
- expand() 함수
- partial() 함수
1.1. expand() 함수
expand() 함수는 mapping 하고자 하는 파라미터를 task 로 전달해서 파라미터의 개수만큼 동적으로 실행할 수 있도록 해주는 함수다.
그리고 expand() 함수는 keyword arguments 로만 전달할 수 있다.
expand() 함수에 대해서 이해하기 쉽게 직접 사용해보면서 확인해보았다.
from datetime import datetime
from airflow.decorators import dag, task
@dag("test_dynamic_task_mapping", start_date=datetime(2023, 12, 28))
def test_dynamic_task_mapping():
@task()
def add_one(num: int):
return num + 1
@task()
def sum_all(values):
total = sum(values)
print(f"total was {total}")
add_values = add_one.expand(num=[1, 2, 3])
sum_all(add_values)
test_dynamic_task_mapping = test_dynamic_task_mapping()
코드에서 사용된 expand() 함수는 mapping 하고자 하는 파라미터인 num 의 개수에 맞게 task 를 병렬로 생성하게 된다.
add_one(num=1)
add_one(num=2)
add_one(num=3)
따라서, DAG 를 실행한 결과는 다음과 같이 나오게 된다.
total was 9
DAG 의 그래프를 확인해보면 다음과 같다.

add_one 이라는 task 가 3개 생긴 것을 확인할 수 있다. 그리고 task log 를 통해서 더 자세하게 확인해 볼 수 있다.

1.2. partial() 함수
partial() 함수는 expand() 함수와 같이 각 파라미터를 전달하지만 변경되지 않는 값을 전달하는 기능이라고 볼 수 있다.
쉽게 말해서 병렬로 생성된 task 에서 동일하게 사용되는 파라미터를 전달한다고 생각하면 된다.
partial() 함수도 코드를 통해 예를 들어서 사용해보면 이해하기 쉬울 것 같다.
from datetime import datetime
from airflow.decorators import dag, task
@dag("test_dynamic_task_mapping", start_date=datetime(2023, 12, 28))
def test_dynamic_task_mapping():
@task()
def add_one(num1: int, num2: int):
return num1 + num2
@task()
def sum_all(values):
total = sum(values)
print(f"total was {total}")
add_values = add_one.partial(num2=10).expand(num1=[1, 2, 3])
sum_all(add_values)
test_dynamic_task_mapping = test_dynamic_task_mapping()
코드를 보면 partial() 함수를 통해 num2 의 값에 10 이라는 변경되지 않는 파라미터를 넘겨주게 되고
expand() 함수를 통해 num1 의 개수 만큼 task 를 생성하게 된다.
add_one(num1=1, num2=10)
add_one(num1=2, num2=10)
add_one(num1=3, num2=10)
따라서, DAG 를 실행한 결과는 다음과 같이 나오게 된다.
total was 36
DAG 그래프와 log 는 위와 동일하지만 결과가 달라진 것을 확인할 수 있다.
마지막으로 airflow 에서 제공되는 Dynamic Taks Mapping 기능을 통해 하나의 Task 를 동적으로 실행할 수 있는 기능을 알아보았고
expand() 함수와 partial() 함수를 통해 task 를 효율적으로 사용할 수 있을 것 같다.
2. reference
Dynamic Task Mapping — Airflow Documentation
airflow.apache.org
[Airflow] Dynamic Task Mapping (동적 태스크 매핑)
Introduction 최근 사내에서 사용하는 몇몇 Airflow 클러스터 관리를 담당하게 되면서 Airflow 2.3.0으로의 버전업을 진행하게 되었습니다. 사실 이전에 다뤘던 두 포스트도 그 일환으로 작성했던 내용이
wookiist.dev
'데이터 엔지니어링 > Airflow' 카테고리의 다른 글
airflow default_args error - Invalid arguments were: **kwargs: {'provide_context': False} (0) | 2024.02.07 |
---|---|
airflow Default Arguments (0) | 2024.02.07 |
3. 공공데이터포털 데이터 전처리하기(2) (0) | 2023.01.02 |
2. 공공데이터포털 데이터 전처리하기 (1) (0) | 2022.12.31 |
1. 공공데이터포털 데이터 가져오기 (0) | 2022.12.25 |