项目实战--Message Queue

一. 概念篇

我们在学习多线程的时候,学习过生产者-消费者模型,为了实现解耦合和削峰填谷,引入了阻塞队列.

在实际的后端开发中,跨主机之间使用生产者消费者模型,也是非常普遍的需求,因此,阻塞队列会被封装成一个独立的服务器程序,实现更丰富的功能.这样的程序称为"消息队列" .

市面上成熟的消息队列非常多,有Kafka,RabbitMQ,RocketMQ,ActiveMQ...接下来我们要实现的MQ就是以RabbitMQ为蓝图的.

在实现MQ之前,我们先来认识几个核心概念.

1.1 核心概念

1. 生产者(Producer) : 负责生产消息

2. 消费者(Consumer) : 负责消费消息

3. 中间人(Broker) : 消息队列

4. 发布(Publish) : 生产者生产消息并存放到消息队列中

5. 订阅(Subscribe) : 消费者预订某个Queue的消息,不是把消息从Broker中取出来,也不是消费消息

1.2 有关Broker的概念

1. 虚拟机(VirtualHost) :类似于MySQL中的database,是一个逻辑上的集合,一个Broker上可以存在多个虚拟机

2. 交换机(Exchange) : 生产者先把消息发送到Broker的Exchange上,再由Exchange根据不同的规则转发给不同的Queue

3. 队列(Queue) : 真正存储消息的实体,每个消费者决定自己从哪个Queue上读取消息

4. 绑定(Binding) : Exchange 和 Queue之间的关联关系.Exchange 和 Queue之间可以理解成"多对多"的关系,每对关系可以用一个Binding来表示.

5. 消息(Message) ; 传递的内容

1.3 交换机类型

刚才我们提到,Exchange和Queue之间是"多对多"的关系, 其实是由交换机的类型决定的.

以RabbitMQ为例,交换机主要支持4种类型:

  • Direct , 直接交换机,生产者发送消息时,直接指定要发送到哪个队列上;交换机收到消息后,如果绑定了该队列,就会把消息发过去,如果没有就丢弃该消息. 
  • Fanout , 扇出交换机,交换机收到生产者发送的消息,会将该消息发送到所有绑定的队列上.
  • Topic ,主题交换机,队列绑定到交换机上时,指定一个字符串为BindingKey,消息被发送到交换机上时,指定一个字符串为RoutingKey,如果这两个字符串满足一定的匹配条件,则交换机将该消息投递到相应队列.
  • Header,这种交换机的类型比较复杂,也比较难实现,我们就不做过多介绍了.

1.4 Broker的核心API

在我们实现的消息队列项目中,Broker是至关重要的,它要实现下列几个核心API,供生产者/消费者调用.

1. 创建队列(queueDeclare),这里的"创建"指的是没有则创建,有就啥也不干

2. 删除队列(queueDelete)

3. 创建交换机(exchangeDeclare),同上queueDeclare

4. 删除交换机(exchangeDelete)

5. 创建绑定(queueBind)

6. 解除绑定(queueUnbind)

7. 发布消息(basicPublish)

8. 订阅消息(basicConsume)

9. 确认消息(basicAck),消费者取走消息并不代表消费者处理了消息,因此需要让消费者告诉Broker,保证处理消息没有遗漏,在收到确认之前,Broker需要保存未确认的消息一段时间.

有的童靴可能会问,我们刚才提到的概念中,有"消费"这一概念,为啥没有提供相应的API呢?

很简单,因为RabbitMQ没有支持...

如果没有提供消费的API,Broker和Consumer的工作模式有两种:

1. Push,Broker主动把message发送给消费者,这种模式下MQ削峰填谷的作用不大(RabbitMQ支持)

2. Pull,消费者主动获取数据(这种模式下,message的时效性不高)

由于我们是以RabbitMQ为蓝图的,因此我们使用的也是第一种模式.

 1.5 网络通信

和阻塞队列不同,MessageQueue是为不同主机服务的.生产者和消费者都是客户端程序,Broker作为服务器,三者之间通过网络进行通信.

在网络通信的过程中,客户端要提供相应的API,来实现对服务器的操作.

也就是说,客户端的API只负责向服务器发送请求并接收服务器的响应,真正的业务由服务器端的API来做.这种远程调用的方式称为"RPC".

因此,客户端的API除了包含服务器提供的API外,还要有网络通信相关的API

1. 创建Connection,一个Connection对象代表一个TCP连接

2. 关闭Connection

3. 创建Channel,建立/销毁一个连接的成本比较高,Channel只是一个逻辑上的概念,一个Connection可以有多个Channel.

4. 关闭Channel

关于Connection和Channel,可以用下面这个栗子解释...

如果你到医院打针,要输三瓶药水,护士肯定只在你的胳膊上扎一次针,这就相当于建立了一个Connection,然后每换一瓶药水,不会重新扎针,而是用原来的针头,这就是一个Channel的创建.

5. 创建队列

6. 关闭队列

7. 创建交换机

8. 删除交换机

9. 创建绑定

10. 解除绑定

11. 发布消息

12. 订阅消息

13. 确认消息

1.6 消息应答

应答模式分为两种.

1. 自动应答: 消费者消费了消息,就默认应答完毕,Broker直接将该消息删除.

2. 手动应答: 消费者手动调用应答接口,Broker收到应答请求之后,才会将该消息彻底删除.

对于一些不太重要的消息,可以采用第一种方式.RabbitMQ支持了两种应答方式,因此我们的项目也会实现这两种模式.

1.7 模块划分

上面的一堆概念,想必童靴们听懂了但是没办法串联到一起,没关系,可以借助下面一张图来理解.

上面的图示中有几个问题需要解释: 

  • 序列化是指将结构化数据转换成无结构的二进制数据,反序列化是指将二进制数据转换成结构化数据.
  • 我们要实现的消息队列是一个通用业务,适合所有的生产者/消费者模型,因此网络通信部分由客户端来实现.
  • MessageQueue使用内存作为存储空间,硬盘只是辅助存储,用于持久化的时候使用.
  • MetaData是指元数据,交换机/队列/绑定等对象需要进行增删改查等操作,由数据库来管理比较合适
  • message作为消息主体,数量比较多,如果用数据库来存储,进行增加/删除的时候比较慢,并且消息不需要进行复杂的查找操作,因此使用文件进行管理.
  • message的删除只是逻辑删除,(如果是物理删除的话,需要进行文件上的数据移动,速度比较慢),如果有效消息比较少,就需要进行GC操作

二. 硬盘操作

想必童靴们看完上面的概念之后还是很懵逼,下面我们把上面的模块拆开来实现.

2.1 数据库操作

我们先创建一个MetaDataManager类来封装对数据库的操作.在前面提到过,数据库是用来管理交换机,队列和绑定的,它们统称为"meta data".下面是MetaDataManager类提供的接口.

  •   createExchange: 创建交换机
  •   deleteExchange: 删除交换机
  •   selectAllExchanges: 查询所有交换机,因为Broker Server的存储分为两部分,内存和硬盘,重启服务器时需要从硬盘恢复数据,用到了该方法.
  •   createQueue: 创建队列
  •   deleteQueue: 删除队列
  •   selectAllQueues: 查询全部队列
  •   createBinding: 创建绑定
  •   deleteBinding: 删除绑定
  •   selectAllBindings: 查询全部绑定
  •   createTables: 创建exchange表,queue表,binding表
  •   createDefaultData: RabbitMQ本身就存在一个交换机,类型是直接交换机,名字是"",可持久化,因此我们也实现一个匿名交换机.
  •   init: 封装createTables和createDefaultData方法
  •   deleteDB: 删除这个数据库的相关文件,便于测试数据的时候使用,防止每次测试时数据的污染.
  •  checkDBExists: 方便测试

2.1.1 SQLite数据库配置

对于Exchange,Queue,Binding,我们使用SQLite数据库持久化保存.

相比于MySQL,SQLite更轻量,只需要一个.exe文件即可运行.因为我们创建的是SpringBoot项目,只需要引入相关依赖,这个项目就会自动加载jar包和dll动态库文件,就能直接运行SQLite数据库了.

另外,因为我们要使用MyBatis来操作数据库,所以在pom.xml中也要引入mybatis依赖

下面是application.yml的相关配置.

spring:datasource:#表示将数据库建在data目录下的meta.db文件中,基准目录是该项目所在的目录url: jdbc:sqlite:./data/meta.db # SQLite不需要用户名和密码,因为SQLite不是一个C/S结构的数据库,它只在本地运行,所以不需要验证username:password:# 数据库的驱动类driver-class-name: org.sqlite.JDBC#mybatis的起手配置
mybatis:mapper-locations: classpath:mapper/*Mapper.xml configuration:#    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl #此处不打印对数据库操作的日志了,测试的时候可以打印一下map-underscore-to-camel-case: true

 2.1.2 Meta相关类的创建

先来创建Queue,Exchange,Binding类.

@Data
public class QueueEntity {//队列的唯一标识private String name;//是否可持久化private boolean durable=false;//如果没有消费者订阅这个队列,是否自动删除private boolean autoDelete=false;//是否为某个消费者私有private boolean exclusive=false;//额外参数private Map<String,Object> arguments=new HashMap<>();
}
@Data
public class Exchange {//交换机的唯一标识private String name;//交换机类型,默认是直接交换机private ExchangeType type=ExchangeType.DIRECT;//是否持久化private boolean durable=false;//如果没有队列绑定这个交换机,是否自动删除该交换机(暂时不实现)private boolean autoDelete=false;//额外参数private Map<String,Object> arguments=new HashMap<>();
}/*** 定义交换机类型,1表示直接交换机,2表示扇出交换机,3表示主题交换机*/
public enum ExchangeType{DIRECT(1),FANOUT(2),TOPIC(3);private final int type;ExchangeType(int type){this.type=type;}public int getType() {return type;}
}
@Data
public class Binding {private String exchangeName;private String queueName;//交换机根据bindingKey确定要将消息转发给哪些队列private String bindingKey;
}

 然后我们定义MetaMapper类进行数据库的ACUD操作.

@Mapper
public interface MetaMapper {//创建相关的表//因为涉及到服务器的重启操作,每次都要重新进行建库建表比较麻烦,所以调用代码来执行@Insert("create table if not exists exchange(" +"name varchar(50) primary key," +"type int," +"durable boolean," +"auto_delete boolean," +"arguments varchar(1024)"+")")void createExchangeTable();@Insert("create table if not exists queue(" +"name varchar(50) primary key," +"durable boolean,"+"auto_delete boolean," +"exclusive boolean," +"arguments varchar(1024)" +")")void createQueueTable();@Insert("create table if not exists binding(" +"exchangeName varchar(50)," +"queueName varchar(50)," +"binding_key varchar(100)" +")")void createBindingTable();@Insert("insert into exchange values" +"(#{name},#{type},#{durable},#{autoDelete},#{arguments})")void insertExchange(Exchange exchange);@Delete("delete from exchange where name=#{name}")void deleteExchange(String name);@Insert("insert into queue values" +"(#{name},#{durable},#{autoDelete},#{exclusive},#{arguments})")void insertQueue(QueueEntity queue);@Delete("delete from queue where name=#{name}")void deleteQueue(String name);@Insert("insert into binding values" +"(#{exchangeName},#{queueName},#{bindingKey})")void insertBinding(Binding binding);@Delete("delete from binding where exchangeName=#{exchangeName} and queueName=#{queueName}")void deleteBinding(Binding binding);@Select("select * from exchange")List<Exchange> selectAllExchanges();@Select("select * from queue")List<QueueEntity> selectAllQueues();@Select("select * from binding")List<Binding> selectAllBindings();
}

因为Queue和Mapper中的Arguments是一个Map对象,没办法直接插入到数据库中,只能先转换成字符串.所以需要多写两个这两个类的get和set方法.

有些同学可能会疑惑,我们使用了@Data注解,自己再创建一个get和set,不会冲突吗?

如果我们自己创建了get和set,Lombok就不会再帮助我们生成了

//在Exchange和Queue类中添加如下方法   @SneakyThrowspublic void setArguments(String jsonString){ObjectMapper mapper=new ObjectMapper();this.arguments=mapper.readValue(jsonString, new TypeReference<Map<String, Object>>() {});}@SneakyThrowspublic String getArguments(){ObjectMapper mapper=new ObjectMapper();return mapper.writeValueAsString(arguments);}/*** 为了方便测试*/public void setArguments(String key,Object value){arguments.put(key,value);}public Object getArguments(String key){return arguments.get(key);}/*** 上层调用时可能会传入一个真正的Map作为额外参数*/public void setArguments(Map<String,Object> arguments){this.arguments=arguments;}

接下来就可以创建MetaManager类来进行对MetaMapper的封装了.

@Data
@Slf4j
public class MetaManager {private MetaMapper metaMapper;public void insertExchange(Exchange exchange){metaMapper.insertExchange(exchange);}public void deleteExchange(String exchangeName){metaMapper.deleteExchange(exchangeName);}public void insertQueue(QueueEntity queue){metaMapper.insertQueue(queue);}public void deleteQueue(String queueName){metaMapper.deleteQueue(queueName);}public void insertBinding(Binding binding){metaMapper.insertBinding(binding);}public void deleteBinding(Binding binding){metaMapper.deleteBinding(binding);}public List<Exchange> selectAllExchanges(){return metaMapper.selectAllExchanges();}public List<QueueEntity> selectAllQueues(){return metaMapper.selectAllQueues();}public List<Binding> selectAllBindings(){return metaMapper.selectAllBindings();}/*** 进行建库建表操作* 1.建库,MyBatis进行数据库的操作的时候会自动建库* 2.建表,需要手动完成*/public void init(){//因为我们不需要将MetaManager交给Spring管理,但是如果想获取到MetaMapper对象就必须从Spring中取,所以采用下面的方式this.metaMapper= DemoApplication.context.getBean(MetaMapper.class);//如果数据库已经存在,就不需要再次建立if(checkDBExists()){log.info("数据库已经存在");}else{//数据库不存在,执行建表操作createTables();//进行数据的初始化createDefaultData();log.info("数据库初始化完成");}}/***创建一个默认交换机,name="",type=DIRECT,durable=true*/private void createDefaultData() {Exchange exchange=new Exchange();exchange.setName("");exchange.setDurable(true);exchange.setType(ExchangeType.DIRECT);exchange.setAutoDelete(false);metaMapper.insertExchange(exchange);}private void createTables() {metaMapper.createQueueTable();metaMapper.createExchangeTable();metaMapper.createBindingTable();}/*** 判断数据库是否存在,就看meta.db是否存在即可* @return*/private boolean checkDBExists() {File file=new File("./data/meta.db");return file.exists();}/*** 每次测试数据之后进行删库操作,防止数据污染*/public void deleteDb(){File file=new File("./data/meta.db");file.delete();}
}

因为MetaManager在初始化的时候需要用到DemoApplication类,所以对这个类的代码稍加修改.

@SpringBootApplication
public class DemoApplication {public static ConfigurableApplicationContext context;public static void main(String[] args) {context=SpringApplication.run(DemoApplication.class, args);}}

 下面是这些类的目录结构,可做参考.

2.1.3 测试MetaManager类

