Flume拦截器使用-实现分表、解决零点漂移等

img

1.场景分析

使用flume做数据传输时,可能遇到将一个数据流中的多张表分别保存到各自位置的问题,同时由于采集时间和数据实际发生时间存在差异,因此需要根据数据实际发生时间进行分区保存。
鉴于此,需要设计flume拦截器配置conf文件实现上述功能,废话不多说,直接上代码。

2.配置文件

<dependencies><dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.9.0</version><scope>provided</scope></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.83_noneautotype</version></dependency></dependencies>

3.主程序

public class test implements Interceptor  {@Overridepublic void initialize() {}@Overridepublic Event intercept(Event event) {//1、获取header和body的数据try {byte[] body = event.getBody();Map<String, String> headers = event.getHeaders();String log = new String(body, StandardCharsets.UTF_8);//2、判断字符串是否是一个合法的json,是:返回当前event;不是:返回nullJSONObject jsonObject = JSONObject.parseObject(log);//3、header中timestamp时间字段替换成日志生成的时间戳(解决数据漂移问题和历史数据同步)Long ts = DateFormatUtil.toTsAddTimeZone(jsonObject.getString("@timestamp"));String topicHeader = headers.get("topic");
//            System.out.println("topicHeader主题名称为:"+topicHeader);String httpUserAgent = jsonObject.getString("topicHeader");//数据筛选if("clb-healthcheck".equals(httpUserAgent) || (StringUtils.isNotEmpty(httpUserAgent) && httpUserAgent.startsWith("kube-probe"))){
//                System.out.println("过滤的事件为:"+event);return null;}else {jsonObject.put("timestamp",ts);headers.put("timestamp", ts.toString());if("xxx".equals(topicHeader)){headers.put("table","table1");}else if("xxxx".equals(topicHeader)){headers.put("table","table2");}else{headers.put("table","other");}event.setBody(jsonObject.toString().getBytes(StandardCharsets.UTF_8));
//                System.out.println("传输的事件为:"+event);return event;}}catch (JSONException e){
//            System.out.println("格式有问题的事件为:"+event);return null;}}@Overridepublic List<Event> intercept(List<Event> list) {Iterator<Event> iterator = list.iterator();while (iterator.hasNext()){Event next = iterator.next();if(intercept(next)==null){iterator.remove();}}return list;}@Overridepublic void close() {}public static class Builder implements Interceptor.Builder {@Overridepublic Interceptor build() {return new test();}@Overridepublic void configure(Context context) {}}
}

4.utils-时间处理程序

/*** 日期转换工具类* 注意:SimpleDateFormat在对日期进行转换的时候,存在线程安全的问题* 建议:使用JDK1.8之后提供的日期包下的相关类完成封装*/
public class DateFormatUtil {private static final DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd");private static final DateTimeFormatter dtf1 = DateTimeFormatter.ofPattern("yyyyMMdd");private static final DateTimeFormatter dtf2 = DateTimeFormatter.ofPattern("yyyyMMddHH");private static final DateTimeFormatter dtfFull = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");private static final DateTimeFormatter dtfFull1 = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss SSS");private static final DateTimeFormatter dtfFull2 = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");public static Long toTs(String dtStr, boolean isFull) {LocalDateTime localDateTime = null;if (!isFull) {dtStr = dtStr + " 00:00:00";}localDateTime = LocalDateTime.parse(dtStr, dtfFull);return localDateTime.toInstant(ZoneOffset.of("+8")).toEpochMilli();}public static Long toTsAddTimeZone(String dtStr) {LocalDateTime localDateTime = null;localDateTime = LocalDateTime.parse(dtStr, dtfFull2);return localDateTime.toInstant(ZoneOffset.of("+0")).toEpochMilli();}public static Long toTs1(String dtStr) {LocalDateTime localDateTime = null;localDateTime = LocalDateTime.parse(dtStr, dtfFull1);return localDateTime.toInstant(ZoneOffset.of("+8")).toEpochMilli();}public static Long toTs(String dtStr) {return toTs(dtStr, false);}public static String toDate(Long ts) {Date dt = new Date(ts);LocalDateTime localDateTime = LocalDateTime.ofInstant(dt.toInstant(), ZoneId.systemDefault());return dtf.format(localDateTime);}public static String toYmdHms(Long ts) {Date dt = new Date(ts);LocalDateTime localDateTime = LocalDateTime.ofInstant(dt.toInstant(), ZoneId.systemDefault());return dtfFull.format(localDateTime);}public static String toYmd(Long ts) {Date dt = new Date(ts);LocalDateTime localDateTime = LocalDateTime.ofInstant(dt.toInstant(), ZoneId.systemDefault());return dtf1.format(localDateTime);}public static String toYmdH(Long ts) {Date dt = new Date(ts);LocalDateTime localDateTime = LocalDateTime.ofInstant(dt.toInstant(), ZoneId.systemDefault());return dtf2.format(localDateTime);}public static int toYmdHInt(Long ts) {Date dt = new Date(ts);LocalDateTime localDateTime = LocalDateTime.ofInstant(dt.toInstant(), ZoneId.systemDefault());return Integer.parseInt(dtf2.format(localDateTime));}public static void main(String[] args) {long t1= 1670833135997L;String s1 = toYmdH(t1);int i1 = toYmdHInt(t1);String s2 = toYmdH(1670833136000L);
//        long s3 = toTsAddTimeZone("2024-01-31 05:43:46");long s4 = toTsAddTimeZone("2024-01-31T06:11:54.000Z");//        System.out.println(s3);System.out.println(s4);}}

5.打包放入flume的lib目录

mv test.jar /data/flume-1.9.0/lib/

6.编写配置文件运行

#定义组件
a1.sources=r1
a1.channels=c1
a1.sinks=k1 k2#配置source
a1.sources.r1.type= org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 2000
a1.sources.r1.kafka.consumer.group.id= xxx
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = xxx:9092
a1.sources.r1.kafka.topics = xxx,xxxx
a1.sources.r1.kafka.consumer.auto.offset.reset = latest
a1.sources.r1.interceptors = i1
#此处需要写jar包的详细reference
a1.sources.r1.interceptors.i1.type =test$Builder#memory channel
a1.channels.c1.type = memory
#channel的event个数
a1.channels.c1.capacity = 20000
#事务event个数
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 2147483648#配置channel
#a1.channels.c1.type = file
#a1.channels.c1.checkpointDir =/data/xxx
#a1.channels.c1.dataDirs = /data/module/xxx
#a1.channels.c1.maxFileSize = 2147483648
#a1.channels.c1.capacity = 2000000
#a1.channels.c1.transactionCapacity=20000
#a1.channels.c1.keep-alive = 6
#a1.chhannels.c1.checkpointInterval=60000
#a1.minimumRequirdSpace=26214400#配置sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path =/hadoop/dm_dw/tmp_data/log/%{table}/%Y%m%d/%H
a1.sinks.k1.hdfs.filePrefix = log1
a1.sinks.k1.hdfs.round = false
a1.sinks.k1.hdfs.rollInterval = 360
a1.sinks.k1.hdfs.rollSize = 1174405120
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.batchSize=3000
#控制输出文件类型
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip#配置sink2
a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path =/hadoop/dm_dw/tmp_data/log/%{table}/%Y%m%d/%H
a1.sinks.k2.hdfs.filePrefix = log2
a1.sinks.k2.hdfs.round = false
a1.sinks.k2.hdfs.rollInterval = 360
a1.sinks.k2.hdfs.rollSize = 1174405120
a1.sinks.k2.hdfs.rollCount = 0
a1.sinks.k2.hdfs.batchSize=3000
#控制输出文件类型
a1.sinks.k2.hdfs.fileType = CompressedStream
a1.sinks.k2.hdfs.codeC = gzip#组装 
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1#k1.batchSize+k2.batchSize < c1.capacity

启动命令

