After a month of effort around the MNIST handwritten-digit recognition task, we have step by step built a complete, working pipeline on Kubeflow and figured out the basic usage of several of its core components.
This chapter clears up a few leftover issues, covers the last few core Kubeflow features, and wraps up the series.
Building a Private Elyra Image
First up: building a private Elyra image.
As mentioned last week, every time Elyra brings up a component container it performs two operations:
- download bootstrapper.py, requirements-elyra.txt and requirements-elyra-py37.txt from GitHub;
- pip install -r requirements-elyra.txt (or requirements-elyra-py37.txt).
We made two changes to Elyra:
- store the three files in our local MinIO and change the download path (a sketch follows below);
- switch the pip index to the Tsinghua mirror.
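Step one is nothing fancy; a minimal sketch with the minio client (the endpoint, credentials and bucket are the ones used elsewhere in this series, and the "elyra/" object prefix is made up):

```python
# Push the three files Elyra's bootstrapper needs into the local MinIO, so the
# download path can point here instead of GitHub.
from minio import Minio

client = Minio("47.96.106.97:9000", "minio", "minio123", secure=False)
for name in ("bootstrapper.py", "requirements-elyra.txt", "requirements-elyra-py37.txt"):
    client.fput_object("lifu963", f"elyra/{name}", name)
```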
After the rework, Elyra ran through successfully, and noticeably faster.
Katib Automatic Hyperparameter Tuning
The second item, also mentioned last week: using the Katib tune API to tune the hyperparameters of the PyTorch MNIST training job.
To keep the code structure clear, I put the training procedure in a separate script:
------------------train_model.py--------------------
def main(parameters):
    import torch
    import numpy as np
    import torch.nn as nn
    import torch.nn.functional as F
    import torch.optim as optim
    from torch.utils.data import Dataset, DataLoader
    import torch.distributed as dist
    import logging
    from minio import Minio
    import os

    # Define the network structure
    class Net(nn.Module):
        def __init__(self):
            ...
        def forward(self, x):
            ...

    # Define a Dataset so we can build DataLoaders
    class Mnistset(Dataset):
        def __init__(self, x, y):
            ...
        def __getitem__(self, index):
            ...
        def __len__(self):
            ...

    # Define the training procedure
    def train(...):
        ...

    # Key point: log the metrics in a fixed format via logging.info so the tune API can collect them
    def test(model, device, test_loader, epoch):
        model.eval()
        test_loss = 0
        correct = 0
        with torch.no_grad():
            for data, target in test_loader:
                data, target = data.to(device), target.to(device)
                data = data.float()
                output = model(data)
                test_loss += F.nll_loss(output, target, reduction="sum").item()  # sum up batch loss
                pred = output.max(1, keepdim=True)[1]  # get the index of the max log-probability
                correct += pred.eq(target.view_as(pred)).sum().item()
        test_loss /= len(test_loader.dataset)
        test_accuracy = float(correct) / len(test_loader.dataset)
        logging.info("Epoch {}. accuracy={:.4f} - loss={:.4f}".format(epoch, test_accuracy, test_loss))

    logging.basicConfig(
        format="%(asctime)s %(levelname)-8s %(message)s",
        datefmt="%Y-%m-%dT%H:%M:%SZ",
        level=logging.INFO,
    )
    logging.info("--------------------------------------------------------------------------------------")
    logging.info(f"Input Parameters: {parameters}")
    logging.info("--------------------------------------------------------------------------------------\n\n")

    lr = float(parameters["lr"])
    momentum = float(parameters["momentum"])
    epochs = int(parameters["epochs"])
    no_cuda = parameters["no_cuda"]
    endpoint = parameters["endpoint"]
    access_key = parameters["access_key"]
    secret_key = parameters["secret_key"]
    bucket_name = parameters["bucket_name"]
    object_name = parameters["object_name"]
    model_name = parameters["model_name"]
    save_model = parameters["save_model"]

    use_cuda = not no_cuda and torch.cuda.is_available()
    if use_cuda:
        print("Using CUDA")
    device = torch.device("cuda" if use_cuda else "cpu")
    kwargs = {"num_workers": 1, "pin_memory": True} if use_cuda else {}

    # Download the data from the local MinIO
    client = Minio(endpoint, access_key, secret_key, secure=False)
    client.fget_object(bucket_name, object_name, object_name)
    with np.load(object_name) as f:
        x_train, y_train = f['x_train'], f['y_train']
        x_test, y_test = f['x_test'], f['y_test']

    # Build the DataLoaders
    train_set = Mnistset(x_train, y_train)
    train_loader = DataLoader(train_set, batch_size=64, shuffle=True, **kwargs)
    test_set = Mnistset(x_test, y_test)
    test_loader = DataLoader(test_set, batch_size=1000, shuffle=False, **kwargs)

    model = Net().to(device)
    optimizer = optim.SGD(model.parameters(), lr=lr, momentum=momentum)

    # Train
    for epoch in range(1, epochs + 1):
        train(model, device, train_loader, optimizer, epoch)
        test(model, device, test_loader, epoch)
Then, with katib.KatibClient().tune(), we can happily start tuning:
from kubeflow import katib
from train_model import main

exp_name = "tune-pytorch-mnist"
katib_client = katib.KatibClient()

parameters = {
    "lr": katib.search.double(min=0.01, max=0.2),
    "momentum": katib.search.double(min=0.1, max=1),
    "epochs": katib.search.int(min=1, max=5),
    "no_cuda": True,
    "endpoint": "47.96.106.97:9000",
    "access_key": "minio",
    "secret_key": "minio123",
    "bucket_name": "lifu963",
    "object_name": "mnist.npz",
    "model_name": "model.onnx",
    "save_model": False,  # train_model.main reads this key, so it has to be supplied
}

katib_client.tune(
    name=exp_name,
    objective=main,
    parameters=parameters,
    algorithm_name="cmaes",
    objective_metric_name="accuracy",
    additional_metric_names=["loss"],
    max_trial_count=12,
    parallel_trial_count=2,
    base_image='...',
)
The tune API's parameters are as follows:
- name: str, the experiment name
- objective: Callable, the objective function
- parameters: Dict[str, Any], the objective function's parameters to be tuned
- base_image: str = 'docker.io/tensorflow/tensorflow:2.9.1', the base image the objective function runs in
- namespace: str = 'kubeflow-user-example-com',
- algorithm_name: str = 'random', the tuning algorithm
- objective_metric_name: str = None, the primary metric to optimize
- additional_metric_names: List[str] = [], secondary metrics
- objective_type: str = 'maximize',
- objective_goal: float = None,
- max_trial_count: int = None,
- parallel_trial_count: int = None,
- max_failed_trial_count: int = None,
- retain_trials: bool = False,
- packages_to_install: List[str] = None, pip packages to install
- pip_index_url: str = 'pypi.org/simple', the pip index URL
That is fairly comprehensive. Its main shortcoming is that the metric log output has to follow a fixed format, {metric_1}={:.4f} - {metric_2}={:.4f} ..., and the format cannot be customized.
The tuning results:
We can also use other Katib APIs to print tuning-related information.
Here, we poll the current best parameters every 10 seconds; once the current optimum is non-empty, we record it for the real training run:
import json
import time

# Check the current status of the experiment
status = katib_client.get_experiment_status(exp_name)
print(f"Katib Experiment status: {status}\n")

# Poll every 10 seconds until a current optimal trial is reported
best_hps = {}
while best_hps == {}:
    best_hps = katib_client.get_optimal_hyperparameters(exp_name)  # current best parameters
    time.sleep(10)

if best_hps != {}:
    print("Current Optimal Trial\n")
    print(json.dumps(best_hps, indent=4))
    for hp in best_hps["currentOptimalTrial"]["parameterAssignments"]:
        if hp["name"] == "lr":
            best_lr = hp["value"]
        elif hp["name"] == "momentum":
            best_momentum = hp["value"]
        elif hp["name"] == "epochs":
            best_epochs = hp["value"]
Once the best parameters have been collected, we can feed them into the next step, the real model training.
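One small detail: the values in parameterAssignments come back as strings, so cast them first, for example:

```python
# Cast Katib's string values before reusing them as real training parameters
lr = float(best_lr)
momentum = float(best_momentum)
epochs = int(best_epochs)
```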
PyTorchJob Distributed Training
Kubeflow is a machine-learning platform deployed on Kubernetes; if it couldn't do distributed training, what would be the point?
Doing distributed training on Kubeflow takes two things:
- adapting the training script for distributed training;
- deploying the distributed training job on Kubeflow.
Let's start with the second point.
Deploying distributed training on Kubeflow
PyTorchJob feels much like Katib, but is a bit easier to get started with. The native approach is to launch distributed training by applying a YAML file directly; a Python SDK came later, plus higher-level APIs that wrap it.
The PyTorchJob YAML structure is clear: you define a Master section and a Worker section, and in each of them a container, spelling out the container image, command line, arguments, required resources, and so on.
The Python SDK makes that structure even easier to read:
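Roughly like this; a sketch only, since the model class names below follow the kubeflow-training SDK of this period (newer releases expose KubeflowOrgV1* names instead) and the image and command are placeholders:

```python
# Build the same Master/Worker structure with the SDK's model classes.
from kubernetes.client import V1ObjectMeta, V1PodTemplateSpec, V1PodSpec, V1Container
from kubeflow.training import (
    PyTorchJobClient, V1PyTorchJob, V1PyTorchJobSpec, V1ReplicaSpec, V1RunPolicy,
)

def replica(n: int) -> V1ReplicaSpec:
    # Master and Worker share the same pod template here; only the replica count differs
    container = V1Container(
        name="pytorch",
        image="example.registry/train-pytorch:latest",  # hypothetical training image
        command=["python", "/opt/train_model.py"],
    )
    return V1ReplicaSpec(
        replicas=n,
        restart_policy="OnFailure",
        template=V1PodTemplateSpec(spec=V1PodSpec(containers=[container])),
    )

pytorchjob = V1PyTorchJob(
    api_version="kubeflow.org/v1",
    kind="PyTorchJob",
    metadata=V1ObjectMeta(name="train-pytorch", namespace="kubeflow-user-example-com"),
    spec=V1PyTorchJobSpec(
        run_policy=V1RunPolicy(clean_pod_policy="None"),
        pytorch_replica_specs={"Master": replica(1), "Worker": replica(3)},
    ),
)

PyTorchJobClient().create(pytorchjob)
```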
Of course, we can also use the higher-level API create_pytorchjob_from_func to deploy distributed training directly:
from train_model import main
from kubeflow.training import PyTorchJobClient

# Set lr, momentum, etc. yourself (for example, to the best values Katib found above)
parameters = {
    "lr": lr,
    "momentum": momentum,
    "epochs": epochs,
    "no_cuda": no_cuda,
    "endpoint": endpoint,
    "access_key": access_key,
    "secret_key": secret_key,
    "bucket_name": bucket_name,
    "object_name": object_name,
    "model_name": model_name,
    "save_model": save_model,
    "backend": backend,
}

job_name = "train-pytorch"
job_client = PyTorchJobClient()
job_client.create_pytorchjob_from_func(
    name=job_name,
    func=main,
    parameters=parameters,
    base_image="...",
    num_worker_replicas=3,  # number of Worker replicas
)
Once training has started, we can call other APIs to check how the job is doing:
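For instance, something along these lines; the helper names are from memory of this generation of PyTorchJobClient and may differ between releases:

```python
# Poke at the job's status and logs through the same client object
namespace = "kubeflow-user-example-com"

# Current condition of the PyTorchJob (Created / Running / Succeeded / Failed)
print(job_client.get_job_status(job_name, namespace=namespace))

# Once it has finished, confirm success and pull the pod logs
if job_client.is_job_succeeded(job_name, namespace=namespace):
    job_client.get_logs(job_name, namespace=namespace)
```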
Adapting the training script for distributed training
Adapting the training script for distributed training is not that hard either.
def main(parameters):
    import ...

    RANK = int(os.environ.get("RANK", 0))
    WORLD_SIZE = int(os.environ.get("WORLD_SIZE", 1))

    class Net(nn.Module):
        ...

    class Mnistset(Dataset):
        ...

    def train(...):
        ...

    def test(...):
        ...

    def should_distribute():
        return dist.is_available() and WORLD_SIZE > 1

    def is_distributed():
        return dist.is_available() and dist.is_initialized()

    logging.basicConfig(...)
    logging.info("...")

    lr = float(parameters["lr"])
    momentum = float(parameters["momentum"])
    ...

    # if the distributed package is available, read the backend to use
    if dist.is_available():
        backend = parameters["backend"]

    use_cuda = not no_cuda and torch.cuda.is_available()
    if use_cuda:
        print("Using CUDA")
    device = torch.device("cuda" if use_cuda else "cpu")

    if should_distribute():
        print("Using distributed PyTorch with {} backend".format(backend))
        dist.init_process_group(backend=backend, rank=RANK, world_size=WORLD_SIZE)

    kwargs = {"num_workers": 1, "pin_memory": True} if use_cuda else {}

    client = Minio(endpoint, access_key, secret_key, secure=False)
    client.fget_object(bucket_name, object_name, object_name)
    with np.load(object_name) as f:
        x_train, y_train = f['x_train'], f['y_train']
        x_test, y_test = f['x_test'], f['y_test']

    train_set = Mnistset(x_train, y_train)
    train_loader = DataLoader(train_set, ...)
    test_set = Mnistset(x_test, y_test)
    test_loader = DataLoader(test_set, ...)

    model = Net().to(device)
    if is_distributed():
        Distributor = nn.parallel.DistributedDataParallel
        model = Distributor(model)
    optimizer = optim.SGD(model.parameters(), lr=lr, momentum=momentum)

    logging.info(f"Start training for RANK: {RANK}. WORLD_SIZE: {WORLD_SIZE}")
    for epoch in range(1, epochs + 1):
        train(...)
        test(...)

    if save_model:
        ...
- First, read the RANK and WORLD_SIZE values; the Kubeflow Training Operator sets sensible RANK and WORLD_SIZE values automatically based on the environment configuration;
- Write a should_distribute function to check whether distributed training can be enabled, and an is_distributed function to check whether we are currently running distributed;
- If dist.is_available(), read the backend value;
- If should_distribute(), initialize distributed training with dist.init_process_group(backend=backend, rank=RANK, world_size=WORLD_SIZE);
- If is_distributed(), wrap the model with model = Distributor(model) so that it trains in distributed mode.
Notes:
- world_size is the number of processes in the whole job; above we configured 1 master and 3 workers, so world_size here should be 4;
- rank is the index of the current process, a value between 0 and world_size-1;
- backend configures how the processes communicate; the main options are nccl (from NVIDIA), gloo (from Facebook), and mpi (built on an MPI implementation such as Open MPI); the default is usually "gloo". From my tests, if your GPUs support nccl, pick nccl; on other hardware (non-NVIDIA cards), consider gloo or mpi. A small sketch of choosing one follows below.
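For what it's worth, a defensive way to choose the backend inside the training script could look like this (a sketch only; the script above simply reads parameters["backend"]):

```python
# Pick the fastest backend the current trial pod actually supports
if use_cuda and dist.is_nccl_available():
    backend = "nccl"
elif dist.is_gloo_available():
    backend = "gloo"
else:
    backend = "mpi"
```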
There is still one annoyance, though: after calling the PyTorchJob API it is not easy to find where the trained model ended up; so in the script, once training finishes, I simply upload the model to the local MinIO.
if save_model:
    dummy_input = torch.randn(1, 28, 28)
    torch.onnx.export(model, dummy_input, model_name)
    client.fput_object(bucket_name, model_name, model_name)
    logging.info("save model done!")
Artifacts
Artifacts, simply put, means visualization. Once we write an mlpipeline-ui-metadata.json file that follows certain conventions, Kubeflow can render visualizations from it.
Using the v1 SDK: writing the metadata JSON file
Once the mlpipeline-ui-metadata.json file is written as required, with the csv path given in source, the data can be visualized:
from functools import partial
from kfp import components, dsl

container_op_hopeVisual = partial(
    components.func_to_container_op,
    base_image= ...,
)

@container_op_hopeVisual
def roc(data_dir):
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.metrics import roc_curve, roc_auc_score
    from sklearn.datasets import load_wine
    from sklearn.model_selection import train_test_split, cross_val_predict
    import pandas as pd
    import json
    import os
    from pathlib import Path

    X, y = load_wine(return_X_y=True)
    y = y == 1
    X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=42)
    rfc = RandomForestClassifier(n_estimators=10, random_state=42)
    rfc.fit(X_train, y_train)
    y_scores = cross_val_predict(rfc, X_train, y_train, cv=3, method='predict_proba')
    y_predict = cross_val_predict(rfc, X_train, y_train, cv=3, method='predict')
    roc_auc = roc_auc_score(y_train, y_scores[:, 1])
    fpr, tpr, thresholds = roc_curve(y_true=y_train, y_score=y_scores[:, 1], pos_label=True)
    df_roc = pd.DataFrame({'fpr': fpr, 'tpr': tpr, 'thresholds': thresholds})

    # Write the csv onto the mounted volume so the UI can read it later
    roc_file = "roc.csv"
    os.makedirs(os.path.join(data_dir, "csv_file"), exist_ok=True)
    file_path = os.path.join("csv_file", roc_file)
    df_roc.to_csv(os.path.join(data_dir, file_path),
                  columns=['fpr', 'tpr', 'thresholds'], header=False, index=False)

    # mlpipeline-ui-metadata.json drives the visualization
    metadata = {
        'outputs': [{
            'type': 'roc',
            'format': 'csv',
            'schema': [
                {'name': 'fpr', 'type': 'NUMBER'},
                {'name': 'tpr', 'type': 'NUMBER'},
                {'name': 'thresholds', 'type': 'NUMBER'},
            ],
            'source': file_path,
        }]
    }
    ui_metadata_output_path = 'mlpipeline-ui-metadata.json'
    Path(os.path.join(data_dir, ui_metadata_output_path)).parent.mkdir(parents=True, exist_ok=True)
    Path(os.path.join(data_dir, ui_metadata_output_path)).write_text(json.dumps(metadata))

    # mlpipeline-metrics.json surfaces scalar metrics in the run overview
    metrics = {
        'metrics': [{
            'name': 'roc-auc-score',
            'numberValue': roc_auc,
        }]
    }
    metrics_output_path = 'mlpipeline-metrics.json'
    Path(os.path.join(data_dir, metrics_output_path)).parent.mkdir(parents=True, exist_ok=True)
    Path(os.path.join(data_dir, metrics_output_path)).write_text(json.dumps(metrics))

    print("list outputDir: ", os.listdir(data_dir))
    return

@dsl.pipeline(
    name='hope_visual pipeline',
    description='',
)
def pipeline(pvcMountDir: str = "/tmp/outputs"):
    createPvc = dsl.VolumeOp(
        name="create-pvc",
        resource_name="my-pvc-visual",
        modes=dsl.VOLUME_MODE_RWO,
        size='100M',
    )
    roc_op = roc(pvcMountDir)
    roc_op.add_pvolumes({pvcMountDir: createPvc.volume})
    roc_op.after(createPvc)
Note that artifacts renders the visualization by reading the mlpipeline-ui-metadata.json file from the pipeline's filesystem. We therefore have to configure a PV for the pipeline, or take the Elyra approach and put the filesystem on MinIO; if no persistent storage is configured and the files only live in the pod's ephemeral space, the visualization cannot be rendered.
That is why, in the pipeline function, I mounted a PV specifically for this component, to hold mlpipeline-ui-metadata.json and the other files.
However, the csv file (the source) in turn has to live somewhere in the cloud…, which is rather annoying. (Alternatively, serialize the csv into a string and choose "inline" for storage; a sketch follows below.)
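A sketch of that inline variant, reusing the df_roc built in the component above, so that nothing has to be uploaded anywhere:

```python
# Embed the csv text directly in the metadata instead of pointing at a file
metadata = {
    'outputs': [{
        'type': 'roc',
        'format': 'csv',
        'schema': [
            {'name': 'fpr', 'type': 'NUMBER'},
            {'name': 'tpr', 'type': 'NUMBER'},
            {'name': 'thresholds', 'type': 'NUMBER'},
        ],
        'storage': 'inline',
        'source': df_roc.to_csv(columns=['fpr', 'tpr', 'thresholds'],
                                header=False, index=False),
    }]
}
```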
Supported visualization modules:
- Confusion matrix (confusion_matrix)
- ROC curve
- Table
- TensorBoard
Note: strictly speaking, artifacts does not render something called TensorBoard by itself; TensorBoard is TensorFlow's visualization tool. Artifacts only configures the path to the TensorBoard files in the JSON file's source field; the visualization page then shows a Start Tensorboard button that jumps to Kubeflow's TensorBoard interface:
So when configuring TensorBoard in the JSON file, it is enough to set the source property:
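For example, something like this; the log directory below is a made-up path, the point is just that only type and source are set:

```python
# Metadata for a TensorBoard output: only the log directory is needed
metadata = {
    'outputs': [{
        'type': 'tensorboard',
        'source': 's3://mlpipeline/train-logs/',  # hypothetical log dir
    }]
}
```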
Using the v2 SDK: the SDK's visualization APIs
Interestingly, even though the dsl version I am on is v1, the convenient SDK APIs for metric visualization can still be used through the v2-compatible mode.
For example, when defining a custom component, use
from kfp.v2.dsl import component

@component
def func(...):
    ...
and when compiling the pipeline, add mode=kfp.dsl.PipelineExecutionMode.V2_COMPATIBLE:
compiler.Compiler(mode=kfp.dsl.PipelineExecutionMode.V2_COMPATIBLE) \
    .compile(pipeline_func=add_pipeline, package_path='pipeline.yaml')
At the moment kfp supports ROC curves, confusion matrices (confusion_matrix), and Scalar Metrics formats (table-like).
import os
from kfp.v2 import dsl
from kfp.v2.dsl import (
    component,
    Output,
    ClassificationMetrics,
    Metrics,
    HTML,
    Markdown
)

@component(
    base_image='...'
)
def iris_sgdclassifier(test_samples_fraction: float, metrics: Output[ClassificationMetrics]):
    from sklearn import datasets, model_selection
    from sklearn.linear_model import SGDClassifier
    from sklearn.metrics import confusion_matrix

    iris_dataset = datasets.load_iris()
    train_x, test_x, train_y, test_y = model_selection.train_test_split(
        iris_dataset['data'], iris_dataset['target'], test_size=test_samples_fraction)
    classifier = SGDClassifier()
    classifier.fit(train_x, train_y)
    predictions = model_selection.cross_val_predict(classifier, train_x, train_y, cv=3)
    metrics.log_confusion_matrix(
        ['Setosa', 'Versicolour', 'Virginica'],
        confusion_matrix(train_y, predictions).tolist()  # .tolist() to convert np array to list.
    )

@dsl.pipeline(name='metrics-visualization-pipeline')
def metrics_visualization_pipeline():
    iris_sgdclassifier_op = iris_sgdclassifier(test_samples_fraction=0.3)
With the metrics.log_confusion_matrix call, the visualization output is produced in a single line.
That said, I still haven't fully understood the metrics: Output[ClassificationMetrics] parameter; why does nothing break even though we never pass a value for it? (Presumably because the pipeline backend injects the output artifact at runtime, so the caller never supplies it.) Also, this API still seems a bit unstable; the visualization output sometimes shows up and sometimes doesn't:
Which is just odd.
The source for metrics.log_confusion_matrix and friends lives mainly in pipelines/artifact_types.py at sdk/release-1.8 · kubeflow/pipelines (github.com).
I haven't read it closely, but from a quick glance I strongly suspect the v2 visualization SDK puts the JSON file's source in cloud storage?
And since that storage sits on the public internet, the rendering ends up unstable?
Summary
As I see it right now, artifacts is still fairly underwhelming: it simply supports too few visualizations. The v2 SDK mainly supports only ROC curves, confusion matrices, and tables; v1, via the JSON file, merely adds the jump to TensorBoard on top of that.
So it is hard to work up much enthusiasm for figuring out how to fix the v2 visualization instability.
Rather than pouring effort into artifacts, maybe it is better to just drop our experiment results and other files straight into MinIO, or to figure out how to use a mature visualization tool like TensorBoard?
But then TensorBoard is TensorFlow's own visualization tool; damn, does that mean I also have to go set up TensorBoard first?
Elyra Custom Components, and an Idea
In a tech group chat, someone brought up combining Elyra with custom components: add custom components directly in Elyra, and you can offer the platform a set of general-purpose components, wiring its capabilities together and exposing them in a low-code or even no-code way?
It suddenly struck me that this might actually be a workable technical direction.
I dug into it afterwards, and here is how it works.
When we assemble a pipeline, it has to be built out of individual components:
For example, this one is a standard component that we can use to build a pipeline:
But in fact, when building a component, we can also export it as a yaml file so it can be reused:
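For instance (a sketch; the file name is made up, and it assumes roc here is the plain, undecorated Python function from above):

```python
# Passing output_component_file makes kfp also write the component spec to a
# reusable yaml file while returning the container op factory as usual.
from kfp import components

roc_op_factory = components.func_to_container_op(
    roc,
    base_image='...',
    output_component_file='roc_component.yaml',
)
```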
How exactly do we reuse it? In the native approach, calling the API kfp.components.load_component_from_file gives us the component back from the compiled yaml file;
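A minimal sketch of that native path, assuming the yaml exported above:

```python
# Load the compiled component spec and use it like any other op in a pipeline
from kfp import components, dsl

roc_component = components.load_component_from_file('roc_component.yaml')

@dsl.pipeline(name='reuse-roc')
def reuse_pipeline(data_dir: str = '/tmp/outputs'):
    roc_component(data_dir)
```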
In Elyra, though, we go about it like this:
The yaml file currently sits here:
The folder's absolute path is here:
Now click here:
Then, first of all, configure this:
And at this point we can call the component:
We can set the component's custom parameters, mount an existing PV, and so on:
Besides this approach, there are two more:
- via a URL, which means uploading the yaml file to the cloud; the Path then becomes a URL;
- loading a whole directory of yaml files, so we can put a set of full-featured components in the same directory and just give that directory's path in the Path field.
So, using the second approach, we could build a series of full-featured components or pipelines, compile them all to yaml, put them in one folder, and push it to the cloud; someone else can then git the whole directory down, configure it once the second way, and end up with a pile of components for low-code development. That's my take, anyway.
OK, I think the series can wrap up here. Leaving aside model monitoring and data/model version control for now, we can at last say we are able to do some AutoML development on k8s.