본문 바로가기
Airflow

[Apache Airflow 기반의 데이터 파이프라인] 테스트하기

by 개복취 2024. 2. 22.

 

1. 테스트 시작하기

2. 개발을 위해 테스트 사용하기

 

테스트 시작하기

  • Airflow에서 테스트는 작성한 작업이 예상대로 동작하는지 확인하기 위한 중요한 부분이다. 테스트는 다양한 단계에서 적용할 수 있다.
    • Airflow 테스트의 두 가지 유형 : 단위 테스트(Unit Test), 통합 테스트(Integration Test)
      1. 개별 작업 단위로 테스트를 수행 → 단위 테스트
      2. 여러 작업이 함께 동작할 때 예상대로 동작하는지, 즉 전체 워크플로우 흐름 테스트를 수행 → 통합 테스트
  • 파이썬 테스팅 프레임워크인 unittest와 pytest
pip install pytest
  • pytest 명령어는 이름이 test_로 시작하는 모든 파일을 로드하고, test_로 시작되는 모든 기능을 실행한다.

모든 DAG에 대한 무결성 테스트

  • Airflow의 관점에서 테스트를 위한 첫 번째 단계는 일반적으로 DAG 무결성 테스트
    • DAG의 무결성 : DAG에 사이클이 포함되어 있지 않은지 확인, DAG의 태스크 ID가 고유한 경우 등
  • 예시 : 순환 오류가 발생하는 경우
# 오류가 발생하는 DAG 파일
t1 = DummyOperator(task_id="t1", dag=dag)
t2 = DummyOperator(task_id="t2", dag=dag)
t3 = DummyOperator(task_id="t3", dag=dag)

t1 >> t2 >> t3 >> t1
  •  일반적으로 테스트 영역을 구성하는 방법은 프로젝트의 최상단 디렉터리에 별도의 tests/ 디렉터리를 생성하여 검사 대상 코드를 그대로 복사하여 구성한다.
  • 모든 테스트 대상 파일은 파일 이름은 그대로 따르고 test_ 접두사를 붙인다.
  • dags/ 디렉터리에는 모든 DAG 파일이 보관되고, tests/dags/ 디렉터리에는 test_dag_integrity.py 파일이 있다.
  • tests/ 디렉터리에는 패키지가 아니라 테스트 코드를 담은 폴더이며, 따라서 init.py 파일이 필요하지 않다.
  • 아래의 DAG 무결성 테스트는 모든 DAG 파일이 들어있는 폴더를 대상으로 하며, 이 폴더에서 *.py 파일을 반복적으로 탐색한다.

tests/dags/test_dag_integrity.py

import glob
import importlib.util
import os

import pytest
from airflow.models import DAG

# DAG_PATH는 모든 DAG 파일을 보관하는 디렉터리(dags/ 디렉터리)를 가리킴
DAG_PATH = os.path.join(os.path.dirname(__file__), "..", "..", "dags/**.py")
# DAG_FILES는 dags/ 에서 .py로 끝나는 파일 리스트를 보유
DAG_FILES = glob.glob(DAG_PATH)

# 발견된 모든 .py 파일에 대해 테스트를 실행
@pytest.mark.parametrize("dag_file", DAG_FILES)
def test_dag_integrity(dag_file):
    # 파일 로드하기
    module_name, _ = os.path.splitext(dag_file)
    module_path = os.path.join(DAG_PATH, dag_file)
    mod_spec = importlib.util.spec_from_file_location(module_name, module_path)
    module = importlib.util.module_from_spec(mod_spec)
    mod_spec.loader.exec_module(module)
		
    #파일에서 발견된 모든 DAG 클래스의 객체
    dag_objects = [var for var in vars(module).values() if isinstance(var, DAG)]
    
    #파일에서 DAG 객체를 추출하였으므로 원하는 검사를 실행할 수 있다.

    # 검사 1. /dags 경로에 있는 모든 .py  파일에 DAG 객체가 적어도 하나 이상 포함되어 있는지에 대한 유효성 검사를 한다.
    assert dag_objects

    # 검사 2. DAG 객체에 순환 주기 존재 여부를 확인한다.
    for dag in dag_objects:
        # Test cycles
        dag.test_cycle()

        # 직접 검사 방법을 추가할 수도 있다. 만약 DAG 이름이 import, export로 시작하는 것을 검사하려면 dag_ids를 확인하면 된다.
        assert dag.dag_id.startswith(("import", "export"))
  • DAG 무결성 테스트 실행 : pytest tests/→ DAG 및 오퍼레이터에 대해 인스턴스화되는 시점에 대해 몇 가지 다른 검사도 함께 수행 ex) BashOperator를 사용할 때 필수적인 bash_command 인수를 쓰지 않았을 때 오류 표시

