SpringAMQP

SpringAMQT

  • RabbitMQ
    • 安装与部署
    • RabbitMQ结构
    • 简单队列模型
  • SpringAMQP
    • 依赖引入
    • 配置RabbitMQ连接信息
    • 基本模型
      • 简单队列模型
      • WorkQueue模型
    • 发布订阅模型
      • FanoutExchange
      • DirectExchange
      • TopicExchange
    • 消息转换器

消息队列是实现异步通讯的一种方式,我们将从RabbitMQ为例开始介绍SpringAMQT。

RabbitMQ

RabbitMQ是基于Erlang语言开发的开源消息通信中间件。

安装与部署

由于RabbitMQ运行需要安装Erlang,为了方便部署,我们采用docker的方式来部署RabbitMq

首先拉取RabbitMQ的镜像,带有management的Tag的说明该镜像含有Web控制台

docker pull rabbitmq:3-management

执行下面的命令来运行RabbitMQ容器

docker run \
-e RABBITMQ_DEFAULT_USER=username \
-e RABBITMQ_DEFAULT_PASS=password \
--name mq \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3-management

username和password是进入RabbitMQ控制台时使用的账号密码,15672端口是控制台所占用的端口,5672是MQ服务所占用的端口。

RabbitMQ结构

  • channel: 操作MQ的工具
  • exchange: 路由消息到队列中
  • queue: 缓存消息
  • virtual host:虚拟主机,是对queue、exchange等资源的逻辑分组
    在这里插入图片描述

简单队列模型

消息发布者

@SpringBootTest
class PublisherApplicationTests {@Testvoid publisher() throws IOException, TimeoutException {//建立连接ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.111.135");factory.setPort(5672);factory.setUsername("username");factory.setPassword("password");factory.setVirtualHost("/");Connection connection=factory.newConnection();//创建通道Channel channel=connection.createChannel();//创建队列String queueName="simple.queue";channel.queueDeclare(queueName,false,false,false,null);//发布消息String message="hello,rabbitmq!";for (int i = 0; i < 100; i++) {channel.basicPublish("",queueName,null,message.getBytes());}channel.close();connection.close();}

消息消费者

public class Consumer {public static void main(String[] args) throws IOException, TimeoutException {//建立连接ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.111.135");factory.setPort(5672);factory.setUsername("username");factory.setPassword("password");factory.setVirtualHost("/");Connection connection=factory.newConnection();//建立通道Channel channel=connection.createChannel();//消费端也创建队列是为了防止消费端先启动找不到队列String queueName="simple.queue";channel.queueDeclare(queueName,false,false,false,null);//为通道绑定消费者channel.basicConsume(queueName,true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消息已被处理");}});}
}

其他的模型将在SpringAMPQ中进行介绍

SpringAMQP

AMQP (Adavance Message Queuing Protocol 高级消息队列协议)是用于在应用程序或之间传递业务消息的开放标准。该协议与语言和平台无关,更符合微服务中独立性的要求。

Spring AMQP 是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息。包含两个部分,其中spring-amqp是基础抽象,spring-rabbit是底层的默认实现

SpringAMQP的特点:

  • 提供监听容器用于异步处理入站消息
  • 提供RabbitTemplate用于发送和接收消息
  • 提供RabbitAdmin来自动声明队列,交换机和绑定

依赖引入

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置RabbitMQ连接信息

spring:rabbitmq:host: 192.168.111.135port: 5672username: usernamepassword: passwordvirtual-host: /

基本模型

简单队列模型

消息发布端

@SpringBootTest
class PublisherApplicationTests {@AutowiredRabbitTemplate rabbitTemplate;@Testvoid publisher(){String queueName="simple.queue";String message="hello,rabbitmq!";rabbitTemplate.convertSendAndReceive(queueName,message);}}

消息消费端

@Component
public class RabbitListenerTest {@RabbitListener(queues = "simple.queue")public void listenSimpleQueueMessage(String message){System.out.println(message);}
}

WorkQueue模型

模型特点:多个消费者绑定到一个队列,同一条消息只会被一个消费者处理。

在默认情况下,消费者会进行消息预取,预取的数量为无限大,这会导致性能不同的消费者处理相同数量的消息,可以通过设置prefetch来控制消费者预取的消息数量

spring:rabbitmq:listener:simple:prefetch: 1

发布订阅模型

发布订阅模式允许将同一消息发送给多个消费者。实现方式是加入了exchange(交换机)

常见的exchange类型有:

  • Fanout: 广播
  • Direct: 路由
  • Topic: 话题

exchange只负责消息路由而不进行存储,路由失败则消息丢失

发布订阅模型分三部

  1. 在consumer服务中声明Exchange、Queue、Binding
  2. 在consumer服务中声明多个消费者
  3. 在publisher服务发送消息到Exchange

FanoutExchange

声明Exchange、Queue、Binding。除了通过在配置类中通过@Bean注解绑定队列和交换机外还可以在@RabbitListener注解中绑定,第二种绑定方式将在下一部分使用

@Configuration
public class FanoutConfig {@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("simple.fanout");}@Beanpublic Queue fanoutQueue1(){return new Queue("fanout.queue1");}@Beanpublic Queue fanoutQueue2(){return new Queue("fanout.queue2");}@Beanpublic Binding bingingQueue1(Queue fanoutQueue1,FanoutExchange exchange){return BindingBuilder.bind(fanoutQueue1).to(exchange);}@Beanpublic Binding bingingQueue2(Queue fanoutQueue2,FanoutExchange exchange){return BindingBuilder.bind(fanoutQueue2).to(exchange);}
}

声明消费者与简单模型没什么差别,就不赘述了。

publisher端将消息发送给交换机有一点小区别

@SpringBootTest
class PublisherApplicationTests {@AutowiredRabbitTemplate rabbitTemplate;@Testvoid publisher(){String exchangeName="simple.fanout";String message="hello,rabbitmq!";//三个参数分别为交换机名、routingkey和消息rabbitTemplate.convertSendAndReceive(exchangeName,"",message);}}

DirectExchange

在上一种交换机中发送消息的第二个参数routingkey我们设置为了空,实际上,每一个Queue与Exchange间都可以设定多个bindingkey,通过routingkey参数,交换机将把消息路由到与其匹配的队列中。

@Component
public class RabbitListenerTest {@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "simple.queue1"),exchange = @Exchange(name = "simple.direct",type = ExchangeTypes.DIRECT),key = {"red","blue"}))public void listenSimpleQueueMessage(String message){System.out.println(message);}
}

TopicExchange

TopicExchange与DirectExchange类似,区别是:

  • TopicExchange的routingkey必须是多个单词的列表,并且以英文句号.分隔。
  • bindingkey支持使用通配符,#匹配零或多个单词,*匹配一个单词。

通过以下代码,你很容易可以发现它的特点

@SpringBootTest
class PublisherApplicationTests {@AutowiredRabbitTemplate rabbitTemplate;@Testvoid publisher(){String exchangeName="simple.fanout";String message="hello,rabbitmq!";rabbitTemplate.convertSendAndReceive(exchangeName,"china.weather",message);}}
@Component
public class RabbitListenerTest {@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "simple.queue1"),exchange = @Exchange(name = "simple.direct",type = ExchangeTypes.Topic),key = "china.*"))public void listenSimpleQueueMessage(String message){System.out.println(message);}
}

消息转换器

我们在使用RabbitTemplate的时候可以发现,它所发送的消息的类型为Object,这意味着它可以发送所有对象。默认它所使用的是jdk的序列化,这样的效率较低。

Spring的对消息对象的处理是由org.springframework.amqp.support.converter.MessageConverter来处理的,只需定义一个MessageConverter类型的Bean即可修改序列化方式。

以jackson的序列化为例

引入jackson的依赖

<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId><version>2.9.10</version>
</dependency>

定义MessageConverter类型的Bean

