본문 바로가기
Airflow

[Apache Airflow 기반의 데이터 파이프라인] Airflow 콘텍스트를 사용하여 태스크 템플릿 작성하기

by 개복취 2024. 2. 17.

  1. 개요
  2. PythonOperator 템플릿
  3. 키워드 인자를 받아들이는 또다른 방법 : 명시적으로 변수를 알려주기
  4. PythonOperator에 변수제공 :두가지 이상의 데이터 소스에서 데이터를 다운로드
  5. 다른 시스템과 연결하기

개요

  • 어떤 종류의 데이터로 작업을 하든지, 파이프라인을 구축하기 전에 접근 방식에 대한 기술적 계획을 세우는 것이 중요하다. 솔루션은 항상 다른 사용자가 데이터로 무엇을 하려는지에 따라 달라지므로, 질문에 대한 답을 알고 나면 기술적 세부 사항에 대한 문제를 해결할 수 있다.
  • jinja 탬플릿 {{이중 중괄호}}을 통해 런타임 시 삽입 될 변수를 나타내서 사용할 수 있다.
print("Hello {{ name }}!")
  • 런타임 시 값을 입력하기 때문에 프로그래밍 할 때에는 값을 알 수 없다.
  • 태스크 콘텍스트 execution_date를 유용하게 사용할 수 있다. 날짜 시간 라이브러리인 Pendulum을 사용하며 execution_date는 이러한 Pendulum 의 datetime 객체이다.

 

PythonOperator 템플릿

  • 런타임 콘텍스트로 템플릿화 할 수 있는 인수를 사용하지 않고 별도로 런타임 콘텍스트를 적용할 수 있는 python_callable을 사용하기 때문에 jinja 템플릿 표준을 따르지 않는다.
  • 파이썬은 함수에서 키워드 인수를 받을 수 있다. 여기에서 제공되는 키워드 인수를 사전에 알지 못하는 경우, 예상되는 키워드 인수를 모두 명시적으로 작성할 필요 없는 다용한 사용 사례가 있다.
def _print_context(**kwargs):
    print(kwargs)
  • 키워드 인수는 두 개의 애스터리스크(**)로 표시하면 캡쳐된다. 그리고 캡쳐 인수의 이름을 kwargs에 지정합니다.
  • 키워드 인수의 이름의 지정은 Airflow태스크 콘텍스트 변수를 확인하기 위해 적절한 이름을 사용한다. “context”
def _print_context(**context): #명시적으로 context라고 적시
    print(context)

print_context=PythonOperator(
    task_id = 'print_context',
    python_callable= _print_context,
    dag = dag,
)
def _print_context(**context): #명시적으로 context라고 적시
    start=context["execution_date"]
    end=context["next_execution_date"]
    print(f"Start: {start}, end: {end}")

print_context=PythonOperator(
	task_id="print_context", python_callable=_print_context, dag=dag
)

#출력 예
#Start: 2019-07-13T14:00:00+00:00, end: 2019-07-13T15:00:00+00:00

 

키워드 인자를 받아들이는 또다른 방법 : 명시적으로 변수를 알려주기

def _get_data(execution_date, **context)
  • 내부에서 context변수를 사용하여 _get_data함수가 호출된다.
  • 모든 context변수는 키워드 인수로 전달된다. (_get_data(conf=…, dag=…, dag_run=…, execution_date=…, …))
  • 파이썬은 매겨변수 리스트에 지정된 인수에 대해 필요 여부를 확인한다.
    1. 시그니처 안에 설정이 존재하는가(위의 예시에서는 execution_date 존재 체크)
    2. 그렇지 않으면 **context에 추가한다.
  • execution_date는 명시적으로 인수로 정의되어 있으므로 필수로 입력해야 하며, 다른 모든 인수는 **context에 캡쳐된다

⇒ 이와 같은 일련의 과정으로 context[”execution_date”]에서 **context를 추출하는 대신에 execution_date 변수를 직접 사용할 수 있다. 또한 코드는 보다 알아보기 쉽고, 명시적인 인수 정의를 통히 린터와 타입 힌팅의 도구를 사용할 수 있다.

 

PythonOperator에 변수제공: 두가지 이상의 데이터 소스에서 데이터를 다운로드