CI/CD 파이프라인 설정하기

  • 대부분의 CI/CD 시스템은 파이프라인이 정의된 YAML 구성 파일(코드 변경 시 실행할 일련의 단계)로 시작한다.
    • 파이프라인이 전체가 성공적으로 동작하기 위해서는 각 단계가 성공적으로 완료 되어야 한다. 이후 Git 저장소에서 “성공한 파이프라인만 마스터에 병합”과 같은 규칙을 적용할 수 있다.
    • 파이프라인 정의(YAML 파일)는 일반적으로 프로젝트에서 가장 상단인 루트에 있다.
name: python static checks and tests

on: [push] # GitHub가 push를 수신할 때마다 전체 CI/CD 파이프라인을 실행하도록 지시

jobs:
    testing:
        runs-on: ubuntu-18.04
        steps: # 각 단계는 코드 조각을 실행
            ...
            ...
  • 개발자 및 데이터 엔지니어는 데이터 파이프라인을 만들 때 DAG 무결성 테스트를 사용하여 무결성을 검증하고, 그 이후 코드 변경이나 업데이트가 필요한 경우 CI/CD 파이프라인을 사용하여 배포할 수 있다.

단위 테스트 작성하기

  • 프로젝트에 있는 모든 DAG의 유효성을 초기에 확인하는 CI/CD 파이프라인을 준비하여 실행했으므로, 개별 단위 테스트를 시작해 볼 수 있다.
    • 예를 들어, 어떤 메서드의 인수에서 유효한 입력 값은 양수일 때 사용자가 음수를 제공하면 오류를 반환할 것이다. 합리적인 방법 중 하나는 사용자의 입력 값을 바로 전달하기 전에 해당 값이 유효한 지를 먼저 판단하는 절차를 구성하는 것이다. 이런 동작을 구성하는 것이 우리가 만들어야 할 테스트 과정이기도 하다.
  • 예를 들어, 어떤 오퍼레이터의 정확성에 대하여 테스트하는 방법으로는, 주어진 값으로 오퍼레이터를 실행하고 결과가 예상대로인지 확인하여 전체적으로 테스트하는 방법이 있다.
    • 이를 위해 Airflow 시스템 외부와 단위 테스트 내부에서 오퍼레이터를 단독으로 실행하기 위해 몇 가지 pytest 컴포넌트가 필요하다. 이를 통해 다른 상황에서 오퍼레이터를 실행하고 올바르게 작동하는 지 확인할 수 있다.

