RabbitMQ Stream插件使用详解

2.4版为RabbitMQ流插件引入了对RabbitMQStream插件Java客户端的初始支持。

  • RabbitStreamTemplate
  • StreamListener容器

将spring rabbit流依赖项添加到项目中:

<dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-stream</artifactId><version>3.1.4</version>
</dependency>

您可以使用RabbitAdmin bean,使用QueueBuilder.stream()方法指定队列类型,正常地配置队列。例如:

@Bean
Queue stream() {return QueueBuilder.durable("stream.queue1").stream().build();
}

然而,这仅在您还使用non-stream 组件(如SimpleMessageListenerContainer或DirectMessageListeneerContainer)时才有效,因为在打开AMQP连接时会触发管理员来声明定义的bean。如果您的应用程序仅使用流组件,或者您希望使用高级流配置功能,则应改为配置StreamAdmin:

@Bean
StreamAdmin streamAdmin(Environment env) {return new StreamAdmin(env, sc -> {sc.stream("stream.queue1").maxAge(Duration.ofHours(2)).create();sc.stream("stream.queue2").create();});
}

一、Sending Messages

RabbitStreamTemplate提供RabbitTemplate(AMQP)功能的子集。

public interface RabbitStreamOperations extends AutoCloseable {CompletableFuture<Boolean> send(Message message);CompletableFuture<Boolean> convertAndSend(Object message);CompletableFuture<Boolean> convertAndSend(Object message, @Nullable MessagePostProcessor mpp);CompletableFuture<Boolean> send(com.rabbitmq.stream.Message message);MessageBuilder messageBuilder();MessageConverter messageConverter();StreamMessageConverter streamMessageConverter();@Overridevoid close() throws AmqpException;}

RabbitStreamTemplate实现具有以下构造函数和属性:

public RabbitStreamTemplate(Environment environment, String streamName) {
}public void setMessageConverter(MessageConverter messageConverter) {
}public void setStreamConverter(StreamMessageConverter streamConverter) {
}public synchronized void setProducerCustomizer(ProducerCustomizer producerCustomizer) {
}

MessageConverter在convertAndSend方法中用于将对象转换为Spring AMQP消息。

StreamMessageConverter用于将Spring AMQP消息转换为本机流消息。

您也可以直接发送本机流消息;使用messageBuilder()方法提供对生产者的消息生成器的访问。

ProducerCustomizer提供了一种机制,用于在生成生产者之前对其进行自定义。

 二、Receiving Messages

异步消息接收由StreamListenerContainer(以及使用@RabbitListener时的StreamRabbitListerContainerFactory)提供。

侦听器容器需要一个Environment以及一个流名称。

您可以使用经典的MessageListener接收Spring AMQP消息,也可以使用新接口接收本地流消息:

public interface StreamMessageListener extends MessageListener {void onStreamMessage(Message message, Context context);}

有关支持的属性的信息,请参阅消息侦听器容器配置。

与模板类似,容器具有ConsumerCustomizer属性。

有关自定义环境和使用者的信息,请参阅Java客户端文档。

使用@RabbitListener时,配置StreamRabbitListerContainerFactory;此时,大多数@RabbitListener属性(并发等)将被忽略。仅支持id、队列、autoStartup和containerFactory。此外,队列只能包含一个流名称。

三、Examples

@Bean
RabbitStreamTemplate streamTemplate(Environment env) {RabbitStreamTemplate template = new RabbitStreamTemplate(env, "test.stream.queue1");template.setProducerCustomizer((name, builder) -> builder.name("test"));return template;
}@Bean
RabbitListenerContainerFactory<StreamListenerContainer> rabbitListenerContainerFactory(Environment env) {return new StreamRabbitListenerContainerFactory(env);
}@RabbitListener(queues = "test.stream.queue1")
void listen(String in) {...
}@Bean
RabbitListenerContainerFactory<StreamListenerContainer> nativeFactory(Environment env) {StreamRabbitListenerContainerFactory factory = new StreamRabbitListenerContainerFactory(env);factory.setNativeListener(true);factory.setConsumerCustomizer((id, builder) -> {builder.name("myConsumer").offset(OffsetSpecification.first()).manualTrackingStrategy();});return factory;
}@RabbitListener(id = "test", queues = "test.stream.queue2", containerFactory = "nativeFactory")
void nativeMsg(Message in, Context context) {...context.storeOffset();
}@Bean
Queue stream() {return QueueBuilder.durable("test.stream.queue1").stream().build();
}@Bean
Queue stream() {return QueueBuilder.durable("test.stream.queue2").stream().build();
}

2.4.5版将adviceChain属性添加到StreamListenerContainer(及其工厂)。还提供了一个新的工厂bean来创建一个无状态重试拦截器,该拦截器带有一个可选的StreamMessageRecoverer,用于在使用原始流消息时使用。

@Bean
public StreamRetryOperationsInterceptorFactoryBean sfb(RetryTemplate retryTemplate) {StreamRetryOperationsInterceptorFactoryBean rfb =new StreamRetryOperationsInterceptorFactoryBean();rfb.setRetryOperations(retryTemplate);rfb.setStreamMessageRecoverer((msg, context, throwable) -> {...});return rfb;
}

四、Super Streams

超级流是分区流的抽象概念,通过将多个流队列绑定到具有参数x-Super-Stream:true的交换来实现。

1、调配

为了方便起见,可以通过定义类型为SuperStream的单个bean来提供超级流。

@Bean
SuperStream superStream() {return new SuperStream("my.super.stream", 3);
}

