Flume 快速入门【概述、安装、拦截器】

文章目录

    • 什么是 Flume?
    • Flume 组成
    • Flume 安装
    • Flume 配置任务文件
      • 应用示例
      • 启动 Flume 采集任务
    • Flume 拦截器
      • 编写 Flume 拦截器
      • 拦截器应用

什么是 Flume?

Flume 是一个开源的数据采集工具,最初由 Apache 软件基金会开发和维护。它的主要目的是帮助用户将大规模数据从各种数据源(如日志文件、网络数据源、消息队列等)采集、传输和加载到数据存储系统(如 Hadoop HDFS、Apache HBase、Apache Hive 等)。

Flume 旨在处理大规模数据流,以便进行数据分析和处理。

Flume 组成

Flume (配置)主要由以下 4 个部分组成:

1. 数据源(Source): Flume 可以从多种数据源收集数据,例如日志文件、网络流、消息队列等。

2. 通道(Channel): 采集的数据被存储在通道中,等待传输到目标数据存储系统。Flume 支持多种不同类型的通道,如内存通道、文件通道和 Kafka 通道。

3. 拦截器(Interceptor): 拦截器允许用户对采集的数据进行预处理和转换,以满足特定需求。

4. 接收器(Sink): 接收器将数据传输到目标数据存储系统,如 Hadoop HDFS、HBase、Kafka 等。

Flume 通过灵活的配置,允许用户根据其数据采集需求来定义数据流的整个流程,包括数据源、通道、拦截器和接收器。

这使得 Flume 成为处理大规模数据采集和传输任务的强有力工具,构建数据管道,将分散的数据整合到中心存储或处理系统中,用于实时或者离线数据分析和报告。

Flume 安装

官方安装包下载地址:http://archive.apache.org/dist/flume

本篇博客使用的版本为:Flume-1.10.1

1. 解压

tar -zxvf apache-flume-1.10.1-bin.tar.gz -C /opt/

2. 配置环境变量

vim /etc/profile

文件末尾添加:

#FLUME_HOME
export FLUME_HOME=/opt/flume-1.10.1
export PATH=$PATH:$FLUME_HOME/bin

刷新环境变量:source /etc/profile

其实到这里,Flume 算是安装完了,但是为了后期使用方便,这里再调整一下配置参数。

修改日志存储与输出:

cd $FLUME_HOMEvim conf/log4j2.xml

在该文件中修改日志文件的存储目录(正文第 3 行)

    <Property name="LOG_DIR">/opt/flume-1.10.1/logs</Property>

在该文件中添加日志控制台输出方式(正文末尾)

      <AppenderRef ref="Console" />

默认只有 LogFile 日志文件的输出方式。


修改堆内存大小:

cd $FLUME_HOMEvim conf/flume-env.sh

如果是本地学习或者测试环境建议调小一点:

export JAVA_OPTS="-Xms512m -Xmx2048m -Dcom.sun.management.jmxremote"

我这里调整最小为 512MB,最大 2048MB,也可以将最大和最小调整为一样的,避免进行内存交换。

Flume 配置任务文件

Flume 最主要的内容就是配置任务文件了,在文章开头提到过,主要由四部分组成:

  • 数据源(Source)

  • 通道(Channel)

  • 拦截器(Interceptor)

  • 接收器(Sink)

我们可以根据需求,进入 Flume 的官方网站,查阅各项参数如何进行配置,按照要求配置即可。

配置查阅网站:Flume 1.10.1 User Guide

在这里插入图片描述

其中给出了一个模板文件,内容如下所示:

# example.conf: A single-node Flume configuration# Name the components on this agent  声明变量名称
a1.sources = r1
a1.sinks = k1
a1.channels = c1# Describe/configure the source  配置数据源
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444# Describe the sink  配置接收器(存储源)
a1.sinks.k1.type = logger# Use a channel which buffers events in memory 配置管道参数
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel  组装
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

根据该模板文件就可以来快速构建一个数据采集的配置文件啦。

应用示例

将 Maxwell 发送到 Kafka 消息队列中的数据采集到 HDFS 上。

