像Google一樣構建機器學習系統2

按照上篇文章搭建了一套Kubeflow Pipelines之後,我們一起小試牛刀,用一個真實的案例,學習如何開發一套基於Kubeflow Pipelines的機器學習工作流。

準備工作

機器學習工作流是一個任務驅動的流程,同時也是數據驅動的流程,這裡涉及到數據的導入和準備,模型訓練Checkpoint的導出評估,到最終模型的導出。這就需要分佈式存儲作為傳輸的媒介,這裡使用NAS作為分佈式存儲。

  • 創建分佈式存儲,這裡以NAS為例。這裡NFS_SERVER_IP需要替換成真實NAS服務器地址

1.創建阿里雲NAS服務,可以參考文檔

2.需要在 NFS Server 中創建 /data

# mkdir -p /nfs
# mount -t nfs -o vers=4.0 NFS_SERVER_IP:/ /nfs
# mkdir -p /data
# cd /
# umount /nfs

3.創建對應的Persistent Volume.

# cat nfs-pv.yaml
apiVersion: v1
kind: PersistentVolume
metadata:
name: user-susan
labels:
user-susan: pipelines
spec:
persistentVolumeReclaimPolicy: Retain
capacity:
storage: 10Gi
accessModes:

- ReadWriteMany
nfs:
server: NFS_SERVER_IP
path: "/data"

# kubectl create -f nfs-pv.yaml

4.創建Persistent Volume Claim

# cat nfs-pvc.yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: user-susan
annotations:
description: "this is the mnist demo"
owner: Tom
spec:
accessModes:
- ReadWriteMany
resources:
requests:
storage: 5Gi
selector:
matchLabels:
user-susan: pipelines
# kubectl create -f nfs-pvc.yaml

開發Pipeline

由於Kubeflow Pipelines提供的例子都是依賴於Google的存儲服務,這導致國內的用戶無法真正體驗Pipelines的能力。阿里雲容器服務團隊提供了訓練MNIST模型的例子,方便您在阿里雲上使用和學習Kubeflow Pipelines。具體步驟為3步:

(1)下載數據

(2)利用TensorFlow進行模型訓練

(3)模型導出

這3個步驟中後一個步驟都依賴與前一個步驟完成。

在Kubeflow Pipelines中可以用Python代碼描述了這樣一個流程, 完整代碼可以查看standalone_pipeline.py。我們在這個例子中使用了arena_op這是對於Kubeflow默認的container_op封裝,能夠實現對於分佈式訓練MPI和PS模式的無縫銜接,另外也支持使用GPU和RDMA等異構設備和分佈式存儲的簡單接入,同時也方便從git源同步代碼。是一個比較實用的工具API。而arena_op是基於開源項目Arena。

@dsl.pipeline(
name='pipeline to run jobs',
description='shows how to run pipeline jobs.'
)
def sample_pipeline(learning_rate='0.01',
dropout='0.9',
model_version='1',
commit='f097575656f927d86d99dd64931042e1a9003cb2'):
"""A pipeline for end to end machine learning workflow."""
data=["user-susan:/training"]
gpus=1
# 1. prepare data
prepare_data = arena.standalone_job_op(
name="prepare-data",
image="byrnedo/alpine-curl",
data=data,
command="mkdir -p /training/dataset/mnist && \
cd /training/dataset/mnist && \
curl -O https://code.aliyun.com/xiaozhou/tensorflow-sample-code/raw/master/data/t10k-images-idx3-ubyte.gz && \
curl -O https://code.aliyun.com/xiaozhou/tensorflow-sample-code/raw/master/data/t10k-labels-idx1-ubyte.gz && \
curl -O https://code.aliyun.com/xiaozhou/tensorflow-sample-code/raw/master/data/train-images-idx3-ubyte.gz && \
curl -O https://code.aliyun.com/xiaozhou/tensorflow-sample-code/raw/master/data/train-labels-idx1-ubyte.gz")
# 2. downalod source code and train the models
train = arena.standalone_job_op(
name="train",
image="tensorflow/tensorflow:1.11.0-gpu-py3",
sync_source="https://code.aliyun.com/xiaozhou/tensorflow-sample-code.git",
env=["GIT_SYNC_REV=%s" % (commit)],
gpus=gpus,
data=data,
command='''
echo %s;python code/tensorflow-sample-code/tfjob/docker/mnist/main.py \
--max_steps 500 --data_dir /training/dataset/mnist \
--log_dir /training/output/mnist --learning_rate %s \
--dropout %s''' % (prepare_data.output, learning_rate, dropout),

metrics=["Train-accuracy:PERCENTAGE"])
# 3. export the model
export_model = arena.standalone_job_op(
name="export-model",
image="tensorflow/tensorflow:1.11.0-py3",
sync_source="https://code.aliyun.com/xiaozhou/tensorflow-sample-code.git",
env=["GIT_SYNC_REV=%s" % (commit)],
data=data,
command="echo %s;python code/tensorflow-sample-code/tfjob/docker/mnist/export_model.py --model_version=%s --checkpoint_path=/training/output/mnist /training/output/models" % (train.output, model_version))

