본문 바로가기
Airflow

[Apache Airflow 기반의 데이터 파이프라인] 커스텀 컴포넌트 빌드

by 개복취 2024. 2. 21.

 

  1. 커스텀 컴포넌트 빌드
  2. 커스텀 훅 빌드하기
  3. 커스텀 오퍼레이터 빌드하기
  4. 커스텀 센서 빌드하기
  5. 컴포넌트 패키징하기

 

커스텀 컴포넌트 빌드

  • Airflow의 강점 중 하나는 여러 유형의 시스템들 간의 작업을 조율할 때 쉽게 확장할 수 있다.
    (SageMaker 기반 머신러닝 모델 학습, ECSOpserator 사용해서 ECS기반 작업가능)
  • 그러나, Airflow가 지원하지 않는 시스템에서 태스크 실행해야 할 수 있다. 또한 태스크 중 PythonOperator를 사용하여 구현할 수 있지만 수많은 단순 반복적인 코드가 필요하여, 여러 DAG에서 사용하기 힘든경우가 발생한다.
    • 이 때문에 Airflow 에서 특정 시스템에서 작업이 실행되어야 할 때, 이를 위한 오퍼레이터를 개발하여 빌드해 왔었다.
    • Airflow에서 커스텀 오퍼레이션을 직접 쉽게 구현해 생성할 수 있다. 이를 이용하여 지원되지 않는 시스템에서 작업을 실행할 수 있다. 또한, 여러 DAG에 적용되는 작업의 처리를 쉽게 공용으로 만들어서 단순화 시킬 수 있다.

PythonOperator 으로 작업하기

  • 영화 평점 API를 가지고 오기 위한 MovieLens 데이터 세트를 사용한다.
  • api에 대해 offset=number 으로 레코드를 가지고온다.
  • 특정 기간 동안의 평점 데이턱를 가져오려면 start_date, end_date 파라미터를 사용하여 주어진 시작/종료 날짜 사이의 평점 데이터를 가져올 수 있다.
<http://localhost:5000/ratings?start_date=2019-01-01&end_date=2019-01-02>
  • 이러한 필터링 기능을 사용하면 전체 데이터 세트를 로드할 필요없이 증분 방식으로 로드할 수 있다.

평점가져오기

  1. request 라이브러리의 get 요청으로 가져온다.
  2. 쿼리에 사용되는 파라미터 등 추가 인수 값을 get 메서드에 전달한다. (start_date, end_date) (response 객체의 raise_for_status 메서드를 사용하여 쿼리의 정상 수행 여부를 체크한다.) 요청세션, 인증 정보, 세션 생성 처리 기능을 캡슐화한다.
  3. API 결과의 pagination 처리 기능을 구현한다. 결과를 반환할 때 yield from 을 사용하여 각 평점 레코드들의 제너레이터를 효과적으로 전달함으로써 결과의 페이지 처리에 신경을 많이 쓰지 않게 할 수 있다.

DAG 구축하기

  1. 스케쥴 간격마다 평점 데이터를 가져온다. 평점 데이터를 JSON 출력 파일로 dump할 수 있는데 날짜별로 파티션한다.
    (이렇게 하면 데이터 재수집이 필요한 경우, 필요한 부분만 가져올 수 있어서 용이하다.)
  2. 헬퍼함수 만들어주기

 

커스텀 훅 빌드하기

  • 위와 같이 API연동과 같이 복잡한 작업의 처리 방법 중 하나는, 코드를 캡슐화 하고 재활용 가능한 Airflow 훅으로 만드는 것이다. 이 작업으로 모든 API 전용 코드를 한 곳에 보관하고, DAG의 여러 부분에서 이 훅을 간단하게 사용할 수 있다.
  • 그러면 유사한 용도로 영화 평점 데이터를 가지고 올 때, API 연동에 대한 노력을 줄일 수 있다.
hook = MovielensHook(conn_id="movielens")         #훅 생성
ratings = hook.get_ratings(start_date, end_date)  #훅을 사용하여 특정 작업 수행
hook.close() # 훅 닫고, 새로운 리소스 해제
  • 또한, 훅을 사용하면 Airflow의 DB와 UI를 통해 credential과 연결된 관리 기능을 사용할 수 있다.
  • 이 기능을 적용하면 API 자격 증명 정보를 DAG에 수동으로 넣지 않아도 됨

