#1. Goal: Run the algorithm sources written in Python inside Google Cloud for a fixed window of time.
- For cost and similar reasons, run the 1,000 algorithms concurrently on Kubernetes.
#2. Issue: To process the existing 1,000 algorithms concurrently, multi-processing was done through a web/WAS layer, but depending on the number of data records, Kubernetes autoscaling does not behave properly.
#3. Analysis: Concurrency was implemented by hand instead of using a framework built for parallel work, which caused a variety of problems.
a) The web/WAS multi-processing approach ran into response-time problems, among other issues.
b) All 1,000 requests land on the WAS at once and are dispatched immediately, so the existing pods exceed their reasonable workload. Kubernetes then creates new pods, but because every request has already been routed to the old pods, the new pods never receive a single request.
#4. Solution: Use a framework that can process the work in parallel, such as Dataproc, Spark, or Ray.
→ Decided to go with Spark. ### This is my first time using both Python and Spark!!! ###
#5. Objections to the solution:
1. Converting the existing ~1,000 Python sources into Spark code would take a lot of work.
→ Proceed in a way that requires no source changes.
Spark on Kubernetes : https://spark.apache.org/docs/latest/running-on-kubernetes.html
Prerequisites
1. Create the cluster
gcloud container clusters create spark-kube \
--cluster-version 1.15.9-gke.24 \
--zone us-east4-a \
--node-locations us-east4-a,us-east4-b,us-east4-c \
--num-nodes 2 --enable-autoscaling --min-nodes 1 --max-nodes 4
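As a quick sanity check (not part of the original steps), the new cluster should show up in the cluster list once creation finishes:
》 gcloud container clusters list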
2. Configure the node pool: size it according to the memory each algorithm needs (a sketch follows below).
a) If the memory size is too small, errors occur.
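A minimal sketch of adding a memory-oriented node pool; the pool name and machine type below are assumptions for illustration, not values from the actual setup:
gcloud container node-pools create spark-highmem \
  --cluster spark-kube --zone us-east4-a \
  --machine-type n1-highmem-4 \
  --num-nodes 2 --enable-autoscaling --min-nodes 1 --max-nodes 4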
3. Create the Container Registry (gcr.io) path for the image
gcr.io/{PROJECT ID}/{TAG ID}
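Pushing to gcr.io also requires Docker to be authenticated against the registry; a common way to set that up (not spelled out in the original notes) is:
》 gcloud auth configure-docker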
4. RBAC (Role-Based Access Control)
- Without RBAC configured, an error like the following occurs:
Caused by:io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: GET at: https://kubernetes.default.svc/api/v1/namespaces/default/pods/spark-pi-15543??????069-driver. Message: Forbidden!Configured service account doesn't have access. Service account may have been revoked. pods "spark-pi-1554304245069-driver" is forbidden: User "system:serviceaccount:default:default" cannot get resource "pods" in API group "" in the namespace "default".
a) List the existing service accounts
》 kubectl get serviceaccount
b) Create the spark service account
》 kubectl create serviceaccount spark
serviceaccount "spark" created
》 kubectl create clusterrolebinding spark-role --clusterrole=edit --serviceaccount=default:spark --namespace=default
clusterrolebinding "spark-role" created
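As an optional verification (my addition, not in the original steps), kubectl can confirm that the driver's service account is now allowed to read pods:
》 kubectl auth can-i get pods --as=system:serviceaccount:default:spark -n default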
5. Configure gcloud for the GKE cluster
》 export PROJECT_ID="{PROJECT ID}"
》 export ZONE="us-east4-a"
》 export KUBE_CLUSTER_NAME="{Cluster Name}"
》 gcloud config set project ${PROJECT_ID}
》 gcloud config set compute/zone ${ZONE}
》 gcloud container clusters get-credentials ${KUBE_CLUSTER_NAME}
1. Spark Source Download
- Choose the Spark version and package on the download page, click the "3. Download Spark" link, copy the mirror URL, and run:
wget http://apache.tt.co.kr/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
tar -xvf spark-2.4.5-bin-hadoop2.7.tgz
2. Modify the Dockerfile
- PATH: kubernetes/dockerfiles/spark
- Modify the Dockerfile: add the libraries and sources needed to run the Python algorithm code (the added lines are marked ##ADDED)
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
FROM openjdk:8-jdk-slim
ARG spark_jars=jars
ARG img_path=kubernetes/dockerfiles
ARG k8s_tests=kubernetes/tests
##ADDED
ENV DEBIAN_FRONTEND noninteractive
##ADDED
## install necessary packages
RUN apt-get update -y && apt-get install -y python3 python3-pip python3-numpy \
python3-matplotlib python3-scipy python3-pandas python3-simpy
##ADDED
## set python3 as default (currently python 3.7)
RUN update-alternatives --install "/usr/bin/python" "python" "$(which python3)" 1
##ADDED
## the developed Python algorithms need the libraries below
RUN pip3 install --upgrade 'six>=1.13.0'
RUN pip3 install fire \
smart_open \
google-cloud-storage \
pandas-gbq \
scikit-learn==0.22.2 \
pyjanitor \
xgboost \
lightgbm \
bayesian-optimization \
catboost \
google-cloud-bigquery \
pandas==1.0.1 \
py4j
# Before building the docker image, first build and make a Spark distribution following
# the instructions in http://spark.apache.org/docs/latest/building-spark.html.
# If this docker file is being used in the context of building your images from a Spark
# distribution, the docker build command should be invoked from the top level directory
# of the Spark distribution. E.g.:
# docker build -t spark:latest -f kubernetes/dockerfiles/spark/Dockerfile .
RUN set -ex && \
apt-get update && \
ln -s /lib /lib64 && \
apt install -y bash tini libc6 libpam-modules libnss3 && \
mkdir -p /opt/spark && \
mkdir -p /opt/spark/work-dir && \
touch /opt/spark/RELEASE && \
rm /bin/sh && \
ln -sv /bin/bash /bin/sh && \
echo "auth required pam_wheel.so use_uid" >> /etc/pam.d/su && \
chgrp root /etc/passwd && chmod ug+rw /etc/passwd && \
rm -rf /var/cache/apt/*
COPY ${spark_jars} /opt/spark/jars
COPY bin /opt/spark/bin
COPY sbin /opt/spark/sbin
COPY ${img_path}/spark/entrypoint.sh /opt/
COPY examples /opt/spark/examples
COPY ${k8s_tests} /opt/spark/tests
COPY data /opt/spark/data
##ADDED
## copy the Python algorithm sources
COPY ml_app/parameter_data /app/
COPY ml_app/*.py /app/
COPY ml_app.zip /app/
COPY ml_app/*.json /app/
COPY ml_app/df_modeling/ /app/df_modeling/
##ADDED
# credential json file
ENV GOOGLE_APPLICATION_CREDENTIALS /app/service_aacount.json
ENV SPARK_HOME /opt/spark
WORKDIR /opt/spark/work-dir
ENTRYPOINT [ "/opt/entrypoint.sh" ]
- The COPY lines marked ##ADDED above add the developed Python algorithm sources to the image.
3. Spark Python Application
- When a function is registered as a UDF (User Defined Function), Spark treats it as part of its own processing logic, so the driver hands it to the executors to run.
# -*- coding: utf-8 -*-
# Creation Date : 2020-04-15
# Last Modified : 2020-04-15
#
from __future__ import print_function

import queue
from threading import Thread

from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql import HiveContext
from pyspark.sql.functions import udf, col


def callpython(val):
    # Runs on an executor: import the existing algorithm module and call its entry point.
    import main_cns
    A = val.split(' ')
    main_cns.main(A[0], A[1], A[2], A[3], A[4], int(A[5]))
    return val + " finished"


def main():
    # Put the packaged algorithm library on the executors' PYTHONPATH.
    conf = SparkConf().setAppName("main_cns") \
        .setExecutorEnv('PYTHONPATH', 'PYTHONPATH:/app/ml_app.zip') \
        .set("spark.scheduler.mode", "FAIR")
    # Ship the Python entry point and the zipped sources to the executors.
    sc = SparkContext(conf=conf, pyFiles=['/app/main_XXX.py'])
    sc.addPyFile('/app/ml_app.zip')

    sqlContext = HiveContext(sc)
    sqlContext.setConf("spark.sql.hive.convertMetastoreOrc", "false")

    # Registering the function as a UDF is what lets code written on the
    # driver be executed on the executors.
    sqlContext.udf.register('udf_callpython', callpython)

    def callpython_t(x):
        # Each SQL statement triggers one UDF call, i.e. one algorithm run on an executor.
        sqlContext.sql("select udf_callpython('" + x + "') ").show()
        return 1

    que = queue.Queue()
    threads_list = list()
    lines = [line.rstrip() for line in open('/app/parameter_data')]

    # Use threads so the algorithm runs are submitted concurrently.
    for A in lines:
        if not A.startswith('#'):
            thread = Thread(target=lambda q, arg1: q.put(callpython_t(arg1)), args=(que, A))
            thread.start()
            threads_list.append(thread)

    # Join all the threads
    for t in threads_list:
        t.join()

    # Check each thread's return value
    while not que.empty():
        result = que.get()
        print(result)

    sc.stop()


if __name__ == '__main__':
    main()
4. Build the Python source library
- Move into the ml_app folder and run: zip -r ../ml_app.zip .
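A quick check of the archive layout (my own addition, not an original step): the algorithm modules must sit at the root of the zip, otherwise the executors cannot import them from /app/ml_app.zip via PYTHONPATH/addPyFile.
》 unzip -l ../ml_app.zip | head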
5. Docker Image Build
- Target Dockerfile: kubernetes/dockerfiles/spark/Dockerfile
- Run bin/docker-image-tool.sh to build the image and push it to the Container Registry.
》 bin/docker-image-tool.sh -r gcr.io/{PROJECT_ID} -t {TAG_ID} build
》 bin/docker-image-tool.sh -r gcr.io/{PROJECT_ID} -t {TAG_ID} push
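Optionally (an extra step, not in the original notes), the built image can be checked locally before submitting by overriding the entrypoint and importing a few of the installed libraries:
》 docker run --rm --entrypoint python gcr.io/{PROJECT_ID}/spark-py:{TAG_ID} -c "import pandas, sklearn, xgboost; print('ok')"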
6. Spark Application Run
- Check the master IP:
》 kubectl cluster-info | grep master |awk '{print $6}'
- Run spark-submit:
》 bin/spark-submit \
--master k8s://https://{MASTER IP} \
--deploy-mode cluster \
--name spark-py-pi \
--driver-cores 1000m \
--executor-memory 4g \
--conf spark.executor.instances=30 \
--conf spark.app.name=spark-pi \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
--conf spark.kubernetes.container.image=gcr.io/{PROJECT_ID}/spark-py:{TAG_ID} \
--conf spark.kubernetes.container.image.pullPolicy=Always \
--py-files local:///app/ml_app.zip local:///app/main_spark.py
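After submission, the driver and executor pods can be watched from another terminal; the driver pod name is derived from spark.app.name (e.g. spark-pi-...-driver):
》 kubectl get pods -w
》 kubectl logs -f {DRIVER POD NAME}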
###. Unauthorized Exception ###
- If an error like the following occurs:
Exception in thread "main" io.fabric8.kubernetes.client.KubernetesClientException: Unauthorized
at io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager$1.onFailure(WatchConnectionManager.java:206)
at okhttp3.internal.ws.RealWebSocket.failWebSocket(RealWebSocket.java:571)
》 kubectl -n kube-system get secret
Reference: https://www.waitingforcode.com/apache-spark/setting-up-apache-spark-kubernetes-microk8s/read
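One thing worth trying here (my assumption, not something stated in the article above) is refreshing the kubeconfig credentials, since a stale or expired token can also surface as Unauthorized:
》 gcloud container clusters get-credentials ${KUBE_CLUSTER_NAME} --zone ${ZONE}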