经过一个月的尽力,围绕Mnist手写数字识别任务,咱们已经逐渐在Kubeflow上搭建了一个完好可用的Pipeline,并摸清了Kubeflow一些核心组件的基础用法。

本章将解决之前的一点遗留问题,并摸清Kubeflow的最后几个核心功用,对该系列做一个收尾。

Elyra私有镜像的构建

首要是关于ELyra私有镜像的构建。

上星期已经说到,Elyra在建立每个组件容器时,都要进行如下两个操作:

  1. 从github上下载bootstrapper.py,requirements-elyra.txt,requirements-elyra-py37.txt文件
  2. pip install -r requirements-elyra.txt(或requirements-elyra-py37.txt);

咱们对ELrya做了这两处修正:

  1. 将三个文件寄存至本地Minio,更改下载途径;

  2. 将pip源改为清华源;

论如何在KubeFlow上跑通第一个Pipeline(五):尾章

重构后,Elyra成功跑通,速度嘎嘎快。

论如何在KubeFlow上跑通第一个Pipeline(五):尾章

Katib 自动调参

第二件事,也是上星期说到的,使用katib tune API 超参调优pytorch练习mnist任务进程;

为了代码结构明晰,我将练习进程单独放在一个脚本中:

------------------train_model.py--------------------
def main(parameters):
    import torch
    import numpy as np
    import torch.nn as nn
    import torch.nn.functional as F
    import torch.optim as optim
    from torch.utils.data import Dataset, DataLoader
    import torch.distributed as dist
    import logging
    from minio import Minio
    import os
# 界说网络结构
    class Net(nn.Module):
        def __init__(self):
            ...
        def forward(self, x):
            ...
# 界说Dataset,便于DataLoader
    class Mnistset(Dataset):
        def __init__(self, x, y):
            ...
        def __getitem__(self, index):
            ...
        def __len__(self):
            ...
# 界说练习进程 
    def train(...):
        ...
# 重点,经过logging.info输出固定格局的方针,便于tune API捕捉
    def test(model, device, test_loader, epoch):
        model.eval()
        test_loss = 0
        correct = 0
        with torch.no_grad():
            for data, target in test_loader:
                data, target = data.to(device), target.to(device)
                data = data.float()
                output = model(data)
                test_loss += F.nll_loss(output, target, reduction="sum").item()  # sum up batch loss
                pred = output.max(1, keepdim=True)[1]  # get the index of the max log-probability
                correct += pred.eq(target.view_as(pred)).sum().item()
        test_loss /= len(test_loader.dataset)
        test_accuracy = float(correct) / len(test_loader.dataset)
        logging.info("Epoch {}. accuracy={:.4f} - loss={:.4f}".format(epoch, test_accuracy, test_loss))
    logging.basicConfig(
        format="%(asctime)s %(levelname)-8s %(message)s",
        datefmt="%Y-%m-%dT%H:%M:%SZ",
        level=logging.INFO,
    )
    logging.info("--------------------------------------------------------------------------------------")
    logging.info(f"Input Parameters: {parameters}")
    logging.info("--------------------------------------------------------------------------------------\n\n")
    lr = float(parameters["lr"])
    momentum = float(parameters["momentum"])
    epochs = int(parameters["epochs"])
    no_cuda = parameters["no_cuda"]
    endpoint = parameters["endpoint"]
    access_key = parameters["access_key"]
    secret_key = parameters["secret_key"]
    bucket_name = parameters["bucket_name"]
    object_name = parameters["object_name"]
    model_name = parameters["model_name"]
    save_model = parameters["save_model"]
    use_cuda = not no_cuda and torch.cuda.is_available()
    if use_cuda:
        print("Using CUDA")
    device = torch.device("cuda" if use_cuda else "cpu")
    kwargs = {"num_workers": 1, "pin_memory": True} if use_cuda else {}
# 从本地MinIO下载数据
    client = Minio(endpoint,access_key,secret_key,secure=False)
    client.fget_object(bucket_name,object_name,object_name)
    with np.load(object_name) as f:
        x_train, y_train = f['x_train'], f['y_train']
        x_test, y_test = f['x_test'], f['y_test']