pytest 프로젝트 구성하기

  • pytest를 사용하면 테스트 스크립트에 test_ 접두사가 있어야 한다.
  • 예1) BashOperator를 테스트하는 예제 테스트 함수
    • test_operator.py 파일 안에 테스트로 호출할 함수를 만든다.
    def test_example():
        task = BashOperator(
            task_id="test", 
            bash_command="echo 'hello!'", 
            xcom_push=True
    )
    # 이 테스트에서는 실제 운영 설정에서 실행하지 않고 execute() 메서드를 직접 호출한다.
    # 이것은 오퍼레이터를 실행하기 위해 호출할 수 있는 가장 낮은 수준의 함수이다.
    # 또한 BashOperator를 실행하기 위해서 태스크 인스턴스 콘텍스트는 필요하지 않다. 따라서 빈 컨텍스트를 제공한다.
     result = task.execute(context={})
     assert result == "hello!"

 

  • 예2) 목업(mocking)으로 해결
    • 위 코드 테스트 시 오퍼레이터에 필수 인수가 없는 오류 발생, 메타스토어의 커넥션 ID를 가리키는 필수 인수 conn_id가 누락되어 테스트가 실패함
    • 테스트는 각각 격리된 환경에서 진행되어야 한다. 다른 테스트의 결과가 다른 곳에 영향을 줄 수 없음을 의미하는데, 이런 경우 테스트 사이에 발생하는 정보를 데이터베이스를 이용해 전달하는 방법은 권장하지 않는다. 목업을 이용해 해결한다.(pip install pytest-mock)
    • 목업은 특정 작업이나 객체를 모조로 만드는 것이다. 이를 통해 외부 시스템에 실제 연결하지 않고도 테스트를 개발하고 실행할 수 있다.
    from airflowbook.operators.movielens_operator import ( # 호출되는 위치에서 목업 메서드를 가져와야 한다.
        MovielensPopularityOperator,
        MovielensHook,
    )
    
    def test_movielenspopularityoperator(mocker): #mocker라는 인수를 테스트 함수에 전달
        mock_get = mocker.patch.object( # 목업 객체로 객체의 속성을 패치한다. mock_get에는 동작을 검증하기 위해 사용할 수 있는 여러 속성이 포함되어 있다.
            MovielensHook, # 패치할 객체
            "get_connection", # 패치할 함수
            return_value=Connection( # 반환되는 값
                conn_id="test", 
                login="airflow", 
                password="airflow"
            ),
        )
    
    task = MovielensPopularityOperator(
        task_id="test_id",
        conn_id="testconn",
        start_date="2015-01-01",
        end_date="2015-01-03",
        top_n=5,
    )
    result = task.execute(context=None)
    assert len(result) == 5
    assert mock_get.call_count == 1 # 한 번만 호출된 것인지 확인
    mock_get.assert_called_with("testconn") # 예상되는 conn_id로 호출된 것을 확인

디스크의 파일로 테스트하기

예1) JSON 리스트를 가진 파일을 읽고 이를 CSV 형식으로 쓰는 오퍼레이터

import csv
import json
from pathlib import Path

from airflowbook.operators.json_to_csv_operator import JsonToCsvOperator

def test_json_to_csv_operator(tmp_path: Path):
    print(tmp_path.as_posix())

    input_path = tmp_path / "input.json"
    output_path = tmp_path / "output.csv"

    input_data = [
        {"name": "bob", "age": "41", "sex": "M"},
        {"name": "alice", "age": "24", "sex": "F"},
        {"name": "carol", "age": "60", "sex": "F"},
    ]
    with open(input_path, "w") as f:
        f.write(json.dumps(input_data))

    operator = JsonToCsvOperator(
        task_id="test", input_path=input_path, output_path=output_path
    )

    operator.execute(context={}) # 오퍼레이터 실행

    # Read result
    with open(output_path, "r") as f:
        reader = csv.DictReader(f)
        result = [dict(row) for row in reader]

    # Assert
    assert result == input_data # 내용 확인을 위한 assert, 테스트 후 tmp_path와 콘텐츠는 제거
  • 해당 오퍼레이터는 입력(JSON) 경로와 출력(CSV) 경로의 두 가지 입력 인수를 필요로 한다. 테스트를 위한 입력으로 테스트 디렉터리에 고정 파일을 저장하여 사용한다면, 출력 파일은 어떻게 저장하여야 할까?
  • 임시 저장소와 관련된 작업을 위한 tempfile 모듈을 사용하여 저장한다. 사용 후 디렉터리와 그 내용이 지워지기 때문에 파일 시스템에 해당 항목이 남지 않는다. 테스트를 시작하면 임시 디렉터리가 생성된다.
  • pytest는 tmp_dir 및 tmp_path라는 tempfile 모듈에 대한 편리한 사용 방법을 제공한다. 위 예시에서는 tmp_path를 사용하였다. tmp_path 인수는 각 테스트 시 호출되는 함수를 나타내는데, pytest에서는 이를 픽스처(fixture)라고 한다.

 

