作者简介:大家好,我是smart哥,前中兴通讯、美团架构师,现某互联网公司CTO
联系qq:184480602,加我进群,大家一起学习,一起进步,一起对抗互联网寒冬
在日常开发中,偶尔需要在主业务逻辑之外做一些附加操作,比如下单成功后通知商家、课程报名成功后通知老师、简历投递成功后通知HR。一般来讲,这些业务是不适合放在主线程中的:
@Slf4j
@SpringBootTest
public class AsyncNotifyTest {@Testpublic void testAsyncNotify() throws InterruptedException {long start = System.currentTimeMillis();// 投递简历,插入投递记录TimeUnit.SECONDS.sleep(2);log.info("插入投递记录完毕...");// 发送短信通知HR,并留存发送记录notifyHR("mx", "叉车师傅");writeMsg("mx", "叉车师傅");log.info("耗时:{}毫秒", System.currentTimeMillis() - start);}public void notifyHR(String username, String jobName) throws InterruptedException {TimeUnit.SECONDS.sleep(1);log.info("【发送消息】HR你好,用户:{}, 投递你的岗位:{}", username, jobName);}public void writeMsg(String username, String jobName) {// 留存消息发送记录log.info("【保存消息】保存到数据库, 用户:{}, 岗位:{}", username, jobName);}}
结果
[main] INFO - 插入投递记录完毕...
[main] INFO - 【发送消息】HR你好,用户:mx, 投递你的岗位:叉车师傅
[main] INFO - 【保存消息】保存到数据库, 用户:mx, 岗位:叉车师傅
[main] INFO - com.bravo.happy.AsyncNotifyTest - 耗时:3019毫秒
消息通知等附加操作为什么不适合放在主流程呢?
- 首先,消息通知相对没那么重要,即使发送失败了,一般还有发送记录,重新发送或者只要能追溯即可
- 其次,在主流程中加入消息通知会减慢响应速度
- 最后,万一消息发送失败,还可能导致事务回滚,但系统本身其实是没有问题的
多线程异步消息
一个解决办法是使用多线程,把消息发送的逻辑单独放在一个异步线程中执行,主流程处理完毕直接返回即可。为了尽可能简单,这里就不配置线程池或使用@Async了,换CompletableFuture做演示:
@Slf4j
@SpringBootTest
public class AsyncNotifyTest {@Testpublic void testAsyncNotify() throws InterruptedException {long start = System.currentTimeMillis();// 投递简历,插入投递记录TimeUnit.SECONDS.sleep(2);log.info("插入投递记录完毕...");// 异步发送短信通知HR,并留存发送记录CompletableFuture.runAsync(() -> {try {notifyHR("bravo1988", "叉车师傅");writeMsg("bravo1988", "叉车师傅");} catch (InterruptedException e) {e.printStackTrace();}});log.info("耗时:{}毫秒", System.currentTimeMillis() - start);// 为了观察到异步线程里的打印信息,主线程sleep一会儿TimeUnit.SECONDS.sleep(2);}public void notifyHR(String username, String jobName) throws InterruptedException {TimeUnit.SECONDS.sleep(1);log.info("【发送消息】HR你好,用户:{}, 投递你的岗位:{}", username, jobName);}public void writeMsg(String username, String jobName) {// 留存消息发送记录log.info("【保存消息】保存到数据库, 用户:{}, 岗位:{}", username, jobName);}}
结果
[main] INFO - 插入投递记录完毕...
[main] INFO - 耗时:2145毫秒
[ForkJoinPool.commonPool-worker-9] INFO - 【发送消息】HR你好,用户:mx, 投递你的岗位:叉车师傅
[ForkJoinPool.commonPool-worker-9] INFO - 【保存消息】保存到数据库, 用户:mx, 岗位:叉车师傅
可以看到,主线程耗时变成了2秒,如此一来用户整个投递简历的响应时间缩短了。
Spring事件监听机制
具体请参考Spring事件监听机制,本质上和多线程异步消息是一样的。
Redis实现消息队列
上面多线程版本的异步消息其实已经挺不错了,但小概率的情况下可能会出现消息丢失(虽然当前情境下无所谓):
- 情况1:消息过多,线程数不够触发拒绝策略
- 情况2:异步线程宕机了,消息丢失(类似于消费者挂了)
此时可以考虑使用Redis做一个简单的消息队列,数据类型可以选择List。
对于Redis的List,如果使用lpush+rpop即可实现先进先出的简单队列,而如果配合brpop则可实现阻塞队列。所谓的brpop,其实就是blocking right pop,即阻塞等待队列中的消息,一旦有消息被push进队列就从右边取出消费。
注意,rpop和brpop的区别是,rpop发现队列为空直接返回null,不会等待,也不能设置等待时间 :
lpush和brpop反映到Java代码里,可以使用RedisTemplate或StringRedisTemplate实现。
public interface RedisService {/*** 向队列插入消息** @param queue 自定义队列名称* @param obj 要存入的消息*/void pushQueue(String queue, Object obj);/*** 从队列取出消息** @param queue 自定义队列名称* @param timeout 最长阻塞等待时间* @param timeUnit 时间单位* @return*/Object popQueue(String queue, long timeout, TimeUnit timeUnit);
}
@Component
public class RedisServiceImpl implements RedisService {@Autowiredprivate StringRedisTemplate redisTemplate;@Autowiredprivate ObjectMapper objectMapper;@Overridepublic void pushQueue(String queue, Object obj) {try {redisTemplate.opsForList().leftPush(queue, objectMapper.writeValueAsString(obj));} catch (JsonProcessingException e) {e.printStackTrace();}}@Overridepublic Object popQueue(String queue, long timeout, TimeUnit timeUnit) {return redisTemplate.opsForList().rightPop(queue, timeout, timeUnit);}
}
Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class RedisQueueTest {@Autowiredprivate RedisService redisService;public static final String ORDER_MESSAGE = "order_message";@Testpublic void testRedisBlockingQueue() throws InterruptedException {// 订单服务orderService("bravo1988", 10086L);// 启动消费者,取出消息,逐一发送new Thread(this::consumeMsg).start();// 10秒后再发一条消息,模拟第二次下单TimeUnit.SECONDS.sleep(10);orderService("bravo2020", 99999L);// 等待一会儿,观察第二条消息TimeUnit.SECONDS.sleep(10);}public void orderService(String username, Long orderId) {// 1.操作数据库,插入订单// 2.其他操作// 3.发送消息redisService.pushQueue(ORDER_MESSAGE, new Order(username, orderId));}public void consumeMsg() {for (; ; ) {Object order = redisService.popQueue(ORDER_MESSAGE, 5, TimeUnit.SECONDS);log.info("每隔5秒循环获取,期间for循环阻塞");if (order != null) {log.info("order:{}", order.toString());}}}@Data@AllArgsConstructorstatic class Order {private String username;private Long resumeId;}}
Redis实现消息队列的好处是,可以把消息存起来慢慢消费,而且项目挂了不影响已经存入的消息,重新启动后仍可继续消费:
可能有同学不禁要问:如果消息还没发送到队列中就丢失了呢?发送方也无法感知(没有应答机制),所以Redis作为消息队列还是存在很多问题的。
那为什么要在这一章节安排Redis实现消息队列呢?
首先,就我个人的感受而言,入行后有很长一段时间我都对Redis很抵触、很畏惧,导致自己一直停滞不前,其实Redis并没有我们想的那么难,只要你敢动手去敲,就会迅速熟悉起来(其他技术也是如此)。
其次,实际开发中一些小项目还是有人会用,主要是从系统复杂性考虑,不轻易引入MQ,所以仍有学习的必要。尤其是对于一些消息通知,丢了就丢了,影响不是特别大,而订单操作就不适合用Redis这么简陋的消息队列了。
上面的demo仅仅用作学习,大家有兴趣可以自行拓展,Redis还提供了消息的发布/订阅模式:
用过MQ的同学会觉得上面的模式很熟悉!
小坑
我在Redis实现消息通知的代码中,留了一个小坑,是大家平时可能会不小心犯的。
提示:和SpringBoot定时任务故事中我同事遇到的类似的坑。
在执行上面的代码时可能遇到:
nested exception is io.lettuce.core.RedisException: io.lettuce.core.RedisException: Connection closed
这是因为主线程结束导致Redis断开,而两个for循环还在操作队列。实际生产环境一般不会有问题,因为线程是一直跑着的。
作者简介:大家好,我是smart哥,前中兴通讯、美团架构师,现某互联网公司CTO
进群,大家一起学习,一起进步,一起对抗互联网寒冬