出售短视频平台,多节点实例数据同步触发的方式

news/2025/2/27 14:17:43/文章来源:https://www.cnblogs.com/yunbaomengnan/p/18564148

出售短视频平台,多节点实例数据同步触发的方式
今天标题的内容,主要讲同步如何触发?内容已经圈定死,因此就不谈数据同步涉及的一致性,只谈如何触发这个动作。多节点实例触发的关键是,一旦触发,各个节点都要通知到位。那如何进行多个节点通知呢?答案就是通过广播。

本案例核心流程图

 

实现步骤

1、定义高层广播抽象接口

@FunctionalInterface
public interface DataSyncTrigger {void broadcast(Object data);
}

 

2、定义通知事件类
注: 本文会采用spring的事件监听模式实现

public class DataSyncTriggerEvent extends ApplicationEvent {/*** Create a new ApplicationEvent.** @param source the object on which the event initially occurred (never {@code null})*/public DataSyncTriggerEvent(Object source) {super(source);}
}

 

3、定义高层抽象广播的模板基类

@RequiredArgsConstructor
public abstract class BaseDataSyncTrigger implements DataSyncTrigger, ApplicationContextAware {protected ApplicationContext applicationContext;protected final DataSyncTriggerProperty dataSyncTriggerProperty;@Overridepublic void broadcast(Object data) {DataSyncTriggerEvent dataSyncTriggerEvent = new DataSyncTriggerEvent(data);applicationContext.publishEvent(dataSyncTriggerEvent);}@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {this.applicationContext = applicationContext;}private Collection<DataSyncTriggerCallBack> listDataSyncTriggerCallBacks(){try {Map<String, DataSyncTriggerCallBack> dataSyncTriggerCallBackMap = applicationContext.getBeansOfType(DataSyncTriggerCallBack.class);return Collections.unmodifiableList(dataSyncTriggerCallBackMap.values().stream().collect(Collectors.toList()));} catch (BeansException e) {}return Collections.emptyList();}public void callBack(Object data){Collection<DataSyncTriggerCallBack> dataSyncTriggerCallBacks = listDataSyncTriggerCallBacks();if(CollectionUtil.isNotEmpty(dataSyncTriggerCallBacks)){if(dataSyncTriggerProperty.isTriggerCallBackAsync()){callbackAsync(data, dataSyncTriggerCallBacks);}else{callbackSync(data, dataSyncTriggerCallBacks);}}}private  void callbackSync(Object data, Collection<DataSyncTriggerCallBack> dataSyncTriggerCallBacks) {for (DataSyncTriggerCallBack dataSyncTriggerCallBack : dataSyncTriggerCallBacks) {dataSyncTriggerCallBack.execute(data);}}private  void callbackAsync(Object data, Collection<DataSyncTriggerCallBack> dataSyncTriggerCallBacks) {for (DataSyncTriggerCallBack dataSyncTriggerCallBack : dataSyncTriggerCallBacks) {ThreadUtil.execAsync(()->{dataSyncTriggerCallBack.execute(data);});}}
}

 

4、定义抽象回调接口【扩展点】
当业务收到通知,可以通过该回调接口进行具体业务操作

@FunctionalInterface
public interface DataSyncTriggerCallBack {void execute(Object data);
}

 

5、定义具体广播实现类
注: 这个广播的具体实现方案就很多了,只要天生具备广播能力或者基于原来特性扩展出广播的组件都可以,比如rocketmq的广播机制、redis的pubsub机制、zookeeper的分布式协调能力、基于注册中心服务发现能力改造出来的广播能力等。本文就以redis的pubsub机制为例

