본문 바로가기
Spark

[Spark] Spark 내부동작과 클라우드 옵션(Part2. Spark ML)

by 개복취 2024. 2. 14.

  1. Spark ML 소개
  2. 실습: 머신러닝 모델 만들기 
  3. Spark ML 피쳐변환
  4. Spark ML Pipeline 살펴보기

Spark ML 소개

Spark이 제공해주는 머신러닝에 대해 배워보자

Spark ML 소개 (1)

  • 머신러닝 관련 다양한 알고리즘, 유틸리티로 구성된 라이브러리
    • Classification, Regression, Clustering, Collaborative Filtering, Dimensionality Reduction(피쳐가 너무 많은 경우). 전체 리스트는 링크 참고
    • 아직 딥러닝은 지원은 아직 미약함
  • 여기에는 RDD 기반과 데이터프레임 기반의 두 버전이존재
    • spark.mllib vs. spark.ml
      • spark.mllibRDD 기반이고 spark.ml데이터프레임 기반
      • spark.mllibRDD위에서 동작하는 이전 라이브러리로 더 이상 업데이트가 안됨, 항상 spark.ml을 사용할 것!

Spark ML의 장점

  • 원스톱 ML 프레임웍!
    • 데이터프레임과 SparkSQL등을 이용해 전처리
    • Spark MLlib를 이용해 모델 빌딩
    • ML Pipeline을 통해 모델 빌딩 자동화
    • MLflow로 모델 관리하고 서빙
  • 대용량 데이터도 처리 가능!

Spark ML 소개: MLflow

  • 모델의 관리와 서빙을 위한 Ops 관련 기능도 제공
  • MLflow
    • 모델 개발과 테스트와 관리와 서빙까지 제공해주는 End-to-End 프레임웍
    • MLflow는 파이썬, 자바, R, API를 지원
    • MLflow는 트래킹(Tracking), 모델(Models), 프로젝트(Projects)를 지원

 

Spark ML 제공 알고리즘

  • Classification: Logistic regression, Decision tree, Random forest, Gradient-boosted tree, ...
  • Regression: Linear regression, Decision tree, Random forest, Gradient-boosted tree, ...
  • Clustering: K-means, LDA(Latent Dirichlet Allocation), GMM(Gaussian Mixture Model), ..
  • Collaborative Filtering(Recommendation): 명시적인 피드백암묵적인 피드백 기반
    • 명시적인 피드백의 : 리뷰 평점
    • 암묵적인 피드백의 : 클릭, 구매 등등

 

실습: 머신러닝 모델 만들기

보스턴 주택가격 예측 모델 만들기: Regression 

타이타닉 승객 생존 예측 모델 만들기: Classification

 

Spark ML 기반 모델 빌딩의 기본 구조

  • 여느 라이브러리를 사용한 모델 빌딩과 크게 다르지 않음
    1. 트레이닝셋 전처리
    2. 모델 빌딩
    3. 모델 검증 (confusion matrix)
  • Scikit-Learn과 비교했을 때 장점
    • 차이점은 결국 데이터의 크기
      • Scikit-Learn하나의 컴퓨터에서 돌아가는 모델 빌딩
      • Spark MLlib여러 서버 위에서 모델 빌딩
    • 트레이닝셋의 크기가 크면 전처리와 모델 빌딩에 있어 Spark이 큰 장점을 가짐
    • SparkML 파이프라인을 통해 모델 개발의 반복을 쉽게 해줌

 

보스턴 주택가격 예측

  • 1970년대 미국 인구조사 서비스 (US Census Service)에서 보스턴 지역의 주택 가격 데이터를 수집한 데이터를 기반으로 모델 빌딩
  • CMU Stat Lib Archive : 개별 주택가격의 예측이 아니라 지역별 중간 주택가격 예측임
  • Regression 알고리즘 사용 예정: 연속적인 주택가격을 예측하기에 Classification 알고리즘은 사용불가

 

