模拟实现消息队列项目(系列5) -- 服务器模块(虚拟主机)

目录

前言

1. 创建VirtualHost

1.1 定义虚拟主机的相关属性

1.2 VirtualHost 构造方法 

1.3 交换机和队列的创建和删除

1.3.1 交换机操作

1.3.2 队列操作 

1.4 绑定的创建和删除

1.5 发送消息到指定的队列/交换机

2. 实现路由规则Router

2.1 checkBindingKey()

2.2 checkRoutingKey()

2.3 route()

2.4 单元测试

3. 订阅消息

3.1 添加一个订阅者

3.2 创建订阅者管理类ConsumerManager

3.3 订阅消息小结

4. 消息确认 basicAck()

5. VirtualHost单元测试

结语


前言

        写到这里,内存和硬盘的数据就组织完毕了,接下来我们就会引入在消息队列初识中提出的一个概念 --- 虚拟主机.简单回顾一下虚拟主机的概念: 它类似于MySQL的database,是一个逻辑的集合,一个BrokerServer上可以存在多个VirtualHost.在一个BrokerServer上可以组织不同的数据,可以使用不同的虚拟主机做出逻辑上的区分.本章节就是进行进一步的封装,同时实现一些消息队列的API.这里需要注意的是在RabbitMq中,虚拟主机是可以随便创建和删除的,在本项目目前只是默认只有一个虚拟主机的存在,后续根据情况会进行扩展,这里也提前预留了对于多虚拟主机的管理的数据结构.保证了不同虚拟机中的交换机 队列 绑定 消息都是相互隔离的.本项目全部代码已上传Gitee,链接放在文章末尾,欢迎大家访问!


1. 创建VirtualHost

👇👇👇

注意: 这一块比较重要也比较复杂,所以将代码进行截图加标注的形式进行总结,完整的VirtualHost.class代码会在讲解完给出.

👆👆👆

1.1 定义虚拟主机的相关属性

Router: 是用来定义交换机转发的规则,主要实现的是对routingKey进行验证以及判断,具体的细节会在后面给出.

ConsumerManager: 实现的是管理消费者进行消费.

以上两者就是锁对象了,后续我们要对硬盘和内存进行数据的读写,为了保证操作的原子性,以及线程安全我们会给相关操作进行加锁. 

1.2 VirtualHost 构造方法 

主要就是传入虚拟主机的名字,对该虚拟主机的数据库以及文件信息进行初始化,主要是对数据库进行初始化.具体DataBaseManager.init()

初始化内容如下:

 初始化完成,将硬盘中的数据恢复到内存中

至此前置工作就差不多了.下面对一些重要的方法进行创建.

1.3 交换机和队列的创建和删除

1.3.1 交换机操作

如果交换机不存在就进行创建,存在就直接返回(ExchangeDeclare)

  • 1. 更改交换机的名字: 交换机名字 = 虚拟主机的名字 + 交换机的名字(更加方便后续的管理)
  • 2. 判断交换机是否存在: 存在直接返回true即可,不存在就直接创建新的交换机即可.设置交换机的属性,根据是否持久化写入到硬盘,然后在写入到内存.这里需要注意的是,我们一定要先写硬盘再写内存,因为些硬盘是一个失败率很高的事情,经常会因为文件权限问题导致数据写入不进去.如果先写内存,而硬盘写入不进去,就还需要堆内存的数据进行删除,这就很繁琐了.
  • 3. 以上整个操作是对交换机进行读写操作,为了保证线程安全,我们进行加锁操作.
/*** 1. 创建交换机* 如果交换机不存在就进行创建,存在就直接返回*/// 创建交换机// 如果交换机不存在, 就创建. 如果存在, 直接返回.// 返回值是 boolean. 创建成功, 返回 true. 失败返回 falsepublic boolean exchangeDeclare(String exchangeName,ExchangeType exchangeType,boolean durable,boolean autoDelete,Map<String, Object> arguments) {// 1. 更改交换机的名字 交换机的名字 = 虚拟主机 + 交换机exchangeName = virtualHostName + exchangeName;try{synchronized (exchangeLocker){// 2. 判定该交换机是否存在Exchange existsExchange = memoryDataCenter.getExchange(exchangeName);if (existsExchange != null){System.out.println("[VirtualHost] 交换机已经存在!");return true;}// 3. 不存在,直接进行创建新的交换机Exchange exchange = new Exchange();exchange.setName(exchangeName);exchange.setType(exchangeType);exchange.setDurable(durable);exchange.setAutoDelete(autoDelete);exchange.setArguments(arguments);// 4. 将构造好的交换机进行写入硬盘(含有持久化信息的交换机)  先写硬盘后写内存if (durable){diskDataCenter.insertExchange(exchange);}// 5. 将交换机写入到内存中memoryDataCenter.insertExchange(exchange);System.out.println("[VirtualHost] 交换机创建完成! exchangeName="+exchangeName);// 上述操作为什么不先写内存后写硬盘?// 因为写硬盘操作比较容易出现异常,如果写入硬盘失败,写入内存成功,再进行从内存中进行删除就比较麻烦了}return true;} catch (Exception e){System.out.println("[VirtualHost] 交换机创建失败! exchangeName="+exchangeName);e.printStackTrace();return false;}}

删除交换机

  • 1. 更改交换机的名字: 交换机名字 = 虚拟主机的名字 + 交换机的名字(更加方便后续的管理)
  • 2. 根据交换机的名字得到交换机对象,判断交换机是否为空,不为空进行删除操作,还是先进行删除硬盘的数据,再删除内存中数据
  • 3. 以上整个操作是对交换机进行读写操作,为了保证线程安全,我们进行加锁操作.
/*** 2.删除交换机* @param exchangeName 交换机名字* @return*/public boolean exchangeDelete(String exchangeName) {exchangeName = virtualHostName + exchangeName;try {synchronized (exchangeLocker) {// 1. 先找到对应的交换机.Exchange toDelete = memoryDataCenter.getExchange(exchangeName);if (toDelete == null) {throw new MqException("[VirtualHost] 交换机不存在无法删除!");}// 2. 删除硬盘上的数据if (toDelete.isDurable()) {diskDataCenter.deleteExchange(exchangeName);}// 3. 删除内存中的交换机数据memoryDataCenter.deleteExchange(exchangeName);System.out.println("[VirtualHost] 交换机删除成功! exchangeName=" + exchangeName);}return true;} catch (Exception e) {System.out.println("[VirtualHost] 交换机删除失败! exchangeName=" + exchangeName);e.printStackTrace();return false;}}

1.3.2 队列操作 

针对队列创建和删除操作,这里就不做过多的解释了,过程跟上述交换机的操作一样. 下面给出代码:

/*** 3. 创建队列* @param queueName 队列名* @param durable 持久化* @param exclusive 队列独有* @param autoDelete 自动删除* @param arguments 其他声明* @return*/public boolean queueDeclare(String queueName, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments) {// 把队列的名字, 给拼接上虚拟主机的名字.queueName = virtualHostName + queueName;try {synchronized (queueLocker) {// 1. 判定队列是否存在MSQueue existsQueue = memoryDataCenter.getQueue(queueName);if (existsQueue != null) {System.out.println("[VirtualHost] 队列已经存在! queueName=" + queueName);return true;}// 2. 创建队列对象MSQueue queue = new MSQueue();queue.setName(queueName);queue.setDurable(durable);queue.setExclusive(exclusive);queue.setAutoDelete(autoDelete);queue.setArguments(arguments);// 3. 写硬盘if (durable) {diskDataCenter.insertQueue(queue);}// 4. 写内存memoryDataCenter.insertQueue(queue);System.out.println("[VirtualHost] 队列创建成功! queueName=" + queueName);}return true;} catch (Exception e) {System.out.println("[VirtualHost] 队列创建失败! queueName=" + queueName);e.printStackTrace();return false;}}/*** 4. 删除队列* @param queueName 队列名* @return*/public boolean queueDelete(String queueName) {queueName = virtualHostName + queueName;try {synchronized (queueLocker) {// 1. 根据队列名字, 查询下当前的队列对象MSQueue queue = memoryDataCenter.getQueue(queueName);if (queue == null) {throw new MqException("[VirtualHost] 队列不存在! 无法删除! queueName=" + queueName);}// 2. 删除硬盘数据if (queue.isDurable()) {diskDataCenter.deleteQueue(queueName);}// 3. 删除内存数据memoryDataCenter.deleteQueue(queueName);System.out.println("[VirtualHost] 删除队列成功! queueName=" + queueName);}return true;} catch (Exception e) {System.out.println("[VirtualHost] 删除队列失败! queueName=" + queueName);e.printStackTrace();return false;}}

1.4 绑定的创建和删除

  • 1. 更改交换机和队列的名字: 交换机名字 = 虚拟主机的名字 + 交换机的名字  队列名字 = 虚拟主机的名字 + 队列的名字
  • 2. 根据交换机和队列的名字得到绑定信息的对象,判断绑定是否为空,不为空抛出异常
  • 3. 绑定对象为空: 1, 判断绑定的bindingKey是否合法. 2.合法就创建绑定对象,设置响应的绑定属性.
  • 4. 获取一下对应的交换机和队列. 如果交换机或者队列不存在, 这样的绑定也是无法创建的.
  • 5. 写入硬盘,再写内存
  • 6. 以上整个操作是对交换机和队列进行读写操作,为了保证线程安全,我们进行加锁操作.

 这一步我们在Router进行设置一个方法,等下面更加详细的介绍router类.

/*** 5. 创建绑定* @param queueName 队列名字* @param exchangeName 交换机名字* @param bindingKey 绑定规则* @return*/public boolean queueBind(String queueName, String exchangeName, String bindingKey) {// 1. 转换交换机和队列的名字queueName = virtualHostName + queueName;exchangeName = virtualHostName + exchangeName;try {synchronized (exchangeLocker){synchronized (queueLocker){// 2. 判断交换机和队列是否已经绑定成功Binding existBinding = memoryDataCenter.getBinding(exchangeName,queueName);if (existBinding != null){throw new MqException("[VirtualHost] binding 已经存在! queueName=" + queueName+ ", exchangeName=" + exchangeName);}// 3. 验证bing中的bindingKey 是否合法if (!router.checkBindingKey(bindingKey)){throw new MqException("[VirtualHost] bindingKey 非法! bindingKey=" + bindingKey);}// 4. 创建绑定对象Binding binding = new Binding();binding.setExchangeName(exchangeName);binding.setQueueName(queueName);binding.setBindingKey(bindingKey);// 5. 获取对应的交换机和队列,判断是否是存在的MSQueue queue = memoryDataCenter.getQueue(queueName);if (queue == null) {throw new MqException("[VirtualHost] 队列不存在! queueName=" + queueName);}Exchange exchange = memoryDataCenter.getExchange(exchangeName);if (exchange == null) {throw new MqException("[VirtualHost] 交换机不存在! exchangeName=" + exchangeName);}// 5. 先写硬盘if (queue.isDurable() && exchange.isDurable()) {diskDataCenter.insertBinding(binding);}// 6. 写入内存memoryDataCenter.insertBinding(binding);System.out.println("[VirtualHost] 绑定创建成功! exchangeName=" + exchangeName+ ", queueName=" + queueName);}return true;}} catch (MqException e) {System.out.println("[VirtualHost] 绑定创建失败! exchangeName=" + exchangeName+ ", queueName=" + queueName);e.printStackTrace();return false;}}

删除绑定 

  • 1. 更改交换机和队列的名字: 交换机名字 = 虚拟主机的名字 + 交换机的名字  队列名字 = 虚拟主机的名字 + 队列的名字
  • 2. 根据交换机和队列的名字得到绑定信息的对象,判断绑定是否为空,为空抛出异常
  • 3. 从硬盘进行删除,从内存进行删除
  • 4. 以上整个操作是对交换机和队列进行读写操作,为了保证线程安全,我们进行加锁操作.这里需要注意的是,我们对交换机和队列进行加锁的时候,顺序要和创建绑定的顺序是一致的.不然会出现死锁的现象.
/*** 6. 删除绑定* @param queueName 队列名* @param exchangeName 交换机名字* @return*/public boolean queueUnbind(String queueName, String exchangeName) {queueName = virtualHostName + queueName;exchangeName = virtualHostName + exchangeName;try {synchronized (exchangeLocker) {synchronized (queueLocker) {// 1. 获取 binding 看是否已经存在~Binding binding = memoryDataCenter.getBinding(exchangeName, queueName);if (binding == null) {throw new MqException("[VirtualHost] 删除绑定失败! 绑定不存在! exchangeName=" + exchangeName + ", queueName=" + queueName);}// 2. 无论绑定是否持久化了, 都尝试从硬盘删一下. 就算不存在, 这个删除也无副作用.diskDataCenter.deleteBinding(binding);// 3. 删除内存的数据memoryDataCenter.deleteBinding(binding);System.out.println("[VirtualHost] 删除绑定成功!");}}return true;} catch (Exception e) {System.out.println("[VirtualHost] 删除绑定失败!");e.printStackTrace();return false;}}

1.5 发送消息到指定的队列/交换机

发布消息其实就是把消息发送到指定的交换机中,然后根据绑定关系发送到指定的队列

  • 1. 更改交换机和队列的名字: 交换机名字 = 虚拟主机的名字 + 交换机的名字  队列名字 = 虚拟主机的名字 + 队列的名字 
  • 2. 检查消息的routingKey是否合法,不合法抛出异常
  • 3. 根据传入的交换机的名字进行查找交换机对象,然后判断交换机的类型,而进行下一步的行为.
  • 4. 如果交换机类型为DIRECT,则表示为直接交换机,则把routingKey作为队列的名字,先进行根据传入的参数,创建消息对象,然后按照刚才组合好的队列名字进行查找队列,查找队列进行发送消息,没查找进行抛出异常.发送消息的时候判断消息是否是持久化的,是持久化就往硬盘中写入,否则只写内存就可以.发送完消息之后,要进行重要的操作.通知消费者进行消费消息.这一块是在管理消费者进行消费消息实现的.
  • 5. 如果交换机类型为Fanout 或者 Topic 我们需要在Router中进行设置相应的路由规则.
/*** 9. 发送消息到指定的队列/交换机*/public boolean basicPublish(String exchangeName, String routingKey, BasicProperties basicProperties, byte[] body) {try {// 1. 转换交换机的名字exchangeName = virtualHostName + exchangeName;// 2. 检查 routingKey 是否合法.if (!router.checkRoutingKey(routingKey)) {throw new MqException("[VirtualHost] routingKey 非法! routingKey=" + routingKey);}// 3. 查找交换机对象Exchange exchange = memoryDataCenter.getExchange(exchangeName);if (exchange == null) {throw new MqException("[VirtualHost] 交换机不存在! exchangeName=" + exchangeName);}// 4. 判定交换机的类型if (exchange.getType() == ExchangeType.DIRECT) {// 按照直接交换机的方式来转发消息// 以 routingKey 作为队列的名字, 直接把消息写入指定的队列中.// 此时, 可以无视绑定关系.String queueName = virtualHostName + routingKey;// 5. 构造消息对象Message message = Message.createMessageWithId(routingKey, basicProperties, body);// 6. 查找该队列名对应的对象MSQueue queue = memoryDataCenter.getQueue(queueName);if (queue == null) {throw new MqException("[VirtualHost] 队列不存在! queueName=" + queueName);}// 7. 队列存在, 直接给队列中写入消息sendMessage(queue, message);} else {// 按照 fanout 和 topic 的方式来转发.// 5. 找到该交换机关联的所有绑定, 并遍历这些绑定对象ConcurrentHashMap<String, Binding> bindingsMap = memoryDataCenter.getBindings(exchangeName);for (Map.Entry<String, Binding> entry : bindingsMap.entrySet()) {// 1) 获取到绑定对象, 判定对应的队列是否存在Binding binding = entry.getValue();MSQueue queue = memoryDataCenter.getQueue(binding.getQueueName());if (queue == null) {// 此处咱们就不抛出异常了. 可能此处有多个这样的队列.// 希望不要因为一个队列的失败, 影响到其他队列的消息的传输.System.out.println("[VirtualHost] basicPublish 发送消息时, 发现队列不存在! queueName=" + binding.getQueueName());continue;}// 2) 构造消息对象Message message = Message.createMessageWithId(routingKey, basicProperties, body);// 3) 判定这个消息是否能转发给该队列.//    如果是 fanout, 所有绑定的队列都要转发的.//    如果是 topic, 还需要判定下, bindingKey 和 routingKey 是不是匹配.if (!router.route(exchange.getType(), binding, message)) {continue;}// 4) 真正转发消息给队列sendMessage(queue, message);}}return true;} catch (Exception e) {System.out.println("[VirtualHost] 消息发送失败!");e.printStackTrace();return false;}}private void sendMessage(MSQueue queue, Message message) throws IOException, MqException, InterruptedException {// 此处发送消息, 就是把消息写入到 硬盘 和 内存 上. 根据此条消息时是否要进行持久化进行判断int deliverMode = message.getDeliverMode();// deliverMode 为 1 , 不持久化. deliverMode 为 2 表示持久化.if (deliverMode == 2) {diskDataCenter.sendMessage(queue, message);}// 写入内存memoryDataCenter.sendMessage(queue, message);// 此处还需要补充一个逻辑, 通知消费者可以消费消息了.consumerManager.notifyConsume(queue.getName());}

2. 实现路由规则Router

 这个类我们实现具体的路由转发规则,对之前还没实现的方法进行实现.还未实现的方法具体如下:

1. 在创建绑定的时候我们对bindingKey进行验证是否合法checkBindingKey();

2. 在往交换机进行发送消息的时候,我们对消息的routingKey进行验证\checkRoutingKey();

3. 当消息插入到交换机之后,根据交换机的主题往队列中分发消息的时候.对不同主题的交换机实现不同的路由规则route();

以上是我们在虚拟主机类中还没有进行实现的方法.下面进行一一实现:

2.1 checkBindingKey()

 以下是我们合法的BindingKey的规则

/*** 验证bindingKey是否是合法的*     1. 数字, 字母, 下划线*     2. 使用 . 分割成若干部分*     3. 允许存在 * 和 # 作为通配符. 但是通配符只能作为独立的分段.* @return*/public boolean checkBindingKey(String bindingKey){if (bindingKey.length() == 0) {// 空字符串, 也是合法情况. 比如在使用 direct / fanout 交换机的时候, bindingKey 是用不上的.return true;}// 检查字符串中不能存在非法字符for (int i = 0; i < bindingKey.length(); i++) {char ch = bindingKey.charAt(i);if (ch >= 'A' && ch <= 'Z') {continue;}if (ch >= 'a' && ch <= 'z') {continue;}if (ch >= '0' && ch <= '9') {continue;}if (ch == '_' || ch == '.' || ch == '*' || ch == '#') {continue;}return false;}// 检查 * 或者 # 是否是独立的部分.// aaa.*.bbb 合法情况;  aaa.a*.bbb 非法情况.String[] words = bindingKey.split("\\.");for (String word : words) {// 检查 word 长度 > 1 并且包含了 * 或者 # , 就是非法的格式了.if (word.length() > 1 && (word.contains("*") || word.contains("#"))) {return false;}}// 约定一下, 通配符之间的相邻关系(人为约定的).// 为啥这么约定? 因为前三种相邻的时候, 实现匹配的逻辑会非常繁琐, 同时功能性提升不大~~// 1. aaa.#.#.bbb    => 非法// 2. aaa.#.*.bbb    => 非法// 3. aaa.*.#.bbb    => 非法// 4. aaa.*.*.bbb    => 合法for (int i = 0; i < words.length - 1; i++) {// 连续两个 ##if (words[i].equals("#") && words[i + 1].equals("#")) {return false;}// # 连着 *if (words[i].equals("#") && words[i + 1].equals("*")) {return false;}// * 连着 #if (words[i].equals("*") && words[i + 1].equals("#")) {return false;}}return true;}

2.2 checkRoutingKey()

验证routingKey是合法的.routingKey是与BindingKey进行匹配的,所以必须是具体的.        

 

/*** 验证routingKey是否是合法的*      1. 数字, 字母, 下划线*      2. 使用 . 分割成若干部分* @return*/public boolean checkRoutingKey(String routingKey){if (routingKey.length() == 0){// 空字符串,合法的情况  当交换机的类型为fanout的时候,是不需要的,所以可以设置为""return true;}for (int i = 0; i < routingKey.length(); i++) {char ch = routingKey.charAt(i);// 判定该字符是否是大写字母if (ch >= 'A' && ch <= 'Z') {continue;}// 判定该字母是否是小写字母if (ch >= 'a' && ch <= 'z') {continue;}// 判定该字母是否是阿拉伯数字if (ch >= '0' && ch <= '9') {continue;}// 判定是否是 _ 或者 .if (ch == '_' || ch == '.') {continue;}// 该字符, 不是上述任何一种合法情况, 就直接返回 falsereturn false;}// 把每个字符都检查过, 没有遇到非法情况. 此时直接返回 truereturn true;}

2.3 route()

判断交换机的类型进而得出是否可以进行给队列进行转发消息.

1. 交换机的类型为fanout.代表给交换机进行绑定的所有队列进行转发消息.

2. 交换机的类型为Topic,需要对routingKey进行判断.进而设置给队列转发消息

/***  判断是否可以给绑定的交换机进行转发消息* @return*/public boolean route(ExchangeType type, Binding binding, Message message) throws MqException {if (type == ExchangeType.FANOUT){// 如果交换机类型为 fan-out 就直接进行返回true,表示转发给当前当前绑定的所有对列return true;}else if(type == ExchangeType.TOPIC){// 如果是主题交换机,规则就比较复杂return routerTopic(binding,message);}else {throw new MqException("[Router] 交换机类型有误 exchangeType=" + type);}}

 对于主题交换机,我们进行详细的讲解.

  • 1. 将bindingKey 和 routingKey 进行按照"."进行分割成字符串数组
  • 2. 定义下标进行遍历数组
  • 3. 遍历两个数组,主要分为5种情况.
    • 3.1  当bindingKey遇到*号时直接跳过*,两个下标都进行自增1
    • 3.2 当bindingKey遇到#号,如果此时#号是bindingKey的最后一位,那么直接返回true
    • 3.3 当bindingKey遇到#号,如果此时#号不是最后一位,就去匹配#号下一位在routingKey的部分,匹配到了就将routingIndex指到匹配的位置,进而在进行上述循环,如果没匹配到就返回false
    • 3.4 此时没有遇见通配符,所有的内容部都要进行匹配上,匹配不上就返回false
    • 3.5 最后判断此时两个数组的下标是否都比较到了末尾.比如 aaa.bbb.ccc 和 aaa.bbb 是要匹配失败的
/*** 用来实现:topic类型的交换机的转发规则* @param binding  绑定信息对象* @param message  消息对象* @return*/private boolean routerTopic(Binding binding, Message message) {// 1. 将bindingKey 和 routingKey 进行按照"."进行分割String[] bindingTokens = binding.getBindingKey().split("\\.");String[] routingTokens = message.getRoutingKey().split("\\.");// 2. 定义用来遍历数组的下标int bindingIndex = 0;int routingIndex = 0;// 3. 进行遍历两个数组while (bindingIndex < bindingTokens.length && routingIndex < routingTokens.length){if (bindingTokens[bindingIndex].equals("*")){// (1.)遇到*号两个下标直接跳过 * 可以匹配一个部分bindingIndex++;routingIndex++;}else if (bindingTokens[bindingIndex].equals("#")){bindingIndex += 1;// (2.)遇到#号   # 可以匹配多个部分if (bindingIndex == bindingTokens.length){// (3.)当遇到#号,#号的下标为最后一个元素的时候,直接返回true,因为可以直接匹配后面所有的内容return true;}else {// (4.)当遇到#号,后面后还有内容的时候,就去匹配#号下一个部分在routingKey的部分,// 匹配了就直接将bindingIndex指到bindingTokens下一个部分,同时将routingIndex指到匹配的地方// 没匹配配到就返回falseroutingIndex = findNextMatch(routingIndex,routingTokens,bindingTokens[bindingIndex]);if (routingIndex == -1){return false;}bindingIndex++;routingIndex++;}}else {// (5.)此时没有遇见通配符,所有的内容部都要进行匹配上if (!bindingTokens[bindingIndex].equals(routingTokens[routingIndex])){return false;}bindingIndex++;routingIndex++;}}// (6.)最后判断此时两个数组的下标是否都比较到了末尾// 比如 aaa.bbb.ccc  和  aaa.bbb 是要匹配失败的if (bindingIndex == bindingTokens.length && routingIndex == routingTokens.length) {return true;}return false;}/*** 给定起始下标去在一个数组中寻找指定数组元素,找到就返回该元素在数组的下标,没找到就返回-1;* @param routingIndex   起始下标* @param routingTokens  目标数组* @param bindingToken   目标元素* @return*/private int findNextMatch(int routingIndex, String[] routingTokens, String bindingToken) {for (int i = routingIndex; i < routingTokens.length; i++) {if (routingTokens[i].equals(bindingToken)){return i;}}return -1;}

以上就是整个Router的所有方法.我们对上述代码进行单元测试.

2.4 单元测试

 

 

package com.example.demo.mqserver.core;import com.example.demo.common.MqException;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;import static org.junit.jupiter.api.Assertions.*;/*** Created with IntelliJ IDEA.* Description:测试交换机的转发规则(交换机类型为topic)* User: YAO* Date: 2023-08-01* Time: 13:56*/
@SpringBootTest
class RouterTest {private Router router = new Router();private Binding binding = null;private Message message = null;@BeforeEachpublic void setUp() {binding = new Binding();message = new Message();}@AfterEachpublic void tearDown() {binding = null;message = null;}/***  [测试用例]*      binding key          routing key         result*      aaa                  aaa                 true*      aaa.bbb              aaa.bbb             true*      aaa.bbb              aaa.bbb.ccc         false*      aaa.bbb              aaa.ccc             false*      aaa.bbb.ccc          aaa.bbb.ccc         true*      aaa.*                aaa.bbb             true*      aaa.*.bbb            aaa.bbb.ccc         false*      *.aaa.bbb            aaa.bbb             false*      #                    aaa.bbb.ccc         true*      aaa.#                aaa.bbb             true*      aaa.#                aaa.bbb.ccc         true*      aaa.#.ccc            aaa.ccc             true*      aaa.#.ccc            aaa.bbb.ccc         true*      aaa.#.ccc            aaa.aaa.bbb.ccc     true*      #.ccc                ccc                 true*      #.ccc                aaa.bbb.ccc         true*/@Testpublic void test1() throws MqException {binding.setBindingKey("aaa");message.setRoutingKey("aaa");Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));}@Testpublic void test2() throws MqException {binding.setBindingKey("aaa.bbb");message.setRoutingKey("aaa.bbb");Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));}@Testpublic void test3() throws MqException {binding.setBindingKey("aaa.bbb");message.setRoutingKey("aaa.bbb.ccc");Assertions.assertFalse(router.route(ExchangeType.TOPIC, binding, message));}@Testpublic void test4() throws MqException {binding.setBindingKey("aaa.bbb");message.setRoutingKey("aaa.ccc");Assertions.assertFalse(router.route(ExchangeType.TOPIC, binding, message));}@Testpublic void test5() throws MqException {binding.setBindingKey("aaa.bbb.ccc");message.setRoutingKey("aaa.bbb.ccc");Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));}@Testpublic void test6() throws MqException {binding.setBindingKey("aaa.*");message.setRoutingKey("aaa.bbb");Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));}@Testpublic void test7() throws MqException {binding.setBindingKey("aaa.*.bbb");message.setRoutingKey("aaa.bbb.ccc");Assertions.assertFalse(router.route(ExchangeType.TOPIC, binding, message));}@Testpublic void test8() throws MqException {binding.setBindingKey("*.aaa.bbb");message.setRoutingKey("aaa.bbb");Assertions.assertFalse(router.route(ExchangeType.TOPIC, binding, message));}@Testpublic void test9() throws MqException {binding.setBindingKey("#");message.setRoutingKey("aaa.bbb.ccc");Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));}@Testpublic void test10() throws MqException {binding.setBindingKey("aaa.#");message.setRoutingKey("aaa.bbb");Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));}@Testpublic void test11() throws MqException {binding.setBindingKey("aaa.#");message.setRoutingKey("aaa.bbb.ccc");Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));}@Testpublic void test12() throws MqException {binding.setBindingKey("aaa.#.ccc");message.setRoutingKey("aaa.ccc");Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));}@Testpublic void test13() throws MqException {binding.setBindingKey("aaa.#.ccc");message.setRoutingKey("aaa.bbb.ccc");Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));}@Testpublic void test14() throws MqException {binding.setBindingKey("aaa.#.ccc");message.setRoutingKey("aaa.aaa.bbb.ccc");Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));}@Testpublic void test15() throws MqException {binding.setBindingKey("#.ccc");message.setRoutingKey("ccc");Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));}@Testpublic void test16() throws MqException {binding.setBindingKey("#.ccc");message.setRoutingKey("aaa.bbb.ccc");Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));}
}

 单元测试通过.

3. 订阅消息

        在我们的虚拟主机中进行添加方法完成消息的订阅.要想完成消息的订阅,就需要在消息队列中新建一个列表consumerEnvList用来存储消费者的信息,当有消息进行存储到队列的时候,此时选出消费者进行消费消息.而消费者消费信息的这个环境需要单独定义一个类ConsumerEnv进行描述.以上这个消费信息的过程我们定义一个类ConsumerManager进行管理这些逻辑.

3.1 添加一个订阅者

给队列添加消费者,当队列接收到消息的时候,就要将消息推送给订阅者

public boolean basicConsume(String consumerTag, String queueName, boolean autoAck, Consumer consumer) {// 构造一个 ConsumerEnv 对象, 把这个对应的队列找到, 再把这个 Consumer 对象添加到该队列中.queueName = virtualHostName + queueName;try {consumerManager.addConsumer(consumerTag, queueName, autoAck, consumer);System.out.println("[VirtualHost] basicConsume 成功! queueName=" + queueName);return true;} catch (Exception e) {System.out.println("[VirtualHost] basicConsume 失败! queueName=" + queueName);e.printStackTrace();return false;}}

此处插入的参数Consumer相当于一个回调函数,就是一个函数式接口.我们在common中进行定义Consumer

@FunctionalInterface
public interface Consumer {// Delivery 的意思是 "投递", 这个方法预期是在每次服务器收到消息之后, 来调用.// 通过这个方法把消息推送给对应的消费者.// (注意! 这里的方法名和参数, 也都是参考 RabbitMQ 展开的)void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws IOException, MqException;
}

定义这个回调函数表示:收到消息之后要对消息进行处理.

3.2 创建订阅者管理类ConsumerManager

1.  这个类是和虚拟主机是一一对应的,每个虚拟主机都有一个管理消费者的对象,而管理的消费者的对象对应的是与之对应的.

2. 我们采用一个堵塞队列来记录收到消息的的队列名字,每次队列收到消息,就会往这个队列中进行添加队列的名字,然后后续进行通知这个队列的消费者进行消费消息.

3. 单独使用一个线程池用来执行消息的回调.(主要是获取到消息之后,给响应设置消息的属性与消息本体发送给客户端.)

4. 我们设置一个扫描线程,从堵塞队列不断地取出元素,进而找到队列,在这个队列进行消费消息,并且设置扫描线程为后台线程,这样就不会阻止进程的结束.

public class ConsumerManager {// 1. 持有虚拟主机对象的引用,用来操作数据private VirtualHost parent;// 2. 指定一个线程池,负责执行具体的回调任务private ExecutorService workPool = Executors.newCachedThreadPool();// 3. 存放令牌的队列,存放接收到消息的队列名字(堵塞队列)// 当这个堵塞队列一接收到队列的名字,扫描线程就会就会找到虚拟主机,然后找到这个队列,进而消费消息private BlockingQueue<String> tokenQueue = new LinkedBlockingDeque<>();// 4. 扫描线程  (关注令牌队列中添加了哪些队列的名字,就知道哪些队列添加了消息,取出消息,进而交给线程池,进行消费这些消息)private Thread scannerThread = null;
}

1. 给堵塞队列设置接口,供虚拟主机进行调用.

/*** 1. 收到消息,通知消费者进行消费消息(将消息对应的队列名字添加到堵塞队列中)*/public void notifyConsume(String queueName) throws InterruptedException {tokenQueue.put(queueName);}

2. 实现扫描线程

public ConsumerManager(VirtualHost p) {parent = p;scannerThread = new Thread(() -> {while (true) {try {// 1. 拿到令牌String queueName = tokenQueue.take();// 2. 根据令牌, 找到队列MSQueue queue = parent.getMemoryDataCenter().getQueue(queueName);if (queue == null) {throw new MqException("[ConsumerManager] 取令牌后发现, 该队列名不存在! queueName=" + queueName);}// 3. 从这个队列中消费一个消息.synchronized (queue) {consumeMessage(queue);}} catch (InterruptedException | MqException e) {e.printStackTrace();}}});// 把线程设为后台线程.// 后台线程不会影响进程的结束scannerThread.setDaemon(true);scannerThread.start();}

3. 添加消费者环境ConsumerEnv到指定的队列

我们在common中实现这个类

@Data
public class ConsumerEnv {// 1. 消费者的身份标识private String consumerTag;// 2. 消费者消费队列的名字private String queueName;// 3. 是否自动应答private boolean autoAck;// 4. 通过这个回调函数来处理收到的消息.private Consumer consumer;public ConsumerEnv(String consumerTag, String queueName, boolean autoAck, Consumer consumer) {this.consumerTag = consumerTag;this.queueName = queueName;this.autoAck = autoAck;this.consumer = consumer;}
}

(1) 按照指定的队列名找到这个类.

(2) 创建消费者环境对象,进行添加,同时如果这个队列的消息存在,就需要进行消费这些信息,调用consumeMessage()方法传入队列的名字.

/*** 2. 新增Consumer对象到指定的对列*/public void addConsumer(String consumerTag, String queueName, boolean autoAck, Consumer consumer) throws MqException {// 找到对应的队列.MSQueue queue = parent.getMemoryDataCenter().getQueue(queueName);if (queue == null) {throw new MqException("[ConsumerManager] 队列不存在! queueName=" + queueName);}ConsumerEnv consumerEnv = new ConsumerEnv(consumerTag, queueName, autoAck, consumer);synchronized (queue) {queue.addConsumerEnv(consumerEnv);// 如果当前队列中已经有了一些消息了, 需要立即就消费掉.int n = parent.getMemoryDataCenter().getMessageCount(queueName);for (int i = 0; i < n; i++) {// 这个方法调用一次就消费一条消息.consumeMessage(queue);}}}

4. 消费消息 consumeMessage()

(1) 因为一个队列中可能会有多个消费者,我们按照轮询的方式进行挑选消费者进行消费消息,在队列的类中,设置方法chooseConsumer()

/*** 挑选订阅者 进行消费队列中的消息 (轮询的方式)* @return*/public ConsumerEnv chooseConsumer(){// 1. 如果当前队列对应的消费者的数量为0,直接返回null,表示没有筛选到消费者if (consumerEnvList.size() == 0){return null;}// 2. 使用当前订阅到的下标进行对消费者列表取模,然后进行挑选消费者记性消费消息,实现消息的轮询消费int index = consumerSeq.get() % consumerEnvList.size();consumerSeq.getAndIncrement();return consumerEnvList.get(index);}

(2) 从队列中取出消息

(3) 把消息带入到回调方法,交给线程池进行执行

/*** 消费者进行消费信息* @param queue*/private void consumeMessage(MSQueue queue) {// 1. 按照轮询的方式, 找个消费者出来.ConsumerEnv luckyDog = queue.chooseConsumer();if (luckyDog == null) {// 当前队列没有消费者, 暂时不消费. 等后面有消费者出现再说.return;}// 2. 从队列中取出一个消息Message message = parent.getMemoryDataCenter().pollMessage(queue.getName());System.out.println(message);if (message == null) {// 当前队列中还没有消息, 也不需要消费.return;}// 3. 把消息带入到消费者的回调方法中, 丢给线程池执行.workPool.submit(() -> {try {// 1. 把消息放到待确认的集合中. 这个操作势必在执行回调之前.parent.getMemoryDataCenter().addMessageWaitAck(queue.getName(), message);// 2. 真正执行回调操作luckyDog.getConsumer().handleDelivery(luckyDog.getConsumerTag(), message.getBasicProperties(),message.getBody());// 3. 如果当前是 "自动应答" , 就可以直接把消息删除了.//    如果当前是 "手动应答" , 则先不处理, 交给后续消费者调用 basicAck 方法来处理.if (luckyDog.isAutoAck()) {// 此时是自动应答,表示直接删除// 1) 删除硬盘上的消息if (message.getDeliverMode() == 2) {parent.getDiskDataCenter().deleteMessage(queue, message);}// 2) 删除上面的待确认集合中的消息parent.getMemoryDataCenter().removeMessageWaitAck(queue.getName(), message.getMessageID());// 3) 删除内存中消息中心里的消息parent.getMemoryDataCenter().removeMessage(message.getMessageID());System.out.println("[ConsumerManager] 消息被成功消费! queueName=" + queue.getName());}} catch (Exception e) {e.printStackTrace();}});}

3.3 订阅消息小结

 4. 消息确认 basicAck()

 此处是消费者在回调函数中对消息进行处理之后再回调函数中执行的.

  • 1. 获取要删除消息以及所在队列的对象
  • 2. 删除硬盘和内存的数据
  • 3. 删除未确认消息集合的数据
/*** 消费者消费完消息进行手动应答* @return*/public boolean basicAck(String queueName, String messageId){queueName = virtualHostName + queueName;try {// 1. 获取要删除消息以及所在队列的对象Message message = memoryDataCenter.getMessage(messageId);if (message == null){throw new MqException("[VirtualHost] 确认的信息不存在 messageId="+messageId);}MSQueue queue = memoryDataCenter.getQueue(queueName);if (queue == null){throw new MqException("[VirtualHost] 确认的队列不存在 queueName="+queueName);}// 2// 1.)删除硬盘中的数据if(message.getDeliverMode() == 2){diskDataCenter.deleteMessage(queue,message);}// 2.) 删除消息中心的消息memoryDataCenter.removeMessage(message.getMessageID());// 3.) 删除委未确认消息集合的消息memoryDataCenter.removeMessageWaitAck(queue.getName(),message.getMessageID());System.out.println("[VirtualHost] basicAck成功 消息被确认成功  queueName=" + queueName+ ",messageId:." + messageId);return true;} catch (MqException | ClassNotFoundException | IOException e) {e.printStackTrace();System.out.println("[VirtualHost] basicAck失败 消息被确认失败  queueName=" + queueName+ ",messageId:." + messageId);return false;}}

至此以上就是VirtualHost的全部内容,内容很多,很繁琐需要,静下心来仔细的体会.

5. VirtualHost单元测试

 

package com.example.demo.mqserver;import ch.qos.logback.core.util.FileUtil;
import com.example.demo.DemoApplication;
import com.example.demo.common.Consumer;
import com.example.demo.mqserver.core.BasicProperties;
import com.example.demo.mqserver.core.ExchangeType;
import org.apache.tomcat.util.http.fileupload.FileUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;import static org.junit.jupiter.api.Assertions.*;/*** Created with IntelliJ IDEA.* Description:虚拟主机的操作测试* User: YAO* Date: 2023-08-01* Time: 18:26*/
class VirtualHostTest {@Autowiredpublic VirtualHost  virtualHost = null;@BeforeEachvoid setUp() {DemoApplication.context = SpringApplication.run(DemoApplication.class);// 创建好虚拟主机对象virtualHost = new VirtualHost("default");}@AfterEachvoid tearDown() throws IOException {DemoApplication.context.close();//把硬盘的目录进行删除File dataDir = new File("./data");FileUtils.deleteDirectory(dataDir);}@Testvoid exchangeDeclare() {boolean ok = virtualHost.exchangeDeclare("testExchange",ExchangeType.DIRECT,true,false,null);Assertions.assertTrue(ok);}@Testvoid exchangeDelete() {boolean ok = virtualHost.exchangeDeclare("testExchange",ExchangeType.DIRECT,true,false,null);ok = virtualHost.exchangeDelete("testExchange");Assertions.assertTrue(ok);}@Testvoid queueDeclare() {boolean ok = virtualHost.queueDeclare("testQueue",true,false,false,null);Assertions.assertTrue(ok);}@Testvoid queueDelete() {boolean ok = virtualHost.queueDeclare("testQueue",true,false,false,null);ok = virtualHost.queueDelete("testQueue");Assertions.assertTrue(ok);}@Testvoid queueBind() {boolean ok = virtualHost.exchangeDeclare("testExchange",ExchangeType.DIRECT,true,false,null);ok = virtualHost.queueDeclare("testQueue",true,false,false,null);ok = virtualHost.queueBind("testQueue","testExchange","testBindingKey");Assertions.assertTrue(ok);}@Testvoid queueUnbind() {boolean ok = virtualHost.exchangeDeclare("testExchange",ExchangeType.DIRECT,true,false,null);ok = virtualHost.queueDeclare("testQueue",true,false,false,null);ok = virtualHost.queueBind("testQueue","testExchange","testBindingKey");ok = virtualHost.queueUnbind("testQueue","testExchange");Assertions.assertTrue(ok);}@Testvoid basicPublish() {boolean ok = virtualHost.exchangeDeclare("testExchange",ExchangeType.DIRECT,true,false,null);ok = virtualHost.queueDeclare("testQueue",true,false,false,null);ok = virtualHost.basicPublish("testExchange","testQueue",null,"Hello".getBytes(StandardCharsets.UTF_8));Assertions.assertTrue(ok);}/*** 1. 先订阅, 后发布消息*/@Testpublic void testBasicConsume1() throws InterruptedException {boolean ok = virtualHost.queueDeclare("testQueue", true,false, false, null);Assertions.assertTrue(ok);ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT,true, false, null);Assertions.assertTrue(ok);// 先订阅队列ok = virtualHost.basicConsume("testConsumerTag", "testQueue", true, new Consumer() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {// 消费者自身设定的回调方法.System.out.println("messageId=" + basicProperties.getMessageId());System.out.println("body=" + new String(body, 0, body.length));Assertions.assertEquals("testQueue", basicProperties.getRoutingKey());Assertions.assertEquals(1, basicProperties.getDeliverMode());Assertions.assertArrayEquals("hello".getBytes(), body);}});Assertions.assertTrue(ok);Thread.sleep(500);// 再发送消息ok = virtualHost.basicPublish("testExchange", "testQueue", null,"hello".getBytes());Assertions.assertTrue(ok);}/***  先发送消息, 后订阅队列.*/@Testpublic void testBasicConsume2() throws InterruptedException {boolean ok = virtualHost.queueDeclare("testQueue", true,false, false, null);Assertions.assertTrue(ok);ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT,true, false, null);Assertions.assertTrue(ok);// 先发送消息ok = virtualHost.basicPublish("testExchange", "testQueue", null,"hello".getBytes());Assertions.assertTrue(ok);// 再订阅队列ok = virtualHost.basicConsume("testConsumerTag", "testQueue", true, new Consumer() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {// 消费者自身设定的回调方法.System.out.println("messageId=" + basicProperties.getMessageId());System.out.println("body=" + new String(body, 0, body.length));Assertions.assertEquals("testQueue", basicProperties.getRoutingKey());Assertions.assertEquals(1, basicProperties.getDeliverMode());Assertions.assertArrayEquals("hello".getBytes(), body);}});Assertions.assertTrue(ok);Thread.sleep(500);}@Testpublic void testBasicConsumeFanout() throws InterruptedException {// 创建一个交换机,并且绑定两个队列boolean ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.FANOUT, false, false, null);Assertions.assertTrue(ok);ok = virtualHost.queueDeclare("testQueue1", false, false, false, null);Assertions.assertTrue(ok);ok = virtualHost.queueBind("testQueue1", "testExchange", "");Assertions.assertTrue(ok);ok = virtualHost.queueDeclare("testQueue2", false, false, false, null);Assertions.assertTrue(ok);ok = virtualHost.queueBind("testQueue2", "testExchange", "");Assertions.assertTrue(ok);// 发布消息发到交换机ok = virtualHost.basicPublish("testExchange", "", null, "hello".getBytes());Assertions.assertTrue(ok);Thread.sleep(500);// 两个消费者订阅上述的两个队列.ok = virtualHost.basicConsume("testConsumer1", "testQueue1", true, new Consumer() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {System.out.println("consumerTag=" + consumerTag);System.out.println("messageId=" + basicProperties.getMessageId());Assertions.assertArrayEquals("hello".getBytes(), body);}});Assertions.assertTrue(ok);ok = virtualHost.basicConsume("testConsumer2", "testQueue2", true, new Consumer() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {System.out.println("consumerTag=" + consumerTag);System.out.println("messageId=" + basicProperties.getMessageId());Assertions.assertArrayEquals("hello".getBytes(), body);}});Assertions.assertTrue(ok);Thread.sleep(500);}@Testpublic void testBasicConsumeTopic() throws InterruptedException {// 1. 创建交换机(主题交换机)boolean ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.TOPIC, false, false, null);Assertions.assertTrue(ok);// 2. 创建队列ok = virtualHost.queueDeclare("testQueue", false, false, false, null);Assertions.assertTrue(ok);// 3. 将交换机和队列进行绑定(设置bindingKey)ok = virtualHost.queueBind("testQueue", "testExchange", "aaa.*.bbb");Assertions.assertTrue(ok);// 4. 发布消息(设置routingKey)ok = virtualHost.basicPublish("testExchange", "aaa.ccc.bbb", null, "hello".getBytes());Assertions.assertTrue(ok);// 5. 订阅消息ok = virtualHost.basicConsume("testConsumer", "testQueue", true, new Consumer() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {System.out.println("consumerTag=" + consumerTag);System.out.println("messageId=" + basicProperties.getMessageId());Assertions.assertArrayEquals("hello".getBytes(), body);}});Assertions.assertTrue(ok);Thread.sleep(500);}@Testpublic void testBasicAck() throws InterruptedException {boolean ok = virtualHost.queueDeclare("testQueue", true,false, false, null);Assertions.assertTrue(ok);ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT,true, false, null);Assertions.assertTrue(ok);// 先发送消息ok = virtualHost.basicPublish("testExchange", "testQueue", null,"hello".getBytes());Assertions.assertTrue(ok);// 再订阅队列 [要改的地方, 把 autoAck 改成 false]ok = virtualHost.basicConsume("testConsumerTag", "testQueue", false, new Consumer() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {// 消费者自身设定的回调方法.System.out.println("messageId=" + basicProperties.getMessageId());System.out.println("body=" + new String(body, 0, body.length));Assertions.assertEquals("testQueue", basicProperties.getRoutingKey());Assertions.assertEquals(1, basicProperties.getDeliverMode());Assertions.assertArrayEquals("hello".getBytes(), body);// [要改的地方, 新增手动调用 basicAck]boolean ok = virtualHost.basicAck("testQueue", basicProperties.getMessageId());Assertions.assertTrue(ok);}});Assertions.assertTrue(ok);Thread.sleep(500);}
}

结语

        本文将整个VirtualHost进行了实现,实现了供BrokerServer调用的API.基础的消息队列框架已经搭建好了,接下来就是搭建服务器和客户端了.请持续关注,谢谢!!!

完整的项目代码已上传Gitee,欢迎大家访问.👇👇👇

模拟实现消息队列icon-default.png?t=N6B9https://gitee.com/yao-fa/advanced-java-ee/tree/master/My-mq

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/63442.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

23款奔驰AMG GT50升级原厂香氛负离子系统,清香宜人,久闻不腻

奔驰原厂香氛合理性可通过车内空气调节组件营造芳香四溢的怡人氛围。通过更换手套箱内香氛喷雾发生器所用的香水瓶&#xff0c;可轻松选择其他香氛。香氛的浓度和持续时间可调。淡雅的香氛缓缓喷出&#xff0c;并且在关闭后能够立刻散去。车内气味不会永久改变&#xff0c;香氛…

蓝桥杯-统计子矩阵

统计子矩阵 题目链接 思路&#xff1a; 使用前缀和滑动窗口 &#xff0c;可以先计算出纵向或横向的前缀和&#xff0c;matrix[i][j]表示前i行第j列之和 然后遍历上边界top和下边界buttom&#xff0c;再这个上下边界内使用滑动窗口&#xff0c;由于前面维护了纵向前缀和&…

Linux网络协议和管理

Linux网络协议和管理 一.网络设备基本知识 图1-网络设备基本知识 二.TCP/IP协议栈简介 1.概述 网络协议通常工作在不同的层中&#xff0c;每一层分别负责不同的通信功能。一个协议族&#xff0c; 比如T C P / I P&#xff0c;是一组不同层次上的多个协议的组合。T C P / I P通…

什么是Netty

介绍 第一&#xff1a;Netty 是一个 基于 NIO 模型的高性能网络通信框架&#xff0c;其实可以认为它是 对 NIO 网络模型的封装&#xff0c;提供了简单易用的 API&#xff0c;我们可以利用这些封装好的 API 快速开发自己的网络程序。 第二&#xff1a;Netty 在 NIO 的基础上做了…

安卓13不再支持PPTP怎么办?新的连接解决方案分享

随着Android 13的发布&#xff0c;我们迎来了一个令人兴奋的新品时刻。然而&#xff0c;对于一些用户而言&#xff0c;这也意味着必须面对一个重要的问题&#xff1a;Android 13不再支持PPTP协议。如果你是一个习惯使用PPTP协议来连接换地址的用户&#xff0c;那么你可能需要重…

微信小程序中键盘弹起输入框自动跳到键盘上方处理

效果展示 键盘未弹起时 键盘弹起后&#xff1a; 实现方式 话就不多说了 我直接贴代码了 原理就是用你点击的输入框的底部 距离顶部的位置 减去屏幕高度除以2&#xff0c;然后设成负值&#xff0c;再将这个值给到最外层相对定位的盒子的top属性&#xff0c;这样就不会出现顶…

【Hystrix技术指南】(7)故障切换的运作流程原理分析(含源码)

背景介绍 目前对于一些非核心操作&#xff0c;如增减库存后保存操作日志发送异步消息时&#xff08;具体业务流程&#xff09;&#xff0c;一旦出现MQ服务异常时&#xff0c;会导致接口响应超时&#xff0c;因此可以考虑对非核心操作引入服务降级、服务隔离。 Hystrix说明 官方…

使用 Python 和 Flask 构建简单的 Restful API 第 1 部分

一、说明 我将把这个系列分成 3 或 4 篇文章。在本系列的最后&#xff0c;您将了解使用flask构建 restful API 是多么容易。在本文中&#xff0c;我们将设置环境并创建将显示“Hello World”的终结点。 我假设你的电脑上安装了python 2.7和pip。我已经在python 2.7上测试了本文…

C语言属刷题训练【第八天】

文章目录 &#x1fa97;1、如下程序的运行结果是&#xff08; &#xff09;&#x1f4bb;2、若有定义&#xff1a; int a[2][3]; &#xff0c;以下选项中对 a 数组元素正确引用的是&#xff08; &#xff09;&#x1f9ff;3、在下面的字符数组定义中&#xff0c;哪一个有语法错…

大厂容器云实践之路(一)

1-华为CCE容器云实践 华为企业云 | CCE容器引擎实践 ——从IaaS到PaaS到容器集群 容器部署时代的来临 IaaS服务如日中天 2014-2015年&#xff0c;大家都在安逸的使用IaaS服务&#xff1b; 亚马逊AWS的部署能力方面比所有竞争对手…

从零构建深度学习推理框架-7 计算图的表达式

什么是表达式 表达式就是一个计算过程,类似于如下&#xff1a; output_mid input1 input2 output output_mid * input3用图形来表达就是这样的。 但是在PNNX的表达式&#xff08;Experssion Layer&#xff09;中不是这个样子&#xff0c;而是以一种抽象得方式&#xff0c;…

Active Directory安全和风险状况管理

风险评估和管理 风险评估和管理是主动安全性和合规性管理不可或缺的一部分。 发现关键基础设施组件中的风险行为和配置对于阻止网络入侵和预防网络攻击至关重要。帐户泄露和配置错误漏洞是用于破坏网络的常见技术。当评估、监控和降低 Active Directory 基础架构的风险时&…