五种服务异步通信(MQ)-详解、代码案例

简介:本篇文章主要是介绍了常用的异步通信原理,主要是RabbitMQ技术

目录

1、初始MQ(异步通讯)

1.1 同步通讯

1.2 异步通讯

1.3 MQ常见框架

2、RabbitMQ快速入门

2.1 RabbitMQ概述和安装

2.2 常见消息模型

2.3 快速入门

3、SpringAMQP

3.1 什么是SpringAMQP

3.2 SimpleQueue案例

3.3 SpringAMQP(发布、订阅模式)

3.3.1 广播模式

3.3.2 路由模式代码演示

3.3.3 话题模式

4、SpringAMQP-消息转换器

5、总结


1、初始MQ(异步通讯)

1.1 同步通讯

图 1.1-1 同步通讯存在的问题
上图中展示的就是同步通讯的问题

1.2 异步通讯

图 1.2-1 异步通讯优缺点

异步通信的优点:

  • 耦合度地
  • 吞吐量提升
  • 故障隔离
  • 流量削峰

异步通信的缺点:

  • 依赖于Broker的可靠性、安全性、吞吐能力
  • 架构复杂了、业务没有明显的流程线、不好追踪管理
上图中展示的就是异步通信的优缺点

1.3 MQ常见框架

图 1.3-1  MQ产品
上图中展示的便是四款常见的MQ产品,他们之间的优势性能也有清晰地比对

2、RabbitMQ快速入门

2.1 RabbitMQ概述和安装

图 2.1-1 RabbitMQ安装
所需要的安装包、详细记录安装步骤的MD文件,因为内容过多,我放在网盘里面了
百度网盘地址:https://pan.baidu.com/s/1FZtWCWMl_QpZEIcGNnpwKA 
提取码:6666
图 2.1-2 RabbitMQ概述
上图中展示的便是RabbitMQ的内部流程、逻辑,即消息发送者发送消息后传递给交换机,交换机将其消息存储到queue队列中,等待消息接受者获取

2.2 常见消息模型

图 2.2-1 五种消息模型
上图中展示的就是常用的五种消息队列模型,其官网地址:RabbitMQ Tutorials | RabbitMQ

2.3 快速入门

