目录
一、设计数据结构
二 、实现管理方法
🍅 1、实现交换机管理
🍅 2、实现队列管理
🍅 3、实现绑定管理
🎈插入绑定操作
🎈删除绑定
🍅 4、进行消息管理
🍅 5、发送消息到指定队列
🍅 6、表示“未被确认”的消息管理
🍅 7、从硬盘上读取数据
三、测试交换机操作
🍅 1、“准备工作”和收尾工作
🍅 2、测试 交换机
🍅 3、测试 队列
🍅 4、测试 绑定
🍅 5、测试 消息的增删查
🍅 6、测试 发送消息
🍅 7、测试 “未确认”的信息
🍅 8、测试 从硬盘上读取数据
四、小结
一、设计数据结构
对于MQ来说,主要是以内存存储数据为主,硬盘存储数据为辅。
关于内存数据管理,作出如下的数据结构:
交换机:使用HashMap,key是name, value是Exchange对象
队列:使用HashMap,key是name,value是MSGQueue对象
绑定:使用嵌套的HashMap,key是exchangeName,value是一个HashMap(其中key是queueName,value是Binding对象)
消息:使用HashMap,key是messageId,value是Message
表示队列和消息之间的关联:使用嵌套的HashMap,key是queueName,value是一个LinkedList。LinkedList中每个元素又是一个Message对象。
表示“未被确认”的消息:关于未被确认:存储了当前队列中哪些消息被消费者取走了,但是还没有应答。使用嵌套的HashMap,key是queueName,value是HashMap(其中key是messageId,value是Message对象)。后续实现消息确认的逻辑,需要根据ack响应的内容,会提供一个messageId,根据该messageId把结构中的Message对象找到并且移除
这里有两种应答模式(ACK):
1.自动应答,消费者取了元素,该消息就算是被应答,就可以被删除了
2.手动应答,消费者取了元素,该消息还不断被应答, 需要消费者主动再调用一个basicAck方法,此时才被认为是真的应答了,才能删除这个消息。
由于, 这个类会涉及到多线程的请求,所以这里的HashMap都统一使用ConcurrentHashMap,因为HashMap是线程不安全的,ConcurrentHashMap相对而来线程安全,所以上面说的使用HashMap都使用ConcurrentHashMap
public class MemoryDataCenter {
// 交换机:key是exchangeMame,value是exchange对象private ConcurrentHashMap<String, Exchange> exchangeMap = new ConcurrentHashMap<>();
// 队列:key表示queueName,value表示MSGQueue对象private ConcurrentHashMap<String, MSGQueue> queueMap = new ConcurrentHashMap<>();
// 绑定:第一个key表示exchangeName,第二个key表示queueName,value都表示Binding对象private ConcurrentHashMap<String,ConcurrentHashMap<String, Binding>> bindingsMap = new ConcurrentHashMap<>();
// 消息:key是messageId,value是Message对象private ConcurrentHashMap<String, Message> messageMap = new ConcurrentHashMap<>();
// 表示队列和消息之间的关联:key表示queueName,value表示一个Message链表,里面存放的是Message对象private ConcurrentHashMap<String, LinkedList<Message>> queueMessageMap = new ConcurrentHashMap<>();
// 表示“未被确认”的消息:第一个key表示queueName,第二个key表示messageId,value表示Message对象private ConcurrentHashMap<String,ConcurrentHashMap<String,Message>> queueMessageWaitAckMap = new ConcurrentHashMap<>();
}
二 、实现管理方法
🍅 1、实现交换机管理
public void insertExchange(Exchange exchange){exchangeMap.put(exchange.getName(),exchange);System.out.println("[MemoryDataCenter]新交换机添加成功!exchangeName = " + exchange.getName());}public Exchange getExchange(String exchangeName){return exchangeMap.get(exchangeName);}public void deleteExchange(String exchangeName){exchangeMap.remove(exchangeName);System.out.println("[MemoryDataCenter]交换机删除成功! exchangeName = " + exchangeName);}
🍅 2、实现队列管理
public void insertQueue(MSGQueue queue){queueMap.put(queue.getName(),queue);System.out.println("[MemoryDataCenter]队列删除成功!queueName = " + queue.getName());}public MSGQueue getQueue(String queueName){return queueMap.get(queueName);}public void deleteQueue(String queueName){queueMap.remove(queueName);System.out.println("[MemoryDataCenter]删除队列成功!queueName = " + queueName);}
🍅 3、实现绑定管理
🎈插入绑定操作
注意点:
(1)这里是嵌套的HashMap,所以再插入钱,要先使用exchangeName查找对应的哈希表是否存在,不存在就创建。
(2)线程安全问题。插入要先判断绑定是否存在,不存在才插入。不是原子操作,要加锁。
🎈得到绑定
注意:这里有两个版本
(1)根据exchangeName和queueName确定一个binding
(2)根据exchangeName获取到所有的binding
public Binding getBinding(String exchangeName,String queueName){ConcurrentHashMap<String,Binding> bindingMap = bindingsMap.get(exchangeName);if (bindingMap == null){return null;}return bindingMap.get(queueName);}public ConcurrentHashMap<String,Binding> getBindings(String exchangeName){return bindingsMap.get(exchangeName);}
🎈删除绑定
public void deleteBinding(Binding binding) throws MqException {
// 现根据exchangeName找到所有的bindingConcurrentHashMap<String,Binding> bindingMap = bindingsMap.get(binding.getExchangeName());if (bindingMap == null){
// 该交换机没有绑定任何队列,报错throw new MqException("[MemoryDataCenter]绑定不存在!exchangeName = " + binding.getExchangeName()+ ",queueName" + binding.getQueueName());}bindingMap.remove(binding.getQueueName());System.out.println("[MemoryDataCenter]绑定删除成功!exchangeName = " + binding.getQueueName()+ ",queueName = " + binding.getQueueName());}
🍅 4、进行消息管理
// 添加消息public void addMessage(Message message){messageMap.put(message.getMessageId(),message);System.out.println("[MemoryDataCenter]新消息添加成功!messageId = " + message.getMessageId());}// 根据id查询消息public Message getMessage(String messageId){return messageMap.get(messageId);}// 根据id删除消息public void removeMessage(String messageId){messageMap.remove(messageId);System.out.println("[MemoryDataCenter]消息被移除!messageId = " + messageId);}
🍅 5、发送消息到指定队列
// 发送消息到指定队列public void sendMessage(MSGQueue queue,Message message){
// 把消息放到对应的队列数据结构中
// 现根据队列的名字,找到该队列对应的消息链表LinkedList<Message> messages = queueMessageMap.computeIfAbsent(queue.getName(),k -> new LinkedList<>());
// 把数据加到messages里面synchronized (messages){messages.add(message);}
// 把该消息往消息中心中插入,假设message已经在消息中心存在,重复插入也没有关系
// 主要就是相同messageId,对应的message内容一定是一样的addMessage(message);System.out.println("[MemoryDataCenter]消息被投递到到队列中! messageId = " + message.getMessageId());}// 从队列中取消息public Message pollMessage(String queueName){
// 根据队列名,查找对应的消息链表LinkedList<Message> messages = queueMessageMap.get(queueName);
// 如果没找到,说明队列中没有任何消息if (messages == null) {return null;}synchronized (messages){if (messages.size() == 0){return null;}
// 链表中铀元素,就进行头删Message currentMessage = messages.remove(0);System.out.println("[MemoryDataCenter]消息从队列中取出!messageId = " + currentMessage.getMessageId());return currentMessage;}}// 获取指定队列中的消息的个数public int getMessageCount(String queueName){LinkedList<Message> messages = queueMessageMap.get(queueName);if (messages == null){
// 队列中没有消息return 0;}synchronized (messages){return messages.size();}}
🍅 6、表示“未被确认”的消息管理
// 添加未确认的消息public void addMessageWaitAck(String queueName,Message message){ConcurrentHashMap<String,Message> messageHashMap = queueMessageWaitAckMap.computeIfAbsent(queueName,k -> new ConcurrentHashMap<>());messageHashMap.put(message.getMessageId(),message);System.out.println("[MemoryDataCenter]消息进入待确认队列!messageId = " + message.getMessageId());}// 删除之前未确认,但是现在已经确认的消息public void removeMessageWaitAck(String queueName,String messageId){ConcurrentHashMap<String,Message> messageHashMap = queueMessageWaitAckMap.get(queueName);if (messageHashMap == null){return;}messageHashMap.remove(messageId);System.out.println("[MemoryDataCenter]消息从待确认队列删除!messageId = " + messageId);}// 获取指定的未确认的消息public Message getMessageWaitAck(String queueName,String messageId){ConcurrentHashMap<String ,Message> messageHashMap = queueMessageWaitAckMap.get(queueName);if (messageHashMap == null){return null;}return messageHashMap.get(messageId);}
🍅 7、从硬盘上读取数据
把之前硬盘中持久化存储的各个维度的数据都恢复到内存中来,
主要是以下几步:
(1)恢复所有的交换机数据
(2)恢复所有的队列
(3)恢复所有的绑定数据
(4)恢复所有的消息数据
注意:关于“未被确认的消息”:“未被确认的消息”是内存中的数据,不需要从硬盘上恢复,一旦在等待ack的过程中,服务器重启了,此时这些“未被确认的消息”,就会恢复成“未被取走的消息”,这些消息在硬盘在硬盘在硬盘上存储的时候,就当作是“未被”取走
// 从硬盘上读取数据,
// 把之前硬盘中持久化存储的各个维度的数据都恢复到内存中来public void recovery(DiskDataCenter diskDataCenter) throws IOException, MqException, ClassNotFoundException {
// 先清空之前的数据exchangeMap.clear();queueMap.clear();bindingsMap.clear();messageMap.clear();queueMessageMap.clear();// 1.恢复所有的交换机数据List<Exchange> exchanges = diskDataCenter.selectAllExchanges();for (Exchange exchange : exchanges){exchangeMap.put(exchange.getName(),exchange);}
// 2.恢复所有的队列List<MSGQueue> queues = diskDataCenter.selectAllQueues();for (MSGQueue queue : queues) {queueMap.put(queue.getName(),queue);}
// 3.恢复所有的绑定数据List<Binding> bindings = diskDataCenter.selectAllBindings();for (Binding binding : bindings){ConcurrentHashMap<String ,Binding> bindingMap = bindingsMap.computeIfAbsent(binding.getExchangeName(),k -> new ConcurrentHashMap<>());bindingMap.put(binding.getQueueName(), binding);}
// 4.恢复所有的消息数据
// 遍历所有队列,再根据每个队列的名字获取到有的消息for (MSGQueue queue :queues){LinkedList<Message> messages = diskDataCenter.loadAllMessageFromQueue(queue.getName());queueMessageMap.put(queue.getName(),messages);for (Message message : messages){messageMap.put(message.getMessageId(),message);}}}
三、测试交换机操作
🍅 1、“准备工作”和收尾工作
@SpringBootTest
public class MemoryDataCenterTests {private MemoryDataCenter memoryDataCenter = null;@BeforeEachpublic void setUp(){memoryDataCenter = new MemoryDataCenter();}@AfterEachpublic void tearDown(){memoryDataCenter =null;}
}
创建以一个测试交换机和队列,以便于后面使用:
// 创建测试交换机private Exchange createTestExchange(String exchangeName){Exchange exchange = new Exchange();exchange.setName(exchangeName);exchange.setType(ExchangeType.DIRECT);exchange.setDurable(true);return exchange;}// 创建一个测试队列private MSGQueue createTestQueue(String queueName){MSGQueue queue = new MSGQueue();queue.setName(queueName);queue.setDurable(true);return queue;}// 创建一个测试消息public Message createTestMessage(String content){Message message = Message.createMessageWithId("testRoutinKey",null,content.getBytes());return message;}
🍅 2、测试 交换机
// 针对交换机进行测试@Testpublic void testExchange(){
// 1.先构造一个交换机并且插入Exchange expectedExchange = createTestExchange("testExchange");memoryDataCenter.insertExchange(expectedExchange);
// 2.查询这个交换机,比较结果是否一致Exchange actualExchange = memoryDataCenter.getExchange("testExchange");Assertions.assertEquals(expectedExchange,actualExchange);
// 3.删除这个交换机memoryDataCenter.deleteExchange("testExchange");
// 4.再次查找actualExchange = memoryDataCenter.getExchange("testExchange");Assertions.assertNull(actualExchange);}
🍅 3、测试 队列
@Testpublic void testQueu(){
// 1、创建一个队列并且插入MSGQueue expectedQueue = createTestQueue("testQueue");memoryDataCenter.insertQueue(expectedQueue);
// 2、查询该队列并且比较MSGQueue actualQueue = memoryDataCenter.getQueue("testQueue");Assertions.assertEquals(expectedQueue,actualQueue);
// 3、删除这个队列memoryDataCenter.deleteQueue("testQueue");
// 4、再次查询看是否能够查询到actualQueue = memoryDataCenter.getQueue("testQueue");Assertions.assertNull(actualQueue);}
// 针对绑定进行测试@Testpublic void testBinding() throws MqException {Binding expectedBinding = new Binding();expectedBinding.setExchangeName("testExchange");expectedBinding.setQueueName("testQueue");expectedBinding.setBindingKey("testBindingKey");memoryDataCenter.insertBinding(expectedBinding);Binding actualBinding = memoryDataCenter.getBinding("testExchange","testQueue");Assertions.assertEquals(expectedBinding,actualBinding);ConcurrentHashMap<String,Binding> bindingMap = memoryDataCenter.getBindings("testExchange");Assertions.assertEquals(1,bindingMap.size());Assertions.assertEquals(expectedBinding,bindingMap.get("testQueue"));memoryDataCenter.deleteBinding(expectedBinding);actualBinding = memoryDataCenter.getBinding("testExchange","testQueue");Assertions.assertNull(actualBinding);}
🍅 4、测试 绑定
// 针对绑定进行测试@Testpublic void testBinding() throws MqException {Binding expectedBinding = new Binding();expectedBinding.setExchangeName("testExchange");expectedBinding.setQueueName("testQueue");expectedBinding.setBindingKey("testBindingKey");memoryDataCenter.insertBinding(expectedBinding);Binding actualBinding = memoryDataCenter.getBinding("testExchange","testQueue");Assertions.assertEquals(expectedBinding,actualBinding);ConcurrentHashMap<String,Binding> bindingMap = memoryDataCenter.getBindings("testExchange");Assertions.assertEquals(1,bindingMap.size());Assertions.assertEquals(expectedBinding,bindingMap.get("testQueue"));memoryDataCenter.deleteBinding(expectedBinding);actualBinding = memoryDataCenter.getBinding("testExchange","testQueue");Assertions.assertNull(actualBinding);}
🍅 5、测试 消息的增删查
@Testpublic void testMessage(){Message expectedMessage = createTestMessage("testMessage");memoryDataCenter.addMessage(expectedMessage);Message actualMessage = memoryDataCenter.getMessage(expectedMessage.getMessageId());Assertions.assertEquals(expectedMessage,actualMessage);memoryDataCenter.removeMessage(expectedMessage.getMessageId());actualMessage = memoryDataCenter.getMessage(expectedMessage.getMessageId());Assertions.assertNull(actualMessage);}
🍅 6、测试 发送消息
// 测试发送消息@Testpublic void testSendMessage(){
// 1、创建一个队列,创建十条消息,把这些消息都插入到队列中MSGQueue queue = createTestQueue("testQueue");List<Message> expectedMessages = new ArrayList<>();for (int i = 0; i < 10; i++) {Message message = createTestMessage("testMessage" + 1);memoryDataCenter.sendMessage(queue,message);expectedMessages.add(message);}
// 2、从队列中取出这些消息List<Message> actualMessages = new ArrayList<>();while (true){Message message = memoryDataCenter.pollMessage("testQueue");if (message == null){break;}actualMessages.add(message);}
// 3、比较取出的消息和之前的消息是否一致Assertions.assertEquals(expectedMessages.size(),actualMessages.size());for (int i = 0; i < expectedMessages.size(); i++) {Assertions.assertEquals(expectedMessages.get(i),actualMessages.get(i));}}
🍅 7、测试 “未确认”的信息
@Testpublic void testMessageWaitAck(){Message expectedMessage = createTestMessage("expectedMessage");memoryDataCenter.addMessageWaitAck("testQueue",expectedMessage);Message actualMessage = memoryDataCenter.getMessageWaitAck("testQueue",expectedMessage.getMessageId());Assertions.assertEquals(expectedMessage,actualMessage);memoryDataCenter.removeMessageWaitAck("testQueue",expectedMessage.getMessageId());actualMessage = memoryDataCenter.getMessageWaitAck("testQueue",expectedMessage.getMessageId());Assertions.assertNull(actualMessage);}
🍅 8、测试 从硬盘上读取数据
测试这个用例,主要分为3步:
(1)在硬盘上构造好数据
(2)执行恢复操作
(3)对比结果
@Testpublic void testRecovery() throws IOException, MqException, ClassNotFoundException {// 后续操作雨MyBatis有关,所以需要启动SpringApplicationTigerMqApplication.context = SpringApplication.run(TigerMqApplication.class);// 1. 在硬盘上构造好数据DiskDataCenter diskDataCenter = new DiskDataCenter();diskDataCenter.init();// 构造交换机Exchange expectedExchange = createTestExchange("testExchange");diskDataCenter.insertExchange(expectedExchange);// 构造队列MSGQueue expectedQueue = createTestQueue("testQueue");diskDataCenter.insertQueue(expectedQueue);// 构造绑定Binding expectedBinding = new Binding();expectedBinding.setExchangeName("testExchange");expectedBinding.setQueueName("testQueue");expectedBinding.setBindingKey("testBindingKey");diskDataCenter.insertBinding(expectedBinding);// 构造消息Message expectedMessage = createTestMessage("testContent");diskDataCenter.sendMessage(expectedQueue, expectedMessage);// 2. 执行恢复操作memoryDataCenter.recovery(diskDataCenter);// 3. 对比结果Exchange actualExchange = memoryDataCenter.getExchange("testExchange");Assertions.assertEquals(expectedExchange.getName(), actualExchange.getName());Assertions.assertEquals(expectedExchange.getType(), actualExchange.getType());Assertions.assertEquals(expectedExchange.isDurable(), actualExchange.isDurable());Assertions.assertEquals(expectedExchange.isAutoDelete(), actualExchange.isAutoDelete());MSGQueue actualQueue = memoryDataCenter.getQueue("testQueue");Assertions.assertEquals(expectedQueue.getName(), actualQueue.getName());Assertions.assertEquals(expectedQueue.isDurable(), actualQueue.isDurable());Binding actualBinding = memoryDataCenter.getBinding("testExchange", "testQueue");Assertions.assertEquals(expectedBinding.getExchangeName(), actualBinding.getExchangeName());Assertions.assertEquals(expectedBinding.getQueueName(), actualBinding.getQueueName());Assertions.assertEquals(expectedBinding.getBindingKey(), actualBinding.getBindingKey());Message actualMessage = memoryDataCenter.pollMessage("testQueue");Assertions.assertEquals(expectedMessage.getMessageId(), actualMessage.getMessageId());Assertions.assertEquals(expectedMessage.getRoutingKey(), actualMessage.getRoutingKey());Assertions.assertEquals(expectedMessage.getDeliverMode(), actualMessage.getDeliverMode());Assertions.assertArrayEquals(expectedMessage.getBody(), actualMessage.getBody());// 4. 清理硬盘的数据, 把整个 data 目录里的内容都删掉(包含了 meta.db 和 队列的目录).TigerMqApplication.context.close();File dataDir = new File("./data");FileUtils.deleteDirectory(dataDir);}
四、小结
这一块的内容,主要就是借助内存中的一系列数据结构,保存、管理交换机、队列、绑定、消息,使用到了哈希表、链表、嵌套的结构等。
这里还频繁的使用了加锁的操作,具体场景考虑是否要加锁(特别是有插入操作)。