# 制造Dataloader
    train_set = Mnistset(x_train, y_train)
    train_loader = DataLoader(train_set,batch_size=64,shuffle=True,**kwargs)
    test_set = Mnistset(x_test, y_test)
    test_loader = DataLoader(test_set,batch_size=1000,shuffle=False,**kwargs)
    model = Net().to(device)
    optimizer = optim.SGD(model.parameters(), lr=lr, momentum=momentum)
# 练习
    for epoch in range(1, epochs + 1):
        train(model, device, train_loader, optimizer, epoch)
        test(model, device, test_loader, epoch)

然后使用 katib.KatibClient().tune(),最能够愉快地调优了:

from kubeflow import katib
from train_model import main
exp_name = "tune-pytorch-mnist"
katib_client = katib.KatibClient()
parameters = {
    "lr": katib.search.double(min=0.01, max=0.2),
    "momentum": katib.search.double(min=0.1, max=1),
    "epochs": katib.search.int(min=1, max=5),
    "no_cuda": True,
    "endpoint": "47.96.106.97:9000",
    "access_key": "minio",
    "secret_key": "minio123",
    "bucket_name": "lifu963",
    "object_name": "mnist.npz",
    "model_name": "model.onnx",
}
katib_client.tune(
    name=exp_name,
    objective=main, 
    parameters=parameters,
    algorithm_name="cmaes", 
    objective_metric_name="accuracy", 
    additional_metric_names=["loss"],
    max_trial_count=12, 
    parallel_trial_count=2,
    base_image='...',
)

tune API的函数参数如下:

  • name: str, 试验名
  • objective: Callable, 方针函数
  • parameters: Dict[str, Any], 方针函数需求调优的参数
  • base_image: str = ‘docker.io/tensorflow/tensorflow:2.9.1’, 方针函数运转的基础镜像
  • namespace: str = ‘kubeflow-user-example-com’,
  • algorithm_name: str = ‘random’, 调优算法
  • objective_metric_name: str = None, 调优首要方针
  • additional_metric_names: List[str] = [], 调优次要方针
  • objective_type: str = ‘maximize’,
  • objective_goal: float = None,
  • max_trial_count: int = None,
  • parallel_trial_count: int = None,
  • max_failed_trial_count: int = None,
  • retain_trials: bool = False,
  • packages_to_install: List[str] = None, pip所需安装包
  • pip_index_url: str = ‘pypi.org/simple’, pip安装途径

能够说非常全面,当然也有不足之处,首要便是,输出的log格局是要求固定的: {param_1}={:.4f} - {param_2}={:.4f} ...,无法自界说格局。

调优结果:

论如何在KubeFlow上跑通第一个Pipeline(五):尾章

还能够使用Katib 其他API打印调优相关信息;

在这儿,咱们每10秒轮询一次当时的调优最佳参数,若当时最优参数不为空,则记载该参数,用于真实的模型练习:

# 记载当时调优状况
status = katib_client.get_experiment_status(exp_name)
print(f"Katib Experiment status: {status}\n")
best_hps = {}
while best_hps == {}:
    best_hps = katib_client.get_optimal_hyperparameters(exp_name) #当时最优参数
    time.sleep(10)
    continue
if best_hps != {}:
    import json
    print("Current Optimal Trial\n")
    print(json.dumps(best_hps, indent=4))
    for hp in best_hps["currentOptimalTrial"]["parameterAssignments"]:
        if hp["name"] == "lr":
            best_lr = hp["value"]
        elif hp["name"] == "momentum":
            best_momentum = hp["value"]
        elif hp["name"] == "epochs":
            best_epochs = hp["value"]

收集最优参数后,咱们就能够将该参数用于下一步的模型练习。

PytorchJob分布式练习

Kubeflow作为一个布置在k8s上的机器学习渠道,若不能分布式练习,那还有什么意义?

关于在Kubeflow上进行分布式练习,有两点需求做:

  1. 在练习脚本上使用分布式练习
  2. 在Kubeflow上布置分布式练习

先从第二点讲起。

在Kubeflow上布置分布式练习

