【stomp 实战】spring websocket用户消息发送源码分析

这一节,我们学习用户消息是如何发送的。

消息的分类

spring websocket将消息分为两种,一种是给指定的用户发送(用户消息),一种是广播消息,即给所有用户发送消息。那怎么区分这两种消息呢?那就是用前缀了。

用户消息的前缀

  • 不配置的情况下,默认用户消息的前缀是/user
  • 也可以通过下面的方式来配置用户消息
@Overridepublic void configureMessageBroker(MessageBrokerRegistry registry) {/*** stompClient.subscribe("/user/topic/subNewMsg",...)* 这个时候,后端推送消息应该这么写* msgOperations.convertAndSendToUser(username, "/topic/subNewMsg", msg);* 即去掉了/user前缀*/registry.setUserDestinationPrefix(WsConstants.USER_DESTINATION_PREFIX);}
  • 默认情况下,/user是用户消息前缀,那么前端订阅的代码可以这么写
 //订阅用户消息topic1stompClient.subscribe("/user/topic/answer", function (response) {//do something});
  • 后端的发送消息的代码可以这么写,注意,在这里发送的时候,调用的convertAndSendToUser没有带/user前缀
    private final SimpMessageSendingOperations msgOperations;public void echo(Principal principal, Msg msg) {msgOperations.convertAndSendToUser(username, "/topic/answer", msg);}

广播消息的前缀

  • 广播消息没有默认值,必须显示地指定
  • 配置广播消息的前缀是这么配置,通过/topic或者/queue前缀来订阅的,就是广播消息
