RocketMQ sql92的使用及原理简单分析附源码

这里是weihubeats,觉得文章不错可以关注公众号小奏技术,文章首发。拒绝营销号,拒绝标题党

RocketMQ 版本

  • 5.1.0

RokcetMQ消息过滤

目前官方支持的消息过滤方式主要有两种

  • tag
  • sql92

我们可以通过查看ExpressionType的源码证明

tag过滤方式是现在最为常用的过滤方式,但是一个消息只能包含一个tag。

对于相对复杂的消息过滤场景tag过滤方式可能就不够用了,但是绝大多数业务场景tag过滤方式已经够用了。

sql92过滤方式可以有助于我们实现一些高级功能,比如RocketMQ的多测试环境消息隔离等。

这里就暂时不过多讨论sql92的具体使用场景,我们还是先来学习怎么使用sql92

sql92 语法规则

语法说明示例
IS NULL判断属性不存在。a IS NULL :属性a不存在。
IS NOT NULL判断属性存在。a IS NOT NULL:属性a存在。
> >= < <=用于比较数字,不能用于比较字符串,否则消费者客户端启动时会报错。 说明 可转化为数字的字符串也被认为是数字。a IS NOT NULL AND a > 100:属性a存在且属性a的值大于100。 a IS NOT NULL AND a > ‘abc’:错误示例,abc为字符串,不能用于比较大小。
BETWEEN xxx AND xxx用于比较数字,不能用于比较字符串,否则消费者客户端启动时会报错。等价于>= xxx AND <= xxx。表示属性值在两个数字之间。a IS NOT NULL AND (a BETWEEN 10 AND 100):属性a存在且属性a的值大于等于10且小于等于100。
NOT BETWEEN xxx AND xxx用于比较数字,不能用于比较字符串,否则消费者客户端启动会报错。等价于< xxx OR > xxx,表示属性值在两个值的区间之外。a IS NOT NULL AND (a NOT BETWEEN 10 AND 100):属性a存在且属性a的值小于10或大于100。
IN (xxx, xxx)表示属性的值在某个集合内。集合的元素只能是字符串。a IS NOT NULL AND (a IN (‘abc’, ‘def’)):属性a存在且属性a的值为abc或def。
= <>等于和不等于。可用于比较数字和字符串。a IS NOT NULL AND (a = ‘abc’ OR a<>‘def’):属性a存在且属性a的值为abc或a的值不为def。
AND OR逻辑与、逻辑或。可用于组合任意简单的逻辑判断,需要将每个逻辑判断内容放入括号内。a IS NOT NULL AND (a > 100) OR (b IS NULL):属性a存在且属性a的值大于100或属性b不存在。

由于SQL属性过滤是生产者定义消息属性,消费者设置SQL过滤条件,因此过滤条件的计算结果具有不确定性,服务端的处理方式如下:

  • 异常情况处理:如果过滤条件的表达式计算抛异常,消息默认被过滤,不会被投递给消费者。例如比较数字和非数字类型的值。

  • 空值情况处理:如果过滤条件的表达式计算值为null或不是布尔类型(true和false),则消息默认被过滤,不会被投递给消费者。例如发送消息时未定义某个属性,在订阅时过滤条件中直接使用该属性,则过滤条件的表达式计算结果为null。

  • 数值类型不符处理:如果消息自定义属性为浮点型,但过滤条件中使用整数进行判断,则消息默认被过滤,不会被投递给消费者。

sql92使用

源码

所有源码已上传至github

  • 地址:https://github.com/weihubeats/weihubeats_demos/tree/master/java-demos/rocketmq-demo/src/main/java/com/weihubeats/rocketmq/demo/sql92

消息发送

