본문 바로가기
Airflow

[데이터엔지니어] 실리콘 밸리에서 날아온 엔지니어링 스타터 키트 Week5

by 개복취 2023. 10. 10.

5주차 주요내용

  1. airflow.cfg 파해치기
  2. Open Weathermap DAG 구현하기
  3. Primary Key Uniqueness 보장하기
  4. Backfill과 Airflow

바람개비 딥다이브2

 

airflow.cfg 파해지기

1. DAGs 폴더는 어디에 지정되는가?

  • core 섹션의 dags_folder 키 - 도커 sh 로 들어가는 과정
  • /opt/airflow/dags : dags파일이 있을거라고 생각함

2. DAGs 폴더에 스캔 주기를 정해주는 키의 이름이 무엇인가?

  • core 섹션의 dags_dir_list_interval 키 (default = 300s), 최대 5분을 기다리는것이다.

3. API 형태로 외부에서 조작하고 싶다면? 어떤 섹션을 변경해야 하는가?

  • api 섹션의 auth_backend를 airflow.api.auth.backend.basic_auth로 변경 : id/pw 로 인증하는것
  • auth_backend=airflow.api.auth.backend 으로 기본이 잡혀있음
  • AIRFLOW__CORE__EXECUTOR: CeleryExecutor = airflow.cfg에서의 코어섹션에서의 Executor을 CeleryExecutor으로 설정해라.
  • AIRFLOW__CORE__FERNET_KEY : ‘ ’
  • _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:- yfinance} 도커 실행될때마다 기본적으로 pip로 모듈을 설치할 수 있다.

.cfg 파일로 지정된 것보다, env로 설정된 기본값들이 더 높은 우선순위를 가진다.

4. Variable에서 변수의 값이 encrypted가 되려면 변수의 이름에 어떤 단어들이 들어가야 하는데 이 단어들은 무엇일까? :)

  • password, secret, passwd, authorization, api_key, apikey, access_token

5. 이 환경 설정 파일이 수정되었다면 이를 실제로 반영하기 위해서 해야 하는일은?

  • 스케줄러와 웹서버를 재시작해야함. Docker라면 docker-compose.yaml 파일이 변경되고 docker compose down과 docker compose up을 차례로 실행

6. Metadata DB의 내용을 암호화하는데 사용되는 키는 무엇인가?

  • Variable이나 connection에 확인해보면 모두 False이다. → fernet_key 세팅이 되어있지 않아서 그럼
  • 흔히 이야기하는 대칭키다. (하나의 키로 암호화하고, 복호화 하고)
  • 암호화를 한다는게 좋지만, 키를 기록해두지 않으면 낭패가 생긴다.
  • fernet_key 세팅을하면 Is_Encrypted 가 True로 된다. DB에서는 암호화 된 형태로 저장이 된다.

ContryInfo DAG 리뷰

import requests
@task
def extract_transform(): # 데이터 소스의 api호출하고 내용을 루프를 돌아 정보를 읽어온다.
 response = requests.get('<https://restcountries.com/v3/all>')
 countries = response.json() # 파이썬 딕셔너리 형태로 변경한다.
 records = []
 
for country in countries:
 name = country['name']['official']
 population = country['population']
 area = country['area']
 records.append([name, population, area])
 return records
@task
def load(schema, table, records):
 cur = get_Redshift_connection()
 try:
	 cur.execute("BEGIN;")
	 cur.execute(f"DROP TABLE IF EXISTS {schema}.{table};")
	 cur.execute(f"""CREATE TABLE {schema}.{table} (
	 name varchar(256) PRIMARY KEY, population int, area float);""")
	 for r in records:
		 cur.execute(f"INSERT INTO {schema}.{table} VALUES ('{r[0]}', {r[1]},{r[2]});")
		 cur.execute("COMMIT;")
 except (Exception, psycopg2.DatabaseError) as error:
	 cur.execute("ROLLBACK;")
	 raise
with DAG(
 dag_id = 'CountryInfo',
 start_date = datetime(2023,5,30),
 catchup=False,
 tags=['API'],
 schedule = '30 6 * * 6' # 0 - Sunday, …, 6 - Saturday
) as dag:
 results = extract_transform()
 load("MyName", "country_info", results) #task Operator 사용하면 양이 확연히 줄어든다.

 

❗️dags 폴더에서 코딩시 작성한다면 주의할 점

1. Airflow는 dags 폴더를 주기적으로 스캔함

