示例场景:
消费者发送消息给normal_exchange交换机,队列normal-queue通过routingKeyz:zhangsan绑定normal_exchange,消费者C1消费。
使队列normal_queue达到死信条件时候,normal-queue队列把消息发送给dead-exchange交换机,routingKey为lisi。这个死信队列由C2消费者处理。
rabbitMQ出现死信场景:
1.消息被拒(消费者ack应答)
2.消息TTL过期(声明队列时候或者生产者发送消失时设置TTL)
3.队列达到最大长度(声明队列时候设置)
消费者C1 声明两个交换机,两个队列以及两个队列与交换机的绑定关系,也设定好正常队列与死信交换机的关系 当前代码时消费者被拒时的死信队列
package com.esint.rabbitmq.work06;import com.esint.rabbitmq.RabbitMQUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;import java.util.HashMap;/*** 私信队列 处理*/
public class Consumer01 {public static final String NORMAL_EXCHANGE = "normal_exchange";public static final String DEAD_EXCHANGE = "dead_exchange";public static final String NORMAL_QUEUE = "normal_queue";public static final String DEAD_QUEUE = "dead_queue";public static void main(String[] args) throws Exception {Channel channel = RabbitMQUtils.getChannel();/*** 声明交换机*/channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);/*** 声明队列* 队列转发私信交换机配置** 【设置过期时间】* x-message-ttl 10000毫秒 队列设置过期时间 也可以在发送的时候设置* 正常队列 转发给死信交换机 设置过期后的死信交换机* 【过期成为死信交换机指定名称】 x-dead-letter-exchange* 【过期成为私信指定routig-key】 x-dead-letter-routing-key**/HashMap<String, Object> arguments = new HashMap<>();
// arguments.put("x-message-ttl",10000); //队列设置过期时间 单位毫秒 这里可以设定,发消息也可以设定。通常在发消息时候设定
// arguments.put("x-max-length",6);//设置最大队列长度arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);arguments.put("x-dead-letter-routing-key","lisi");//注意 :通常我们消费者发送消息至交换机时指定routing-key 这里的消息在特定情况下发送到交换机,channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);/***/channel.queueDeclare(DEAD_QUEUE,false,false,false,null);//队列绑定交换机 *普通channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");//队列绑定交换机 *死信channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");DeliverCallback deliverCallback = (deliverTag,mes)->{String getmsg = new String(mes.getBody(),"UTF-8");if(getmsg.equals("message7")){channel.basicReject(mes.getEnvelope().getDeliveryTag(),false);//第二个参数时:拒绝后是否回滚队列System.out.println("此消息时被拒绝的C1:"+ getmsg);}else{channel.basicAck(mes.getEnvelope().getDeliveryTag(),false);System.out.println("消费者1接收的消息" + new String(mes.getBody(),"UTF-8"));}};boolean autoAck = false;channel.basicConsume(NORMAL_QUEUE,autoAck,deliverCallback,tag->{});}
}
生产者:单纯的发送消息给正常交换机. 消息过期时间
package com.esint.rabbitmq.work06;import com.esint.rabbitmq.RabbitMQUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;/*** 私信队列处理的生产者*/
public class Produc {public static final String NORMAL_EXCHANGE = "normal_exchange";public static void main(String[] args) throws Exception {Channel channel = RabbitMQUtils.getChannel();//交换机 队列已经在消费01声明了 所以这里不要在声明 注意这里的测试需要先启动消费者01AMQP.BasicProperties poperties = new AMQP.BasicProperties().builder().expiration("10000").build();//设置消息过期TTLfor (int i = 0; i < 10; i++) {String mes = "message" + i;channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",poperties,mes.getBytes("UTF-8"));}}
}
消费者C2消费 死信队列
package com.esint.rabbitmq.work06;import com.esint.rabbitmq.RabbitMQUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;public class Consumer02 {public static final String DEAD_QUEUE = "dead_queue";public static void main(String[] args) throws Exception {Channel channel = RabbitMQUtils.getChannel();System.out.println("C2等待接受消息...");DeliverCallback deliverCallback = (deleverTag,mes)->{System.out.println(new String(mes.getBody(),"UTF-8"));};channel.basicConsume(DEAD_QUEUE,true,deliverCallback,tags->{});}
}
实用场景:
用户在商城购买商品提交订单后,假设30分钟内为正常队列,超期未支付为死信队列 TTL