Kubeflow Pipelines會將上面的代碼轉化成一個有向無環圖(DAG),其中的每一個節點就是Component(組件),而Component(組件)之間的連線代表它們之間的依賴關係。從Pipelines UI可以看到DAG圖:

像Google一樣構建機器學習系統2 - 開發你的機器學習工作流

首先具體理解一下數據準備的部分,這裡我們提供了arena.standalone_job_op的Python API, 需要指定該步驟的名稱:name,需要使用的容器鏡像:image,要使用的數據以及其對應到容器內部的掛載目錄:data,這裡的data是一個數組格式, 如data=["user-susan:/training"],表示可以掛載到多個數據。 user-susan是之前創建的Persistent Volume Claim,而/training為容器內部的掛載目錄。

prepare_data = arena.standalone_job_op(
name="prepare-data",
image="byrnedo/alpine-curl",
data=data,
command="mkdir -p /training/dataset/mnist && \
cd /training/dataset/mnist && \
curl -O https://code.aliyun.com/xiaozhou/tensorflow-sample-code/raw/master/data/t10k-images-idx3-ubyte.gz && \
curl -O https://code.aliyun.com/xiaozhou/tensorflow-sample-code/raw/master/data/t10k-labels-idx1-ubyte.gz && \
curl -O https://code.aliyun.com/xiaozhou/tensorflow-sample-code/raw/master/data/train-images-idx3-ubyte.gz && \
curl -O https://code.aliyun.com/xiaozhou/tensorflow-sample-code/raw/master/data/train-labels-idx1-ubyte.gz")

而上述步驟實際上是從指定地址利用curl下載數據到分佈式存儲對應的目錄/training/dataset/mnist,請注意這裡的/training為分佈式存儲的根目錄,類似大家熟悉的根mount點;而/training/dataset/mnist是子目錄。其實後面的步驟可以通過使用同樣的根mount點,讀到數據,進行運算。

第二步是利用下載到分佈式存儲的數據,並通過git指定固定commit id下載代碼,並進行模型訓練

train = arena.standalone_job_op(
name="train",
image="tensorflow/tensorflow:1.11.0-gpu-py3",
sync_source="https://code.aliyun.com/xiaozhou/tensorflow-sample-code.git",
env=["GIT_SYNC_REV=%s" % (commit)],
gpus=gpus,
data=data,

command='''
echo %s;python code/tensorflow-sample-code/tfjob/docker/mnist/main.py \
--max_steps 500 --data_dir /training/dataset/mnist \
--log_dir /training/output/mnist --learning_rate %s \
--dropout %s''' % (prepare_data.output, learning_rate, dropout),
metrics=["Train-accuracy:PERCENTAGE"])

可以看到這個步驟比數據準備要相對複雜一點,除了和第一步驟中的name,image, data和command之外,在模型訓練步驟中,還需要指定:

  • 獲取代碼的方式: 從可重現實驗的角度來看,對於運行試驗代碼的追本溯源,是非常重要的一環。可以在API調用時指定sync_source的git代碼源,同時通過設定env中GIT_SYNC_REV指定訓練代碼的commit id
  • gpu: 默認為0,就是不使用GPU;如果為大於0的整數值,就代表該步驟需要這個數量的GPU數。
  • metrics: 同樣是從可重現和可比較的實驗目的出發,用戶可以將需要的一系列指標導出,並且通過Pipelines UI上直觀的顯示和比較。具體使用方法分為兩步,1.在調用API時以數組的形式指定要收集指標的metrics name和指標的展示格式PERCENTAGE或者是RAW,比如metrics=["Train-accuracy:PERCENTAGE"]。2.由於Pipelines默認會從stdout日誌中收集指標,你需要在真正運行的模型代碼中輸出{metrics name}={value}或者{metrics name}:{value}, 可以參考具體樣例代碼
