java中使用rabbitmq

文章目录

  • 前言
  • 一、引入和配置
    • 1.引入
    • 2.配置
  • 二、使用
    • 1.队列
    • 2.发布/订阅
      • 2.1 fanout(广播)
      • 2.2 direct(Routing/路由)
      • 2.3 Topics(主题)
      • 2.4 Headers
  • 总结


前言

mq常用于业务解耦、流量削峰和异步通信,rabbitmq是使用范围较广,比较稳定的一款开源产品,接下来我们使用springboot的starter来引入rabbitmq,了解mq的几种使用模式,通过几个简单的案例,让你可以快速地了解到该使用哪种模式来对应业务场景,使用rabbitmq看这一篇就够了,下方附安装链接。


一、引入和配置

1.引入

Spring AMQP高级消息队列协议有两部分组成,spring-amqp是基础抽象,spring-rabbit是RabbitMQ实现。

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

在这里插入图片描述

2.配置

配置参考RabbitProperties.java

spring:rabbitmq:host: 192.168.137.192port: 5672username: guestpassword: guestvirtualHost: /

二、使用

1.队列

在这里插入图片描述
RabbitConfiguration

package com.student.rabbit.queue;import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Bean;
import org.springframework.amqp.core.Queue;
/*** Create by zjg on 2024/3/9*/
@Configuration
public class RabbitConfiguration {protected final String queueName = "queue";@Beanpublic Queue queue() {return new Queue(this.queueName);}
}

Producer

package rabbit.queue;import com.student.SpringbootStart;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;import java.util.concurrent.atomic.AtomicInteger;/*** Create by zjg on 2024/3/9*/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = SpringbootStart.class)
public class Producer {@Autowiredprivate RabbitTemplate template;@Autowiredprivate Queue queue;AtomicInteger count = new AtomicInteger(0);@Testpublic void send() {for (int i = 0; i < 10; i++) {StringBuilder builder = new StringBuilder("Hello");builder.append(" "+count.incrementAndGet());String message = builder.toString();template.convertAndSend(queue.getName(), message);System.out.println(" [x] Sent '" + message + "'");}}
}

Consumer

package com.student.rabbit.queue;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** Create by zjg on 2024/3/9*/
@Component
public class Consumer {private static final Logger log = LoggerFactory.getLogger(Consumer.class);protected final String queueName = "queue";@RabbitListener(queues = queueName)public void receive1(String message){log.debug("receive1:"+message);}@RabbitListener(queues = queueName)public void receive2(String message){log.debug("receive2:"+message);}
}

每个队列都消费了5条消息
在这里插入图片描述

2.发布/订阅

交换机类型有fanout,direct, topic, headers四种,接下来我们来学习每种方式的使用以及它们的区别。

2.1 fanout(广播)

P(生产者)产生消息给到X(交换机),X分发给绑定的所有队列。

在这里插入图片描述

RabbitFanoutConfiguration
我们定义了AnonymousQueue,它创建了一个具有生成名称的非持久、独占、自动删除队列

package com.student.rabbit.fanout;import org.springframework.amqp.core.AnonymousQueue;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** Create by zjg on 2024/3/10*/
@Configuration
public class RabbitFanoutConfiguration {@Beanpublic FanoutExchange fanout() {return new FanoutExchange("sys.fanout");}private static class ReceiverConfig {@Beanpublic Queue fanoutQueue1() {return new AnonymousQueue();}@Beanpublic Queue fanoutQueue2() {return new AnonymousQueue();}@Beanpublic Binding bindingFanout1(FanoutExchange fanout,Queue fanoutQueue1) {return BindingBuilder.bind(fanoutQueue1).to(fanout);}@Beanpublic Binding bindingFanout2(FanoutExchange fanout,Queue fanoutQueue2) {return BindingBuilder.bind(fanoutQueue2).to(fanout);}}
}

FanoutProducer

package rabbit.fanout;import com.student.SpringbootStart;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.concurrent.atomic.AtomicInteger;/*** Create by zjg on 2024/3/10*/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = SpringbootStart.class)
public class FanoutProducer {@Autowiredprivate RabbitTemplate template;@Autowiredprivate FanoutExchange fanout;@Testpublic void send() {AtomicInteger count = new AtomicInteger(0);for (int i = 0; i < 10; i++) {StringBuilder builder = new StringBuilder("Hello");builder.append(" "+count.incrementAndGet());String message = builder.toString();template.convertAndSend(fanout.getName(), "", message);System.out.println(" [x] Sent '" + message + "'");}}
}

FanoutConsumer