보스턴 주택가격 트레이닝 셋 보기

  • 총 506개의 레코드로 구성되며 13개의 피쳐와 레이블 필드(주택가격) 로 구성
    • 506개 동네의 주택 중간값 데이터임 (개별 주택이 아님에 유의)
    • 14번째 필드가 바로 예측해야하는 중간 주택 가격

 
#보스턴 주택 가격 예측 모델 만들기

from pyspark.sql import SparkSession 
spark = SparkSession \ 
	.builder \ 
	.appName("Boston Housing Linear Regression example") \ 
	.getOrCreate()
    
data = spark.read.csv('./boston_housing.csv', header=True, inferSchema=True)


#피쳐 벡터를 만들기

from pyspark.ml.feature 
import VectorAssembler 

feature_columns = data.columns[:-1] #전체의 컬럼을 의미함 (마지막 label 'medv'값 빼고)
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features") #오브젝트 생성

#입력으로 df주면, 출력으로 df가 나온다. (transform은 df던져주면 컬럼이 추가된 df를 반환할 수 있게 한다.)
data_2 = assembler.transform(data)

#훈련용과 테스트용 데이터를 나누고 Linear Regression 모델을 하나 만든다
train, test = data_2.randomSplit([0.7, 0.3])

 

from pyspark.ml.regression import LinearRegression

algo = LinearRegression(featuresCol="features", labelCol="medv") 
model = algo.fit(train) #모델 생성

#모델 성능 측정
evaluation_summary = model.evaluate(test)
evaluation_summary #<pyspark.ml.regression.LinearRegressionSummary at 0x7f070bc64e20>
evaluation_summary.meanAbsoluteError	#3.078919367841893
evaluation_summary.rootMeanSquaredError	#3.934134704246837
evaluation_summary.r2					#0.7596516870936096
#이러한 값으로 base_line으로 판단해서 여러번 시행하는 과정을 거친다.

#모델 예측값 살펴보기
predictions = model.transform(test) # spark에서 transform을 봤을때 입력 DF에서 새로운 컬럼을 붙였구나라고 판단해야함
predictions.select(predictions.columns[13:]).show()

model.save("boston_housing_model")

#학습 데이터를 gdrive에 받아다가 access할 수 있다.
from google.colab import drive drive.mount('/content/gdrive')
model_save_name = "boston_housing_model" path = F"/content/gdrive/My Drive/boston_housing_model" model.save(path)

#저장된 모듈을 가지고 올 때 import시켜야한다.
from pyspark.ml.regression import LinearRegressionModel 
loaded_model = LinearRegressionModel.load(path) # "boston_housing_model")
predictions2 = loaded_model.transform(test)

#medv, features, prediction 가지고옴
predictions2.select(predictions.columns[13:]).show()

 

타이타닉 승객 생존 예측

  • 머신러닝의 Hello World라고 할 수 있는 굉장히 유명한 데이터셋
  • Binary Classification 알고리즘 사용 예정
    • 생존 혹은 비생존을 예측하는 것이라 Binary Classification을 사용
    • 정확히는 Binomial Logistic Regression을 사용 (2개 클래스 분류기)
  • AUC (Area Under the Curve)의 값이 중요한 성능 지표가 됨
    • True Positive RateFalse Positive Rate
    • True Positive Rate: 생존한 경우를 얼마나 맞게 예측했나? 흔히 Recall이라고 부르기도함
    • False Positive Rate: 생존하지 못한 경우를 생존한다고 얼마나 예측했나?
 
  • 총 892개의 레코드로 구성되며 11개의 피쳐와 레이블 필드(생존여부) 로 구성
    • 2번째 필드(Survived) 바로 예측해야하는 승객 생존 여부

data = spark.read.csv('./titanic.csv', header=True, inferSchema=True)

#데이터 클린업:
#
#PassengerID, Name, Ticket, Embarked는 사용하지 않을 예정 (아무 의미가 없음).
#Cabin도 비어있는 값이 너무 많아서 사용하지 않을 예정
#Age는 중요한 정보인데 비어있는 레코드들이 많아서 디폴트값을 채워줄 예정
#Gender의 경우 카테고리 정보이기에 숫자로 인코딩 필요

