1.场景分析
使用flume做数据传输时,可能遇到将一个数据流中的多张表分别保存到各自位置的问题,同时由于采集时间和数据实际发生时间存在差异,因此需要根据数据实际发生时间进行分区保存。
鉴于此,需要编写flume自定义拦截器,并配合conf配置文件来实现上述功能。废话不多说,直接上代码。
2.依赖配置(pom.xml)
<!-- Maven dependencies of the interceptor module: flume-ng-core 1.9.0 is declared with `provided` scope; fastjson 1.2.83_noneautotype uses the default scope. NOTE(review): fastjson is not `provided`, so it presumably has to be bundled into the interceptor jar or copied into Flume's lib directory as well - confirm. -->
<dependencies><dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.9.0</version><scope>provided</scope></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.83_noneautotype</version></dependency></dependencies>
3.主程序
public class test implements Interceptor {@Overridepublic void initialize() {}@Overridepublic Event intercept(Event event) {//1、获取header和body的数据try {byte[] body = event.getBody();Map<String, String> headers = event.getHeaders();String log = new String(body, StandardCharsets.UTF_8);//2、判断字符串是否是一个合法的json,是:返回当前event;不是:返回nullJSONObject jsonObject = JSONObject.parseObject(log);//3、header中timestamp时间字段替换成日志生成的时间戳(解决数据漂移问题和历史数据同步)Long ts = DateFormatUtil.toTsAddTimeZone(jsonObject.getString("@timestamp"));String topicHeader = headers.get("topic");
// System.out.println("topicHeader主题名称为:"+topicHeader);String httpUserAgent = jsonObject.getString("topicHeader");//数据筛选if("clb-healthcheck".equals(httpUserAgent) || (StringUtils.isNotEmpty(httpUserAgent) && httpUserAgent.startsWith("kube-probe"))){
// System.out.println("过滤的事件为:"+event);return null;}else {jsonObject.put("timestamp",ts);headers.put("timestamp", ts.toString());if("xxx".equals(topicHeader)){headers.put("table","table1");}else if("xxxx".equals(topicHeader)){headers.put("table","table2");}else{headers.put("table","other");}event.setBody(jsonObject.toString().getBytes(StandardCharsets.UTF_8));
// System.out.println("传输的事件为:"+event);return event;}}catch (JSONException e){
// System.out.println("格式有问题的事件为:"+event);return null;}}@Overridepublic List<Event> intercept(List<Event> list) {Iterator<Event> iterator = list.iterator();while (iterator.hasNext()){Event next = iterator.next();if(intercept(next)==null){iterator.remove();}}return list;}@Overridepublic void close() {}public static class Builder implements Interceptor.Builder {@Overridepublic Interceptor build() {return new test();}@Overridepublic void configure(Context context) {}}
}
4.utils-时间处理程序
/**
 * Date/time conversion helpers.
 *
 * <p>Note: {@code SimpleDateFormat} is not thread-safe, so this class is built on
 * the {@code java.time} classes available since JDK 1.8.
 *
 * <p>Parsing methods interpret their input at a fixed offset (UTC+8, or UTC for
 * {@link #toTsAddTimeZone(String)}); formatting methods render the timestamp in
 * the JVM's default time zone.
 */
public class DateFormatUtil {
    private static final DateTimeFormatter DATE = DateTimeFormatter.ofPattern("yyyy-MM-dd");
    private static final DateTimeFormatter COMPACT_DATE = DateTimeFormatter.ofPattern("yyyyMMdd");
    private static final DateTimeFormatter COMPACT_DATE_HOUR = DateTimeFormatter.ofPattern("yyyyMMddHH");
    private static final DateTimeFormatter DATE_TIME = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    private static final DateTimeFormatter SLASH_DATE_TIME_MILLIS =
            DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss SSS");
    private static final DateTimeFormatter ISO_UTC_MILLIS =
            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");

    private static final ZoneOffset UTC_PLUS_8 = ZoneOffset.ofHours(8);

    /**
     * Parses a UTC+8 date or date-time string into epoch milliseconds.
     *
     * @param dtStr  {@code yyyy-MM-dd HH:mm:ss} when {@code isFull} is true,
     *               otherwise {@code yyyy-MM-dd} (midnight is assumed)
     * @param isFull whether {@code dtStr} already contains the time part
     * @return epoch milliseconds
     * @throws java.time.format.DateTimeParseException if the text does not match
     */
    public static Long toTs(String dtStr, boolean isFull) {
        String dateTimeText = isFull ? dtStr : dtStr + " 00:00:00";
        return LocalDateTime.parse(dateTimeText, DATE_TIME).toInstant(UTC_PLUS_8).toEpochMilli();
    }

