milvus upsert流程源码分析

milvus版本:v2.3.2

整体架构:

在这里插入图片描述

Upsert 的数据流向:

在这里插入图片描述

1.客户端sdk发出Upsert API请求。

import numpy as np
from pymilvus import (connections,Collection,
)num_entities, dim = 4, 3print("start connecting to Milvus")
connections.connect("default", host="192.168.230.71", port="19530")hello_milvus = Collection("hello_milvus")print("Start upsert entities")
rng = np.random.default_rng(seed=19530)
entities = [[0,1,2,4000],[10,11,12,4000],rng.random((num_entities, dim)),
]
hello_milvus.upsert(entities)

2.服务端接受API请求,将request封装为upsertTask,并压入dmQueue队列。

注意这里是dmQueue。DDL类型的是ddQueue。

代码路径:internal\proxy\impl.go

// Upsert upsert records into collection.
func (node *Proxy) Upsert(ctx context.Context, request *milvuspb.UpsertRequest) (*milvuspb.MutationResult, error) {......// request封装为upsertTaskit := &upsertTask{baseMsg: msgstream.BaseMsg{HashValues: request.HashKeys,},ctx:       ctx,Condition: NewTaskCondition(ctx),req:       request,result: &milvuspb.MutationResult{Status: merr.Success(),IDs: &schemapb.IDs{IdField: nil,},},idAllocator:   node.rowIDAllocator,segIDAssigner: node.segAssigner,chMgr:         node.chMgr,chTicker:      node.chTicker,}......// 将task压入dmQueue队列if err := node.sched.dmQueue.Enqueue(it); err != nil {......}......// 等待任务执行完if err := it.WaitToFinish(); err != nil {......}......
}

3.执行upsertTask的3个方法PreExecute、Execute、PostExecute。

PreExecute()一般为参数校验等工作。

Execute()为真正执行逻辑。

PostExecute()执行完后的逻辑,什么都不做,返回nil。

代码路径:internal\proxy\task_upsert.go

func (it *upsertTask) Execute(ctx context.Context) (err error) {ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-Upsert-Execute")defer sp.End()log := log.Ctx(ctx).With(zap.String("collectionName", it.req.CollectionName))tr := timerecord.NewTimeRecorder(fmt.Sprintf("proxy execute upsert %d", it.ID()))// 拿到stream,类型为msgstream.mqMsgStreamstream, err := it.chMgr.getOrCreateDmlStream(it.collectionID)if err != nil {return err}// 创建msgPackmsgPack := &msgstream.MsgPack{BeginTs: it.BeginTs(),EndTs:   it.EndTs(),}// 添加insertMsgPackerr = it.insertExecute(ctx, msgPack)if err != nil {log.Warn("Fail to insertExecute", zap.Error(err))return err}// 添加deleteMsgPackerr = it.deleteExecute(ctx, msgPack)if err != nil {log.Warn("Fail to deleteExecute", zap.Error(err))return err}tr.RecordSpan()// 发送数据至mqerr = stream.Produce(msgPack)if err != nil {it.result.Status = merr.Status(err)return err}sendMsgDur := tr.RecordSpan()metrics.ProxySendMutationReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.UpsertLabel).Observe(float64(sendMsgDur.Milliseconds()))totalDur := tr.ElapseSpan()log.Debug("Proxy Upsert Execute done", zap.Int64("taskID", it.ID()),zap.Duration("total duration", totalDur))return nil
}

msgPack变量:

在这里插入图片描述

msgPack包含了insertRequest和deleteRequest。

在这里插入图片描述

insertRequest包含了客户端的upsert数据,以及还会有rowid,用来唯一标识一列数据。

在这里插入图片描述

deleteRequest包含主键值。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/496339.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

AD9851——FPGA调试(并行模式)

AD9851——FPGA调试(并行模式) 工程功能:使用FPGA来调试AD9851芯片,使用的是并行模式 芯片手册:AD9851 CMOS 180 MHz DDS/DAC Synthesizer Data Sheet (Rev. D) (analog.com) 管脚功能 管脚名称管脚功能D0-D78位数据输…

水库安全监测方案(福建地区水库安全监测案例分享)

我司星创易联最近在福建省受到了一个水库安全监测系统项目的委托。该水库位于福建中部山区,作为该地区的重要防洪与供水工程,对下游数十万人的生活产生重大影响。但是因为水库附近地质情况复杂,水库大坝在多次洪水冲击下出现一定病害,亟须全面加强对水库大坝安全状况的监测,以确…

Rocky Linux 运维工具yum

一、yum的简介 ​​yum​是用于在基于RPM包管理系统的包管理工具。用户可以通过 ​yum​来搜索、安装、更新和删除软件包,自动处理依赖关系,方便快捷地管理系统上的软件。 二、yum的参数说明 1、install 用于在系统的上安装一个或多个软件包 2、seach 用…

常用git 打tag命令

1.查看所有tag git tag 2.创建 v5.0.0的tag git tag v5.0.0 git tag (创建后查看) 3.推送到远程tag git push origin v5.0.0 4.删除远程tag git push origin --delete v5.0.0 5.删除本地tag git tag -d v5.0.0 6.添加带有备注信息的tag git tag v5.…

golang学习5,glang的web的restful接口

1. //返回json r.GET("/getJson", controller.GetUserInfo) package mainimport (/*"net/http"*/"gin/src/main/controller""github.com/gin-gonic/gin" )func main() {r : gin.Default()r.GET("/get", func(ctx *…

Oracle中序列

1. Sequence 定义 在Oracle中可以用SEQUENCE生成自增字段。Sequence序列是Oracle中用于生成数字序列的对象,可以创建一个唯一的数字作为主键。 2. 为什么要用 Sequence 你可能有疑问为什么要使用序列? 不能使用一个存储主键的表并每次递增吗&#xf…

golang使用gorm操作mysql1

1.mysql连接配置 package daoimport ("fmt""gorm.io/driver/mysql""gorm.io/gorm""gorm.io/gorm/logger" )var DB *gorm.DB// 连接数据库,启动服务的时候,init方法就会执行 func init() {username : "roo…

【virtual Box】功能速通:安装 Windows 和 Ubuntu

文章目录 一、虚拟机1.1 概述1.2 virtual box概述 二、新建虚拟机、删除、注册三、虚拟机内部设置3.1 安装增强功能驱动3.2 分辨率问题3.3 网络链接方式 一、虚拟机 1.1 概述 虚拟机(Virtual Machine,VM)是一种软件实现的计算机系统&#x…

EMR StarRocks实战——Mysql数据实时同步到SR

文章摘抄阿里云EMR上的StarRocks实践:《基于实时计算Flink使用CTAS&CDAS功能同步MySQL数据至StarRocks》 前言 CTAS可以实现单表的结构和数据同步,CDAS可以实现整库同步或者同一库中的多表结构和数据同步。下文主要介绍如何使用Flink平台和E-MapRed…

【appium】App类型、页面元素|UiAutomator与appium|App元素定位

目录 一、App前端基础知识 1、App类型划分 2、App类型对比 3、App页面元素 App页面元素分为布局和控件两种 常见布局: 常见控件:定位软件:appium和sdk自带的uiautomatorviewer都可以定位 二、App元素定位 1、id定位 2、text定位 3…

Pytorch添加自定义算子之(5)-配置GPU形式的简单add自定义算子

参考:https://zhuanlan.zhihu.com/p/358778742 一、头文件 命名为:add2.h void launch_add2(float *c,const float *a,const float *b,int n);

docker desktop windows 下载

下载地址 xDocker: Accelerated Container Application Development 安装步骤:下一步,下一步即可