Flume的安装部署及常见问题解决

在这里插入图片描述

1.安装地址

(1) Flume官网地址:http://flume.apache.org/
(2)文档查看地址:http://flume.apache.org/FlumeUserGuide.html
(3)下载地址:http://archive.apache.org/dist/flume/

2.安装部署

注意:前提是配置好java环境

(1)将apache-flume-1.10.1-bin.tar.gz上传到linux的/opt/package/目录下
在这里插入图片描述
(2)解压apache-flume-1.10.1-bin.tar.gz到/opt/software/目录下

[zhangflink@9wmwtivvjuibcd2e package]$ tar -zxvf apache-flume-1.10.1-bin.tar.gz -C /opt/software/

(3)修改apache-flume-1.10.1-bin的名称为flume

[zhangflink@9wmwtivvjuibcd2e software]$ mv apache-flume-1.10.1-bin/ flume

(4)修改conf目录下的log4j2.xml配置文件,配置日志文件路径

修改日志路径

<Property name="LOG_DIR">/opt/module/flume/log</Property>

在这里插入图片描述

 <AppenderRef ref="Console" />

在这里插入图片描述

编写配置文件

官网翻译成中文的网站,可以参考这个网站进行编写配置文件:https://flume.liyifeng.org/

在这里插入图片描述

(1).Agent 是一个 JVM 进程,它以事件的形式将数据从源头送至目的。
Agent 主要有三个组成部分,Source、Channel、Sink。
(2).第一步:配置各个组件,根据你采集数据的需求进行选择对应的source,channels,sinks组件(直接去参考官网对应的组件功能选择即可)。
(3).第二步:连接各个组件,把采集端(Flume Sources),中间缓存(Flume Channels)和写入端(Flume Sinks)连接到一起。
(4).第三步:启动Agent。
bin目录下的flume-ng是Flume的启动脚本,启动时需要指定Agent的名字、配置文件的目录和配置文件的名称。

bin/flume-ng agent -n $agent_name -c conf -f conf/flume-conf.properties.template

-n后面就是agent的主节点,-f 后面就是配置文件的位置,其它不变。

常用案例

监听端口配置:

# example.conf: 一个单节点的 Flume 实例配置# 配置Agent a1各个组件的名称#Agent a1 的source有一个,叫做r1
a1.sources = r1    
#Agent a1 的sink也有一个,叫做k1
a1.sinks = k1      
#Agent a1 的channel有一个,叫做c1
a1.channels = c1   # 配置Agent a1的source r1的属性
#使用的是NetCat TCP Source,这里配的是别名,Flume内置的一些组件都是有别名的,没有别名填全限定类名
a1.sources.r1.type = netcat       
#NetCat TCP Source监听的hostname,这个是本机
a1.sources.r1.bind = localhost    
#监听的端口
a1.sources.r1.port = 44444        # 配置Agent a1的sink k1的属性# sink使用的是Logger Sink,这个配的也是别名
a1.sinks.k1.type = logger         # 配置Agent a1的channel c1的属性,channel是用来缓冲Event数据的#channel的类型是内存channel,顾名思义这个channel是使用内存来缓冲数据
a1.channels.c1.type = memory                
#内存channel的容量大小是1000,注意这个容量不是越大越好,配置越大一旦Flume挂掉丢失的event也就越多
a1.channels.c1.capacity = 1000              
#source和sink从内存channel每次事务传输的event数量
a1.channels.c1.transactionCapacity = 100    # 把source和sink绑定到channel上#与source r1绑定的channel有一个,叫做c1
a1.sources.r1.channels = c1       
#与sink k1绑定的channel有一个,叫做c1
a1.sinks.k1.channel = c1         

启动agent

 bin/flume-ng agent -n a1 -c conf -f conf/example.conf

在这里插入图片描述

监听文件写入HDFS里面

# file_chanel_hdfs.conf: 一个监听文件数据写入hdfs的实例配置# 配置Agent a1各个组件的名称#Agent a1 的source有一个,叫做r1
a1.sources = r1    
#Agent a1 的sink也有一个,叫做k1
a1.sinks = k1      
#Agent a1 的channel有一个,叫做c1
a1.channels = c1   #监听文件的source,这个source支持断点续传可靠性更高
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/software/flume/text_log/taildir_position.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /opt/software/flume/text_log/example.log
a1.sources.r1.headers.f1.headerKey1 = value1
a1.sources.r1.filegroups.f2 = /opt/software/flume/text_log/.*log.*
a1.sources.r1.headers.f2.headerKey1 = value2
a1.sources.r1.headers.f2.headerKey2 = value2-2
a1.sources.r1.fileHeader = true
a1.sources.ri.maxBatchCount = 1000# 配置Agent a1的sink k1的属性#写入HDFS的sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://10.0.3.141:8020/flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.timeZone = Asia/Shanghai# 配置Agent a1的channel c1的属性,channel是用来缓冲Event数据的#channel的类型是内存channel,顾名思义这个channel是使用内存来缓冲数据
a1.channels.c1.type = memory                
#内存channel的容量大小是1000,注意这个容量不是越大越好,配置越大一旦Flume挂掉丢失的event也就越多
a1.channels.c1.capacity = 1000              
#source和sink从内存channel每次事务传输的event数量
a1.channels.c1.transactionCapacity = 100    # 把source和sink绑定到channel上#与source r1绑定的channel有一个,叫做c1
a1.sources.r1.channels = c1       
#与sink k1绑定的channel有一个,叫做c1
a1.sinks.k1.channel = c1        