# --------- 声明变量名称 ---------
a1.sources = r1
a1.sinks = k1
a1.channels = c1# --------- 配置数据源 ---------
# 指定数据源类型
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
# 指定连接地址
a1.sources.r1.kafka.bootstrap.servers = hadoop120:9092,hadoop121:9092,hadoop122:9092
# 指定消费者组别,防止多个消费者之间引发数据冲突
a1.sources.r1.kafka.consumer.group.id = flume1
# 指定主题名称,这里需要和 MaxWell 指定发送的主题保持一致,否则会采集不到数据
a1.sources.r1.kafka.topics = topic_db# --------- 配置接收器 ---------
# 指定存储源类型
a1.sinks.k1.type = hdfs
# 动态规划 HDFS 写入路径
a1.sinks.k1.hdfs.path = /test/%{tableName}_inc/%Y/%m/%d/# 当以下值中的其中一个满足时,触发滚动操作,将数据写入到新的文件中(避免小文件过多)
# 根据运行时间判定(s),测试环境调小,开发环境30m-1h
a1.sinks.k1.hdfs.rollInterval = 10 
# 根据数据量大小判定(B),128 MB
a1.sinks.k1.hdfs.rollSize = 134217728
# 根据文件的条数判断,为 0 时表示不依据该参数
a1.sinks.k1.hdfs.rollCount = 0# 压缩文件
# 指定文件类型为压缩流
a1.sinks.k1.hdfs.fileType = CompressedStream
# 指定数据压缩格式
a1.sinks.k1.hdfs.codeC = gzip# --------- 配置通道 ---------
# 通道类型
a1.channels.c1.type = file
# 检查点存储路径
a1.channels.c1.checkpointDir = /opt/module/flume-1.10.1/file-channel/checkpoint1
# 用于存储日志文件的目录,多个路径用逗号分隔
a1.channels.c1.dataDirs = /opt/module/flume-1.10.1/file-channel/data1
# 指定允许等待的时间
a1.channels.c1.keep-alive = 6# --------- 组装 ---------
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

启动 Flume 采集任务

cd $FLUME_HOME./bin/flume-ng agent -c conf/ -f job_file -n a1

参数解析:

  • ./bin/flume-ngflume-ng 是 Flume 的执行脚本,它用于启动 Flume 的 agent 实例。
  1. agent:这告诉 Flume-ng 启动一个代理实例,也就是一个数据采集和传输任务的执行单元。

  2. -c conf/:指定 Flume 配置文件的目录。

  3. -f job_file:指定 Flume 任务配置文件的参数,其中配置文件包含了数据源、通道、接收器以及数据处理的详细信息。

  4. -n a1:指定代理实例的名称,与配置文件中的对应。

Flume 拦截器

当我们在配置文件中定义了动态参数时,例如上方示例中接收器的配置语句:

a1.sinks.k1.hdfs.path = /test/%{tableName}_inc/%Y/%m/%d/

我们设想的是将表名称和年月日进行动态规划,但在未设置拦截器时,这些动态参数值都会被默认为空,如果是系统预定义的参数则为系统设定值。

如下所示:


其中 tableName 是自定义的值,Flume 系统并没有对其进行预定义,所以为空,但 %Y %m %d 这三个值系统默认为当前的日期值,所以不为空。

如果想将上述值设定为希望出现的值,此时便引出了拦截器的概念。通过对拦截器的配置,将采集的数据进行预处理和转换,以满足特定需求。

编写 Flume 拦截器

在 IDEA 中编写拦截器代码,然后打包上传,使用依赖如下所示:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.work</groupId><artifactId>intercepted</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties><dependencies><!-- JSON 解析包--><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>2.0.32</version></dependency><!-- flume 包,不打包该 Jar 包--><dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.10.1</version><scope>provided</scope></dependency></dependencies><build><plugins><plugin><artifactId>maven-compiler-plugin</artifactId><version>2.3.2</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin><plugin><artifactId>maven-assembly-plugin</artifactId><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins></build></project>