@Bean
public MessageConverter jsonMessageConverter(){return new Jackson2JsonMessageConverter();
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/144537.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

docker 部署mysql

Centos7为例 NAME"CentOS Linux" VERSION"7 (Core)" ID"centos" ID_LIKE"rhel fedora" VERSION_ID"7" PRETTY_NAME"CentOS Linux 7 (Core)" ANSI_COLOR"0;31" CPE_NAME"cpe:/o:centos:centos:7&qu…

Java File与IO流学习笔记

内存中存放的都是临时数据&#xff0c;但是在断电或者程序终止时都会丢失 而硬盘则可以长久存储数据&#xff0c;即使断电&#xff0c;程序终止&#xff0c;也不会丢失 File File是java.io.包下的类&#xff0c;File类的对象&#xff0c;用于代表当前操作系统的文件(可以是文…

C语言 ——宽字符

前言&#xff1a; 过去C语⾔并不适合⾮英语国家&#xff08;地区&#xff09;使⽤。 C语⾔最初假定字符都是单字节的。但是这些假定并不是在世界的任何地⽅都适⽤。 C语⾔字符默认是采⽤ASCII编码的&#xff0c;ASCII字符集采⽤的是单字节编码&#xff0c;且只使⽤了单字节中…

Linux高性能服务器编程——ch7笔记

第7章 Linux服务器程序规范 7.1 日志 Linux提供rsyslogd守护进程接收用户进程输出的日志和内核日志。 应用程序使用syslog函数与rsyslogd守护进程通信。 void syslog(int priority, const char* message, …); openlog函数&#xff1a;改变syslog的默认输出方式。 setlogm…

SD NAND对比TF卡优势(以CSNP4GCR01-AMW为例)

最近做的一个项目&#xff0c; 需要加大容量存储&#xff0c;这让我想到之前在做ARM的开发板使用的TF卡方案&#xff0c;但是TF卡需要携带卡槽的&#xff0c;但是有限的PCB板布局已经放不下卡槽的位置。 这个时候就需要那种能够不用卡槽&#xff0c;直接贴在板子上面&#xff0…

手把手教你使用Express框架在Node服务端实现图片渲染

手把手教你使用Express框架在Node服务端实现图片渲染 1.前言2.node-canvas库3.搭建node服务端环境3.1 初始化项目3.2 使用内置http模块创建服务3.3 使用Express创建服务 4.服务端渲染图片4.1 创建Express路由4.2 绘制三角形4.3 静态资源中间件4.4 写入图片文件4.5 渲染Echarts图…

使用 Data Assistant 快速创建测试数据集

使用 Data Assistant 快速创建测试数据集 Data Assistant 提供超过 100 种数据类型&#xff0c;为任何开发、测试或演示目的生成大量、异构、真实的数据。 官网地址&#xff1a; http://www.redisant.cn/da 主要功能 Windows 原生 Data Assistant 使用 Windows Native 技术…

SpringBoot集成Redisson操作Redis

目录 一、前言二、基础集成配置&#xff08;redis单节点&#xff09;2.1、POM2.2、添加配置文件2.3、添加启动类2.4、添加测试类测试redisson操作redis 一、前言 Redisson 是一个在 Redis 的基础上实现的 Java 驻内存数据网格&#xff0c;Redisson相比较与Jedis和Lettuce来说最…

表结构的操作【MySQL】

文章目录 创建表例子 查看表结构修改表新增列属性修改列属性修改列名修改表名删除列 删除表 阅读前导&#xff1a; 一般来说&#xff0c;对表的操作可以分为对表结构和对表内容的操作。 对表结构的操作&#xff0c;就是用数据定义语言 DDL 来创建、修改或删除表中的对象&#…

数字秒表设计仿真VHDL跑表,源码,视频

名称&#xff1a;简单秒表设计仿真VHDL跑表 软件&#xff1a;Quartus 语言&#xff1a;VHDL 代码功能&#xff1a; 数字秒表功能描述 本次练习只需要一个数码管(假设该数码管已被选中),实现数码管显示功能,具体要求如下(设数码管为共阳&#xff09; 1)实现秒表计时功能。…

在Word中,图片显示不全

在今天交作业的时候&#xff0c;发现了一个非常SB的事情&#xff0c;把图片复制过去显示不完全&#xff1a; 使用文心一言查看搜索了一下&#xff0c;发现可能是以下几种原因&#xff1a; 图片所在行的行高设置不正确。可以重新设置行高&#xff0c;具体步骤包括打开图片显示…

C++迭代器失效

在STL中&#xff0c;有些操作会导致迭代器失效&#xff0c;即之前获取的迭代器无法再安全地使用。这是因为这些操作可能会改变容器的结构&#xff0c;例如插入、删除元素等。 具体来说&#xff0c;以下情况下迭代器会失效&#xff1a; 1. 当插入或删除元素导致容器中的内存重新…