본문 바로가기
Airflow

[Apache Airflow 기반의 데이터 파이프라인] 모범사례

by 개복취 2024. 2. 26.

 

  1. 깔끔한 DAG 작성
  2. 재현 가능한 태스크 설계
  3. 효율적인 데이터 처리
  4. 자원관리

깔끔한 DAG 작성

스타일 가이드 사용

  • 사람마다 코딩 스타일이 다르기 때문에 코드 스타일을 지정하여 개발하여 가독성을 높임
    • 일반적으로 Python의 경우 PEP8 스타일 사용
    • 정적 검사기 (검사만 해주고 코드를 수정하진 않음): pylint, flake8
    • Python 코드 포맷터 사용 (코드 자체를 수정해줌): YAPF, Black
  • Airflow 전용 코드 스타일 규칙
    • dag 선언
with DAG(...) as dag: # 콘텍스트 매니저를 사용하는 경우 
    taskl=PythonOperator(...) 
    task2=Python0perator(...)

dag = DAG(...) # 콘텍스트 매니저를 사용하지 않는 경우 
    taskl = PythonOperator(..., dag=dag)
    task2 = Python0perator(..., dag=dag)

 

  • 태스크 종속성
# 다양한 스타일
taskl >> task2
taskl << task2
[taskl] >> task2
taskl.set_downstream(task2)
task2.set_upstream(taskl)

# 통일된 스타일 
taskl >> task2 >> task3 >> [task4, task5]

중앙에서 자격 증명 관리 (connection)

  • Airflow connection 을 사용하여 계정정보 관리 ( Link )
  • 보안 이슈가 발생할 수 있으므로 주의하여 사용

 

구성 세부 정보를 일관성있게 지정하기 (File, Variables)

  • 자주 사용하는 구성정보(config 정보 등)를 파일로 별도 관리 (yaml, Ini, json)
  • Variables을 사용한 전역변수 사용

DAG 구성 시 연산 부분 배제

  • Dag 파일은 주기적으로 로드/문법검사를 수행하기 때문에 연산부분이 Dag 안에 있을경우 불필요한 리소스를 사용할 수 있음

태스크 내에서 계산 수행

def _my_not_so_efficient_task(value, ...) :
    •••
    
PythonOperator (
    task_id="my_not_so_efficient_task",
    •••
    op_kwargs={
        "value": calc_expensive_value)
	}
)

def _my_more_efficient_task(...) :
    value=calc_expensive_value ()
    ...
    
PythonOperator (
    task_id="my_more_efficient_task",
    python_callable=_my_more_efficient_task,
    ...
)

 

factory 함수를 사용한 공통 패턴 생성

  • 특정 변수 등의 차이만 있을뿐 공통패턴을 가진 DAG를 생성할경우 factory 함수를 사용함
    • 사전준비 : factory 함수 생성

factory 함수를 사용하여 태스크 세트 생성(dags/01_task_factory.py)

def generate_tasks (dataset_name, raw_dir, processed_dir, preprocess_script,output_dir, dag): # factory 함수에 의해 생성될 태스크를 구성하는 매개변수
    raw_path=os.path.join(raw_dir, dataset_name, "{ds_nodash} json")
    processed_path=os.path.join(processed_dir, dataset_name, "{ds_nodash} json") 
    output_path=os.path. join(output_dir, dataset_name, "{ds_nodash}. json")
	
    #개별 태스크 생성
    fetch_task=BashOperator( 
        task_id=f" fetch_{dataset_name}",
        bash_command=(
            f"echo curl http://example.com/{dataset_name}.json"
            f"> {raw_path} .json'"
        )
        dag=dag,
    )
    preprocess_task=BashOperator (
        task_id=f"preprocess_{dataset_name}",
        bash_command=f"echo '{preprocess_script} {raw_path} {processed_path}'", 
        dag=dag,
        )
    
    export_task=BashOperator (
        task_id=f"export_{dataset_name}",
        bash_command=f"echo 'cp {processed_path} {output_path} '",
        dag=dag,
    )