 单元测试是十分必要的,毕竟每个程序猿都不想自己一顿猛如虎的操作之后发现bug都有几千行!下面直接给出单元测试类的代码(仅供参考)

@SpringBootTest
class MetaManagerTest {private MetaManager metaManager=new MetaManager();@BeforeEachvoid setUp() {//因为metaManager初始化的时候需要用到DemoApplication.context,这里需要手动赋值DemoApplication.context= SpringApplication.run(DemoApplication.class);metaManager.init();}@Testvoid init() {//初始化完成后应该有一个默认交换机List<Exchange> exchanges=metaManager.selectAllExchanges();Assertions.assertEquals(1,exchanges.size());Exchange exchange=exchanges.get(0);Assertions.assertEquals("",exchange.getName());Assertions.assertEquals(ExchangeType.DIRECT,exchange.getType());Assertions.assertEquals(true,exchange.isDurable());Assertions.assertEquals(false,exchange.isAutoDelete());}@AfterEachvoid tearDown() {//因为MyBatis正在使用数据库,所以必须把Spring项目关掉之后才可以删除数据库文件DemoApplication.context.close();metaManager.deleteDb();}@Testvoid insertExchange() {Exchange expectedExchange=new Exchange();expectedExchange.setName("testExchange");expectedExchange.setType(ExchangeType.TOPIC);expectedExchange.setDurable(true);expectedExchange.setAutoDelete(false);expectedExchange.setArguments("aaa",1);metaManager.insertExchange(expectedExchange);List<Exchange> exchanges=metaManager.selectAllExchanges();Assertions.assertEquals(2,exchanges.size());Exchange actualExchange=exchanges.get(1);Assertions.assertEquals("testExchange",actualExchange.getName());Assertions.assertTrue(actualExchange.isDurable());Assertions.assertFalse(actualExchange.isAutoDelete());Assertions.assertEquals(ExchangeType.TOPIC,actualExchange.getType());Assertions.assertEquals(1,actualExchange.getArguments("aaa"));}@Testvoid insertQueue() {QueueEntity expectedQueue=new QueueEntity();expectedQueue.setName("testQueue");expectedQueue.setDurable(false);expectedQueue.setExclusive(false);expectedQueue.setAutoDelete(false);expectedQueue.setArguments("bbb",2);metaManager.insertQueue(expectedQueue);List<QueueEntity> queues=metaManager.selectAllQueues();Assertions.assertEquals(1,queues.size());QueueEntity actualQueue=queues.get(0);Assertions.assertEquals("testQueue",actualQueue.getName());Assertions.assertFalse(actualQueue.isAutoDelete());Assertions.assertFalse(actualQueue.isDurable());Assertions.assertFalse(actualQueue.isExclusive());Assertions.assertEquals(2,actualQueue.getArguments("bbb"));}@Testvoid insertBinding() {Binding expectedBinding=new Binding();expectedBinding.setExchangeName("testExchange");expectedBinding.setQueueName("testQueue");expectedBinding.setBindingKey("bindingKey");metaManager.insertBinding(expectedBinding);List<Binding> bindings=metaManager.selectAllBindings();Assertions.assertEquals(1,bindings.size());Binding  actualBinding=bindings.get(0);Assertions.assertEquals("testQueue",actualBinding.getQueueName());Assertions.assertEquals("testExchange",actualBinding.getExchangeName());Assertions.assertEquals("bindingKey",actualBinding.getBindingKey());}@Testvoid deleteExchange() {Exchange expectedExchange=new Exchange();expectedExchange.setName("testExchange");expectedExchange.setType(ExchangeType.TOPIC);expectedExchange.setDurable(true);expectedExchange.setAutoDelete(false);expectedExchange.setArguments("aaa",1);metaManager.insertExchange(expectedExchange);List<Exchange> exchanges=metaManager.selectAllExchanges();Assertions.assertEquals(2,exchanges.size());metaManager.deleteExchange("testExchange");exchanges=metaManager.selectAllExchanges();Assertions.assertEquals(1,exchanges.size());}@Testvoid deleteQueue() {QueueEntity expectedQueue=new QueueEntity();expectedQueue.setName("testQueue");expectedQueue.setDurable(false);expectedQueue.setExclusive(false);expectedQueue.setAutoDelete(false);expectedQueue.setArguments("bbb",2);metaManager.insertQueue(expectedQueue);List<QueueEntity> queues=metaManager.selectAllQueues();Assertions.assertEquals(1,queues.size());metaManager.deleteQueue("testQueue");queues=metaManager.selectAllQueues();Assertions.assertEquals(0,queues.size());}@Testvoid deleteBinding() {Binding expectedBinding=new Binding();expectedBinding.setExchangeName("testExchange");expectedBinding.setQueueName("testQueue");expectedBinding.setBindingKey("bindingKey");metaManager.insertBinding(expectedBinding);List<Binding> bindings=metaManager.selectAllBindings();Assertions.assertEquals(1,bindings.size());Binding binding=new Binding();binding.setExchangeName("testExchange");binding.setQueueName("testQueue");metaManager.deleteBinding(binding);bindings=metaManager.selectAllBindings();Assertions.assertEquals(0,bindings.size());}}

 2.2 文件操作

前面我们提到过,Message的持久化需要放进文件里,因为message的数量比较多,如果使用数据库来存储,性能会变慢; 并且message不需要复杂的增删改查,不需要用到数据库

2.2.1 Message相关类的创建

既然要操作的是Message,我们肯定要定义一个Message类了.

@Data
public class Message implements Serializable {//message的基本属性private BasicProperties basicProperties;//message存放的消息内容,因为涉及到网络传输和文件存储,使用字节数组存储比较合适private byte[] body;//辅助成员//表示在文件中的起始/终止位置,方便把该消息读取/存放到文件里//这两个属性没有必要存放在文件里,使用transient关键字,就不会被序列化private transient long offsetBegin;private transient long offsetEbd;//表示该消息是否有效,1表示有效,0表示无效private byte isValid=0x1;//因为消息的数目比较多,所以使用工厂方法,messageId由系统自动生成,以"M-"作为前缀public static Message createMessageWithId(String routingKey,BasicProperties properties,byte[] body){Message message=new Message();//一般properties传的参数都是nullif(properties!=null){message.setBasicProperties(properties);}message.setMessageId(generateMessageId());message.setRoutingKey(routingKey);message.setBody(body);return message;}private static String generateMessageId() {return "M-"+ UUID.randomUUID();}/*** 提供操作基本属性的接口,方便外界调用* @param messageId*/public void setMessageId(String messageId){basicProperties.setMessageId(messageId);}public String getMessageId(){return basicProperties.getMessageId();}public void setRoutingKey(String routingKey){basicProperties.setRoutingKey(routingKey);}public String getRoutingKey(){return basicProperties.getRoutingKey();}public boolean isDelivery() {return (basicProperties.getDelivery() == 0x2);}public void setDelivery(byte delivery){basicProperties.setDelivery(delivery);}
}
@Data
public class BasicProperties implements Serializable {//消息的唯一标识private String messageId;//交换机根据routingKey转发该消息private String routingKey;//是否持久化 1表示非持久化 2表示持久化private byte delivery=0x1;
}

 因为message是存放在Queue上的,所以我们需要为每个Queue创建一个目录目录名即为队列名,这个目录下有两个文件:

  1. queue_data.txt,用来存放这个队列的全部消息,以二进制的方式存储
  2. queue_stat.txt,用来统计这个队列的统计信息,方便进行GC操作

我们再来创建一个MessageFileManager类,用来操作message相关的文件.

 下面定义一下这个类要提供的接口

  1. readStat: 读取统计文件
  2. writeStat: 写入统计文件
  3. sendMessage: 向某个队列的data文件中写入消息
  4. deleteMessage: 删除某个队列的某条消息(逻辑删除)
  5. checkGC: 规定当消息总数超过1500,有效消息数低于50%时进行GC操作
  6. GC: 将该队列中的无效消息物理清空
  7. createQueueFiles: 创建队列对应的目录和文件
  8. deleteQueueFiles: 删除队列对应的目录和文件
  9. loadMessages: 从文件上获取某队列的全部有效消息,方便服务器重启后进行恢复操作 

 2.2.2 Queue对应文件规定

以队列为维度创建文件,每个队列都有一个目录,目录的名字为队列名,这个目录存放在data目录下,形如./data/testQeue,每个目录包含两个文件:

  • queue_data.txt: 二进制文件,用来存放消息的内容
  • queue_stat.txt: 文本文件,用来保存消息的统计信息

下面我们来规定一下这两种文件的格式.

现在看stat文件吧,它比较简单.

再来定义data文件

  • 它是二进制文件,文件后缀是啥无所谓,这是window用来区分文件格式的方式
  • 每个消息使用二进制的方式存储
  • 每个消息分成两部分,第一部分存储该消息的长度(以字节为单位),这一部分占4byte,第二部分存储该消息的内容
  • 消息之间首尾相连,没有分隔符

规定,当一个队列中的总消息数超过1500,且有效消息数低于50%时,就要进行GC操作,防止因为文件过大导致性能降低.

2.2.3 MessageFileManager类的创建

下面让我们一起来实现一下这个类,配合注释看更下饭哦~

@Data
@Slf4j
public class MessageFileManager {//定义统计文件对应的类public static class Stat{public int totalCount;public int validCount;}/*** MessageFileManager的初始化方法,实际上啥也没干,为了方便代码扩展*/public void init(){}/*** @param queueName* @return 返回队列对应的目录名*/private String getQueueDirPath(String queueName){return "./data/"+queueName;}/*** @param queueName* @return 返回队列对应的data文件名*/private String getQueueDataPath(String queueName){return getQueueDirPath(queueName)+"/queue_data.txt";}/*** @param queueName* @return 返回队列对应的统计文件名*/private String getQueueStatPath(String queueName){return getQueueDirPath(queueName)+"/queue_stat.txt";}/*** 创建队列对应的文件,因为如果有则不创建,所以不用加锁* @param queueName*/public void createQueueFiles(String queueName) throws IOException {File dir=new File(getQueueDirPath(queueName));if(!dir.exists()){boolean ok=dir.mkdirs();//有该目录则不创建,没有就创建if(!ok){throw new IOException("[MessageFileManager] 创建队列目录失败,queueName="+queueName);}}File dataFile=new File(getQueueDataPath(queueName));if(!dataFile.exists()){boolean ok=dataFile.createNewFile();if(!ok){throw new IOException("[MessageFileManager] 创建队列的data文件失败,queue_data.txt="+getQueueDataPath(queueName));}}File statFile=new File(getQueueStatPath(queueName));if(!statFile.exists()){boolean ok=statFile.createNewFile();if(!ok){throw new IOException("[MessageFileManager] 创建队列的stat文件失败,queue_stat.txt="+getQueueStatPath(queueName));}}//给统计文件写入初始数据,后续操作stat文件时不用对空文件进行判定Stat stat=new Stat();stat.totalCount=0;stat.validCount=0;writeStat(queueName,stat);}/*** 删除队列对应的文件,因为删除失败也没啥影响,所以就不加锁了*/public void deleteQueueFiles(String queueName) throws IOException {File dataFile=new File(getQueueDataPath(queueName));boolean ok1= dataFile.delete();File statFile=new File(getQueueStatPath(queueName));boolean ok2=statFile.delete();File dir= new File(getQueueDirPath(queueName));boolean ok3= dir.delete();if(!ok1||!ok2||!ok3){throw new IOException("[MessageFileManager] 删除队列文件失败,queueName="+queueName);}}/*** 对统计文件进行写操作* 本来对文件进行读写操作是要加锁的,但是外层调用者加了,这里就不加了* @param queueName* @param stat*/private void writeStat(String queueName,Stat stat) {try(OutputStream outputStream=new FileOutputStream(getQueueStatPath(queueName));PrintWriter writer=new PrintWriter(outputStream)){writer.write(stat.totalCount+"\t"+ stat.validCount);writer.flush();} catch (FileNotFoundException e) {e.printStackTrace();} catch (IOException e) {e.printStackTrace();}}/*** 对统计文件进行读操作* @param queueName* @return*/private Stat readStat(String queueName){Stat stat=null;try(InputStream inputStream=new FileInputStream(getQueueStatPath(queueName));Scanner scanner=new Scanner(inputStream)) {stat=new Stat();stat.totalCount=scanner.nextInt();stat.validCount=scanner.nextInt();return stat;} catch (FileNotFoundException e) {e.printStackTrace();} catch (IOException e) {e.printStackTrace();}return stat;}/*** 发送消息* 1. 得到消息的offsetBegin和offsetEnd* 2. 将该消息写到data文件上* 3. 更新stat文件*/public void sendMessage(QueueEntity queue, Message message) throws MqException, IOException {//涉及到多线程,要进行加锁synchronized (queue){//先检查这个队列对应的文件是否存在if(!isExists(queue.getName())){throw new MqException("[MessageFileManager] sendMessage,要发送的队列文件不存在");}File dataFile=new File(getQueueDataPath(queue.getName()));//先更新message的offsetBegin和offsetEndlong fileLength=dataFile.length();byte[] messageBinary= BinaryTool.toBytes(message);message.setOffsetBegin(fileLength+4);message.setOffsetEnd(fileLength+messageBinary.length+4);//再进行文件写入//outPutStream打开文件时会默认清空原有内容,需要将append参数设置为truetry(OutputStream outputStream=new FileOutputStream(dataFile,true);DataOutputStream dataOutputStream=new DataOutputStream(outputStream)){//先写入长度,使用DataOutputStream类,固定写4bytedataOutputStream.writeInt(messageBinary.length);//再写二进制的messagedataOutputStream.write(messageBinary);}//更新stat文件Stat stat=readStat(queue.getName());stat.totalCount++;stat.validCount++;writeStat(queue.getName(),stat);}log.info("硬盘已成功添加message,queueName="+queue.getName()+",messageId="+message.getMessageId());}/*** 逻辑删除message: 1.将isValid字段置为0x0 2.把这个消息写回去 3.更新统计文件* @param queue* @param message 这个message是内存传过来的*/public void deleteMessage(QueueEntity queue,Message message) throws MqException, IOException, ClassNotFoundException {synchronized (queue){if(!isExists(queue.getName())){throw new MqException("[MessageFileManager] deleteMessage,消息所在的队列文件不存在");}//根据消息的offsetBegin和offsetEnd字段找到这条消息File dataFile=new File(getQueueDataPath(queue.getName()));//采用随机读取的方法,读取方式为写和读try(RandomAccessFile randomAccessFile=new RandomAccessFile(dataFile,"rw")){//先找到该消息的起始位置randomAccessFile.seek(message.getOffsetBegin());//将这个对象的isValid设置为0x0message.setIsValid((byte) 0x00);//将message转换成字节数组byte[] bodyDest=BinaryTool.toBytes(message);//将这个数组写入文件randomAccessFile.write(bodyDest);}Stat stat=readStat(queue.getName());stat.validCount--;writeStat(queue.getName(),stat);}log.info("已删除硬盘上的message,queueName="+queue.getName()+",messageId="+message.getMessageId());}/*** 判断某个队列对应的文件是否存在* @param queueName* @return*/private boolean isExists(String queueName) {File dir=new File(getQueueDirPath(queueName));if(!dir.isDirectory()){return false;}File dataFile=new File(getQueueDataPath(queueName));if(!dataFile.isFile()){return false;}File statFile=new File(getQueueStatPath(queueName));if(!statFile.isFile()){return false;}return true;}/*** 从硬盘上加载queue存放的全部Message,服务器重启(只有一个线程)/GC操作(GC方法已经加过锁了)会调用该方法,所以可以不用加锁* @param queueName* @return 返回LinkedList,方便进行头删*/public LinkedList<Message> loadMessages(String queueName) throws MqException, IOException, ClassNotFoundException {//synchronized (queue){if(!isExists(queueName)) {throw new MqException("[MessageFileManager] loadQueueMessages,要加载的队列文件不存在!");}LinkedList<Message> messages=new LinkedList<>();//记录当前message的位置long currentSet=0;try(InputStream inputStream=new FileInputStream(getQueueDataPath(queueName));DataInputStream dataInputStream=new DataInputStream(inputStream)){while(true){//先读取消息的长度int expectedLength=dataInputStream.readInt();byte[] body=new byte[expectedLength];int actualLength=dataInputStream.read(body);if(actualLength!=expectedLength){throw new MqException("[MessageFileManager] 消息存储的格式不正确!");}//转换成消息对象Message message=(Message) BinaryTool.fromBytes(body);if(message.getIsValid()==0x0){//这个消息是无效的currentSet+=4+body.length;continue;}//设置辅助属性currentSet+=4;message.setOffsetBegin(currentSet);currentSet+=actualLength;message.setOffsetEnd(currentSet);messages.add(message);}}catch (EOFException e){//dataInputStream读到文件结尾时,会抛出EOFException异常log.info("成功加载该队列的全部消息,queueName="+queueName);}return messages;//}}/*** 判断是否要进行GC操作,由上层调用,所以修饰符为public* total>1500&&valid<750时触发GC*/public boolean checkGC(String queueName){//根据统计文件判断Stat stat=readStat(queueName);if(stat.totalCount>1500&&stat.validCount*1.0 / stat.totalCount<0.5){return true;}else {return false;}}/*** 进行GC操作* 1. 创建一个新的data文件,queue_data_new.txt* 2. 读取旧文件的全部有效message* 3. 将message加载到新文件上* 4. 将新文件重命名* 5. 修改统计文件*/public void GC(QueueEntity queue) throws MqException, IOException, ClassNotFoundException {synchronized (queue){//统计一下GC耗时long start=System.currentTimeMillis();File newDataFile=new File(getQueueNewDataPath(queue.getName()));if(newDataFile.exists()){//可能上一次GC的时候没删掉,抛异常throw new MqException("[MessageFileManager] GC时新文件已存在,queue_data_new.txt="+getQueueNewDataPath(queue.getName()));}boolean ok=newDataFile.createNewFile();if(!ok){throw new IOException("[MessageFileManager] 无法创建新的data文件,queue_data_new,txt="+getQueueNewDataPath(queue.getName()));}//从旧文件中加载全部有效messageList<Message> messages=loadMessages(queue.getName());//将加载的message写到新文件中try(OutputStream outputStream=new FileOutputStream(getQueueNewDataPath(queue.getName()));DataOutputStream dataOutputStream=new DataOutputStream(outputStream)){for(Message message:messages){byte[] body=BinaryTool.toBytes(message);dataOutputStream.writeInt(body.length);dataOutputStream.write(body);}}//让新文件取代旧文件File oldDataFile=new File(getQueueDataPath(queue.getName()));ok=oldDataFile.delete();if(!ok){throw new IOException("[MessageFileManager] 无法删除旧文件,queue_data.txt="+getQueueDataPath(queue.getName()));}ok=newDataFile.renameTo(oldDataFile);if(!ok){throw new IOException("[MessageFileManager] 新文件重命名失败,queueName="+queue.getName());}//更新统计文件Stat stat=new Stat();stat.totalCount= messages.size();stat.validCount= messages.size();readStat(queue.getName());long end=System.currentTimeMillis();log.info("GC耗时: "+(end-start)+"ms");}}private String getQueueNewDataPath(String queueName) {return getQueueDirPath(queueName)+"/queue_data_new.txt";}
}

2.2.4 测试MessageFileManager类

下面是单元测试的代码.(仅供参考,童靴们可以写更多的测试案例)

@SpringBootTest
class MessageFileManagerTest {private MessageFileManager fileManager=new MessageFileManager();private final String queueName1="testQueue1";private final String queueName2="testQueue2";@BeforeEachvoid setUp() throws IOException {//创建队列文件,队列名为testQueuefileManager.init();fileManager.createQueueFiles(queueName1);fileManager.createQueueFiles(queueName2);}@AfterEachvoid tearDown() throws IOException {//销毁队列对应的文件fileManager.deleteQueueFiles(queueName1);fileManager.deleteQueueFiles(queueName2);}@Testvoid createQueueFiles() {//检查两个队列的文件是否已经创建好//利用反射调用isExists方法Boolean ok1=ReflectionTestUtils.invokeMethod(fileManager,"isExists",queueName1);Boolean ok2=ReflectionTestUtils.invokeMethod(fileManager,"isExists",queueName2);Assertions.assertTrue(ok1);Assertions.assertTrue(ok2);}private QueueEntity createQueue(String queueName){QueueEntity queue=new QueueEntity();queue.setName(queueName);return queue;}private Message createMessage(String routingKey,String content){Message message=Message.createMessageWithId(routingKey,null,content.getBytes());return message;}@Testvoid sendMessage() throws IOException, MqException, ClassNotFoundException {QueueEntity queue=createQueue(queueName1);Message message=createMessage("testRoutingKey","hello");fileManager.sendMessage(queue,message);LinkedList<Message> messages=fileManager.loadMessages(queueName1);Assertions.assertEquals(1,messages.size());Message message1=messages.get(0);Assertions.assertEquals("testRoutingKey",message1.getRoutingKey());Assertions.assertEquals(message.getMessageId(),message1.getMessageId());Assertions.assertArrayEquals(message.getBody(),message1.getBody());}@Testvoid loadMessages() throws IOException, MqException, ClassNotFoundException {QueueEntity queue=createQueue(queueName1);List<Message> exceptedMessages=new ArrayList<>();for(int i=0;i<10;i++){Message message=createMessage("testRoutingKey","content"+i);exceptedMessages.add(message);fileManager.sendMessage(queue,message);}for(int i=0;i<10;i+=2){fileManager.deleteMessage(queue,exceptedMessages.get(i));}List<Message> actualMessages=fileManager.loadMessages(queueName1);for(int i=0;i<actualMessages.size();i++){Message exceptedMsg=exceptedMessages.get(i*2+1);Message actualMsg=actualMessages.get(i);Assertions.assertEquals(exceptedMsg.getMessageId(),actualMsg.getMessageId());Assertions.assertEquals(exceptedMsg.getRoutingKey(),actualMsg.getRoutingKey());Assertions.assertArrayEquals(exceptedMsg.getBody(),actualMsg.getBody());Assertions.assertEquals(exceptedMsg.getOffsetBegin(),actualMsg.getOffsetBegin());Assertions.assertEquals(exceptedMsg.getOffsetEnd(),actualMsg.getOffsetEnd());}}@Testvoid GC() throws IOException, MqException, ClassNotFoundException {//先插入几个messageQueueEntity queue=createQueue(queueName1);List<Message> messages=new ArrayList<>();for(int i=0;i<10;i++){Message message=createMessage("testRoutingKey","content"+i);fileManager.sendMessage(queue,message);messages.add(message);}//删除后五个messagefor(int i=5;i<10;i++){fileManager.deleteMessage(queue,messages.get(i));}//修改stat文件,因为是私有方法,这里使用反射MessageFileManager.Stat stat=new MessageFileManager.Stat();stat.totalCount=2000;stat.validCount=5;ReflectionTestUtils.invokeMethod(fileManager,"writeStat",queueName1,stat);assertTrue(fileManager.checkGC(queueName1));fileManager.GC(queue);List<Message> lastMessages=fileManager.loadMessages(queueName1);assertEquals(5,lastMessages.size());for(int i=0;i<5;i++){Message actualMsg=lastMessages.get(i);Message expectedMsg=messages.get(i);assertEquals(expectedMsg.getMessageId(),actualMsg.getMessageId());assertEquals(expectedMsg.getRoutingKey(),actualMsg.getRoutingKey());assertArrayEquals(expectedMsg.getBody(),actualMsg.getBody());assertEquals(expectedMsg.getOffsetBegin(),actualMsg.getOffsetBegin());assertEquals(expectedMsg.getOffsetEnd(),actualMsg.getOffsetEnd());}}
}

2.3 硬盘操作的封装

