본문 바로가기
Airflow

[Apache Airflow 기반의 데이터 파이프라인] 태스크간 의존성 정의하기

by 개복취 2024. 2. 19.

  1. 기본 의존성 유형
  2. 브랜치하기
  3. 조건부 태스크
  4. XCom(cross-communication)을 사용하여 데이터를 공유하기
  5. XCom 사용시 고려사항

 

기본 의존성 유형

 

  • 선형체인(linear chain) : 연속적으로 실행되는 작업

  • 팬아웃/팬인 (fan-out/fan-in) : 하나의 태스크가 여러 다운스트림 태스크에 연결되거나 그 반대의 동작을 수행하는 유형

 

선형 의존성 유형

download_launches >> get_pictures # 작업 의존성을 각각 설정하기
get_pictures >> notify

download_launches >> get_pictures >> notify # 또는 여러 개의 의존성을 설정할수 있다.
  • 태스크 의존성을 통해 Airflow 업스트림 의존성이 성공적으로 실행된 뒤 다음 태스크 실행을 시작할 수 있다.
  • 태스크 의존성을 명시적으로 지정하면 여러 태스크에서 (암묵적인)순서가 명확하게 정의된다.
  • 이를 통해 Airflow는 의존성이 충족된 경우에만 다음 태스크를 스케쥴할 수 있다.
    • 가령, Cron 으로 개별 태스크를 차례로 스케쥴하고 두 번째 태스크가 시작될 때 이전 태스크가 완료되기를 기다리는 것보다 확실하게 처리할 수 있다.

팬아웃/팬인 유형

  • 태스크 간 복잡한 의존성 관계를 만들 수 있다. 두 데이터 세트를 결합하여 모델을 학습하는것이다.
  • 팬아웃 의존성에 추가하기 위해 DummyOperator을 사용한다.

팬아웃(일 대 다) 의존성 추가하기

fetch_weather >> clean_weather
fetch_sales >> clean_sales
from airflow.operators.dummy import DummyOperator

start=DummyOperator(task_id="start") # 더미 시작 태스크 생성하기
start >> [fetch_weather, fetch_sales] # 팬아웃(일 대 다) 의존성 태스크 생성하기

 

팬 인(다 대 일) 의존성 추가하기

[clean_weather, clean_sales] >> join_datasets

 

브랜치하기

  • 태스크 내에서 브랜치하기: 수집 태스크를 다시 작성하여 실행 날짜를 통해 판매 데이터 수집 및 처리를 위한 개별코드로 분리한다. 일반적인 PythonOperator와 같은 일반적인 Airflow 오퍼레이터로 대체하여 태스크에 유연하게 대처할 수 있다.
  • DAG 내부에서 브랜치하기: 두 개의 개별 태스크 세트를 개발하고 DAG가 이전 또는 새로운 ERP시스템에서 데이터 수집 작업을 실행을 선택할 수 있도록 하는것이다. 나머지 태스크를 DAG에 연결하고 Airflow가 언제 작업을 실행해야 하는지 확인한다. 
    • Airflow는 다운스트림 태스크 세트중 선택할 수 있는 기능을 BranchPythonOperator을 통해 제공한다. 이는 PythonOperator과 같이 파이썬 콜러블 인수를 사용할 수 있다.

def _pick_erp_system(**context):
		...
	pick_erp_system = BranchPythonOperator(
		task_id = "pick_erp_system",
		python_callable = _pick_erp_system,
)

 

 

조건부 태스크

  • 특정 조건에 따라 DAG에서 특정 태스크를 건너뛸 수 있는 다른 방법도 제공한다.
  • 특정 데이터 세트를 사용할 수 있을 때만 실행하거나 최근에 실행된 DAG인 경우만 태스크를 실행할 수 있다.

당일에 학습한 모델만 배포할 때

def _latest_only(**context):
        ...
    latest_only=PythonOperator(
        task_id = "latest_only",
        python_callable = _latest_only
)