像Google一樣構建機器學習系統2 - 開發你的機器學習工作流

值得注意的是:

在本步驟中指定了和prepare_data相同的data參數["user-susan:/training"],就可以在訓練代碼中讀到對應的數據,比如--data_dir /training/dataset/mnist,

另外由於該步驟依賴於prepare_data,可以在方法中通過指定prepare_data.output表示兩個步驟的依賴關係。

最後export_model是基於train訓練產生的checkpoint,生成訓練模型:

export_model = arena.standalone_job_op(
name="export-model",
image="tensorflow/tensorflow:1.11.0-py3",
sync_source="https://code.aliyun.com/xiaozhou/tensorflow-sample-code.git",
env=["GIT_SYNC_REV=%s" % (commit)],
data=data,
command="echo %s;python code/tensorflow-sample-code/tfjob/docker/mnist/export_model.py --model_version=%s --checkpoint_path=/training/output/mnist /training/output/models" % (train.output, model_version))

export_model和第二步train類似,甚至要更為簡單,它只是從git同步模型導出代碼並且利用共享目錄/training/output/mnist中的checkpoint執行模型導出。

整個工作流程看起來還是很直觀的, 下面就可以定義一個Python方法將整個流程貫穿在一起。

@dsl.pipeline(
name='pipeline to run jobs',
description='shows how to run pipeline jobs.'
)
def sample_pipeline(learning_rate='0.01',
dropout='0.9',
model_version='1',
commit='f097575656f927d86d99dd64931042e1a9003cb2'):

@dsl.pipeline是表示工作流的裝飾器,這個裝飾器中需要定義兩個屬性,分別是name和description

入口方法sample_pipeline中定義了4個參數learning_rate,dropout,model_version和commit,分別可以在上面的train和export_model階段使用。這裡的參數的值實際上是 dsl.PipelineParam類型,定義成dsl.PipelineParam的目的在於可以通過Kubeflow Pipelines的原生UI可以將其轉換成輸入表單,表單的關鍵字是參數名稱,而默認值為參數的值. 值得注意的是,這裡的dsl.PipelineParam對應值的實際上只能是字符串和數字型;而數組和map,以及自定義類型都是無法通過轉型進行變換的。

而實際上,這些參數都可以在用戶提交工作流時進行覆蓋,以下就是提交工作流對應的UI:

像Google一樣構建機器學習系統2 - 開發你的機器學習工作流

提交Pipeline

您可以在自己的Kubernetes內將前面開發工作流的Python DSL提交到Kubeflow Pipelines服務中, 實際提交代碼很簡單:

 KFP_SERVICE="ml-pipeline.kubeflow.svc.cluster.local:8888"
import kfp.compiler as compiler
compiler.Compiler().compile(sample_pipeline, __file__ + '.tar.gz')
client = kfp.Client(host=KFP_SERVICE)
try:
experiment_id = client.get_experiment(experiment_name=EXPERIMENT_NAME).id
except:
experiment_id = client.create_experiment(EXPERIMENT_NAME).id
run = client.run_pipeline(experiment_id, RUN_ID, __file__ + '.tar.gz',
params={'learning_rate':learning_rate,
'dropout':dropout,
'model_version':model_version,
'commit':commit})

利用compiler.compile將Python代碼編譯成執行引擎(Argo)識別的DAG配置文件

通過Kubeflow Pipeline的客戶端創建或者找到已有的實驗,並且提交之前編譯出的DAG配置文件

在集群內準備一個python3的環境,並且安裝Kubeflow Pipelines SDK

# kubectl create job pipeline-client --namespace kubeflow --image python:3 -- sleep infinity
# kubectl exec -it -n kubeflow $(kubectl get po -l job-name=pipeline-client -n kubeflow | grep -v NAME| awk '{print $1}') bash