package com.student.rabbit.fanout;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** Create by zjg on 2024/3/10*/
@Component
public class FanoutConsumer {private static final Logger log = LoggerFactory.getLogger(FanoutConsumer.class);@RabbitListener(queues = "#{fanoutQueue1.name}")public void receive1(String message){log.debug("receive1:"+message);}@RabbitListener(queues = "#{fanoutQueue2.name}")public void receive2(String message){log.debug("receive2:"+message);}
}

总共发送10条消息,每个队列都消费了10条
在这里插入图片描述

2.2 direct(Routing/路由)

可以将根据不同的路由规则分发消息,很灵活,消费者需要哪种就订阅哪种消息。

在这里插入图片描述
在这里插入图片描述
RabbitDirectConfiguration

package com.student.rabbit.direct;import org.springframework.amqp.core.AnonymousQueue;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** Create by zjg on 2024/3/10*/
@Configuration
public class RabbitDirectConfiguration {@Beanpublic DirectExchange direct() {return new DirectExchange("sys.direct");}private static class ReceiverConfig {@Beanpublic Queue directQueue1() {return new AnonymousQueue();}@Beanpublic Queue directQueue2() {return new AnonymousQueue();}@Beanpublic Binding bindingDirect1a(DirectExchange direct,Queue directQueue1) {return BindingBuilder.bind(directQueue1).to(direct).with("orange");}@Beanpublic Binding bindingDirect1b(DirectExchange direct,Queue directQueue1) {return BindingBuilder.bind(directQueue1).to(direct).with("black");}@Beanpublic Binding bindingDirect2a(DirectExchange direct,Queue directQueue2) {return BindingBuilder.bind(directQueue2).to(direct).with("green");}@Beanpublic Binding bindingDirect2b(DirectExchange direct,Queue directQueue2) {return BindingBuilder.bind(directQueue2).to(direct).with("black");}}
}

DirectProducer

package rabbit.direct;import com.student.SpringbootStart;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.concurrent.atomic.AtomicInteger;/*** Create by zjg on 2024/3/10*/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = SpringbootStart.class)
public class DirectProducer {@Autowiredprivate RabbitTemplate template;@Autowiredprivate DirectExchange direct;private final String[] keys = {"orange", "black", "green"};@Testpublic void send() {AtomicInteger count = new AtomicInteger(0);for (int i = 0; i < keys.length; i++) {StringBuilder builder = new StringBuilder("Hello to ");String key = keys[count.getAndIncrement()];builder.append(" "+key);String message = builder.toString();template.convertAndSend(direct.getName(), key, message);System.out.println(" [x] Sent '" + message + "'");}}
}

DirectConsumer

package com.student.rabbit.direct;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** Create by zjg on 2024/3/10*/
@Component
public class DirectConsumer {private static final Logger log = LoggerFactory.getLogger(DirectConsumer.class);@RabbitListener(queues = "#{directQueue1.name}")public void receive1(String message){log.debug("receive1:"+message);}@RabbitListener(queues = "#{directQueue2.name}")public void receive2(String message){log.debug("receive2:"+message);}
}

共发送了3条消息,有两个队列都绑定了black,所以black的消息消费2次
在这里插入图片描述

2.3 Topics(主题)

主题模式在路由的基础上增加了routingKey的模糊匹配。
*(星)可以代替一个词。
#(hash)可以代替零个或多个单词。

在这里插入图片描述
RabbitTopicConfiguration

package com.student.rabbit.topic;import org.springframework.amqp.core.AnonymousQueue;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** Create by zjg on 2024/3/10*/
@Configuration
public class RabbitTopicConfiguration {@Beanpublic TopicExchange topic() {return new TopicExchange("sys.topic");}private static class ReceiverConfig {@Beanpublic Queue topicQueue1() {return new AnonymousQueue();}@Beanpublic Queue topicQueue2() {return new AnonymousQueue();}@Beanpublic Binding bindingTopic1a(TopicExchange topic,Queue topicQueue1) {return BindingBuilder.bind(topicQueue1).to(topic).with("*.orange.*");}@Beanpublic Binding bindingTopic1b(TopicExchange topic,Queue topicQueue1) {return BindingBuilder.bind(topicQueue1).to(topic).with("*.*.rabbit");}@Beanpublic Binding bindingTopic2a(TopicExchange topic,Queue topicQueue2) {return BindingBuilder.bind(topicQueue2).to(topic).with("lazy.#");}@Beanpublic Binding bindingTopic2b(TopicExchange topic,Queue topicQueue2) {return BindingBuilder.bind(topicQueue2).to(topic).with("quick.brown.*");}}
}

TopicProducer