package cn.itcast.mq.helloworld;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.Test;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class PublisherTest {@Testpublic void testSendMessage() throws IOException, TimeoutException {// 1.建立连接ConnectionFactory factory = new ConnectionFactory();// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码factory.setHost("192.168.150.101");factory.setPort(5672);factory.setVirtualHost("/");factory.setUsername("itcast");factory.setPassword("123321");// 1.2.建立连接Connection connection = factory.newConnection();// 2.创建通道ChannelChannel channel = connection.createChannel();// 3.创建队列String queueName = "simple.queue";channel.queueDeclare(queueName, false, false, false, null);// 4.发送消息String message = "hello, rabbitmq!";channel.basicPublish("", queueName, null, message.getBytes());System.out.println("发送消息成功:【" + message + "】");// 5.关闭通道和连接channel.close();connection.close();}
}
package cn.itcast.mq.helloworld;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class ConsumerTest {public static void main(String[] args) throws IOException, TimeoutException {// 1.建立连接ConnectionFactory factory = new ConnectionFactory();// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码factory.setHost("192.168.150.101");factory.setPort(5672);factory.setVirtualHost("/");factory.setUsername("itcast");factory.setPassword("123321");// 1.2.建立连接Connection connection = factory.newConnection();// 2.创建通道ChannelChannel channel = connection.createChannel();// 3.创建队列String queueName = "simple.queue";channel.queueDeclare(queueName, false, false, false, null);// 4.订阅消息channel.basicConsume(queueName, true, new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {// 5.处理消息String message = new String(body);System.out.println("接收到消息:【" + message + "】");}});System.out.println("等待接收消息。。。。");}
}

3、SpringAMQP

3.1 什么是SpringAMQP

图 3.1-1 SpringAMQP介绍
上图中展示的是关于SpringAMQP的消息发送和接收的标准

3.2 SimpleQueue案例

图 3.2-1 消息发送者
上图中展示的是消息发送者的代码案例:即配置连接信息、编写测试代码

3.3 SpringAMQP(发布、订阅模式)

图 3.3-1 发布、订阅模式
上图中展示的是三种通过路由器转发消息的模型,即广播模式、路由模式、话题模式
3.3.1 广播模式

1、消息发送者代码

package cn.itcast.mq.helloworld;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class PublisherTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testFanoutExchange(){// 交换机名称String exchangeName = "itcast.fanout";// 消息String message = "hello,everyone!";// 发送消息rabbitTemplate.convertAndSend(exchangeName, "", message);}
}

2、交换机、队列配置类代码

package cn.itcast.mq.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class FanoutConfig {// 1.声明广播交换机@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("itcast.fanout");}@Beanpublic Queue fanoutQueue1(){return new Queue("fanout.queue1");}@Beanpublic Queue fanoutQueue2(){return new Queue("fanout.queue2");}// 2.交换机绑定队列一@Beanpublic Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}// 3.交换机绑定队列二@Beanpublic Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}
}

3、消息接受者代码

package cn.itcast.mq.listener;import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.time.LocalTime;
@Component
public class SpringRabbitListener {/*    @RabbitListener(queues = "simple.queue")public void listenWorkQueue1(String msg) throws InterruptedException{System.out.println("消费者1接收到消息: 【" + msg + "】" + LocalTime.now());}*/@RabbitListener(queues = "fanout.queue1")public void listenFanoutQueue1(String msg){System.out.println("消费者1接收到消息: 【" + msg + "】" + LocalTime.now());}@RabbitListener(queues = "fanout.queue2")public void listenFanoutQueue2(String msg){System.out.println("消费者1接收到消息: 【" + msg + "】" + LocalTime.now());}
}
3.3.2 路由模式代码演示

1、消息发送者代码

package cn.itcast.mq.helloworld;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class PublisherTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testFanoutExchange(){// 交换机名称String exchangeName = "itcast.direct";// 消息String message = "hello,everyone!";// 发送消息rabbitTemplate.convertAndSend(exchangeName, "red", message);}
}

2、消息接受者代码

package cn.itcast.mq.listener;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.time.LocalTime;@Component
public class SpringRabbitListener {@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"), exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT), key = {"red", "blue"}))public void listenDirectQueue1(String msg){System.out.println("消费者接收到direct.queue1的消息: 【" + msg + "】");}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"), exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT), key = {"red", "yellow"}))public void listenDirectQueue2(String msg){System.out.println("消费者接收到direct.queue1的消息: 【" + msg + "】");}
}
3.3.3 话题模式

1、消息发送者代码

