본문 바로가기
Spark

[Spark] Spark 내부동작과 클라우드 옵션(Part1. Spark 내부동작)

by 개복취 2024. 2. 13.

  1. Spark 파일 포맷
  2. Execution Plan
  3. Bucketing과 Partitioning
  4. 요약

Spark 파일 포맷

 
  • 데이터는 디스크에 파일로 저장됨: 일에 맞게 최적화 필요
    (Unstructured, semi-structured는 사람이 읽을 수 있음, Structured는 사람이 읽을 수 없음)

 

Spark의 주요 파일 타입

특징
CSV
JSON
PARQUET_2
AVRO
컬럼 스토리지
X
X
Y
X
압축 가능
Y
Y
Y
Y
Splittable
Y_1
Y_1
Y
Y
Human readable
Y
Y
X
X
Nested structure support
X
Y
Y
Y
Schema evolution
X
X
Y
Y

Parquet 빼고 모두 행별로 저장한다.

Splittable이라 함은 HDFS에서 저장할 때 블럭으로 나뉘어 지는데, 그 블럭들이 스파크에서 파티션별로 올라갈 수 있는지의 여부

 

Y_1 : 압축되면 Splittable 하지 않음 (압축 방식에 따라 다름 - snappy 압축이라면 Splittable)

PARQUET_2 : Spark의 기본 파일 포맷

 

Parquet: Spark의 기본 파일 포맷

  • 트위터와 클라우데라에서 공동 개발 (Doug Cutting)
    • Row-Wise Storage(쓰기 최적화)
    • Column-Wise Storage (읽기 최적화)
    • Hybrid Storage (Row Group) : 데이터 단위로 그룹이 생성됨.
      같은 Row Group안에서는 Columnal Storage방식으로 저장된다.

 

DataFrame에서 다른 포맷 사용 방법 실습

DataFrame.write.format("avro"). ...

DataFrame.write.format("parquet"). ...

from pyspark.sql import * 
from pyspark.sql.functions import * 

if __name__ == "__main__": 
	#avro는 path 설정을 해줘야한다.
	spark = SparkSession \ 
    		.builder \ 
            .appName("Spark Writing Demo") \ 
            .master("local[3]") \ 
            .config("spark.jars.packages", "org.apache.spark:spark-avro_2.12:3.3.1") \
            .getOrCreate()
            
	df = spark.read \ 
		.format("csv") \ 
		.load("appl_stock.csv")

 

#파티션 없는(하나의 블럭)형태 - avro type으로 저장
print("Num Partitions before: " + str(df.rdd.getNumPartitions())) 
df.groupBy(spark_partition_id()).count().show()

df.write \ 
    .format("avro") \ 
    .mode("overwrite") \
    .option("path", "dataOutput/avro/") \
    .save()
#4개로 재분할
df2 = df.repartition(4) 
print("Num Partitions after: " + str(df2.rdd.getNumPartitions())) 
df2.groupBy(spark_partition_id()).count().show()

df2.write \ 
    .format("parquet") \ 
    .mode("overwrite") \ 
    .option("path", "dataOutput/parquet/") \ 
    .save()
Partition / 4 parquet type
  • snappy 로 압축한 것은 Splittable한 압축방식이다. 압축이 되었어도 블럭단위로 나눌 수 있다는 장점이 있다.
#4개였던 파티션을 2개로 모으기
df3 = df2.coalesce(2) 
print("Num Partitions after: " + str(df3.rdd.getNumPartitions())) 
df3.groupBy(spark_partition_id()).count().show()

df2.write \ 
    .format("json") \ 
    .mode("overwrite") \ 
    .option("path", "dataOutput/json/") \ 
    .save()
!ls -tl dataOutput/
data type 형태로 저장이 된다. (디렉토리 내부에 파티션된 파일들이 존재함)

 

 

Schema Evolution 소개 : Parquet 파일 3개로 테스트

(아래의 표와 같이)서로 다른 스키마를 갖는 3개의 parquet파일을 받는다.

  • schema1.parquet
  • schema2.parquet
  • schema3.parquet
