RabbitMQ系列(14)--Topics交换机的简介与实现

1、Topics交换机的介绍

Topics交换机能让消息只发送往绑定了指定routingkey的队列中去,不同于Direct交换机的是,Topics能把一个消息往多个不同的队列发送;Topics交换机的routingkey不能随意写,必须是一个单词列表,并以点号分隔开,例如“one.two.three”,除此外还有两个替换符,*(星号)能代替一个单词,#(井号)可以代替零个或多个单词,例如“*.one.*”是中间是one的3个单词,“*.*.one”是最后一个是one的3个单词,“one.#”是第一个单词是one的多个单词,若队列绑定键是#,这个队列将接收所有数据,这时候类似fanout交换机,若队列绑定键中没有#和*出现,这时候就类似direct交换机

 2、Topics交换机的实现 

(1)新建一个名为topics的包,用于装发布确认的代码

效果图:

(2)新建一个名为Receive01的类用于编写消费者的代码

代码如下:

 注:RabbitMqUtils工具类的实现在我的另一篇文章里,有需要的同学可以查看参考

RabbitMQ系列(6)--RabbitMQ模式之工作队列(Work queues)的简介及实现_Ken_1115的博客-CSDN博客

package com.ken.topics;import com.ken.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;/*** 消息接收*/
public class Receive01 {//声明交换机的名称public static  final String EXCHANGE_NAME = "topic_exchange";//接收消息public static void main(String[] args) throws Exception{Channel channel = RabbitMqUtils.getChannel();//声明交换机channel.exchangeDeclare(EXCHANGE_NAME,"topic");//声明队列String queueName = "Q1";/*** 创建队列* 第一个参数:队列名称* 第二个参数:服务器重启后队列是否还存在,即队列是否持久化,true为是,false为否,默认false,即消息存储在内存中而不是硬盘中* 第三个参数:该队列是否只供一个消费者进行消费,是否进行消息共享,true为只允许一个消费者进行消费,false为允许多个消费者对队列进行消费,默认false* 第四个参数:是否自动删除,最后一个消费者断开连接后该队列是否自动删除,true自动删除,false不自动删除* 第五个参数:其他参数*/channel.queueDeclare(queueName,false,false,false,null);//队列与交换机通过routingkey进行捆绑channel.queueBind(queueName,EXCHANGE_NAME,"*.one.*");/*** 声明消费者接收消息后的回调方法(由于回调方法DeliverCallback是函数式接口,所以需要给DeliverCallback赋值一个函数,为了方便我们这里使用Lambda表达式进行赋值)* 为什么要这样写呢,是因为basicConsume方法里的参数deliverCallback的类型DeliverCallback用 @FunctionalInterface注解规定DeliverCallback是一个函数式接口,所以要往deliverCallback参数传的值要是一个函数** 以下是DeliverCallback接口的源代码*  @FunctionalInterface*  public interface DeliverCallback {*      void handle (String consumerTag, Delivery message) throws IOException;*  }*/DeliverCallback deliverCallback = (consumerTag, message) -> {System.out.println(new String(message.getBody(),"UTF-8"));System.out.println("接收队列:" + queueName + " 绑定键:" + message.getEnvelope().getRoutingKey());};/*** 用信道对消息进行接收* 第一个参数:消费的是哪一个队列的消息* 第二个参数:消费成功后是否要自动应答,true代表自动应当,false代表手动应答* 第三个参数:消费者接收消息后的回调方法* 第四个参数:消费者取消接收消息后的回调方法(正常接收不调用)*/channel.basicConsume(queueName,true,deliverCallback,consumerTag ->{});}}

(3)复制Receive01类并粘贴重命名为Receive02

代码如下:

package com.ken.topics;import com.ken.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;/*** 消息接收*/
public class Receive02 {//声明交换机的名称public static  final String EXCHANGE_NAME = "topic_exchange";//接收消息public static void main(String[] args) throws Exception{Channel channel = RabbitMqUtils.getChannel();//声明交换机channel.exchangeDeclare(EXCHANGE_NAME,"topic");//声明队列String queueName = "Q1";/*** 创建队列* 第一个参数:队列名称* 第二个参数:服务器重启后队列是否还存在,即队列是否持久化,true为是,false为否,默认false,即消息存储在内存中而不是硬盘中* 第三个参数:该队列是否只供一个消费者进行消费,是否进行消息共享,true为只允许一个消费者进行消费,false为允许多个消费者对队列进行消费,默认false* 第四个参数:是否自动删除,最后一个消费者断开连接后该队列是否自动删除,true自动删除,false不自动删除* 第五个参数:其他参数*/channel.queueDeclare(queueName,false,false,false,null);//队列与交换机通过routingkey进行捆绑channel.queueBind(queueName,EXCHANGE_NAME,"*.*.two");//队列与交换机通过routingkey进行捆绑channel.queueBind(queueName,EXCHANGE_NAME,"three.#");/*** 声明消费者接收消息后的回调方法(由于回调方法DeliverCallback是函数式接口,所以需要给DeliverCallback赋值一个函数,为了方便我们这里使用Lambda表达式进行赋值)* 为什么要这样写呢,是因为basicConsume方法里的参数deliverCallback的类型DeliverCallback用 @FunctionalInterface注解规定DeliverCallback是一个函数式接口,所以要往deliverCallback参数传的值要是一个函数** 以下是DeliverCallback接口的源代码*  @FunctionalInterface*  public interface DeliverCallback {*      void handle (String consumerTag, Delivery message) throws IOException;*  }*/DeliverCallback deliverCallback = (consumerTag, message) -> {System.out.println(new String(message.getBody(),"UTF-8"));System.out.println("接收队列:" + queueName + " 绑定键:" + message.getEnvelope().getRoutingKey());};/*** 用信道对消息进行接收* 第一个参数:消费的是哪一个队列的消息* 第二个参数:消费成功后是否要自动应答,true代表自动应当,false代表手动应答* 第三个参数:消费者接收消息后的回调方法* 第四个参数:消费者取消接收消息后的回调方法(正常接收不调用)*/channel.basicConsume(queueName,true,deliverCallback,consumerTag ->{});}}

(4)新建一个名为Emit的类用于编写生产者的代码

代码如下:

package com.ken.topics;import com.ken.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;import java.util.HashMap;
import java.util.Map;/*** 发消息*/
public class Emit {//声明交换机的名称public static  final String EXCHANGE_NAME = "topic_exchange";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();Map<String,String> bindingKeyMap = new HashMap<>();bindingKeyMap.put("four.one.two","被队列Q1Q2接收");bindingKeyMap.put("three.one.five","被队列Q1Q2接收");bindingKeyMap.put("four.one.six","被队列Q1接收");bindingKeyMap.put("three.seven.six","被队列Q2接收");bindingKeyMap.put("three.eight.two","虽然满足两个绑定,但只被队列Q2接收一次");bindingKeyMap.put("three.seven.six","不匹配任何绑定,不会被任何队列接收到,会被丢弃");bindingKeyMap.put("four.one.nine.two","四个单词,不匹配任何绑定,会被丢弃");bindingKeyMap.put("three.one.nine.two","四个单词,但匹配Q2");for (Map.Entry<String, String> bindingKeyEntry : bindingKeyMap.entrySet()) {String routingKey = bindingKeyEntry.getKey();String message = bindingKeyEntry.getValue();channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes("UTF-8"));System.out.println("生产者发出消息:" + message);}}}