  为了便于虚拟主机调用硬盘操作上的方法,我们对硬盘上的操作进行封装,分为对数据库的操作和对文件的操作.

需要创建一个DiskManager类,下面是这个类的源码.因为MessageFileManager和MetaManager类我们之前已经检测过了,DiskManager类只是对这两个类封装了一层,所以就不进行单元测试了.

@Slf4j
public class DiskManager {private MessageFileManager messageFileManager=new MessageFileManager();private MetaManager metaManager=new MetaManager();public void init(){metaManager.init();messageFileManager.init();}//封装对交换机的操作public void insertExchange(Exchange exchange){metaManager.insertExchange(exchange);log.info("硬盘成功添加交换机,exchangeName="+exchange.getName());}public void deleteExchange(String exchangeName){metaManager.deleteExchange(exchangeName);log.info("硬盘成功删除交换机,exchangeName="+exchangeName);}public List<Exchange> selectAllExchanges(){return metaManager.selectAllExchanges();}//对队列的封装public void insertQueue(QueueEntity queue) throws IOException {metaManager.insertQueue(queue);log.info("硬盘成功添加队列,queueName"+queue.getName());//同时创建这个队列的文件messageFileManager.createQueueFiles(queue.getName());}public void deleteQueue(String queueName) throws IOException {metaManager.deleteQueue(queueName);log.info("硬盘成功删除队列,queueName="+queueName);//同时删除这个队列的文件messageFileManager.deleteQueueFiles(queueName);}public List<QueueEntity> selectAllQueues(){return metaManager.selectAllQueues();}//对绑定的封装public void insertBinding(Binding binding){metaManager.insertBinding(binding);log.info("硬盘成功添加绑定,binding="+binding);}public void deleteBinding(Binding binding){metaManager.deleteBinding(binding);log.info("硬盘成功删除绑定,binding="+binding);}public List<Binding> selectAllBindings(){return metaManager.selectAllBindings();}//对message的封装public void sendMessage(QueueEntity queue,Message message) throws IOException, MqException {messageFileManager.sendMessage(queue,message);}public void deleteMessage(QueueEntity queue,Message message) throws IOException, MqException, ClassNotFoundException {messageFileManager.deleteMessage(queue,message);//检查是否要进行GC操作if(messageFileManager.checkGC(queue.getName())){messageFileManager.GC(queue);}}public LinkedList<Message> loadAllMessages(String queueName) throws IOException, MqException, ClassNotFoundException {return messageFileManager.loadMessages(queueName);}
}

三. 内存操作

3.1 MemoryManager类的实现

在概念篇中提到过,对于Broker Server来说,内存是主要的存储介质,硬盘只是辅助存储,用来进行数据恢复的.下面就创建一个MemoryManager类,用于管理内存中的数据.        

 下图是MemoryManager提供的API.

先来给出MemoryManager的成员变量,根据它们更容易理解为啥要提供这些API

public class MemoryManager {//考虑到线程安全,用到哈希表这个结构时使用ConcurrentMap//交换机的哈希表,key=exchangeNameprivate ConcurrentHashMap<String, Exchange> exchangesMap=new ConcurrentHashMap<>();//队列的哈希表,key=queueNameprivate ConcurrentHashMap<String, QueueEntity> queuesMap=new ConcurrentHashMap<>();//绑定的hash表,因为交换机需要根据routingKey和bindingKey之间的对应关系将message投递到指定队列,所以第一个Key=exchangeName,第二个key=queueNameprivate ConcurrentHashMap<String,ConcurrentHashMap<String, Binding>> bindingsMap=new ConcurrentHashMap<>();//message的hash表,因为消息是以队列为维度进行存储的,所以一个队列对应存储的一串消息,key=queueName//使用LinkedList,方便进行头删private ConcurrentHashMap<String, LinkedList<Message>> queueMessagesMap=new ConcurrentHashMap<>();//每个队列的消息被取出后,要等待客户端确认才能彻底删除,所以需要有一个数据结构专门存储未确认的消息//为了方便找到未确认的消息并进行删除,使用hash结构,第一个key是queueName,第二个key是messageIdprivate ConcurrentHashMap<String,ConcurrentHashMap<String,Message>> queueMessagesWaitAckMap=new ConcurrentHashMap<>();//消息中心,用来存放全部的消息,key=messageIdprivate ConcurrentHashMap<String,Message> messagesCenter=new ConcurrentHashMap<>();
}

  •  insertExchange: 向内存中添加一个交换机
  • deleteExchange: 删除内存中的一个交换机
  • getExchange: 查询内存中的某个交换机
  • insertQueue: 向内存中插入一个队列
  • deleteQueue: 删除内存中的某个队列
  • getQueue: 查询内存中的某个队列
  • insertBinding: 插入一个绑定
  • deleteBinding: 删除一个绑定
  • getBinding: 查询一个绑定
  • insertMessage: 向消息中心插入一个消息
  • deleteMessage: 从消息中心删除一个消息
  • getMessage: 从消息中心查询一个消息
  • sendMessageToQueue: 向某个队列投递一条消息
  • getMessageFromQueue: 得到某个队列的第一条消息,并且从队列中删除该消息
  • loadDiskData: 从硬盘上恢复数据
  • insertMessageWaitAck: 向某个队列的未确认消息队列中插入一条数据
  • deleteMessageWaitAck: 某条消息收到确认后删除该消息
  • getMessageWaitAck: 查找某个队列的未确认消息

下面一起来实现这个类吧.

 该说的话都放在注释里了~~

@Data
@Slf4j
public class MemoryManager {//考虑到线程安全,用到哈希表这个结构时使用ConcurrentMap//交换机的哈希表,key=exchangeNameprivate ConcurrentHashMap<String, Exchange> exchangesMap=new ConcurrentHashMap<>();//队列的哈希表,key=queueNameprivate ConcurrentHashMap<String, QueueEntity> queuesMap=new ConcurrentHashMap<>();//绑定的hash表,因为交换机需要根据routingKey和bindingKey之间的对应关系将message投递到指定队列,所以第一个Key=exchangeName,第二个key=queueNameprivate ConcurrentHashMap<String,ConcurrentHashMap<String, Binding>> bindingsMap=new ConcurrentHashMap<>();//message的hash表,因为消息是以队列为维度进行存储的,所以一个队列对应存储的一串消息,key=queueName//使用LinkedList,方便进行头删private ConcurrentHashMap<String, LinkedList<Message>> queueMessagesMap=new ConcurrentHashMap<>();//每个队列的消息被取出后,要等待客户端确认才能彻底删除,所以需要有一个数据结构专门存储未确认的消息//为了方便找到未确认的消息并进行删除,使用hash结构,第一个key是queueName,第二个key是messageIdprivate ConcurrentHashMap<String,ConcurrentHashMap<String,Message>> queuesMessageWaitAckMap=new ConcurrentHashMap<>();//消息中心,用来存放全部的消息,key=messageIdprivate ConcurrentHashMap<String,Message> messagesCenter=new ConcurrentHashMap<>();//init方法啥也没干,为了方便扩展public void init(){}//queue的相关操作public void insertQueue(QueueEntity queue){queuesMap.put(queue.getName(),queue);log.info("内存成功插入Queue,queueName="+queue.getName());}public void deleteQueue(String queueName){queuesMap.remove(queueName);log.info("内存成功删除Queue,queueName="+queueName);}public QueueEntity getQueue(String queueName){return queuesMap.get(queueName);}//Exchange的相关操作public void insertExchange(Exchange exchange){exchangesMap.put(exchange.getName(),exchange);log.info("内存成功添加Exchange,exchangeName="+exchange.getName());}public void deleteExchange(String exchangeName){exchangesMap.remove(exchangeName);log.info("内存成功删除Exchange,exchangeName="+exchangeName);}public Exchange getExchange(String exchangeName){return exchangesMap.get(exchangeName);}//Binding的相关操作public void insertBinding(Binding binding) throws MqException {//先找到交换机的所有绑定的hash表//如果没有交换机没有绑定,创建一个ConcurrentHashMap<String,Binding> bindingMap=bindingsMap.computeIfAbsent(binding.getExchangeName(),c->new ConcurrentHashMap<>());//查找这个绑定是否存在,如果存在,不能插入该绑定,因为如果是多线程插入绑定并且bindingKey不一样,就会覆盖//因为既涉及查询又涉及写操作,要加锁变成操作synchronized (bindingMap){if(bindingMap.get(binding.getQueueName())!=null){throw new MqException("[MemoryManager] 要插入的绑定已经存在!,binding="+binding);}//插入该绑定bindingMap.put(binding.getQueueName(),binding);log.info("内存插入绑定,binding="+binding);}}//一般情况下,多线程操作,多删一次没啥副作用,所以删除不加锁//但是如果删除失败,需要打印一下public void deleteBinding(Binding binding){//先查找交换机的所有绑定ConcurrentHashMap<String,Binding> bindingMap=bindingsMap.get(binding.getExchangeName());if(bindingMap==null){log.info("内存删除绑定失败!该交换机没有绑定队列,exchangeName="+binding.getExchangeName());return;}//找到这个绑定并删除Binding toDelete=bindingMap.remove(binding.getQueueName());if(toDelete==null){log.info("内存删除绑定失败!没有找到该绑定,binding="+binding);return;}log.info("内存成功删除绑定,binding="+binding);}public Binding getBinding(String queueName,String exchangeName){ConcurrentHashMap<String,Binding> bindingMap=bindingsMap.get(exchangeName);if(bindingMap==null){return null;}return bindingMap.get(queueName);}//message的操作public void insertMessage(Message message){messagesCenter.put(message.getMessageId(),message);}public void deleteMessage(String messageId){messagesCenter.remove(messageId);}public Message getMessage(String messageId){return messagesCenter.get(messageId);}//消息和队列public void sendMessageToQueue(String queueName,Message message){//找到给队列存储消息的hash表,不存在则创建LinkedList<Message> queueMessages=queueMessagesMap.computeIfAbsent(queueName,c->new LinkedList<>());//给这个消息表中插入一条消息synchronized (queueMessages){queueMessages.add(message);}//消息中心也要保存一份messagesCenter.put(message.getMessageId(),message);log.info("成功向队列发送消息并保存到消息中心,queueName="+queueName+",messageId="+message.getMessageId());}/*** 队列向消费者发送消息,同时从队列中删除这条消息,并加入到自己的未确认消息队列中*/public Message getMessageFromQueue(String queueName){LinkedList<Message> queueMessages=queueMessagesMap.get(queueName);if(queueMessages==null){return null;}Message message=null;synchronized (queueMessages){if(queueMessages.size()==0){return null;}message=queueMessages.pop();log.info("已从队列中取出第一条消息,queueName="+queueName+",messageId="+message.getMessageId());return message;}}/*** 添加一条未确认消息*/public void insertMessageWaitAck(String queueName,Message message){ConcurrentHashMap<String,Message> queueMessageWaitAckMap=queuesMessageWaitAckMap.computeIfAbsent(queueName,c->new ConcurrentHashMap<>());queueMessageWaitAckMap.put(message.getMessageId(),message);log.info("消息进入待确认队列,queueName="+queueName+",messageId="+message.getMessageId());}/*** 从待确认队列中删除一条消息*/public void deleteMessageWaitAck(String queueName,String messageId) {ConcurrentHashMap<String, Message> queueMessageWaitAckMap = queuesMessageWaitAckMap.get(queueName);if (queueMessageWaitAckMap == null) {return;}queueMessageWaitAckMap.remove(messageId);log.info("将消息从待确认队列中移除,queueName="+queueName+",messageId="+messageId);}/*** 查询未确认消息*/public Message getMessageWaitAck(String queueName, String messageId){ConcurrentHashMap<String,Message> queueMessageWaitAckMap=queuesMessageWaitAckMap.get(queueName);if(queueMessageWaitAckMap==null){return null;}return queueMessageWaitAckMap.get(messageId);}/*** 从硬盘上恢复数据*/public void loadDiskData(DiskManager manager) throws IOException, MqException, ClassNotFoundException {//恢复meta数据List<Exchange> exchanges=manager.selectAllExchanges();for(Exchange exchange:exchanges){exchangesMap.put(exchange.getName(),exchange);}List<QueueEntity> queueEntities=manager.selectAllQueues();for(QueueEntity queue:queueEntities){queuesMap.put(queue.getName(),queue);}List<Binding> bindings=manager.selectAllBindings();for(Binding binding:bindings){ConcurrentHashMap<String,Binding> bindingMap=bindingsMap.computeIfAbsent(binding.getExchangeName(),c->new ConcurrentHashMap<>());bindingMap.put(binding.getQueueName(),binding);}//恢复message//不需要恢复待确认消息,因为持久化的消息会被重新加载到内存中,待确认消息也在里面,可以让消费者重新取一遍for(QueueEntity queue:queueEntities){LinkedList<Message> queueMessages=manager.loadAllMessages(queue.getName());for(Message message:queueMessages){sendMessageToQueue(queue.getName(),message);}}log.info("从硬盘上恢复数据成功!");}/*** 查看某个队列的消息数*/public int getQueueSize(QueueEntity queue){synchronized (queue){LinkedList<Message> messages=queueMessagesMap.get(queue.getName());if(messages==null){return 0;}return messages.size();}}
}

 3.2 测试MemoryManager类

@SpringBootTest
class MemoryManagerTest {private MemoryManager memoryManager=null;@BeforeEachvoid setUp() {memoryManager=new MemoryManager();memoryManager.init();}@AfterEachvoid tearDown() {memoryManager=null;}private QueueEntity createQueue(String queueName){QueueEntity queue=new QueueEntity();queue.setName(queueName);return queue;}private Exchange createExchange(String exchangeName){Exchange exchange=new Exchange();exchange.setName(exchangeName);return exchange;}private Binding createBinding(String queueName,String exchangeName){Binding binding=new Binding();binding.setExchangeName(exchangeName);binding.setQueueName(queueName);binding.setBindingKey("bidingKey");return binding;}@Testvoid testQueue() {QueueEntity expectedQueue =createQueue("testQueue");memoryManager.insertQueue(expectedQueue);//因为内存中存放的是引用,直接比较两个引用是否是同一个即可QueueEntity actualQueue=memoryManager.getQueue("testQueue");Assertions.assertTrue(expectedQueue==actualQueue);memoryManager.deleteQueue("testQueue");actualQueue=memoryManager.getQueue("testQueue");Assertions.assertNull(actualQueue);}@Testvoid testExchange() {Exchange expectedExchange=createExchange("testExchange");memoryManager.insertExchange(expectedExchange);Exchange actualExchange=memoryManager.getExchange("testExchange");Assertions.assertTrue(expectedExchange==actualExchange);memoryManager.deleteExchange("testExchange");actualExchange=memoryManager.getExchange("testExchange");Assertions.assertNull(actualExchange);}@Testvoid insertBinding() throws MqException {Binding expectedBinding=createBinding("testQueue","testExchange");memoryManager.insertBinding(expectedBinding);Binding actualBinding=memoryManager.getBinding("testQueue","testExchange");Assertions.assertTrue(expectedBinding==actualBinding);memoryManager.deleteBinding(expectedBinding);actualBinding=memoryManager.getBinding("testQueue","testExchange");Assertions.assertNull(actualBinding);}private Message createMessage(String content){Message message=Message.createMessageWithId("routingKey",null,content.getBytes());return message;}@Testvoid testMessage() {Message expectedMessage=createMessage("hello");memoryManager.insertMessage(expectedMessage);Message actualMessage=memoryManager.getMessage(expectedMessage.getMessageId());Assertions.assertTrue(expectedMessage==actualMessage);memoryManager.deleteMessage(expectedMessage.getMessageId());actualMessage=memoryManager.getMessage(expectedMessage.getMessageId());Assertions.assertNull(actualMessage);}@Testvoid messageToQueue() {Message expectedMessage=createMessage("hello");QueueEntity queue=createQueue("testQueue");memoryManager.sendMessageToQueue(queue.getName(),expectedMessage);Message actualMessage=memoryManager.getMessageFromQueue("testQueue");Assertions.assertEquals(expectedMessage,actualMessage);int size=memoryManager.getQueueSize(queue);Assertions.assertEquals(0,size);}@Testvoid messageWaitAck() {Message expectedMessage=createMessage("hello");memoryManager.insertMessageWaitAck("testQueue",expectedMessage);Message actualMessage=memoryManager.getMessageWaitAck("testQueue",expectedMessage.getMessageId());Assertions.assertTrue(expectedMessage==actualMessage);memoryManager.deleteMessageWaitAck("testQueue", expectedMessage.getMessageId());actualMessage=memoryManager.getMessageWaitAck("testQueue", expectedMessage.getMessageId());Assertions.assertNull(actualMessage);}@Testvoid loadDiskData() throws IOException, MqException, ClassNotFoundException {//先给硬盘添加数据DemoApplication.context= SpringApplication.run(DemoApplication.class);DiskManager diskManager=new DiskManager();diskManager.init();Binding binding=createBinding("testQueue","testExchange");QueueEntity queue=createQueue("testQueue");Exchange exchange=createExchange("testExchange");diskManager.insertBinding(binding);diskManager.insertExchange(exchange);diskManager.insertQueue(queue);LinkedList<Message> messages=new LinkedList<>();for(int i=0;i<10;i++){Message message=createMessage("hello"+i);messages.add(message);diskManager.sendMessage(queue,message);}for(int i=5;i<10;i++){diskManager.deleteMessage(queue,messages.get(i));}//从硬盘上恢复数据memoryManager.loadDiskData(diskManager);Binding binding1=memoryManager.getBinding("testQueue","testExchange");//因为这个对象是从硬盘上读出来的,所以引用指向的不是同一个对象Assertions.assertEquals(binding.getBindingKey(),binding1.getBindingKey());Assertions.assertEquals(binding.getQueueName(),binding1.getQueueName());Assertions.assertEquals(binding.getExchangeName(),binding1.getExchangeName());Exchange exchange1=memoryManager.getExchange("testExchange");Assertions.assertEquals(exchange.getType(),exchange1.getType());Assertions.assertEquals(exchange.getArguments(),exchange1.getArguments());Assertions.assertEquals(exchange.isDurable(),exchange1.isDurable());Assertions.assertEquals(exchange.isAutoDelete(),exchange1.isAutoDelete());Assertions.assertEquals(exchange.getName(),exchange1.getName());QueueEntity queue1=memoryManager.getQueue("testQueue");Assertions.assertEquals(queue.getName(),queue1.getName());Assertions.assertEquals(queue.getArguments(),queue1.getArguments());Assertions.assertEquals(queue.isExclusive(),queue1.isExclusive());Assertions.assertEquals(queue.isAutoDelete(),queue1.isAutoDelete());Assertions.assertEquals(queue.isDurable(),queue1.isDurable());int size=memoryManager.getQueueSize(queue1);Assertions.assertEquals(5,size);Message message=memoryManager.getMessageFromQueue(queue1.getName());Assertions.assertArrayEquals("hello0".getBytes(),message.getBody());//删除硬盘上的数据diskManager.deleteQueue(queue.getName());DemoApplication.context.close();File file=new File("./data/meta.db");file.delete();}
}

四. 实现Virtual Host

虚拟主机,是一个逻辑上的集合,里面包含交换机,绑定,队列和消息. 

有没有童靴发现,我们写的代码里,除了EOFException(文件读到结尾是抛出的异常,属于正常读取文件产生的异常),其他的异常我们都是直接抛出去,交给上层调用者解决.这个上层调用者就是Virtual Host,它负责提供9个API,同时要解决所有可能出现的异常.

  • exchangeDeclare: 声明一个交换机,有则不创建,没有则创建
  • exchangeDelete: 删除一个交换机
  • queueDeclare: 声明一个队列,有则不创建,没有则创建
  • queueDelete: 删除一个队列
  • queueBind: 创建绑定
  • queueUnbind: 解除绑定
  • basicPublish: 生产者发布一个消息
  • basicConsume: 消费者订阅一个队列,后续该队列收到消息时会将消息推送给这个消费者
  • basicAck: 消费者处理完消息后,手动应答该消息

4.1 VirtualHost的实现

下面是VH的代码

@Data
@Slf4j
public class VirtualHost {//管理硬盘数据private DiskManager diskManager=new DiskManager();//管理内存数据private MemoryManager memoryManager=new MemoryManager();//虚拟主机的唯一标识private String virtualHost;//多线程涉及到加锁操作private Object exchangeLocker=new Object();private Object queueLocker=new Object();//Router类负责检查routingKey和bindingKeyprivate Router router=new Router();public VirtualHost(){//初始化硬盘数据diskManager.init();//初始化内存数据memoryManager.init();//将硬盘数据加载到内存里try {memoryManager.loadDiskData(diskManager);} catch (Exception e) {log.info("内存恢复数据失败!");e.printStackTrace();}}//没有则创建,有则不创建public boolean exchangeDeclare(String exchangeName, ExchangeType type, boolean durable, boolean autoDelete, Map<String,Object> arguments){//虚拟主机和交换机之间是1:n的关系,需要在exchangeName前面加上虚拟主机标识exchangeName=virtualHost+exchangeName;synchronized (exchangeLocker){try{if(memoryManager.getExchange(exchangeName)!=null){//有这个交换机,直接返回truelog.info("要创建的交换机已经存在,exchangeName="+exchangeName);return true;}Exchange exchange=new Exchange();exchange.setName(exchangeName);exchange.setDurable(durable);exchange.setType(type);exchange.setAutoDelete(autoDelete);exchange.setArguments(arguments);if(durable){//将这个交换机持久化diskManager.insertExchange(exchange);}memoryManager.insertExchange(exchange);log.info("创建交换机完成.exchangeName="+exchangeName);return true;}catch (Exception e){log.info("创建交换机失败.exchangeName="+exchangeName);e.printStackTrace();return false;}}}//删除操作,同样不加锁,因为多删一次没啥副作用public boolean exchangeDelete(String exchangeName){exchangeName=virtualHost+exchangeName;try{Exchange exchange=memoryManager.getExchange(exchangeName);if(exchange==null){//说明要删除的交换机不存在,打印一下日志log.info("要删除的交换机不存在,exchangeName="+exchangeName);return false;}if(exchange.isDurable()){//硬盘上删除该交换机diskManager.deleteExchange(exchangeName);}memoryManager.deleteExchange(exchangeName);log.info("删除交换机成功,exchangeName="+exchangeName);return true;}catch (Exception e){log.info("删除交换机失败,exchangeName="+exchangeName);e.printStackTrace();return false;}}public boolean queueDeclare(String queueName,boolean durable,boolean autoDelete,boolean exclusive,Map<String,Object> arguments){queueName=virtualHost+queueName;synchronized (queueLocker){try{if(memoryManager.getQueue(queueName)!=null){log.info("要创建的队列已存在");return true;}QueueEntity queue=new QueueEntity();queue.setName(queueName);queue.setDurable(durable);queue.setAutoDelete(autoDelete);queue.setExclusive(exclusive);queue.setArguments(arguments);//先进行硬盘操作,硬盘操作更有可能抛异常//如果先进行内存操作,但是硬盘操作抛异常,还要进行数据恢复if(durable){diskManager.insertQueue(queue);}memoryManager.insertQueue(queue);log.info("创建队列成功,queueName="+queueName);return true;} catch (Exception e) {log.info("创建队列失败,queueName="+queueName);e.printStackTrace();return false;}}}public boolean queueDelete(String queueName){queueName=virtualHost+queueName;try{QueueEntity queue=memoryManager.getQueue(queueName);if(queue==null){log.info("要删除的队列不存在,queueName="+queueName);return false;}if(queue.isDurable()){diskManager.deleteQueue(queueName);}memoryManager.deleteQueue(queueName);log.info("删除队列成功,queueName="+queueName);return true;}catch (Exception e){log.info("删除队列失败,queueName="+queueName);e.printStackTrace();return false;}}public boolean queueBind(String queueName,String exchangeName,String bindingKey){queueName=virtualHost+queueName;exchangeName=virtualHost+exchangeName;synchronized (exchangeLocker){synchronized (queueLocker){try{if(memoryManager.getBinding(queueName,exchangeName)!=null){//这个绑定已经存在,不允许throw new MqException("要创建的绑定已经存在,exchangeName="+exchangeName+",queueName="+queueName);}if(!router.checkBindingKey(bindingKey)){throw new MqException("创建绑定失败,bindingKey非法,bindingKey="+bindingKey);}Binding binding=new Binding();binding.setQueueName(queueName);binding.setExchangeName(exchangeName);binding.setBindingKey(bindingKey);//交换机和队列都持久化时,绑定自然就持久化了Exchange exchange=memoryManager.getExchange(exchangeName);if(exchange==null){throw new MqException("创建绑定失败,交换机不存在,exchangeName="+exchangeName);}QueueEntity queue=memoryManager.getQueue(queueName);if(queue==null){throw new MqException("创建绑定失败,队列不存在,queueName="+queueName);}if(exchange.isDurable()&&queue.isDurable()){diskManager.insertBinding(binding);}memoryManager.insertBinding(binding);log.info("创建绑定成功,binding="+binding);return true;} catch (Exception e) {log.info("创建绑定失败,exchangeName="+exchangeName+",queueName="+queueName);e.printStackTrace();return false;}}}}/*** 解绑有两种情况: 1. 先删除了交换机和队列,再解除绑定 2.只解除了绑定* 为了兼容第一种情况,不需要检验绑定的交换机和队列是否存在* @param queueName* @param exchangeName* @return*/public boolean queueUnbind(String queueName,String exchangeName){try{Binding binding=memoryManager.getBinding(queueName,exchangeName);if(binding==null){log.info("解除绑定失败,绑定不存在,exchangeName="+exchangeName+",queueName="+queueName);return false;}//即使绑定没有持久化,进行删除操作也没啥影响,否则还要判断队列和交换机是否持久化diskManager.deleteBinding(binding);memoryManager.deleteBinding(binding);log.info("解除绑定成功,exchangeName"+exchangeName+",queueName="+queueName);return true;} catch (Exception e) {log.info("解除绑定失败,exchangeName="+exchangeName+",queueName="+queueName);e.printStackTrace();return false;}}/*** 生产者调用该方法,指定发布消息到哪个交换机上* 1. 找到交换机,判断类型* 2. 创建消息,使用工厂方法创建* 3. 根据交换机类型和消息的routingKey转发该消息到不同的队列中* @return*/public boolean basicPublish(String exchangeName, String routingKey, BasicProperties properties,byte[] body){try{if(exchangeName==null){//表示使用默认交换机exchangeName="";}else {exchangeName=virtualHost+exchangeName;}Exchange exchange=memoryManager.getExchange(exchangeName);if(exchange==null){throw new MqException("发布消息失败,指定交换机不存在,exchangeName="+exchangeName);}Message message=Message.createMessageWithId(routingKey,properties,body);//对交换机的类型进行判断,因为直接交换机的转发规则比较简单,放在vh中//topic和fanout交换机放在router类中进行转发判断if(exchange.getType()==ExchangeType.DIRECT){//此时的routingKey就是要转发的队列名字String queueName=virtualHost+routingKey;QueueEntity queue=memoryManager.getQueue(queueName);if(queue==null){throw new MqException("要转发的队列不存在,queueName="+queueName);}//判断消息是否持久化if(message.isDelivery()==0x2){diskManager.sendMessage(queue,message);}memoryManager.sendMessageToQueue(queueName,message);}else{//找到交换机绑定的queueConcurrentHashMap<String,Binding> bindingMap=memoryManager.getBindingsMap().get(exchangeName);if(bindingMap==null){//这个交换机没有绑定任何队列,不用转发消息,直接返回return true;}for(Map.Entry<String,Binding> bindingEntry:bindingMap.entrySet()){//得到队列QueueEntity queue=memoryManager.getQueue(bindingEntry.getKey());if(queue==null){//本来要抛异常,但是为了不影响其他队列的转发,这里只打印日志log.info("交换机绑定的队列不存在");continue;}//判断是否转发Binding binding=bindingEntry.getValue();if(router.route(exchange.getType(),message, binding)){//转发该消息//将消息写入硬盘if(message.isDelivery()==0x2){diskManager.sendMessage(queue,message);}//将消息写入内存memoryManager.sendMessageToQueue(queue.getName(),message);//TODO:通知订阅者取消息}}}log.info("成功发布消息,exchangeName="+exchangeName+",messageId="+message.getMessageId());return true;} catch (Exception e) {log.info("发布消息失败,exchangeName="+exchangeName);e.printStackTrace();return false;}}
}

下面是Router类的判断方法,用于检查RoutingKey和BindingKey是否合法,以及二者的匹配(这是自己规定的,童靴们也可以创建自己的匹配规则)

@Data
public class Router {/*** BindingKey格式规定:* 1. 由'.'作为各部分的分隔符* 2. 各部分可由字母,数字,下划线组成* 3. *表示匹配一个部分,#表示匹配0个或多个部分,通配符只能作为单独的一部分* 4. *和# , #和#不能相邻* @param bindingKey* @return*/public boolean checkBindingKey(String bindingKey){if(bindingKey==""){//如果创建绑定时绑定的交换机是直接交换机,或者交换机是扇出交换机,// 用不到bindingKey,会将bindingKey设置成""return true;}char[] chs=bindingKey.toCharArray();for(char ch:chs){if(ch>='a'&&ch<='z'){continue;}if(ch>='A'&&ch<='Z'){continue;}if(ch>='0'&&ch<='9'){continue;}if(ch=='.'||ch=='_'||ch=='*'||ch=='#'){continue;}return false;}//通配符只能作为一个独立的部分String[] strs=bindingKey.split(".");for(String str:strs){if(str.length()>1&&(str.contains("*")||str.contains("#"))){return false;}}//通配符相邻for(int i=0;i<strs.length-1;i++){if(strs[i].equals("*")&&strs[i+1].equals("#")){return false;}if(strs[i].equals("#")&&strs[i+1].equals("#")){return false;}if(strs[i].equals("#")&&strs[i+1].equals("*")){return false;}}return true;}/*** RoutingKey格式规定;* 1. 由'.'作为各部分的分隔符* 2. 各部分可由字母,数字,下划线组成* @param routingKey* @return*/public boolean checkRoutingKey(String routingKey){if(routingKey==""){//如果使用的是扇出交换机,这个消息会被转发到该交换机绑定的全部队列上,用不到routingKeyreturn true;}char[] chs=routingKey.toCharArray();for(char ch:chs){if(ch>='a'&&ch<='z'){continue;}if(ch>='A'&&ch<='Z'){continue;}if (ch=='.'||ch=='_'){continue;}if(ch>='0'&&ch<='9'){continue;}return false;}return true;}public boolean route(ExchangeType type, Message message, Binding binding) throws MqException {if(type==ExchangeType.FANOUT){//与该交换机绑定的全部队列都要存放该消息return true;}else if(type==ExchangeType.TOPIC){return topicRoute(message,binding);}else {//属于异常情况throw new MqException("交换机类型非法!exchangeType="+type);}}private boolean topicRoute(Message message, Binding binding) {String routingKey= message.getRoutingKey();String bindingKey= binding.getBindingKey();int routIndex=0;int bindIndex=0;String[] routKeys=routingKey.split("\\.");String[] bindKeys=bindingKey.split("\\.");while (routIndex<routKeys.length&&bindIndex<bindKeys.length){//System.out.println("routing[key]="+routKeys[routIndex]+",binding[key]="+bindingKey);//有通配符的情况if(bindKeys[bindIndex].equals("*")){bindIndex++;routIndex++;} else if(bindKeys[bindIndex].equals("#")){bindIndex++;if(bindIndex==bindKeys.length){//表明#匹配的是后面的全部return true;}routIndex=getRoutIndex(routIndex,routKeys,bindKeys[bindIndex]);if(routIndex==-1){return false;}bindIndex++;routIndex++;}else{//普通字符串匹配if(!routKeys[routIndex].equals(bindKeys[bindIndex])){return false;}bindIndex++;routIndex++;}}//任何一个索引没有到达终点if(bindIndex<bindKeys.length||routIndex<routKeys.length){return false;}//System.out.println("routKey="+routingKey+",bindingKey="+bindingKey);return true;}private int getRoutIndex(int routIndex, String[] routKeys, String bindKey) {for(int i=routIndex;i<routKeys.length;i++){if(routKeys[i].equals(bindKey)){return i;}}return -1;}}

 4.2 basicConsume方法

上面提供的Virtual Host类的代码中,没有实现basicConsume和basicAck方法,并且basicPublish方法也是不完整的.因为这个方法的实现比较复杂,需要另外创建一些类~~

Consume类,是一个函数式接口,里面是一个回调方法,消费者通过这个方法来处理消息.

/*** 一个函数型接口,消费者进行消息处理的逻辑*/
@FunctionalInterface
public interface Consume {/*** @param consumerTag 消费者标识* @param properties message的基本属性* @param body message的主体*/void handleDelivery(String consumerTag, BasicProperties properties,byte[] body)throws IOException, MqException;
}

 Consumer类,表示一个消费者.

/*** 表示一个完整的消费者*/
@Data
public class Consumer {//消费者唯一标识private String consumerTag;//订阅的队列的名字private String queueName;//是否自动回复private boolean autoAck;//消费消息的方法private Consume consume;public Consumer(String consumerTag,String queueName,boolean autoAck,Consume consume){this.consumerTag=consumerTag;this.queueName=queueName;this.autoAck=autoAck;this.consume=consume;}
}

为了建立消费者和队列之间的订阅关系,我们需要给QueueEntity类增加一个成员变量,用于管理该队列的订阅者.

//管理订阅者private LinkedList<Consumer> subscribers=new LinkedList<>();//采用轮询的方式向消费者投递消息,因此需要记录每次轮到哪个消费者了//涉及到线程安全,使用原子类private AtomicInteger consumeToken=new AtomicInteger(0);//这个消息被送到哪个订阅者手里public Consumer chooseConsumer(){synchronized (subscribers){if(subscribers.size()==0){return null;}int index=consumeToken.get()% subscribers.size();consumeToken.getAndIncrement();return subscribers.get(index);}}//添加订阅者public void addSubscriber(Consumer consumer){synchronized (subscribers){subscribers.add(consumer);}}

虚拟主机需要调用ConsumerManager类的方法,管理消费者和队列之间的关系.

下面我们来分析一下这个类的实现.

那么问题来了--扫描线程怎么知道哪个队列收到了消息呢,需要遍历每个队列对应的存储消息的LinkedList吗?

这样做未免太复杂了,我们可以使用一个tokenQueue阻塞队列,当某个队列收到消息后,它的queueName会被放进阻塞队列中,扫描线程只需要检查阻塞队列中是否有元素即可.(参见下图)

 有了这个逻辑,我们就来实现一下ConsumerManager类.

@Slf4j
public class ConsumerManager {//manager类需要执行向队列中添加订阅者,收到Ack后删除消息等操作,因此要有一个vh对象private VirtualHost virtualHost;//阻塞队列,每当queue收到一个消息后,它的queueName就会被放到阻塞队列中private BlockingQueue<String> tokenQueue=new LinkedBlockingQueue<>();//线程池,用于执行消费者的回调函数private ExecutorService workThreadPool= Executors.newFixedThreadPool(10);//扫描线程,用于检查阻塞队列中是否有queueNameprivate Thread scannerThread=null;public ConsumerManager(VirtualHost virtualHost){this.virtualHost=virtualHost;scannerThread=new Thread(()->{try {while(true){//阻塞等待tokenQueue的元素String queueName=tokenQueue.take();//log.info("扫描线程收到消息");//队列收到消息后,将消息发给订阅者QueueEntity queue=virtualHost.getMemoryManager().getQueue(queueName);if(queue==null){//队列不存在throw new MqException("投递消息的队列不存在,queueName="+queueName);}//将消息投递给消费者synchronized (queue){consumeMessage(queue);}}} catch (Exception e) {e.printStackTrace();}},"scannerThread");//将扫描线程设置为后台进程,服务器终止时线程也随之终止scannerThread.setDaemon(true);scannerThread.start();}//添加订阅者public void addSubscriber(String queueName, String consumerTag, boolean autoAck, Consume consume) throws MqException {QueueEntity queue=virtualHost.getMemoryManager().getQueue(queueName);if(queue==null){//没有该队列throw new MqException("[ConsumerManager] 添加订阅者失败,没有该队列,queueName="+queueName);}//创建一个消费者Consumer consumer=new Consumer(consumerTag,queueName,autoAck,consume);synchronized (queue){//该队列不能接收其它消息//将这个消费者添加到队列的订阅者列表中queue.addSubscriber(consumer);//如果之前有消息,将积压的消息处理掉int n=virtualHost.getMemoryManager().getQueueSize(queue);for(int i=0;i<n;i++){consumeMessage(queue);}}}//因为调用该方法时加锁了,所以这个方法就不加锁了private void consumeMessage(QueueEntity queue) {//选出一个订阅者Consumer luckyDog=queue.chooseConsumer();if(luckyDog==null){//当前队列没有订阅者,不进行投递log.info("当前队列没有订阅者,queueName="+queue.getName());return;}//得到消息Message message=virtualHost.getMemoryManager().getMessageFromQueue(queue.getName());if(message==null){//当前队列没有消息,这属于异常情况,不过问题不大,打印一下日志就行log.info("当前队列没有消费者,queueName="+queue.getName());return;}//使用多线程,执行订阅者的消费逻辑workThreadPool.submit(()->{try {//先复制一份保存在未确认序列中virtualHost.getMemoryManager().insertMessageWaitAck(queue.getName(),message);//消息中心也保存一份virtualHost.getMemoryManager().insertMessage(message);//执行消费者的消费逻辑luckyDog.getConsume().handleDelivery(luckyDog.getConsumerTag(),message.getBasicProperties(),message.getBody() );//如果消费者是自动应答,消费完后直接删除该消息的副本if(luckyDog.isAutoAck()){if(message.isDelivery()==0x2){//删除硬盘中的数据virtualHost.getDiskManager().deleteMessage(queue,message);}//删除内存中的数据virtualHost.getMemoryManager().deleteMessageWaitAck(queue.getName(), message.getMessageId());virtualHost.getMemoryManager().deleteMessage(message.getMessageId());log.info("收到Ack,删除消息成功,messageId="+message.getMessageId());}//如果不是,删除的逻辑放在basicAck里执行} catch (Exception e) {log.info("消费信息失败,consumerTag="+luckyDog.getConsumerTag()+",messageId="+message.getMessageId());e.printStackTrace();}});}//每次添加一个消息,要通知扫描线程public void notifyScanner(String queueName) throws InterruptedException {tokenQueue.put(queueName);}
}

实现完ConsumerManager类后,我们也就可以补全订阅/发布/确认消息的方法了.虚拟主机要添加一个ConsumerManager成员变量.

下面是这三个方法的实现.

