RabbitMQ工作模式-主题模式

主题模式

官方文档参考:https://www.rabbitmq.com/tutorials/tutorial-five-python.html

使用topic类型的交换器,队列绑定到交换器、bingingKey时使用通配符,交换器将消息路由转发到具体队列时,会根据消息routingKey模糊匹配,比较灵活。

在Direct类型的交换器做到了根据日志级别的不同,将消息发送给了不同队列的。

这里再加入一个需求,不仅想根据日志级别进行划分,还想根据日志的来源分日志,如何来做呢?

使用topic类型的交换器, routingKey就不能随便写了,它必须是点分单词,单词可以随便写,一般按消息的特征,该点分单词字符串最长255字节。

bindingKey也必须是这种形式。top类型的交换器背后原理跟direct类型类似只要队列的bingingkey的值与消息的routingKey的匹配,队列就可以收到该消息。有两个不同

  1. * (star)匹配一个单词。
  2. # 匹配0到多个单词。

在这里插入图片描述

上报的数据的RoutingKey,格式如下

地区.业务.日志级别 如shanghai.busi.INFO 、 hangzhou.line.ERROR

生产者

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.nio.charset.StandardCharsets;
import java.util.concurrent.ThreadLocalRandom;public class Product {private static final String[] ADDRESS_ARRAYS = {"shanghai", "suzhou", "hangzhou"};private static final String[] BUSI_NAMES = {"product", "user", "schedule"};private static final String[] LOG_LEVEL = {"ERROR", "WARN", "INFO"};public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setUri("amqp://root:123456@node1:5672/%2f");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 定义交换机channel.exchangeDeclare("ex.busi.topic",BuiltinExchangeType.TOPIC,// 持久化标识false,// 是否自动删除false,// 属性信息null);for (int i = 0; i < 50; i++) {String level = LOG_LEVEL[ThreadLocalRandom.current().nextInt(0, LOG_LEVEL.length)];String busiName = BUSI_NAMES[ThreadLocalRandom.current().nextInt(0, BUSI_NAMES.length)];String address =ADDRESS_ARRAYS[ThreadLocalRandom.current().nextInt(0, ADDRESS_ARRAYS.length)];String routingKey = address + "." + busiName + "." + level;String pushMsg = "地址[" + address + "],业务[" + busiName + "],级别[" + level + "],消息:" + i;channel.basicPublish("ex.busi.topic", routingKey, null, pushMsg.getBytes(StandardCharsets.UTF_8));}}
}

上海的消费者

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ThreadLocalRandom;/*** 上海地区的消费都,获取所有的上海信息*/
public class ShangHaiConsumer {public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setUri("amqp://root:123456@node1:5672/%2f");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 定义交换机channel.exchangeDeclare("ex.busi.topic",BuiltinExchangeType.TOPIC,// 持久化标识false,// 是否自动删除false,// 属性信息null);// 定义队列channel.queueDeclare("shanghai.all.log",// 持久化存储true,// 排他false,// 自动删除true,// 属性null);// 将队列与交换机进行绑定channel.queueBind("shanghai.all.log", "ex.busi.topic", "shanghai.#", null);channel.basicConsume("shanghai.all.log",(consumerTag, message) -> {String dataMsg = new String(message.getBody(), StandardCharsets.UTF_8);System.out.println("shanghai consumer 收到数据:" + dataMsg);},consumerTag -> {});}
}

所有错误日志的消费者

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;public class ErrorLogConsumer {public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setUri("amqp://root:123456@node1:5672/%2f");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 定义交换机channel.exchangeDeclare("ex.busi.topic",BuiltinExchangeType.TOPIC,// 持久化标识false,// 是否自动删除false,// 属性信息null);// 定义队列channel.queueDeclare("log.all.error",// 持久化存储true,// 排他false,// 自动删除true,// 属性null);// 将队列与交换机进行绑定channel.queueBind("log.all.error", "ex.busi.topic", "#.ERROR", null);channel.basicConsume("log.all.error",(consumerTag, message) -> {String dataMsg = new String(message.getBody(), StandardCharsets.UTF_8);System.out.println("错误日志 consumer 收到数据:" + dataMsg);},consumerTag -> {});}
}

苏州用户的消费者

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;public class SuZhouUserConsumer {public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setUri("amqp://root:123456@node1:5672/%2f");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 定义交换机channel.exchangeDeclare("ex.busi.topic",BuiltinExchangeType.TOPIC,// 持久化标识false,// 是否自动删除false,// 属性信息null);// 定义队列channel.queueDeclare("suzhou.user.consumer",// 持久化存储true,// 排他false,// 自动删除true,// 属性null);// 将队列与交换机进行绑定channel.queueBind("suzhou.user.consumer", "ex.busi.topic", "suzhou.user.*", null);channel.basicConsume("suzhou.user.consumer",(consumerTag, message) -> {String dataMsg = new String(message.getBody(), StandardCharsets.UTF_8);System.out.println("suzhou consumer 收到数据:" + dataMsg);},consumerTag -> {});}
}

首先启动三个消费者,查看队列和交换器的信息

[root@nullnull-os ~]# rabbitmqctl list_exchanges --formatter pretty_table
Listing exchanges for vhost / ...
┌────────────────────┬─────────┐
│ name               │ type    │
├────────────────────┼─────────┤
│ amq.fanout         │ fanout  │
├────────────────────┼─────────┤
│ ex.busi.topic      │ topic   │
├────────────────────┼─────────┤
│ amq.rabbitmq.trace │ topic   │
├────────────────────┼─────────┤
│ amq.headers        │ headers │
├────────────────────┼─────────┤
│ amq.topic          │ topic   │
├────────────────────┼─────────┤
│ amq.direct         │ direct  │
├────────────────────┼─────────┤
│                    │ direct  │
├────────────────────┼─────────┤
│ ex.routing         │ direct  │
├────────────────────┼─────────┤
│ amq.match          │ headers │
└────────────────────┴─────────┘
[root@nullnull-os ~]# rabbitmqctl list_bindings --formatter pretty_table
Listing bindings for vhost /...
┌───────────────┬─────────────┬──────────────────────┬──────────────────┬──────────────────────┬───────────┐
│ source_name   │ source_kind │ destination_name     │ destination_kind │ routing_key          │ arguments │
├───────────────┼─────────────┼──────────────────────┼──────────────────┼──────────────────────┼───────────┤
│               │ exchange    │ suzhou.user.consumer │ queue            │ suzhou.user.consumer │           │
├───────────────┼─────────────┼──────────────────────┼──────────────────┼──────────────────────┼───────────┤
│               │ exchange    │ shanghai.all.log     │ queue            │ shanghai.all.log     │           │
├───────────────┼─────────────┼──────────────────────┼──────────────────┼──────────────────────┼───────────┤
│               │ exchange    │ log.all.error        │ queue            │ log.all.error        │           │
├───────────────┼─────────────┼──────────────────────┼──────────────────┼──────────────────────┼───────────┤
│ ex.busi.topic │ exchange    │ log.all.error        │ queue            │ #.ERROR              │           │
├───────────────┼─────────────┼──────────────────────┼──────────────────┼──────────────────────┼───────────┤
│ ex.busi.topic │ exchange    │ shanghai.all.log     │ queue            │ shanghai.#           │           │
├───────────────┼─────────────┼──────────────────────┼──────────────────┼──────────────────────┼───────────┤
│ ex.busi.topic │ exchange    │ suzhou.user.consumer │ queue            │ suzhou.user.*        │           │
└───────────────┴─────────────┴──────────────────────┴──────────────────┴──────────────────────┴───────────┘
[root@nullnull-os ~]# 

观察可以发现,此队列与消息的绑定已经成功。接下使用生产者发送消息。观察控制台输出:

错误日志消费者

错误日志 consumer 收到数据:地址[suzhou],业务[schedule],级别[ERROR],消息:6
错误日志 consumer 收到数据:地址[suzhou],业务[product],级别[ERROR],消息:8
错误日志 consumer 收到数据:地址[shanghai],业务[product],级别[ERROR],消息:10
错误日志 consumer 收到数据:地址[shanghai],业务[schedule],级别[ERROR],消息:12
错误日志 consumer 收到数据:地址[suzhou],业务[schedule],级别[ERROR],消息:15
错误日志 consumer 收到数据:地址[hangzhou],业务[user],级别[ERROR],消息:16
错误日志 consumer 收到数据:地址[shanghai],业务[schedule],级别[ERROR],消息:17
错误日志 consumer 收到数据:地址[shanghai],业务[product],级别[ERROR],消息:18
错误日志 consumer 收到数据:地址[hangzhou],业务[user],级别[ERROR],消息:21
错误日志 consumer 收到数据:地址[shanghai],业务[product],级别[ERROR],消息:22
错误日志 consumer 收到数据:地址[shanghai],业务[product],级别[ERROR],消息:24
错误日志 consumer 收到数据:地址[hangzhou],业务[product],级别[ERROR],消息:28
错误日志 consumer 收到数据:地址[suzhou],业务[schedule],级别[ERROR],消息:33
错误日志 consumer 收到数据:地址[hangzhou],业务[schedule],级别[ERROR],消息:39
错误日志 consumer 收到数据:地址[suzhou],业务[user],级别[ERROR],消息:40
错误日志 consumer 收到数据:地址[suzhou],业务[user],级别[ERROR],消息:43
错误日志 consumer 收到数据:地址[shanghai],业务[schedule],级别[ERROR],消息:46

上海地区的消费者

shanghai consumer 收到数据:地址[shanghai],业务[schedule],级别[WARN],消息:0
shanghai consumer 收到数据:地址[shanghai],业务[user],级别[INFO],消息:1
shanghai consumer 收到数据:地址[shanghai],业务[schedule],级别[INFO],消息:2
shanghai consumer 收到数据:地址[shanghai],业务[schedule],级别[WARN],消息:5
shanghai consumer 收到数据:地址[shanghai],业务[product],级别[ERROR],消息:10
shanghai consumer 收到数据:地址[shanghai],业务[schedule],级别[ERROR],消息:12
shanghai consumer 收到数据:地址[shanghai],业务[schedule],级别[ERROR],消息:17
shanghai consumer 收到数据:地址[shanghai],业务[product],级别[ERROR],消息:18
shanghai consumer 收到数据:地址[shanghai],业务[product],级别[ERROR],消息:22
shanghai consumer 收到数据:地址[shanghai],业务[product],级别[ERROR],消息:24
shanghai consumer 收到数据:地址[shanghai],业务[user],级别[INFO],消息:32
shanghai consumer 收到数据:地址[shanghai],业务[schedule],级别[INFO],消息:35
shanghai consumer 收到数据:地址[shanghai],业务[product],级别[INFO],消息:38
shanghai consumer 收到数据:地址[shanghai],业务[schedule],级别[WARN],消息:41
shanghai consumer 收到数据:地址[shanghai],业务[schedule],级别[ERROR],消息:46
shanghai consumer 收到数据:地址[shanghai],业务[user],级别[INFO],消息:48

苏州用户的消费者

suzhou consumer 收到数据:地址[suzhou],业务[user],级别[WARN],消息:37
suzhou consumer 收到数据:地址[suzhou],业务[user],级别[ERROR],消息:40
suzhou consumer 收到数据:地址[suzhou],业务[user],级别[ERROR],消息:43
suzhou consumer 收到数据:地址[suzhou],业务[user],级别[WARN],消息:45

至此topic模式操作成功。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/100449.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ardupilot 安装gcc-arm-none-eabi编译工具

目录 文章目录 目录摘要0简介1.下载网站2.安装摘要 本节主要记录ardupilot使用的编译器安装过程。 0简介 gcc-arm-none-eabi是GNU项目下的软件,是一个面向裸机arm的编译器。那么说了这么多介绍,它都包含什么具体功能又怎么安装与使用呢,我们继续。 1.下载网站 gcc-arm-n…

存储过程报Illegal mix of collations错误的解决方法

CREATE PROCEDURE maxAgeStudent(IN _gender CHAR) BEGINDECLARE maxage INT DEFAULT 0;SELECT max(age) INTO maxage FROM student where gender _gender;SELECT * from student WHERE age maxage and gender _gender; END; 在调用的时候 call maxAgeStudent(1) 产生了报…

git快速查看某个文件修改的所有commit

1. git blame file git blame 可以显示历史修改的每一行记录,有时候我们只想了解某个文件一共提交几次commit,只显示commit列表,这种方式显然不满足要求。 2.git log常规使用 (1)显示整个project的所有commit (2)显示某个文件的所有commit 这是git log不添加参数的常规…

c语言每日一练(14)【加强版】

前言&#xff1a;每日一练系列&#xff0c;每一期都包含5道选择题&#xff0c;2道编程题&#xff0c;博主会尽可能详细地进行讲解&#xff0c;令初学者也能听的清晰。博主有时会将一些难题综合成每日一练加强版&#xff0c;加强版是特殊的&#xff0c;它仅包含5道选择题&#x…

ChatGPT集锦

目录 1. 一条指令让ChatGPT变的更强大2. 对ChatGPT提问时,常见的10种错误描述3. Custom instructions如何设置1. 一条指令让ChatGPT变的更强大 在使用GPT的过程中,如何让AI更清晰地了解你的需求很重要?今天分享一个指令,可以让GPT成为你的好同事,与你一起分析和解决问题,…

线性空间、子空间、基、基坐标、过渡矩阵

线性空间的定义 满足加法和数乘封闭。也就是该空间的所有向量都满足乘一个常数后或者和其它向量相加后仍然在这个空间里。进一步可以理解为该空间中的所有向量满足加法和数乘的组合封闭。即若 V 是一个线性空间&#xff0c;则首先需满足&#xff1a; 注&#xff1a;线性空间里面…

智能小车—PWM方式实现小车调速和转向

目录 1. 让小车动起来 2. 串口控制小车方向 3. 如何进行小车PWM调速 4. PWM方式实现小车转向 1. 让小车动起来 电机模块开发 L9110s概述 接通VCC&#xff0c;GND 模块电源指示灯亮&#xff0c; 以下资料来源官方&#xff0c;具体根据实际调试 IA1输入高电平&#xff0c…

AR产业变革中的“关键先生”和“关键力量”

今年6月的WWDC大会上&#xff0c;苹果发布了头显产品Vision Pro&#xff0c;苹果CEO库克形容它&#xff1a; 开启了空间计算时代。 AR产业曾红极一时&#xff0c;但因为一些技术硬伤又减弱了声量&#xff0c;整个产业在起伏中前行。必须承认&#xff0c;这次苹果发布Vision P…

h5 ws 客户端 监听ws服务器广播的信息

<!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>AI智能写作</title><!-- Bootstrap CSS --><meta charset"utf-8"><meta name"viewport" content"widt…

RabbitMQ:hello结构

1.在Linux环境上面装入rabbitMQ doker-compose.yml version: "3.1" services:rabbitmq:image: daocloud.io/library/rabbitmq:managementrestart: alwayscontainer_name: rabbitmqports:- 6786:5672- 16786:15672volumes:- ./data:/var/lib/rabbitmq doker-compos…

基于神经网络结合紫外差分光谱的二氧化硫浓度定量预测

基于神经网络结合紫外差分光谱的二氧化硫浓度定量预测 前言一、代码运行1. 解压数据2. 导包3. 读取数据4. 构建网络5. 设置优化器6. 模型训练7. 可视化loss8. 模型验证 二、结果展示三、总结作者简介 前言 二氧化硫&#xff08;SO2&#xff09;是一种常见的环境污染物&#xff…

【项目经验】elementui抽屉(从下到上方向)实现向上拉伸

效果图 直接上代码 <template><div><el-button click"drawerBtn" type"primary" style"margin-left: 16px;">点我打开</el-button><el-drawer title"我是标题" :modal"false" :wrapperClosable…