#1. Goal: run the algorithm sources written in Python on Google Cloud within a fixed time window.
- For cost and similar reasons, run the 1000 algorithms concurrently on Kubernetes.
#2. Issue: to process the 1000 existing algorithms concurrently, a web/WAS layer was used for multi-processing,
but Kubernetes autoscaling does not behave properly as the data volume changes.
#3. Analysis: concurrent job processing was built in-house instead of using a framework for it, which caused various problems.
a) Multi-processing through the web/WAS approach caused various problems, including response-time issues.
b) All 1000 requests are delivered to the WAS at once, pushing the existing pods past their proper throughput. Kubernetes then creates new pods, but because every request has already been routed to the existing pods, the new pods never receive a single request even after they start.
#4. Solution: use a framework that can process data in parallel, such as Dataproc, Spark, or Ray.
→ Decided to use Spark ### This is my first time with both Python and Spark!!! ###
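(For orientation, the core idea is that Spark can ship an unmodified Python entry point to its executors and run many invocations in parallel. The sketch below only illustrates that idea; run_algorithm and the parameter strings are hypothetical placeholders, not project sources.)

from pyspark import SparkContext, SparkConf

def run_algorithm(param_line):
    # Hypothetical stand-in for an existing algorithm entry point that is
    # called as-is; no Spark-specific rewrite of the algorithm is needed.
    return param_line + " finished"

def main():
    conf = SparkConf().setAppName("parallel_algorithms")
    sc = SparkContext(conf=conf)
    # Each element is one algorithm run; Spark spreads them across executors.
    params = ["job-a arg1", "job-b arg2", "job-c arg3"]  # placeholder parameters
    results = sc.parallelize(params, len(params)).map(run_algorithm).collect()
    for r in results:
        print(r)
    sc.stop()

if __name__ == '__main__':
    main()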
#5. Objections to the solution:
1. Converting the roughly 1000 existing Python sources into Spark form would require a great deal of work.
(Error hit while running Spark on Kubernetes: as the message says, the default service account used by the driver has no permission to read pods, so a service account with sufficient RBAC permissions has to be configured for the Spark driver.)
Caused by: io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: GET at: https://kubernetes.default.svc/api/v1/namespaces/default/pods/spark-pi-1554304245069-driver. Message: Forbidden!Configured service account doesn't have access. Service account may have been revoked. pods "spark-pi-1554304245069-driver" is forbidden: User "system:serviceaccount:default:default" cannot get resource "pods" in API group "" in the namespace "default".
- Dockerfile changes: added the libraries and sources needed to run the Python algorithm sources (marked ##ADDED)
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
FROM openjdk:8-jdk-slim
ARG spark_jars=jars
ARG img_path=kubernetes/dockerfiles
ARG k8s_tests=kubernetes/tests
##ADDED
ENV DEBIAN_FRONTEND noninteractive
##ADDED
## install necessary packages
RUN apt-get update -y && apt-get install -y python3 python3-pip python3-numpy \
python3-matplotlib python3-scipy python3-pandas python3-simpy
##ADDED
## set python3 as default (currently python 3.7)
RUN update-alternatives --install "/usr/bin/python" "python" "$(which python3)" 1
##ADDED
## the developed Python algorithms need the libraries below
RUN pip3 install --upgrade 'six>=1.13.0'
RUN pip3 install fire \
smart_open \
google-cloud-storage \
pandas-gbq \
scikit-learn==0.22.2 \
pyjanitor \
xgboost \
lightgbm \
bayesian-optimization \
catboost \
google-cloud-bigquery \
pandas==1.0.1 \
py4j
# Before building the docker image, first build and make a Spark distribution following
# the instructions in http://spark.apache.org/docs/latest/building-spark.html.
# If this docker file is being used in the context of building your images from a Spark
# distribution, the docker build command should be invoked from the top level directory
# of the Spark distribution. E.g.:
# docker build -t spark:latest -f kubernetes/dockerfiles/spark/Dockerfile .
RUN set -ex && \
apt-get update && \
ln -s /lib /lib64 && \
apt install -y bash tini libc6 libpam-modules libnss3 && \
mkdir -p /opt/spark && \
mkdir -p /opt/spark/work-dir && \
touch /opt/spark/RELEASE && \
rm /bin/sh && \
ln -sv /bin/bash /bin/sh && \
echo "auth required pam_wheel.so use_uid" >> /etc/pam.d/su && \
chgrp root /etc/passwd && chmod ug+rw /etc/passwd && \
rm -rf /var/cache/apt/*
COPY ${spark_jars} /opt/spark/jars
COPY bin /opt/spark/bin
COPY sbin /opt/spark/sbin
COPY ${img_path}/spark/entrypoint.sh /opt/
COPY examples /opt/spark/examples
COPY ${k8s_tests} /opt/spark/tests
COPY data /opt/spark/data
##ADDED
## copy the Python algorithm sources
COPY ml_app/parameter_data /app/
COPY ml_app/*.py /app/
COPY ml_app.zip /app/
COPY ml_app/*.json /app/
COPY ml_app/df_modeling/ /app/df_modeling/
##ADDED
# credential json file
ENV GOOGLE_APPLICATION_CREDENTIALS /app/service_aacount.json
ENV SPARK_HOME /opt/spark
WORKDIR /opt/spark/work-dir
ENTRYPOINT [ "/opt/entrypoint.sh" ]
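(Side note: the Google client libraries installed above read the key file pointed to by GOOGLE_APPLICATION_CREDENTIALS automatically, so the algorithm code needs no explicit credential handling. A minimal sketch; the bucket name and query are made-up placeholders.)

from google.cloud import storage, bigquery

# Both clients pick up the key file named by GOOGLE_APPLICATION_CREDENTIALS.
storage_client = storage.Client()
bq_client = bigquery.Client()

# Placeholder names purely for illustration.
bucket = storage_client.bucket("my-example-bucket")
print(bucket.name)
rows = bq_client.query("SELECT 1 AS ok").result()
for row in rows:
    print(row.ok)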
- Developed Python algorithm sources
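(The algorithm sources themselves are not reproduced here. As a hypothetical skeleton only, the entry point called by the Spark application below takes five string arguments and one integer, matching main_cns.main(A[0], ..., int(A[5])); the parameter names are invented.)

# main_cns.py -- hypothetical skeleton for illustration; not the real source.
def main(dataset_id, model_id, start_date, end_date, mode, num_iter):
    # The real implementation presumably loads data (BigQuery/GCS, per the
    # libraries installed in the Dockerfile), runs the model, and stores the
    # results; here it only echoes its arguments.
    print(dataset_id, model_id, start_date, end_date, mode, num_iter)
    return 0

if __name__ == '__main__':
    import sys
    a = sys.argv[1:]
    main(a[0], a[1], a[2], a[3], a[4], int(a[5]))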
3. Spark Python Application
- When a function is registered as a UDF (User Defined Function), Spark treats it as part of its processing logic, and the Driver ships it to the Executors for execution.
# -*- coding: utf-8 -*-
# Creation Date : 2020-04-15
# Last Modified : 2020-04-15
#
from __future__ import print_function
from pyspark import SparkContext, SparkConf
from pyspark.sql import HiveContext
from pyspark.sql.functions import udf, col
import queue
from threading import Thread
def callpython(val):
    # Imported inside the UDF so the module is resolved on the Executor,
    # where /app/ml_app.zip (added via addPyFile) is on the PYTHONPATH.
    import main_cns
    A = val.split(' ')
    main_cns.main(A[0], A[1], A[2], A[3], A[4], int(A[5]))
    return val + " finished"

def main():
    # Put the Python libraries the Executors will run on their library path.
    # FAIR scheduling lets the jobs submitted from the driver threads below
    # run concurrently instead of queueing up FIFO.
    conf = SparkConf().setAppName("main_cns") \
        .setExecutorEnv('PYTHONPATH', 'PYTHONPATH:/app/ml_app.zip') \
        .set("spark.scheduler.mode", "FAIR")
    # Register the entry-point Python source so it is shipped to the Executors.
    sc = SparkContext(conf=conf, pyFiles=['/app/main_XXX.py'])
    sc.addPyFile('/app/ml_app.zip')
    sqlContext = HiveContext(sc)
    sqlContext.setConf("spark.sql.hive.convertMetastoreOrc", "false")
    ## Registering callpython as a User Defined Function lets code written
    ## on the Driver be executed on the Executors.
    sqlContext.udf.register('udf_callpython', callpython)

    def callpython_t(x):
        # Run the registered UDF through Spark SQL; the Driver plans the
        # query and callpython() runs on an Executor.
        sqlContext.sql("select udf_callpython('" + x + "') ").show()
        return 1

    que = queue.Queue()
    threads_list = list()
    lines = [line.rstrip() for line in open('/app/parameter_data')]
    ## Run the algorithm Python jobs concurrently using threads.
    for A in lines:
        if not A.startswith('#'):
            thread = Thread(target=lambda q, arg1: q.put(callpython_t(arg1)), args=(que, A))
            thread.start()
            threads_list.append(thread)
    # Join all the threads
    for t in threads_list:
        t.join()
    # Check thread's return value
    while not que.empty():
        result = que.get()
        print(result)
    sc.stop()

if __name__ == '__main__':
    main()