[Airflow] 초급편 (1)
Airflow 초급 1 :
Airflow의 특징, DAG 작성 순서, 스케줄링, 디자인 원칙
1. Airflow 소개
1.1. Airflow 특징
- 파이썬 코드로 유연한 파이프라인 정의
- 파이프라인 스케줄링 및 실행
- 스케줄러 : DAG 분석 및 워커에 태스크 예약
- 워커 : 예약된 태스크 선택 및 실행
- 웹 서버 : 스케줄러가 분석한 DAG 시각화, DAG 실행과 결과 확인 인터페이스
- 파이프라인 모니터링 및 실패 처리
- 재시도, 알림, 트리거 규칙 등
- 점진적 로딩 및 백필
1.2. Airflow 가 적합하지 않은 경우
- Airflow는 배치 태스크에 초점이 맞춰져 있기 때문에 스트리밍 워크플로에는 적합하지 않음
- 추가 및 삭제 태스크가 빈번한 동적 파이프라인에 적합하지 않음
- DAG가 실행되는 동안 구조가 변경되지 않아야 한다.
2. DAG
2.1. DAG란?
- 방향성 비순환 그래프로 Airflow의 워크플로를 표시할 때 사용
2.2. DAG 작성 순서
- DAG 객체 인스턴스 생성
- 모든 워크플로의 시작점으로 dag_id, start_date, schedule_interval 등 지정
- Operator 객체 인스턴스 생성을 통한 태스크 지정
- 태스크 실행 순서 정의
import airflow
import requests
import requests.exceptions as requests_exceptions
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
def _task_2():
pass
# DAG 객체 인스턴스 생성
dag = DAG(
dag_id="dag_id",
start_date=airflow.utils.dates.days_ago(14),
schedule_interval=None,
)
# Operator 객체 인스턴스 생성을 통한 태스크 지정
task_1 = BashOperator(
task_id="download_launches",
bash_command="curl -o /tmp/launches.json -L '<https://ll.thespacedevs.com/2.0.0/launch/upcoming>'", # noqa: E501
dag=dag,
)
task_2= PythonOperator(
task_id="task_2", python_callable=_task_2, dag=dag
)
# 태스크 실행 순서 정의
task_1 >> task_2
2.3. 태스크와 오퍼레이터
- DAG : 오퍼레이터 집합에 대한 실행을 오케스트레이션하는 역할
- 오퍼레이터 시작, 정지, 완료 시 다음 태스크의 시작, 의존성 보장 등이 있다.
- 오퍼레이터 : 단일 작업 수행 역할
- BashOperator, PythonOperator, HTTPOperator 등이 있다.
- 태스크 : 작업의 올바른 실행을 보장하기 위한 오퍼레이터의 매니저(래퍼)
- 오퍼레이터의 상태 관리, 사용자에게 상태 변경 표시하는 Airflow 내장 컴포넌트이다.
3. 스케줄링
3.1. 스케줄링 방법
- start_date
- 필수 인자로 실제로 DAG를 실행시키는 일자가 아니다.
- 처리할 데이터 기간(Time Window)의 시작 일자라고 보는 게 낫다.
- 2023.09.09에 2023.09.10라고 지정하면 2023.09.10 자정에 실행되지 않는다.
- 2023.09.10의 모든 데이터를 모두 수집한 이후인 2023.09.11 자정에 DAG를 실행한다.
(1) Cron 기반
-
"분 시 일 월 요일"
0 * * * *
= 매시간 정시에 실행0 0 * * 0
= 매주 일요일 자정에 실행
dag = DAG(
dag_id="dag_id",
schedule_interval="0 * * * *",
start_date=datetime(2023, 9, 10),
end_date=datetime(2023, 9, 15),
)
(2) 매크로 기반
@once
: 1회만 실행@hourly
: 매시간 변경 시 1회 실행@weekly
: 매주 일요일 자정에 1회 실행@monthly
: 매월 1일 자정에 1회 실행
dag = DAG(
dag_id="dag_id",
# 매일 자정에 1회 실행
schedule_interval="@daily",
start_date=datetime(2019, 1, 1),
end_date=datetime(2019, 1, 5),
)
(3) 빈도 기반
- 이전 작업 실행 시점을 기억해야 하는 경우 timedelta를 사용
dag = DAG(
dag_id="dag_id",
# 시작 시간으로부터 3일에 한 번 실행
schedule_interval=dt.timedelta(days=3),
start_date=dt.datetime(year=2019, month=1, day=1),
end_date=dt.datetime(year=2019, month=1, day=5),
)
3.2. 활용 방안
(1) 데이터 증분 가져오기
- 콘텍스트 변수(
logical_date
),data_interval_start
,data_interval_end
활용 - Extract : 동적으로 날짜를 참조하여 당일 데이터만 가져오기
- Load : 날짜 별로 데이터 적재 (데이터 파티셔닝)
- Transform : 날짜 별로 데이터 처리
(2) 과거 시점의 작업 실행
catchup=True
: 아직 실행되지 않은 과거 스케줄 간격을 예약하고 실행 (백필)catchup=False
: 가장 최근 스케줄 간격에 대해서만 실행
3.3. 태스크 디자인 핵심 원칙
(1) 원자성 (Atomicity)
- 모두 발생하거나 전혀 발생하지 않아야 함
- 처리 도중에 에러가 발생한다면 결과에 영향을 미치면 안 된다.
- 일반적으로 Operator는 원자성을 제공함
- 유연성이 높은 BashOperator, PythonOperator 사용 시에만 원자성을 고려해 작성해야 한다.
(2) 멱등성 (Idempotency)
- 동일한 입력으로 동일한 태스크를 여러 번 호출해도 결과가 동일해야 함
- 덮어쓰기(upsert) 등을 통해 유지 가능하다.
REFERENCES
- [서적] Apache Airflow 기반의 데이터 파이프라인 (제이펍, 2022)