본문 바로가기
Spark

[Spark] Spark 프로그래밍: SQL

by 개복취 2024. 2. 13.

  1. Spark SQL
  2. Aggregation, JOIN, UDF
  3. Spark SQL 실습
  4. Hive 메타스토어 사용하기
  5. 유닛 테스트
  6. 요약

Spark SQL

  • SQL은 빅데이터 세상에서도 중요!
    • 데이터 분야에서 일하고자 하면 반드시 익혀야할 기본 기술 
    • 구조화된 데이터를 다루는한 SQL은 데이터 규모와 상관없이 쓰임
      (Data discovery issue : 잘못된 쿼리를 작성해서 문제가 생길 수 있음)
  • 모든 대용량 데이터 웨어하우스는 SQL 기반
    • Redshift, Snowflake, BigQuery
    • Hive/Presto
  • Spark도 예외는 아님
    • Spark SQL이 지원됨

 

Spark SQL이란?

  • Spark SQL은 구조화된 데이터 처리를 위한 Spark 모듈
  • 데이터 프레임 작업을 SQL로 처리 가능
    • 데이터프레임에 테이블 이름 지정 후 sql함수 사용가능: 판다스에도 pandasql 모듈의 sqldf 함수를 이용하는 동일한 패턴 존재
  • HQL(Hive Query Language)과 호환 제공
    • Hive 테이블들을 읽고 쓸 수 있음 (Hive Metastore)

Spark SQL vs. DataFrame

  • 하지만 SQL로 가능한 작업이라면 DataFrame을 사용할 이유가 없음
    • 두개를 동시에 사용할 수 있다는 점 분명히 기억
  1. Familiarity/Readability: SQL이 가독성이 더 좋고 더 많은 사람들이 사용 가능
  2. Optimization: Spark SQL 엔진이 최적화하기 더 좋음 (SQLDeclarative) (Catalyst OptimizerProject Tungsten)
  3. Interoperability/Data Management : SQL포팅도 쉽고 접근권한 체크도 쉬움

Spark SQL 사용법 - SQL 사용 방법

  • 데이터 프레임을 기반으로 테이블 뷰 생성: 테이블이 만들어짐
    • createOrReplaceTempView: Spark Session이 살아있는 동안 존재
    • createOrReplaceGlobalTempView: Spark 드라이버가 살아있는 동안 존재
  • Spark Session의 sql 함수로 SQL 결과를 데이터 프레임으로 받음
namegender_df.createOrReplaceTempView("namegender") 
namegender_group_df = spark.sql("""
SELECT gender, count(1) FROM namegender GROUP BY 1 
""")

print(namegender_group_df.collect())

 

SparkSession 사용 외부 데이터베이스 연결

  • Spark Session의 read 함수를 호출(로그인 관련 정보와 읽어오고자 하는 테이블 혹은 SQL을 지정).
    결과가 데이터 프레임으로 리턴됨
df_user_session_channel = spark.read \
	.format("jdbc") \
	.option("driver", "com.amazon.redshift.jdbc42.Driver") \
	.option("url", "jdbc:redshift://HOST:PORT/DB?user=ID&password=PASSWORD") \
	.option("dbtable", "raw_data.user_session_channel") \ #여기에 select문이 들어와도 됨
	.load()

 

Aggregation, JOIN, UDF

 

Aggregation 함수

  • DataFrame이 아닌 SQL로 작성하는 것을 추천
    • Group By : SUM, MIN, MAX, COUNT..
    • Window : ROW_NUMBER, FIRST_VALUE, LAST_VALUE
    • Rank

JOIN

  • SQL 조인은 두 개 혹은 그 이상의 테이블들을 공통 필드를 가지고 머지
  • 스타 스키마로 구성된 테이블들로 분산되어 있던 정보를 통합하는데 사용
  • 왼쪽 테이블을 LEFT라고 하고 오른쪽 테이블을 RIGHT이라고 하면
    • JOIN의 결과는 방식에 따라 양쪽의 필드를 모두 가진 새로운 테이블을 생성
    • 조인의 방식에 따라 다음 두 가지가 달라짐
      1. 어떤 레코드들이 선택되는지?
      2. 어떤 필드들이 채워지는지?

최적화 관점에서 본 조인의 종류들

  • Shuffle JOIN : data skew 발생함
    • 일반 조인방식
    • Bucket JOIN: 조인 키를 바탕으로 새로 파티션을 새로 만들고 조인을 하는 방식
  • Broadcast JOIN : 왼쪽, 오른쪽 비교했을 때 하나가 작은 경우 큰 데이터프레임에 작은 데이터프레임을 붙이는 것
    • 큰 데이터와 작은 데이터 간의 조인
    • 데이터 프레임 하나가 충분히 작으면 작은 데이터 프레임을 다른 데이터 프레임이 있는 서버들로 뿌리는 것 (broadcasting)
  • spark.sql.autoBroadcastJoinThreshold 파라미터로 충분히 작은지 여부 결정
