3주차 주요내용
1. 데이터 파이프라인?
2. Airflow 소개 및 구성
3. 데이터 파이프라인을 만들 때 고려할 점
4. Backfill이란?
데이터 파이프라인이란?
- 데이터 엔지니어링에서의 Data Pipeline은 전체적인 ETL 프로세스를 의미한다.
- ETL (Extract, Trasform, Load): 데이터 시스템 밖에 있는 데이터를 가져오는 방법에 대한 이야기이다.
(Data Pipeline = ETL = Data Workflow = DAG(Directed Acyclic graph)) → 이걸 하는게 데이터 엔지니어의 역량이다. - ELT : 데이터 시스템 내부에 이미 들어와 있는 데이터를 조합해서 깨끗한 데이터를 만드는 과정 (T라고도 함) → CTAS를 서비스로 만드는게 dbt이다.
데이터 레이크 vs. 데이터 웨어하우스
- 데이터 레이크 (AWS Athena, AWS S3)
- 구조화 데이터 + 비구조화 데이터
- 보존 기한이 없는 모든 데이터를 원래 형태대로 보존하는 스토리지
- DW 보다 몇배는 더 큰 스토리지
- 데이터 웨어하우스 (Redshift, Snowflake, BigQuery, Hive)
- 보존 기한이 있는 구조화된 데이터를 저장하는 스토리지
- BI툴은 DataWarehouse를 백엔드로 사용함
DataPipeline의 정의
- 데이터를 소스로부터 목적지로 복사하는 작업
- 보통 코딩(파이썬 / 스칼라)보다는 SQL로 이뤄진다.
- 이 경우, 목적지는 데이터 웨어하우스가 된다.
Raw Data ETL Jobs - 데이터 파이프라인의 종류 (1)
- 데이터 소스에서 데이터를 읽어다가 (데이터 크기가 많은경우, API Call)
- 데이터 포맷 변환 후 (데이터의 크기가 많은경우, Spark)
- 데이터 웨어하우스 로드
Summary/Report Jobs (ELT) - 데이터 파이프라인의 종류 (2)
- DW로부터 데이터를 읽어 다시 DW에 쓰는 ETL
- Data를 읽어서 리포트나 써머리 형태의 테이블을 만드는 용도
- 특수한 형태로는 AB 테스트 결과를 분석하는 데이터 파이프라인도 존재 (다양한 조건문들이 붙지만 결과론적으론 CTAS 쓴다고함)
→ Analytic Engineer 대부분이 이 작업을 한다.
Production Data Jobs - 데이터 파이프라인의 종류 (3)
- 강의 평점이나, 수강생을 실시간으로 집계해서 SQL문으로 업데이트 하는것은 매우 버거운 작업중 하나.
- 이런 데이터의 계산은 로드가 많이 걸리기 때문에, DE 쪽에서 집계한 정보만을 가져오는 방식으로 진행한다.
- 데이터 사이언티스트가 머신러닝 모델의 feature으로 들어가는 데이터를 미리 계산하는 경우가 있음 (실시간은 매우 오래걸리기 때문)
- 이러한 데이터 파이프라인을 데이터 사이언티스트와 함께 구축해 나간다.
- 이정도되면 사실상 데이터 엔지니어보다 머신러닝 엔지니어에 가까워짐 (production level 에 DE들이 무엇인가 하게 된다는건 그 조직이 성장해간다는 이야기이다.)
- DW로부터 데이터를 읽어 다른 Storage로 쓰는 ETL
- Summary 정보가 프로덕션 환경에서 성능 이유로 필요한 경우
- 머신러닝 모델에서 필요한 피쳐들을 미리 계산해 두는 경우
- 이 경우 흔한 타겟 스토리지
- NoSQL (Cassandra/ HBase/ DynamoDB)
- OLTP (MySQL)
- Redis / Memcache 와 같은 캐시
- ElasticSearch와 같은 검색엔진
Airflow 소개
- ETL을 쉽게 만들 수 있도록 해준다.
- 데이터 소스와 데이터 웨어하우스를 쉽게 통합해주는 모듈 제공해준다.
- 다양한 데이터 파이프라인 관리 관련 다양한 기능을 제공해준다 : BackFill(백필)
- Airflow 에서는 데이터 파이프라인을 DAG라고 부른다.
- 하나의 DAG에서는 하나 이상의 태스크로 구성된다.
- Airflow 선택은 큰 회사의 버전을 따라가는게 제일 좋다. (아래 링크를 통해 어느 airflow 버전을 사용하는지 확인할 수 있다. / 높은 버전은 도박에 가깝다.) https://cloud.google.com/composer/docs/concepts/versioning/composer-versions
Airflow 구성
- Webserver : 웹서버 (웹 ui 인터페이스를 보여준다.)
- Scheduler : 스케쥴러 (정해진 스케쥴에 따른 작업을 진행한다.)
- Worker : 워커 (스케쥴러가 DAG를 보고있다가 DAG를 구성하는 TASK를 나눠주는것을 의미함)
- Database: 데이터베이스 (Sqllite가 기본 > PostgreSQL 사용)
- Queue: 큐 (워커 서버가 따로 존재할 때)
- 서버 하나에 DAG 수가 많아지면 Worker 노드를 별도로 세팅하고 스케쥴러가 나눠줄수 있도록 한다.
Airflow 스케일링 방법
- 스케일 업 (더 좋은 사양의 서버 사용) : 어느 시점이 되면 버틸수 없는 상황이 된다.
- 스케일 아웃 (서버 추가) : 서비스마다 전용하드웨어를 추가해서 사용하는건 비효율적이다. 중앙에 서버팜을 생성해서 나눠주고 돌려받는 과정을 거칠 수 있도록 한다. (k8s)
- 도커와 k8s 사용하기 → 6주차에 더 설명할 예정
- queue 서비스를 위해 Redis 많이사용한다.
- backend, Dataengineer 많이 성장하게된다면 k8s 공부하게 될 것이다.
Airflow 개발의 장단점
- 장점
- 데이터 파이프라인을 세밀하게 제어 가능
- 다양한 데이터 소스와 웨어하우스를 지원
- Backfill(백필)이 쉽다.
- 단점
- 배우기가 쉽지않음
- 상대적으로 개발환경을 구성하기가 쉽지 않음
- 직접 운영이 쉽지않음 클라우드 버전 사용이 선호됨
- GCP - Cloud Composure
- AWS - Managed Workflows for Apache Airflow
- Azure - Azure Data Factory Managed Airflow
DAG란?
- Directed Acyclic Graph 줄임말 (Airflow 에서의 ETL 부르는 명칭)
- 왜 Acyclic인가? : ETL 과정에서 Load 이후 Extract하지 않는 형태이기 때문에 비순환이라고 한다.
- Airflow에서는 아래 두개의 그림처럼 루프가 생성되지 않는 형태를 가지고 있다.
- 비순환하는 다수의 task를 서로 연결하여 실행순서를 스케쥴링을 할 수 있다.
- 3개의 태스크로 구성됨 - Extract, Transform, Load 로 구성된다.
- 태스크란? - Airflow의 오퍼레이터로 만들어짐
- 다양한 종류의 오퍼레이터를 제공한다.
- 경우에 맞게 사용 오퍼레이터를 결정하거나 직접 개발한다.
- 태스크란? - Airflow의 오퍼레이터로 만들어짐
- DAG 사용에 필요한 정보는 아래와 같다.
from airflow import DAG
test_dag = DAG(
"HelloWorld" #DAG name
schedule = "0 9 * * *", #DAG 스케쥴러 포멧 (cron 문법을 따른다) - 매일 UTC:9시 0분
tags = ['test']
default_args = default_args # 딕셔너리로 되어있는거 가지고옴
)
Airflow에서 task는 Operator 단위이다.
- airflow 에서 여러가지 Operator을 제공하는데 그중 하나는 ‘BashOperator’
- unique_id, dag=부모 (DAG 부모의 주체를 가져오면 됨)
- 무엇을 하는지는 bash_command 에서 지정해주면 되는것이다.
- ‘>>’ 의 operator를 통해 실행 순서를 정해줄 수 있다.
- DummyOperator을 사용하면 위와같은 형태가된다.
- 왜 DummyOperator는 아무것도 안하는건데 왜 사용해야 하는가?
- 다수의 task가 동시에 실행되는 환경에서 그룹핑하기 위한 목적이다.
- 명확하게 시작하는 시점과, 끝나는 시점을 알기위해 사용된다.
- docker을 사용하면 여러개의 컨테이너를 올려서 사용하게 될 것이다.
데이터 파이프라인을 만들 때 고려해야할 점
이상과 현실간의 괴리
- 이상 혹은 환상
- 내 데이터 파이프라인은 문제 없이 동작할 것이다.
- 내가 만든 데이터 파이프라인을 관리하는 것은 어렵지 않을 것이다.
- 현실 혹은 실상
- 데이터 파이프라인은 많은 이유로 실패함
- 버그…
- 데이터 소스상의 이슈
- 데이터 파이프라인간의 의존도에 대한 이해도 부족
- 데이터 파이프라인 수가 늘어나면 유지보수 비용이 기하급수적으로 늘어난다.
- 데이터 파이프라인은 많은 이유로 실패함
Best Practice(1)
- 데이터가 작은 경우 매번 통째로 복사해서 테이블을 만들기(Full Refresh)
- Incremental update 만이 가능하다면, 데이터소스가 갖춰야할 몇가지 조건이 있음
- 가능하면, full refresh로 버틸수 있을때까지 버티는 것이 좋다. 효율적으로 짠다고 고도화하는건 지양한다. 그러나 어느순간 Incremental update가 필요한 시점이 있다.
- 만약 데이터 소스가 프로덕션 데이터베이스 테이블이라면 다음 필드가 필요
- created
- modified
- deleted
- 데이터 소스가 API 라면 특정 날짜를 기준으로 새로 생성되거나 업데이트된 레코드들을 읽어올 수 있어야함
- 데이터 파이프라인에 문제가 있어서 이슈가 생겨서 ‘Backfill’을 하는 과정이 있음
- Backfill을 얼마나 쉽게하느냐가 데이터엔지니어의 삶에 있어 영향을 미친다.
Best Practice(2)
- 멱등성을 보장하는 것이 중요 (PUT vs. POST)
- 동일한 입력 데이터로 데이터 파이프라인을 다수 실행해도 최종 테이블의 내용이 달라지지 않아야 한다.
- 예를 들면, 중복 데이터가 생기지 말아야한다.
Best Practice(3)
- 실패한 데이터 파이프라인을 재실행이 쉬워야한다.
- 과거 데이터를 다시 채우는 과정이 쉬워야 한다.
- Airflow는 이부분에 강점을 갖고 있음
Best Practice(4)
- 데이터 파이프라인의 입, 출력을 명확히 하고 문서화
- 데이터 디스커버리 문제
- 주기적으로 쓸모없는 데이터들을 삭제한다.
- → 귀찮은 작업이기 때문에, 업무에서 해방하여 이거 하나만 하는것을 집중해서 하는것이 좋다.
Best Practice(5)
- 데이터 파이프라인 사고시마다 사고 리포트 쓰기
- 동일한 혹은 아주 비슷한 사고가 또 발생하는 것을 막기 위함
- 중요 데이터 파이프라인의 입, 출력을 체크하기
- 아주 간단하게 입, 출력 레코드 수가 몇개인지 체크하기
- 써머리 테이블, Pk uniqueness를 보장
- 중복 레코드 체크
BackFill이란?
Full Refresh vs. Incremental Update
- Full Refresh: 매번 소스의 내용을 다 읽어오는 방식 : 유지보수 쉬움, 데이터 커지면 사용불가
- Incremental Update : 효율성이 좋지만 유지보수 힘들어짐, daily/hourly로 동작한다.
Backfill의 용이성 여부 → 데이터 엔지니어 삶에 직접적인 영향을 준다.
Daily DAG 작성하는 방법?
- 지금 시간을 기준으로 어제 날짜를 계산하고 그 날짜에 해당하는 데이터를 읽어온다.
from datetime import datetime, timedelta
# 지금 시간 기준으로 어제 날짜를 계산
y = datetime.now() - timedelta(1) #하루빼면, 어제가 된다.
yesterday = datetime.strftime(y, '%Y-%m-%d')
# yesterday에 해당하는 데이터를 소스에서 읽어옴
# 예를 들어 프로덕션 DB의 특정 테이블에서 읽어온다면
sql = f"SELECT * FROM table WHERE DATE(ts) = '{yesterday}'"
1년치 데이터를 Backfill 해야한다면??
from datetime import datetime, timedelta
y = datetime.now() - timedelta(1) # line should erase1
yesterday = datetime.strftime(y, '%Y-%m-%d') # line should erase2
yesterday = '2023-01-01'
# yesterday에 해당하는 데이터를 소스에서 읽어옴
# 예를 들어 프로덕션 DB의 특정 테이블에서 읽어온다면
sql = f"SELECT * FROM table WHERE DATE(ts) = '{yesterday}'"
- ‘문제가 없을거야’ 라는 생각을 가지고 작성해서 엉망진창으로 운영이 된다.
- 내가 직접 날짜를 계산하지말고, 시스템이 제공해주는 날짜를 사용하자.
어떻게 ETL을 구현하면 편해질까?
- 시스템적으로 이걸 쉽게 해주는 방법을 구현한다.
- 날짜별로 backfill 결과를 기록하고 성공 여부 기록한다.
- 날짜를 시스템에서 ETL인자로 제공
- 데이터엔지니어는 읽어와야하는 데이터의 날짜를 계산하지 않고 시스템이 지정해준 날짜를 사용
3주차 후기
새로운 개념뿐만 아니라 데이터 엔지니어가 가져야할 마음가짐과 자세를 알아볼 수 있었던 좋은 시간이었다.
사실상 주니어때는 직접 시행착오를 겪어보지 않고서는 알 수없는 막연한 내용인데 강의를 듣고 "아 이럴수도 있겠구나" 모먼트를 느꼈다.
내용이 점점 많아지는추세이지만, airflow에 대해 깊이 알게되어서 점점 재미있어지는것 같다.
'Airflow' 카테고리의 다른 글
[데이터엔지니어] 실리콘 밸리에서 날아온 엔지니어링 스타터 키트 Week6 (0) | 2023.10.11 |
---|---|
[데이터엔지니어] 실리콘 밸리에서 날아온 엔지니어링 스타터 키트 Week5 (2) | 2023.10.10 |
[데이터엔지니어] 실리콘 밸리에서 날아온 엔지니어링 스타터 키트 Week4 (1) | 2023.10.09 |
[데이터엔지니어] 실리콘 밸리에서 날아온 엔지니어링 스타터 키트 Week2 (0) | 2023.08.29 |
[데이터엔지니어] 실리콘 밸리에서 날아온 엔지니어링 스타터 키트 Week1 (0) | 2023.08.21 |