Spring Boot集成RabbitMQ-之6大模式总结

A.集成
一:添加依赖
在pom.xml文件中添加spring-boot-starter-amqp依赖,以便使用Spring Boot提供的RabbitMQ支持:

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

二:配置RabbitMQ连接信息

  rabbitmq:host: 13X.9.1XX.7Xport: 5672 #通过控制台可以查看    记得开启这个端口的防护username: adminpassword: admin

三:创建队列

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {@Beanpublic Queue queue() {//name,名字;durable,是否开启持久化return new Queue("logs",false);}
}

启动就可以得到下队列
在这里插入图片描述

四:创建控制类来生产数据


import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
public class RabbitMQController {private static final Logger logger = LoggerFactory.getLogger(RabbitMQController.class);@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("aaa")public void simpleTest() {logger.info("RabbitMQController开始!");rabbitTemplate.convertAndSend("logs","hello world!");logger.info("RabbitMQController结束!");}
}

因为只创建了生产,消费者没有创建,所以在RabbitMQ客户端可以查看,然后点击,消费可得数据
在这里插入图片描述

五:创建消费者,获取数据

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class ConsumeBean {private static final Logger logger = LoggerFactory.getLogger(ConsumeBean.class);@RabbitListener(queues={"logs"})public void getMsg(String message){logger.info("消费者:{}",message);}
}

这样就可以看出,消息自动就被接收,消费掉了
在这里插入图片描述

B.消息传递的开放标准协议(AMQP)

AMQP(Advanced Message Queuing Protocol)它定义了一种抽象的消息传递模型,包括以下几个主要组件:

消息(Message):AMQP中的基本单位,是要在消息队列系统中传递的数据。消息通常包括消息体和消息头,消息体是实际要传递的数据,而消息头包含元数据信息,如消息的路由键、优先级等。

生产者(Producer):负责创建并发送消息到消息队列中的实体。生产者将消息发布到交换机(Exchange),交换机根据路由规则将消息路由到一个或多个队列中。

消费者(Consumer):从消息队列中接收并处理消息的实体。消费者订阅一个或多个队列,并在有消息到达时接收并处理它们。

交换机(Exchange):用于接收生产者发送的消息,并根据路由规则将消息路由到一个或多个队列中。AMQP定义了不同类型的交换机,如直连交换机(Direct Exchange)、主题交换机(Topic Exchange)、扇出交换机(Fanout Exchange)等。

队列(Queue):存储消息的容器,消费者从队列中获取消息进行处理。消息可以被一个或多个消费者订阅,但每条消息只会被一个消费者接收。

绑定(Binding):用于将交换机和队列之间建立关联关系的规则。绑定定义了消息如何从交换机路由到队列,通常包括交换机名称、路由键等信息。

连接(Connection):生产者和消费者与消息代理(如RabbitMQ)之间建立的网络连接。连接是长期的、持久的,用于传输消息和管理通信。

通过这些抽象组件,AMQP定义了一个灵活且可扩展的消息传递模型,使得不同的消息队列系统可以遵循相同的协议进行通信和交互。这种抽象模型使得开发者可以更容易地实现消息传递系统,并实现消息的可靠传递和处理

六大模式
1.简单队列 一个生产者一个队列一个消费者
2.工作队列 一个生产者一个队列多个消费者
3.订阅模式 一个生产者一个交换机 多个队列多个消费者(对与消一对一)
4.路由模式 一个生产者一个交换机 分类进入队列 多个队列多个消费者(对与消一对一)
5.主题模式(通配符模式) 一个生产者一个交换机 通配符分类进入队列 多个队列多个消费者(对与消一对一)
6.RPC 是一种实现远程过程调用的方式,允许客户端应用程序调用远程服务器上的服务,并等待服务端返回结果。

1.简单队列

创建生产者(Producer):
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class MessageProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendMessage(String message) {rabbitTemplate.convertAndSend("queueName", message);}
}
//创建消费者
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class MessageConsumer {@RabbitListener(queues = "queueName")public void receiveMessage(String message) {System.out.println("Received message: " + message);}
}//队列配置
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitConfig {@Beanpublic Queue queue1() {return new Queue("queueName");}
}

2.工作队列