테스트에서 DAG 및 태스크 콘텍스트로 작업하기

  • 일부 오퍼레이터는 실행을 위해 더 많은 콘텍스트(변수 템플릿 등..) 또는 작업 인스턴스 콘텍스트가 필요하다.
    따라서, 단순히 operator.execute(context={}) 형태로 오퍼레이터를 실행할 수는 없다.
  • 실제 Airflow 시스템을 실행하는 경우, 오퍼레이터를 실행할 때 더 많은 단계가 수행된다. 필요한 경우 런타임 태스크 콘텍스트를 operator.execute()에 수동으로 제공한다.
    1. 태스크 인스턴스 콘텍스트 구성하기(ex) 모든 변수를 수집)
    2. 현재 태스크 인스턴스에 대한 XCom 데이터 삭제
    3. 템플릿 변수 구체화하기
    4. operator.pre_execute() 실행하기
    5. operator.execute() 실행하기
    6. XCom에 반환 값 전송하기
    7. operator.post_execute() 실행하기
  • operator.execute(context={})로 테스트할 때는 ds 변수를 사용할 수 없다.
  • 따라서, 이를 위해 Airflow 자체에서 태스크를 시작할 때 사용하는 실제 메서드인 operator.run()(BaseOperator 클래스의 메서드)를 호출한다.
  • 테스트 목적으로 DAG를 만들지 않고 그대로 실행할 수 있었지만, run() 메서드를 사용하기 위해서는 Airflow가 태스크를 실행할 때 DAG 객체를 참조하기 때문에 오퍼레이터에게 DAG를 제공해야 한다.
# 테스트에 픽스처를 포함하여 필요한 객체를 생성하기
def test_movielens_operator(tmp_path, mocker, test_dag):
    mocker.patch.object(
        MovielensHook,
        "get_connection",
        return_value=Connection(
            conn_id="test", 
            login="airflow", 
            password="airflow"),
    )

    task = MovielensDownloadOperator(
        task_id="test",
        conn_id="testconn",
        start_date="{{ prev_ds }}",
        end_date="{{ ds }}",
        output_path=str(tmp_path / "{{ ds }}.json"),
        dag=test_dag,
    )

    task.run(
        start_date=dag.default_args["start_date"],
        end_date=dag.default_args["start_date"],
    )
  • 위 태스크를 테스트 실행하면 Airflow 메타스토어 관련 오류 발생
  • 태스크를 실행하기 위해 Airflow는 동일한 실행 날짜의 이전 태스크 인스턴스와 같은 다양한 정보를 데이터베이스에서 쿼리한다. 하지만, AIRFLOW_HOME(설정되지 않은 경우 ~/airflow) 경로에 있는 Airflow 데이터베이스를 초기화(airflow db init)하지 않았거나 실행 중인 데이터베이스로 Airflow를 구성하지 않은 경우 해당 데이터베이스에서 읽거나 쓸 수 없다.
  • 테스트 진행할 때도 메타스토어가 필요하다. 테스트 중에 메타스토어를 처리하는 방법 몇 가지가 있다.
    1. 연결 자격 증명을 조회 시 모든 데이터베이스 호출에 대해 목업 환경을 구성할 수 있다. 하지만 이 방법은 매우 번거롭다.
    2. 보다 실용적인 접근 방식은 Airflow가 테스트를 실행하는 동안 쿼리할 수 있는 실제 메타스토어를 실행하는 것이다. 이를 위해서 데이터베이스를 초기화하는 airflow db init을 실행해야 한다. 별도로 데이터베이스에 대한 구성이 없으면 ~/airflow/airflow.db에 저장된 SQLite 파일이 데이터베이스 역할을 수행한다.(AIRFLOW_HOME 환경 변수를 설정하면 해당 디렉터리에 데이터베이스를 저장)
  • 위에서처럼 Airflow가 쿼리할 메타스토어를 설정 후 테스트를 실행하면 성공 여부를 확인할 수 있고, 테스트 중에 Airflow 메타스토어에 정보가 기록된 것을 확인할 수 있다.
    → task.run()을 호출하면 태스크 실행에 대한 세부 정보가 데이터베이스에 저장된다.
  • 이 테스트에서의 두 가지 고려사항
    • 1. DAG를 사용하는 테스트가 여러 번 있는 경우, pytest와 함께 재사용 가능
