Redis学习——高级篇④
- = = = = = = Redis7高级之Redis与Mysql数据双写一致性工程案例(四)= = = = = =
- 4.1 MySQL主从复制原理
- 4.2 canal 工作原理
- 4.3 mySQL->canal->redis 双写一致性
- 1.环境
- 2.配置Mysql
- 3.配置canal
- 4. Canal客户端(Java编写)
- 1.SQL脚本(随便找个数据库)
- 2.建Module
- 3.改POM
- 4.改YML
- 5.启动类
- 6.业务类
= = = = = = Redis7高级之Redis与Mysql数据双写一致性工程案例(四)= = = = = =
4.1 MySQL主从复制原理
MySQL的主从复制
- 当
Master
主服务器上的数据发生改变时,将其改变写入二进制事件日志文件中 Slave
从服务器会在一定时间间隔内对master
主服务器上的二进制日志进行探测,探测其是否发生过改变- 如果探测到
master
主服务器的二进制时间发生了改变,则开始一个I/O Thread
请求 master 二进制事件日志
- 如果探测到
- 同时
master
主服务器为每个I/O Thread
启动一个dump Thread
,用于向其发送二进制事件日志 slave
从服务器将接收到的二进制事件日志保存至自己本地的中继日志文件中slave
从服务器将启动SQL Thread
从中继日志中读取二进制日志,在本地重放,使得其数据和主服务器保持一致- 最后
I/O Thread
和SQL Thread
将进入睡眠状态,等待下一次被唤醒
4.2 canal 工作原理
- canal 模拟 Mysql slave 的交互协议,将自己作为 Mysql slave ,向 Mysql master 发送 dump 协议
- MySQL master 收到 dump 请求,开始推送 binary log 给 slave(即cancal)
- canal 解析 binary log 对象(原始为 byte 流)
4.3 mySQL->canal->redis 双写一致性
1.环境
linux的mysql、canal、redis,其中canal和redis在一台机器,mysql单独一个机器
2.配置Mysql
别忘了 开启
start net mysql80
- 查看mysql版本
- 当前的主机二进制日志——
show master status;
-
查看
SHOW VARIABLES LIKE 'log_bin';
如果是on跳过下一步!
-
开启
MySQL
的binlog
写入功能
做好备份
my.cnf
[client]
default_character_set=utf8[mysqld]
log-bin=mysql-bin #开启 binlog
binlog-format=ROW #选择 ROW 模式
server_id=1 #配置MySQL replaction需要定义,不要和canal的 slaveId重复
collation_server = utf8_general_ci
character_set_server = utf8
修改
-
重启
mysql
-
再次查看
SHOW VARIABLES LIKE 'log _bin'
;
-
授权
canal
连接MySQL
账号- mysql默认的用户在mysq|库的user表里
- 默认没有canal账户,此处新建+授权
DROP USER IF EXISTS 'canal'@'%';
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal';
FLUSH PRIVILEGES;SELECT * FROM mysql.user;
上面是教学给的 但是我报错 我这样写的 才过的 看大家自己测试的时候,也希望大家要到原因留言告诉俺一声 😀
3.配置canal
canal服务端
-
下载canal.deployer-1.1.7.tar.gz
- Releases · alibaba/canal (github.com)
-
解压 tar -zxvf canal.deployer-1.1.7.tar.gz
-
配置
-
修改 /mycanal/conf/example 路径下的 instance.properties 文件
-
-
启动
-
在 /mycanal/bin 路径下执行 ./startup.sh
必须要有 java 8 的环境
-
等我去装一下 java的环境!!!别下载21的 下个19得了 后面报错 生成不了文件!!!我服了
-
看是否成功启动(报错记录:如果canal启动后没有这两个文件,多半是内存不够了,记着扩一下内存)
-
查看server日志
- 查看样例example的日志
4. Canal客户端(Java编写)
Redis用RedisTemplate
1.SQL脚本(随便找个数据库)
CREATE TABLE `t_user` (`id` BIGINT(20) NOT NULL AUTO_INCREMENT,`userName` VARCHAR(100) NOT NULL,PRIMARY KEY (`id`)) ENGINE=INNODB AUTO_INCREMENT=10 DEFAULT CHARSET=utf8mb4
2.建Module
canal_demo
3.改POM
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.5.14</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.xfcy</groupId><artifactId>canal_demo</artifactId><version>0.0.1-SNAPSHOT</version><name>canal_demo</name><description>canal_demo</description><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><junit.version>4.12</junit.version><log4j.version>1.2.17</log4j.version><lombok.version>1.16.18</lombok.version><mysql.version>5.1.47</mysql.version><druid.version>1.1.16</druid.version><mapper.version>4.1.5</mapper.version><mybatis.spring.boot.version>1.3.0</mybatis.spring.boot.version></properties><dependencies><!--canal--><dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.0</version></dependency><!--SpringBoot通用依赖模块--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><!--swagger2--><dependency><groupId>io.springfox</groupId><artifactId>springfox-swagger2</artifactId><version>2.9.2</version></dependency><dependency><groupId>io.springfox</groupId><artifactId>springfox-swagger-ui</artifactId><version>2.9.2</version></dependency><!--SpringBoot与Redis整合依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-pool2</artifactId></dependency><!--SpringBoot与AOP--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-aop</artifactId></dependency><dependency><groupId>org.aspectj</groupId><artifactId>aspectjweaver</artifactId></dependency><!--Mysql数据库驱动--><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.47</version></dependency><!--SpringBoot集成druid连接池--><dependency><groupId>com.alibaba</groupId><artifactId>druid-spring-boot-starter</artifactId><version>1.1.10</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>druid</artifactId><version>${druid.version}</version></dependency><!--mybatis和springboot整合--><dependency><groupId>org.mybatis.spring.boot</groupId><artifactId>mybatis-spring-boot-starter</artifactId><version>${mybatis.spring.boot.version}</version></dependency><!--通用基础配置junit/devtools/test/log4j/lombok/hutool--><!--hutool--><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.2.3</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>${junit.version}</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>${log4j.version}</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>${lombok.version}</version><optional>true</optional></dependency><!--persistence--><dependency><groupId>javax.persistence</groupId><artifactId>persistence-api</artifactId><version>1.0.2</version></dependency><!--通用Mapper--><dependency><groupId>tk.mybatis</groupId><artifactId>mapper</artifactId><version>${mapper.version}</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-autoconfigure</artifactId></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>
4.改YML
server.port=5555# ========================alibaba.druid=====================
spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://192.168.238.130:3306/bigdata?useUnicode=true&characterEncoding=utf-8&useSSL=false
spring.datasource.username=root
spring.datasource.password=123456
spring.datasource.druid.test-while-idle=false
5.启动类
这个是一个模板不用启动
package com.xfcy.canal_demo;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class CanalDemoApplication {public static void main(String[] args) {//SpringApplication.run(CanalDemoApplication.class, args);}}
6.业务类
RedisUtils类
package com.xfcy.canal_demo.util;import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
/*** @author 晓风残月Lx* @date 2023/3/25 20:28*/
public class RedisUtils {public static final String REDIS_IP_ADDR = "192.168.238.110";public static final String REDIS_pwd = "111111";public static JedisPool jedisPool;static {JedisPoolConfig jedisPoolConfig=new JedisPoolConfig();jedisPoolConfig.setMaxTotal(20);jedisPoolConfig.setMaxIdle(10);jedisPool=new JedisPool(jedisPoolConfig,REDIS_IP_ADDR,6379,10000,REDIS_pwd);}public static Jedis getJedis() throws Exception {if(null!=jedisPool){return jedisPool.getResource();}throw new Exception("Jedispool is not ok");}
}
RedisCanalClientExample
package com.xfcy.canal_demo.biz;import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;import com.xfcy.canal_demo.util.RedisUtils;
import redis.clients.jedis.Jedis;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;/*** @author 晓风残月Lx* @date 2023/3/25 20:23*/
public class RedisCanalClientExample {public static final Integer _60SECONDS = 60;public static final String REDIS_IP_ADDR = "192.168.238.130";private static void redisInsert(List<Column> columns){JSONObject jsonObject = new JSONObject();for (Column column : columns){System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());jsonObject.put(column.getName(),column.getValue());}if(columns.size() > 0){try(Jedis jedis = RedisUtils.getJedis()){jedis.set(columns.get(0).getValue(),jsonObject.toJSONString());}catch (Exception e){e.printStackTrace();}}}private static void redisDelete(List<Column> columns){JSONObject jsonObject = new JSONObject();for (Column column : columns){jsonObject.put(column.getName(),column.getValue());}if(columns.size() > 0){try(Jedis jedis = RedisUtils.getJedis()){jedis.del(columns.get(0).getValue());}catch (Exception e){e.printStackTrace();}}}private static void redisUpdate(List<Column> columns){JSONObject jsonObject = new JSONObject();for (Column column : columns){System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());jsonObject.put(column.getName(),column.getValue());}if(columns.size() > 0){try(Jedis jedis = RedisUtils.getJedis()){jedis.set(columns.get(0).getValue(),jsonObject.toJSONString());System.out.println("---------update after: "+jedis.get(columns.get(0).getValue()));}catch (Exception e){e.printStackTrace();}}}public static void printEntry(List<Entry> entrys){for (Entry entry : entrys) {if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {continue;}RowChange rowChage = null;try {//获取变更的row数据rowChage = RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {throw new RuntimeException("ERROR ## parser of eromanga-event has an error,data:" + entry.toString(),e);}//获取变动类型EventType eventType = rowChage.getEventType();System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType));for (RowData rowData : rowChage.getRowDatasList()) {if (eventType == EventType.INSERT) {redisInsert(rowData.getAfterColumnsList());} else if (eventType == EventType.DELETE) {redisDelete(rowData.getBeforeColumnsList());} else {//EventType.UPDATEredisUpdate(rowData.getAfterColumnsList());}}}}public static void main(String[] args){System.out.println("---------O(∩_∩)O哈哈~ initCanal() main方法-----------");//=================================// 创建链接canal服务端CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(REDIS_IP_ADDR,11111), "example", "", ""); // 这里用户名和密码如果在这写了,会覆盖canal配置文件的账号密码,如果不填从配置文件中读int batchSize = 1000;//空闲空转计数器int emptyCount = 0;System.out.println("---------------------canal init OK,开始监听mysql变化------");try {connector.connect();//connector.subscribe(".*\\..*");connector.subscribe("db01.t_user"); // 设置监听哪个表connector.rollback();int totalEmptyCount = 10 * _60SECONDS;while (emptyCount < totalEmptyCount) {System.out.println("我是canal,每秒一次正在监听:"+ UUID.randomUUID().toString());Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据long batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {emptyCount++;try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }} else {//计数器重新置零emptyCount = 0;printEntry(message.getEntries());}connector.ack(batchId); // 提交确认// connector.rollback(batchId); // 处理失败, 回滚数据}System.out.println("已经监听了"+totalEmptyCount+"秒,无任何消息,请重启重试......");} finally {connector.disconnect();}}
}
java程序下connectors.subscribe 配置的过滤正则
A | B |
---|---|
全库全表 | connector.subscribe(". *\\..*") |
指定库全表 | connector.subscribe("test1..*") |
单表 | connector.subscribe("test.user") |
多规则组合使用 | connector.subscribe("test\\..*,test2.user1,test3.user2") |
关闭资源代码简写
try-with-resource释放资源(语法糖)
jdk1.7后增加了try-with-resources
,他是一个声明一个或多个资源的ty语句。一个资源作为一个对象,必须在程序结束之后关闭。try-with-resources
语句确保在语句的最后每个资源都被关闭,任何实现了java.lang AutoCloseable
和java.io.Closeable
的对象都可以使用try-with-resource
来实现异常处理Q和关闭资源。