微服务08-认识和使用SpringAMQP

1.AMQP的认识

1.1 介绍

AMQP是什么?看完你就知道了_hello_读书就是赚钱的博客-CSDN博客_amqp

在这里插入图片描述
好处:
什么connection:消息队列的连接、channel:服务发送接收消息的通道、Queue:消息队列——>这些你都不需要自己编写

工作过程:
发布者(Publisher)发布消息(Message),经由交换机(Exchange)。

交换机根据路由规则将收到的消息分发给与该交换机绑定的队列(Queue)。

最后 AMQP 代理会将消息投递给订阅了此队列的消费者,或者消费者按照需求自行获取。

深入理解:
1、发布者、交换机、队列、消费者都可以有多个。同时因为 AMQP 是一个网络协议,所以这个过程中的发布者,消费者,消息代理 可以分别存在于不同的设备上。

2、发布者发布消息时可以给消息指定各种消息属性(Message Meta-data)。有些属性有可能会被消息代理(Brokers)使用,然而其他的属性则是完全不透明的,它们只能被接收消息的应用所使用。

3、从安全角度考虑,网络是不可靠的,又或是消费者在处理消息的过程中意外挂掉,这样没有处理成功的消息就会丢失。基于此原因,AMQP 模块包含了一个消息确认(Message Acknowledgements)机制:当一个消息从队列中投递给消费者后,不会立即从队列中删除,直到它收到来自消费者的确认回执(Acknowledgement)后,才完全从队列中删除

4、在某些情况下,例如当一个消息无法被成功路由时(无法从交换机分发到队列),消息或许会被返回给发布者并被丢弃。或者,如果消息代理执行了延期操作,消息会被放入一个所谓的死信队列中。此时,消息发布者可以选择某些参数来处理这些特殊情况。

1.2 实战:消息发送与消息接收

消息发送
在这里插入图片描述

@RunWith(SpringRunner.class)
@EnableRabbit
@SpringBootTest
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSendMessageSimpleQueue(){String queueName="simple.queue";String messgae="Hello,SpringAMQP";rabbitTemplate.convertAndSend(queueName,messgae);}
}

可以发现队列simple.queue中 多了一条消息
在这里插入图片描述

AMOP如何发送消息?

引入amqp的starter依赖,然后我们配置RabbitMQ的地址,最后利用RabbitTemplate中的convertAndSend方法发送消息

在这里插入图片描述

消息接收:
在这里插入图片描述

1、首先进行yaml配置,RabbitMQ连接信息

2、新建一个组件,里面编写消费逻辑(利用@RabbitListener监听队列,只要队列一有消息就进行接收)

 
@Component
public class SpringRabbitListener {/*** @RabbitListener监听队列* @param msg*/@RabbitListener(queues = "simple.queue")public void listenSimpleQueue(String msg){System.out.println("消费者接收到的simple.queue中消息为:"+msg);}
}

在这里插入图片描述
在这里插入图片描述

注意:消息一旦被消费就会从队列删除,RabbitMQ没有消息回溯功能

2.WorkQueue

场景:
采用多个工作队列处理消息,避免消息堆积;

在这里插入图片描述

2.1 实战:多个消费者绑定一个队列

在这里插入图片描述

1.首先配置类中把队列支棱起来,Publisher发布消息给消息队列simple.queue

@RunWith(SpringRunner.class)
@EnableRabbit
@SpringBootTest
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSendMessageWorkQueue() throws InterruptedException {String queueName="simple.queue";String messgae="华为手机,遥遥领先--";for (int i = 0; i < 50; i++) {rabbitTemplate.convertAndSend(queueName,messgae+i);Thread.sleep(20);}}

2.consumer服务中定义两个消费者,利用@RabbitListener监听队列