注意,JDK 版本与平台保持一致。

拦截器实现,用于设定表头与写入日期:

package com.work.flume.interceptor;import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;/*** @author Moon_coder* @version 1.0* @date 2023/10/29 17:32*/
public class TableNameAndTimestamp implements Interceptor {/*** 初始化方法*/@Overridepublic void initialize() {}/**** 处理单条数据* @param event* @return*/@Overridepublic Event intercept(Event event) {try{// 1.获取头数据Map<String, String> headers = event.getHeaders();// 2.获取数据内容,将字节数据转换为字符串String log = new String(event.getBody(), StandardCharsets.UTF_8);// 3.将字符串转换为 JSON 对象JSONObject jsonObject = JSONObject.parseObject(log);// 4.获取表名String table = jsonObject.getString("table");// 5.获取时间,我的数据是经 Maxwell 采集的,Maxwell 中的数据是 10 位时间戳,不含毫秒,将数据存入 HDFS 时 *1000String ts = jsonObject.getString("ts") + "000";// 6.更新头数据信息headers.put("tableName",table);headers.put("timestamp",ts);}catch (JSONException e){// 如果不是 JSON 数据,则将该数据定义为脏数据return null;}return event;}/**** 批量数据处理方法* @param events* @return*/@Overridepublic List<Event> intercept(List<Event> events) {// 批量处理 event 同时实现过滤功能events.removeIf(next -> intercept(next) == null);return events;}/*** 关闭方法*/@Overridepublic void close() {}// TODO 返回拦截器类public static class Builder implements Interceptor.Builder{@Overridepublic Interceptor build() {return new TableNameAndTimestamp();}@Overridepublic void configure(Context context) {}}}

类名或 Jar 包名称都没有特别要求,自定义即可。

注意: 当我们在往头信息里面放东西时,需要与键名一一对应。

