- Spark SQL
- Aggregation, JOIN, UDF
- Spark SQL 실습
- Hive 메타스토어 사용하기
- 유닛 테스트
- 요약
Spark SQL
- SQL은 빅데이터 세상에서도 중요!
- 데이터 분야에서 일하고자 하면 반드시 익혀야할 기본 기술
- 구조화된 데이터를 다루는한 SQL은 데이터 규모와 상관없이 쓰임
(Data discovery issue : 잘못된 쿼리를 작성해서 문제가 생길 수 있음)
- 모든 대용량 데이터 웨어하우스는 SQL 기반
- Redshift, Snowflake, BigQuery
- Hive/Presto
- Spark도 예외는 아님
- Spark SQL이 지원됨
Spark SQL이란?
- Spark SQL은 구조화된 데이터 처리를 위한 Spark 모듈
- 데이터 프레임 작업을 SQL로 처리 가능
- 데이터프레임에 테이블 이름 지정 후 sql함수 사용가능: 판다스에도 pandasql 모듈의 sqldf 함수를 이용하는 동일한 패턴 존재
- HQL(Hive Query Language)과 호환 제공
- Hive 테이블들을 읽고 쓸 수 있음 (Hive Metastore)
Spark SQL vs. DataFrame
- 하지만 SQL로 가능한 작업이라면 DataFrame을 사용할 이유가 없음
- 두개를 동시에 사용할 수 있다는 점 분명히 기억
- Familiarity/Readability: SQL이 가독성이 더 좋고 더 많은 사람들이 사용 가능
- Optimization: Spark SQL 엔진이 최적화하기 더 좋음 (SQL은 Declarative) (Catalyst Optimizer와 Project Tungsten)
- Interoperability/Data Management : SQL이 포팅도 쉽고 접근권한 체크도 쉬움
Spark SQL 사용법 - SQL 사용 방법
- 데이터 프레임을 기반으로 테이블 뷰 생성: 테이블이 만들어짐
- createOrReplaceTempView: Spark Session이 살아있는 동안 존재
- createOrReplaceGlobalTempView: Spark 드라이버가 살아있는 동안 존재
- Spark Session의 sql 함수로 SQL 결과를 데이터 프레임으로 받음
namegender_df.createOrReplaceTempView("namegender")
namegender_group_df = spark.sql("""
SELECT gender, count(1) FROM namegender GROUP BY 1
""")
print(namegender_group_df.collect())
SparkSession 사용 외부 데이터베이스 연결
- Spark Session의 read 함수를 호출(로그인 관련 정보와 읽어오고자 하는 테이블 혹은 SQL을 지정).
결과가 데이터 프레임으로 리턴됨
df_user_session_channel = spark.read \
.format("jdbc") \
.option("driver", "com.amazon.redshift.jdbc42.Driver") \
.option("url", "jdbc:redshift://HOST:PORT/DB?user=ID&password=PASSWORD") \
.option("dbtable", "raw_data.user_session_channel") \ #여기에 select문이 들어와도 됨
.load()
Aggregation, JOIN, UDF
Aggregation 함수
- DataFrame이 아닌 SQL로 작성하는 것을 추천
- Group By : SUM, MIN, MAX, COUNT..
- Window : ROW_NUMBER, FIRST_VALUE, LAST_VALUE
- Rank
JOIN
- SQL 조인은 두 개 혹은 그 이상의 테이블들을 공통 필드를 가지고 머지
- 스타 스키마로 구성된 테이블들로 분산되어 있던 정보를 통합하는데 사용
- 왼쪽 테이블을 LEFT라고 하고 오른쪽 테이블을 RIGHT이라고 하면
- JOIN의 결과는 방식에 따라 양쪽의 필드를 모두 가진 새로운 테이블을 생성
- 조인의 방식에 따라 다음 두 가지가 달라짐
- 어떤 레코드들이 선택되는지?
- 어떤 필드들이 채워지는지?
최적화 관점에서 본 조인의 종류들
- Shuffle JOIN : data skew 발생함
- 일반 조인방식
- Bucket JOIN: 조인 키를 바탕으로 새로 파티션을 새로 만들고 조인을 하는 방식
- Broadcast JOIN : 왼쪽, 오른쪽 비교했을 때 하나가 작은 경우 큰 데이터프레임에 작은 데이터프레임을 붙이는 것
- 큰 데이터와 작은 데이터 간의 조인
- 데이터 프레임 하나가 충분히 작으면 작은 데이터 프레임을 다른 데이터 프레임이 있는 서버들로 뿌리는 것 (broadcasting)
- spark.sql.autoBroadcastJoinThreshold 파라미터로 충분히 작은지 여부 결정
UDF(User Defined Function)이란 무엇인가?
- DataFrame이나 SQL에서 적용할 수 있는 사용자 정의 함수 (DF에서 .withColumn 함수와 같이 사용하는 것이 일반적)
- Scalar 함수 vs. Aggregation 함수
- Scalar 함수 예: UPPER, LOWER, ...
- Aggregation 함수 (UDAF : pyspark에서는 지원하지 않음) 예: SUM, MIN, MAX
성능이 중요하다면? : Scala나 Java로 구현하는 것이 제일 좋음 파이썬을 사용해야한다면 Pandas UDF로 구현
UDF - DataFrame에 사용해보기
#lambda함수로 등록
import pyspark.sql.functions as F
from pyspark.sql.types import *
upperUDF = F.udf(lambda z:z.upper()) df.withColumn("Curated Name", upperUDF("Name"))
##########
#Python 함수로 등록
def upper(s): return s.upper()
# 먼저 테스트
upperUDF = spark.udf.register("upper", upper)
spark.sql("SELECT upper('aBcD')").show()
# DataFrame 기반 SQL에 적용
df.createOrReplaceTempView("test")
spark.sql("""SELECT name, upper(name) "Curated Name" FROM test""").show()
UDF - DataFrame, SQL에 사용해보기
#UDF - DataFram에 사용해보기
data = [
{"a": 1, "b": 2},
{"a": 5, "b": 5}
]
df = spark.createDataFrame(data) df.withColumn("c", F.udf(lambda x, y: x + y)("a", "b"))
#####
#UDF - SparkSQL에 사용해보기
def plus(x, y): return x + y
plusUDF = spark.udf.register("plus", plus) spark.sql("SELECT plus(1, 2)").show()
df.createOrReplaceTempView("test")
spark.sql("SELECT a, b, plus(a, b) c FROM test").show()
UDF - Pandas UDF Scalar 함수 사용
from pyspark.sql.functions import pandas_udf
import pandas as pd
@pandas_udf(StringType())
def upper_udf2(s: pd.Series) -> pd.Series: #값들의 집합이 들어왔다가, 값 하나가 나감
return s.str.upper()
upperUDF = spark.udf.register("upper_udf", upper_udf2)
df.select("Name", upperUDF("Name")).show()
spark.sql("""SELECT name, upper_udf(name) `Curated Name` FROM test""").show()
UDF - DataFrame/SQL에 Aggregation 사용
from pyspark.sql.functions import pandas_udf
import pandas as pd
@pandas_udf(FloatType())
def average(v: pd.Series) -> float:
return v.mean()
averageUDF = spark.udf.register('average', average)
spark.sql('SELECT average(b) FROM test').show() df.agg(averageUDF("b").alias("count")).show()
Spark SQL 실습
- 사용자가 외부 링크를 타고 오거나 직접 방문해서 올 경우 세션을 생성함
- 즉.하나의 사용자 ID는 여러 개의 세션 ID를 가질 수 있음 (구글 Analytics의 정의)
- 보통 세션의 경우 세션을 만들어낸 소스를 채널이란 이름으로 기록해둠 (마케팅 기여도 분석을 위함)
- 또한, 세션이 생긴 시간도 기록한다.
- 이러한 정보를 기반으로 다양한 데이터 분석과 지표 설정이 가능하다 : 마케팅 관련, 사용자 트래픽 관련
매출 사용자 10명 알아내기 (Ranking)
top_rev_user_df = spark.sql("""
SELECT userid,
SUM(str.amount) revenue
SUM(CASE WHEN str.refunded = False THEN str.amount END) net_revenue #(refund 포함x)
FROM user_session_channel usc
LEFT JOIN session_timestamp sti ON usc.sessionid = sti.sessionid
LEFT JOIN session_transaction str ON usc.sessionid = str.sessionid GROUP BY 1
ORDER BY 2 DESC
LIMIT 10""")
top_rev_user_df.show()
월별 채널별 매출(refund 포함)과 총 방문자, 매출 발생 방문자, 전환률 정보 계산하기 (Grouping)
SELECT LEFT(ts, 7) "month", channel,
COUNT(DISTINCT usc.userid) uniqueUsers,
COUNT(DISTINCT CASE WHEN amount > 0 THEN usc.userid END) paidUsers,
ROUND(paidUsers::float*100/NULLIF(uniqueUsers, 0),2) conversionRate,
SUM(amount) grossRevenue,
SUM(CASE WHEN refunded is False THEN amount END) netRevenue
ROUND(COUNT(DISTINCT CASE WHEN amount >= 0 THEN userid END) * 100
/ COUNT(DISTINCT userid), 2) conversionRate
FROM raw_data.user_session_channel usc
LEFT JOIN raw_data.session_timestamp t ON t.sessionid = usc.sessionid
LEFT JOIN raw_data.session_transaction st ON st.sessionid = usc.sessionid
GROUP BY 1, 2;
사용자별로 처음 채널과 마지막 채널 알아내기 (Windowing)
WITH RECORD AS ( /*사용자의 유입에 따른, 채널 순서 매기는 쿼리*/
SELECT userid, channel,
ROW_NUMBER() OVER (PARTITION BY userid ORDER BY ts ASC) AS seq_first,
ROW_NUMBER() OVER (PARTITION BY userid ORDER BY ts DESC) AS seq_last
FROM user_session_channel u
LEFT JOIN session_timestamp t ON u.sessionid = t.sessionid
)
SELECT/*유저의 첫번째 유입채널, 마지막 유입 채널 구하기*/
f.userid,
f.channel first_channel,
l.channel last_channel
FROM RECORD f
JOIN RECORD l ON f.userid = l.userid
WHERE f.seq_first = 1 or l.seq_last = 1
ORDER BY userid
#################
SELECT DISTINCT A.userid,
FIRST_VALUE(A.channel) over(partition by A.userid order by B.ts
rows between unbounded preceding and unbounded following) AS First_Channel,
LAST_VALUE(A.channel) over(partition by A.userid order by B.ts
rows between unbounded preceding and unbounded following) AS Last_Channel
FROM user_session_channel A
LEFT JOIN session_timestamp B
ON A.sessionid = B.sessionid;
Hive 메타스토어 사용하기
Spark 데이터베이스와 테이블 (1)
- 카탈로그: 테이블과 뷰에 관한 메타 데이터 관리
- 기본으로 메모리 기반 카탈로그 제공 (세션이 끝나면 사라짐)
- Hive와 호환되는 카탈로그 제공 - Persistent!
- 테이블 관리 방식 : 테이블들은 데이터베이스라 부르는 폴더와 같은 구조로 관리 (2단계)
- 특정한 데이터베이스 지칭을 '데이터베이스이름.테이블'의 형태로 지칭한다.
Spark 데이터베이스와 테이블 (2)
- 메모리 기반 테이블/뷰: 임시 테이블로 앞서 사용해봤음
- 스토리지 기반 테이블
- 기본적으로 HDFS와 Parquet 포맷을 사용, Hive와 호환되는 메타스토어 사용
- 두 종류의 테이블이 존재 (Hive와 동일한 개념)
- Managed Table
- Spark이 실제 데이터와 메타 데이터 모두 관리
- Unmanaged (External) Table
- Spark가 메타 데이터만 관리
- Managed Table
Spark SQL - 스토리지 기반 카탈로그 사용 방법
- Hive와 호환되는 메타스토어 사용
- SparkSession 생성시 enableHiveSupport() 호출
- 기본으로 “default”라는 이름의 데이터베이스 생성
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Python Spark Hive") \
.enableHiveSupport() \
.getOrCreate()
Spark SQL - Managed Table 사용 방법
- 두 가지 테이블 생성방법
- dataframe.saveAsTable("테이블이름")
- SQL 문법 사용 (CREATE TABLE, CTAS)
- spark.sql.warehouse.dir가 가리키는 위치에 데이터가 저장됨
- "PARQUET"이 기본 데이터 포맷
- 선호하는 테이블 타입
- Spark 테이블로 처리하는 것의 장점 (파일로 저장하는 것과 비교시)
- JDBC/ODBC등으로 Spark을 연결해서 접근 가능 (태블로, 파워BI)
Spark SQL - External Table 사용 방법
- 이미 HDFS에 존재하는 데이터에 스키마를 정의해서 사용 - LOCATION이란 프로퍼티 사용 (위치를 기반으로 로딩하겠다.)
- 메타 데이터만 카탈로그에 기록됨
- 데이터는 이미 존재.
- External Table은 삭제되어도 데이터는 그대로임
CREATE TABLE table_name (
column1 type1,
column2 type2,
column3 type3,
... )
USING PARQUET
LOCATION 'hdfs_path';
유닛 테스트
유닛 테스트란?
- 코드 상의 특정 기능 (보통 메소드의 형태)을 테스트하기 위해 작성된 코드
- 보통 정해진 입력을 주고 예상된 출력이 나오는지 형태로 테스트
- CI/CD를 사용하려면 전체 코드의 테스트 커버러지가 굉장히 중요해짐
- 각 언어별로 정해진 테스트 프레임웍을 사용하는 것이 일반적
- JUnit for Java
- NUnit for .NET
- unittest for Python
요약...
- 데이터 분석에 SQL이 필수적
- Spark에는 Spark SQL 모듈이 존재 : 데이터프레임을 SQL로 처리 가능
- Hive 메타스토어와 같이 많이 사용
- 코드와 SQL 검증을 위한 유닛 테스트 작성
'Spark' 카테고리의 다른 글
[Spark] Spark 내부동작과 클라우드 옵션(Part2. Spark ML) (2) | 2024.02.14 |
---|---|
[Spark] Spark 내부동작과 클라우드 옵션(Part1. Spark 내부동작) (1) | 2024.02.13 |
[Spark] Spark 프로그래밍: DataFrame (2) (0) | 2024.02.12 |
[Spark] Spark 프로그래밍: DataFrame (1) (1) | 2024.02.11 |
[Spark] 빅데이터 처리와 Spark 소개(2) (1) | 2024.02.10 |