본문 바로가기
Airflow

[데이터엔지니어] 실리콘 밸리에서 날아온 엔지니어링 스타터 키트 Week7

by 개복취 2023. 10. 12.

7주차 주요내용

  1. Airflow Configuration for Production Usage
  2. Slack 연동하기
  3. 구글 시트 연동하기: 시트 => Redshift 테이블
  4. API & Airflow 모니터링
  5. Dag Dependencies

바람개비 떠나보내기

 

Airflow Configuration for Production Usage

  • 제일 중요한 파일은 airflow.cfg (/var/lib/airflow or /opt/airflow/)
    • Any changes here will be reflected when you restart the webserver and scheduler
    • core 섹션의 dags_folder가 DAG들이 있는 디렉토리가 되어야한다.
      • /var/lib/airflow/dags
    • dag_dir_list_interval : dags_folder를 Airflow가 얼마나 자주 스캔하는지 명시(초단위)
  • Airflow DB upgrade
    • 기본 sql: SqllitePostgres or MySQL (DB는 주기적으로 백업되어야 한다)
    • sql_alchemy_conn in Core section of airflow.cfg
  • LocalExecutor 또는 CeleryExecutor 사용한다.
    • Executor은 어떤 dag의 task를 실행하는 주체이다. (task를 dependency에 맞게 실행시켜주는것)
    • CeleryExecutor : Celery + Redis를 사용해서 큐처럼 사용한다. 한단계 발전하면 k8s Executor을 사용한다. 어떤 Executor을 사용하는지는 중요하지 않고 code자체가 달라지는것은 아니다.
  • login 화면이 없었을때에는(Airflow 1.0), 포트를 하나씩 스캔해서 코드 및 정보들을 염탐할 수 있었다.
    • 2.0에서는 인증이 기본으로 ON 으로 되어있다.
  • DAGS의 log는 기본적으로 Airflow의 메타데이터에 쌓이지 않고, 디스크에 저장된다.
    • Logs → /dev/airflow/logs in (Core section of airflow.cfg)
      • base_log_folder
      • child_process_log_directory
    • Local data → /dev/airflow/data
  • 주기적으로 디스크를 정리해주지 않으면 dag를 실행할 때에 문제가 생길 수 있다.
    • shellOperator을 기반으로한 DAG를 실행하여 미연에 방지할 수 있다.
  • 처음에는 EC2 위에 도커엔진 돌려서 하나의 서버로 운영, 이후 형편이 괜찮아지면 (dag, task늘어나면) cloud버전으로 돌리는것 또는 회사에서 관리하는 과정을 가져간다 (쿠팡, 오늘의집, 쏘카, 네이버…)
  • Operator 하나가 도커하나 올라가는 형태로 만들어지게 됨 (끝나면 자원을 반환하는 형태로 만들어짐)

Slack 연동하기

  • DAG 실행 중에 에러가 발생하면 그걸 지정된 슬랙 workspace의 채널로 보내기
  • 이를 위해서 해당 슬랙 workspace에 App 설정이 필요함
  • 다음으로 연동을 위한 함수를 만들고 (plugins/slack.py)
  • 이를 태스크에 적용되는 default_args의 on_failure_callback에 지정한다.
from plugins import slack
 …
 default_args= {
 'on_failure_callback': slack.on_failure_callback,
 }

webhook을 통해 해당 코드를 입력한다.

curl -X POST -H 'Content-type: application/json' --data '{"text":"Hello, World!"}'
<https://hooks.slack.com/services/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>

 

→ ok라는 응답을 보내준다.

