SSE
项目中遇到竞价功能,用户停留在竞价页面,服务端需要实时把竞价信息推送给每一个客户
什么是SSE
SSE是一种服务器推送技术,允许服务器实时向客户端发送数据流。它是一种轻量级的单向通信机制,特别适合于实时性要求高的场景,如实时更新日志、实时新闻推送等。在Java中,SSE的实现依赖于Servlet 3.0及以上版本,通过使用Java的特定库和框架,可以轻松地向客户端推送实时数据。
业务上实现单向通信有以下两种:
特性 | Spring WebFlux + SSE | Spring MVC + SseEmitter |
---|---|---|
并发能力 | 高(单线程支持数千连接) | 低(受限于线程池大小) |
线程占用 | 无独占线程(EventLoop共享) | 每个推送占用线程池线程 |
代码复杂度 | 简单(内置广播机制) | 复杂(需手动管理连接和线程) |
适用场景 | 新项目/高并发需求 | 旧项目/低并发需求 |
Spring WebFlux + SSE 简单案例
1.接收器管理类
实际业务中,竞价不可能只是单一的一个物品竞价,可能同时存在多个物品竞价
ConcurrentMap<String, Sinks.Many
> dynamicSinks 一个物品竞价map就添加一条
业务结束(某一个竞价结束)调用closeSink
package com.dem.framework.config.sse;import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;/*** @program: dem-template-pc* @description: 接收器管理类* @author: lcb* @create: 2025-04-01**/
@Component
public class DynamicSinkManager {private final ConcurrentMap<String, Sinks.Many<String>> dynamicSinks = new ConcurrentHashMap<>();/*** 根据sinkKey获取Sink,如果不存在则创建一个* @param sinkKey Sink的唯一key* @author lcb* @date 2025/4/1* @return**/public Sinks.Many<String> getOrCreateSink(String sinkKey) {return dynamicSinks.computeIfAbsent(sinkKey,k -> Sinks.many().multicast().directBestEffort());}/*** 关闭指定Sink* @param sinkKey** @author lcb* @date 2025/4/1* @return***/public void closeSink(String sinkKey) {Sinks.Many<String> sink = dynamicSinks.remove(sinkKey);if (sink != null) {sink.tryEmitComplete();}}public Flux<String> getFlux(String sinkKey) {return getOrCreateSink(sinkKey).asFlux();}}
2.消息生产者
package com.dem.framework.config.sse;import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;import java.util.List;/*** @program: dem-template-pc* @description: sse 消息生产者* @author: lcb* @create: 2025-04-01**/
@Component
@Slf4j
public class MessageProducer {@Autowiredprivate DynamicSinkManager dynamicSinkManager;/*** 订阅 广播模式* @param sinkKey 订阅一个** @author lcb* @date 2025/4/1 15:26* @return**/public Flux<String> asFluxShare(String sinkKey) {return dynamicSinkManager.getOrCreateSink(sinkKey).asFlux().share();}//public void sendMessage(String sinkKey,String message) {Sinks.EmitResult result = dynamicSinkManager.getOrCreateSink(sinkKey).tryEmitNext(message);// 处理发送结果if (result.isFailure()) {// 根据业务需求处理失败情况handleEmitFailure(result, message);}}// 批量发送public void sendBatch(String sinkKey,List<String> messages) {messages.forEach(msg->sendMessage(sinkKey, msg));}// 关闭消息流public void shutdown(String sinkKey) {dynamicSinkManager.getOrCreateSink(sinkKey).tryEmitComplete();}private void handleEmitFailure(Sinks.EmitResult result, String message) {// 实现你的失败处理逻辑log.error("消息发送失败,原因: {}", result);}
}
3.后端测试入口
package com.dem.web.controller.business;import com.dem.common.annotation.Anonymous;
import com.dem.framework.config.sse.MessageProducer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;/*** @program: dem-template-pc* @description: 测试* @author: lcb* @create: 2025-03-28 10:21**/@RestController
@RequestMapping("/sse")
@Slf4j
public class SSEController {@Autowiredprivate MessageProducer messageProducer;/*** 与客户端建立链接** @param sinkKey* @return* @author lcb* @date 2025/4/1**/@GetMapping(value = "/linkEvents/{sinkKey}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)public Flux<String> linkEvents(@PathVariable String sinkKey) {return messageProducer.asFluxShare(sinkKey);}/*** 模拟添加数据** @param sinkKey* @param str* @return* @author lcb* @date 2025/4/1 15:50**/@GetMapping(value = "/addData/{sinkKey}/{str}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)public void addData(@PathVariable("sinkKey") String sinkKey, @PathVariable("str") String str) {/** 处理业务。。。* */log.info("数据:{}", str);messageProducer.sendMessage(sinkKey, str);}/*** 模拟竞价结束** @param sinkKey* @return* @author lcb* @date 2025/4/1 15:50**/@GetMapping(value = "/shutdown/{sinkKey}")public void guanb(@PathVariable String sinkKey) {/** 竞价结束* */messageProducer.shutdown(sinkKey);}
}
4.测试
-
前端代码
const eventSource = new EventSource('http://localhost:8080/dem-vue-template-api/sse/linkEvents/dem123'); eventSource.onmessage = function(event) {console.log('Received:', event.data);// 在这里处理接收到的数据,比如更新UI };
-
也可使用浏览器直接请求
其他参数
-
Sinks.many():
- 创建一个可以发射多个元素的 Sink (接收器)
- 与 Sinks.one()(单元素)和 Sinks.empty()(无元素)相对
-
.multicast():
- 指定这是一个多播 Sink
- 意味着多个订阅者可以订阅同一个源
- 与 .unicast()(单播)相对
-
.directBestEffort():
因为这个业务是 竞价,后来订阅的用户只需要获取最新的数据即可
如果后来订阅的用户需要获取之前的历史数据可以参考
.onBackpressureBuffer()
.replay()
direct
: 表示直接传递元素,不进行任何缓冲bestEffort
: 表示"尽力而为"的传递策略,如果下游跟不上,可能会丢弃元素