RabbitAdmin检测到这个bean,并将声明交换(my.super.stream)和3个队列(分区)-my.super-stream-n,其中n是0,1,2,绑定的路由密钥等于n。

如果您还希望通过AMQP向exchange 发布,您可以提供自定义路由密钥:

@Bean
SuperStream superStream() {return new SuperStream("my.super.stream", 3, (q, i) -> IntStream.range(0, i).mapToObj(j -> "rk-" + j).collect(Collectors.toList()));
}

key 的数量必须等于分区的数量。

2、向超级流生产消息

你必须向 RabbitStreamTemplate 添加一个 superStreamRoutingFunction

@Bean
RabbitStreamTemplate streamTemplate(Environment env) {RabbitStreamTemplate template = new RabbitStreamTemplate(env, "stream.queue1");template.setSuperStreamRouting(message -> {// some logic to return a String for the client's hashing algorithm});return template;
}

你也可以通过AMQP发布,使用 RabbitTemplate

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/625429.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Java工程师常见面试题:Java基础(一)

1、JDK 和 JRE 有什么区别&#xff1f; JDK是Java开发工具包&#xff0c;它包含了JRE和开发工具&#xff08;如javac编译器和java程序运行工具等&#xff09;&#xff0c;主要用于Java程序的开发。而JRE是Java运行环境&#xff0c;它只包含了运行Java程序所必须的环境&#xf…

Java springboot使用EasyExcel读Excel文件,映射不到属性值,对象属性值都是null

如果你的类上有这个注解&#xff0c;去掉火或注释掉就可以了 Accessors(chain true)解决方法

vue 常用的日历排班,带农历显示组件(2024-04-16)

显示当前月日历组件&#xff0c;里面带农历或节日显示 后面可以丰富一些国家法定节假期的业务需求 代码 js-calendar.js 文件 var lunarInfo [0x04bd8, 0x04ae0, 0x0a570, 0x054d5, 0x0d260, 0x0d950, 0x16554, 0x056a0, 0x09ad0, 0x055d2, //1900-19090x04ae0, 0x0a5b6, 0…

【大数据】分布式文件系统HDFS

目录 1.什么是分布式文件系统 2.HDFS的特点 3.HDFS的核心概念 4.HDFS的体系结构 5.HDFS的配置建议 6.HDFS的局限性 7.HDFS的存储机制 7.1.数据冗余机制 7.2.错误与恢复 8.HDFS数据读写过程 1.什么是分布式文件系统 分布式文件系统是整个大数据技术的基础&#xff0c…

常用序号、标点符号 相关正则表达式

(?:[\(|&#xff08;|\[])?\d[\]|\)&#xff09;|\、]|[\u2460-\u2473]|[\u4e00-\u5341][.|、]匹配序号 \d\.(?!\d)|\d、常规序号匹配&#xff1a; rule1: 标准格式1. 2、 rule2:排除小数 [^\u4E00-\u9FA5\uFF00-\uFFEFa-zA-Z0-9\s]所有符号 [\u3000-\u303F\uFF00-\uFFE…

STM32 软件I2C方式读取MT6701磁编码器获取角度例程

STM32 软件I2C方式读取MT6701磁编码器获取角度例程 &#x1f4cd;相关篇《STM32 软件I2C方式读取AS5600磁编码器获取角度例程》&#x1f33f;《Arduino通过I2C驱动MT6701磁编码器并读取角度数据》&#x1f530;MT6701芯片和AS5600从软件读取对比&#xff0c;只是读取的寄存器和…

LeetCode 409—— 最长回文串

阅读目录 1. 题目2. 解题思路3. 代码实现 1. 题目 2. 解题思路 要想组成回文串&#xff0c;那么只有最中间的字符可以是奇数个&#xff0c;其余字符都必须是偶数个。 所以&#xff0c;我们先遍历一遍字符串&#xff0c;统计出每个字符出现的次数。 然后如果某个字符出现了偶…

xcode c++项目设置运行时参数

在 Xcode 项目中&#xff0c;你可以通过配置 scheme 来指定在运行时传递的参数。以下是在 Xcode 中设置运行时参数的步骤&#xff1a; 打开 Xcode&#xff0c;并打开你的项目。在 Xcode 菜单栏中&#xff0c;选择 "Product" -> "Scheme" -> "E…

滤波器笔记(杂乱)

线性相位是时间平移&#xff0c;相位不失真 零、基础知识 1、用相量表示正弦量 https://zhuanlan.zhihu.com/p/345546880 https://www.zhihu.com/question/347763932/answer/1103938667 A s i n ( ω t θ ) ⇔ A e j θ ⇔ A ∠ θ Asin(\omega t\theta) {\Leftrightarrow…

[沫忘录]MySQL索引

[沫忘录]MySQL索引 索引概述 优点 提高数据检索效率&#xff0c;降低数据库IO成本通过索引对数据进行排序&#xff0c;降低数据排序成本&#xff0c;降低CPU消耗 缺点 索引会占用一定空间当更新数据时&#xff0c;也需更新索引数据&#xff0c;这会降低数据的更新效率 索引…

Visual Studio 2019 社区版下载

一、网址 https://learn.microsoft.com/zh-cn/visualstudio/releases/2019/release-notes#start-window 二、选择这个即可

负载均衡原理、算法与实现方式

&#x1f407;明明跟你说过&#xff1a;个人主页 &#x1f3c5;个人专栏&#xff1a;《Kubernetes航线图&#xff1a;从船长到K8s掌舵者》 &#x1f3c5; &#x1f516;行路有良友&#xff0c;便是天堂&#x1f516; 目录 一、前言 1、什么是负载均衡 2、负载均衡的应用场…