SpringBoot 2.x 接入非标准SSE格式大模型流式响应实践

news/2025/2/25 0:45:07/文章来源:https://www.cnblogs.com/Fzeng/p/18735256

近期DeepSeek等国产大模型热度持续攀升,其关注度甚至超过了OpenAI(被戏称为CloseAI)。在SpringBoot3.x环境中,可以使用官方的Spring AI轻松接入,但对于仍在使用JDK8SpringBoot2.7.3的企业级应用来说,往往需要自定义实现。特别是当大模型团队返回的数据格式不符合标准SSE规范时,更需要灵活处理。本文将分享我们的实战解决方案。


📦 引入Gradle依赖

核心依赖说明:

  • spring-boot-starter-web:基础Web支持
  • spring-boot-starter-webflux:响应式编程支持(WebClient所在模块)
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'org.springframework.boot:spring-boot-starter-webflux'

🌐 WebClient配置要点

初始化时特别注意Header配置:

@Bean
public WebClient init() {return WebClient.builder().baseUrl(baseUrl).defaultHeader(HttpHeaders.AUTHORIZATION, "Bearer " + openAi)// ⚠️ 必须设置为JSON格式.defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE).build();
}

🚨 关键踩坑点:初始设置MediaType.TEXT_EVENT_STREAM_VALUE会导致请求失败,必须使用APPLICATION_JSON_VALUE


🧠 核心处理逻辑

流式请求入口

