본문 바로가기
Airflow

[데이터엔지니어] 실리콘 밸리에서 날아온 엔지니어링 스타터 키트 Week3

by 개복취 2023. 9. 5.

3주차 주요내용

1. 데이터 파이프라인?

2. Airflow 소개 및 구성

3. 데이터 파이프라인을 만들 때 고려할 점

4. Backfill이란?

 


 

airbnb에서 만들어서 그런지 air돌림자는 없어지지 않았다.

 

데이터 파이프라인이란?

  • 데이터 엔지니어링에서의 Data Pipeline은 전체적인 ETL 프로세스를 의미한다.
  • ETL (Extract, Trasform, Load): 데이터 시스템 밖에 있는 데이터를 가져오는 방법에 대한 이야기이다.
    (Data Pipeline = ETL = Data Workflow = DAG(Directed Acyclic graph)) → 이걸 하는게 데이터 엔지니어의 역량이다.
  • ELT : 데이터 시스템 내부에 이미 들어와 있는 데이터를 조합해서 깨끗한 데이터를 만드는 과정 (T라고도 함) → CTAS를 서비스로 만드는게 dbt이다.

데이터 레이크 vs. 데이터 웨어하우스

  • 데이터 레이크 (AWS Athena, AWS S3)
    • 구조화 데이터 + 비구조화 데이터
    • 보존 기한이 없는 모든 데이터를 원래 형태대로 보존하는 스토리지
    • DW 보다 몇배는 더 큰 스토리지
  • 데이터 웨어하우스 (Redshift, Snowflake, BigQuery, Hive)
    • 보존 기한이 있는 구조화된 데이터를 저장하는 스토리지
    • BI툴은 DataWarehouse를 백엔드로 사용함

DataPipeline의 정의

  • 데이터를 소스로부터 목적지로 복사하는 작업
    • 보통 코딩(파이썬 / 스칼라)보다는 SQL로 이뤄진다.
    • 이 경우, 목적지는 데이터 웨어하우스가 된다.

Raw Data ETL Jobs - 데이터 파이프라인의 종류 (1)

  1. 데이터 소스에서 데이터를 읽어다가 (데이터 크기가 많은경우, API Call)
  2. 데이터 포맷 변환 후 (데이터의 크기가 많은경우, Spark)
  3. 데이터 웨어하우스 로드

Summary/Report Jobs (ELT) - 데이터 파이프라인의 종류 (2)

  1. DW로부터 데이터를 읽어 다시 DW에 쓰는 ETL
  2. Data를 읽어서 리포트나 써머리 형태의 테이블을 만드는 용도
  3. 특수한 형태로는 AB 테스트 결과를 분석하는 데이터 파이프라인도 존재 (다양한 조건문들이 붙지만 결과론적으론 CTAS 쓴다고함)
    → Analytic Engineer 대부분이 이 작업을 한다.

Production Data Jobs - 데이터 파이프라인의 종류 (3)

유데미 효자상품

  • 강의 평점이나, 수강생을 실시간으로 집계해서 SQL문으로 업데이트 하는것은 매우 버거운 작업중 하나.
  • 이런 데이터의 계산은 로드가 많이 걸리기 때문에, DE 쪽에서 집계한 정보만을 가져오는 방식으로 진행한다.
  • 데이터 사이언티스트가 머신러닝 모델의 feature으로 들어가는 데이터를 미리 계산하는 경우가 있음 (실시간은 매우 오래걸리기 때문)
    • 이러한 데이터 파이프라인을 데이터 사이언티스트와 함께 구축해 나간다.
    • 이정도되면 사실상 데이터 엔지니어보다 머신러닝 엔지니어에 가까워짐 (production level 에 DE들이 무엇인가 하게 된다는건 그 조직이 성장해간다는 이야기이다.)
  1. DW로부터 데이터를 읽어 다른 Storage로 쓰는 ETL
    1. Summary 정보가 프로덕션 환경에서 성능 이유로 필요한 경우
    2. 머신러닝 모델에서 필요한 피쳐들을 미리 계산해 두는 경우
  2. 이 경우 흔한 타겟 스토리지
    1. NoSQL (Cassandra/ HBase/ DynamoDB)
    2. OLTP (MySQL)
    3. Redis / Memcache 와 같은 캐시
    4. ElasticSearch와 같은 검색엔진

Airflow 소개

에어플로우의 진가를 100% 발휘하려면 도커 위에다 설치해야한다.

  • ETL을 쉽게 만들 수 있도록 해준다.
    • 데이터 소스와 데이터 웨어하우스를 쉽게 통합해주는 모듈 제공해준다.
    • 다양한 데이터 파이프라인 관리 관련 다양한 기능을 제공해준다 : BackFill(백필)
  • Airflow 에서는 데이터 파이프라인을 DAG라고 부른다.
    • 하나의 DAG에서는 하나 이상의 태스크로 구성된다.
  • Airflow 선택은 큰 회사의 버전을 따라가는게 제일 좋다. (아래 링크를 통해 어느 airflow 버전을 사용하는지 확인할 수 있다. / 높은 버전은 도박에 가깝다.) https://cloud.google.com/composer/docs/concepts/versioning/composer-versions

 

