본문 바로가기
Airflow

[Apache Airflow 기반의 데이터 파이프라인] 컨테이너에서 태스크 실행하기

by 개복취 2024. 2. 23.

 

 

  1. 왜 Airflow Dag를 실행하는데 컨테이너와 k8s가 필요한가?
  2. 컨테이너는 무엇인가?
  3. Docker 컨테이너 실행하기
  4. 컨테이너와 Airflow
  5. Docker에서 Airflow 태스크 실행하기
  6. K8s에서 태스크 실행
  7. KubernetesPodOperator 사용하기
  8. 번외. Airflow와 K8s

 

왜 Airflow Dag를 실행하는데 컨테이너와 k8s가 필요한가?

  • 동일한 환경에서 배포하고 싶을 때!
  • 종속성을 요구하는 경우!
  • 잠재적인 충돌을 방지하고 싶을 때!

단일 환경에서 여러 개의 DAG를 실행한다고 했을 때, 다른 버전의 library를 사용한다면 충돌이 발생할 수 있다.

물론 해당 서버에서 관리하는 DAG들의 개수가 작거나 simple한 경우라면 이런 고민을 할 필요가 없음.

 

컨테이너는 무엇인가?

  • 컨테이너는 애플리케이션에 필요한 종속성을 포함하고 서로 다른 환경에 균일하게, 쉽게 배포할 수 있는 기술이다.
  • CI/CD, Devops가 유행하지 않았던 시절에는 배포 작업이 가장 큰 문제였음...
  • Target OS 버전이 다를 수도 있고, 설치된 종속성 및 라이브러리가 다를 수도 있고, 하드웨어 성능이 차이가 날 수 있다.
    개발 서버운영 서버가 완벽하게 동일한 환경을 구성하고 있지 않기 때문...
  • 그래서 가상머신(VM) 이나 Container를 사용하여 가상화 환경을 구성합니다.

  • 가상머신 : 가상 OS 환경 위에서 종속성 파일이나 라이브러리가 존재하고 애플리케이션이 수행됨.
    → Host OS 위에 각각의 Guest OS를 띄워야하기 때문에 많은 리소스를 필요로함. (무겁다)
  • 컨테이너 : Host OS의 커널에서 애플리케이션을 가상화.
    → VM과 동일한 환경을 제공하지만 Guest OS를 필요로 하지 않기 때문에 더 가벼움.
  • 가장 잘 알려진 컨테이너 엔진은 Docker 이다!

Docker 컨테이너 실행하기

Docker run

$ docker run debian:buster-slim echo Hello, world!

 

Unable to find image 'debian:buster-slim' locally
latest: Pulling from library/debian
...
Digest: sha256:76c15066d7db315b42dc247b6d439779d2c6466f
➥ 7dc2a47c2728220e288fc680
Status: Downloaded newer image for debian:buster-slim
Hello, world!

 

Docker Architecture

  1. Docker Client는 Docker Daemon에 접속.
  2. Docker Daemon은 Docker Hub에서 Docker image를 가져옴.
  3. Docker Daemon은 해당 이미지를 사용하여 새 컨테이너를 생성.
  4. 컨테이너는 내부에서 명령어를 실행. (위 예시에서는 echo Hello, world!)
  5. Docker Daemon은 출력을 Docker Client로 전송.
  • Docker Image 만들고 빌드하기
    → 기존에 만들어진 Docker image를 실행하는것은 쉽지만, 실제로는 직접 이미지를 만들어야할 경우가 많음.

 

python .Dockerfile

FROM python:3.8-slim B
COPY requirements.txt /tmp/requirements.txt 
RUN pip install -r /tmp/requirements.txt
COPY scripts/fetch_weather.py /usr/local/bin/fetch-weather 
RUN chmod +x /usr/local/bin/fetch-weather
ENTRYPOINT [ "/usr/local/bin/fetch-weather" ] 
CMD [ "--help" ]

 

Docker build

  • tag : Docker Image에 할당된 이름 지정
$ docker build --tag manning-airflow/wttr-example .
Sending build context to Docker daemon 5.12kB
Step 1/7 : FROM python:3.8-slim
 ---> 9935a3c58eae
Step 2/7 : COPY requirements.txt /tmp/requirements.txt
 ---> 598f16e2f9f6
