13 분 소요

Airflow 초급 3 :

태스크 의존성 패턴, 조건에 따른 태스크 실행, 태스크 간 데이터 공유, Taskflow API

1. 태스크 의존성 패턴

1.1. 선형 의존성

선형의존성

  • 단일 업스트림 태스크가 단일 다운스트림 태스크에 연결되는 것 (일대일)

1.2. 팬인/팬아웃 의존성

팬인팬아웃의존성

(1) 팬아웃 의존성

  • 단일 업스트림 태스크가 여러 다운스트림 태스크에 연결되는 것 (일대다)
  • 2가지 이상의 태스크가 동시에 시작되는 경우 더미 태스크로 암묵적 팬아웃을 명시

(2) 팬인 의존성

  • 여러 업스트림 태스크가 단일 업스트림 태스크에 연결되는 것 (다대일)

(3) 독립적 실행

  • 팬인/팬아웃 의존성으로 인해 태스크가 병렬 실행, 독립적 실행 될 수 있음
    • 위의 그림에서 Data1 처리 태스크와 Data2 처리 태스크는 서로 직접적인 의존성이 없다.
    • 병렬 실행을 위해서는 별도로 Airflow 설정을 해줘야 한다.

2. 조건에 따른 태스크 실행

특정 조건에만 태스크가 수행되도록 다음과 같은 방법을 사용할 수 있다.

2.1. 태스크 내 조건 추가

간단하게 태스크 내에 조건을 부여해서 태스크 실행 여부를 결정할 수 있지만, 조건이 DAG 에서 확인이 불가능하다.

  • (예시) 당일에 학습한 모델만 배포할 때
def _deploy_model(**context):
    if context["logical_date"] == ...:
        deploy_model()

with DAG(
    ...
) as dag:
    ...
    deploy_model = PythonOperator(task_id="deploy_model", python_callable=_deploy_model)
    ...
    join_datasets >> train_model >> deploy_model
  • 태스크 내에 조건을 부여할 경우 단점
    • 해당 태스크에 대해 PythonOperator만 사용 가능하다.
    • Airflow UI에서 태스크 결과 추적(모델을 배포했는지 안했는지)하기 어렵다.

2.2. 조건부 태스크 추가

조건 자체를 태스크로 만드는 것이 DAG 플로우를 이해하기 좋다.

조건부 태스크를 통해 조건을 테스트하고, 실패하는 경우 모든 다운스트림 작업을 건너뛰도록 AirflowSkipException을 발생시킨다.

조건부 태스크

  • (예시) 당일에 학습한 모델만 배포할 때
def _latest_only(**context):
    now = pendulum.now("UTC")
    left_window = context["dag"].following_schedule(context["logical_date"])
    right_window = context["dag"].following_schedule(left_window)

    if not left_window < now <= right_window:
        raise AirflowSkipException()

with DAG(
    ...
) as dag:
    ...
    train_model = DummyOperator(task_id="train_model")
    latest_only = PythonOperator(task_id="latest_only", python_callable=_latest_only)
    deploy_model = DummyOperator(task_id="deploy_model")
	...
    join_datasets >> train_model >> deploy_model
    latest_only >> deploy_model
  • (예시) 당일에 학습한 모델만 배포할 때 - 내장 오퍼레이터 사용
    • 가장 최근 실행한 DAG만 실행하는 경우, Airflow 의 내장 클래스인 LastOnlyOperator 클래스를 사용하면 간단하게 구현 가능하다.
from airflow.operators.latest_only import LatestOnlyOperator

with DAG(
    ...
) as dag:
    ...
    train_model = DummyOperator(task_id="train_model")
    latest_only = LatestOnlyOperator(task_id="latest_only", dag=dag)
    deploy_model = DummyOperator(task_id="deploy_model")
	...
    join_datasets >> train_model >> deploy_model
    latest_only >> deploy_model

2.3. 브랜치 추가

기존에 사용하던 데이터 플랫폼을 변경하는 등의 이벤트에도 파이프라인 스케줄링을 지속할 수 있도록 브랜치를 형성한다.

