1. 테스트 시작하기
2. 개발을 위해 테스트 사용하기
테스트 시작하기
- Airflow에서 테스트는 작성한 작업이 예상대로 동작하는지 확인하기 위한 중요한 부분이다. 테스트는 다양한 단계에서 적용할 수 있다.
- Airflow 테스트의 두 가지 유형 : 단위 테스트(Unit Test), 통합 테스트(Integration Test)
- 개별 작업 단위로 테스트를 수행 → 단위 테스트
- 여러 작업이 함께 동작할 때 예상대로 동작하는지, 즉 전체 워크플로우 흐름 테스트를 수행 → 통합 테스트
- Airflow 테스트의 두 가지 유형 : 단위 테스트(Unit Test), 통합 테스트(Integration Test)
- 파이썬 테스팅 프레임워크인 unittest와 pytest
pip install pytest
- pytest 명령어는 이름이 test_로 시작하는 모든 파일을 로드하고, test_로 시작되는 모든 기능을 실행한다.
모든 DAG에 대한 무결성 테스트
- Airflow의 관점에서 테스트를 위한 첫 번째 단계는 일반적으로 DAG 무결성 테스트
- DAG의 무결성 : DAG에 사이클이 포함되어 있지 않은지 확인, DAG의 태스크 ID가 고유한 경우 등
- 예시 : 순환 오류가 발생하는 경우
# 오류가 발생하는 DAG 파일
t1 = DummyOperator(task_id="t1", dag=dag)
t2 = DummyOperator(task_id="t2", dag=dag)
t3 = DummyOperator(task_id="t3", dag=dag)
t1 >> t2 >> t3 >> t1
- 일반적으로 테스트 영역을 구성하는 방법은 프로젝트의 최상단 디렉터리에 별도의 tests/ 디렉터리를 생성하여 검사 대상 코드를 그대로 복사하여 구성한다.
- 모든 테스트 대상 파일은 파일 이름은 그대로 따르고 test_ 접두사를 붙인다.
- dags/ 디렉터리에는 모든 DAG 파일이 보관되고, tests/dags/ 디렉터리에는 test_dag_integrity.py 파일이 있다.
- tests/ 디렉터리에는 패키지가 아니라 테스트 코드를 담은 폴더이며, 따라서 init.py 파일이 필요하지 않다.
- 아래의 DAG 무결성 테스트는 모든 DAG 파일이 들어있는 폴더를 대상으로 하며, 이 폴더에서 *.py 파일을 반복적으로 탐색한다.
tests/dags/test_dag_integrity.py
import glob
import importlib.util
import os
import pytest
from airflow.models import DAG
# DAG_PATH는 모든 DAG 파일을 보관하는 디렉터리(dags/ 디렉터리)를 가리킴
DAG_PATH = os.path.join(os.path.dirname(__file__), "..", "..", "dags/**.py")
# DAG_FILES는 dags/ 에서 .py로 끝나는 파일 리스트를 보유
DAG_FILES = glob.glob(DAG_PATH)
# 발견된 모든 .py 파일에 대해 테스트를 실행
@pytest.mark.parametrize("dag_file", DAG_FILES)
def test_dag_integrity(dag_file):
# 파일 로드하기
module_name, _ = os.path.splitext(dag_file)
module_path = os.path.join(DAG_PATH, dag_file)
mod_spec = importlib.util.spec_from_file_location(module_name, module_path)
module = importlib.util.module_from_spec(mod_spec)
mod_spec.loader.exec_module(module)
#파일에서 발견된 모든 DAG 클래스의 객체
dag_objects = [var for var in vars(module).values() if isinstance(var, DAG)]
#파일에서 DAG 객체를 추출하였으므로 원하는 검사를 실행할 수 있다.
# 검사 1. /dags 경로에 있는 모든 .py 파일에 DAG 객체가 적어도 하나 이상 포함되어 있는지에 대한 유효성 검사를 한다.
assert dag_objects
# 검사 2. DAG 객체에 순환 주기 존재 여부를 확인한다.
for dag in dag_objects:
# Test cycles
dag.test_cycle()
# 직접 검사 방법을 추가할 수도 있다. 만약 DAG 이름이 import, export로 시작하는 것을 검사하려면 dag_ids를 확인하면 된다.
assert dag.dag_id.startswith(("import", "export"))
- DAG 무결성 테스트 실행 : pytest tests/→ DAG 및 오퍼레이터에 대해 인스턴스화되는 시점에 대해 몇 가지 다른 검사도 함께 수행 ex) BashOperator를 사용할 때 필수적인 bash_command 인수를 쓰지 않았을 때 오류 표시
CI/CD 파이프라인 설정하기
- 대부분의 CI/CD 시스템은 파이프라인이 정의된 YAML 구성 파일(코드 변경 시 실행할 일련의 단계)로 시작한다.
- 파이프라인이 전체가 성공적으로 동작하기 위해서는 각 단계가 성공적으로 완료 되어야 한다. 이후 Git 저장소에서 “성공한 파이프라인만 마스터에 병합”과 같은 규칙을 적용할 수 있다.
- 파이프라인 정의(YAML 파일)는 일반적으로 프로젝트에서 가장 상단인 루트에 있다.
name: python static checks and tests
on: [push] # GitHub가 push를 수신할 때마다 전체 CI/CD 파이프라인을 실행하도록 지시
jobs:
testing:
runs-on: ubuntu-18.04
steps: # 각 단계는 코드 조각을 실행
...
...
- 개발자 및 데이터 엔지니어는 데이터 파이프라인을 만들 때 DAG 무결성 테스트를 사용하여 무결성을 검증하고, 그 이후 코드 변경이나 업데이트가 필요한 경우 CI/CD 파이프라인을 사용하여 배포할 수 있다.
단위 테스트 작성하기
- 프로젝트에 있는 모든 DAG의 유효성을 초기에 확인하는 CI/CD 파이프라인을 준비하여 실행했으므로, 개별 단위 테스트를 시작해 볼 수 있다.
- 예를 들어, 어떤 메서드의 인수에서 유효한 입력 값은 양수일 때 사용자가 음수를 제공하면 오류를 반환할 것이다. 합리적인 방법 중 하나는 사용자의 입력 값을 바로 전달하기 전에 해당 값이 유효한 지를 먼저 판단하는 절차를 구성하는 것이다. 이런 동작을 구성하는 것이 우리가 만들어야 할 테스트 과정이기도 하다.
- 예를 들어, 어떤 오퍼레이터의 정확성에 대하여 테스트하는 방법으로는, 주어진 값으로 오퍼레이터를 실행하고 결과가 예상대로인지 확인하여 전체적으로 테스트하는 방법이 있다.
- 이를 위해 Airflow 시스템 외부와 단위 테스트 내부에서 오퍼레이터를 단독으로 실행하기 위해 몇 가지 pytest 컴포넌트가 필요하다. 이를 통해 다른 상황에서 오퍼레이터를 실행하고 올바르게 작동하는 지 확인할 수 있다.
pytest 프로젝트 구성하기
- pytest를 사용하면 테스트 스크립트에 test_ 접두사가 있어야 한다.
- 예1) BashOperator를 테스트하는 예제 테스트 함수
- test_operator.py 파일 안에 테스트로 호출할 함수를 만든다.
def test_example(): task = BashOperator( task_id="test", bash_command="echo 'hello!'", xcom_push=True ) # 이 테스트에서는 실제 운영 설정에서 실행하지 않고 execute() 메서드를 직접 호출한다. # 이것은 오퍼레이터를 실행하기 위해 호출할 수 있는 가장 낮은 수준의 함수이다. # 또한 BashOperator를 실행하기 위해서 태스크 인스턴스 콘텍스트는 필요하지 않다. 따라서 빈 컨텍스트를 제공한다. result = task.execute(context={}) assert result == "hello!"
- 예2) 목업(mocking)으로 해결
- 위 코드 테스트 시 오퍼레이터에 필수 인수가 없는 오류 발생, 메타스토어의 커넥션 ID를 가리키는 필수 인수 conn_id가 누락되어 테스트가 실패함
- 테스트는 각각 격리된 환경에서 진행되어야 한다. 다른 테스트의 결과가 다른 곳에 영향을 줄 수 없음을 의미하는데, 이런 경우 테스트 사이에 발생하는 정보를 데이터베이스를 이용해 전달하는 방법은 권장하지 않는다. 목업을 이용해 해결한다.(pip install pytest-mock)
- 목업은 특정 작업이나 객체를 모조로 만드는 것이다. 이를 통해 외부 시스템에 실제 연결하지 않고도 테스트를 개발하고 실행할 수 있다.
from airflowbook.operators.movielens_operator import ( # 호출되는 위치에서 목업 메서드를 가져와야 한다. MovielensPopularityOperator, MovielensHook, ) def test_movielenspopularityoperator(mocker): #mocker라는 인수를 테스트 함수에 전달 mock_get = mocker.patch.object( # 목업 객체로 객체의 속성을 패치한다. mock_get에는 동작을 검증하기 위해 사용할 수 있는 여러 속성이 포함되어 있다. MovielensHook, # 패치할 객체 "get_connection", # 패치할 함수 return_value=Connection( # 반환되는 값 conn_id="test", login="airflow", password="airflow" ), ) task = MovielensPopularityOperator( task_id="test_id", conn_id="testconn", start_date="2015-01-01", end_date="2015-01-03", top_n=5, ) result = task.execute(context=None) assert len(result) == 5 assert mock_get.call_count == 1 # 한 번만 호출된 것인지 확인 mock_get.assert_called_with("testconn") # 예상되는 conn_id로 호출된 것을 확인
디스크의 파일로 테스트하기
예1) JSON 리스트를 가진 파일을 읽고 이를 CSV 형식으로 쓰는 오퍼레이터
import csv
import json
from pathlib import Path
from airflowbook.operators.json_to_csv_operator import JsonToCsvOperator
def test_json_to_csv_operator(tmp_path: Path):
print(tmp_path.as_posix())
input_path = tmp_path / "input.json"
output_path = tmp_path / "output.csv"
input_data = [
{"name": "bob", "age": "41", "sex": "M"},
{"name": "alice", "age": "24", "sex": "F"},
{"name": "carol", "age": "60", "sex": "F"},
]
with open(input_path, "w") as f:
f.write(json.dumps(input_data))
operator = JsonToCsvOperator(
task_id="test", input_path=input_path, output_path=output_path
)
operator.execute(context={}) # 오퍼레이터 실행
# Read result
with open(output_path, "r") as f:
reader = csv.DictReader(f)
result = [dict(row) for row in reader]
# Assert
assert result == input_data # 내용 확인을 위한 assert, 테스트 후 tmp_path와 콘텐츠는 제거
- 해당 오퍼레이터는 입력(JSON) 경로와 출력(CSV) 경로의 두 가지 입력 인수를 필요로 한다. 테스트를 위한 입력으로 테스트 디렉터리에 고정 파일을 저장하여 사용한다면, 출력 파일은 어떻게 저장하여야 할까?
- 임시 저장소와 관련된 작업을 위한 tempfile 모듈을 사용하여 저장한다. 사용 후 디렉터리와 그 내용이 지워지기 때문에 파일 시스템에 해당 항목이 남지 않는다. 테스트를 시작하면 임시 디렉터리가 생성된다.
- pytest는 tmp_dir 및 tmp_path라는 tempfile 모듈에 대한 편리한 사용 방법을 제공한다. 위 예시에서는 tmp_path를 사용하였다. tmp_path 인수는 각 테스트 시 호출되는 함수를 나타내는데, pytest에서는 이를 픽스처(fixture)라고 한다.
테스트에서 DAG 및 태스크 콘텍스트로 작업하기
- 일부 오퍼레이터는 실행을 위해 더 많은 콘텍스트(변수 템플릿 등..) 또는 작업 인스턴스 콘텍스트가 필요하다.
따라서, 단순히 operator.execute(context={}) 형태로 오퍼레이터를 실행할 수는 없다. - 실제 Airflow 시스템을 실행하는 경우, 오퍼레이터를 실행할 때 더 많은 단계가 수행된다. 필요한 경우 런타임 태스크 콘텍스트를 operator.execute()에 수동으로 제공한다.
- 태스크 인스턴스 콘텍스트 구성하기(ex) 모든 변수를 수집)
- 현재 태스크 인스턴스에 대한 XCom 데이터 삭제
- 템플릿 변수 구체화하기
- operator.pre_execute() 실행하기
- operator.execute() 실행하기
- XCom에 반환 값 전송하기
- operator.post_execute() 실행하기
- operator.execute(context={})로 테스트할 때는 ds 변수를 사용할 수 없다.
- 따라서, 이를 위해 Airflow 자체에서 태스크를 시작할 때 사용하는 실제 메서드인 operator.run()(BaseOperator 클래스의 메서드)를 호출한다.
- 테스트 목적으로 DAG를 만들지 않고 그대로 실행할 수 있었지만, run() 메서드를 사용하기 위해서는 Airflow가 태스크를 실행할 때 DAG 객체를 참조하기 때문에 오퍼레이터에게 DAG를 제공해야 한다.
# 테스트에 픽스처를 포함하여 필요한 객체를 생성하기
def test_movielens_operator(tmp_path, mocker, test_dag):
mocker.patch.object(
MovielensHook,
"get_connection",
return_value=Connection(
conn_id="test",
login="airflow",
password="airflow"),
)
task = MovielensDownloadOperator(
task_id="test",
conn_id="testconn",
start_date="{{ prev_ds }}",
end_date="{{ ds }}",
output_path=str(tmp_path / "{{ ds }}.json"),
dag=test_dag,
)
task.run(
start_date=dag.default_args["start_date"],
end_date=dag.default_args["start_date"],
)
- 위 태스크를 테스트 실행하면 Airflow 메타스토어 관련 오류 발생
- 태스크를 실행하기 위해 Airflow는 동일한 실행 날짜의 이전 태스크 인스턴스와 같은 다양한 정보를 데이터베이스에서 쿼리한다. 하지만, AIRFLOW_HOME(설정되지 않은 경우 ~/airflow) 경로에 있는 Airflow 데이터베이스를 초기화(airflow db init)하지 않았거나 실행 중인 데이터베이스로 Airflow를 구성하지 않은 경우 해당 데이터베이스에서 읽거나 쓸 수 없다.
- 테스트 진행할 때도 메타스토어가 필요하다. 테스트 중에 메타스토어를 처리하는 방법 몇 가지가 있다.
- 연결 자격 증명을 조회 시 모든 데이터베이스 호출에 대해 목업 환경을 구성할 수 있다. 하지만 이 방법은 매우 번거롭다.
- 보다 실용적인 접근 방식은 Airflow가 테스트를 실행하는 동안 쿼리할 수 있는 실제 메타스토어를 실행하는 것이다. 이를 위해서 데이터베이스를 초기화하는 airflow db init을 실행해야 한다. 별도로 데이터베이스에 대한 구성이 없으면 ~/airflow/airflow.db에 저장된 SQLite 파일이 데이터베이스 역할을 수행한다.(AIRFLOW_HOME 환경 변수를 설정하면 해당 디렉터리에 데이터베이스를 저장)
- 위에서처럼 Airflow가 쿼리할 메타스토어를 설정 후 테스트를 실행하면 성공 여부를 확인할 수 있고, 테스트 중에 Airflow 메타스토어에 정보가 기록된 것을 확인할 수 있다.
→ task.run()을 호출하면 태스크 실행에 대한 세부 정보가 데이터베이스에 저장된다. - 이 테스트에서의 두 가지 고려사항
- 1. DAG를 사용하는 테스트가 여러 번 있는 경우, pytest와 함께 재사용 가능
# DAG를 재사용하기 위한 pytest 픽스처 예시
import datetime
import pytest
from airflow.models import DAG
@pytest.fixture
def test_dag():
return DAG(
"test_dag",
default_args={
"owner": "airflow",
"start_date": datetime.datetime(2015, 1, 1)
},
schedule_interval="@daily",
)
# 테스트에 픽스처를 포함하여 필요한 객체를 생성하기
def test_movielens_operator(tmp_path, mocker, test_dag):
mocker.patch.object(
MovielensHook,
"get_connection",
return_value=Connection(
conn_id="test",
login="airflow",
password="airflow"),
)
task = MovielensDownloadOperator(
task_id="test",
conn_id="testconn",
start_date="{{ prev_ds }}",
end_date="{{ ds }}",
output_path=str(tmp_path / "{{ ds }}.json"),
dag=test_dag,
)
task.run(
start_date=dag.default_args["start_date"],
end_date=dag.default_args["start_date"],
)
- 2. task.run()
- run()은 두 개의 날짜를 사용하며, DAG에서 지정된 두 날짜 사이에 실행할 태스크의 인스턴스 스케줄 주기를 계산한다. 위의 경우 동일한 두 날짜를 입력하므로 단일 태스크 인스턴스가 하나만 실행하게 된다.
- task.run() 메서드는 Airflow에서 태스크를 수동으로 실행하는 데 사용된다.
여기서 사용된 매개변수들은...:
- start_date: 이것은 태스크의 시작 날짜를 나타낸다. 보통은 DAG의 start_date를 사용한다. 위 코드에서는dag.default_args["start_date"]를 사용하고 있다.
- end_date: 이것은 태스크의 종료 날짜를 나타낸다. 위 코드에서도 dag.default_args["start_date"]를 사용하고 있다. 이 경우 시작과 종료 날짜가 동일하므로, 이 태스크는 하루치 데이터만 처리할 것다.
여기서 주의할 점은 태스크의 실행은 해당 DAG의 start_date 및 end_date 범위 내에서만 가능하다는 것이다. 즉, 태스크를 실행하기 위해서는 DAG가 실행될 수 있는 시간대에 있어야 하며, 태스크의 start_date 및 end_date도 그 범위 내에 있어야 한다.
- 만약 start_date와 end_date를 변경하면, 해당 태스크는 새로운 범위 내에서 실행된다.
# DAG를 재사용하기 위한 pytest 픽스처 예시
import datetime
import pytest
from airflow.models import DAG
@pytest.fixture
def test_dag():
return DAG(
"test_dag",
default_args={
"owner": "airflow",
"start_date": datetime.datetime(2015, 1, 1)
},
schedule_interval="@daily",
)
# DAG를 재사용하기 위한 pytest 픽스처 예시
import datetime
import pytest
from airflow.models import DAG
@pytest.fixture
def test_dag():
return DAG(
"test_dag",
default_args={
"owner": "airflow",
"start_date": datetime.datetime(2015, 1, 1)
},
schedule_interval="@daily",
)
# 테스트에 픽스처를 포함하여 필요한 객체를 생성하기
def test_movielens_operator(tmp_path, mocker, test_dag):
mocker.patch.object(
MovielensHook,
"get_connection",
return_value=Connection(
conn_id="test",
login="airflow",
password="airflow"),
)
task = MovielensDownloadOperator(
task_id="test",
conn_id="testconn",
start_date="{{ prev_ds }}",
end_date="{{ ds }}",
output_path=str(tmp_path / "{{ ds }}.json"),
dag=test_dag,
)
task.run(
start_date=dag.default_args["start_date"],
end_date=dag.default_args["start_date"],
)
외부 시스템 작업
- MovieLens 평점을 읽고 결과를 Postgres 데이터베이스에 저장하는 오퍼레이터에서, execute() 메서드는 데이터를 가져오고 결과를 Postgres에 대한 쿼리로 변환하여 MovieLens API와 Postgres 데이터베이스를 연결한다.
- 만약 로컬 PC에서 운영 중인 Postgres 데이터베이스에 접근할 수 없는 경우, 도커로 테스트하기 위해 로컬 Postgres 데이터베이스로 쉽게 구성할 수 있다.
- pytest 테스트를 할 수 있도록 도커 컨테이너의 제어가 가능한 여러 가지 파이썬 패키지가 있다. 이 책에서는 pytest-docker-tools를 사용한다.(pip install pytest_docker_tools)
- 오퍼레이터가 올바르게 작동하면 테스트가 끝날 때 도커 컨테이너로 생성된 Postgres 데이터베이스에 결과가 기록된다. 도커 컨테이너로 테스트하면 테스트를 위한 목업 환경을 구성할 필요 없이 실제 훅 메서드를 사용할 수 있다.
# pytest_docker_tools로 테스트할 도커 이미지 가져오기 from pytest_docker_tools import fetch postgres_image = fetch(repository="postgres:11.1-alpine") # pytest_docker_tools 픽스처를 이용해 테스트 시에 도커 이미지 사용하기 def test_call_fixture(postgres_image): print(postgres_image.id)
- 이제 이 이미지 ID를 사용하여 Postgres 컨테이너를 구성하고 시작할 수 있다.
from pytest_docker_tools import container postgres_container = container( image="{postgres_image.id}" ports={"5432/tcp":None}, ) def test_call_fixture(postgres_container): print(f"Running Postgres container named {postgres_container.name} " f"on port {postgres_container.ports['5432/tcp'][0].")
- 실제 데이터베이스에 대한 테스트를 위해 Postgres 컨테이너 초기화하기
postgres_image = fetch(repository="postgres:11.1-alpine") postgres = container( image="{postgres_image.id}", environment={ "POSTGRES_USER": "testuser", "POSTGRES_PASSWORD": "testpass" }, ports={"5432/tcp": None}, volumes={ os.path.join(os.path.dirname(__file__), "postgres-init.sql"): { "bind": "/docker-entrypoint-initdb.d/postgres-init.sql" } }, )
# postgres-init.sql 파일 # 테스트 데이터베이스에 대한 스키마 초기화하기 SET SCHEMA 'public'; CREATE TABLE movielens ( movieId integer, rating float, ratingTimestamp integer, userId integer, scrapeTime timestamp );
- 이 모든 것을 스크립트에 통합하면 Postgres 데이터베이스에 대해 테스트할 수 있다.
# 전체 테스트는 컨테이너 초기화와 우리가 구성해야하는 목업 연결로 다소 복잡해졌다. import os import pytest from airflow.models import Connection from pytest_docker_tools import fetch, container from airflowbook.operators.movielens_operator import ( MovielensHook, MovielensToPostgresOperator, PostgresHook, ) postgres_image = fetch(repository="postgres:11.1-alpine") postgres = container( image="{postgres_image.id}", environment={"POSTGRES_USER": "testuser", "POSTGRES_PASSWORD": "testpass"}, ports={"5432/tcp": None}, volumes={ os.path.join(os.path.dirname(__file__), "postgres-init.sql"): { "bind": "/docker-entrypoint-initdb.d/postgres-init.sql" } }, ) def test_movielens_to_postgres_operator(mocker, test_dag, postgres): mocker.patch.object( MovielensHook, "get_connection", return_value=Connection(conn_id="test", login="airflow", password="airflow"), ) mocker.patch.object( PostgresHook, "get_connection", return_value=Connection( conn_id="postgres", conn_type="postgres", host="localhost", login="testuser", password="testpass", port=postgres.ports["5432/tcp"][0], ), ) task = MovielensToPostgresOperator( task_id="test", movielens_conn_id="movielens_id", start_date="{{ prev_ds }}", end_date="{{ ds }}", postgres_conn_id="postgres_id", insert_query=( "INSERT INTO movielens (movieId,rating,ratingTimestamp,userId,scrapeTime) " "VALUES ({0}, '{{ macros.datetime.now() }}')" ), dag=test_dag, ) pg_hook = PostgresHook() row_count = pg_hook.get_first("SELECT COUNT(*) FROM movielens")[0] assert row_count == 0 pytest.helpers.run_airflow_task(task, test_dag) row_count = pg_hook.get_first("SELECT COUNT(*) FROM movielens")[0] assert row_count > 0
개발을 위해 테스트 사용하기
DAG 완료 테스트하기
- 지금까지 테스트는 단일 오퍼레이터 테스트에 중점을 두고 설명.
- 워크플로 개발에서 가장 중요한 측면은 모든 구성 요건이 서로 잘 맞는지 확인하는 것이다. 하나의 오퍼레이터가 논리적으로 올바르게 실행될 수 있지만, 예상하지 못한 방식으로 데이터가 변환되어 후속 오퍼레이터가 실패할 수도 있다.
- “DAG의 모든 오퍼레이터가 예상한 대로 작동하려면”에 관한 것은 대답하기 어렵다. 다양한 이유로 실제 환경을 시뮬레이션하는 것이 항상 가능하지 않다.
- 예를 들어 DTAP(개발, 테스트, 인수, 프로덕션) 분리 시스템을 사용할 경우, 개발 환경에서 완벽한 프로덕션 환경을 시뮬레이션할 수 없기 때문에, 소프트웨어를 개발하고 검증하는 데 사용할 수 있는 가능한 한 구현할 수 있는 프로덕션 환경을 만들어 테스트한다.
요약...
- DAG 무결성 테스트는 DAG의 기본 오류를 필터링한다.
- pytest 및 플러그인은 테스트를 위해, 임시 디렉터리 및 테스트 중 도커 컨테이너 관리를 위한 플로그인과 같은 몇 가지 유용한 구성을 제공한다.
- 태스크 인스턴스 콘텍스트를 사용하지 않는 오퍼레이터는 execute()로 실행할 수 있다.
- 태스크 인스턴스 콘텍스트를 사용하는 오퍼레이터는 DAG와 함께 실행되어야 한다.
- 통합 테스트를 위해서는 프로덕션 환경과 최대한 비슷하게 시뮬레이션해야 한다.
'Airflow' 카테고리의 다른 글
[Apache Airflow 기반의 데이터 파이프라인] 모범사례 (1) | 2024.02.26 |
---|---|
[Apache Airflow 기반의 데이터 파이프라인] 컨테이너에서 태스크 실행하기 (0) | 2024.02.23 |
[Apache Airflow 기반의 데이터 파이프라인] 커스텀 컴포넌트 빌드 (0) | 2024.02.21 |
[Apache Airflow 기반의 데이터 파이프라인] 워크플로 트리거 (0) | 2024.02.20 |
[Apache Airflow 기반의 데이터 파이프라인] 태스크간 의존성 정의하기 (0) | 2024.02.19 |