@GetMapping(value = "/stream/chat", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> streamChatEnhanced(@RequestParam("prompt") String prompt) {// 请求体构建String requestBody = String.format("""{"model": "%s","messages": [{"role": "user", "content": "%s"}],"stream": true}""", model, prompt);return webClient.post()// 请求配置.uri("/v1/chat/completions").bodyValue(requestBody).accept(MediaType.TEXT_EVENT_STREAM).retrieve().bodyToFlux(DataBuffer.class)  // 🔑 关键配置点.transform(this::processStream)// 重试和超时配置.retryWhen(Retry.backoff(3, Duration.ofSeconds(1))).timeout(Duration.ofSeconds(180));// 错误处理.doOnError(e -> log.error("Stream error", e)).doFinally(signal -> log.info("Stream completed: {}", signal));
}

技术原理说明

当使用bodyToFlux(DataBuffer.class)时:

  • ✅ 获得原始字节流控制权
  • ❌ 避免自动SSE格式解析(适用于非标准响应)
  • 📡 动态数据流处理:类似Java Stream,但数据持续追加

🔧 非标准SSE数据处理

核心处理流程

private Flux<String> processStream(Flux<DataBuffer> dataBufferFlux) {return dataBufferFlux.transform(DataBufferUtils::join)          // 字节流合并.map(buffer -> {                          // 字节转字符串String content = buffer.toString(StandardCharsets.UTF_8);DataBufferUtils.release(buffer);return content;}).flatMap(content ->                       // 处理粘包问题Flux.fromArray(content.split("\\r?\\n\\r?\\n"))).filter(event -> !event.trim().isEmpty()) // 过滤空事件.map(event -> {                           // 格式标准化处理String trimmed = event.trim();if (trimmed.startsWith("data:")) {String substring = trimmed.substring(5);return substring.startsWith(" ") ? substring.substring(1) : substring;}return trimmed;}).filter(event -> !event.startsWith("data:")); // 二次过滤
}

三大关键技术点

  1. 粘包处理
    通过split("\\r?\\n\\r?\\n")解决网络传输中的消息边界问题,示例原始数据:

    data:{response1}\n\ndata:{response2}\n\n
    
  2. 格式兼容处理
    自动去除服务端可能返回的data:前缀,同时保留Spring自动添加SSE前缀的能力

  3. 双重过滤机制
    确保最终输出不包含任何残留的SSE格式标识


⚠️ 特别注意

当接口设置produces = MediaType.TEXT_EVENT_STREAM_VALUE时:

  • Spring WebFlux会自动添加data: 前缀

  • 前端收到的格式示例:

    data: {实际内容}
    
  • 若手动添加

    data: 
    

    前缀会导致重复:

    data: data: {错误内容}  // ❌ 错误格式
    

🛠️ 完整实现代码

// 包声明和导入...@Service
@Slf4j
public class OpenAiService {// 配置项和初始化private String openAiApiKey = "sk-xxxxxx";private String baseUrl = "https://openai.com/xxxx";private String model = "gpt-4o";private WebClient webClient;@PostConstructpublic void init() {webClient = WebClient.builder().baseUrl(baseUrl).defaultHeader(HttpHeaders.AUTHORIZATION, "Bearer " + openAiApiKey).defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE).build();}@GetMapping(value = "/stream/chat", produces = MediaType.TEXT_EVENT_STREAM_VALUE)public Flux<String> streamChatEnhanced(@RequestParam("prompt") String prompt) {// 构建请求体String requestBody = String.format("""{"model": "gpt-4o-mini","messages": [{"role": "user", "content": "%s"}],"stream": true}""", prompt);// 发送流式请求return webClient.post().uri("/v1/chat/completions").bodyValue(requestBody).retrieve().onStatus(HttpStatusCode::isError, response ->response.bodyToMono(String.class).flatMap(error -> Mono.error(new RuntimeException("API Error: " + error)))).bodyToFlux(DataBuffer.class).transform(this::processStream).retryWhen(Retry.backoff(3, Duration.ofSeconds(1))).timeout(Duration.ofSeconds(180)).doOnError(e -> log.error("Stream error", e)).doFinally(signal -> log.info("Stream completed: {}", signal));}private Flux<String> processStream(Flux<DataBuffer> dataBufferFlux) {return dataBufferFlux// 使用字节流处理.transform(DataBufferUtils::join).map(buffer -> {String content = buffer.toString(StandardCharsets.UTF_8);DataBufferUtils.release(buffer);return content;})// 按 SSE 事件边界,防止粘包的问题.flatMap(content -> Flux.fromArray(content.split("\\r?\\n\\r?\\n")))// 过滤空事件.filter(event -> !event.trim().isEmpty())// 规范 SSE 事件格式.map(event -> {String trimmed = event.trim();// 由于webflux设置了"produces = MediaType.TEXT_EVENT_STREAM_VALUE",// 所以在返回数据时会自动添加“data:”,因此如果返回的格式带了“data:”需要手动去除if (trimmed.startsWith("data:")) {trimmed = trimmed.replaceFirst("data:","").trim();}return trimmed;}).filter(event -> !event.startsWith("data:"));}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/889276.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

持续分享干货!清华出品《DeepSeek如何赋能职场》PDF可下载

🔥今天分享的是清华出品《DeepSeek如何赋能职场》的PPT,内容涵盖了DeepSeek运行模式的讲解,如何向DeepSeek提问,如何构建提示词,结合实际多个实际应用场景,详细的介绍了DeepSeek结合其他多模态AI模型持续赋能职场的方法。提示词框架如何使用DeepSeek制作可视化图表如何使…

「跟着渡一学前端」并发请求实现

学习资源 并发请求 【渡一教育】 完整代码 function concurRequest(urls, maxNum) {if (urls.length === 0) {return Promise.resolve([]);}return new Promise((resolve) => {let nextIndex = 0;let finishCount = 0;const result = [];async function _request() {if (nex…

软工作业1

作业相关信息这个作业属于哪个课程 软件工程 这个作业要求在哪里 自我介绍+软工5问 这个作业的目标 自我介绍,了解软件工程基本概念个人介绍 Im YiLaiL YiLaiL/YiLaiL is a ✨ special ✨ repository because its README.md (this file) appears on your GitHub profile.🔭 …

EmEdit设置缓存目录临时文件夹

前言全局说明一、说明 1.1 环境: Windows 11 家庭版 23H2 22631.3737 EmEditor Professional (64-bit) Version 17.2.4二、打开大文本控制器三、点击右边大文本控制器上 自定义四、选择比较大的磁盘空间作为缓存空间免责声明:本号所涉及内容仅供安全研究与教学使用,如出现其他…

谈谈 ES 6.8 到 7.10 的功能变迁(3)- 查询方法篇

上一篇咱们了解了 ES 7.10 相较于 ES 6.8 新增的字段类型,这一篇我们继续了解新增的查询方法。 Interval 间隔查询: 功能介绍 Interval 查询,词项间距查询,可以根据匹配词项的顺序、间距和接近度对文档进行排名。主要解决的查询场景“创建一个多搜索词匹配的查询,同时保留…

【蓝牙小程序】在微信小程序中使用 ECharts

echarts-for-weixin 项目提供了一个小程序组件,用这种方式可以方便地使用 ECharts。 使用方式下载该项目 如有必要,将 ec-canvas 目录下的 echarts.js 替换为最新版的 ECharts。如果希望减小包体积大小,可以使用自定义构建生成并替换 echarts.js pages 目录下是使用的示例文…

Virtual Box设置双网卡

一、硬件 1.为虚拟机添加网卡2.配置网卡二、软件 3.获取mac地址 命令:ip link [root@vbox network-scripts]# ip link 1: lo: <LOOPBACK,UP,LOWER_UP> mtu 65536 qdisc noqueue state UNKNOWN mode DEFAULT group default qlen 1000link/loopback 00:00:00:00:00:00 brd…

简单逆向Java程序

前置 来源 这个程序是我同学编写的一个学生分数管理系统,我将对这个已经编译的程序进行测试、逆向,找出其中的问题,并进行改进。 运行环境macOS 15.4 IntelliJ IDEA 2024.2.3 OpenJDK 23.0.2 TomCat 11.0.4 Safari 15.4运行结果主要问题 在使用了这个程序之后,我发现了以下…

信息论概述

1 信息与消息 1.1 信息 1.1.1 信息的定义 信息是信息论中最基本、最重要的概念 香农信息的定义:信息是事物运动状态或存在方式的不确定描述 1.1.2 (香农)信息的度量样本空间:对于我们需要描述的事物中,事物可能存在不同的状态,即事物展现出来的多种状态。那么为了便于形容事…

JUC并发—12.ThreadLocal源码分析

大纲 1.ThreadLocal的特点介绍 2.ThreadLocal的使用案例 3.ThreadLocal的内部结构 4.ThreadLocal的核心方法源码 5.ThreadLocalMap的核心方法源码 6.ThreadLocalMap的原理总结1.ThreadLocal的特点介绍 (1)ThreadLocal的注释说明 (2)ThreadLocal的常用方法 (3)ThreadLocal的使用…

前端Vue创建

一、创建Vue项目二、导入idea 复制景区 三、设置main.js点击查看代码 import Vue from vue import App from ./App.vue import ElementUI from element-ui; import element-ui/lib/theme-chalk/index.css; import ./assets/golbal.css; import axios from axios; // 正确的模块…

朋友说喊搞个简单的微信对接的封装搞外包,不要那么多的方法拿来就用的的那种,来看看Simple.Wechat吧

朋友说喊搞个简单的微信对接的封装搞外包,不要那么多的方法拿来就用的的那种,来看看Simple.Wechat吧😂不知道大家有没有和我朋友一样,很多时候做外包总免不了去对接微信,最简单的微信用户信息获取、微信支付、微信模板消息发送,要是不熟悉总是要去找这个那个的包,但是人…