# DAG를 재사용하기 위한 pytest 픽스처 예시
import datetime
import pytest
from airflow.models import DAG

@pytest.fixture
def test_dag():
    return DAG(
        "test_dag",
        default_args={
            "owner": "airflow", 
            "start_date": datetime.datetime(2015, 1, 1)
        },
        schedule_interval="@daily",
    )
# 테스트에 픽스처를 포함하여 필요한 객체를 생성하기
def test_movielens_operator(tmp_path, mocker, test_dag):
    mocker.patch.object(
        MovielensHook,
        "get_connection",
        return_value=Connection(
            conn_id="test", 
            login="airflow", 
            password="airflow"),
    )

    task = MovielensDownloadOperator(
        task_id="test",
        conn_id="testconn",
        start_date="{{ prev_ds }}",
        end_date="{{ ds }}",
        output_path=str(tmp_path / "{{ ds }}.json"),
        dag=test_dag,
    )

    task.run(
        start_date=dag.default_args["start_date"],
        end_date=dag.default_args["start_date"],
    )
  • 2. task.run()
    • run()은 두 개의 날짜를 사용하며, DAG에서 지정된 두 날짜 사이에 실행할 태스크의 인스턴스 스케줄 주기를 계산한다. 위의 경우 동일한 두 날짜를 입력하므로 단일 태스크 인스턴스가 하나만 실행하게 된다.
    • task.run() 메서드는 Airflow에서 태스크를 수동으로 실행하는 데 사용된다.

여기서 사용된 매개변수들은...:

  • start_date: 이것은 태스크의 시작 날짜를 나타낸다. 보통은 DAG의 start_date를 사용한다. 위 코드에서는dag.default_args["start_date"]를 사용하고 있다.
  • end_date: 이것은 태스크의 종료 날짜를 나타낸다. 위 코드에서도 dag.default_args["start_date"]를 사용하고 있다. 이 경우 시작과 종료 날짜가 동일하므로, 이 태스크는 하루치 데이터만 처리할 것다.

여기서 주의할 점은 태스크의 실행은 해당 DAG의 start_dateend_date 범위 내에서만 가능하다는 것이다. 즉, 태스크를 실행하기 위해서는 DAG가 실행될 수 있는 시간대에 있어야 하며, 태스크의 start_dateend_date도 그 범위 내에 있어야 한다.

  1. 만약 start_dateend_date를 변경하면, 해당 태스크는 새로운 범위 내에서 실행된다.
# DAG를 재사용하기 위한 pytest 픽스처 예시
import datetime
import pytest
from airflow.models import DAG

@pytest.fixture
def test_dag():
    return DAG(
    "test_dag",
    default_args={
        "owner": "airflow", 
        "start_date": datetime.datetime(2015, 1, 1)
        },
    schedule_interval="@daily",
    )
# DAG를 재사용하기 위한 pytest 픽스처 예시
import datetime
import pytest
from airflow.models import DAG

