用 Higress AI 网关降低 AI 调用成本 - 阿里云天池云原生编程挑战赛参赛攻略

作者介绍:杨贝宁,爱丁堡大学博士在读,研究方向为向量数据库

《Higress  AI 网关挑战赛》正在火热进行中,Higress 社区邀请了目前位于排行榜 top5 的选手杨贝宁同学分享他的心得。下面是他整理的参赛攻略:

背景

我们要在 Higress 网关中编写 WebAssembly(wasm)插件,使得在 http 请求的各个阶段(requestHeader,requestBody,responseHeader,responseBody)能够将相应的请求或返回捕获进行业务逻辑的处理。具体到本比赛,主要需要实现的是缓存对大模型的请求(openai 接口的形式)在本地(或云数据库),并设计语义级别的缓存命中逻辑来实现降低响应请求且减少 token 费用的目的。

AI Cache 示例

以上图为例,本比赛主要的问题可以归纳为:(1)如何根据 Query 字符串生成合适的 Query 向量 ⇒ 向量生成器选型。(2)如何根据 Query 向量进行语义级别的查找,快速找到合适的缓存向量 ⇒ 缓存命中逻辑设计。(3)如何管理大量的缓存⇒向量数据库选型及重复初始化逻辑。

实际上 Redis 也具备 Vector Store 能力,这里的 Cache Store 和 Vector Store 是可以合并的。不过本 Demo 将二者分开了,Cache Store 使用 Redis,Vector Store 使用阿里云 DashVector 服务。

网关环境搭建

首先我们需要在线上搭建网关环境以供测试和评测使用,本文也提供了本地搭建网关环境的方法供读者参考。注意这里的线上搭建环境是以赛题介绍为基础展开的,若读者已经搭建好线上的开源网关或企业网关可跳过本节。

1.1 搭建线上企业版 Higress 环境**

赛题支持开源 Higress 和企业版 Higress 两种不同的配置,本文以企业版 Higress 为例进行展示。

1.1.1 申请免费试用并创建相应资源

1.1.2 创建服务 (通义千问的地址、Redis 服务、Dashscope 服务)

1.1.3 创建路由转发给通义千问

1.1.4 开启 Higress 插件市场中的 AI Proxy 插件

网关需要 AI Proxy 插件作为处理 AI 请求的支撑,我们可以采用插件市场中已有的 ai-proxy 插件。从源码编译的命令和上次如下所示。最后,配置环节需要提供大模型服务商的 api 和 token key 等,注意比赛需要使用通义千问的 qwen_long 模型。

1.1.5 编译上传 AI Cache 插件* (注意可能需要修改 “ai-cache” 名称防止和插件市场已有插件重复)

本比赛的核心在于自定义 AI Cache 插件以实现更鲁棒的缓存逻辑。首先还是以 Higress 源码提供的基础代码为例进行编译和上传,配置环节需要提供 redis 服务的名称。注意此步骤会在后续迭代代码中反复使用。

git clone https://github.com/alibaba/higress.git
cd higress/plugins/wasm-go
PLUGIN_NAME=ai-cache EXTRA_TAGS=proxy_wasm_version_0_2_100  make build

1.2 本地测试环境搭建和代码更新逻辑

由于线上环境的测试的成本较高,我们也可以采用 Higress + LobeChat 快速搭建私人 GPT 助理 [ 1] 的方式起两个 Docker 容器进行本地测试,参考 dockerfile 如下:

version: '3.9'networks:higress-net:external: falseservices:higress:image: registry.cn-hangzhou.aliyuncs.com/ztygw/aio-redis:1.4.1-rc.1environment:- GATEWAY_COMPONENT_LOG_LEVEL=misc:error,wasm:debug # 重要,开启日志- CONFIG_TEMPLATE=ai-proxy- DEFAULT_AI_SERVICE=qwen- DASHSCOPE_API_KEY= [YOUR_KEY]networks:- higress-netports:- "9080:8080/tcp"- "9001:8001/tcp"volumes:- 本地data目录:/data- 本地log目录:/var/log/higress/ # 重要,方便在容器restrat之后查看日志restart: alwayslobechat:image: lobehub/lobe-chatenvironment:- CODE=123456ed- OPENAI_API_KEY=unused- OPENAI_PROXY_URL=http://higress:8080/v1networks:- higress-netports:- "3210:3210/tcp"restart: always

主要更改了 Higress 的 image,environment 以及 volumes 的配置,启动和重启就是 docker compose up -d docker compose restart。

