Canal+Kafka实现MySQL与Redis数据同步(二)

Canal+Kafka实现MySQL与Redis数据同步(二)

创建MQ消费者进行同步

在application.yml配置文件加上kafka的配置信息:

spring:kafka:# Kafka服务地址bootstrap-servers: 127.0.0.1:9092consumer:# 指定一个默认的组名group-id: consumer-group1#序列化反序列化key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerproducer:key-serializer: org.apache.kafka.common.serialization.StringDeserializervalue-serializer: org.apache.kafka.common.serialization.StringDeserializer# 批量抓取batch-size: 65536# 缓存容量buffer-memory: 524288

根据上面Kafka消费命令那里,我们知道了json数据的结构,可以创建一个CanalBean对象进行接收:

public class CanalBean {//数据private List<TbCommodityInfo> data;//数据库名称private String database;private long es;//递增,从1开始private int id;//是否是DDL语句private boolean isDdl;//表结构的字段类型private MysqlType mysqlType;//UPDATE语句,旧数据private String old;//主键名称private List<String> pkNames;//sql语句private String sql;private SqlType sqlType;//表名private String table;private long ts;//(新增)INSERT、(更新)UPDATE、(删除)DELETE、(删除表)ERASE等等private String type;//getter、setter方法
}
public class MysqlType {private String id;private String commodity_name;private String commodity_price;private String number;private String description;//getter、setter方法
}
public class SqlType {private int id;private int commodity_name;private int commodity_price;private int number;private int description;
}

最后就可以创建一个消费者CanalConsumer进行消费:

@Component
public class CanalConsumer {//日志记录private static Logger log = LoggerFactory.getLogger(CanalConsumer.class);//redis操作工具类@Resourceprivate RedisClient redisClient;//监听的队列名称为:canaltopic@KafkaListener(topics = "canaltopic")public void receive(ConsumerRecord<?, ?> consumer) {String value = (String) consumer.value();log.info("topic名称:{},key:{},分区位置:{},下标:{},value:{}", consumer.topic(), consumer.key(),consumer.partition(), consumer.offset(), value);//转换为javaBeanCanalBean canalBean = JSONObject.parseObject(value, CanalBean.class);//获取是否是DDL语句boolean isDdl = canalBean.getIsDdl();//获取类型String type = canalBean.getType();//不是DDL语句if (!isDdl) {List<TbCommodityInfo> tbCommodityInfos = canalBean.getData();//过期时间long TIME_OUT = 600L;if ("INSERT".equals(type)) {//新增语句for (TbCommodityInfo tbCommodityInfo : tbCommodityInfos) {String id = tbCommodityInfo.getId();//新增到redis中,过期时间是10分钟redisClient.setString(id, JSONObject.toJSONString(tbCommodityInfo), TIME_OUT);}} else if ("UPDATE".equals(type)) {//更新语句for (TbCommodityInfo tbCommodityInfo : tbCommodityInfos) {String id = tbCommodityInfo.getId();//更新到redis中,过期时间是10分钟redisClient.setString(id, JSONObject.toJSONString(tbCommodityInfo), TIME_OUT);}} else {//删除语句for (TbCommodityInfo tbCommodityInfo : tbCommodityInfos) {String id = tbCommodityInfo.getId();//从redis中删除redisClient.deleteKey(id);}}}}
}

测试MySQL与Redis同步

mysql对应的表结构如下:

CREATE TABLE `tb_commodity_info` (`id` varchar(32) NOT NULL,`commodity_name` varchar(512) DEFAULT NULL COMMENT '商品名称',`commodity_price` varchar(36) DEFAULT '0' COMMENT '商品价格',`number` int(10) DEFAULT '0' COMMENT '商品数量',`description` varchar(2048) DEFAULT '' COMMENT '商品描述',PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='商品信息表';

首先在MySQL创建表。然后启动项目,接着新增一条数据:

INSERT INTO `canaldb`.`tb_commodity_info` (`id`, `commodity_name`, `commodity_price`, `number`, `description`) VALUES ('3e71a81fd80711eaaed600163e046cc3', '叉包', '3.99', '3', '大叉包,老喜欢');

tb_commodity_info表查到新增的数据:

img

Redis也查到了对应的数据,证明同步成功!

img

如果更新呢?试一下Update语句:

UPDATE `canaldb`.`tb_commodity_info` SET `commodity_name`='青菜包',`description`='便宜的青菜包' WHERE `id`='3e71a81fd80711eaaed600163e046cc3';

img

img

没有问题!

总结

canal的缺点:

  1. canal只能同步增量数据。
  2. 不是实时同步,是准实时同步。
  3. 存在一些bug,不过社区活跃度较高,对于提出的bug能及时修复。
  4. MQ顺序性问题。
    网的回答,大家参考一下
    img

尽管有一些缺点,毕竟没有一样技术(产品)是完美的,合适最重要。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/193704.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Python 如何实现职责链设计模式?什么是职责链设计模式?Python 职责链设计模式示例代码

什么是职责链&#xff08;Chain of Responsibility&#xff09;设计模式&#xff1f; 职责链&#xff08;Chain of Responsibility&#xff09;设计模式是一种行为型设计模式&#xff0c;旨在构建一个对象链&#xff0c;每个对象都有机会处理请求&#xff0c;并且可以将请求传…

IDEA插件推荐:Apipost-Helper

Hello&#xff0c;大家好&#xff0c;我是灰小猿。 今天分享一下我最近在开发过程中发现的一个比较实用的IDEA插件—Apipost-Helper IDEA虽然能够帮助开发人员更加高效地编写、调试和部署软件应用程序。但我们在编写完接口代码后肯定还需要进行接口调试等操作&#xff0c;这个…

国际知名商学院复旦大学EMBA荣登全球第8位,中文项目国内居首

2023年10月16日&#xff0c;英国《金融时报》&#xff08;FT&#xff09;发布全球EMBA项目排名。复旦大学EMBA位列全球8强&#xff0c;蝉联中文项目全球第一。学术研究、生源资历、商学院顾问委员会国际化程度、整体满意度等数个重要指标位列中文项目全球第 1位。    排名不…

apply和call在Javascript中的使用与区别

apply和call在js中的使用与区别&#xff1a; 字符串格式化&#xff1a; ${占位符} name小帅 console.log(我是${name}) //我是小帅apply: 语法&#xff1a;function.apply(thisArg, [argsArray])thisArg&#xff1a;可选参数&#xff0c;指定函数执行时的上下文&#xff08…

Bootloader——预编程流程

预编程目录 前言一、预编程步骤1.1 切换到扩展会话1.2 检查刷写前提条件整车ECU进入扩展会话(补充)1.3 停用故障码存储功能1.4 停止通信(一般报文或网络管理报文)前言 刷写过程定义了刷写前、刷写中、刷写后三个阶段。 一、预编程步骤 预编程步骤用来做刷写前的CAN网络准…

公共字段自动填充-@TableField的fill实现(2)

TheadLocal 客户端发送的每次http请求&#xff0c;在服务端都会分配新的线程。因此登录检查过滤器、controller、元数据对象处理器属于一个线程。 TheadLocal是线程的局部变量&#xff1a; TheadLocal常用方法&#xff1a; 如何在元数据对象处理器中获取当前登录用户的id&…

二十、虚拟机网络配置

1、Linux网络配置原理 我自己Linux虚拟机的IP地址是&#xff1a;192.168.159.131 vmnet8&#xff1a;192.168.159.1 无线网卡&#xff1a;192.168.159.1 2、查看网络IP和网关 查看虚拟网络编辑器和修改IP地址 如果把这个位置的子网IP换成&#xff1a;192.168.8.0的话重启虚拟机…

【开源】基于Vue.js的在线课程教学系统的设计和实现

项目编号&#xff1a; S 014 &#xff0c;文末获取源码。 \color{red}{项目编号&#xff1a;S014&#xff0c;文末获取源码。} 项目编号&#xff1a;S014&#xff0c;文末获取源码。 目录 一、摘要1.1 系统介绍1.2 项目录屏 二、研究内容2.1 课程类型管理模块2.2 课程管理模块2…

数字IC前端学习笔记:异步复位,同步释放

相关阅读 数字IC前端https://blog.csdn.net/weixin_45791458/category_12173698.html?spm1001.2014.3001.5482 异步复位 异步复位是一种常见的复位方式&#xff0c;可以使电路进入一个可知的状态。但是不正确地使用异步复位会导致出现意想不到的错误&#xff0c;复位释放便是…

力扣hot100 两数之和 哈希表

&#x1f468;‍&#x1f3eb; 力扣 两数之和 &#x1f60b; 思路 在一个数组中如何快速找到某一个数的互补数&#xff1a;哈希表 O(1)实现⭐ AC code class Solution {public int[] twoSum(int[] nums, int target){HashMap<Integer, Integer> map new HashMap<&g…

安卓源码-工程目录

1、程序启动配置及主要的权限声明 2、 界面渲染 3、 布局用 4、 常量等 5、 gradle构建

【Linux】C文件系统详解(三)——如何理解缓冲区以及自主封装一个文件接口

文章目录 如何理解缓冲区现象概念:文件缓冲区为什么要有缓冲区缓冲区在哪里 自己封装一个简单的文件接口自主封装目标 代码关于缓冲区强制刷新内核 关于字符串格式化函数printf和scanf函数 如何理解缓冲区 以前写过一个进度条, 有一个输出缓冲区->这个缓冲区在哪里,为什么要…