milvus对象存储和消息中间件的工厂设计模式分析

milvus对象存储和消息中间件的工厂设计模式分析

需求

根据参数设置创建mq和storage
mq有kafka,pulsar
storage有local,minio,remote

配置文件

根据配置文件选择初始化mq和存储:

mq:type: pulsarcommon:storageType: minio

对于这种类型一个是mq,一个是存储,相比工厂方法设计模式,使用抽象工厂设计模式更合理。

代码框架

在这里插入图片描述

工厂接口

代码路径:internal\util\dependency\factory.go

type Factory interface {msgstream.Factory// Init()给工厂传递参数。Init(p *paramtable.ComponentParam)NewPersistentStorageChunkManager(ctx context.Context) (storage.ChunkManager, error)
}// pkg\mq\msgstream\msgstream.go
// msgstream.Factory的code
type Factory interface {NewMsgStream(ctx context.Context) (MsgStream, error)NewTtMsgStream(ctx context.Context) (MsgStream, error)NewMsgStreamDisposer(ctx context.Context) func([]string, string) error
}

dependency.Factory是一个工厂接口,里面包含了mq的工厂接口,和创建持久对象的方法。

这个接口创建消息中间件对象和持久存储对象。

这里为什么不这么写:

type Factory interface {Init(p *paramtable.ComponentParam)NewMsgStream(ctx context.Context) (MsgStream, error)NewPersistentStorageChunkManager(ctx context.Context) (storage.ChunkManager, error)
}

DefaultFactory

DefaultFactory结构体是dependency.Factory的实现。

// DefaultFactory is a factory that produces instances of storage.ChunkManager and message queue.
// internal\util\dependency\factory.go
type DefaultFactory struct {standAlone          boolchunkManagerFactory storage.FactorymsgStreamFactory    msgstream.Factory
}// storage.Factory
// internal\storage\factory.go
type Factory interface {NewPersistentStorageChunkManager(ctx context.Context) (ChunkManager, error)
}// msgstream.Factory
// pkg\mq\msgstream\msgstream.go
type Factory interface {NewMsgStream(ctx context.Context) (MsgStream, error)NewTtMsgStream(ctx context.Context) (MsgStream, error)NewMsgStreamDisposer(ctx context.Context) func([]string, string) error
}

DefaultFactory实现了dependency.Factory接口的Init()函数。

在Init()函数内初始化了chunkManagerFactory、msgStreamFactory。

func (f *DefaultFactory) Init(params *paramtable.ComponentParam) {// skip if using default factoryif f.msgStreamFactory != nil {return}// 初始化chunkManagerFactoryf.chunkManagerFactory = storage.NewChunkManagerFactoryWithParam(params)// initialize mq client or embedded mq.// 初始化msgStreamFactoryif err := f.initMQ(f.standAlone, params); err != nil {panic(err)}
}

f.chunkManagerFactory:

return &ChunkManagerFactory{persistentStorage: persistentStorage,config:            c,}

f.msgStreamFactory:

func (f *DefaultFactory) initMQ(standalone bool, params *paramtable.ComponentParam) error {mqType := mustSelectMQType(standalone, params.MQCfg.Type.GetValue(), mqEnable{params.RocksmqEnable(), params.NatsmqEnable(), params.PulsarEnable(), params.KafkaEnable()})log.Info("try to init mq", zap.Bool("standalone", standalone), zap.String("mqType", mqType))switch mqType {case mqTypeNatsmq:f.msgStreamFactory = msgstream.NewNatsmqFactory()case mqTypeRocksmq:f.msgStreamFactory = smsgstream.NewRocksmqFactory(params.RocksmqCfg.Path.GetValue(), &params.ServiceParam)case mqTypePulsar:f.msgStreamFactory = msgstream.NewPmsFactory(&params.ServiceParam)case mqTypeKafka:f.msgStreamFactory = msgstream.NewKmsFactory(&params.ServiceParam)}if f.msgStreamFactory == nil {return errors.New("failed to create MQ: check the milvus log for initialization failures")}return nil
}

