Flume学习笔记(2)—— Flume进阶

Flume进阶

Flume 事务

事务处理流程如下:

Put

  • doPut:将批数据先写入临时缓冲区putList
  • doCommit:检查channel内存队列是否足够合并。
  • doRollback:channel内存队列空间不足,回滚数据

Take

  • doTake:将数据取到临时缓冲区takeList,并将数据发送到HDFS
  • doCommit:如果数据全部发送成功,则清除临时缓冲区takeList
  • doRollback:数据发送过程中如果出现异常,rollback将临时缓冲区takeList中的数据归还给channel内存队列

Flume Agent 内部原理

ChannelSelector

ChannelSelector 的作用就是选出 Event 将要被发往哪个 Channel

其共有两种类型,分别是 Replicating(复制)和 Multiplexing(多路复用)

  • ReplicatingSelector 会将同一个 Event 发往所有的 Channel
  • Multiplexing 会根据相应的原则,将不同的 Event 发往不同的 Channel

SinkProcessor

SinkProcessor 共有三种类型 , 分别是 DefaultSinkProcessor 、LoadBalancingSinkProcessor、FailoverSinkProcessor

  • DefaultSinkProcessor 对应的是单个的Sink
  • LoadBalancingSinkProcessor 可以实现负载均衡的功能
  • FailoverSinkProcessor 可以实现错误恢复的功能

Flume 拓扑结构

简单串联

将多个 flume 顺序连接起来,从最初的 source 开始到最终 sink 传送的目的存储系统

不建议桥接过多的 flume 数量, flume 数量过多不仅会影响传输速率,而且一旦传输过程中某个节点 flume 宕机,会影响整个传输系统

复制和多路复用

(单 source,多 channel、sink)

Flume 支持将事件流向一个或者多个目的地

这种模式可以将相同数据复制到多个channel 中,或者将不同数据分发到不同的 channel 中,sink 可以选择传送到不同的目的地

负载均衡和故障转移

Flume支持使用将多个sink逻辑上分到一个sink组,sink组配合不同的SinkProcessor可以实现负载均衡和错误恢复的功能、

这里的agent1有三个sink,分别连接agent2,agent3,agent4,即使其中有的sink出现了故障,数据还是能同步到hdfs

聚合

业务中常用,比如说日志采集功能:

日常 web 应用通常分布在上百个服务器,大者甚至上千个、上万个服务器,产生的日志处理起来也非常麻烦

可以采用聚合的方式,每台服务器部署一个 flume 采集日志,传送到一个集中收集日志的flume,再由此 flume 上传到 hdfs、hive、hbase 等,进行日志分析

Flume实战案例

复制和多路复用

需求:使用 Flume-1 监控文件变动

  1. Flume-1 将变动内容传递给 Flume-2,Flume-2 负责存储到 HDFS
  2. Flume-1 将变动内容传递给 Flume-3,Flume-3 负责输出到 Local FileSystem

实现流程:
1.在job下创建文件夹group1,并在其中创建配置文件flume-file-flume.conf

配置文件中需要有1个source,2个channel,2个sink

# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2# 将数据流复制给所有 channel
a1.sources.r1.selector.type = replicating# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/apache-hive-3.1.2-bin/logs/hive.log
a1.sources.r1.shell = /bin/bash -c# Describe the sink
# sink 端的 avro 是一个数据发送者
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

该配置文件的作用是将数据发送到两个不同的sink,再由sink发送到其他的agent进行处理

2.创建配置文件flume-flume-hdfs.conf

# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1# Describe/configure the source
# source 端的 avro 是一个数据接收服务
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141# Describe the sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://hadoop102:8020/flume2/%Y%m%d/%H
#上传文件的前缀
a2.sinks.k1.hdfs.filePrefix = flume2-
#是否按照时间滚动文件夹
a2.sinks.k1.hdfs.round = true
#多少时间单位创建一个新的文件夹
a2.sinks.k1.hdfs.roundValue = 1
#重新定义时间单位
a2.sinks.k1.hdfs.roundUnit = hour
#是否使用本地时间戳
a2.sinks.k1.hdfs.useLocalTimeStamp = true
#积攒多少个 Event 才 flush 到 HDFS 一次
a2.sinks.k1.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a2.sinks.k1.hdfs.fileType = DataStream
#多久生成一个新的文件
a2.sinks.k1.hdfs.rollInterval = 30
#设置每个文件的滚动大小大概是 128M
a2.sinks.k1.hdfs.rollSize = 134217700
#文件的滚动与 Event 数量无关
a2.sinks.k1.hdfs.rollCount = 0# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