커스텀 훅 설계하기

  • Airflow에서의 모든 Hook은 추상 클래스인 BaseHook 클래스의 서브클래스로 생성한다.
  • 자격 증명 정보를 보다 안전하게 관리하려면 하드코딩보다 Airflow에서 제공하는 기능을 쓰자
    • Admin > Connection 항목에서 작업을 수행할 수있다.
  • BaseHook 클래스에서 get_connection이라는 메서드를 제공하는데 이 메서드는 메타스토어에서 커넥션 ID에 대한 연결 세부 정보를 가져온다.
from airflow.hooks.base_hook import BaseHook

class MovielensHook(BaseHook):
    DEFAULT_HOST = "movielens"
    DEFAULT_SCHEMA = "http"
    DEFAULT_PORT = 5000
    
    def __init__(self, conn_id): # conn_id는 훅에게 어떤 커넥션을 사용하는지 전달한다.
        super().__init__()       # basehook의 생성자 호출
        self._conn_id = conn_id  # 커넥션 id를 꼭 저장해야한다.

    def get_conn(self):          # 주어진 id를 사용하여 커넥션 설정 정보를 가져옴
        config = self.get_connection(self._conn_id)
        schema = DEFAULT_SCHEMA
        host = DEFAULT_HOST
        port = DEFAULT_PORT
        base_url = f"{schema}://{host}:{port}"
        session = requests.Session()

        # 커넥션 설정 정보의 login/password 정보를 사용하여 요청 세션 생성
        if config.login:
            session.auth = (config.login, config.password)

        #요청 세션과 기본 URL 반환
        return session, base_url
  • 위의 방법은 get_conn 함수를 호출 할 때마다. 자격 증명 정보를 가져온다.
  • 이를 해결하기 위해 인스턴스에 sessionbase_url을 보호(protected) 변수에 캐싱한다.

MovielensHook

class MovielensHook(BaseHook):
    def __init__():
        ...
        self.session = None
        self._base_url = None # 세션과 기본 URL 캐싱을 위한 추가 변수 2개

    def get_conn(self):
        if self._session is None: # 세션을 생성하기 전에 연결된 세션이 있는지 체크한다.
            config = self.get_connection(self._conn_id)
            ...
            self._base_url = f"base_url = f"{schema}://{host}:{port}""
            self._session = requests.Session()
            ...
            return self._session, self._base_url
  • 두번째 호출될때에는 self._session 변수는 더 이상 None이 아니고 캐싱된 세션과 기본 URL이 반환된다.

커스텀 훅으로 DAG 빌드하기

from custom.hooks import MovielensHook

hook = Movielens(conn_id = conn_id)
ratings = hook.get_ratings(
    start_date = start_date,
    end_date = end_date,
    batch_size = batch_size
)
  • 위의 리스트를 수행하면 평점 레코드의 제너레이터를 반환한다. 이를 사용하여 평점 데이터를 출력 파일(JSON)으로 저장한다.
  • DAG에 훅을 사용하기 위해 훅 호출 코드를 PythonOperator에 래핑해야 한다.
  • DAG실행에 필요한 시작/종료 날짜 입력하고, 적절한 출력 파일로 저장하는 코드이다.
import datetime as dt
import logging
import json
import os

from airflow import DAG
from airflow.operators.python import PythonOperator

from custom.hooks import MovielensHook

with DAG(
    dag_id="02_hook",
    description="Fetches ratings from the Movielens API using a custom hook.",
    start_date=dt.datetime(2019, 1, 1),
    end_date=dt.datetime(2019, 1, 10),
    schedule_interval="@daily",
) as dag:

def _fetch_ratings(conn_id, templates_dict, batch_size=1000, **_):
    logger = logging.getLogger(__name__)

    start_date = templates_dict["start_date"]
    end_date = templates_dict["end_date"]
    output_path = templates_dict["output_path"]

    logger.info(f"Fetching ratings for {start_date} to {end_date}")
    hook = MovielensHook(conn_id=conn_id)
    ratings = list(
        hook.get_ratings(
            start_date=start_date, end_date=end_date, batch_size=batch_size
        )
    )
    logger.info(f"Fetched {len(ratings)} ratings")

    logger.info(f"Writing ratings to {output_path}")

    # Make sure output directory exists.
    output_dir = os.path.dirname(output_path)
    os.makedirs(output_dir, exist_ok=True)

    with open(output_path, "w") as file_:
        json.dump(ratings, fp=file_)

PythonOperator(
    task_id="fetch_ratings",
    python_callable=_fetch_ratings,
    op_kwargs={"conn_id": "movielens"},
    templates_dict={
        "start_date": "{{ds}}",
        "end_date": "{{next_ds}}",
        "output_path": "/data/custom_hook/{{ds}}.json",
    },
)

 

