实时通讯技术实现

实时通讯技术实现

前言

在CS架构中,经常会有实时通信的需求。客户端和服务端建立连接,服务端实时推送数据给客户端。本文介绍几种常见的实现方式,希望能给读者们一点点参考。

实时通讯的主要实现技术

  • 长轮询(Long Polling)
  • WebSocket
  • 服务器发送事件(Server-Sent Events, SSE)
  • XMPP (Extensible Messaging and Presence Protocol)
  • MQTT (Message Queuing Telemetry Transport)

长轮询

长轮询(Long Polling): 一种网络通信机制,用于实现客户端和服务器之间的实时数据传输。

Http长轮询机制:

原理图
原理图

长轮询工作原理:

  1. client端请求server端,并约定好超时时间;
  2. server端收到请求后,判断数据是否有变化:
    • 有变化:立即返回数据;
    • 没变化:则阻塞http请求,并且将长轮询请求任务放入队列中,然后开启任务调度,调度任务在长连接维持时间到期后,会将长轮询请求移除队列,并返回对应数据。
  3. 如果在挂起的这段时间内,数据有变化,服务器会移除队列中的长轮询请求,并响应数据给客户端。

长轮询优缺点:

优点:
  • 兼容性好
  • 实现简单
  • 即时性
缺点:
  • 服务器hold住连接,占用资源
  • 会有延迟,服务器响应后,客户端要重新发起连接(这段时间内有新消息不能即时触达)

Java 示例代码


@Controller
public class LongPollingController {

    private final Map<String, DeferredResult<String>> deferredResults = new ConcurrentHashMap<>();

    @GetMapping("/longpolling")
    @ResponseBody
    public Object longPolling() {
        DeferredResult<String> deferredResult = new DeferredResult<>(30000L, "time out");
        deferredResults.put("key", deferredResult); // 假设每个客户端有一个唯一的key
        return deferredResult;
    }

    @GetMapping("/push")
    public void push() {
        // 模拟异步数据获取
        deferredResults.get("key").setResult("data update"); // 当数据准备好时,触发长轮询
    }
}

DeferredResult 是 Spring MVC 提供的一种用于处理异步请求的机制,它允许在处理请求时延迟产生结果,并且允许在处理请求的不同线程中生成结果。DeferredResult 可以用于异步处理 HTTP 请求,并在处理完成后返回结果给客户端。

WebSocket

WebSocket 是 HTML5 开始提供的一种浏览器与服务器间进行全双工通信的网络技术。WebSocket 基于 TCP 双向全双工进行消息传递,在同一时刻,既可以发送消息,也可以接收消息。

WebSocket原理图:

alt

WebSocket特点:

  1. 单一的TCP连接,采用全双工模式通信;
  2. 对代理、防火墙和路由器透明;
  3. 无头部信息、Cookie和身份验证;
  4. 无安全开销;
  5. 通过“ping/pong”帧保持链路激活;
  6. 服务器可以主动传递消息给客户端,不再需要客户端轮询。

示例代码(netty实现websocket通信)

WebSocket服务端启动类:


@Component
@Slf4j
public class WebSocketServer {

    /**
     * webSocket协议名
     */
    private static final String WEBSOCKET_PROTOCOL = "WebSocket";

    /**
     * 端口号
     */
    @Value("${webSocket.netty.port:58080}")
    private int port;

    /**
     * webSocket路径
     */
    @Value("${webSocket.netty.path:/webSocket}")
    private String webSocketPath;

    @Autowired
    private WebSocketHandler webSocketHandler;

    private EventLoopGroup bossGroup;
    private EventLoopGroup workGroup;