 (5)分别先运行Receive01、Receive02、Emit

(6)查看Receive01和Receive02接收消息的情况

从上述结果可看出topic交换机实现成功

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/18641.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

23款奔驰GLE450动感型升级柏林之声音响系统,体验不一样的感觉

奔驰GLE450动感型升级柏林之声的音响效果自然非同凡响&#xff0c;在人声、交响乐音乐厅感受方面都有非常逼真的现场感受&#xff0c;结合柏林之声的界面调整&#xff0c;可以在不同方位体验的高保真的音乐之享&#xff01; 小柏林音响总共13个喇叭1台功放由4个高音、4个中音、…

13 个最佳免费 PDF 编辑器清单

您正在寻找一款真正免费的 PDF 编辑器&#xff0c;不仅可以编辑和添加文本&#xff0c;还可以更改图像、添加您自己的图形、签署您的名字、填写表格等等&#xff1f;您来对地方了&#xff1a;我研究了这些类型的应用程序&#xff0c;以得出您正在寻找的内容的列表。 其中一些是…

Linux·图解Linux网络包接收过程

因为要对百万、千万、甚至是过亿的用户提供各种网络服务&#xff0c;所以在一线互联网企业里面试和晋升后端开发同学的其中一个重点要求就是要能支撑高并发&#xff0c;要理解性能开销&#xff0c;会进行性能优化。而很多时候&#xff0c;如果你对Linux底层的理解不深的话&…

hadoop -- Hbase

HBase是一个分布式、可扩展、面向列的数据存储&#xff08;百万级别列&#xff09;、可伸缩、高可靠性、实时读写的NoSQL 数据库。 HBase利用 Hadoop的 HDFS作为其文件存储系统&#xff0c; 利用MapReduce 来处理HBase中的海量数据&#xff0c; 利用Zookeeper作为分布式协同服…

《Redis 核心技术与实战》课程学习笔记(七)

切片集群&#xff1a;数据增多了&#xff0c;是该加内存还是加实例&#xff1f; 切片集群&#xff0c;也叫分片集群&#xff0c;就是指启动多个 Redis 实例组成一个集群&#xff0c;然后按照一定的规则&#xff0c;把收到的数据划分成多份&#xff0c;每一份用一个实例来保存。…

手机屏幕点胶区域定位机器视觉系统软硬件方案

【检测目的】 点胶之前定位产品&#xff0c;找寻效果较明显的边缘位置 【样品一】 两张图片为一个产品点胶部位的左边和右边。通过边缘A和边缘B这两条线可以找出交点位置做为定位点&#xff08;产品不平会造成图像模糊的情况&#xff09; 【样品二】 两张图片为一个产品点胶部位…

【Zabbix 监控 Windows 系统,Java应用,SNMP】

目录 一、Zabbix 监控 Windows 系统1、下载 Windows 客户端 Zabbix agent 22、安装客户端&#xff0c;配置3、在服务端 Web 页面添加主机&#xff0c;关联模板 二、Zabbix 监控 java 应用1、客户端开启 java jmxremote 远程监控功能1、配置 java jmxremote 远程监控功能2、启动…

Python算法笔记(1)-时间复杂度、空间复杂度

Python算法笔记&#xff08;1&#xff09;-时间复杂度 1.时间复杂度 时间复杂度是一个描述算法的运行时间的一个函数&#xff0c;它描述了算法的运行时间和输入数据的规模之间的关系&#xff0c;时间复杂度的表示方法用O表示&#xff0c;时间复杂度也用来考察输入值无限趋近无…

【NLP】基础工程:词嵌入

一、说明 词嵌入是高维向量空间中单词或短语的数字表示,其中向量之间的几何关系捕获相应单词之间的语义和句法相似性。这些表示使机器学习模型能够以更有意义的方式理解和处理自然语言。 在传统的 NLP 方法中,单词是使用稀疏的 one-hot 编码向量来表示的,其中每个单词在大词…

专治疑难系列 - 解决“npm ERR!”报错问题

‍‍&#x1f3e1;博客主页&#xff1a; Passerby_Wang的博客_CSDN博客-系统运维,云计算,Linux基础领域博主 &#x1f310;所属专栏&#xff1a;『专治疑难系列』 &#x1f30c;上期文章&#xff1a; 专治疑难系列 - 解决打印机凭证冲突问题 &#x1f4f0;如觉得博主文章写的…

插入排序法解析

插入排序法解析 什么是插入排序法 插入排序法是一种简单但有效的排序算法&#xff0c;其基本思想是将一个待排序的元素逐个插入到已经排好序的元素序列中&#xff0c;直至所有元素都被插入完成&#xff0c;从而得到一个有序序列。 具体步骤如下&#xff1a; 假设初始时&…

css实现九宫格有边框,最外层四周无边框

1.先设置9个div&#xff0c;如下&#xff1a; <div class"wrapper"><div class"cell"></div><div class"cell"></div><div class"cell"></div><div class"cell"></div&…