SSE

news/2025/4/3 0:38:56/文章来源:https://www.cnblogs.com/lichangben/p/18804351

SSE

项目中遇到竞价功能,用户停留在竞价页面,服务端需要实时把竞价信息推送给每一个客户

什么是SSE

SSE是一种服务器推送技术,允许服务器实时向客户端发送数据流。它是一种轻量级的单向通信机制,特别适合于实时性要求高的场景,如实时更新日志、实时新闻推送等。在Java中,SSE的实现依赖于Servlet 3.0及以上版本,通过使用Java的特定库和框架,可以轻松地向客户端推送实时数据。

业务上实现单向通信有以下两种:

特性 Spring WebFlux + SSE Spring MVC + SseEmitter
并发能力 高(单线程支持数千连接) 低(受限于线程池大小)
线程占用 无独占线程(EventLoop共享) 每个推送占用线程池线程
代码复杂度 简单(内置广播机制) 复杂(需手动管理连接和线程)
适用场景 新项目/高并发需求 旧项目/低并发需求

Spring WebFlux + SSE 简单案例

1.接收器管理类

实际业务中,竞价不可能只是单一的一个物品竞价,可能同时存在多个物品竞价

ConcurrentMap<String, Sinks.Many> dynamicSinks

一个物品竞价map就添加一条

业务结束(某一个竞价结束)调用closeSink

package com.dem.framework.config.sse;import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;/*** @program: dem-template-pc* @description: 接收器管理类* @author: lcb* @create: 2025-04-01**/
@Component
public class DynamicSinkManager {private final ConcurrentMap<String, Sinks.Many<String>> dynamicSinks = new ConcurrentHashMap<>();/*** 根据sinkKey获取Sink,如果不存在则创建一个* @param sinkKey Sink的唯一key* @author lcb* @date 2025/4/1* @return**/public Sinks.Many<String> getOrCreateSink(String sinkKey) {return dynamicSinks.computeIfAbsent(sinkKey,k -> Sinks.many().multicast().directBestEffort());}/*** 关闭指定Sink* @param sinkKey** @author lcb* @date 2025/4/1* @return***/public void closeSink(String sinkKey) {Sinks.Many<String> sink = dynamicSinks.remove(sinkKey);if (sink != null) {sink.tryEmitComplete();}}public Flux<String> getFlux(String sinkKey) {return getOrCreateSink(sinkKey).asFlux();}}

2.消息生产者

package com.dem.framework.config.sse;import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;import java.util.List;/*** @program: dem-template-pc* @description: sse 消息生产者* @author: lcb* @create: 2025-04-01**/
@Component
@Slf4j
public class MessageProducer {@Autowiredprivate DynamicSinkManager dynamicSinkManager;/*** 订阅 广播模式* @param sinkKey 订阅一个** @author lcb* @date 2025/4/1 15:26* @return**/public Flux<String> asFluxShare(String sinkKey) {return dynamicSinkManager.getOrCreateSink(sinkKey).asFlux().share();}//public void sendMessage(String sinkKey,String message) {Sinks.EmitResult result = dynamicSinkManager.getOrCreateSink(sinkKey).tryEmitNext(message);// 处理发送结果if (result.isFailure()) {// 根据业务需求处理失败情况handleEmitFailure(result, message);}}// 批量发送public void sendBatch(String sinkKey,List<String> messages) {messages.forEach(msg->sendMessage(sinkKey, msg));}// 关闭消息流public void shutdown(String sinkKey) {dynamicSinkManager.getOrCreateSink(sinkKey).tryEmitComplete();}private void handleEmitFailure(Sinks.EmitResult result, String message) {// 实现你的失败处理逻辑log.error("消息发送失败,原因: {}", result);}
}

3.后端测试入口

package com.dem.web.controller.business;import com.dem.common.annotation.Anonymous;
import com.dem.framework.config.sse.MessageProducer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;/*** @program: dem-template-pc* @description: 测试* @author: lcb* @create: 2025-03-28 10:21**/@RestController
@RequestMapping("/sse")
@Slf4j
public class SSEController {@Autowiredprivate MessageProducer messageProducer;/*** 与客户端建立链接** @param sinkKey* @return* @author lcb* @date 2025/4/1**/@GetMapping(value = "/linkEvents/{sinkKey}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)public Flux<String> linkEvents(@PathVariable String sinkKey) {return messageProducer.asFluxShare(sinkKey);}/*** 模拟添加数据** @param sinkKey* @param str* @return* @author lcb* @date 2025/4/1 15:50**/@GetMapping(value = "/addData/{sinkKey}/{str}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)public void addData(@PathVariable("sinkKey") String sinkKey, @PathVariable("str") String str) {/** 处理业务。。。* */log.info("数据:{}", str);messageProducer.sendMessage(sinkKey, str);}/*** 模拟竞价结束** @param sinkKey* @return* @author lcb* @date 2025/4/1 15:50**/@GetMapping(value = "/shutdown/{sinkKey}")public void guanb(@PathVariable String sinkKey) {/** 竞价结束* */messageProducer.shutdown(sinkKey);}
}