持久存储

storage.Factory是创建持久存储的工厂接口。

storage.ChunkManagerFactory是storage.Factory的实现。

NewPersistentStorageChunkManager()接口的实现:

func (f *DefaultFactory) NewPersistentStorageChunkManager(ctx context.Context) (storage.ChunkManager, error) {return f.chunkManagerFactory.NewPersistentStorageChunkManager(ctx)
}func (f *ChunkManagerFactory) NewPersistentStorageChunkManager(ctx context.Context) (ChunkManager, error) {return f.newChunkManager(ctx, f.persistentStorage)
}func (f *ChunkManagerFactory) newChunkManager(ctx context.Context, engine string) (ChunkManager, error) {switch engine {case "local":return NewLocalChunkManager(RootPath(f.config.rootPath)), nilcase "minio":return newMinioChunkManagerWithConfig(ctx, f.config)case "remote":return NewRemoteChunkManager(ctx, f.config)default:return nil, errors.New("no chunk manager implemented with engine: " + engine)}
}

根据传入的engine新建对应的持久存储对象。

LocalChunkManager、MinioChunkManager、RemoteChunkManager。

// LocalChunkManager is responsible for read and write local file.
type LocalChunkManager struct {localPath string
}// MinioChunkManager is responsible for read and write data stored in minio.
type MinioChunkManager struct {*minio.ClientbucketName stringrootPath   string
}// RemoteChunkManager is responsible for read and write data stored in minio.
type RemoteChunkManager struct {client ObjectStoragebucketName stringrootPath   string
}

消息中间件

msgstream.Factory是创建mq的工厂接口。

工厂接口:

// pkg\mq\msgstream\msgstream.go
type Factory interface {NewMsgStream(ctx context.Context) (MsgStream, error)NewTtMsgStream(ctx context.Context) (MsgStream, error)NewMsgStreamDisposer(ctx context.Context) func([]string, string) error
}

实现有:

CommonFactory、KmsFactory、PmsFactory

// CommonFactory is a Factory for creating message streams with common logic.
// It contains a function field named newer, which is a function that creates
// an mqwrapper.Client when called.
// pkg\mq\msgstream\common_mq_factory.go
type CommonFactory struct {Newer             func(context.Context) (mqwrapper.Client, error) // client constructorDispatcherFactory ProtoUDFactoryReceiveBufSize    int64MQBufSize         int64
}// pkg\mq\msgstream\mq_factory.go
// kafka工厂
type KmsFactory struct {dispatcherFactory ProtoUDFactoryconfig            *paramtable.KafkaConfigReceiveBufSize    int64MQBufSize         int64
}// PmsFactory is a pulsar msgstream factory that implemented Factory interface(msgstream.go)
// pkg\mq\msgstream\mq_factory.go
// pulsar工厂
type PmsFactory struct {dispatcherFactory ProtoUDFactory// the following members must be public, so that mapstructure.Decode() can access themPulsarAddress    stringPulsarWebAddress stringReceiveBufSize   int64MQBufSize        int64PulsarAuthPlugin stringPulsarAuthParams stringPulsarTenant     stringPulsarNameSpace  stringRequestTimeout   time.DurationmetricRegisterer prometheus.Registerer
}

mq产品

mq的产品接口是msgstream.MsgStream

// MsgStream is an interface that can be used to produce and consume message on message queue
type MsgStream interface {Close()AsProducer(channels []string)Produce(*MsgPack) errorSetRepackFunc(repackFunc RepackFunc)GetProduceChannels() []stringBroadcast(*MsgPack) (map[string][]MessageID, error)AsConsumer(ctx context.Context, channels []string, subName string, position mqwrapper.SubscriptionInitialPosition) errorChan() <-chan *MsgPackSeek(ctx context.Context, offset []*MsgPosition) errorGetLatestMsgID(channel string) (MessageID, error)CheckTopicValid(channel string) errorEnableProduce(can bool)
}

具体产品实现有:

msgstream.mqMsgStream、msgstream.MqTtMsgStream