    /**
     * Parses a UTC timestamp of the form {@code yyyy-MM-dd'T'HH:mm:ss.SSS'Z'}
     * into epoch milliseconds.
     *
     * @param dtStr the timestamp text, e.g. {@code 2024-01-31T06:11:54.000Z}
     * @return epoch milliseconds
     * @throws java.time.format.DateTimeParseException if the text does not match
     */
    public static Long toTsAddTimeZone(String dtStr) {
        return LocalDateTime.parse(dtStr, ISO_UTC_MILLIS).toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    /**
     * Parses a UTC+8 timestamp of the form {@code yyyy/MM/dd HH:mm:ss SSS}
     * into epoch milliseconds.
     *
     * @param dtStr the timestamp text
     * @return epoch milliseconds
     * @throws java.time.format.DateTimeParseException if the text does not match
     */
    public static Long toTs1(String dtStr) {
        return LocalDateTime.parse(dtStr, SLASH_DATE_TIME_MILLIS).toInstant(UTC_PLUS_8).toEpochMilli();
    }

    /**
     * Parses a UTC+8 date ({@code yyyy-MM-dd}) into the epoch milliseconds of
     * that day's midnight.
     *
     * @param dtStr the date text
     * @return epoch milliseconds
     */
    public static Long toTs(String dtStr) {
        return toTs(dtStr, false);
    }

    /**
     * Formats epoch milliseconds as {@code yyyy-MM-dd} in the default time zone.
     *
     * @param ts epoch milliseconds
     * @return the formatted date
     */
    public static String toDate(Long ts) {
        return DATE.format(toSystemLocal(ts));
    }

    /**
     * Formats epoch milliseconds as {@code yyyy-MM-dd HH:mm:ss} in the default time zone.
     *
     * @param ts epoch milliseconds
     * @return the formatted date-time
     */
    public static String toYmdHms(Long ts) {
        return DATE_TIME.format(toSystemLocal(ts));
    }

    /**
     * Formats epoch milliseconds as {@code yyyyMMdd} in the default time zone.
     *
     * @param ts epoch milliseconds
     * @return the formatted date
     */
    public static String toYmd(Long ts) {
        return COMPACT_DATE.format(toSystemLocal(ts));
    }

    /**
     * Formats epoch milliseconds as {@code yyyyMMddHH} in the default time zone.
     *
     * @param ts epoch milliseconds
     * @return the formatted date and hour
     */
    public static String toYmdH(Long ts) {
        return COMPACT_DATE_HOUR.format(toSystemLocal(ts));
    }

    /**
     * Same as {@link #toYmdH(Long)} but returned as an {@code int}.
     *
     * @param ts epoch milliseconds
     * @return the {@code yyyyMMddHH} value as a number
     */
    public static int toYmdHInt(Long ts) {
        return Integer.parseInt(COMPACT_DATE_HOUR.format(toSystemLocal(ts)));
    }

    /** Converts epoch milliseconds to a local date-time in the JVM's default zone. */
    private static LocalDateTime toSystemLocal(Long ts) {
        return LocalDateTime.ofInstant(java.time.Instant.ofEpochMilli(ts), ZoneId.systemDefault());
    }

    /** Small manual check: prints the epoch milliseconds of a sample UTC timestamp. */
    public static void main(String[] args) {
        long s4 = toTsAddTimeZone("2024-01-31T06:11:54.000Z");
        System.out.println(s4);
    }
}
5.打包放入flume的lib目录
mv test.jar /data/module/flume-1.9.0/lib/
6.编写配置文件运行
# Define the components of agent a1
a1.sources=r1
a1.channels=c1
a1.sinks=k1 k2

# Source configuration
a1.sources.r1.type= org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 2000
a1.sources.r1.kafka.consumer.group.id= xxx
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = xxx:9092
a1.sources.r1.kafka.topics = xxx,xxxx
a1.sources.r1.kafka.consumer.auto.offset.reset = latest
a1.sources.r1.interceptors = i1
# Must be the full reference to the interceptor's Builder class inside the jar
a1.sources.r1.interceptors.i1.type =test$Builder

# Memory channel
a1.channels.c1.type = memory
# Number of events the channel can hold
a1.channels.c1.capacity = 20000
# Number of events per transaction
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacityBufferPercentage = 20
# NOTE(review): this equals the 2048m max heap passed on the start command - confirm it leaves enough headroom
a1.channels.c1.byteCapacity = 2147483648

# Alternative channel configuration (file channel, disabled)
#a1.channels.c1.type = file
#a1.channels.c1.checkpointDir =/data/xxx
#a1.channels.c1.dataDirs = /data/module/xxx
#a1.channels.c1.maxFileSize = 2147483648
#a1.channels.c1.capacity = 2000000
#a1.channels.c1.transactionCapacity=20000
#a1.channels.c1.keep-alive = 6
#a1.channels.c1.checkpointInterval=60000
#a1.channels.c1.minimumRequiredSpace=26214400

# Sink 1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path =/hadoop/dm_dw/tmp_data/log/%{table}/%Y%m%d/%H
a1.sinks.k1.hdfs.filePrefix = log1
a1.sinks.k1.hdfs.round = false
a1.sinks.k1.hdfs.rollInterval = 360
a1.sinks.k1.hdfs.rollSize = 1174405120
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.batchSize=3000
# Output file type
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip

# Sink 2
a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path =/hadoop/dm_dw/tmp_data/log/%{table}/%Y%m%d/%H
a1.sinks.k2.hdfs.filePrefix = log2
a1.sinks.k2.hdfs.round = false
a1.sinks.k2.hdfs.rollInterval = 360
a1.sinks.k2.hdfs.rollSize = 1174405120
a1.sinks.k2.hdfs.rollCount = 0
a1.sinks.k2.hdfs.batchSize=3000
# Output file type
a1.sinks.k2.hdfs.fileType = CompressedStream
a1.sinks.k2.hdfs.codeC = gzip

# Wiring: bind the source and both sinks to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
# Author's sizing rule: k1.batchSize + k2.batchSize < c1.capacity
启动命令
nohup /data/module/flume-1.9.0/bin/flume-ng agent -Xms1024m -Xmx2048m -n a1 -c /data/module/flume-1.9.0/conf -f /data/module/flume-1.9.0/job/test.conf -Dflume.monitoring.type=http -Dflume.monitoring.port=36001 >/dev/null 2>&1 &