PytorchJob其实和Kaitb体验相似,但会更好上手一点。它的原生办法也是经过直接布置yaml文件来敞开分布式练习的,之后呈现了Python SDK,以及更高层的API对其封装。

先看它的yaml文件:

论如何在KubeFlow上跑通第一个Pipeline(五):尾章

PytorchJob的yaml文件结构很明晰:分别需求界说Master节点和Worker节点;并在每个节点中界说一个容器,写明容器的镜像及运转命令行、命令参数、所需资源等。

经过Python SDK,咱们能够更清楚地看懂该结构:

论如何在KubeFlow上跑通第一个Pipeline(五):尾章

当然,咱们还能够使用高级API create_pytorchjob_from_func 直接布置分布式练习:

from train_model import main
from kubeflow.training import PyTorchJobClient
# lr,momentum等自己设值
parameters = {
    "lr": lr,
    "momentum": momentum,
    "epochs": epochs,
    "no_cuda": no_cuda,
    "endpoint": endpoint,
    "access_key": access_key,
    "secret_key": secret_key,
    "bucket_name": bucket_name,
    "object_name": object_name,
    "model_name": model_name,
    "save_model": save_model,
    "backend": backend,
}
job_name = "train-pytorch"
job_client = PyTorchJobClient()
job_client.create_pytorchjob_from_func(
    name=job_name,
    func=main,
    parameters=parameters,
    base_image="...",
    num_worker_replicas=3, #界说work节点数量
)

敞开练习后,咱们还能够调用其他API检查练习情况:

论如何在KubeFlow上跑通第一个Pipeline(五):尾章

在练习脚本上使用分布式练习

在练习脚本上使用分布式练习其实也并非难事。

def main(parameters):
    import ...
    RANK = int(os.environ.get("RANK", 0))
    WORLD_SIZE = int(os.environ.get("WORLD_SIZE", 1))
    class Net(nn.Module):
          ...
    class Mnistset(Dataset):
          ...
    def train(...):
        ...
    def test(...):
        ...
    def should_distribute():
        return dist.is_available() and WORLD_SIZE > 1
    def is_distributed():
        return dist.is_available() and dist.is_initialized()
    logging.basicConfig(...)
    logging.info("...")
    lr = float(parameters["lr"])
    momentum = float(parameters["momentum"])
    ...
    if dist.is_available():
        backend = parameters["backend"]
    use_cuda = not no_cuda and torch.cuda.is_available()
    if use_cuda:
        print("Using CUDA")
    device = torch.device("cuda" if use_cuda else "cpu")
    if should_distribute():
        print("Using distributed PyTorch with {} backend".format(backend))
        dist.init_process_group(backend=backend, rank=RANK, world_size=WORLD_SIZE)
    kwargs = {"num_workers": 1, "pin_memory": True} if use_cuda else {}
    client = Minio(endpoint,access_key,secret_key,secure=False)
    client.fget_object(bucket_name,object_name,object_name)
    with np.load(object_name) as f:
        x_train, y_train = f['x_train'], f['y_train']
        x_test, y_test = f['x_test'], f['y_test']
    train_set = Mnistset(x_train, y_train)
    train_loader =  DataLoader(train_set,...)
    test_set = Mnistset(x_test, y_test)
    test_loader =  DataLoader(test_set,...)
    model = Net().to(device)
    if is_distributed():
        Distributor = nn.parallel.DistributedDataParallel
        model = Distributor(model)
    optimizer = optim.SGD(model.parameters(), lr=lr, momentum=momentum)
    logging.info(f"Start training for RANK: {RANK}. WORLD_SIZE: {WORLD_SIZE}")
    for epoch in range(1, epochs + 1):
        train(...)
        test(...)
    if save_model:
       ...

