On the journey of
[PySPARK] 11. 머신러닝 파이프라인 관리, 배포 및 확장 본문
[PySPARK] 11. 머신러닝 파이프라인 관리, 배포 및 확장
dlrpskdi 2023. 10. 12. 23:02AWS와...여러 교육과..논문에 치여 1달 만에(마지막 chapter 10이 ... 9월 10일 글이었다) PySPARK 글을 쓴다....
https://nowolver.tistory.com/148
[PySPARK] MLlib을 사용한 머신러닝
지금까지는 스파크를 사용한 데이터 엔지니어링 워크로드에 중점을 뒀지만 이번장에서는 데이터를 활용한 머신러닝에 초점을 둘 것. 아파치 스파크의 머신러닝 라이브러리인 MLlib을 사용하여 M
nowolver.tistory.com
모델 관리
머신러닝 솔루션의 종단 간 재현성
- 모델을 생성한 코드, 훈련에 사용된 환경, 훈련된 데이터 및 모델 자체를 재현할 수 있어야 함
재현성 관리 예시
- 라이브러리 버전 관리
- 라이브러리 버전이 언급되지 않은 경우 일반적으로 최신 버전을 설치하게 됨
- 코드가 이전 버전의 라이브러리를 기반으로 빌드되어 설치된 버전과 다른 기본 동작을 활용할 수 있는 경우, 최신 버전을 사용하면 코드가 손상되거나 결과가 달라질 수 있음
- 데이터 진화
- 시간이 지난 후 기본 데이터가 변경된다면 파이프라인이 중단되거나 결과가 달라질 수 있음
- 초기 빌드 후 누군가가 새로운 열을 추가하거나 더 많은 데이터를 추가한 경우 발생할 수 있음
- 실행 순서
- 코드는 오류 없이 위에서 아래로 실행되어야 함
- 순서가 맞지 않거나 상태 보전적인 셀(stateful cell)을 여러 번 실행할 경우 결과를 재현하기 어렵게 만듬
- 병렬 작업
- 처리량을 최대화하기 위해 GPU는 많은 작업을 병렬로 실행
- 실행 순서가 항상 보장되는 것은 아니므로 결과가 비결정적일 수 있음
재현할 수 없다는 것은 모델을 채택하거나 생산에 투입하는 데 방해가 될 수 있음
- 모델, 데이터, 종속성 버전 추적을 위해 자체 내부 도구를 구축할 수는 있지만 오류가 발생되기 쉽고 유지 관리에 많은 노력이 필요
- 업계 표준을 갖는 것이 필요
머신러닝 실험 재현을 위한 오픈소스 및 상용 툴이 있으며, 이 섹션에서는 ML플로에 초점을 맞출 것
- MLflow는 개발자가 실험을 재현 및 공유하고, 모델을 관리하는 등의 작업을 수행하는 데 도움이 되는 오픈소스 플랫폼이다.
- Python, R 및 Java, Scala의 인터페이스와 REST API를 제공한다.
- MLflow에는 네 가지 주요 구성 요소가 있다. (아래 Fig 11-1 참조)
- 트래킹(Tracking): 매개변수, 메트릭, 코드 버전, 모델 그리고 plot 및 text와 같은 아티팩트를 기록하는 API를 제공한다.
- 프로젝트(Projects): 다른 플랫폼에서 실행하기 위해 데이터 과학 프로젝트 및 해당 종속성을 패키징하는 표준화된 형식이다.
- 모델 훈련 프로세스를 관리하는 데 도움이 된다.
- 모델(Models): 다양한 실행 환경에 배포하기 위해 모델을 패키징하는 표준화된 형식이다. 모델을 필드하는 데 사용된 알고리즘이나 라이브러리에 관계없이 모델을 로드하고 적용하기 위한 일관된 API를 제공한다.
- 레지스트리(Registry): 모델 계보, 모델 버전, 단계 전환 및 주석을 트랭킹하는 저장소이다.
트래킹
- ML플로 트래킹은 실제로 학습을 수행하는 라이브러리 및 환경에 구애받지 않는 로깅 API
- 데이터 과학 코드를 실행하는 실행 개념을 중심으로 구성되어 있음
- 실행은 여러 실행이 주어진 실험의 일부가 될 수 있도록 집계
- ML플로 트래킹 서버는 많은 실험을 호스팅할 수 있고, 그 후 노트북, 로컬 앱, 클라우드 작업을 사용하여 트래킹 서버에 로그인 할 수 있음
트래킹 서버에 기록할 수 있는 사항
- 매개변수
- 코드에 대한 키/값 입력 (Ex. 하이퍼파라미터)
- 메트릭
- 숫자값 (Ex. RMSE 또는 정확도 값)
- 아티팩트
- 파일, 데이터 및 모델
- 메타데이터
- 실행된 소스코드 또는 코드 버전에 대한 실행 정보
- 모델
- 학습한 모델
기본적으로 트래킹 서버는 모든것을 파일 시스템에 기록하지만, 매개변수 및 메트릭과 같이 더 빠른 쿼리가 필요할 때 데이터베이스를 지정할 수 있다. 아래 예제를 보자.
# 파이썬 예제
filePath = "C:/Pyspark/pyspark_dataset/LearningSparkV2-master/databricks-datasets/learning-spark-v2/sf-airbnb/sf-airbnb.csv"
airbnbDF = spark.read.csv(filePath)
(trainDF, testDF) = airbnbDF.randomSplit([.8, .2], seed = 42)
categoricalCols = [field for (field, dataType) in trainDF.dtypes
if dataType == "string"]
indexOutputCols = [x+"Index" for x in categoricalCols]
stringIndexer = StringIndexer(inputCols = categoricalCols,
outputCols = indexOutputCols,
handleInvalid = "skip")
numericCols = [field for (field, dataType) in trainDF.dtypes
if ((dataType == "double") & (field != "price"))]
assemblerInputs = indexOutputCols + numericCols
vecAssembler = VectorAssembler(inputCols = assemblerInputs,
outputCol = "features")
rf = RandomForestRegressor(labelCol = "price", maxBins = 40, maxDepth = 5, numTrees = 100, seed = 42)
pipeline = Pipeline(stages = [stringIndexer, vecAssembler, rf])
터미널에서 mlflow ui 명령어 입력하면 https:/localhost:5000/ 에서 아래와 같은 이미지를 확인할 수 있음
사용자는 아래와 같은 코드로 MLflow 클라이언트 또는 REST API를 사용하여 트래킹 서버를 쿼리할 수도 있다.
from mlflow.tracking import MlflowClient
client = MlflowClient()
runs = client.search_runs(run.info.experiment_id, order_by=['attributes.start_time desc"],max_results=1)
run_id = runs[0].info_run_id
run[0].data.metrics
MLlib을 사용한 모델 배포 옵션
- 머신러닝 모델의 배포
- 조직과 사용 사례에 따라 다름
- 비즈니스 제약조건은 대기 시간, 처리량, 비용 등 다양한 요구사항 부과
- 배치, 스트리밍, 실시간, 모바일/임베디드 등 배포 모드
- 3가지 예측 모델의 배포 옵션 예시
- 처리량, 대기시간의 Trade-Off를 보여줌(아래 표 참조)
- 배치
- 정기적인 일정에 따라 예측 생성, 다른 곳에 제공될 결과를 영구 저장소에 기록
- 예정된 실행 중에만 컴퓨팅 비용 지불 → 가장 저렴하고 쉬운 배포 옵션
- 모든 예측 작업이 분할되어 누적되는 오버헤드가 적어 데이터 포인트당 훨씬 더 효율적
- 스파크의 경우, 드라이버와 실행기 사이 앞뒤로 통신하는 오버헤드로 특히 영향도가 큼
- 단점: 다음 예측 배치를 생성하기 위해 몇 시간 또는 며칠로 예약되어 지연시간이 있음
- 스트리밍
- 처리량과 대기 시간 간의 적절한 균형 제공
- 마이크로 데이터 배치에 대해 지속적으로 예측 → 몇 초~ 몇 분 만에 예측
- 정형화 스트리밍의 경우 거의 모든 코드가 배치 사용 사례와 동일 → 배치로 쉽게 전환 가능
- VM 또는 컴퓨팅 리소스에 대한 비용을 지불해야함
- 스트림이 내결함성을 유지하고 수신 데이터 스파이크 시에 버퍼링을 제공할 수 있는지 확인
- 실시간 배포
- 처리량보다 대기 시간을 우선시하고 몇 밀리초 만에 예측 결과 생성
- 인프라는 로드 밸런싱을 지원해야 하며 수요가 급등하는 경우 많은 동시 요청으로 확장할 수 있어야 함
- 스파크가 대기 시간 요구사항을 충족할 수 없는 유일한 옵션
- 스파크 외부로 모델을 보내야 함
- ex) REST 엔드포인트
- 모델을 스파크 외부로 기능을 준비하고 가져오는데, 시간이 많이 걸리며 어려울 수 있음
- 모델링 프로세스 시작 전 : 모델 베포 요구사항 정의해야 함
배치
머신러닝 모델 배포의 대부분 사용 사례 나타냄 (구현하기 가장 쉬운 옵션)
- 일반 작업 실행 : 예측 생성, 결과 저장 (데이터베이스, 데이터 레이크 등)
- model.transform() : 데이터 프레임 모든 파티션에 병렬로 모델 적용
# 파이썬 예제
# 플로우를 사용하여 저장된 모델을 로드
import mlflow.spark
pipelineModel = mlflow.spark.load_model(f"runs:/{run_id}/model")
# 예측 생성
inputDF = spark.read.parquet("/databricks-datasets/learning-spark-v2/sf-airbnb/sf-airbnb-clean.parquet")
predDF = pipelineModel.transform(inputDF)
- 배포 시 염두 해야할 사항들은 대표적으로 아래와 같은 것들이 있다.
얼마나 자주 예측을 생성할 것인가?
얼마나 자주 모델을 재교육할 예정인가?
모델을 어떻게 버전화할 것인가?
스트리밍
정형화 스트리밍은 데이터를 처리하고 예측을 생성하기 위해 시간별 또는 야간 작업을 기다리는 대신 들어오는 데이터에 대한 추론을 지속적으로 수행할 수 있다.
이 접근 방식은 컴퓨팅 시간에 대해 지속적으로 비용을 지불해야 하므로 배치 솔루션보다 비용이 많이 들지만 예측을 더 자주 생성하여 더 빠른 조치를 취할 수 있다는 추가 이점을 얻을 수 있다.
#ML플로를 사용하여 저장된 모델을 로드
pipelineModel = mlflow.spark.load_model(f”run:/{run_id}/model”)
#시뮬레이션된 스트리밍 데이터를 셋업
repartitionedPath=’/databricks-dataasets/learning=spark-v2/sf-airbnb
sf-airbnb-clean-100p.parquet”
schema=spark.read.parquet(reparttitionPath).schema
streamingData = (spark
.readStream
.schema(schema) #이 방식으로 스키마를 설정할 수 있음
.option(”maxFilesPerTrigger”,1)
.parquet(repartitionedPath))
#예측 생성
streamPred = pipelineModel.transfirm(strimingData)
실시간 추론을 위한 모델 내보내기 패턴
- 사기 탐지, 광고 추천 등 실시간 추론이 필요한 일부 영역이 있다.
- 짧은 지연 시간을 제공하는 관리형 서비스 중엔 Amazon 세이지메이커(SageMaker) 및 Azure ML과 같은 솔루션이 인기가 높다.
- 이 섹션에서는 해당 서비스에 배포할 수 있도록 MLlib 모델을 내보내는 방법을 보여줄 것이다.
- 스파크에서 모델을 내보내는 한 가지 방법은 기본적으로 파이썬, C 등으로 모델을 다시 구현하는 것이다. 모델의 계수를 추출하는 것이 간단해 보일 수 있으나 모든 피처 엔지니어링 및 전처리 단계를 함께 내보내는 것(OneHotEncoder, VectorAssembler 등)은 굉장히 번거롭고 오류가 발생하기 쉽다.
- MLeap3 및 ONNX14와 같은 몇 가지 오픈소스 라이브러리가 있어 지원되는 MLlib 모델 하위 집합을 자동으로 내보내 스파크에 대한 종속성을 제거할 수 있다.
- MLlib 모델을 내보내는 대신, XGBoost5 및 H2Oai의 스파클링 워터Sparkling Water 16(이름은 H2O와 스파크의 조합에서 파생됨)와 같은 스파크와 통합될 수 있는 다른 써드파티 라이브러리도 실시간 시나리오에서 배포하기 편리하게 되어 있다.
이때 XGBoost란!?
-구조화된 데이터 문제에 대한 캐글kaggle 대회에서 가장 성공적인 알고리즘 중 하나
-데이터 과학자들 사이에서 매우 인기 있는 라이브러리
아래 설명된 대로 MLlib 파이프라인을 훈련한 후, XGBoost 모델을 추출하고 파이썬에서 제공하기 위해 비스파크 모델로 저장할 수 있다.
#파이썬 예제
import XGBoost as xgb
bst = xgb.Booster({'nthread': 4})
bst.load_model("XGBoost_native_model")
Llib 파이프라인을 훈련한 후, XGBoost 모델을 추출하고 파이썬에서 제공하기 위해 비스파크 모델로 저장할 수 있다.
⇒ 실시간 제공 환경에서 사용하기 위해 MLlib 모델을 내보내는 다양한 방법에 대해 배웠으므로 이제 MLlib이 아닌 모델에 Spark를 활용하는 방법에 대해 학습
비MLlib 모델에 스파크 활용
- 해당 섹션에서 다루는 것
- 스파크를 사용하여 판다스 UDF를 사용하여 단일 노드 모델의 분산 추론을 수행
- 하이퍼파라미터 조정을 수행
- 피처 엔지니어링을 확장하는 방법
MLlib 항상 머신러닝 요구사항에 가장 적합한 솔루션이라고는 할 수 없다. Spark MLlib에서 제공하는 알고리즘이 아닐 수 있고, 매우 짧은 대기 시간 요구를 충족하지 못할 수 있다. 그런 경우 활용할 수 있는 Spark의 다른 솔루션들을 살펴보자.
판다스 UDF
MLlib가 모델 분산 학습에 효과적인 것은 맞지만, 사용자 정의 함수로도 사이킷런&텐서플로 모델을 데이터의 하위집합으로 빌드하고 스파크를 사용해 전체 데이터 세트에 대해 분산 추론을 수행할 수 있다.
파이썬에서 데이터 프레임의 각 레코드에 모델을 적용하고자 한다면 판다스 UDF가 최적화되어 있다.
먼저 Pandas와 PySpark를 비교해보자.
- PySpark - 즉시 사용 가능한 데이터 변환을 허용하며 빅데이터 워크로드에 사용되는 분산 처리 시스템
- Pandas - PySpark는 더 많은 기능을 사용할 수 있어 강력하지만 메모리 처리 특성으로 매우 큰 데이터 세트는 처리할 수 없다.
Spark 3.0의 pandas API 예제
- mapInPandas() - pandas.dataframe을 사용하여 기능을 수행하고 결과로 새로운 pandas.dataframe을 반환한다.
- groupBy() - 데이터 세트 전체를 셔플하여 각 그룹의 데이터와 모델을 단일 시스템으로 맞춰 모델을 병렬화한다.
import mlflow.sklearn
import pandas as pd
def predict(iterator):
model_path = f"runs:/{run_id}/random-forest-model"
model = mlflow.sklearn.load_model(model_path)
for features in iterator:
yield pd.DataFrame(model.predict(features))
df mapInPandas(predict, "prediction double").show(3)
df.groupBy("device_id").applyInPandas(build_model, schema=trainReturnSchema)
Joblib
경량 파이프라이닝을 제공하는 도구세트
데이터 복사본을 자동으로 브로드캐스트하여서 하이퍼파라미터를 조정할 수 있음
→ 모델을 병렬로 훈련하고 평가가 가능
→ 그러나 단일 모델과 모든 데이터가 단일 시스템에 맞아야 한다는 한계가 존재
pip install joblibspark를 통해 설치, 사이킷런 0.21 이상 및 파이스파크 2.4.4 이상 사용
# In Python
from sklearn.utils import parallel_backend
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split
from sklearn.model_selection import GridSearchCV
import pandas as pd
from joblibspark import register_spark
register_spark() # Register Spark backend
df = pd.read_csv("/dbfs/databricks-datasets/learning-spark-v2/sf-airbnb/sf-airbnb-numeric.csv")
X_train, X_test, y_train, y_test = train_test_split(df.drop(["price"], axis=1),
df[["price"]].values.ravel(), random_state=42)
rf = RandomForestRegressor(random_state=42)
param_grid = {"max_depth": [2, 5, 10], "n_estimators": [20, 50, 100]}
gscv = GridSearchCV(rf, param_grid, cv=3)
with parallel_backend("spark", n_jobs=3):
gscv.fit(X_train, y_train)
print(gscv.cv_results_)
Distributed Cross Validation에서 반환된 매개 변수에 대한 설명은 사이킷런 GridSearchCV 문서를 참고하였다 :)
HyperOPT(하이퍼옵트)
실제값, 이산 및 조건부 차원을 포함할 수 있는 다루기 힘든(awkward) 검색 공간에 대한 직렬 및 병렬 최적화를 위한 파이썬 라이브러리로, pip install hyperopt로 설치한다. 스파크로 하이퍼옵트를 확장하는 방법은 2가지가 있다.
- 분산 교육 알고리즘(MLlib)과 함께 단일 머신 하이퍼옵트 사용
- 하이퍼옵트와 MLlib 사용에 대해 크게 제약 사항은 없음
- SparkTrials 클래스와 함께 단일 머신 교육 알고리즘과 함께 분산 하이퍼옵트 사용 분산 하이퍼파리미터 평가를 분산 교육 모델과 결합 X
케라스 모델에 대한 하이퍼파라미터 검색 병렬화는 깃허브 저장소에서 진행되며, 아래 코드는 하이퍼옵트의 주요 구성요소를 설명하기 위한 코드이다.
import hyperopt
best_hyperparameters = hyperopt.fmin(
fn = training_function,
space = search_space,
algo = hyperopt.tpe.suggest,
max_evals = 64,
trials = hyperopt.SparkTrials(SparkTrials(parallelism=4)
)
- fmin()은 training_function에 사용할 새로운 하이퍼파라미터 구성을 생성
- 1번을 SparkTrials 전달
- SparkTrials는 훈련 작업의 배치를 각 스파크 실행기에서 단일 작업 스파크 작업으로 병렬 실행
- 작업이 끝나면 결과와 해당 손실을 드라이버에 반환
위 과정을 통해 더 나은 하이퍼파라미터를 구성을 계산 → 대규모로 확장 가능, MLflow에서 트래킹 가능
- parallelism은 동시에 평가할 최대 시도 횟수를 결정한다.
- 1이면 각 모델을 순차적으로 훈련하지만 적응 알고리즘을 최대한 활용 해 더 나은 모델을 도출
- parallelism=max_evals는 무작위 검색을 수행
- 1과 max_evals 사이의 숫자를 활용해 확장성과 적응성 간의 균형을 유지
- 병렬 처리는 스파크의 실행기 수로 설정
- fmin()에서 timeout도 지정가능
요약
- MLlib은 대규모 데이터에는 좋지만 작은 데이터로 실시간을 추론하는 것에는 비추천.
- 배포 요구사항은 여러가지 변수가 있으므로 모델 구축 프로세스 전에 고려해야 한다.
'Experiences & Study > PySPARK & Data Engineering' 카테고리의 다른 글
[PySPARK] MLlib을 사용한 머신러닝 (0) | 2023.09.10 |
---|---|
[PySPARK] 정형 스트리밍 Part.1 (0) | 2023.09.05 |
[PySPARK] 스파크 애플리케이션의 최적화 및 튜닝 (0) | 2023.09.04 |
[PySPARK] 스파크 SQL과 데이터세트 (0) | 2023.09.03 |
[PySPARK] 스파크 SQL과 데이터프레임 Part.2 (2) (0) | 2023.09.03 |