redis stream restTemplate消息监听队列框架搭建

整体思路

        1. pom增加redis依赖;

        2. 消息监听器,实现StreamListener接口,处理消息到达逻辑;

        3. 将消息订阅bean及监听器注册到配置中;

1. pom

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.6</version></parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency>

2. 消息监听器实现代码

package cn.thuniwhir.fileserver.redis;import com.alibaba.fastjson.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Component;import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;/*** @Description: TODO**/
@Component
public class RedisMQListener implements StreamListener<String, MapRecord<String, String, Object>> {private static final Logger log = LoggerFactory.getLogger(RedisMQListener.class);// 创建一个线程池private static final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 20, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());@Overridepublic void onMessage(MapRecord message) {// 异步处理消息threadPoolExecutor.execute(()->{System.out.println(Thread.currentThread().getName() + ":接收到的消息:" + message.getId() + ";" + JSON.toJSONString(message.getValue()));});}
}

3. redis订阅bean及监听器注册

package cn.thuniwhir.fileserver.redis;import cn.thuniwhir.fileserver.context.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamInfo;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;import java.time.Duration;
import java.util.stream.Collectors;/*** @Description: TODO**/
@Configuration
public class RedisMQConfig {@Autowiredprivate RedisMQListener redisMQListener;@Autowiredprivate RedisUtils redisUtils;private static RedisTemplate<Object, Object> redisTemplate;private static final Logger log = LoggerFactory.getLogger(RedisMQConfig.class);public RedisMQConfig(RedisTemplate<Object, Object> redisTemplate) {this.redisTemplate = redisTemplate;}@Beanpublic Subscription subscription(RedisConnectionFactory redisConnectionFactory) {if (redisUtils.hasKey(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME)) {StreamInfo.XInfoGroups xInfoGroups = redisTemplate.opsForStream().groups(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME);if (xInfoGroups.isEmpty()) {redisTemplate.opsForStream().createGroup(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME, Constants.FILE_MQ_DISK_THRESHOLD_GROUPNAME);} else {if (xInfoGroups.stream().filter(xInfoGroups1 -> xInfoGroups1.groupName().equals(Constants.FILE_MQ_DISK_THRESHOLD_GROUPNAME)).collect(Collectors.toList()).isEmpty()) {redisTemplate.opsForStream().createGroup(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME, Constants.FILE_MQ_DISK_THRESHOLD_GROUPNAME);}}} else {redisTemplate.opsForStream().createGroup(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME, Constants.FILE_MQ_DISK_THRESHOLD_GROUPNAME);}StreamMessageListenerContainer.StreamMessageListenerContainerOptions options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder().pollTimeout(Duration.ofSeconds(1)).build();StreamMessageListenerContainer streamMessageListenerContainer = StreamMessageListenerContainer.create(redisConnectionFactory, options);Subscription subscription = streamMessageListenerContainer.receiveAutoAck(Consumer.from(Constants.FILE_MQ_DISK_THRESHOLD_GROUPNAME, Constants.FILE_MQ_DISK_THRESHOLD_CONSUMER), StreamOffset.create(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME, ReadOffset.lastConsumed()), redisMQListener);streamMessageListenerContainer.start();return subscription;}}

4. 测试生产消息 消息监听成功

4.1 生产消息

@RequestMapping("/produceMessage")public JSONObject produceMessage(@RequestBody JSONObject jsonObject) {String key = jsonObject.getString("key");String value = jsonObject.getString("value");MapRecord<Object, String, Object> mapRecord = MapRecord.create(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME, Collections.singletonMap(key, value));redisTemplate.opsForStream().add(mapRecord);System.out.println("produceMessage Thread Name:" + Thread.currentThread().getName());return formatResult(null);}

4.2 消息监听器监听消息到达 代码见第二节

4.3 测试结果

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/339685.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

STM32Cubemx PWM驱动SG90舵机

一、SG90相关介绍 名 称&#xff1a;9克舵机 180度 尺 寸&#xff1a;23mmX12.2mmX29mm 重 量&#xff1a;9克 扭 矩&#xff1a;1.5kg/cm 工 作 电 压: 4.2-6V 温 度 范 围:0℃--55℃ 运 行 速 度&#xff1a;0.3秒/60度 死 带 宽:10微秒 二、Cubemx配置 sys配置 RCC配置 LED…

JavaWeb- Tomcat