    /**
     * 启动
     *
     * @throws InterruptedException
     */
    private void start() throws InterruptedException {
        bossGroup = new NioEventLoopGroup();
        workGroup = new NioEventLoopGroup();
        ServerBootstrap bootstrap = new ServerBootstrap();
        // bossGroup负责客户端的tcp连接请求, workGroup负责与客户端之前的读写操作
        bootstrap.group(bossGroup, workGroup);
        // 设置NIO类型的channel
        bootstrap.channel(NioServerSocketChannel.class);
        // 设置监听端口
        bootstrap.localAddress(new InetSocketAddress(port));
        // 连接到达时会创建一个通道
        bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {

            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                // 流水线管理通道中的处理程序(Handler),用来处理业务
                // webSocket协议本身是基于http协议的,所以这边也要使用http编解码器
                ch.pipeline().addLast(new HttpServerCodec());
                ch.pipeline().addLast(new ObjectEncoder());
                // 以块的方式来写的处理器
                ch.pipeline().addLast(new ChunkedWriteHandler());
                //将收到的 HTTP 请求或响应的多个部分合并成一个完整的对象
                ch.pipeline().addLast(new HttpObjectAggregator(8192));
                /*
                说明:
                1、对应webSocket,它的数据是以帧(frame)的形式传递
                2、浏览器请求时 ws://localhost:58080/xxx 表示请求的uri
                3、核心功能是将http协议升级为ws协议,保持长连接
                */
                ch.pipeline().addLast(new WebSocketServerProtocolHandler(webSocketPath, WEBSOCKET_PROTOCOL, true, 65536 * 10));
                ch.pipeline().addLast(new IdleStateHandler(10, 0, 0));
                ch.pipeline().addLast(new HeartBeatHandler());
                // 自定义的handler,处理业务逻辑
                ch.pipeline().addLast(webSocketHandler);

            }
        });
        // 配置完成,开始绑定server,通过调用sync同步方法阻塞直到绑定成功
        ChannelFuture channelFuture = bootstrap.bind().sync();
        log.info("Server started and listen on:{}", channelFuture.channel().localAddress());
        // 对关闭通道进行监听
        channelFuture.channel().closeFuture().sync();
    }

    /**
     * 释放资源
     *
     * @throws InterruptedException
     */
    @PreDestroy
    public void destroy() throws InterruptedException {
        if (bossGroup != null) {
            bossGroup.shutdownGracefully().sync();
        }
        if (workGroup != null) {
            workGroup.shutdownGracefully().sync();
        }
    }

    @PostConstruct
    public void init() {
        //需要开启一个新的线程来执行netty server 服务器
        new Thread(() -> {
            try {
                start();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }
}

业务逻辑处理器:


@Component
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        // 接收到 WebSocket 文本消息
        System.out.println("Received message: " + msg.text());
        // 响应 WebSocket 文本消息
        ctx.writeAndFlush(new TextWebSocketFrame("Received your message: " + msg.text()));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // 异常处理
        cause.printStackTrace();
        ctx.close();
    }
}

SSE

SSE(Server-Sent Events,服务器发送事件)是一种用于实现服务器向客户端单向推送数据的技术。它允许服务器端在任何时候发送数据到客户端,而客户端不需要发起请求。SSE 基于 HTTP 协议,使用简单的文本格式进行通信,通常被用于实时更新网页内容、实时通知等场景。

SSE 的工作原理如下:

  1. 客户端向服务器发送一个 HTTP 请求,请求的头部包含 Accept: text/event-stream 表示接受 SSE 格式的响应。
  2. 服务器接收到请求后,保持连接打开,并在连接上周期性地发送消息给客户端。每个消息都以 data: 开头,并以两个换行符 \n\n 结束。
  3. 客户端接收到消息后,将其通过事件监听器处理。
SSE原理图:
alt

代码示例


@Controller
public class SSEController {

    @GetMapping(value = "/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<String>> sse() {
        return Flux.interval(Duration.ofSeconds(10))
                .map(sequence -> ServerSentEvent.<String>builder()
                        .id(String.valueOf(sequence))
                        .event("message")
                        .data("Hello SSE - " + LocalTime.now())
                        .build());
    }
}

Postman 响应示例:

alt

长轮询、WebSocket、SSE 对比

1. 服务器资源消耗:
  • 长轮询:服务器需要为每个客户端请求保持一个开放的连接,直到有数据发送。这导致服务器资源(如内存和连接槽)的消耗,特别是在高并发场景下。
  • WebSocket:建立后,WebSocket 提供了一个持久的、全双工的连接通道。虽然它也占用服务器资源,但由于其连接是持久的,所以不需要频繁地创建和销毁连接,相对于长轮询,这可以减少资源消耗和延迟。
  • SSE:SSE 也保持开放的连接,但只支持单向通信(服务器到客户端)。与长轮询相比,SSE 通过减少连接的建立和销毁次数来优化资源使用,但对于每个客户端,它仍然占用一个连接。
