记一次项目所学(中间件等)-动态提醒功能(RocketMQ)

记一次项目所学(中间件等)–动态提醒功能(RocketMQ)

订阅发布模式与观察者模式

在这里插入图片描述

在这里插入图片描述

RocketMQ:纯java编写的开源消息中间件 高性能低延迟分布式事务

Redis : 高性能缓存工具,数据存储在内存中,读写速度非常快

RocketMQ相关工具类及配置实现

配置类

 
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.9.1</version></dependency>//redis<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId><version>2.2.2.RELEASE</version></dependency>

生产者发送消息工具类

public class RocketMQUtil {//同步发送消息public static void syncSendMsg(DefaultMQProducer producer, Message msg) throws Exception{SendResult result = producer.send(msg);System.out.println(result);}//异步发送消息public static void asyncSendMsg(DefaultMQProducer producer, Message msg) throws Exception{producer.send(msg, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {Logger logger = LoggerFactory.getLogger(RocketMQUtil.class);logger.info("异步发送消息成功,消息id:" + sendResult.getMsgId());}@Overridepublic void onException(Throwable e) {e.printStackTrace();}});}
}

RocketMQ配置类

@Configuration
public class RocketMQConfig {//  rocketMQ名称服务器的地址@Value("${rocketmq.name.server.address}")private String nameServerAddr;@Autowiredprivate RedisTemplate<String, String> redisTemplate;@Autowiredprivate UserFollowingService userFollowingService;//生产者@Bean("momentsProducer")public DefaultMQProducer momentsProducer() throws Exception{DefaultMQProducer producer = new DefaultMQProducer(UserMomentsConstant.GROUP_MOMENTS);producer.setNamesrvAddr(nameServerAddr);producer.start();return producer;}@Bean("momentsConsumer")//push 为推送,还有拉取等consumerpublic DefaultMQPushConsumer momentsConsumer() throws Exception{DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(UserMomentsConstant.GROUP_MOMENTS);consumer.setNamesrvAddr(nameServerAddr);//订阅    *表示所有内容consumer.subscribe(UserMomentsConstant.TOPIC_MOMENTS, "*");//消费者监听器,监听到后下一步操作//registerMessageListener注册消息监听consumer.registerMessageListener(new MessageListenerConcurrently() {@Override//ConsumeConcurrentlyStatus并发处理//MessageExt消息的扩充,ConsumeConcurrentlyContext为处理的上下文public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context){MessageExt msg = msgs.get(0);if(msg == null){return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}//取出的是byte数组类型String bodyStr = new String(msg.getBody());UserMoment userMoment = JSONObject.toJavaObject(JSONObject.parseObject(bodyStr), UserMoment.class);Long userId = userMoment.getUserId();//定位粉丝idList<UserFollowing>fanList = userFollowingService.getUserFans(userId);for(UserFollowing fan : fanList){//发到redis用户到redis拿String key = "subscribed-" + fan.getUserId();//把动态列表拿出来String subscribedListStr = redisTemplate.opsForValue().get(key);List<UserMoment> subscribedList;if(StringUtil.isNullOrEmpty(subscribedListStr)){subscribedList = new ArrayList<>();}else{//转换列表的类subscribedList = JSONArray.parseArray(subscribedListStr, UserMoment.class);}subscribedList.add(userMoment);//把列表再转成字符串放进去redisTemplate.opsForValue().set(key, JSONObject.toJSONString(subscribedList));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();return consumer;}

具体业务逻辑:

@Service
public class UserMomentsService {@Autowiredprivate UserMomentsDao userMomentsDao;@Autowiredprivate ApplicationContext applicationContext;@Autowiredprivate RedisTemplate<String, String> redisTemplate;public void addUserMoments(UserMoment userMoment) throws Exception {userMoment.setCreateTime(new Date());//cruduserMomentsDao.addUserMoments(userMoment);DefaultMQProducer producer = (DefaultMQProducer)applicationContext.getBean("momentsProducer");//主题 以及json的数组消息Message msg = new Message(UserMomentsConstant.TOPIC_MOMENTS, JSONObject.toJSONString(userMoment).getBytes(StandardCharsets.UTF_8));RocketMQUtil.syncSendMsg(producer, msg);}// 查询订阅动态public List<UserMoment> getUserSubscribedMoments(Long userId) {String key = "subscribed-" + userId;//查出来的是String描述的json类型String listStr = redisTemplate.opsForValue().get(key);//返回的是List类型,要把查出来的String封装成一个一个的UserMoment再进List中return JSONArray.parseArray(listStr, UserMoment.class);}
}

PS:消费信息逻辑在配置类的Consumer中已经写好了

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/526092.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

解释区块链技术的应用场景、优势及经典案例

目录 1.区块链应用场景 2.区块链优势 3.区块链经典案例 区块链技术是一种分布式账本技术&#xff0c;它通过加密和安全验证机制&#xff0c;允许网络中的多个参与者之间进行可信的、不可篡改的交易和数据的记录与传输。区块链技术的应用场景广泛&#xff0c;其优势也十分显著…

从零开始:神经网络(2)——MP模型

声明&#xff1a;本文章是根据网上资料&#xff0c;加上自己整理和理解而成&#xff0c;仅为记录自己学习的点点滴滴。可能有错误&#xff0c;欢迎大家指正。 神经元相关知识&#xff0c;详见从零开始&#xff1a;神经网络——神经元和梯度下降-CSDN博客 1、什么是M-P 模型 人…

Android UI自动化测试框架—SoloPi简介

1、UI自动化测试简介 软件测试简介 ​软件测试是伴随着软件开发一同诞生的&#xff0c;随着软件规模大型化&#xff0c;结构复杂化&#xff0c;软件测试也从最初的简单“调试”&#xff0c;发展到当今的自动化测试。 ​ 自动化测试是什么呢&#xff1f;自动化测试是把以人为…

Solidity Uniswap V2 价格预言机

预言机是连接区块链与链下服务的桥梁&#xff0c;这样就可以从智能合约中查询现实世界的数据。Chainlink 是最大的oracle网络之一&#xff0c;创建于 2017 年&#xff0c;如今已成为许多 DeFi 应用的重要组成部分。https://github.com/XuHugo/solidityproject Uniswap 虽然是链…

Learn OpenGL 03 着色器

GLSL 着色器的开头总是要声明版本&#xff0c;接着是输入和输出变量、uniform和main函数。每个着色器的入口点都是main函数&#xff0c;在这个函数中我们处理所有的输入变量&#xff0c;并将结果输出到输出变量中。 一个典型的着色器有下面的结构&#xff1a; #version vers…

深入理解Java泛型:灵活、安全、可重用的编程利器

Java泛型是一项强大的编程特性&#xff0c;为程序员提供了一种灵活、类型安全、可重用的编码方式。通过泛型&#xff0c;我们能够编写更加通用、适应多种数据类型的代码&#xff0c;从而提高了代码的灵活性和可维护性。在这篇博客中&#xff0c;我们将深入探讨Java泛型的各个方…

【Flink】Flink 的八种分区策略(源码解读)

Flink 的八种分区策略&#xff08;源码解读&#xff09; 1.继承关系图1.1 接口&#xff1a;ChannelSelector1.2 抽象类&#xff1a;StreamPartitioner1.3 继承关系图 2.分区策略2.1 GlobalPartitioner2.2 ShufflePartitioner2.3 BroadcastPartitioner2.4 RebalancePartitioner2…

全栈的自我修养 ———— css中常用的布局方法flex和grid

在项目里面有两种常用的主要布局:flex和grid布局&#xff08;b站布局&#xff09;&#xff0c;今天分享给大家这两种的常用的简单方法&#xff01; 一、flex布局1、原图2、中心对齐3、主轴末尾或者开始对其4、互相间隔 二、grid布局1、基本效果2、加间隔3、放大某一个元素 一、…

政安晨:【深度学习处理实践】(四)—— 实施一个温度预测示例

在开始使用像黑盒子一样的深度学习模型解决温度预测问题之前&#xff0c;我们先尝试一种基于常识的简单方法。 它可以作为一种合理性检查&#xff0c;还可以建立一个基准&#xff0c;更高级的机器学习模型需要超越这个基准才能证明其有效性。对于一个尚没有已知解决方案的新问…

HTML 学习笔记(四)图片

<!--通过图片标签"<img src "图片路径">"来调用图片在网页中进行显示--> <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><meta name"viewport" content"widthd…

Linux网络套接字之预备知识

(&#xff61;&#xff65;∀&#xff65;)&#xff89;&#xff9e;嗨&#xff01;你好这里是ky233的主页&#xff1a;这里是ky233的主页&#xff0c;欢迎光临~https://blog.csdn.net/ky233?typeblog 点个关注不迷路⌯▾⌯ 目录 一、预备知识 1.理解源IP地址和目的IP地址 …

Chain of Verification(验证链、CoVe)—理解与实现

原文地址&#xff1a;Chain of Verification (CoVe) — Understanding & Implementation 2023 年 10 月 9 日 GitHub 存储库 介绍 在处理大型语言模型&#xff08;LLM&#xff09;时&#xff0c;一个重大挑战&#xff0c;特别是在事实问答中&#xff0c;是幻觉问题。当答案…