Step 3/7 : RUN pip install -r /tmp/requirements.txt
 ---> Running in c86b8e396c98
Collecting click
...
Removing intermediate container c86b8e396c98
 ---> 102aae5e3412
Step 4/7 : COPY scripts/fetch_weather.py /usr/local/bin/fetch-weather
 ---> 7380766da370
Step 5/7 : RUN chmod +x /usr/local/bin/fetch-weather
 ---> Running in 7d5bf4d184b5
Removing intermediate container 7d5bf4d184b5
 ---> cae6f678e8f8
Step 6/7 : ENTRYPOINT [ "/usr/local/bin/fetch-weather" ]
 ---> Running in 785fe602e3fa
Removing intermediate container 785fe602e3fa
 ---> 3a0b247507af
Step 7/7 : CMD [ "--help" ]
 ---> Running in bad0ef960f30
Removing intermediate container bad0ef960f30
 ---> ffabdb642077
Successfully built ffabdb642077
Successfully tagged wttr-example:latest
  • Docker Container 실행

 

Docker run

$ docker run manning-airflow/wttr-example:latest

 

Usage: fetch-weather [OPTIONS] CITY
 CLI application for fetching weather forecasts from wttr.in.
Options:
 --output_path FILE Optional file to write output to.
 --help Show this message and exit.
  • Docker 볼륨 사용

볼륨 마운트하기

$ docker run --volume `pwd`/data:/data wttr-example ...
  • Docker 컨테이너의 /data 하위 경로를 local /data 디렉토리와 마운트할 수 있어서, Docker 컨테이너에서 파일을 생성했다면 로컬에서도 생성된 파일을 사용할 수 있음.

컨테이너와 Airflow

  • Airflow 태스크를 컨테이너로 실행할 수 있습니다.
    DockerOperatorKubernetesPodOperator 를 사용.

 

  • 종속성 관리를 쉽게 할 수 있음. 서로 다른 이미지를 생성하면 각 태스크에 필요한 종속성을 해당 이미지에만 설치할 수 있음.
  • 태스크가 더 이상 Worker 환경에서 실행될 필요가 없기 때문에 Airflow Worker 환경에 태스크에 대한 종속성을 설치할 필요가 없음.

 

Docker에서 Airflow 태스크 실행하기

 

DockerOperator

rank_movies = DockerOperator(
     task_id="rank_movies",
     image="manning-airflow/movielens-ranking", 
 command=[ 
	 "rank_movies.py",
	 "--input_path",
	 "/data/ratings/{{ds}}.json",
	 "--output_path",
	 "/data/rankings/{{ds}}.csv",
	 ],
 volumes=["/tmp/airflow/data:/data"], 
 )

 

 

DockerOperator 동작원리

  1. Airflow는 Worker에게 태스크를 스케줄하여 작업 지시.
  2. DockerOperator는 Docker run 명령 실행.
  3. Docker Daemon이 image registry에서 필요한 Docker image를 가져옴.
  4. Docker 컨테이너 실행.
  5. 완료되면, 컨테이너는 종료되고 DockerOperatorAirflow Worker의 결과를 반환.

 

Docker에서 Airflow 태스크 실행하기

  • 태스크에서 수행되는 python 스크립트 작성

평점 스크립트

...
from pathlib import Path
@click.command()
@click.option(...) 
...
def main(start_date, end_date, output_path,
	 host, user, password, batch_size):
	 """CLI script for fetching ratings from the movielens API."""
	 session = requests.Session() 
	 session.auth = (user, password)
	 logging.info("Fetching ratings from %s (user: %s)", host, user) 
	 ratings = list(
             _get_ratings(
             session=session,
             host=host,
             start_date=start_date,
             end_date=end_date,
             batch_size=batch_size,
          )
	 )
	 logging.info("Retrieved %d ratings!", len(ratings))
	 output_path = Path(output_path)
	 output_dir = output_path.parent 
	 output_dir.mkdir(parents=True, exist_ok=True)
	 logging.info("Writing to %s", output_path)
	 with output_path.open("w") as file_: 
		 json.dump(ratings, file_)

 

평점 스크립트 - Docker 파일 생성

FROM python:3.8-slim
RUN pip install click==7.1.1 requests==2.23.0 
COPY scripts/fetch_ratings.py /usr/bin/local/fetch-ratings 
RUN chmod +x /usr/bin/local/fetch-ratings
ENV PATH="/usr/local/bin:${PATH}"

 

