본문 바로가기
Spark

[Spark] Spark 프로그래밍: DataFrame (2)

by 개복취 2024. 2. 12.

 

  1. 개발/실습 환경 소개
  2. Spark DataFrame 실습
  3. 요약
  4. +) : Spark vs. Pandas

개발/실습 환경 소개

Spark 개발 환경 옵션

  • Local Standalone Spark + Spark Shell : 한대에서 돌아가는 local모드의 프로그램
  • Python IDE – PyCharm, Visual Studio
  • Databricks Cloud – 커뮤니티 에디션을 무료로 사용
  • 다른 노트북 – 주피터 노트북구글 Colab아나콘다 등등 (colab의 사양이 나쁘지않고, 브라우저를 통해 사용이 용이하다.)

 

Local Standalone Spark

  • Spark Cluster Manager로 local[n : thread 갯수] 지정
    • master local[n]으로 지정
    • master는 클러스터 매니저를 지정하는데 사용
  • 주로 개발이나 간단한 테스트 용도
  • 하나의 JVM에서 모든 프로세스를 실행
    • 하나의 Driver와 하나의 Executor가 실행됨
    • 1+ 쓰레드가 Executor안에서 실행됨
  • Executor안에 생성되는 쓰레드 수
    • local:하나의 쓰레드만 생성
    • local[*]: 컴퓨터 CPU 수만큼 쓰레드를 생성

구글 Colab에서 Spark 사용

  • PySpark + Py4J를 설치

Mac에서 Local Standalone Spark 사용

  • Mac Catalina 혹은 이후 버전 기준: Z쉘이 기본으로 사용됨 (그전에는 Bash 쉘)
  • 자바관련설정
    • JDK8/11이 필요: 터미널에서 java -version 명령으로 체크
    • JAVA_HOME 환경변수를 Z쉘 시작 스크립트(~/.zshrc)에 등록
      • echo export "JAVA_HOME=\$(/usr/libexec/java_home)" >> ~/.zshrc

 

Spark DataFrame 실습

colab 환경에서의 pyspark, py4j 패키지 설치

!pip install pyspark==3.3.1 py4j==0.10.9.5

 

  • 실습 1 - 헤더가없는CSV파일처리하기
    • 입력 데이터헤더 없는 CSV 파일
    • 데이터에 스키마 지정하기
    • SparkConf 사용해보기
    • measure_type값이 TMIN인 레코드 대상으로 stationId별 최소 온도 찾기
from pyspark.sql import SparkSession
from pyspark import SparkConf

conf = SparkConf()
conf.set("spark.app.name", "PySpark DataFrame #1")
conf.set("spark.master", "local[*]") # colab 의 모든 스레드를 쓰겠다.

spark = SparkSession.builder\
        .config(conf=conf)\
        .getOrCreate()

#데이터에 스키마 지정하기

#1. .toDF를 사용해서 설정하는 방법
df = spark.read.format("csv")\
      .load("1800.csv")\
      .toDF("stationID", "date", "measure_type", "temperature", "_c4", "_c5", "_c6", "_c7")      

#2. 간단작업 : inferSchema를 통해 타입을 Spark에게 추측하도록 설정해주는 작업
df = spark.read.format("csv") \
      .option("inferSchema", "true")\
      .load("1800.csv")\
      .toDF("stationID", "date", "measure_type", "temperature", "_c4", "_c5", "_c6", "_c7")

#3. 매뉴얼로 스키마를 정의하는 작업
from pyspark.sql.types import StringType, IntegerType, FloatType
from pyspark.sql.types import StructType, StructField

schema = StructType([\
                     StructField("stationID", StringType(), True), \
                     StructField("date", IntegerType(), True), \
                     StructField("measure_type", StringType(), True), \
                     StructField("temperature", FloatType(), True)])
                     
df = spark.read.schema(schema).csv("1800.csv")

 

measure_type값이 TMIN인 레코드 대상으로 stationId별 최소 온도 찾기

#Filter out all but TMIN entries
#1번째방법 : filter 사용하기
minTemps = df.filter(df.measure_type == "TMIN")

#2번째방법 : where 메소드를 사용하기 (filter과 동일한 형태로 사용하기)
minTemps = df.where(df.measure_type == "TMIN")

#3번째 방법 : where 메소드를 사용하기 (SQL Expression 으로 필터링 적용)
minTemps = df.where("measure_type = 'TMIN'")

# Aggregate to find minimum for every station
minTempsBystation = minTemps.groupBy("stationID").min("temperature")
minTempsBystation.show()

# 리스트
results = minTempsBystation.collect()

for result in results:
  print(result[0] + "\t{:.2f}F".format(result[1]))
minTempsBystation.show() / 리스트로 되어있는 result값 뽑아올때

 SparkSQL로 처리해보기

