系列十一(实战)、发送 接收带标签的消息(Java操作RocketMQ)

一、发送 & 接收带标签的消息

1.1、概述

        消息的种类纷繁复杂,不同的业务场景需要不同的消息,基于此RocketMQ提供了消息过滤功能,通过Tag或者Key进行区分,本章介绍Tag,我们再往一个Topic里面发送消息的时候,根据业务逻辑可能需要区分,例如带有tagA的消息被A消费,带有TagB的消息被B消费,还有在事务监听的类里面,只要是事务消息都要走同一个监听,这时我们也需要通过过滤才能区别对待。

        其实这种场景在生活中也很常见,例如大家每天都使用的微信公众号,当关注的博主在公众号发布完消息后,你只会收到自己自己感兴趣的那部分。

1.2、订阅关系一致性

        订阅关系一致性是消息过滤中对【消费者组名-Topic-Tag】的一些要求,如果不能正确的配置,将会出现消费消息紊乱,甚至消息丢失的问题。关于订阅关系一致性问题,请参考

订阅关系一致文档,这里不再赘述。

1.3、Demo07MQTestApp 

/*** @Author : 一叶浮萍归大海* @Date: 2023/12/25 13:03* @Description: 发送 & 接收带标签的消息*/
@Slf4j
public class Demo07MQTestApp {/*** 发送带标签的消息*/@Testpublic void demo7Producer() throws Exception {// 1、创建一个生产者DefaultMQProducer producer = new DefaultMQProducer("tag-producer-group");// 2、连接NameServerproducer.setNamesrvAddr(RocketMQConstant.NAME_SERVER_ADDR);// 3、启动producer.start();// 4、创建消息String[] tags = new String[]{"NBA", "run", "star","car","mobile","tourism"};for (int i = 1; i <= 6; i++) {String tag = tags[i % tags.length];String content = "";switch (tag) {case "NBA":content = "this is a message about NBA,消息编号[" + i + "]";break;case "run":content = "this is a message about run,消息编号[" + i + "]";break;case "star":content = "this is a message about star,消息编号[" + i + "]";break;case "mobile":content = "this is a message about mobile,消息编号[" + i + "]";break;case "tourism":content = "this is a message about tourism,消息编号[" + i + "]";break;default:content = "this is a message about foods,消息编号[" + i + "]";break;}Message message = new Message("tag-topic",tag,content.getBytes(StandardCharsets.UTF_8));// 5、发送消息producer.send(message);log.info("【demo7Producer】发送消息成功,消息内容:{}",content);}// 关闭producerproducer.shutdown();}/*** 接收带标签的消息(Push方式)*/@Testpublic void demo7PushConsumer1() throws Exception {// 1、创建一个消费者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tag-consumer-groupA");// 2、连接NameServerconsumer.setNamesrvAddr(RocketMQConstant.NAME_SERVER_ADDR);consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);// 3、订阅消息,*表示订阅该主题所有的消息consumer.subscribe("tag-topic", "NBA");// 4、设置监听器(采用异步回调方式,一直监听)consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {for (MessageExt message : messages) {log.info("我是消费者【demo7PushConsumer1】,我收到的消息是:{}",StrUtil.utf8Str(message.getBody()));}/*** 返回值:消费消息成功与否*      CONSUME_SUCCESS:表明消费成功,消息会从MQ出队*      RECONSUME_LATER:表明消费失败,消息会重新回到队里,过一会儿再重新投递出来给当前消费者或者其他消费者*/return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 5、启动consumer.start();log.info("【demo7PushConsumer1】启动成功,正在等待接收消息...");// 6、挂起当前JVMSystem.in.read();}@Testpublic void demo7PushConsumer2() throws Exception {// 1、创建一个消费者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tag-consumer-groupB");// 2、连接NameServerconsumer.setNamesrvAddr(RocketMQConstant.NAME_SERVER_ADDR);consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);// 3、订阅消息,*表示订阅该主题所有的消息consumer.subscribe("tag-topic", "NBA || star || mobile");// 4、设置监听器(采用异步回调方式,一直监听)consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {for (MessageExt message : messages) {log.info("我是消费者【demo7PushConsumer2】,我收到的消息是:{}",StrUtil.utf8Str(message.getBody()));}/*** 返回值:消费消息成功与否*      CONSUME_SUCCESS:表明消费成功,消息会从MQ出队*      RECONSUME_LATER:表明消费失败,消息会重新回到队里,过一会儿再重新投递出来给当前消费者或者其他消费者*/return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 5、启动consumer.start();log.info("【demo7PushConsumer2】启动成功,正在等待接收消息...");// 6、挂起当前JVMSystem.in.read();}}

1.4、测试

        先后运行demo7PushConsumer1、demo7PushConsumer1和demo7Producer,观察控制台日志输出信息。

1.5、Topic和Tag如何选择

        不同的业务应该使用不同的Topic,如果仅仅是相同的业务里边有不同的表现形式,那么我们要使用Tag进行区分。至于说具体怎么选择,可以从以下几个方面进行区分:

(1)消息类型是否一致:如普通消息、事务消息、延时消息、顺序消息、不同的消息类型使用不同的Topic,无法通过Tag进行区分;

(2)业务是否相关联:没有直接关联的消息,如淘宝交易信息、京东物流消息使用不同的Topic进行区分;而同样是淘宝交易消息,电器类订单、女装类订单、化妆品类订单的消息可以用Tag进行区分;

(3)消息优先级是否一致:如同样是物流消息,盒马必须2小时内送达,天猫超市24小时内送达,淘宝物流则相对会慢一些,不同优先级的消息用不同的Topic进行区分;

(4)消息量级是否相当:有些业务消息虽然量小但是实时性要求高,如果跟某些万亿量级别的消息使用同一个Topic,则有可能会因为过长的等待时间而"饿死",此时需要将不同量级的消息进行区分,使用不同的Topic;

