- Spark ML 소개
- 실습: 머신러닝 모델 만들기
- Spark ML 피쳐변환
- Spark ML Pipeline 살펴보기
Spark ML 소개
Spark이 제공해주는 머신러닝에 대해 배워보자
Spark ML 소개 (1)
- 머신러닝 관련 다양한 알고리즘, 유틸리티로 구성된 라이브러리
- Classification, Regression, Clustering, Collaborative Filtering, Dimensionality Reduction(피쳐가 너무 많은 경우). 전체 리스트는 링크 참고
- 아직 딥러닝은 지원은 아직 미약함
- 여기에는 RDD 기반과 데이터프레임 기반의 두 버전이존재
- spark.mllib vs. spark.ml
- spark.mllib가 RDD 기반이고 spark.ml은 데이터프레임 기반
- spark.mllib는 RDD위에서 동작하는 이전 라이브러리로 더 이상 업데이트가 안됨, 항상 spark.ml을 사용할 것!
- spark.mllib vs. spark.ml
Spark ML의 장점
- 원스톱 ML 프레임웍!
- 데이터프레임과 SparkSQL등을 이용해 전처리
- Spark MLlib를 이용해 모델 빌딩
- ML Pipeline을 통해 모델 빌딩 자동화
- MLflow로 모델 관리하고 서빙
- 대용량 데이터도 처리 가능!
Spark ML 소개: MLflow
- 모델의 관리와 서빙을 위한 Ops 관련 기능도 제공
- MLflow
- 모델 개발과 테스트와 관리와 서빙까지 제공해주는 End-to-End 프레임웍
- MLflow는 파이썬, 자바, R, API를 지원
- MLflow는 트래킹(Tracking), 모델(Models), 프로젝트(Projects)를 지원
Spark ML 제공 알고리즘
- Classification: Logistic regression, Decision tree, Random forest, Gradient-boosted tree, ...
- Regression: Linear regression, Decision tree, Random forest, Gradient-boosted tree, ...
- Clustering: K-means, LDA(Latent Dirichlet Allocation), GMM(Gaussian Mixture Model), ..
- Collaborative Filtering(Recommendation): 명시적인 피드백과 암묵적인 피드백 기반
- 명시적인 피드백의 예: 리뷰 평점
- 암묵적인 피드백의 예: 클릭, 구매 등등
실습: 머신러닝 모델 만들기
보스턴 주택가격 예측 모델 만들기: Regression
타이타닉 승객 생존 예측 모델 만들기: Classification
Spark ML 기반 모델 빌딩의 기본 구조
- 여느 라이브러리를 사용한 모델 빌딩과 크게 다르지 않음
- 트레이닝셋 전처리
- 모델 빌딩
- 모델 검증 (confusion matrix)
- Scikit-Learn과 비교했을 때 장점
- 차이점은 결국 데이터의 크기
- Scikit-Learn은 하나의 컴퓨터에서 돌아가는 모델 빌딩
- Spark MLlib는 여러 서버 위에서 모델 빌딩
- 트레이닝셋의 크기가 크면 전처리와 모델 빌딩에 있어 Spark이 큰 장점을 가짐
- Spark은 ML 파이프라인을 통해 모델 개발의 반복을 쉽게 해줌
- 차이점은 결국 데이터의 크기
보스턴 주택가격 예측
- 1970년대 미국 인구조사 서비스 (US Census Service)에서 보스턴 지역의 주택 가격 데이터를 수집한 데이터를 기반으로 모델 빌딩
- CMU Stat Lib Archive : 개별 주택가격의 예측이 아니라 지역별 중간 주택가격 예측임
- Regression 알고리즘 사용 예정: 연속적인 주택가격을 예측하기에 Classification 알고리즘은 사용불가
보스턴 주택가격 트레이닝 셋 보기
- 총 506개의 레코드로 구성되며 13개의 피쳐와 레이블 필드(주택가격) 로 구성
- 506개 동네의 주택 중간값 데이터임 (개별 주택이 아님에 유의)
- 14번째 필드가 바로 예측해야하는 중간 주택 가격
#보스턴 주택 가격 예측 모델 만들기
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Boston Housing Linear Regression example") \
.getOrCreate()
data = spark.read.csv('./boston_housing.csv', header=True, inferSchema=True)
#피쳐 벡터를 만들기
from pyspark.ml.feature
import VectorAssembler
feature_columns = data.columns[:-1] #전체의 컬럼을 의미함 (마지막 label 'medv'값 빼고)
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features") #오브젝트 생성
#입력으로 df주면, 출력으로 df가 나온다. (transform은 df던져주면 컬럼이 추가된 df를 반환할 수 있게 한다.)
data_2 = assembler.transform(data)
#훈련용과 테스트용 데이터를 나누고 Linear Regression 모델을 하나 만든다
train, test = data_2.randomSplit([0.7, 0.3])
from pyspark.ml.regression import LinearRegression
algo = LinearRegression(featuresCol="features", labelCol="medv")
model = algo.fit(train) #모델 생성
#모델 성능 측정
evaluation_summary = model.evaluate(test)
evaluation_summary #<pyspark.ml.regression.LinearRegressionSummary at 0x7f070bc64e20>
evaluation_summary.meanAbsoluteError #3.078919367841893
evaluation_summary.rootMeanSquaredError #3.934134704246837
evaluation_summary.r2 #0.7596516870936096
#이러한 값으로 base_line으로 판단해서 여러번 시행하는 과정을 거친다.
#모델 예측값 살펴보기
predictions = model.transform(test) # spark에서 transform을 봤을때 입력 DF에서 새로운 컬럼을 붙였구나라고 판단해야함
predictions.select(predictions.columns[13:]).show()
model.save("boston_housing_model")
#학습 데이터를 gdrive에 받아다가 access할 수 있다.
from google.colab import drive drive.mount('/content/gdrive')
model_save_name = "boston_housing_model" path = F"/content/gdrive/My Drive/boston_housing_model" model.save(path)
#저장된 모듈을 가지고 올 때 import시켜야한다.
from pyspark.ml.regression import LinearRegressionModel
loaded_model = LinearRegressionModel.load(path) # "boston_housing_model")
predictions2 = loaded_model.transform(test)
#medv, features, prediction 가지고옴
predictions2.select(predictions.columns[13:]).show()
타이타닉 승객 생존 예측
- 머신러닝의 Hello World라고 할 수 있는 굉장히 유명한 데이터셋
- 2015년 캐글에서 “Titanic - Machine Learning from Diaster”라는 이름의 튜토리얼로 시작됨
- Binary Classification 알고리즘 사용 예정
- 생존 혹은 비생존을 예측하는 것이라 Binary Classification을 사용
- 정확히는 Binomial Logistic Regression을 사용 (2개 클래스 분류기)
- AUC (Area Under the Curve)의 값이 중요한 성능 지표가 됨
- True Positive Rate과 False Positive Rate
- True Positive Rate: 생존한 경우를 얼마나 맞게 예측했나? 흔히 Recall이라고 부르기도함
- False Positive Rate: 생존하지 못한 경우를 생존한다고 얼마나 예측했나?
- 총 892개의 레코드로 구성되며 11개의 피쳐와 레이블 필드(생존여부) 로 구성
- 2번째 필드(Survived) 바로 예측해야하는 승객 생존 여부
data = spark.read.csv('./titanic.csv', header=True, inferSchema=True)
#데이터 클린업:
#
#PassengerID, Name, Ticket, Embarked는 사용하지 않을 예정 (아무 의미가 없음).
#Cabin도 비어있는 값이 너무 많아서 사용하지 않을 예정
#Age는 중요한 정보인데 비어있는 레코드들이 많아서 디폴트값을 채워줄 예정
#Gender의 경우 카테고리 정보이기에 숫자로 인코딩 필요
final_data = data.select(['Survived', 'Pclass', 'Gender', 'Age', 'SibSp', 'Parch', 'Fare'])
from pyspark.ml.feature import Imputer
imputer = Imputer(strategy='mean', inputCols=['Age'], outputCols=['AgeImputed'])
imputer_model = imputer.fit(final_data)
final_data = imputer_model.transform(final_data)
from pyspark.ml.feature import StringIndexer
gender_indexer = StringIndexer(inputCol='Gender', outputCol='GenderIndexed')
gender_indexer_model = gender_indexer.fit(final_data)
final_data = gender_indexer_model.transform(final_data)
from pyspark.ml.feature import StringIndexer
gender_indexer = StringIndexer(inputCol='Gender', outputCol='GenderIndexed')
gender_indexer_model = gender_indexer.fit(final_data)
final_data = gender_indexer_model.transform(final_data)
final_data.select("Age", "AgeImputed").show()
#성별 정보 인코딩: male -> 0, female -> 1
from pyspark.ml.feature import StringIndexer
gender_indexer = StringIndexer(inputCol='Gender', outputCol='GenderIndexed')
gender_indexer_model = gender_indexer.fit(final_data)
final_data = gender_indexer_model.transform(final_data)
#피쳐 벡터를 만들기
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=['Pclass', 'SibSp', 'Parch', 'Fare', 'AgeImputed', 'GenderIndexed'], outputCol='features')
data_vec = assembler.transform(final_data)
#훈련용과 테스트용 데이터를 나누고 binary classification 모델을 하나 만든다
train, test = data_vec.randomSplit([0.7, 0.3])
from pyspark.ml.classification import LogisticRegression
algo = LogisticRegression(featuresCol="features", labelCol="Survived")
model = algo.fit(train)
#모델 성능 측정
predictions = model.transform(test)
predictions.select(['Survived','prediction', 'probability']).show()
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(labelCol='Survived', metricName='areaUnderROC')
evaluator.evaluate(predictions)
Spark ML 피쳐변환
Spark ML가 제공해주는 피쳐변환에 대해 배워보자
피쳐 추출과 변환
- 피쳐 값들을 모델 훈련에 적합한 형태로 바꾸는 것을 지칭
- 크게 두 가지가 존재: Feature Extractor와 Feature Transformer
- Feature Transformer가 하는 일
- 먼저 피쳐 값들은 숫자 필드이어야함: 텍스트 필드(카테고리 값들)를 숫자 필드로 변환해야함
- 숫자 필드값의 범위 표준화 : 숫자 필드라고 해도 가능한 값의 범위를 특정 범위(0부터 1)로 변환해야 함, 이를 피쳐 스케일링 (Feature Scaling) 혹은 정규화 (Normalization)
- 비어있는 필드들의 값을 어떻게 채울 것인가? : Imputer. 앞서 타이타닉 승객 생존 분류기에 써봤음
- Feature Extractor가 하는 일
- 기존 피쳐에서 새로운 피쳐를 추출
- TF-IDF, Word2Vec, ... : 많은 경우 텍스트 데이터를 어떤 형태로 인코딩하는 것이 여기에 해당함
피쳐 변환 StringIndexer: 텍스트 카테고리를 숫자로 변환 (1)
- 아래 왼쪽과 같은 값을 갖는 Color라는 이름의 피쳐가 존재한다면
- 이를 오른쪽과 같은 숫자로 변환해주는 것이 피쳐변환의 목적
피쳐 변환 StringIndexer: 텍스트 카테고리를 숫자로 변환 (2)
- Scikit-Learn은 sklearn.preprocessing 모듈 아래 여러 인코더 존재
- OneHotEncoder, LabelEncoder, OrdinalEncoder, ...
- Spark MLlib의 경우 pyspark.ml.feature 모듈 밑에 두 개의 인코더 존재
- StringIndexer, OneHotEncoder
- 사용법은 Indexer 모델을 만들고(fit), Indexer 모델로 데이터프레임을 transform
from pyspark.ml.feature import StringIndexer
gender_indexer = StringIndexer(inputCol='Gender', outputCol='GenderIndexed')
gender_indexer_model = gender_indexer.fit(final_data)
final_data_with_transformed_gender_gender = gender_indexer_model.transform(final_data)
피쳐 변환 Scaler: 숫자 필드값의 범위 표준화 (1)
- 숫자필드 값의 범위를 특정 범위 (예를들면, 0부터 1)로 변환하는것
- 피쳐 스케일링 (Feature Scaling) 혹은 정규화 (Normalization)라 부름
피쳐 변환 Scaler: 숫자 필드값의 범위 표준화 (2)
- Scikit-Learn은 sklearn.preprocessing 모듈 아래 두 개의 스케일러 존재 : StandardScaler, MinMaxScaler
- Spark MLlib의 경우 pyspark.ml.feature 모듈 밑에 두 개의 스케일러 존재
- StandardScaler, MinMaxScaler
- 사용법은 Scaler 모델을 만들고(fit), Scaler 모델로 데이터프레임을 transform
- StandardScaler
- 각 값에서 평균을 빼고 이를 표준편차로 나눔. 값의 분포가 정규분포를 따르는 경우 사용
- MinMaxScaler
- 모든 값을 0과 1사이로 스케일. 각 값에서 최소값을 빼고 (최대값-최소값)으로 나눔
피쳐 변환 Imputer: 값이 없는 필드 채우기 (1)
- 값이 존재하지 않는 레코드들이 존재하는 필드들의 경우 기본값을 정해서 채우는 것. Impute한다고 부름
피쳐 변환 Imputer: 값이 없는 필드 채우기 (2)
- Scikit-Learn은 sklearn.preprocessing 모듈 아래 존재 : Imputer
- Spark MLlib의 경우 pyspark.ml.feature 모듈 밑에 존재
- Imputer
- 사용법은 Imputer 모델을 만들고(fit), Imputer 모델로 데이터프레임을 transform
from pyspark.ml.feature import Imputer
imputer = Imputer(strategy='mean', inputCols=['Age'], outputCols=['AgeImputed'])
imputer_model = imputer.fit(final_data)
final_data_age_transformed = imputer_model.transform(final_data)
Spark ML Pipeline 살펴보기
모델 빌딩과 테스트 과정을 자동화하자!
모델 빌딩과 관련된 흔한 문제들
- 트레이닝 셋의 관리가 안됨
- 모델 훈련 방법이 기록이 안됨
- 어떤 트레이닝 셋을 사용했는지?
- 어떤 피쳐들을 사용했는지?
- 하이퍼 파라미터는 무엇을 사용했는지?
- 모델 훈련에 많은 시간 소요
- 모델 훈련이 자동화가 안된 경우 매번 각 스텝들을 노트북 등에서 일일히 수행
- 에러가 발생할 여지가 많음(특정 스텝을 까먹거나 조금 다른방식 적용)
ML Pipeline의 등장
- 앞서 언급한 문제들 중 2와 3을해결!
- 자동화를 통해 에러 소지를 줄이고 반복을 빠르게 가능하게 해줌
Spark ML 관련 개념 정리
- ML 파이프라인이란?
- 데이터 과학자가 머신러닝 개발과 테스트를 쉽게 해주는 기능 (데이터 프레임 기반)
- 머신러닝 알고리즘에 관계없이 일관된 형태의 API를 사용하여 모델링이 가능
- ML 모델개발과 테스트를 반복가능해줌 (데이터프레임, Transformer, Estimator, Parameter)
- 뒤에 설명할 Transformer와 Estimator로 구성됨. (4개의 요소로 구성)
- ML 파이프라인의 구성요소: 데이터 프레임
- ML 파이프라인에서는 데이터프레임이 기본 데이터 포맷
- 기본적으로 CSV, JSON, Parquet, JDBC(관계형 데이터베이스)를 지원
- ML 파이프라인에서 다음 2가지의 새로운 데이터소스를 지원
- 이미지 데이터소스
- jpeg, png 등의 이미지들을 지정된 디렉토리에서 로드
- LIBSVM 데이터소스
- label과 features 두 개의 컬럼으로 구성되는 머신러닝 트레이닝 셋 포맷
- features 컬럼은 벡터 형태의 구조를 가짐
- 이미지 데이터소스
- ML 파이프라인의 구성요소: Transformer (1)
- 입력 데이터프레임을 다른 데이터프레임으로 변환
- 하나 이상의 새로운 컬럼을 추가
- 2 종류의 Transformer가 존재하며 transform이 메인 함수
- Feature Transformer, Learning Model
- 입력 데이터프레임을 다른 데이터프레임으로 변환
- ML 파이프라인의 구성요소: Transformer (2)
- Feature Transformer
- 입력 데이터프레임의 컬럼으로부터 새로운 컬럼을 만들어내 이를 추가한 새로운 데이터프레임을 출력으로 내줌. 보통 피쳐 엔지니어링을 하는데 사용
- 예) Imputer, StringIndexer, VectorAssembler
- Imputer는 기본값 지정에 사용
- StringIndexer는 categorical 정보를 숫자 정보로 변환
- VectorAssembler: 주어진 컬럼들을 통합하여 하나의 벡터 컬럼으로 변환
- Feature Transformer
- ML 파이프라인의 구성요소: Transformer (3)
- Learning Model
- 머신러닝 모델에 해당.
- 피쳐 데이터프레임을 입력으로 받아 예측값이 새로운 컬럼으로 포함된 데이터 프레임을 출력으로 내줌: prediction, probability
- Learning Model
- ML 파이프라인의 구성요소: Feature Transformer
- transform 함수
- ML 파이프라인의 구성요소: Estimator (1)
- 머신러닝 알고리즘에 해당. fit이 메인 함수
- 트레이닝셋 데이터프레임을 입력으로 받아서 머신러닝 모델(Transformer)을 만들어냄
- 입력: 데이터프레임 (트레이닝 셋)
- 출력: 머신러닝 모델
- 예를 들어, LogisticRegression은 Estimator이고 LogisticRegression.fit()를 호출하면 머신 러닝 모델(Transformer)을 만들어냄
- 머신러닝 알고리즘에 해당. fit이 메인 함수
- ML 파이프라인의 구성요소: Estimator (2)
- ML Pipeline도 Estimator
- Estimator는 저장과 읽기 함수 제공 : 즉 모델과 ML Pipeline을 저장했다가 나중에 다시 읽을 수 있음
- save와 load
- ML 파이프라인의 구성요소: Estimator
- ML 파이프라인의 구성요소: Parameter
- Transformer와 Estimator의 공통 API로 다양한 인자를 적용해줌
- 두 종류의 파라미터가 존재:
- Param(하나의 이름과 값)
- ParamMap (Param 리스트)
- 파라미터의 예)
- 훈련 반복수 (iteration) 지정을 위해 setMaxIter()를 사용
- ParamMap(lr.maxIter -> 10)
- 파라미터는 fit (Estimator) 혹은 transform (Transformer)에 인자로 지정 가능
ML Pipeline
- 하나 이상의 Transformer와 Estimator가 연결된 모델링 웍플로우
- 입력은 데이터프레임
- 출력은 머신러닝 모델
- ML Pipeline 그자체도 Estimator
- 따라서 ML Pipeline의 실행은 fit 함수의 호출로 시작
- 저장했다가 나중에 다시 로딩하는 것이 가능 (Persistence)
- 한번 파이프라인을 만들면 반복 모델빌딩이 쉬워짐
요약...
- Spark ML와 제공 기능에 대해 배움
- pyspark.ml을 이용해 실제 머신러닝 모델을 만들어봄
- ML Pipeline을 비롯한 다양한 Spark ML 기능에 대해 배움
'Spark' 카테고리의 다른 글
[Spark] Spark 내부동작과 클라우드 옵션(Part3. Spark EMR) (0) | 2024.02.15 |
---|---|
[Spark] Spark 내부동작과 클라우드 옵션(Part1. Spark 내부동작) (1) | 2024.02.13 |
[Spark] Spark 프로그래밍: SQL (3) | 2024.02.13 |
[Spark] Spark 프로그래밍: DataFrame (2) (0) | 2024.02.12 |
[Spark] Spark 프로그래밍: DataFrame (1) (1) | 2024.02.11 |