df.createOrReplaceTempView("station1800")

results = spark.sql(""" SELECT stationID, MIN(temperature)
FROM station1800
WHERE measure_type = 'TMIN'
GROUP BY 1""").collect()

for r in results:
    print(r)

 

타입이 Row 인 형태로 출력된다.

 

  • 실습 2 - 헤더 없는 CSV 파일처리하기
    • 입력 데이터헤더 없는 CSV 파일
    • 데이터에 스키마 지정하기: cust_id, item_id, amount_spent를 데이터 컬럼으로 추가하기 (모두 숫자)
    • cust_id를 기준으로 amount_spent의 합을 계산하기
from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .master("local[*]")\
        .appName('PySpark DataFrame #2')\
        .getOrCreate()
from pyspark.sql import SparkSession
from pyspark.sql import functions as func
from pyspark.sql.types import StructType, StructField, StringType, FloatType
schema = StructType([\
                    StructField("cust_id", StringType(), True), \
                    StructField("item_id", StringType(), True), \
                    StructField("amount_spent", FloatType(), True)])

df = spark.read.schema(schema).csv("customer-orders.csv")
df_ca = df.groupBy("cust_id").sum("amount_spent")

# .withColumnRenamed로 컬럼에대해 alias를 해줄 수 있다.
df_ca = df.groupBy("cust_id").sum("amount_spent").withColumnRenamed("sum(amount_spent)", "sum")

 

sum 이외의 다른 집계함수까지도 지원한다.

import pyspark.sql.functions as f 

df_ca = df.groupBy("cust_id") \
        .agg(f.sum('amount_spent').alias('sum'))
        

# SUM, MAX, AVG값 구하기
df.groupBy("cust_id").agg( f.sum('amount_spent').alias('sum'), f.max('amount_spent').alias('max'), f.avg('amount_spent').alias('avg')).collect()

 

 

 SparkSQL로 처리해보기

df.createOrReplaceTempView("customer_orders")

spark.sql("""SELECT cust_id, SUM(amount_spent) sum, MAX(amount_spent) max, AVG(amount_spent) avg 
FROM customer_orders GROUP BY 1""").head(5)

spark.catalog.listTables()

spark.catalog.listTables() : 카탈로그에 테이블이 저장된다.
 

Spark은 기본으로 in-memory 카탈로그를 사용. 스토리지 기반의 카탈로그를 쓰고 싶다면 SparkSession 설정할 때 enableHiveSupport()를 호출 (Hive metastore를 카탈로그로 사용하며 Hive UDF와 Hive 파일포맷 사용 가능)

 

  • 실습 3. 텍스트를 파싱해서 구조화된 데이터로 변환하기 (Regex를 이용해서 아래와 같이 변환해보는 것이 목표)

입력: “On 2021-01-04 the cost per ton from 85001 to 85002 is $28.32 at ABC Hauling

regex 패턴: “On (\S+) the cost per ton from (\d+) to (\d+) is (\S+) at (.*)” 
(
\S (non-whitespace character), \d (numeric character))

 

from pyspark.sql import SparkSession 
from pyspark import SparkConf 

conf = SparkConf() 
conf.set("spark.app.name", "PySpark DataFrame #3") 
conf.set("spark.master", "local[*]")
spark = SparkSession.builder\
        .config(conf=conf)\
        .getOrCreate()

 

import pyspark.sql.functions as F 
from pyspark.sql.types import * 

schema = StructType([ StructField("text", StringType(), True)]) 
transfer_cost_df = spark.read.schema(schema).text("transfer_cost.txt")

from pyspark.sql.functions import * 
regex_str = r'On (\S+) the cost per ton from (\d+) to (\d+) is (\S+) at (.*)' 
df_with_new_columns = transfer_cost_df\
                      .withColumn('week', regexp_extract('text', regex_str, 1))\
                      .withColumn('departure_zipcode', regexp_extract(column('text'), regex_str, 2))\
                      .withColumn('arrival_zipcode', regexp_extract(transfer_cost_df.text, regex_str, 3))\
                      .withColumn('cost', regexp_extract(col('text'), regex_str, 4))\
                      .withColumn('vendor', regexp_extract(col('text'), regex_str, 5))
                      
final_df = df_with_new_columns.drop("text")
final_df.write.csv("extracted.csv")
final_df.write.format("json").save("extracted.json")

 

 

  • 실습 4. Stackoverflow 서베이 기반 인기 언어 찾기
    • stackoverflow CSV파일에서 다음 두 필드는 ;를 구분자로 프로그래밍 언어를 구분
      • LanguageHaveWorkedWith
      • LanguageWantToWorkWith
    • 이를 별개 레코드로 분리하여 가장 많이 사용되는 언어 top 50와 가장 많이 쓰고 싶은 언어 top 50를 계산해보기
