回顾一次容器环境的MySQL、canal、Elasticsearch数据同步
MySQL和Elasticsearch安装初始化就不展示了,版本如下:
sql表关键字段如下:
CREATE TABLE `fault_code` (`title` varchar(255) CHARACTER SET utf8mb4 DEFAULT NULL,`description` varchar(512) CHARACTER SET utf8mb4 DEFAULT NULL,`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci;
主要是同步title和description
Elasticsearch的映射:
PUT /fault_code
{"mappings": {"properties": {"title": {"type": "text","analyzer": "ik_smart"},"description": {"type": "text","analyzer": "ik_smart"}}}
}
为canal专门创建一个db 用户
CREATE USER 'canal'@'%' IDENTIFIED BY 'testcanal';GRANT SELECT, REPLICATION SLAVE, SHOW DATABASES ON *.* TO 'canal'@'%';FLUSH PRIVILEGES;
拉取canal镜像,由于上面MySQL、Elasticsearch版本已经确定,根据官方文档canal版本选择1.1.5
docker pull canal/canal-server:v1.1.5
提前创建 /mydata/canal/conf目录,接着来执行拷贝配置文件命令
启动canaltest,拷贝配置文件出来进行配置
docker run --name canaltest \
-p 11111:11111 \
-id canal/canal-server:v1.1.5
把配置文件拷贝出来
docker cp canaltest:/home/admin/canal-server/conf/example/instance.properties /mydata/canal/conf/
接着对instance.properties文件进行修改:
canal.instance.mysql.slaveId=22
canal.instance.master.address=192.168.56.10:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=testcanal
canal.instance.filter.regex=test.fault_code
主要是上面几个属性
删除原来的canal服务:
docker rm -f canaltest
启动canal服务:
docker run --name canal115 \
-p 11111:11111 \
-v /mydata/canal/conf/instance.properties:/home/admin/canal-server/conf/example/instance.properties \
-id canal/canal-server:v1.1.5
进入canal容器:
docker exec -it canal115 /bin/bash
到指定目录查看日志情况
cd canal-server/logs/example/tail -50 example.log
第一次查看日志发现问题:
CanalParseException: command : 'show master status' has an error!
Caused by: java.io.IOException: ErrorPacket [errorNumber=1227, fieldCount=-1, message=Access denied; you need (at least one of) the SUPER, REPLICATION CLIENT privilege(s) for this operation, sqlState=42000, sqlStateMarker=#]with command: show master status
重新赋予权限,并重启MySQL之后,canal就可以正常使用
拉取docker pull slpcat/canal-adapter:v1.1.5-jdk8镜像
docker pull slpcat/canal-adapter:v1.1.5-jdk8
启动docker canal adapter
docker run --name canaladapter
-p 8081:8081
-id slpcat/canal-adapter:v1.1.5-jdk8
把两个配置文件拷贝出来:
canaladapter的配置文件拷贝出来
docker cp canaladapter:/opt/canal-adapter/conf/application.yml /mydata/canaladapter/conf/es的映射文件拷贝出来
docker cp canaladapter:/opt/canal-adapter/conf/es7/faultcode.yml /mydata/canaladapter/conf/
修改两个配置文件
application.yml:
需要改的有下面几行:
canal.tcp.server.host: 127.0.0.1:11111源数据:
srcDataSources:defaultDS:url: jdbc:mysql://192.168.56.10:3306/test?useUnicode=trueusername: canalpassword: testcanal- name: eshosts: 127.0.0.1:9200 # 127.0.0.1:9200 for rest modeproperties:mode: rest # or rest# security.auth: test:123456 # only used for rest modecluster.name: elasticsearch
faultcode.yml:
dataSourceKey: defaultDS
destination: example
groupId: g1
esMapping:_index: fault_code_id: idfieldMapping:title: title # 将 MySQL 的 title 字段映射到 Elasticsearch 的 title 字段description: description # 将 MySQL 的 description 字段映射到 Elasticsearch 的 description 字段sql: "select fc.id, fc.title, fc.description from fault_code fc"commitBatch: 3000
删除旧的canaladapter
启动新的canaladapter
docker run --name canaladapter \
-p 8081:8081 \
-v /mydata/canaladapter/conf/application.yml:/opt/canal-adapter/conf/application.yml \
-v /mydata/canaladapter/conf/faultcode.yml:/opt/canal-adapter/conf/es7/faultcode.yml \
-id slpcat/canal-adapter:v1.1.5-jdk8
发现启动不了adapter
这个问题应该是格式不对,修改了一下格式,再度重启canal adapter
docker restart 029aa4c345af
重启之后,canal adapter可以正常启动,但是进入容器查看canal adapter日志
docker exec -it cd0b5d5 /bin/bash
cd logs/adapter/
tail -100 adapter.log
2024-12-26 00:28:00.194 [main] INFO c.a.o.canal.adapter.launcher.loader.CanalAdapterService - ## syncSwitch refreshed.
2024-12-26 00:28:00.194 [main] INFO c.a.o.canal.adapter.launcher.loader.CanalAdapterService - ## start the canal client adapters.
2024-12-26 00:28:00.206 [main] ERROR c.a.o.canal.adapter.launcher.loader.CanalAdapterService - ## something goes wrong when starting up the canal client adapters:
java.lang.NullPointerException: nullat com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterLoader.init(CanalAdapterLoader.java:51) ~[client-adapter.launcher-1.1.5-SNAPSHOT.jar:na]at com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterService.init(CanalAdapterService.java:60) ~[client-adapter.launcher-1.1.5-SNAPSHOT.jar:na]at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_282]at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_282]at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_282]at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_282]at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleElement.invoke(InitDestroyAnnotationBeanPostProcessor.java:365) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleMetadata.invokeInitMethods(InitDestroyAnnotationBeanPostProcessor.java:308) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor.postProcessBeforeInitialization(InitDestroyAnnotationBeanPostProcessor.java:135) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyBeanPostProcessorsBeforeInitialization(AbstractAutowireCapableBeanFactory.java:422) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1694) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:579) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:501) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$1(AbstractBeanFactory.java:353) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]at org.springframework.cloud.context.scope.GenericScope$BeanLifecycleWrapper.getBean(GenericScope.java:390) ~[spring-cloud-context-2.0.0.RELEASE.jar:2.0.0.RELEASE]at org.springframework.cloud.context.scope.GenericScope.get(GenericScope.java:184) ~[spring-cloud-context-2.0.0.RELEASE.jar:2.0.0.RELEASE]at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:350) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:199) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]at org.springframework.context.support.AbstractApplicationContext.getBean(AbstractApplicationContext.java:1089) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]at org.springframework.cloud.context.scope.refresh.RefreshScope.eagerlyInitialize(RefreshScope.java:126) ~[spring-cloud-context-2.0.0.RELEASE.jar:2.0.0.RELEASE]at org.springframework.cloud.context.scope.refresh.RefreshScope.start(RefreshScope.java:117) ~[spring-cloud-context-2.0.0.RELEASE.jar:2.0.0.RELEASE]at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_282]at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_282]at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_282]at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_282]at org.springframework.context.event.ApplicationListenerMethodAdapter.doInvoke(ApplicationListenerMethodAdapter.java:264) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]at org.springframework.context.event.ApplicationListenerMethodAdapter.processEvent(ApplicationListenerMethodAdapter.java:182) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]at org.springframework.context.event.ApplicationListenerMethodAdapter.onApplicationEvent(ApplicationListenerMethodAdapter.java:144) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]at org.springframework.context.event.SimpleApplicationEventMulticaster.doInvokeListener(SimpleApplicationEventMulticaster.java:172) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]at org.springframework.context.event.SimpleApplicationEventMulticaster.invokeListener(SimpleApplicationEventMulticaster.java:165) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]at org.springframework.context.event.SimpleApplicationEventMulticaster.multicastEvent(SimpleApplicationEventMulticaster.java:139) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:400) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:354) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:888) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.finishRefresh(ServletWebServerApplicationContext.java:161) ~[spring-boot-2.0.1.RELEASE.jar:2.0.1.RELEASE]at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:553) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:140) ~[spring-boot-2.0.1.RELEASE.jar:2.0.1.RELEASE]at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:759) ~[spring-boot-2.0.1.RELEASE.jar:2.0.1.RELEASE]at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:395) ~[spring-boot-2.0.1.RELEASE.jar:2.0.1.RELEASE]at org.springframework.boot.SpringApplication.run(SpringApplication.java:327) ~[spring-boot-2.0.1.RELEASE.jar:2.0.1.RELEASE]at com.alibaba.otter.canal.adapter.launcher.CanalAdapterApplication.main(CanalAdapterApplication.java:19) ~[client-adapter.launcher-1.1.5-SNAPSHOT.jar:na]
这个经过搜索确定是格式上的错误,把canal adapter的格式又修改了一下重启:
server:port: 8081
spring:jackson:date-format: yyyy-MM-dd HH:mm:sstime-zone: GMT+8default-property-inclusion: non_nullcanal.conf:mode: tcp #tcp kafka rocketMQ rabbitMQflatMessage: truezookeeperHosts:syncBatchSize: 1000retries: 0timeout:accessKey:secretKey:consumerProperties:# canal tcp consumercanal.tcp.server.host: 192.168.56.10:11111canal.tcp.zookeeper.hosts:canal.tcp.batch.size: 500canal.tcp.username:canal.tcp.password:srcDataSources:defaultDS:url: jdbc:mysql://192.168.56.10:3306/test?useUnicode=trueusername: canalpassword: testcanalcanalAdapters:- instance: example # canal instance Name or mq topic namegroups:- groupId: g1outerAdapters:- name: logger
# - name: rdb
# key: mysql1
# properties:
# jdbc.driverClassName: com.mysql.jdbc.Driver
# jdbc.url: jdbc:mysql://127.0.0.1:3306/mytest2?useUnicode=true
# jdbc.username: root
# jdbc.password: 121212
# - name: rdb
# key: oracle1
# properties:
# jdbc.driverClassName: oracle.jdbc.OracleDriver
# jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
# jdbc.username: mytest
# jdbc.password: m121212
# - name: rdb
# key: postgres1
# properties:
# jdbc.driverClassName: org.postgresql.Driver
# jdbc.url: jdbc:postgresql://localhost:5432/postgres
# jdbc.username: postgres
# jdbc.password: 121212
# threads: 1
# commitSize: 3000
# - name: hbase
# properties:
# hbase.zookeeper.quorum: 127.0.0.1
# hbase.zookeeper.property.clientPort: 2181
# zookeeper.znode.parent: /hbase- name: eshosts: 192.168.56.10:9200 # 127.0.0.1:9200 for rest modeproperties:mode: rest # or rest# security.auth: test:123456 # only used for rest modecluster.name: elasticsearch
# - name: kudu
# key: kudu
# properties:
# kudu.master.address: 127.0.0.1 # ',' split multi address
启动之后发现还是有问题
2024-12-26 00:50:14.161 [Thread-3] ERROR c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - process error!
com.alibaba.otter.canal.protocol.exception.CanalClientException: java.net.ConnectException: Connection refusedat com.alibaba.otter.canal.client.impl.SimpleCanalConnector.doConnect(SimpleCanalConnector.java:198) ~[na:na]at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.connect(SimpleCanalConnector.java:115) ~[na:na]at com.alibaba.otter.canal.connector.tcp.consumer.CanalTCPConsumer.connect(CanalTCPConsumer.java:59) ~[na:na]at com.alibaba.otter.canal.adapter.launcher.loader.AdapterProcessor.process(AdapterProcessor.java:184) ~[client-adapter.launcher-1.1.5-SNAPSHOT.jar:na]at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_282]
Caused by: java.net.ConnectException: Connection refusedat sun.nio.ch.Net.connect0(Native Method) ~[na:1.8.0_282]at sun.nio.ch.Net.connect(Net.java:482) ~[na:1.8.0_282]at sun.nio.ch.Net.connect(Net.java:474) ~[na:1.8.0_282]at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:647) ~[na:1.8.0_282]at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.doConnect(SimpleCanalConnector.java:150) ~[na:na]... 4 common frames omitted
看到错误猜测是某个连接没配对,再去看看配置,应该是canal.tcp.server.host: 192.168.56.10:11111没有配置成真实地址
改完再试一下
这一次又有新问题:
Extension instance(name: es, class: interface com.alibaba.otter.canal.client.adapter.OuterAdapter) could not be instantiated: class could not be found
这个是name需要修改一下:
- name: es7hosts: 192.168.56.10:9200 # 127.0.0.1:9200 for rest modeproperties:mode: rest # or rest# security.auth: test:123456 # only used for rest modecluster.name: elasticsearch
修改完之后application.yml文件如下:
server:port: 8081
spring:jackson:date-format: yyyy-MM-dd HH:mm:sstime-zone: GMT+8default-property-inclusion: non_nullcanal.conf:mode: tcp #tcp kafka rocketMQ rabbitMQflatMessage: truezookeeperHosts:syncBatchSize: 1000retries: 0timeout:accessKey:secretKey:consumerProperties:# canal tcp consumercanal.tcp.server.host: 192.168.56.10:11111canal.tcp.zookeeper.hosts:canal.tcp.batch.size: 500canal.tcp.username:canal.tcp.password:srcDataSources:defaultDS:url: jdbc:mysql://192.168.56.10:3306/test?useUnicode=trueusername: canalpassword: testcanalcanalAdapters:- instance: example # canal instance Name or mq topic namegroups:- groupId: g1outerAdapters:- name: logger
# - name: rdb
# key: mysql1
# properties:
# jdbc.driverClassName: com.mysql.jdbc.Driver
# jdbc.url: jdbc:mysql://127.0.0.1:3306/mytest2?useUnicode=true
# jdbc.username: root
# jdbc.password: 121212
# - name: rdb
# key: oracle1
# properties:
# jdbc.driverClassName: oracle.jdbc.OracleDriver
# jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
# jdbc.username: mytest
# jdbc.password: m121212
# - name: rdb
# key: postgres1
# properties:
# jdbc.driverClassName: org.postgresql.Driver
# jdbc.url: jdbc:postgresql://localhost:5432/postgres
# jdbc.username: postgres
# jdbc.password: 121212
# threads: 1
# commitSize: 3000
# - name: hbase
# properties:
# hbase.zookeeper.quorum: 127.0.0.1
# hbase.zookeeper.property.clientPort: 2181
# zookeeper.znode.parent: /hbase- name: es7hosts: 192.168.56.10:9200 # 127.0.0.1:9200 for rest modeproperties:mode: rest # or rest# security.auth: test:123456 # only used for rest modecluster.name: elasticsearch
# - name: kudu
# key: kudu
# properties:
# kudu.master.address: 127.0.0.1 # ',' split multi address
docker restart cd0b5d5b98b7
进入容器查看日志,发现已经启动成功
测试新增同步:
测试修改同步:
测试删除同步: