RocketMQ5.0消息过滤

前言

消费者订阅了某个主题后,RocketMQ 会将该主题中的所有消息投递给消费者。若消费者只需要关注部分消息,可通过设置过滤条件在 Broker 端进行过滤,只获取到需要关注的消息子集,避免接收到大量无效的消息。

以电商交易场景为例,用户从下单到拿到商品,中间会产生很多消息,被不同的下游系统订阅消费。下游系统往往只关心自己需要处理的消息,比如支付系统只关心支付消息,这时候生产者就可以在发送消息的时候给消息打上标签,下游系统按需订阅即可。
image.png

过滤方式

RocketMQ 支持两种消息过滤方式。

Tag标签过滤

生产者在发送消息前,可以先给消息打上标签,每条消息最多设置一个 Tag 标签:

Message message = provider.newMessageBuilder().setTopic("Trade_Topic").setTag("pay").setBody("xxx".getBytes()).build();
producer.send(message);

消费者配置 Tag 标签过滤规则:

consumer.subscribe("Trade_Topic", new FilterExpression("pay", FilterExpressionType.TAG));

Tag 标签过滤规则:

  • 单 Tag 匹配:过滤表达式为目标 Tag,相同 Tag 的消息才会投递给消费者
  • 多 Tag 匹配:过滤表达式为多个目标 Tag 用||分割,消息符合任一 Tag 就会被投递
  • 全部匹配:过滤表达式为*,所有消息都会投递

SQL属性过滤

SQL 属性过滤是 RocketMQ 提供的高级消息过滤方式,每个消息都可以额外设置用户属性和系统属性,消费者订阅时可设置 SQL 语法的过滤表达式过滤多个属性。

SQL 过滤也可以实现 Tag 标签过滤的效果,Tag 属于系统属性,属性名称是 TAGS

首先,生产者发送消息前给消息设置自定义属性:

Message message = provider.newMessageBuilder().setTopic("Trade_Topic").setBody("xxx".getBytes()).addProperty("price", "99800").addProperty("region", "杭州").build();
producer.send(message);

消费者配置 SQL 过滤规则,这里以 杭州区域价格大于 100 的订单 为例:

consumer.subscribe("Trade_Topic", new FilterExpression("region='杭州' AND price>10000", FilterExpressionType.SQL92));

SQL 属性过滤使用 SQL92 语法作为过滤规则表达式,语法规范如下:
image.png

如何选择

尽量用 Tag 标签过滤,实现更加轻量级,效率更高,在扫描 ConsumeQueue 时就可以先通过 TagHash 过滤一遍。而消息属性是存储在 CommitLog 文件里的,意味着 SQL 属性过滤必须读到完整的消息才能判断是否要过滤,性能较差。

设计实现

org.apache.rocketmq.store.MessageFilter是 RocketMQ 抽象出来的消息过滤接口,两个方法:

  • isMatchedByConsumeQueue:通过 ConsumeQueue 里的 tagsCode 先匹配一次,也就是 Tag 标签的哈希码,tagsCode 不同 Tag 肯定不同
  • isMatchedByCommitLog:根据 CommitLog 里的完整消息属性匹配
public interface MessageFilter {boolean isMatchedByConsumeQueue(final Long tagsCode,final ConsumeQueueExt.CqExtUnit cqExtUnit);boolean isMatchedByCommitLog(final ByteBuffer msgBuffer,final Map<String, String> properties);
}

RocketMQ 的处理逻辑是:先根据 ConsumeQueue 里的 tagsCode 过滤,通过了再读取 CommitLog 里的完整消息走 SQL 属性过滤,实现类会根据配置的过滤规则在不关心的过滤方法里直接返回 true。

public GetMessageResult getMessage(){......// 先通过consumequeue里的tagsCode过滤if (messageFilter != null&& !messageFilter.isMatchedByConsumeQueue(cqUnit.getValidTagsCodeAsLong(), cqUnit.getCqExtUnit())) {if (getResult.getBufferTotalSize() == 0) {status = GetMessageStatus.NO_MATCHED_MESSAGE;}continue;}// 再从CommitLog读取完整消息SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);// 再执行SQL属性过滤if (messageFilter != null&& !messageFilter.isMatchedByCommitLog(selectResult.getByteBuffer().slice(), null)) {if (getResult.getBufferTotalSize() == 0) {status = GetMessageStatus.NO_MATCHED_MESSAGE;}selectResult.release();continue;}......
}