//队列配置
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfig2{@Beanpublic Queue taskQueue() {return new Queue("taskQueue");}
}
//创建生产者
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class TaskProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendTask(String task) {rabbitTemplate.convertAndSend("taskQueue", task);}
}//创建消费者
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class TaskConsumer {@RabbitListener(queues = "taskQueue")public void processTask(String task) {System.out.println("Processing task: " + task);// Simulate task processingtry {Thread.sleep(1000); // Simulate task processing time} catch (InterruptedException e) {Thread.currentThread().interrupt();}System.out.println("Task processed: " + task);}
}

3.订阅模式

//创建生产者(Producer)
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Component
public class MessageProducer3 {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendMessage(String message) {rabbitTemplate.convertAndSend("fanoutExchange", "", message);}
}//创建消费者(Consumer)
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class MessageConsumerA {@RabbitListener(queues = "queueFanout1")public void receiveMessage(String message) {System.out.println("Consumer 1 received message: " + message);}
}
@Component
public class MessageConsumerB {@RabbitListener(queues = "queueFanout2")public void receiveMessage(String message) {System.out.println("Consumer 2 received message: " + message);}
}//配置交换机和队列
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitConfig3 {@Beanpublic FanoutExchange fanoutExchange() {return new FanoutExchange("fanoutExchange");}@Beanpublic Queue queueFanout1() {return new Queue("queueFanout1");}@Beanpublic Queue queueFanout2() {return new Queue("queueFanout2");}@Beanpublic Binding binding1() {return BindingBuilder.bind(queueFanout1()).to(fanoutExchange());}@Beanpublic Binding binding2() {return BindingBuilder.bind(queueFanout2()).to(fanoutExchange());}
}

4.路由模式

//创建生产者(Producer)
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Component
public class MessageProducer4 {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendMessage(String message, String routingKey) {rabbitTemplate.convertAndSend("directExchange", routingKey, message);}
}//创建消费者(Consumer)
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class MessageConsumerly1 {@RabbitListener(queues = "queueDirect1")public void receiveMessage(String message) {System.out.println("Consumer 1 received message: " + message);}
}
@Component
public class MessageConsumerly2 {@RabbitListener(queues = "queueDirect2")public void receiveMessage(String message) {System.out.println("Consumer 2 received message: " + message);}
}//配置交换机和队列import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitConfig4 {@Beanpublic DirectExchange directExchange() {return new DirectExchange("directExchange");}@Beanpublic Queue queueDirect1() {return new Queue("queueDirect1");}@Beanpublic Queue queueDirect2() {return new Queue("queueDirect2");}@Beanpublic Binding bindingDirect1() {return BindingBuilder.bind(queueDirect1()).to(directExchange()).with("routingDirectKey1");}@Beanpublic Binding bindingDirect2() {return BindingBuilder.bind(queueDirect2()).to(directExchange()).with("routingDirectKey2");}
}

5.主题模式

//创建生产者(Producer)
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Component
public class MessageProducer5 {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendMessage(String message, String routingKey) {rabbitTemplate.convertAndSend("topicExchange", routingKey, message);}
}//创建消费者(Consumer)
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class MessageConsumer5 {@RabbitListener(queues = "queueTopic5")public void receiveMessage(String message) {System.out.println("Received message: " + message);}
}//配置交换机和队列import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitConfig5 {@Beanpublic TopicExchange topicExchange() {return new TopicExchange("topicExchange");}@Beanpublic Queue queueTopic5() {return new Queue("queueTopic5");}@Beanpublic Binding bindingTopic5() {return BindingBuilder.bind(queueTopic5()).to(topicExchange()).with("topic.*");}
}

6.RPC模式

