利用kafka和kafka connect插件debezium实现oracle表同步

news/2024/11/13 10:14:20/文章来源:https://www.cnblogs.com/monkey6/p/18380989

1.kafka安装

1.1.java安装

openjdk下载,建议使用17,至少应该高于版本11

# 进入家目录,解压下载的java包,配置环境变量
tar vxf openjdk-20.0.1_linux-x64_bin.tar.gz -C /usr/local/
vi .bash_profile
# 注意要把JAVA的目录放到$PATH之前
export JAVA_HOME=/usr/local/jdk-20
export PATH=$PATH:$JAVA_HOME/bin
source .bash_profile
java -version

1.2.zookeeper安装

zookeeper下载

安装

mkdir -p /opt/zookeeper
tar vxf apache-zookeeper-3.9.2-bin.tar.gz -C /opt/zookeeper
cd /opt/zookeeper/apache-zookeeper-3.9.2-bin
mv * ..
cd /opt/zookeeper/
rm -rf apache-zookeeper-3.9.2-bin/
cd /opt/zookeeper/conf
cp zoo_sample.cfg zoo.cfg

配置文件修改

# 默认如下,根据自己需求修改,例如修改数据目录,修改监听端口
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/tmp/zookeeper
clientPort=2181

添加环境变脸

vi .bash_profile
export ZOOKEEPER_HOME=/opt/zookeeper
export PATH=$PATH:$JAVA_HOME/bin:$ZOOKEEPER_HOME/bin

打开关闭

zkServer.sh start
jps #有QuorumPeerMain
zkServer.sh stop

1.3.kafka安装

kafka下载

安装

mkdir -p /opt/kafka
tar vxf kafka_2.12-3.0.0.tgz -C /opt/kafka/
cd /opt/kafka/kafka_2.12-3.0.0/
mv * ..
rm -rf kafka_2.12-3.0.0/

配置文件修改

cd /opt/kafka/config
vi server.properties
# 默认如下,根据需求修改,例如zookeeper地址
# 需要添加监听地址,不然只能本地访问
listeners=PLAINTEXT://192.168.1.101:9092
broker.id=0
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0

添加环境变量

vi .bash_profile
export KAFKA_HOME=/opt/kafka
export PATH=$PATH:$JAVA_HOME/bin:$ZOOKEEPER_HOME/bin:$KAFKA_HOME/bin

打开关闭

# 打开
kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
jps
# 查看日志
tail -f /opt/kafka/logs/server.log
# 关闭
kafka-server-stop.sh

2.安装debezium

dezezium下载

安装

cd  /opt/kafka/
mkdir connectors
tar vxf debezium-connector-oracle-2.7.1.Final-plugin.tar.gz -C /opt/kafka/connectors/

修改配置文件

cd /opt/kafka/config
vi connect-distributed.properties
####################################
# 这里的IP要和1.3修改的kafka的监听IP相同
bootstrap.servers=192.168.1.101:9092
# listeners也要打开
listeners=HTTP://192.168.1.101:8083
plugin.path=/opt/kafka/connectors
####################################

启动kafka connect

# 启动
connect-distributed.sh -daemon $KAFKA_HOME/config/connect-distributed.properties
jps
# 查看日志
tail -f /opt/kafka/logs/connectDistributed.out
# 关闭,通过jps得到ConnectDistributed的pid
jps
kill -9 xxxxx

3.抽取oracle表

3.1.connector配置文件

vi oracle-connector-monkeydb.json
{"name": "oracle-connector","config": {"connector.class": "io.debezium.connector.oracle.OracleConnector","tasks.max": "1","database.server.name": "monkeydb","database.hostname": "192.168.1.100","database.port": "1521","database.user": "dbzuser","database.password": "dbz","database.dbname": "monkeydb","schema.history.internal.kafka.bootstrap.servers": "192.168.1.101:9092","schema.history.internal.kafka.topic": "schema-changes.monkeydb",   /*所有schema的元数据存放在这个topic中*/"include.schema.changes": "true","table.include.list": "monkey.debtest","topic.prefix": "monkeydb"   /*抽取的每个表会生成一个topic,这是topic的前缀*/}
}

3.2.数据库新建用户

