RabbitMQ工作模式2 整合springboot 和MQ高级特性

RabbitMQ工作模式

1.路由模式

创建交换机 , 连接队列 (生产者)

public class MyTestExDirect {@Testpublic void bbb() throws IOException, TimeoutException {ConnectionFactory connectionFactory = new ConnectionFactory();//连接mqconnectionFactory.setUsername("账号");connectionFactory.setPassword("密码");connectionFactory.setHost("ip地址");connectionFactory.setPort(端口号);connectionFactory.setVirtualHost("/aaa");//建立连接Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();//创建交换机channel.exchangeDeclare("ex_direct", BuiltinExchangeType.DIRECT,false);//创建队列/*** String queue, 队列的名称* boolean durable, 持久化* boolean exclusive, 是否独占* boolean autoDelete,  受否自动删除* Map<String, Object> arguments  参数*/channel.queueDeclare("mydirect1",false,false,false,null);channel.queueDeclare("mydirect2",false,false,false,null);//绑定交换机和队列   设置routingkeychannel.queueBind("mydirect1","ex_direct","error");channel.queueBind("mydirect2","ex_direct","test");channel.queueBind("mydirect2","ex_direct","test2");//交换机     routingkey     根据routingkey在队列上发布消息channel.basicPublish("ex_direct","error",null,"路由模式测试".getBytes());}
}

启动测试

交换机创建成功

队列创建成功 , 与交换机连接成功

通过routingkey "error" 将消息发送到 mydirect1

创建消费者

public class ConsumerAppDirect
{public static void main( String[] args ) throws IOException, TimeoutException {ConnectionFactory connectionFactory = new ConnectionFactory();//连接mqconnectionFactory.setUsername("账号"); connectionFactory.setPassword("密码"); connectionFactory.setHost("ip地址");connectionFactory.setPort(端口号); connectionFactory.setVirtualHost("/aaa");//建立连接Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();Consumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String s = new String(body);System.out.println("mq:-----aaa"+s);}};channel.basicConsume("mydirect1",true,consumer);}
}

开启监控

2.Topics 主题模式

Topic类型与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符

Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert

通配符规则:

#:匹配一个或多个词 

*:匹配不多不少恰好1个词   test.* test.insert

举例:

item.#:能够匹配item.insert.abc 或者 item.insert

item.*:只能匹配item.insert 

创建交换机和生产者

public class MyTestExTopics {@Testpublic void ccc() throws IOException, TimeoutException {ConnectionFactory connectionFactory = new ConnectionFactory();//连接mqconnectionFactory.setUsername("账号");connectionFactory.setPassword("密码"); connectionFactory.setHost("ip地址");connectionFactory.setPort(端口号);connectionFactory.setVirtualHost("/aaa");//建立连接Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();//创建交换机channel.exchangeDeclare("ex_topics", BuiltinExchangeType.TOPIC,false);//创建队列/*** String queue, 队列的名称* boolean durable, 持久化* boolean exclusive, 是否独占* boolean autoDelete,  受否自动删除* Map<String, Object> arguments  参数*/channel.queueDeclare("mytopics1",false,false,false,null);channel.queueDeclare("mytopics2",false,false,false,null);//绑定交换机和队列   设置routingkeychannel.queueBind("mytopics1","ex_topics","test.#");channel.queueBind("mytopics2","ex_topics","*.aaa");channel.queueBind("mytopics2","ex_topics","test.*");//交换机     此处的routingkey应该是具体的值     根据routingkey在队列上发布消息channel.basicPublish("ex_topics","test.aaa",null,"TOPIC模式测试".getBytes());}
}

测试

发布消息成功

消费者监听参考路由模式 , 只需要修改队列就行

SpringBoot整合RabbitMQ

1.搭建项目

添加依赖

<!--2. rabbitmq--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency>

添加配置文件

2.创建工作模式(主题模式)

1)创建交换机和队列

package com.example.config;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class TopicMqConfig {@Value("${mq.exchange.name}")private String EXCHANGENAME;@Value("${mq.queue.name}")private String QUEUENAME1;@Value("${mq.queue.name}")private String QUEUENAME2;//创建交换机@Bean("ex1")public Exchange getExchange(){Exchange exchange = ExchangeBuilder.topicExchange(EXCHANGENAME).durable(false).build();return exchange;}//创建队列@Bean("queue1")public Queue getQueue1(){Queue queue1 = QueueBuilder.nonDurable(QUEUENAME1).build();return queue1;}@Bean("queue2")public Queue getQueue2(){Queue queue2 = QueueBuilder.nonDurable(QUEUENAME1).build();return queue2;}//绑定交换机和队列@Bean("binding1")public Binding bindingQueueToExchange1(@Qualifier("ex1") Exchange exchange,@Qualifier("queue1") Queue queue){Binding binding1 = BindingBuilder.bind(queue).to(exchange).with("*.*").noargs();return binding1;}@Bean("binding2")public Binding bindingQueueToExchange2(@Qualifier("ex1") Exchange exchange,@Qualifier("queue2") Queue queue){Binding binding2 = BindingBuilder.bind(queue).to(exchange).with("test.*").noargs();return binding2;}
}

2)创建生产者

测试

3)创建消费者

