使用TCP方式拉取Canal数据

1 Canal对接Kafka联调

1.1 配置修改

canal.properties

修改 zk:

canal.zkServers = 10.51.50.219:2181

instance.properties

开启配置项:

canal.mq.dynamicTopic 是 Canal 的 MQ 动态 Topic 配置项:

  • test_javaedge_01 是kafka 的 topic
  • test_db.users 要监控的数据库、表
  • test_db.users 表发生变化时,Canal 将会把变化的数据推送到名为 test_javaedge_01:test_db.users 的 MQ Topic 中。
canal.mq.dynamicTopic=test_javaedge_01:test_db\\.users

开启一个消费者

[root@javaedge-kafka-dev bin]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_javaedge_01

datagrip 新增数据:

消费到该数据:

2 使用TCP方式拉取Canal数据

现在 serverMode 改回tcp。重启

javaedge@JavaEdgedeMac-mini deployer % jps
71002 CanalLauncher
javaedge@JavaEdgedeMac-mini deployer %

canal 同步程序

package com.javaedge.canal;import com.alibaba.fastjson.JSON;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.common.base.CaseFormat;import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.List;public class CanalClientApp {public static void main(String[] args) throws Exception {CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("localhost", 11111),"example",null, null);while (true) {connector.connect();connector.subscribe("test_db.users");Message message = connector.get(100);List<CanalEntry.Entry> entries = message.getEntries();if (entries.size()>0) {for (CanalEntry.Entry entry : entries) {String tableName = entry.getHeader().getTableName();CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();CanalEntry.EventType eventType = rowChange.getEventType();if (eventType == CanalEntry.EventType.INSERT) {for (CanalEntry.RowData rowData : rowDatasList) {List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();HashMap<Object, Object> map = new HashMap<>();for (CanalEntry.Column column : afterColumnsList) {String key = CaseFormat.LOWER_UNDERSCORE.to(CaseFormat.LOWER_CAMEL, column.getName());map.put(key, column.getValue());}System.out.println("tableName=" + tableName + "  map=" + JSON.toJSONString(map));}}}}}}
}

运行程序。操作 user 数据表,新增一行数据:

程序输出:

显然,后续不管你想把数据同步到哪儿去,都完全自由!

数据链路

MySQL -》canal server(tcp)-》canal client-》kafka。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/106401.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

golang面试题:json包变量不加tag会怎么样?

问题 json包里使用的时候&#xff0c;结构体里的变量不加tag能不能正常转成json里的字段&#xff1f; 怎么答 如果变量首字母小写&#xff0c;则为private。无论如何不能转&#xff0c;因为取不到反射信息。如果变量首字母大写&#xff0c;则为public。 不加tag&#xff0c…

00_socket_demo

1.服务器端的代码 #include <stdio.h> #include <stdlib.h> #include <string.h> #include <unistd.h> #include <sys/socket.h> #include <netinet/in.h>#define PORT 8080 #define BUFFER_SIZE 1024int main() {int server_fd, new_soc…

STC8单片机PWM定时器+EC11编码器实现计数

STC8单片机PWM定时器+EC11编码器实现计数 📌相关篇《STC单片机+EC11编码器实现调节PWM输出占空比》📍《stc单片机外部中断+EC11编码器实现计数功能》🔖STC8系列支持此功能的型号: ✨从上面的相关篇中有通过通用定时器加外部中断以及常规方法实现驱动EC11编码器的方法。本…

Consul学习笔记之-初识Consul

文章目录 1. What is consul?2. Consul能干什么3. Consul的架构3.1 概念 4. Consul VS Eureka4.1 CAP4.2 对比 1. What is consul? 根据官方文档的定义&#xff1a; HashiCorp Consul is a service networking solution that enables teams to manage secure network connec…

觉非科技数据闭环系列 | BEV感知研发实践

随着自动驾驶迈向量产场景&#xff0c;“BEV感知数据闭环”已成为新一代自动驾驶量产系统的核心架构。数据成为了至关重要的技术驱动力&#xff0c;发挥数据闭环的飞轮效应或将成为下半场从1到N的胜负关键。 觉非科技在此方面已进行了大量的研究工作&#xff0c;并在实际量产项…

vuex中actions异步调用以及读取值

项目场景&#xff1a; 提示&#xff1a;这里简述项目相关背景&#xff1a; 将根据segmentId查出来的合同信息托管到vuex中&#xff0c;让每个人都可以获取到合同信息 描述以及问题点 1&#xff1a;调用vuex异步函数的语法是 this.$store.dispatch(actions方法名,值) 2&#…

typeScript 学习笔记(二)

类接口 TypeScript 入门教程 (xcatliu.com) 十四.类 ① 类 类&#xff1a;定义了一件事物的抽象特点&#xff0c;包含它的属性和方法对象&#xff1a;类的实例&#xff0c;通过new生成面向对象&#xff08;OOP&#xff09;的三大特性&#xff1a;封装、继承、多态封装&…

arm栈推导

按照栈生长方向分&#xff1a;可以分为递增栈&#xff08;向高地址生长&#xff09;&#xff1b;递减栈&#xff08;向低地址生长&#xff09; 按照sp执行位置来分&#xff1a;满栈&#xff08;sp指向栈顶元素的位置&#xff09;&#xff1b;空栈&#xff08;sp指向即将入栈的…

ChatGPT AIGC总结Excel中Vlookup,lookup,xlookup的区别

在Excel的使用过程中,查找函数是非常重要的,如Vlookup,lookup,Xlookup,index+match等都是使用的最多的函数,我们让ChatGPT,AIGC用思维导图来总结一下,各查找函数的用法与区别。 AIGC ChatGPT ,BI商业智能, 可视化Tableau, PowerBI, FineReport, 数据库Mysql Oracle…

驱动测试开发

测试驱动开发介绍 测试驱动开发&#xff08;Test Driven Development,英文缩写TDD&#xff09;是极限编程的一个重要组成部分它的基本思想就是在开发功能代码之前&#xff0c;先编写测试代码也就是说在明确要开发某个功能后 首先思考如何对这个功能进行测试&#xff0c;并完成…

我们这一代人的机会是什么?

大家好&#xff0c;我是苍何&#xff0c;今天作为专业嘉宾参观了 2023 年中国国际智能产业博览会&#xff08;智博会&#xff09;&#xff0c;是一场以「智汇八方&#xff0c;博采众长」为主题的汇聚全球智能技术和产业创新的盛会&#xff0c;感触颇深&#xff0c;随着中国商业…

linux入门---用匿名管道实现一个功能

前言 在之前的学习中我们知道通信的概念和匿名管道的使用&#xff0c;那么接下来我们就要用匿名管道来实现一个功能&#xff0c;首先我们有很多的函数需要被执行&#xff0c;然后创建一些子进程通过匿名管道方式给子进程传递一些信息&#xff0c;然后子进程就根据这些信息来确…