from pyspark.sql import SparkSession
from pyspark.sql import functions as func
from pyspark.sql.functions import col, explode

S3_DATA_INPUT_PATH = 's3://spark-tutorial-dataset/survey_results_public.csv'
S3_DATA_OUTPUT_PATH = 's3://spark-tutorial-dataset/data-output'

spark = SparkSession.builder.appName('TopLanguage').getOrCreate()

df = spark.read.csv(S3_DATA_INPUT_PATH, header=True)['ResponseId', 'LanguageHaveWorkedWith', 'LanguageWantToWorkWith']

df2 = df.withColumn("language_have", func.split(func.trim(func.col("LanguageHaveWorkedWith")), ";"))\
        .withColumn("language_want", func.split(func.trim(func.col("LanguageWantToWorkWith")), ";"))

df_language_have = df2.select(df2.ResponseId,explode(df2.language_have).alias("language_have")).groupby("language_have").count()
df_language_want = df2.select(df2.ResponseId,explode(df2.language_want).alias("language_want")).groupby("language_want").count()

df_language50_have = df_language_have.sort("count", ascending=False).limit(50)
df_language50_want = df_language_want.sort("count", ascending=False).limit(50)

df_language50_have.write.mode('overwrite').csv(S3_DATA_OUTPUT_PATH+"/language50_have")
df_language50_want.write.mode('overwrite').csv(S3_DATA_OUTPUT_PATH+"/language50_want")

print('Selected data is successfully saved to S3: {}'.format(S3_DATA_OUTPUT_PATH))
  • 실습 5. Redshift 연결해보기
    • MAU (Monthly Active User) 계산해보기
    • 두 개의 테이블을 Redshift에서 Spark으로 로드: JDBC 연결 실습
  • DataFrame과 SparkSQL을 사용해서 조인
  • DataFrame JOIN
  • left_DF.join(right_DF, join condition, join type): join type: “inner”, “left”, “right”, “outer”, “semi”, “anti”

요약...

  • 데이터 처리에서 중요한 개념: Partition, Shuffling
  • 데이터 구조: RDD, DataFrame, Dataset
  • Spark Session 생성과 설정
  • DataFrame 프로그래밍

+) 추가 : Spark vs. Pandas

Pandas : 파이썬으로 데이터 분석을 하는데 가장 기본이 되는 모듈 중의 하나

  • 엑셀에서 하는 일을 파이썬에서 가능하게 해주는 모듈이라고 생각하면 됨
  • matplotlib(시각화) scikit-learn(머신러닝)과 같은 다른 파이썬 모듈과 같이 사용됨
  • 한대의 서버에서 다룰수 있는 데이터로 크기가 제약이 됨
  • 큰 데이터의 경우 Spark을 사용, 작은 데이터를 다루는데 굳이 Spark을 쓸 필요가 없음!
  • 병렬 처리를 지원하지 않음: 소규모의 구조화된 데이터 (테이블 형태의 데이터)를 다루는데 최적

Pandas로 할 수 있는 일의 예

  • 구조화된 데이터를 읽어오고 저장하기
    • CSV, JSON 등등 다양한 포맷 지원
    • 웹과 관계형 데이터베이스에서 읽어오는 것도 가능
    • 다양한 통계 뽑아보기
      • 컬럼 별로 평균표준편차, percentile 등 계산하기
      • 컬럼 A와 컬럼 B간의 상관 관계 계산하기 (correlation)
  • 데이터 청소 작업 -> 데이터 전처리
    • 컬럼별로값이존재하지않는경우디폴트값지정하기
    • 컬럼별로 값의 범위를 조정하기 (normalization)
  • Visualization
    • Matplotlib와 연동하여 다양한 형태의 시각화 지원 (히스토그램라인 등등)

 

Pandas의 데이터 구조

  • 엑셀의 시트에 해당하는 것이 Dataframe
  • 엑셀 시트의 컬럼에 해당하는 것이 Series
  • 입력 dataframe을 원하는 최종 dataframe으로 계속 변환하는 것이 핵심

 

Pandas 데이터 변환

  • numpy array로 변환 가능: to_numpy()
  • numpy array에서 Pandas dataframe으로 변환도 가능. 이 경우 컬럼 헤더를 지정해주어야함
  • Spark dataframe으로 변환 가능
  • Spark dataframe에서 Pandas dataframe으로도 변환가능하지만 작은 데이터인 경우에만 의미가 있음 (보통 요약 통계정보나 일부 작은 데이터를 검증용으로 보는 경우)
  • parallelize, collect

요한 개념: Partition, Shuffling ● 데이터 구조: RDD, DataFrame, Dataset
● Spark Session 
생성과 설정
● DataFrame 프로그래밍