创建配置文件

创建测试类 监听队列
package com.example.message;import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class ConsumerMessage {@RabbitListener(queues = "test_queue2")public void xxx(Message message){byte[] body = message.getBody();String s = new String(body);System.out.println(s);}
}

测试

MQ高级特性,消息的可靠性传递

1.确认模式

开启确认模式 修改配置

创建测试类

@SpringBootTest
public class MqTtst {@Value("${mq.exchange.name}")private String EXCHANGENAME;@Resourceprivate RabbitTemplate rabbitTemplate;@Testvoid sendMsg(){rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean b, String s) {if (b){System.out.println("发送消息成功");}else {System.out.println("发送消息失败,原因:"+s);}}});rabbitTemplate.convertAndSend(EXCHANGENAME,"test.topic","测试springBoot");}
}

启动测试

2.消息回退

当交换机接收到消息 , 但队列收不到消息时 , 使用回退

修改配置

测试

@Test
void sendMsgReturn(){//  消息回退rabbitTemplate.setMandatory(true);//rabbitTemplate.setReturnsCallback(returnedMessage -> System.out.println("消息回退,回退的消息是:"+new String(returnedMessage.getMessage().getBody())));rabbitTemplate.convertAndSend(EXCHANGENAME,"test.topic","测试springBoot");
}

3.Consumer Ack

三种确认方式

 自动确认:acknowledge="none" 。不管处理成功与否,业务处理异常也不管

(当消费者意担接收到消息之后,消费者就会给broker一个回执,证明已经接收到消息 了,不管消息到底是否成功)

手动确认:acknowledge="manual" 。可以解决业务异常的情况

(收到消息之后不会立马确认收到消息,当业务处理没有问题的时候手动的调用代码的方 式来进行处理,如果业务失败了,就可以进行额外的操作)

根据异常情况确认:acknowledge="auto",(这种方式使用麻烦,不作讲解)

其中自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息缓存中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。如果设置了手动确认方式,则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息。

1)自动确认

2)手动确认

修改配置 开启手动签收

3)创建测试

@Component
public class ShouDingQianShouMeaasge implements ChannelAwareMessageListener {@Override@RabbitListener(queues = "test_queue2")public void onMessage(Message message, Channel channel) throws Exception {Thread.sleep(2000);byte[] body = message.getBody();String s = new String(body);System.out.println(s);long deliveryTag = message.getMessageProperties().getDeliveryTag();try {System.out.println(1/0);channel.basicAck(deliveryTag,true);}catch (Exception e){System.out.println("拒绝签收");channel.basicNack(deliveryTag,true,true);}}
}

启动测试

有异常拒绝签收

无异常签收成功

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/223289.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

高端影像仪:打破微小产品测量局限

在现代工业生产中&#xff0c;影像仪以CCD数位影像为基石&#xff0c;将计算机屏幕测量技术与空间几何运算的能力融为一体&#xff0c;可以用于测量微小产品的各种尺寸和形状&#xff0c;为生产过程中的质量控制提供重要的参考依据。 影像仪产品内置高精度光学电动双倍镜头&am…

【Java数据结构 -- 包装类和泛型】

包装类和泛型 1. 包装类1.1 基本数据类型和对应的包装类1.2 装箱和拆箱1.3 自动装箱和自动拆箱1.4 自动装箱实际上是调用了valueOf&#xff08;&#xff09;1.5 Integer包装类赋值注意点 2 什么是泛型3 引出泛型4 泛型的使用4.1 语法4.2 类型推导 5 裸类型6 泛型如何编译6.1 擦…