@Component
public class SpringRabbitListener {/*** 我们的思想是:两个消费者,一个1s能够消费50条,一个消费1s5条,一共50条信息,能者多劳* 现实:默认是平均分配->造成处理消息时间过长(因为第二个消费者处理消息很慢)* @param msg* @throws InterruptedException*/@RabbitListener(queues = "simple.queue")public void listenWorkQueue(String msg) throws InterruptedException {System.out.println("消费者1......接收到消息为:"+msg+"当前时间:"+ LocalTime.now());//每s50个消息Thread.sleep(20);}@RabbitListener(queues = "simple.queue")public void listenWorkQueue2(String msg) throws InterruptedException {System.err.println("消费者2.......接收到消息为:"+msg+"当前时间:"+ LocalTime.now());Thread.sleep(200);}

在这里插入图片描述
我们可以采用prefetch设置消费者预取的消息数量,这样就不会出现平均分配消息的现象了——>而是根据每个消费者的能力来处理消息

在这里插入图片描述

3.Fanout Exchange

前言:

发布(Publish)和订阅(Subscribe)

因为一个消息只可能被一个消费者消费,消费完就会删除该消费,所以我们需要利用发布和订阅来进行处理,这样就可以将同一个消息发送给多个消费者——>
利用exchange交换机将消息发送给多个队列,然后队列是可以将消息进行储存的,发给多个消费者

在这里插入图片描述
常见的exchange类型:Fanout(广播)、Direct(路由)、Topic(话题)

注意:exchange负责消息路由,而不是储存,路由失败则会造成消息丢失(也就是说路由到哪个消息队列)。

3.1 介绍

exchange: 只能作为消息的转发,记住不能作为消息的缓存,如果路由失败消息就丢了

交换机exchange:通过FanoutExchange返回交换机

消息队列Queue:通过Queue返回消息队列

将消息队列与交换机进行绑定Bingding:可以通过Binding中的bind方法进行绑定

3.2 实战

在这里插入图片描述

1:
在这里插入图片描述
consumer服务中声明交换机和队列,将队列与交换机进行绑定(利用Binding中bind方法)

@Configuration
public class FanoutConfig {//itcast.fanout交换机@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("itcast.fanout");}//fanout.queue1任务队列@Beanpublic Queue fanoutQueue1(){return new Queue("fanout.queue1");}//消息队列2@Beanpublic Queue fanoutQueue2(){return new Queue("fanout.queue2");}//将消息队列与交换机进行绑定:类型和名称要保持一致,不然无法注入@Beanpublic Binding fanoutBinding1(Queue fanoutQueue1,FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}//第二个消息队列与交换机进行绑定@Beanpublic Binding fanouting2(Queue fanoutQueue2,FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}
}

2.在这里插入图片描述

@Component
public class SpringRabbitListener {/*** 以下是利用交换机通知信息给处理消息的消费者*/@RabbitListener(queues = "fanout.queue1")public void listenFanoutQueue1(String msg){System.out.println("消费者接收到的fanout.queue1的消息为:"+msg);}@RabbitListener(queues = "fanout.queue2")public void listenFanoutQueue2(String msg){System.out.println("消费者收到fanout.queue2的消息为:"+msg);}
}

3.在这里插入图片描述

@RunWith(SpringRunner.class)
@EnableRabbit
@SpringBootTest
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 发送信息给交换机exchange*/@Testpublic void testSendFanoutExchange(){//1.交换机名称String exchangeName="itcast.fanout";//2.消息String message="hello,every one!";//3.发送消息rabbitTemplate.convertAndSend(exchangeName,"",message);}
}

在这里插入图片描述

3.3 总结

在这里插入图片描述

4.Direct Exchang

DirectExchang:可以根据规则路由到指定的消息队列

4.1实战

在这里插入图片描述
1.在consumer服务中声明Exchange交换机与不同key之消息队列的绑定,利用@RabbitListener->还监听了一波消息队列

对比与之前的Fanout,它们的队列以及交换机是在一个配置类中定义并绑定的,利用了Binding,并注入容器

消费者代码
在这里插入图片描述

提供者代码

@Testpublic void testSendDirectExchange() {//交换机名称String exchangeName = "denghao.direct";//消息String message = "hello, blue!";//发送消息rabbitTemplate.convertAndSend(exchangeName,"blue",message);}

个人编写与老师资料略有不同 敬请谅解
在这里插入图片描述

Direct交换机与Fanout交换机之间的差异+@RabbitListener注解中声明队列与交换机的常见注解
在这里插入图片描述

5.TopicExchange在这里插入图片描述

为什么要用Topic交换机,因为以后有的时候,比如地区天气新闻,有“长沙的,成都的,四川的,重庆的。。。。。。 如果使用DirectExchange的话,那么会要定义很多很多的key 而用TopicExchange的话 可以直接用 #.news 表示接收所有地方的天气新闻”

5.1 实战

介绍:

两个队列:一个是中国的新闻,一个所有的新闻消息;

两个消费者分别监听这两个队列

Publisher将消息发送给交换机Exchange,然后交换机根据key(+通配符)->决定将消息路由到哪个队列中

区别:

与DirectExchange区别就是,DirectExchange没有通配符,TopicExchange有通配符:xxx.xxx

最明显的地方就是:队列与交换机进行绑定时,key不一样

在这里插入图片描述

消费者的监听器

  @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue1"),exchange = @Exchange(name = "denghao.topic",type = ExchangeTypes.TOPIC),key = "china.#"))public void listenTopicQueue1(String msg){System.out.println("消费者接收到topic.queue1的消息:【"+ msg +"】");}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue2"),exchange = @Exchange(name = "denghao.topic",type = ExchangeTypes.TOPIC),key = "#.news"))public void listenTopicQueue2(String msg){System.out.println("消费者接收到topic.queue2的消息:【"+ msg +"】");}

服务者测试方,发布消息

@Testpublic void testSendTopicExchange() {//交换机名称String exchangeName = "denghao.topic";//消息String message1 = "红色天气预报警告,马上长沙迎来大升温,明天提升六度,请注意,不要感冒,不要鼻炎!!!";String message2 = "重磅新闻来袭!!!";//发送消息rabbitTemplate.convertAndSend(exchangeName,"china.news",message1);rabbitTemplate.convertAndSend(exchangeName,"jp.news",message2);}

结果
在这里插入图片描述
在这里插入图片描述

6. 消息转换器

场景: 因为我们Publisher服务者发送消息到消息队列(将消息序列化为二进制),消息是乱码的——>因为消息类型content-type是java序列化变来的类型:缺点:消息体大、传输速度慢、安全问题、占内存;

在这里插入图片描述
处理:

1.我们定义一个bean,利用boot的自动配置思想,将默认的替换

@Beanpublic Jackson2JsonMessageConverter jsonMessageConverter() {return new Jackson2JsonMessageConverter();}

2.然后导入依赖,Jacksonxxxx

  <!--序列化--><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></dependency>

3.Publisher发送消息给到消息队列

 //Publisher发送消息给到队列object.queue@Testpublic void testSendObjectQueue(){HashMap<Object, Object> msg = new HashMap<>();msg.put("姓名","柳岩");msg.put("年龄",38);//发送消息到队列rabbitTemplate.convertAndSend("object.queue",msg);}

在这里插入图片描述

4.消费者接收消息

@RabbitListener(queues = "object.queue")public void listenObjectQueue(Map<String,Object> msg){System.out.println("消费者接收到object.queue的消息:【"+ msg +"】");}

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/107933.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【计算机视觉 | 目标检测】干货:目标检测常见算法介绍合集(二)

文章目录 十六、EfficientDet十七、Deformable DETR十八、YOLOX十九、Sparse R-CNN二十、Contour Proposal Network二十一、VarifocalNet二十二、Libra R-CNN二十三、Stand-Alone Self Attention二十四、ThunderNet二十五、Hierarchical Transferability Calibration Network二…

【杂记】git管理工具的相关应用

这里记录一些用git管理工具进行开发的命令&#xff0c;便于自己查看&#xff0c;我认为下面两篇博客写的很详细&#xff0c;但是为了自己方便查看&#xff0c;所以自己写了一些命令供自己进一步理解。gitee相对git来说更方便一些&#xff08;毕竟国内的不用担心墙&#xff09;&…

UMA 2 - Unity Multipurpose Avatar☀️六.Advanced Occlusion高级遮挡功能解决皮肤服饰穿模

文章目录 🟥 本节功能效果展示🟧 基础项目配置🟨 本节项目配置🟩 配置MeshHideAsset1️⃣ 创建MeshHideAsset2️⃣ 配置SlotDataAsset3️⃣ 配置遮挡信息🟦 将 MeshHideAsset 配置到 Recipe🟥 本节功能效果展示 未遮挡前的穿模问题: 遮挡后效果:

SpringMVC_拦截器

4.拦截器 4.1拦截器概述 概述&#xff1a;一种动态拦截方法调用的机制&#xff0c;在SpringMVC中动态拦截控制器方法的执行实际开发中&#xff0c;静态资源&#xff08;HTML/CSS&#xff09;不需要交给框架处理&#xff0c;需要拦截的是动态资源 4.2图示 图示 4.3案例实现 …

闭包的理解

1.什么是闭包&#xff1f; 变量的私有化。一个函数内的变量,随着函数的执行完毕,对于的变量也会随着销毁,闭包可以让变量在函数执行完毕之后不必销毁,通常将这个变量通过匿名函数的形式return出去,这个变量只能被访问,不能被修改。 2.证明变量执行玩被销毁 (1)函数体没有被包…

朋友圈大佬都去读研了,这份备考书单我码住了

作者简介&#xff1a; 辭七七&#xff0c;目前大二&#xff0c;正在学习C/C&#xff0c;Java&#xff0c;Python等 作者主页&#xff1a; 七七的个人主页 文章收录专栏&#xff1a; 七七的闲谈 欢迎大家点赞 &#x1f44d; 收藏 ⭐ 加关注哦&#xff01;&#x1f496;&#x1f…

day3_C++

day3_C 思维导图用C的类完成数据结构 栈的相关操作用C的类完成数据结构 循环队列的相关操作 思维导图 用C的类完成数据结构 栈的相关操作 stack.h #ifndef STACK_H #define STACK_H#include <iostream> #include <cstring>using namespace std;typedef int datat…

【数据结构】堆的创建

&#x1f490; &#x1f338; &#x1f337; &#x1f340; &#x1f339; &#x1f33b; &#x1f33a; &#x1f341; &#x1f343; &#x1f342; &#x1f33f; &#x1f344;&#x1f35d; &#x1f35b; &#x1f364; &#x1f4c3;个人主页 &#xff1a;阿然成长日记 …

jmeter采集ELK平台海量业务日志( 采用Scroll)

由于性能测试需要&#xff0c;需采集某业务系统海量日志&#xff08;百万以上&#xff09;来使用。但Elasticsearch的结果分页size单次最大为10000&#xff08;运维同事为保证ES安全&#xff09;。为了能够快速采集ELK平台业务日志&#xff0c;可以使用以下2种方式采集&#xf…

spring spring-boot spring-cloud spring-cloud-alibaba之间版本对应关系

spring 版本与 jdk 的对应关系 https://github.com/spring-projects/spring-framework/wiki/Spring-Framework-Versions 从 spring 6.0 开始使用 jdk 17 进行编译 对应的相关 servlet 容器&#xff08;tomcat、undertow、jetty等&#xff09;的 servlet 规范转移到 eclipse&…

线程安全问题(3)--- wait(),notify()

前言 在多线程的环境下&#xff0c;我们常常要协调多个线程之间的执行顺序&#xff0c;而为了实现这一点&#xff0c;Java提供了一些方法来帮助我们完成这一点。 一&#xff0c;wait() 作用&#xff1a; 使当前线程进入等待状态 释放当前的锁 (即该方法必须和 synchrnized 关键…

【C++】泛型编程 | 函数模板 | 类模板

一、泛型编程 泛型编程是啥&#xff1f; 编写一种一般化的、可通用的算法出来&#xff0c;是代码复用的一种手段。 类似写一个模板出来&#xff0c;不同的情况&#xff0c;我们都可以往这个模板上去套。 举个例子&#xff1a; void Swap(int& a, int& b) {int tmp …