CREATE USER dbzuser IDENTIFIED BY dbz;GRANT UNLIMITED TABLESPACE TO dbzuser;GRANT CREATE SESSION TO dbzuser;
GRANT SELECT ON V_$DATABASE TO dbzuser;
GRANT FLASHBACK ANY TABLE TO dbzuser;
GRANT SELECT ANY TABLE TO dbzuser;
GRANT SELECT_CATALOG_ROLE TO dbzuser;
GRANT EXECUTE_CATALOG_ROLE TO dbzuser;
GRANT SELECT ANY TRANSACTION TO dbzuser;GRANT CREATE TABLE TO dbzuser;
GRANT LOCK ANY TABLE TO dbzuser;
GRANT CREATE SEQUENCE TO dbzuser;GRANT EXECUTE ON DBMS_LOGMNR TO dbzuser;
GRANT EXECUTE ON DBMS_LOGMNR_D TO dbzuser;GRANT SELECT ON V_$LOG TO dbzuser;
GRANT SELECT ON V_$LOG_HISTORY TO dbzuser;
GRANT SELECT ON V_$LOGMNR_LOGS TO dbzuser;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO dbzuser;
GRANT SELECT ON V_$LOGMNR_PARAMETERS TO dbzuser;
GRANT SELECT ON V_$LOGFILE TO dbzuser;
GRANT SELECT ON V_$ARCHIVED_LOG TO dbzuser;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO dbzuser;
GRANT SELECT ON V_$TRANSACTION TO dbzuser;GRANT SELECT ON V_$MYSTAT TO dbzuser;
GRANT SELECT ON V_$STATNAME TO dbzuser;

3.3.打开connector

connector相关

# 打开
curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" -d @oracle-connector-monkeydb.json http://192.168.1.101:8083/connectors
# 查看日志
tail -f /opt/kafka/logs/connectDistributed.out
# 查看有哪些connector
curl -i -X GET http://192.168.1.101:8083/connectors/
# 停止connector
curl -X PUT http://192.168.1.101:8083/connectors/oracle-connector/pause
# 回复connector
curl -X PUT http://192.168.1.101:8083/connectors/oracle-connector/resume
# 查看状态(connector的名字在json文件中有执行)
curl -s -X GET http://192.168.1.101:8083/connectors/oracle-connector/status
# 删除
curl -i -X DELETE http://192.168.1.101:8083/connectors/oracle-connector

kafka相关

# 查看生成的topic
kafka-topics.sh --bootstrap-server 192.168.1.101:9092 --list
# 查看表中抽出来数据(实时)
kafka-console-consumer.sh --bootstrap-server 192.168.1.101:9092  --topic monkeydb.MONKEY.DEBTEST --from-beginning

4.写入oracle表

JDBC Sink Connector Plug-in下载

和第2步下载地址相同,第2步下载的是Oracle Connector Plug-in

4.1.Sink Connector配置文件

vi sink-oracle-connector-monkeydb.json
{"name": "oracle-sink-connector","config": {"connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector","tasks.max": "1","topics": "monkeydb.MONKEY.DEBTEST","connection.url": "jdbc:oracle:thin:@192.168.1.102:1521/orcl","connection.username": "dbzuser","connection.password": "dbz","table.name.format": "dbzuser.debtest","insert.mode": "upsert","primary.key.mode": "record_key","primary.key.fields": "ID","key.converter": "org.apache.kafka.connect.json.JsonConverter","value.converter": "org.apache.kafka.connect.json.JsonConverter","key.converter.schemas.enable": "true","value.converter.schemas.enable": "true"}
}

4.2.打开connector

# 打开
curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" -d @sink-oracle-connector-monkeydb.json http://192.168.1.101:8083/connectors

5.问题及解决方法

connector的json配置文件第一次写错了,导致history topic创建到了两外一个kafka中,改回来之后,创建connector后,connetor状态错误

"io.debezium.DebeziumException: The db history topic is missing. You may attempt to recover it by reconfiguring the connector to recovery.\n\tat 

解决:

修改connector的json中name即可。只要修改了name,再次开启,就会重新抓取。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/787518.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

WPF中如何根据数据类型使用不同的数据模板

我们在将一个数据集合绑定到列表控件时,有时候想根据不同的数据类型,显示为不同的效果。 例如将一个文件夹集合绑定到ListBox时,系统文件夹显示为不同的效果,就可以使用模板选择器功能。 WPF提供了一个模板选择器类型DataTemplateSelector,它可以根据数据对象和数据绑定元…