if the distributed package is available

  1. 首要,获取RANK和WORLD_SIZE值;Kubeflow Training Operator 会基于环境装备自动设置合理的RANK和WORLD_SIZE值;
  2. 编写should_distribute函数检验能否敞开分布式;编写is_distributed函数检验当时是否处于分布式状况;
  3. 若dist.is_available(),则设置backend值;
  4. 若should_distribute(),则初始化分布式练习;dist.init_process_group(backend=backend, rank=RANK, world_size=WORLD_SIZE)
  5. 若is_distributed(),则经过model = Distributor(model)使模型处于分布式练习的状况下;

  1. world_size 为整个job的进程数;上文中咱们装备了1个master和3个node,所以这儿world_size应为4;
  2. rank 用于表示当时进程的序号,值应在0~world_size-1之间;
  3. backend 装备各进程间的通信方法,首要有nccl(NVIDIA推出)、gloo(Facebook推出)、mpi(OpenMPI推出),一般默认为”gloo”;从测验作用来看,若显卡支撑nccl,建议挑选nccl,若为其他硬件(非NVIDIA卡),则能够考虑gloo、mpi。

不过这儿还有个问题,便是调用PytorchJob API后,我练习完后的模型保存在哪就不方便找了;所以在脚本中,我完成模型练习后,就直接将它传上本地Minio了。

    if save_model:
        dummy_input = torch.randn(1, 28, 28)
        torch.onnx.export(model, dummy_input, model_name)
        client.fput_object(bucket_name, model_name, model_name)
        logging.info("save model done!")

Artifacts

Ariifacts,简单来说,便是可视化。 当咱们依据一定要求装备mlpipeline-ui-metadata.json文件后,Kubeflow能够依据该装备进行可视化。

论如何在KubeFlow上跑通第一个Pipeline(五):尾章

选用v1 SDK 编写指定元数据的Json文件

论如何在KubeFlow上跑通第一个Pipeline(五):尾章

当咱们按一定要求编写好mlpipeline-ui-metadata.json文件,在source中指定好csv途径,就能够将其可视化;

container_op_hopeVisual = partial(
    components.func_to_container_op,
    base_image= ...,
)
@container_op_hopeVisual
def roc(data_dir):
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.metrics import roc_curve,roc_auc_score
    from sklearn.datasets import load_wine
    from sklearn.model_selection import train_test_split, cross_val_predict
    import pandas as pd
    import json
    import os
    from pathlib import Path
    X, y = load_wine(return_X_y=True)
    y = y == 1
    X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=42)
    rfc = RandomForestClassifier(n_estimators=10, random_state=42)
    rfc.fit(X_train, y_train)
    y_scores = cross_val_predict(rfc, X_train, y_train, cv=3, method='predict_proba')
    y_predict = cross_val_predict(rfc, X_train, y_train, cv=3, method='predict')
    roc_auc = roc_auc_score(y_train, y_scores[:,1])
    fpr, tpr, thresholds = roc_curve(y_true=y_train, y_score=y_scores[:,1], pos_label=True)
    df_roc = pd.DataFrame({'fpr': fpr, 'tpr': tpr, 'thresholds': thresholds})
    roc_file = "roc.csv"
    os.makedirs(os.path.join(data_dir,"csv_file"),exist_ok=True)
    file_path = os.path.join("csv_file",roc_file)
    df_roc.to_csv(os.path.join(data_dir,file_path),columns=['fpr', 'tpr', 'thresholds'], header=False, index=False)
    metadata = {
    'outputs': [{
      'type': 'roc',
      'format': 'csv',
      'schema': [
        {'name': 'fpr', 'type': 'NUMBER'},
        {'name': 'tpr', 'type': 'NUMBER'},
        {'name': 'thresholds', 'type': 'NUMBER'},
      ],
      'source': file_path,
    }]
    }
    ui_metadata_output_path = 'mlpipeline-ui-metadata.json'
    Path(os.path.join(data_dir,ui_metadata_output_path)).parent.mkdir(parents=True, exist_ok=True)
    Path(os.path.join(data_dir,ui_metadata_output_path)).write_text(json.dumps(metadata))
    metrics = {
    'metrics': [{
      'name': 'roc-auc-score',
      'numberValue':  roc_auc,
    }]
    }
    metrics_output_path = 'mlpipeline-metrics.json'
    Path(os.path.join(data_dir,metrics_output_path)).parent.mkdir(parents=True, exist_ok=True)
    Path(os.path.join(data_dir,metrics_output_path)).write_text(json.dumps(metrics))
    print("list outputDir: ", os.listdir(data_dir))
    return