//创建RPC客户端(Client)
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Component
public class RpcClient {@Autowiredprivate RabbitTemplate rabbitTemplate;public String sendMessageAndReceiveResponse(String message) {return (String) rabbitTemplate.convertSendAndReceive("rpcExchange", "rpcQueue", message);}
}//创建RPC服务端(Server)
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class RpcServer {@RabbitListener(queues = "rpcQueue")public String processMessage(String message) {// Perform some processing based on the messagereturn "Processed: " + message;}
}//配置交换机和队列
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitConfig6 {@Beanpublic DirectExchange rpcExchange() {return new DirectExchange("rpcExchange");}@Beanpublic Queue rpcQueue() {return new Queue("rpcQueue");}@Beanpublic Binding rpcBinding() {return BindingBuilder.bind(rpcQueue()).to(rpcExchange()).with("rpcQueue");}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/674416.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

CANdela/Diva系列2--CANdela Studio的工作树介绍1

本系列的第一篇文章&#xff08;CANdela/Diva系列1--CANdela Studio的基本介绍&#xff09;主要介绍了CANdela这个工具&#xff0c;本篇文章将对CANdela Studio的工作树的每个模块进行详细介绍&#xff0c;不啰嗦&#xff0c;直接开始&#xff01; 目录 1. ECU Information的…

看完这篇文章我奶奶都懂Opentracing了 (二)

二. 概念分析 1. Span和SpanContext 结合上述示例&#xff0c;我们从Span开始入手来进行概念分析&#xff0c;但是说在最前面&#xff0c;Span在不同的分布式链路实现中&#xff0c;其定义是不全一样的&#xff0c;尽管Opentracing已经进行了概念的统一&#xff0c;但是具体到…

leetcode17. 电话号码的字母组合

题目描述&#xff1a; 给定一个仅包含数字 2-9 的字符串&#xff0c;返回所有它能表示的字母组合。答案可以按 任意顺序 返回。给出数字到字母的映射如下&#xff08;与电话按键相同&#xff09;。注意 1 不对应任何字母。 示例 1&#xff1a; 输入&#xff1a;digits "…

八.吊打面试官系列-Tomcat优化-深入源码剖析Tomcat如何打破双亲委派

前言 上篇文章《Tomcat优化-深入Tomcat底层原理》我们从宏观上分析了一下Tomcat的顶层架构以及核心组件的执行流程。本篇文章我们从源码角度来分析Tomcat的类加载机制&#xff0c;且看它是如何打破JVM的ClassLoader双亲委派的 Tomcat ClassLoader 初始化 Tomcat的启动类是在…

基于ambari hdp的kafka用户授权读写权限

基于ambari hdp的kafka用户授权读写权限 版本Kafka 2.0.0添加自定义配置修改admin密码重启kafka授权读取授权写入有效通配符部分举例 版本Kafka 2.0.0 添加自定义配置 authorizer.class.name kafka.security.auth.SimpleAclAuthorizer super.users User:admin allow.everyo…

【LLM】AMD GPU上实现高性能LLM推理

【LLM】AMD GPU上实现高性能LLM推理 参考链接1&#xff1a;https://zhuanlan.zhihu.com/p/649088095 参考链接2&#xff1a;https://github.com/mlc-ai/mlc-llm 家里有台游戏机&#xff0c;配置相对训练大模型要求的资源较低&#xff0c;而且是AMD显卡&#xff0c;拿来玩玩推理…

DeepSeek发布全新开源大模型,GPT-4级别能力 价格仅百分之一

最新国产开源MoE大模型&#xff0c;刚刚亮相就火了。 DeepSeek-V2性能达GPT-4级别&#xff0c;但开源、可免费商用、API价格仅为GPT-4-Turbo的百分之一。 因此一经发布&#xff0c;立马引发不小讨论。 从公布的性能指标来看&#xff0c;DeepSeek-V2的中文综合能力超越一众开源…

python - rst file to html

文章目录 python - rst file to html概述笔记下载安装PyCharm最新的学习版新建虚拟环境为Conda的工程添加docutils库新建python文件&#xff0c;添加转换代码运行自己写的python文件&#xff0c;执行转换转换结果END python - rst file to html 概述 开源工程中有一个.rst文件…

linux或ubuntu环境下需要自行安装vivado USB Program下载程序驱动

如果在linux或ubuntu环境下&#xff0c;不安装驱动是无法下载FPGA程序的。在linux或ubuntu环境下安装程序不要自动安装。 johnjohn-wang:~/vitis2021.2/Vivado/2021.2/data/xicom/cable_drivers/lin64/install_script/install_drivers$ sudo ./install_drivers

【负载均衡在线OJ项目日记】编译与日志功能开发

目录 日志功能开发 常见的日志等级 日志功能代码 编译功能开发 创建子进程和程序替换 重定向 编译功能代码 日志功能开发 日志在软件开发和运维中起着至关重要的作用&#xff0c;目前我们不谈运维只谈软件开发&#xff1b;日志最大的作用就是用于故障排查和调试&#x…

如何从Windows 10电脑远程登录Ubuntu系统

要从Windows 10电脑远程登录Ubuntu系统&#xff0c;您可以使用以下步骤&#xff1a; 在Ubuntu上安装xRDP: 首先&#xff0c;在Ubuntu电脑上打开终端&#xff0c;然后输入以下命令来安装xRDP服务&#xff1a; sudo apt update sudo apt install xrdpxRDP是一个开源的远程桌面协议…

鸿蒙OpenHarmony实战开发-MiniCanvas

介绍 基于OpenHarmony的Cavas组件封装了一版极简操作的MiniCanvas&#xff0c;屏蔽了原有Canvas内部复杂的调用流程&#xff0c;支持一个API就可以实现相应的绘制能力&#xff0c;该库还在继续完善中&#xff0c;也欢迎PR。 使用说明 1.添加MiniCanvas依赖 在项目entry目录…