package rabbit.topic;import com.student.SpringbootStart;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.concurrent.atomic.AtomicInteger;/*** Create by zjg on 2024/3/10*/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = SpringbootStart.class)
public class TopicProducer {@Autowiredprivate RabbitTemplate template;@Autowiredprivate TopicExchange topic;private final String[] keys = {"quick.orange.rabbit", "lazy.orange.elephant", "quick.orange.fox","lazy.brown.fox", "lazy.pink.rabbit", "quick.brown.fox"};@Testpublic void send() {AtomicInteger count = new AtomicInteger(0);for (int i = 0; i < keys.length; i++) {StringBuilder builder = new StringBuilder("Hello to ");String key = keys[count.getAndIncrement()];builder.append(" "+key);String message = builder.toString();template.convertAndSend(topic.getName(), key, message);System.out.println(" [x] Sent '" + message + "'");}}
}

TopicConsumer

package com.student.rabbit.topic;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** Create by zjg on 2024/3/10*/
@Component
public class TopicConsumer {private static final Logger log = LoggerFactory.getLogger(TopicConsumer.class);@RabbitListener(queues = "#{topicQueue1.name}")public void receive1(String message){log.debug("receive1:"+message);}@RabbitListener(queues = "#{topicQueue2.name}")public void receive2(String message){log.debug("receive2:"+message);}
}

队列1匹配了中间值为orange和rabbit结尾的消息,队列2匹配了lazy开头和quick.brown开头的消息
在这里插入图片描述

2.4 Headers

关于headers模式,在官方没有找到文档,但包里还有,索性还是写一下吧。

RabbitHeadersConfiguration

package com.student.rabbit.headers;import org.springframework.amqp.core.AnonymousQueue;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.HeadersExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;/*** Create by zjg on 2024/3/10*/
@Configuration
public class RabbitHeadersConfiguration {@Beanpublic HeadersExchange headers() {return new HeadersExchange("sys.headers");}private static class ReceiverConfig {@Beanpublic Queue headersQueue1() {return new AnonymousQueue();}@Beanpublic Queue headersQueue2() {return new AnonymousQueue();}@Beanpublic Queue headersQueue3() {return new AnonymousQueue();}@Beanpublic Binding bindingHeaders1(HeadersExchange headers,Queue headersQueue1) {Map<String,Object> headerValue=new HashMap<>();headerValue.put("user","sys");return BindingBuilder.bind(headersQueue1).to(headers).whereAll(headerValue).match();}@Beanpublic Binding bindingHeaders2(HeadersExchange headers,Queue headersQueue2) {Map<String,Object> headerValue=new HashMap<>();headerValue.put("user","admin");return BindingBuilder.bind(headersQueue2).to(headers).whereAll(headerValue).match();}@Beanpublic Binding bindingHeaders3(HeadersExchange headers,Queue headersQueue3) {return BindingBuilder.bind(headersQueue3).to(headers).where("user").exists();}}
}

HeadersProducer

package rabbit.headers;import com.student.SpringbootStart;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.HeadersExchange;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.concurrent.atomic.AtomicInteger;/*** Create by zjg on 2024/3/10*/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = SpringbootStart.class)
public class HeadersProducer {@Autowiredprivate RabbitTemplate template;@Autowiredprivate HeadersExchange headers;private final String[] keys = {"sys", "admin"};@Testpublic void send() {AtomicInteger count = new AtomicInteger(0);for (int i = 0; i < keys.length; i++) {StringBuilder builder = new StringBuilder("Hello to ");String key = keys[count.getAndIncrement()];builder.append(" "+key);MessageProperties messageProperties=new MessageProperties();messageProperties.setHeader("user",key);Message message = MessageBuilder.withBody(builder.toString().getBytes()).andProperties(messageProperties).build();template.send(headers.getName(), "", message);System.out.println(" [x] Sent '" + message + "'");}}
}

HeadersConsumer

package com.student.rabbit.headers;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** Create by zjg on 2024/3/10*/
@Component
public class HeadersConsumer {private static final Logger log = LoggerFactory.getLogger(HeadersConsumer.class);@RabbitListener(queues = "#{headersQueue1.name}")public void receive1(Message message){log.debug("receive1:"+new String(message.getBody()));}@RabbitListener(queues = "#{headersQueue2.name}")public void receive2(Message message){log.debug("receive2:"+new String(message.getBody()));}@RabbitListener(queues = "#{headersQueue3.name}")public void receive3(Message message){log.debug("receive3:"+new String(message.getBody()));}
}

第一个队列接收sys消息,第二个队列接收admin消息,第三个队列只要包含user头的消息都接收。
在这里插入图片描述


总结

回到顶部
安装看这里
官方文档
官方网站
其他项目,可参考官方案例
路漫漫其修远兮,吾将上下而求索。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/527195.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