→ webhook uri는 동일함 (https://hooks.slack.com/services), 뒤의 내용을 airflow내용에 넣는다.

(name_gender_v4 에는 사실 슬랙 연동을 위한 내용이 있었다.)

from plugins import slack

on_failure_callback : slack.on_failure_callback

 

실제로 DAG를 굴려보면 아래와 같은 에러가 연동된 슬랙에 알람으로 온다!


구글 시트 연동하기

구글 시트를 테이블로 복사하는 예제 개요

  • 구글 클라우드 시트 API 활성화 시켜주고, 구글 서비스 어카운트를 생성해서 그 내용을 JSON 파일로 다운로드 한다.
  • 스프레드 시트를 redshift에 복제하고 싶으면 email을 공유해야한다. (코드가 복잡한것보다 세팅이 더 복잡함)

curl -X GET --user "airflow:airflow" <http://localhost:8080/api/v1/config>

→ airflow dags list를 api로 불러오는 방법

sheet2Redshift = .csv 파일로 다운로드 → s3에다가 저장 → Redshift에다가 벌크업데이트

 

Gsheet_to_Redshift.py

"""
 - 구글 스프레드시트에서 읽기를 쉽게 해주는 모듈입니다. 아직은 쓰는 기능은 없습니다만 쉽게 추가 가능합니다.

 - 메인 함수는 get_google_sheet_to_csv입니다.
  - 이는 google sheet API를 통해 구글 스프레드시트를 읽고 쓰는 것이 가능하게 해줍니다.
  - 읽으려는 시트(탭)가 있는 스프레드시트 파일이 구글 서비스 어카운트 이메일과 공유가 되어있어야 합니다.
  - Airflow 상에서는 서비스어카운트 JSON 파일의 내용이 google_sheet_access_token이라는 이름의 Variable로 저장되어 있어야 합니다.
    - 이 이메일은 iam.gserviceaccount.com로 끝납니다.
    - 이 Variable의 내용이 매번 파일로 쓰여지고 그 파일이 구글에 권한 체크를 하는데 사용되는데 이 파일은 local_data_dir Variable로 지정된 로컬 파일 시스템에 저장된다. 이 Variable은 보통 /var/lib/airflow/data/로 설정되며 이를 먼저 생성두어야 한다 (airflow 사용자)
  - JSON 기반 서비스 어카운트를 만들려면 이 링크를 참고하세요: https://denisluiz.medium.com/python-with-google-sheets-service-account-step-by-step-8f74c26ed28e

 - 아래 2개의 모듈 설치가 별도로 필요합니다.
  - pip3 install oauth2client
  - pip3 install gspread

 - get_google_sheet_to_csv 함수:
  - 첫 번째 인자로 스프레드시트 링크를 제공. 이 시트를 service account 이메일과 공유해야합니다.
  - 두 번째 인자로 데이터를 읽어올 tab의 이름을 지정합니다.
  - 세 번째 인자로 지정된 test.csv로 저장합니다.
gsheet.get_google_sheet_to_csv(
    'https://docs.google.com/spreadsheets/d/1hW-_16OqgctX-_lXBa0VSmQAs98uUnmfOqvDYYjuE50/',
    'Test',
    'test.csv',
)

 - 여기 예제에서는 아래와 같이 테이블을 만들어두고 이를 구글스프레드시트로부터 채운다
CREATE TABLE MyName.spreadsheet_copy_testing (
    col1 int,
    col2 int,
    col3 int,
    col4 int
);
"""
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
from airflow.models import Variable

from datetime import datetime
from datetime import timedelta
from plugins import gsheet
from plugins import s3

import requests
import logging
import psycopg2
import json


def download_tab_in_gsheet(**context):
    url = context["params"]["url"]
    tab = context["params"]["tab"]
    table = context["params"]["table"]
    data_dir = Variable.get("DATA_DIR")

    gsheet.get_google_sheet_to_csv(
        url,
        tab,
        data_dir+'{}.csv'.format(table)
    )
     

def copy_to_s3(**context):
    table = context["params"]["table"]
    s3_key = context["params"]["s3_key"]

    s3_conn_id = "aws_conn_id"
    s3_bucket = "grepp-data-engineering"
    data_dir = Variable.get("DATA_DIR")
    local_files_to_upload = [ data_dir+'{}.csv'.format(table) ]
    replace = True

    s3.upload_to_s3(s3_conn_id, s3_bucket, s3_key, local_files_to_upload, replace)


dag = DAG(
    dag_id = 'Gsheet_to_Redshift',
    start_date = datetime(2021,11,27), # 날짜가 미래인 경우 실행이 안됨
    schedule = '0 9 * * *',  # 적당히 조절
    max_active_runs = 1,
    max_active_tasks = 2,
    catchup = False,
    default_args = {
        'retries': 1,
        'retry_delay': timedelta(minutes=3),
    }
)

sheets = [
    {
        "url": "https://docs.google.com/spreadsheets/d/1hW-_16OqgctX-_lXBa0VSmQAs98uUnmfOqvDYYjuE50/",
        "tab": "SheetToRedshift",
        "schema": "MyName",
        "table": "spreadsheet_copy_testing"
    }
]

for sheet in sheets:
    download_tab_in_gsheet = PythonOperator(
        task_id = 'download_{}_in_gsheet'.format(sheet["table"]),
        python_callable = download_tab_in_gsheet,
        params = sheet,
        dag = dag)

    s3_key = sheet["schema"] + "_" + sheet["table"]

    copy_to_s3 = PythonOperator(
        task_id = 'copy_{}_to_s3'.format(sheet["table"]),
        python_callable = copy_to_s3,
        params = {
            "table": sheet["table"],
            "s3_key": s3_key
        },
        dag = dag)

    run_copy_sql = S3ToRedshiftOperator(
        task_id = 'run_copy_sql_{}'.format(sheet["table"]),
        s3_bucket = "grepp-data-engineering",
        s3_key = s3_key,
        schema = sheet["schema"],
        table = sheet["table"],
        copy_options=['csv', 'IGNOREHEADER 1'],
        method = 'REPLACE',
        redshift_conn_id = "redshift_dev_db",
        aws_conn_id = 'aws_conn_id',
        dag = dag
    )

    download_tab_in_gsheet >> copy_to_s3 >> run_copy_sql

 

plugins/gsheet.py

# -*- coding: utf-8 -*-
from airflow.hooks.postgres_hook import PostgresHook
from airflow.models import Variable
from oauth2client.service_account import ServiceAccountCredentials

import base64
import gspread
import json
import logging
import os
import pandas as pd
import pytz


def write_variable_to_local_file(variable_name, local_file_path):
    content = Variable.get(variable_name)
    f = open(local_file_path, "w")
    f.write(content)
    f.close()


def get_gsheet_client():
    data_dir = Variable.get("DATA_DIR")
    scope = ['https://spreadsheets.google.com/feeds', 'https://www.googleapis.com/auth/drive']
    gs_json_file_path = data_dir + 'google-sheet.json'

    write_variable_to_local_file('google_sheet_access_token', gs_json_file_path)
    credentials = ServiceAccountCredentials.from_json_keyfile_name(gs_json_file_path, scope)
    gc = gspread.authorize(credentials)

    return gc


def p2f(x):
    return float(x.strip('%'))/100


def get_google_sheet_to_csv(
    sheet_uri,
    tab,
    filename,
    header_line=1,
    remove_dollar_comma=0,
    rate_to_float=0):
    """
    Download data in a tab (indicated by "tab") in a spreadsheet ("sheet_uri") as a csv ("filename")
    - if tab is None, then the records in the first tab of the sheet will be downloaded
    - if tab has only one row in the header, then just use the default value which is 1
    - setting remove_dollar_comma to 1 will remove any dollar signs or commas from the values in the CSV file
      - dollar sign might need to be won sign instead here
    - setting rate_to_float to 1 will convert any percentage numeric values to fractional values (50% -> 0.5)
    """

    data, header = get_google_sheet_to_lists(
        sheet_uri,
        tab,
        header_line,
        remove_dollar_comma=remove_dollar_comma)

    if rate_to_float:
        for row in data:
            for i in range(len(row)):
                if str(row[i]).endswith("%"):
                    row[i] = p2f(row[i])

    data = pd.DataFrame(data, columns=header).to_csv(
        filename,
        index=False,
        header=True,
        encoding='utf-8'
    )


def get_google_sheet_to_lists(sheet_uri, tab=None, header_line=1, remove_dollar_comma=0):
    gc = get_gsheet_client()

    # no tab is given, then take the first sheet
    # here tab is the title of a sheet of interest
    if tab is None:
        wks = gc.open_by_url(sheet_uri).sheet1
    else:
        wks = gc.open_by_url(sheet_uri).worksheet(tab)

    # list of lists, first value of each list is column header
    print(wks.get_all_values())
    print(int(header_line)-1)
    data = wks.get_all_values()[header_line-1:]

    # header = wks.get_all_values()[0]
    header = data[0]
    if remove_dollar_comma:
        data = [replace_dollar_comma(l) for l in data if l != header]
    else:
        data = [l for l in data if l != header]
    return data, header


def add_df_to_sheet_in_bulk(sh, sheet, df, header=None, clear=False):
    records = []
    headers = list(df.columns)
    records.append(headers)

    for _, row in df.iterrows():
        record = []
        for column in headers:
            if str(df.dtypes[column]) in ('object', 'datetime64[ns]'):
                record.append(str(row[column]))
            else:
                record.append(row[column])
        records.append(record)

    if clear:
        sh.worksheet(sheet).clear()
    sh.values_update(
        '{sheet}!A1'.format(sheet=sheet),
        params={'valueInputOption': 'RAW'},
        body={'values': records}
    )


def update_sheet(filename, sheetname, sql, conn_id):
    client = get_gsheet_client()
    hook = PostgresHook(postgres_conn_id=conn_id)
    sh = client.open(filename)
    df = hook.get_pandas_df(sql)
    print(sh.worksheets())
    sh.worksheet(sheetname).clear()
    add_df_to_sheet_in_bulk(sh, sheetname, df.fillna(''))


def replace_dollar_comma(lll):
    return [ ll.replace(',', '').replace('$', '') for ll in lll ]

반대방향은(Redshift → Sheet) SQLToGoogleSheetsOperator을 사용한다.

 


API & Airflow 모니터링

  • Airflow의 건강 여부 체크 (health check)을 어떻게 할지 학습
  • Airflow API로 외부에서 Airflow를 조작해보는 방법에 대해 학습
  • airflow.cfg 의 api섹션에서 auth_backend 값을 변경한다.
    • auth_backend = airflow.api.auth.backend.basic_auth
  • docker-compose.yaml에는 이미 설정이 되어있다.
    • AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session’

→ 다음 명령어로 확인하기

$ docker exec -it learn-airflow-airflow-scheduler-1 airflow config get-value api auth_backend
airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session

health API 호출 / 응답

curl -X GET --user "airflow:airflow" <http://localhost:8080/health>

 

특정 DAG를 API로 Trigger 하기

curl -X POST --user "airflow:airflow" -H 'Content-Type: application/json' -d
'{"execution_date":"2023-05-24T00:00:00Z"}'
"<http://localhost:8080/api/v1/dags/HelloWorld/dagRuns>"

모든 DAG를 리스트하기

curl -X GET --user "airflow:airflow" <http://localhost:8080/api/v1/dags>

모든 Variable 리스트하기

curl -X GET --user "airflow:airflow" <http://localhost:8080/api/v1/variables>

 

모든 Config 리스트하기

  • 보안이슈로 인해 이거는 기본설정으로 막혀있음 → .config 파일에서 조작하여 뚫는방법이 존재함
curl -X GET --user "airflow:airflow" <http://localhost:8080/api/v1/config>

 

Variables/Connections Import/Export

airflow variables export variables.json
airflow variables import variables.json
airflow connections export connections.json
airflow connections import connections.json


DAG dependencies

DAG를 실행하는 법

  1. 다른 DAG에 의해 트리거하는 두가지 방법 : Explicit (권장됨)/ Reactive Trigger
  2. 알아두면 좋은 상황에 따라 다른 태스크 실행 방식
    • 조건에 따라 다른 태스크로 분기
    • 과거 데이터 Backfill시에는 불필요한 태스크 처리
    • 앞단 태스크들의 실행상황
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
trigger_B = TriggerDagRunOperator(
 task_id="trigger_B",
 trigger_dag_id="트리거하려는DAG이름",
 conf={ 'path': '/opt/ml/conf' },
 execution_date="{{ ds }}", # Jinja 템플릿을 통해 DAG A의 execution_date을 패스
 reset_dag_run=True, # True일 경우 해당 날짜가 이미 실행되었더라는 다시 재실행
 wait_for_completion=True # DAG B가 끝날 때까지 기다릴지 여부를 결정. 디폴트값은 False
)

# DAG B에 넘기고 싶은 정보. DAG B에서는 Jinja 템플릿(dag_run.conf["path"])으로 접근 가능.
# DAG B PythonOperator(**context)에서라면 kwargs['dag_run'].conf.get('path')

Sensor → ExternalTaskSensor을 통해 폴링을 해서 진입 여부를 확인

  • 비효율적인 방법이다.
  • airflow 용량이 부족해짐

Branch Python Operator

  • 상황에 따라 뒤에 실행되어야 할 태스크를 동적으로 결정해주는 오퍼레이터

 

(스파크 강의도 맛보기로 조금 들었지만 나중을 위해 패스..)


7주차 후기 (마지막)

7주동안 토요일 오전에 졸린걸 참아가며 완강했다. 뭐랄까 개인적으로 근래 들었던 강의중 가장 만족스러웠던 것 같다.

눈물나는 가격을 감수하고 결제한 강의였지만 나의 데엔 커리어에 있어서 좋은 자양분이 되어준 것 같다.

 

수업 첫날 한기용멘토님이 하신 말씀중에 생각나는건 운동, 글쓰기 같은 복리가 있는 활동을 꾸준하게 해주어야 한다고 말씀하셨던 것이 기억에 남는다. 사실 이 블로그를 시작하게 된 계기도 말씀을 듣고 나의 복리활동을 시작하기 위함이다. 복리활동의 목표가 완강하고 강의 후기를 기록하는 것 이였으니 소기의 성과는 달성한 것 같아 기분이 좋다.

 

혹시라도, 만약 이 강의가 어떤지 후기를 알고싶어 검색해서 여기를 찾아왔다면 구매를 망설이지 말고 지르라고 적극 권유해주고싶다.

스파크 강의도 듣고 후기를 정리해야겠다.