type mqMsgStream struct {ctx              context.Contextclient           mqwrapper.Clientproducers        map[string]mqwrapper.ProducerproducerChannels []stringconsumers        map[string]mqwrapper.ConsumerconsumerChannels []stringrepackFunc    RepackFuncunmarshal     UnmarshalDispatcherreceiveBuf    chan *MsgPackcloseRWMutex  *sync.RWMutexstreamCancel  func()bufSize       int64producerLock  *sync.RWMutexconsumerLock  *sync.Mutexclosed        int32onceChan      sync.OnceenableProduce atomic.Value
}// MqTtMsgStream is a msgstream that contains timeticks
type MqTtMsgStream struct {*mqMsgStreamchanMsgBuf         map[mqwrapper.Consumer][]TsMsgchanMsgPos         map[mqwrapper.Consumer]*msgpb.MsgPositionchanStopChan       map[mqwrapper.Consumer]chan boolchanTtMsgTime      map[mqwrapper.Consumer]TimestampchanMsgBufMutex    *sync.MutexchanTtMsgTimeMutex *sync.RWMutexchanWaitGroup      *sync.WaitGrouplastTimeStamp      TimestampsyncConsumer       chan int
}

存储产品

存储的产品接口是storag.ChunkManagere

// ChunkManager is to manager chunks.
// Include Read, Write, Remove chunks.
type ChunkManager interface {// RootPath returns current root path.RootPath() string// Path returns path of @filePath.Path(ctx context.Context, filePath string) (string, error)// Size returns path of @filePath.Size(ctx context.Context, filePath string) (int64, error)// Write writes @content to @filePath.Write(ctx context.Context, filePath string, content []byte) error// MultiWrite writes multi @content to @filePath.MultiWrite(ctx context.Context, contents map[string][]byte) error// Exist returns true if @filePath exists.Exist(ctx context.Context, filePath string) (bool, error)// Read reads @filePath and returns content.Read(ctx context.Context, filePath string) ([]byte, error)// Reader return a reader for @filePathReader(ctx context.Context, filePath string) (FileReader, error)// MultiRead reads @filePath and returns content.MultiRead(ctx context.Context, filePaths []string) ([][]byte, error)ListWithPrefix(ctx context.Context, prefix string, recursive bool) ([]string, []time.Time, error)// ReadWithPrefix reads files with same @prefix and returns contents.ReadWithPrefix(ctx context.Context, prefix string) ([]string, [][]byte, error)Mmap(ctx context.Context, filePath string) (*mmap.ReaderAt, error)// ReadAt reads @filePath by offset @off, content stored in @p, return @n as the number of bytes read.// if all bytes are read, @err is io.EOF.// return other error if read failed.ReadAt(ctx context.Context, filePath string, off int64, length int64) (p []byte, err error)// Remove delete @filePath.Remove(ctx context.Context, filePath string) error// MultiRemove delete @filePaths.MultiRemove(ctx context.Context, filePaths []string) error// RemoveWithPrefix remove files with same @prefix.RemoveWithPrefix(ctx context.Context, prefix string) error
}

具体产品实现有:

LocalChunkManager、MinioChunkManager、RemoteChunkManager

// LocalChunkManager is responsible for read and write local file.
type LocalChunkManager struct {localPath string
}// MinioChunkManager is responsible for read and write data stored in minio.
type MinioChunkManager struct {*minio.Client//	ctx        context.ContextbucketName stringrootPath   string
}// RemoteChunkManager is responsible for read and write data stored in minio.
type RemoteChunkManager struct {client ObjectStorage//	ctx        context.ContextbucketName stringrootPath   string
}

总结

从代码框架可以看出每一种mq都有一个工厂,存储只有一个工厂

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/642182.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

如何编写有效的接口测试?

导读&#xff1a;在所有的开发测试中&#xff0c;接口测试是必不可少的一项。有效且覆盖完整的接口测试&#xff0c;不仅能保障新功能的开发质量&#xff0c;还能让开发在修改功能逻辑的时候有回归的能力&#xff0c;同时也是能优雅地进行重构的前提。编写接口测试要遵守哪些原…