Typecho Joe 导航菜单目录以及搜索关键字回显主题优化版

Joe 是 Typecho 博客中一款开源免费且非常精美的主题,但是这款主题很早就停止维护了,有些功能作者并没有开发,并且在 Typecho 更新到 1.2.1 版本后还出现了一个小 BUG Joe 主题的知名度很高,所以在原作者停止维护后很多大佬发布过自己魔改的版本,不可否认魔改后的主题 BUG…

gdb学习记录

目录如何查看地址值查看当前函数参数多线程调试只暂停指定线程,其他线程不影响总结如何查看地址值 查看下一个地址:x/x 0x12345679 以八进制显示:x/o 0x12345678 以十进制显示:x/d 0x12345678 显示更多的地址和值:x/8xw 0x12345678(显示从该地址开始的8个字(word),每个…

UE4 CharacterMovementComponent

之前在学习网络通信时,经常需要对客户端上的行为进行预测,来降低延迟带来的比较差的体验。而UE4为角色移动提供了CharacterMovementComponent这个组件,其在移动时可以发现已经实现了客户端预测。 现在可以通过对CharacterMovementComponent进行扩展,实现我们自定义的运动模…

抖音coze接入网站实现智能在线客服

在coze.cn上创建好机器人并且可以发布为API,方便接入自己的系统 一定要先创建新令牌,再点发布,并且必须勾选,发布到API 在客服系统gofly.v1kf.com中配置 【菜单】【团队设置】【大模型设置】接口地址部分,请填写扣子的机器人ID 接口密钥部分,请填写扣子的API令牌然后…

centos9(linux): 安装clamav

一,官方网站 https://www.clamav.net/ 如图:二,安装 1,开启epel仓库 [root@blog ~]# yum install -y epel-release 2,安装clamav [root@blog ~]# yum install clamav 说明:病毒库数据较大,200多M,安装时需要等待较长时间 安装完成后查看clamav的版本: [root@blog ~]# clam…

pygame手搓五子棋

代码:#coding=utf-8import os,sys,re,time import pygame import random from win32api import GetSystemMetricspygame.init() pygame.display.set_caption("五子棋")percent = 0.6 screen_width = GetSystemMetrics(0) screen_height = GetSystemMetrics(1) windo…

网站提示404错误:页面未找到怎么办

当网站提示 404 Error 或 “页面未找到” 时,这意味着客户端尝试访问的资源在服务器上不存在或无法找到。这种情况很常见,可以通过以下几个步骤来诊断和解决问题: 常见原因URL 输入错误:这是最常见的原因之一。由于人为疏忽或输入错误,导致请求的 URL 与服务器上实际存在的…

OpenCV开发笔记(七十九):基于Stitcher类实现全景图片拼接

前言一个摄像头视野不大的时候,我们希望进行两个视野合并,这样让正视的视野增大,从而可以看到更广阔的标准视野。拼接的方法分为两条路,第一条路是stitcher类,第二条思路是特征点匹配。  本篇使用stitcher匹配,进行两张图来视野合并拼接。 Demo 两张图拼接过程步骤一:…

WPF 路由事件

一、什么是路由事件? 根据MSDN定义:功能定义:路由事件是一种可以针对元素树中的多个侦听器(而不是仅针对引发该事件的对象)调用处理程序的事件。 实现定义:路由事件是由 类的实例支持的 CLR 事件, RoutedEvent 由事件 Windows Presentation Foundation (WPF) 系统处理。…

【转载】Win11优化大小核调度(无需重启)

出处:https://bbs.saraba1st.com/2b/thread-2140520-1-1.html 打开隐藏电源管理选项: 管理员模式运行cmd,分别输入: powercfg -attributes SUB_PROCESSOR 7f2f5cfa-f10c-4823-b5e1-e93ae85f46b5 -ATTRIB_HIDE powercfg -attributes SUB_PROCESSOR 93b8b6dc-0698-4d1c-9ee4-…

【Pytorch教程】迅速入门Pytorch深度学习框架

@目录前言1.tensor基础操作1.1 tensor的dtype类型1.2 创建tensor(建议写出参数名字)1.2.1 空tensor(无用数据填充)API示例1.2.2 全一tensor1.2.3 全零tensor1.2.4 随机值[0,1)的tensor1.2.5 随机值为整数且规定上下限的tensorAPI示例1.2.6 随机值均值0方差1的tensor1.2.7 从…