본문 바로가기
Airflow

[데이터엔지니어] 실리콘 밸리에서 날아온 엔지니어링 스타터 키트 Week4

by 개복취 2023. 10. 9.

4주차 주요내용

  1. 멱등성과 트랜잭션
  2. Airflow 이해하기
  3. NameGender.py DAG 개선하기
  4. Yahoo Finance API DAG 작성

바람개비 딥다이브

멱등성과 트랜잭션 

멱등성이란?

  • 데이터 파이프라인이 연속 식행되었을 때 소스에 있는 데이터가 그대로 저장되어야 함을 이야기함
  • 가령 Full refresh를 하는 데이터 파이프라인이라면…
    1. 먼저 DW의 관련 테이블에서 모든 레코드들을 삭제한다.
    2. 데이터 소스에서 읽어온 데이터를 DW테이블로 적재한다.
  • 만일 1이 성공하고 2가 실패한다면…?
  • 만일 1이 실행된 다음 누군가 이 테이블을 사용한다면?

→ 위와 같은 상황을 방지하기 위해, 우리는 트랜잭션에 대한 개념을 인지해야한다.

 

트랜잭션이란?

  • Atomic하게 실행되어야 하는 SQL들을 묶어서 하나의 작업처럼 처리하는 방법
  • Transaction Isolation Level: “Read Committed”가 디폴트 세팅
  • Begin - END 또는 Begin - Commit 사이에 해당 SQL들을 사용한다.
  • Python의 경우 try/catch와 같이 사용하는 것이 일반적이다.
    • raise를 써주는 것이 좋다.

  • Autocommit이 ‘True’ 일때 자동으로 commit 되는것을 막고싶으면 BEGIN 명시적으로 써주면 됨
  • 무엇을 사용할지는 개인 취향이고, python의 경우 try/catch 문과 같이 사용하는 것이 일반적이다.
    • try/catch로 에러가 나면 rollback을 명시적으로 실행, 에러가 안나면 commit 한다.

 


Airflow 이해하기

airflow에서는 데이터 파이프라인을 dag라고 부르며, dag는 다수의 task로 구성되고, task간에 실행순서가 지정되며 하나의 task는 여러개의 operator으로 구성된다.

  • PythonOperator을 사용해서 task에서 해야하는 일을 python함수로 구현하는 것이다.

Hello World 예제 프로그램 살펴보기

from airflow import DAG
from airflow.operators.py thon import PythonOperator
from datetime import datetime

dag = DAG(
    dag_id = 'HelloWorld',
    start_date = datetime(2023,8,20),
    catchup=False,
    tags=['example'],
    schedule = '0 2 * * *' # 하루에 한번 UTC: 2시 0분에 실행되는 것을 이야기함
)

def print_hello():
    print("hello!")
    return "hello!"

def print_goodbye():
    print("goodbye!")
    return "goodbye!"

print_hello = PythonOperator(
    task_id = 'print_hello',
    #python_callable param points to the function you want to run 
    python_callable = print_hello,
    #dag param points to the DAG that this task is a part of
    dag = dag)

print_goodbye = PythonOperator(
    task_id = 'print_goodbye',
    python_callable = print_goodbye,
    dag = dag
)

#Assign the order of the tasks in our DAG
print_hello >> print_goodbye

→ DAG로 설정해준 print_hello는 PythonOperator으로 구성되어있으며 먼저 실행하고 print_goodbye는 두번째로 실행한다.

  • def print_hello(): , def print_goodbye(): 에서 나온 결과값
  • PythonOperator을 사용하면 파이썬으로 작성할 수 있는 모든 일을 dag의 task로 구현할 수 있다.
    • 가장 강력하지만, 가장 노가다를 많이해야하는 오퍼레이터이다.
    • 오퍼레이터를 본인이 직접 구현한다기 보다는, 누군가가 만든 오퍼레이터를 가지고 사용하는게 일반적임
  • Airflow하면서 처음 배울때 가장 어려운 부분은 start_date, catchup, execution_date가 무엇이냐 하는것
    • start_date는 보기와는 다르게 처음 시작하는 날이 아님 → 향후 설명

 

Task 파라미터

  • 위에서 PythonOperator들을 따로 불러서 각각의 함수를 부르는것이 일반적인 방법이었다.
  • 그러나 Decorator의 task를 사용해서 편리하게 처리할 수 있는 방법이 있다.
from airflow import DAG
from airflow.decorators import task
from datetime import datetime

