参考RabbitMQ实现一个消息队列

文章目录

  • 前言
  • 小小消息管家
    • 1.项目介绍
    • 2. 需求分析
      • 2.1 API
      • 2.2 消息应答
      • 2.3 网络通信协议设计
    • 3. 开发环境
    • 4. 项目结构介绍
      • 4.1 配置信息
    • 5. 项目演示

前言

消息队列的本质就是阻塞队列,它的最大用途就是用来实现生产者消费者模型,从而实现解耦合以及削峰填谷

在分布式系统中不再是单个服务器而是服务器“集群”,如果我们我们直接A服务器给B服务器发送请求,B服务器给A服务器返回响应,这样的话我们AB的耦合较大,如果A或者B服务器挂了,我们业务也就崩溃了。引入消息队列之后我们将请求和响应都通过消息队列这个中间人来传递,就降低了耦合度。

同样的,如果我们AB服务器直接进行通信,如果A服务器突然发送许多请求,我们B服务器也会收到巨多请求的影响,AB由于硬件资源限制可能都会崩溃。如果我们引入消息队列,消息队列可以将许多请求接收存储下来,B服务器依然可以按照原有节奏取请求,不会一下子接收大量请求。这也就是我们所说的削峰填谷,也是类似的,就算请求过少,我们的服务器依然可以从消息队列中取出挤压得请求。

在分布式系统中,跨主机之间使用生产者消费者模型就显得非常重要了。 我们通常把阻塞队列封装成⼀个独立的服务器程序,并且新增一些功能。这样的程序我们就称为 消息队列

所以此次就仿照市面上得RabbitMQ实现一个简易的消息队列。

小小消息管家

1.项目介绍

将消息队列分成服务器模块、客户端模块、公共模块来实现。

  1. 服务器模块通过虚拟主机实现交换机、队列、绑定、消息等相关操作的隔离。虚拟主机主要负责对上述内容的数据进行硬盘(数据库和文件)以及内存管理、消息的三种转发方式如Direct、Fanout、Topic、为提供客户端API的实现。
  2. 客户端模块:实现连接管理提供建立连接、信道管理通过信道实现TCP连接的复用。在信道管理中实现客户端调用的API。
  3. 公共模块:约定客户端服务器的通信协议、数据传输过程中的序列化以及反序列化。

在这里插入图片描述

2. 需求分析

由于是实现一个生产者消费者模型,在消息队列中我们要实现的逻辑如下:我们的生产者客户端向服务器发送消息,消费者客户端向服务器订阅消息,服务器负责消息的存储和转发。

在这里插入图片描述

概念解读:

  • 虚拟主机 (VirtualHost):是⼀个逻辑上的集合,⼀个 BrokerServer 上可以存在多个 VirtualHost。
  • 交换机 (Exchange):生产者把消息先发送到 BrokerServer 的 Exchange 上。 再根据不同的规则把消息转发给不同的 Queue。
  • 队列 (Queue):真正用来存储消息,每个消费者决定自己从哪个 Queue 上读取消息。
  • 绑定 (Binding):Exchange 和 Queue 之间的关联关系。Exchange 和 Queue 可以理解成 “多对多” 关
    系。
  • 消息 (Message): 传递的内容。

2.1 API

我们的服务器提供以下API是西安消息队列的基本功能。

  1. 创建队列 (queueDeclare)
  2. 销毁队列 (queueDelete)
  3. 创建交换机 (exchangeDeclare)
  4. 销毁交换机 (exchangeDelete)
  5. 创建绑定 (queueBind)
  6. 解除绑定 (queueUnbind)
  7. 发布消息 (basicPublish)
  8. 订阅消息 (basicConsume)
  9. 确认消息 (basicAck)

我们的客户端提供以下API供客户使用消息队列,为了复用TCP连接,我们提供了一个Channel逻辑通道。所以在客户端我们还需要提供Channel的创建和关闭:

  1. 创建 Connection
  2. 关闭 Connection
  3. 创建 Channel
  4. 关闭 Channel
  5. 创建队列 (queueDeclare)
  6. 销毁队列 (queueDelete)
  7. 创建交换机 (exchangeDeclare)
  8. 销毁交换机 (exchangeDelete)
  9. 创建绑定 (queueBind)
  10. 解除绑定 (queueUnbind)
  11. 发布消息 (basicPublish)
  12. 订阅消息 (basicConsume)
  13. 确认消息 (basicAck)

2.2 消息应答

被消费者消费的消息,需要进行应答来确定我们消费者正确消费了消息。我们设置两种应答模式:

  1. 自动应答:消费者只要消费了消息,就算应答完毕。Broker直接删除这个消息。
  2. 手动应答:消费者手动调用应答接口(确认消息),Broker收到应答请求后删除这个消息。

2.3 网络通信协议设计

我们使用TCP 协议,来作为通信的底层协议。在这个基础上自定义应用层协议,实现客户端对服务器提供的功能远程调用。所以我们在协议中约定type标记调用的功能,length为消息的长度,payload为消息的具体内容。

请求和响应的格式如下:
在这里插入图片描述