    /*** 生产者调用该方法,指定发布消息到哪个交换机上* 1. 找到交换机,判断类型* 2. 创建消息,使用工厂方法创建* 3. 根据交换机类型和消息的routingKey转发该消息到不同的队列中* @return*/public boolean basicPublish(String exchangeName, String routingKey, BasicProperties properties,byte[] body){try{if(exchangeName==null){//表示使用默认交换机exchangeName="";}else {exchangeName=virtualHost+exchangeName;}Exchange exchange=memoryManager.getExchange(exchangeName);if(exchange==null){throw new MqException("发布消息失败,指定交换机不存在,exchangeName="+exchangeName);}Message message=Message.createMessageWithId(routingKey,properties,body);//对交换机的类型进行判断,因为直接交换机的转发规则比较简单,放在vh中//topic和fanout交换机放在router类中进行转发判断if(exchange.getType()==ExchangeType.DIRECT){//此时的routingKey就是要转发的队列名字String queueName=virtualHost+routingKey;QueueEntity queue=memoryManager.getQueue(queueName);if(queue==null){throw new MqException("要转发的队列不存在,queueName="+queueName);}//判断消息是否持久化if(message.isDelivery()==0x2){diskManager.sendMessage(queue,message);}memoryManager.sendMessageToQueue(queueName,message);//通知订阅者取消息consumerManager.notifyScanner(queue.getName());}else{//找到交换机绑定的queueConcurrentHashMap<String,Binding> bindingMap=memoryManager.getBindingsMap().get(exchangeName);if(bindingMap==null){//这个交换机没有绑定任何队列,不用转发消息,直接返回return true;}for(Map.Entry<String,Binding> bindingEntry:bindingMap.entrySet()){//得到队列QueueEntity queue=memoryManager.getQueue(bindingEntry.getKey());if(queue==null){//本来要抛异常,但是为了不影响其他队列的转发,这里只打印日志log.info("交换机绑定的队列不存在");continue;}//判断是否转发Binding binding=bindingEntry.getValue();if(router.route(exchange.getType(),message, binding)){//转发该消息//将消息写入硬盘if(message.isDelivery()==0x2){diskManager.sendMessage(queue,message);}//将消息写入内存memoryManager.sendMessageToQueue(queue.getName(),message);//通知订阅者取消息consumerManager.notifyScanner(queue.getName());}}}log.info("成功发布消息,exchangeName="+exchangeName+",messageId="+message.getMessageId());return true;} catch (Exception e) {log.info("发布消息失败,exchangeName="+exchangeName);e.printStackTrace();return false;}}/*** 消费者通过该接口订阅一个队列的消息*/public boolean basicConsume(String queueName, String consumerTag, boolean autoAck, Consume consume){queueName=virtualHost+queueName;try{consumerManager.addSubscriber(queueName,consumerTag,autoAck,consume);log.info("成功添加订阅者,queueName="+queueName+",consumerTag="+consumerTag);return true;} catch (Exception e) {log.info("添加订阅者失败,queueName="+queueName+",consumerTag="+consumerTag);e.printStackTrace();return false;}}/*** 消费者主动调用该接口,确认某个消息*/public boolean basicAck(String queueName,String messageId){queueName=virtualHost+queueName;try{//找到该队列QueueEntity queue=memoryManager.getQueue(queueName);if(queue==null){throw new MqException("[VirtualHost] 确认消息失败,消息所在的队列不存在,queueName="+queueName);}//找到这个消息Message message=memoryManager.getMessage(messageId);if(message==null){throw new MqException("[VirtualHost] 确认消息失败,要确认的消息不存在,messageId="+messageId);}//删除硬盘数据if(message.isDelivery()==0x2){diskManager.deleteMessage(queue,message);}//从待确认队列中删除该消息memoryManager.deleteMessageWaitAck(queueName,messageId);//消息中心删除该消息memoryManager.deleteMessage(messageId);log.info("收到Ack,删除消息成功,messageId="+messageId);return true;}catch (Exception e){log.info("收到Ack,删除消息失败,messageId="+messageId);e.printStackTrace();return false;}}

4.3 测试VirtualHost类

@SpringBootTest
class VirtualHostTest {private VirtualHost virtualHost=null;@BeforeEachvoid setUp() {DemoApplication.context=SpringApplication.run(DemoApplication.class);virtualHost=new VirtualHost("default");}@AfterEachvoid tearDown() {virtualHost=null;DemoApplication.context.close();File file=new File("./data/meta.db");file.delete();}@Testvoid exchangeTest() {virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT,true,false,null);Exchange exchange1=virtualHost.getMemoryManager().getExchange("defaulttestExchange");List<Exchange> exchanges=virtualHost.getDiskManager().selectAllExchanges();Assertions.assertNotNull(exchange1);Assertions.assertEquals(2,exchanges.size());virtualHost.exchangeDelete("testExchange");exchange1=virtualHost.getMemoryManager().getExchange("testExchange");exchanges=virtualHost.getDiskManager().selectAllExchanges();Assertions.assertEquals(1,exchanges.size());Assertions.assertNull(exchange1);}@Testvoid queueTest() {virtualHost.queueDeclare("testQueue",true,false,false,null);QueueEntity queue=virtualHost.getMemoryManager().getQueue(virtualHost.getVirtualHost()+"testQueue");List<QueueEntity> queueEntities=virtualHost.getDiskManager().selectAllQueues();Assertions.assertNotNull(queue);Assertions.assertEquals(1,queueEntities.size());virtualHost.queueDelete("testQueue");queue=virtualHost.getMemoryManager().getQueue(virtualHost.getVirtualHost()+"testQueue");queueEntities=virtualHost.getDiskManager().selectAllQueues();Assertions.assertNull(queue);Assertions.assertEquals(0,queueEntities.size());}@Testvoid queueBind() {virtualHost.queueDeclare("testQueue",false,false,false,null);virtualHost.exchangeDeclare("testExchange",ExchangeType.DIRECT,false,false,null);Assertions.assertTrue(virtualHost.queueBind("testQueue","testExchange",""));}@Testvoid queueUnbind() throws IOException {virtualHost.queueDeclare("testQueue",true,false,false,null);virtualHost.exchangeDeclare("testExchange",ExchangeType.DIRECT,true,false,null);Assertions.assertTrue(virtualHost.queueBind("testQueue","testExchange",""));Assertions.assertTrue(virtualHost.queueUnbind("testQueue","testExchange"));virtualHost.getDiskManager().deleteQueue("defaulttestQueue");}@Testvoid directPublish() {virtualHost.queueDeclare("testQueue",false,false,false,null);//表示交给默认交换机进行转发,RoutingKey就是队列名Assertions.assertTrue(virtualHost.basicPublish(null,"testQueue",null,"hello".getBytes()));//从队列中取消息Message message=virtualHost.getMemoryManager().getMessageFromQueue(virtualHost.getVirtualHost()+"testQueue");Assertions.assertArrayEquals("hello".getBytes(),message.getBody());Assertions.assertEquals("testQueue",message.getRoutingKey());}@Testvoid fanoutPublish() throws IOException {virtualHost.exchangeDeclare("testExchange",ExchangeType.FANOUT,false,false,null);virtualHost.queueDeclare("testQueue1",false,false,false,null);virtualHost.queueDeclare("testQueue2",true,false,false,null);//建立绑定,因为是fanout交换机,bindingKey没啥用Assertions.assertTrue(virtualHost.queueBind("testQueue1","testExchange",""));Assertions.assertTrue(virtualHost.queueBind("testQueue2","testExchange",""));//向fanout交换机转发消息Assertions.assertTrue(virtualHost.basicPublish("testExchange","",null,"hello".getBytes()));//看看两个队列上是否都有这条消息Message message1=virtualHost.getMemoryManager().getMessageFromQueue(virtualHost.getVirtualHost()+"testQueue1");Assertions.assertArrayEquals("hello".getBytes(),message1.getBody());Message message2=virtualHost.getMemoryManager().getMessageFromQueue(virtualHost.getVirtualHost()+"testQueue2");Assertions.assertArrayEquals("hello".getBytes(),message2.getBody());Assertions.assertEquals(message1.getMessageId(),message2.getMessageId());virtualHost.getDiskManager().deleteQueue(virtualHost.getVirtualHost()+"testQueue2");}@Testvoid topicPublish() throws IOException {virtualHost.exchangeDeclare("testExchange",ExchangeType.TOPIC,false,false,null);virtualHost.queueDeclare("testQueue1",false,false,false,null);virtualHost.queueDeclare("testQueue2",false,false,false,null);//建立绑定,两个bindingKey和routingKey都匹配virtualHost.queueBind("testQueue1","testExchange","aaa.*.ccc");virtualHost.queueBind("testQueue2","testExchange","aaa.#");//发布消息virtualHost.basicPublish("testExchange","aaa.bbb.ccc",null,"hello".getBytes());//看看两个队列上是否都有这条消息Message message1=virtualHost.getMemoryManager().getMessageFromQueue(virtualHost.getVirtualHost()+"testQueue1");Assertions.assertArrayEquals("hello".getBytes(),message1.getBody());Message message2=virtualHost.getMemoryManager().getMessageFromQueue(virtualHost.getVirtualHost()+"testQueue2");Assertions.assertArrayEquals("hello".getBytes(),message2.getBody());Assertions.assertEquals(message1.getMessageId(),message2.getMessageId());//virtualHost.getDiskManager().deleteQueue(virtualHost.getVirtualHost()+"testQueue2");}//先发布再订阅,使用直接交换机,不需要建立绑定了@Testvoid basicConsume1() throws InterruptedException {virtualHost.exchangeDeclare("testExchange",ExchangeType.DIRECT,false,false,null);virtualHost.queueDeclare("testQueue",false,false,false,null);//发布消息virtualHost.basicPublish("testExchange","testQueue",null,"hello".getBytes());Thread.sleep(200);//订阅消息,使用自动回复,手动回复下面会进行测试virtualHost.basicConsume("testQueue", "testConsumer", true, new Consume() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties properties, byte[] body) throws IOException, MqException {Assertions.assertEquals("testConsumer",consumerTag);Assertions.assertArrayEquals("hello".getBytes(),body);}});}//先订阅再发布,使用直接交换机,不需要建立绑定了@Testvoid basicConsume2() throws InterruptedException {virtualHost.exchangeDeclare("testExchange",ExchangeType.DIRECT,false,false,null);virtualHost.queueDeclare("testQueue",false,false,false,null);//订阅消息,使用自动回复virtualHost.basicConsume("testQueue", "testConsumer", true, new Consume() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties properties, byte[] body) throws IOException, MqException {Assertions.assertEquals("testConsumer",consumerTag);Assertions.assertArrayEquals("hello".getBytes(),body);}});
//        Thread.sleep(200);//发布消息virtualHost.basicPublish("testExchange","testQueue",null,"hello".getBytes());//留点时间等待消费者把消息处理掉Thread.sleep(500);}@Testvoid basicAck() throws InterruptedException {virtualHost.exchangeDeclare("testExchange",ExchangeType.DIRECT,false,false,null);virtualHost.queueDeclare("testQueue",false,false,false,null);Message message=new Message();virtualHost.basicConsume("testQueue", "testConsumer", false, new Consume() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties properties, byte[] body) throws IOException, MqException {message.setMessageId(properties.getMessageId());Assertions.assertEquals("testConsumer",consumerTag);Assertions.assertArrayEquals("hello".getBytes(),body);}});virtualHost.basicPublish("testExchange","testQueue",null,"hello".getBytes());Thread.sleep(100);//消费者手动回复Assertions.assertTrue(virtualHost.basicAck("testQueue",message.getMessageId()));}
}

五. 实现Broker Server

前文做了那么多的铺垫,我们已经实现了Broker服务器内部的9个核心API,接下来我们就可以实现Broker服务器与客户端的交互了.

回顾一下交互模型.

左边的客户端是生产者,右边的客户端是消费者.它们与服务器之间都需要通过网络进行通信.

这里我们使用自定义的应用层协议,借助TCP协议进行客户端与服务器之间的通信.

客户端需要使用本地的API远程调用服务器的方法,也就是说,服务器提供的API和客户端本地提供的API是相互对应的.除了9个核心API之外,服务器还应该提供一些API.

  • 创建channel
  • 销毁channel
  • 创建交换机
  • 销毁交换机
  • 创建队列
  • 销毁队列
  • 创建绑定
  • 解除绑定
  • 发送消息
  • 订阅队列
  • 确认消息
  • 返回消息,消费者订阅一个队列后,当该队列收到消息时,会主动将消息发给订阅者

5.1 应用层协议的设计

学习网络通信的时候我们就谈到过,应用层协议是由程序猿自己定义的.而我们学过的http/https协议,是一些知名的应用层协议,并且是文本协议.

因为我们传输的消息是二进制的,不方便使用文本协议,我们可以自己定义二进制的应用层协议.

先来定义请求格式.

 下面是响应格式(和请求格式相比,不能说毫不相干,只能说一模一样)

 下面来规定一下type对应的API:

  • 0x1,创建channel
  • 0x2,关闭channel
  • 0x3,创建交换机
  • 0x4,销毁交换机
  • 0x5,创建队列
  • 0x6,销毁队列
  • 0x7,创建绑定
  • 0x8,解除绑定
  • 0x9,发布message
  • 0xa,订阅队列
  • 0xb,发送ack
  • 0xc,服务器返回订阅的消息

因为tcp是以字节为单位进行信息传输的,length部分是为了解决TCP协议的"粘包问题".

针对payLoad部分,如果是一个请求,payLoad里存放了调用方法需要的参数;如果是一个响应,payLoad里存放了方法的返回值.

5.2 请求/响应相关类的创建

首先建立Request和Response类,网络传输的主体就是它们.

/*** 因为请求和响应需要在网络上进行传输,需要进行序列化* 请求格式: type 4byte,length 4byte,payLoad*/
@Data
public class Request implements Serializable {private int type;private int length;private byte[] payLoad;
}
/*** 响应格式: type 4byte,length 4byte,payLoad*/
@Data
public class Response implements Serializable {private int type;private int length;private byte[] payLoad;
}

针对payLoad部分,我们需要定义基本格式.

对于请求来说,payLoad部分需要有调用方法所需要的参数,还需要定义一个rid保证保证请求和响应之间的对应关系,同时也需要一个channelId用于标识是哪个通道调用的这个方法.

(先剧透一下,一个客户端和服务器建立连接后,就相当于建立了一个Connection,我们前面谈到过,一个Connection可以包含多个通道,每个通道发送请求后,在收到响应之前是阻塞状态,这个channelId就是用来唤醒阻塞的线程的)

/*** 客户端发送请求时,要传的基本参数*/
@Data
public class BasicArguments implements Serializable {//与请求的rid形成对应关系private String rid;//标识一个通道private String channelId;    
}

因为方法的参数不同,要传输的对象也不同,我们需要针对每个方法分别定义参数类.

/*** 针对basicPublish方法*/
@Data
public class BasicPublishArguments extends BasicArguments implements Serializable {private String exchangeName;private String routingKey;private BasicProperties properties;private byte[] body;
}
/*** 针对basicConsume方法*/
@Data
public class BasicConsumeArguments extends BasicArguments implements Serializable {private String queueName;private String consumerTag;private boolean autoAck;//basicConsume方法还有一个参数是回调函数,没有办法通过网络传输//对于Broker Server来说,它处理消息的逻辑就是将消息发送给客户端
}
/*** 对应exchangeDeclare方法*/
@Data
public class ExchangeDeclareArguments extends BasicArguments implements Serializable {private String exchangeName;private ExchangeType type;private boolean durable;private boolean autoDelete;private Map<String, Object> arguments;
}
/*** 对应exchangeDelete方法*/
@Data
public class ExchangeDeleteArguments extends BasicArguments implements Serializable {private String exchangeName;
}
/*** 针对queueBind方法*/
@Data
public class QueueBindArguments extends BasicArguments implements Serializable {private String queueName;private String exchangeName;private String bindingKey;
}
/*** 对应queueDeclare方法*/
@Data
public class QueueDeclareArguments extends BasicArguments implements Serializable {private String queueName;private boolean durable;private boolean autoDelete;private boolean exclusive;private Map<String,Object> arguments;
}
/*** 针对queueDelete方法*/
@Data
public class QueueDeleteArguments extends BasicArguments implements Serializable {private String queueName;
}
/*** 针对queueUnbind方法*/
@Data
public class QueueUnbindArguments extends BasicArguments implements Serializable {private String queueName;private String exchangeName;
}
/*** 针对basicAck方法*/
@Data
public class BasicAckArguments extends BasicArguments implements Serializable {private String queueName;private String messageId;
}
对于响应来说,基本格式中除了有rid和channelId外,因为服务器提供的方法,返回值都是Boolean类型,因此可以把它定义为响应的公共字段.
/*** 响应的基本格式*/
@Data
public class BasicReturns implements Serializable {private String rid;private String channelId;//调用9个api的返回值private boolean isOk;
}

请求返回的payLoad基本上都是BasicReturns对象,除了服务器主动给客户端推送消息时,因此也需要一个额外的类来定义这个payLoad.

@Data
public class MessageReturns extends BasicReturns implements Serializable {private String consumerTag;private BasicProperties properties;private byte[] body;
}

5.3 BrokerServer类的实现

@Slf4j
public class BrokerServer {//当前只实现一个虚拟主机,后续可自行进行扩展private VirtualHost virtualHost=new VirtualHost("default_");//key=channelId,value为socket对象,用于表示channel属于哪个TCP连接private ConcurrentHashMap<String, Socket> channelsMap=new ConcurrentHashMap<>();private ServerSocket serverSocket;//给多个客户端提供服务时,需要用到多线程private ExecutorService threadPool= Executors.newCachedThreadPool();//用于关闭服务器,本来只需要kill掉进程即可,这里是为了方便进行单元测试private volatile boolean run=true;public BrokerServer(int port) throws IOException {serverSocket=new ServerSocket(port);}public void start(){try{log.info("Broker Server启动服务!");while (run){//接收一个请求Socket clientSocket=serverSocket.accept();//多线程处理请求threadPool.submit(()->{processConnection(clientSocket);});}} catch (IOException e) {log.info("服务器关闭");//e.printStackTrace();}}public void stop() throws IOException {run=false;//关闭全部线程threadPool.shutdownNow();//关闭连接serverSocket.close();}private void processConnection(Socket clientSocket) {try(InputStream inputStream= clientSocket.getInputStream();OutputStream outputStream=clientSocket.getOutputStream()) {try (DataInputStream dataInputStream = new DataInputStream(inputStream);DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {//因为采用的是长连接,需要不断的读取请求while (true) {//接收请求并解析Request request = readRequest(dataInputStream);//处理请求Response response = processRequest(request, clientSocket);//将响应发送给客户端writeResponse(dataOutputStream, response);}} catch (EOFException | SocketException e) {log.info("信息传输结束,clientSocket=" + clientSocket.getRemoteSocketAddress());}}catch (IOException | ClassNotFoundException | MqException e) {log.info("信息传输异常");e.printStackTrace();}finally {//断开连接try {clientSocket.close();//断开连接之后,这个TCP连接包含的channel应该清除clearChannel(clientSocket);log.info("与客户端断开连接,clientSocket="+clientSocket.getRemoteSocketAddress());} catch (IOException e) {e.printStackTrace();}}}private void clearChannel(Socket clientSocket) {LinkedList<String> channels=new LinkedList<>();for(Map.Entry<String,Socket> entry: channelsMap.entrySet()){if(entry.getValue()==clientSocket){//对于集合类,不能一边遍历一边清除元素channels.add(entry.getKey());}}for(String channelId:channels){channelsMap.remove(channelId);}log.info("已清理客户端的全部channel,clientSocket="+clientSocket.getRemoteSocketAddress());}private void writeResponse(DataOutputStream dataOutputStream, Response response) throws IOException {dataOutputStream.writeInt(response.getType());dataOutputStream.writeInt(response.getLength());dataOutputStream.write(response.getPayLoad());//刷新缓冲区dataOutputStream.flush();}private Request readRequest(DataInputStream dataInputStream) throws IOException {Request request=new Request();request.setType(dataInputStream.readInt());int expectedLength=dataInputStream.readInt();byte[] payLoad=new byte[expectedLength];int actualLength=dataInputStream.read(payLoad);if(expectedLength!=actualLength){throw new IOException("[BrokerServer] 请求的格式不正确!expectedLength="+expectedLength+",actualLength="+actualLength);}request.setLength(actualLength);request.setPayLoad(payLoad);return request;}private Response processRequest(Request request, Socket clientSocket) throws MqException, IOException, ClassNotFoundException {BasicArguments basicArguments= (BasicArguments) BinaryTool.fromBytes(request.getPayLoad());//log.info("收到的arguments="+basicArguments);boolean ok=true;//分析响应类型switch (request.getType()){case 0x1:{//创建channelString channelId=basicArguments.getChannelId();channelsMap.put(channelId,clientSocket);log.info("成功创建channel");break;}case 0x2:{//销毁channelString channelId= basicArguments.getChannelId();channelsMap.remove(channelId);log.info("销毁channel成功");break;}case 0x3:{//创建交换机//先转换成API需要的参数对象ExchangeDeclareArguments arguments=(ExchangeDeclareArguments) basicArguments;ok=virtualHost.exchangeDeclare(arguments.getExchangeName(), arguments.getType(), arguments.isDurable(), arguments.isAutoDelete(),arguments.getArguments());break;}case 0x4:{//销毁交换机ExchangeDeleteArguments arguments=(ExchangeDeleteArguments) basicArguments;ok= virtualHost.exchangeDelete(arguments.getExchangeName());break;}case 0x5:{//创建队列QueueDeclareArguments arguments=(QueueDeclareArguments) basicArguments;ok= virtualHost.queueDeclare(arguments.getQueueName(), arguments.isDurable(), arguments.isAutoDelete(), arguments.isExclusive(),arguments.getArguments());break;}case 0x6:{//销毁队列QueueDeleteArguments arguments=(QueueDeleteArguments) basicArguments;ok= virtualHost.queueDelete(arguments.getQueueName());break;}case 0x7:{//创建绑定QueueBindArguments arguments=(QueueBindArguments) basicArguments;ok= virtualHost.queueBind(arguments.getQueueName(), arguments.getExchangeName(), arguments.getBindingKey());break;}case 0x8:{//解除绑定QueueUnbindArguments arguments=(QueueUnbindArguments) basicArguments;ok= virtualHost.queueUnbind(arguments.getQueueName(), arguments.getExchangeName());break;}case 0x9:{//发布消息BasicPublishArguments arguments=(BasicPublishArguments) basicArguments;ok= virtualHost.basicPublish(arguments.getExchangeName(), arguments.getRoutingKey(), arguments.getProperties(), arguments.getBody());//log.info("消息内容,body="+new String(arguments.getBody()));break;}case 0xa:{//订阅队列BasicConsumeArguments arguments=(BasicConsumeArguments) basicArguments;ok= virtualHost.basicConsume(arguments.getQueueName(), arguments.getConsumerTag(), arguments.isAutoAck(), new Consume() {//这个回调函数负责将收到的消息发送给客户端@Overridepublic void handleDelivery(String consumerTag, BasicProperties properties, byte[] body) throws IOException, MqException {//先找到要发给哪个客户端,consumerTag实际上就是channelIdSocket luckyDogSocket=channelsMap.get(consumerTag);if(luckyDogSocket==null||luckyDogSocket.isClosed()){//说明服务器和客户端的连接断开,属于异常情况throw new MqException("[BrokerServer] 客户端与服务器连接异常,clientSocket="+luckyDogSocket.getRemoteSocketAddress());}//构造MessageReturns对象MessageReturns messageReturns=new MessageReturns();//因为这个响应是由服务器主动发起的,没有对应的ridmessageReturns.setConsumerTag(consumerTag);messageReturns.setChannelId(consumerTag);//messageReturns.setRid("");messageReturns.setProperties(properties);messageReturns.setBody(body);//将这个对象变成字节数组,作为response的载荷byte[] payLoad=BinaryTool.toBytes(messageReturns);//构造一个ResponseResponse response=new Response();response.setType(0x0c);response.setLength(payLoad.length);response.setPayLoad(payLoad);//将响应发送给客户端DataOutputStream outputStream=new DataOutputStream(luckyDogSocket.getOutputStream());synchronized (luckyDogSocket){//多线程发给同一个客户端时message会发生错误writeResponse(outputStream,response);}}});break;}case 0xb:{//发送ackBasicAckArguments arguments=(BasicAckArguments) basicArguments;ok=virtualHost.basicAck(arguments.getQueueName(), arguments.getMessageId());break;}default:{//异常情况throw new MqException("[BrokerServer] 请求类型不正确,requestType="+request.getType());}}//构造一个响应//构造一个return对象BasicReturns returns=new BasicReturns();returns.setOk(ok);returns.setChannelId(basicArguments.getChannelId());returns.setRid(basicArguments.getRid());byte[] payLoad=BinaryTool.toBytes(returns);//构造一个响应Response response=new Response();response.setType(request.getType());response.setLength(payLoad.length);response.setPayLoad(payLoad);log.info("处理得到响应,basicReturns="+returns);return response;}
}

因为网络通信不能只有服务器,所以我们要先实现客户端部分才能测试二者之间的通信

六. 实现Broker Client

对于客户端来说,我们规定有两个类,Connection类负责TCP的连接和通信,Channel类负责逻辑上对TCP连接的细分.

/*** 负责和服务器的通信*/
@Data
@Slf4j
public class Connection {//与服务器建立连接private Socket socket;//socket的输入流和输出流对象private InputStream inputStream;private OutputStream outputStream;private DataInputStream dataInputStream;private DataOutputStream dataOutputStream;//记录当前TCP连接都包含了哪些channel,key=channelIdprivate ConcurrentHashMap<String,Channel> channelMap=new ConcurrentHashMap<>();//使用线程池,进行响应的处理private ExecutorService workThreadPool= Executors.newFixedThreadPool(10);public Connection(String host, int port) throws IOException {socket=new Socket(host,port);inputStream=socket.getInputStream();outputStream= socket.getOutputStream();dataInputStream=new DataInputStream(inputStream);dataOutputStream=new DataOutputStream(outputStream);//创建一个扫描线程,用于接收响应Thread scannerThread=new Thread(()->{try {while(!socket.isClosed()){Response response=readResponse();//log.info("收到响应,response="+response);processResponse(response);}} catch (SocketException | EOFException e) {log.info("信息传输完毕,连接断开,serverSocket="+socket.getRemoteSocketAddress());//e.printStackTrace();}catch (Exception e){log.info("连接异常,serverSocket="+socket.getRemoteSocketAddress());e.printStackTrace();}},"scannerThreadClient");scannerThread.setDaemon(true);scannerThread.start();}private void processResponse(Response response) {//因为客户端处理响应比较耗时,采用多线程workThreadPool.submit(()->{try {//解析payLoadbyte[] payLoad= response.getPayLoad();//响应分为两种类型: 1. 请求收到的响应 2. 订阅的消息if(response.getType()==0xc){//订阅的消息,没有对应的ridMessageReturns messageReturns=(MessageReturns) BinaryTool.fromBytes(payLoad);//调用channel的handleDelivery方法Channel channel=channelMap.get(messageReturns.getChannelId());if(channel==null){throw new MqException("[Connection] 接收message失败,消息对应的channel不存在,channelId="+messageReturns.getChannelId());}channel.getConsume().handleDelivery(messageReturns.getConsumerTag(), messageReturns.getProperties(), messageReturns.getBody());}else{//请求收到对应的响应BasicReturns returns= (BasicReturns) BinaryTool.fromBytes(payLoad);//将return加入到channel的map中String channelId=returns.getChannelId();Channel channel=channelMap.get(channelId);if(channel==null){//通道不存在,抛异常throw new MqException("[Connection] 处理响应失败,响应的通道不存在,channelId="+channelId);}//唤醒等待响应的线程channel.notifyWaitAck(returns);}} catch (IOException | ClassNotFoundException|MqException e) {log.info("处理响应失败");e.printStackTrace();}});}/*** 创建一个channel* channelId随机生成,以"C-"作为前缀*/public Channel createChannel() throws IOException, InterruptedException {String channelId="C-"+ UUID.randomUUID();Channel channel=new Channel(channelId,this);//将channel添加到本地channelMap.put(channelId,channel);//服务器那边也要创建channelboolean ok=channel.createChannel();if(!ok){log.info("服务器创建channel失败!");}else{log.info("创建channel成功!");}return channel;}public void deleteChannel(String channelId) throws IOException, InterruptedException {//先找到本地的channelChannel channel=channelMap.get(channelId);if(channel==null){log.info("删除channel失败,要删除的channel不存在,channelId="+channelId);return;}//这个channel发送删除请求boolean ok=channel.deleteChannel();if(ok){log.info("服务器删除channel成功");}else{log.info("服务器删除channel失败");}}/*** 读取响应*/public Response readResponse() throws IOException {Response response=new Response();response.setType(dataInputStream.readInt());int expectedLength=dataInputStream.readInt();byte[] payLoad=new byte[expectedLength];int actualLength=dataInputStream.read(payLoad);if(actualLength!=expectedLength){throw new IOException("[Connection] 响应的格式不正确,actualLength="+actualLength+",expectedLength="+expectedLength+",type="+response.getType() );}response.setLength(actualLength);response.setPayLoad(payLoad);return response;}/*** 输出请求*/public void writeRequest(Request request) throws IOException {dataOutputStream.writeInt(request.getType());dataOutputStream.writeInt(request.getLength());dataOutputStream.write(request.getPayLoad());dataOutputStream.flush();}//断开连接public void close() throws IOException {//关闭线程池workThreadPool.shutdownNow();//断开TCP连接dataOutputStream.close();dataInputStream.close();socket.close();channelMap=null;}
}
@Data
@Slf4j
public class Channel {//表示自己是属于哪个连接的private Connection connection;//唯一标识private String channelId;//每个channel对应一个消费逻辑private Consume consume;//每个channel管理自己的rid(发送请求后接收对应的响应)//key为rid,value为响应的payLoadprivate ConcurrentHashMap<String, BasicReturns> returnsMap=new ConcurrentHashMap<>();public Channel(String channelId,Connection connection){this.channelId=channelId;this.connection=connection;}public boolean createChannel() throws IOException, InterruptedException {//构造请求参数BasicArguments arguments=new BasicArguments();arguments.setChannelId(channelId);arguments.setRid(generateRid());byte[] payLoad= BinaryTool.toBytes(arguments);//构造一个请求Request request=new Request();request.setType(0x1);request.setLength(payLoad.length);request.setPayLoad(payLoad);//让connection去发送connection.writeRequest(request);//等待服务器的响应BasicReturns returns=waitReturns(arguments.getRid());return returns.isOk();}private BasicReturns waitReturns(String rid) throws InterruptedException {synchronized (this){while (true){if(returnsMap.get(rid)==null){wait();}return returnsMap.get(rid);}}}/*** 请求的rid随机生成,以"r-"作为前缀* @return*/private String generateRid() {return "r-"+ UUID.randomUUID();}public boolean deleteChannel() throws IOException, InterruptedException{//构造请求参数BasicArguments arguments=new BasicArguments();arguments.setRid(generateRid());arguments.setChannelId(channelId);byte[] payLoad=BinaryTool.toBytes(arguments);//构造请求Request request=new Request();request.setType(0x2);request.setLength(payLoad.length);request.setPayLoad(payLoad);//发送请求connection.writeRequest(request);BasicReturns returns=waitReturns(arguments.getRid());return returns.isOk();}public boolean exchangeDeclare(String exchangeName, ExchangeType type, boolean durable, boolean autoDelete, Map<String,Object> argument) throws IOException, InterruptedException {//构造请求参数ExchangeDeclareArguments arguments=new ExchangeDeclareArguments();arguments.setRid(generateRid());arguments.setChannelId(channelId);arguments.setExchangeName(exchangeName);arguments.setType(type);arguments.setDurable(durable);arguments.setAutoDelete(autoDelete);arguments.setArguments(argument);byte[] payLoad=BinaryTool.toBytes(arguments);//构造请求Request request=new Request();request.setType(0x3);request.setLength(payLoad.length);request.setPayLoad(payLoad);//发送请求connection.writeRequest(request);//等待响应BasicReturns returns=waitReturns(arguments.getRid());return returns.isOk();}public boolean exchangeDelete(String exchangeName) throws IOException, InterruptedException {//构造请求参数ExchangeDeleteArguments arguments=new ExchangeDeleteArguments();arguments.setRid(generateRid());arguments.setChannelId(channelId);arguments.setExchangeName(exchangeName);byte[] payLoad=BinaryTool.toBytes(arguments);//构造请求Request request=new Request();request.setType(0x4);request.setLength(payLoad.length);request.setPayLoad(payLoad);connection.writeRequest(request);BasicReturns returns=waitReturns(arguments.getRid());return returns.isOk();}public boolean queueDeclare(String queueName,boolean durable,boolean autoDelete,boolean exclusive,Map<String,Object> argument) throws InterruptedException, IOException {QueueDeclareArguments arguments=new QueueDeclareArguments();arguments.setChannelId(channelId);arguments.setRid(generateRid());arguments.setArguments(argument);arguments.setAutoDelete(autoDelete);arguments.setExclusive(exclusive);arguments.setQueueName(queueName);arguments.setDurable(durable);byte[] payLoad=BinaryTool.toBytes(arguments);//构造请求Request request=new Request();request.setType(0x5);request.setLength(payLoad.length);request.setPayLoad(payLoad);connection.writeRequest(request);BasicReturns returns=waitReturns(arguments.getRid());return returns.isOk();}public boolean queueDelete(String queueName) throws InterruptedException, IOException {QueueDeleteArguments arguments=new QueueDeleteArguments();arguments.setChannelId(channelId);arguments.setRid(generateRid());arguments.setQueueName(queueName);byte[] payLoad=BinaryTool.toBytes(arguments);Request request=new Request();request.setType(0x6);request.setLength(payLoad.length);request.setPayLoad(payLoad);connection.writeRequest(request);BasicReturns returns=waitReturns(arguments.getRid());return returns.isOk();}public boolean queueBind(String queueName,String exchangeName,String bindingKey) throws IOException, InterruptedException {//构造请求参数QueueBindArguments arguments=new QueueBindArguments();arguments.setChannelId(channelId);arguments.setRid(generateRid());arguments.setQueueName(queueName);arguments.setExchangeName(exchangeName);arguments.setBindingKey(bindingKey);byte[] payLoad=BinaryTool.toBytes(arguments);//构造请求Request request=new Request();request.setType(0x7);request.setLength(payLoad.length);request.setPayLoad(payLoad);//发送请求connection.writeRequest(request);//等待响应BasicReturns returns=waitReturns(arguments.getRid());return returns.isOk();}public boolean queueUnbind(String queueName,String exchangeName) throws IOException, InterruptedException {//构造请求参数QueueUnbindArguments arguments=new QueueUnbindArguments();arguments.setRid(generateRid());arguments.setChannelId(channelId);arguments.setQueueName(queueName);arguments.setExchangeName(exchangeName);byte[] payLoad=BinaryTool.toBytes(arguments);//构造请求Request request=new Request();request.setType(0x8);request.setLength(payLoad.length);request.setPayLoad(payLoad);//发送请求connection.writeRequest(request);//等待响应BasicReturns returns=waitReturns(arguments.getRid());return returns.isOk();}public boolean basicPublish(String exchangeName, String routingKey, BasicProperties properties,byte[] body) throws IOException, InterruptedException {//构造请求参数BasicPublishArguments arguments=new BasicPublishArguments();arguments.setChannelId(channelId);arguments.setRid(generateRid());arguments.setExchangeName(exchangeName);arguments.setRoutingKey(routingKey);arguments.setBody(body);arguments.setProperties(properties);byte[] payLoad=BinaryTool.toBytes(arguments);//构造请求Request request=new Request();request.setType(0x9);request.setLength(payLoad.length);request.setPayLoad(payLoad);connection.writeRequest(request);//等待响应BasicReturns returns=waitReturns(arguments.getRid());return returns.isOk();}/*** 对应basicConsume方法,本来还应该有一个参数是consumerTag,* 但是因为consumerTag==channelId,所以就不作为参数传递了,直接在方法内部赋值*/public boolean basicConsume(String queueName,boolean autoAck,Consume consume) throws InterruptedException, IOException, MqException {//一个channel只能有一个回调函数if(consume!=null) {if (this.consume == null) {this.consume = consume;} else {throw new MqException("[Channel] 该通道已有消费逻辑!,channelId=" + channelId);}}//构造请求参数BasicConsumeArguments arguments=new BasicConsumeArguments();arguments.setRid(generateRid());arguments.setChannelId(channelId);arguments.setConsumerTag(channelId);arguments.setAutoAck(autoAck);arguments.setQueueName(queueName);byte[] payLoad=BinaryTool.toBytes(arguments);//构造请求Request request=new Request();request.setType(0xa);request.setLength(payLoad.length);request.setPayLoad(payLoad);connection.writeRequest(request);BasicReturns returns=waitReturns(arguments.getRid());return returns.isOk();}public boolean basicAck(String queueName,String messageId) throws IOException, InterruptedException {//构造请求参数BasicAckArguments arguments=new BasicAckArguments();arguments.setRid(generateRid());arguments.setChannelId(channelId);arguments.setQueueName(queueName);arguments.setMessageId(messageId);byte[] payLoad=BinaryTool.toBytes(arguments);//构造请求Request request=new Request();request.setPayLoad(payLoad);request.setType(0xb);request.setLength(payLoad.length);connection.writeRequest(request);BasicReturns returns=waitReturns(arguments.getRid());return returns.isOk();}/*** 因为Connection中使用了线程池来处理每个响应,所以应该唤醒多线程* @param returns*/public void notifyWaitAck(BasicReturns returns) {synchronized (this){returnsMap.put(returns.getRid(),returns);notifyAll();}}
}

可以另外建一个ConnectionFactory类,用来创建Connection对象.

@Data
public class ConnectionFactory {//指定服务器的Ip和端口private String host;private int port;Connection createConnection() throws IOException {return new Connection(host,port);}}

七. 测试服务器和客户端的通信

下面给出博主进行测试的代码

@SpringBootTest
class ConnectionTest {private BrokerServer server=null;private ConnectionFactory factory=new ConnectionFactory();//因为需要同时开启服务器和客户端,用到多线程private Thread thread=null;@BeforeEachvoid setUp() {DemoApplication.context= SpringApplication.run(DemoApplication.class);thread=new Thread(()->{try {server=new BrokerServer(8060);server.start();} catch (IOException e) {e.printStackTrace();}});thread.start();factory.setHost("127.0.0.1");factory.setPort(8060);}@AfterEachvoid tearDown() throws IOException {server.stop();DemoApplication.context.close();//删文件File dataDir=new File("./data");for(File file:dataDir.listFiles()){if(file.isDirectory()){FileUtils.deleteDirectory(file);}else if(file.isFile()){file.delete();}}}@Testvoid createConnection() throws IOException {Connection connection=factory.createConnection();Assertions.assertNotNull(connection);}@Testvoid createChannel() throws IOException, InterruptedException {Connection connection=factory.createConnection();Assertions.assertNotNull(connection);Channel channel=connection.createChannel();Assertions.assertNotNull(channel);}@Testvoid deleteChannel() throws IOException, InterruptedException {Connection connection=factory.createConnection();Assertions.assertNotNull(connection);Channel channel=connection.createChannel();Assertions.assertNotNull(channel);connection.deleteChannel(channel.getChannelId());connection.close();}@Testvoid testQueue() throws IOException, InterruptedException {Connection connection=factory.createConnection();Assertions.assertNotNull(connection);Channel channel=connection.createChannel();Assertions.assertNotNull(channel);boolean ok1=channel.queueDeclare("testQueue",true,true,false,null);boolean ok=channel.queueDelete("testQueue");channel.deleteChannel();connection.close();}@Testvoid testExchange() throws IOException, InterruptedException {Connection connection=factory.createConnection();Assertions.assertNotNull(connection);Channel channel=connection.createChannel();Assertions.assertNotNull(channel);boolean ok=channel.exchangeDeclare("testExchange", ExchangeType.FANOUT,true,false,null);Assertions.assertTrue(ok);ok=channel.exchangeDelete("testExchange");Assertions.assertTrue(ok);channel.deleteChannel();connection.close();}@Testvoid testBind() throws IOException, InterruptedException, MqException {Connection connection=factory.createConnection();Assertions.assertNotNull(connection);Channel channel=connection.createChannel();Assertions.assertNotNull(channel);boolean ok=channel.exchangeDeclare("testExchange", ExchangeType.TOPIC,true,false,null);Assertions.assertTrue(ok);ok=channel.queueDeclare("testQueue1",true,false,false,null);Assertions.assertTrue(ok);ok=channel.queueDeclare("testQueue2",true,false,false,null);Assertions.assertTrue(ok);ok=channel.queueBind("testQueue1","testExchange","#.222");Assertions.assertTrue(ok);ok=channel.queueBind("testQueue2","testExchange","*.*");Assertions.assertTrue(ok);ok=channel.basicPublish("testExchange","111.222",null,"hello".getBytes());Assertions.assertTrue(ok);channel.basicConsume("testQueue1", true, new Consume() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties properties, byte[] body) throws IOException, MqException {Assertions.assertArrayEquals("hello".getBytes(),body);// System.out.println(new String(body));}});channel.basicConsume("testQueue2", true, null);}
}

经过测试之后,我们的消费队列就算完成了,当然,也可以创建一个生产者和一个客户端来模拟一下.

先来修改一下DemoApplication类,因为我们要使用Spring来启动程序.

@SpringBootApplication
public class DemoApplication {public static ConfigurableApplicationContext context;public static void main(String[] args) throws IOException {context=SpringApplication.run(DemoApplication.class, args);BrokerServer server=new BrokerServer(9090);server.start();}}

再来创建一个demo

public class DemoConsumer {public static void main(String[] args) throws IOException, MqException, InterruptedException, IOException {System.out.println("启动消费者!");ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(9090);Connection connection = factory.createConnection();Channel channel = connection.createChannel();channel.exchangeDeclare("testExchange", ExchangeType.DIRECT, true, false, null);channel.queueDeclare("testQueue", true, false, false, null);channel.basicConsume("testQueue", true, new Consume() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throwsMqException, IOException {System.out.println("[消费数据] 开始!");System.out.println("consumerTag=" + consumerTag);System.out.println("basicProperties=" + basicProperties);String bodyString = new String(body, 0, body.length);System.out.println("body=" + bodyString);System.out.println("[消费数据] 结束!");}});// 由于消费者也不知道生产者要生产多少, 就在这里通过这个循环模拟一直等待消费.while (true) {Thread.sleep(500);}}
}
public class DemoProducer {public static void main(String[] args) throws IOException, InterruptedException, IOException {System.out.println("启动生产者");ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(9090);Connection connection = factory.createConnection();Channel channel = connection.createChannel();// 创建交换机和队列channel.queueDeclare("testQueue", true, false, false, null);// 创建一个消息并发送byte[] body = "hello".getBytes();//使用默认交换机boolean ok = channel.basicPublish("", "testQueue", null, body);System.out.println("消息投递完成! ok=" + ok);Thread.sleep(500);channel.deleteChannel();connection.close();}
}

八. 总结

上面的过程,我们完成了从BrokerServer的内部实现,到客户端与服务器进行TCP通信的过程.当然,这只是一个简化版的消息队列,对于RabbitMQ来说,还有很多功能没有实现,同学们可自行参考它进行扩展.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/310902.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Java EE Servlet之Servlet API详解

文章目录 1. HttpServlet1.1 核心方法 2. HttpServletRequest3. HttpServletResponse 接下来我们来学习 Servlet API 里面的详细情况 1. HttpServlet 写一个 Servlet 代码&#xff0c;都是要继承这个类&#xff0c;重写里面的方法 Servlet 这里的代码&#xff0c;只需要继承…

MySQL基础学习: 由delete和insert操作导致的死锁问题

一、问题复现&#xff1a;表结构 CREATE TABLE user_props (user_id bigint NOT NULL ,prop_key varchar(100) NOT NULL ,prop_value varchar(100) NOT NULL,PRIMARY KEY (user_id,prop_key) )二、死锁测试 &#xff08;1&#xff09;开启两个事务 &#xff08;2&#xff09;…

2024年汉字小达人学校选拔备考——区级样题做一做:看拼音写汉字

昨天&#xff0c;六分成长结合过去几年的汉字小达人比赛的情况&#xff0c;介绍了区级比赛的题型&#xff0c;给出了备考建议。根据部分家长朋友的需求&#xff0c;接下来我来分享一些区级样题的过去真题&#xff0c;让孩子直观感受下题目&#xff0c;并分析这些题目的出题特点…

【数据结构】第2章线性表(头歌习题)【合集】

文章目录 第1关&#xff1a;实现顺序表各种基本运算的算法任务描述编程要求完整代码 第2关&#xff1a;实现单链表各种基本运算的算法任务描述编程要求完整代码 第3关&#xff1a;移除顺序表中所有值等于x的元素任务描述编程要求完整代码 第4关&#xff1a;逆置顺序表任务描述编…

模型 KANO卡诺模型

本系列文章 主要是 分享 思维模型&#xff0c;涉及各个领域&#xff0c;重在提升认知。需求分析。 1 卡诺模型的应用 1.1 餐厅需求分析故事 假设你经营一家餐厅&#xff0c;你想了解客户对你的服务质量的满意度。你可以使用卡诺模型来收集客户的反馈&#xff0c;并分析客户的…

Java 运算符

&&运算比||运算的优先级高 C与Java

全方位了解DLP中的DMD芯片-DLP1

理解他人的文章 https://blog.csdn.net/weixin_40814482/article/details/117560420?spm1001.2101.3001.6661.1&utm_mediumdistribute.pc_relevant_t0.none-task-blog-2%7Edefault%7EBlogCommendFromBaidu%7ERate-1-117560420-blog-106184582.235%5Ev40%5Epc_relevant_ant…

Linux中账号和权限管理

目录 一.用户账号和组账号&#xff1a; 1.用户账号类型&#xff1a; 2.组账号类型&#xff1a; 3.系统区别用户的方法 &#xff1a; 4.用户账号文件&#xff1a; 二.Linux中账户相关命令&#xff1a; 1.useradd&#xff1a; 2.passwd&#xff1a; 3.usermod&#xff1a…

Vue Methods 方法的使用(VUE学习5)

一、v-on 指令用于 <div> 元素来侦听"mousemove"事件。 当"mousemove"事件发生时&#xff0c;会调用"mousePos"方法&#xff0c;并且默认使用该方法发送事件对象&#xff0c;这样我们就可以获得鼠标指针的位置 1.VUE页面框架 <!DOCTY…

格密码基础:子格,q-ary垂直格与线性代数

目录 一.写在前面 二.子空间垂直 2.1 理论解释 2.2 举例分析 三. 零空间 3.1 零空间与q-ary垂直格 3.2 零空间与行/列空间 四. 格密码相关 一.写在前面 格密码中的很多基础原语都来自于线性代数的基本概念&#xff0c;比如举几个例子&#xff1a; 格密码中的非满秩格…

【Java | 多线程案例】定时器的实现

个人主页&#xff1a;兜里有颗棉花糖 欢迎 点赞&#x1f44d; 收藏✨ 留言✉ 加关注&#x1f493;本文由 兜里有颗棉花糖 原创 收录于专栏【Java系列专栏】【JaveEE学习专栏】 本专栏旨在分享学习JavaEE的一点学习心得&#xff0c;欢迎大家在评论区交流讨论&#x1f48c; 这里写…

x-cmd pkg | fzf - 命令行模糊查找器

目录 简介首次用户功能特点竞品和相关作品进一步阅读 简介 fzf 是一个由 Go 编写的命令行模糊搜索工具&#xff0c;用于在大量文本数据中快速定位和选择内容&#xff0c;可以与任何列表一起使用&#xff08;e.g. 文件、命令历史记录、进程、主机名、书签、git 提交等&#xff…