df = spark.read. \ 
    option("mergeSchema", True). \ 
    parquet("*.parquet") 
    
df.printSchema()

 

Schema Evolution을 사용하면 3개의 데이터프레임을 하나의 데이터프레임을 갖는 형태로 가지고 올 수 있다.

 

Execution Plan

Spark은 개발자가 만든 코드를 어떻게 변환하여 실행하는가?

 

다음 데이터 프레임 연산을 자세히 보자

spark.read.option("header", True).csv(“test.csv”). \
	where("gender <> 'F'"). \ # gender가 F 가 아닌
	select("name", "gender"). \
	groupby("gender"). \
	count(). \
	show()
  • 파티션의 관점에서 바라본다면..
    • 로딩이 되는 순간 파티션이 하나 혹은 그 이상의 파티션으로 구성됨
    • Where, Select 는 셔플링 없이 파티션과 관련 없음
    • 그러나, Groupby를 만나게 되는 순간 groupby의 키에 맞게 정렬이 되어야 하기 때문에 셔플링이 발생한다.
    • show 를 통해 드라이브로 가지고 온다. (= write, collect)
      • show는 action이라고 부른다. action은 앞에있는 DF를 실제로 수행시키는 역할을 함
        (왜? : spark는 Lazy execution을 수행하기 때문)
        • 첫번째 stage : WHERE, SELECT
        • 두번째 stage : GROUPBY, COUNT

 

Transformations and Actions

  • Transformations
    • Narrow Dependencies: 독립적인 Partition level 작업 (select, filter, map 등등..)
    • Wide Dependencies: Shuffling이 필요한 작업 (groupby, reduceby, partitionby, repartition 등등..)
  • Actions
    • Read, Write, Show, Collect -> Job을 실행시킴 (실제 코드가 실행됨)
    • Lazy Execution 의 장점? 더 많은 오퍼레이션을 볼 수 있기에 최적화를 더 잘할 수 있음. 그래서 SQL이 더 선호됨

즉, 하나의 Job은 다수의 transformation으로 되고 narrow 또는 wide인지에 따라 stage로 구성됨

 

 

Jobs, Stages, Tasks

  • Action -> Job -> 1+ Stages -> 1+ Tasks
  • Action: Job을 하나 만들어내고 코드가 실제로 실행됨
  • Job
    • 하나 혹은 그 이상의 Stage로 구성됨
    • StageShuffling(Wide dependency)이 발생하는 경우 새로 생김
  • Stage
    • DAG의 형태로 구성된 Task들 존재
    • 여기 Task들은 병렬 실행이 가능
  • Task
    • 가장 작은 실행 유닛으로 Executor에 의해 실행됨 

 

Transformations and Actions 시각화

spark.read.option("header", True). \ 
        csv(“test.csv”). \
        where("gender <> 'F'"). \ 
        select("name", "gender"). \ 
        groupby("gender"). \
        count(). \ 
        show()

# .option("inferSchema", True)가 추가되면 JOB이 하나 더 추가됨 - 각 컬럼에 타입이 무엇인지 판단하는 과정
  • read 에 trigger 되어 job 이 하나 생성된다. 레코드의 헤더를 읽어서 무엇이 있는지 확인한다. (job0)
    • 컬럼에서 header가 있다는 것을 있다는 것을 표시해주었기 때문에, 헤더를 만들어주는 주는 job이 생성됨
  • show 에 의해 또다른 job이 trigger 되어 생성된다. collect.limit을 통해 드라이버에 보내준다. (job1)
  • groupby를 통해 stage 가 나뉘어지고 각 스테이지에서 narrow transformation이 하나의 task로 실행이 된다.
Transformation and Action 시각화

 

 

WordCount 코드

from pyspark.sql import *
from pyspark.sql.functions import *

spark = SparkSession \
    .builder \
    .master("local[3]") \
    .appName("SparkSchemaDemo") \
    .config("spark.sql.adaptive.enabled", False) \
    .config("spark.sql.shuffle.partitions", 3) \
    .getOrCreate()


# load with schema of one column named value
df = spark.read.text("shakespeare.txt")
df.printSchema()