        总的来说,针对消息分类、可以选择创建多个Topic或者在同一个Topic下创建多个Tag。但是通常情况下,不同Topic之间的消息没有必然的联系。而Tag则用来区分同一个Topic下相互关联的消息,例如:全集和子集的关系,流程先后的关系。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/298763.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【浏览器】同源策略和跨域

1. 什么是跨域 在说跨域之前,先说说同源策略,什么是同源策略呢?同源策略是浏览器的一种安全机制,减少跨站点脚本攻击(XSS,Cross Site Scripting)、跨站点请求伪造(CSRF,Cross Site Request Forgery)攻击等,因为非同源的请求会被浏览器拦截掉。 同源就是协议、域名(…

doris基本操作,04-表结构的变更

整体思路 因为doris完全兼容mysql协议&#xff0c;因此操作上基本与mysql没有太大区别&#xff0c; 之需要注意与doris相关的关键字即可&#xff0c;如&#xff1a;预聚合&#xff0c;key等 当前表结构 create table table1 {siteid int default 10,citycode smallint,usern…

【Unity地形】使用地形工具创建场景环境-Terrain

如上图Unity的地形工具可以让我们实现创建复杂、丰富的3D室外环境。 我们创建地形很简单&#xff0c;在层级面板中右键-3Dobject-Terrain 就可以创建一个默认的地形模型&#xff01;这个模型是Unity内置的。 接下来的地形编辑功能全部集中在这个地形的组件上 主要功能如下&…

Goland配置leetcode

1. 安装 首先在goland的setting界面上找到Plugins&#xff0c;然后搜索关键字leetcode&#xff0c;找到LeetCode Editor&#xff0c;安装它。 在安装后&#xff0c;第一次需要对其进行配置&#xff0c;在Tools中找到LeetCode Plugins&#xff0c;如下图所示进行配置。首先国内…

关于调试和开发中对文件写操作导致乱码问题

背景基于上文log机制重定向问题&#xff0c;将代码打印单独存放文件中出现双击文件&#xff0c;如下图现象所示(银河麒麟系统) 使用vim打开文件发现有许多/00的乱码。 怀疑是数据没有同步至硬盘导致的。 于是在每次输入到文件后加入fdatasync函数&#xff0c;部分代码如下&am…

three.js使用精灵模型Sprite渲染森林

效果&#xff1a; 源码&#xff1a; <template><div><el-container><el-main><div class"box-card-left"><div id"threejs" style"border: 1px solid red"></div><div class"box-right&quo…

React学习计划-React16--React基础(六)路由

路由 一、版本5路由 1. react-router-dom 2. 路由的使用 1. 基础使用 安装&#xff1a;yarn add react-router-dom5明确好界面中的导航区、展示区导航区Link标签包裹 <Link to"/home">Home</Link>展示区写在Route标签进行匹配 <Route path/home …

2024年PMP考试新考纲-PMBOK第七版-项目管理原则真题解析(续3)

马上就要进入2024年了&#xff0c;要参加2024年PMP一季度考试的小伙伴可以准备起来了。2024年的PMP考试将继续采用新考试大纲&#xff0c;考试内容包括PMBOK第六版、PMBOK第七版和敏捷实践指南&#xff0c;而且敏捷&#xff08;或者叫混合&#xff09;的项目环境将占比超过50%&…

2024年最新Python爬虫入门『最强教程』新鲜出炉!

近年来&#xff0c;大数据成为业界与学术界最火热的话题之一&#xff0c;数据已经成为每个公司极为重要的资产。互联网大量的公开数据为个人和公司提供了以往想象不到的可以获取的数据量。而掌握网络爬虫技术可以帮助你获取这些有用的公开数据集。 爬虫能干什么呢&#xff1f;一…

chatglm3本地部署(综合Demo版本)教程

1.下载代码 我是在本地c盘中&#xff0c;创建了一个glm3文件夹&#xff0c;然后把代码拉到这个文件夹里&#xff0c;所以在clone之前&#xff0c;需要在cmd中&#xff0c;把路径切换到glm3文件夹下&#xff0c;然后执行如下代码。 git clone https://github.com/THUDM/ChatGL…

第十九节TypeScript 模块

1、TypeScript模块&#xff1a; 模块是在其自身的作用域里执行&#xff0c;并不是在全局作用域&#xff0c;这意味着定义在模块里面的变量、函数和类等在模块外部是不可见的&#xff0c;除非明确地使用 export 导出它们。类似地&#xff0c;我们必须通过 import 导入其他模块导…

源码系列 之 ThreadLocal

简介 ThreadLocal的作用是做数据隔离&#xff0c;存储的变量只属于当前线程&#xff0c;相当于当前线程的局部变量&#xff0c;多线程环境下&#xff0c;不会被别的线程访问与修改。常用于存储线程私有成员变量、上下文&#xff0c;和用于同一线程&#xff0c;不同层级方法间传…