package cn.itcast.mq.helloworld;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class PublisherTest {@Autowiredprivate RabbitTemplate rabbitTemplate;
/*@Testpublic void testFanoutExchange(){// 交换机名称String exchangeName = "itcast.direct";// 消息String message = "hello,everyone!";// 发送消息rabbitTemplate.convertAndSend(exchangeName, "red", message);}*/@Testpublic void testTopicExchange(){// 交换机名称String exchangeName = "itcast.topic";// 消息String message = "hello,everyone!";// 发送消息rabbitTemplate.convertAndSend(exchangeName, "china.news", message);}
}

2、消息接受者代码

package cn.itcast.mq.listener;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.time.LocalTime;@Component
public class SpringRabbitListener {/*** 话题路由器*/@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue1"),exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),key = "china.#"))public void listenTopicQueue1(String msg){System.out.println("消费者接收到direct.queue1的消息: 【" + msg + "】");}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue2"),exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),key = "#.news"))public void listenTopicQueue2(String msg){System.out.println("消费者接收到direct.queue1的消息: 【" + msg + "】");}
/*    @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"), exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT), key = {"red", "blue"}))public void listenDirectQueue1(String msg){System.out.println("消费者接收到direct.queue1的消息: 【" + msg + "】");}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"), exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT), key = {"red", "yellow"}))public void listenDirectQueue2(String msg){System.out.println("消费者接收到direct.queue1的消息: 【" + msg + "】");}*/
}

4、SpringAMQP-消息转换器

图 4-1 SpringAMQP的作用
我们知道 RabbitTemplate 传递的参数中,消息对象是以字节数组传递的,经过序列化(默认是通过JDK实现的)后显示为正常的数据,但是如果传递的是Map,List集合这种数据,SpringCloud自带的序列化就会出现异常,为了解决这一问题,我们需要引入SpringAMQP-消息转换器
图 4-1 项目的总pom文件
在项目的总pom文件中添加相对应的依赖
图 4-3 消息发送端、接收端
在项目的消息发送端、接收端的启动类中创建Bean对象

5、总结

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/640717.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

解决“该扩展程序未列在 Chrome 网上应用店中,并可能是在您不知情的情况下添加的”的方法

一、问题 安装插件出现时“该扩展程序未列在 Chrome 网上应用店中,并可能是在您不知情的情况下添加的” 二、解决方法 1、把需要安装的第三方插件,后缀.crx 改成 .rar,然后解压,得到一个文件夹 2、再打开chrome://extensions/谷歌…

node.js如何实现留言板功能?

一、实现效果如下: 20240422_160404 二、前提配置: 配置:需要安装并且导入underscore模板引擎 安装:在控制台输入npm install underscore -save 文件目录配置: 1》在文件里建一个data文件夹,此文件夹下…

boss:整个卡尔曼滤波器的简单案例——估计机器人位置

⭐️ 卡尔曼滤波 卡尔曼滤波(Kalman Filtering)是一种用于状态估计的强大技术,常用于处理具有随机噪声的系统的状态估计问题。在目标跟踪等应用中,卡尔曼滤波常被用来预测目标的位置和速度等状态变量,并根据观测数据进…

Go并发安全,锁和原子操作

一. 并发安全 有时候在Go代码中可能存在多个goroutine同时操作一个资源(临界区),这种情况会发生竞态问题(数据竞态)。 1.1 互斥锁 互斥锁是一种常见的控制共享资源访问的方法,它能够保证同时只有一个goroutine可以访问共享资源。Go语言中使用sync包的Mut…

uboot大致流程总结

文章目录 一、uboot介绍二、uboot的配置编译过程2.1 make xxx_defconfig2.2 make 一、uboot介绍 uboot是一个bootloader,用于在嵌入式设备中引导linux内核启动,在嵌入式设备中常见的组织结构如下: 芯片内部固化代码 -> bootloader -> …

40+ Node.js 常见面试问题 [2024]

今天就开始你的Node.js生涯。在这里,我们探讨了最佳Node.js面试问题和答案,以帮助应届生和经验丰富的候选人获得理想的工作。 Node.js 是许多大公司技术堆栈的重要组成部分,例如 PayPal、Trello、沃尔玛和 NASA。 根据 ZipRecruiter 的数据&…

算法练习|Leetcode49字母异位词分词 ,Leetcode128最长连续序列,Leetcode3无重复字符的最长子串,sql总结

目录 一、Leetcode49字母异位词分词题目描述解题思路方法:哈希总结 二、Leetcode128最长连续序列题目描述解题思路方法:总结 三、Leetcode3无重复字符的最长子串题目描述解题思路方法:双指针法总结sql总结 一、Leetcode49字母异位词分词 题目描述 给你一个字符串数组&#xf…

模板初阶

泛型编程: 泛型编程:编写与类型无关的通用代码,模板是泛型编程的基础 class Test { public:void Swap(int& left, int& right){int tmp left;left right;right tmp;}void Swap(double& left, double& right){double tmp…

AR HUD_VSLAM+显示技术

智能座舱的一个重要技术方向是表达与展示。HUD可以将驾驶相关的信息,如车速、导航等投射到驾驶员的视线上方,避免驾驶员的目光离开前方道路。这种显示方式可以提供关键信息的实时展示,减少驾驶员的分心。 HUD的技术原理就是通过光学系统将信息…

网络工程师----第十一天

OSPF: 对称加密算法: 也称为私钥加密或单密钥算法,是一种加密方式,其中加密和解密使用相同的密钥。这种算法的优点包括加密解密速度快、计算量小,适用于大量数据的加密。然而,它的缺点是密钥的安全性难以保…

入坑 Node.js 1

原文:https://blog.iyatt.com/?p14717 前言 前面刚刚对 Spring Boot 有了个概念,再来学学 Node.js,顺便当学 JavaScript,为后面入前端做准备。 环境 Node.js 20.12.2 官方 API 文档:https://nodejs.org/docs/lat…

前端CSS基础6(CSS列表与表格的相关属性,边框的样式调整)

前端CSS基础6(CSS列表与表格的相关属性,边框的样式调整) CSS列表相关属性CSS表格相关属性回忆表格边框相关属性单元格边框相关属性回忆单元格的跨行和跨列操作单元格边框的相关属性 CSS列表相关属性 在 CSS 中,列表(L…