@pytest.fixture
def test_dag():
    return DAG(
        "test_dag",
        default_args={
            "owner": "airflow", 
            "start_date": datetime.datetime(2015, 1, 1)
        },
        schedule_interval="@daily",
    )
# 테스트에 픽스처를 포함하여 필요한 객체를 생성하기
def test_movielens_operator(tmp_path, mocker, test_dag):
    mocker.patch.object(
        MovielensHook,
        "get_connection",
        return_value=Connection(
            conn_id="test", 
            login="airflow", 
            password="airflow"),
    )

    task = MovielensDownloadOperator(
        task_id="test",
        conn_id="testconn",
        start_date="{{ prev_ds }}",
        end_date="{{ ds }}",
        output_path=str(tmp_path / "{{ ds }}.json"),
        dag=test_dag,
    )

    task.run(
        start_date=dag.default_args["start_date"],
        end_date=dag.default_args["start_date"],
    )

 

외부 시스템 작업

  • MovieLens 평점을 읽고 결과를 Postgres 데이터베이스에 저장하는 오퍼레이터에서, execute() 메서드는 데이터를 가져오고 결과를 Postgres에 대한 쿼리로 변환하여 MovieLens API와 Postgres 데이터베이스를 연결한다.
  • 만약 로컬 PC에서 운영 중인 Postgres 데이터베이스에 접근할 수 없는 경우, 도커로 테스트하기 위해 로컬 Postgres 데이터베이스로 쉽게 구성할 수 있다.
    • pytest 테스트를 할 수 있도록 도커 컨테이너의 제어가 가능한 여러 가지 파이썬 패키지가 있다. 이 책에서는 pytest-docker-tools를 사용한다.(pip install pytest_docker_tools)
    • 오퍼레이터가 올바르게 작동하면 테스트가 끝날 때 도커 컨테이너로 생성된 Postgres 데이터베이스에 결과가 기록된다. 도커 컨테이너로 테스트하면 테스트를 위한 목업 환경을 구성할 필요 없이 실제 훅 메서드를 사용할 수 있다.
    # pytest_docker_tools로 테스트할 도커 이미지 가져오기
    from pytest_docker_tools import fetch
    postgres_image = fetch(repository="postgres:11.1-alpine")
    
    # pytest_docker_tools 픽스처를 이용해 테스트 시에 도커 이미지 사용하기
    def test_call_fixture(postgres_image):
        print(postgres_image.id)
    • 이제 이 이미지 ID를 사용하여 Postgres 컨테이너를 구성하고 시작할 수 있다.
    from pytest_docker_tools import container
    postgres_container = container(
        image="{postgres_image.id}"
        ports={"5432/tcp":None},
    )
    
    def test_call_fixture(postgres_container):
        print(f"Running Postgres container named {postgres_container.name} "
        f"on port {postgres_container.ports['5432/tcp'][0].")
    • 실제 데이터베이스에 대한 테스트를 위해 Postgres 컨테이너 초기화하기
    postgres_image = fetch(repository="postgres:11.1-alpine")
    postgres = container(
        image="{postgres_image.id}",
        environment={
                "POSTGRES_USER": "testuser", 
                "POSTGRES_PASSWORD": "testpass"
    		},
        ports={"5432/tcp": None},
        volumes={
            os.path.join(os.path.dirname(__file__), "postgres-init.sql"): {
                "bind": "/docker-entrypoint-initdb.d/postgres-init.sql"
            }
        },
    )
    # postgres-init.sql 파일
    # 테스트 데이터베이스에 대한 스키마 초기화하기
    SET SCHEMA 'public';
    CREATE TABLE movielens (
        movieId integer,
        rating float,
        ratingTimestamp integer,
        userId integer,
        scrapeTime timestamp
    );
    
    • 이 모든 것을 스크립트에 통합하면 Postgres 데이터베이스에 대해 테스트할 수 있다.
    # 전체 테스트는 컨테이너 초기화와 우리가 구성해야하는 목업 연결로 다소 복잡해졌다.
    import os
    
    import pytest
    from airflow.models import Connection
    from pytest_docker_tools import fetch, container
    
    from airflowbook.operators.movielens_operator import (
        MovielensHook,
        MovielensToPostgresOperator,
        PostgresHook,
    )
    
    postgres_image = fetch(repository="postgres:11.1-alpine")
    postgres = container(
        image="{postgres_image.id}",
        environment={"POSTGRES_USER": "testuser", "POSTGRES_PASSWORD": "testpass"},
        ports={"5432/tcp": None},
        volumes={
            os.path.join(os.path.dirname(__file__), "postgres-init.sql"): {
                "bind": "/docker-entrypoint-initdb.d/postgres-init.sql"
            }
        },
    )
    
    def test_movielens_to_postgres_operator(mocker, test_dag, postgres):
        mocker.patch.object(
            MovielensHook,
            "get_connection",
            return_value=Connection(conn_id="test", login="airflow", password="airflow"),
        )
        mocker.patch.object(
            PostgresHook,
            "get_connection",
            return_value=Connection(
                conn_id="postgres",
                conn_type="postgres",
                host="localhost",
                login="testuser",
                password="testpass",
                port=postgres.ports["5432/tcp"][0],
            ),
        )
    
        task = MovielensToPostgresOperator(
            task_id="test",
            movielens_conn_id="movielens_id",
            start_date="{{ prev_ds }}",
            end_date="{{ ds }}",
            postgres_conn_id="postgres_id",
            insert_query=(
                "INSERT INTO movielens (movieId,rating,ratingTimestamp,userId,scrapeTime) "
                "VALUES ({0}, '{{ macros.datetime.now() }}')"
            ),
            dag=test_dag,
        )
    
        pg_hook = PostgresHook()
    
        row_count = pg_hook.get_first("SELECT COUNT(*) FROM movielens")[0]
        assert row_count == 0
    
        pytest.helpers.run_airflow_task(task, test_dag)
    
        row_count = pg_hook.get_first("SELECT COUNT(*) FROM movielens")[0]
        assert row_count > 0

