【RabbitMQ 实战】09 客户端连接集群生产和消费消息

一、部署一个三节点集群

下面的链接是最快最简单的一种集群部署方法
3分钟部署一个RabbitMQ集群
上的的例子中,没有映射端口,所以没法从宿主机外部连接容器,下面的yml文件中,暴露了端口。
每个容器应用都映射了宿主机的端口,分别是5602,5612,5622
docker compse文件如下

version: '3'services:stats:image: bitnami/rabbitmqenvironment:- RABBITMQ_NODE_TYPE=stats- RABBITMQ_NODE_NAME=rabbit@stats- RABBITMQ_ERL_COOKIE=s3cr3tc00ki3ports:- '15672:15672'- '5602:5672'volumes:- 'rabbitmqstats_data:/bitnami/rabbitmq/mnesia'queue-disc1:image: bitnami/rabbitmqenvironment:- RABBITMQ_NODE_TYPE=queue-disc- RABBITMQ_NODE_NAME=rabbit@queue-disc1- RABBITMQ_CLUSTER_NODE_NAME=rabbit@stats- RABBITMQ_ERL_COOKIE=s3cr3tc00ki3ports:- '5612:5672'volumes:- 'rabbitmqdisc1_data:/bitnami/rabbitmq/mnesia'queue-ram1:image: bitnami/rabbitmqenvironment:- RABBITMQ_NODE_TYPE=queue-ram- RABBITMQ_NODE_NAME=rabbit@queue-ram1- RABBITMQ_CLUSTER_NODE_NAME=rabbit@stats- RABBITMQ_ERL_COOKIE=s3cr3tc00ki3ports:- '5622:5672'volumes:- 'rabbitmqram1_data:/bitnami/rabbitmq/mnesia'volumes:rabbitmqstats_data:driver: localrabbitmqdisc1_data:driver: localrabbitmqram1_data:driver: local

通过docker-compose up命令,就可以启动三个集群的容器了

[root@localhost mycompose]# docker-compose up

二、配置文件

原来的单节点只配置host和port,现在集群节点,就要配置addresses了,如下所示:

server:port: 8080
spring:application:name: rabbitmq-demo#配置rabbitMq 服务器rabbitmq:
#单节点直接可以写host和port
#    host: 192.168.56.201
#    port: 5672#集群连接写ip和端口addresses: 192.168.56.202:5602,192.168.56.202:5612,192.168.56.202:5622username: userpassword: bitnami#虚拟hostvirtual-host: virtual01template:mandatory: true #当mandatory设置为true时,交换器无法根据自身的类型和路由键找到一个符合条件的队列,那么RabbitMQ会调用Basic.Return命令将消息返回给生产者。当为false时,则直接丢弃消息publisher-confirm-type: correlated #生产者回调确认机制,由回调来确定消息是否发布成功publisher-returns: true #是否开启生产者returnslistener:simple:acknowledge-mode: manual #手动回复方式,一般建议手动回复,即需要我们自己调用对应的ACK方法prefetch: 10 #每个消费者可拉取的,还未ack的消息数量concurrency: 3 #消费端(每个Listener)的最小线程数max-concurrency: 10 #消费端(每个Listener)的最大线程数

三、代码

生产者

和单节点的发送和消费代码一致,没有变化