latest_only >> deploy_model

 

내장 오퍼레이터 사용하기

from airflow.operators.latest_only import LatestOnlyOperator

latest_only = LatestOnlyOperator(
    task_id="latest_only",
    dag=dag
)
  • LatestOnlyOperator을 사용하면 조건부 배포를 구현하기 위해 복잡한 로직을 작성할 필요가 없다.
  • 더 복잡한 경우에는 PythonOperator으로 구현하는것이 더 효율적이다.

 

트리거 규칙에 대한 추가 정보

  • 태스크의 의존성 기능처럼 Airflow가 태스크가 실행 준비가 되어 있는지 여부를 결정하기 위한 필수적인 조건이다. Airflow의 기본 트리거 규칙은 all_success 이며, 태스크를 실행하려면 모든 의존적인 태스크가 모두 성공적으로 완료되어야 함을 의미한다.

태스크 간 데이터 공유

  • Airflow의 XCom을 사용하여 태스크 간에 작은 데이터를 공유할 수 있다. XCom은 기본적으로 태스크 간에 메세지를 교환하여 특정 상태를 공유할 수 있게 한다.

XCom(cross-communication)을 사용하여 데이터를 공유하기

def _train_model(**context):
    model_id=str(uuid.uuid4())
    context["task_instance"].xcom_push(key="model_id", value=model_id)

train_model=PythonOperator(
    task_id = "train_model"
    python_callable = _train_model
)
  • xcom_push에 대한 이 호출은 Airflow가 해당 태스크(train_model)와 해당 DAG 및 실행 날짜에 대한 XCom의 값으로 model_id 값을 등록할 수 있도록한다. (WebInterface Admin > XCom으로 확인 가능하다.)
  • xcom_push 와는 반대로 xcom_pull 메서드를 사용하여 다른 태스크에서 XCom 값을 확인할 수 있다.
def _deploy_model(**context)
    model_id=context["task_instance"].xcom_pull(
        task_ids="train_model", key="model_id"
)
print(f"Deploying model {model_id}")

deploy_model=PythonOperator(
    task_id ="deploy model",
    python_callable=_deploy_model
)
  • Airflow가 train_model 태스크에서 게시한 model_id와 일치하는 XCom 값을 가져오도록 지시한다. xcom_pull을 통해 XCom 값을 가져올 때 dag_id 및 실행 날짜를 정의한다.
  • 이 매개변수는 디폴트로 현재 DAG와 실행 날짜로 설정된다. 따라서, xcom_pull은 현재 DAG실행을 통해 게시된 값만 가져온다.
def _deploy_model(templates_dict, **context)
    model_id=templates_dict["model_id"]
    print(f"Deploying model {model_id}")

deploy_model=PythonOperator(
    task_id ="deploy model",
    python_callable=_deploy_model,
    templates_dict={
        "model_id": "{{task_instance.xcom_pull(
        task_ids='train_model', key='model_id')}}"
    },
)
  • 템플릿에서 XCom 값을 참조할 수 있다.