Tag 标签过滤的实现
Broker 把消息写入 CommitLog 后,ReputMessageService 线程会每隔 1ms 把新消息写入到 consumequeue 文件,以加速消费者的消费效率。ConsumeQueue 文件由若干个 CqUnit 组成,每个 CqUnit 占用固定的 20 个字节:

CqUnit{long offset; // 消息在 CommitLog 偏移量int size; // 消息长度long tagsCode; // Tag哈希码
}

image.png
消费者在消费 ConsumeQueue 时就可以直接通过 tagsCode 进行标签过滤:

public boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) {// by tags code.if (ExpressionType.isTagType(subscriptionData.getExpressionType())) {if (tagsCode == null) {return true;}// '*' 订阅所有if (subscriptionData.getSubString().equals(SubscriptionData.SUB_ALL)) {return true;}// 消息的tagsCode是否包含在消费者订阅的Tags里面return subscriptionData.getCodeSet().contains(tagsCode.intValue());}
}

因为是哈希码,所以 tagsCode 存在哈希冲突的可能性,不过概率极小。万一冲突了,Broker 还是会继续投递消息,RocketMQ 5.0 版本会由 Proxy 再进行一次 Tag 的精准匹配,如果不匹配不会投递给消费者;RocketMQ 4.x 版本由消费者收到消息后自行判断,Tag 不匹配的消息会直接丢弃。

SQL 属性过滤的实现
为了执行 SQL 语法实现属性过滤,SQL 语法会先被编译成 Expression 对象,再由Expression#evaluate方法得出执行结果。

Expression expression = FilterFactory.INSTANCE.get(ExpressionType.SQL92).compile("a>10 AND b<10 OR c=10");
expression.evaluate(context);

要对消息属性过滤,首先要把消息属性提取出来,消息属性由若干个 String 类型的键值对组成,然后执行 SQL。

public boolean isMatchedByCommitLog(ByteBuffer msgBuffer, Map<String, String> properties) {// tag过滤 直接返回trueif (ExpressionType.isTagType(subscriptionData.getExpressionType())) {return true;}ConsumerFilterData realFilterData = this.consumerFilterData;// 消息属性Map<String, String> tempProperties = properties;// 没有SQL表达式if (realFilterData == null || realFilterData.getExpression() == null|| realFilterData.getCompiledExpression() == null) {return true;}if (tempProperties == null && msgBuffer != null) {// 从CommitLog解码出消息属性tempProperties = MessageDecoder.decodeProperties(msgBuffer);}Object ret = null;try {MessageEvaluationContext context = new MessageEvaluationContext(tempProperties);// 执行SQL92表达式过滤ret = realFilterData.getCompiledExpression().evaluate(context);} catch (Throwable e) {log.error("Message Filter error, " + realFilterData + ", " + tempProperties, e);}if (ret == null || !(ret instanceof Boolean)) {return false;}return (Boolean) ret;
}

尾巴

消息过滤是 RocketMQ 防止 Broker 端因为投递大量消费者不感兴趣的消息而导致资源浪费的一种手段,消费者可以根据自己感兴趣的消息类型配置过滤规则,分为 Tag 标签过滤 和 SQL 属性过滤两种方式。Tag 标签过滤效率高,因为 Broker 在构建 consumequeue 文件时会写入消息 Tag 的哈希码,直接比较哈希码可以避免通过 CommitLog 读取完整消息。SQL 针对消息属性过滤,此时必须读取到完整的消息才能过滤,效率较低。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/317570.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

海外住宅IP代理的工作原理和应用场景分析,新手必看

海外住宅IP代理作为一种技术解决方案&#xff0c;为用户提供了访问全球网络资源和维护隐私安全的方法。本文将介绍海外住宅IP代理的工作原理和应用场景&#xff0c;帮助读者更好地理解和利用这一技术。 一、工作原理 海外住宅IP代理的工作原理基于代理服务器和IP地址的转发。它…

Docker实战02|Namespace

在上一文《Docker实战01&#xff5c;容器与开发语言》中主要介绍了Docker的基本概念与Docker安装、Go语言安装等实战技巧。 本文继续针对Namespace技术展开讲解并利用Go语言进行实践。 本系列所有代码均已经开源。关公众号回复「Go语言实现Docker」即可获得。 目录 2.1.2 U…