启动后可能遇到的问题及解决方法

在这里插入图片描述

原因是普通用户没有创建文件的权限,使用root权限启动即可

sudo bin/flume-ng agent -c conf -n a1 -f conf/file_chanel_hdfs.conf

在这里插入图片描述

原因是因为写入到hfds时使用到了时间戳来区分目录结构,flume的消息组件event在接受到之后在header中没有发现时间戳参数,导致该错误发生,有三种方法可以解决这个错误;
1、agent1.sources.source1.interceptors = t1
agent1.sources.source1.interceptors.t1.type = timestamp
为source添加拦截,每条event头中加入时间戳;(效率会慢一些)
2、agent1.sinks.sink1.hdfs.useLocalTimeStamp = true 为sink指定该参数为true
(如果客户端和flume集群时间不一致数据时间会不准确)
3、在向source发送event时,将时间戳参数添加到event的header中即可,header是一个map,添加时mapkey为timestamp(推荐使用)

我使用了第二种方法(如果实时链路中,一般数据中都会带有时间戳,要使用第一种方法,保证时间语义的准确性)。

在这里插入图片描述
在这里插入图片描述

遇到这个错误是sink配置语句中创建hdfs的路径报错

要和hadoop里面的core-site.xml 文件保持一致

<!-- 指定NameNode的地址 --><property><name>fs.defaultFS</name><value>hdfs://flinkv1:8020</value>
</property>

在这里插入图片描述
此问题是由于操作hdfs的文件权限不足,修改hdfs文件权限即可。

[zhangflink@9wmwtivvjuibcd2e flume]$ hdfs dfs -ls /
Found 1 items
drwxr-xr-x   - zhangflink supergroup          0 2023-11-19 11:04 /flume
[zhangflink@9wmwtivvjuibcd2e flume]$ hdfs dfs -chmod 777 /flume
[zhangflink@9wmwtivvjuibcd2e flume]$ hdfs dfs -ls /
Found 1 items
drwxrwxrwx   - zhangflink supergroup          0 2023-11-19 11:04 /flume

启动成功数据写入

在这里插入图片描述
在这里插入图片描述

监听文件写入kafka里面

首先创建kafka的topic

[zhangflink@9wmwtivvjuibcd2e kafka]$ bin/kafka-topics.sh --bootstrap-server flinkv1:9092 --create --partitions 1 --replication-factor 3 --topic flumeData

编写配置文件:

# file_memory_kafka.conf: 一个监听文件数据写入hdfs的实例配置# 配置Agent a1各个组件的名称#Agent a1 的source有一个,叫做r1
a1.sources = r1    
#Agent a1 的sink也有一个,叫做k1
a1.sinks = k1      
#Agent a1 的channel有一个,叫做c1
a1.channels = c1   #监听文件的source,这个source支持断点续传可靠性更高
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/software/flume/text_log/taildir_position.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /opt/software/flume/text_log/example.log
a1.sources.r1.headers.f1.headerKey1 = value1
a1.sources.r1.filegroups.f2 = /opt/software/flume/text_log/.*log.*
a1.sources.r1.headers.f2.headerKey1 = value2
a1.sources.r1.headers.f2.headerKey2 = value2-2
a1.sources.r1.fileHeader = true
a1.sources.ri.maxBatchCount = 1000# 配置Agent a1的sink k1的属性#写入kafka的sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = flumeData
a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1# 配置Agent a1的channel c1的属性,channel是用来缓冲Event数据的#channel的类型是内存channel,顾名思义这个channel是使用内存来缓冲数据
a1.channels.c1.type = memory                
#内存channel的容量大小是1000,注意这个容量不是越大越好,配置越大一旦Flume挂掉丢失的event也就越多
a1.channels.c1.capacity = 1000              
#source和sink从内存channel每次事务传输的event数量
a1.channels.c1.transactionCapacity = 100    # 把source和sink绑定到channel上#与source r1绑定的channel有一个,叫做c1
a1.sources.r1.channels = c1       
#与sink k1绑定的channel有一个,叫做c1
a1.sinks.k1.channel = c1     

消费对应topic测试数据是否写入

[zhangflink@9wmwtivvjuibcd2e kafka]$ bin/kafka-console-consumer.sh --bootstrap-server flinkv1:9092 --from-beginning --topic flumeData

监听成功
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/193951.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

labelimg报错IndexError: list index out of range

