- 커스텀 컴포넌트 빌드
- 커스텀 훅 빌드하기
- 커스텀 오퍼레이터 빌드하기
- 커스텀 센서 빌드하기
- 컴포넌트 패키징하기
커스텀 컴포넌트 빌드
- Airflow의 강점 중 하나는 여러 유형의 시스템들 간의 작업을 조율할 때 쉽게 확장할 수 있다.
(SageMaker 기반 머신러닝 모델 학습, ECSOpserator 사용해서 ECS기반 작업가능) - 그러나, Airflow가 지원하지 않는 시스템에서 태스크 실행해야 할 수 있다. 또한 태스크 중 PythonOperator를 사용하여 구현할 수 있지만 수많은 단순 반복적인 코드가 필요하여, 여러 DAG에서 사용하기 힘든경우가 발생한다.
- 이 때문에 Airflow 에서 특정 시스템에서 작업이 실행되어야 할 때, 이를 위한 오퍼레이터를 개발하여 빌드해 왔었다.
- Airflow에서 커스텀 오퍼레이션을 직접 쉽게 구현해 생성할 수 있다. 이를 이용하여 지원되지 않는 시스템에서 작업을 실행할 수 있다. 또한, 여러 DAG에 적용되는 작업의 처리를 쉽게 공용으로 만들어서 단순화 시킬 수 있다.
PythonOperator 으로 작업하기
- 영화 평점 API를 가지고 오기 위한 MovieLens 데이터 세트를 사용한다.
- api에 대해 offset=number 으로 레코드를 가지고온다.
- 특정 기간 동안의 평점 데이턱를 가져오려면 start_date, end_date 파라미터를 사용하여 주어진 시작/종료 날짜 사이의 평점 데이터를 가져올 수 있다.
<http://localhost:5000/ratings?start_date=2019-01-01&end_date=2019-01-02>
- 이러한 필터링 기능을 사용하면 전체 데이터 세트를 로드할 필요없이 증분 방식으로 로드할 수 있다.
평점가져오기
- request 라이브러리의 get 요청으로 가져온다.
- 쿼리에 사용되는 파라미터 등 추가 인수 값을 get 메서드에 전달한다. (start_date, end_date) (response 객체의 raise_for_status 메서드를 사용하여 쿼리의 정상 수행 여부를 체크한다.) 요청세션, 인증 정보, 세션 생성 처리 기능을 캡슐화한다.
- API 결과의 pagination 처리 기능을 구현한다. 결과를 반환할 때 yield from 을 사용하여 각 평점 레코드들의 제너레이터를 효과적으로 전달함으로써 결과의 페이지 처리에 신경을 많이 쓰지 않게 할 수 있다.
DAG 구축하기
- 스케쥴 간격마다 평점 데이터를 가져온다. 평점 데이터를 JSON 출력 파일로 dump할 수 있는데 날짜별로 파티션한다.
(이렇게 하면 데이터 재수집이 필요한 경우, 필요한 부분만 가져올 수 있어서 용이하다.) - 헬퍼함수 만들어주기
커스텀 훅 빌드하기
- 위와 같이 API연동과 같이 복잡한 작업의 처리 방법 중 하나는, 코드를 캡슐화 하고 재활용 가능한 Airflow 훅으로 만드는 것이다. 이 작업으로 모든 API 전용 코드를 한 곳에 보관하고, DAG의 여러 부분에서 이 훅을 간단하게 사용할 수 있다.
- 그러면 유사한 용도로 영화 평점 데이터를 가지고 올 때, API 연동에 대한 노력을 줄일 수 있다.
hook = MovielensHook(conn_id="movielens") #훅 생성
ratings = hook.get_ratings(start_date, end_date) #훅을 사용하여 특정 작업 수행
hook.close() # 훅 닫고, 새로운 리소스 해제
- 또한, 훅을 사용하면 Airflow의 DB와 UI를 통해 credential과 연결된 관리 기능을 사용할 수 있다.
- 이 기능을 적용하면 API 자격 증명 정보를 DAG에 수동으로 넣지 않아도 됨
커스텀 훅 설계하기
- Airflow에서의 모든 Hook은 추상 클래스인 BaseHook 클래스의 서브클래스로 생성한다.
- 자격 증명 정보를 보다 안전하게 관리하려면 하드코딩보다 Airflow에서 제공하는 기능을 쓰자
- Admin > Connection 항목에서 작업을 수행할 수있다.
- BaseHook 클래스에서 get_connection이라는 메서드를 제공하는데 이 메서드는 메타스토어에서 커넥션 ID에 대한 연결 세부 정보를 가져온다.
from airflow.hooks.base_hook import BaseHook
class MovielensHook(BaseHook):
DEFAULT_HOST = "movielens"
DEFAULT_SCHEMA = "http"
DEFAULT_PORT = 5000
def __init__(self, conn_id): # conn_id는 훅에게 어떤 커넥션을 사용하는지 전달한다.
super().__init__() # basehook의 생성자 호출
self._conn_id = conn_id # 커넥션 id를 꼭 저장해야한다.
def get_conn(self): # 주어진 id를 사용하여 커넥션 설정 정보를 가져옴
config = self.get_connection(self._conn_id)
schema = DEFAULT_SCHEMA
host = DEFAULT_HOST
port = DEFAULT_PORT
base_url = f"{schema}://{host}:{port}"
session = requests.Session()
# 커넥션 설정 정보의 login/password 정보를 사용하여 요청 세션 생성
if config.login:
session.auth = (config.login, config.password)
#요청 세션과 기본 URL 반환
return session, base_url
- 위의 방법은 get_conn 함수를 호출 할 때마다. 자격 증명 정보를 가져온다.
- 이를 해결하기 위해 인스턴스에 session과 base_url을 보호(protected) 변수에 캐싱한다.
MovielensHook
class MovielensHook(BaseHook):
def __init__():
...
self.session = None
self._base_url = None # 세션과 기본 URL 캐싱을 위한 추가 변수 2개
def get_conn(self):
if self._session is None: # 세션을 생성하기 전에 연결된 세션이 있는지 체크한다.
config = self.get_connection(self._conn_id)
...
self._base_url = f"base_url = f"{schema}://{host}:{port}""
self._session = requests.Session()
...
return self._session, self._base_url
- 두번째 호출될때에는 self._session 변수는 더 이상 None이 아니고 캐싱된 세션과 기본 URL이 반환된다.
커스텀 훅으로 DAG 빌드하기
from custom.hooks import MovielensHook
hook = Movielens(conn_id = conn_id)
ratings = hook.get_ratings(
start_date = start_date,
end_date = end_date,
batch_size = batch_size
)
- 위의 리스트를 수행하면 평점 레코드의 제너레이터를 반환한다. 이를 사용하여 평점 데이터를 출력 파일(JSON)으로 저장한다.
- DAG에 훅을 사용하기 위해 훅 호출 코드를 PythonOperator에 래핑해야 한다.
- DAG실행에 필요한 시작/종료 날짜 입력하고, 적절한 출력 파일로 저장하는 코드이다.
import datetime as dt
import logging
import json
import os
from airflow import DAG
from airflow.operators.python import PythonOperator
from custom.hooks import MovielensHook
with DAG(
dag_id="02_hook",
description="Fetches ratings from the Movielens API using a custom hook.",
start_date=dt.datetime(2019, 1, 1),
end_date=dt.datetime(2019, 1, 10),
schedule_interval="@daily",
) as dag:
def _fetch_ratings(conn_id, templates_dict, batch_size=1000, **_):
logger = logging.getLogger(__name__)
start_date = templates_dict["start_date"]
end_date = templates_dict["end_date"]
output_path = templates_dict["output_path"]
logger.info(f"Fetching ratings for {start_date} to {end_date}")
hook = MovielensHook(conn_id=conn_id)
ratings = list(
hook.get_ratings(
start_date=start_date, end_date=end_date, batch_size=batch_size
)
)
logger.info(f"Fetched {len(ratings)} ratings")
logger.info(f"Writing ratings to {output_path}")
# Make sure output directory exists.
output_dir = os.path.dirname(output_path)
os.makedirs(output_dir, exist_ok=True)
with open(output_path, "w") as file_:
json.dump(ratings, fp=file_)
PythonOperator(
task_id="fetch_ratings",
python_callable=_fetch_ratings,
op_kwargs={"conn_id": "movielens"},
templates_dict={
"start_date": "{{ds}}",
"end_date": "{{next_ds}}",
"output_path": "/data/custom_hook/{{ds}}.json",
},
)
커스텀 오퍼레이터 빌드하기
- 커스텀 오퍼레이터를 직접 구현하여, 반복적인 태스크 수행 시 코드의 반복을 최소화 할 수 있다.
- 모든 오퍼레이터는 BaseOperator 클래스의 서브 클래스로 만들어야 한다.
- BaseOperator은 제네릭 인수를 많이 가지고 있다.
- 모든 제네릭 인수를 모두 나열하지 않도록 **kwargs를 인수로 사용하도록 하자
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
class MyCustomOperator(BaseOperator): # BaseOperator 클래스 상속
@apply_defaults # 기본 DAG인수를 커스텀 오퍼레이터에게 전달히기 위한 데커레이터 (**필수!**)
def __init__(self, conn_id, ... , **kwargs): #BaseOperator 생성자에게 추가 키워드 인수 제공
super.__init__(self, **kwargs)
self._conn_id = conn_id
def execute(self,context):
...
- 커스텀 오퍼레이터에만 사용되는 인수들을 명시적으로 지정할 수 있다.
- 일반적으로 conn_id와 작업에 필요한 세부사항((시작/종료) 날짜, 쿼리…)이 포함된다.
- apply_defaults 의 데커레이터는 커스텀 오퍼레이터 init 에 적용되고 이는 항상 포함해야한다.
- execute 메서드는 Airflow가 DAG를 실행할 때 DAG안에서 실행되는 오퍼레이터의 메인 메서드가 된다.
- CustomOperator의 execute 메서드는 context 하나의 파라미터만 받고, Airflow의 모든 콘텍스트 변수를 담고 있는 dict 객체이다.
MovielensFetchRatingsOperator
class MovielensFetchRatingsOperator(BaseOperator):
def execute(self, context):
hook = MovielensHook(self._conn_id)
try:
self.log.info(
f"Fetching ratings for {self._start_date} to {self._end_date}"
)
ratings = list(
hook.get_ratings(
start_date=self._start_date,
end_date=self._end_date,
batch_size=self._batch_size,
)
)
self.log.info(f"Fetched {len(ratings)} ratings")
finally:
# Make sure we always close our hook's session.
hook.close()
self.log.info(f"Writing ratings to {self._output_path}")
# Make sure output directory exists.
output_dir = os.path.dirname(self._output_path)
os.makedirs(output_dir, exist_ok=True)
# Write output as JSON.
with open(self._output_path, "w") as file_:
json.dump(ratings, fp=file_)
fetch_ratings = MovielensFetchRatingsOperator(
task_id = "fetch_ratings",
conn_id = "movielens",
start_date = "2020-01-01"
end_date = "2020-01-01"
output_path = "/data/2020-01-01.json"
)
- 오퍼레이터가 평점데이터를 가져오는 시작/종료 날짜를 미리 지정해야 하는 단점이 있다.
- 즉, 실행 날짜에 상관없이 하드코딩된 날짜 기간에 대한 평점 데이터를 가지고 온다.
- 위와 같은 문제를 해결하기 위해, templates_field 클래스 변수에 해당 변수명을 지정하여 Airflow에 알려준다.
MovielensFetchRatingsOperator
class MovielensFetchRatingsOperator(BaseOperator):
template_fields = ("_start_date", "_end_date", "_output_path")
# 커스텀 오퍼레이터에서 템플릿화 할 인스턴스 변수들을 Airflow에게 알려줍니다.
@apply_defaults
def __init__(
self,
conn_id,
output_path,
start_date="{{ds}}",
end_date="{{next_ds}}",
batch_size=1000,
**kwargs,
):
super(MovielensFetchRatingsOperator, self).__init__(**kwargs)
self._conn_id = conn_id
self._output_path = output_path
self._start_date = start_date
self._end_date = end_date
self._batch_size = batch_size
from custom.operators import MovielensFetchRatingsOperator
fetch_ratings = MovielensFetchRatingsOperator(
task_id = "fetch_ratings",
conn_id = "movielens",
start_date = "{{ds}}"
end_date = "{{next_ds}}"
output_path = "/data/custom_operator/{{ds}}.json"
)
- 템플릿을 활용하면 이렇게 할 수 있다.
커스텀 센서 빌드하기
- 센서란? : 특별한 유형의 오퍼레이터이며, DAG안에서 다운스트림 태스크를 실행하기 전에 특정 조건이 충족될 때까지 대기하기 위해 사용되는 것이다.
from airflow.sensors.base import BaseSensorOperator
class MyCustomSensor(BaseSensorOperator):
...
def poke(self, context): #Operator의 execute와 동일한 역할
class MovielensRatingsSensor(BaseSensorOperator):
template_fields = ("_start_date", "_end_date")
@apply_defaults
def __init__(self, conn_id, start_date="{{ds}}", end_date="{{next_ds}}", **kwargs):
super().__init__(**kwargs)
self._conn_id = conn_id
self._start_date = start_date
self._end_date = end_date
- 센서가 오퍼레이터의 특정유형이기 때문에, 오퍼레이터를 구현했을 때 사용한 것과 같은 설정을 사용한다.
def poke(self, context):
hook = MovielensHook(self._conn_id)
try:
next(
hook.get_ratings(
start_date=self._start_date, end_date=self._end_date, batch_size=1
)
)
self.log.info(
f"Found ratings for {self._start_date} to {self._end_date}, continuing!"
)
return True
except StopIteration:
self.log.info(
f"Didn't find any ratings for {self._start_date} "
f"to {self._end_date}, waiting..."
)
return False
finally:
hook.close()
- 앞서 구현했던 커스텀 훅과 커스텀 센서 클래스를 사용해 짧고 간결하게 작성할 수 있다.
- 생성자를 만든 다음 poke메서드를 구현하면 된다. 이 메서드 안에서 특정 기간 동안의 평점 데이터가 있는지 체크해야하는 것인데, 시작/종료 날짜 사이에 레코드가 있으면 True를 반환시킨다.
- 모든 평점 레코드를 가져올 필요 없이, 해당 범위에 적어도 레코드 한 개라도 존재하는지 체크하면 된다.
import datetime as dt
from airflow import DAG
from custom.operators import MovielensFetchRatingsOperator
from custom.sensors import MovielensRatingsSensor
with DAG(
dag_id="04_sensor",
description="Fetches ratings from the Movielens API, with a custom sensor.",
start_date=dt.datetime(2019, 1, 1),
end_date=dt.datetime(2019, 1, 10),
schedule_interval="@daily",
) as dag:
wait_for_ratings = MovielensRatingsSensor(
task_id="wait_for_ratings",
conn_id="movielens",
start_date="{{ds}}",
end_date="{{next_ds}}",
)
fetch_ratings = MovielensFetchRatingsOperator(
task_id="fetch_ratings",
conn_id="movielens",
start_date="{{ds}}",
end_date="{{next_ds}}",
output_path="/data/custom_sensor/{{ds}}.json",
)
wait_for_ratings >> fetch_ratings
컴포넌트 패키징하기
- 컴포넌트를 배포하는 더 나은 방법은 파이썬패키지에 코드를 넣는것이다.
- DAG와는 다르게 별도로 코드를 유지할 수 있기 때문에, 커스텀 코드에 대한 CI/CD 프로세스를 구성할 수 있고 다른 사람과 코드를 더 쉽게 공유하고 협업할 수 있다.
- 파이썬 패키징은 setuptool을 사용하여 간단한 파이썬 패키지를 생성할 수 있다.
- 훅, 오퍼레이터, 센서 클래스를 포함하는 airflow_movielens라는 패키지를 생성할 수 있다.
#!/usr/bin/env python
import setuptools
requirements = ["apache-airflow", "requests"]
extra_requirements = {"dev": ["pytest"]}
setuptools.setup(
name="airflow_movielens",
version="0.1.0",
description="Hooks, sensors and operators for the Movielens API.",
author="Anonymous",
author_email="anonymous@example.com",
install_requires=requirements,
extras_require=extra_requirements,
packages=setuptools.find_packages("src"),
package_dir={"": "src"},
url="<https://github.com/example-repo/airflow_movielens>",
license="MIT license",
)
패키징 할 때 중요한 필드
- name : 패키지 이름을 정의한다.
- version : 패키지의 버전 번호
- install_requires : 패키지에 필요한 종속 라이브러리 목록
- packages/package_dir : 설치 시 포함되어야 할 패키지들과 패키지들의 위치를 setuptools에게 전달한다.
부가적으로 필요한 필드(선택)
- author : 패키지 저자의 이름
- author_email : 저자의 연락처 정보
- description : 패키지에 대한 짧고 가독성 있는 설명(일반적으로 한줄로 기술함)
- url : 온라인에서 패키지를 찾을 수 있는 위치
- license : 패키지 코드를 배포 할 때 적용하는 라이선스(라이선스를 적용할 경우)
'Airflow' 카테고리의 다른 글
[Apache Airflow 기반의 데이터 파이프라인] 컨테이너에서 태스크 실행하기 (0) | 2024.02.23 |
---|---|
[Apache Airflow 기반의 데이터 파이프라인] 테스트하기 (0) | 2024.02.22 |
[Apache Airflow 기반의 데이터 파이프라인] 워크플로 트리거 (0) | 2024.02.20 |
[Apache Airflow 기반의 데이터 파이프라인] 태스크간 의존성 정의하기 (0) | 2024.02.19 |
[Apache Airflow 기반의 데이터 파이프라인] Airflow 콘텍스트를 사용하여 태스크 템플릿 작성하기 (0) | 2024.02.17 |