进一步地,我们需要了解本地代码编写的逻辑如何能反馈到测试环境中。和线上网关环境直接上传编译后的二进制 wasm 插件不同的是,这里需要采用的是:本地编写代码 ⇒ 本地编译 wasm 插件 ⇒ Docker 打包镜像并上传 ⇒ 修改本地测试环境配置中的镜像版本 ⇒ 开始测试并打印日志的流程。具体参考代码如下:

cd ${workspaceFolder}/higress/plugins/wasm-go
PLUGIN_NAME=ai-cache EXTRA_TAGS=proxy_wasm_version_0_2_100 make build 
// 修改版本号(version.txt)
export cur_version=$(cat ${workspaceFolder}/version.txt) && docker build -t [YOUR IMAGE_BASE_URL]:$cur_version -f Dockerfile . && docker push [YOUR_IMAGE_BASE_URL]:$cur_version
// 修改本地测试环境配置中的镜像版本
sudo bash -c \"sed -i 's|oci://registry.cn-hangzhou.aliyuncs.com/XXX:[0-9]*\\\\.[0-9]*\\\\.[0-9]*|oci://registry.cn-hangzhou.aliyuncs.com/XXX:$(cat version.txt)|g' data/wasmplugins/ai-cache-1.0.0.yaml\

文本向量请求逻辑及缓存命中逻辑编写

在本节中,我们将通过一个简单的示例来说明如何在网关中编写请求外部服务的缓存逻辑。

当查询到达时,与 Redis 中存储的键进行匹配(`redisSearchHandler`)。如果完全一致,则直接返回结果(`handleCacheHit`)。
如果不匹配,则请求 `text_embedding` 接口将查询转换为 `query_embedding`(`fetchAndProcessEmbeddings`)。
使用 `query_embedding` 与向量数据库中的向量进行 ANN 搜索,返回最接近的键,并通过阈值进行过滤(`performQueryAndRespond`)。
如果返回结果为空或距离大于阈值,则丢弃结果,本轮缓存未命中,最后将 `query_embedding` 存入向量数据库(`uploadQueryEmbedding`)。
如果距离小于阈值,则再次调用 Redis 对最相似的键进行匹配(`redisSearchHandler`)。
在响应阶段,请求 Redis 新增键值对,键为查询的问题,值为LLM 返回结果。

可以看到,除了 Redis 服务外,我们还需要请求文本向量化服务和向量数据库服务,这里我们分别选取向量生成器:阿里灵积通用文本向量接口 [ 2] 和向量数据库:阿里向量检索服务 DashVector [ 3] 作为服务商。

注意:由于 wasm 插件不支持协程等特性,调用外部服务需遵循:如何在插件中请求外部服务 [ 4]

2.1 外部服务声明和注册

为了实现思路 1-5 的连续外部服务调用。在 Higress 相关的配置上,我们首先需要声明外部服务:

DashVectorClient      wrapper.HttpClient `yaml:"-" json:"-"`
DashScopeClient       wrapper.HttpClient `yaml:"-" json:"-"`
redisClient    wrapper.RedisClient `yaml:"-" json:"-"`

并且在 ParseConfig 函数中注册外部服务:

c.DashVectorInfo.DashVectorClient = wrapper.NewClusterClient(wrapper.DnsCluster{ServiceName: c.DashVectorInfo.DashVectorServiceName,Port:        443,Domain:      c.DashVectorInfo.DashVectorAuthApiEnd,
})
c.DashVectorInfo.DashScopeClient = wrapper.NewClusterClient(wrapper.DnsCluster{ServiceName: c.DashVectorInfo.DashScopeServiceName,Port:        443,Domain:      "dashscope.aliyuncs.com",
})

这里的 ParseConfig 函数是在 http 请求的各个阶段回调函数(requestHeader,requestBody,responseHeader,responseBody)之前的注册函数。

2.2 AI Cache 配置文件

在增加了上述外部服务的基础上,对应的 AI Cache 的配置文件也需要进行修改,此处对应 1.1.5 节的配置。示例配置如下:

Dash:dashScopeKey: "YOUR_DASHSCOPE_KEY" # 这个是文本向量的keydashScopeServiceName: "qwen" # 重要,需要和scope对应的服务名匹配dashVectorCollection: "YOUR_CLUSTER_NAME"dashVectorEnd: "YOUR_VECTOR_END" dashVectorKey: "YOUR_DASHVECTOR_KEY" # 这个是DASHVECTOR的keydashVectorServiceName: "DashVector.dns" # 重要,需要新建一个vector对应的DNS服务 sessionID: "XXX" # 可用可不用,主要用于重复初始化逻辑
redis: # 重要serviceName: "redis.static"timeout: 2000

2.3 连续 callback 实现连续服务调用