Airflow 구성

  1. Webserver : 웹서버 (웹 ui 인터페이스를 보여준다.)
  2. Scheduler : 스케쥴러 (정해진 스케쥴에 따른 작업을 진행한다.)
  3. Worker : 워커 (스케쥴러가 DAG를 보고있다가 DAG를 구성하는 TASK를 나눠주는것을 의미함)
  4. Database: 데이터베이스 (Sqllite가 기본 > PostgreSQL 사용)
  5. Queue: 큐 (워커 서버가 따로 존재할 때)
    • 서버 하나에 DAG 수가 많아지면 Worker 노드를 별도로 세팅하고 스케쥴러가 나눠줄수 있도록 한다.

Airflow 스케일링 방법

  1. 스케일 업 (더 좋은 사양의 서버 사용) : 어느 시점이 되면 버틸수 없는 상황이 된다.
  2. 스케일 아웃 (서버 추가) : 서비스마다 전용하드웨어를 추가해서 사용하는건 비효율적이다. 중앙에 서버팜을 생성해서 나눠주고 돌려받는 과정을 거칠 수 있도록 한다. (k8s)
  3. 도커와 k8s 사용하기 → 6주차에 더 설명할 예정

  • queue 서비스를 위해 Redis 많이사용한다.
  • backend, Dataengineer 많이 성장하게된다면 k8s 공부하게 될 것이다.

Airflow 개발의 장단점

  • 장점
    • 데이터 파이프라인을 세밀하게 제어 가능
    • 다양한 데이터 소스와 웨어하우스를 지원
    • Backfill(백필)이 쉽다.
  • 단점
    • 배우기가 쉽지않음
    • 상대적으로 개발환경을 구성하기가 쉽지 않음
    • 직접 운영이 쉽지않음 클라우드 버전 사용이 선호됨
      1. GCP - Cloud Composure
      2. AWS - Managed Workflows for Apache Airflow
      3. Azure - Azure Data Factory Managed Airflow

DAG란?

  • Directed Acyclic Graph 줄임말 (Airflow 에서의 ETL 부르는 명칭)
    • 왜 Acyclic인가? : ETL 과정에서 Load 이후 Extract하지 않는 형태이기 때문에 비순환이라고 한다.
    • Airflow에서는 아래 두개의 그림처럼 루프가 생성되지 않는 형태를 가지고 있다.

non-cyclic 한 노드를 가지고 있다.

  • 비순환하는 다수의 task를 서로 연결하여 실행순서를 스케쥴링을 할 수 있다.
  • 3개의 태스크로 구성됨 - Extract, Transform, Load 로 구성된다.
    • 태스크란? - Airflow의 오퍼레이터로 만들어짐
      • 다양한 종류의 오퍼레이터를 제공한다.
      • 경우에 맞게 사용 오퍼레이터를 결정하거나 직접 개발한다.

 

  • DAG 사용에 필요한 정보는 아래와 같다.
from airflow import DAG

test_dag = DAG(
	"HelloWorld" #DAG name
	schedule = "0 9 * * *", #DAG 스케쥴러 포멧 (cron 문법을 따른다) - 매일 UTC:9시 0분
	tags = ['test']
	default_args = default_args # 딕셔너리로 되어있는거 가지고옴
)

Linux crontab 문법을 따른다.

Airflow에서 task는 Operator 단위이다.

  • airflow 에서 여러가지 Operator을 제공하는데 그중 하나는 ‘BashOperator’
  • unique_id, dag=부모 (DAG 부모의 주체를 가져오면 됨)
  • 무엇을 하는지는 bash_command 에서 지정해주면 되는것이다.
  • ‘>>’ 의 operator를 통해 실행 순서를 정해줄 수 있다.

  • DummyOperator을 사용하면 위와같은 형태가된다.
  • 왜 DummyOperator는 아무것도 안하는건데 왜 사용해야 하는가?
    • 다수의 task가 동시에 실행되는 환경에서 그룹핑하기 위한 목적이다.
    • 명확하게 시작하는 시점과, 끝나는 시점을 알기위해 사용된다.
  • docker을 사용하면 여러개의 컨테이너를 올려서 사용하게 될 것이다.

데이터 파이프라인을 만들 때 고려해야할 점

이상과 현실간의 괴리

내용과 무관한 사진입니다.

  • 이상 혹은 환상
    • 내 데이터 파이프라인은 문제 없이 동작할 것이다.
    • 내가 만든 데이터 파이프라인을 관리하는 것은 어렵지 않을 것이다.
  • 현실 혹은 실상
    • 데이터 파이프라인은 많은 이유로 실패함
      • 버그…
      • 데이터 소스상의 이슈
      • 데이터 파이프라인간의 의존도에 대한 이해도 부족
    • 데이터 파이프라인 수가 늘어나면 유지보수 비용이 기하급수적으로 늘어난다.

