异步消息原理

作者简介:大家好,我是smart哥,前中兴通讯、美团架构师,现某互联网公司CTO

联系qq:184480602,加我进群,大家一起学习,一起进步,一起对抗互联网寒冬

在日常开发中,偶尔需要在主业务逻辑之外做一些附加操作,比如下单成功后通知商家、课程报名成功后通知老师、简历投递成功后通知HR。一般来讲,这些业务是不适合放在主线程中的:

@Slf4j
@SpringBootTest
public class AsyncNotifyTest {@Testpublic void testAsyncNotify() throws InterruptedException {long start = System.currentTimeMillis();// 投递简历,插入投递记录TimeUnit.SECONDS.sleep(2);log.info("插入投递记录完毕...");// 发送短信通知HR,并留存发送记录notifyHR("mx", "叉车师傅");writeMsg("mx", "叉车师傅");log.info("耗时:{}毫秒", System.currentTimeMillis() - start);}public void notifyHR(String username, String jobName) throws InterruptedException {TimeUnit.SECONDS.sleep(1);log.info("【发送消息】HR你好,用户:{}, 投递你的岗位:{}", username, jobName);}public void writeMsg(String username, String jobName) {// 留存消息发送记录log.info("【保存消息】保存到数据库, 用户:{}, 岗位:{}", username, jobName);}}

结果

[main] INFO - 插入投递记录完毕...

[main] INFO - 【发送消息】HR你好,用户:mx, 投递你的岗位:叉车师傅

[main] INFO - 【保存消息】保存到数据库, 用户:mx, 岗位:叉车师傅

[main] INFO - com.bravo.happy.AsyncNotifyTest - 耗时:3019毫秒

消息通知等附加操作为什么不适合放在主流程呢?

  • 首先,消息通知相对没那么重要,即使发送失败了,一般还有发送记录,重新发送或者只要能追溯即可
  • 其次,在主流程中加入消息通知会减慢响应速度
  • 最后,万一消息发送失败,还可能导致事务回滚,但系统本身其实是没有问题的

多线程异步消息

一个解决办法是使用多线程,把消息发送的逻辑单独放在一个异步线程中执行,主流程处理完毕直接返回即可。为了尽可能简单,这里就不配置线程池或使用@Async了,换CompletableFuture做演示:

@Slf4j
@SpringBootTest
public class AsyncNotifyTest {@Testpublic void testAsyncNotify() throws InterruptedException {long start = System.currentTimeMillis();// 投递简历,插入投递记录TimeUnit.SECONDS.sleep(2);log.info("插入投递记录完毕...");// 异步发送短信通知HR,并留存发送记录CompletableFuture.runAsync(() -> {try {notifyHR("bravo1988", "叉车师傅");writeMsg("bravo1988", "叉车师傅");} catch (InterruptedException e) {e.printStackTrace();}});log.info("耗时:{}毫秒", System.currentTimeMillis() - start);// 为了观察到异步线程里的打印信息,主线程sleep一会儿TimeUnit.SECONDS.sleep(2);}public void notifyHR(String username, String jobName) throws InterruptedException {TimeUnit.SECONDS.sleep(1);log.info("【发送消息】HR你好,用户:{}, 投递你的岗位:{}", username, jobName);}public void writeMsg(String username, String jobName) {// 留存消息发送记录log.info("【保存消息】保存到数据库, 用户:{}, 岗位:{}", username, jobName);}}

结果

[main] INFO - 插入投递记录完毕...

[main] INFO - 耗时:2145毫秒

[ForkJoinPool.commonPool-worker-9] INFO - 【发送消息】HR你好,用户:mx, 投递你的岗位:叉车师傅

[ForkJoinPool.commonPool-worker-9] INFO - 【保存消息】保存到数据库, 用户:mx, 岗位:叉车师傅

可以看到,主线程耗时变成了2秒,如此一来用户整个投递简历的响应时间缩短了。

Spring事件监听机制

具体请参考Spring事件监听机制,本质上和多线程异步消息是一样的。

Redis实现消息队列

上面多线程版本的异步消息其实已经挺不错了,但小概率的情况下可能会出现消息丢失(虽然当前情境下无所谓):

