[Airflow] 초급편 (2)
Airflow 초급 2 :
콘텍스트 변수, 콘텍스트 변수 전달, 템플릿 작업
1. 태스크 콘텍스트 변수
태스크 콘텍스트 변수란, DAG에 대한 다양한 동적 정보들을 담고 있는 변수이다.
1.1. execution_date의 변화
- 기존 버전은
execution_date
로 데이터 처리 대상과 작업 실행 간격을 정의함- 문제점 1. 실제 실행 일자가 아닌 논리적으로 처리하려는 데이터의 시작 일자이기 때문에 이름의 가독성이 떨어진다.
- 문제점 2. 작업 대상 범위와 작업 실행 간격이 다른 경우에 설정이 복잡해진다.
- Airflow 2.2 버전부터 작업 대상 범위(TimeWindow)과 작업 실행 시점(스케줄 간격)이 분리됨
- 작업 대상 범위 :
data_interval_start
,data_interval_end
- 작업 실행 시점 :
logical_date
- 작업 대상 범위 :
1.2. Airflow 2.2 이후 콘텍스트 변수
변수 | 타입 | 설명 |
---|---|---|
{{ logical_date }} | pendulum.DateTime | 이전 버전과의 호환성을 위해 execution_date 대신 사용 |
{{ data_interval_start }} | pendulum.DateTime | 기존 execution_date |
{{ data_interval_end }} | pendulum.DateTime | 기존 next_execution_date |
{{ ds }} | str | YYYY-MM-DD |
{{ ds_nodash }} | str | YYYYMMDD |
{{ ts }} | str | 2018-01-01T00:00:00+00:00 |
{{ dag }} | DAG | 현재 실행 중인 dag |
{{ task }} | BaseOperator | 현재 실행 중이 BaseOperator |
{{ run_id }} | str | 현재 실행 중인 DagRun runID |
{{ prev_data_interval_start_success }} | pendulum.DateTime | 이전에 성공한 DagRun의 TimeWindow 시작점 |
2. 템플릿 작업
2.1. 오퍼레이터 인수 템플릿
- Pendulum 라이브러리는 파이썬의 datetime의 호환 객체
import airflow
from airflow import DAG
from airflow.operators.bash import BashOperator
dag = DAG(
...
)
get_data = BashOperator(
task_id="get_data",
bash_command=(
"curl -o /tmp/wikipageviews.gz "
"<https://dumps.wikimedia.org/other/pageviews/>"
"{{ logical_date.year }}/"
"{{ logical_date.year }}-{{ '{:02}'.format(logical_date.month) }}/"
"pageviews-{%{ logical_date.year }}"
"{{ '{:02}'.format(logical_date.month) }}"
"{{ '{:02}'.format(logical_date.day) }}-"
"{{ '{:02}'.format(logical_date.hour) }}0000.gz"
),
dag=dag,
)
2.2. PythonOperator 템플릿
- 키워드 인수를 사용해 콘텍스트 전체 전달
- PythonOperator가 호출 가능한 인수의 이름으로부터 콘텍스트 변수 호출 가능한지 판단한다.
- 모든 콘텍스트 변수는 키워드 인수로 전달된다.
def _print_context(**context):
start=context["data_interval_start"] #2023-09-10-13T14:00:00+00:00
end=context["data_interval_end"] #2023-09-10-13T15:00:00+00:00
print_context=PythonOperator(
task_id="print_context",
python_callable=_print_context,
dag=dag,
)
- 콘텍스트 변수 중 일부를 변수로 전달
def _print_context(logical_date, **context):
year, month, day, hour, *_ = logical_date.timetuple()
print_context=PythonOperator(
task_id="print_context",
python_callable=_print_context,
dag=dag,
)
- 콘텍스트 변수 외에도 추가 인수 전달 (kwargs)
def _get_data(year, month, day, hour, output_path, **_):
url = (
"<https://dumps.wikimedia.org/other/pageviews/>"
f"{year}/{year}-{month:0>2}/pageviews-{year}{month:0>2}{day:0>2}-{hour:0>2}0000.gz"
)
request.urlretrieve(url, output_path)
get_data = PythonOperator(
task_id="get_data",
python_callable=_get_data,
op_kwargs={
"year": "{{ logical_date.year }}",
"month": "{{ logical_date.month }}",
"day": "{{ logical_date.day }}",
"hour": "{{ logical_date.hour }}",
"output_path": "/tmp/wikipageviews.gz",
},
dag=dag,
)
- 콘텍스트 변수 외에도 추가 인수 전달 (args)
def _get_data(output_path, **context):
year, month, day, hour, *_ = context["logical_date"].timetable()
url = (
"<https://dumps.wikimedia.org/other/pageviews/>"
f"{year}/{year}-{month:0>2}/pageviews-{year}{month:0>2}{day:0>2}-{hour:0>2}0000.gz"
)
request.urlretrieve(url, output_path)
get_data = PythonOperator(
task_id="get_data",
python_callable=_get_data,
op_args=["/tmp/wikipageviews.gz"],
dag=dag,
)
(3) 템플릿 인수 검사
-
기본적으로 Airflow UI의 Rendered Template에서 템플릿 인수 값을 검사 가능
- 단, Airflow에서 작업을 스케줄해야 확인 가능하다.
-
airflow tasks render [dag id] [task id] [desired logical date]
- Airflow CLI는 작업을 실행하지 않아도 모든 템플릿 속성을 렌더링하여 보여준다.
- 개발 과정에서 템플릿 인수 검사 시에는 Airflow CLI를 활용하는 것이 더 효율적이다.
REFERENCES
- [서적] Apache Airflow 기반의 데이터 파이프라인 (제이펍, 2022)
- [Doc] Templates reference
- [Github] Airflow 코드
- [블로그] AIP-39: Airflow ‘schedule_interval’의 변신, 그리고 ‘execution_date’의 종말