이 때도 태스크 내에서 코드로 내부 브랜치를 만들 수 있지만 DAG의 가독성, 실행 추적, 플로우의 변동성을 고려한다면 태스크 셋으로 브랜치를 만드는 것이 좋다.

브랜치를 형성할 때에는 BranchPythonOperator, 트리거 규칙 설정, DummyOperator등을 활용한다.

브랜치 추가

  • (예시) 기존에 사용하던 데이터 플랫폼을 10월 10일에 변경 예정
    • BranchPythonOperator : 작업 결과로 다운스트림 태스크의 ID 반환하고, 특정 브랜치만 실행할 때 사용한다. (리스트로 여러 ID 반환 가능)
    • 트리거 규칙 : 특정 브랜치만 성공해도 이후의 다운스트림이 실행되도록 none_failed 설정한다.
    • DummyOperator : DAG 가독성을 위해 브랜치 결합에 사용한다.
from airflow.operators.python import PythonOperator, BranchPythonOperator

CHANGE_DATE = airflow.utils.dates.days_ago(1)

def _pick_branch(**context):
    if context["logical_date"] < CHANGE_DATE:
        return "fetch_sales_old"
    else:
        return "fetch_sales_new"

with DAG(
    ...
) as dag:
    start = DummyOperator(task_id="start")
    
    pick_branch = BranchPythonOperator(task_id="pick_branch", python_callable=_pick_branch)
    extract_1_old = PythonOperator(...)
    extract_1_new = PythonOperator(...)
    transform_1_old = PythonOperator(...)
    transform_1_new = PythonOperator(...)
    
    # join_branch의 모든 업스트림이 실행 및 성공할 필요 없음
    join_branch = DummyOperator(task_id="join_branch", trigger_rule="none_failed")

    extract_2 = PythonOperator(...)
    transform_2 = PythonOperator(...)

    join_datasets = DummyOperator(...)
    train_model = DummyOperator(...)
    deploy_model = DummyOperator(...)

    start >> [pick_branch, extract_2]
    
    pick_branch >> [extract_1_old, extract_1_new]
    extract_1_old >> transform_1_old
    extract_1_new >> transform_1_new
    [transform_1_old, transform_1_new] >> join_branch
    
    extract_2 >> transform_2
    
    [join_branch, transform_2] >> join_datasets
    
    join_datasets >> train_model >> deploy_model

2.4. 트리거 규칙

Airflow는 기본적으로 태스크 실행 전에 해당 태스크의 업스트림 태스크가 모두 성공적으로 완료(all_success)되어야 한다.

이는 업스트림 태스크 결과(성공, 실패, 스킵 등)가 다운스트림 태스크에도 전파되기 때문이다. 하지만 이를 Operator의 trigger_rule 인수를 통해 변경할 수 있다.

트리거 규칙 동작 사용 사례
all_success 모든 상위 태스크가 성공적으로 완료되면 트리거 일반적인 워크플로에 대한 기본 트리거 규칙
all_failed 모든 상위 태스크가 실패했거나 상위 태스크의 오류로 인해 실패했을 경우 트리거 태스크 그룹에서 하나 이상 실패가 예상되는 상황에서 오류 처리 코드 트리거
all_done 결과 상태에 상관없이 모든 상위 태스크가 실행 완료 시 트리거 모든 태스크가 완료되었을 때 실행할 청소 코드 실행(시스템 종료, 클러스터 중지)
one_failed 하나 이상의 상위 태스크가 실패하자마자 트리거,
다른 상위 태스크 실행 완료 기다리지 않음
알림/롤백과 같은 일부 오류 처리 코드 빠르게 트리거
one_success 하나 이상의 상위 태스크가 성공하자마자 트리거,
다른 상위 태스크 실행 완료 기다리지 않음
하나의 결과 사용 가능한 즉시 다운스트림 연산/알림 빠르게 트리거
none_failed 실패한 상위 태스크가 없지만, 태스크가 성공/스킵된 경우 트리거 DAG상 조건부 브랜치의 결합
none_skipped 건너뛴 상위 태스크가 없지만, 태스크가 성공/실패한 경우 트리거 모든 업스트림 태스크가 실행된 경우, 해당 결과 무시하고 트리거
dummy 상위 태스크 상태와 관계없이 트리거 테스트 시