labelimg报错IndexError: list index out of range 问题&#xff1a;标签顺序不对&#xff0c;修改classes.txt文件。每次重新打开labelimg就会重置classes.txt文件&#xff0c;同时其中不正确的标签顺序&#xff0c;会导致所画的框图范围超出图片大小而报错&#xff0c;因此也…

对象与this

作者简介&#xff1a;大家好&#xff0c;我是smart哥&#xff0c;前中兴通讯、美团架构师&#xff0c;现某互联网公司CTO 联系qq&#xff1a;184480602&#xff0c;加我进群&#xff0c;大家一起学习&#xff0c;一起进步&#xff0c;一起对抗互联网寒冬 最近想再聊聊Java的对象…

大厂数仓专家实战分享:企业级埋点管理与应用

一.什么是埋点 埋点&#xff08;Event Tracking&#xff09;&#xff0c;是互联网数据采集工作中的一个俗称&#xff0c;正式应该叫事件跟踪&#xff0c;英文为 Event Tracking&#xff0c;它主要是针对特定用户行为或事件进行捕获、处理和发送的相关技术及其实施过程。 二.埋…

【C++初阶】STL详解(三)vector的介绍与使用

本专栏内容为&#xff1a;C学习专栏&#xff0c;分为初阶和进阶两部分。 通过本专栏的深入学习&#xff0c;你可以了解并掌握C。 &#x1f493;博主csdn个人主页&#xff1a;小小unicorn ⏩专栏分类&#xff1a;C &#x1f69a;代码仓库&#xff1a;小小unicorn的代码仓库&…

PS 颜色取样器标尺工具 基本使用讲解

上文 PS 吸管工具基本使用方法 我们讲完了 吸管工具 那么 我们继续 打开ps先 接着 我们选择这个 颜色取样器工具 选择之后 我们鼠标在图像上随便点一下 就会出现一个标记 然后 我们可以点多几个地方 边上的信息面板就会输出 点1 和 点2 甚至 多个 点3 点4 的 颜色 RGB代码 …

量化交易:建立趋势跟踪策略的五个指标

什么是趋势跟踪策略&#xff1f; 趋势跟踪策略是只需需顺势而为的策略&#xff0c;即在价格上涨时买入&#xff0c;在价格开始下跌时卖出。在趋势跟踪策略中&#xff0c;人们的目标不是预测或预测&#xff0c;而只是关注市场上的任何新兴趋势。 趋势是如何出现的&#xff1f;…

Python学习(一)基础语法

文章目录 1. 入门1.1 解释器的作用1.2 下载1.3 基础语法输入输出语法与引号注释&#xff1a;变量&#xff1a; 数据类型与四则运算数据类型四则运算数据类型的查看type()数据类型的转换int()、int()、float() 流程控制格式化输出循环与遍历逻辑运算符list遍历字典dict遍历 跳出…

Jave 定时任务:使用Timer类执行定时任务为何会发生任务阻塞?如何解决?

IDE&#xff1a;IntelliJ IDEA 2022.2.3 x64 操作系统&#xff1a;win10 x64 位 家庭版 JDK: 1.8 文章目录 一、Timer类是什么&#xff1f;二、Timer类主要由哪些部分组成&#xff1f;1.TaskQueue2. TimerThread 三、示例代码分析四、自定义TimerTask为什么会发生任务相互阻塞的…

RVC从入门到......

RVC变声器官方教程&#xff1a;10分钟克隆你的声音&#xff01;一键训练&#xff0c;低配显卡用户福音&#xff01;_哔哩哔哩_bilibili配音&#xff1a;AI逍遥散人&#xff08;已授权&#xff09;关注UP主并私信"RVC"&#xff08;三个字母&#xff09;自动获取一键训…

飞鼠异地组网工具实战之访问k8s集群内部服务

飞鼠异地组网工具实战之访问k8s集群内部服务 一、飞鼠异地组网工具介绍1.1 飞鼠工具简介1.2 飞鼠工具官网 二、本次实践介绍2.1 本次实践场景描述2.2 本次实践前提2.3 本次实践环境规划 三、检查本地k8s集群环境3.1 检查k8s各节点状态3.2 检查k8s版本3.3 检查k8s系统pod状态 四…

unordered_map,unordered_set模拟实现

目录 一 . 底层结构--哈希 1.直接定址法 2. 除留余数法 哈希桶 3. 一些定义 二 . 模拟实现哈希表 1.哈希表框架 ​编辑 2.插入 3.查找 4 . 删除 5.解决使用问题 6.完整代码 三 .实现unordered_map, unordered_set 1. 初步实现unordered_map, unordered_set 2.…

JavaspringbootMYSQL基于移动端的团购网站26449-计算机毕业设计项目选题推荐(附源码)

目 录 摘要 1 绪论 1.1 选题背景 1.2选题目的及意义 1.3springboot框架介绍 2 基于移动端的团购网站系统分析 2.1 可行性分析 2.2 系统流程分析 2.2.1 数据流程 3.3.2 业务流程 2.3 系统功能分析 2.3.1 功能性分析 2.3.2 非功能性分析 2.4 系统用例分析 2.5本章…