4.测试

  • 前端代码

    const eventSource = new EventSource('http://localhost:8080/dem-vue-template-api/sse/linkEvents/dem123');
    eventSource.onmessage = function(event) {console.log('Received:', event.data);// 在这里处理接收到的数据,比如更新UI
    };
    
  • 也可使用浏览器直接请求

其他参数

  1. Sinks.many():

    • 创建一个可以发射多个元素的 Sink (接收器)
    • 与 Sinks.one()(单元素)和 Sinks.empty()(无元素)相对
  2. .multicast():

    • 指定这是一个多播 Sink
    • 意味着多个订阅者可以订阅同一个源
    • 与 .unicast()(单播)相对
  3. .directBestEffort():

    因为这个业务是 竞价,后来订阅的用户只需要获取最新的数据即可

    如果后来订阅的用户需要获取之前的历史数据可以参考

    .onBackpressureBuffer()

    .replay()

    • direct: 表示直接传递元素,不进行任何缓冲
    • bestEffort: 表示"尽力而为"的传递策略,如果下游跟不上,可能会丢弃元素

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/909557.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

天下拍-艺术品拍卖经典案例分享

在当今快速发展的数字化时代,艺术品拍卖行业正经历着前所未有的变革。天下拍作为一款功能齐全的拍卖产品,凭借先进的技术平台和专业的服务团队,为艺术品拍卖提供了全新的解决方案。同步拍卖的模式和互联网运营工具的加持可以帮助您实现艺术品的高效变现和价值最大化。线上线…

[T.4] 团队项目:团队代码管理准备

项目 内容这个作业属于哪个课程 2025年春季软件工程(罗杰、任健)这个作业的要求在哪里 [T.4] 团队项目:团队代码管理准备我在这个课程的目标是 掌握代码管理与修复流程,完善团队协作机制这个作业在哪个具体方面帮助我实现目标 团队协作,软件开发代码管理基础团队代码仓库地…

asio使用async_connect,连接127.0.0.1,函数回调函数errorcode连接成功

编译运行rest_rpc库的客户端出现问题 asio使用async_connect,连接127.0.0.1,函数回调函数errorcode连接成功(没有启动服务器的情况下),需要等到调用发送函数,才会失败。调试发现是使用9000端口被本机另外的程序占用了-_- 环境:win10 netstat -ano | findstr "9000&…

算法备案没产品可以申请吗?

算法备案复审阶段涉及产品信息填报,所以一度让一些开发者有这样的错误认知:只有等产品要上线了,才能火急火燎地去申请算法备案。但这个观点其实是错误的,其实开发者也可以在没有具体产品的情况下发起算法备案申请。只要材料合法合规,也能取得备案号。下面是一些具体信息介…

工业通信协议“牵手密码”,Ethernet IP转Profinet网关的桥梁魔法

在当前工业自动化领域,实时以太网技术已经成为至关重要的通信标准之一。Profinet和EtherNetIP作为两种广泛采用的实时以太网协议,各自拥有其独特的性能优势和适用场景。本文旨在探讨稳联技术Profinet转EtherNetIP网关WL-PN-EIPM的功能,并评估其在节能实施与监测方面的应用价…

LeetCode刷题-动态规划-爬楼梯

LeetCode刷题-动态规划-爬楼梯 题目: 假设你正在爬楼梯。需要 n 阶你才能到达楼顶。 每次你可以爬 1 或 2 个台阶。你有多少种不同的方法可以爬到楼顶呢? 示例 1: 输入:n = 2 输出:2 解释:有两种方法可以爬到楼顶。1 阶 + 1 阶 2 阶 示例 2:输入:n = 3 输出:3 解释:有…

【攻防世界】Hidden-Message

⭕、知识点 流量分析/端口号隐写/tshark/json文件处理 一、题目二、解法 1、端口号个位呈现有规律的01交替,可能隐藏信息。 2、为便于提取信息,使用kali的tshark对其进行转存 tshark -r input.pcap -T json > output.txt注意在使用tshark时应避免使用root账户 否则会出现如…

022 props组件交互

.vue 的文件,就是一个组件,每个.vue 文件就是每个页面html 的时候,每个页面都是一个 htmlvue2 和 vue3 的生命周期钩子是不同的components:常用的组件,公共的组件views:用来存放页面的新建项目,删除HelloWorld.vue components也删除views删除 这个index.js删除 这两页面…

客户端打开BI报表提示 Your current browser is not supported”

win7的打开会报这个问题, win11可以正常打开, 应该是环境差异导致。

Linux-常用命令(3)

Linux-常用命令(3)Linux常用命令 查看文件 cat命令 cat命令可以创建一个或者多个文件、查看文件内容、连接文件,常用于查看文件内容 cat 文件名 //显示文件内容 cat -n 文件名 //显示文件内容,并显示行号 cat - 文件名 //显示文件内容(包括不可见字符)系统时间 date命令…

【EI】机器人与传感器网络国际会议(RoSeN 2025)

第一届机器人与传感器网络国际会议(RoSeN 2025)将于2025年5月16-18日在贵阳举行,会议将围绕机器人展开的在机器人、人机交互、传感、智能控制等相关研究领域,邀请国内外数位在此领域学术卓越的学者专家做相关致辞与报告,共同探讨机器人发展最新发展方向及行业前沿动态。会…