DAG 구성

import datetime as dt
from airflow import DAG
from airflow.providers.docker.operators.docker import DockerOperator
with DAG(
 dag_id="01_docker",
 description="Fetches ratings from the Movielens API using Docker.",
 start_date=dt.datetime(2019, 1, 1),
 end_date=dt.datetime(2019, 1, 3),
 schedule_interval="@daily",
	) as dag:
	 Fetch_ratings = DockerOperator(
	 task_id="fetch_ratings",
	 image="manning-airflow/movielens-fetch", 
	 command=[
		 "fetch-ratings", 
		 "--start_date",
		 "{{ds}}",
		 "--end_date",
		"{{next_ds}}",
		 "--output_path",
		 "/data/ratings/{{ds}}.json",
		 "--user",
		 os.environ["MOVIELENS_USER"], 
		 "--password",
		 os.environ["MOVIELENS_PASSWORD"],
		 "--host",
		 os.environ["MOVIELENS_HOST"],
		 ],
	 volumes=["/tmp/airflow/data:/data"], 
	 network_mode="airflow", 
	)

rank_movies = DockerOperator(
 task_id="rank_movies",
 image="manning-airflow/movielens-ranking", 
 command=[
     "rank-movies", 
     "--input_path",
     "/data/ratings/{{ds}}.json",
     "--output_path",
     "/data/rankings/{{ds}}.csv",
 ],
volumes=["/tmp/airflow/data:/data"],
 )
 fetch_ratings >> rank_movies

 

Docker 기반의 WorkFlow

  1. Docker file 생성 및 Docker image 작성.
  2. Docker Daemon은 image를 구축
  3. Docker Daemon은 image registry에 image등록
  4. DAG 작성
  5. Airflow는 DockerOperator 태스크를 실행 및 스케쥴
  6. Airflow worker는 image를 가져옴
  7. Airflow worker는 Docker Daemon을 사용하여 해당 image 컨테이너 실행

 

K8s에서 태스크 실행

  • 여러 시스템에서 태스크를 조정하고 분산하고자하는 니즈가 발생하여 오케스트레이션 시스템 사용(k8s)
  • k8s란?
    • 컨테이너화된 애플리케이션의 배포, 확장, 관리에 초점을 맞춘 오픈 소스 컨테이너 오케스트레이션 플랫폼
    • 컨테이너를 여러 작업 노드에 배치를 관리하여 확장할 수 있도록 지원하는 동시에 스케줄링 시 필요한 리소스(CPU or Memory), 스토리지 및 특수한 하드웨어(GPU) 등을 고려한다는 것이 Docker와의 차이점 이다.

 

k8s 구성요소

  • 크게 Masterworker node로 구성됨.
  • Master : API 서버, 스케줄러 및 배포, 스토리지 등을 관리하는 기타 서비스를 포함하여 다양한 컴포넌트를 실행 k8s 클러스터에서 컨테이너화된 애플리케이션을 관리함.
  • API 서버 : kubectl(k8s의 기본 CLI 인터페이스) 또는 k8s 파이썬 SDK와 같은 클라이언트 에서 k8s를 쿼리하고 명령을 실행하여 컨테이너를 배포하는 데 사용.
  • k8s worker node : 스케줄러가 할당한 컨테이너 애플리케이션을 실행하는 역할.
  • Pod : k8s에서 이러한 애플리케이션을 파드라고 함. 쿠버네티스에서 가장 작은 단위임. Airflow에서 태스크는 단일 파드 내부의 컨테이너로 실행됨.

k8s 설정하기 - 스토리지 YAML

apiVersion: v1
kind: PersistentVolume 
metadata:
 name: data-volume 
 Labels:
 type: local
Spec:
 storageClassName: manual
 capacity:
 storage: 1Gi 
 accessModes:
 - ReadWriteOnce 
 hostPath:
 path: "/tmp/data" 
---
apiVersion: v1
kind: PersistentVolumeClaim 
metadata:
 name: data-volume 
spec:
 storageClassName: manual
 accessModes:
 - ReadWriteOnce
 resources:
  requests:
   storage: 1Gi
  • PersistentVolume(PV)는 관리자에 의해 생성된 볼륨을 뜻하고, PersistentVolumeClaim(PVC)은 사용자가 볼륨을 사용하기 위해 PV에 요청을 하게 될 때 사용됨.