3. 태스크 간 데이터 공유

Airflow에서는 태스크 간 데이터 공유하는 방법에는 DB에 적재 후 불러오기, XCom에 적재 후 불러오기 2가지 방법이 있다.

여기서는 XCom을 활용한 방법을 알아보겠다.

3.1. Task Context의 task_instance

(1) 함수 내 사용

  • XCom 값 게시
    • context["task_instance"].xcom_push(key="key", value="value")
    • Key, Value, Timestamp, Logical Date, Task_id, Dag_id 등의 값이 등록된다.
    • Airflow Web UI > Admin > XCom 에서 확인 가능하다.
  • XCom 값 참조
    • context["task_instance"].xcom_pull(task_ids="id", key="key")
    • 일반적으로 task_id와 key만 지정해서 값을 가져온다.
    • dag_id와 logical_date를 지정하지 않으면 현재 DAG 실행을 통해 게시된 값만 가져온다.
def _train_model(**context):
    model_id = str(uuid.uuid4())
    context["task_instance"].xcom_push(key="model_id", value=model_id)

def _deploy_model(**context):
    model_id = context["task_instance"].xcom_pull(
        task_ids="train_model", key="model_id"
    )
    print(f"Deploying model {model_id}")

with DAG(
    ...
) as dag:
    ...
    train_model = PythonOperator(task_id="train_model", python_callable=_train_model)
    deploy_model = PythonOperator(task_id="deploy_model", python_callable=_deploy_model)
    ...
    join_datasets >> train_model >> deploy_model

(2) Jinja 템플릿 사용

def _train_model(**context):
    model_id = str(uuid.uuid4())
    context["task_instance"].xcom_push(key="model_id", value=model_id)

def _deploy_model(templates_dict, **context):
    model_id = templates_dict["model_id"]
    print(f"Deploying model {model_id}")

with DAG(
    ...
) as dag:
    ...
    train_model = PythonOperator(task_id="train_model", python_callable=_train_model)
    # jinja 템플릿 통해 XCom 값 참조
    deploy_model = PythonOperator(
        task_id="deploy_model",
        python_callable=_deploy_model,
        templates_dict={
            "model_id": "{{task_instance.xcom_pull(task_ids='train_model', key='model_id')}}"
        },
    )
	...
    join_datasets >> train_model >> deploy_model

3.2. Implicit XCom

일부 오퍼레이터는 XCom 메서드를 사용하지 않아도 암묵적으로 XCom을 게시하는 기능도 제공한다.

(1) BashOperator

  • xcom_push 옵션을 true로 지정 시 stdout에 기록된 마지막 행을 XCom 값으로 게시

(2) PythonOperator

  • 파이썬 호출 가능한 인수에서 반환된 값을 XCom 값으로 게시
def _train_model(**context):
    model_id = str(uuid.uuid4())
    return model_id

3.3. XCom의 단점

  • 태스크 간 암시적인 의존성이 있지만, 태스크 스케줄에 고려되지 않음
  • 태스크 간 암시적인 의존성으로 인해 Operator의 원자성을 해칠 가능성 있음
  • 저장되는 모든 값은 직렬화를 지원해야 함
  • Airflow의 메타스토어에 저장되기 때문에 백엔드에 따라 저장 크기가 제한

→ 따라서, 사용한다면 태스크 간 의존성을 명확히 기록해야 한다!

3.4. 커스텀 XCom 백엔드 지정 옵션

  • Airflow 2 이후부터 커스텀 XCom 백엔드 지정 옵션을 통해 커스텀 클래스 정의 가능
  • S3, Blob, GCS 등의 다양한 서비스 백엔드 사용 가능하기에 큰 값 저장 가능
  • 커스텀 클래스 정의 시 필수 사항
    • BaseXCom 기본 클래스 상속
    • 직렬화, 역직렬화 위한 2가지 정적 메서드 정의

