- 깔끔한 DAG 작성
- 재현 가능한 태스크 설계
- 효율적인 데이터 처리
- 자원관리
깔끔한 DAG 작성
스타일 가이드 사용
- 사람마다 코딩 스타일이 다르기 때문에 코드 스타일을 지정하여 개발하여 가독성을 높임
- 일반적으로 Python의 경우 PEP8 스타일 사용
- 정적 검사기 (검사만 해주고 코드를 수정하진 않음): pylint, flake8
- Python 코드 포맷터 사용 (코드 자체를 수정해줌): YAPF, Black
- Airflow 전용 코드 스타일 규칙
- dag 선언
with DAG(...) as dag: # 콘텍스트 매니저를 사용하는 경우
taskl=PythonOperator(...)
task2=Python0perator(...)
dag = DAG(...) # 콘텍스트 매니저를 사용하지 않는 경우
taskl = PythonOperator(..., dag=dag)
task2 = Python0perator(..., dag=dag)
- 태스크 종속성
# 다양한 스타일
taskl >> task2
taskl << task2
[taskl] >> task2
taskl.set_downstream(task2)
task2.set_upstream(taskl)
# 통일된 스타일
taskl >> task2 >> task3 >> [task4, task5]
중앙에서 자격 증명 관리 (connection)
- Airflow connection 을 사용하여 계정정보 관리 ( Link )
- 보안 이슈가 발생할 수 있으므로 주의하여 사용
구성 세부 정보를 일관성있게 지정하기 (File, Variables)
- 자주 사용하는 구성정보(config 정보 등)를 파일로 별도 관리 (yaml, Ini, json)
- Variables을 사용한 전역변수 사용
DAG 구성 시 연산 부분 배제
- Dag 파일은 주기적으로 로드/문법검사를 수행하기 때문에 연산부분이 Dag 안에 있을경우 불필요한 리소스를 사용할 수 있음
태스크 내에서 계산 수행
def _my_not_so_efficient_task(value, ...) :
•••
PythonOperator (
task_id="my_not_so_efficient_task",
•••
op_kwargs={
"value": calc_expensive_value)
}
)
def _my_more_efficient_task(...) :
value=calc_expensive_value ()
...
PythonOperator (
task_id="my_more_efficient_task",
python_callable=_my_more_efficient_task,
...
)
factory 함수를 사용한 공통 패턴 생성
- 특정 변수 등의 차이만 있을뿐 공통패턴을 가진 DAG를 생성할경우 factory 함수를 사용함
- 사전준비 : factory 함수 생성
factory 함수를 사용하여 태스크 세트 생성(dags/01_task_factory.py)
def generate_tasks (dataset_name, raw_dir, processed_dir, preprocess_script,output_dir, dag): # factory 함수에 의해 생성될 태스크를 구성하는 매개변수
raw_path=os.path.join(raw_dir, dataset_name, "{ds_nodash} json")
processed_path=os.path.join(processed_dir, dataset_name, "{ds_nodash} json")
output_path=os.path. join(output_dir, dataset_name, "{ds_nodash}. json")
#개별 태스크 생성
fetch_task=BashOperator(
task_id=f" fetch_{dataset_name}",
bash_command=(
f"echo curl http://example.com/{dataset_name}.json"
f"> {raw_path} .json'"
)
dag=dag,
)
preprocess_task=BashOperator (
task_id=f"preprocess_{dataset_name}",
bash_command=f"echo '{preprocess_script} {raw_path} {processed_path}'",
dag=dag,
)
export_task=BashOperator (
task_id=f"export_{dataset_name}",
bash_command=f"echo 'cp {processed_path} {output_path} '",
dag=dag,
)
fetch_task >> preprocess_task >> export_task # 태스크 종속성 정의
return fetch_task, export_task # 더 큰 그래프와 다른 태스크에 연결할 수 있도록 체인의 첫 번째 태스크와 마지막 태스크를 반환(필요 시)
태스크에 factory 함수 적용(dags/01_task_factory.py)
import airflow.utils.dates
from airflow import DAG
with DAG(
dag_id="01_task_factory",
start_date=airflow.utils.dates.days_ago(5),
schedule_interval="@daily",
) as dag:
for dataset in ["sales", "customers"]:
generate_tasks ( #다른 구성 값으로 태스크 집합을 생성
dataset_name=dataset,
raw_dir=" /data/raw",
processed_dir=" /data/processed",
output_dir=" /data/output",
preprocess_script=f"preprocess_{dataset} py",
dag=dag, #DAG 인스턴스를 전달하여 태스크를 DAG에 연결
)
태스크 그룹을 사용하여 관련된 태스크들의 그룹 만들기
- 여러 DAG들을 동시에 사용할경우 task UI가 복잡해 질수 있는데, 이럴때 Task group을 사용하여 묶어줄 수 있음
- 3개의 task를 하나로 묶었을경우 화면 예시
대규모 수정을 위한 새로운 DAG 생성
- 한번 작성된 Dag는 DB에 값이 저장되어 Dag를 수정하여도 올바르게 작동하지 않을 수 있음
- 이런경우 DAG 파일을 복사하여 새로운 이름으로 생성하면 그런 문제를 피할 수 있음
- 그냥 UI에서 Dag 삭제 하는게 마음 편하다.
재현 가능한 태스크 설계
1. 태스크는 항상 멱등성을 가져야 한다. (멱등성/idempotent)
- 멱등법칙(冪等法則) 또는 멱등성(冪等性, 영어: idempotent) 수학이나 전산학에서 연산의 한 성질을 나타내는 것으로, 연산을 여러 번 적용하더라도 결과가 달라지지 않는 성질을 의미한다.
- 실패한 작업, 과거의 작업을 수행 등 언제 작업을 수행하더라도 같은 결과를 도출해야함.
2. 태스크 결과는 결정적이어야 한다. (결정적/deterministic)
- 태스크는 주어진 입력에 대해 항상 동일한 출력을 반환해야함.
3. 함수형 패러다임을 사용하여 태스크를 설계한다.
효율적인 데이터 처리
데이터의 처리량 제한하기
- 불필요한 join하지않기..
증분 적재 및 처리
- 시계열 데이터의 경우 partition단위로 작업을 수행
→ 전체 데이터를 사용하지 않고 필요한 부분만 사용함
중간 단계 데이터 캐싱
- 여러 Task가 있는 DAG의 경우 데이터 유실을 방지하기 위하여 중간중간 저장하는것이 좋음
로컬 파일 시스템에 데이터 저장 방지
- K8s Executer 사용시 worker가 매번 다른 곳에서 동작하기 때문에 로컬에 파일을 저장하고 사용하는것은 위험할 수 있음
- s3, nfs 등 공유 저장소를 사용하는게 안전함
외부/소스 시스템으로 작업을 이전하기
- 큰 리소스를 사용하는 작업의 경우 airflow worker가 아닌 외부 클러스터(ex: spark cluster)에서 작업하도록 하여 성능 향상
자원관리
Pool을 이용한 동시성 관리하기
- pool 을 사용하여 task 개수 제한
- 코드예시
PythonOperator(
task_id="my_task",
pool="my_resource_pool"
)
SLA 및 경고를 사용하여 장기 실행 작업 탐지 (Link)
- SLA(service-level agreement(서비스 수준 계약))
- task 수행 시간에 대한 모니터링이 필요할 경우
→ SLA를 설정함
→ 설정된 SLA값보다 dag가 오래 동작할경우 경고 표시 - SLA Misses 에서 보여진다고 함
'Airflow' 카테고리의 다른 글
[Apache Airflow 기반의 데이터 파이프라인]운영환경에서 Airflow 관리 (0) | 2024.02.27 |
---|---|
[Apache Airflow 기반의 데이터 파이프라인] 컨테이너에서 태스크 실행하기 (0) | 2024.02.23 |
[Apache Airflow 기반의 데이터 파이프라인] 테스트하기 (0) | 2024.02.22 |
[Apache Airflow 기반의 데이터 파이프라인] 커스텀 컴포넌트 빌드 (0) | 2024.02.21 |
[Apache Airflow 기반의 데이터 파이프라인] 워크플로 트리거 (0) | 2024.02.20 |