k8s 설정하기 - 스토리지 리소스 배포하기

$ kubectl --namespace airflow apply -f resources/data-volume.yml
persistentvolumeclaim/data-volume created
persistentvolume/data-volume created

 

k8s 설정하기 - API에 대한 YAML

apiVersion: apps/v1
kind: Deployment 
metadata:
 name: movielens-deployment 
 labels:
	 app: movielens 
spec:
	 replicas: 1
selector:
 matchLabels:
	 app: movielens
template:
	 metadata:
		 Labels:
			 app: movielens
	spec:
	 containers: 
	 - name: movielens
	 image: manning-airflow/movielens-api 
	 ports:
	 - containerPort: 5000
	 env:
	 - name: API_USER
	  value: airflow
	 - name: API_PASSWORD
	  value: airflow
---
apiVersion: v1 
kind: Service
metadata:
 name: movielens
spec:
 selector: 
  app: movielens
 ports:
 - protocol: TCP 
	 port: 80
	 targetPort: 5000

 

k8s 설정하기 - API 배포

$ kubectl --namespace airflow apply -f resources/api.yml
deployment.apps/movielens-deployment created
service/movielens created

 

KubernetesPodOperator 사용하기

 

KubernetesPodOperator

fetch_ratings = KubernetesPodOperator(
	 task_id="fetch_ratings",
	 image="manning-airflow/movielens-fetch", 
	 cmds=["fetch-ratings"], 
	 arguments=[ 
		 "--start_date",
		 "{{ds}}",
		 "--end_date",
		 "{{next_ds}}",
		 "--output_path",
		 "/data/ratings/{{ds}}.json",
		 "--user",
		 os.environ["MOVIELENS_USER"],
		 "--password",
		 os.environ["MOVIELENS_PASSWORD"],
		 "--host",
		 os.environ["MOVIELENS_HOST"],
	 ],
 namespace="airflow", 
 name="fetch-ratings", 
 cluster_context="docker-desktop", 
 in_cluster=False, 
 volumes=[volume], 
 volume_mounts=[volume_mount], 
 image_pull_policy="Never", 
 is_delete_operator_pod=True, 1)
 )

 

볼륨 마운트

from kubernetes.client import models as k8s
...
volume_claim = k8s.V1PersistentVolumeClaimVolumeSource( 
 claim_name="data-volume"
)
volume = k8s.V1Volume( 
 name="data-volume",
 persistent_volume_claim=volume_claim
)
volume_mount = k8s.V1VolumeMount(
 name="data-volume",
 mount_path="/data", 
 sub_path=None,
 read_only=False, 
)

 

전체 DAG 구현

import datetime as dt
import os
from kubernetes.client import models as k8s
from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
 KubernetesPodOperator,
)
with DAG(
 dag_id="02_kubernetes",
 description="Fetches ratings from the Movielens API using kubernetes.",
 start_date=dt.datetime(2019, 1, 1),
 end_date=dt.datetime(2019, 1, 3),
 schedule_interval="@daily",
) as dag:
	 volume_claim = k8s.V1PersistentVolumeClaimVolumeSource(...)
	 volume = k8s.V1Volume(...)
	 volume_mount = k8s.V1VolumeMount(...)
	 fetch_ratings = KubernetesPodOperator(...)
	 rank_movies = KubernetesPodOperator(...)
	 fetch_ratings >> rank_movies

 

K8s workflow

  • Docker기반 workflow와 차이점
    1. 태스크 컨테이너가 더 이상 Airflow 워커 노드에서 실행되지 않고 k8s 클러스터 내에 별도의 노드에서 실행됨.
      → 워커에 사용되는 모든 리소스는 최소화되며, k8s의 기능을 사용하여 적절한 리소스(예: CPU, 메모리, GPU)가 있는 노드에 태스크가 배포되었는지 확인할 수 있습니다.
    2. 어떤 스토리지도 더 이상 Airflow 워커가 접근하지 않지만, k8s 파드에서는 사용할 수 있어야함. 일반적으로 이는 쿠버네티스를 통해 제공되는 스토리지를 사용하는 것을 의미함.
      → 파드에서 스토리지에 대한 적절한 액세스 권한이 있다면, 다른 유형의 네트워크/클라우드 스토리지를 사용할 수도 있습니다.
  • 총평: 전반적으로 k8s는 도커에 비해 확장성, 유연성(예: 다양한 워크로드에 대해 서로 다른 리소스/노드 제공) 및 스토리지, 보안 등과 같은 기타 리소스 관리 관점에서 상당한 장점을 가지고있습니다. 또한 Airflow 전체를 k8s에서 실행할 수 있습니다. 즉, Airflow 전체를 확장 가능한 컨테이너 기반 인프라에서 구동 설정이 가능합니다.

 

