Demo代码分享
依赖
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the Spring Boot SSE demo (web + Thymeleaf + Lombok). -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.7</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <groupId>com.example</groupId>
    <artifactId>demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>demo</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
        <mybatis-plus.version>3.5.2</mybatis-plus.version>
    </properties>

    <dependencies>
        <!-- Spring MVC; provides SseEmitter -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- Template engine used to render the "index" view -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-thymeleaf</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

        <!-- test -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <!-- Lombok is compile-time only; keep it out of the repackaged jar -->
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
后端接收sse连接
@Controller
@RequestMapping("/sse")
public class IndexController {/*** 创建SSE连接** @return*/@GetMapping(path = "/connect")public SseEmitter sse() {SseEmitter sseEmitter = new SseEmitter();// 发送一个注释,响应前端请求sseEmitter.send(SseEmitter.event().comment("welcome"));return sseEmitter;}
}
前端浏览器代码
// Connect to the server's SSE endpoint
var sseSource = new EventSource("http://localhost:8080/sse/connect");

// Connection opened
sseSource.onopen = () => {
    console.log("连接打开");
};

// Connection error
sseSource.onerror = (err) => {
    console.log("连接错误:", err);
};

// Data received: log the raw event, then pass the JSON-decoded payload on.
// NOTE(review): handleReceiveData is not defined in this snippet; it must be provided by the page.
sseSource.onmessage = (event) => {
    console.log("接收到数据:", event);
    const payload = JSON.parse(event.data);
    handleReceiveData(payload);
};
定义返回数据的实体类 Message.java
package com.example.demo.entity;

import lombok.Data;

/**
 * Payload pushed to SSE clients.
 *
 * <p>Accessors are generated by Lombok's {@code @Data}.
 */
@Data
public class Message {

    /** Message content. */
    private String data;

    /** Count attached to the message; {@code SseServiceImpl#sendMessage} sets it to the number of registered emitters. */
    private Integer total;
}
定义sse接口 SseService.java
package com.example.demo.service;import com.example.demo.entity.Message;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

/**
 * Operations for managing SSE connections and pushing messages over them.
 */
public interface SseService {

    /**
     * Opens an SSE connection identified by the given id.
     *
     * @param uuid identifier for the connection
     * @return the emitter backing the connection
     */
    SseEmitter connect(String uuid);

    /**
     * Sends a message over the managed connections.
     *
     * @param message the message to send
     */
    void sendMessage(Message message);
}
实现sse接口 SseServiceImpl.java
package com.example.demo.service.impl;import com.example.demo.entity.Message;
import com.example.demo.service.SseService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;@Slf4j
@Service
public class SseServiceImpl implements SseService {/*** messageId的 SseEmitter对象映射集*/private static Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();/*** 连接sse* @param uuid* @return*/@Overridepublic SseEmitter connect(String uuid) {SseEmitter sseEmitter = new SseEmitter();// 连接成功需要返回数据,否则会出现待处理状态try {sseEmitter.send(SseEmitter.event().comment("welcome"));} catch (IOException e) {e.printStackTrace();}// 连接断开sseEmitter.onCompletion(() -> {sseEmitterMap.remove(uuid);});// 连接超时sseEmitter.onTimeout(() -> {sseEmitterMap.remove(uuid);});// 连接报错sseEmitter.onError((throwable) -> {sseEmitterMap.remove(uuid);});sseEmitterMap.put(uuid, sseEmitter);return sseEmitter;}/*** 发送消息* @param message*/@Overridepublic void sendMessage(Message message) {message.setTotal(sseEmitterMap.size());sseEmitterMap.forEach((uuid, sseEmitter) -> {try {sseEmitter.send(message, MediaType.APPLICATION_JSON);} catch (IOException e) {e.printStackTrace();}});}
}
定时任务 SendMessageTask.java
package com.example.demo.task;import com.example.demo.entity.Message;
import com.example.demo.service.SseService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.Scheduled;import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;@Configuration
public class SendMessageTask {@Autowiredprivate SseService sseService;/*** 定时执行 秒 分 时 日 月 周*/@Scheduled(cron = "*/5 * * * * *") // 间隔5秒public void sendMessageTask() {Message message = new Message();DateTimeFormatter format = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");message.setData(LocalDateTime.now().format(format));sseService.sendMessage(message);}
}
前端路由 IndexController.java
package com.example.demo.controller;import com.example.demo.entity.Message;
import com.example.demo.service.SseService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.util.UUID;

/**
 * Web entry points under {@code /sse}: the index page, the SSE connect endpoint
 * and a broadcast endpoint.
 */
@Slf4j
@Controller
@RequestMapping("/sse")
public class IndexController {

    @Autowired
    private SseService sseService;

    /**
     * Home page.
     *
     * @return the view name {@code "index"}
     */
    @GetMapping("/")
    public String index() {
        return "index";
    }

    /**
     * Creates an SSE connection under a freshly generated random UUID.
     *
     * @return the emitter obtained from {@link SseService#connect(String)}
     */
    @GetMapping(path = "/connect", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public SseEmitter sse() {
        final String clientId = UUID.randomUUID().toString();
        log.info("新用户连接:{}", clientId);
        return sseService.connect(clientId);
    }

    /**
     * Broadcasts the message supplied in the request body.
     *
     * @param message the message deserialized from the request body
     */
    @ResponseBody
    @PostMapping("/sendMessage")
    public void sendMessage(@RequestBody Message message) {
        sseService.sendMessage(message);
    }
}
这里简单地用 Apifox 测试了一下效果