XCom 사용시 고려사항

  • XCom 은 유용해 보이지만, 단점이 존재한다.
    1. 풀링 태스크는 필요한 값을 사용하기 위해 태스크 간에 묵시적인 의존성(implicit dependency)이 필요하다.
      명시적 의존성 태스크(explicit task)와 달리 DAG에 표시되지 않으며 태스크 스케줄 시 고려되지 않는다.
    2. 따라서, XCom에 의해 의존성 있는 작업이 올바른 순서로 실행할 수 있도록 해야한다. (Airflow는 이를 고려하지 않음, 숨겨진 의존성은 서로 다른 DAG에서 실행 날짜 사이에 XCom값을 공유할 때 훨씬 더 복잡하기 때문에 권장하지 않음)
    3. 오퍼레이터의 원자성을 무너뜨리는 패턴이 될 수 있다. 가령, API 접근 토큰을 가져와서 다음 태스크에 XCom을 이용해 전달하려 할 때, 토큰 사용 시간이 만료되어 두 번째 태스크를 재실행 하지 못할 수 있을 것이다. (API 토큰 새로고침을 두번째 태스크에 수행하는 것이 원자성을 유지할 수 있다.)
    4. XCom이 저장하는 모든 값은 직렬화(serialization)를 지원해야 한다. 람다 또는 다중 멀티프로세스 관련 클래스 같은 파이썬 유형은 XCom에 저장할 수 없다.
    5. 사용되는 백엔드에 의해 XCom 값의 저장 크기가 제한될 수 있다. 기본적으로 XCom은 Airflow의 메타스토어에 저장되며 크기가 다음과 같이 제한된다.
      • SQLite - 2GB
      • PostgreSQL - 1GB
      • MySQL - 64kb

커스텀 XCom백엔드 사용하기

  • 따라서 Airflow 메타스토어를 사용하여 XCom을 저장 시에 제한 사항이나 큰 데이터 볼륨을 저장할 수 없다. XCom은 일반적으로 작은 값이나 결과값을 저장하는데 사용되며 큰 데이터 세트를 저장하는데 사용되지 않음
from typing import Any
from airflow.models.xcom import BaseXCom

class CustomXComBackend(BaseXCom):
    @staticmethod
    def serialize_value(value: Any):
        ...
    @staticmethod
    def deserialize_value(result) -> Any:
  • 커스텀 백엔드 클래스에서 직렬화 메서드XCom 값이 오퍼레이터 내에서 게시될 때마다 호출되지만, 역-직렬화 메서드XCom값이 백엔드에서 가져올 때 호출됩니다.
  • 커스텀 백엔드 클래스를 통해 클라우드 스토리지에 더 큰 XCom 값이 저장될 수 있도록 한다.

Taskflow API로 파이썬 태스크 연결하기

  • @task 사용해서 XCom으로 명시적으로 게시하지 않고 간단하게 모델 ID를 함수로부터 반환하여 다음 태스크로 전달할 수 있도록 한다.
  • 데커레이트 된 train_model 이라는 함수를 호출할 때 train_model 태스크를 위한 새로운 오퍼레이터 인스턴스를 생성한다. 데커레이트 된 train_model 함수를 호출하면 train_model 태스크에 대한 새로운 오퍼레이터 인스턴스가 생성된다.
  • train_model 함수의 return 문에서 Airflow는 태스크에서 반환된 XCom으로 자동 등록되는 값을 반환한다.
  • deploy_model 태스크는 데커레이트 된 함수를 호출하여 오퍼레이터 인스턴스의 생성뿐만 아니라 train_model 태스크의 model_id의 출력도 전달한다.
  • 이를 통해, Airflow에게 train_model의 model_id 출력이 데커레이트 된 deploy_model 함수에 인수로 전달되어야 한다고 알려준다.

장점:

  • 태스크 간 작업 결과 데이터를 전달하는 PythonOperator를 많이 사용하는 DAG를 간소화 한다.
  • 해당 함수 내의 태스크 간의 의존성을 숨기지 않고 태스크 간의 값을 명시적으로 전달함으로 해결할 수 있다.

단점:

  • PythonOperator를 사용하여 구현되는 파이썬 태스크로 제한된다. 따라서, 다른 Airflow 오퍼레이터와 관련된 태스크는 일반 API를 사용하여 태스크 및 태스크 의존성을 정의해야 한다.
  • taskflow 유형 태스크 간에 전달된 데이터는 XCom을 통해 저장되어야 한다. 즉, 전달된 모든 값은 XCom의 제약 사항(직렬화가 가능해야 함)이 적용된다. 또한 태스크 간에 전달되는 데이터 세트의 크기는 XCom의 백엔드에 의해 제한될 수 있습니다.