final_data = data.select(['Survived', 'Pclass', 'Gender', 'Age', 'SibSp', 'Parch', 'Fare'])

from pyspark.ml.feature import Imputer

imputer = Imputer(strategy='mean', inputCols=['Age'], outputCols=['AgeImputed']) 
imputer_model = imputer.fit(final_data) 
final_data = imputer_model.transform(final_data)
from pyspark.ml.feature import StringIndexer 

gender_indexer = StringIndexer(inputCol='Gender', outputCol='GenderIndexed') 
gender_indexer_model = gender_indexer.fit(final_data) 
final_data = gender_indexer_model.transform(final_data)

from pyspark.ml.feature import StringIndexer 

gender_indexer = StringIndexer(inputCol='Gender', outputCol='GenderIndexed') 
gender_indexer_model = gender_indexer.fit(final_data) 
final_data = gender_indexer_model.transform(final_data)

final_data.select("Age", "AgeImputed").show()

#성별 정보 인코딩: male -> 0, female -> 1

from pyspark.ml.feature import StringIndexer 

gender_indexer = StringIndexer(inputCol='Gender', outputCol='GenderIndexed') 
gender_indexer_model = gender_indexer.fit(final_data) 
final_data = gender_indexer_model.transform(final_data)

 

#피쳐 벡터를 만들기

from pyspark.ml.feature import VectorAssembler 

assembler = VectorAssembler(inputCols=['Pclass', 'SibSp', 'Parch', 'Fare', 'AgeImputed', 'GenderIndexed'], outputCol='features') 
data_vec = assembler.transform(final_data)

#훈련용과 테스트용 데이터를 나누고 binary classification 모델을 하나 만든다
train, test = data_vec.randomSplit([0.7, 0.3])

from pyspark.ml.classification import LogisticRegression 

algo = LogisticRegression(featuresCol="features", labelCol="Survived") 
model = algo.fit(train)

#모델 성능 측정
predictions = model.transform(test)
predictions.select(['Survived','prediction', 'probability']).show()


from pyspark.ml.evaluation import BinaryClassificationEvaluator 
evaluator = BinaryClassificationEvaluator(labelCol='Survived', metricName='areaUnderROC') 
evaluator.evaluate(predictions)

Spark ML 피쳐변환

Spark ML가 제공해주는 피쳐변환에 대해 배워보자

 

피쳐 추출과 변환

  • 피쳐 값들을 모델 훈련에 적합한 형태로 바꾸는 것을 지칭
  • 크게 두 가지가 존재: Feature ExtractorFeature Transformer
  • Feature Transformer가 하는 일
    • 먼저 피쳐 값들은 숫자 필드이어야함: 텍스트 필드(카테고리 값들)를 숫자 필드로 변환해야함
    • 숫자 필드값의 범위 표준화 :  숫자 필드라고 해도 가능한 값의 범위를 특정 범위(0부터 1)로 변환해야 함, 이를 피쳐 스케일링 (Feature Scaling) 혹은 정규화 (Normalization)
    • 비어있는 필드들의 값을 어떻게 채울 것인가? : Imputer. 앞서 타이타닉 승객 생존 분류기에 써봤음
  • Feature Extractor가 하는 일
    • 기존 피쳐에서 새로운 피쳐를 추출
    • TF-IDF, Word2Vec, ... : 많은 경우 텍스트 데이터를 어떤 형태로 인코딩하는 것이 여기에 해당함

 

피쳐 변환 StringIndexer: 텍스트 카테고리를 숫자로 변환 (1)

  • 아래 왼쪽과 같은 값을 갖는 Color라는 이름의 피쳐가 존재한다면
  • 이를 오른쪽과 같은 숫자로 변환해주는 것이 피쳐변환의 목적

 

피쳐 변환 StringIndexer: 텍스트 카테고리를 숫자로 변환 (2)

  • Scikit-Learn은 sklearn.preprocessing 모듈 아래 여러 인코더 존재
    • OneHotEncoder, LabelEncoder, OrdinalEncoder, ...
  • Spark MLlib의 경우 pyspark.ml.feature 모듈 밑에 두 개의 인코더 존재
    • StringIndexer, OneHotEncoder
  • 사용법은 Indexer 모델을 만들고(fit), Indexer 모델로 데이터프레임을 transform
