- Spark 파일 포맷
- Execution Plan
- Bucketing과 Partitioning
- 요약
Spark 파일 포맷
- 데이터는 디스크에 파일로 저장됨: 일에 맞게 최적화 필요
(Unstructured, semi-structured는 사람이 읽을 수 있음, Structured는 사람이 읽을 수 없음)
Spark의 주요 파일 타입
특징
|
CSV
|
JSON
|
PARQUET_2
|
AVRO
|
컬럼 스토리지
|
X
|
X
|
Y
|
X
|
압축 가능
|
Y
|
Y
|
Y
|
Y
|
Splittable
|
Y_1
|
Y_1
|
Y
|
Y
|
Human readable
|
Y
|
Y
|
X
|
X
|
Nested structure support
|
X
|
Y
|
Y
|
Y
|
Schema evolution
|
X
|
X
|
Y
|
Y
|
Parquet 빼고 모두 행별로 저장한다.
Splittable이라 함은 HDFS에서 저장할 때 블럭으로 나뉘어 지는데, 그 블럭들이 스파크에서 파티션별로 올라갈 수 있는지의 여부
Y_1 : 압축되면 Splittable 하지 않음 (압축 방식에 따라 다름 - snappy 압축이라면 Splittable)
PARQUET_2 : Spark의 기본 파일 포맷
Parquet: Spark의 기본 파일 포맷
- 트위터와 클라우데라에서 공동 개발 (Doug Cutting)
- Row-Wise Storage(쓰기 최적화)
- Column-Wise Storage (읽기 최적화)
- Hybrid Storage (Row Group) : 데이터 단위로 그룹이 생성됨.
같은 Row Group안에서는 Columnal Storage방식으로 저장된다.
DataFrame에서 다른 포맷 사용 방법 실습
DataFrame.write.format("avro"). ...
DataFrame.write.format("parquet"). ...
from pyspark.sql import *
from pyspark.sql.functions import *
if __name__ == "__main__":
#avro는 path 설정을 해줘야한다.
spark = SparkSession \
.builder \
.appName("Spark Writing Demo") \
.master("local[3]") \
.config("spark.jars.packages", "org.apache.spark:spark-avro_2.12:3.3.1") \
.getOrCreate()
df = spark.read \
.format("csv") \
.load("appl_stock.csv")
#파티션 없는(하나의 블럭)형태 - avro type으로 저장
print("Num Partitions before: " + str(df.rdd.getNumPartitions()))
df.groupBy(spark_partition_id()).count().show()
df.write \
.format("avro") \
.mode("overwrite") \
.option("path", "dataOutput/avro/") \
.save()
#4개로 재분할
df2 = df.repartition(4)
print("Num Partitions after: " + str(df2.rdd.getNumPartitions()))
df2.groupBy(spark_partition_id()).count().show()
df2.write \
.format("parquet") \
.mode("overwrite") \
.option("path", "dataOutput/parquet/") \
.save()
- snappy 로 압축한 것은 Splittable한 압축방식이다. 압축이 되었어도 블럭단위로 나눌 수 있다는 장점이 있다.
#4개였던 파티션을 2개로 모으기
df3 = df2.coalesce(2)
print("Num Partitions after: " + str(df3.rdd.getNumPartitions()))
df3.groupBy(spark_partition_id()).count().show()
df2.write \
.format("json") \
.mode("overwrite") \
.option("path", "dataOutput/json/") \
.save()
!ls -tl dataOutput/
Schema Evolution 소개 : Parquet 파일 3개로 테스트
(아래의 표와 같이)서로 다른 스키마를 갖는 3개의 parquet파일을 받는다.
- schema1.parquet
- schema2.parquet
- schema3.parquet
df = spark.read. \
option("mergeSchema", True). \
parquet("*.parquet")
df.printSchema()
Schema Evolution을 사용하면 3개의 데이터프레임을 하나의 데이터프레임을 갖는 형태로 가지고 올 수 있다.
Execution Plan
Spark은 개발자가 만든 코드를 어떻게 변환하여 실행하는가?
다음 데이터 프레임 연산을 자세히 보자
spark.read.option("header", True).csv(“test.csv”). \
where("gender <> 'F'"). \ # gender가 F 가 아닌
select("name", "gender"). \
groupby("gender"). \
count(). \
show()
- 파티션의 관점에서 바라본다면..
- 로딩이 되는 순간 파티션이 하나 혹은 그 이상의 파티션으로 구성됨
- Where, Select 는 셔플링 없이 파티션과 관련 없음
- 그러나, Groupby를 만나게 되는 순간 groupby의 키에 맞게 정렬이 되어야 하기 때문에 셔플링이 발생한다.
- show 를 통해 드라이브로 가지고 온다. (= write, collect)
- show는 action이라고 부른다. action은 앞에있는 DF를 실제로 수행시키는 역할을 함
(왜? : spark는 Lazy execution을 수행하기 때문)
- 첫번째 stage : WHERE, SELECT
- 두번째 stage : GROUPBY, COUNT
- show는 action이라고 부른다. action은 앞에있는 DF를 실제로 수행시키는 역할을 함
Transformations and Actions
- Transformations
- Narrow Dependencies: 독립적인 Partition level 작업 (select, filter, map 등등..)
- Wide Dependencies: Shuffling이 필요한 작업 (groupby, reduceby, partitionby, repartition 등등..)
- Actions
- Read, Write, Show, Collect -> Job을 실행시킴 (실제 코드가 실행됨)
- Lazy Execution 의 장점? 더 많은 오퍼레이션을 볼 수 있기에 최적화를 더 잘할 수 있음. 그래서 SQL이 더 선호됨
즉, 하나의 Job은 다수의 transformation으로 되고 narrow 또는 wide인지에 따라 stage로 구성됨
Jobs, Stages, Tasks
- Action -> Job -> 1+ Stages -> 1+ Tasks
- Action: Job을 하나 만들어내고 코드가 실제로 실행됨
- Job
- 하나 혹은 그 이상의 Stage로 구성됨
- Stage는 Shuffling(Wide dependency)이 발생하는 경우 새로 생김
- Stage
- DAG의 형태로 구성된 Task들 존재
- 여기 Task들은 병렬 실행이 가능
- Task
- 가장 작은 실행 유닛으로 Executor에 의해 실행됨
Transformations and Actions 시각화
spark.read.option("header", True). \
csv(“test.csv”). \
where("gender <> 'F'"). \
select("name", "gender"). \
groupby("gender"). \
count(). \
show()
# .option("inferSchema", True)가 추가되면 JOB이 하나 더 추가됨 - 각 컬럼에 타입이 무엇인지 판단하는 과정
- read 에 trigger 되어 job 이 하나 생성된다. 레코드의 헤더를 읽어서 무엇이 있는지 확인한다. (job0)
- 컬럼에서 header가 있다는 것을 있다는 것을 표시해주었기 때문에, 헤더를 만들어주는 주는 job이 생성됨
- show 에 의해 또다른 job이 trigger 되어 생성된다. collect.limit을 통해 드라이버에 보내준다. (job1)
- groupby를 통해 stage 가 나뉘어지고 각 스테이지에서 narrow transformation이 하나의 task로 실행이 된다.
WordCount 코드
from pyspark.sql import *
from pyspark.sql.functions import *
spark = SparkSession \
.builder \
.master("local[3]") \
.appName("SparkSchemaDemo") \
.config("spark.sql.adaptive.enabled", False) \
.config("spark.sql.shuffle.partitions", 3) \
.getOrCreate()
# load with schema of one column named value
df = spark.read.text("shakespeare.txt")
df.printSchema()
df_count = df.select(explode(split(df.value, " ")).alias("word")).groupBy("word").count()
df_count.show()
input("stopping ...")
- 위 'Transformations and Actions' 에서는 csv의 헤더가 존재하는 것을 표시해주었기 때문에 job이 생성되었는데, WordCount 에서는 df.value 를 split함으로써 job을 생성하지 않게 된다.
이 코드는 몇 개의 Job을 만들어 낼까? : 1개의 잡(두개의 스테이지가 groupby의 앞뒤로 생성된다.)
show()가 없다면 이 코드는 몇 개의 Job을 만들어 낼까? : job이 생성되지 않음(의미없는 일이 생기는것과 진배없음), lazy execution의 이점중 하나가 쓸모없는 코드를 실행하지 않는다는 것이 장점이다.
JOIN 코드
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast
spark = SparkSession \
.builder \
.appName("Shuffle Join Demo") \
.master("local[3]") \
.config("spark.sql.shuffle.partitions", 3) \
.config("spark.sql.adaptive.enabled", False) \
.getOrCreate()
df_large = spark.read.json("large_data/")
df_small = spark.read.json("small_data/")
join_expr = df_large.id == df_small.id
join_df = df_large.join(df_small, join_expr, "inner")
join_df.collect()
input("Waiting ...")
spark.stop()
이 코드는 몇 개의 Job을 만들어 낼까? : 3개의 job이 생성됨(df_large, df_small, df_large/df_small shuffling and merge)
df_small이 충분히 작다면? : shuffle hashing join은 엄청난 오버헤드 발생하게된다. 대신 broadcasting join을 사용하게 된다.
Broadcast join 코드
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast
...
join_df = df_large.join(broadcast(df_small), join_expr, "inner")
...
spark.stop()
이 코드는 몇 개의 Job을 만들어 낼까?
df_small이 충분히 작다면? : broadcast 를 통해 명시적으로 가지고 와도 되는데 spark.sql.adaptive.autoBroadcastJoinThreshold 의 옵션을 통해 spark가 얼마나 작은 데이터를 가지고 있는지 판단할 수 있다.
- Spark WebUI로 확인못하고 local의 Standalone 모드에서 확인할 수 있다.
- 돌리려는 코드에서 마지막에 종료하지 않도록 지정해주면 됨 (input("stopping ...") 을 넣고 돌린다.)
Bucketing과 Partitioning
HDFS 데이터를 처리 형태에 맞춰 최적화할 수 있다면 처리 시간을 단축하고 리소스를 덜 사용할 수 있다.
Bucketing과 File System Partitioning 소개
- 둘다 Hive 메타스토어의 사용이 필요: saveAsTable
- 데이터 저장을 이후 반복처리에 최적화된 방법으로 하는 것
- Bucketing
- 먼저 Aggregation이나 Window 함수나 JOIN(셔플링을 최소화 시키는 것)에서 많이 사용되는 컬럼이 있는지?
- 있다면 데이터를 이 특정 컬럼(들)을 기준으로 테이블로 저장 - 이때의 버킷의 수도 지정
- File System Partitioning : (이때까지는 데이터프레임을 나누는 단위를 의미)
- 원래 Hive에서 많이 사용
- 데이터의 특정 컬럼(들)을 기준으로 폴더 구조를 만들어 데이터 저장 최적화
- 위의 컬럼들을 Partition Key라고 부름
Bucketing
- DataFrame을 특정 ID를 기준으로 나눠서 테이블로 저장
- 다음부터는 이를 로딩하여 사용함으로써 반복 처리시 시간 단축
- DataFrameWriter의 bucketBy 함수 사용
- 인자로 (Bucket의 수, 기준 ID) 지정
- 데이터의 특성을 잘 알고있는 경우 사용가능 : bucketing을 잘하면 join을 할 때 셔플링이 사라진다!
Bucketing 예시
# -*- coding: utf-8 -*-
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Python Spark SQL 저장하기") \
.config("spark.sql.autoBroadcastJoinThreshold", -1) \
.config("spark.sql.adaptive.enabled", False) \ #config 에서 최적화를 하지 않음으로서 학습적인 목적으로 Execution plan을 제대로 볼 수 있도록 한다.
.enableHiveSupport() \
.getOrCreate()
# Redshift와 연결해서 DataFrame으로 로딩하기
url = "jdbc:redshift://learnde.cduaw970ssvt.ap-northeast-2.redshift.amazonaws.com:5439/dev?user=guest&password=Guest1234"
df_user_session_channel = spark.read \
.format("jdbc") \
.option("driver", "com.amazon.redshift.jdbc42.Driver") \
.option("url", url) \
.option("dbtable", "raw_data.user_session_channel") \
.load()
df_session_timestamp = spark.read \
.format("jdbc") \
.option("driver", "com.amazon.redshift.jdbc42.Driver") \
.option("url", url) \
.option("dbtable", "raw_data.session_timestamp") \
.load()
join_expr = df_user_session_channel.sessionid == df_session_timestamp.sessionid
df_join = df_user_session_channel.join(df_session_timestamp, join_expr, "inner") # 앞의조인: 데이터의 분포가 랜덤하게 되어있는것
df_join.show(10)
spark.sql("DROP TABLE IF EXISTS bk_usc")
spark.sql("DROP TABLE IF EXISTS bk_st")
#3개의 버켓을 구성하고 'sessionid'를 기준으로 join함으로서 그걸 bk_usc, bk_st 라고 명명한다. action 2개 생성됨
df_user_session_channel.write.mode("overwrite").bucketBy(3, "sessionid").saveAsTable("bk_usc") # 뒤의조인: 세션id 기준으로 같은수의 버켓을 만들어 놓았기 때문에 join할 때 오버헤드가 줄어든다.
df_session_timestamp.write.mode("overwrite").bucketBy(3, "sessionid").saveAsTable("bk_st")
df_bk_usc = spark.read.table("bk_usc")
df_bk_st = spark.read.table("bk_st")
join_expr2 = df_bk_usc.sessionid == df_bk_st.sessionid
df_join2 = df_bk_usc.join(df_bk_st, join_expr2, "inner")
df_join2.show(10)
input("Waiting ...")
File System Partitioning
- 데이터를 Partition Key 기반 폴더 (“Partition") 구조로 물리적으로 나눠 저장
DataFrame에서 이야기하는 Partition- Hive에서 사용하는 Partitioning을 말함
- Partitioning의 예와 이점
- 굉장히 큰 로그 파일을 데이터 생성시간 기반으로 데이터 읽기를 많이 한다면?
- 데이터 자체를 연도-월-일의 폴더 구조로 저장
- 보통 위의 구조로 이미 저장되는 경우가 많음
- 이를 통해 데이터를 읽기 과정을 최적화 (스캐닝 과정이 줄어들거나 없어짐)
- 데이터 관리도 쉬워짐 (Retention Policy 적용시)
- 굉장히 큰 로그 파일을 데이터 생성시간 기반으로 데이터 읽기를 많이 한다면?
- DataFrameWriter의 partitionBy 사용 - write할 때만 사용
- Partition key를 잘못 선택하면 엄청나게 많은 파일들이 생성됨! Cardinality(가능한 경우의수)가 낮은것을 선택해야한다.
df = df.withColumn("year", year(df.Date)) \ .withColumn("month", month(df.Date))
spark.sql("DROP TABLE IF EXISTS appl_stock")
df.write.partitionBy("year", "month").saveAsTable("appl_stock")
요약...
- Spark 파일 포맷으로 가장 최적은 PARQUET (row그룹으로 되어있는 columnal)
- Spark Job 최적화를 위해서는 Execution Plan 확인
- Bucketing(특정컬럼을 최적화, 재분배)과 Partitioning(특정컬럼을 기준으로, 디렉토리 형태로 저장)을 통해 입력 데이터 구조를 최적화 (HDFS에 존재하는 데이터의 포맷을 최적화)
'Spark' 카테고리의 다른 글
[Spark] Spark 내부동작과 클라우드 옵션(Part3. Spark EMR) (0) | 2024.02.15 |
---|---|
[Spark] Spark 내부동작과 클라우드 옵션(Part2. Spark ML) (2) | 2024.02.14 |
[Spark] Spark 프로그래밍: SQL (3) | 2024.02.13 |
[Spark] Spark 프로그래밍: DataFrame (2) (0) | 2024.02.12 |
[Spark] Spark 프로그래밍: DataFrame (1) (1) | 2024.02.11 |