5 분 소요

Airflow 초급 1 :

Airflow의 특징, DAG 작성 순서, 스케줄링, 디자인 원칙

1. Airflow 소개

1.1. Airflow 특징

  • 파이썬 코드로 유연한 파이프라인 정의
  • 파이프라인 스케줄링 및 실행
    • 스케줄러 : DAG 분석 및 워커에 태스크 예약
    • 워커 : 예약된 태스크 선택 및 실행
    • 웹 서버 : 스케줄러가 분석한 DAG 시각화, DAG 실행과 결과 확인 인터페이스
  • 파이프라인 모니터링 및 실패 처리
    • 재시도, 알림, 트리거 규칙 등
  • 점진적 로딩 및 백필

1.2. Airflow 가 적합하지 않은 경우

  • Airflow는 배치 태스크에 초점이 맞춰져 있기 때문에 스트리밍 워크플로에는 적합하지 않음
  • 추가 및 삭제 태스크가 빈번한 동적 파이프라인에 적합하지 않음
    • DAG가 실행되는 동안 구조가 변경되지 않아야 한다.

2. DAG

2.1. DAG란?

  • 방향성 비순환 그래프로 Airflow의 워크플로를 표시할 때 사용

2.2. DAG 작성 순서

  • DAG 객체 인스턴스 생성
    • 모든 워크플로의 시작점으로 dag_id, start_date, schedule_interval 등 지정
  • Operator 객체 인스턴스 생성을 통한 태스크 지정
  • 태스크 실행 순서 정의
import airflow
import requests
import requests.exceptions as requests_exceptions
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

def _task_2():
    pass

# DAG 객체 인스턴스 생성
dag = DAG(
    dag_id="dag_id",
    start_date=airflow.utils.dates.days_ago(14),
    schedule_interval=None,
)

# Operator 객체 인스턴스 생성을 통한 태스크 지정
task_1 = BashOperator(
    task_id="download_launches",
    bash_command="curl -o /tmp/launches.json -L '<https://ll.thespacedevs.com/2.0.0/launch/upcoming>'",  # noqa: E501
    dag=dag,
)

task_2= PythonOperator(
    task_id="task_2", python_callable=_task_2, dag=dag
)

# 태스크 실행 순서 정의
task_1 >> task_2

2.3. 태스크와 오퍼레이터

  • DAG : 오퍼레이터 집합에 대한 실행을 오케스트레이션하는 역할
    • 오퍼레이터 시작, 정지, 완료 시 다음 태스크의 시작, 의존성 보장 등이 있다.
  • 오퍼레이터 : 단일 작업 수행 역할
    • BashOperator, PythonOperator, HTTPOperator 등이 있다.
  • 태스크 : 작업의 올바른 실행을 보장하기 위한 오퍼레이터의 매니저(래퍼)
    • 오퍼레이터의 상태 관리, 사용자에게 상태 변경 표시하는 Airflow 내장 컴포넌트이다.

3. 스케줄링

3.1. 스케줄링 방법

  • start_date
    • 필수 인자로 실제로 DAG를 실행시키는 일자가 아니다.
    • 처리할 데이터 기간(Time Window)의 시작 일자라고 보는 게 낫다.
    • 2023.09.09에 2023.09.10라고 지정하면 2023.09.10 자정에 실행되지 않는다.
    • 2023.09.10의 모든 데이터를 모두 수집한 이후인 2023.09.11 자정에 DAG를 실행한다.

(1) Cron 기반

  • "분 시 일 월 요일"
    
    • 0 * * * * = 매시간 정시에 실행
    • 0 0 * * 0 = 매주 일요일 자정에 실행
dag = DAG(
    dag_id="dag_id",
    schedule_interval="0 * * * *",
    start_date=datetime(2023, 9, 10),
    end_date=datetime(2023, 9, 15),
)

(2) 매크로 기반

  • @once : 1회만 실행
  • @hourly : 매시간 변경 시 1회 실행
  • @weekly : 매주 일요일 자정에 1회 실행
  • @monthly : 매월 1일 자정에 1회 실행
dag = DAG(
    dag_id="dag_id",
		# 매일 자정에 1회 실행
    schedule_interval="@daily",
    start_date=datetime(2019, 1, 1),
    end_date=datetime(2019, 1, 5),
)

(3) 빈도 기반

  • 이전 작업 실행 시점을 기억해야 하는 경우 timedelta를 사용
dag = DAG(
    dag_id="dag_id",
		# 시작 시간으로부터 3일에 한 번 실행
    schedule_interval=dt.timedelta(days=3),
    start_date=dt.datetime(year=2019, month=1, day=1),
    end_date=dt.datetime(year=2019, month=1, day=5),
)

3.2. 활용 방안

(1) 데이터 증분 가져오기

  • 콘텍스트 변수(logical_date), data_interval_start, data_interval_end 활용
  • Extract : 동적으로 날짜를 참조하여 당일 데이터만 가져오기
  • Load : 날짜 별로 데이터 적재 (데이터 파티셔닝)
  • Transform : 날짜 별로 데이터 처리

(2) 과거 시점의 작업 실행

  • catchup=True : 아직 실행되지 않은 과거 스케줄 간격을 예약하고 실행 (백필)
  • catchup=False : 가장 최근 스케줄 간격에 대해서만 실행

3.3. 태스크 디자인 핵심 원칙

(1) 원자성 (Atomicity)

  • 모두 발생하거나 전혀 발생하지 않아야 함
    • 처리 도중에 에러가 발생한다면 결과에 영향을 미치면 안 된다.
  • 일반적으로 Operator는 원자성을 제공함
    • 유연성이 높은 BashOperator, PythonOperator 사용 시에만 원자성을 고려해 작성해야 한다.

(2) 멱등성 (Idempotency)

  • 동일한 입력으로 동일한 태스크를 여러 번 호출해도 결과가 동일해야 함
    • 덮어쓰기(upsert) 등을 통해 유지 가능하다.

REFERENCES

  • [서적] Apache Airflow 기반의 데이터 파이프라인 (제이펍, 2022)