出售短视频平台,多节点实例数据同步触发的方式
今天标题的内容,主要讲同步如何触发?内容已经圈定死,因此就不谈数据同步涉及的一致性,只谈如何触发这个动作。多节点实例触发的关键是,一旦触发,各个节点都要通知到位。那如何进行多个节点通知呢?答案就是通过广播。
本案例核心流程图
实现步骤
1、定义高层广播抽象接口
@FunctionalInterface public interface DataSyncTrigger {void broadcast(Object data); }
2、定义通知事件类
注: 本文会采用spring的事件监听模式实现
public class DataSyncTriggerEvent extends ApplicationEvent {/*** Create a new ApplicationEvent.** @param source the object on which the event initially occurred (never {@code null})*/public DataSyncTriggerEvent(Object source) {super(source);} }
3、定义高层抽象广播的模板基类
@RequiredArgsConstructor public abstract class BaseDataSyncTrigger implements DataSyncTrigger, ApplicationContextAware {protected ApplicationContext applicationContext;protected final DataSyncTriggerProperty dataSyncTriggerProperty;@Overridepublic void broadcast(Object data) {DataSyncTriggerEvent dataSyncTriggerEvent = new DataSyncTriggerEvent(data);applicationContext.publishEvent(dataSyncTriggerEvent);}@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {this.applicationContext = applicationContext;}private Collection<DataSyncTriggerCallBack> listDataSyncTriggerCallBacks(){try {Map<String, DataSyncTriggerCallBack> dataSyncTriggerCallBackMap = applicationContext.getBeansOfType(DataSyncTriggerCallBack.class);return Collections.unmodifiableList(dataSyncTriggerCallBackMap.values().stream().collect(Collectors.toList()));} catch (BeansException e) {}return Collections.emptyList();}public void callBack(Object data){Collection<DataSyncTriggerCallBack> dataSyncTriggerCallBacks = listDataSyncTriggerCallBacks();if(CollectionUtil.isNotEmpty(dataSyncTriggerCallBacks)){if(dataSyncTriggerProperty.isTriggerCallBackAsync()){callbackAsync(data, dataSyncTriggerCallBacks);}else{callbackSync(data, dataSyncTriggerCallBacks);}}}private void callbackSync(Object data, Collection<DataSyncTriggerCallBack> dataSyncTriggerCallBacks) {for (DataSyncTriggerCallBack dataSyncTriggerCallBack : dataSyncTriggerCallBacks) {dataSyncTriggerCallBack.execute(data);}}private void callbackAsync(Object data, Collection<DataSyncTriggerCallBack> dataSyncTriggerCallBacks) {for (DataSyncTriggerCallBack dataSyncTriggerCallBack : dataSyncTriggerCallBacks) {ThreadUtil.execAsync(()->{dataSyncTriggerCallBack.execute(data);});}} }
4、定义抽象回调接口【扩展点】
当业务收到通知,可以通过该回调接口进行具体业务操作
@FunctionalInterface public interface DataSyncTriggerCallBack {void execute(Object data); }
5、定义具体广播实现类
注: 这个广播的具体实现方案就很多了,只要天生具备广播能力或者基于原来特性扩展出广播的组件都可以,比如rocketmq的广播机制、redis的pubsub机制、zookeeper的分布式协调能力、基于注册中心服务发现能力改造出来的广播能力等。本文就以redis的pubsub机制为例
Slf4j public class RedisDataSyncTrigger extends BaseDataSyncTrigger implements CommandLineRunner {private final RedisTemplate redisTemplate;public RedisDataSyncTrigger(RedisTemplate redisTemplate, DataSyncTriggerProperty dataSyncTriggerProperty) {super(dataSyncTriggerProperty);this.redisTemplate = redisTemplate;}@EventListenerpublic void listener(DataSyncTriggerEvent dataSyncTriggerEvent){SyncDataDTO syncDataDTO = SyncDataDTO.builder().data(dataSyncTriggerEvent.getSource()).timeStamp(System.currentTimeMillis()).build();try {redisTemplate.convertAndSend(REDIS_CHANNEL_KEY, syncDataDTO);} catch (Exception e) {log.error("redis publish channel 【" + REDIS_CHANNEL_KEY + "】 fail,cause:" + e.getMessage(),e);}}@Overridepublic void run(String... args) throws Exception {doSubscribe();}@SneakyThrowsprivate void doSubscribe() {RedisConnection connection = redisTemplate.getConnectionFactory().getConnection();RedisMessageListener redisMessageListener = applicationContext.getBean(RedisMessageListener.class);connection.subscribe(redisMessageListener,REDIS_CHANNEL_KEY.getBytes("utf-8"));log.info(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Register listen channel : 【{}】",REDIS_CHANNEL_KEY);} }
具体redis订阅监听实现
@RequiredArgsConstructor @Slf4j public class RedisMessageListener implements MessageListener{private final BaseDataSyncTrigger baseDataSyncTrigger;private final RedisTemplate redisTemplate;@Overridepublic void onMessage(Message message, byte[] pattern) {byte[] body = message.getBody();String dataJson = StrUtil.str(body, "utf-8");if(JSONUtil.isJson(dataJson)){try {SyncDataDTO dataDTO = (SyncDataDTO) redisTemplate.getHashValueSerializer().deserialize(body);baseDataSyncTrigger.callBack(dataDTO.getData());} catch (Exception e) {log.error(e.getMessage(),e);}}else{log.warn(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Data 【{}】 is not match json format !!!",dataJson);}} }
6、测试验证
a、编写业务逻辑类
@Service @RequiredArgsConstructor @Slf4j public class DataService {private List<Object> dataList = new CopyOnWriteArrayList<>();private final RedisTemplate redisTemplate;private final BaseDataSyncTrigger dataSyncTrigger;public boolean add(String data){try {Long count = redisTemplate.opsForList().leftPush(RedisConstant.REDIS_LIST_KEY, data);if(count > 0){dataSyncTrigger.broadcast(data);return true;}} catch (Exception e) {log.error("add fail:" + e.getMessage(),e);}return false;}public List<Object> getDataList(){return dataList;} }
b、编写业务控制器
@RestController @RequestMapping("data") @RequiredArgsConstructor public class DataController {private final DataService dataService;@GetMapping("add/{data}")public String syncData(@PathVariable("data") String data){boolean isSuccess = dataService.add(data);return isSuccess ? "success" : "fail";}@GetMapping("list")public List<Object> listData(){return dataService.getDataList();} }
c、编写业务回调类
@Component @RequiredArgsConstructor @Slf4j public class LocalListDataSyncTriggerCallBack implements DataSyncTriggerCallBack {private final DataService dataService;@Overridepublic void execute(Object data) {dataService.getDataList().add(data);log.info(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Sync data:-->{}",data);} }
d、小细节
注: 当项目重启时,本地存储容器是没内容的,因此需要在项目重启时,写一个钩子,从其他缓存介质将数据刷到本地存储中
@Component @RequiredArgsConstructor @Slf4j public class DataInitTask implements CommandLineRunner {private final RedisTemplate redisTemplate;private final DataService dataService;@Overridepublic void run(String... args) throws Exception {List redisDataList = redisTemplate.opsForList().range(RedisConstant.REDIS_LIST_KEY, 0, -1);if(CollectionUtil.isNotEmpty(redisDataList)){dataService.getDataList().addAll(redisDataList);log.info(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Loaded data from redis finished!!!");}}}
e、测试
从一个节点(示例:54860端口)添加数据,如图
观察其他节点(示例:59829端口)本地存储是否接收到数据
从图可以发现已经收到数据,同时我们观察控制台
可以看出业务回调已经触发
以上就是出售短视频平台,多节点实例数据同步触发的方式, 更多内容欢迎关注之后的文章