from pyspark.ml.feature import StringIndexer

gender_indexer = StringIndexer(inputCol='Gender', outputCol='GenderIndexed') 
gender_indexer_model = gender_indexer.fit(final_data) 
final_data_with_transformed_gender_gender = gender_indexer_model.transform(final_data)


피쳐 변환 Scaler: 숫자 필드값의 범위 표준화 (1)

  • 숫자필드 값의 범위를 특정 범위 (예를들면, 0부터 1)로 변환하는것
  • 피쳐 스케일링 (Feature Scaling) 혹은 정규화 (Normalization)라 부름

 

피쳐 변환 Scaler: 숫자 필드값의 범위 표준화 (2)

  • Scikit-Learn은 sklearn.preprocessing 모듈 아래 두 개의 스케일러 존재 : StandardScaler, MinMaxScaler
  • Spark MLlib의 경우 pyspark.ml.feature 모듈 밑에 두 개의 스케일러 존재
    • StandardScaler, MinMaxScaler
    • 사용법은 Scaler 모델을 만들고(fit), Scaler 모델로 데이터프레임을 transform
  • StandardScaler
    • 각 값에서 평균을 빼고 이를 표준편차로 나눔. 값의 분포가 정규분포를 따르는 경우 사용
  • MinMaxScaler
    • 모든 값을 0과 1사이로 스케일. 각 값에서 최소값을 빼고 (최대값-최소값)으로 나눔

 

 

피쳐 변환 Imputer: 값이 없는 필드 채우기 (1)

  • 값이 존재하지 않는 레코드들이 존재하는 필드들의 경우 기본값을 정해서 채우는 것. Impute한다고 부름

 

피쳐 변환 Imputer: 값이 없는 필드 채우기 (2)

  • Scikit-Learn은 sklearn.preprocessing 모듈 아래 존재 : Imputer
  • Spark MLlib의 경우 pyspark.ml.feature 모듈 밑에 존재
    • Imputer
    • 사용법은 Imputer 모델을 만들고(fit), Imputer 모델로 데이터프레임을 transform
from pyspark.ml.feature import Imputer

imputer = Imputer(strategy='mean', inputCols=['Age'], outputCols=['AgeImputed']) 
imputer_model = imputer.fit(final_data)
final_data_age_transformed = imputer_model.transform(final_data)

 

Spark ML Pipeline 살펴보기

모델 빌딩과 테스트 과정을 자동화하자!

 

모델 빌딩과 관련된 흔한 문제들

  1. 트레이닝 셋의 관리가 안됨
  2. 모델 훈련 방법이 기록이 안됨
    • 어떤 트레이닝 셋을 사용했는지?
    • 어떤 피쳐들을 사용했는지?
    • 하이퍼 파라미터는 무엇을 사용했는지?
  3. 모델 훈련에 많은 시간 소요
    • 모델 훈련이 자동화가 안된 경우 매번 각 스텝들을 노트북 등에서 일일히 수행
    • 에러가 발생할 여지가 많음(특정 스텝을 까먹거나 조금 다른방식 적용)

ML Pipeline의 등장

  • 앞서 언급한 문제들 중 2와 3을해결!
  • 자동화를 통해 에러 소지를 줄이고 반복을 빠르게 가능하게 해줌

 

