一、假设需求:
- 某系统在MySQL某表中操作了一条数据
- 在其他系统中,实时获取最新被操作数据的数据库名、数据表名、操作类型、数据内容
应用场景:
按最近项目的一个需求来说:
1.当某子系统向报警表中新增了一条报警数据;
2.项目中各个子系统需要获取刚刚新增的报警数据;
3.如果使用传统入库查库方式:
- 大批量插入时获取最新的报警数据需要新增查询逻辑
- 频繁获取最新新增数据效率较低
二、实现思路
- 使用ApplicationListener监听数据库
- 将监听到的数据同步并发布到Redis消息队列中
- 其他系统订阅Redis消息队列频道获取新增的最新数据
三、代码实现
- 引入redis客户端依赖(SpringBoot并未集成)
<dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>5.0.0</version></dependency>
- 创建数据同步事件
public class MessageEvent extends ApplicationEvent {private CdcMessage message;/*** 初始化对象* * @param source*/public MessageEvent(Object source, CdcMessage message) {super(source);this.message = message;}@Overridepublic Object getSource() {return super.getSource();}public CdcMessage getMessage() {return this.message;}public void setMessage(CdcMessage message) {this.message = message;}
}
- 创建数据信息类CdcMessage
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class CdcMessage implements Serializable {/*** 数据*/private JSONObject data;/*** 数据库类型*/private String dbType;/*** 处理类型(UPDATE DELETE CREATE)*/private String handleType;/*** 数据库名*/private String database;/*** 表名*/private String table;/*** JSON 转对象** @param clazz 转换类型* @param <T> 泛型* @return 集合结果*/public <T> List<T> toBean(Class<T> clazz) {List<T> rst = new LinkedList<>();rst.add(JSON.toJavaObject(data, clazz));return rst;}
}
- 创建数据同步方法(实现ApplicationListener数据监听接口,实现onApplicationEvent方法)
@Slf4j
@Component
public class Process implements ApplicationListener<MessageEvent> {@Overridepublic void onApplicationEvent(MessageEvent event) {CdcMessage message = event.getMessage();// 当TableName表进行新增操作时,执行数据同步操作if ("TableName".equalsIgnoreCase(message.getTable()) && "CREATE".equals(message.getHandleType())) {// 创建Jedis对象,连接到Redis服务器Jedis jedis = new Jedis("ip", 6379);// 设置认证密码jedis.auth("psssword");JSONObject messageData = message.getData();// 发布消息给消费者jedis.publish("频道名称", JSON.toJSONString(messageData ));// 关闭Jedis连接jedis.close();}}
}
四、测试
- 编写测试代码(消息订阅)
@Testpublic void test() {// 创建Jedis对象,连接到Redis服务器Jedis jedis = new Jedis("ip", 6379);// 设置认证密码jedis.auth("password");// 创建消息订阅器对象JedisPubSub jedisPubSub = new JedisPubSub() {@Overridepublic void onMessage(String channel, String message) {// 在接收到消息时执行的逻辑,可以根据实际需求进行编写System.out.println(message);}};// 订阅指定频道jedis.subscribe(jedisPubSub, "频道名称");// 关闭Jedis连接jedis.close();}
- 新增数据
- 获取消息订阅数据
五、总结
该功能主要实现方式为传统数据监听+MQ消息发布/订阅。由于该项目系统MQ只集成了Redis,所以未使用四大MQ从而使用Redis。