fetch_task >> preprocess_task >> export_task    # 태스크 종속성 정의
return fetch_task, export_task                  # 더 큰 그래프와 다른 태스크에 연결할 수 있도록 체인의 첫 번째 태스크와 마지막 태스크를 반환(필요 시)

 

태스크에 factory 함수 적용(dags/01_task_factory.py)

import airflow.utils.dates 
from airflow import DAG

with DAG(
    dag_id="01_task_factory", 
    start_date=airflow.utils.dates.days_ago(5),
    schedule_interval="@daily",
) as dag:
    for dataset in ["sales", "customers"]:
        generate_tasks ( #다른 구성 값으로 태스크 집합을 생성
            dataset_name=dataset, 
            raw_dir=" /data/raw", 
            processed_dir=" /data/processed", 
            output_dir=" /data/output", 
            preprocess_script=f"preprocess_{dataset} py", 
            dag=dag, #DAG 인스턴스를 전달하여 태스크를 DAG에 연결
        )

 

태스크 그룹을 사용하여 관련된 태스크들의 그룹 만들기

  • 여러 DAG들을 동시에 사용할경우 task UI가 복잡해 질수 있는데, 이럴때 Task group을 사용하여 묶어줄 수 있음
  • 3개의 task를 하나로 묶었을경우 화면 예시

대규모 수정을 위한 새로운 DAG 생성

  • 한번 작성된 Dag는 DB에 값이 저장되어 Dag를 수정하여도 올바르게 작동하지 않을 수 있음
    • 이런경우 DAG 파일을 복사하여 새로운 이름으로 생성하면 그런 문제를 피할 수 있음
    • 그냥 UI에서 Dag 삭제 하는게 마음 편하다.

재현 가능한 태스크 설계

1. 태스크는 항상 멱등성을 가져야 한다. (멱등성/idempotent)

  • 멱등법칙(冪等法則) 또는 멱등성(冪等性, 영어: idempotent) 수학이나 전산학에서 연산의 한 성질을 나타내는 것으로, 연산을 여러 번 적용하더라도 결과가 달라지지 않는 성질을 의미한다.
  • 실패한 작업, 과거의 작업을 수행 등 언제 작업을 수행하더라도 같은 결과를 도출해야함.

2. 태스크 결과는 결정적이어야 한다. (결정적/deterministic)

  • 태스크는 주어진 입력에 대해 항상 동일한 출력을 반환해야함.

3. 함수형 패러다임을 사용하여 태스크를 설계한다.

 

효율적인 데이터 처리

데이터의 처리량 제한하기

  • 불필요한 join하지않기..

증분 적재 및 처리

  • 시계열 데이터의 경우 partition단위로 작업을 수행
    → 전체 데이터를 사용하지 않고 필요한 부분만 사용함

중간 단계 데이터 캐싱

  • 여러 Task가 있는 DAG의 경우 데이터 유실을 방지하기 위하여 중간중간 저장하는것이 좋음

로컬 파일 시스템에 데이터 저장 방지

  • K8s Executer 사용시 worker가 매번 다른 곳에서 동작하기 때문에 로컬에 파일을 저장하고 사용하는것은 위험할 수 있음
    • s3, nfs 등 공유 저장소를 사용하는게 안전함

외부/소스 시스템으로 작업을 이전하기

  • 큰 리소스를 사용하는 작업의 경우 airflow worker가 아닌 외부 클러스터(ex: spark cluster)에서 작업하도록 하여 성능 향상

 

자원관리

Pool을 이용한 동시성 관리하기

  • pool 을 사용하여 task 개수 제한

  • 코드예시
PythonOperator(
    task_id="my_task",
    pool="my_resource_pool"
)

SLA 및 경고를 사용하여 장기 실행 작업 탐지 (Link)

  • SLA(service-level agreement(서비스 수준 계약))
  • task 수행 시간에 대한 모니터링이 필요할 경우
    → SLA를 설정함
    → 설정된 SLA값보다 dag가 오래 동작할경우 경고 표시
  • SLA Misses 에서 보여진다고 함