본문 바로가기

Airflow15

[Apache Airflow 기반의 데이터 파이프라인] 태스크간 의존성 정의하기 기본 의존성 유형 브랜치하기 조건부 태스크 XCom(cross-communication)을 사용하여 데이터를 공유하기 XCom 사용시 고려사항 기본 의존성 유형 선형체인(linear chain) : 연속적으로 실행되는 작업 팬아웃/팬인 (fan-out/fan-in) : 하나의 태스크가 여러 다운스트림 태스크에 연결되거나 그 반대의 동작을 수행하는 유형 선형 의존성 유형 download_launches >> get_pictures # 작업 의존성을 각각 설정하기 get_pictures >> notify download_launches >> get_pictures >> notify # 또는 여러 개의 의존성을 설정할수 있다. 태스크 의존성을 통해 Airflow 업스트림 의존성이 성공적으로 실행된 뒤 다음 태.. 2024. 2. 19.
[Apache Airflow 기반의 데이터 파이프라인] Airflow 콘텍스트를 사용하여 태스크 템플릿 작성하기 개요 PythonOperator 템플릿 키워드 인자를 받아들이는 또다른 방법 : 명시적으로 변수를 알려주기 PythonOperator에 변수제공 :두가지 이상의 데이터 소스에서 데이터를 다운로드 다른 시스템과 연결하기 개요 어떤 종류의 데이터로 작업을 하든지, 파이프라인을 구축하기 전에 접근 방식에 대한 기술적 계획을 세우는 것이 중요하다. 솔루션은 항상 다른 사용자가 데이터로 무엇을 하려는지에 따라 달라지므로, 질문에 대한 답을 알고 나면 기술적 세부 사항에 대한 문제를 해결할 수 있다. jinja 탬플릿 {{이중 중괄호}}을 통해 런타임 시 삽입 될 변수를 나타내서 사용할 수 있다. print("Hello {{ name }}!") 런타임 시 값을 입력하기 때문에 프로그래밍 할 때에는 값을 알 수 없다.. 2024. 2. 17.
[데이터엔지니어] 실리콘 밸리에서 날아온 엔지니어링 스타터 키트 Week7 7주차 주요내용 Airflow Configuration for Production Usage Slack 연동하기 구글 시트 연동하기: 시트 => Redshift 테이블 API & Airflow 모니터링 Dag Dependencies Airflow Configuration for Production Usage 제일 중요한 파일은 airflow.cfg (/var/lib/airflow or /opt/airflow/) Any changes here will be reflected when you restart the webserver and scheduler core 섹션의 dags_folder가 DAG들이 있는 디렉토리가 되어야한다. /var/lib/airflow/dags dag_dir_list_interva.. 2023. 10. 12.
[데이터엔지니어] 실리콘 밸리에서 날아온 엔지니어링 스타터 키트 Week6 6주차 주요내용 OLTP 테이블 복사하기 Backfill 실행하기 Summary 테이블 만들기 (ELT) MySQL(OLTP) 테이블 복사하기 서비스 운영을 위한 데이터를 MySQL에 적재해놓는다. OLTP(Online Transaction Process) : 서비스를 운영하는데 필요한 최소한의 정보등록 → OLAP(Online Analytical Process) : 데이터 분석과 데이터 프로세싱을 위한 DB구축 MySQL .nps 데이터를 Redshift nps 데이터에 적재하는 두가지 방법 MySQL에서 레코드 하나씩 읽어온 다음에 INSERT 로 루프돌면서 적재시키기 upsert가 지원이 되지 않고 INSERT/UPDATE 과정을 두번 거쳐야하기 때문에 오래걸림 COPY를 통해 UPSERT를 구현 .. 2023. 10. 11.
[데이터엔지니어] 실리콘 밸리에서 날아온 엔지니어링 스타터 키트 Week5 5주차 주요내용 airflow.cfg 파해치기 Open Weathermap DAG 구현하기 Primary Key Uniqueness 보장하기 Backfill과 Airflow airflow.cfg 파해지기 1. DAGs 폴더는 어디에 지정되는가? core 섹션의 dags_folder 키 - 도커 sh 로 들어가는 과정 /opt/airflow/dags : dags파일이 있을거라고 생각함 2. DAGs 폴더에 스캔 주기를 정해주는 키의 이름이 무엇인가? core 섹션의 dags_dir_list_interval 키 (default = 300s), 최대 5분을 기다리는것이다. 3. API 형태로 외부에서 조작하고 싶다면? 어떤 섹션을 변경해야 하는가? api 섹션의 auth_backend를 airflow.api... 2023. 10. 10.
[데이터엔지니어] 실리콘 밸리에서 날아온 엔지니어링 스타터 키트 Week4 4주차 주요내용 멱등성과 트랜잭션 Airflow 이해하기 NameGender.py DAG 개선하기 Yahoo Finance API DAG 작성 멱등성과 트랜잭션 멱등성이란? 데이터 파이프라인이 연속 식행되었을 때 소스에 있는 데이터가 그대로 저장되어야 함을 이야기함 가령 Full refresh를 하는 데이터 파이프라인이라면… 먼저 DW의 관련 테이블에서 모든 레코드들을 삭제한다. 데이터 소스에서 읽어온 데이터를 DW테이블로 적재한다. 만일 1이 성공하고 2가 실패한다면…? 만일 1이 실행된 다음 누군가 이 테이블을 사용한다면? → 위와 같은 상황을 방지하기 위해, 우리는 트랜잭션에 대한 개념을 인지해야한다. 트랜잭션이란? Atomic하게 실행되어야 하는 SQL들을 묶어서 하나의 작업처럼 처리하는 방법 T.. 2023. 10. 9.
반응형