커스텀 오퍼레이터 빌드하기

  • 커스텀 오퍼레이터를 직접 구현하여, 반복적인 태스크 수행 시 코드의 반복을 최소화 할 수 있다.
  • 모든 오퍼레이터는 BaseOperator 클래스의 서브 클래스로 만들어야 한다.
    • BaseOperator은 제네릭 인수를 많이 가지고 있다.
    • 모든 제네릭 인수를 모두 나열하지 않도록 **kwargs를 인수로 사용하도록 하자
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults

class MyCustomOperator(BaseOperator): # BaseOperator 클래스 상속
    @apply_defaults # 기본 DAG인수를 커스텀 오퍼레이터에게 전달히기 위한 데커레이터 (**필수!**)
    def __init__(self, conn_id, ... , **kwargs): #BaseOperator 생성자에게 추가 키워드 인수 제공
        super.__init__(self, **kwargs)
        self._conn_id = conn_id

    def execute(self,context):
        ...
  • 커스텀 오퍼레이터에만 사용되는 인수들을 명시적으로 지정할 수 있다.
  • 일반적으로 conn_id와 작업에 필요한 세부사항((시작/종료) 날짜, 쿼리…)이 포함된다.
  • apply_defaults 의 데커레이터는 커스텀 오퍼레이터 init 에 적용되고 이는 항상 포함해야한다.
  • execute 메서드는 Airflow가 DAG를 실행할 때 DAG안에서 실행되는 오퍼레이터의 메인 메서드가 된다.
  • CustomOperator의 execute 메서드는 context 하나의 파라미터만 받고, Airflow의 모든 콘텍스트 변수를 담고 있는 dict 객체이다.

MovielensFetchRatingsOperator

class MovielensFetchRatingsOperator(BaseOperator):

    def execute(self, context):
        hook = MovielensHook(self._conn_id)

        try:
            self.log.info(
                f"Fetching ratings for {self._start_date} to {self._end_date}"
            )
            ratings = list(
                hook.get_ratings(
                    start_date=self._start_date,
                    end_date=self._end_date,
                    batch_size=self._batch_size,
                )
            )
            self.log.info(f"Fetched {len(ratings)} ratings")
        finally:
            # Make sure we always close our hook's session.
            hook.close()

        self.log.info(f"Writing ratings to {self._output_path}")

        # Make sure output directory exists.
        output_dir = os.path.dirname(self._output_path)
        os.makedirs(output_dir, exist_ok=True)

        # Write output as JSON.
        with open(self._output_path, "w") as file_:
            json.dump(ratings, fp=file_)
fetch_ratings = MovielensFetchRatingsOperator(
	task_id = "fetch_ratings",
	conn_id = "movielens",
	start_date = "2020-01-01"
	end_date = "2020-01-01"
	output_path = "/data/2020-01-01.json"
)
  • 오퍼레이터가 평점데이터를 가져오는 시작/종료 날짜를 미리 지정해야 하는 단점이 있다.
  • 즉, 실행 날짜에 상관없이 하드코딩된 날짜 기간에 대한 평점 데이터를 가지고 온다.
  • 위와 같은 문제를 해결하기 위해, templates_field 클래스 변수에 해당 변수명을 지정하여 Airflow에 알려준다.

MovielensFetchRatingsOperator

class MovielensFetchRatingsOperator(BaseOperator):
    template_fields = ("_start_date", "_end_date", "_output_path")
    # 커스텀 오퍼레이터에서 템플릿화 할 인스턴스 변수들을 Airflow에게 알려줍니다.

    @apply_defaults
    def __init__(
        self,
        conn_id,
        output_path,
        start_date="{{ds}}",
        end_date="{{next_ds}}",
        batch_size=1000,
        **kwargs,
    ):
        super(MovielensFetchRatingsOperator, self).__init__(**kwargs)

        self._conn_id = conn_id
        self._output_path = output_path
        self._start_date = start_date
        self._end_date = end_date
        self._batch_size = batch_size
from custom.operators import MovielensFetchRatingsOperator

fetch_ratings = MovielensFetchRatingsOperator(
	task_id = "fetch_ratings",
	conn_id = "movielens",
	start_date = "{{ds}}"
	end_date = "{{next_ds}}"
	output_path = "/data/custom_operator/{{ds}}.json"
)
  • 템플릿을 활용하면 이렇게 할 수 있다.

 

커스텀 센서 빌드하기

  • 센서란? : 특별한 유형의 오퍼레이터이며, DAG안에서 다운스트림 태스크를 실행하기 전에 특정 조건이 충족될 때까지 대기하기 위해 사용되는 것이다.
from airflow.sensors.base import BaseSensorOperator