@task
def print_hello():
    print("hello!")
    return "hello!"

@task
def print_goodbye():
    print("goodbye!")
    return "goodbye!"

with DAG(
    dag_id = 'HelloWorld_v2',
    start_date = datetime(2022,5,5), # 시작하는 날이 아님.. 헷갈리는 개념
    catchup=False,
    tags=['example'],
    schedule = '0 2 * * *'
		default_dags=default_dag # task param 이렇게 설정해주는게 일반적임
) as dag:

    # Assign the tasks to the DAG in order
    print_hello() >> print_goodbye()
  • @task 로 처리된 부분은 함수이름이 id 그자체가 된다.
  • PythonOperator 사용해도 되지만, 간편하다는 이유때문에 위와 같은 방식으로 사용되는게 일반적이다.

default_args = 모든 파라미터에 공통적으로 들어가는 인자처리

‘on_success_callback’ : if success call me

’on_failure_callback’ : if failure call me

 

중요한 DAG파라미터 (not task 파라미터)

  • max_active_runs : #of DAGS instance (백필할 때 사용됨) (default=16, 안전하게 1로 세팅한다.)
    • (이론적으로) 높은 수일 수록 한번에 dag 돌아가는 횟수가 정해지기 때문에 빠르게 처리할 수 있다.
    • Cpu 갯수에 따라서 upper_bound 설정된다. cpu설정에 따라서 돌아갈 수 있는 dag의 최대가 정해진다.
    • 크게 지정한다고 해서 좋은게 아닌게, 크게 지정할 수록 소스에 load가 가서 소스쪽에 문제가 생길 수 있다.
    • 따라서, 소스와 destination에 capacity에 따라서 크게 설정해주는건 의미없고, 오히려 문제를 일으킬 수 있다.
  • max_active_tasks : # of tasks that can run in parallel (동시에 여러 태스크 돌아가는 수)
    • 하나의 DAG밑에서 task가 동시에 몇개가 돌 수 있는가를 의미한다. 일렬로 돌아가는 dag면 크게하나 작게하나 문제없음 동시에 여러개가 돌아가는 task라면 이 값을 어떻게 세팅해주는가는 중요해진다. Airflow 서버의 cpu 보다 크게 세팅하는건 의미없다.
  • catchup: whether to backfill past runs → 나중에 설명함
  • DAG Param vs. Task Param 차이점 이해가 중요하다.
    •  위의 파라미터들은 모두 DAG파라미터로 DAG객체를 만들 때 지정해주어야 한다.

How to Trigger a DAG

  • airflow 컨테이너 중 하나로 로그인
    • docker ps : #Worker 의 컨테이너 ID 찾기
    • docker exec -it 컨테이너ID sh

  • docker : 컨테이너를 실행
  • exec : 특정 docker container에게 명령어를 던진다.
  • -it : interactive 하게 명령어를 주고받는 것을 의미한다.
  • 컨테이너ID 에게 sh 라는 명령어를 수행해라 라는 뜻 ⇒ 해당 도커 컨테이너 안으로 로그인 하라는 뜻

 

  • airflow 내부에서 dags directory의 내용과 로컬 dags 저장소에 있는 내용이 서로 동일하다.
    • 외부에 있는 같은 볼륨을 사용하기 때문에 파일안의 내용이 서로 같다.

  • 위와 같이 될 수 있었던 이유는 docker-compose.yaml 에서 volumes 설정을 해주었기 때문이다.

web ui 안거치고 cli 환경에서 DAG 리스트 확인하기

  • airflow sh 들어가서 > airflow dags list 로 확인할 수 있다.

하나의 DAG를 골라서 그 밑에 어떤 task가 존재하고 실행시킬 수 있다. > airflow dags test [dag_id] (+execution_date)

task 하나만 실행할 수 있다. > airflow tasks test [dag_id] [task_id] [execution date(YYYY-mm-dd): not future]

print_hello 했을때 hello 하나만 찍히고 끝나는것을 볼 수 있다.

 

  • Airflow 의 전반적인 설정자체는 airflow.cfg (airflow configuration file)내부에 설정이 되어있다.
    • 내부에 airflow 에게 dag파일이 어디에있는지 알려주는 커맨드가 존재한다. ‘dags_folder’ 내용

docker-compose 파일 내부에 volume이 지정되어있는 위치로 설정이 된다.

opt/airflow/dags 에 있는 정보와 동일하게 매핑시켜라 라는 뜻으로 이해할 수 있다.

 


