在一些业务场景中 比如说用户下单支付以后 修改支付状态为 已支付 这时也要更新订单状态 订单状态就可以做成异步的来提高效率
1.什么是异步
就是我直接把修改的订单id 丢给一个消息代理 由消息代理通知订单模块 这样的话支付完成就可以下一步了 而不用非要等到订单状态也改过来才能进行下一步
2.RabbitMQ基础:
异步处理:将耗时任务放入队列中,避免阻塞主线程。
削峰填谷:通过队列缓冲高峰期流量,防止后端服务崩溃。
系统解耦:不同系统之间通过消息队列进行通信,降低耦合度。
MQ基础直接拿代码举例:
我这里要修改订单状态
向这个交换机:MqConstants.PAY_EXCHANGE_NAME key:MqConstants.PAY_SUCCESS_KEY message:po.getBizOrderNo()
rabbitTemplate.convertAndSend(MqConstants.PAY_EXCHANGE_NAME, MqConstants.PAY_SUCCESS_KEY, po.getBizOrderNo());
然后这时绑定到该交换机并且 key与他一致的queue就能接收到他传过来的数据 然后修改订单状态
这是用注解的方式创建
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = MqConstants.PAY_SUCCESS_QUEUE, durable = "true"),
exchange = @Exchange(name = MqConstants.PAY_EXCHANGE_NAME, type = "direct"),
key = MqConstants.PAY_SUCCESS_KEY
))
public void listenPaySuccess(Long orderId){
orderService.markOrderPaySuccess(orderId);
}
以上注解的含义为:
@QueueBinding(
value = @Queue(name = "example.queue", durable = "true"),
exchange = @Exchange(name = "example.exchange", type = "direct"),
key = "example.key"
)
创建一个名为 example.queue 的持久化队列。
创建一个名为 example.exchange 的 direct 类型交换机。
将队列 example.queue 绑定到交换机 example.exchange,并使用路由键 example.key。
简单来说就是创建了一个定义了一个持久化队列 绑定到 定义的交换机上 定义了一个路由键
MQ高级:
基础就是 创建了一个交换机 队列 路由键 然后生产者去给这个交换机附带路由键发消息 消费者 queue队列 通过路由键 绑定到这个交换机 从而完成异步通信
高级的话要考虑到更多的东西 比如说如果连接mq交换机失败了咋搞 或者说 连接到交换机但是key值不对又咋搞 又或者说我想发送一个延迟消息咋搞 消息没发过去 那我的消息是直接丢掉还是说给他存起来这些就是高级要考虑的事情.
1.首先介绍一下ReturnCallback和ConfirmCallback
ReturnCallback 用于处理消息发送到交换机后,由于路由键不匹配等原因未能被任何队列接收的情况。
当消息成功发送到交换机但未被任何队列接收时,ReturnCallback 会被调用。 要使用这个的话需要配置
ConfirmCallback 用于确认消息是否成功发送到 RabbitMQ 服务器(即交换机)。
当消息成功发送到交换机时,ConfirmCallback 会被调用,并返回一个确认(ack)。
当消息发送到交换机失败时,ConfirmCallback 也会被调用,并返回一个否定确认(nack) 要使用这个的话需要用rabbitTemplate.coverAndSend的时候多添加一个参数
spring:
rabbitmq:
publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型
publisher-returns: true # 开启publisher return机制
2.本地重试机制
当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者。如果消费者再次执行依然出错,消息会再次返回到队列,再次投递,直到消息处理成功为止。
极端情况就是消费者一直无法执行成功,那么消息投递就会无限循环,导致mq的消息处理飙升,带来不必要的压力
所以我们要配置一下 如果消息一直发送失败的话 就需要采取措施 比如说失败三次就不去发送了 可以再yaml里面配置一下
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true # 开启消费者失败重试
initial-interval: 1000ms # 初识的失败等待时长为1秒
multiplier: 1 # 失败的等待时长倍数,下次等待时长 = 上次等待时长 * multiplier
max-attempts: 3 # 最大重试次数
3.延迟消息
简单方法使用插件交换机类型就多了一个选项
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "delay.queue", durable = "true"),
exchange = @Exchange(name = "delay.direct", delayed = "true",type = ExchangeTypes.DIRECT, durable = "true"),
key = "delay"
))
public void listenDelayMessage(String msg){
log.info("接收到delay.queue的延迟消息:{}", msg);
}
其实跟正常direct是一样的 不过 他就像ConfirmCallback这个机制一样 需要在发送的时候添加一个条件代码如下
@Test
void testPublisherDelayMessage() {
// 1.创建消息
String message = "hello, delayed message";
// 2.发送消息,利用消息后置处理器添加消息头
rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 添加延迟消息属性
message.getMessageProperties().setDelay(5000);
log.info("发送消息"+new String(message.getBody(), StandardCharsets.UTF_8)+ LocalDateTime.now());
return message;
}
});
}
就是这个匿名内部类中 设置了延时
4.幂等性
什么是幂等性 就是比如说你在查询的时候 点击查询 你点击了一百次出现的数据还是一样的数据 就没必要再去查99次 只用查一次就好了
解决的两种方案
4.1唯一消息ID
这个思路非常简单:
4.1.1每一条消息都生成一个唯一的id,与消息一起投递给消费者。
4.2.2 消费者接收到消息后处理自己的业务,业务处理成功后将消息ID保存到数据库或Redis
4.3.3如果下次又收到相同消息,去数据库或Redis查询判断是否存在,存在则为重复消息放弃处理。
4.2业务判断
例如在支付通知案例中,处理消息的业务逻辑是把订单状态从未支付修改为已支付。因此我们就可以在执行更新时判断订单状态是否是未支付,如果不是则证明订单已经被处理过,无需重复处理。
如果是已支付那就说明已经改过了就没必要再改了 加个判断可以不用redis或者数据库更好一点