2. 网络延迟和效率:
  • 长轮询:每次请求可能在服务器有数据可发送之前保持打开状态,这可能导致网络延迟。
  • WebSocket:一旦建立,消息可以几乎无延迟地在客户端和服务器之间传输,提高了效率和实时性。
  • SSE:由于连接持续开放,SSE 可以实现低延迟的服务器到客户端消息传输,但不支持客户端到服务器的实时通信。
3. 实现复杂性:
  • 长轮询:相对简单,不需要特殊的协议支持,但服务器端需要逻辑来管理多个持续的请求。
  • WebSocket:需要在客户端和服务器端实现WebSocket协议,比长轮询实现复杂,连接的建立、错误的处理和断开连接时的重连等机制都需要考虑。
  • SSE:客户端实现相对简单,主要的复杂性在于服务器端,需要支持HTTP/1.1的持久连接。

XMPP

XMPP (Extensible Messaging and Presence Protocol) 是一个支持消息传递和状态显示的开放即时通讯协议。它实现了客户端与服务器之间的双向通信,并可以通过扩展以适应多样的即时通讯服务需求。基于 XML (Extensible Markup Language) 和 TCP/IP 协议构建,XMPP 特点包括灵活性、可扩展性和分布式架构。

XMPP设计的网络结构中定义了3类通信实体:

  • 客户端
  • 服务器
  • 网关

XMPP中基本的通信基于传统的CS模式,即客户端通过TCP/IP连接到服务器,然后通过传输XML流进行通信。

XMPP的系统原理图:

网图
网图

MQTT

MQTT(Message Queuing Telemetry Transport)是一个轻量级的消息协议,专为低带宽和不可靠网络环境设计,广泛应用于物联网(IoT)、移动应用等场景。基于发布/订阅模型,它允许设备发布消息到主题,同时允许其他设备订阅这些主题以接收消息。MQTT运行于TCP/IP协议之上,提供了一种简单有效的方式来进行设备间的通信。

MQTT原理图:

alt

MQTT 和 XMPP 都需要额外的服务器来进行消息通信,这些服务器通常被称为 MQTT 代理(broker)和 XMPP 服务器。

总结

MQTT 和 XMPP 需要中间服务器(分别是 MQTT 代理和 XMPP 服务器)来处理消息的路由、传递和存储,增加了部署的复杂性,并需要确保中间件服务的高可用性。

相比之下,SSE(Server-Sent Events)和 WebSocket 提供了更直接的通信方式,允许服务器和客户端之间建立持久的连接。这两种技术直接基于现有的 HTTP/HTTPS 协议,可以利用现有的 Web 服务器架构进行部署,从而减少了额外的中间件需求。

在复杂性方面,MQTT 和 XMPP 要求开发者具备对相应协议的深入了解,相较于 SSE 和 WebSocket 的简单 API 来说,实现起来会相对复杂,但这些协议也提供了更丰富的灵活性。

总的来说,选择合适的技术实现取决于业务需求和现有架构,各种技术都有其适用的场景。

本文由 mdnice 多平台发布

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/577896.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C++初阶篇----内存管理

目录 引言1. 内存分布2.C动态内存管理方式&#xff1a;malloc/calloc/realloc/free3. C动态内存管理:new和delete3.1内置类型3.2 自定义类型 4.operator new与operator delete函数4.1 operator new 与operator delete函数 5. new和delete的实现底层5.1 内置类型5.2 自定义类型 …

Redis 特性,为什么要用Redis,Redis到底是多线程还是单线程

一、Redis介绍 Redis&#xff08;Remote Dictionary Server )&#xff0c;即远程字典服务&#xff0c;是一个开源的&#xff0c;使用C语言编写、支持网络、可基于内存亦可持久化的日志型、Key-Value数据库&#xff0c;并提供多种语言的API。 二、特性(为什么要用Redis&#x…