[core] dags_folder = /var/lib/airflow/dags
→ How often (in seconds) to scan the DAGs directory for new files. Default to 5 minutes.

dag_dir_list_interval = 300

  • 이때 DAG 모듈이 들어있는 모든 파일들의 메인 함수가 실행이 됨
  • 이 경우 본의 아니게 개발 중인 테스트 코드도 실행될 수 있음
  • from airflow import DAG … cur.execute(“DELETE FROM …”)
  • 해결책: .airflowignore 파일을 dags 폴더에 둘 것!
  • 테스트 코드를 갖는 패턴을 적어둔다.
    • test*.py (테스트 파일을 가지는 이름은 스캔하지 말것)

2. Airflow와 타임존 (UTC가 Default (바꾸지 말것))

  • airflow.cfg에는 두 종류의 타임존 관련 키가 존재
    • default_timezone
    • default_ui_timezone
  • start_date, end_date, schedule
    • default_timezone에 지정된 타임존을 따름
  • execution_date와 로그 시간
    • 항상 UTC를 따름
    • 즉 execution_date를 사용할 때는 타임존을 고려해서 변환후 사용필요, 현재로 가장 좋은 방법은 UTC를 일관되게 사용하는 것으로 보임

Open Weathermap DAG 구현하기

 

만들려는 DAG: 서울 8일 낮/최소/최대 온도 읽기

  • API키를 open_weather_api_key라는 Variable로 저장한다.
  • 서울의 위도와 경도를 찾을 것
  • One-Call API 사용한다.
    • 날짜, 낮 온도(day), 최소 온도(min), 최대 온도(max)

-> https://api.openweathermap.org/data/2.5/onecall?lat={lat}&lon={lon}&appid={API key}&units=metric

CREATE TABLE MyName.weather_forecast (
 date date primary key,
 temp float, -- 낮 온도
 min_temp float,
 max_temp float,
 created_date timestamp default GETDATE() #create_date 를 기준으로 ordering하면서 일련번호를 붙여서 중복제거를 한다..
);
  • Incremental Update 할 때, date, created_date 를 사용한다.
  • daily라는 리스트에 앞으로 8일간의 온도 정보가 들어옴
    • dt 필드가 날짜를 나타냄
    • tmp 필드가 온도 정보를 나타냄
      • day
      • min
      • max
      • night
      • eve
      • morn
  • Airflow Connections를 통해 만들어진 Redshift connection
    • 기본 autocommit의 값은 False인 점을 유의
  • 두 가지 방식의 Full Refresh 구현 방식
    • Full Refresh와 INSERT INTO를 사용
    • Full Refresh와 COPY를 사용 -> 나중에 사용해볼 예정

DAG구현(1): Full Refresh

CREATE TABLE 각자스키마.weather_forecast (
 date date primary key,
 temp float,
 min_temp float,
 max_temp float,
 created_date timestamp default GETDATE()
);
# created_date는 처음 full-refresh 할 때 사용되지 않는다.
# default로 세팅에 있는 레코드이기때문에 created_date를 지정하지않으면 지금시간의 레코드시간으로 적재됨
 

DAG구현(2)

  • One-Call API는 결과를 JSON 형태로 리턴해준다.
    • 읽어들이려면 requests.get 결과의 txt를 JSON으로 변환해야한다.
    • 아니면 requests.get 결과 오브젝트가 제공해주는 .json() 함수 사용한다.
  • 결과 JSON에서 daily라는 필드가 앞으로 8일간 날씨 정보가 들어가 있다.
    • daily 필드는 리스트이며 레코드가 하나의 날짜에 해당한다.
    • 날짜 정보는 ‘dt’ 라는 필드에 들어있고, 1970-01-01 부터 몇 ms 걸렸는지 나타낸 것을 epoch 라고 한다. (문자열 형태의 시간정보)
      • datetime.fromtimestamp(d["dt"]).strftime('%Y-%m-%d') # 2021-10-09

DAG구현(3)

  • Airflow Connections를 통해 만들어진 Redshift connection
    • 기본 autocommit의 값은 False인 점을 유의
  • 두 가지 방식의 Full Refresh 구현 방식
    • Full RefreshINSERT INTO를 사용
    • Full Refresh와 COPY를 사용 -> 나중에 사용해볼 예정
from airflow import DAG
from airflow.models import Variable
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.decorators import task

from datetime import datetime
from datetime import timedelta

import requests
import logging
import json


def get_Redshift_connection():
    # autocommit is False by default
    hook = PostgresHook(postgres_conn_id='redshift_dev_db')
    return hook.get_conn().cursor()