Name Gender DAG 개선하기

NameGender_V1 (Porting to Airflow)

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
import requests
import logging
import psycopg2

def get_Redshift_connection():
    host = HOST
    user = ID
    password = PW
    port = 5439
    dbname = "dev"
    conn = psycopg2.connect(f"dbname={dbname} user={user} host={host} password={password} port={port}")
    conn.set_session(autocommit=True)
    return conn.cursor()


def extract(url):
    logging.info("Extract started")
    f = requests.get(url)
    logging.info("Extract done")
    return (f.text)


def transform(text):
    logging.info("Transform started")	
    lines = text.strip().split("\n")[1:] # 첫 번째 라인을 제외하고 처리
    records = []
    for l in lines:
      (name, gender) = l.split(",") # l = "MyName,M" -> [ 'MyName', 'M' ]
      records.append([name, gender])
    logging.info("Transform ended")
    return records


def load(records):
    logging.info("load started")
    """
    records = [
      [ "MyName", "M" ],
      [ "Claire", "F" ],
      ...
    ]
    """
    schema = "MyName"
    # BEGIN과 END를 사용해서 SQL 결과를 트랜잭션으로 만들어주는 것이 좋음
    cur = get_Redshift_connection()
    try:
        cur.execute("BEGIN;")
        cur.execute(f"DELETE FROM {schema}.name_gender;") 
        # DELETE FROM을 먼저 수행 -> FULL REFRESH을 하는 형태
        for r in records:
            name = r[0]
            gender = r[1]
            print(name, "-", gender)
            sql = f"INSERT INTO {schema}.name_gender VALUES ('{name}', '{gender}')"
            cur.execute(sql)
        cur.execute("COMMIT;")   # cur.execute("END;") 
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
        cur.execute("ROLLBACK;")
        raise
    logging.info("load done")


def etl():
    link = "s3link.csv"
    data = extract(link)
    lines = transform(data)
    load(lines)


dag_second_assignment = DAG(
	dag_id = 'name_gender',
	catchup = False,
	start_date = datetime(2023,4,6), # 날짜가 미래인 경우 실행이 안됨
	schedule = '0 2 * * *')  # 적당히 조절

task = PythonOperator(
	task_id = 'perform_etl',
	python_callable = etl,
	dag = dag_second_assignment)

이전에 작성했던 NameGender DAG를 Airflow로 Porting 하는 작업을 코드로 작성했다.

상당수의 코드가 하드코딩 되어 개선의 여지가 있어보인다.

NameGender_V2 (params, execution_date 활용하기)

task = PythonOperator(
    task_id = 'perform_etl',
    python_callable = etl,
    params = {
        'url': "s3link.csv"
    },
    dag = dag)
  • V2에서는, csv 링크를 하드코딩 하지않고 PythonOperator의 params를 통해 변수를 넘길 수 있다.
def etl(**context):
    link = context["params"]["url"]
    # task 자체에 대한 정보 (일부는 DAG의 정보가 되기도 함)를 읽고 싶다면 context['task_instance'] 혹은 context['ti']를 통해 가능
    # https://airflow.readthedocs.io/en/latest/_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance
    task_instance = context['task_instance']
    execution_date = context['execution_date']

    logging.info(execution_date)

    data = extract(link)
    lines = transform(data)
    load(lines)
  • Airflow가 유지하는 중요한 변수가 있는데 그중 execution_date를 받아서 찍어볼 수 있다.
  • execution_date가 무엇이냐 → 위에서 이야기한 dag를 실행할 때 마지막 인자로 들어가는 부분이다.
    > airflow dags test name_gender [execution_date]

 

NameGender_V3 (Variable, Xcom_(pull/push) 활용하기)

  • Xcom(inter communication)을 통해 task간 데이터를 주고 받는다.
    • xcom_push : Airflow 메타데이터 내부에 넣기
    • xcom_pull : 메타데이터로 저장하고 있는 값을 다른 task에 던지기
def transform(**context):
    logging.info("Transform started")
    text = context["task_instance"].xcom_pull(key="return_value", task_ids="extract")
    lines = text.strip().split("\\n")[1:]
    for l in lines:
      (name, gender) = l.split(",")
      records.append([name, gender])
    logging.info("Transform ended")
    return records
  • text = context["task_instance"].xcom_pull(key="return_value", task_ids="extract")
    : extract 라는 task가 리턴해준 값을 읽어달라고 하는 명령어

  • Variable 으로 원하는 값을 import 해서 저장할 수 있음
  • 또한, Redshift의 ID/PW등의 민감한 정보는, Airflow 코드에 남기지 않고, Connection에 따로 저장해서 접근하면 된다.
  • PostgresHook 으로 redshift_dev_db을 가지고온다. 이는. 보안적인 이슈를 줄일 수 있다.