번외. Airflow와 K8s

 

 

Airflow on k8s - Kubernetes에 Airflow 환경을 구성하는 예시

  •  K8s cluster에 Airflow를 구성하는 컴포넌트들이 pod 형태로 존재합니다.
  • 장비나 프로세스 형태로 존재하던 컴포넌트들이 POD 형태로 바뀌었다고 보면 됩니다.

  • 각 서비스 사정에 맞게 Local Executor의 POD를 하나로 구성한다든지 Redis를 단일 POD로 구성한다든지와 같이 자유롭게 변경할 수 있습니다. (Kubernetes를 활용했을 때의 장점)

사례 : Managed Airflow

→ 규모가 큰 조직에서 중앙 제어. Airflow 서비스를 제공.

  • 장점 : 구성이 간단하고 템플릿화하기 쉽다. 다양한 조직에 서비스를 제공해야 할 때 유용.
  • 단점 : 구성 자체가 POD로만 변환된 부분이기 때문에, Celery Executor 기준으로 마스터와 메시지 브로커, 워커 등이 Kubernetes 환경에 지속적으로 상주한다는 점입니다. 따라서 자원을 계속 점유하며 관리 포인트 또한 그대로 유지됩니다.
    • 물론 기존과 달리 Kubernetes에서 오케스트레이션(orchestration)해주기 때문에 유지 보수 및 관리하기가 조금 더 수월함.

 

사례 : 무거워진 Docker 컨테이너

  • 요구사항이 점점 늘어나 라이브러리가 늘어나면, Docker 이미지도 매우 커질뿐더러 유지 보수 및 관리도 정말 힘들어짐

 

  • KubernetesExecutor & KubernetesPodOperator
    • Kubernetes Executor는 일반적인 Kubernetes 환경에서의 Airflow와 다르게 필요할 때만 Kubernetes 자원을 사용합니다.
    • 또한, KubernetesPodOperator를 사용하면 개발자가 필요한 Docker 컨테이너만을 골라 POD로 실행시킬 수 있습니다.

K8sExecutor 사용 workflow

  • 수행해야 할 시점이 된 태스크를 스케줄러가 찾고, Executor가 동적으로 Airflow 워커를 POD 형태로 실행합니다. 해당 워커 POD에서 실제 개발자가 정의한 태스크를 수행합니다.

KubenretesPodOperator 사용 workflow

  • 수행해야 할 시점이 된 태스크를 스케줄러가 찾고, Executor는 동적으로 Airflow 워커를 POD 형태로 실행합니다. 다만 해당 워커 POD에서 개발자가 직접 정의한 컨테이너 이미지를 POD 형태로 또다시 실행한다는 차이점이 있습니다.

 


요약...

  • 장점 :
    1. 가볍다 : Airflow는 Lib.의존성이 없는 기본 이미지.
    2. 유지보수 비용 절감 : Docker를 통한 Task간 독립성 보장. 운영중인 여러 Airflow 통합 가능
    3. 효율적 자원관리 : 기존 방법의 경우 webserver, scheduler, broker, worker 상주 동적으로 task 생성 후 사용한 자원 반납
    4. 개발 효율성 : Dag Script의 Template화 가능
  • 단점 :
    1. 난이도가 높다.
    2. 구성이 어렵다.
    3. 클러스터 운영이 어렵다.

 

출처 및 참고할만한 사이트:

  1. Apache Airflow 기반의 데이터 파이프라인
  2. Deview 2020, Kubernetes를 이용한 효율적인 데이터 엔지니어링(Airflow on Kubernetes VS Airflow Kubernetes Executor)
  3. https://subicura.com/k8s/
  4. https://humbledude.github.io/blog/2019/07/12/airflow-on-k8s/