基于上述思路,实现的核心代码如下。其中的核心难点仍在于如何实现服务间的连续调用问题,以 onHttpRequestBody 函数为例,代码需要实现并发逻辑,而不是简单的顺序逻辑。因此,主函数代码必须返回 types.Action,即声明是阻塞还是继续执行。当前逻辑要求在处理完缓存命中逻辑后才能继续操作,因此主函数需要返回 types.Pause。最后,根据我们的处理逻辑,在调用外部服务的回调函数中,根据是否命中缓存执行 proxywasm.ResumeHttpRequest() 或直接返回 proxywasm.SendHttpResponse()。

// ===================== 以下是主要逻辑 =====================
// 主handler函数,根据key从redis中获取value ,如果不命中,则首先调用文本向量化接口向量化query,然后调用向量搜索接口搜索最相似的出现过的key,最后再次调用redis获取结果
// 可以把所有handler单独提取为文件,这里为了方便读者复制就和主逻辑放在一个文件中了
// 
// 1. query 进来和 redis 中存的 key 匹配 (redisSearchHandler) ,若完全一致则直接返回 (handleCacheHit)
// 2. 否则请求 text_embdding 接口将 query 转换为 query_embedding (fetchAndProcessEmbeddings)
// 3. 用 query_embedding 和向量数据库中的向量做 ANN search,返回最接近的 key ,并用阈值过滤 (performQueryAndRespond)
// 4. 若返回结果为空或大于阈值,舍去,本轮 cache 未命中, 最后将 query_embedding 存入向量数据库 (uploadQueryEmbedding)
// 5. 若小于阈值,则再次调用 redis对 most similar key 做匹配。(redisSearchHandler)
// 7. 在 response 阶段请求 redis 新增key/LLM返回结果func redisSearchHandler(key string, ctx wrapper.HttpContext, config PluginConfig, log wrapper.Log, stream bool, ifUseEmbedding bool) error {err := config.redisClient.Get(config.CacheKeyPrefix+key, func(response resp.Value) {if err := response.Error(); err == nil && !response.IsNull() {log.Warnf("cache hit, key:%s", key)handleCacheHit(key, response, stream, ctx, config, log)} else {log.Warnf("cache miss, key:%s", key)if ifUseEmbedding {handleCacheMiss(key, err, response, ctx, config, log, key, stream)} else {proxywasm.ResumeHttpRequest()return}}})return err
}// 简单处理缓存命中的情况, 从redis中获取到value后,直接返回
func handleCacheHit(key string, response resp.Value, stream bool, ctx wrapper.HttpContext, config PluginConfig, log wrapper.Log) {log.Warnf("cache hit, key:%s", key)ctx.SetContext(CacheKeyContextKey, nil)if !stream {proxywasm.SendHttpResponse(200, [][2]string{{"content-type", "application/json; charset=utf-8"}}, []byte(fmt.Sprintf(config.ReturnResponseTemplate, response.String())), -1)} else {proxywasm.SendHttpResponse(200, [][2]string{{"content-type", "text/event-stream; charset=utf-8"}}, []byte(fmt.Sprintf(config.ReturnStreamResponseTemplate, response.String())), -1)}
}// 处理缓存未命中的情况,调用fetchAndProcessEmbeddings函数向量化query
func handleCacheMiss(key string, err error, response resp.Value, ctx wrapper.HttpContext, config PluginConfig, log wrapper.Log, queryString string, stream bool) {if err != nil {log.Warnf("redis get key:%s failed, err:%v", key, err)}if response.IsNull() {log.Warnf("cache miss, key:%s", key)}fetchAndProcessEmbeddings(key, ctx, config, log, queryString, stream)
}// 调用文本向量化接口向量化query, 向量化成功后调用processFetchedEmbeddings函数处理向量化结果
func fetchAndProcessEmbeddings(key string, ctx wrapper.HttpContext, config PluginConfig, log wrapper.Log, queryString string, stream bool) {Emb_url, Emb_requestBody, Emb_headers := ConstructTextEmbeddingParameters(&config, log, []string{queryString})config.DashVectorInfo.DashScopeClient.Post(Emb_url,Emb_headers,Emb_requestBody,func(statusCode int, responseHeaders http.Header, responseBody []byte) {// log.Infof("statusCode:%d, responseBody:%s", statusCode, string(responseBody))log.Infof("Successfully fetched embeddings for key: %s", key)if statusCode != 200 {log.Errorf("Failed to fetch embeddings, statusCode: %d, responseBody: %s", statusCode, string(responseBody))ctx.SetContext(QueryEmbeddingKey, nil)proxywasm.ResumeHttpRequest()} else {processFetchedEmbeddings(key, responseBody, ctx, config, log, stream)}},10000)
}// 先将向量化的结果存入上下文ctx变量,其次发起向量搜索请求
func processFetchedEmbeddings(key string, responseBody []byte, ctx wrapper.HttpContext, config PluginConfig, log wrapper.Log, stream bool) {text_embedding_raw, _ := ParseTextEmbedding(responseBody)text_embedding := text_embedding_raw.Output.Embeddings[0].Embedding// ctx.SetContext(CacheKeyContextKey, text_embedding)ctx.SetContext(QueryEmbeddingKey, text_embedding)ctx.SetContext(CacheKeyContextKey, key)performQueryAndRespond(key, text_embedding, ctx, config, log, stream)
}// 调用向量搜索接口搜索最相似的key,搜索成功后调用redisSearchHandler函数获取最相似的key的结果
func performQueryAndRespond(key string, text_embedding []float64, ctx wrapper.HttpContext, config PluginConfig, log wrapper.Log, stream bool) {vector_url, vector_request, vector_headers, err := ConstructEmbeddingQueryParameters(config, text_embedding)if err != nil {log.Errorf("Failed to perform query, err: %v", err)proxywasm.ResumeHttpRequest()return}config.DashVectorInfo.DashVectorClient.Post(vector_url,vector_headers,vector_request,func(statusCode int, responseHeaders http.Header, responseBody []byte) {log.Infof("statusCode:%d, responseBody:%s", statusCode, string(responseBody))query_resp, err_query := ParseQueryResponse(responseBody)if err_query != nil {log.Errorf("Failed to parse response: %v", err)proxywasm.ResumeHttpRequest()return}if len(query_resp.Output) < 1 {log.Warnf("query response is empty")uploadQueryEmbedding(ctx, config, log, key, text_embedding)return}most_similar_key := query_resp.Output[0].Fields["query"].(string)log.Infof("most similar key:%s", most_similar_key)most_similar_score := query_resp.Output[0].Scoreif most_similar_score < 0.1 {ctx.SetContext(CacheKeyContextKey, nil)redisSearchHandler(most_similar_key, ctx, config, log, stream, false)} else {log.Infof("the most similar key's score is too high, key:%s, score:%f", most_similar_key, most_similar_score)uploadQueryEmbedding(ctx, config, log, key, text_embedding)proxywasm.ResumeHttpRequest()return}},100000)
}// 未命中cache,则将新的query embedding和对应的key存入向量数据库
func uploadQueryEmbedding(ctx wrapper.HttpContext, config PluginConfig, log wrapper.Log, key string, text_embedding []float64) error {vector_url, vector_body, err := ConsturctEmbeddingInsertParameters(&config, log, text_embedding, key)if err != nil {log.Errorf("Failed to construct embedding insert parameters: %v", err)proxywasm.ResumeHttpRequest()return nil}err = config.DashVectorInfo.DashVectorClient.Post(vector_url,[][2]string{{"Content-Type", "application/json"},{"dashvector-auth-token", config.DashVectorInfo.DashVectorKey},},vector_body,func(statusCode int, responseHeaders http.Header, responseBody []byte) {if statusCode != 200 {log.Errorf("Failed to upload query embedding: %s", responseBody)} else {log.Infof("Successfully uploaded query embedding for key: %s", key)}proxywasm.ResumeHttpRequest()},10000,)if err != nil {log.Errorf("Failed to upload query embedding: %v", err)proxywasm.ResumeHttpRequest()return nil}return nil
}// ===================== 以上是主要逻辑 =====================

此外,该逻辑只能在返回值为 types.Action 的函数中使用,例如 onHttpResponseBody 这样的流式处理函数无法以类似方式处理。尽管可以确保请求被发送出去,但由于没有阻塞操作,无法调用回调函数。如果有需要,可以参考 wasm-go/pkg/wrapper/http_wrapper.go,添加信号变量进行修改。

总结

本文的完整代码已发布在 GitHub [ 5] 。本文提供的思路仅为抛砖引玉,如何在实际场景中解决复杂的缓存需求仍需各位的智慧。祝大家在比赛中取得理想的成绩!

相关链接:

[1] Higress + LobeChat 快速搭建私人 GPT 助理

https://github.com/alibaba/higress/issues/1023

[2] 向量生成器:阿里灵积通用文本向量接口

https://help.aliyun.com/zh/dashscope/developer-reference/text-embedding-quick-start

[3] 向量数据库:阿里向量检索服务 DashVector

https://www.aliyun.com/product/ai/dashvector

[4] 如何在插件中请求外部服务

https://higress.io/docs/latest/user/wasm-go/

[5] GitHub

https://github.com/Suchun-sv/ai-cache-Demo

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/788458.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

科研项目管理工具选型全攻略

国内外主流的 10 款科研院所项目管理系统对比:PingCode、Worktile、云效、Tower 、Zoho Projects、Notion、Wrike、ClickUp、Asana、Teambition。在科研院所的日常运营中,项目管理系统的选择显得尤为重要。选择不当可能导致资源浪费、进度延误甚至项目失败,这是每个科研团队…

ensp使用交换机配置svi连通网段

ensp使用交换机配置svi连通网段 实验目的 如下图所示,PC1、PC2、PC3分别位于不同网段,使用S5700型号交换机连接,目前需要配置交换机和主机,主机能够互相连通。常用命令un in en:关闭信息通知 dis ip int b:显示端口ip配置情况(brief模式) dis ip routing-table:显示路…

B 端产品未来几年的发展趋势

未来几年,B 端产品领域将面临着诸多挑战和机遇。人工智能与机器学习的深度融合、云计算与容器化技术的持续发展、用户体验与设计的重要性日益凸显、数据安全与隐私保护的挑战与机遇、行业垂直化与专业化发展以及敏捷开发与持续交付的普及等趋势,将对 B 端产品经理提出更高的要…

postgresql下Schema和DataBase

database —> schema —> table 1.同一个实例下,不同database是不能相互访问的,即独立的。 2.同一个数据库,不同模式下的表是可以相互访问,即可共享的 3.不同模式下,表名可以是一样。也就是表在模式下是独立。 ##授权某个库下的某个模式下有创建表的权限grant creat…

使用 nuxi add 快速创建 Nuxt 应用组件

title: 使用 nuxi add 快速创建 Nuxt 应用组件 date: 2024/8/28 updated: 2024/8/28 author: cmdragon excerpt: 通过使用 nuxi add 命令,你可以快速创建 Nuxt 应用中的各种实体,如组件、页面、布局等。这可以极大地提高开发效率,减少手动创建文件的工作量。希望本文的示例…

Apache RocketMQ 批处理模型演进之路

RocketMQ 的目标,是致力于打造一个消息、事件、流一体的超融合处理平台。这意味着它需要满足各个场景下各式各样的要求,而批量处理则是流计算领域对于极致吞吐量要求的经典解法,这当然也意味着 RocketMQ 也有一套属于自己风格的批处理模型。作者:谷乂 RocketMQ 的目标,是致…

nginx: 两个解析日志的脚本

一,解析日志得到访问量最高的100个ip地址:awk {print $1} www.access_log | sort | uniq -c | sort -n -k 1 -r | head -n 100 效果如图:二,解析日志得到访问量最高的10个url 命令 [root@blog 27]# awk {print $7} 20240827_access.log|sort|uniq -c|sort -rn|head -10 返回例…

安全:关闭nginx/php的对外版本显示

一,关闭nginx的版本显示: 1,关闭前2,关闭nginx版本显示: 编辑nginx.conf [root@blog conf]# vi nginx.conf 增加一行: server_tokens off; 重新服务: [root@blog conf]# systemctl reload nginx.service 3,再次查看:二,关闭php的版本显示 1,关闭前2,关闭 编辑php.ini [roo…

gstreamer教程(5)——构建应用之element的使用

Element 元素:对于应用程序程序员来说, GstElement 对象是GStreamer 中最重要的对象。element (元素)是媒体Pipeline的基本构建块。您使用的所有不同的高级组件都派生自 GstElement。每个解码器、编码器、解复用器、视频或音频输出事实上都是一个 GstElement。 什么是元素:…

【VMware VCF】VCF 5.2:挂载远程 vSAN 数据存储。

VMware vSAN 解决方案中,为了充分利用 vSAN HCI 集群内的存储资源, vSAN HCI 和 vSAN HCI 集群之间可以相互共享存储资源,这种解决方案早期叫 vSAN HCI Mesh,现在被称为具有数据存储共享的 vSAN HCI(vSAN HCI with datastore sharing)。VMware vSAN 集群根据主机磁盘的组…

适用于多语言的VScode配置教程:同一文件夹内支持C++, JAVA, Python

前言 VScode作为一款强大的文本编辑器,只要配置恰当,便可以同时在一个环境内编译多种语言的文件。本文简要给出一种同时支持C++, Python, Java的配置方式(windows平台)。 配置格式 1.创建工作区并建立如图的文件夹及文件结构其中包括vscode的配置文件夹.vscode, 以及其他三…