4 분 소요

Airflow 초급 2 :

콘텍스트 변수, 콘텍스트 변수 전달, 템플릿 작업

1. 태스크 콘텍스트 변수

태스크 콘텍스트 변수란, DAG에 대한 다양한 동적 정보들을 담고 있는 변수이다.

1.1. execution_date의 변화

  • 기존 버전은 execution_date로 데이터 처리 대상과 작업 실행 간격을 정의함
    • 문제점 1. 실제 실행 일자가 아닌 논리적으로 처리하려는 데이터의 시작 일자이기 때문에 이름의 가독성이 떨어진다.
    • 문제점 2. 작업 대상 범위와 작업 실행 간격이 다른 경우에 설정이 복잡해진다.
  • Airflow 2.2 버전부터 작업 대상 범위(TimeWindow)작업 실행 시점(스케줄 간격)이 분리됨
    • 작업 대상 범위 : data_interval_start, data_interval_end
    • 작업 실행 시점 : logical_date

1.2. Airflow 2.2 이후 콘텍스트 변수

변수 타입 설명
{{ logical_date }} pendulum.DateTime 이전 버전과의 호환성을 위해 execution_date 대신 사용
{{ data_interval_start }} pendulum.DateTime 기존 execution_date
{{ data_interval_end }} pendulum.DateTime 기존 next_execution_date
{{ ds }} str YYYY-MM-DD
{{ ds_nodash }} str YYYYMMDD
{{ ts }} str 2018-01-01T00:00:00+00:00
{{ dag }} DAG 현재 실행 중인 dag
{{ task }} BaseOperator 현재 실행 중이 BaseOperator
{{ run_id }} str 현재 실행 중인 DagRun runID
{{ prev_data_interval_start_success }} pendulum.DateTime 이전에 성공한 DagRun의 TimeWindow 시작점

2. 템플릿 작업

2.1. 오퍼레이터 인수 템플릿

  • Pendulum 라이브러리는 파이썬의 datetime의 호환 객체
import airflow
from airflow import DAG
from airflow.operators.bash import BashOperator

dag = DAG(
    ...
)

get_data = BashOperator(
    task_id="get_data",
    bash_command=(
        "curl -o /tmp/wikipageviews.gz "
        "<https://dumps.wikimedia.org/other/pageviews/>"
        "{{ logical_date.year }}/"
        "{{ logical_date.year }}-{{ '{:02}'.format(logical_date.month) }}/"
        "pageviews-{%{ logical_date.year }}"
        "{{ '{:02}'.format(logical_date.month) }}"
        "{{ '{:02}'.format(logical_date.day) }}-"
        "{{ '{:02}'.format(logical_date.hour) }}0000.gz"
    ),
    dag=dag,
)

2.2. PythonOperator 템플릿

  • 키워드 인수를 사용해 콘텍스트 전체 전달
    • PythonOperator가 호출 가능한 인수의 이름으로부터 콘텍스트 변수 호출 가능한지 판단한다.
    • 모든 콘텍스트 변수는 키워드 인수로 전달된다.
def _print_context(**context):
    start=context["data_interval_start"] #2023-09-10-13T14:00:00+00:00
		end=context["data_interval_end"] #2023-09-10-13T15:00:00+00:00

print_context=PythonOperator(
	task_id="print_context",
	python_callable=_print_context,
	dag=dag,
)
  • 콘텍스트 변수 중 일부를 변수로 전달
def _print_context(logical_date, **context):
    year, month, day, hour, *_ = logical_date.timetuple()

print_context=PythonOperator(
	task_id="print_context",
	python_callable=_print_context,
	dag=dag,
)
  • 콘텍스트 변수 외에도 추가 인수 전달 (kwargs)
def _get_data(year, month, day, hour, output_path, **_):
    url = (
        "<https://dumps.wikimedia.org/other/pageviews/>"
        f"{year}/{year}-{month:0>2}/pageviews-{year}{month:0>2}{day:0>2}-{hour:0>2}0000.gz"
    )
    request.urlretrieve(url, output_path)

get_data = PythonOperator(
    task_id="get_data",
    python_callable=_get_data,
    op_kwargs={
        "year": "{{ logical_date.year }}",
        "month": "{{ logical_date.month }}",
        "day": "{{ logical_date.day }}",
        "hour": "{{ logical_date.hour }}",
        "output_path": "/tmp/wikipageviews.gz",
    },
    dag=dag,
)
  • 콘텍스트 변수 외에도 추가 인수 전달 (args)
def _get_data(output_path, **context):
		year, month, day, hour, *_ = context["logical_date"].timetable()
    url = (
        "<https://dumps.wikimedia.org/other/pageviews/>"
        f"{year}/{year}-{month:0>2}/pageviews-{year}{month:0>2}{day:0>2}-{hour:0>2}0000.gz"
    )
    request.urlretrieve(url, output_path)

get_data = PythonOperator(
    task_id="get_data",
    python_callable=_get_data,
    op_args=["/tmp/wikipageviews.gz"],
    dag=dag,
)

(3) 템플릿 인수 검사

  • 기본적으로 Airflow UI의 Rendered Template에서 템플릿 인수 값을 검사 가능

    • 단, Airflow에서 작업을 스케줄해야 확인 가능하다.
  • airflow tasks render [dag id] [task id] [desired logical date]
    
    • Airflow CLI는 작업을 실행하지 않아도 모든 템플릿 속성을 렌더링하여 보여준다.
    • 개발 과정에서 템플릿 인수 검사 시에는 Airflow CLI를 활용하는 것이 더 효율적이다.

REFERENCES