@task
def etl(schema, table):
    api_key = Variable.get("open_weather_api_key")
    # 서울의 위도/경도
    lat = 37.5665
    lon = 126.9780

    # https://openweathermap.org/api/one-call-api
    url = f"https://api.openweathermap.org/data/2.5/onecall?lat={lat}&lon={lon}&appid={api_key}&units=metric&exclude=current,minutely,hourly,alerts"
    response = requests.get(url)
    data = json.loads(response.text)
    """
    {'dt': 1622948400, 'sunrise': 1622923873, 'sunset': 1622976631, 'moonrise': 1622915520, 'moonset': 1622962620, 'moon_phase': 0.87, 'temp': {'day': 26.59, 'min': 15.67, 'max': 28.11, 'night': 22.68, 'eve': 26.29, 'morn': 15.67}, 'feels_like': {'day': 26.59, 'night': 22.2, 'eve': 26.29, 'morn': 15.36}, 'pressure': 1003, 'humidity': 30, 'dew_point': 7.56, 'wind_speed': 4.05, 'wind_deg': 250, 'wind_gust': 9.2, 'weather': [{'id': 802, 'main': 'Clouds', 'description': 'scattered clouds', 'icon': '03d'}], 'clouds': 44, 'pop': 0, 'uvi': 3}
    """
    ret = []
    for d in data["daily"]:
        day = datetime.fromtimestamp(d["dt"]).strftime('%Y-%m-%d') # epoch를 숫자로
        ret.append("('{}',{},{},{})".format(day, d["temp"]["day"], d["temp"]["min"], d["temp"]["max"]))

    cur = get_Redshift_connection()
    drop_recreate_sql = f"""DROP TABLE IF EXISTS {schema}.{table};
CREATE TABLE {schema}.{table} (
    date date,
    temp float,
    min_temp float,
    max_temp float,
    created_date timestamp default GETDATE()
);
"""
    insert_sql = f"""INSERT INTO {schema}.{table} VALUES """ + ",".join(ret)
    logging.info(drop_recreate_sql)
    logging.info(insert_sql)
    try:
        cur.execute(drop_recreate_sql)
        cur.execute(insert_sql)
        cur.execute("Commit;")
    except Exception as e:
        cur.execute("Rollback;")
        raise

with DAG(
    dag_id = 'Weather_to_Redshift',
    start_date = datetime(2023,5,30), # 날짜가 미래인 경우 실행이 안됨
    schedule = '0 2 * * *',  # 적당히 조절
    max_active_runs = 1,
    catchup = False,
    default_args = {
        'retries': 1,
        'retry_delay': timedelta(minutes=3),
    }
) as dag:

    etl("MyName", "weather_forecast")

Primary Key Uniqueness 보장하기

  • 하나의 레코드를 유일하게 지칭할 수 있는 필드(들)
    • 하나의 필드가 일반적이지만 다수의 필드를 사용할 수도 있음
    • 이를 CREATE_TABLE 사용시 지정
  • 관계형 데이터베이스 시스템이 Pk의 값이 중복 존재하는 것을 막아줌
    • 예1) Users 테이블에서 email 필드
    • 예2) Products 테이블에서 product_id 필드

Pk 유지방법(1)

CREATE TABLE 각자스키마.weather_forecast (
 **date date primary key,**
 temp float,
 min_temp float,
 max_temp float,
 **created_date timestamp default GETDATE()**
);
  • 날씨 정보이기 때문에 최근 정보가 더 신뢰할 수 있음
  • 그래서 어느 정보가 더 최근 정보인지를 created_date 필드에 기록하고 이를 활용한다.
  • 즉, date가 같은 레코드들이 있다면 created_date를 기준으로 더 최근 정보를 선택
  • 이를 하는데 적합한 SQL 문법은 ROW_NUMBER

Pk 유지방법(2)

  • ROW_NUMBER을 사용해서 새로운 컬럼을 추가하여, date별 created_date 역순으로 일련번호 부여하기

