为了确认消费者是否成功处理消息,RabbitMQ提供了消费者确认机制(Consumer Acknowledgement)。即:当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ消息处理状态。回执有三种可选值:
-
ack:成功处理消息,RabbitMQ从队列中删除该消息
-
nack:消息处理失败,RabbitMQ需要再次投递消息
-
reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息
pringAMQP帮我们实现了消息确认,并可以通过配置文件设置消息确认的处理方式,有三种模式: -
none
:不处理。即消息投递给消费者后消息会立刻从MQ删除。非常不安全,不建议使用 -
manual
:手动模式。需要自己在业务代码中调用api,发送ack
或reject
,存在业务入侵,但更灵活 -
auto
:自动模式。当业务正常执行时则自动返回ack
. 当业务出现异常时,根据异常判断返回不同结果:
通过下面的配置可以修改消息确认的处理方式为auto:
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: auto # 自动ack
当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者。如果消费者再次执行依然出错,消息会再次返回到队列,再次投递,直到消息处理成功为止。
这是不行的所以我们需要配置最大重试次数
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true # 开启消费者失败重试
initial-interval: 1000ms # 初识的失败等待时长为1秒
multiplier: 1 # 失败的等待时长倍数,下次等待时长 = 上次等待时长 * multiplier
max-attempts: 3 # 最大重试次数 -
开启本地重试时,消息处理过程中抛出异常,不会请求到队列,而是在消费者本地重试
-
重试达到最大次数后,Spring会返回reject,消息会被丢弃
但是显然直接抛弃消息是不好的所以我们最好应该将发送失败的消息存到一个失败消息队列中去
我们可以定义一个MessageRecoverer
完整代码如下
@Configuration
@ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")
public class ErrorMessageConfig {
@Bean
public DirectExchange errorMessageExchange(){
return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
}