source绑定上一个agent的sink1,然后上传到hdfs

3.创建配置文件:flume-flume-dir.conf

# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142# Describe the sink
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /home/why/data/flumeDemo/test1# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2

参数说明:

sink类型为file_roll:Flume 1.11.0 User Guide — Apache Flume

可以将events保存到本地文件系统

  • directory:本地文件系统保存数据的路径(注意,该路径必须已经存在才可以)

4.分别启动相应的flume进程:

nohup bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group1/flume-flume-dir.conf &

nohup bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group1/flume-flume-hdfs.conf &

nohup bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group1/flume-file-flume.conf &
5.在hdfs和文件夹中都能看到相应的内容:

hdfs:

文件系统:

负载均衡和故障转移

需求:使用 Flume1 监控一个端口,其 sink 组中的 sink 分别对接 Flume2 和 Flume3,采用FailoverSinkProcessor,实现故障转移的功能

实现流程:

1.在/opt/module/flume/job 目录下创建 group2 文件夹,创建配置文件flume-netcat-flume.conf

配置 1 个 netcat source 和 1 个 channel、1 个 sink group(2 个 sink),分别输送给flume-flume-console1 和 flume-flume-console2

# Name the components on this agent
a1.sources = r1
a1.channels = c1
a1.sinkgroups = g1
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000
a1.sinks = k1 k2# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1

参数说明:Flume 1.11.0 User Guide — Apache Flume

通过sink groups在一个agent中定义多个sink,并可以配置sink processor使用:Flume 1.11.0 User Guide — Apache Flume

2.创建 flume-flume-console1.conf

# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1# Describe/configure the source
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141# Describe the sink
a2.sinks.k1.type = logger# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

sink输出到本地的控制台

3.创建 flume-flume-console2.conf

# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142# Describe the sink
a3.sinks.k1.type = logger# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2

sink输出到本地的控制台

4.执行指令:

bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group2/flume-flume-console2.conf -Dflume.root.logger=INFO,console

bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group2/flume-flume-console1.conf -Dflume.root.logger=INFO,console

bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group2/flume-netcat-flume.conf

5.使用nc localhost 44444发送数据

由于console2设置的优先级高于console1,因此数据由console2接收到;

接下来将console2进程kill掉,数据就由console1接收了:

聚合

需求:

hadoop102 上的 Flume-1 监控文件/home/why/data/flumeDemo/test3/test3.log

hadoop103 上的 Flume-2 监控某一个端口的数据流

Flume-1 与 Flume-2 将数据发送给 hadoop104 上的 Flume-3,Flume-3 将最终数据打印到控制台

实现流程:

1.首先在三台服务器的job文件夹先创建目录group3

2.在hadoop102上,创建配置文件flume1-logger-flume.conf,source用于监控log日志文件,sink用于输出数据到下一级的Flume

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/why/data/flumeDemo/test3/test3.log
a1.sources.r1.shell = /bin/bash -c# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop104
a1.sinks.k1.port = 4141# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

3.在hadoop103上,创建配置文件flume2-netcat-flume.conf,source用于监控端口44444的数据流,sink用于将数据传输到下一级的flume

# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1# Describe/configure the source
a2.sources.r1.type = netcat
a2.sources.r1.bind = localhost
a2.sources.r1.port = 44444# Describe the sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = hadoop104
a2.sinks.k1.port = 4141# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

注意,这两个agent的sink目的地都是hadoop104这一个服务器,因此hostname和port都相同

4.在hadoop104上创建配置文件flume3-flume-logger.conf,source用于接收flume1和flume2发送来的数据流,sink用于输出数据到控制台;

# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop104
a3.sources.r1.port = 4141# Describe the sink
a3.sinks.k1.type = logger# Describe the channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1

5.分别在三台服务器上执行指令

hadoop104: bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group3/flume3-flume-logger.conf -Dflume.root.logger=INFO,console

hadoop102:bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group3/flume1-logger-flume.conf

hadoop103:bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group3/flume2-netcat-flume.conf

6.在hadoop102上向日志文件中追加内容:

echo "hello" > /home/why/data/flumeDemo/test3/test3.log

在hadoop103中通过nc hadoop103 44444向44444端口发送数据;

然后在hadoop104中即可接收到数据:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/189543.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Unity 场景烘培 ——unity灯光和设置天空盒(二)

