实时通讯技术实现
前言
在CS架构中,经常会有实时通信的需求。客户端和服务端建立连接,服务端实时推送数据给客户端。本文介绍几种常见的实现方式,希望能给读者们一点点参考。
实时通讯的主要实现技术
-
长轮询(Long Polling) -
WebSocket -
服务器发送事件(Server-Sent Events, SSE) -
XMPP (Extensible Messaging and Presence Protocol) -
MQTT (Message Queuing Telemetry Transport)
长轮询
长轮询(Long Polling): 一种网络通信机制,用于实现客户端和服务器之间的实时数据传输。
Http长轮询机制:
长轮询工作原理:
-
client端请求server端,并约定好超时时间; -
server端收到请求后,判断数据是否有变化: -
有变化:立即返回数据; -
没变化:则阻塞http请求,并且将长轮询请求任务放入队列中,然后开启任务调度,调度任务在长连接维持时间到期后,会将长轮询请求移除队列,并返回对应数据。
-
-
如果在挂起的这段时间内,数据有变化,服务器会移除队列中的长轮询请求,并响应数据给客户端。
长轮询优缺点:
优点:
-
兼容性好 -
实现简单 -
即时性
缺点:
-
服务器hold住连接,占用资源 -
会有延迟,服务器响应后,客户端要重新发起连接(这段时间内有新消息不能即时触达)
Java 示例代码
@Controller
public class LongPollingController {
private final Map<String, DeferredResult<String>> deferredResults = new ConcurrentHashMap<>();
@GetMapping("/longpolling")
@ResponseBody
public Object longPolling() {
DeferredResult<String> deferredResult = new DeferredResult<>(30000L, "time out");
deferredResults.put("key", deferredResult); // 假设每个客户端有一个唯一的key
return deferredResult;
}
@GetMapping("/push")
public void push() {
// 模拟异步数据获取
deferredResults.get("key").setResult("data update"); // 当数据准备好时,触发长轮询
}
}
DeferredResult 是 Spring MVC 提供的一种用于处理异步请求的机制,它允许在处理请求时延迟产生结果,并且允许在处理请求的不同线程中生成结果。DeferredResult 可以用于异步处理 HTTP 请求,并在处理完成后返回结果给客户端。
WebSocket
WebSocket 是 HTML5 开始提供的一种浏览器与服务器间进行全双工通信的网络技术。WebSocket 基于 TCP 双向全双工进行消息传递,在同一时刻,既可以发送消息,也可以接收消息。
WebSocket原理图:
WebSocket特点:
-
单一的TCP连接,采用全双工模式通信; -
对代理、防火墙和路由器透明; -
无头部信息、Cookie和身份验证; -
无安全开销; -
通过“ping/pong”帧保持链路激活; -
服务器可以主动传递消息给客户端,不再需要客户端轮询。
示例代码(netty实现websocket通信)
WebSocket服务端启动类:
@Component
@Slf4j
public class WebSocketServer {
/**
* webSocket协议名
*/
private static final String WEBSOCKET_PROTOCOL = "WebSocket";
/**
* 端口号
*/
@Value("${webSocket.netty.port:58080}")
private int port;
/**
* webSocket路径
*/
@Value("${webSocket.netty.path:/webSocket}")
private String webSocketPath;
@Autowired
private WebSocketHandler webSocketHandler;
private EventLoopGroup bossGroup;
private EventLoopGroup workGroup;
/**
* 启动
*
* @throws InterruptedException
*/
private void start() throws InterruptedException {
bossGroup = new NioEventLoopGroup();
workGroup = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
// bossGroup负责客户端的tcp连接请求, workGroup负责与客户端之前的读写操作
bootstrap.group(bossGroup, workGroup);
// 设置NIO类型的channel
bootstrap.channel(NioServerSocketChannel.class);
// 设置监听端口
bootstrap.localAddress(new InetSocketAddress(port));
// 连接到达时会创建一个通道
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 流水线管理通道中的处理程序(Handler),用来处理业务
// webSocket协议本身是基于http协议的,所以这边也要使用http编解码器
ch.pipeline().addLast(new HttpServerCodec());
ch.pipeline().addLast(new ObjectEncoder());
// 以块的方式来写的处理器
ch.pipeline().addLast(new ChunkedWriteHandler());
//将收到的 HTTP 请求或响应的多个部分合并成一个完整的对象
ch.pipeline().addLast(new HttpObjectAggregator(8192));
/*
说明:
1、对应webSocket,它的数据是以帧(frame)的形式传递
2、浏览器请求时 ws://localhost:58080/xxx 表示请求的uri
3、核心功能是将http协议升级为ws协议,保持长连接
*/
ch.pipeline().addLast(new WebSocketServerProtocolHandler(webSocketPath, WEBSOCKET_PROTOCOL, true, 65536 * 10));
ch.pipeline().addLast(new IdleStateHandler(10, 0, 0));
ch.pipeline().addLast(new HeartBeatHandler());
// 自定义的handler,处理业务逻辑
ch.pipeline().addLast(webSocketHandler);
}
});
// 配置完成,开始绑定server,通过调用sync同步方法阻塞直到绑定成功
ChannelFuture channelFuture = bootstrap.bind().sync();
log.info("Server started and listen on:{}", channelFuture.channel().localAddress());
// 对关闭通道进行监听
channelFuture.channel().closeFuture().sync();
}
/**
* 释放资源
*
* @throws InterruptedException
*/
@PreDestroy
public void destroy() throws InterruptedException {
if (bossGroup != null) {
bossGroup.shutdownGracefully().sync();
}
if (workGroup != null) {
workGroup.shutdownGracefully().sync();
}
}
@PostConstruct
public void init() {
//需要开启一个新的线程来执行netty server 服务器
new Thread(() -> {
try {
start();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
业务逻辑处理器:
@Component
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
// 接收到 WebSocket 文本消息
System.out.println("Received message: " + msg.text());
// 响应 WebSocket 文本消息
ctx.writeAndFlush(new TextWebSocketFrame("Received your message: " + msg.text()));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// 异常处理
cause.printStackTrace();
ctx.close();
}
}
SSE
SSE(Server-Sent Events,服务器发送事件)是一种用于实现服务器向客户端单向推送数据的技术。它允许服务器端在任何时候发送数据到客户端,而客户端不需要发起请求。SSE 基于 HTTP 协议,使用简单的文本格式进行通信,通常被用于实时更新网页内容、实时通知等场景。
SSE 的工作原理如下:
-
客户端向服务器发送一个 HTTP 请求,请求的头部包含 Accept: text/event-stream 表示接受 SSE 格式的响应。 -
服务器接收到请求后,保持连接打开,并在连接上周期性地发送消息给客户端。每个消息都以 data: 开头,并以两个换行符 \n\n 结束。 -
客户端接收到消息后,将其通过事件监听器处理。
SSE原理图:
代码示例
@Controller
public class SSEController {
@GetMapping(value = "/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> sse() {
return Flux.interval(Duration.ofSeconds(10))
.map(sequence -> ServerSentEvent.<String>builder()
.id(String.valueOf(sequence))
.event("message")
.data("Hello SSE - " + LocalTime.now())
.build());
}
}
Postman 响应示例:
长轮询、WebSocket、SSE 对比
1. 服务器资源消耗:
-
长轮询:服务器需要为每个客户端请求保持一个开放的连接,直到有数据发送。这导致服务器资源(如内存和连接槽)的消耗,特别是在高并发场景下。 -
WebSocket:建立后,WebSocket 提供了一个持久的、全双工的连接通道。虽然它也占用服务器资源,但由于其连接是持久的,所以不需要频繁地创建和销毁连接,相对于长轮询,这可以减少资源消耗和延迟。 -
SSE:SSE 也保持开放的连接,但只支持单向通信(服务器到客户端)。与长轮询相比,SSE 通过减少连接的建立和销毁次数来优化资源使用,但对于每个客户端,它仍然占用一个连接。
2. 网络延迟和效率:
-
长轮询:每次请求可能在服务器有数据可发送之前保持打开状态,这可能导致网络延迟。 -
WebSocket:一旦建立,消息可以几乎无延迟地在客户端和服务器之间传输,提高了效率和实时性。 -
SSE:由于连接持续开放,SSE 可以实现低延迟的服务器到客户端消息传输,但不支持客户端到服务器的实时通信。
3. 实现复杂性:
-
长轮询:相对简单,不需要特殊的协议支持,但服务器端需要逻辑来管理多个持续的请求。 -
WebSocket:需要在客户端和服务器端实现WebSocket协议,比长轮询实现复杂,连接的建立、错误的处理和断开连接时的重连等机制都需要考虑。 -
SSE:客户端实现相对简单,主要的复杂性在于服务器端,需要支持HTTP/1.1的持久连接。
XMPP
XMPP (Extensible Messaging and Presence Protocol) 是一个支持消息传递和状态显示的开放即时通讯协议。它实现了客户端与服务器之间的双向通信,并可以通过扩展以适应多样的即时通讯服务需求。基于 XML (Extensible Markup Language) 和 TCP/IP 协议构建,XMPP 特点包括灵活性、可扩展性和分布式架构。
XMPP设计的网络结构中定义了3类通信实体:
-
客户端 -
服务器 -
网关
XMPP中基本的通信基于传统的CS模式,即客户端通过TCP/IP连接到服务器,然后通过传输XML流进行通信。
XMPP的系统原理图:
MQTT
MQTT(Message Queuing Telemetry Transport)是一个轻量级的消息协议,专为低带宽和不可靠网络环境设计,广泛应用于物联网(IoT)、移动应用等场景。基于发布/订阅模型,它允许设备发布消息到主题,同时允许其他设备订阅这些主题以接收消息。MQTT运行于TCP/IP协议之上,提供了一种简单有效的方式来进行设备间的通信。
MQTT原理图:
MQTT 和 XMPP 都需要额外的服务器来进行消息通信,这些服务器通常被称为 MQTT 代理(broker)和 XMPP 服务器。
总结
MQTT 和 XMPP 需要中间服务器(分别是 MQTT 代理和 XMPP 服务器)来处理消息的路由、传递和存储,增加了部署的复杂性,并需要确保中间件服务的高可用性。
相比之下,SSE(Server-Sent Events)和 WebSocket 提供了更直接的通信方式,允许服务器和客户端之间建立持久的连接。这两种技术直接基于现有的 HTTP/HTTPS 协议,可以利用现有的 Web 服务器架构进行部署,从而减少了额外的中间件需求。
在复杂性方面,MQTT 和 XMPP 要求开发者具备对相应协议的深入了解,相较于 SSE 和 WebSocket 的简单 API 来说,实现起来会相对复杂,但这些协议也提供了更丰富的灵活性。
总的来说,选择合适的技术实现取决于业务需求和现有架构,各种技术都有其适用的场景。
本文由 mdnice 多平台发布