  • 情况1:消息过多,线程数不够触发拒绝策略
  • 情况2:异步线程宕机了,消息丢失(类似于消费者挂了)

此时可以考虑使用Redis做一个简单的消息队列,数据类型可以选择List。

对于Redis的List,如果使用lpush+rpop即可实现先进先出的简单队列,而如果配合brpop则可实现阻塞队列。所谓的brpop,其实就是blocking right pop,即阻塞等待队列中的消息,一旦有消息被push进队列就从右边取出消费。

注意,rpop和brpop的区别是,rpop发现队列为空直接返回null,不会等待,也不能设置等待时间 :

lpush和brpop反映到Java代码里,可以使用RedisTemplate或StringRedisTemplate实现。

public interface RedisService {/*** 向队列插入消息** @param queue 自定义队列名称* @param obj   要存入的消息*/void pushQueue(String queue, Object obj);/*** 从队列取出消息** @param queue    自定义队列名称* @param timeout  最长阻塞等待时间* @param timeUnit 时间单位* @return*/Object popQueue(String queue, long timeout, TimeUnit timeUnit);
}
@Component
public class RedisServiceImpl implements RedisService {@Autowiredprivate StringRedisTemplate redisTemplate;@Autowiredprivate ObjectMapper objectMapper;@Overridepublic void pushQueue(String queue, Object obj) {try {redisTemplate.opsForList().leftPush(queue, objectMapper.writeValueAsString(obj));} catch (JsonProcessingException e) {e.printStackTrace();}}@Overridepublic Object popQueue(String queue, long timeout, TimeUnit timeUnit) {return redisTemplate.opsForList().rightPop(queue, timeout, timeUnit);}
}
Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class RedisQueueTest {@Autowiredprivate RedisService redisService;public static final String ORDER_MESSAGE = "order_message";@Testpublic void testRedisBlockingQueue() throws InterruptedException {// 订单服务orderService("bravo1988", 10086L);// 启动消费者,取出消息,逐一发送new Thread(this::consumeMsg).start();// 10秒后再发一条消息,模拟第二次下单TimeUnit.SECONDS.sleep(10);orderService("bravo2020", 99999L);// 等待一会儿,观察第二条消息TimeUnit.SECONDS.sleep(10);}public void orderService(String username, Long orderId) {// 1.操作数据库,插入订单// 2.其他操作// 3.发送消息redisService.pushQueue(ORDER_MESSAGE, new Order(username, orderId));}public void consumeMsg() {for (; ; ) {Object order = redisService.popQueue(ORDER_MESSAGE, 5, TimeUnit.SECONDS);log.info("每隔5秒循环获取,期间for循环阻塞");if (order != null) {log.info("order:{}", order.toString());}}}@Data@AllArgsConstructorstatic class Order {private String username;private Long resumeId;}}

Redis实现消息队列的好处是,可以把消息存起来慢慢消费,而且项目挂了不影响已经存入的消息,重新启动后仍可继续消费:

可能有同学不禁要问:如果消息还没发送到队列中就丢失了呢?发送方也无法感知(没有应答机制),所以Redis作为消息队列还是存在很多问题的。

那为什么要在这一章节安排Redis实现消息队列呢?

首先,就我个人的感受而言,入行后有很长一段时间我都对Redis很抵触、很畏惧,导致自己一直停滞不前,其实Redis并没有我们想的那么难,只要你敢动手去敲,就会迅速熟悉起来(其他技术也是如此)。

其次,实际开发中一些小项目还是有人会用,主要是从系统复杂性考虑,不轻易引入MQ,所以仍有学习的必要。尤其是对于一些消息通知,丢了就丢了,影响不是特别大,而订单操作就不适合用Redis这么简陋的消息队列了。

上面的demo仅仅用作学习,大家有兴趣可以自行拓展,Redis还提供了消息的发布/订阅模式:

用过MQ的同学会觉得上面的模式很熟悉!

小坑

我在Redis实现消息通知的代码中,留了一个小坑,是大家平时可能会不小心犯的。

提示:和SpringBoot定时任务故事中我同事遇到的类似的坑。

在执行上面的代码时可能遇到:

nested exception is io.lettuce.core.RedisException: io.lettuce.core.RedisException: Connection closed

这是因为主线程结束导致Redis断开,而两个for循环还在操作队列。实际生产环境一般不会有问题,因为线程是一直跑着的。

作者简介:大家好,我是smart哥,前中兴通讯、美团架构师,现某互联网公司CTO

进群,大家一起学习,一起进步,一起对抗互联网寒冬

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/295862.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

短链接技术解析:链接的简化之道

文章目录 前言起源实现原理常见短链接生成算法哈希算法自增计数随机生成基于关键字的生成 短链接的作用字符空间节省美化和简化个性化定制 实现一个简单的短链接服务个人简介 前言 大家在短信中是不是经常看到下面的短连接,简短易记: 看到这个时你是不是…

Python零基础教程5.0——无限画图下装逼

正方形的脸让我迷糊 引言开整完整代码1效果1完整代码2效果2完整代码3效果3 结尾 引言 哈哈,真巧 今天周末 有趣的人已经开始HAPPY 我只能码代码,写教程 不过,锻炼使我快乐! 少年的苦,中年的甘,老年的甜 …

Mybatis-TypeHandler类型转换器

文章目录 TypeHandler 接口TypeHandler 注册TypeHandler 查询别名管理总结 TypeHandler 接口 TypeHandler 这个接口 就是Mybatis的类型转换器 /*** author Clinton Begin*/ public interface TypeHandler<T> {// 在通过PreparedStatement为SQL语句绑定参数时&#xff0…

智能优化算法应用:基于鹈鹕算法3D无线传感器网络(WSN)覆盖优化 - 附代码

智能优化算法应用&#xff1a;基于鹈鹕算法3D无线传感器网络(WSN)覆盖优化 - 附代码 文章目录 智能优化算法应用&#xff1a;基于鹈鹕算法3D无线传感器网络(WSN)覆盖优化 - 附代码1.无线传感网络节点模型2.覆盖数学模型及分析3.鹈鹕算法4.实验参数设定5.算法结果6.参考文献7.MA…

80x86汇编—汇编程序基本框架

文章目录 First Program指令系统伪指令数值表达式 程序框架解释int 21 中断 通过一个基本框架解释各个指令和用处&#xff0c;方便复习。所以我认为最好的学习顺序就是先看一段完整的汇编代码程序&#xff0c;然后给你逐个逐个的解释每一个代码是干嘛用的。然后剩下的还有很多指…

LTE之接口协议

一、接口协议栈 接口是指不同网元之间的信息交互方式。既然是信息交互&#xff0c;就应该使用彼此都能看懂的语言&#xff0c;这就是接口协议。接口协议的架构称为协议栈。根据接口所处位置分为空中接口和地面接口&#xff0c;响应的协议也分为空中接口协议和地面接口协议。空…

反序列化漏洞原理、成因、危害、攻击、防护、修复方法

反序列化漏洞是一种安全漏洞&#xff0c;它允许攻击者将恶意代码注入到应用程序中。这种漏洞通常发生在应用程序从不安全的来源反序列化数据时。当应用程序反序列化数据时&#xff0c;它将数据从一种格式&#xff08;例如JSON或XML&#xff09;转换为另一种格式&#xff08;例如…

信号与线性系统翻转课堂笔记7——信号正交与傅里叶级数

信号与线性系统翻转课堂笔记7——信号正交与傅里叶级数 The Flipped Classroom7 of Signals and Linear Systems 对应教材&#xff1a;《信号与线性系统分析&#xff08;第五版&#xff09;》高等教育出版社&#xff0c;吴大正著 一、要点 &#xff08;1&#xff0c;重点&a…

Java经典面试题——手写快速排序和归并排序

题目链接&#xff1a;https://www.luogu.com.cn/problem/P1177 输入模板&#xff1a; 5 4 2 4 5 1快速排序 技巧&#xff1a;交换数组中的两个位置 a[l] a[l] a[r] - (a[r] a[l]); 稳定不稳定&#xff1f;:不稳定 注意找哨兵那里内循环的等于号不能漏&#xff0c;不然…

Linux 基础指令三

一、cat命令 默认是顺序查看&#xff0c;可同时查看多个文件&#xff0c;只能看普通文件&#xff0c;不能看文件以外 使用格式: cat [选项] 文件名 常用选项 -n显示行号-b跳过空白行编号-s将所有的连续的多个空行替换为一个空行&#xff08;压缩成一个空行&#xff0…

LabVIEW在横向辅助驾驶系统开发中的应用

LabVIEW在横向辅助驾驶系统开发中的应用 随着横向辅助驾驶技术的快速发展&#xff0c;越来越多的研究致力于提高该系统的效率和安全性。项目针对先进驾驶辅助系统&#xff08;ADAS&#xff09;中的横向辅助驾驶进行深入研究。在这项研究中&#xff0c;LabVIEW作为一个强大的系…

【数据结构入门精讲 | 第十篇】考研408排序算法专项练习(二)

在上文中我们进行了排序算法的判断题、选择题的专项练习&#xff0c;在这一篇中我们将进行排序算法中编程题的练习。 目录 编程题R7-1 字符串的冒泡排序R7-1 抢红包R7-1 PAT排名汇总R7-2 统计工龄R7-1 插入排序还是堆排序R7-2 龙龙送外卖R7-3 家谱处理 编程题 R7-1 字符串的冒…