개발을 위해 테스트 사용하기

DAG 완료 테스트하기

  • 지금까지 테스트는 단일 오퍼레이터 테스트에 중점을 두고 설명.
  • 워크플로 개발에서 가장 중요한 측면은 모든 구성 요건이 서로 잘 맞는지 확인하는 것이다. 하나의 오퍼레이터가 논리적으로 올바르게 실행될 수 있지만, 예상하지 못한 방식으로 데이터가 변환되어 후속 오퍼레이터가 실패할 수도 있다.
  • “DAG의 모든 오퍼레이터가 예상한 대로 작동하려면”에 관한 것은 대답하기 어렵다. 다양한 이유로 실제 환경을 시뮬레이션하는 것이 항상 가능하지 않다.
  • 예를 들어 DTAP(개발, 테스트, 인수, 프로덕션) 분리 시스템을 사용할 경우, 개발 환경에서 완벽한 프로덕션 환경을 시뮬레이션할 수 없기 때문에, 소프트웨어를 개발하고 검증하는 데 사용할 수 있는 가능한 한 구현할 수 있는 프로덕션 환경을 만들어 테스트한다.

요약...

  • DAG 무결성 테스트는 DAG의 기본 오류를 필터링한다.
  • pytest 및 플러그인은 테스트를 위해, 임시 디렉터리 및 테스트 중 도커 컨테이너 관리를 위한 플로그인과 같은 몇 가지 유용한 구성을 제공한다.
  • 태스크 인스턴스 콘텍스트를 사용하지 않는 오퍼레이터는 execute()로 실행할 수 있다.
  • 태스크 인스턴스 콘텍스트를 사용하는 오퍼레이터는 DAG와 함께 실행되어야 한다.
  • 통합 테스트를 위해서는 프로덕션 환경과 최대한 비슷하게 시뮬레이션해야 한다.