4. Taskflow API 로 코드 간소화

4.1. Taskflow API 란?

  • 파이썬 태스크 및 의존성 정의를 위한 새로운 데코레이터 기반 API
  • PythonOperator 기반 태스크가 대부분인 경우 사용 권장

4.2. 일반 API와 Taskflow API

플로우

(1) 일반 API

  • 태스크 생성 : 함수 정의 → PythonOperator로 태스크 생성
  • 데이터 전달 : XCom 메서드 사용, key 값 일치시키기, XCom으로 인한 데이터 의존성 명시 불가능
def _train_model(**context):
    model_id = str(uuid.uuid4())
    context["task_instance"].XCom_push(key="model_id", value=model_id)

def _deploy_model(**context):
    model_id = context["task_instance"].XCom_pull(
        task_ids="train_model", key="model_id"
    )
    print(f"Deploying model {model_id}")

with DAG(
    ...
) as dag:
	...
    train_model = PythonOperator(task_id="train_model", python_callable=_train_model)
    deploy_model = PythonOperator(task_id="deploy_model", python_callable=_deploy_model)
    ...
    # 두 태스크 간에 데이터가 공유되는지 알 수 없음
    join_datasets >> train_model >> deploy_model

(2) Taskflow API

  • 태스크 생성 : @task 데코레이터와 함께 함수 정의
    • 함수 호출 → 태스크에 대한 오퍼레이터 인스턴스 생성 → XCom 반환/XCom값 전달
  • 데이터 전달 : return 값과 함수에 인수 전달로 데이터 의존성 명시
from airflow.decorators import task

with DAG(
    ...
) as dag:
	# 함수 정의와 동시에 태스크 생성
    @task
    def train_model():
        model_id = str(uuid.uuid4())
        return model_id

    @task
    def deploy_model(model_id: str):
        print(f"Deploying model {model_id}")
	# 두 태스크 간에 데이터가 공유됨을 한 번에 알 수 있음
    model_id = train_model()
    deploy_model(model_id)

4.3. 장단점

  • 장점
    • PythonOperator 기반 태스크 정의 단순화
    • 태스크 간 XCom으로 인한 데이터 의존성 명시 가능
    • XCom 으로 데이터 전달 시 코드 단순화
  • 단점
    • PythonOperator 만 지원
    • 2가지 스타일을 혼용해서 사용하는 경우 플로우가 직관적이지 못함
    • XCom의 제약사항(직렬화, 크기 제한 등)은 그대로 적용
with DAG(
    ...
) as dag:
    ...
    join_datasets = DummyOperator(task_id="join_datasets")
    
    @task
    def train_model():
        model_id = str(uuid.uuid4())
        return model_id

    @task
    def deploy_model(model_id: str):
        print(f"Deploying model {model_id}")
	
    # Case1. 일반 API만 사용
    # join_datasets >> train_model >> deploy_model
    
    # Case2. TaskflowAPI + 일반 API
    model_id = train_model()
    deploy_model(model_id)
    
    join_datasets >> model_id

5. 정리

  • 일대일, 일대다, 다대일로 의존성을 정의할 수 있다.
  • DAG 상에 조건 등을 명시해야 가독성도 좋고, 태스크 결과 추적에 용이하다.
  • BranchPythonOpertor, 트리거 규칙, 조건부 태스크 등을 활용해서 조건에 따른 의존성을 부여할 수 있다.
  • Taskflow API는 파이썬으로만 이루어진 태스크에만 사용하는 게 좋을 것 같다.
  • 상태 등의 작은 데이터는 XCom을 활용하여 태스크 간에 전달할 수 있다.
  • 커스텀 XCom 백엔드 지정 옵션을 조금 더 알아볼 필요가 있다.

REFERENCES

  • [서적] Apache Airflow 기반의 데이터 파이프라인 (제이펍, 2022)