保姆级认识AVL树【C++】(三种insert情况 || 四种旋转方法)

目录 前言 一&#xff0c;AVL概念 二&#xff0c;基础框架 三&#xff0c;insert 1. 插入三种情况 2. 四种旋转方法 法一&#xff1a;左单旋法 法二&#xff1a;右单旋法 法三&#xff1a;先左后右双旋法 法四&#xff1a;先右后左双旋法 测试&#xff08;判断一棵树…

修改简化docker命令

修改|简化docker命令 使用命令打开 .bashrc 文件&#xff1a; vim ~/.bashrc在文件中添加类似以下行来创建别名&#xff1a; # 查看所有容器 alias disdocker images # 查看运行容器 alias dpsdocker ps # 查看所有容器 alias dpsadocker ps -a # 停止容器 alias dsdocker s…

基于智慧灯杆的智慧城市解决方案(2)

功能规划 智慧照明功能 智慧路灯的基本功能仍然是道路照明, 因此对照明功能的智慧化提升是最基本的一项要求。 对道路照明管理进行智慧化提升, 实施智慧照明, 必然将成为智慧城市中道路照明发展的主要方向之一。 智慧照明是集计算机网络技术、 通信技术、 控制技术、 数据…

Data Concerns Modeling Concerns

How was the data you are using collected? What assumptions is your model making by learning from this dataset? Is this dataset representative enough to produce a useful model? How could the results of your work be misused? What is the intended use and …

第15章——西瓜书规则学习

1.序贯覆盖 序贯覆盖是一种在规则学习中常用的策略&#xff0c;它通过逐步构建规则集来覆盖训练数据中的样本。该策略采用迭代的方式&#xff0c;每次从训练数据中选择一部分未被覆盖的样本&#xff0c;学习一条能够覆盖这些样本的规则&#xff0c;然后将这条规则加入到规则集中…

【Python】成功解决ModuleNotFoundError: No module named ‘matplotlib‘

【Python】成功解决ModuleNotFoundError: No module named ‘matplotlib’ &#x1f308; 个人主页&#xff1a;高斯小哥 &#x1f525; 高质量专栏&#xff1a;Matplotlib之旅&#xff1a;零基础精通数据可视化、Python基础【高质量合集】、PyTorch零基础入门教程&#x1f448…

Linux系统安装及简单操作

目录 一、Linux系统安装 二、Linux系统启动 三、Linux系统本地登录 四、Linux系统操作方式 五、Linux的七种运行级别&#xff08;runlevel&#xff09; 六、shell 七、命令 一、Linux系统安装 场景1&#xff1a;直接通过光盘安装到硬件上&#xff08;方法和Windows安装…

基于springboot实现摄影网站系统项目【项目源码】

基于springboot实现摄影网站系统演示 摘要 随着时代的进步&#xff0c;社会生产力高速发展&#xff0c;新技术层出不穷信息量急剧膨胀&#xff0c;整个社会已成为信息化的社会人们对信息和数据的利用和处理已经进入自动化、网络化和社会化的阶段。如在查找情报资料、处理银行账…

虚拟化

什么是虚拟化 虚拟化&#xff08;Virtualization&#xff09;是一种资源分配和管理技术&#xff0c;是将计算机的各种实体资源,比如CPU、内存、磁盘空间、网络适配器等&#xff0c;进行抽象转换后虚拟的设备,可以实现灵活地分割、组合为一个或多个计算机配置环境&#xff0c;并…

el-form-item内的el-select如何自适应宽度

最近在使用element-ui做后台管理的时候&#xff0c;有个需求是在弹窗组件里面&#xff0c;添加一个el-select下拉框选项&#xff0c;但是给el-select设置的宽度无法自适应&#xff0c;原因很简单&#xff0c;我们不需要设置固定宽度&#xff0c;设置百分比就行了&#xff0c;让…

CURE-Net: A Cascaded Deep Network for Underwater Image Enhancement

文章目录 论文结构 及 读论文的方法总结论文理解看图AbstractIntroductionRELATED WORKPROPOSED METHODA Philosophy of Model DesignB Framework of CURE-NetC Proposed GESNet and ORSNetD Proposed DEB and SRBE Loss Function Experiment And ResultA Implementation Detai…

Python算法题集_在排序数组中查找元素的第一个和最后一个位置

Python算法题集_在排序数组中查找元素的第一个和最后一个位置 题34&#xff1a;在排序数组中查找元素的第一个和最后一个位置1. 示例说明2. 题目解析- 题意分解- 优化思路- 测量工具 3. 代码展开1) 标准求解【二分法两次左边界】2) 改进版一【二分法左右边界】3) 改进版二【第三…