본문 바로가기
Airflow

[Apache Airflow 기반의 데이터 파이프라인]운영환경에서 Airflow 관리

by 개복취 2024. 2. 27.

  1. 아키텍처
  2. 설치 방식
  3. 메타스토어
  4. 스케줄러
  5. 익스큐터 설치
  6. 프로세스 로그 확인
  7. 메트릭 시각화 및 모니터링

 

아키텍처

  • 최소 구성 요건 : 웹 서버  데이터베이스  스케줄러 → DAGs

https://mola2.tistory.com/14

  • 웹 서버
    • Airflow 프로세스
    • Airflow2에서 웹 서버는 데이터베이스에서 DAG 읽어내므로 DAG 파일 액세스 필요 없음
    • 파이프라인의 현재 상태에 대한 정보를 시각적으로 표시
    • 사용자가 DAG 트리거 같은 특정 태스크 수행할 수 있도록 관리
  • 스케줄러
    • Airflow 프로세스
    • DAG 정의가 있는 폴더에 엑세스할 수 있어야 함
    • DAG 파일 구문 분석, 비트 및 조각 추출, 메타 스토어에 저장
    • 실행할 태스크 결정 및 대기열에 배치
    • 대기 상태의 태스크 가져오기 및 실행
  • 데이터베이스
    • 웹 서버 및 스케줄러의 메타 데이터를 저장하는 별도의 서비스
    • Airflow 2에서 DAG는 데이터베이스에 직렬화된 형식으로 작성

 

설치 방식

  • 익스큐터 유형에 따라 다양한 설치 환경 구성할 수 있음
    • AIRFLOW__CORE__EXECUTOR 설정을 통해 익스큐터 지정 가능

 

익스큐터 분산환경 난이도 사용에 적합한 환경

SequentialExecutor 불가능 매우 쉬움 시연 / 테스트
LocalExecutor 불가능 쉬움 단일 호스트 환경 권장
CeleryExecutor 가능 보통 멀티 호스트 확장 고려 시
KubernetesExecutor 불가능 어려움 쿠버네티스 기반 컨테이너 환경 구성 고려 시
  • SequentialExecutor
    • 별도의 설정 / 환경 구성 없이 바로 실행시킬 수 있는 방법
    • 태스크 순차적으로 하도록 구성 (한 번에 하나의 태스크)
  • LocalExecutor
    • 여러 태스크 병렬 실행 가능 (최대 32개 병렬 프로세스 가능)
    • 워커 프로세스가 FIFO 적용 방식을 통해 대기열에서 실행할 태스크 등록
  • CeleryExecutor
    • 멀티 호스트에 태스크 분배하는 이유
      • 단일 시스템의 리소스 제한에 도달했을 때
      • 여러 시스템에서 태스크를 실행하여 병렬 실행을 원할 때
      • 태스크를 여러 시스템에 분산하여 작업 속도를 더 빠르게 실행하고 싶을 때
    • Celery를 이용하여 실행할 태스크들에 대해 대기열 등록
      • 대기열 메커니즘(Broker)을 위해 RabbitMQ, Redis, AWS SQS 지원
      • Flower 모니터링 도구 제공
      • 파이썬 라이브러리 형태로 제공
    • 모든 구성요소가 서로 다른 호스트에서 실행되기 때문에 작업 자체에 대한 부하가 LocalExecutor보다 낮음

 

메타스토어

  • Airflow는 SQLAlchemy를 사용하여 모든 데이터베이스 태스크 수행
    • SQLAlchemy에서 지원하는 데이터베이스만 사용 가능
    • MySQL, PostgreSQL 사용 권장
      • AIRFLOW__DATABASE__SQL_ALCHEMY_CONN 에 URI 형식으로 설정
        1. MySQL : mysql://username:password@localhost:3306/airflow
        2. PostgreSQL : postgres://username:password@localhost:5432/airflow
  • Airflow CLI 명령어
    • airflow db init : 빈 데이터베이스에 Airflow 데이터베이스 스키마 생성
    • airflow db reset : 기존 데이터베이스를 지우고 새로운 빈 데이터베이스 생성
    • airflow db upgrade : 변경된 데이터베이스 스키마 정보 데이터베이스에 적용, 데이터베이스 마이그레이션이 포함된 Airflow 버전 업그레이드 시 필수

스케줄러

  • Airflow는 Job 내에서 DAG의 모든 태스크를 실행
  • Airflow UI에서 SchedulerJob, LocalTaskJob, BackfillJob 등 확인 가능

SchedulerJob의 역할