Slf4j
public class RedisDataSyncTrigger extends BaseDataSyncTrigger implements CommandLineRunner {private final RedisTemplate redisTemplate;public RedisDataSyncTrigger(RedisTemplate redisTemplate, DataSyncTriggerProperty dataSyncTriggerProperty) {super(dataSyncTriggerProperty);this.redisTemplate = redisTemplate;}@EventListenerpublic void listener(DataSyncTriggerEvent dataSyncTriggerEvent){SyncDataDTO syncDataDTO = SyncDataDTO.builder().data(dataSyncTriggerEvent.getSource()).timeStamp(System.currentTimeMillis()).build();try {redisTemplate.convertAndSend(REDIS_CHANNEL_KEY, syncDataDTO);} catch (Exception e) {log.error("redis publish channel 【" + REDIS_CHANNEL_KEY + "】 fail,cause:" + e.getMessage(),e);}}@Overridepublic void run(String... args) throws Exception {doSubscribe();}@SneakyThrowsprivate void doSubscribe() {RedisConnection connection = redisTemplate.getConnectionFactory().getConnection();RedisMessageListener redisMessageListener = applicationContext.getBean(RedisMessageListener.class);connection.subscribe(redisMessageListener,REDIS_CHANNEL_KEY.getBytes("utf-8"));log.info(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Register listen channel : 【{}】",REDIS_CHANNEL_KEY);}
}

 

具体redis订阅监听实现

@RequiredArgsConstructor
@Slf4j
public class RedisMessageListener implements MessageListener{private final BaseDataSyncTrigger baseDataSyncTrigger;private final RedisTemplate redisTemplate;@Overridepublic void onMessage(Message message, byte[] pattern) {byte[] body = message.getBody();String dataJson = StrUtil.str(body, "utf-8");if(JSONUtil.isJson(dataJson)){try {SyncDataDTO dataDTO = (SyncDataDTO) redisTemplate.getHashValueSerializer().deserialize(body);baseDataSyncTrigger.callBack(dataDTO.getData());} catch (Exception e) {log.error(e.getMessage(),e);}}else{log.warn(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Data 【{}】 is not match json format !!!",dataJson);}}
}

 

6、测试验证
a、编写业务逻辑类

@Service
@RequiredArgsConstructor
@Slf4j
public class DataService {private List<Object> dataList = new CopyOnWriteArrayList<>();private final RedisTemplate redisTemplate;private final BaseDataSyncTrigger dataSyncTrigger;public boolean add(String data){try {Long count = redisTemplate.opsForList().leftPush(RedisConstant.REDIS_LIST_KEY, data);if(count > 0){dataSyncTrigger.broadcast(data);return true;}} catch (Exception e) {log.error("add fail:" + e.getMessage(),e);}return false;}public List<Object> getDataList(){return dataList;}
}

 

b、编写业务控制器

@RestController
@RequestMapping("data")
@RequiredArgsConstructor
public class DataController {private final DataService dataService;@GetMapping("add/{data}")public String syncData(@PathVariable("data") String data){boolean isSuccess = dataService.add(data);return isSuccess ? "success" : "fail";}@GetMapping("list")public List<Object> listData(){return dataService.getDataList();}
}

 

c、编写业务回调类

@Component
@RequiredArgsConstructor
@Slf4j
public class LocalListDataSyncTriggerCallBack implements DataSyncTriggerCallBack {private final DataService dataService;@Overridepublic void execute(Object data) {dataService.getDataList().add(data);log.info(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Sync data:-->{}",data);}
}

 

d、小细节

注: 当项目重启时,本地存储容器是没内容的,因此需要在项目重启时,写一个钩子,从其他缓存介质将数据刷到本地存储中

@Component
@RequiredArgsConstructor
@Slf4j
public class DataInitTask implements CommandLineRunner {private final RedisTemplate redisTemplate;private final DataService dataService;@Overridepublic void run(String... args) throws Exception {List redisDataList = redisTemplate.opsForList().range(RedisConstant.REDIS_LIST_KEY, 0, -1);if(CollectionUtil.isNotEmpty(redisDataList)){dataService.getDataList().addAll(redisDataList);log.info(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Loaded data from redis finished!!!");}}}

 

e、测试

从一个节点(示例:54860端口)添加数据,如图

 观察其他节点(示例:59829端口)本地存储是否接收到数据

 

从图可以发现已经收到数据,同时我们观察控制台

可以看出业务回调已经触发

 

以上就是出售短视频平台,多节点实例数据同步触发的方式, 更多内容欢迎关注之后的文章

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/839744.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

读数据质量管理:数据可靠性与数据质量问题解决之道12应对与缓解

应对与缓解1. 解决 1.1. 当你发现数据出了故障,并且了解到它的初步影响时,下一步(有时甚至在根因分析之前)就是要解决这个问题,并且和利益相关方沟通,协商接下来该怎么做 1.2. 在事故解决后,无论是通过修改代码、数据或者运行环境中的哪种方式,数据团队都应该与受到影响…

ENSP学习

开启dhcp服务时,需要先进入系统视图,不然系统无法识别命令

题解 - Omkar and Password

好橙。题目 洛谷的 RMJ 挂了我就不挂洛谷了。 题目大意给定长为 \(n\) 的数列,每次可以将相邻的且不相同的数字合并(即加和)。 问最短能合并成几个数。思路简析 虽然只是道橙题,但挺有趣。 太菜了,开始被硬控 5mins 😇。 考虑如果整个数列都是相同的数的话,必然不能合并…

gofiber: 模板: 分页功能模块

一,代码 1,模块 package pageimport "fmt"type Page struct {//定义分页的structTotal int `json:"total"`TotalPage int `json:"totalpage"`CurrentPage int `json:"currentpage"`PrevPage int `json:"prevpage"`NextPage…

gorm: 安装使用

一,官方网站: https://gorm.io/ 如图: 二,安装: 从命令行安装gorm $ go get -u gorm.io/gorm go: downloading gorm.io/gorm v1.25.12 go: downloading github.com/jinzhu/now v1.1.5 go: downloading golang.org/x/text v0.19.0 go: downloading golang.org/x/text v0.20.0…

app小程序web安全—sign签名绕过

零、前言 在web界面登陆时,小程序和app传输数据时经常会碰到签名,会对请求的参数进行签名,如果自己修改了数据包就会校验失败非常麻烦。 本文编写的契机就是因为碰到了一个JeecgBoot的小程序, 想请求信息泄露的url但是显示“Sign签名校验失败”,让我非常无语,到手的漏洞飞…

Mobaxterm

软件来源: https://webra.top/app/249.html 安装包:推荐的B站视频: https://www.bilibili.com/video/BV1ze41157SP/?spm_id_from=333.337.search-card.all.click&vd_source=a171527166b8663ca801f58ca719a9b8

如何保留 wpftmp.csproj 文件用于调试

在构建 WPF 的过程,会生成 wpftmp.csproj 中间项目文件,用这个文件来辅助 XAML 构建过程。中间项目文件会在构建完成之后被删除,本文告诉大家如何保留 wpftmp.csproj 文件用于调试设置方法是添加 <GenerateTemporaryTargetAssemblyDebuggingInformation>true</Gene…

React Fiber架构的原理和工作模式

React16 => React引入Fiber架构,解决过去更新机制的问题 =》 在长时间的更新过程中,主线程会被阻塞,导致应用无法即时响应用户输入。 核心内容: 【Fiber是什么】【Fiber的底层原理】Fiber是什么? 当编写React组件并使用JSX时,React在底层会将JSX转换为元素的对象结构转…

【shell脚本】精选10款Nginx日志分析脚本

Nginx日志分析是网站运维和性能优化的重要环节。10个常用的Nginx日志分析脚本,脚本涵盖了从基本访问统计到高级性能分析的多个方面。供参考学习。 1、统计访问最多的前10个IP地址 #!/bin/bash LOG_FILE=$1 echo "统计访问最多的10个IP" awk {a[$1]++}END{print &quo…

【后门程序编程】基础1

一、后门概述 1.1 后门的发展历史 1.1.1 功能上发展。1.1.2 隐蔽性上的发展。1.2 后门的分类1.2.1 网页后门 1.2.2 线程插入后门 1.2.3 扩展后门 1.2.4 C/S后门。1.2.5 root kit。二、编写简单的cmdshell程序 2.1 管道通信技术简介 2.2 正向连接后门的编程 2.2.1 双…

LEAD:用于无源通用域自适应的学习分解

LEAD:用于无源通用域自适应的学习分解 通用领域适应(UniDA)的目标是在存在协变量和标签转移的情况下进行知识转移。最近,出现了无源通用域适配(SF UniDA),可以在不访问源数据的情况下实现UniDA,由于数据保护政策,这往往更实用。主要的挑战在于确定协变量移位样本是否属…