@dsl.pipeline(
    name='hope_visual pipeline',
    description='',
)
def pipeline(pvcMountDir:str = "/tmp/outputs"):
    createPvc = dsl.VolumeOp(
        name="create-pvc",
        resource_name="my-pvc-visual",
        modes=dsl.VOLUME_MODE_RWO,
        size='100M',)
    roc_op = roc(pvcMountDir)
    roc_op.add_pvolumes({pvcMountDir:createPvc.volume})
    roc_op.after(createPvc)

注意,artifacts首要是经过访问pipeliine的文件系统,获取mlpipeline-ui-metadata.json文件,来烘托可视化;因而,咱们有必要为pipeline装备PV卷,或许选用Erlya的方法,将文件系统放在MinIO上;若未装备持久化存储,仅寄存在临时的pod空间内,将无法烘托可视化输出。

因而,在pipeline函数中,我特别为该组件挂上了PV卷,用于保存mlpipeline-ui-metadata.json等文件。

可是,csv文件(source)又有必要放在云端上…,所以就蛮费事的就。 (或许将csv文件解析为string,storage挑选”inline”)

论如何在KubeFlow上跑通第一个Pipeline(五):尾章

支撑的可视化模块:

  1. 混杂矩阵 confusion_matrix

论如何在KubeFlow上跑通第一个Pipeline(五):尾章

  1. ROC曲线

论如何在KubeFlow上跑通第一个Pipeline(五):尾章

  1. 表格

论如何在KubeFlow上跑通第一个Pipeline(五):尾章

  1. tensorboard

论如何在KubeFlow上跑通第一个Pipeline(五):尾章

注: 精确来说,artifacts并不会可视化出一个叫做tensorboard的东西;tensorboard是tensorflow的可视化东西;artifacts经过json文件在source中装备tensorboard的文件途径,在可视化界面中将会提供一个Start Tensorboard的按钮,点击可跳转至Kubeflow的Tensorboard界面:

论如何在KubeFlow上跑通第一个Pipeline(五):尾章

因而,在json文件中装备Tensorboard时,只需求装备source特点就够了:

论如何在KubeFlow上跑通第一个Pipeline(五):尾章

选用v2 SDK:使用开发东西包可视化API

有意思的的是,尽管当时我用的dsl版本是v1的,可是也能够经过v2兼容方法,使用方便的SDK API用于方针可视化。

例如,在定制组件时,使用

from kfp.v2.dsl import component
@component
def func(...)
    ...

在编译Pipeline时,增加mode=kfp.dsl.PipelineExecutionMode.V2_COMPATIBLE

compiler.Compiler(mode=kfp.dsl.PipelineExecutionMode.V2_COMPATIBLE)
    .compile(pipeline_func=add_pipeline, package_path='pipeline.yaml')

现在kfp支撑roc、混杂矩阵 confusion_matrix、标量衡量格局Scalar Metrics formats(类似表格).

import os
from kfp.v2 import dsl
from kfp.v2.dsl import (
    component,
    Output,
    ClassificationMetrics,
    Metrics,
    HTML,
    Markdown
)
@component(
    base_image='...'
)
def iris_sgdclassifier(test_samples_fraction: float, metrics: Output[ClassificationMetrics]):
    from sklearn import datasets, model_selection
    from sklearn.linear_model import SGDClassifier
    from sklearn.metrics import confusion_matrix
    iris_dataset = datasets.load_iris()
    train_x, test_x, train_y, test_y = model_selection.train_test_split(
        iris_dataset['data'], iris_dataset['target'], test_size=test_samples_fraction)
    classifier = SGDClassifier()
    classifier.fit(train_x, train_y)
    predictions = model_selection.cross_val_predict(classifier, train_x, train_y, cv=3)
    metrics.log_confusion_matrix(
        ['Setosa', 'Versicolour', 'Virginica'],
        confusion_matrix(train_y, predictions).tolist() # .tolist() to convert np array to list.
    )
@dsl.pipeline(name='metrics-visualization-pipeline')
def metrics_visualization_pipeline():
    iris_sgdclassifier_op = iris_sgdclassifier(test_samples_fraction=0.3)

