本文简要介绍kubeflow,以及他的部署使用方式。最近在调研kubeflow平台,发现网上资料较少,加上它版本迭代较快有些调用使用方式变化,由于工作内容相关跑通了一些他的主要功能如katib参数调优,kubeflow pipline搭建,多用户创建等具体案例。
kubeflow介绍
Kubeflow 是一个专为在 Kubernetes 上部署、管理和扩展机器学习(ML)工作流而设计的开源平台,旨在简化机器学习项目在生产环境中的部署和操作。基于Kubernetes的容器编排和资源管理功能。通过将机器学习工作流拆分为一系列的容器化任务,Kubeflow可以利用Kubernetes的自动扩展、容错和调度功能,确保机器学习任务的高效执行,Kubeflow 提供了一套完整的工具和服务,支持从数据准备、模型训练、调优到部署的整个机器学习生命周期。这种端到端的解决方案能够帮助数据科学家和工程师快速实现从开发到生产的转换。
整体来说kubeflow有以下组件构成
- Kubeflow 中央控制面板为访问 Kubeflow 及其生态系统组件提供了一个经过验证的 Web 界面。作为一个集中式中心,它聚合了集群内各种工具和服务的用户界面,为管理机器学习平台提供了一个统一的接入点。
- Kubeflow 与 Jupyter Notebooks 集成,为数据探索、实验和模型开发提供了一个交互式环境。Notebooks 支持各种编程语言,包括 Python、R 和 Scala,允许用户以协作且可再现的方式创建和执行 ML 工作流。
- Kubeflow Pipelines 让用户能够以有向无环图(DAG)的形式定义和执行复杂的 ML 工作流。Kubeflow Pipelines 提供了一种方法,可编排并自动执行数据预处理、模型训练、评估和部署的端到端流程,从而促进了 ML 项目的可重现性、可扩展性和协作性。Kubeflow Pipelines SDK 是一组 Python 软件包,允许用户精确而高效地定义和执行机器学习工作流。
- Kubeflow Training Operator 为大规模训练机器学习模型提供了工具。这包括支持使用 TensorFlow、PyTorch 和 XGBoost 等框架进行分布式训练。用户可以利用 Kubernetes 的可扩展性和资源管理功能,跨机器集群高效地训练模型。
- Kubeflow Serving 支持用户将经过训练的 ML 模型部署为可扩展的生产就绪型服务。它为使用 TensorFlow Serving、Seldon Core 等流行框架或自定义推理服务器部署模型提供了一致的界面。模型可在实时或批处理场景中部署,通过 HTTP 端点提供预测。
- Kubeflow Metadata 是一个集中式存储库,用于跟踪和管理与 ML 实验、运行和工件相关的元数据。它为整个工作流提供了一致的 ML 元数据视图,可在 ML 项目中实现可重现性、协作和治理。
环境准备
部署kubelfow的前提是有一个集群,我使用的是1.26的kubernetes,用kubeadm安装部署。对应的kubeflow版本为1.8.1
NFS安装
由于安装部署kubeflow的要求是需要有一个default的storage class所以现在给出如何安装部署使用nfs文件系统与nfs-subdir-external-provisioner。
在官方文档中,使用kubeflow mainfest安装的需求是有默认的storageclass,kustomize,与kubectl工具。
由于要使用默认的storage class因此部署使用nfs-subdir-external-provisioner。可以自动的创建和管理pv,pvc关系。因此介绍下对应的nfs,nfs插件的安装部署流程。
通过命令apt install -y nfs-kernel-serve
r安装下载nfs服务。
创建一个数据存储目录 mkdir -p /data/redis
。
修改/etc/exports文件下的内容,添加新的配置目录/data/redis 192.168.0.0/24(rw,sync,no_all_squash,no_subtree_check,no_root_squash)
完成后重启nfs服务
部署NFS-Subdir-External-Provisioner
NFS-Subdir-External-Provisioner是一个自动配置卷程序,它使用现有的和已配置的 NFS 服务器来支持通过持久卷声明动态配置 Kubernetes 持久卷。
在部署使用NFS-subdir服务前你需要一个有nfs服务的节点,并且知道对应的存储目录,在上述示例中他所对应的节点和目录为192.168.0.208,/data/redis
- 创建serviceaccount
apiVersion: v1kind: ServiceAccountmetadata: name: nfs-client-provisioner namespace: default _#_ _替换成你要部署的_ _Namespace_---kind: ClusterRoleapiVersion: rbac.authorization.k8s.io/v1metadata: name: nfs-client-provisioner-runnerrules: - apiGroups: [""] resources: ["persistentvolumes"] verbs: ["get", "list", "watch", "create", "delete"] - apiGroups: [""] resources: ["persistentvolumeclaims"] verbs: ["get", "list", "watch", "update"] - apiGroups: ["storage.k8s.io"] resources: ["storageclasses"] verbs: ["get", "list", "watch"] - apiGroups: [""] resources: ["events"] verbs: ["create", "update", "patch"]---kind: ClusterRoleBindingapiVersion: rbac.authorization.k8s.io/v1metadata: name: run-nfs-client-provisionersubjects: - kind: ServiceAccount name: nfs-client-provisioner namespace: defaultroleRef: kind: ClusterRole name: nfs-client-provisioner-runner apiGroup: rbac.authorization.k8s.io---kind: RoleapiVersion: rbac.authorization.k8s.io/v1metadata: name: leader-locking-nfs-client-provisioner namespace: defaultrules: - apiGroups: [""] resources: ["endpoints"] verbs: ["get", "list", "watch", "create", "update", "patch"]---kind: RoleBindingapiVersion: rbac.authorization.k8s.io/v1metadata: name: leader-locking-nfs-client-provisioner namespace: defaultsubjects: - kind: ServiceAccount name: nfs-client-provisioner namespace: defaultroleRef: kind: Role name: leader-locking-nfs-client-provisioner apiGroup: rbac.authorization.k8s.io
- l 部署 NFS-Subdir-External-Provisioner,根据实际情况修改配置参数
apiVersion: apps/v1kind: Deploymentmetadata: name: nfs-client-provisioner labels: app: nfs-client-provisionerspec: replicas: 1 strategy: type: Recreate _##_ _设置升级策略为删除再创建__(__默认为滚动更新__)_ selector: matchLabels: app: nfs-client-provisioner template: metadata: labels: app: nfs-client-provisioner spec: serviceAccountName: nfs-client-provisioner containers: - name: nfs-client-provisioner _#image: gcr.io/k8s-staging-sig-storage/nfs-subdir-external-provisioner:v4.0.0_ image: registry.cn-beijing.aliyuncs.com/xngczl/nfs-subdir-external-provisione:v4.0.0 volumeMounts: - name: nfs-client-root mountPath: /persistentvolumes env: - name: PROVISIONER_NAME _## Provisioner__的名称__,__以后设置的__storageclass__要和这个保持一致_ value: nfs-client - name: NFS_SERVER _## NFS__服务器地址__,__需和__valumes__参数中配置的保持一致_ value: 192.168.0.208 - name: NFS_PATH _## NFS__服务器数据存储目录__,__需和__valumes__参数中配置的保持一致_ value: /data/redis volumes: - name: nfs-client-root nfs: server: 192.168.0.208 _## NFS__服务器地址_ path: /data/redis _## NFS__服务器数据存储目录_
- 创建NFS StorageClass
apiVersion: storage.k8s.io/v1kind: StorageClassmetadata: name: nfs-storage annotations: storageclass.kubernetes.io/is-default-class: "true" _##_ _是否设置为默认的__storageclass_provisioner: nfs-client _##_ _动态卷分配者名称,必须和上面创建的__"provisioner"__变量中设置的__Name__一致_parameters: archiveOnDelete: "true" _##_ _设置为__"false"__时删除__PVC__不会保留数据__,"true"__则保留数据_
之后kubectl apply -f
上述几个文件。同时给NFS的存储目录加上权限chmod 777 /data/redis
安装kubeflow
在官方gitbub仓库的kubeflow manifest项目中下载对应的二进制文件安装包skustomize_v5.0.3_linux_amd64.tar.gz,解压到/usr/local/bin目录下。
下载官方kubeflow maiinfest项目包mainfests-1.8.1.tar.gz
修改默认存储StorageClass
修改yaml,下面每个文件里面添加 storageClassName: nfs-storage,在mainfests-1.8.1目录下的
apps/katib/upstream/components/mysql/pvc.yaml
common/oidc-client/oidc-authservice/base/pvc.yaml
apps/pipeline/upstream/third-party/minio/base/minio-pvc.yaml
apps/pipeline/upstream/third-party/mysql/base/mysql-pv-claim.yaml
修改镜像拉取策略
由于部分组件的镜像拉取策略为Always,所以修改他们为IfNotPresent,在当前目录下执行命令。find ./ -type f -exec grep -l "imagePullPolicy: Always" {} ;
查找到所有关于imagePullPolicy为Always的文件。在这一步可以不修改,**在后续部署pod的过程中如果有pod所在节点存在镜像但是还是无法拉取镜像并且运行的话,就修改对应服务的镜像拉取策略。
root@master:~/huhu/kubeflow/manifests-1.8.1# find ./ -type f -exec grep -l "imagePullPolicy: Always" {} \;./docs/KustomizeBestPractices.md./contrib/kserve/models-web-app/base/deployment.yaml./contrib/kserve/kserve/kserve.yaml./contrib/kserve/kserve/kserve_kubeflow.yaml./apps/kfp-tekton/upstream/third-party/tekton-custom-task/driver-controller/500-controller.yaml./apps/kfp-tekton/upstream/third-party/kfp-csi-s3/csi-s3-deployment.yaml./apps/kfp-tekton/upstream/v1/third-party/kfp-csi-s3/csi-s3-deployment.yaml./apps/kfp-tekton/upstream/v1/base/cache-deployer/cache-deployer-deployment.yaml./apps/kfp-tekton/upstream/v1/base/cache/cache-deployment.yaml./apps/kfp-tekton/upstream/v1/base/pipeline/ml-pipeline-apiserver-deployment.yaml./apps/kfp-tekton/upstream/v1/base/pipeline/ml-pipeline-viewer-crd-deployment.yaml./apps/kfp-tekton/upstream/base/cache-deployer/cache-deployer-deployment.yaml./apps/kfp-tekton/upstream/base/cache/cache-deployment.yaml./apps/kfp-tekton/upstream/base/pipeline/ml-pipeline-viewer-crd-deployment.yaml./apps/pipeline/upstream/base/cache-deployer/cache-deployer-deployment.yaml./apps/pipeline/upstream/base/cache/cache-deployment.yaml./apps/pipeline/upstream/base/pipeline/ml-pipeline-viewer-crd-deployment.yaml./common/oidc-client/oidc-authservice/base/statefulset.yaml
修改APP_SECURE_COOKIES
修改对应配置文件将APP_SECURE_COOKIES的值设置为false表示不使用加密cookies交互。
Vim ./apps/jupyter/jupyter-web-app/upstream/base/params.env
将其修改为false,否则部署起来后无法通过dashbord访问kubeflow。
同样可以执行find ./ -type f -exec grep -l "APP_SECURE_COOKIES" {} ;命令来查看是否还有其他需要修改的APP_SECURE_COOKIES=false的配置,在我的部署过程中目前看来只需要修改jwp的即可
部署kubeflow
在修改完上述对应的配置后,在mainfest目录下执行命令如下可以自动的部署kubeflow,由于它所依赖的组件过多,因此安装过程中大概率会出现问题,上述的配置应该能解决大部分遇到的问题,但是如果遇到安装失败,pod无法正常Running请自行排查解决。
while ! kustomize build example | kubectl apply -f -; do echo "Retrying to apply resources"; sleep 10; done
等待pod全部running,期间会出现错误,逐步排查解决对应的pod问题,大多都是镜像无法拉取,可以手动拉取并且分发上传到对应节点上。同时看需求修改对应deployment,statusfulset的镜像拉取策略为IfNotPresent,就可以修复异常pod。
镜像拉取失败
由于国内使用网络环境的问题,在拉取大部分的镜像的时候可能都会出现失败的情况,因此需要自行解决镜像拉取的问题,如可以使用代理的方式手动下载所需的镜像。下面给出一个可以使用的脚本用于将master节点所缺少的镜像同步下载分发到不同节点。
前提是需要sshpass工具。并且节点配置了代理。该工具通过查看所有pod不是running状态并且所需的镜像名字,通过脚本的方式对所需镜像进行下载和打包转发到其他node节点。在脚本中需要配置workernode和用户登录的账户密码。
#!/bin/bash# 定义节点IPWORKER_NODES=("192.168.0.xxx" "192.168.0.xxx")# 登录凭证USER="xxx"PASS="xxx"# 检查是否安装了sshpassif ! command -v sshpass &> /dev/nullthen echo "sshpass could not be found. Please install it first." exit 1fi# 函数:显示进度条show_progress() { local duration=$1 local sleep_interval=0.1 local progress=0 local bar_size=40 while [ $progress -lt 100 ]; do local num_bars=$((progress * bar_size / 100)) printf "\r[%-${bar_size}s] %d%%" $(printf "#%.0s" $(seq 1 $num_bars)) $progress progress=$((progress + 1)) sleep $sleep_interval done echo}echo "==============="echo "检查需要下载的镜像"echo "==============="# 获取所有Pod状态pods=$(kubectl get po -A -o wide)# 提取处于ImagePullBackOff状态的Pod的镜像failed_pods=$(echo "$pods" | awk '$4 ~ /ImagePullBackOff/ {print $2","$1}')failed_images=""for pod_info in $failed_podsdo IFS=',' read -r pod namespace <<< "$pod_info" images=$(kubectl get pod $pod -n $namespace -o jsonpath='{.spec.containers[*].image}') failed_images="$failed_images $images"donefailed_images=$(echo $failed_images | tr ' ' '\n' | sort | uniq)# 检查并添加缺失的标签corrected_images=""for image in $failed_images; do if [[ "$image" != *:* ]]; then # 如果镜像没有标签,默认添加 :latest image="${image}:latest" fi corrected_images="$corrected_images $image"doneif [ -z "$corrected_images" ]; then echo "没有找到需要下载的镜像。" exit 0fiecho "列出所需的镜像:"echo "$corrected_images"echoecho "==============="echo "下载所需镜像"echo "==============="# 在主节点上下载镜像for image in $corrected_imagesdo if [ -f "/tmp/${image##*/}.tar" ]; then echo "跳过下载镜像 $image, 因为它已经存在于/tmp目录中" else # 检查镜像是否包含 .io 域名 if [[ "$image" == *".io"* ]]; then prefixed_image="$image" else echo "镜像 $image 不包含前缀,尝试添加 docker.io" prefixed_image="docker.io/$image" fi echo "正在拉取镜像 $prefixed_image" ctr -n k8s.io i pull "$prefixed_image" if [ $? -eq 0 ]; then echo "导出镜像 $prefixed_image" ctr -n k8s.io i export /tmp/${prefixed_image##*/}.tar "$prefixed_image" show_progress 2 else if [[ "$image" != *".io"* ]]; then echo "尝试使用 gcr.io 前缀拉取镜像 $image" prefixed_image="gcr.io/$image" ctr -n k8s.io i pull "$prefixed_image" if [ $? -eq 0 ]; then echo "导出镜像 $prefixed_image" ctr -n k8s.io i export /tmp/${prefixed_image##*/}.tar "$prefixed_image" show_progress 2 else echo "拉取镜像 $image 失败, 跳过" fi else echo "拉取镜像 $image 失败, 跳过" fi fi fidonefor i in "${!WORKER_NODES[@]}"; do worker=${WORKER_NODES[$i]} node_num=$((i+1)) echo "===========" echo "传输到 node${node_num} 上 (${worker})" echo "===========" for image in $corrected_images do if [ -f "/tmp/${image##*/}.tar" ]; then echo "传输镜像 ${image##*/} 到 node${node_num}" sshpass -p ${PASS} scp -o StrictHostKeyChecking=no /tmp/${image##*/}.tar ${USER}@${worker}:/tmp/ if [ $? -eq 0 ]; then show_progress 3 echo "在 node${node_num} 上导入镜像 ${image##*/}" sshpass -p ${PASS} ssh -o StrictHostKeyChecking=no ${USER}@${worker} "ctr -n k8s.io i import /tmp/${image##*/}.tar && rm /tmp/${image##*/}.tar" show_progress 2 else echo "传输镜像 ${image##*/} 到 node${node_num} 失败, 跳过" fi else echo "跳过传输镜像 ${image##*/} 到 node${node_num}, 因为它不在/tmp目录中" fi donedoneecho "镜像拉取和分发完成。"
修改svc为nodeport模式
由于部署起来的svc全都是cluser ip的类型所以无法直接被外部访问,因此需要手动修改istio-ingressgateway的svc为nodeport类型
k edit svc -n istio-system istio-ingressgateway
通过修改后暴露出来的端口进行访问
默认的登录账号密码为
user@example.com
12341234
使用kubeflow
这里按照模块介绍下 Kubeflow 的几个核心组件。
- Notebook Servers,作为一个管理线上交互实验的记录工具,可以帮助算法人员快速完成算法实验,同时notebook server 提供了统一的文档管理能力。
- AutoML,提供自动化的服务,对特征处理、特征选择、模型选择、模型参数的配置、模型训练和评估等方面,实现了全自动建模,降低算法人员手动实验次数。
- Pipeline,提供一个算法流水线的工程化工具,将算法各流程模块以拓扑图的形式组合起来,同时结合 argo 可以实现 MLOps。
- Serverless,将模型直接发布成一个对外的服务,缩短从实验到生产的路径。
模型开发-Notebook
Kubeflow Notebooks 是 Kubeflow 平台中一个关键的组件,它为数据科学家和机器学习工程师提供了在 Kubernetes 上运行 Jupyter Notebooks 的能力。这一功能的出现极大地简化了在云环境中管理和使用 Jupyter Notebook 的复杂性。通过 Kubeflow Notebooks,用户能够在 Kubernetes 集群中轻松创建和管理多个 Jupyter Notebook 实例。这些实例可以针对特定用户进行资源配置,如 CPU、内存和 GPU,以确保在多用户环境中能够实现高效的资源隔离和使用
通过notebook新建一个task
在配置页面可以设置名你在,对应的cpu,gpu的调配,创建好之后会出现对应的pod。关于gpu的使用可以参考官方文档的介绍https://v1-8-branch.kubeflow.org/docs/components/notebooks/quickstart-guide/
点击connect链接到对应的pod中,在这一步会遇到镜像拉取失败的问题,需要手动拉取镜像到指定节点等待pod运行起来。
**ctr -n k8s.io i pull xxx**
模型训练
在创建notebook的时候可以进行镜像的选择,在这里我们选择带有tensorflow的镜像,就可以直接在里面使用对应的框架。同时还有不同的如pytorch-cuda镜像等提供。创建好后通过如下的示例来跑一个简单的训练代码。
import numpy as npimport tensorflow as tffrom tensorflow.keras.datasets import mnistfrom tensorflow.keras.utils import to_categoricalfrom tensorflow.keras.models import Sequentialfrom tensorflow.keras.layers import Dense, Flatten, Dropout, Conv2D, MaxPooling2D_#_ _加载__MNIST__数据集_(train_images, train_labels), (test_images, test_labels) = mnist.load_data()_#_ _预处理数据:调整形状并归一化_train_images = train_images.reshape(-1, 28, 28, 1).astype('float32') / 255.0test_images = test_images.reshape(-1, 28, 28, 1).astype('float32') / 255.0_#_ _将标签转换为_ _one-hot_ _编码_train_labels = to_categorical(train_labels, 10)test_labels = to_categorical(test_labels, 10)_#_ _构建模型_model = Sequential([ Conv2D(32, kernel_size=(3, 3), activation='relu', input_shape=(28, 28, 1)), MaxPooling2D(pool_size=(2, 2)), Conv2D(64, kernel_size=(3, 3), activation='relu'), MaxPooling2D(pool_size=(2, 2)), Flatten(), Dense(128, activation='relu'), Dropout(0.5), Dense(10, activation='softmax')])_#_ _编译模型_model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])_#_ _训练模型_model.fit(train_images, train_labels, epochs=10, batch_size=64, validation_data=(test_images, test_labels))_#_ _评估模型_test_loss, test_accuracy = model.evaluate(test_images, test_labels, verbose=0)print(f"Test accuracy: {test_accuracy:.4f}")# 保存模型model.save('mnist_cnn_model.keras')print("模型保存成功")
运行后得到结果
在新的jupyter文件中通过import模型来导入。使用自己手写的图片来进行结果的预测。
import numpy as npimport tensorflow as tffrom PIL import Image, ImageOps, ImageFilterimport matplotlib.pyplot as plt_#_ _加载保存的模型_loaded_model = tf.keras.models.load_model('mnist_cnn_model.keras')定义你自己的图片文件名列表image_files = [ 'mnist_test_0_label_9.png', 'mnist_test_2_label_8.png', 'mnist_test_4_label_2.png', 'mnist_test_1_label_0.png', 'mnist_test_3_label_7.png']_#image_files = ['__图片__.jpg', '123.jpg', 'third.jpg', '9.jpg']_def preprocess_image(img): _#_ _转换为灰度图像_ img = img.convert('L') _#_ _自动对比度增强_ img = ImageOps.autocontrast(img) _#_ _裁剪数字的边缘并居中_ img = img.crop(img.getbbox()) _#_ _裁剪非空白区域_ img = img.resize((20, 20), Image.Resampling.LANCZOS) _#_ _调整图像大小,保持最大信息_ background = Image.new('L', (28, 28), 0) _#_ _创建黑色背景_ offset = ((28 - img.size[0]) // 2, (28 - img.size[1]) // 2) background.paste(img, offset) _#_ _将图像粘贴到背景上使其居中_ return background_#_ _创建一个图形,包含__5__行__2__列的子图_fig, axs = plt.subplots(5, 2, figsize=(10, 25))_#fig, axs = plt.subplots(4, 2, figsize=(10, 25))_for i, file in enumerate(image_files): _#_ _加载图像_ img = Image.open(file) _#_ _对图像进行预处理_ processed_img = preprocess_image(img) _#_ _将图像转换为数组并进行标准化,确保形状为_ _(28, 28, 1)_ img_array = np.array(processed_img).reshape(1, 28, 28, 1).astype('float32') / 255.0 _#_ _进行预测_ predictions = loaded_model.predict(img_array) _#_ _获取预测结果_ predicted_digit = np.argmax(predictions[0]) _#_ _显示原始图片_ axs[i, 0].imshow(img, cmap='gray') axs[i, 0].set_title(f'原始图片: {file}') axs[i, 0].axis('off') _#_ _显示处理后的图片_ axs[i, 1].imshow(processed_img, cmap='gray') axs[i, 1].set_title(f'预测结果: {predicted_digit}') axs[i, 1].axis('off') print(f"{file} 预测的数字是: {predicted_digit}")plt.tight_layout()plt.show()
GPU训练
使用gpu镜像会要求我们的集群中存在GPU资源如下所示
当我们选中gpu镜像,想要添加gpu使会提示集群中不存在gpu,所以需要在某个节点插上物理gpu然后再集群中添加operator来使用gpu资源
下面给出安装nvidia-gpu operator的方法:
参考官方安装nvidia-operator链接
下载准备helm3curl -fsSL -o get_helm.sh https://raw.githubusercontent.com/helm/helm/master/scripts/get-helm-3 \ && chmod 700 get_helm.sh \ && ./get_helm.sh确保NFD模式是关闭的,如果有开启的那么手动关闭它kubectl get nodes -o json | jq '.items[].metadata.labels | keys | any(startswith("feature.node.kubernetes.io"))'添加helm仓库helm repo add nvidia https://helm.ngc.nvidia.com/nvidia \ && helm repo update部署gpu-operatorhelm install --wait --generate-name \ -n gpu-operator --create-namespace \ nvidia/gpu-operator \ --set driver.version=535
其中需要注意的是对应的驱动版本我用的是A800因此他是535.根据自己的nvidia-gpu型号确定自己的驱动版本。
运行完成后会出现
并且节点出现可调度资源nvidia.com/gpu
打开kubelfow开始使用gpu训练任务。需要再这个页面指定含有cuda的镜像,并且在gpu配置中选择集群中可用的gpu nvidia。
创建好后进入jupyter内创建一个python3工具。在其中可以进行机器学习代码的开发。同时可以使用nvidia-smi命令在pod内部查看和适用到我们的GPU
下面给出使用gpu训练的代码
import osimport tensorflow as tfimport timeimport numpy as npfrom tensorflow.keras import layers, modelsimport matplotlib.pyplot as pltprint("====检查 GPU 可用性====")os.environ['TF_FORCE_GPU_ALLOW_GROWTH'] = 'true'_#_ _检查_ _GPU_ _是否可用_if tf.test.is_gpu_available(): print("\033[1;32m[GPU 可用] 将进行 GPU 和 CPU 训练对比\033[0m") gpu_device = tf.config.list_physical_devices('GPU')[0] print(f"可用的 GPU: {gpu_device}")else: print("\033[1;31m[GPU 不可用] 只能使用 CPU 进行训练\033[0m") exit()print("\n====加载和预处理数据====")(train_images, train_labels), (test_images, test_labels) = tf.keras.datasets.mnist.load_data()train_images = train_images.reshape((60000, 28, 28, 1)).astype('float32') / 255test_images = test_images.reshape((10000, 28, 28, 1)).astype('float32') / 255def create_model(): model = models.Sequential([ layers.Conv2D(32, (3, 3), activation='relu', input_shape=(28, 28, 1)), layers.MaxPooling2D((2, 2)), layers.Conv2D(64, (3, 3), activation='relu'), layers.MaxPooling2D((2, 2)), layers.Conv2D(64, (3, 3), activation='relu'), layers.Flatten(), layers.Dense(64, activation='relu'), layers.Dense(10, activation='softmax') ]) model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy']) return model_# GPU_ _训练_print("\n====开始 GPU 训练====")with tf.device('/GPU:0'): gpu_model = create_model() start_time = time.time() gpu_history = gpu_model.fit(train_images, train_labels, epochs=10, validation_split=0.2, batch_size=64, verbose=1) gpu_time = time.time() - start_time_# CPU_ _训练_print("\n====开始 CPU 训练====")os.environ['CUDA_VISIBLE_DEVICES'] = '-1' _#_ _禁用_ _GPU_with tf.device('/CPU:0'): cpu_model = create_model() start_time = time.time() cpu_history = cpu_model.fit(train_images, train_labels, epochs=10, validation_split=0.2, batch_size=64, verbose=1) cpu_time = time.time() - start_time_#_ _结果对比_print("\n====训练时间对比====")print(f"\033[1;34mGPU 训练时间: {gpu_time:.2f} 秒\033[0m")print(f"\033[1;34mCPU 训练时间: {cpu_time:.2f} 秒\033[0m")print(f"\033[1;32mGPU 加速比: {cpu_time / gpu_time:.2f}x\033[0m")_#_ _绘制训练过程的损失和准确率曲线_def plot_history(history, title): acc = history.history['accuracy'] val_acc = history.history['val_accuracy'] loss = history.history['loss'] val_loss = history.history['val_loss'] epochs = range(1, len(acc) + 1) plt.figure(figsize=(12, 5)) plt.subplot(1, 2, 1) plt.plot(epochs, loss, 'bo-', label='Training loss') plt.plot(epochs, val_loss, 'ro-', label='Validation loss') plt.title(f'{title} - Training and validation loss') plt.xlabel('Epochs') plt.ylabel('Loss') plt.legend() plt.subplot(1, 2, 2) plt.plot(epochs, acc, 'bo-', label='Training accuracy') plt.plot(epochs, val_acc, 'ro-', label='Validation accuracy') plt.title(f'{title} - Training and validation accuracy') plt.xlabel('Epochs') plt.ylabel('Accuracy') plt.legend() plt.show()print("\n====可视化 GPU 训练过程====")plot_history(gpu_history, "GPU Training")print("\n====可视化 CPU 训练过程====")plot_history(cpu_history, "CPU Training")_#_ _评估_ _GPU_ _模型_print("\n====评估 GPU 训练的模型====")test_loss, test_acc = gpu_model.evaluate(test_images, test_labels, verbose=0)print(f'\n\033[1;32mTest accuracy: {test_acc:.4f}\033[0m')_#_ _保存_ _GPU_ _训练的模型_print("\n====保存 GPU 训练的模型====")gpu_model.save('mnist_model_gpu.keras')print("模型已保存为 mnist_model_gpu.keras")
这段代码的主要功能是通过对比 GPU 和 CPU 在相同任务上的训练表现,来体现 GPU 的强大计算能力,尤其是在深度学习任务中的显著优势。代码首先检查当前环境中是否可用 GPU,并根据设备的可用性分别在 GPU 和 CPU 上训练一个简单的卷积神经网络(CNN),该网络用于对 MNIST 手写数字数据集进行分类。
同时在训练的过程中使用nvidia-smi命令可以查看到硬件GPU的使用情况
比如上图中可以看出在训练过程中GPU的利用率分别在9%左右,说明我们的任务成功的调用了GPU进行计算。
pipline
Kubeflow Pipelines 是 Kubeflow 项目中的一个核心模块,专注于构建、部署和管理复杂的机器学习工作流。它提供了一整套用于设计和自动化机器学习流水线的工具,使数据科学家和工程师能够更加高效地构建、管理和监控机器学习模型的训练和部署过程。通过 Kubeflow Pipelines,用户可以轻松地定义、分享、重用和自动化复杂的工作流。
在pipline中可以上传制作好的.gz文件yaml文件等。创建好流程图后会有如下图形界面上显示。具体的制作过程可以参考官方文档https://www.kubeflow.org/docs/components/pipelines/getting-started/
import kfpfrom kfp import dslfrom kfp.dsl import component, Input, Output, Dataset, Model_# Step 1:_ _数据下载和预处理_@component( base_image='python:3.8-slim', packages_to_install=[ 'pandas', 'scikit-learn', 'joblib', 'numpy', 'requests' ])def preprocess_data_op(output_data: Output[Dataset]): print("开始执行 preprocess_data_op...") try: import pandas as pd print("成功导入 pandas 模块。") except ImportError as e: print(f"导入 pandas 失败: {e}") raise e from sklearn.model_selection import train_test_split from sklearn.preprocessing import StandardScaler import os print("正在下载数据集...") url = "https://raw.githubusercontent.com/jbrownlee/Datasets/master/pima-indians-diabetes.data.csv" columns = ['Pregnancies', 'Glucose', 'BloodPressure', 'SkinThickness', 'Insulin', 'BMI', 'DiabetesPedigreeFunction', 'Age', 'Outcome'] data = pd.read_csv(url, names=columns) print("数据集下载完成。") _#_ _数据清洗和特征工程_ print("正在进行数据清洗和特征工程...") X = data.drop('Outcome', axis=1) y = data['Outcome'] _#_ _标准化特征_ print("正在标准化特征...") scaler = StandardScaler() X_scaled = scaler.fit_transform(X) _#_ _划分训练集和测试集_ print("正在划分训练集和测试集...") X_train, X_test, y_train, y_test = train_test_split(X_scaled, y, test_size=0.2, random_state=42) _#_ _保存预处理后的数据_ print("正在保存预处理后的数据...") os.makedirs(output_data.path, exist_ok=True) pd.DataFrame(X_train).to_csv(os.path.join(output_data.path, 'X_train.csv'), index=False) pd.DataFrame(X_test).to_csv(os.path.join(output_data.path, 'X_test.csv'), index=False) pd.DataFrame(y_train).to_csv(os.path.join(output_data.path, 'y_train.csv'), index=False) pd.DataFrame(y_test).to_csv(os.path.join(output_data.path, 'y_test.csv'), index=False) print(f"数据预处理完成并已保存到 {output_data.path}。")_# Step 2:_ _模型训练_@component( base_image='python:3.8-slim', packages_to_install=[ 'pandas', 'scikit-learn', 'joblib', 'numpy' ])def train_model_op(input_data: Input[Dataset], output_model: Output[Model]): print("开始执行 train_model_op...") try: import pandas as pd print("成功导入 pandas 模块。") except ImportError as e: print(f"导入 pandas 失败: {e}") raise e from sklearn.linear_model import LogisticRegression import joblib import os print("正在加载训练数据...") X_train = pd.read_csv(os.path.join(input_data.path, 'X_train.csv')) y_train = pd.read_csv(os.path.join(input_data.path, 'y_train.csv')) _#_ _训练模型_ print("正在训练模型...") model = LogisticRegression() model.fit(X_train, y_train.values.ravel()) _#_ _创建输出目录并保存模型_ os.makedirs(output_model.path, exist_ok=True) _#_ _确保输出目录存在_ model_path = os.path.join(output_model.path, 'trained_model.joblib') joblib.dump(model, model_path) print(f"模型训练完成并已保存到 {model_path}。")_# Step 3:_ _模型评估_@component( base_image='python:3.8-slim', packages_to_install=[ 'pandas', 'scikit-learn', 'joblib', 'numpy' ])def evaluate_model_op(input_data: Input[Dataset], input_model: Input[Model]): print("开始执行 evaluate_model_op...") try: import pandas as pd print("成功导入 pandas 模块。") except ImportError as e: print(f"导入 pandas 失败: {e}") raise e from sklearn.metrics import accuracy_score import joblib import os # 添加os模块的导入 print("正在加载测试数据和模型...") X_test = pd.read_csv(os.path.join(input_data.path, 'X_test.csv')) y_test = pd.read_csv(os.path.join(input_data.path, 'y_test.csv')) model = joblib.load(os.path.join(input_model.path, 'trained_model.joblib')) _#_ _预测和评估_ print("正在进行模型预测和评估...") y_pred = model.predict(X_test) accuracy = accuracy_score(y_test, y_pred) print(f"模型准确率: {accuracy}")# Step 4: Pipeline 定义@dsl.pipeline( name='Diabetes Classifier Pipeline', description='A pipeline to train and evaluate a diabetes classifier model')def diabetes_pipeline(): preprocess = preprocess_data_op() train = train_model_op(input_data=preprocess.outputs['output_data']) evaluate = evaluate_model_op(input_data=preprocess.outputs['output_data'], input_model=train.outputs['output_model'])# Compile the pipelineif __name__ == "__main__": kfp.compiler.Compiler().compile(diabetes_pipeline, 'diabetes_pipeline.yaml')
上述是一个可以使用的pipline文件,在pipline中打开主要流程如下所示
运行之后可能还需要一个pvc文件
apiVersion: v1kind: PersistentVolumeClaimmetadata: name: kubeflow-test-pv namespace: kubeflow-user-example-comspec: accessModes: - ReadWriteOnce resources: requests: storage: 128Mi
运行之后的结果
这个Python文件主要定义了一个基于Kubeflow Pipelines (KFP) 的数据处理与机器学习模型训练与评估的Pipeline,用于处理糖尿病分类任务。
- 文件中首先定义了 preprocess_data_op 组件,用于下载糖尿病数据集并进行数据预处理。具体操作包括数据下载、数据清洗、特征标准化以及训练集和测试集的划分。预处理后的数据被保存到指定的输出路径中,以便后续的模型训练和评估使用。
- 接下来,文件定义了 train_model_op 组件,用于使用预处理后的训练数据来训练一个逻辑回归模型。训练好的模型会被保存到指定的输出路径中。通过使用 scikit-learn 库,组件能够快速实现模型的训练,并为后续的模型评估步骤提供一个已训练的模型文件。
- 文件中还定义了 evaluate_model_op 组件,用于加载测试数据和训练好的模型,并使用测试数据对模型进行预测和评估。该组件计算并输出模型的准确率,作为模型性能的衡量标准。评估结果可以用于判断模型的有效性。
- 最后,文件定义了一个名为 diabetes_pipeline 的Pipeline,将上述三个组件串联在一起。通过 dsl.pipeline 装饰器,将数据预处理、模型训练和模型评估的步骤组合成一个完整的工作流。该Pipeline最终被编译为一个YAML文件(diabetes_pipeline.yaml),可以在Kubeflow环境中部署和运行。
总体而言,这个示例展示了 Argo Workflow 的基本用法,通过定义简单的任务和依赖关系,实现了在 Kubernetes 环境中的自动化工作流管理
对于需要多次执行,或者不同训练任务有共同的预处理项目。可以根据名字来使用之前处理好的缓存。提升训练效率
katib
AutoML 是机器学习比较热的领域,主要用来模型自动优化和超参数调整,这里其实是用的 Katib来实现的,一个基于k8s的 AutoML 项目,详细见https://github.com/kubeflow/katib
Katib 主要提供了 超参数调整(Hyperparameter Tuning),早停法(Early Stopping)和神经网络架构搜索(Neural Architecture Search) Katib 支持多种超参数优化算法,包括随机搜索、贝叶斯优化和 HyperBand。这些算法可以帮助开发者在给定的参数空间中高效地搜索最佳超参数配置。通过定义实验,用户可以指定要调整的超参数及其范围、优化目标(如准确率或损失),以及使用的搜索算法。Katib 还支持提前停止策略,允许在训练过程中根据模型性能动态终止不佳的实验,从而节省计算资源。此外,Katib 不仅限于超参数调优,还提供了神经网络结构搜索(Neural Architecture Search, NAS)的功能。尽管这一功能仍在不断完善中,但它为用户提供了探索不同网络架构的可能性,进一步提升模型的表现。
新建文件notebooks如6.1节所示。创建好一个py文件后,在terminal中执行下面命令下载所需的库。
pip install kubeflow-katib
import kubeflow.katib as katib_# Step 1. Create an objective function._def objective(parameters): _# Import required packages._ import time time.sleep(5) _# Calculate objective function._ result = 4 * int(parameters["a"]) - float(parameters["b"]) ** 2 _# Katib parses metrics in this format: <metric-name>=<metric-value>._ print(f"result={result}")_# Step 2. Create HyperParameter search space._parameters = { "a": katib.search.int(min=10, max=20), "b": katib.search.double(min=0.1, max=0.2)}_# Step 3. Create Katib Experiment._katib_client = katib.KatibClient()name = "tune-experiment"katib_client.tune( name=name, objective=objective, parameters=parameters, objective_metric_name="result", max_trial_count=12)_# Step 4. Get the best HyperParameters._print(katib_client.get_optimal_hyperparameters(name))
运行后可以在autoML中查看参数优化文件和整体流程。
这段代码通过定义一个目标函数来自动进行超参数优化,使用随机搜索算法对该目标函数的形式是 result=4a - b^2,其中参数 a 是整数,范围为 10 到 20,参数 b 是浮点数,范围为 0.1 到 0.2。首先,代码定义了这个目标函数,然后创建了超参数的搜索空间。接着,使用Kubeflow Katib的 KatibClient 启动实验,并指定目标函数、参数范围和最大实验次数(12次)。最后,通过 katib_client.get_optimal_hyperparameters(name) 方法获取并输出最优的超参数组合,从而实现自动化的超参数调优,以优化指定的目标函数
a的取值范围为10-20,b的取值范围为0.1-0.2。在这个条件下使用随机搜索算法找到F的最大值。
下面给出一个更加复杂的计算式与使用其他算法计算的示例:
import kubeflow.katib as katib_# Step 1. Create an objective function._def objective(parameters): _# Import required packages._ import time import math time.sleep(5) _# Calculate a more complex objective function._ a = int(parameters["a"]) b = float(parameters["b"]) _#_ _复杂的目标函数示例:结合多项式、对数和三角函数的组合_ result = (3 * a ** 2 + 2 * b - math.sin(a * b)) / (1 + math.log(b + 0.1)) + math.sqrt(abs(a - b)) _# Katib parses metrics in this format: <metric-name>=<metric-value>._ print(f"result={result}")_# Step 2. Create HyperParameter search space._parameters = { "a": katib.search.int(min=10, max=20), "b": katib.search.double(min=0.1, max=0.2)}_# Step 3. Create Katib Experiment._katib_client = katib.KatibClient()name = "tune-experiment-for-kalibbase"katib_client.tune( name=name, objective=objective, parameters=parameters, algorithm_name="bayesianoptimization",objective_metric_name="result",#objective_type="minimize", # 指定最小化目标函数 max_trial_count=12)_# Step 4. Get the best HyperParameters._print(katib_client.get_optimal_hyperparameters(name))
用更加复杂的函数去优化参数,同时使用了贝叶斯搜索算法
多用户使用kubeflow
Kubelfow支持使用profile的方式创建新用户。编辑一下文件创建用户
apiVersion: kubeflow.org/v1beta1kind: Profilemetadata: name: test _#_ _用户__namespace_spec: owner: kind: User name: huhu@test.com _#_ _用户名_
通过Kubectl get profile来查看创建状态
之后由于kubeflow是通过configmap来管理用户信息的。所以需要修改dex组件的configmap
Kubectl edit cm -n auth dex
- email: huhu@test.com hash: $xxxxxordASFdg3Vmi username: xxx
其中email为刚刚创建的用户的账户,hash为登录的密码但是以hash的方式展示。
需要apt install python3-pip 安装pip。
创建密码方式前提为pip install passlib bcrypt 之后执行一下的python代码
python3 -c 'from passlib.hash import bcrypt; import getpass; print(bcrypt.using(rounds=12, ident="2y").hash(getpass.getpass()))'
执行命令后输入想要加密的登录密码,如huhu,之后会生成一段hash值,将其填入上述的cm配置中。保存退出后,重启pod就可以用新用户登录kubeflow
将kubeflow与向量数据库milvus结合使用
对于大量模型训练来说,有一个好的向量数据库可以更加方便的处理数据的输入和输出,因此现在给出将kubeflow与milvus向量数据库结合起来使用。
部署milvus
详见我的另一篇技术文档”向量化数据库milvus”
在jupyter中使用k8s集群中的milvus
由于milvus是部署在k8s集群中的,因此在kubeflow中的jupyter中使用需要将milvus准备好。下面给出一个直接的示例
from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataTypeimport numpy as np_#_ _连接到_ _Milvus_ _实例_connections.connect(alias="default", host="192.168.0.208", port="31011")print("已连接到 Milvus")_#_ _创建集合_fields = [ FieldSchema(name="id", dtype=DataType.INT64, is_primary=True), FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=128)]schema = CollectionSchema(fields, description="用于演示的集合")collection = Collection(name="demo_collection_2", schema=schema) _#_ _使用新的集合名称_print(f"集合 {collection.name} 已创建")_#_ _插入向量数据_vectors = np.random.random((20, 128)).astype(np.float32) _#_ _插入__20__个向量_collection.insert([list(range(20)), vectors])print("已将数据插入集合")_#_ _为向量字段创建索引_index_params = { "metric_type": "L2", "index_type": "IVF_FLAT", _#_ _选择适合的索引类型_ "params": {"nlist": 128}}collection.create_index(field_name="vector", index_params=index_params)print("索引已创建")_#_ _加载集合_collection.load()print("集合已加载到内存")_#_ _查询向量数据_search_params = {"metric_type": "L2", "params": {"nprobe": 10}}query_vectors = np.random.random((5, 128)).astype(np.float32) _#_ _查询__5__个向量_search_results = collection.search(query_vectors, "vector", search_params, limit=3)for i, result in enumerate(search_results): print(f"查询向量 {i+1} 的前三个结果: {result}")
在jupyter中新建一个python文件,将上述代码复制进去。处理好依赖相关的库运行。
milvus存储非结构化数据
由于milvus是向量数据库,因此下面给出一个可以将图片作为非结构数据存储到数据库中的方式
from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataTypeimport numpy as npfrom tensorflow.keras.applications.resnet50 import ResNet50, preprocess_inputfrom tensorflow.keras.preprocessing import imagefrom tensorflow.keras.models import Modelfrom PIL import Imageimport matplotlib.pyplot as pltimport time# 连接到 Milvus 实例connections.connect(alias="default", host="192.168.0.208", port="31011")print("已连接到 Milvus")# 加载预训练的ResNet50模型并去掉最后的分类层,用于特征提取base_model = ResNet50(weights='imagenet')model = Model(inputs=base_model.input, outputs=base_model.get_layer('avg_pool').output)# 动态生成集合名称,避免冲突collection_name = f"image_collection_{int(time.time())}"# 定义集合fields = [ FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True), # 使用 auto_id 自动生成ID FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=2048), # ResNet50的输出是2048维向量 FieldSchema(name="img_path", dtype=DataType.VARCHAR, max_length=255) # 存储图片路径]schema = CollectionSchema(fields, description="用于存储图片向量的集合")collection = Collection(name=collection_name, schema=schema)print(f"集合 {collection.name} 已创建")# 图片列表image_paths = ["3.jpg", "7.jpg", "9.jpg", "third.jpg"]# 提取并存储图片特征向量和路径all_features = []all_paths = []for img_path in image_paths: # 加载图片并进行预处理 img = image.load_img(img_path, target_size=(224, 224)) x = image.img_to_array(img) x = np.expand_dims(x, axis=0) x = preprocess_input(x) # 提取图片的特征向量 features = model.predict(x).flatten().tolist() # 转换为列表以插入 Milvus # 收集数据 all_features.append(features) all_paths.append(img_path)# 将所有图片的特征向量和路径存储到 Milvus 中entities = [ all_features, # 向量数据 all_paths # 路径数据]collection.insert(entities)print(f"图片已存储")# 为向量字段创建索引index_params = { "metric_type": "L2", "index_type": "IVF_FLAT", "params": {"nlist": 128}}collection.create_index(field_name="vector", index_params=index_params)print("索引已创建")# 加载集合collection.load()print("集合已加载到内存")# 查询与某张图片最相似的图片query_img_path = "third.jpg" # 要查询的图片# 加载查询图片并提取特征向量img = image.load_img(query_img_path, target_size=(224, 224))x = image.img_to_array(img)x = np.expand_dims(x, axis=0)x = preprocess_input(x)query_features = model.predict(x).flatten().tolist()# 查询与这张图片最相似的图片(只返回一个结果)search_params = {"metric_type": "L2", "params": {"nprobe": 10}}search_results = collection.search([query_features], "vector", search_params, limit=1, output_fields=["img_path"])# 读取查询结果并展示图片for hits in search_results: for hit in hits: result_img_path = hit.entity.get('img_path') if result_img_path: try: result_img = Image.open(result_img_path) plt.imshow(result_img) plt.title(f"检索到的图片: {result_img_path}") plt.axis('off') plt.show() except Exception as e: print(f"无法打开图片 {result_img_path}: {e}") else: print("未找到 'img_path' 字段")
运行后得到结果如下所示,我们通过上述的代码存储了4张图片进入milvus由于是非结构化向量存储,所以我们首先需要对图片进行向量化处理,代码中通过一个模型的最后一层输入,将一个图片分解为2048维的向量。通过将4张图片都分解后用一个2048维度的数组表示一张图片
最后通过查询的方式,将我们想要的照片从数据库中查询出来。通过这个例子很好的展示了如何使用向量化数据库milvus以及如何将一个图片进行向量化存储与查询的方式。
总结
一个简单的machine learning运行流程如上所示
整个流水线包括以下几部分:
- 构建快速算法实验的环境(experimentation),这里的步骤已经过编排,各个步骤之间的转换是自动执行的,这样可以快速迭代实验,并更好地准备将整个流水线移至生产环境,在这个环境中算法研究员只进行模块内部的工作。
- 构建可复用的生产环境流水线,组件的源代码模块化,实验环境模块化流水线可以直接在 staging 环境和 production 环境中使用。
- 持续交付模型,生产环境中的机器学习流水线会向使用新数据进行训练的新模型持续交付预测服务。
基于上述功能描述我们其实可以基于 kubeflow 的 pipeline 和 kfserving 功能轻松实现一个简单的 MLOps 流水线发布流程。Kubeflow 是一个开源的机器学习平台,专为 Kubernetes 设计,旨在简化机器学习工作流的部署和管理。它将多种机器学习工具和框架整合到一个统一的生态系统中,提供了从数据准备到模型训练、优化和部署的全生命周期管理。
Kubeflow 的设计理念是提供一个全面、易用、可扩展的机器学习平台,利用 Kubernetes 的核心优势,如自动化部署、扩展和管理容器化应用程序。对于机器学习项目来说,Kubeflow 不仅提高了开发和部署的效率,还确保了解决方案的可移植性和可维护性。对于希望在 Kubernetes 上运行机器学习工作负载的团队而言,Kubeflow 提供了强大的工具和资源,使得机器学习的创新和实施更加便捷和高效。
Kubeflow的工作全都可以在jupyter中完成,如可以在jupyter中创建pipline,创建kalib等。这些通过调用api的方式都可以直接创建生成对应的workflow。让机器学习工程师在使用的过程中方便的管理自己的模型并且可以便利的进行参数调优以及通过搭建构建pipline的方式让其完成一系列流水线的操作,如数据清洗,批量操作。