把 WordPress 变成 BaaS 服务:API 调用指南

有了前面两篇内容的铺垫&#xff0c;我们来聊聊 WordPress 作为 CMS / BaaS 服务使用时绕不开的问题&#xff0c;API 调用。 这篇内容同样的&#xff0c;会尽量少贴代码&#xff0c;简单的讲清楚一件事&#xff0c;降低阅读负担。 写在前面 首先&#xff0c;我们需要进行清晰…

开发同城O2O跑腿系统源码:构建高效便捷的本地服务平台教程

为了满足用户对便捷的需求&#xff0c;今天我们将一同探讨如何开发一个高效便捷的同城O2O跑腿系统&#xff0c;以构建一个功能全面、操作简单的本地服务平台。 一、确定需求和功能 在开发同城O2O跑腿系统之前&#xff0c;首先需要明确系统的需求和功能。用户可以通过该系统发布…

智能仓储物流系统(Wms)系列

好的应用系统应是细分简单&#xff0c;界面简洁易操作&#xff0c;程序代码简洁易懂的。 大模块划分&#xff1a; 入库&#xff0c;收货&#xff0c;上架、出库&#xff0c;分配&#xff0c;发货、管理&#xff0c;查询&#xff0c;调整、基础数据、系统管理 入库&#xff0…

一维递归:递去

示例&#xff1a; /*** brief how about recursive-forward-1? show you here.* author wenxuanpei* email 15873152445163.com(query for any question here)*/ #define _CRT_SECURE_NO_WARNINGS//support c-library in Microsoft-Visual-Studio #include <stdio.h>…

国内首个48小时大模型极限挑战赛落幕,四位“天才程序员”共同夺冠

4月21日晚&#xff0c;第四届ATEC科技精英赛&#xff08;ATEC2023&#xff09;线下赛落幕。本届赛事以大模型为技术基座&#xff0c;围绕“科技助老”命题&#xff0c;是国内首个基于真实场景的大模型全链路应用竞赛。ATEC2023线下赛采用48小时极限挑战的形式&#xff0c;来自东…

世界读书日 | 开发者必读书单重磅来袭,华为云DTSE专家天团力荐

春色恰如许&#xff0c;读书正当时。 读书&#xff0c;就像解锁一把神秘钥匙&#xff0c;为开发者洞开新世界的大门&#xff0c;赋予他们破译复杂难题的能力、挑战未知领域的勇气。书页翻动间&#xff0c;开发者得以站在巨人的肩膀上&#xff0c;汲取前人经验&#xff0c;积蓄…

嵌入式学习59-ARM8(中断,ADC,内核定时器和传感器)

什么是中断顶半部和底半部 &#xff1f; &#xff08;部分记忆&#xff09;背 上半部&#xff1a; …

循环队列中学习

循环队列中&#xff0c;由于入队时尾指针向前追赶头指针;出队时头指针向前追赶尾指针&#xff0c;造成队空和队满时头尾指针均相等。因此&#xff0c;无法通过条件frontrear来判别队列是"空"还是"满"。 解决这个问题的方法至少有两种: ① 另设一布尔变量…

Unity的旋转实现一些方法总结(案例:通过输入,玩家进行旋转移动)

目录 1. Transform.Rotate 方法 使用 2. Transform.rotation 或 Transform.localRotation 属性与四元数 使用方式&#xff1a; 小案例 &#xff1a;目标旋转角度计算&#xff1a;targetRotation&#xff08;Quaternion类型&#xff09; 玩家发现敌人位置&#xff0c;玩家…

Qt绘制边框有阴影兼容性问题

在Qt开发过程中&#xff0c;有时候我们要显示一个有阴影的对话框&#xff0c;这时一般采用自定义实现&#xff0c;然而最近在开发时软件时&#xff0c;Win11上显示正常&#xff0c;Win10或其他Win11电脑显示不正常&#xff0c;存在兼容性问题吗&#xff1f; 下面是具体的源码 …