728x90
반응형

airflow 를 사용하면서 하나의 task 안에서 여러 개의 작업이 수행될 때 각 작업의 상태를 확인하거나 모든 작업에 동일한 값을 넘겨주고 싶은 경우가 있다. 이럴 때 airflow 제공하는 것이 airflow Dynamic Task Mapping 이라는 기능이다.

 

이 기능은 airflow 2.3.0 버전에서 새롭게 추가된 기능이라고 한다. 이 기능 외에도 다양한 기능이 추가된 것으로 알고 있다.

 

airflow 2.3.0 의 release note 는 아래에서 확인할 수 있다.

https://airflow.apache.org/docs/apache-airflow/stable/release_notes.html#airflow-2-3-0-2022-04-30

 

Release Notes — Airflow Documentation

 

airflow.apache.org

 

 

New Features 를 확인해보니 맨 위에 dynamic task mapping 이 새롭게 추가된 것을 확인할 수 있었다.

 

 

Airflow Dynamic Task Mapping

Dynamic Taks Mapping 은 이전 Task 의 결과를 가져와서 작업을 수행하게 된다.
Mapping 된 작업이 실행되기 전에 스케줄러는 각 입력에 대해서 하나씩 n 개의 작업 복사본을 생성한다고 한다.
일반적으로 MapReduce 방식을 생각할 수 있다.

 

그럼 Dynamic Task Mapping 기능을 사용할 때 다음과 같이 2개의 함수를 통해서 사용해볼 수 있다.

  • expand() 함수
  • partial() 함수

 

expand() 함수

expand() 함수는 mapping 하고자 하는 파라미터를 task 로 전달해서 파라미터의 개수만큼 동적으로 실행할 수 있도록 해주는 함수다.
그리고 expand() 함수는 keyword arguments 로만 전달할 수 있다.

 

expand() 함수에 대해서 이해하기 쉽게 직접 사용해보면서 확인해보았다.

from datetime import datetime

from airflow.decorators import dag, task


@dag("test_dynamic_task_mapping", start_date=datetime(2023, 12, 28))
def test_dynamic_task_mapping():
    @task()
    def add_one(num: int):
        return num + 1

    @task()
    def sum_all(values):
        total = sum(values)
        print(f"total was {total}")

    add_values = add_one.expand(num=[1, 2, 3])
    sum_all(add_values)


test_dynamic_task_mapping = test_dynamic_task_mapping()

 

코드에서 사용된 expand() 함수는 mapping 하고자 하는 파라미터인 num 의 개수에 맞게 task 를 병렬로 생성하게 된다.

add_one(num=1)
add_one(num=2)
add_one(num=3)

 

따라서, DAG 를 실행한 결과는 다음과 같이 나오게 된다.

total was 9

 

DAG 의 그래프를 확인해보면 다음과 같다.

 

add_one 이라는 task 가 3개 생긴 것을 확인할 수 있다. 그리고 task log 를 통해서 더 자세하게 확인해 볼 수 있다.

 

partial() 함수

partial() 함수는 expand() 함수와 같이 각 파라미터를 전달하지만 변경되지 않는 값을 전달하는 기능이라고 볼 수 있다.
쉽게 말해서 병렬로 생성된 task 에서 동일하게 사용되는 파라미터를 전달한다고 생각하면 된다.

 

partial() 함수도 코드를 통해 예를 들어서 사용해보면 이해하기 쉬울 것 같다.

from datetime import datetime

from airflow.decorators import dag, task


@dag("test_dynamic_task_mapping", start_date=datetime(2023, 12, 28))
def test_dynamic_task_mapping():
    @task()
    def add_one(num1: int, num2: int):
        return num1 + num2

    @task()
    def sum_all(values):
        total = sum(values)
        print(f"total was {total}")

    add_values = add_one.partial(num2=10).expand(num1=[1, 2, 3])
    sum_all(add_values)


test_dynamic_task_mapping = test_dynamic_task_mapping()

 

코드를 보면 partial() 함수를 통해 num2 의 값에 10 이라는 변경되지 않는 파라미터를 넘겨주게 되고
expand() 함수를 통해 num1 의 개수 만큼 task 를 생성하게 된다.

add_one(num1=1, num2=10)
add_one(num1=2, num2=10)
add_one(num1=3, num2=10)

 

따라서, DAG 를 실행한 결과는 다음과 같이 나오게 된다.

total was 36

 

DAG 그래프와 log 는 위와 동일하지만 결과가 달라진 것을 확인할 수 있다.

 

 

마지막으로 airflow 에서 제공되는 Dynamic Taks Mapping 기능을 통해 하나의 Task 를 동적으로 실행할 수 있는 기능을 알아보았고
expand() 함수와 partial() 함수를 통해 task 를 효율적으로 사용할 수 있을 것 같다.

 

 

reference

https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/dynamic-task-mapping.html

 

Dynamic Task Mapping — Airflow Documentation

 

airflow.apache.org

https://wookiist.dev/170

 

[Airflow] Dynamic Task Mapping (동적 태스크 매핑)

Introduction 최근 사내에서 사용하는 몇몇 Airflow 클러스터 관리를 담당하게 되면서 Airflow 2.3.0으로의 버전업을 진행하게 되었습니다. 사실 이전에 다뤘던 두 포스트도 그 일환으로 작성했던 내용이

wookiist.dev

 

728x90
반응형
복사했습니다!