Pk 유지방법(3)

  1. 임시 테이블 t(스테이징 테이블)을 만들고 거기에 현재 모든 레코드를 복사 CREATE TEMP TABLE t AS SELECT * FROM MyName.weather_forecast;
  2. 임시 테이블에 새로 데이터소스에서 읽어들인 레코드들을 복사
    • 이때, (일주일치 데이터의) 중복이 존재가능함
  3. 중복을 걸러주는 SQL 작성:
    • 최신 레코드를 우선 순위로 선택
    • ROW_NUMBER을 이용해서 pk로 partition을 잡고 적당한 다른 필드로 ordering(DESC)을 수행해 pk 별로 하나의 레코드를 잡아낸다.
    INSERT INTO MyName.weather_forecast
    SELECT date, temp, min_temp, max_temp, created_date
    FROM (
    	SELECT *, ROW_NUMBER() OVER (PARTITION BY date ORDER BY created_date) seq
    	FROM t
    )
    WHERE seq = 1;
    
    #date = pk
    #created_date를 내림차순으로 하여 큰것부터 1번으로 줘라
  4. 위의 SQL을 바탕으로 최종 원본 테이블로 복사하기
    • 이때 원본 테이블에서 레코드들을 삭제
    • 임시 temp 테이블을 원본 테이블로 복사(일련번호가 1번인 것들만 선택)

weather_forecast로 Incremental Update 다시 설명

-> created_date 기준 날짜별로 정렬해서, created_date 가 큰것만 남도록 최종 테이블을 만들면 됨

 

 

Upsert (= INSERT + UPDATE)

  • PK 기준으로 존재하는 레코드라면 새 정보로 수정
  • 존재하지 않는 레코드라면 새 레코드로 적재
  • 보통 데이터 웨어하우스마다 UPSERT를 효율적으로 해주는 문법을 지원해줌

→ Incremental update로 쇼부

 


Backfill 과 Airflow

  • execution_date, start_date 에 대해 알아보자
  • Incremental Update가 효율성이 좋을 수 있지만 운영/유지보수 난이도가 올라간다.
    • 실수로 인해 데이터가 빠지는 일이 생길 수 있음
    • 과거 데이터를 다시 읽어와야하는 경우 다시 모두 재 실행을 해주어야 함

Execution_date

  • 보통의 Daily DAG 작성시, 현재 시간을 기준으로 어제 날짜를 계산하고 그 날짜에 해당하는 데이터를 읽어옴
  • from datetime import datetime, timedelta # 지금 시간 기준으로 어제 날짜를 계산 y = datetime.now() - timedelta(1) yesterday = datetime.strftime(y, '%Y-%m-%d') **yesterday = context["execution_date”]** # yesterday에 해당하는 데이터를 소스에서 읽어옴 # 예를 들어 프로덕션 DB의 특정 테이블에서 읽어온다면 sql = f"SELECT * FROM table WHERE DATE(ts) = '{yesterday}'"
  • ETL별로 실행 날짜 / 시간과 결과를 메타데이터 데이터베이스에 기록한다.
  • 모든 DAG 실행에는 “execution_date”가 존재하고 여기서 데이터의 날짜와 시간을 읽어온다.
    • execution_date로 채워야하는 날짜와 시간이 넘어온다.
  • 이를 바탕으로 backfill이 쉬워지는 방식으로, 데이터를 갱신하도록 코드를 작성한다.
  • from datetime import datetime, timedelta yesterday = context["execution_date”] # yesterday에 해당하는 데이터를 소스에서 읽어옴 # 예를 들어 프로덕션 DB의 특정 테이블에서 읽어온다면 sql = f"SELECT * FROM table WHERE DATE(ts) = '{yesterday}'"

Start_date

  • start_date 는 해당 날짜에 dag를 시작을 의미하는게 아니다!! → 설정한 그 다음날에 비로소 시작
  • 가령, 2020년 11월 7일의 데이터부터 매일 하루치 데이터를 읽는다 할 때, ETL 동작은 2020년 11월 8일날 부터 동작해야한다.
  • 다시말해, 2020년 11월 8일날 동작하지만 읽어와야 하는 데이터의 날짜는 11월 7일(start_date)이다.

catchup 이 True면 : start_date ~ 현재까지 데이터의 쿼리를 날린다.

위의 job은 총 3번 실행되었다.

 

Backfill과 관련된 Airflow 변수들

  • start_date : DAG가 처음실행되는 순간 start_date + DAG 실행주기
  • execution_date : DAG가 읽어와야하는 데이터의 날짜와 시간
  • catchup : DAG가 실행된 시점 ~ 현재
  • end_date : 백필에 사용된다.

5주차 후기

Incremental Update의 개념에 대해 조금 더 깊게 이해했다.

백필과정의 난이도가, 데이터 엔지니어의 밸런스있는 삶이 결정된다는 이야기를 듣고 지금 열심히 공부해야겠다는 생각을 했다.

할게 산더미같다 더 열심히해야지