            // 6.更新头数据信息headers.put("tableName",table);headers.put("timestamp",ts);

如果是自定义的值,名称与 Flume 配置文件设定的必须对应:


如果是系统预定义的值,则需要在官方网站中查询其对应的键名。例如这里出现的 %Y %m %d 这三个值,在接收器的参数定义那里即可查询到(HDFS Sink¶),如下所示:

在这里插入图片描述

这里官方给出了提示,说 对于所有与时间相关的转义序列,事件的标头中必须存在一个关键字为 “timestamp” 的标头,所以在拦截器中对头信息的时间进行操作时,对应的键名为 timestamp

拦截器应用

将打包好的拦截器 Jar 包上传到 Flume 中的 lib 目录下,然后在 Flume 任务配置文件中添加拦截器配置,如下所示:

# --------- 拦截器 ---------
# 拦截器名称
a1.sources.r1.interceptors = i1
# 编写的拦截器全类名 + $Builder 标识符
a1.sources.r1.interceptors.i1.type = com.work.flume.interceptor.TableNameAndTimestamp$Builder

再次执行上方的示例任务,可以看到配置完拦截器后,头信息已经达到了我们预期的结果。


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/154894.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

主流大语言模型的技术细节

主流大语言模型的技术原理细节从预训练到微调https://mp.weixin.qq.com/s/P1enjLqH-UWNy7uaIviWRA 比较 LLaMA、ChatGLM、Falcon 等大语言模型的细节&#xff1a;tokenizer、位置编码、Layer Normalization、激活函数等。2. 大语言模型的分布式训练技术&#xff1a;数据并行、…

机器学习2:决策树--基于信息增益的ID3算法

1.决策树的简介 建立决策树的过程可以分为以下几个步骤: 计算每个特征的信息增益或信息增益比,选择最优的特征作为当前节点的划分标准。根据选择的特征将数据集划分为不同的子集。对每个子集递归执行步骤 1 和步骤 2,直到满足终止条件。构建决策树,并输出。基于信息增益的…

快速入门:使用 Spring Boot 构建 Web 应用程序

前言 本文将讨论以下主题&#xff1a; 安装 Java JDK、Gradle 或 Maven 和 Eclipse 或 IntelliJ IDEA创建一个新的 Spring Boot 项目运行 Spring Boot 应用程序编写一个简单的 Web 应用程序打包应用程序以用于生产环境 通过这些主题&#xff0c;您将能够开始使用 Spring Boo…

跨境电商卖家必备:自养号测评的显著优势与多重功能解析!

自养号测评&#xff0c;对于跨境电商卖家而言&#xff0c;是一种至关重要的运营手段。通过这种方式&#xff0c;卖家能够迅速增加产品销量、评论数量&#xff0c;提升在平台中的排名&#xff0c;进而促进产品的流量转化和订单增长。与传统机刷方式不同&#xff0c;自养号测评采…

腾讯云2023年双11活动,云服务器2核2G首年88元,领券最高省9999元!

腾讯云2023年双11大促活动正在火热进行中&#xff0c;腾讯云也是拿出了十足的诚意&#xff0c;个人企业用户均可领取代金券礼包&#xff0c;云服务器首年1.8折起&#xff0c;买1年送3个月&#xff01; 一、活动时间 腾讯云2023年双11大促活动时间比较充足&#xff0c;一直持续…

修改svc的LoadBalancer的IP引发的惨案

文章目录 背景修改externalIPs的操作api-server报错日志挽救教训 背景 k8s集群没有接外部负载均衡&#xff0c;部署istio的时候ingressgateway一直pending。 于是手动修改了这个lb svc的externalIP&#xff0c;于是k8s就崩了&#xff0c;如何崩的&#xff0c;且听我还道来。 …

探索考培系统在职业培训领域的发展前景与挑战

随着科技的不断发展&#xff0c;考培系统在职业培训领域正逐渐发挥重要作用。考培系统通过数字化、智能化的方式&#xff0c;提供了更加高效、方便的培训方式&#xff0c;给职业培训带来新的发展前景。 考培系统可以通过在线课程、视频教学等方式&#xff0c;将培训内容全面、详…

【Jenkins】新建任务FAQ

问题1. 源码管理处填入Repository URL&#xff0c;报错&#xff1a;无法连接仓库&#xff1a;Error performing git command: ls-remote -h https://github.com/txy2023/GolangLearning.git HEAD 原因&#xff1a; jenkins全局工具配置里默认没有添加git的路径&#xff0c;如果…

C++进阶语法——STL 标准模板库(上)(Standard Template Library)【学习笔记(六)】

文章目录 STL 标准模板库1、 STL简介2、STL容器的类别3、STL迭代器的类别4、STL算法的类别5、泛型编程&#xff08;generic programming&#xff09;6、C模板&#xff08;template&#xff09;6.1 函数模板&#xff08;function template&#xff09;6.2 类模板&#xff08;cla…

Java模块化应用实践之精简JRE | 京东云技术团队

导语 Java9及以后的版本引入了模块化特性&#xff0c;但是直到今天JDK21都发布了&#xff0c;依然没有被大量使用起来&#xff0c;那么这个特性就真的没啥意义了吗&#xff1f; 别忘了&#xff0c;Java本身可是把模块化做到了极致的&#xff0c;所以可以利用这个特性对JRE本身…

电商数据采集抓取封装数据、淘宝、天猫、京东等平台商品详情API接口参数详解

电商数据采集抓取数据、淘宝、天猫、京东等平台的电商数据抓取&#xff0c;网页爬虫、采集网站数据、网页数据采集软件、python爬虫、HTM网页提取、APP数据抓包、APP数据采集、一站式网站采集技术、BI数据的数据分析、数据标注等成为大数据发展中的热门技术关键词。那么电商数据…

基于物联网、大数据、云计算、人工智能等技术的智慧工地源码(Java+Spring Cloud +UniApp +MySql)

智慧工地是指利用物联网、大数据、云计算、人工智能等技术手段&#xff0c;为建筑施工现场提供智能硬件及物联网平台的解决方案&#xff0c;实现建筑工地的实时化、可视化、多元化、智慧化、便捷化。智慧工地的建设目标是实现全天候的管理监控&#xff0c;提高施工效率和质量&a…