一、概念 老规矩&#xff0c;先看维基百科&#xff1a;Apache Tomcat (called "Tomcat" for short) is a free and open-source implementation of the Jakarta Servlet, Jakarta Expression Language, and WebSocket technologies.[2] It provides a "pure Ja…

docker 容器添加指定网络地址

docker 容器添加指定网络地址 在搭建halo博客时&#xff0c;准备让 halo、mysql8.1、nginx 三个容器在同一个网段中&#xff0c;并指定IP。 实现docker内部容器之间网络互通。 查看容器网络信息命令 docker inspect 容器名各容器部署成功后网络效果如下&#xff1a; nginx …

实现秒杀功能设计

页面 登录页面 登录成功后&#xff0c;跳转商品列表 商品列表页 加载商品信息 商品详情页 根据商品id查出商品信息返回VO&#xff08;包括rmiaoshaStatus、emainSeconds&#xff09;前端根据数据展示秒杀按钮&#xff0c;点击开始秒杀 订单详情页 秒杀页面设置 后端返回秒杀…

一起玩儿物联网人工智能小车(ESP32)——24. 变量与函数(二)

摘要&#xff1a;本文介绍变量和函数的基本知识 在前面一篇中了解了变量&#xff0c;接着就来了解一下函数。函数是程序中的一个关键概念&#xff0c;它可以简化程序的编写&#xff0c;使代码更加模块化、可复用&#xff0c;提高程序的可读性。其实在之前已经多次遇到函数了&am…

跟我学java|Stream流式编程——并行流

什么是并行流 并行流是 Java 8 Stream API 中的一个特性。它可以将一个流的操作在多个线程上并行执行&#xff0c;以提高处理大量数据时的性能。 在传统的顺序流中&#xff0c;所有的操作都是在单个线程上按照顺序执行的。而并行流则会将流的元素分成多个小块&#xff0c;并在多…

快速了解扭矩、拧紧得技术原理——SunTorque智能扭矩系统

在机械工程领域&#xff0c;扭矩是一个至关重要的参数&#xff0c;扭矩大小、是否到位它直接影响到拧紧工作的质量和安全性。SunTorque智能扭矩系统将深入探讨扭矩在拧紧过程中的技术原理。 扭矩&#xff0c;也称为力矩&#xff0c;是指作用在物体上的力与力臂的乘积。在拧紧过…

c#图片作为鼠标光标

图片转换为鼠标光标代码如下&#xff1a; private void Form1_Load(object sender, EventArgs e) {//button1.Cursor System.Windows.Forms.Cursors.Hand;Bitmap bmp new Bitmap("780.jpg");Cursor cursor new Cursor(bmp.GetHicon());button1.Cursor cursor;} …

QWebEngineView类中的load、seturl、setPage、setHtml和setContent方法的功能与用法对比

文章目录 📖 介绍 📖🏡 环境 🏡📒 对比 📒📝 load方法📝 setUrl方法📝 setPage方法📝 setHtml方法📝 setContent方法📖 介绍 📖 QWebEngineView 是 Qt 提供的一个用于呈现 Web 内容的类,基于 Google 的 Chromium 浏览器引擎。它提供了对现代 Web 标…

网安入门13-文件上传(htaccess,其他绕过)

空格绕过&#xff0c;点号绕过 Pass-07 直接上传肯定是失败的 把文件名1.php改成1.php.或1.php_(下划线为空格)&#xff0c;这种命名方式在windows系统里是不被允许的&#xff0c;所以需要在burp之类里进行修改&#xff0c;然后绕过验证后&#xff0c;会被windows系统自动去掉…

怎么看待存在争议的低代码?

一、低代码直接效果怎么样&#xff1f; 以体验过的JNPF平台为例&#xff0c;JNPF低代码开发的过程就是可以通过拖拉拽的方式去完成软件开发&#xff0c;复杂功能可以通过二次开发来解决&#xff0c;提升开发效率&#xff0c;降低开发成本。 给大家举个例子&#xff0c;以我们熟…

ubuntu20.04 扩大交换空间swap

检查当前swap情况 free -msudo swapon --show关闭现有的swap sudo swapoff -a创建一个新的swap文件 sudo fallocate -l 32G /swapfile设定正确的权限 sudo chmod 600 /swapfile下面这个指令会把我们的空间变成可用的swap空间 sudo mkswap /swapfile启用swap文件 sudo swa…