RabbitMQ通过代码实现交换机分发模式

news/2025/3/30 16:16:13/文章来源:https://www.cnblogs.com/5ran2yl/p/18795042

首先导入rabbitMQ的依赖:

  <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client --><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.25.0</version></dependency>

 

一.fanout模式实现

图示:

在代码实现中,我们只需要关注生产者和消费者,因为交换机的创建和队列的绑定一般都是由图形化界面完成的,故而使用Java代码时候,就只需要实现生产者和消费者了

producer.Java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class Producer {public static void main(String[] args) {// 1: 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2: 设置连接属性connectionFactory.setHost("8.137.76.12");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");Connection connection = null;Channel channel = null;try {// 3: 从连接工厂中获取连接connection = connectionFactory.newConnection("生产者");// 4: 从连接中获取通道channelchannel = connection.createChannel();// 6: 准备发送消息的内容String message = "你好,回忆也交给时间";String  exchangeName = "fanout_exchange";String routingKey = "";// 7: 发送消息给中间件rabbitmq-server// @params1: 交换机exchange// @params2: 队列名称/routingkey// @params3: 属性配置// @params4: 发送消息的内容channel.basicPublish(exchangeName, routingKey, null, message.getBytes());System.out.println("消息发送成功!");} catch (Exception ex) {ex.printStackTrace();System.out.println("发送消息出现异常...");} finally {// 7: 释放连接关闭通道if (channel != null && channel.isOpen()) {try {channel.close();} catch (Exception ex) {ex.printStackTrace();}}if (connection != null) {try {connection.close();} catch (Exception ex) {ex.printStackTrace();}}}}
}

 

Consumer.java:

import com.rabbitmq.client.*;import java.io.IOException;public class Consumer {private static Runnable runnable = () -> {// 1: 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2: 设置连接属性connectionFactory.setHost("8.137.76.12");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");//获取队列的名称final String queueName = Thread.currentThread().getName();Connection connection = null;Channel channel = null;try {// 3: 从连接工厂中获取连接connection = connectionFactory.newConnection("生产者");// 4: 从连接中获取通道channelchannel = connection.createChannel();// 5: 申明队列queue存储消息/**  如果队列不存在,则会创建*  Rabbitmq不允许创建两个相同的队列名称,否则会报错。**  @params1: queue 队列的名称*  @params2: durable 队列是否持久化*  @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭*  @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。*  @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。* */// 这里如果queue已经被创建过一次了,可以不需要定义//channel.queueDeclare("queue1", false, false, false, null);// 6: 定义接受消息的回调Channel finalChannel = channel;finalChannel.basicConsume(queueName, true, (s,delivery)-> {System.out.println(queueName + ":收到消息是:" + new String(delivery.getBody(), "UTF-8"));}, (s) -> {});System.out.println(queueName + ":开始接受消息");System.in.read();} catch (Exception ex) {ex.printStackTrace();System.out.println("发送消息出现异常...");} finally {// 7: 释放连接关闭通道if (channel != null && channel.isOpen()) {try {channel.close();} catch (Exception ex) {ex.printStackTrace();}}if (connection != null && connection.isOpen()) {try {connection.close();} catch (Exception ex) {ex.printStackTrace();}}}};public static void main(String[] args) {// 启动三个线程去执行new Thread(runnable, "queue1").start();new Thread(runnable, "queue2").start();new Thread(runnable, "queue3").start();}
}

 

使用三个线程模拟多个消费者,可以同时拿取多个队列

 二.direct模式

图示:

producer.Java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class Producer {public static void main(String[] args) {// 1: 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2: 设置连接属性connectionFactory.setHost("8.137.76.12");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");Connection connection = null;Channel channel = null;try {// 3: 从连接工厂中获取连接connection = connectionFactory.newConnection("生产者");// 4: 从连接中获取通道channelchannel = connection.createChannel();// 6: 准备发送消息的内容String message = "你好,马明";String  exchangeName = "direct_exchange";String routingKey1 = "q1";String routingKey2 = "q3";// 7: 发送消息给中间件rabbitmq-server// @params1: 交换机exchange// @params2: 队列名称/routingkey// @params3: 属性配置// @params4: 发送消息的内容channel.basicPublish(exchangeName, routingKey1, null, message.getBytes());channel.basicPublish(exchangeName, routingKey2, null, message.getBytes());System.out.println("消息发送成功!");} catch (Exception ex) {ex.printStackTrace();System.out.println("发送消息出现异常...");} finally {// 7: 释放连接关闭通道if (channel != null && channel.isOpen()) {try {channel.close();} catch (Exception ex) {ex.printStackTrace();}}if (connection != null) {try {connection.close();} catch (Exception ex) {ex.printStackTrace();}}}}
}

 

consumer.Java

import com.rabbitmq.client.*;
import java.io.IOException;public class Consumer {private static Runnable runnable = () -> {// 1: 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2: 设置连接属性connectionFactory.setHost("8.137.76.12");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");//获取队列的名称final String queueName = Thread.currentThread().getName();Connection connection = null;Channel channel = null;try {// 3: 从连接工厂中获取连接connection = connectionFactory.newConnection("生产者");// 4: 从连接中获取通道channelchannel = connection.createChannel();// 5: 申明队列queue存储消息/**  如果队列不存在,则会创建*  Rabbitmq不允许创建两个相同的队列名称,否则会报错。**  @params1: queue 队列的名称*  @params2: durable 队列是否持久化*  @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭*  @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。*  @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。* */// 这里如果queue已经被创建过一次了,可以不需要定义//channel.queueDeclare("queue1", false, false, false, null);// 6: 定义接受消息的回调Channel finalChannel = channel;finalChannel.basicConsume(queueName, true, new DeliverCallback() {@Overridepublic void handle(String s, Delivery delivery) throws IOException {System.out.println(queueName + ":收到消息是:" + new String(delivery.getBody(), "UTF-8"));}}, new CancelCallback() {@Overridepublic void handle(String s) throws IOException {}});System.out.println(queueName + ":开始接受消息");System.in.read();} catch (Exception ex) {ex.printStackTrace();System.out.println("发送消息出现异常...");} finally {// 7: 释放连接关闭通道if (channel != null && channel.isOpen()) {try {channel.close();} catch (Exception ex) {ex.printStackTrace();}}if (connection != null && connection.isOpen()) {try {connection.close();} catch (Exception ex) {ex.printStackTrace();}}}};public static void main(String[] args) {// 启动三个线程去执行new Thread(runnable, "queue1").start();new Thread(runnable, "queue2").start();new Thread(runnable, "queue3").start();}
}

 

结果消费截图:

三.Topic模式 

图示:

 producer.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class Producer {public static void main(String[] args) {// 1: 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2: 设置连接属性connectionFactory.setHost("8.137.76.12");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");Connection connection = null;Channel channel = null;try {// 3: 从连接工厂中获取连接connection = connectionFactory.newConnection("生产者");// 4: 从连接中获取通道channelchannel = connection.createChannel();// 6: 准备发送消息的内容String message = "你好,马明";String  exchangeName = "topics_exchange";String routingKey1 = "name.user";//都可以收到 queue-1 queue-3String routingKey2 = "name";//都可以收到 queue-2// 7: 发送消息给中间件rabbitmq-server// @params1: 交换机exchange// @params2: 队列名称/routingkey// @params3: 属性配置// @params4: 发送消息的内容channel.basicPublish(exchangeName, routingKey1, null, message.getBytes());channel.basicPublish(exchangeName, routingKey2, null, message.getBytes());System.out.println("消息发送成功!");} catch (Exception ex) {ex.printStackTrace();System.out.println("发送消息出现异常...");} finally {// 7: 释放连接关闭通道if (channel != null && channel.isOpen()) {try {channel.close();} catch (Exception ex) {ex.printStackTrace();}}if (connection != null) {try {connection.close();} catch (Exception ex) {ex.printStackTrace();}}}}
}

 

 

consumer.java

import com.rabbitmq.client.*;
import java.io.IOException;public class Consumer {private static Runnable runnable = () -> {// 1: 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2: 设置连接属性connectionFactory.setHost("8.137.76.12");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");//获取队列的名称final String queueName = Thread.currentThread().getName();Connection connection = null;Channel channel = null;try {// 3: 从连接工厂中获取连接connection = connectionFactory.newConnection("生产者");// 4: 从连接中获取通道channelchannel = connection.createChannel();// 5: 申明队列queue存储消息/**  如果队列不存在,则会创建*  Rabbitmq不允许创建两个相同的队列名称,否则会报错。**  @params1: queue 队列的名称*  @params2: durable 队列是否持久化*  @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭*  @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。*  @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。* */// 这里如果queue已经被创建过一次了,可以不需要定义//channel.queueDeclare("queue1", false, false, false, null);// 6: 定义接受消息的回调Channel finalChannel = channel;finalChannel.basicConsume(queueName, true, new DeliverCallback() {@Overridepublic void handle(String s, Delivery delivery) throws IOException {System.out.println(queueName + ":收到消息是:" + new String(delivery.getBody(), "UTF-8"));}}, new CancelCallback() {@Overridepublic void handle(String s) throws IOException {}});System.out.println(queueName + ":开始接受消息");System.in.read();} catch (Exception ex) {ex.printStackTrace();System.out.println("发送消息出现异常...");} finally {// 7: 释放连接关闭通道if (channel != null && channel.isOpen()) {try {channel.close();} catch (Exception ex) {ex.printStackTrace();}}if (connection != null && connection.isOpen()) {try {connection.close();} catch (Exception ex) {ex.printStackTrace();}}}};public static void main(String[] args) {// 启动三个线程去执行new Thread(runnable, "queue1").start();new Thread(runnable, "queue2").start();new Thread(runnable, "queue3").start();}
}

 

结果消费截图:

 四.使用代码声明交换机和队列

在 RabbitMQ 中,使用 Java 代码声明交换机和队列通常通过 RabbitMQ 提供的客户端库 amqp-client 来实现。以下是一个完整的示例,展示如何使用 Java 代码声明交换机(Exchange)和队列(Queue),并将其绑定在一起。

  • Direct Exchange: 消息通过路由键精确匹配到队列。

  • Fanout Exchange: 广播消息到所有绑定的队列,忽略路由键。

  • Topic Exchange: 支持通配符的路由键匹配。

  • Headers Exchange: 根据消息头(Header)进行匹配,而不是路由键

 在生产者中声明:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class Producer {public static void main(String[] args) {// 1: 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2: 设置连接属性connectionFactory.setHost("8.137.76.12");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");Connection connection = null;Channel channel = null;try {// 3: 从连接工厂中获取连接connection = connectionFactory.newConnection("生产者");// 4: 从连接中获取通道channelchannel = connection.createChannel();// 6: 准备发送消息的内容String message = "你好,马明";//需要声明交换机的名字String  exchangeName = "direct_message_exchange";//需要声明交换机的模式String exchangeMode = "direct";//声明交换机 交换机的名称  交换机的模式 是否持久化channel.exchangeDeclare(exchangeName,exchangeMode,true);//声明队列  队列名称  是否持久化 是否具有排他性 时候自动删除  是否携带参数(headers模式需要)channel.queueDeclare("que5",true,false,false,null);channel.queueDeclare("que6",true,false,false,null);//绑定队列到交换机 队列名称  交换机名称 routingKeychannel.queueBind("que5",exchangeName,"order");channel.queueBind("que6",exchangeName,"message");String routingKey1 = "order";//都可以收到 queue-1 queue-3String routingKey2 = "message";//都可以收到 queue-2// 7: 发送消息给中间件rabbitmq-server// @params1: 交换机exchange// @params2: 队列名称/routingkey// @params3: 属性配置// @params4: 发送消息的内容channel.basicPublish(exchangeName, routingKey1, null, message.getBytes());channel.basicPublish(exchangeName, routingKey2, null, message.getBytes());System.out.println("消息发送成功!");} catch (Exception ex) {ex.printStackTrace();System.out.println("发送消息出现异常...");} finally {// 7: 释放连接关闭通道if (channel != null && channel.isOpen()) {try {channel.close();} catch (Exception ex) {ex.printStackTrace();}}if (connection != null) {try {connection.close();} catch (Exception ex) {ex.printStackTrace();}}}}
}

核心的声明代码:

//需要声明交换机的名字String  exchangeName = "direct_message_exchange";//需要声明交换机的模式String exchangeMode = "direct";//声明交换机 交换机的名称  交换机的模式 是否持久化channel.exchangeDeclare(exchangeName,exchangeMode,true);//声明队列  队列名称  是否持久化 是否具有排他性 是否自动删除  是否携带参数(headers模式需要)channel.queueDeclare("que5",true,false,false,null);channel.queueDeclare("que6",true,false,false,null);//绑定队列到交换机 队列名称  交换机名称 routingKeychannel.queueBind("que5",exchangeName,"order");channel.queueBind("que6",exchangeName,"message");

 

当然,也可以在消费者中声明,因为通道都是同一个,都可以完成声明

需要注意的是,我们不能向一个不存在的交换机投递消息,同样的道理,我们不能向一个不存在的队列消费消息,都是会产生异常的操作

需要使用的交换机和队列请先声明

 

------END------

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/906653.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

vue复基本语法

以下为你用表格形式呈现 Vue 3 基本语法:类别 语法示例 说明模板语法文本插值 <div>{{ message }}</div> 使用双大括号将数据显示在模板中属性绑定 <img :src="imageUrl" alt="示例图片"> :src 是 v-bind:src 的简写,用于动态绑定属性…

vue3-webseek网页版AI问答|Vite6+DeepSeek+Arco流式ai聊天打字效果

2025 AI实战vue3+deepseek+arcoDesign仿DeepSeek/豆包网页版AI聊天助手。 vue3-web-deepseek 实战网页PC版智能AI对话,基于vite6+vue3.5+openAI对接DeepSeek-Chat聊天对话模型。实现流动式打字返回效果、支持亮色+暗黑主题、各种代码高亮、本地会话存储等功能。🐬技术栈编码…

仓库呆滞物料堆积如山?十年的仓库主管跟我说了这五点

今天咱们聊聊一个经常让很多仓库管理者头疼的问题—— 仓库呆滞物料堆积 。 说实话,仓库呆滞物料堆得满满的,不仅占用宝贵的仓储空间,还让企业的资金周转变得紧张,真的是个大麻烦! 不过,之前我和一位拥有 十年经验的仓库主管 聊了聊,得到了很多宝贵的经验,特别是在如何…

【汇总】sqlcmd 命令选项说明-MSSQL2012

一、原始帮助D:\>sqlcmd /? Microsoft (R) SQL Server 命令行工具 版本 11.0.2100.60 NT x64 版权所有 (c) 2012 Microsoft。保留所有权利。用法: Sqlcmd [-U 登录 ID] [-P 密码][-S 服务器] [-H 主机名] [-E 可信连接][-N 加密连…

导入SpaceClaim的iges模型尺寸被放大1000倍的问题

ANSYS APDL 和 Workbench 联合仿真中,导入 Workbench 的 iges 模型尺寸被放大1000倍问题的解决方案问题 ANSYS APDL 和 Workbench 联合仿真时,导入 SpaceClaim 的 .iges 模型尺寸被放大 1000 倍数。 如 APDL 生成的尺寸为 10 mm(注:此处的 mm 是在 APDL 编码中设置的一致单…

Linux的vim编辑器

Linux的vi/vim编辑器 命令模式 编辑模式 输入模式 vi/ vim编辑器的基本操作Linux的vi/vim编辑器基本概念:vim文本编辑器,是由vi编辑器发展演变过来的文本编辑器,因其具有使用简单、功能强大、是 Linux 众多发行版的默认文本编辑器。很多人习惯将 vim 称为 vi,其实 vim 是 v…

高度场流体模拟

【USparkle专栏】如果你深怀绝技,爱“搞点研究”,乐于分享也博采众长,我们期待你的加入,让智慧的火花碰撞交织,让知识的传递生生不息!一、原理参考这个论文:《Real-time Simulation of Large Bodies of Water with Small Scale Details》 核心是这两个公式: 我在这篇《…

Navicat将微软数据库MS-SQLServer表内容导入MySQL数据库

前言全局说明一、说明 1.1 环境: Windows 7 旗舰版 MSSQL 2012 Navicat for MySQL 10.1.7二、MySQL准备 用 Navicat 在 mysql 新建数据库,要和 MSSQL 数据库同名注意:编码也要一致2.1 mysql 新建数据 空白处新,建 test 数据库,2.2 数据库右键查看在mysql里新建数据库编码三…

深度解析:通过 AIBrix 多节点部署 DeepSeek-R1 671B 模型

本文详细介绍了如何通过 AIBrix 分布式推理平台实现 DeepSeek-R1 671B 的多节点部署。DeepSeek-R1 通过渐进式训练框架展现出优秀的逻辑推理能力 —— 在 6710 亿总参数量中,其动态激活的 370 亿参数与 128k 上下文窗口,使其在复杂任务处理中表现卓越。然而,如此庞大的模型规…

玄机靶场 第一章 应急响应-webshell查杀

玄机靶场 第一章 应急响应-webshell查杀 1.黑客webshell里面的flag flag2. 黑客使用的什么工具的shell github地址的md5 flag 哥斯拉webshell的特征3.黑客隐藏shell的完整路径的md5 flag{md5} 注 : /xxx/xxx/xxx/xxx/xxx.xxx 发现隐藏4.黑客免杀马完整路径 md5 flag 查看这是一…

玄机靶场 第一章 应急响应-Linux日志分析

玄机靶场 第一章 应急响应-Linux日志分析 1.有多少IP在爆破主机ssh的root帐号,如果有多个使用","分割 /var/log/auth.log里面存放了相关的登录信息 直接下载看根据user=root发现三个ip 网上发现神奇妙妙脚本 cat auth.log.1 | grep -a "Failed password for ro…

【每日一题】20250327

改变不了的事,不值得烦恼。【每日一题】 1.(15分) \(\hspace{0.7cm}\)已知数列 \(\{a_n\}\),\(\{b_n\}\) 和 \(\{c_n\}\) 满足: \[a_{n+1}=\frac14b_n \]\[b_{n+1}=a_n+c_n+\frac12b_n \]\[c_{n+1}=\frac14b_n \]且 \(a_n+b_n+c_{n}=1\),\(a_1=0\),\(b_1=1\),\(\displa…