Best Practice(1)

  • 데이터가 작은 경우 매번 통째로 복사해서 테이블을 만들기(Full Refresh)
  • Incremental update 만이 가능하다면, 데이터소스가 갖춰야할 몇가지 조건이 있음
    • 가능하면, full refresh로 버틸수 있을때까지 버티는 것이 좋다. 효율적으로 짠다고 고도화하는건 지양한다. 그러나 어느순간 Incremental update가 필요한 시점이 있다.
    • 만약 데이터 소스가 프로덕션 데이터베이스 테이블이라면 다음 필드가 필요
      1. created
      2. modified
      3. deleted
  • 데이터 소스가 API 라면 특정 날짜를 기준으로 새로 생성되거나 업데이트된 레코드들을 읽어올 수 있어야함
  • 데이터 파이프라인에 문제가 있어서 이슈가 생겨서 ‘Backfill’을 하는 과정이 있음
    • Backfill을 얼마나 쉽게하느냐가 데이터엔지니어의 삶에 있어 영향을 미친다.

Best Practice(2)

웹공부하면 지겹게 봐야하는 'Idempotent'

  • 멱등성을 보장하는 것이 중요 (PUT vs. POST)
    • 동일한 입력 데이터로 데이터 파이프라인을 다수 실행해도 최종 테이블의 내용이 달라지지 않아야 한다.
    • 예를 들면, 중복 데이터가 생기지 말아야한다.

Best Practice(3)

  • 실패한 데이터 파이프라인을 재실행이 쉬워야한다.
  • 과거 데이터를 다시 채우는 과정이 쉬워야 한다.
  • Airflow는 이부분에 강점을 갖고 있음

Best Practice(4)

Documentation

  • 데이터 파이프라인의 입, 출력을 명확히 하고 문서화
    • 데이터 디스커버리 문제
  • 주기적으로 쓸모없는 데이터들을 삭제한다.
  • → 귀찮은 작업이기 때문에, 업무에서 해방하여 이거 하나만 하는것을 집중해서 하는것이 좋다.

Best Practice(5)

  • 데이터 파이프라인 사고시마다 사고 리포트 쓰기
    • 동일한 혹은 아주 비슷한 사고가 또 발생하는 것을 막기 위함
  • 중요 데이터 파이프라인의 입, 출력을 체크하기
    • 아주 간단하게 입, 출력 레코드 수가 몇개인지 체크하기
    • 써머리 테이블, Pk uniqueness를 보장
    • 중복 레코드 체크

BackFill이란?

Full Refresh vs. Incremental Update

  • Full Refresh: 매번 소스의 내용을 다 읽어오는 방식 : 유지보수 쉬움, 데이터 커지면 사용불가
  • Incremental Update : 효율성이 좋지만 유지보수 힘들어짐, daily/hourly로 동작한다.

Backfill의 용이성 여부 → 데이터 엔지니어 삶에 직접적인 영향을 준다.

Daily DAG 작성하는 방법?

  • 지금 시간을 기준으로 어제 날짜를 계산하고 그 날짜에 해당하는 데이터를 읽어온다.
from datetime import datetime, timedelta
# 지금 시간 기준으로 어제 날짜를 계산

y = datetime.now() - timedelta(1) #하루빼면, 어제가 된다.
yesterday = datetime.strftime(y, '%Y-%m-%d')

# yesterday에 해당하는 데이터를 소스에서 읽어옴
# 예를 들어 프로덕션 DB의 특정 테이블에서 읽어온다면
sql = f"SELECT * FROM table WHERE DATE(ts) = '{yesterday}'"

1년치 데이터를 Backfill 해야한다면??

from datetime import datetime, timedelta
y = datetime.now() - timedelta(1) # line should erase1
yesterday = datetime.strftime(y, '%Y-%m-%d') # line should erase2
yesterday = '2023-01-01'
# yesterday에 해당하는 데이터를 소스에서 읽어옴
# 예를 들어 프로덕션 DB의 특정 테이블에서 읽어온다면
sql = f"SELECT * FROM table WHERE DATE(ts) = '{yesterday}'"
  • ‘문제가 없을거야’ 라는 생각을 가지고 작성해서 엉망진창으로 운영이 된다.
  • 내가 직접 날짜를 계산하지말고, 시스템이 제공해주는 날짜를 사용하자.

어떻게 ETL을 구현하면 편해질까?

  • 시스템적으로 이걸 쉽게 해주는 방법을 구현한다.
    • 날짜별로 backfill 결과를 기록하고 성공 여부 기록한다.
    • 날짜를 시스템에서 ETL인자로 제공
    • 데이터엔지니어는 읽어와야하는 데이터의 날짜를 계산하지 않고 시스템이 지정해준 날짜를 사용

 


 

3주차 후기

새로운 개념뿐만 아니라 데이터 엔지니어가 가져야할 마음가짐과 자세를 알아볼 수 있었던 좋은 시간이었다.

사실상 주니어때는 직접 시행착오를 겪어보지 않고서는 알 수없는 막연한 내용인데 강의를 듣고 "아 이럴수도 있겠구나" 모먼트를 느꼈다.

내용이 점점 많아지는추세이지만, airflow에 대해 깊이 알게되어서 점점 재미있어지는것 같다.