基于 Lambda 实现 Claude3 的流式响应

在如今的大语言模型推理输出场景中,流式响应基本已成为必备的功能之一。一方面符合大语言模型生成方式的本质,另一方面当模型推理效率不是很高时,流式响应比起全部 generate 后再输出、能大幅缩短从开始请求到输出第一个 Token 的时间,极大地提高用户体验。

从端到端的视角,可以将大语言模型的流式响应分为两个部分:模型本身的流式推理、后端程序将模型的流式响应输出到客户端,如下图所示。本文将以 Claude3 为例,阐述如何端到端的实现大语言模型的流式响应。


一、流式推理

当今的主流大语言模型,大都已支撑流式推理。我们以 Python 代码来实现 Claude3 的流式推理为例,给出如下示例。Claude3 采用了 Message API,支持多模态的数据输入。推理返回结果的内容结构,相比于此前 Claude2 的 Completions API 也发生了一些变化。

import json
import sys
import boto3client = boto3.client("bedrock-runtime", region_name="us-east-1")# 使用文本提示调用Claude 3
model_id = "anthropic.claude-3-sonnet-20240229-v1:0"
prompt = "你好"body = json.dumps({"anthropic_version": "bedrock-2023-05-31","max_tokens": 1024,"messages": [{"role": "user","content": [{"type": "text", "text": prompt}],}],
})response = client.invoke_model_with_response_stream(body=body,modelId=model_id,accept="application/json",contentType="application/json",
)stream = response["body"]
if stream:for event in stream:chunk = event.get("chunk")if chunk:chunk_obj = json.loads(chunk.get("bytes").decode())if 'delta' in chunk_obj.keys() and 'type' in chunk_obj.keys():if chunk_obj['type'] == 'content_block_delta':chunk_obj = chunk_obj['delta']sys.stdout.write(chunk_obj["text"])sys.stdout.flush()  # 正式使用时,这里可以用yield返回

输出效果如下所示:

二、流式响应的技术方案介绍

目前流式响应输出到客户端的技术,主要有长轮询、 WebSocket 与 SSE 。

长轮询

浏览器发出 XMLHttpRequest 请求,服务器端接收到请求后,会阻塞请求直到有数据或者超时才返回,浏览器 JS 在处理请求返回信息(超时或有效数据)后再次发出请求,重新建立连接。 这种方式用于流式响应,一方面增加了不必要的请求-响应的往返时间,还给客户端与服务端带来了额外的负载压力与资源浪费。

WebSocket

WebSocket 是 HTML5 定义的新协议,实现了服务器与客户端之间的全双工通信。WebSocket 连接一旦建立,客户端和服务器端处于平等地位,可以相互发送数据,不存在请求和响应的区别,其原理如下图所示。

WebSocket 通常应用于实时聊天、多人在线游戏等场景。在 AWS 上可以使用 API Gateway 与 Lambda 来实现 Websocket 的服务端。但在 Claude3 的流式响应场景中,采用 Websocket 方案显得有些大材小用,我们希望能有更轻量级的实现方式。

HTTP SSE

HTTP SSE 的全称是 HTTP Server-Sent Events,它提供了一种从服务器实时发送更新事件到客户端的技术。SSE 主要解决了客户端与服务器之间的单向实时通信需求(例如 Claude3 回答的流式响应),相较于 WebSocket(双向实时),它更加轻量级且易于实现。SSE 是基于 HTTP 协议实现的,所以更适用于服务器持续的向客户端发送文本。其原理示意如下图所示。

本文将主要使用 HTTP-SSE 技术来展开介绍,如何将 Claude3 的回答流式推向客户端。

三、使用 Python Flask 搭建 SSE Demo

一提到 SSE,我们首先考虑了使用 Python Flask 框架来实现一个原型(Demo)。Flask 是一个轻量级的 Web 应用框架,它提供了简单易用的工具和技术来构建 Web 服务器。特别是,我们利用了 Flask 的 stream_with_context 功能来实现服务器发送事件(SSE),这使得服务器能够以流的形式实时推送数据到客户端。

将 Claude3 的流式推理结果,通过 SSE 技术推送到客户端的代码如下:

#pip install Flask boto3 CORS
from flask import Flask, Response, stream_with_context, request
from flask_cors import CORS  # 导入CORS模块
import time
import json
import sys
import boto3app = Flask(__name__)
CORS(app)  # 为app添加跨域支持def generate_stream(prompt):client = boto3.client("bedrock-runtime", region_name="us-east-1")# 调用 Claude 3 并提供文本提示model_id = "anthropic.claude-3-sonnet-20240229-v1:0"body = json.dumps({"anthropic_version": "bedrock-2023-05-31","max_tokens": 1024,"messages": [{"role": "user","content": [{"type": "text", "text": prompt}],}],})response = client.invoke_model_with_response_stream(body=body,modelId=model_id,accept="application/json",contentType="application/json",)stream = response["body"]if stream:for event in stream:chunk = event.get("chunk")if chunk:chunk_obj = json.loads(chunk.get("bytes").decode())if 'delta' in chunk_obj.keys() and 'type' in chunk_obj.keys():if chunk_obj['type'] == 'content_block_delta':chunk_obj = chunk_obj['delta']yield f"data: {json.dumps(chunk_obj['text'], ensure_ascii=False)}\n\n"  # 修改为适合HTTP SSE的格式@app.route('/stream')
def stream():prompt = request.args.get('prompt', '你好')  # 从HTTP请求中提取prompt变量的值,默认值为"你好"response = Response(stream_with_context(generate_stream(prompt)), content_type='text/event-stream')response.headers['Access-Control-Allow-Origin'] = '*'  # 允许所有域名跨域访问return responseif __name__ == '__main__':app.run(debug=True, port=9000, host='0.0.0.0')

web 端代码如下:

<!DOCTYPE html>
<html lang="zh">
<head><meta charset="UTF-8"><title>Stream Demo</title>
</head>
<body><h2>输入提示词:</h2><input type="text" id="promptInput" placeholder="请输入提示词"><button onclick="startStream()">开始流式响应</button><div id="output" style="margin-top: 20px;"></div><script>function startStream() {const prompt = document.getElementById('promptInput').value;const outputDiv = document.getElementById('output');outputDiv.innerHTML = ''; // 清空之前的输出// 创建一个新的EventSource实例,连接到服务器端的/stream端点const eventSource = new EventSource(`http://xx.xx.xx.xx:9000/stream?prompt=${encodeURIComponent(prompt)}`);eventSource.onmessage = function(event) {// 当接收到新的数据时,将其添加到页面上console.log('Received data:', event.data);// 使用 decodeURIComponent 和 escape 函数转换 Unicode 编码的字符串var decodedMessage = decodeURIComponent(event.data).replace(/^"|"$/g, '');outputDiv.innerHTML += decodedMessage; };eventSource.onerror = function() {// 如果发生错误,关闭连接eventSource.close();outputDiv.innerHTML += '<p>流已结束</p>';};}</script>
</body>
</html>

效果如下所示:

尽管使用 Flask API 来实现这个 Demo 相对简单直接,但在生产环境中,直接使用 Flask API 作为 API 服务器存在一些劣势,特别是当考虑到可扩展性、管理和成本效率时。例如,Flask 应用通常需要部署在一台或多台服务器上,这意味着你需要管理这些服务器的维护、监控和扩展。此外,对于流量波动较大的应用,服务器可能会在低峰时期闲置,造成资源浪费,或在高峰时期过载,影响服务质量。

为了克服这些劣势,我们转向了 AWS Lambda 来实现 streaming response。

四、基于 AWS  Lambda  来实现  SSE Demo

AWS Lambda 是一个无服务器计算服务,它允许你运行代码而无需预置或管理服务器。Lambda 只在代码执行时才收费,这使得它在成本效率上对于不定时或间歇性的工作负载非常有吸引力。通过 Lambda,我们可以实现一个更加弹性的架构,自动扩展以应对请求量的变化,同时减少了管理服务器的负担。

在客户端你可以基于 SSE 协议,调用 Lambda 函数 URL 来实时获取响应结果。

创建 Lambda 函数

AWS Lambda 默认支持使用 Node.js Runtime 支持流式响应。对于其他语言,你可以使用使用带有自定义 Runtime 方式来实现,或者使用 Lambda Web 适配器。

然后将如下代码粘贴到 Lambda 函数中:

import util from 'util';
import stream from 'stream';
import {BedrockRuntimeClient,InvokeModelWithResponseStreamCommand,
} from "@aws-sdk/client-bedrock-runtime";import querystring from 'querystring';const finished = util.promisify(stream.finished); // Create a new Bedrock Runtime client instance.
const client = new BedrockRuntimeClient({ region: "us-east-1" });
const modelId = "anthropic.claude-3-sonnet-20240229-v1:0";
let prompt = "您好";export const handler = awslambda.streamifyResponse(async (event, responseStream, _context) => {console.log(event);responseStream.setContentType("text/event-stream");// 假设event.rawQueryString存在并包含查询字符串const rawQueryString = event.rawQueryString;// 使用querystring模块解析查询字符串const params = querystring.parse(rawQueryString);prompt=params['prompt'];// Prepare the payload for the model.const payload = {anthropic_version: "bedrock-2023-05-31",max_tokens: 1000,messages: [{role: "user",content: [{ type: "text", text: prompt }],},],};try {// 使用payload调用Claude并等待API响应const command = new InvokeModelWithResponseStreamCommand({contentType: "application/json",body: JSON.stringify(payload),modelId,});const apiResponse = await client.send(command);// 解码并处理响应流for await (const item of apiResponse.body) {const chunk = JSON.parse(new TextDecoder().decode(item.chunk.bytes));const chunk_type = chunk.type;if (chunk_type === "content_block_delta") {const text = chunk.delta.text;responseStream.write(`data: ${JSON.stringify(text)}\n\n`);}}} catch (error) {console.error("处理API响应时发生错误:", error);responseStream.write(`data: ${JSON.stringify({ error: "处理请求时发生错误" })}\n\n`);}responseStream.end();});
配置权限

在创建完 Lambda 后,会生成一个具备基础权限的默认角色。需要在该角色中增加 Bedrock 的调用权限。点击下图中的角色名称,打开 IAM 服务中的角色管理页面。

在该 IAM 角色中添加一个内联权限,权限内容如下所示:

配置 Lambda 响应超时时间

在流式响应场景中,Lambda 的整体响应时间范围可能从几秒中到几分钟,具体取决于输入/输出 Token 的大小。所以需要提前修改 Lambda 默认的超时时间。

设置 Lambda 函数 URL

在函数 URL 配置页面,建议 Auth Type 选择 NONE。因为如果选择 AWS_IAM 方式,则需要在客户端存储 AWS 身份认证信息(如 AK/SK),有较大的安全隐患。所以,在面向最终用户的场景中,建议 Auth Type 设置为 NONE,然后在 Lambda 的 Header 中自行实现一些认证方式(如 API_KEY)。

此前,Invoke mode 需要选择 RESPONSE_STREAM,并配置 CORS,允许跨域访问。

Web 端示例代码:
<!DOCTYPE html>
<html lang="zh">
<head><meta charset="UTF-8"><title>Stream Demo</title>
</head>
<body><h2>输入提示词:</h2><input type="text" id="promptInput" placeholder="请输入提示词"><button onclick="startStream()">开始流式响应(Lambda)</button><div id="output" style="margin-top: 20px;"></div><script>function startStream() {const prompt = document.getElementById('promptInput').value;const outputDiv = document.getElementById('output');outputDiv.innerHTML = ''; // 清空之前的输出// 创建一个新的EventSource实例,连接到服务器端的/stream端点const eventSource = new EventSource(`http://xx.xx.xx.xx:9000/stream?prompt=${encodeURIComponent(prompt)}`);eventSource.onmessage = function(event) {// 当接收到新的数据时,将其添加到页面上console.log('Received data:', event.data);// 使用 decodeURIComponent 和 escape 函数转换 Unicode 编码的字符串var decodedMessage = decodeURIComponent(event.data).replace(/^"|"$/g, '');outputDiv.innerHTML += decodedMessage; };eventSource.onerror = function() {// 如果发生错误,关闭连接eventSource.close();outputDiv.innerHTML += '<p>流已结束</p>';};}</script>
</body>
</html>

效果如下:

Lambda 流式响应的限制
  • 单次调用的流式响应有默认为 20MB 的大小限制,但可以向后台申请提升限制。
  • 单次调用的流式响应的前 6MB 拥有不受限制的带宽。之后将以最大 2MBps 的速率进行流式响应。这在大语言推理场景,应该是完全足够了。
  • 通过 API Gateway 的 LAMBDA_PROXY 集成不支持流式响应。你可以在 API Gateway 和 Lambda 函数 URL 之间使用 HTTP_PROXY 集成,但你将受到 API Gateway 的 10MB 响应负载限制。此外,API Gateway 不支持分块传输编码,因此将无法实现流式响应的预期效果。

五、总结&综述

本文从端到端的视角,介绍了 Claude3 的流式推理,以及服务端流式响应的技术选型。通过比较分析,建议基于 Http-SSE 这种轻量级方式,来实现流式响应。最后,以 Claude3 为例,基于 Http-SSE 技术,分别介绍了使用 Python Flask、 AWS 云原生服务 Lambda 实现流式响应的实践。

本篇作者

张盼富

AWS 解决方案架构师,从业十三年,先后经过历云计算、供应链金融、电商等多个行业,担任过高级开发、架构师、产品经理、开发总监等多种角色,有丰富的大数据应用与数据治理经验。加入亚马逊云科技后,致力于通过大数据+AI 技术,帮助企业加速数字化转型。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/615471.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【学习】软件信创测试中,如何做好兼容性适配

在软件信创测试的领域中&#xff0c;兼容性适配是至关重要的一环。如何确保软件在不同的操作系统、硬件设备和软件环境中稳定运行&#xff0c;是每个测试人员需要面对的挑战。本文将从几个方面探讨如何做好兼容性适配&#xff0c;以提高软件的稳定性和用户体验。 首先&#xf…

Excel中输入数字会改变怎么办?

一、数字显示不全&#xff0c;以“#”号代替 随着列宽的缩小&#xff0c;数字逐渐被“#”号代替&#xff08;首先数字的格式是“数值型&#xff0c;且只有整数”&#xff09; 原因分析&#xff1a;单元格中的数字无法完全显示&#xff0c;Excel会自动用“#”号填充剩余的空间 解…

EPSON L4160 Series打印机驱动安装

EPSON L4160 Series 官方网站下载 win64驱动 accept后自动下载。 安装 添加 网络打印机可以自动搜索并识别。 win11 设置里 -这里改名字 -比如我是192.168.50.115

计算机视觉——图像特征提取D2D先描述后检测特征提取算法原理

概述 局部特征提取是计算机视觉中的一个重要任务&#xff0c;它旨在从图像中提取出能够代表图像局部结构和外观信息的特征。这些特征通常用于图像匹配、物体识别、三维重建、跟踪和许多其他应用。传统方法&#xff0c;如尺度不变特征变换&#xff08;SIFT&#xff09;&#xf…

Docker容器嵌入式开发:Docker Ubuntu18.04配置mysql数据库

在 Ubuntu 18.04 操作系统中安装 MySQL 数据库的过程。下面是安装过程的详细描述&#xff1a; 首先&#xff0c;使用以下命令安装 MySQL 服务器&#xff1a; sudo apt install mysql-server系统会提示是否继续安装&#xff0c;按下 Y 键确认。 安装过程中&#xff0c;系统会…

【前缀合】Leetcode 连续数组

题目解析 525. 连续数组 寻找一个子数组&#xff0c;这个子数组中包含相同数目的0和1&#xff0c;但是这个子数组需要最长的 算法讲解 只需在[0,i]寻找一段区间使得这一段区间的和也等于sum即可 细节问题&#xff1a;1. 这里的哈希表的value存的是下标&#xff0c;因为需要找…

大米自动化生产线设备:现代粮食加工的核心力量

随着科技的不断进步和粮食加工行业的快速发展&#xff0c;大米自动化生产线设备在现代粮食加工中的地位愈发重要。这些设备不仅大大提高了生产效率&#xff0c;还保证了产品的质量和安全&#xff0c;成为了现代粮食加工行业不可或缺的核心力量。 一、自动化生产线设备助力效率提…

大数据实训进行时:数据标注项目

数据标注项目 培训目的 让同学们先熟悉理论知识&#xff0c;如&#xff1a;识别障碍物是否满足拉框的要求&#xff0c;如何进行拉框&#xff1b;熟悉标注操作&#xff0c;培养出能够进入正式项目的人员 培训地点 理论&#xff1a;学术报告厅、阶梯教室 实操&#xff1a;1实…

字符串函数的模拟实现(除strlen外,之前写过一篇专门的strlen)

文章目录 概要strcpy的模拟实现strcmp的模拟实现strcat的模拟实现小结 概要 字符串函数需要包含头文件<string.h> 为了更加了解字符串函数的原理&#xff0c;这里我们实现一些字符串函数&#xff0c;例如strcpy,strcmp,strcat; 要模拟实现字符串函数的功能&#xff0c;首…

QT drawPixmap和drawImage处理图片模糊问题

drawPixmap和drawImage显示图片时&#xff0c;如果图片存在缩放时&#xff0c;会出现模糊现象&#xff0c;例如将一个100x100 的图片显示到30x30的区域&#xff0c;这个时候就会出现模糊。如下&#xff1a; 实际图片&#xff1a; 这个问题就是大图显示成小图造成的像素失真。 当…

机器学习—数据集(二)

1可用数据集 公司内部 eg:百度 数据接口 花钱 数据集 学习阶段可用的数据集&#xff1a; sklearn:数据量小&#xff0c;方便学习kaggle&#xff1a;80万科学数据&#xff0c;真实数据&#xff0c;数据量大UCI&#xff1a;收录了360个数据集&#xff0c;覆盖科学、生活、经济等…

基于springboot+vue的汽车租赁管理系统

背景介绍: 网络发展的越来越迅速&#xff0c;它深刻的影响着每一个人生活的各个方面。每一种新型事务的兴起都是为了使人们的生活更加方便。汽车租赁管理系统是一种低成本、更加高效的电子商务方式&#xff0c;它已慢慢的成为一种全新的管理模式。人们不再满足于在互联网上浏览…