public class SQLProducer {public static int count = 10;public static String topic = "xiao-zou-topic";public static void main(String[] args) {DefaultMQProducer producer = MQUtils.createLocalProducer();IntStream.range(0, count).forEach(i -> {Message message = new Message(topic, ("sql92 test" + i).getBytes(StandardCharsets.UTF_8));try {if (i % 2 == 0) {message.putUserProperty("gray", "dev1");}SendResult sendResult = producer.send(message);DateTimeFormatter dtf2 = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");System.out.printf("%s %s%n", sendResult, dtf2.format(LocalDateTime.now()));}catch (Exception e) {throw new RuntimeException(e);}});producer.shutdown();}
}

这里我们假装消息是发送个多个测试的消息,所以每条消息都在UserProperty添加了一个dev1标签。

我们要实现的就是比如只有dev1环境的消费者才会消费带有dev1标签的消息,其他消息则丢弃掉

消息消费

public class SQLConsumer {public static String GID = "xiao-zou-gid";public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = MQUtils.createLocalConsumer(GID);String sql = "gray is not null and gray = 'dev1'";consumer.subscribe(MQUtils.TOPIC, MessageSelector.bySql(sql));consumer.registerMessageListener((MessageListenerConcurrently) (msg, context) -> {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msg);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});/**  Launch the consumer instance.*/consumer.start();System.out.printf("Consumer Started.%n");}
}

这里的消息消费方式唯一不同的是我们订阅消息的方式发生了变化

普通方法我们调用的是这个方法进行消息订阅的,传入tag就行

比如像这样

consumer.subscribe("TopicTest", "TagA || TagC || TagD");

但是这里我们使用的是sql92方式

传入的是一个MessageSelector,订阅的规则是

String sql = "gray is not null and gray = 'dev1'";

运行效果

  • 消息发送

这里我们发送了十条消息,只有5条是带有gray标签的

  • 消息消费

可以看到消息消费只有消费了带有gray标签的5条消息,符合我们的预期

sql92是在客户端还是在服务端过滤的?

sql92tag都是在服务端过滤的,我们可以查看源码得知

不过tag的过滤方式会在客户端再次过滤。因为在服务端是通过hashcode进行过滤的,为了提高性能,没有对原始的tag进行过滤,在通过hashcode过滤掉绝大多少的消息后,在客户端进行最后的tag完全过滤。

org.apache.rocketmq.client.impl.consumer.PullAPIWrapper#processPullResult

如果统一都在客户端过滤会导致传输大量的消息到客户端,影响性能

总结

本次我们对RocketMQ sql92过滤消息进行了简单的使用以及少量的源码分析,并没有完整的从整个流程进行分析,因为本篇并不是源码分析偏。sql92在实际的项目中的相对来说较少,偶尔如果做RocketMQ消息的多册环境或者灰度,可能是一个方案,但不是最佳的

参考