登錄到Python3的環境後,執行如下命令,連續提交兩個不同參數的任務

# pip3 install http://kubeflow.oss-cn-beijing.aliyuncs.com/kfp/0.1.14/kfp.tar.gz --upgrade
# pip3 install http://kubeflow.oss-cn-beijing.aliyuncs.com/kfp-arena/kfp-arena-0.4.tar.gz --upgrade
# curl -O https://raw.githubusercontent.com/cheyang/pipelines/update_standalone_sample/samples/arena-samples/standalonejob/standalone_pipeline.py
# python3 standalone_pipeline.py --learning_rate 0.0001 --dropout 0.8 --model_version 2
# python3 standalone_pipeline.py --learning_rate 0.0005 --dropout 0.8 --model_version 3

查看運行結果

登錄到Kubeflow Pipelines的UI: https://{pipeline地址}/pipeline/#/experiments, 比如

https://11.124.285.171/pipeline/#/experiments
像Google一樣構建機器學習系統2 - 開發你的機器學習工作流

點擊Compare runs按鈕,可以比較兩個實驗的輸入,花費的時間和精度等一系列指標。讓實驗可追溯是讓實驗可重現的第一步;而利用Kubeflow Pipelines本身的實驗管理能力則是開啟實驗可重現的第一步。

像Google一樣構建機器學習系統2 - 開發你的機器學習工作流

總結

實現一個可以運行的Kubeflow Pipeline需要的步驟是:

1.構建Pipeline(流水線)中需要的最小執行單元Component(組件),如果是利用原生定義的dsl.container_ops,需要構建兩部分代碼:

  • 構建運行時代碼:通常是為每個步驟構建容器鏡像,作為Pipelines和真正執行業務邏輯代碼之間的適配器。它所做的事情為獲取Pipelines上下文的輸入參數,調用業務邏輯代碼,並且將需要傳遞到下個步驟的輸出按照Pipelines的規則放到容器內的指定位置,由底層工作流組件負責傳遞。 這樣產生的結果是運行時代碼與業務邏輯代碼會耦合在一起。可以參考Kubeflow Pipelines的例子
  • 構建客戶端代碼:這個步驟通常是長成下面的樣子, 熟悉Kubernetes的朋友會發現這個步驟實際上就是在編寫Pod Spec:
container_op = dsl.ContainerOp(
name=name,
image='<train-image>',
arguments=[
'--input_dir', input_dir,
'--output_dir', output_dir,
'--model_name', model_name,
'--model_version', model_version,
'--epochs', epochs
],
file_outputs={'output': '/output.txt'}
)
container_op.add_volume(k8s_client.V1Volume(
host_path=k8s_client.V1HostPathVolumeSource(
path=persistent_volume_path),
name=persistent_volume_name))
container_op.add_volume_mount(k8s_client.V1VolumeMount(
mount_path=persistent_volume_path,
name=persistent_volume_name))
/<train-image>

利用原生定義的dsl.container_ops的好處在於靈活,由於開放了和Pipelines的交互接口,用戶可以在container_ops這個層面做許多事情。但是它的問題在於:

  • 複用度低,每個Component都需要構建鏡像和開發運行時代碼
  • 複雜度高,使用者需要了解Kubernetes的概念,比如resource limit, PVC, node selector等一系列概念
  • 支持分佈式訓練困難,由於container_op為單容器操作,如果需要支持分佈式訓練就需要在container_ops中提交和管理類似TFJob的任務。這裡會帶來複雜度和安全性的雙重挑戰,複雜度比較好理解,安全性是說提交TFJob這類任務的權限會需要開放額外的權限給Pipeline的開發者。

另一種方式是使用arena_op這種可以重用的Component API,它使用通用運行時代碼,可以免去重複構建運行時代碼的工作;同時利用通用一套的arena_op API簡化用戶的使用;也支持Parameter Server和MPI等場景。建議您使用這種方式編譯Pipelines

2.將構建好的Component(組件)拼接成Pipeline(流水線)

3.將Pipeline(流水線)編譯後Argo的執行引擎(Argo)識別的DAG配置文件, 並提交的DAG配置文件到Kubeflow Pipelines, 並利用Kubeflow Pipelines自身的UI查看流程結果。


分享到:


相關文章: