利用流处理实现实时生成式 AI 应用

news/2025/2/22 18:15:43/文章来源:https://www.cnblogs.com/jellyai/p/18715843

在过去的十年里,我们已经显著转向实时、数据中心化的应用程序。无论是在电子商务推荐、欺诈检测还是物联网事件分析中,用户现在都期望通过持续的数据流获得即时且符合上下文的响应。MongoDB 灵活的文档模型和实时能力使其非常适用于这些动态工作负载,尤其是当它与 Kafka 等流处理解决方案配对时。

这种对实时 AI 的需求在即时生成向量嵌入时尤为重要,而向量嵌入对于语义搜索、个性化推荐和生成式 AI 助手等应用至关重要。流处理解决方案允许嵌入随着新数据的到来而更新,从而确保 AI 驱动的洞察始终保持新鲜和准确。

在本文中,我们将探讨如何利用 MongoDB 流处理来支持 AI 应用程序的实时嵌入生成,确保模型能够随最新的产品、用户和内容更新保持同步。

                                    照片由 Felix Dubois-Robert 拍摄,发布于 Unsplash

为什么实时 AI 和嵌入生成很重要?

生成式 AI 已成为主流,推动着高质量文本生成、智能搜索以及个性化推荐等应用。一个关键技术——检索增强生成(Retrieval-Augmented Generation,RAG),通过在生成过程中注入最新的、特定领域的数据(以嵌入的形式)来增强 AI 模型。嵌入充当 AI 系统的语义记忆,使其能够理解产品之间的关系、用户偏好和上下文查询。

然而,AI 驱动的应用程序只有在使用最新可用数据时才能提供相关且准确的响应。如果缺乏实时更新,推荐系统可能会推荐已售罄的产品,或者 AI 聊天机器人可能会提供过时的定价信息。要支持这些 AI 驱动的功能,就需要一个能够处理实时更新和高吞吐量操作工作负载的基础架构。

通过利用流处理,我们可以在数据更新发生时立即将原始数据转换为嵌入。这确保了 AI 模型在生成响应之前检索到最新的信息,从而提高准确性、可靠性和用户体验。

所有图片均由作者创建

                                                  图 1:检索增强生成(RAG)的简单结构

为什么选择 MongoDB 进行实时操作数据处理?

MongoDB 不仅是一款强大的操作型数据库,而且在实时 AI 工作负载(尤其是即时嵌入生成)方面也是天然契合的。它能够处理高频率更新、灵活的模式变更,并支持实时索引,使其非常适用于嵌入随新数据持续演变的流式应用。

对于以快速插入、更新和查询为特征的操作型(OLTP)工作负载,MongoDB 提供高吞吐量和低延迟,使其非常适用于以下需求的应用程序:

• 高效的读/写模式 —— 其文档模型能够适应复杂、不断变化的数据(例如产品目录),而不受刚性模式的限制。

• 水平扩展能力 —— 分片机制可在数据量和请求增长时保持一致的性能。

• 多功能查询能力 —— 除了标准的 CRUD 操作外,它还支持临时查询、先进的索引技术以及用于 AI 搜索和推荐的向量索引(可在 MongoDB Atlas 中使用)。

此外,MongoDB 可以通过连接器(如 Kafka Connect 或 Atlas 集群)无缝集成到流处理架构中,以摄取、转换并存储从产品、用户或事件更新中生成的实时嵌入。这使得低延迟 AI 应用程序成为可能,确保向量搜索和 RAG 工作流能够使用最新数据运行。

将 RAG 与实时数据管道结合

检索增强生成(RAG)的强大之处在于使用最相关、最新的数据来指导大语言模型的响应。如果缺乏最新的嵌入,AI 驱动的应用程序可能会导致以下问题:

• 错误的推荐 —— 建议已过时的商品或已售罄的产品。

• 不准确的产品信息 —— 显示旧的价格、商品规格或库存情况。

• 降低用户满意度 —— 当系统“知道的”比网站的静态列表还少时,用户会感到沮丧。

MongoDB 流处理可确保新的嵌入能够立即用于 AI 驱动的搜索、问答和推荐,从而避免这些问题。

想象一下,某位顾客在电子商务网站上搜索“无线降噪耳机”。如果 AI 助手依赖于过时的嵌入,它可能会推荐一款已停产的产品,或者无法展示最新发布的型号。实时流处理确保嵌入始终保持最新,以便 AI 助手能够检索到最相关的产品。

真实案例:AI 驱动目录的即时嵌入生成

在快速变化的电子商务领域,产品目录不断演变。新产品发布,价格波动,库存情况实时变化。AI 驱动的用户体验必须跟上这些变化,以确保客户获得准确的推荐和搜索结果。

许多 MongoDB 客户将 MongoDB 用作电子商务平台的核心,支持动态产品目录:

• 频繁更新:新产品不断上线,现有产品被修改或下架。

• 即时用户交互:购物者寻找产品、期待个性化推荐,或与 AI 助手互动获取指导。

• AI 增强功能:生成式 AI 将产品信息转换为营销材料、聊天回复或优化搜索体验。

MongoDB 流处理在管理这些动态变化方面发挥着关键作用。当产品更新发生时,MongoDB 立即触发实时嵌入生成,确保 AI 模型即时访问最新的向量表示,以支持搜索、推荐和生成式响应。

例如,假设某零售商新增了一款智能手表,那么该产品的详细信息(品牌、功能、价格)必须立即转换为嵌入,以便通过语义搜索发现或与类似商品一起推荐。如果没有实时嵌入,AI 可能需要数小时甚至数天才能识别这款新产品。

为了确保精准度,实时嵌入生成至关重要——它能将更新的产品信息即时转换为向量嵌入,使 AI 模型随时可用。

流处理自动嵌入生成:工作原理

随着产品和数据的演变,嵌入必须实时刷新。MongoDB 流处理自动化这一流程,确保 AI 驱动的应用始终保持最新。其工作流程如下:

                                                端到端即时嵌入生成
  1. 实时事件 => Kafka 或 MongoDB Atlas 集群

当新增产品或现有产品发生更新时,这些变化会作为实时事件发布到 Kafka,或者直接插入 MongoDB Atlas 集群,等待处理。

  1. MongoDB Atlas 流处理

配置好 Atlas 流处理实例后(按照上述步骤),它会持续监控这些事件。无论数据是来自 Kafka 还是直接存入 Atlas 集群,任何新增或修改的产品文档都会触发流处理管道。

  1. 调用外部 REST API($https 操作符)

当管道检测到更新时,Atlas 可以调用外部 REST 端点(使用 $https 操作符),通常是一个专门用于生成嵌入的微服务。这一步通常会将产品描述或其他元数据传递给该服务。

  1. 嵌入模型

外部服务利用嵌入模型或其他算法来生成数值向量嵌入。这些向量捕捉了每个产品的语义特征,如功能、类别或文本描述,从而支持高级 AI 搜索和推荐。

  1. 实时目录更新

新计算的嵌入会被直接附加到存储在 MongoDB Atlas 中的产品文档上。这确保了产品新增或更新后几秒内,其嵌入即可用于下游处理。

  1. 前端集成

有了这些嵌入,前端应用(如电子商务网站)可以解锁高级功能:

o 个性化推荐——基于向量相似度识别相似或互补的产品。

o 检索增强生成(RAG)——为 AI 模型提供最新的产品详情,支持实时问答或内容生成。

o 向量/混合搜索——让用户通过语义搜索(例如“环保跑鞋”)或结合关键词 + 向量筛选来优化结果。

实现即时嵌入生成

本节介绍如何配置 MongoDB 流处理(Stream Processing),以自动化 AI 驱动应用的嵌入生成。我们将配置 MongoDB 以:

• 通过流处理捕获实时产品更新

• 触发嵌入生成 API(例如 OpenAI)

• 将嵌入存储到 MongoDB 以用于向量搜索和推荐

前置条件

在开始之前,请确保具备以下条件:

• 已启用 MongoDB Atlas 流处理

• Kafka 或 MongoDB 集群(用于实时事件捕获)

• 一个外部嵌入生成服务(例如 OpenAI、Hugging Face 或自定义模型)

• 已启用 MongoDB Atlas 向量搜索(请参考此教程了解更多关于向量搜索的详细信息)

步骤 1:创建 MongoDB 流处理实例

点击 创建实例 按钮,创建一个流处理器。


步骤 2:创建流处理连接

首先,我们需要设置 MongoDB 流处理 与 嵌入生成 API 之间的连接。

建立到 OpenAI 嵌入 API 的连接

运行以下 curl 命令,以创建 MongoDB Atlas 与 OpenAI 之间的安全 HTTPS 连接:

curl - user ":" - digest \

  • header "Content-Type: application/json" \

  • header "Accept: application/vnd.atlas.2023–02–01+json" \

  • include \

  • data '{ "name": "OpenAIEmbedConnection", "type": "Https", "url": "https://api.openai.com/v1/embeddings", "headers": { "Authorization": "Bearer YOUR_OPENAI_API_KEY" } }' \

  • request POST https://cloud.mongodb.com/api/atlas/v2/groups//streams//connections

建立与 MongoDB 集群的连接


步骤 2:连接到 MongoDB 流处理

使用 mongosh 连接到你的 MongoDB Atlas Stream Processing 实例:

mongosh "mongodb://atlas-stream-67acd00c1f602e74fd99b167-bofm7.westus.z.query.mongodb.net/" - tls - authenticationDatabase admin - username <db_username> - password <db_password>

步骤 3:定义流处理管道

现在,我们配置 MongoDB 流处理以:

• 监视产品集合的更新

• 触发嵌入 API

• 将生成的嵌入存储到 vectoredProduct

流处理配置

将以下 JSON 保存到 pipeline.json:

[

{

"$source": {

"connectionName": "atlas_connection",

"db": "retail",

"coll": "product",

"config": { "fullDocument": "whenAvailable" }

}

},

{

"$https": {

"connectionName": "OpenAIEmbedConnection",

"path": "",

"method": "POST",

"headers": { "Content-Type": "application/json" },

"payload": [

{

"$project": {

"model": { "$literal": "text-embedding-ada-002" },

"input": "$fullDocument.description"

}

}

],

"as": "embedding_response"

}

},

{

"$addFields": {

"embedding": {

"$getField": {

"field": "embedding",

"input": { "$arrayElemAt": [ "$embedding_response.data", 0 ] }

}

}

}

},

{

"$unset": "embedding_response"

},

{

"$merge": {

"into": {

"connectionName": "atlas_connection",

"db": "retail",

"coll": "vectoredProduct"

}

}

}

]

测试流处理配置

sp.process(pipeline)

步骤 5:测试集成

要测试端到端的工作流程,向产品集合插入一个新产品:

db.product.insertOne({

"name": "Eco-Friendly Running Shoes",

"description": "Lightweight running shoes made from sustainable materials.",

"category": "Footwear",

"price": 129.99

})

预期行为:

• MongoDB 变更流检测到新产品。

• 流处理管道触发嵌入请求。

• 嵌入存储到 vectoredProduct。

步骤 6:在 MongoDB Atlas 中查询向量搜索

现在,我们可以使用向量搜索来检索相似产品:

db.vectoredProduct.aggregate([

{

"$vectorSearch": {

"index": "product_embeddings",

"queryVector": [0.3412, -0.9281, 0.1123, …],

"path": "embedding",

"numCandidates": 5,

"limit": 3

}

}

])

这是什么功能:

• 使用向量相似性找到最相关的前三个产品。

• 针对电商、推荐系统和问答的实时 AI 进行了优化。

结论:释放流处理在 AI 等领域的全部潜力

AI 驱动的应用程序在实时数据处理方面表现出色。在超个性化推荐、智能搜索、欺诈检测和预测分析等领域取得成功,依赖于对数据的即时处理。

借助 MongoDB 流处理,企业不仅仅能实时生成嵌入,还能通过实时事件管道、AI 驱动的自动化和动态数据增强,实现更多可能性:

• 通过实时推荐提供超个性化的用户体验

• 通过持续更新的嵌入强化检索增强生成(RAG)

• 通过实时分析交易数据,提高欺诈检测和异常识别能力

• 通过响应实时数据更新,提高供应链和运营效率

• 通过持续更新的模型,促进预测分析和趋势预测

实时流处理不仅增强 AI 能力,还提升系统的速度、响应性和对业务需求的适应性。