 nohup /data/module/flume-1.9.0/bin/flume-ng agent -Xms1024m -Xmx2048m -n a1 -c /data/module/flume-1.9.0/conf -f /data/module/flume-1.9.0/job/test.conf -Dflume.monitoring.type=http -Dflume.monitoring.port=36001  >/dev/null 2>&1 &

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/462323.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

vue electron 应用在windows系统上以管理员权限打开应用

打开package.json文件&#xff0c;在build下的win增加配置 "requestedExecutionLevel": "requireAdministrator",

windows中的apache改成手动启动的操作步骤

使用cmd解决安装之后开机自启的问题 services.msc 0. 这个命令是打开本地服务找到apache的服务名称 2 .通过服务名称去查看服务的状态 sc query apacheapache3.附加上关掉和启动的命令&#xff08;换成是你的服务名称&#xff09; 关掉命令 sc stop apacheapache启动命令 …

医学三基答案在哪搜?4个大学生必备的搜题 #知识分享#职场发展

今天&#xff0c;我将分享一些受欢迎的、被大学生广泛使用的日常学习工具&#xff0c;希望能给你的学习生活带来一些便利和启发。 1.颐博咨询 这是一个网站 找题好用的在线搜题站,快考不限次搜题助手,问题截图搜题软件,练题通关考试试题大全。 2.题小聪 这是一个公众号 这…

ARP欺骗攻击利用之内网截取图片

Arp欺骗&#xff1a;目标ip的流量经过我的网卡&#xff0c;从网关出去。 Arp断网&#xff1a;目标ip的流量经过我的网卡 1. echo 1 >/proc/sys/net/ipv4/ip_forward 设置ip流量转发&#xff0c;不会出现断网现象 有时不能这样直接修改&#xff0c;还有另外一种方法 修…

猜猜谁是凶手?

目录 一、题目二、思路三、完整代码 一、题目 日本某地发生了一件谋杀案&#xff0c;警察通过排查确定杀人凶手必为4个嫌疑犯的一个。 以下为4个嫌疑犯的供词: A说&#xff1a;不是我。 B说&#xff1a;是C。 C说&#xff1a;是D。 D说&#xff1a;C在胡说 已知3个人说了…

Adb显示第3方应用的包名原理

Android早期版本实现原理请看 Android源码分析-pm命令的实现&#xff0c;列出包名pm list package&#xff0c;列出系统库pm list libraries_pm list packages-CSDN博客 Android12 对adb shell pm 实现原理做了重构&#xff1a;改成了template模式PackageManagerShellCommand …

SQL--多表查询

我们之前在讲解SQL语句的时候&#xff0c;讲解了DQL语句&#xff0c;也就是数据查询语句&#xff0c;但是之前讲解的查询都是单 表查询&#xff0c;而本章节我们要学习的则是多表查询操作&#xff0c;主要从以下几个方面进行讲解。 多表关系 项目开发中&#xff0c;在进行数据…

2024年10 个好用的AI简历工具盘点推荐

在职场竞争激烈的今天&#xff0c;一份出色的简历就像是你的秘密武器&#xff0c;能帮你在众多候选人中脱颖而出&#xff0c;赢得面试宝座。随着 ChatGPT 引领的 AI 浪潮席卷而来&#xff0c;各式各样的 AI 简历工具如雨后春笋般涌现。面对这样的背景&#xff0c;神器集今天为大…

# 流量回放工具之 Goreplay 安装及初级使用

流量回放工具之 Goreplay 安装及初级使用 文章目录 流量回放工具之 Goreplay 安装及初级使用GoReplay使用场景环境搭建Golang环境安装Goreplay 安装 Windows 下使用基本使用其它使用注意点 GoReplay GoReplay是一个开源工具&#xff0c;用于捕获和重放实时HTTP流量到测试环境中…

学习Pytorch深度学习运行AlexNet代码时关于在Pycharm中解决 “t >= 0 t < n_classes” 的断言错误方法

在学习深度学习的过程中&#xff0c;遇到了一个报错&#xff1a; 这跑的代码是AlexNet的代码实现。 运行时出现报错&#xff1a; C:\cb\pytorch_1000000000000\work\aten\src\ATen\native\cuda\Loss.cu:257: block: [0,0,0], thread: [4,0,0] Assertion t > 0 && t…

寒假作业2024.2.6

1.现有无序序列数组为23,24,12,5,33,5347&#xff0c;请使用以下排序实现编程 函数1:请使用冒泡排序实现升序排序 函数2:请使用简单选择排序实现升序排序 函数3:请使用直接插入排序实现升序排序 函数4:请使用插入排序实现升序排序 #include <stdio.h> #include <stdl…

nvm安装node后,npm无效

类似报这种问题&#xff0c;是因为去github下载npm时下载失败&#xff0c; Please visit https://github.com/npm/cli/releases/tag/v6.14.17 to download npm. 第一种方法&#xff1a;需要复制这里面的地址爬梯子去下载&#xff08;github有时不用梯子能直接下载&#xff0c;有…