@Slf4j
@RestController
@RequestMapping("/rabbit")
public class RabbitSendController implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {private static final String EXCHANGE_NAME = "my_exchange";private static final String ROUTING_KEY = "my_routing";@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 正常发送并被broker接收* @return*/@RequestMapping("send")public String send() {for (int i = 0; i < 10; i++) {OrderInfo orderInfo = new OrderInfo();orderInfo.setAddress("成都市高新区");orderInfo.setOrderId(String.valueOf(i));orderInfo.setProductName("华为P60:" + i);//设置回调关联的一个idString messageId = UUID.randomUUID().toString();log.info("开始发送消息,当前消息关联id为:{}", messageId);CorrelationData correlationData = new CorrelationData(messageId);MessageProperties messageProperties = new MessageProperties();messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);Message message = MessageBuilder.withBody(new Gson().toJson(orderInfo).getBytes(StandardCharsets.UTF_8)).andProperties(messageProperties).build();//设置ack回调rabbitTemplate.setConfirmCallback(this);//退回消息的回调rabbitTemplate.setReturnCallback(this);rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, message, correlationData);}return "ok";}/*** 设置一个非法的路由键,模拟消息被broker退回的情况,前提是* spring.rabbitmq.template.mandatory=true 当mandatory设置为true时,交换器无法根据自身的类型和路由键找到一个符合条件的队列,那么RabbitMQ会调用Basic.Return命令将消息返回给生产者。当为false时,则直接丢弃消息* <p>* spring.rabbitmq.publisher-returns=true 生产者回调确认机制,由回调来确定消息是否发布成功** @return*/@RequestMapping("send-return")public String sendAndReturn() {OrderInfo orderInfo = new OrderInfo();orderInfo.setAddress("成都市高新区");orderInfo.setOrderId("111");orderInfo.setProductName("小米13");//设置回调关联的一个idString messageId = UUID.randomUUID().toString();log.info("开始发送消息,当前消息关联id为:{}", messageId);CorrelationData correlationData = new CorrelationData(messageId);MessageProperties messageProperties = new MessageProperties();messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);Message message = MessageBuilder.withBody(new Gson().toJson(orderInfo).getBytes(StandardCharsets.UTF_8)).andProperties(messageProperties).build();//设置ack回调rabbitTemplate.setConfirmCallback(this);//退回消息的回调rabbitTemplate.setReturnCallback(this);//下面这个RoutingKey是没有绑定的,所以发不出去rabbitTemplate.convertAndSend(EXCHANGE_NAME, "error.routing", message, correlationData);return "ok";}@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (correlationData == null) {return;}String messageId = correlationData.getId();if (ack) {log.info("【confirm回调方法】,消息发布成功,messageId={}", messageId);} else {log.info("【confirm回调方法】,消息发布失败,messageId={}", messageId);}}@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {log.info("【returnedMessage回调方法】,消息被退回,message={},replyCode:{},replyText:{},exchange:{},routingKey:{}",new String(message.getBody()), replyCode, replyText, exchange, routingKey);}
}

消费者

@Slf4j
@Component
public class RabbitOrderConsumer {private static final String EXCHANGE_NAME = "my_exchange";private static final String QUEUE_NAME = "my_queue";private static final String ROUTING_KEY = "my_routing";@RabbitListener(bindings = {@QueueBinding(value = @Queue(value = QUEUE_NAME, durable = "true"),exchange = @Exchange(value = EXCHANGE_NAME, type = "topic", durable = "true"), key = ROUTING_KEY)})public void handleMessage(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {//上面这个tag是这么写的么,为什么每次传过来都是1?导致channel被重新创建log.info("接收到消息:{},deliveryTag:{}", new String(message.getBody(), StandardCharsets.UTF_8), tag);channel.basicAck(tag, false);}
}

访问地址:http://localhost:8080/rabbit/send,然后就可以发送消息了,输出日志如下:

开始发送消息,当前消息关联id为:18049efe-a624-4288-a8f0-9c28fd776773
开始发送消息,当前消息关联id为:83d93f90-62f4-41cf-af02-03d496812561
开始发送消息,当前消息关联id为:f83257b2-95b6-408e-a5b9-74d0ec9f30b0
开始发送消息,当前消息关联id为:16a7e471-23ba-408b-9095-6add9ad1e270
开始发送消息,当前消息关联id为:152b0fb0-3a22-452d-93fe-662252c2fd8c
开始发送消息,当前消息关联id为:ade4f703-6075-485f-8e34-ec9b95bf59de
开始发送消息,当前消息关联id为:e4511f82-476a-4f4c-b704-4399baadeaf4
接收到消息:{"orderId":"1","productName":"华为P60:1","address":"成都市高新区"},deliveryTag:1
接收到消息:{"orderId":"0","productName":"华为P60:0","address":"成都市高新区"},deliveryTag:1
开始发送消息,当前消息关联id为:d8cd2dd6-bb9e-4d46-bc42-0d96df70748f
开始发送消息,当前消息关联id为:76950a93-5887-43c1-adef-edc1e29e2fab
开始发送消息,当前消息关联id为:f08a7a68-60da-4c5d-b1b8-c9e4d9453969
【confirm回调方法】,消息发布成功,messageId=18049efe-a624-4288-a8f0-9c28fd776773
【confirm回调方法】,消息发布成功,messageId=83d93f90-62f4-41cf-af02-03d496812561
接收到消息:{"orderId":"3","productName":"华为P60:3","address":"成都市高新区"},deliveryTag:2
接收到消息:{"orderId":"2","productName":"华为P60:2","address":"成都市高新区"},deliveryTag:1
接收到消息:{"orderId":"6","productName":"华为P60:6","address":"成都市高新区"},deliveryTag:3
接收到消息:{"orderId":"5","productName":"华为P60:5","address":"成都市高新区"},deliveryTag:2
接收到消息:{"orderId":"9","productName":"华为P60:9","address":"成都市高新区"},deliveryTag:4
接收到消息:{"orderId":"4","productName":"华为P60:4","address":"成都市高新区"},deliveryTag:2
接收到消息:{"orderId":"7","productName":"华为P60:7","address":"成都市高新区"},deliveryTag:3
接收到消息:{"orderId":"8","productName":"华为P60:8","address":"成都市高新区"},deliveryTag:3
【confirm回调方法】,消息发布成功,messageId=f83257b2-95b6-408e-a5b9-74d0ec9f30b0
【confirm回调方法】,消息发布成功,messageId=16a7e471-23ba-408b-9095-6add9ad1e270
【confirm回调方法】,消息发布成功,messageId=152b0fb0-3a22-452d-93fe-662252c2fd8c
【confirm回调方法】,消息发布成功,messageId=ade4f703-6075-485f-8e34-ec9b95bf59de
【confirm回调方法】,消息发布成功,messageId=e4511f82-476a-4f4c-b704-4399baadeaf4
【confirm回调方法】,消息发布成功,messageId=d8cd2dd6-bb9e-4d46-bc42-0d96df70748f
【confirm回调方法】,消息发布成功,messageId=76950a93-5887-43c1-adef-edc1e29e2fab
【confirm回调方法】,消息发布成功,messageId=f08a7a68-60da-4c5d-b1b8-c9e4d9453969

上述代码仓库:https://gitee.com/syk1234/mqdmo

四、后台管理

登录管理后台页面:http://192.168.56.202:15672/
在这里插入图片描述

共有三个节点,两个磁盘节点,一个内存节点。如果你还不清楚什么是磁盘节点,什么是内存节点,可以参考【RabbitMQ 实战】08 集群原理剖析

查看连接情况,发现是连接的是节点rabbit@stats节点在这里插入图片描述
查看队列的情况,队列是在rabbit@stats节点上
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/127788.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

JVM内存管理

文章目录 一、JVM自动内存管理1、java运行时数据区1.1、程序计数器1.2、虚拟机栈1.3、本地方法栈1.4、java堆1.5、方法区1.6、直接内存 二、对象已死的判定算法三、垃圾收集算法1.标记-清除算法2.标记-复制算法3.标记-整理算法4.分代收集算法 四、垃圾收集器1.Serial收集器2.Pa…

7346-2015 控制电机基本外形结构型式

声明 本文是学习GB-T 7346-2015 控制电机基本外形结构型式.pdf而整理的学习笔记,分享出来希望更多人受益,如果存在侵权请及时联系我们 1 范围 本标准规定了控制电机的机座号、外形及安装尺寸、轴伸型式、出线方式、标记及铭牌。 本标准适用于各类控制电机(以下简称电机),其…

大语言模型之十六-基于LongLoRA的长文本上下文微调Llama-2

增加LLM上下文长度可以提升大语言模型在一些任务上的表现&#xff0c;这包括多轮长对话、长文本摘要、视觉-语言Transformer模型的高分辨4k模型的理解力以及代码生成、图像以及音频生成等。 对长上下文场景&#xff0c;在解码阶段&#xff0c;缓存先前token的Key和Value&#…

tomcat安装,创建web后端项目,部署项目过程

1&#xff0c;安装服务器&#xff0c;使用 Apache免费提供的服务器TomCat&#xff0c;注意JDK版本。 TomCat官方站点 文件解压目录。 启动服务器&#xff1a;bin目录下点击startup.bat&#xff0c;出现小黑框&#xff0c;浏览器默认访问http://127.0.0.1:8080/ 关闭服务器&…

设计模式_模板方法模式

模板方法模式 前言 行为型设计模式 关注对象和行为的分离。 关于父类与子类 调用时候 具体调用的哪一个&#xff1f; 普通方法调用编译时决定左边决定抽象/虚方法调用运行时决定右边决定 介绍 设计模式定义案例模板方法模式父类 定义了业务流程&#xff0c;其中一部分 延…

2.2.3 vim操作合集

1 vim VIM 是 Linux 系统上一款文本编辑器,学习 VIM 最好的文档,应该是阅读学习 VIM 的帮助文档,可以使用本地的帮助文件(vim--->:help),或者使用在线帮助文档。同时针对vim的使用,相应的相书籍也很多,如下 2 vim操作模式 命令模式:默认模式,该模式下可以移动光标…

One Thread One Loop主从Reactor模型⾼并发服务器

One Thread One Loop主从Reactor模型⾼并发服务器 文章目录 One Thread One Loop主从Reactor模型⾼并发服务器一些补充HTTP服务器Reactor 模型eventfd通用类Any 目标功能模块划分&#xff1a;SERVER模块Buffer模块&#xff1a;编写思路&#xff1a;接口设计&#xff1a;具体实现…

“新”国货@2023:质疑、回归与转机

【潮汐商业评论/ 原创】 “我是真爱买国货&#xff0c;上到冰箱电视洗衣机&#xff0c;这样的家电大件儿&#xff0c;下到日化洗护用品&#xff0c;这样的日常小件儿&#xff0c;统统首选国货品牌&#xff0c;也只考虑国货品牌。”此时此刻&#xff0c;Grace正与大家分享着自己…

点餐小程序实战教程06-首页开发

用户注册功能开发好了之后&#xff0c;我们就要开发小程序&#xff0c;首先我们是规划小程序的功能模块&#xff0c;我们一共是四个模块&#xff0c;分别是首页、订单、消息和我的。 首页我们主要是点餐的功能&#xff0c;可以选择菜品&#xff0c;加入到购物车&#xff0c;然…

【微信小程序开发】宠物预约医疗项目实战-登录实现

【微信小程序开发】宠物预约医疗项目实战-登录实现 第二章 宠物预约医疗项目实战-注册实现 文章目录 【微信小程序开发】宠物预约医疗项目实战-登录实现前言一、打开项目文件二、编写代码2.1 wxss代码编写2.2 wxml代码编写2.3 js代码编写2.3.1 登录接口获取&#xff1a; 2.4 j…

【uniapp】小程序开发6:自定义状态栏

一、自定义状态栏 可以设置某个页面的状态栏自定义或者全局状态栏自定义。 这里以首页状态栏为例。 1&#xff09;pages.json 中配置"navigationStyle": "custom"&#xff0c;代码如下&#xff1a; {"pages": [ {"path": "pa…

数据结构与算法(六):堆

参考引用 Hello 算法 Github&#xff1a;hello-algo 1. 堆 堆&#xff08;heap&#xff09;是一种满足特定条件的完全二叉树&#xff0c;主要可分为下图所示的两种类型 小顶堆 min heap&#xff1a;任意节点的值 ≤ 其子节点的值大顶堆 max heap&#xff1a;任意节点的值 ≥ 其…