class MyCustomSensor(BaseSensorOperator):
    ...
    def poke(self, context): #Operator의 execute와 동일한 역할
class MovielensRatingsSensor(BaseSensorOperator):
    template_fields = ("_start_date", "_end_date")
    @apply_defaults 
    def __init__(self, conn_id, start_date="{{ds}}", end_date="{{next_ds}}", **kwargs):
        super().__init__(**kwargs)
        self._conn_id = conn_id
        self._start_date = start_date
        self._end_date = end_date
  • 센서가 오퍼레이터의 특정유형이기 때문에, 오퍼레이터를 구현했을 때 사용한 것과 같은 설정을 사용한다.
def poke(self, context):
    hook = MovielensHook(self._conn_id)

    try:
        next(
            hook.get_ratings(
                start_date=self._start_date, end_date=self._end_date, batch_size=1
            )
        )
        self.log.info(
            f"Found ratings for {self._start_date} to {self._end_date}, continuing!"
        )
        return True
    except StopIteration:
        self.log.info(
            f"Didn't find any ratings for {self._start_date} "
            f"to {self._end_date}, waiting..."
        )
        return False
    finally:
        hook.close()
  • 앞서 구현했던 커스텀 훅과 커스텀 센서 클래스를 사용해 짧고 간결하게 작성할 수 있다.
  • 생성자를 만든 다음 poke메서드를 구현하면 된다. 이 메서드 안에서 특정 기간 동안의 평점 데이터가 있는지 체크해야하는 것인데, 시작/종료 날짜 사이에 레코드가 있으면 True를 반환시킨다.
  • 모든 평점 레코드를 가져올 필요 없이, 해당 범위에 적어도 레코드 한 개라도 존재하는지 체크하면 된다.
import datetime as dt

from airflow import DAG
from custom.operators import MovielensFetchRatingsOperator
from custom.sensors import MovielensRatingsSensor

with DAG(
    dag_id="04_sensor",
    description="Fetches ratings from the Movielens API, with a custom sensor.",
    start_date=dt.datetime(2019, 1, 1),
    end_date=dt.datetime(2019, 1, 10),
    schedule_interval="@daily",
) as dag:
    wait_for_ratings = MovielensRatingsSensor(
        task_id="wait_for_ratings",
        conn_id="movielens",
        start_date="{{ds}}",
        end_date="{{next_ds}}",
    )

    fetch_ratings = MovielensFetchRatingsOperator(
        task_id="fetch_ratings",
        conn_id="movielens",
        start_date="{{ds}}",
        end_date="{{next_ds}}",
        output_path="/data/custom_sensor/{{ds}}.json",
    )

    wait_for_ratings >> fetch_ratings

 

컴포넌트 패키징하기

  • 컴포넌트를 배포하는 더 나은 방법은 파이썬패키지에 코드를 넣는것이다.
  • DAG와는 다르게 별도로 코드를 유지할 수 있기 때문에, 커스텀 코드에 대한 CI/CD 프로세스를 구성할 수 있고 다른 사람과 코드를 더 쉽게 공유하고 협업할 수 있다.
  • 파이썬 패키징은 setuptool을 사용하여 간단한 파이썬 패키지를 생성할 수 있다.
    • 훅, 오퍼레이터, 센서 클래스를 포함하는 airflow_movielens라는 패키지를 생성할 수 있다.
#!/usr/bin/env python

import setuptools

requirements = ["apache-airflow", "requests"]

extra_requirements = {"dev": ["pytest"]}

setuptools.setup(
    name="airflow_movielens",
    version="0.1.0",
    description="Hooks, sensors and operators for the Movielens API.",
    author="Anonymous",
    author_email="anonymous@example.com",
    install_requires=requirements,
    extras_require=extra_requirements,
    packages=setuptools.find_packages("src"),
    package_dir={"": "src"},
    url="<https://github.com/example-repo/airflow_movielens>",
    license="MIT license",
)

패키징 할 때 중요한 필드

  • name : 패키지 이름을 정의한다.
  • version : 패키지의 버전 번호
  • install_requires : 패키지에 필요한 종속 라이브러리 목록
  • packages/package_dir : 설치 시 포함되어야 할 패키지들과 패키지들의 위치를 setuptools에게 전달한다.

부가적으로 필요한 필드(선택)

  • author : 패키지 저자의 이름
  • author_email : 저자의 연락처 정보
  • description : 패키지에 대한 짧고 가독성 있는 설명(일반적으로 한줄로 기술함)
  • url : 온라인에서 패키지를 찾을 수 있는 위치
  • license : 패키지 코드를 배포 할 때 적용하는 라이선스(라이선스를 적용할 경우)