Upsert api写s3的流程源码分析

Upsert api写s3的流程

milvus版本:v2.3.2

实现:先insert再delete,并限制不能修改主键列。

整体架构:

在这里插入图片描述

Upsert 的数据流向

在这里插入图片描述

upsert写入s3的流程

upsert先insert,再delete。从proxy的execute()方法可以看出。

func (it *upsertTask) Execute(ctx context.Context) (err error) {ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-Upsert-Execute")defer sp.End()log := log.Ctx(ctx).With(zap.String("collectionName", it.req.CollectionName))tr := timerecord.NewTimeRecorder(fmt.Sprintf("proxy execute upsert %d", it.ID()))// 拿到stream,类型为msgstream.mqMsgStreamstream, err := it.chMgr.getOrCreateDmlStream(it.collectionID)if err != nil {return err}// 创建msgPackmsgPack := &msgstream.MsgPack{BeginTs: it.BeginTs(),EndTs:   it.EndTs(),}// 添加insertMsgPackerr = it.insertExecute(ctx, msgPack)if err != nil {log.Warn("Fail to insertExecute", zap.Error(err))return err}// 添加deleteMsgPackerr = it.deleteExecute(ctx, msgPack)if err != nil {log.Warn("Fail to deleteExecute", zap.Error(err))return err}tr.RecordSpan()// 发送数据至mqerr = stream.Produce(msgPack)if err != nil {it.result.Status = merr.Status(err)return err}sendMsgDur := tr.RecordSpan()metrics.ProxySendMutationReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.UpsertLabel).Observe(float64(sendMsgDur.Milliseconds()))totalDur := tr.ElapseSpan()log.Debug("Proxy Upsert Execute done", zap.Int64("taskID", it.ID()),zap.Duration("total duration", totalDur))return nil
}

将insertmsg和deletemsg加入msgPack,然后datanode进行消费。

insert和delete流程分别可以参考对应写入s3的流程。产生insertlog和deletelog。

// Operate handles input messages, implementing flowgrpah.Node
func (ddn *ddNode) Operate(in []Msg) []Msg {......// 遍历msMsgfor _, msg := range msMsg.TsMessages() {switch msg.Type() {case commonpb.MsgType_DropCollection:......case commonpb.MsgType_DropPartition:......// 处理insert消息case commonpb.MsgType_Insert:......// 处理delete消息case commonpb.MsgType_Delete:......}}......
}

s3文件

upsert结合了insert和delete操作。因此在s3对应的文件也即insert和delete对应的文件。

主要涉及delta_log和stats_log。

insert:

files/insert_log/{collectionID}/{partitionID}/{segmentID}/{fieldID}/{logidx}
files/stats_log/{collectionID}/{partitionID}/{segmentID}/{fieldID}/1(flushed)
files/stats_log/{collectionID}/{partitionID}/{segmentID}/{fieldID}/{logidx}(not flushed)

delete:

files/delta_log/{collID}/{partID}/{segmentID}/{logID}
files/stats_log/{collID}/{partID}/{segmentID}/{fieldID}/1(flushed)
files/stats_log/{collID}/{partID}/{segmentID}/{fieldID}/{logID}(not flushed)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/512476.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Spring Cloud Alibaba一一SentinelResource

SentinelResource 在定义了资源点之后,我们可以通过Dashboard控制台页面来设置限流和降级策略来对资源点进行保护。同时还能通过[**SentinelResource**](/SentinelResource)****注解来制定出现异常时的处理策略 1、属性说明 value 资源名称、必须项、因为需要通过…

基于遗传优化的协同过滤推荐算法matlab仿真

目录 1.算法运行效果图预览 2.算法运行软件版本 3.部分核心程序 4.算法理论概述 5.算法完整程序工程 1.算法运行效果图预览 最后得到推荐的商品ID号: 推荐商品的ID号:ans 9838175822191114902149021235224732230712349911790154716550165501655011…

快速上手:在 Android 设备上运行 Pipy

Pipy 作为一个高性能、低资源消耗的可编程代理,通过支持多种计算架构和操作系统,Pipy 确保了它的通用性和灵活性,能够适应不同的部署环境,包括但不限于云环境、边缘计算以及物联网场景。它能够在 X86、ARM64、海光、龙芯、RISC-V …

Java中Sleep和Wait的区别

目录 1、所属类不同 2、作用不同 3、使用场景不同 4、异常处理不同 总结 在Java编程中,我们经常会遇到需要让线程暂停执行的情况。这时,我们可以使用Thread类的sleep()方法和Object类的wait()方法来实现线程的暂停。尽管它们都可以达到暂停线程的目的…

Linux——自写一个简易的shell

目录 前言 一、打印提示信息 二、分割字符串 三、替换程序 前言 之前学习了很多进程相关的知识,包括环境变量、进程的创建与退出、进程等待、进程替换。现在可以用所学的作一个小总结,手撕一个shell解释器,大致的思路是先通过环境变量获…

Java对接快递100实时快递单号查询API接口

目录 1.引入依赖 2.定义配置信息 3.模块结构 4.Controller 5.Service实现类 6.返回数据dto以及dto中的数据dto 7.测试运行 今天也是接到了这个任务,官网有小demo,可以下载下来参考test中代码 官方文档地址: 实时快递查询接口技术文档…

docker的网络配置

文章目录 1、网络模式1.1、bridge模式(默认模式)1.2、host模式 2、bridge模式3、自定义网络 1、网络模式 Docker在创建容器时有四种网络模式:bridge/host/container/none,bridge为默认不需要用–net去指定,其他三种模式需要在创建容器时使用…

【QT】创建第一个QT程序

下面的前7个可以先不看,直接从8开始看 1. 创建Qt程序 一个Qt程序的组成部分:应用程序类,窗口类应用程序类个数:有且只有一个QApplication a;如何查看类对应的模块:光标移动到类上,F1qmake模块的名字 2. …

【易经】-- 伏羲八卦次序图

1、伏羲八卦次序图 ☷☶☵☴☳☲☱☰八卦坤艮坎巽震离兑乾四象太阴少阳少阴太阳两仪阴阳太极太极 2、八“单卦”(经卦) 符号卦名自然象征1☰乾qin天2☱兑du泽3☲离l火4☳震zhn雷5☴巽xn风6☵坎kǎn水7☶艮gn山8☷坤kūn地 3、八卦及所代表的意像

二,几何相交----2,区间相交检测IID

一,算法 对于空间的线段是否相交,假设都是与x平行,则需要三步 1,对各线段左右端点设置为L,R标志 2,从小到大进行排序 3,线性扫描,从小到大,根据模式判断是否相交,假设不相…

⭐每天一道leetcode:27.移除元素(简单;vector)

⭐今日份题目 给你一个数组 nums 和一个值 val,你需要 原地 移除所有数值等于 val 的元素,并返回移除后数组的新长度。 不要使用额外的数组空间,你必须仅使用 O(1) 额外空间并 原地 修改输入数组。 元素的顺序可以改变。你不需要考虑数组中…

录屏、截屏好工具

踩过的坑:用了win11之后,截屏工具就无法安装了,还有sqlcompare也无法安装了。 解决:关闭系统自带的安全机制“实时保护”,再重新安装工具。