Incremental Table Data Channel
The data channel is shown in the figure below.
Flume Configuration
Overview
Flume needs to transfer the data in Kafka's topic_db topic to HDFS, so it uses a KafkaSource and an HDFSSink, with a FileChannel as the channel.
Note that the HDFSSink must write the data of different MySQL business tables to different paths, and each path should contain a date layer so that each day's data can be separated. The key configuration is as follows:
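The lines below are excerpted from the full kafka_to_hdfs_db.conf shown later in this section: the sink path references a %{tableName} header plus date escapes, and a custom interceptor on the source fills in the tableName and timestamp headers that the path depends on.

```properties
# excerpt -- see the complete file in the next section
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.logan.gmall.flume.interceptor.TimestampAndTableNameInterceptor$Builder
a1.sinks.k1.hdfs.path = /origin_data/gmall/db/%{tableName}_inc/%Y-%m-%d
```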
Configuration Steps
- Create the Flume configuration file: in the Flume job directory, create kafka_to_hdfs_db.conf with the following content.

```properties
a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop101:9092
a1.sources.r1.kafka.topics = topic_db
a1.sources.r1.kafka.consumer.group.id = flume
a1.sources.r1.setTopicHeader = true
a1.sources.r1.topicHeader = topic
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.logan.gmall.flume.interceptor.TimestampAndTableNameInterceptor$Builder

a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior2
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior2/
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6

## sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/db/%{tableName}_inc/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = db
a1.sinks.k1.hdfs.round = false

a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0

a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip

## assemble
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```
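As an illustration only (apart from the table and ts fields that the interceptor reads, the record layout below is an assumption, not taken from this guide), a change record in topic_db might look like:

```json
{"database": "gmall", "table": "user_info", "type": "insert", "ts": 1686731202, "data": {"id": 1}}
```

The HDFS sink resolves %{tableName} from the event header and %Y-%m-%d from the millisecond timestamp header that the interceptor derives from ts, so such an event would land under /origin_data/gmall/db/user_info_inc/2023-06-14.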
- Write the interceptor
- Create a Maven project and add the following to its pom.xml:

```xml
<dependencies>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.9.0</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.62</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.3.2</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
```
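Because the maven-assembly-plugin is bound to the package phase with the jar-with-dependencies descriptor, a regular build is enough to produce the fat jar under target/ (its exact name depends on the project's artifactId and version, which are not shown here):

```bash
mvn clean package
```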
- In the com.logan.gmall.flume.interceptor package, create the class TimestampAndTableNameInterceptor:

```java
package com.logan.gmall.flume.interceptor;

import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

public class TimestampAndTableNameInterceptor implements Interceptor {

    @Override
    public void initialize() {
    }

    @Override
    public Event intercept(Event event) {
        // header
        Map<String, String> headers = event.getHeaders();
        // body
        String log = new String(event.getBody(), StandardCharsets.UTF_8);
        JSONObject jsonLog = JSONObject.parseObject(log);

        // convert the second-level ts in the body to a millisecond timestamp for the header
        Long ts = jsonLog.getLong("ts");
        String tsMills = String.valueOf(ts * 1000);
        String tableName = jsonLog.getString("table");

        // add timestamp and tableName to the header
        headers.put("timestamp", tsMills);
        headers.put("tableName", tableName);
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        for (Event event : list) {
            intercept(event);
        }
        return list;
    }

    @Override
    public void close() {
    }

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new TimestampAndTableNameInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}
```
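Optionally, the interceptor can be sanity-checked locally before packaging. The class below is only a sketch: EventBuilder comes from flume-ng-core, and the sample body is hypothetical.

```java
package com.logan.gmall.flume.interceptor;

import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;

import java.nio.charset.StandardCharsets;

// Minimal local check: build an event whose body mimics a topic_db record,
// run it through the interceptor, and print the headers it adds.
public class InterceptorLocalCheck {
    public static void main(String[] args) {
        String body = "{\"table\":\"user_info\",\"ts\":1686731202,\"data\":{\"id\":1}}";
        Event event = EventBuilder.withBody(body, StandardCharsets.UTF_8);

        new TimestampAndTableNameInterceptor.Builder()
                .build()
                .intercept(event);

        // Expected output: {timestamp=1686731202000, tableName=user_info}
        System.out.println(event.getHeaders());
    }
}
```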
- Upload the built jar to the /opt/module/flume/lib directory on hadoop101.
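One possible way to do the upload, assuming the jar was built on another machine; the artifact name here is hypothetical and should be replaced with the *-jar-with-dependencies.jar found under target/:

```bash
# hypothetical artifact name -- use the jar-with-dependencies file produced by mvn package
scp target/flume-interceptor-1.0-jar-with-dependencies.jar logan@hadoop101:/opt/module/flume/lib/
```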
Testing
- Make sure the ZooKeeper and Kafka clusters are running.
- Start Flume on hadoop101:

```
[logan@hadoop101 flume]$ bin/flume-ng agent -n a1 -c conf/ -f job/kafka_to_hdfs_db.conf -Dflume.root.logger=info,console
```
- Generate mock data. For example, an insert into user_info is issued like this:

```
INSERT INTO user_info ( login_name, phone_num, email, user_level, birthday, create_time ) VALUES ( ?, ?, ?, ?, ?, ? )
snhahpxnbgkf(String), 13821184391(String), snhahpxnbgkf@126.com(String), 1(String), 1992-08-14 16:26:42.0(Timestamp), 2023-06-14 16:26:42.0(Timestamp)
```
- Check whether new data appears on HDFS.
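For example, with the mock user_info insert above (create_time 2023-06-14), a dated directory for that table should show up; the exact date depends on when the data was generated:

```bash
hadoop fs -ls /origin_data/gmall/db/user_info_inc/2023-06-14
```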