经过metrics.log_confusion_matrix函数,即可简单地完成可视化输出。

当然,现在我还没很看明白metrics: Output[ClassificationMetrics]参数,为甚恶魔不赋值也不会报错;一起,这个API貌似也还不是很安稳,可视化输出有时行有时不行:

论如何在KubeFlow上跑通第一个Pipeline(五):尾章

论如何在KubeFlow上跑通第一个Pipeline(五):尾章

就很奇怪。

关于metrics.log_confusion_matrix等内容,源码首要寄存在:pipelines/artifact_types.py at sdk/release-1.8 kubeflow/pipelines (github.com)中。

我还没有细看,可是稍微瞥了一眼,严峻置疑是因为v2的可视化输出sdk是把json文件的source放在云端上?

论如何在KubeFlow上跑通第一个Pipeline(五):尾章

论如何在KubeFlow上跑通第一个Pipeline(五):尾章

然后这些云端又是放在外网上,所以可视化烘托不安稳?

总结

artifacts现在在我看来,还是比较鸡肋:首要支撑的可视化太少了:v2 SDK首要支撑的仅有:roc、混杂矩阵 confusion_matrix、表格;而V1经过装备json文件,也仅仅只多了一个跳转Tensorboard。

所以甚至很难提起大的兴趣去揣摩怎样解决v2可视化不安稳的问题。

相比于把功夫放在artifacts上,可能把咱们的试验结果等文件直接存入MinIO,或许揣摩揣摩怎样使用成熟的可视化东西Tensorboard,会显得更好一点?

然而Tensorboard又是TensorFlow自带的可视化东西,妈的,那是不是还得准备下TensorBoard?

Elyra自界说组件及启示

在技能群里,有人说到Elyra与自界说组件的结合:在Elyra中直接增加自界说组件,就能够为渠道提供一些通用的组件,将渠道的各个能力贯通起来,以低代码或许无代码的方法提供?

忽然发现这的确可能是个能搞的技能点?

我后续又去研究了一下,这事儿是这样:

咱们在搭管道的时候,需求由一个一个component组成pipeline:

论如何在KubeFlow上跑通第一个Pipeline(五):尾章

比如这个,便是个标准的component,咱们能够把它拿去搭pipeline:

论如何在KubeFlow上跑通第一个Pipeline(五):尾章

但其实,咱们还能够在构建component时,把它输出为yaml文件,复用起来:

论如何在KubeFlow上跑通第一个Pipeline(五):尾章

详细怎样复用呢?在原生办法里,经过调用API:kfp.components.load_component_from_file,就能够经过编译的yaml文件获得该组件了;

而在Elyra里,咱们需求这么干:

这yaml文件现在在这儿:

论如何在KubeFlow上跑通第一个Pipeline(五):尾章

该文件夹的绝对途径是这儿:

论如何在KubeFlow上跑通第一个Pipeline(五):尾章

现在点这儿:

论如何在KubeFlow上跑通第一个Pipeline(五):尾章

然后首要装备下这个:

论如何在KubeFlow上跑通第一个Pipeline(五):尾章

在这儿,咱们就能够调用这个组件了:

论如何在KubeFlow上跑通第一个Pipeline(五):尾章

论如何在KubeFlow上跑通第一个Pipeline(五):尾章

咱们能够设置该组件的自界说参数,挂载已存在的PV卷,等:

论如何在KubeFlow上跑通第一个Pipeline(五):尾章

除了该方法外,还有两种方法:

  1. 经过URL,也便是将yaml文件传云端上;然后path就要改成url途径了;
  2. 加载一整个目录下的yaml文件,这样咱们就能够把一组功用齐全的组件放在同一目录下,然后Path填该目录的途径就行。

所以,咱们能够经过第二种方法,制造一系列功用齐全的组件或管道、全编译成yaml文件,放在同一文件夹下,传到云端上;然后别人git下来整个目录,用第二种方法装备一波,就能够得到一堆组件,进行低代码开发了,我觉得。

OK,到这儿,我觉得整个系列也该做个完结了。先不论模型监控,数据、模型版本控制啥的,咱们总算能够说,咱们能够在k8s上做点AutoML的开发了。