NIFI实现数据库数据增量同步

说明

nifi版本:1.23.2(docker镜像)

需求背景

将数据库中的数据同步到另一个数据库中,要求对于新增的数据和历史有修改的数据进行增量同步

模拟数据

建表语句

源数据库和目标数据库结构要保持一致,这样可以避免后面单独转换

-- 创建测试表
CREATE TABLE `sys_user` (`id` bigint NOT NULL AUTO_INCREMENT COMMENT '用户ID',`name` varchar(50) NOT NULL DEFAULT '' COMMENT '姓名',`age`  int NOT NULL DEFAULT 0 COMMENT '年龄',`gender` tinyint NOT NULL COMMENT '性别,1:男,0:女',`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',`modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',`is_deleted` tinyint NOT NULL DEFAULT '0' COMMENT '是否已删除',PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT  CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci ROW_FORMAT=DYNAMIC COMMENT='用户表';

测试数据

-- 模拟数据
INSERT INTO sys_user (name, age, gender) VALUES ('测试数据1', 20, 1);
INSERT INTO sys_user (name, age, gender) VALUES ('测试数据2', 21, 1);
INSERT INTO sys_user (name, age, gender) VALUES ('测试数据3', 21, 0);
INSERT INTO sys_user (name, age, gender) VALUES ('测试数据4', 18, 0);
INSERT INTO sys_user (name, age, gender) VALUES ('测试数据5', 22, 1);

完整测试数据

配置数据库连接池

在画布空白位置鼠标右键,选择Configure

新增配置 

在弹出的界面点击+号,添加新的数据库连接池配置,如果已经有了配置该步骤可以跳过

 在弹出的界面筛选对应类型的连接池,我这里选择DBCPConnectionPool,然后点击ADD

点击刚才新添加的那一条数据右侧的小齿轮,进行连接池相关的配置

配置连接池相关属性 

主要配置以下几个内容,其他的根据情况决定是否需要修改,密码输入后是不会显示的

校验属性

校验配置是否正确,点击右上角的对钩,然后在弹出的界面点击VERIFY进行验证

 验证通过会全部显示绿色,如果某一条不通过会有提示,最后点击APPLY

(可选操作)给配置起个名字

为了方便后续使用,给连接池起个名字,要不然以后配置多了会分不清

激活连接池的配置

点击右侧的闪电标志激活配置,在新的页面中点击ENABLE激活,最后点击CLOSE关闭

已激活的配置

同理增加目标数据库的连接池配置,步骤和上面是一样的这里不再重复了,最终配置好后会有两个连接池的配置。如下:

获取数据库表数据

添加处理器:QueryDatabaseTable

点击工具栏的Processor,拖拽到画布中,筛选QueryDatabaseTable处理器,然后点击ADD添加到画布中

配置处理器:QueryDatabaseTable

双击处理器,切换到PROPERTIES选项卡,配置以下内容

Maximum-value Columns(最大值列):官方文档是这么解释的:以逗号分隔的列名列表。处理器将跟踪自处理器开始运行以来返回的每一列的最大值。使用多个列意味着列列表的顺序,并且每列的值预计比前几列的值增加得更慢。因此,使用多个列意味着列的分层结构,通常用于对表进行分区。此处理器可用于仅检索自上次检索以来添加/更新的那些行。请注意,某些 JDBC 类型(如 bit/boolean)不利于保持最大值,因此这些类型的列不应列在此属性中,并且将导致处理过程中的错误。如果未提供列,则将考虑表中的所有行,这可能会对性能产生影响。注意:为给定表使用一致的最大值列名非常重要,这样增量提取才能正常工作。
支持表达式语言:true

校验属性

给处理器起个名字,表示当前整个工作流的作用

拆分数据

添加处理器:SplitAvro

配置处理器:SplitAvro

双击处理器,切换到PROPERTIES选项卡,所有内容默认即可

数据入库

添加处理器:PutDatabaseRecord

配置处理器:PutDatabaseRecord

双击处理器,切换到PROPERTIES选项卡

新增Record Reader

配置AvroReader

点击右侧的箭头,在弹出的界面选择刚才配置的Reader,然后点击右侧的小齿轮

 在弹出的界面根据自己的需要自行配置,这里按照默认的配置即可

 激活Reader

点击右侧的闪电标志进行激活

 激活后的状态变为Enabled

其他配置

校验属性

连接所有处理器

连接处理器

连接QueryDatabaseTable和SplitAvro两个处理器,勾选For Relationships下的success

连接SplitAvro和PutDatabaseRecord两个处理器,勾选For Relationships下的split

处理SplitAvro处理器的告警

双击SplitAvro处理器,切换到RELATIONSHIPS,勾选下面的两个选项,然后点击APPLY

 处理PutDatabaseRecord处理器的告警

双击PutDatabaseRecord处理器,切换到RELATIONSHIPS,勾选下面的选项,然后点击APPLY

 完整配置

 启动所有处理器

QueryDatabaseTable处理器默认是一分钟执行一次的,可以在SCHEDULING选项卡下面进行配置,这里按照默认的时间来执行

 在画布空白位置鼠标右键选择Start启动所有的处理器

 

查看目标数据库数据

等待一分钟后查看目标数据库数据,发现源数据库的5条数据被同步到了目标数据库

 修改源数据库的数据

UPDATE sys_user SET is_deleted = 1 WHERE id = 1;
UPDATE sys_user SET is_deleted = 1 WHERE id = 4;
INSERT INTO sys_user (name, age, gender) VALUES ('测试数据6', 22, 1);

再次查看目标数据库数据

等待处理器执行后,查看目标数据库数据发现新的数据已经被同步过去

 可以看到最后一个处理器最终由8条记录流入

结束语

以上便是使用NIFI增量同步数据库数据的全过程,如果有什么疑问欢迎评论区进行评论。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/101964.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

解决报错之org.aspectj.lang不存在

一、IDEA在使用时,可能会遇到maven依赖包明明存在,但是build或者启动时,报找不存在。 解决办法:第一时间检查Setting->Maven-Runner红圈中的√有没有选上。 二、有时候,明明依赖包存在,但是Maven页签中…

【算法训练-链表 五】【求和】:链表相加(逆序)、链表相加II(顺序)

废话不多说,喊一句号子鼓励自己:程序员永不失业,程序员走向架构!本篇Blog的主题是【链表相加】,使用【链表】这个基本的数据结构来实现,这个高频题的站点是:CodeTop,筛选条件为&…

Mybatis复杂查询及动态SQL

文章目录 一. 较复杂的查询操作1. 参数占位符#{}和${}2. SQL注入3. like查询4. resultType与resultMap5. 多表查询5.1. 一对一表映射5.2. 一对多表映射 二. 动态SQL1. if标签2. trim标签3. where标签4. set标签5. foreach标签 本篇中使用的数据表即基础映射类都是基于上一篇博客…

蓝牙服务功能

前言 这阵子用到蓝牙比较多,想写一个专栏专门讲解蓝牙协议及其应用,本篇是第二篇文章,讲解蓝牙服务。 参考网上各大神文章,及瑞萨的文章,参考GPT,并且加入了一些本人的理解。 图片部分源自网络&#xff…

Systrace分析App性能学习笔记

学习Gracker Systrace系列文章,总结使用Systrace分析App性能的方法。推荐想通过Systrace学习Framework的同学,去看原文。 文章目录 概述Systrace使用流程Systrace 文件生成图形方式(不推荐)命令行方式 Systrace分析快捷键使用帧状态线程状态查看线程唤醒…

Qt打开及创建项目,运行程序(1)

安装之后, 1.文件->新建文件或项目 2.Application->Qt Widgets Application 3.自己设置名称和路径 4.这一步非常非常重要,要选择编译器,(MinGW是可以在Qt里用,如果想与VS交互,要选择MSVC&#xff09…

Android签名查看

查看签名文件信息 第一种方法: 1.打开cmd,执行keytool -list -v -keystore xxx.keystore,效果如下图: 第二种方法: 1.打开cmd,执行 keytool -list -v -keystore xxxx.keystore -storepass 签名文件密码&#xff0…

YOLOV7改进-空洞卷积+共享权重的Scale-Aware RFE

代码 1、先把文件复制到common.py中 2、yolo.py添加类名 3、下半部分进行添加修改 4、cfg-training:新建配置文件 加了一行,后面对于序号1 5、这里选择12层替代

QT 第五天 TCP通信与数据库

一、数据库增删改查 QT core gui sqlgreaterThan(QT_MAJOR_VERSION, 4): QT widgetsCONFIG c11# The following define makes your compiler emit warnings if you use # any Qt feature that has been marked deprecated (the exact warnings # depend on your comp…

人工智能和大数据:跨境电商如何实现定制化营销?

在跨境电商竞争激烈的市场中,如何精准地满足消费者的需求并提供个性化的购物体验成为了商家们面临的重要挑战。幸运的是,人工智能和大数据技术的崛起为跨境电商带来了新的机遇,使得定制化营销成为可能。本文将探讨人工智能和大数据在跨境电商…

医院空调冷热源设计方案VR元宇宙模拟演练的独特之处

作为一个备受关注的技术-元宇宙,毋庸置疑的是,因为建筑本身契合了时尚、前卫、高端、虚拟、科幻、泛在、协作、互通等元素特征,因此在建筑行业更需要元宇宙,以居民建筑环境冷热源设计来说,元宇宙会打破既定的现实阻碍和…

AI绘画:StableDiffusion实操教程-斗罗大陆2-江楠楠-常服(附高清图下载)

前段时间我分享了StableDiffusion的非常完整的教程:“AI绘画:Stable Diffusion 终极宝典:从入门到精通 ” 尽管如此,还有读者反馈说,尽管已经成功安装,但生成的图片与我展示的结果相去甚远。真实感和质感之…