Spark ML 관련 개념 정리

  • ML 파이프라인이란? 
    • 데이터 과학자가 머신러닝 개발과 테스트를 쉽게 해주는 기능 (데이터 프레임 기반)
    • 머신러닝 알고리즘에 관계없이 일관된 형태의 API를 사용하여 모델링이 가능
    • ML 모델개발과 테스트를 반복가능해줌 (데이터프레임, Transformer, Estimator, Parameter)
      • 뒤에 설명할 TransformerEstimator로 구성됨. (4개의 요소로 구성)
  • ML 파이프라인의 구성요소: 데이터 프레임
    • ML 파이프라인에서는 데이터프레임이 기본 데이터 포맷
    • 기본적으로 CSV, JSON, Parquet, JDBC(관계형 데이터베이스)를 지원
    • ML 파이프라인에서 다음 2가지의 새로운 데이터소스를 지원
      1. 이미지 데이터소스
        • jpeg, png 등의 이미지들을 지정된 디렉토리에서 로드
      2. LIBSVM 데이터소스
        • labelfeatures 두 개의 컬럼으로 구성되는 머신러닝 트레이닝 셋 포맷
        • features 컬럼은 벡터 형태의 구조를 가짐
  • ML 파이프라인의 구성요소: Transformer (1)
    • 입력 데이터프레임을 다른 데이터프레임으로 변환 
      • 하나 이상의 새로운 컬럼을 추가
    • 2 종류의 Transformer가 존재하며 transform이 메인 함수
      • Feature Transformer, Learning Model
  • ML 파이프라인의 구성요소: Transformer (2)
    • Feature Transformer
      • 입력 데이터프레임의 컬럼으로부터 새로운 컬럼을 만들어내 이를 추가한 새로운 데이터프레임을 출력으로 내줌. 보통 피쳐 엔지니어링을 하는데 사용
      • ) Imputer, StringIndexer, VectorAssembler
        1. Imputer기본값 지정에 사용
        2. StringIndexercategorical 정보를 숫자 정보로 변환
        3. VectorAssembler: 주어진 컬럼들을 통합하여 하나의 벡터 컬럼으로 변환
  • ML 파이프라인의 구성요소: Transformer (3)
    • Learning Model
      • 머신러닝 모델에 해당.
      • 피쳐 데이터프레임을 입력으로 받아 예측값이 새로운 컬럼으로 포함된 데이터 프레임을 출력으로 내줌: prediction, probability
  • ML 파이프라인의 구성요소: Feature Transformer
    • transform 함수

 

 

  • ML 파이프라인의 구성요소: Estimator (1)
    • 머신러닝 알고리즘에 해당. fit이 메인 함수
      • 트레이닝셋 데이터프레임을 입력으로 받아서 머신러닝 모델(Transformer)만들어냄
      • 입력: 데이터프레임 (트레이닝 셋)
      • 출력: 머신러닝 모델
      • 예를 들어, LogisticRegressionEstimator이고 LogisticRegression.fit()를 호출하면 머신 러닝 모델(Transformer)을 만들어냄
  • ML 파이프라인의 구성요소: Estimator (2)
    • ML PipelineEstimator
    • Estimator는 저장과 읽기 함수 제공 : 즉 모델과 ML Pipeline을 저장했다가 나중에 다시 읽을 수 있음
    • saveload
  • ML 파이프라인의 구성요소: Estimator
  • ML 파이프라인의 구성요소: Parameter
    • TransformerEstimator의 공통 API로 다양한 인자를 적용해줌
    • 두 종류의 파라미터가 존재:
      • Param(하나의 이름과 값)
      • ParamMap (Param 리스트)
    • 파라미터의 예)
      • 훈련 반복수 (iteration) 지정을 위해 setMaxIter()를 사용
      • ParamMap(lr.maxIter -> 10)
    • 파라미터는 fit (Estimator) 혹은 transform (Transformer)에 인자로 지정 가능

 

 

ML Pipeline

  • 하나 이상의 Transformer와 Estimator가 연결된 모델링 웍플로우
    • 입력은 데이터프레임
    • 출력은 머신러닝 모델
  • ML Pipeline 그자체도 Estimator
    • 따라서 ML Pipeline의 실행은 fit 함수의 호출로 시작
    • 저장했다가 나중에 다시 로딩하는 것이 가능 (Persistence)
  • 한번 파이프라인을 만들면 반복 모델빌딩이 쉬워짐

요약...

  • Spark ML와 제공 기능에 대해 배움
  • pyspark.ml을 이용해 실제 머신러닝 모델을 만들어봄
  • ML Pipeline을 비롯한 다양한 Spark ML 기능에 대해 배움