df_count = df.select(explode(split(df.value, " ")).alias("word")).groupBy("word").count()

df_count.show()

input("stopping ...")
  • 위 'Transformations and Actions' 에서는 csv의 헤더가 존재하는 것을 표시해주었기 때문에 job이 생성되었는데, WordCount 에서는 df.value 를 split함으로써 job을 생성하지 않게 된다.

이 코드는 몇 개의 Job을 만들어 낼까? : 1개의 잡(두개의 스테이지가 groupby의 앞뒤로 생성된다.)

show()가 없다면 이 코드는 몇 개의 Job을 만들어 낼까? : job이 생성되지 않음(의미없는 일이 생기는것과 진배없음), lazy execution의 이점중 하나가 쓸모없는 코드를 실행하지 않는다는 것이 장점이다.

job level 에서의 visualizing / query level 에서의 visualizing

 

JOIN 코드

from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast

spark = SparkSession \
    .builder \
    .appName("Shuffle Join Demo") \
    .master("local[3]") \
    .config("spark.sql.shuffle.partitions", 3) \
    .config("spark.sql.adaptive.enabled", False) \
    .getOrCreate()

df_large = spark.read.json("large_data/")
df_small = spark.read.json("small_data/")

join_expr = df_large.id == df_small.id
join_df = df_large.join(df_small, join_expr, "inner")

join_df.collect()
input("Waiting ...")

spark.stop()

이 코드는 몇 개의 Job을 만들어 낼까? : 3개의 job이 생성됨(df_large, df_small, df_large/df_small shuffling and merge)

df_small이 충분히 작다면? : shuffle hashing join은 엄청난 오버헤드 발생하게된다. 대신 broadcasting join을 사용하게 된다.

Query 레벨에서의 JOIN Query Visualization

 

 

 Broadcast join 코드

from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast

...

join_df = df_large.join(broadcast(df_small), join_expr, "inner")

...

spark.stop()

이 코드는 몇 개의 Job을 만들어 낼까?

df_small이 충분히 작다면? : broadcast 를 통해 명시적으로 가지고 와도 되는데 spark.sql.adaptive.autoBroadcastJoinThreshold 의 옵션을 통해 spark가 얼마나 작은 데이터를 가지고 있는지 판단할 수 있다.

 

  • Spark WebUI로 확인못하고 local의 Standalone 모드에서 확인할 수 있다.
  • 돌리려는 코드에서 마지막에 종료하지 않도록 지정해주면 됨 (input("stopping ...") 을 넣고 돌린다.)

Bucketing과 Partitioning

HDFS 데이터를 처리 형태에 맞춰 최적화할 수 있다면 처리 시간을 단축하고 리소스를 덜 사용할 수 있다.

 

BucketingFile System Partitioning 소개

  • 둘다 Hive 메타스토어의 사용이 필요: saveAsTable
  • 데이터 저장을 이후 반복처리에 최적화된 방법으로 하는 것

 

  • Bucketing
    • 먼저 Aggregation이나 Window 함수나 JOIN(셔플링을 최소화 시키는 것)에서 많이 사용되는 컬럼이 있는지?
    • 있다면 데이터를 이 특정 컬럼()을 기준으로 테이블로 저장 - 이때의 버킷의 수도 지정
  • File System Partitioning : (이때까지는 데이터프레임을 나누는 단위를 의미)
    • 원래 Hive에서 많이 사용
    • 데이터의 특정 컬럼()을 기준으로 폴더 구조를 만들어 데이터 저장 최적화
    • 위의 컬럼들을 Partition Key라고 부름
 

Bucketing

  • DataFrame을 특정 ID를 기준으로 나눠서 테이블로 저장
    • 다음부터는 이를 로딩하여 사용함으로써 반복 처리시 시간 단축
    • DataFrameWriterbucketBy 함수 사용
    • 인자로 (Bucket의 수, 기준 ID) 지정
  • 데이터의 특성을 잘 알고있는 경우 사용가능 : bucketing을 잘하면 join을 할 때 셔플링이 사라진다!

 

Bucketing 예시