提示:文章有错误的地方,还望诸位大神指出。 文章目录 前言一、光源种类1.Directional Light(方向光,平行光)2.Point Light(点光源)3.Spotlight(聚光灯)4.Area Light(区域光&#xff…

vue项目中设置background: url() 是行内样式不生效,样式表是可以的

[TOC](vue项目中设置background: url() 是行内样式不生效,样式表是可以的) 首先:如果不是项目中普通的一个index.html中是可以的 一、原因 在Vue项目中,行内样式和样式表的编译规则是有所不同的。当你在Vue组件的行内样式中使用相对路径引用图…

【汇编】栈及栈操作的实现

文章目录 前言一、栈是什么?二、栈的特点三、栈操作四、8086cpu操作栈4.1 汇编指令4.2 汇编代码讲解问题:回答: 4.3 栈的操作4.3 push 指令和pop指令的执行过程执行入栈(push)时,栈顶超出栈空间执行出栈(pop)时,栈顶超…

初识Linux:目录的创建销毁

目录 ​编辑 提示:以下指令均在Xshell 7 中进行 零、桌面的本质 💻 扩展🎇: 一、cd指令: 1、cd - : 2、cd ~: 重命名命令:alias 二、stat指令 冷知识: 如果…

MATLAB 模型预测控制(MPC)控制入门 —— 设计并仿真 MPC 控制器

系列文章目录 文章目录 系列文章目录前言一、使用 MPC Designer 设计控制器1.1 CSTR 模型1.2 导入被控对象并定义 MPC 结构1.3 定义输入和输出通道属性1.4 配置仿真场景1.5 配置控制器水平线1.6 定义输入约束条件1.7 指定控制器调整权重1.8 消除输出超调1.9 测试控制器抗干扰能…

【迅搜01】安装运行并测试XunSearch

安装运行并测试XunSearch 这回的新系列,我们将学习到的是一个搜索引擎 迅搜 XunSearch 的使用。这个搜索引擎在 PHP 圈可能还是有一点名气的,而且也是一直在更新的,虽说现在 ElasticSearch 已经是实际上的搜索引擎霸主了,而且还有…

云端援手:智能枢纽应对数字资产挑战 ——华为云11.11应用集成管理与创新专区优惠限时购

现新客3.96元起,下单有机会抽HUAWEI P60 Art 福利仅限双十一 机会唾手可得,立即行动! 「有效管理保护应用与数据的同时实现高效互通」——华为云全力满足企业需求,推出全套「应用集成管理与创新」智能解决方案:华为云…

【用unity实现100个游戏之15】开发一个类保卫萝卜的Unity2D塔防游戏3(附项目源码)

文章目录 先看本次实现的最终效果前言绘制炮塔UI炮塔转向敌人生成炮弹旋转我们的子弹对敌人造成伤害,回收子弹自动发射子弹添加攻击间隔显示伤害字体设计通用泛型单例创建更多炮塔升级增加伤害升级缩短攻击间隔添加货币杀死敌人获取金币源码完结 先看本次实现的最终…

40 _ 初识动态规划:如何巧妙解决“双十一”购物时的凑单问题?

淘宝的“双十一”购物节有各种促销活动,比如“满200元减50元”。假设你女朋友的购物车中有n个(n>100)想买的商品,她希望从里面选几个,在凑够满减条件的前提下,让选出来的商品价格总和最大程度地接近满减条件(200元),这样就可以极大限度地“薅羊毛”。作为程序员的你…

可以免费使用的设计素材网站分享

UI设计师最怕什么? 没有创意,没有灵感,没有思路! 在哪里可以得到idea?别担心,往下看! 你知道网络有多大,你想要什么吗?今天,我想和大家分享一些宝藏网页设…

计算机毕业设计选题推荐-人才招聘微信小程序/安卓APP-项目实战

✨作者主页:IT毕设梦工厂✨ 个人简介:曾从事计算机专业培训教学,擅长Java、Python、微信小程序、Golang、安卓Android等项目实战。接项目定制开发、代码讲解、答辩教学、文档编写、降重等。 ☑文末获取源码☑ 精彩专栏推荐⬇⬇⬇ Java项目 Py…

机器学习 天气识别

>- **🍨 本文为[🔗365天深度学习训练营](https://mp.weixin.qq.com/s/Nb93582M_5usednAKp_Jtw) 中的学习记录博客** >- **🍖 原作者:[K同学啊 | 接辅导、项目定制](https://mtyjkh.blog.csdn.net/)** >- **🚀…