如何注册谷歌邮箱gmail

不知道大家在工作生活中有没有需要用到谷歌邮箱的地方&#xff0c;但是最近我就用到了它。因为注册ChatGPT的事&#xff0c;用了outlook&#xff0c;hotmail邮箱注册的gpt账号都被封了&#xff0c;然后通过各方面的了解&#xff0c;发现谷歌的邮箱是没有问题的&#xff0c;不会…

基于springboot的人事管理系统

人事管理系统 摘 要 人事管理系统理工作是一种繁琐的&#xff0c;务求准确迅速的信息检索工作。随着计算机信息技术的飞速发展&#xff0c;人类进入信息时代&#xff0c;社会的竞争越来越激烈&#xff0c;人事就越显示出其不可或缺性&#xff0c;成为学校一个非常重要的模块。…

小狐狸JSON-RPC:钱包连接,断开连接,监听地址改变

detect-metamask 创建连接&#xff0c;并监听钱包切换 一、连接钱包&#xff0c;切换地址&#xff08;监听地址切换&#xff09;&#xff0c;断开连接 使用npm安装 metamask/detect-provider在您的项目目录中&#xff1a; npm i metamask/detect-providerimport detectEthereu…

36-递归与迭代

36-1 用递归和迭代解决问题 1、求n的阶乘 公式&#xff1a; n!123...(n-1)n。用递归方式定义&#xff1a;0!1&#xff0c;n!(n-1)!n。 代码1&#xff1a; 我们先回忆一下之前用循环怎么实现的吧 非递归&#xff0c;也可称迭代&#xff1a; int main() {int n 0;scanf(&q…

书生浦语训练营2期-第一节课笔记

笔记总结: 了解大模型的发展方向、本质、以及新一代数据清洗过滤技术、从模型到应用的典型流程、获取数据集的网站、不同微调方式的使用场景和训练数据是什么&#xff0c;以及预训练和微调在训练优势、通信/计算调度、显存管理上的区别。 收获&#xff1a; 理清了预训练和微调…

Laya1.8.4 UI长按选择对应位置释放技能

需求&#xff1a; 需要实现拖拽摇杆选择技能释放位置&#xff0c;释放技能。 原理&#xff1a;首先拆分需求&#xff0c;分为两部分&#xff0c;UI部分和场景部分&#xff0c;UI部分需要实现长按效果&#xff0c;长按后又要有拖动效果&#xff0c;将官方文档的示例代码改了改…

[机器学习]练习KNN算法-曼哈顿距离

曼哈顿距离(Manhattan distance) 曼哈顿距离是指在几何空间中两点之间的距离&#xff0c;其计算方法是通过将两点在各个坐标轴上的差值的绝对值相加得到。在二维空间中&#xff0c;曼哈顿距离可以表示为两点在横纵坐标上的差值的绝对值之和&#xff1b;在三维空间中&#xff0…

第十五届蓝桥杯第三期模拟赛第十题 ← 上楼梯

【问题描述】 小蓝要上一个楼梯&#xff0c;楼梯共有 n 级台阶&#xff08;即小蓝总共要走 n 级&#xff09;。小蓝每一步可以走 a 级、b 级或 c 级台阶。 请问小蓝总共有多少种方案能正好走到楼梯顶端&#xff1f;【输入格式】 输入的第一行包含一个整数 n 。 第二行包含三个整…

鸿蒙OS开发实战:【Socket小试MQTT连接】

本篇分享一下 HarmonyOS 中的Socket使用方法 将从2个方面实践&#xff1a; HarmonyOS 手机应用连接PC端 SocketServerHarmonyOS 手机应用连接MQTT 服务端 通过循序渐进的方式&#xff0c;全面了解实践HarmonyOS中的Socket用法 学习本章前先熟悉文档开发知识更新库gitee.com…

马蹄集第九周

MT3011 代码 #include<bits/stdc.h> using namespace std; const int N 1e3 7;int n; struct NODE{vector<int> v;int ind 0; }g[N];int main( ) {cin >> n;int x;for(int i 1; i < n; i){for(int j 1; j< n-1; j){cin >> x;g[i].v.push_…