- 센서를 사용한 폴링조건
- 다른 DAG를 트리거하기
센서를 사용한 폴링조건
- v2.7.1 버전 기준 9개의 센서 가 존재하고, 버전별로 센서가 정리되고 있다.
- 센서패키지에서 통합관리 되던게, provider로 흩어진걸로 보여짐
- ex) airflow.sensor.sqlsensor (v2.4.0) → airflow.providers.common.sql.sensors
- ex) airflow.sensor.hdfs_sensor → airflow.providers.apache.hdfs.sensors
- v.2.0.0 → 24개 ( Docs )
- v 2.7.1 → 9개 (Docs)
사용자 지정 조건 폴링
- pythonSensor를 사용하여 다양한 방법으로 센싱처리가 가능함 (python 로직이 추가되어 개발해서 센싱한다는 느낌)
- pythonOperator 로 직접 센서를 만드는것과 큰 차이가 있을지 의문이다.
원활하지 않는 흐름의 센서 처리
- Airflow의 최대 태스크의 수를 제한하여, 너무 많은 Airflow task가 생기는 것을 방지한다.
- Dag내 설정
- Concurrency : 동시에 실행가능한 태스크의 수를 결정함
- max_active_tasks_per_dag, max_active_runs_per_dag 등 필요에 따라 옵션 사용
- config 설정
- max_active_runs : DAG당 한 순간에 실행 가능한 DAG Run
- max_active_tasks, max_active_runs 등 필요에 따라 옵션 사용
- 해당 옵션들은 상황에 따라 자주 사용되는 옵션이므로 알아두는게 좋음
- 설정된 개수에 도달하면 더이상의 태스크를 실행하지 않으며, 이를 센서 데드록(sensor deadlock)상태 라고 함.
소스 적용 예시
Dag=DAG(
Dag_id="test_dag"
....
Concurrency=10 # 해당 옵션을 사용하여 동시에 실행가능한 태스크의 수를 결정함
)
- Sensor의 mode 옵션 ( Docs, 참고 )
- poke (default): The Sensor takes up a worker slot for its entire runtime
- reschedule: The Sensor takes up a worker slot only when it is checking, and sleeps for a set duration between checks
- 모드별 동작 정리
- poke 모드는 sensor 작업이 완료되지 않았다면 주어진 인터벌만큼 sleep을 함 (=pool의 slot을 계속 차지)
- reschedule 모드는 sensor 작업이 완료되지 않았다면 다음 스케줄을 등록한 후, 무한루프를 빠져나가고 다음 인터벌에서 다시 execute 메소드를 호출
sensor 코드
# BaseSensorOperator 소스 init 부분
def __init__(
self,
*,
poke_interval: timedelta | float = 60,
timeout: timedelta | float = conf.getfloat("sensors", "default_timeout"),
soft_fail: bool = False,
mode: str = "poke",
exponential_backoff: bool = False,
max_wait: timedelta | float | None = None,
silent_fail: bool = False,
다른 DAG를 트리거하기
- Dag에서 다른 Dag를 호출하여 트리거(실행)하는 경우 TriggerDagRunOperator를 사용하여 호출함
TriggerDagRunOperator 사용방법 예시
# 아래 예시에는 trigger_test라는 TriggerDagRunOperator를 생성하고, 해당 Operator에서 target_dag라는 dag를 호출하여 실행함
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
call_trigger = TriggerDagRunOperator(
task_id='trigger_test'
trigger_dag_id='target_dag', # 호출할 DAG_ID 입력
trigger_run_id=None, # 실행중인 run_id를 입력하면 해당 run_id를 실행한다. None이나 default값은 자동 할당이다.
execution_date=None, # 실행 날짜를 지정한다.
reset_dag_run=False, # 해당 dag에 dag_run이 있을 경우 초기화 하고 다시 시작함
wait_for_completion=False, # DAG_run이 완료 될때 까지 기다린다.
poke_interval=60, # wait_for_completion을 true로 했다면, Trigger가 작동했는지 확인하는 간격을 지정 할 수 있다. 값을 정해주지 않는다면 기본값은 60이다.
allowed_states=["success"], # Trigger를 실행할 state를 작성한다.list로 작성해야하며 default는 success이다.
failed_states=None, # Trigger를 실행하지 않을 상태를 기입한다. list로 작성한다.
)
다른 DAG의 상태를 폴링하기
- ExternalTaskSensor를 사용하여 다른 DAG의 상태를 확인할수 있음
- 주의점 : execution_delta옵션에 타겟 DAG의 정확한 실행 시간을 입력해주어야함
REST/CLI를 이용해 워크플로 시작하기
- CLI(Docs) : airflow가 설치된 서버에서 command Line 명령어를 통하여 직접 DAG를 트리거 할 수 있음.
- API(redoc) : airflow에서 제공하는 REST API를 사용하여 DAG를 트리거 할 수 있음.
요약...
- 센서는 특정 조건이 참인지 여부를 지속적으로 확인(polling)하는 특수 유형 오퍼레이터 타입
- Airflow는 다양한 시스템 및 사용 요건에 맞는 센서를 제공. 사용자 지정 조건(custom condition)은 PythonSensor로 만들수 있음
- TriggerDagRunOperator는 다른 DAG를 트리거 할수 있음. ExternalTaskSensor는 다른 DAG의 상태를 확인할 수 있음
- REST API와 CLI를 사용하여 Airflow 외부에서 DAG를 트리거 할 수 있음
'Airflow' 카테고리의 다른 글
[Apache Airflow 기반의 데이터 파이프라인] 테스트하기 (0) | 2024.02.22 |
---|---|
[Apache Airflow 기반의 데이터 파이프라인] 커스텀 컴포넌트 빌드 (0) | 2024.02.21 |
[Apache Airflow 기반의 데이터 파이프라인] 태스크간 의존성 정의하기 (0) | 2024.02.19 |
[Apache Airflow 기반의 데이터 파이프라인] Airflow 콘텍스트를 사용하여 태스크 템플릿 작성하기 (0) | 2024.02.17 |
[데이터엔지니어] 실리콘 밸리에서 날아온 엔지니어링 스타터 키트 Week7 (0) | 2023.10.12 |