Java远程连接本地开源分布式搜索引擎ElasticSearch

文章目录 前言1. Windows 安装 Cpolar2. 创建Elasticsearch公网连接地址3. 远程连接Elasticsearch4. 设置固定二级子域名 前言 简单几步,结合Cpolar内网穿透工具实现Java远程连接操作本地Elasticsearch。 什么是elasticsearch&#xff1f;一个开源的分布式搜索引擎&#xff0…

[C++]六大默认成员函数详解

☃️个人主页&#xff1a;fighting小泽 &#x1f338;作者简介&#xff1a;目前正在学习C和Linux &#x1f33c;博客专栏&#xff1a;C入门 &#x1f3f5;️欢迎关注&#xff1a;评论&#x1f44a;&#x1f3fb;点赞&#x1f44d;&#x1f3fb;留言&#x1f4aa;&#x1f3fb; …

Python实现艺术设计?提取图片中颜色并绘制成可视化图表,从大师作品中提取配色方案

文章目录 导入模块并加载图片提取颜色并整合成表格绘制图表实战环节关于Python技术储备一、Python所有方向的学习路线二、Python基础学习视频三、精品Python学习书籍四、Python工具包项目源码合集①Python工具包②Python实战案例③Python小游戏源码五、面试资料六、Python兼职渠…

1.ORB-SLAM3中如何保存多地图、关键帧、地图点到二进制文件中

1 保存多地图 1.1 为什么保存(视觉)地图 因为我们要去做导航&#xff0c;导航需要先验地图。因此需要保存地图供导航使用&#xff0c;下面来为大家讲解如何保存多地图。 1.2 保存多地图的主函数SaveAtlas /*** brief 保存地图* param type 保存类型*/ void System::SaveAtlas(…

Java - Stream Filter 多条件筛选过滤

Java Stream流中Filter用于通过设置的条件过滤出元素 &#xff0c;示例如下&#xff1a; List strings Arrays.asList(“abc”, “”, “bc”, “efg”, “abcd”,"", “jkl”);List filtered strings.stream().filter(string -> !string.isEmpty()).collect(C…

生物神经系统的基本原理 神经元Neuron

生物神经系统的基本原理涉及一系列复杂的生物学和生理学机制&#xff0c;主要可以分为以下几个方面&#xff1a; 神经元与突触&#xff1a;神经系统的基本单位是神经元&#xff0c;它们通过突触连接彼此。神经元接收并处理来自身体其他部分或环境的信息&#xff0c;然后通过电信…

用函数初始化数组

将数组全部初始化为相同值 对于一般情况 一般是用函数&#xff0c;传什么数就初始化为什么数 #define _CRT_SECURE_NO_WARNINGS 1 #include<stdio.h> void init(int arr[], int len, int num) {int i;for (i 0; i < len; i){arr[i] num;} } int main() {int arr[…

快速开发出一个公司网站

问题描述&#xff1a;参加一个创业活动&#xff0c;小组要求做一个公司网站&#xff0c;简单介绍一下自己公司的业务。需要快速完成。 问题解决&#xff1a;从网上找一个网站模板&#xff0c;类似于做PPT&#xff0c;搭建一个网站即可。 这里推荐的是京美建站、wordpress、he…

第1章 爬虫基础

目录 1. HTTP 基本原理1.1 URI 和 URL1.2 HTTP 和 HTTPS1.3 请求1.3.1 请求方法1.3.2 请求的网址1.3.3 请求头1.3.4 请求体 1.4 响应1.4.1 响应状态码1.4.2 响应头1.4.3 响应体 2. Web 网页基础2.1 网页的组成2.1.1 HTML2.1.2 CSS2.1.3 JavaScript 2.2 网页的结构2.3 节点树及节…

PT里如何针对某个模块设置false path

我正在「拾陆楼」和朋友们讨论有趣的话题&#xff0c;你⼀起来吧&#xff1f; 拾陆楼知识星球入口 如题&#xff0c;这个问题实际上讲的是get_cells的用法&#xff0c;我们要抓取某个模块内的全部cell&#xff0c;在ICC2里可以get_flat_cells xx/xx/module_name*&#xff0c;但…