  • 官方文档

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/157183.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

《算法通关村——缓存机制了解LRU实现》

《算法通关村——缓存机制了解LRU实现》 介绍 LRU是"Least Recently Used"&#xff08;最近最少使用&#xff09;的缓存机制&#xff0c;它是一种常用的缓存算法&#xff0c;用于管理缓存中的数据项。LRU缓存机制的基本思想是&#xff0c;当缓存达到其容量限制时&a…

SPSS卡方检验

前言&#xff1a; 本专栏参考教材为《SPSS22.0从入门到精通》&#xff0c;由于软件版本原因&#xff0c;部分内容有所改变&#xff0c;为适应软件版本的变化&#xff0c;特此创作此专栏便于大家学习。本专栏使用软件为&#xff1a;SPSS25.0 本专栏所有的数据文件请点击此链接下…

阿里云推出AI编程工具“通义灵码“;生成式 AI 入门教程 2

&#x1f989; AI新闻 &#x1f680; 阿里云推出AI编程工具"通义灵码"&#xff0c;支持多种语言及实时续写功能 摘要&#xff1a;阿里云推出了一款名为"通义灵码"的AI编程工具&#xff0c;支持多种主流编程语言&#xff0c;包括Java、Python、Go等。该工…

浅谈AcrelEMS-CB商业建筑能源管理系统解决方案-安科瑞 蒋静

1概述 AcrelEMS-CB商业建筑能源管理系统&#xff0c;集电力监控、电能质量监测与治理、电气安全预警、能耗分析、照明控制、新能源使用、能源收费以及设备运维等功能于一体&#xff0c;通过一套系统对商业建筑的能源进行统一监控、统一运维和调度&#xff0c;系统可以通过WEB和…

Ubuntu MySQL客户端功能介绍(mysql-client)mysql命令(mysql客户端命令)数据库导出、数据库导入

文章目录 Ubuntu MySQL客户端(mysql-client)功能介绍MySQL客户端与服务端服务器端&#xff08;MySQL Server&#xff09;客户端&#xff08;MySQL Client&#xff09; 安装MySQL客户端连接到MySQL服务器&#xff08;mysql -h host -u user -p&#xff09;执行SQL查询批处理模式…

解决问题Conda:CondaValueError: Malformed version string ‘~’ : invalid character(s)

解决问题Conda&#xff1a;CondaValueError: Malformed version string ‘~’ : invalid character(s) 背景 今天使用Conda构建项目运行环境的时候报错&#xff1a;&#xff1a;CondaValueError: Malformed version string ‘~’ : invalid character(s) ##报错问题 在安装te…

【Linux】 su 命令使用

su&#xff08;英文全拼&#xff1a;switch user&#xff09;命令用于变更为其他使用者的身份&#xff0c;除 root 外&#xff0c;需要键入该使用者 的密码。使用权限&#xff1a;所有使用者。 语法 su [选项] [-] [USER [参数]...] su命令 -Linux手册页 著者 作者&#xff1…

shell script 学习案例

或许进度最快的方法就是先抄袭&#xff0c;在改进&#xff0c;哈哈。学习下各个案例吧 第一&#xff1a;写过程序的都知道&#xff0c;第一个学的就是输出"Hello World!",那么学习shell也是一样&#xff0c;上案例 因为有脚本文件&#xff0c;所有同时 开了两个终端…

Mac -- zsh-最新全网超详细的个性化终端(Terminal)颜色及vim颜色配置(亲测可行)

转自 Mac -- zsh-最新全网超详细的个性化终端(Terminal)颜色及vim颜色配置(亲测可行)_mac zsh-CSDN博客 以下都是苹果 设置&#xff0c;这是简化版的&#xff0c;详细的看我引用的 个性化终端颜色背景设置 显示检查器 打开终端&#xff0c;鼠标在终端中&#xff0c;右击&…

NLP 快速入门

文章目录 前言NLP 历史回顾NLP任务语料的标注AI语料标注师岗位职责 TransformersHugging Face模型中文文本分类使用 NLTK 进行文本分类 参考链接开源NLP 前言 学习NLP&#xff0c;解决两个问题&#xff1a; 如何使用别人训练好的模型&#xff1f;如何基于别人的模型&#xff…

Linux学习第26天:异步通知驱动开发: 主动

Linux版本号4.1.15 芯片I.MX6ULL 大叔学Linux 品人间百味 思文短情长 在正式开启今天的学习前&#xff0c;讲一讲为什么标题中加入了【主动】俩字。之前学习的阻塞和非阻塞IO&#xff0c;都是在被动的接受应用程序的操作。而今天的学…

【Linux】jdk Tomcat MySql的安装及Linux后端接口部署

一&#xff0c;jdk安装 1.1 上传安装包到服务器 打开MobaXterm通过Linux地址连接到Linux并登入Linux&#xff0c;再将主机中的配置文件复制到MobaXterm 使用命令查看&#xff1a;ll 1.2 解压对应的安装包 解压jdk 解压命令&#xff1a;tar -xvf jdk 加键盘中Tab键即可…