1.kafka安装
1.1.java安装
openjdk下载,建议使用17,至少应该高于版本11
# 进入家目录,解压下载的java包,配置环境变量
tar vxf openjdk-20.0.1_linux-x64_bin.tar.gz -C /usr/local/
vi .bash_profile
# 注意要把JAVA的目录放到$PATH之前
export JAVA_HOME=/usr/local/jdk-20
export PATH=$PATH:$JAVA_HOME/bin
source .bash_profile
java -version
1.2.zookeeper安装
zookeeper下载
安装
mkdir -p /opt/zookeeper
tar vxf apache-zookeeper-3.9.2-bin.tar.gz -C /opt/zookeeper
cd /opt/zookeeper/apache-zookeeper-3.9.2-bin
mv * ..
cd /opt/zookeeper/
rm -rf apache-zookeeper-3.9.2-bin/
cd /opt/zookeeper/conf
cp zoo_sample.cfg zoo.cfg
配置文件修改
# 默认如下,根据自己需求修改,例如修改数据目录,修改监听端口
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/tmp/zookeeper
clientPort=2181
添加环境变脸
vi .bash_profile
export ZOOKEEPER_HOME=/opt/zookeeper
export PATH=$PATH:$JAVA_HOME/bin:$ZOOKEEPER_HOME/bin
打开关闭
zkServer.sh start
jps #有QuorumPeerMain
zkServer.sh stop
1.3.kafka安装
kafka下载
安装
mkdir -p /opt/kafka
tar vxf kafka_2.12-3.0.0.tgz -C /opt/kafka/
cd /opt/kafka/kafka_2.12-3.0.0/
mv * ..
rm -rf kafka_2.12-3.0.0/
配置文件修改
cd /opt/kafka/config
vi server.properties
# 默认如下,根据需求修改,例如zookeeper地址
# 需要添加监听地址,不然只能本地访问
listeners=PLAINTEXT://192.168.1.101:9092
broker.id=0
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
添加环境变量
vi .bash_profile
export KAFKA_HOME=/opt/kafka
export PATH=$PATH:$JAVA_HOME/bin:$ZOOKEEPER_HOME/bin:$KAFKA_HOME/bin
打开关闭
# 打开
kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
jps
# 查看日志
tail -f /opt/kafka/logs/server.log
# 关闭
kafka-server-stop.sh
2.安装debezium
dezezium下载
安装
cd /opt/kafka/
mkdir connectors
tar vxf debezium-connector-oracle-2.7.1.Final-plugin.tar.gz -C /opt/kafka/connectors/
修改配置文件
cd /opt/kafka/config
vi connect-distributed.properties
####################################
# 这里的IP要和1.3修改的kafka的监听IP相同
bootstrap.servers=192.168.1.101:9092
# listeners也要打开
listeners=HTTP://192.168.1.101:8083
plugin.path=/opt/kafka/connectors
####################################
启动kafka connect
# 启动
connect-distributed.sh -daemon $KAFKA_HOME/config/connect-distributed.properties
jps
# 查看日志
tail -f /opt/kafka/logs/connectDistributed.out
# 关闭,通过jps得到ConnectDistributed的pid
jps
kill -9 xxxxx
3.抽取oracle表
3.1.connector配置文件
vi oracle-connector-monkeydb.json
{"name": "oracle-connector","config": {"connector.class": "io.debezium.connector.oracle.OracleConnector","tasks.max": "1","database.server.name": "monkeydb","database.hostname": "192.168.1.100","database.port": "1521","database.user": "dbzuser","database.password": "dbz","database.dbname": "monkeydb","schema.history.internal.kafka.bootstrap.servers": "192.168.1.101:9092","schema.history.internal.kafka.topic": "schema-changes.monkeydb", /*所有schema的元数据存放在这个topic中*/"include.schema.changes": "true","table.include.list": "monkey.debtest","topic.prefix": "monkeydb" /*抽取的每个表会生成一个topic,这是topic的前缀*/}
}
3.2.数据库新建用户
CREATE USER dbzuser IDENTIFIED BY dbz;GRANT UNLIMITED TABLESPACE TO dbzuser;GRANT CREATE SESSION TO dbzuser;
GRANT SELECT ON V_$DATABASE TO dbzuser;
GRANT FLASHBACK ANY TABLE TO dbzuser;
GRANT SELECT ANY TABLE TO dbzuser;
GRANT SELECT_CATALOG_ROLE TO dbzuser;
GRANT EXECUTE_CATALOG_ROLE TO dbzuser;
GRANT SELECT ANY TRANSACTION TO dbzuser;GRANT CREATE TABLE TO dbzuser;
GRANT LOCK ANY TABLE TO dbzuser;
GRANT CREATE SEQUENCE TO dbzuser;GRANT EXECUTE ON DBMS_LOGMNR TO dbzuser;
GRANT EXECUTE ON DBMS_LOGMNR_D TO dbzuser;GRANT SELECT ON V_$LOG TO dbzuser;
GRANT SELECT ON V_$LOG_HISTORY TO dbzuser;
GRANT SELECT ON V_$LOGMNR_LOGS TO dbzuser;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO dbzuser;
GRANT SELECT ON V_$LOGMNR_PARAMETERS TO dbzuser;
GRANT SELECT ON V_$LOGFILE TO dbzuser;
GRANT SELECT ON V_$ARCHIVED_LOG TO dbzuser;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO dbzuser;
GRANT SELECT ON V_$TRANSACTION TO dbzuser;GRANT SELECT ON V_$MYSTAT TO dbzuser;
GRANT SELECT ON V_$STATNAME TO dbzuser;
3.3.打开connector
connector相关
# 打开
curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" -d @oracle-connector-monkeydb.json http://192.168.1.101:8083/connectors
# 查看日志
tail -f /opt/kafka/logs/connectDistributed.out
# 查看有哪些connector
curl -i -X GET http://192.168.1.101:8083/connectors/
# 停止connector
curl -X PUT http://192.168.1.101:8083/connectors/oracle-connector/pause
# 回复connector
curl -X PUT http://192.168.1.101:8083/connectors/oracle-connector/resume
# 查看状态(connector的名字在json文件中有执行)
curl -s -X GET http://192.168.1.101:8083/connectors/oracle-connector/status
# 删除
curl -i -X DELETE http://192.168.1.101:8083/connectors/oracle-connector
kafka相关
# 查看生成的topic
kafka-topics.sh --bootstrap-server 192.168.1.101:9092 --list
# 查看表中抽出来数据(实时)
kafka-console-consumer.sh --bootstrap-server 192.168.1.101:9092 --topic monkeydb.MONKEY.DEBTEST --from-beginning
4.写入oracle表
JDBC Sink Connector Plug-in下载
和第2步下载地址相同,第2步下载的是Oracle Connector Plug-in
4.1.Sink Connector配置文件
vi sink-oracle-connector-monkeydb.json
{"name": "oracle-sink-connector","config": {"connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector","tasks.max": "1","topics": "monkeydb.MONKEY.DEBTEST","connection.url": "jdbc:oracle:thin:@192.168.1.102:1521/orcl","connection.username": "dbzuser","connection.password": "dbz","table.name.format": "dbzuser.debtest","insert.mode": "upsert","primary.key.mode": "record_key","primary.key.fields": "ID","key.converter": "org.apache.kafka.connect.json.JsonConverter","value.converter": "org.apache.kafka.connect.json.JsonConverter","key.converter.schemas.enable": "true","value.converter.schemas.enable": "true"}
}
4.2.打开connector
# 打开
curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" -d @sink-oracle-connector-monkeydb.json http://192.168.1.101:8083/connectors
5.问题及解决方法
connector的json配置文件第一次写错了,导致history topic创建到了两外一个kafka中,改回来之后,创建connector后,connetor状态错误
"io.debezium.DebeziumException: The db history topic is missing. You may attempt to recover it by reconfiguring the connector to recovery.\n\tat
解决:
修改connector的json中name即可。只要修改了name,再次开启,就会重新抓取。