Spring ApplicationEvent事件处理

Spring的事件 ApplicationEvent以及Listener是Spring为我们提供的一个事件监听、订阅的实现&#xff0c;内部实现原理是观察者设计模式&#xff0c;设计初衷也是为了系统业务逻辑之间的解耦&#xff0c;提高可扩展性以及可维护性。 ApplicationEvent就是Spring的事件接口Applic…

谷歌推出创新SynCLR技术:借助AI生成的数据实现高效图像建模,开启自我训练新纪元!

谷歌推出了一种创新性的合成图像框架&#xff0c;这一框架独特之处在于它完全不依赖真实数据。这个框架首先从合成的图像标题开始&#xff0c;然后基于这些标题生成相应的图像。接下来&#xff0c;通过对比学习的技术进行深度学习&#xff0c;从而训练出能够精准识别和理解这些…

leetcode:1464. 数组中两元素的最大乘积(python3解法)

难度&#xff1a;简单 给你一个整数数组 nums&#xff0c;请你选择数组的两个不同下标 i 和 j&#xff0c;使 (nums[i]-1)*(nums[j]-1) 取得最大值。 请你计算并返回该式的最大值。 示例 1&#xff1a; 输入&#xff1a;nums [3,4,5,2] 输出&#xff1a;12 解释&#xff1a;如…

海外静态IP和动态IP有什么区别?推荐哪种?

什么是静态ip、动态ip&#xff0c;二者有什么区别&#xff1f;哪种好&#xff1f;关于这个问题&#xff0c;不难发现&#xff0c;在知道、知乎上面的解释有很多&#xff0c;但据小编的发现&#xff0c;这些回答都是关于静态ip和动态ip的专业术语解释&#xff0c;普通非专业人事…

一、初识Redis与分布式系统

目录 一、Redis应用 二、实现方式 三、Redis应用 四、分布式系统 五、分布式系统实现 1、应用服务和数据库服务分离 2、引入负载均衡&#xff0c;应用服务器集群&#xff08;解决高并发&#xff09; 3、引入读写分离&#xff0c;数据库主从结构&#xff08;解决高并发&a…

CentOS 7 实战指南:文本处理命令详解

前言 在Linux系统中&#xff0c;文本处理是非常基础却又必不可少的一项技能。如果你正在使用CentOS系统&#xff0c;那么学会如何利用文本操作命令来高效地处理文本文件无疑将会是一个强有力的工具。 本篇文章将介绍一些最常用和最实用的文本操作命令&#xff0c;并通过详尽的…

为什么要用扫码出入库?

一、什么是扫码出入库管理系统 传统的仓库管理模式存在很多问题&#xff0c;如&#xff1a;货物积压、过期、丢失等。这些问题不仅影响了企业的正常运营&#xff0c;还给企业带来了经济损失。为了解决这些问题&#xff0c;扫码出入库管理系统应运而生。该系统采用先进的二维码…

利用蚁剑钓鱼上线CS

前言 ​ 中国蚁剑使用Electron构建客户端软件&#xff0c;Electron实现上用的是Node.js&#xff0c;并且Node.js能执行系统命令&#xff0c;故可以利用蚁剑的webshell页面嵌入js来直接执行命令&#xff0c;进而钓鱼来上线CS。&#xff08;类似Goby&#xff0c;Goby也是使用Ele…

MCMC:Metropolis-Hastings抽样

马尔可夫链有两个要素&#xff1a; 一步转移概率矩阵&#xff1a;初始分布&#xff1a; 如果这两个要素都确定了&#xff0c;这个链的转移行为就被完全确定下来了。我们就可以求得极限分布 &#xff0c;只需解下面这个方程即可。 但是MCMC试图解决的问题刚好是反过来。即已知…

JOSEF约瑟 断电延时继电器 SRTD-220VDC-2H2D 导轨安装

系列型号&#xff1a; SRTD-24VDC-1H1D断电延时继电器&#xff1b;SRTD-110VDC-1H1D断电延时继电器&#xff1b; SRTD-220VDC-1H1D断电延时继电器&#xff1b;SRTD-110VAC-1H1D断电延时继电器&#xff1b; SRTD-220VAC-1H1D断电延时继电器&#xff1b;SRTD-24VDC-2H断电延时继电…