1.生产端
public class SQLProducer {public static int count = 10;public static String topic = "xiao-zou-topic";public static void main(String[] args) {DefaultMQProducer producer = MQUtils.createLocalProducer();IntStream.range(0, count).forEach(i -> {Message message = new Message(topic, ("sql92 test" + i).getBytes(StandardCharsets.UTF_8));try {if (i % 2 == 0) {message.putUserProperty("gray", "dev1");}SendResult sendResult = producer.send(message);DateTimeFormatter dtf2 = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");System.out.printf("%s %s%n", sendResult, dtf2.format(LocalDateTime.now()));}catch (Exception e) {throw new RuntimeException(e);}});producer.shutdown();}
}
2.消费端
public class SQLConsumer {public static String GID = "xiao-zou-gid";public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = MQUtils.createLocalConsumer(GID);String sql = "gray is not null and gray = 'dev1'";consumer.subscribe(MQUtils.TOPIC, MessageSelector.bySql(sql));consumer.registerMessageListener((MessageListenerConcurrently) (msg, context) -> {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msg);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});/** Launch the consumer instance.*/consumer.start();System.out.printf("Consumer Started.%n");}
}
3.语法规则
4.原理
-
当消息到达 Broker 时,Broker 会将消息与对应的订阅关系进行匹配。
-
如果该订阅关系包含 SQL92 表达式,则将该表达式传递给消息过滤器。
-
消息过滤器使用 Antlr4 解析器解析 SQL92 表达式,并将其转换为语法树。
-
一旦表达式被转换为语法树,过滤器就可以开始遍历语法树,并使用消息属性和自定义属性来匹配表达式中的条件。
-
如果消息属性和自定义属性匹配 SQL92 表达式中的条件,则过滤器将消息传递给消费者。
-
如果消息属性和自定义属性不匹配 SQL92 表达式中的条件,则过滤器将跳过该消息,并继续匹配其他消息。