你准备好释放实时 AI、分析和自动化的全部潜力了吗?立即开始使用 MongoDB 流处理,改变你处理、分析和利用数据的方式!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/883804.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

连续学习论文A Comprehensive Survey of Continual Learning:Theory, Method and Application阅读:

连续学习是指智能系统能够在其生命周期内逐步获取、更新、积累和利用知识的能力。主要挑战是灾难性遗忘,即学习新任务时旧任务性能急剧下降。CL 的目标包括平衡稳定性和可塑性、任务内与任务间的泛化能力以及资源效率。 2.2节对连续学习不同的种类进行了分类 包括不同任…

2025 氧化铝

跌势快要结束了

打靶记录29——dawn

靶机: https://www.vulnhub.com/entry/sunset-dawn,341/ 下载(镜像):https://download.vulnhub.com/sunset/dawn.zip 难度:中目标:获得 Root 权限攻击方法:主机发现 端口扫描 信息收集 SAMBA 漏洞 任意文件上传 日志信息泄露 调度任务 提权方法 1 提权方法 2 潜在提权方…

【APP逆向31】rpc小应用之DYM

1.目标:逆向登录接口中的sign参数2.入口3.hook校验3.1:通过hook与抓包对比,我们可以确定我们的猜测是正确的。Java.perform(function () {var Crypt = Java.use("com.yoloho.libcore.util.Crypt");Crypt.encrypt_data.implementation = function (j2,str,i3) {con…

第三章笔记

双精度浮点数类型用64位、单精度浮点数类型用32位来表示全体小数。 浮点数是指用符号、尾数、基数和指数这四部分来表示的小数。 符号部分是指使用一个数据位来表示数值的符号。该数据位是1时表示负,为0时则表示“正或者0”。 尾数部分用的是“将小数点前面的值固定为1的正则表…

分合之道:最小生成树的 Kruskal 与 Prim 算法

最小生成树问题 想象你是一位城市规划师,面前摊开一张地图,标记着散落的村庄。你的任务是用最经济的成本,在村庄间铺设道路,让所有村庄互通。这个问题看似简单,却隐藏着一个经典的数学命题:如何在一张“带权图”中,找到一棵总权重最小的树,连接所有节点? 数学定义 给定…

@Transactional中异步方法使用注意事项

如果使用了@Async异步方法上面添加了@Transactional,那这个事务是不会生效的 场景复现: @Transactional 基于MethodInterceptor实现的,所以在方法执行完毕之后才会提交事务上面代码前面操作位正常保存或者更新操作,代码最后调用了一个异步方法,这个异步方法为了避免主从延…

子查询和连表查询的比较

在需求中,要求对一个查询sql中根据主表的id展示该主表id关联的研发项目信息。一开始我使用了连表查询,但是存在一对多的关系,使用了group by进行分组。但是造成了数据分组后,原sql查询的数据量不对。故不能直接连表查询,而使用了子查询。 连表查询:在连表查询中,可能存…

【入门必看】人工智能就该这样学!一文盘点人工智能全栈工程师学习路径

随着人工智能技术的不断发展,人工智能应用场景越来越多,企业人才需求也越来越大。很多人都想进入AI这个高薪领域,包括理工科背景的学生、程序员、工程师、甚至是非科班跨领域的从业人员等等。但AI知识体系庞杂,网上资料零散,很多初学者不知道从哪儿下手,又担心自己学不会…

教会你如何使你的桌面炫酷起来

CTM工具是一款专为Windows用户设计的系统优化工具,它能够显著提升桌面美观度和系统性能。任务栏透明化:用户可以一键切换任务栏为全透明或毛玻璃效果,提升桌面美观度。资源监控:该功能与鲁大师的硬件监控功能相似,显示当前硬件使用状况,同时支持对字体大小、位置以及透明…

MySQL技术公开课:Mysql-Server-8.4.4 Innodb 集群搭建与维护

MySQL技术公开课 - Mysql-Server-8.4.4 Innodb 集群搭建与维护 讲课内容: 1、Innodb集群框架介绍 2、Innodb集群部署(mysql-Server、mysql-shell、mysql-router安装配置) 3、Innodb集群维护(主备切换、启动与关闭、故障排除) Mysql-server商业版目前最新的是8.4.4,增加了新功…