11 Flume增量表数据同步配置

增量表数据通道

数据通道如下图所示
在这里插入图片描述

Flume 配置

概述

Flume需要将Kafka中topic_db主题的数据传输到HDFS,故其需选用KafkaSource以及HDFSSink,Channel选用FileChannel。
需要注意的是, HDFSSink需要将不同mysql业务表的数据写到不同的路径,并且路径中应当包含一层日期,用于区分每天的数据。关键配置如下:
在这里插入图片描述

配置实操

  1. 创建Flume 配置文件在 Flume job 目录下创建kafka_to_hdfs_db.conf,内容如下。
    a1.sources = r1
    a1.channels = c1
    a1.sinks = k1a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
    a1.sources.r1.batchSize = 5000
    a1.sources.r1.batchDurationMillis = 2000
    a1.sources.r1.kafka.bootstrap.servers = hadoop101:9092
    a1.sources.r1.kafka.topics = topic_db
    a1.sources.r1.kafka.consumer.group.id = flume
    a1.sources.r1.setTopicHeader = true
    a1.sources.r1.topicHeader = topic
    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.type = com.logan.gmall.flume.interceptor.TimestampAndTableNameInterceptor$Buildera1.channels.c1.type = file
    a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior2
    a1.channels.c1.dataDirs = /opt/module/flume/data/behavior2/
    a1.channels.c1.maxFileSize = 2146435071
    a1.channels.c1.capacity = 1000000
    a1.channels.c1.keep-alive = 6## sink1
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.hdfs.path = /origin_data/gmall/db/%{tableName}_inc/%Y-%m-%d
    a1.sinks.k1.hdfs.filePrefix = db
    a1.sinks.k1.hdfs.round = falsea1.sinks.k1.hdfs.rollInterval = 10
    a1.sinks.k1.hdfs.rollSize = 134217728
    a1.sinks.k1.hdfs.rollCount = 0a1.sinks.k1.hdfs.fileType = CompressedStream
    a1.sinks.k1.hdfs.codeC = gzip## 拼装
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel= c1
    
  2. 编写拦截器
    • 创建 maven项目,内容如下
      <dependencies><dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.9.0</version><scope>provided</scope></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.62</version></dependency>
      </dependencies><build><plugins><plugin><artifactId>maven-compiler-plugin</artifactId><version>2.3.2</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin><plugin><artifactId>maven-assembly-plugin</artifactId><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins>
      </build>
      
    • com.logan.gmall.flume.interceptor包下创建类TimestampAndTableNameInterceptor
      package com.logan.gmall.flume.interceptor;import com.alibaba.fastjson.JSONObject;
      import org.apache.flume.Context;
      import org.apache.flume.Event;
      import org.apache.flume.interceptor.Interceptor;import java.nio.charset.StandardCharsets;
      import java.util.List;
      import java.util.Map;public class TimestampAndTableNameInterceptor implements Interceptor {@Overridepublic void initialize() {}@Overridepublic Event intercept(Event event) {// headerMap<String, String> headers = event.getHeaders();// bodyString log = new String(event.getBody(), StandardCharsets.UTF_8);JSONObject jsonLog = JSONObject.parseObject(log);// 将秒的body.ts转为header.timestamp的毫秒Long ts = jsonLog.getLong("ts");String tsMills = String.valueOf(ts * 1000);String tableName = jsonLog.getString("table");// header 添加timestamp 和 tableNameheaders.put("timestamp", tsMills);headers.put("tableName", tableName);return event;}@Overridepublic List<Event> intercept(List<Event> list) {for (Event event : list) {intercept(event);}return list;}@Overridepublic void close() {}public static class Builder implements Interceptor.Builder {@Overridepublic Interceptor build() {return new TimestampAndTableNameInterceptor ();}@Overridepublic void configure(Context context) {}}}
    • 将打好的包上传到 hadoop101 的/opt/module/flume/lib文件夹下

测试

  1. 确保Zookeeper、Kafka集群启动
  2. 启动hadoop101的Flume
    [logan@hadoop101 flume]$ bin/flume-ng agent -n a1 -c conf/ -f job/kafka_to_hdfs_db.conf -Dflume.root.logger=info,console
    
  3. 生成模拟数据

INSERT INTO user_info ( login_name, phone_num, email, user_level, birthday, create_time ) VALUES ( ?, ?, ?, ?, ?, ? )
snhahpxnbgkf(String), 13821184391(String), snhahpxnbgkf@126.com(String), 1(String), 1992-08-14 16:26:42.0(Timestamp), 2023-06-14 16:26:42.0(Timestamp)

  1. 观察 HDFS 是否有新数据出现

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/286140.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

天锐绿盾行为审计监控管理系统

其工作原理主要是通过在被监控电脑上安装代理程序&#xff0c;实现对该电脑的全方位监控。同时&#xff0c;代理程序会将收集到的数据传输到服务器上&#xff0c;以便管理人员随时查看。 PC访问地址&#xff1a; www.drhchina.com 天锐绿盾监控系统具有以下功能&#xff1a; 能…

【Spring教程31】SSM框架整合实战:从零开始学习SSM整合配置,如何编写Mybatis SpringMVC JDBC Spring配置类

目录 1 流程分析2 整合配置2.1 步骤1&#xff1a;创建Maven的web项目2.2 步骤2:添加依赖2.3 步骤3:创建项目包结构2.4 步骤4:创建SpringConfig配置类2.5 步骤5:创建JdbcConfig配置类2.6 步骤6:创建MybatisConfig配置类2.7 步骤7:创建jdbc.properties2.8 步骤8:创建SpringMVC配置…

ChatGPT引领AI时代:程序员、项目经理、产品经理、架构师、Python量化交易师的翅膀

&#x1f482; 个人网站:【 海拥】【神级代码资源网站】【办公神器】&#x1f91f; 基于Web端打造的&#xff1a;&#x1f449;轻量化工具创作平台&#x1f485; 想寻找共同学习交流的小伙伴&#xff0c;请点击【全栈技术交流群】 在当今AI时代&#xff0c;ChatGPT作为一项卓越…

PWM实现蜂鸣器

tim4.h #ifndef __TIM4_H__ #define __TIM4_H__ #include "stm32mp1xx_rcc.h" #include "stm32mp1xx_gpio.h" #include "stm32mp1xx_tim.h" void timer4_init();#endif tim4.c #include "tim4.h"void timer4_init() {// 1. 设置GPI…

这个食堂管理大招,再不知道就晚了!

随着社会的不断发展&#xff0c;餐饮行业也在不断创新和进步。在这个数字化时代&#xff0c;智能技术为各行各业提供了更高效、便捷的解决方案。 食堂作为人们日常生活中不可或缺的一部分&#xff0c;也迎来了智能化的时代。智慧收银系统不仅提高了食堂的运营效率&#xff0c;还…

PIC单片机项目(4)——基于PIC16F877A的温度光照检测装置

1.功能设计 基于PIC16F877A单片机&#xff0c;使用DS18B20进行温度测量&#xff0c;使用光敏电阻进行光照测量&#xff0c;将测量值实时显示在LCD1602屏幕上&#xff0c;同时可以设定光照阈值和温度阈值。当温度大于阈值&#xff0c;则蜂鸣器报警&#xff0c;当光照小于阈值&am…

MIPS CPU设计(基于华中科技大学计组实验)

来都来了给我点个赞收藏一下再走呗~~~&#x1f339;&#x1f339;&#x1f339;&#x1f339;&#x1f339; 本次实验要求我们掌握的是MIPS CPU设计&#xff0c;而此实验中关注的重点是在计算机中的cpu的运行模式&#xff0c;这个模式可以参考冯诺伊曼的计算机架构去理解&#…

详细教程 - 进阶版 鸿蒙harmonyOS应用 第十二节——鸿蒙操作系统中的动画效果封装:Java和TypeScript版

简介 动画效果是开发鸿蒙应用时的一个重要功能。在这篇文章中&#xff0c;我们将详细探讨如何在鸿蒙系统中使用Java和TypeScript实现动画效果的封装&#xff0c;并提供一些代码示例。 Java版动画效果的实现 在鸿蒙操作系统中&#xff0c;我们可以使用ohos.agp.animation.Anima…

全国县级行政区点位数据,Shp+excel格式

基本信息. 数据名称: 县级行政区点位 数据格式: Shpexcel 数据时间: 2021年 数据几何类型: 点 数据坐标系: WGS84坐标系 数据来源&#xff1a;网络公开数据 数据字段&#xff1a; 序号字段名称字段说明1xzqhdm_1省代码2xzqhmc_1省名称3xzqhdm_2市代码4xzqhmc_2市代…

ZKP Commitment (2)

MIT IAP 2023 Modern Zero Knowledge Cryptography课程笔记 Lecture 5: Commitment 2 (Ying Tong Lai) Polynomial Commitment f(x) a 0 a_0 a0​ a 1 x a_1x a1​x a 2 x 2 a_2x^2 a2​x2 … \dots … a n x n a_nx^n an​xn a i a_i ai​ is secret commit(f,…

基于 Flink 构建实时数据湖的实践

本文整理自火山引擎云原生计算研发工程师王正和闵中元在本次 CommunityOverCode Asia 2023 数据湖专场中的《基于 Flink 构建实时数据湖的实践》主题演讲。 实时数据湖是现代数据架构的核心组成部分&#xff0c;随着数据湖技术的发展&#xff0c;用户对其也有了更高的需求&…

t-SNE高维数据可视化实例

t-SNE&#xff1a;高维数据分布可视化 实例1&#xff1a;自动生成一个S形状的三维曲线 实例1结果&#xff1a; 实例1完整代码&#xff1a; import matplotlib.pyplot as plt from sklearn import manifold, datasets """对S型曲线数据的降维和可视化"&q…