본문 바로가기
Airflow

[Apache Airflow 기반의 데이터 파이프라인] 워크플로 트리거

by 개복취 2024. 2. 20.

 

  1. 센서를 사용한 폴링조건
  2. 다른 DAG를 트리거하기

 

센서를 사용한 폴링조건

  • v2.7.1 버전 기준 9개의 센서 가 존재하고, 버전별로 센서가 정리되고 있다.
    • 센서패키지에서 통합관리 되던게, provider로 흩어진걸로 보여짐
    • ex) airflow.sensor.sqlsensor (v2.4.0) → airflow.providers.common.sql.sensors
    • ex) airflow.sensor.hdfs_sensor → airflow.providers.apache.hdfs.sensors
  • v.2.0.0 → 24개 ( Docs )
  • v 2.7.1 → 9개 (Docs)

사용자 지정 조건 폴링

  • pythonSensor를 사용하여 다양한 방법으로 센싱처리가 가능함 (python 로직이 추가되어 개발해서 센싱한다는 느낌)
  • pythonOperator 로 직접 센서를 만드는것과 큰 차이가 있을지 의문이다.

원활하지 않는 흐름의 센서 처리

  • Airflow의 최대 태스크의 수를 제한하여, 너무 많은 Airflow task가 생기는 것을 방지한다.

https://velog.io/@jaytiger/Airflow-Concurrency

  • Dag내 설정
    • Concurrency : 동시에 실행가능한 태스크의 수를 결정함
    • max_active_tasks_per_dag, max_active_runs_per_dag 등 필요에 따라 옵션 사용
  • config 설정
    • max_active_runs : DAG당 한 순간에 실행 가능한 DAG Run
    • max_active_tasks, max_active_runs 등 필요에 따라 옵션 사용
  • 해당 옵션들은 상황에 따라 자주 사용되는 옵션이므로 알아두는게 좋음
  • 설정된 개수에 도달하면 더이상의 태스크를 실행하지 않으며, 이를 센서 데드록(sensor deadlock)상태 라고 함.

 

소스 적용 예시

Dag=DAG(
    Dag_id="test_dag"
    ....
    Concurrency=10 # 해당 옵션을 사용하여 동시에 실행가능한 태스크의 수를 결정함 
)
  • Sensor의 mode 옵션 ( Docs, 참고 )
    • poke (default): The Sensor takes up a worker slot for its entire runtime
    • reschedule: The Sensor takes up a worker slot only when it is checking, and sleeps for a set duration between checks
  • 모드별 동작 정리
    • poke 모드는 sensor 작업이 완료되지 않았다면 주어진 인터벌만큼 sleep을 함 (=pool의 slot을 계속 차지)
    • reschedule 모드는 sensor 작업이 완료되지 않았다면 다음 스케줄을 등록한 후, 무한루프를 빠져나가고 다음 인터벌에서 다시 execute 메소드를 호출

sensor 코드

# BaseSensorOperator 소스 init 부분

def __init__(
        self,
        *,
        poke_interval: timedelta | float = 60,
        timeout: timedelta | float = conf.getfloat("sensors", "default_timeout"),
        soft_fail: bool = False,
        mode: str = "poke",
        exponential_backoff: bool = False,
        max_wait: timedelta | float | None = None,
        silent_fail: bool = False,

 

다른 DAG를 트리거하기

  • Dag에서 다른 Dag를 호출하여 트리거(실행)하는 경우 TriggerDagRunOperator를 사용하여 호출함

TriggerDagRunOperator 사용방법 예시

# 아래 예시에는 trigger_test라는 TriggerDagRunOperator를 생성하고, 해당 Operator에서 target_dag라는 dag를 호출하여 실행함

from airflow.operators.trigger_dagrun import TriggerDagRunOperator

call_trigger = TriggerDagRunOperator(
        task_id='trigger_test' 
        trigger_dag_id='target_dag', # 호출할 DAG_ID 입력
        trigger_run_id=None, # 실행중인 run_id를 입력하면 해당 run_id를 실행한다. None이나 default값은 자동 할당이다.
        execution_date=None, # 실행 날짜를 지정한다.
        reset_dag_run=False, # 해당 dag에 dag_run이 있을 경우 초기화 하고 다시 시작함
        wait_for_completion=False, # DAG_run이 완료 될때 까지 기다린다.
        poke_interval=60, # wait_for_completion을 true로 했다면, Trigger가 작동했는지 확인하는 간격을 지정 할 수 있다. 값을 정해주지 않는다면 기본값은 60이다.
        allowed_states=["success"], # Trigger를 실행할 state를 작성한다.list로 작성해야하며 default는 success이다.
        failed_states=None, # Trigger를 실행하지 않을 상태를 기입한다. list로 작성한다.
    )

다른 DAG의 상태를 폴링하기

  • ExternalTaskSensor를 사용하여 다른 DAG의 상태를 확인할수 있음
  • 주의점 : execution_delta옵션에 타겟 DAG의 정확한 실행 시간을 입력해주어야함

REST/CLI를 이용해 워크플로 시작하기

  • CLI(Docs) : airflow가 설치된 서버에서 command Line 명령어를 통하여 직접 DAG를 트리거 할 수 있음.
  • API(redoc) : airflow에서 제공하는 REST API를 사용하여 DAG를 트리거 할 수 있음.

요약...

  • 센서는 특정 조건이 참인지 여부를 지속적으로 확인(polling)하는 특수 유형 오퍼레이터 타입
  • Airflow는 다양한 시스템 및 사용 요건에 맞는 센서를 제공. 사용자 지정 조건(custom condition)은 PythonSensor로 만들수 있음
  • TriggerDagRunOperator는 다른 DAG를 트리거 할수 있음. ExternalTaskSensor는 다른 DAG의 상태를 확인할 수 있음
  • REST API와 CLI를 사용하여 Airflow 외부에서 DAG를 트리거 할 수 있음