- 왜 Airflow Dag를 실행하는데 컨테이너와 k8s가 필요한가?
- 컨테이너는 무엇인가?
- Docker 컨테이너 실행하기
- 컨테이너와 Airflow
- Docker에서 Airflow 태스크 실행하기
- K8s에서 태스크 실행
- KubernetesPodOperator 사용하기
- 번외. Airflow와 K8s
왜 Airflow Dag를 실행하는데 컨테이너와 k8s가 필요한가?
- 동일한 환경에서 배포하고 싶을 때!
- 종속성을 요구하는 경우!
- 잠재적인 충돌을 방지하고 싶을 때!
단일 환경에서 여러 개의 DAG를 실행한다고 했을 때, 다른 버전의 library를 사용한다면 충돌이 발생할 수 있다.
물론 해당 서버에서 관리하는 DAG들의 개수가 작거나 simple한 경우라면 이런 고민을 할 필요가 없음.
컨테이너는 무엇인가?
- 컨테이너는 애플리케이션에 필요한 종속성을 포함하고 서로 다른 환경에 균일하게, 쉽게 배포할 수 있는 기술이다.
- CI/CD, Devops가 유행하지 않았던 시절에는 배포 작업이 가장 큰 문제였음...
- Target OS 버전이 다를 수도 있고, 설치된 종속성 및 라이브러리가 다를 수도 있고, 하드웨어 성능이 차이가 날 수 있다.
→ 개발 서버와 운영 서버가 완벽하게 동일한 환경을 구성하고 있지 않기 때문... - 그래서 가상머신(VM) 이나 Container를 사용하여 가상화 환경을 구성합니다.
- 가상머신 : 가상 OS 환경 위에서 종속성 파일이나 라이브러리가 존재하고 애플리케이션이 수행됨.
→ Host OS 위에 각각의 Guest OS를 띄워야하기 때문에 많은 리소스를 필요로함. (무겁다) - 컨테이너 : Host OS의 커널에서 애플리케이션을 가상화.
→ VM과 동일한 환경을 제공하지만 Guest OS를 필요로 하지 않기 때문에 더 가벼움.
- 가장 잘 알려진 컨테이너 엔진은 Docker 이다!
Docker 컨테이너 실행하기
Docker run
$ docker run debian:buster-slim echo Hello, world!
Unable to find image 'debian:buster-slim' locally
latest: Pulling from library/debian
...
Digest: sha256:76c15066d7db315b42dc247b6d439779d2c6466f
➥ 7dc2a47c2728220e288fc680
Status: Downloaded newer image for debian:buster-slim
Hello, world!
Docker Architecture
- Docker Client는 Docker Daemon에 접속.
- Docker Daemon은 Docker Hub에서 Docker image를 가져옴.
- Docker Daemon은 해당 이미지를 사용하여 새 컨테이너를 생성.
- 컨테이너는 내부에서 명령어를 실행. (위 예시에서는 echo Hello, world!)
- Docker Daemon은 출력을 Docker Client로 전송.
- Docker Image 만들고 빌드하기
→ 기존에 만들어진 Docker image를 실행하는것은 쉽지만, 실제로는 직접 이미지를 만들어야할 경우가 많음.
python .Dockerfile
FROM python:3.8-slim B
COPY requirements.txt /tmp/requirements.txt
RUN pip install -r /tmp/requirements.txt
COPY scripts/fetch_weather.py /usr/local/bin/fetch-weather
RUN chmod +x /usr/local/bin/fetch-weather
ENTRYPOINT [ "/usr/local/bin/fetch-weather" ]
CMD [ "--help" ]
Docker build
- tag : Docker Image에 할당된 이름 지정
$ docker build --tag manning-airflow/wttr-example .
Sending build context to Docker daemon 5.12kB
Step 1/7 : FROM python:3.8-slim
---> 9935a3c58eae
Step 2/7 : COPY requirements.txt /tmp/requirements.txt
---> 598f16e2f9f6
Step 3/7 : RUN pip install -r /tmp/requirements.txt
---> Running in c86b8e396c98
Collecting click
...
Removing intermediate container c86b8e396c98
---> 102aae5e3412
Step 4/7 : COPY scripts/fetch_weather.py /usr/local/bin/fetch-weather
---> 7380766da370
Step 5/7 : RUN chmod +x /usr/local/bin/fetch-weather
---> Running in 7d5bf4d184b5
Removing intermediate container 7d5bf4d184b5
---> cae6f678e8f8
Step 6/7 : ENTRYPOINT [ "/usr/local/bin/fetch-weather" ]
---> Running in 785fe602e3fa
Removing intermediate container 785fe602e3fa
---> 3a0b247507af
Step 7/7 : CMD [ "--help" ]
---> Running in bad0ef960f30
Removing intermediate container bad0ef960f30
---> ffabdb642077
Successfully built ffabdb642077
Successfully tagged wttr-example:latest
- Docker Container 실행
Docker run
$ docker run manning-airflow/wttr-example:latest
Usage: fetch-weather [OPTIONS] CITY
CLI application for fetching weather forecasts from wttr.in.
Options:
--output_path FILE Optional file to write output to.
--help Show this message and exit.
- Docker 볼륨 사용
볼륨 마운트하기
$ docker run --volume `pwd`/data:/data wttr-example ...
- Docker 컨테이너의 /data 하위 경로를 local /data 디렉토리와 마운트할 수 있어서, Docker 컨테이너에서 파일을 생성했다면 로컬에서도 생성된 파일을 사용할 수 있음.
컨테이너와 Airflow
- Airflow 태스크를 컨테이너로 실행할 수 있습니다.
→ DockerOperator 및 KubernetesPodOperator 를 사용.
- 종속성 관리를 쉽게 할 수 있음. 서로 다른 이미지를 생성하면 각 태스크에 필요한 종속성을 해당 이미지에만 설치할 수 있음.
- 태스크가 더 이상 Worker 환경에서 실행될 필요가 없기 때문에 Airflow Worker 환경에 태스크에 대한 종속성을 설치할 필요가 없음.
Docker에서 Airflow 태스크 실행하기
DockerOperator
rank_movies = DockerOperator(
task_id="rank_movies",
image="manning-airflow/movielens-ranking",
command=[
"rank_movies.py",
"--input_path",
"/data/ratings/{{ds}}.json",
"--output_path",
"/data/rankings/{{ds}}.csv",
],
volumes=["/tmp/airflow/data:/data"],
)
DockerOperator 동작원리
- Airflow는 Worker에게 태스크를 스케줄하여 작업 지시.
- DockerOperator는 Docker run 명령 실행.
- Docker Daemon이 image registry에서 필요한 Docker image를 가져옴.
- Docker 컨테이너 실행.
- 완료되면, 컨테이너는 종료되고 DockerOperator는 Airflow Worker의 결과를 반환.
Docker에서 Airflow 태스크 실행하기
- 태스크에서 수행되는 python 스크립트 작성
평점 스크립트
...
from pathlib import Path
@click.command()
@click.option(...)
...
def main(start_date, end_date, output_path,
host, user, password, batch_size):
"""CLI script for fetching ratings from the movielens API."""
session = requests.Session()
session.auth = (user, password)
logging.info("Fetching ratings from %s (user: %s)", host, user)
ratings = list(
_get_ratings(
session=session,
host=host,
start_date=start_date,
end_date=end_date,
batch_size=batch_size,
)
)
logging.info("Retrieved %d ratings!", len(ratings))
output_path = Path(output_path)
output_dir = output_path.parent
output_dir.mkdir(parents=True, exist_ok=True)
logging.info("Writing to %s", output_path)
with output_path.open("w") as file_:
json.dump(ratings, file_)
평점 스크립트 - Docker 파일 생성
FROM python:3.8-slim
RUN pip install click==7.1.1 requests==2.23.0
COPY scripts/fetch_ratings.py /usr/bin/local/fetch-ratings
RUN chmod +x /usr/bin/local/fetch-ratings
ENV PATH="/usr/local/bin:${PATH}"
DAG 구성
import datetime as dt
from airflow import DAG
from airflow.providers.docker.operators.docker import DockerOperator
with DAG(
dag_id="01_docker",
description="Fetches ratings from the Movielens API using Docker.",
start_date=dt.datetime(2019, 1, 1),
end_date=dt.datetime(2019, 1, 3),
schedule_interval="@daily",
) as dag:
Fetch_ratings = DockerOperator(
task_id="fetch_ratings",
image="manning-airflow/movielens-fetch",
command=[
"fetch-ratings",
"--start_date",
"{{ds}}",
"--end_date",
"{{next_ds}}",
"--output_path",
"/data/ratings/{{ds}}.json",
"--user",
os.environ["MOVIELENS_USER"],
"--password",
os.environ["MOVIELENS_PASSWORD"],
"--host",
os.environ["MOVIELENS_HOST"],
],
volumes=["/tmp/airflow/data:/data"],
network_mode="airflow",
)
rank_movies = DockerOperator(
task_id="rank_movies",
image="manning-airflow/movielens-ranking",
command=[
"rank-movies",
"--input_path",
"/data/ratings/{{ds}}.json",
"--output_path",
"/data/rankings/{{ds}}.csv",
],
volumes=["/tmp/airflow/data:/data"],
)
fetch_ratings >> rank_movies
Docker 기반의 WorkFlow
- Docker file 생성 및 Docker image 작성.
- Docker Daemon은 image를 구축
- Docker Daemon은 image registry에 image등록
- DAG 작성
- Airflow는 DockerOperator 태스크를 실행 및 스케쥴
- Airflow worker는 image를 가져옴
- Airflow worker는 Docker Daemon을 사용하여 해당 image 컨테이너 실행
K8s에서 태스크 실행
- 여러 시스템에서 태스크를 조정하고 분산하고자하는 니즈가 발생하여 오케스트레이션 시스템 사용(k8s)
- k8s란?
- 컨테이너화된 애플리케이션의 배포, 확장, 관리에 초점을 맞춘 오픈 소스 컨테이너 오케스트레이션 플랫폼
- 컨테이너를 여러 작업 노드에 배치를 관리하여 확장할 수 있도록 지원하는 동시에 스케줄링 시 필요한 리소스(CPU or Memory), 스토리지 및 특수한 하드웨어(GPU) 등을 고려한다는 것이 Docker와의 차이점 이다.
k8s 구성요소
- 크게 Master와 worker node로 구성됨.
- Master : API 서버, 스케줄러 및 배포, 스토리지 등을 관리하는 기타 서비스를 포함하여 다양한 컴포넌트를 실행 k8s 클러스터에서 컨테이너화된 애플리케이션을 관리함.
- API 서버 : kubectl(k8s의 기본 CLI 인터페이스) 또는 k8s 파이썬 SDK와 같은 클라이언트 에서 k8s를 쿼리하고 명령을 실행하여 컨테이너를 배포하는 데 사용.
- k8s worker node : 스케줄러가 할당한 컨테이너 애플리케이션을 실행하는 역할.
- Pod : k8s에서 이러한 애플리케이션을 파드라고 함. 쿠버네티스에서 가장 작은 단위임. Airflow에서 태스크는 단일 파드 내부의 컨테이너로 실행됨.
k8s 설정하기 - 스토리지 YAML
apiVersion: v1
kind: PersistentVolume
metadata:
name: data-volume
Labels:
type: local
Spec:
storageClassName: manual
capacity:
storage: 1Gi
accessModes:
- ReadWriteOnce
hostPath:
path: "/tmp/data"
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: data-volume
spec:
storageClassName: manual
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 1Gi
- PersistentVolume(PV)는 관리자에 의해 생성된 볼륨을 뜻하고, PersistentVolumeClaim(PVC)은 사용자가 볼륨을 사용하기 위해 PV에 요청을 하게 될 때 사용됨.
k8s 설정하기 - 스토리지 리소스 배포하기
$ kubectl --namespace airflow apply -f resources/data-volume.yml
persistentvolumeclaim/data-volume created
persistentvolume/data-volume created
k8s 설정하기 - API에 대한 YAML
apiVersion: apps/v1
kind: Deployment
metadata:
name: movielens-deployment
labels:
app: movielens
spec:
replicas: 1
selector:
matchLabels:
app: movielens
template:
metadata:
Labels:
app: movielens
spec:
containers:
- name: movielens
image: manning-airflow/movielens-api
ports:
- containerPort: 5000
env:
- name: API_USER
value: airflow
- name: API_PASSWORD
value: airflow
---
apiVersion: v1
kind: Service
metadata:
name: movielens
spec:
selector:
app: movielens
ports:
- protocol: TCP
port: 80
targetPort: 5000
k8s 설정하기 - API 배포
$ kubectl --namespace airflow apply -f resources/api.yml
deployment.apps/movielens-deployment created
service/movielens created
KubernetesPodOperator 사용하기
KubernetesPodOperator
fetch_ratings = KubernetesPodOperator(
task_id="fetch_ratings",
image="manning-airflow/movielens-fetch",
cmds=["fetch-ratings"],
arguments=[
"--start_date",
"{{ds}}",
"--end_date",
"{{next_ds}}",
"--output_path",
"/data/ratings/{{ds}}.json",
"--user",
os.environ["MOVIELENS_USER"],
"--password",
os.environ["MOVIELENS_PASSWORD"],
"--host",
os.environ["MOVIELENS_HOST"],
],
namespace="airflow",
name="fetch-ratings",
cluster_context="docker-desktop",
in_cluster=False,
volumes=[volume],
volume_mounts=[volume_mount],
image_pull_policy="Never",
is_delete_operator_pod=True, 1)
)
볼륨 마운트
from kubernetes.client import models as k8s
...
volume_claim = k8s.V1PersistentVolumeClaimVolumeSource(
claim_name="data-volume"
)
volume = k8s.V1Volume(
name="data-volume",
persistent_volume_claim=volume_claim
)
volume_mount = k8s.V1VolumeMount(
name="data-volume",
mount_path="/data",
sub_path=None,
read_only=False,
)
전체 DAG 구현
import datetime as dt
import os
from kubernetes.client import models as k8s
from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
KubernetesPodOperator,
)
with DAG(
dag_id="02_kubernetes",
description="Fetches ratings from the Movielens API using kubernetes.",
start_date=dt.datetime(2019, 1, 1),
end_date=dt.datetime(2019, 1, 3),
schedule_interval="@daily",
) as dag:
volume_claim = k8s.V1PersistentVolumeClaimVolumeSource(...)
volume = k8s.V1Volume(...)
volume_mount = k8s.V1VolumeMount(...)
fetch_ratings = KubernetesPodOperator(...)
rank_movies = KubernetesPodOperator(...)
fetch_ratings >> rank_movies
K8s workflow
- Docker기반 workflow와 차이점
- 태스크 컨테이너가 더 이상 Airflow 워커 노드에서 실행되지 않고 k8s 클러스터 내에 별도의 노드에서 실행됨.
→ 워커에 사용되는 모든 리소스는 최소화되며, k8s의 기능을 사용하여 적절한 리소스(예: CPU, 메모리, GPU)가 있는 노드에 태스크가 배포되었는지 확인할 수 있습니다. - 어떤 스토리지도 더 이상 Airflow 워커가 접근하지 않지만, k8s 파드에서는 사용할 수 있어야함. 일반적으로 이는 쿠버네티스를 통해 제공되는 스토리지를 사용하는 것을 의미함.
→ 파드에서 스토리지에 대한 적절한 액세스 권한이 있다면, 다른 유형의 네트워크/클라우드 스토리지를 사용할 수도 있습니다.
- 태스크 컨테이너가 더 이상 Airflow 워커 노드에서 실행되지 않고 k8s 클러스터 내에 별도의 노드에서 실행됨.
- 총평: 전반적으로 k8s는 도커에 비해 확장성, 유연성(예: 다양한 워크로드에 대해 서로 다른 리소스/노드 제공) 및 스토리지, 보안 등과 같은 기타 리소스 관리 관점에서 상당한 장점을 가지고있습니다. 또한 Airflow 전체를 k8s에서 실행할 수 있습니다. 즉, Airflow 전체를 확장 가능한 컨테이너 기반 인프라에서 구동 설정이 가능합니다.
번외. Airflow와 K8s
Airflow on k8s - Kubernetes에 Airflow 환경을 구성하는 예시
- K8s cluster에 Airflow를 구성하는 컴포넌트들이 pod 형태로 존재합니다.
- 장비나 프로세스 형태로 존재하던 컴포넌트들이 POD 형태로 바뀌었다고 보면 됩니다.
- 각 서비스 사정에 맞게 Local Executor의 POD를 하나로 구성한다든지 Redis를 단일 POD로 구성한다든지와 같이 자유롭게 변경할 수 있습니다. (Kubernetes를 활용했을 때의 장점)
사례 : Managed Airflow
→ 규모가 큰 조직에서 중앙 제어. Airflow 서비스를 제공.
- 장점 : 구성이 간단하고 템플릿화하기 쉽다. 다양한 조직에 서비스를 제공해야 할 때 유용.
- 단점 : 구성 자체가 POD로만 변환된 부분이기 때문에, Celery Executor 기준으로 마스터와 메시지 브로커, 워커 등이 Kubernetes 환경에 지속적으로 상주한다는 점입니다. 따라서 자원을 계속 점유하며 관리 포인트 또한 그대로 유지됩니다.
- 물론 기존과 달리 Kubernetes에서 오케스트레이션(orchestration)해주기 때문에 유지 보수 및 관리하기가 조금 더 수월함.
사례 : 무거워진 Docker 컨테이너
- 요구사항이 점점 늘어나 라이브러리가 늘어나면, Docker 이미지도 매우 커질뿐더러 유지 보수 및 관리도 정말 힘들어짐
- KubernetesExecutor & KubernetesPodOperator
- Kubernetes Executor는 일반적인 Kubernetes 환경에서의 Airflow와 다르게 필요할 때만 Kubernetes 자원을 사용합니다.
- 또한, KubernetesPodOperator를 사용하면 개발자가 필요한 Docker 컨테이너만을 골라 POD로 실행시킬 수 있습니다.
K8sExecutor 사용 workflow
- 수행해야 할 시점이 된 태스크를 스케줄러가 찾고, Executor가 동적으로 Airflow 워커를 POD 형태로 실행합니다. 해당 워커 POD에서 실제 개발자가 정의한 태스크를 수행합니다.
KubenretesPodOperator 사용 workflow
- 수행해야 할 시점이 된 태스크를 스케줄러가 찾고, Executor는 동적으로 Airflow 워커를 POD 형태로 실행합니다. 다만 해당 워커 POD에서 개발자가 직접 정의한 컨테이너 이미지를 POD 형태로 또다시 실행한다는 차이점이 있습니다.
요약...
- 장점 :
- 가볍다 : Airflow는 Lib.의존성이 없는 기본 이미지.
- 유지보수 비용 절감 : Docker를 통한 Task간 독립성 보장. 운영중인 여러 Airflow 통합 가능
- 효율적 자원관리 : 기존 방법의 경우 webserver, scheduler, broker, worker 상주 동적으로 task 생성 후 사용한 자원 반납
- 개발 효율성 : Dag Script의 Template화 가능
- 단점 :
- 난이도가 높다.
- 구성이 어렵다.
- 클러스터 운영이 어렵다.
출처 및 참고할만한 사이트:
'Airflow' 카테고리의 다른 글
[Apache Airflow 기반의 데이터 파이프라인]운영환경에서 Airflow 관리 (0) | 2024.02.27 |
---|---|
[Apache Airflow 기반의 데이터 파이프라인] 모범사례 (1) | 2024.02.26 |
[Apache Airflow 기반의 데이터 파이프라인] 테스트하기 (0) | 2024.02.22 |
[Apache Airflow 기반의 데이터 파이프라인] 커스텀 컴포넌트 빌드 (0) | 2024.02.21 |
[Apache Airflow 기반의 데이터 파이프라인] 워크플로 트리거 (0) | 2024.02.20 |