def _get_data(output_path, **context):
		year, month, day, hour, *_ = context["execution_date"].timetuple()
		url = ("<https://~>")
		request.urlretrieve(url, ouput_path) #output_path 인수를 통해 구성가능
  • 두가지 방법으로 ouput_path를 제공할 수 있다.
    • op_args : 콜러블 커스텀 변수
    • op_kwargs : 콜러블 커스텀 kwargs

op_args

get_data = PythonOperator(
	task_id = "get_data",
	python_callable = _get_date,
	op_args=["/tmp/wikipageviews.gz"], #op_args를 사용하여 callable에 추가 변수를 제공한다.
	dag=dag,
)
  • Python 오퍼레이터를 실행하면 op_args에 제공된 리스트의 각 값이 콜러블 함수에 전달된다. 즉, _get_data("/tmp/wikipageviews.gz") 를 직접 호출하는 것과 동일한 결과를 얻는다.

op_kwargs

get_data = PythonOperator(
	task_id = "get_data",
	python_callable = _get_date,
	op_kwargs=["output_path": "/tmp/wikipageviews.gz"], #op_kwargs를 사용하여 호출가능한 callable에 추가 변수를 제공한다.
	dag=dag,
)
  • op_args와 유사하게 op_kwargs의 모든 값은 콜러블 함수에 전달되지만 여기서는 키워드 인수로 전달된다. _get_data에 대한 동등한 호출은get_data(output_path="/tmp/wikipageviews.gz") 이다.
  • 이러한 값은 문자열을 포함할 수 있으므로 템플릿을 만들 수 있습니다. 콜러블 함수 자체 내에서 datetime구성 요소를 추출하지 않고 템플릿 문자열을 콜러블 함수에 전달할 수 있습니다.

다른 시스템과 연결하기

  • Airflow는 태스크 간 데이터를 전달하는 방법이 두가지 있다.
    1. airflow 메타스토어를 사용하여 태스크 간 결과를 쓰고 읽는다. (XCom이라고함)
    2. 영구적인 위치(디스크 / 데이터베이스)에 태스크 결과를 기록한다.
  • Airflow는 XCom이라는 기본 메커니즘을 제공하여 메타스토어에서 선택 가능한(pickable) 개체를 저장하고 나중에 읽을 수 있다.
  • 피클은 파이썬의 직렬화 프로토콜이며 직렬화는 메모리의 개체를 나중에 다시 읽을 수 있도록 디스크에 저장할 수 있는 형식으로 변환하는 것을 의미한다.
  • 크기가 작은 오브젝트는 XCom을 이용한 피클링이 적합하다.
  • 좀 더 큰 데이터를 태스크 간 전송하기 위해서는 Airflow 외부에 데이터를 유지하는 것이 좋다. 향후 더 많은 페이지 처리로 데이터 크기가 커질 수 있다는 점에 염두에 두고 XCom대신 디스크에 결과를 저장한다.
  • 태스크 간 데이터를 저장하는 방법을 결정하기위해 두가지를 알아야한다.
    • 데이터가 다시 사용되는 위치
    • 다시 사용되는 방법
  • 대상이 되는 데이터베이스가 Postgres이므로 PostgresOperator을 사용해 데이터를 입력하도록 한다.
    (pip install apache-airflow-providers-postgres)
dag = DAG(..., template_searchpath="/tmp")

write_to_postgres=PostgresOperator(
    task_id="write_to_postgres",
    postgres_conn_id="my_postgres", #연결에 사용할 인증 정보의 식별자
    sql="postgres_query.sql",       #SQL쿼리 또는 SQL쿼리를 포함하는 파일경로
    dag=dag,
)
  • PostgresOperator 을 실행하면 여러가지 작업이 수행된다. PostgresOperator은 Postgres와 통신하기 위해 훅(hook)이라고 불리는 것을 인스턴스화 합니다. 인스턴스화된 훅은..
    • Postgres와 통신하기위한 연결 생성
    • Postgres에 쿼리를 전송하고 연결에 대한 종료작업 처리
    • 여기서 오퍼레이터는 사용자의 요청을 훅으로 전달하는 작업만 담당한다.
  • 훅은 오퍼레이터 내부에서 동작하기 때문에 신경 쓸 필요가 없다.