@Overridepublic void configureMessageBroker(MessageBrokerRegistry registry) {registry.enableSimpleBroker("/topic", "/queue")//配置stomp协议里, server返回的心跳.setHeartbeatValue(new long[]{10000L, 10000L})//配置发送心跳的scheduler.setTaskScheduler(new DefaultManagedTaskScheduler());}
  • 前端代码可以这么写
//订阅广播消息topicstompClient.subscribe("/topic/boardCast/hello", function (response) {// do something});
  • 后端代码可以这么写
  private final SimpMessageSendingOperations msgOperations;public void echo2(Msg msg) {log.info("收到的消息为:{}", msg.getContent());msgOperations.convertAndSend("/topic/boardCast/hello", "hello boardCast Message");}

发送用户消息源码分析

用户订阅过程

发送消息,本质上就是从内存中找到注册的用户,通过用户名找到用户会话,在从用户会话中找到该用户的订阅,如果该用户有该订阅,那么就发送消息给前端。

总结一下用户和会话之间的关系,如下图
在这里插入图片描述
如果这块不太熟悉,建议回顾这篇文章,了解一下用户,用户会话,订阅之间的关系:【stomp 实战】Spring websocket 用户订阅和会话的管理源码分析

我们通过Debug来看一下,前端执行用户订阅,经历了哪些过程。
假设,当前登录用户是1001

  stompClient.subscribe("/user/topic/answer", function (response) {//do something});

该用户建立连接,并且绑定1001的用户会话后,执行后端的订阅注册
DefaultSimpUserRegistry响应订阅事件代码如下:
在这里插入图片描述
可以看到,当前的sessionId,destination

在这里插入图片描述
将订阅放到一个subscriptions的map里面。缓存在内存中。

用户消息的发送

后端代码是这么写的,我们来调试一下

    private final SimpMessageSendingOperations msgOperations;public void echo(Principal principal, Msg msg) {msgOperations.convertAndSendToUser(username, "/topic/answer", msg);}

经过层层调用,发现调到了下面的方法
在这里插入图片描述
发现我们的发送目的地变成了这个:this.destinationPrefix + user + destination
通过调试时,发现值如上图所示。
也就是说,我们的发送目的,变成了/user+用户名+我们传的入参/topic/answer
然后再进入下面的代码

//AbstractMessageSendingTemplate@Overridepublic void convertAndSend(D destination, Object payload, @Nullable Map<String, Object> headers,@Nullable MessagePostProcessor postProcessor) throws MessagingException {//对消息进行转换,对象转字符串,或者字节数组之类的Message<?> message = doConvert(payload, headers, postProcessor);//调用Send发送send(destination, message);}

做了两个事:

  • 对消息进行转换,对象转字符串,或者字节数组之类的
  • 调用Send发送

再来看下send方法

	@Overridepublic void send(D destination, Message<?> message) {doSend(destination, message);}

再调用doSend,由子类SimpMessagingTemplate实现。

//SimpMessagingTemplate@Overrideprotected void doSend(String destination, Message<?> message) {Assert.notNull(destination, "Destination must not be null");SimpMessageHeaderAccessor simpAccessor =MessageHeaderAccessor.getAccessor(message, SimpMessageHeaderAccessor.class);if (simpAccessor != null) {if (simpAccessor.isMutable()) {simpAccessor.setDestination(destination);simpAccessor.setMessageTypeIfNotSet(SimpMessageType.MESSAGE);simpAccessor.setImmutable();sendInternal(message);return;}else {// Try and keep the original accessor typesimpAccessor = (SimpMessageHeaderAccessor) MessageHeaderAccessor.getMutableAccessor(message);initHeaders(simpAccessor);}}else {simpAccessor = SimpMessageHeaderAccessor.wrap(message);initHeaders(simpAccessor);}simpAccessor.setDestination(destination);simpAccessor.setMessageTypeIfNotSet(SimpMessageType.MESSAGE);message = MessageBuilder.createMessage(message.getPayload(), simpAccessor.getMessageHeaders());sendInternal(message);}

其中最关键的是sendInternal

private void sendInternal(Message<?> message) {String destination = SimpMessageHeaderAccessor.getDestination(message.getHeaders());Assert.notNull(destination, "Destination header required");long timeout = this.sendTimeout;boolean sent = (timeout >= 0 ? this.messageChannel.send(message, timeout) : this.messageChannel.send(message));if (!sent) {throw new MessageDeliveryException(message,"Failed to send message to destination '" + destination + "' within timeout: " + timeout);}}

然后再通过messageChannel来发送此条消息。

//AbstractMessageChannel@Overridepublic final boolean send(Message<?> message, long timeout) {Assert.notNull(message, "Message must not be null");Message<?> messageToUse = message;ChannelInterceptorChain chain = new ChannelInterceptorChain();boolean sent = false;try {messageToUse = chain.applyPreSend(messageToUse, this);if (messageToUse == null) {return false;}sent = sendInternal(messageToUse, timeout);chain.applyPostSend(messageToUse, this, sent);chain.triggerAfterSendCompletion(messageToUse, this, sent, null);return sent;}catch (Exception ex) {chain.triggerAfterSendCompletion(messageToUse, this, sent, ex);if (ex instanceof MessagingException) {throw (MessagingException) ex;}throw new MessageDeliveryException(messageToUse,"Failed to send message to " + this, ex);}catch (Throwable err) {MessageDeliveryException ex2 =new MessageDeliveryException(messageToUse, "Failed to send message to " + this, err);chain.triggerAfterSendCompletion(messageToUse, this, sent, ex2);throw ex2;}}
  • 构造了一个拦截链,在发送前,可以进行前置处理和后置处理。这个拦截链就是扩展的关键了。我们可以定义自己的拦截器,在发送消息前后进行拦截处理。这里spring给我们的扩展点。
  • 通过sendInternal将消息发送出去

再来看下sendInternal方法,进入子类ExecutorSubscribableChannel

//ExecutorSubscribableChannel@Overridepublic boolean sendInternal(Message<?> message, long timeout) {for (MessageHandler handler : getSubscribers()) {SendTask sendTask = new SendTask(message, handler);if (this.executor == null) {sendTask.run();}else {this.executor.execute(sendTask);}}return true;}

可以看到,通过这个Channel,找到messageHandler,这个messageHandler有多个,依次将消息进行处理。
在这里插入图片描述
这里取到的有两个messageHandler

  • SimpleBrokerMessageHandler
  • UserDestinationMessageHandler

进入SendTask,看一下run方法

//
public void run() {Message<?> message = this.inputMessage;try {message = applyBeforeHandle(message);if (message == null) {return;}this.messageHandler.handleMessage(message);triggerAfterMessageHandled(message, null);}catch (Exception ex) {triggerAfterMessageHandled(message, ex);if (ex instanceof MessagingException) {throw (MessagingException) ex;}String description = "Failed to handle " + message + " to " + this + " in " + this.messageHandler;throw new MessageDeliveryException(message, description, ex);}catch (Throwable err) {String description = "Failed to handle " + message + " to " + this + " in " + this.messageHandler;MessageDeliveryException ex2 = new MessageDeliveryException(message, description, err);triggerAfterMessageHandled(message, ex2);throw ex2;}
}

这里的关键点是:this.messageHandler.handleMessage(message);
首先会进入SimpleBrokerMessageHandler的handleMessage
在这里插入图片描述
可以看到,这里直接跳出去了。
SimpleBrokerMessageHandler的作用就是,看是不是我们配置的广播消息的前缀,要满足这个条件,才能发送消息。我们配置的前缀是/topic,/queue,这里destination前缀是/user,所以提前返回,不处理。
然后,我们还有一个UserDestinationMessageHandler会继续处理。

在这里插入图片描述
这里对destination进行了处理,发现生成了一个result对象,这里解析出一个targetDestinations,可以看到我们的destination变成了下面的样子
/topic/answer-usero2zuy4zg

  • 这个的构成实际上就是把/user前缀去掉
  • 然后加上-user,后面加上sessionId,就是当前会话的id
  • 最后再以这个新生成的targetDestination,将消息发送出去!
    在这里插入图片描述

这里的messagingTemplate,就是SimpMessagingTemplate。又会回到上面分析的代码。

  • SimpMessagingTemplate调用messageChannel来发送消息
  • messageChannel中会取得两个messageHandler来处理。
    像不像递归调用?
    不过这一次由于我们的destination已经变成了/topic/answer-usero2zuy4zg。这时候,在进入SimpleBrokerMessageHandler时,情况就不一样了

在这里插入图片描述
由于destination变成了/topic开头的,此时我们不会跳出去,会找到用户(-user后面跟了SessionId)订阅,将消息发送出去

可以看到,我们找到了一个用户订阅。在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

其实是每个用户订阅时,会将/user前缀去掉,将用户的destination改写成了如下形式,
/user/topic/hello->/topic/hello-user{sessionId}
所以,经过UserDestinationMessageHandler处理,改写后的destination可以通过destination找到用户会话,将此消息发送出去。
到此,我们的用户消息的发送就分析完了

总结

发送用户消息的整个过程如下:

  • SimpMessageSendingOperations.convertAndSendToUser接口发送用户消息,这里不传/user前缀,注意一下
  • 接着SimpMessagingTemplate进行消息的发送
  • SimpMessagingTemplate会交由messageChannel

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/660994.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Linux服务器安全基础 - 查看入侵痕迹

1. 常见系统日志 /var/log/cron 记录了系统定时任务相关的日志 /var/log/dmesg 记录了系统在开机时内核自检的信息&#xff0c;也可以使用dmesg命令直接查看内核自检信息 /var/log/secure:记录登录系统存取数据的文件;例如:pop3,ssh,telnet,ftp等都会记录在此. /var/log/btmp:记…

特斯拉与百度合作;字节正全力追赶AI业务;小红书内测自研大模型

特斯拉中国版 FSD 或与百度合作 根据彭博社的报道&#xff0c;特斯拉将通过于百度公司达成地图和导航协议&#xff0c;扫清在中国推出 FSD 功能的关键障碍。 此前&#xff0c;中国汽车工业协会、国家计算机网络应急技术处理协调中心发布《关于汽车数据处理 4 项安全要求检测情…

Jupyter Notebook 中使用虚拟环境的Python解释器

问题&#xff1a;创建虚拟环境&#xff0c;在pycharm中配置虚拟环境的Python解释器&#xff0c;然后在pycharm中打开ipynb&#xff0c;执行发现缺少包&#xff0c;但是虚拟环境中已经安装了 解决方式&#xff1a; 配置Jupyter Notebook 使用虚拟环境的Python解释器 1&#x…

Python 与 TensorFlow2 生成式 AI(二)

原文&#xff1a;zh.annas-archive.org/md5/d06d282ea0d9c23c57f0ce31225acf76 译者&#xff1a;飞龙 协议&#xff1a;CC BY-NC-SA 4.0 第四章&#xff1a;教授网络生成数字 在前一章中&#xff0c;我们涵盖了神经网络模型的构建基块。在这一章中&#xff0c;我们的第一个项目…

使用FPGA实现串-并型乘法器

介绍 其实我们知道&#xff0c;用FPGA实现乘法器并不是一件很简单的事&#xff0c;而且在FPGA中也有乘法器的IP核可以直接调用&#xff0c;我这里完全就是为了熟悉一些FPGA的语法然后写了这样一个电路。 串-并型乘法器模块 从字面上看&#xff0c;串-并乘法器就是其中一个乘数…

Flask模版详解

Flask模版详解 概述Jinja2模板引擎渲染模版的步骤变量控制结构自定义错误页面链接静态文件 概述 模板是一个包含响应文本的文件&#xff0c;其中包含用占位变量表示的动态部分&#xff0c;其具体值只在请求的上下文中才能知道。使用真实值替换变量&#xff0c;再返回最终得到的…

公考学习平台|基于SprinBoot+vue的公考学习平台(源码+数据库+文档)

公考学习平台目录 目录 基于SprinBootvue的公考学习平台 一、前言 二、系统设计 三、系统功能设计 5.1用户信息管理 5.2 视频信息管理 5.3公告信息管理 5.1论坛信息管理 四、数据库设计 五、核心代码 六、论文参考 七、最新计算机毕设选题推荐 八、源码获取&…

【C++】:类和对象(下)

目录 一&#xff0c;再谈构造函数1.初始化列表2. 隐式类型转换的过程及其优化3. 隐式类型转换的使用4. explcit关键字5. 单参数和多参数构造函数的隐式类型转换 二&#xff0c;static成员1.静态成员变量2.静态成员函数 三&#xff0c;友元3.1 友元函数3.2 友元类 四&#xff0c…

慧言AIVoceChat实现在线客服及社区频道

原文链接&#xff1a;小回博客 慧言AI&VoceChat实现在线客服及社区频道 一、VoceChat 简介 官网&#xff1a;https://voce.chat VoceChat 是一款支持独立部署的个人云社交媒体聊天服务。15MB 的大小可部署在任何的服务器上&#xff0c;部署简单&#xff0c;很少需要维护…

IoTDB 入门教程①——时序数据库为什么选IoTDB ?

文章目录 一、前文二、性能排行第一三、完全开源四、数据文件TsFile五、乱序数据高写入六、其他七、参考 一、前文 IoTDB入门教程——导读 关注博主的同学都知道&#xff0c;博主在物联网领域深耕多年。 时序数据库&#xff0c;博主已经用过很多&#xff0c;从最早的InfluxDB&a…

UNI-APP_拨打电话权限如何去掉,访问文件权限关闭

uniapp上架过程中一直提示&#xff1a;允许“app名”拨打电话和管理通话吗&#xff1f; uniapp配置文件&#xff1a;manifest.json “permissionPhoneState” : {“request” : “none”//拨打电话权限关闭 }, “permissionExternalStorage” : {“request” : “none”//访…

C++-9

C 1.已知C风格的字符串&#xff0c;完成对字符串通过下标访问时的异常处理机制(越界访问) 2.写一个程序&#xff0c;程序包含两个类&#xff0c;类中实现一个成员函数&#xff0c;MyGetChar(), 类A中每调用一 次&#xff0c;按顺序得到一个数字字符&#xff0c;比如第-次调用得…