其中 type取值如下:
• 0x1 创建 channel
• 0x2 关闭 channel
• 0x3 创建 exchange
• 0x4 销毁 exchange
• 0x5 创建 queue
• 0x6 销毁 queue
• 0x7 创建 binding
• 0x8 销毁 binding
• 0x9 发送 message
• 0xa 订阅 message
• 0xb 返回 ack
• 0xc 服务器给客户端推送的消息(被订阅的消息)

其中 payload 部分,会根据不同的 type,存在不同的格式。对于请求来说, payload 表示这次方法调用的各种参数信息,我们定义对应的类实现。
对于响应,payload 表示这次方法调用的返回值。

3. 开发环境

数据库:MySQL5.7
开发语言:Java
技术框架:SpringBoot、SpringMVC、Mybatis、SQLite
管理工具:Maven
开发工具:Intellij IDEA 2020.1.4
操作系统:Windows10

4. 项目结构介绍

在这里插入图片描述

  • common 包中约定通信协议包括请求响应格式以及不同的payload对应的数据格式;创建自定义异常类;实现序列化反序列化;定义消费者以及处理消息调用的函数接口。
  • demo 创建了一个消费者和生产者用于测试项目。
  • mqclient 客户端模块,提供创建连接的工厂类;定义完整连接的内容;定义channel实现客户端api
  • mqserver.core 定义了交换机、队列、消息、交换机类型、转发规则、消费消息的逻辑。
  • mqserver.datacenter 定义了交换机、队列、消息、绑定的存储以及管理。
  • mqserver.mapper:实现对sqlite数据库的操作。

4.1 配置信息

由于我们对于数据库的存储只涉及一小部分,所以此处我们利用sqlite进行数据管理。

pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.14</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.example</groupId><artifactId>mq</artifactId><version>0.0.1-SNAPSHOT</version><name>mq</name><description>mq</description><properties><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.mybatis.spring.boot</groupId><artifactId>mybatis-spring-boot-starter</artifactId><version>2.3.1</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-devtools</artifactId><scope>runtime</scope><optional>true</optional></dependency><!-- https://mvnrepository.com/artifact/org.xerial/sqlite-jdbc --><dependency><groupId>org.xerial</groupId><artifactId>sqlite-jdbc</artifactId><version>3.41.0.1</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.mybatis.spring.boot</groupId><artifactId>mybatis-spring-boot-starter-test</artifactId><version>2.3.1</version><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><configuration><excludes><exclude><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></exclude></excludes></configuration></plugin></plugins></build></project>

application.yml: 配置数据库信息
在这里插入图片描述

5. 项目演示

创建一个生产者客户端向服务器发送一个消息:

package com.example.mq.demo;import com.example.mq.mqclient.Channel;
import com.example.mq.mqclient.Connection;
import com.example.mq.mqclient.ConnectionFactory;
import com.example.mq.mqserver.core.ExchangeType;import java.io.IOException;/*** 这个类用来表示一个生产者.*  通常这是一个单独的服务器程序.*/
public class DemoProducer {public static void main(String[] args) throws IOException, InterruptedException {System.out.println("启动生产者");//创建一个连接ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(9090);//得到逻辑链接,这个就类似于socketConnection connection = factory.newConnection();Channel channel = connection.createChannel();// 通过逻辑连接创建交换机和队列channel.exchangeDeclare("testExchange", ExchangeType.DIRECT, true, false, null);channel.queueDeclare("testQueue", true, false, false, null);// 创建一个消息并发送byte[] body = "hello 欢迎消费消息".getBytes();boolean ok = channel.basicPublish("testExchange", "testQueue", null, body);System.out.println("消息投递完成! ok=" + ok);Thread.sleep(500);channel.close();connection.close();}
}

创建一个消费者客户端,订阅消费消息

package com.example.mq.demo;import com.example.mq.common.Consumer;
import com.example.mq.common.MqException;
import com.example.mq.mqclient.Channel;
import com.example.mq.mqclient.Connection;
import com.example.mq.mqclient.ConnectionFactory;
import com.example.mq.mqserver.core.BasicProperties;
import com.example.mq.mqserver.core.ExchangeType;import java.io.IOException;/*** @author zq* @date 2023-08-05 19:29*//** 这个类表示一个消费者.* 通常这个类也应该是在一个独立的服务器中被执行*/
public class DemoConsumer {public static void main(String[] args) throws IOException, MqException, InterruptedException {System.out.println("启动消费者!");ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(9090);Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare("testExchange", ExchangeType.DIRECT, true, false, null);channel.queueDeclare("testQueue", true, false, false, null);//订阅消息,并定义如何消费消息channel.basicConsume("testQueue", true, new Consumer() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {System.out.println("[消费数据] 开始!");System.out.println("consumerTag=" + consumerTag);System.out.println("basicProperties=" + basicProperties);String bodyString = new String(body, 0, body.length);System.out.println("body=" + bodyString);System.out.println("[消费数据] 结束!");}});// 通过这个循环模拟一直等待消费完生产者生产的所有消息while (true) {Thread.sleep(500);}}
}

实现结果如下 :

在这里插入图片描述
在这里插入图片描述

通过结果我们可以看出,我们消费者成功取出订阅的队列中的消息进行消费

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/54761.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Java项目作业~ 创建基于Maven的Java项目,连接数据库,实现对站点信息的管理,即实现对站点的新增,修改,删除,查询操作

需求&#xff1a; 创建基于Maven的Java项目&#xff0c;连接数据库&#xff0c;实现对站点信息的管理&#xff0c;即实现对站点的新增&#xff0c;修改&#xff0c;删除&#xff0c;查询操作。 以下是站点表的建表语句&#xff1a; CREATE TABLE websites (id int(11) NOT N…

外卖多门店小程序开源版开发

外卖多门店小程序开源版开发 外卖多门店小程序开源版的开发可以按照以下步骤进行&#xff1a; 确定需求&#xff1a;明确外卖多门店小程序的功能和特点&#xff0c;包括用户注册登录、浏览菜单、下单支付、订单管理等。技术选型&#xff1a;选择适合开发小程序的技术框架&…

在服务器上搭建gitlab

最终效果展示&#xff1a; 官方文档&#xff1a; 安装部署GitLab服务 1.在服务器上下载gitlab wget https://mirrors.tuna.tsinghua.edu.cn/gitlab-ce/yum/el7/gitlab-ce-12.9.0-ce.0.el7.x86_64.rpm rpm -ivh gitlab-ce-12.9.0-ce.0.el7.x86_64.rpm 2.编辑站点位置 vim …

python人工智能可以干什么,python人工智能能干什么

大家好&#xff0c;给大家分享一下python做人工智能需要什么水平&#xff0c;很多人还不知道这一点。下面详细解释一下。现在让我们来看看&#xff01; 人工智能包含常用机器学习和深度学习两个很重要的模块&#xff0c;而python拥有matplotlib、Numpy、sklearn、keras等大量的…

Spring接口ApplicationRunner的作用和使用介绍

在Spring框架中&#xff0c;ApplicationRunner接口是org.springframework.boot.ApplicationRunner接口的一部分。它是Spring Boot中用于在Spring应用程序启动完成后执行特定任务的接口。ApplicationRunner的作用是在Spring应用程序完全启动后&#xff0c;执行一些初始化任务或处…

基于 Flink Paimon 实现 Streaming Warehouse 数据一致性管理

摘要&#xff1a;本文整理自字节跳动基础架构工程师李明&#xff0c;在 Apache Paimon Meetup 的分享。本篇内容主要分为四个部分&#xff1a; 背景 方案设计 当前进展 未来规划 点击查看原文视频 & 演讲PPT 一、背景 ​ 早期的数仓生产体系主要以离线数仓为主&#xf…

javaWeb项目--二级评论完整思路

先来看前端需要什么吧&#xff1a; 通过博客id&#xff0c;首先需要显示所有一级评论&#xff0c;包括评论者的头像&#xff0c;昵称&#xff0c;评论时间&#xff0c;评论内容 然后要显示每个一级评论下面的二级评论&#xff0c;包括&#xff0c;评论者的头像&#xff0c;昵称…

Linux6.32 Kubernetes kubeadm部署

文章目录 计算机系统5G云计算第三章 LINUX Kubernetes kubeadm部署一、kubeadm搭建 Kubernetes v1.20&#xff08;一主两从&#xff09;1.环境准备2.所有节点安装docker3.所有节点安装kubeadm&#xff0c;kubelet和kubectl4.部署K8S集群 二、kubeadm搭建 Kubernetes v1.20&…

x光下危险物品/违禁物品目标识别的模型训练与推理代码

前言 1.安检在公共场合的重要性不言而喻&#xff0c;保障群众人身安全是其首要任务。在各种场合&#xff0c;安检都是不可或缺的环节。x光安检机作为安检的重要工具&#xff0c;尽管其具有人工监控判断成像的特性&#xff0c;但是其局限性也十分明显。 为了解决这一局限性为出…

【MySQL】事务的多版本并发控制(MVCC)

目录 一、数据库并发的三种场景二、MVCC2.1 三个记录隐藏字段2.2 undo log&#xff08;撤销日志&#xff09;2.3 模拟MVCC2.3.1 模拟更新&#xff08;update&#xff09;2.3.1 模拟删除&#xff08;delete&#xff09;2.3.1 模拟插入&#xff08;insert&#xff09;2.3.1 模拟查…

阿里云率先荣获容器集群稳定性先进级认证

7 月 25 日&#xff0c;由中国信通院发起的“2023 稳保体系”评估结果在可信云大会现场公布&#xff0c;阿里云容器服务 ACK 成为首批通过“云服务稳定运行能力-容器集群稳定性”评估的产品&#xff0c;并荣获“先进级”认证。 云原生技术正在激活应用构建新范式&#xff0c;构…

SPINN:基于设备和云的神经网络协同递进推理

SPINN&#xff1a;基于设备和云的神经网络协同递进推理 论文标题&#xff1a;SPINN: synergistic progressive inference of neural networks over device and cloud 原文链接&#xff1a;https://dl.acm.org/doi/10.1145/3372224.3419194 论文动机 现代CNN过多的计算需求&am…