JOIN을 그림으로 이해하기 /  Broadcast JOIN을 그림으로 이해하기

UDF(User Defined Function)이란 무엇인가?

  • DataFrame이나 SQL에서 적용할 수 있는 사용자 정의 함수 (DF에서 .withColumn 함수와 같이 사용하는 것이 일반적)
  • Scalar 함수 vs. Aggregation 함수
    • Scalar 함수 예: UPPER, LOWER, ...
    • Aggregation 함수 (UDAF : pyspark에서는 지원하지 않음) : SUM, MIN, MAX
    •  
 

성능이 중요하다면? : ScalaJava로 구현하는 것이 제일 좋음 파이썬을 사용해야한다면 Pandas UDF로 구현

 

 

UDF - DataFrame에 사용해보기

#lambda함수로 등록

import pyspark.sql.functions as F
from pyspark.sql.types import *

upperUDF = F.udf(lambda z:z.upper()) df.withColumn("Curated Name", upperUDF("Name"))

##########

#Python 함수로 등록
def upper(s): return s.upper()

# 먼저 테스트
upperUDF = spark.udf.register("upper", upper)
spark.sql("SELECT upper('aBcD')").show()

# DataFrame 기반 SQL에 적용
df.createOrReplaceTempView("test")
spark.sql("""SELECT name, upper(name) "Curated Name" FROM test""").show()

 

 

UDF - DataFrame, SQL에 사용해보기

#UDF - DataFram에 사용해보기

data = [
	{"a": 1, "b": 2}, 
	{"a": 5, "b": 5}
]
df = spark.createDataFrame(data) df.withColumn("c", F.udf(lambda x, y: x + y)("a", "b"))

#####

#UDF - SparkSQL에 사용해보기

def plus(x, y): return x + y

plusUDF = spark.udf.register("plus", plus) spark.sql("SELECT plus(1, 2)").show()
df.createOrReplaceTempView("test")
spark.sql("SELECT a, b, plus(a, b) c FROM test").show()

UDF - Pandas UDF Scalar 함수 사용

from pyspark.sql.functions import pandas_udf 
import pandas as pd

@pandas_udf(StringType())
def upper_udf2(s: pd.Series) -> pd.Series: #값들의 집합이 들어왔다가, 값 하나가 나감
	return s.str.upper()

upperUDF = spark.udf.register("upper_udf", upper_udf2)

df.select("Name", upperUDF("Name")).show()
spark.sql("""SELECT name, upper_udf(name) `Curated Name` FROM test""").show()

 

UDF - DataFrame/SQLAggregation 사용

from pyspark.sql.functions import pandas_udf 
import pandas as pd

@pandas_udf(FloatType())
def average(v: pd.Series) -> float:
	return v.mean()

averageUDF = spark.udf.register('average', average)
spark.sql('SELECT average(b) FROM test').show() df.agg(averageUDF("b").alias("count")).show()

 

Spark SQL 실습

 

  • 사용자가 외부 링크를 타고 오거나 직접 방문해서 올 경우 세션을 생성함
  • 즉.하나의 사용자 ID는 여러 개의 세션 ID를 가질 수 있음 (구글 Analytics의 정의)
  • 보통 세션의 경우 세션을 만들어낸 소스를 채널이란 이름으로 기록해둠 (마케팅 기여도 분석을 위함)
  • 또한, 세션이 생긴 시간도 기록한다.
  • 이러한 정보를 기반으로 다양한 데이터 분석과 지표 설정이 가능하다 : 마케팅 관련, 사용자 트래픽 관련

매출 사용자 10명 알아내기 (Ranking)

top_rev_user_df = spark.sql("""
SELECT userid, 
	SUM(str.amount) revenue
    SUM(CASE WHEN str.refunded = False THEN str.amount END) net_revenue  #(refund 포함x)
	FROM user_session_channel usc
	LEFT JOIN session_timestamp sti ON usc.sessionid = sti.sessionid 
	LEFT JOIN session_transaction str ON usc.sessionid = str.sessionid GROUP BY 1
	ORDER BY 2 DESC
	LIMIT 10""")

top_rev_user_df.show()

 

월별 채널별 매출(refund  포함)과 총 방문자, 매출 발생 방문자, 전환률 정보 계산하기 (Grouping)

SELECT LEFT(ts, 7) "month", channel,
    COUNT(DISTINCT usc.userid) uniqueUsers,
    COUNT(DISTINCT CASE WHEN amount > 0 THEN usc.userid END) paidUsers, 
    ROUND(paidUsers::float*100/NULLIF(uniqueUsers, 0),2) conversionRate, 
    SUM(amount) grossRevenue,
    SUM(CASE WHEN refunded is False THEN amount END) netRevenue
    ROUND(COUNT(DISTINCT CASE WHEN amount >= 0 THEN userid END) * 100
    	/ COUNT(DISTINCT userid), 2) conversionRate
FROM raw_data.user_session_channel usc
LEFT JOIN raw_data.session_timestamp t ON t.sessionid = usc.sessionid 
LEFT JOIN raw_data.session_transaction st ON st.sessionid = usc.sessionid 
GROUP BY 1, 2;

 

