본문 바로가기

Airflow15

[Apache Airflow 기반의 데이터 파이프라인]운영환경에서 Airflow 관리 아키텍처 설치 방식 메타스토어 스케줄러 익스큐터 설치 프로세스 로그 확인 메트릭 시각화 및 모니터링 아키텍처 최소 구성 요건 : 웹 서버 → 데이터베이스 ← 스케줄러 → DAGs 웹 서버 Airflow 프로세스 Airflow2에서 웹 서버는 데이터베이스에서 DAG 읽어내므로 DAG 파일 액세스 필요 없음 파이프라인의 현재 상태에 대한 정보를 시각적으로 표시 사용자가 DAG 트리거 같은 특정 태스크 수행할 수 있도록 관리 스케줄러 Airflow 프로세스 DAG 정의가 있는 폴더에 엑세스할 수 있어야 함 DAG 파일 구문 분석, 비트 및 조각 추출, 메타 스토어에 저장 실행할 태스크 결정 및 대기열에 배치 대기 상태의 태스크 가져오기 및 실행 데이터베이스 웹 서버 및 스케줄러의 메타 데이터를 저장하는 별도의.. 2024. 2. 27.
[Apache Airflow 기반의 데이터 파이프라인] 모범사례 깔끔한 DAG 작성 재현 가능한 태스크 설계 효율적인 데이터 처리 자원관리 깔끔한 DAG 작성 스타일 가이드 사용 사람마다 코딩 스타일이 다르기 때문에 코드 스타일을 지정하여 개발하여 가독성을 높임 일반적으로 Python의 경우 PEP8 스타일 사용 정적 검사기 (검사만 해주고 코드를 수정하진 않음): pylint, flake8 Python 코드 포맷터 사용 (코드 자체를 수정해줌): YAPF, Black Airflow 전용 코드 스타일 규칙 dag 선언 with DAG(...) as dag: # 콘텍스트 매니저를 사용하는 경우 taskl=PythonOperator(...) task2=Python0perator(...) dag = DAG(...) # 콘텍스트 매니저를 사용하지 않는 경우 taskl = P.. 2024. 2. 26.
[Apache Airflow 기반의 데이터 파이프라인] 컨테이너에서 태스크 실행하기 왜 Airflow Dag를 실행하는데 컨테이너와 k8s가 필요한가? 컨테이너는 무엇인가? Docker 컨테이너 실행하기 컨테이너와 Airflow Docker에서 Airflow 태스크 실행하기 K8s에서 태스크 실행 KubernetesPodOperator 사용하기 번외. Airflow와 K8s 왜 Airflow Dag를 실행하는데 컨테이너와 k8s가 필요한가? 동일한 환경에서 배포하고 싶을 때! 종속성을 요구하는 경우! 잠재적인 충돌을 방지하고 싶을 때! 단일 환경에서 여러 개의 DAG를 실행한다고 했을 때, 다른 버전의 library를 사용한다면 충돌이 발생할 수 있다. 물론 해당 서버에서 관리하는 DAG들의 개수가 작거나 simple한 경우라면 이런 고민을 할 필요가 없음. 컨테이너는 무엇인가? 컨테이.. 2024. 2. 23.
[Apache Airflow 기반의 데이터 파이프라인] 테스트하기 1. 테스트 시작하기 2. 개발을 위해 테스트 사용하기 테스트 시작하기 Airflow에서 테스트는 작성한 작업이 예상대로 동작하는지 확인하기 위한 중요한 부분이다. 테스트는 다양한 단계에서 적용할 수 있다. Airflow 테스트의 두 가지 유형 : 단위 테스트(Unit Test), 통합 테스트(Integration Test) 개별 작업 단위로 테스트를 수행 → 단위 테스트 여러 작업이 함께 동작할 때 예상대로 동작하는지, 즉 전체 워크플로우 흐름 테스트를 수행 → 통합 테스트 파이썬 테스팅 프레임워크인 unittest와 pytest pip install pytest pytest 명령어는 이름이 test_로 시작하는 모든 파일을 로드하고, test_로 시작되는 모든 기능을 실행한다. 모든 DAG에 대한 무.. 2024. 2. 22.
[Apache Airflow 기반의 데이터 파이프라인] 커스텀 컴포넌트 빌드 커스텀 컴포넌트 빌드 커스텀 훅 빌드하기 커스텀 오퍼레이터 빌드하기 커스텀 센서 빌드하기 컴포넌트 패키징하기 커스텀 컴포넌트 빌드 Airflow의 강점 중 하나는 여러 유형의 시스템들 간의 작업을 조율할 때 쉽게 확장할 수 있다. (SageMaker 기반 머신러닝 모델 학습, ECSOpserator 사용해서 ECS기반 작업가능) 그러나, Airflow가 지원하지 않는 시스템에서 태스크 실행해야 할 수 있다. 또한 태스크 중 PythonOperator를 사용하여 구현할 수 있지만 수많은 단순 반복적인 코드가 필요하여, 여러 DAG에서 사용하기 힘든경우가 발생한다. 이 때문에 Airflow 에서 특정 시스템에서 작업이 실행되어야 할 때, 이를 위한 오퍼레이터를 개발하여 빌드해 왔었다. Airflow에서 커스.. 2024. 2. 21.
[Apache Airflow 기반의 데이터 파이프라인] 워크플로 트리거 센서를 사용한 폴링조건 다른 DAG를 트리거하기 센서를 사용한 폴링조건 v2.7.1 버전 기준 9개의 센서 가 존재하고, 버전별로 센서가 정리되고 있다. 센서패키지에서 통합관리 되던게, provider로 흩어진걸로 보여짐 ex) airflow.sensor.sqlsensor (v2.4.0) → airflow.providers.common.sql.sensors ex) airflow.sensor.hdfs_sensor → airflow.providers.apache.hdfs.sensors v.2.0.0 → 24개 ( Docs ) v 2.7.1 → 9개 (Docs) 사용자 지정 조건 폴링 pythonSensor를 사용하여 다양한 방법으로 센싱처리가 가능함 (python 로직이 추가되어 개발해서 센싱한다는 느낌) .. 2024. 2. 20.
반응형