- 아키텍처
- 설치 방식
- 메타스토어
- 스케줄러
- 익스큐터 설치
- 프로세스 로그 확인
- 메트릭 시각화 및 모니터링
아키텍처
- 최소 구성 요건 : 웹 서버 → 데이터베이스 ← 스케줄러 → DAGs
- 웹 서버
- Airflow 프로세스
- Airflow2에서 웹 서버는 데이터베이스에서 DAG 읽어내므로 DAG 파일 액세스 필요 없음
- 파이프라인의 현재 상태에 대한 정보를 시각적으로 표시
- 사용자가 DAG 트리거 같은 특정 태스크 수행할 수 있도록 관리
- 스케줄러
- Airflow 프로세스
- DAG 정의가 있는 폴더에 엑세스할 수 있어야 함
- DAG 파일 구문 분석, 비트 및 조각 추출, 메타 스토어에 저장
- 실행할 태스크 결정 및 대기열에 배치
- 대기 상태의 태스크 가져오기 및 실행
- 데이터베이스
- 웹 서버 및 스케줄러의 메타 데이터를 저장하는 별도의 서비스
- Airflow 2에서 DAG는 데이터베이스에 직렬화된 형식으로 작성
설치 방식
- 익스큐터 유형에 따라 다양한 설치 환경 구성할 수 있음
- AIRFLOW__CORE__EXECUTOR 설정을 통해 익스큐터 지정 가능
익스큐터 분산환경 난이도 사용에 적합한 환경
SequentialExecutor | 불가능 | 매우 쉬움 | 시연 / 테스트 |
LocalExecutor | 불가능 | 쉬움 | 단일 호스트 환경 권장 |
CeleryExecutor | 가능 | 보통 | 멀티 호스트 확장 고려 시 |
KubernetesExecutor | 불가능 | 어려움 | 쿠버네티스 기반 컨테이너 환경 구성 고려 시 |
- SequentialExecutor
- 별도의 설정 / 환경 구성 없이 바로 실행시킬 수 있는 방법
- 태스크 순차적으로 하도록 구성 (한 번에 하나의 태스크)
- LocalExecutor
- 여러 태스크 병렬 실행 가능 (최대 32개 병렬 프로세스 가능)
- 워커 프로세스가 FIFO 적용 방식을 통해 대기열에서 실행할 태스크 등록
- CeleryExecutor
- 멀티 호스트에 태스크 분배하는 이유
- 단일 시스템의 리소스 제한에 도달했을 때
- 여러 시스템에서 태스크를 실행하여 병렬 실행을 원할 때
- 태스크를 여러 시스템에 분산하여 작업 속도를 더 빠르게 실행하고 싶을 때
- Celery를 이용하여 실행할 태스크들에 대해 대기열 등록
- 대기열 메커니즘(Broker)을 위해 RabbitMQ, Redis, AWS SQS 지원
- Flower 모니터링 도구 제공
- 파이썬 라이브러리 형태로 제공
- 모든 구성요소가 서로 다른 호스트에서 실행되기 때문에 작업 자체에 대한 부하가 LocalExecutor보다 낮음
- 멀티 호스트에 태스크 분배하는 이유
메타스토어
- Airflow는 SQLAlchemy를 사용하여 모든 데이터베이스 태스크 수행
- SQLAlchemy에서 지원하는 데이터베이스만 사용 가능
- MySQL, PostgreSQL 사용 권장
- AIRFLOW__DATABASE__SQL_ALCHEMY_CONN 에 URI 형식으로 설정
- MySQL : mysql://username:password@localhost:3306/airflow
- PostgreSQL : postgres://username:password@localhost:5432/airflow
- AIRFLOW__DATABASE__SQL_ALCHEMY_CONN 에 URI 형식으로 설정
- Airflow CLI 명령어
- airflow db init : 빈 데이터베이스에 Airflow 데이터베이스 스키마 생성
- airflow db reset : 기존 데이터베이스를 지우고 새로운 빈 데이터베이스 생성
- airflow db upgrade : 변경된 데이터베이스 스키마 정보 데이터베이스에 적용, 데이터베이스 마이그레이션이 포함된 Airflow 버전 업그레이드 시 필수
스케줄러
- Airflow는 Job 내에서 DAG의 모든 태스크를 실행
- Airflow UI에서 SchedulerJob, LocalTaskJob, BackfillJob 등 확인 가능
SchedulerJob의 역할
DAG 프로세서
- DAG 파일을 파싱하고 추출된 정보를 데이터베이스에 저장
- 외부 소스에서 작업 구조가 변경되는 경우 동적으로 DAG가 생성되기 때문에 DAG 파일을 주기적으로 다시 확인하고 처리
- 재처리를 많이 할 수록 CPU 성능이 많이 요구되므로 간격을 적절하게 조절
- 관련 설정
- AIRFLOW__SCHEDULER__PROCESSOR_POLL_INTERVAL : 스케줄러 루프 완료 후 대기하는 시간
- AIRFLOW__SCHEDULER__MIN_FILE_PROCESS_INTERVAL : 파일이 처리되는 최소 간격(하한선)
- AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL : DAG 폴더의 파일 리스트를 새로 고치는 최소 시간 (하한선)
- AIRFLOW__SCHEDULER_PARSING_PROCESSES : DAG 파일을 구문 분석하는 데 사용할 최대 프로세스 수 (상한선)
- AIRFLOW__SCHEDULER_PRINT_STATS_INTERVAL : DAG 파일 처리 통계 로그 출력하는 주기 (30초)
- 모든 DAG 처리는 while True 루프 내에서 발생
- /logs/dag_processor_manager/dag_processor_manager.log 에서 DAG 처리 로그 확인 가능
태스크 스케줄러
- 실행할 태스크 인스턴스 결정
- while True 루프를 통해 각 태스크 인스턴스에 대해 다음을 확인
- 모든 태스크가 종속성이 충족되는가?
- 모든 태스크가 정상적으로 마지막 단계까지 진행되었는가?
- 모든 조건 충족 시 태스크 성공 수행 인지 → 다음 작업 일정에 대한 예약 수행
- 태스크가 예약 상태에서 대기 상태로 전환되는 조건 결정
- 다음 조건을 확인 후 만족 시 예약 태스크를 대기열에 푸시
- 슬롯에 여유가 있는가?
- 다른 작업보다 우선 순위가 높은가? (priority weight)
- 스케줄러는 대시 상태까지만 태스크 담당
- 이후는 익스큐터가 대기열에서 태스크 인스턴스를 읽고 워커에서 태스크 실행
태스크 익스큐터
- 대기열에 태스크 인스턴스가 보내질 때까지 대기
- Airflow 메타스토어에 각 태스크 인스턴스 상태 변경 상황을 항상 등록
- 익스큐터 내에서 태스크 실행 = 태스크를 실행할 새 프로세스를 만드는 것
- 태스크가 실패하더라도 Airflow가 중단되지 않도록 함
익스큐터 설치
LocalExecutor
- 병렬 태스크 수 제한 방법
- AIRFLOW__CORE__PARALLELISM : 최대 하위 프로세스 생성 수 설정 (32)
- AIRFLOW__CORE__DAG_CONCURRENCY : DAG 당 대기 중 또는 실행 중 상태에 있는 최대 태스크 수 (16)
- AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG : DAG 당 최대 병렬 DAG 실행 수 (16)
- 데이터베이스 시스템에 대한 추가 종속성을 설치해야 함
CeleryExecutor
- 브로커 설정: AIRFLOW__CELERY__BROKER_URL
- 메타스토어와 통신: AIRFLOW__CELERY__RESULT_BACKEND
KubernetesExecutor
- Airflow 프로세스 간에 DAG 파일 배포 방법
- PersistentVolume 사용하여 포드 간에 DAG 공유
- Git-sync init container를 사용해 리포지토리의 최신 DAG 코드 가져오기
- Docker 이미지에 DAG 빌드
- CeleryKubernetesExecutor
프로세스 로그 확인
웹 서버 로그
- 웹 서버 종료 후에도 로그 보존하려면 웹 서버 시작 시 플래그 적용
- 액세스 로그와 오류 → airflow webserver — access_logfile [filename]
- 시스템 정보 보관하는 오류 로그 → airflow webserver — error_logfile [filename]
스케줄러 로그
- $AIRFLOW_HOME/logs/scheduler: 디렉터리에 DAG별 로그 파일 생성
태스크 로그
원격 저장소로 로그 전송
- AIRFLOW__CORE__REMOTE_LOGGING
- AIRFLOW__CORE__REMOTE_LOG_CONN_ID
메트릭 시각화 및 모니터링
- Prometheus, Grafana 사용
모니터링할 지표
- Latency : 서비스 요청이 얼마나 걸리는지?
- 웹 서버가 응답하는 데 걸린 시간
- 스케줄러가 태스크를 대기 상태에서 실행 상태로 이동하는 데 걸리는 시간
- Traffic : 시스템에 얼마나 많은 수요가 몰리고 있는지?
- 시스템이 처리해야 하는 태스크 수
- Ariflow가 사용 가능한 open pool 슬롯 수
- Errors : 어떤 오류가 발생했는지?
- 좀비 프로세스 수
- 웹 서버의 http 200이 아닌 응답 수
- 시간 초과된 태스크 수
- Saturation : 시스템 자원의 어느 부분이 활용되고 있는지?
- 현재 CPU 부하
- 현재 실행중인 태스크 수
실패 알림
- DAG 및 오퍼레이터 내에서 콜백
- on_failure_callback=함수명
- 이메일 자동 전송
- AIRFLOW__SMTP__SMTP_HOST
- AIRFLOW__SMTP__SMTP_MAIL_FROM
- AIRFLOW__SMTP__SMTP_PASSWORD
'Airflow' 카테고리의 다른 글
[Apache Airflow 기반의 데이터 파이프라인] 모범사례 (1) | 2024.02.26 |
---|---|
[Apache Airflow 기반의 데이터 파이프라인] 컨테이너에서 태스크 실행하기 (0) | 2024.02.23 |
[Apache Airflow 기반의 데이터 파이프라인] 테스트하기 (0) | 2024.02.22 |
[Apache Airflow 기반의 데이터 파이프라인] 커스텀 컴포넌트 빌드 (0) | 2024.02.21 |
[Apache Airflow 기반의 데이터 파이프라인] 워크플로 트리거 (0) | 2024.02.20 |