Yahoo Finance API DAG 작성

Yahoo Finance API DAG 작성(1) - full refresh

  1. Yahoo Finance API를 호출하여 애플 주식 정보 수집 (지난 30일)
  2. Redshift 상의 테이블로 1에서 받은 레코드들을 적재

3. Extract / Transform : Yahoo Finance API 호출

  • API 호출하여 애플 주식(AAPL) 정보를 수집하고 파싱한다.
import yfinance as yf
@task
def get_historical_prices(symbol):
 ticket = yf.Ticker(symbol) # ticker = AAPL
 data = ticket.history()
 records = []
 for index, row in data.iterrows():
 date = index.strftime('%Y-%m-%d %H:%M:%S')
 records.append([date, row["Open"], row["High"], row["Low"], row["Close"], row["Volume"]])
 return records
  • task decorator을 사용하게 된다면, 리턴값에 대한 처리를 아래와 같이 쓰면 Airflow가 대신 해준다.
results = get_historical_prices("AAPL")
load("MyName", "stock_info", results) # (스키마, @task이름, 로딩해야하는 레코드)
  • 결국 idea는 full-refresh 형태로 테이블 삭제하고 다시만드는 형태를 하나의 트랜잭션에다 묶는것이다.
❗️yfinance 모듈이 없다고 할때 → default python module이 아니기 때문에 기본으로 설치되어있지 않고 도커 컨테이너 돌때 사용해야 한다고 명시적으로 나타내주어야 한다.

Yahoo Finance API DAG 작성(2) - Incremental Update

  • 새로 읽어온 것만 가져와서 읽는다 (not full refresh)
import yfinance as yf

@task
def get_historical_prices(symbol):
 ticket = yf.Ticker(symbol)
 data = ticket.history()
 records = []
 for index, row in data.iterrows():
 date = index.strftime('%Y-%m-%d %H:%M:%S')
 records.append([date, row["Open"], row["High"], row["Low"], row["Close"], row["Volume"]])
 return records
  • 현재 테이블을 임시테이블로 가져오고, 임시테이블에다가 yfinance 로 읽은 데이터를 적재한다.
@task
def load(schema, table, records):
    logging.info("load started")
    cur = get_Redshift_connection()
    try:
        cur.execute("BEGIN;")
        # 원본 테이블이 없으면 생성 - 테이블이 처음 한번 만들어질 때 필요한 코드
        _create_table(cur, schema, table, False)
        # 임시 테이블로 원본 테이블을 복사
        cur.execute(f"CREATE TEMP TABLE t AS SELECT * FROM {schema}.{table};")
        for r in records:
            sql = f"INSERT INTO t VALUES ('{r[0]}', {r[1]}, {r[2]}, {r[3]}, {r[4]}, {r[5]});"
            print(sql)
            cur.execute(sql)

        # 원본 테이블 생성
        _create_table(cur, schema, table, True)
        # 임시 테이블 내용을 원본 테이블로 복사
        cur.execute(f"INSERT INTO {schema}.{table} SELECT DISTINCT * FROM t;")
        cur.execute("COMMIT;")   # cur.execute("END;")
    except Exception as error:
        print(error)
        cur.execute("ROLLBACK;") 
        raise
    logging.info("load done")
  • Incremental Update로 구현
    • 임시 테이블 생성하면서 현재 테이블의 레코드를 복사 (CREATE TEMP TABLE … AS SELECT)
    • 임시 테이블로 Yahoo Finance API로 읽어온 레코드를 적재
    • 원본 테이블을 삭제하고 새로 생성
    • 원본 테이블에 임시 테이블의 내용을 복사 (이 때 **SELECT DISTINCT ***를 사용하여 중복 제거)
  • 트랜잭션 형태로 구성 (NameGender DAG와 동일)

 


4주차 후기

Airflow의 기능들을 많이 배워볼 수 있었던 시간이었다.

실제로 API 를 가지고와서 DAG설정해보는 과정을 해보면서 쓰임에 대해 더 깊이있게 배운것 같다.

Incremental Update의 과정이 머리 아프게하지만 반복숙달하면 괜찮을지도?