사용자별로 처음 채널과 마지막 채널 알아내기 (Windowing)

WITH RECORD AS ( /*사용자의 유입에 따른, 채널 순서 매기는 쿼리*/ 
SELECT userid, channel,
	ROW_NUMBER() OVER (PARTITION BY userid ORDER BY ts ASC) AS seq_first,
	ROW_NUMBER() OVER (PARTITION BY userid ORDER BY ts DESC) AS seq_last 
FROM user_session_channel u
LEFT JOIN session_timestamp t ON u.sessionid = t.sessionid
)
SELECT/*유저의 첫번째 유입채널, 마지막 유입 채널 구하기*/
	f.userid,
	f.channel first_channel,
	l.channel last_channel 
FROM RECORD f
JOIN RECORD l ON f.userid = l.userid
WHERE f.seq_first = 1 or l.seq_last = 1
ORDER BY userid

#################

SELECT DISTINCT A.userid,
	FIRST_VALUE(A.channel) over(partition by A.userid order by B.ts
rows between unbounded preceding and unbounded following) AS First_Channel, 
	LAST_VALUE(A.channel) over(partition by A.userid order by B.ts
rows between unbounded preceding and unbounded following) AS Last_Channel 
FROM user_session_channel A
LEFT JOIN session_timestamp B
ON A.sessionid = B.sessionid;

Hive 메타스토어 사용하기

Spark 데이터베이스와 테이블 (1)

  • 카탈로그: 테이블과 뷰에 관한 메타 데이터 관리
    • 기본으로 메모리 기반 카탈로그 제공 (세션이 끝나면 사라짐)
    • Hive와 호환되는 카탈로그 제공 - Persistent!
  • 테이블 관리 방식 : 테이블들은 데이터베이스라 부르는 폴더와 같은 구조로 관리 (2단계)
    • 특정한 데이터베이스 지칭을 '데이터베이스이름.테이블'의 형태로 지칭한다.

Spark 데이터베이스와 테이블 (2)

  • 메모리 기반 테이블/: 임시 테이블로 앞서 사용해봤음
  • 스토리지 기반 테이블
    • 기본적으로 HDFSParquet 포맷을 사용, Hive와 호환되는 메타스토어 사용
    • 두 종류의 테이블이 존재 (Hive와 동일한 개념)
      1. Managed Table
        • Spark이 실제 데이터와 메타 데이터 모두 관리
      2. Unmanaged (External) Table
        • Spark가 메타 데이터만 관리

Spark SQL - 스토리지 기반 카탈로그 사용 방법

  • Hive와 호환되는 메타스토어 사용
  • SparkSession 생성시 enableHiveSupport() 호출
    • 기본으로 default라는 이름의 데이터베이스 생성
from pyspark.sql import SparkSession
spark = SparkSession \
	.builder \
	.appName("Python Spark Hive") \
	.enableHiveSupport() \
	.getOrCreate()


Spark SQL - Managed Table 사용 방법

  • 두 가지 테이블 생성방법
    1. dataframe.saveAsTable("테이블이름")
    2. SQL 문법 사용 (CREATE TABLE, CTAS)
  • spark.sql.warehouse.dir가 가리키는 위치에 데이터가 저장됨
    • "PARQUET"이 기본 데이터 포맷
  • 선호하는 테이블 타입
  • Spark 테이블로 처리하는 것의 장점 (파일로 저장하는 것과 비교시)
    • JDBC/ODBC등으로 Spark 연결해서 접근 가능 (태블로, 파워BI)

 

Spark SQL - External Table 사용 방법

  • 이미 HDFS에 존재하는 데이터에 스키마를 정의해서 사용 - LOCATION이란 프로퍼티 사용 (위치를 기반으로 로딩하겠다.)
  • 메타 데이터만 카탈로그에 기록됨 
    • 데이터는 이미 존재.
    • External Table은 삭제되어도 데이터는 그대로임
CREATE TABLE table_name ( 
	column1 type1,
	column2 type2,
	column3 type3,

... )

USING PARQUET

LOCATION 'hdfs_path';

 

유닛 테스트

유닛 테스트란?

  • 코드 상의 특정 기능 (보통 메소드의 형태) 테스트하기 위해 작성된 코드
  • 보통 정해진 입력을 주고 예상된 출력이 나오는지 형태로 테스트
  • CI/CD를 사용하려면 전체 코드의 테스트 커버러지가 굉장히 중요해짐
  •  각 언어별로 정해진 테스트 프레임웍을 사용하는 것이 일반적
    • JUnit for Java
    • NUnit for .NET
    • unittest for Python

요약...

  • 데이터 분석에 SQL이 필수적
  • Spark에는 Spark SQL 모듈이 존재 : 데이터프레임을 SQL로 처리 가능
  • Hive 메타스토어와 같이 많이 사용
  • 코드와 SQL 검증을 위한 유닛 테스트 작성