Routing Topic
Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key的时候使用通配符!这种模型Routingkey一般都是由一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
、 item.#
、item.*
。
统配符
* (star) can substitute for exactly one word. 匹配不多不少恰好1个词
# (hash) can substitute for zero or more words. 匹配[0-n]个词a.# 可以匹配 a.b、a.b.c、a.b.c.d 等只要是a.开头的情况
a.* 只能匹配 a.b 这种后面只有一个单词的情况
通配符可以出现在
注意:RoutingKey的任意位置。
创建生产者
public class MyProducer {@Testpublic void test() throws Exception {// 交换机String exchange = "logs_topic";// 创建工厂ConnectionFactory factory = new ConnectionFactory();factory.setVirtualHost("/");factory.setHost("xuewei.world");factory.setUsername("xuewei");factory.setPassword("123456");factory.setPort(5672);// 创建连接和通道Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 声明交换机channel.exchangeDeclare(exchange, "topic");// 发布消息channel.basicPublish(exchange, "a.b", null, "a.b".getBytes());channel.basicPublish(exchange, "a.b.c", null, "a.b.c".getBytes());channel.basicPublish(exchange, "a.b.c.d", null, "a.b.c.d" .getBytes());channel.basicPublish(exchange, "a.b.c.d.e", null, "a.b.c.d.e".getBytes());}
}
创建消费者1
public class MyConsumer1 {public static void main(String[] args) throws Exception {// 指定交换机String exchange = "logs_topic";// 创建工厂ConnectionFactory factory = new ConnectionFactory();factory.setVirtualHost("/");factory.setHost("xuewei.world");factory.setUsername("xuewei");factory.setPassword("123456");factory.setPort(5672);// 创建连接和通道Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 绑定交换机channel.exchangeDeclare(exchange, "topic");// 创建临时队列String queue = channel.queueDeclare().getQueue();// 将临时队列绑定exchangechannel.queueBind(queue, exchange, "a.*");channel.queueBind(queue, exchange, "#.d.#");// 处理消息channel.basicConsume(queue, true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者1: " + new String(body));// TODO 业务处理}});}
}
创建消费者2
public class MyConsumer2 {public static void main(String[] args) throws Exception {// 指定交换机String exchange = "logs_topic";// 创建工厂ConnectionFactory factory = new ConnectionFactory();factory.setVirtualHost("/");factory.setHost("xuewei.world");factory.setUsername("xuewei");factory.setPassword("123456");factory.setPort(5672);// 创建连接和通道Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 绑定交换机channel.exchangeDeclare(exchange, "topic");// 创建临时队列String queue = channel.queueDeclare().getQueue();// 将临时队列绑定exchangechannel.queueBind(queue, exchange, "#.b.#");// 处理消息channel.basicConsume(queue, true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者2: " + new String(body));// TODO 业务处理}});}
}
生产者生产的消息:a.b
、a.b.c
、a.b.c.d
、a.b.c.d.e
消费者1接受的消息规则为:
channel.queueBind(queue, exchange, "a.*");
channel.queueBind(queue, exchange, "#.d.#");
所以消费者1将会接收到:a.b
、a.b.c.d
、a.b.c.d.e
消费者2接受的消息规则为:
channel.queueBind(queue, exchange, "#.b.#");
所以消费者2将会接收到:a.b
、a.b.c
、a.b.c.d
、a.b.c.d.e