# -*- coding: utf-8 -*-
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL 저장하기") \
    .config("spark.sql.autoBroadcastJoinThreshold", -1) \ 
    .config("spark.sql.adaptive.enabled", False) \ #config 에서 최적화를 하지 않음으로서 학습적인 목적으로 Execution plan을 제대로 볼 수 있도록 한다.
    .enableHiveSupport() \
    .getOrCreate()

# Redshift와 연결해서 DataFrame으로 로딩하기
url = "jdbc:redshift://learnde.cduaw970ssvt.ap-northeast-2.redshift.amazonaws.com:5439/dev?user=guest&password=Guest1234"

df_user_session_channel = spark.read \
    .format("jdbc") \
    .option("driver", "com.amazon.redshift.jdbc42.Driver") \
    .option("url", url) \
    .option("dbtable", "raw_data.user_session_channel") \
    .load()

df_session_timestamp = spark.read \
    .format("jdbc") \
    .option("driver", "com.amazon.redshift.jdbc42.Driver") \
    .option("url", url) \
    .option("dbtable", "raw_data.session_timestamp") \
    .load()

join_expr = df_user_session_channel.sessionid == df_session_timestamp.sessionid
df_join = df_user_session_channel.join(df_session_timestamp, join_expr, "inner") # 앞의조인: 데이터의 분포가 랜덤하게 되어있는것
df_join.show(10)

spark.sql("DROP TABLE IF EXISTS bk_usc")
spark.sql("DROP TABLE IF EXISTS bk_st")

#3개의 버켓을 구성하고 'sessionid'를 기준으로 join함으로서 그걸 bk_usc, bk_st 라고 명명한다. action 2개 생성됨
df_user_session_channel.write.mode("overwrite").bucketBy(3, "sessionid").saveAsTable("bk_usc") # 뒤의조인: 세션id 기준으로 같은수의 버켓을 만들어 놓았기 때문에 join할 때 오버헤드가 줄어든다.
df_session_timestamp.write.mode("overwrite").bucketBy(3, "sessionid").saveAsTable("bk_st")

df_bk_usc = spark.read.table("bk_usc")
df_bk_st = spark.read.table("bk_st")

join_expr2 = df_bk_usc.sessionid == df_bk_st.sessionid
df_join2 = df_bk_usc.join(df_bk_st, join_expr2, "inner")

df_join2.show(10)

input("Waiting ...")
bucketing이 잘 되어있는걸 확인할 수 있다.

File System Partitioning

  • 데이터를 Partition Key 기반 폴더 (“Partition") 구조로 물리적으로 나눠 저장
    • DataFrame에서 이야기하는 Partition
    • Hive에서 사용하는 Partitioning을 말함
  • Partitioning의 예와 이점
    • 굉장히 큰 로그 파일을 데이터 생성시간 기반으로 데이터 읽기를 많이 한다면?
      1. 데이터 자체를 연도--의 폴더 구조로 저장
      2. 보통 위의 구조로 이미 저장되는 경우가 많음
    • 이를 통해 데이터를 읽기 과정을 최적화 (스캐닝 과정이 줄어들거나 없어짐)
    • 데이터 관리도 쉬워짐 (Retention Policy 적용시)

 

  • DataFrameWriter의 partitionBy 사용 - write할 때만 사용
    • Partition key를 잘못 선택하면 엄청나게 많은 파일들이 생성됨! Cardinality(가능한 경우의수)가 낮은것을 선택해야한다. 

 

df = df.withColumn("year", year(df.Date)) \ .withColumn("month", month(df.Date))

spark.sql("DROP TABLE IF EXISTS appl_stock")

df.write.partitionBy("year", "month").saveAsTable("appl_stock")

요약...

  • Spark 파일 포맷으로 가장 최적은 PARQUET (row그룹으로 되어있는 columnal)
  • Spark Job 최적화를 위해서는 Execution Plan 확인
  • Bucketing(특정컬럼을 최적화, 재분배)Partitioning(특정컬럼을 기준으로, 디렉토리 형태로 저장)을 통해 입력 데이터 구조를 최적화 (HDFS에 존재하는 데이터의 포맷을 최적화)