DAG 프로세서

  • DAG 파일을 파싱하고 추출된 정보를 데이터베이스에 저장
  • 외부 소스에서 작업 구조가 변경되는 경우 동적으로 DAG가 생성되기 때문에 DAG 파일을 주기적으로 다시 확인하고 처리
  • 재처리를 많이 할 수록 CPU 성능이 많이 요구되므로 간격을 적절하게 조절
  • 관련 설정
    • AIRFLOW__SCHEDULER__PROCESSOR_POLL_INTERVAL : 스케줄러 루프 완료 후 대기하는 시간
    • AIRFLOW__SCHEDULER__MIN_FILE_PROCESS_INTERVAL : 파일이 처리되는 최소 간격(하한선)
    • AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL : DAG 폴더의 파일 리스트를 새로 고치는 최소 시간 (하한선)
    • AIRFLOW__SCHEDULER_PARSING_PROCESSES : DAG 파일을 구문 분석하는 데 사용할 최대 프로세스 수 (상한선)
    • AIRFLOW__SCHEDULER_PRINT_STATS_INTERVAL : DAG 파일 처리 통계 로그 출력하는 주기 (30초)
  • 모든 DAG 처리는 while True 루프 내에서 발생
  • /logs/dag_processor_manager/dag_processor_manager.log 에서 DAG 처리 로그 확인 가능

태스크 스케줄러

  • 실행할 태스크 인스턴스 결정
  • while True 루프를 통해 각 태스크 인스턴스에 대해 다음을 확인
    • 모든 태스크가 종속성이 충족되는가?
    • 모든 태스크가 정상적으로 마지막 단계까지 진행되었는가?
  • 모든 조건 충족 시 태스크 성공 수행 인지 → 다음 작업 일정에 대한 예약 수행
  • 태스크가 예약 상태에서 대기 상태로 전환되는 조건 결정
  • 다음 조건을 확인 후 만족 시 예약 태스크를 대기열에 푸시
    • 슬롯에 여유가 있는가?
    • 다른 작업보다 우선 순위가 높은가? (priority weight)
  • 스케줄러는 대시 상태까지만 태스크 담당
    • 이후는 익스큐터가 대기열에서 태스크 인스턴스를 읽고 워커에서 태스크 실행

태스크 익스큐터

  • 대기열에 태스크 인스턴스가 보내질 때까지 대기
  • Airflow 메타스토어에 각 태스크 인스턴스 상태 변경 상황을 항상 등록
  • 익스큐터 내에서 태스크 실행 = 태스크를 실행할 새 프로세스를 만드는 것
    • 태스크가 실패하더라도 Airflow가 중단되지 않도록 함

 

익스큐터 설치

LocalExecutor

  • 병렬 태스크 수 제한 방법
    • AIRFLOW__CORE__PARALLELISM : 최대 하위 프로세스 생성 수 설정 (32)
    • AIRFLOW__CORE__DAG_CONCURRENCY : DAG 당 대기 중 또는 실행 중 상태에 있는 최대 태스크 수 (16)
    • AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG : DAG 당 최대 병렬 DAG 실행 수 (16)
  • 데이터베이스 시스템에 대한 추가 종속성을 설치해야 함

CeleryExecutor

  • 브로커 설정: AIRFLOW__CELERY__BROKER_URL
  • 메타스토어와 통신: AIRFLOW__CELERY__RESULT_BACKEND

KubernetesExecutor

  • Airflow 프로세스 간에 DAG 파일 배포 방법
    • PersistentVolume 사용하여 포드 간에 DAG 공유
    • Git-sync init container를 사용해 리포지토리의 최신 DAG 코드 가져오기
    • Docker 이미지에 DAG 빌드
  • CeleryKubernetesExecutor

 

프로세스 로그 확인

웹 서버 로그

  • 웹 서버 종료 후에도 로그 보존하려면 웹 서버 시작 시 플래그 적용
    • 액세스 로그와 오류 → airflow webserver — access_logfile [filename]
    • 시스템 정보 보관하는 오류 로그 → airflow webserver — error_logfile [filename]

스케줄러 로그

  • $AIRFLOW_HOME/logs/scheduler: 디렉터리에 DAG별 로그 파일 생성

태스크 로그

원격 저장소로 로그 전송

  • AIRFLOW__CORE__REMOTE_LOGGING
  • AIRFLOW__CORE__REMOTE_LOG_CONN_ID

 

메트릭 시각화 및 모니터링

  • Prometheus, Grafana 사용

모니터링할 지표

  • Latency : 서비스 요청이 얼마나 걸리는지?
    • 웹 서버가 응답하는 데 걸린 시간
    • 스케줄러가 태스크를 대기 상태에서 실행 상태로 이동하는 데 걸리는 시간
  • Traffic : 시스템에 얼마나 많은 수요가 몰리고 있는지?
    • 시스템이 처리해야 하는 태스크 수
    • Ariflow가 사용 가능한 open pool 슬롯 수
  • Errors : 어떤 오류가 발생했는지?
    • 좀비 프로세스 수
    • 웹 서버의 http 200이 아닌 응답 수
    • 시간 초과된 태스크 수
  • Saturation : 시스템 자원의 어느 부분이 활용되고 있는지?
    • 현재 CPU 부하
    • 현재 실행중인 태스크 수

실패 알림

  • DAG 및 오퍼레이터 내에서 콜백
    • on_failure_callback=함수명
  • 이메일 자동 전송
    • AIRFLOW__SMTP__SMTP_HOST
    • AIRFLOW__SMTP__SMTP_MAIL_FROM
    • AIRFLOW__SMTP__SMTP_PASSWORD