【Flume】高级组件之Sink Processors及项目实践(Sink负载均衡和故障转移)

文章目录

  • 1. 组件简介
  • 2. 项目实践
    • 2.1 负载均衡
      • 2.1.1 需求
      • 2.1.2 配置
      • 2.1.3 运行
    • 2.2 故障转移
      • 2.2.1 需求
      • 2.2.2 配置
      • 2.2.3 运行

1. 组件简介

       Sink Processors类型包括这三种:Default Sink Processor、Load balancing Sink Processor和Failover Sink Processor。

  • Default Sink Processor是默认的,不用配置Sink group,就是咱们现在使用的这种最普通的形式,一个Channel后面接一个Sink的形式;
  • Load balancing Sink Processor是负载均衡处理器,一个Channle后面可以接多个Sink,这多个Sink属于一个Sink group,根据指定的算法进行轮询或者随机发送,减轻单个Sink的压力;
  • Failover Sink Processor是故障转移处理器,一个Channle后面可以接多个Sink,这多个Sink属于一个Sink group,按照Sink的优先级,默认先让优先级高的Sink来处理数据,如果这个Sink出现了故障,则用优先级低一点的Sink处理数据,可以保证数据不丢失。

2. 项目实践

2.1 负载均衡

使用Load balancing Sink Processor,即负载均衡处理器,一个Channle后面可以接多个Sink,这多个Sink属于一个Sink group,根据指定的算法进行轮询或者随机发送,减轻单个Sink的压力。其参数为:

  • processor.sinks:指定这个sink groups中有哪些sink,指定sink的名称,多个的话中间使用空格隔开即可;
  • processor.type:针对负载均衡的sink处理器,这里需要指定load_balance;
  • processor.selector:此参数的值内置支持两个,round_robin和random,round_robin表示轮询,按照sink的顺序,轮流处理数据,random表示随机。
  • processor.backoff:默认为false,设置为true后,故障的节点会列入黑名单,过一定时间才会再次发送数据,如果还失败,则等待时间是指数级增长,一直到达到最大的时间。如果不开启,故障的节点每次还会被重试发送,如果真有故障节点的话就会影响效率;
  • processor.selector.maxTimeOut:最大的黑名单时间,默认是30秒。

2.1.1 需求

采集指定端口的数据,并实现两个sink通道的负载均衡,采用轮询方式发送数据,为了展现实验效果,使用avro sink,每到一个event就写一次数据(默认是积攒接收一百个再写一次数据)。

2.1.2 配置

在这里插入图片描述
配置bigData01上的Flume Agent:

[root@bigdata01 apache-flume-1.9.0-bin]# cat conf/load-balancing.conf 
# agent的名称是a1
# 指定source组件、channel组件和Sink组件的名称 
a1.sources = r1 
a1.channels = c1 
a1.sinks = k1 k2 
# 配置source组件 
a1.sources.r1.type = netcat 
a1.sources.r1.bind = 0.0.0.0 
a1.sources.r1.port = 44444 
# 配置channel组件 
a1.channels.c1.type = memory 
a1.channels.c1.capacity = 1000 
a1.channels.c1.transactionCapacity = 100 
# 配置sink组件,[为了方便演示效果,把batch-size设置为1] 
a1.sinks.k1.type=avro 
a1.sinks.k1.hostname=192.168.152.101 
a1.sinks.k1.port=41414 
a1.sinks.k1.batch-size = 1 
a1.sinks.k2.type=avro 
a1.sinks.k2.hostname=192.168.152.102 
a1.sinks.k2.port=41414 
a1.sinks.k2.batch-size = 1 
# 配置sink策略 
a1.sinkgroups = g1 
a1.sinkgroups.g1.sinks = k1 k2 
a1.sinkgroups.g1.processor.type = load_balance 
a1.sinkgroups.g1.processor.backoff = true 
a1.sinkgroups.g1.processor.selector = round_robin # 把组件连接起来 
a1.sources.r1.channels = c1 
a1.sinks.k1.channel = c1 
a1.sinks.k2.channel = c1

配置bigData02上的Flume Agent:

[root@bigdata02 apache-flume-1.9.0-bin]# cat conf/load-balancing-101.conf 
# 指定source组件、channel组件和Sink组件的名称 
a1.sources = r1 
a1.channels = c1 
a1.sinks = k1 
# 配置source组件 
a1.sources.r1.type = avro 
a1.sources.r1.bind = 0.0.0.0 
a1.sources.r1.port = 41414 
# 配置channel组件 
a1.channels.c1.type = memory 
a1.channels.c1.capacity = 1000 
a1.channels.c1.transactionCapacity = 100 
# 配置sink组件[为了区分两个sink组件生成的文件,修改filePrefix的值] 
a1.sinks.k1.type = hdfs 
a1.sinks.k1.hdfs.path = hdfs://192.168.152.100:9000/load_balance
a1.sinks.k1.hdfs.fileType = DataStream 
a1.sinks.k1.hdfs.writeFormat = Text 
a1.sinks.k1.hdfs.rollInterval = 3600 
a1.sinks.k1.hdfs.rollSize = 134217728 
a1.sinks.k1.hdfs.rollCount = 0 
a1.sinks.k1.hdfs.useLocalTimeStamp = true 
a1.sinks.k1.hdfs.filePrefix = data101 
a1.sinks.k1.hdfs.fileSuffix = .log 
# 把组件连接起来 
a1.sources.r1.channels = c1 
a1.sinks.k1.channel = c1

配置bigData03上的Flume Agent:

[root@bigdata03 apache-flume-1.9.0-bin]# cat conf/load-balancing-102.conf 
# 指定source组件、channel组件和Sink组件的名称 
a1.sources = r1 
a1.channels = c1 
a1.sinks = k1 
# 配置source组件 
a1.sources.r1.type = avro 
a1.sources.r1.bind = 0.0.0.0 
a1.sources.r1.port = 41414 
# 配置channel组件 
a1.channels.c1.type = memory 
a1.channels.c1.capacity = 1000 
a1.channels.c1.transactionCapacity = 100 
# 配置sink组件[为了区分两个sink组件生成的文件,修改filePrefix的值] 
a1.sinks.k1.type = hdfs 
a1.sinks.k1.hdfs.path = hdfs://192.168.152.100:9000/load_balance
a1.sinks.k1.hdfs.fileType = DataStream 
a1.sinks.k1.hdfs.writeFormat = Text 
a1.sinks.k1.hdfs.rollInterval = 3600 
a1.sinks.k1.hdfs.rollSize = 134217728 
a1.sinks.k1.hdfs.rollCount = 0 
a1.sinks.k1.hdfs.useLocalTimeStamp = true 
a1.sinks.k1.hdfs.filePrefix = data102 
a1.sinks.k1.hdfs.fileSuffix = .log 
# 把组件连接起来 
a1.sources.r1.channels = c1 
a1.sinks.k1.channel = c1

2.1.3 运行

先启动bigdata02和bigdata03上的Agent,最后启动bigdata01上的Agent:

[apache-flume-1.9.0-bin]# bin/flume-ng agent --name a1 --conf conf --conf-file conf/load-balancing-101.conf -Dflume.root.logger=INFO,console
[apache-flume-1.9.0-bin]# bin/flume-ng agent --name a1 --conf conf --conf-file conf/load-balancing-102.conf -Dflume.root.logger=INFO,console
apache-flume-1.9.0-bin]# bin/flume-ng agent --name a1 --conf conf --conf-file conf/load-balancing.conf -Dflume.root.logger=INFO,console

向指定端口发送数据,模拟输入:

[root@bigdata01 apache-flume-1.9.0-bin]# telnet localhost 44444
Trying ::1...
Connected to localhost.
Escape character is '^]'.
hehe
OK
haha
OK

查看HDFS中的保存的运行结果:

[root@bigdata01 apache-flume-1.9.0-bin]# hdfs dfs -ls -R / 
-rw-r--r--   2 root supergroup        175 2023-06-22 00:08 /README.txt
drwxr-xr-x   - root supergroup          0 2023-06-22 00:47 /load_balance
-rw-r--r--   2 root supergroup          6 2023-06-22 00:47 /load_balance/data101.1687366028115.log.tmp
-rw-r--r--   2 root supergroup          6 2023-06-22 00:47 /load_balance/data102.1687366024769.log.tmp
[root@bigdata01 apache-flume-1.9.0-bin]# hdfs dfs -cat /load_balance/data101.1687366028115.log.tmp 
haha
[root@bigdata01 apache-flume-1.9.0-bin]# hdfs dfs -cat /load_balance/data102.1687366024769.log.tmp
hehe

2.2 故障转移

使用Failover Sink Processor,即故障转移处理器,一个channle后面可以接多个sink,这多个sink属于一个sink group,按照sink的优先级,默认先让优先级高的sink来处理数据,如果这个sink出现了故障,则用优先级低一点的sink处理数据,可以保证数据不丢失。其参数为:

  • processor.type:针对故障转移的sink处理器,使用failover;
  • processor.priority.:指定sink group中每一个sink组件的优先级,默认情况下channel中的数据会被优先级比较高的sink取走;
  • processor.maxpenalty:sink发生故障之后,最大等待时间。

2.2.1 需求

实现两个sink的故障转移。

2.2.2 配置

在这里插入图片描述
配置bigData01上的Flume Agent:

[root@bigdata01 conf]# cat failover.conf 
# agent的名称是a1 
# 指定source组件、channel组件和Sink组件的名称 
a1.sources = r1 
a1.channels = c1 
a1.sinks = k1 k2 
# 配置source组件 
a1.sources.r1.type = netcat 
a1.sources.r1.bind = 0.0.0.0 
a1.sources.r1.port = 44444 
# 配置channel组件 
a1.channels.c1.type = memory 
a1.channels.c1.capacity = 1000 
a1.channels.c1.transactionCapacity = 100 
# 配置sink组件,[为了方便演示效果,把batch-size设置为1] 
a1.sinks.k1.type = avro 
a1.sinks.k1.hostname = 192.168.152.101 
a1.sinks.k1.port = 41414 
a1.sinks.k1.batch-size = 1 
a1.sinks.k2.type = avro 
a1.sinks.k2.hostname = 192.168.152.102 
a1.sinks.k2.port = 41414 
a1.sinks.k2.batch-size = 1 
# 配置sink策略 
a1.sinkgroups = g1 
a1.sinkgroups.g1.sinks = k1 k2 
a1.sinkgroups.g1.processor.type = failover 
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000# 把组件连接起来 
a1.sources.r1.channels = c1 
a1.sinks.k1.channel = c1 
a1.sinks.k2.channel = c1

配置bigData02上的Flume Agent:

[root@bigdata02 conf]# cat failover-101.conf 
# agent的名称是a1 
# 指定source组件、channel组件和Sink组件的名称 
a1.sources = r1 
a1.channels = c1 
a1.sinks = k1 
# 配置source组件 
a1.sources.r1.type = avro 
a1.sources.r1.bind = 0.0.0.0 
a1.sources.r1.port = 41414 
# 配置channel组件 
a1.channels.c1.type = memory 
a1.channels.c1.capacity = 1000 
a1.channels.c1.transactionCapacity = 100 
# 配置sink组件[为了区分两个sink组件生成的文件,修改filePrefix的值] 
a1.sinks.k1.type = hdfs 
a1.sinks.k1.hdfs.path = hdfs://192.168.152.100:9000/failover 
a1.sinks.k1.hdfs.fileType = DataStream 
a1.sinks.k1.hdfs.writeFormat = Text 
a1.sinks.k1.hdfs.rollInterval = 3600 
a1.sinks.k1.hdfs.rollSize = 134217728 
a1.sinks.k1.hdfs.rollCount = 0 
a1.sinks.k1.hdfs.useLocalTimeStamp = true 
a1.sinks.k1.hdfs.filePrefix = data101 
a1.sinks.k1.hdfs.fileSuffix = .log 
# 把组件连接起来 
a1.sources.r1.channels = c1 
a1.sinks.k1.channel = c1

配置bigData03上的Flume Agent:

[root@bigdata03 conf]# cat failover-102.conf 
# agent的名称是a1 
# 指定source组件、channel组件和Sink组件的名称 
a1.sources = r1 
a1.channels = c1 
a1.sinks = k1 
# 配置source组件 
a1.sources.r1.type = avro 
a1.sources.r1.bind = 0.0.0.0 
a1.sources.r1.port = 41414 
# 配置channel组件 
a1.channels.c1.type = memory 
a1.channels.c1.capacity = 1000 
a1.channels.c1.transactionCapacity = 100 
# 配置sink组件[为了区分两个sink组件生成的文件,修改filePrefix的值] 
a1.sinks.k1.type = hdfs 
a1.sinks.k1.hdfs.path = hdfs://192.168.152.100:9000/failover 
a1.sinks.k1.hdfs.fileType = DataStream 
a1.sinks.k1.hdfs.writeFormat = Text 
a1.sinks.k1.hdfs.rollInterval = 3600 
a1.sinks.k1.hdfs.rollSize = 134217728 
a1.sinks.k1.hdfs.rollCount = 0 
a1.sinks.k1.hdfs.useLocalTimeStamp = true 
a1.sinks.k1.hdfs.filePrefix = data102
a1.sinks.k1.hdfs.fileSuffix = .log 
# 把组件连接起来 
a1.sources.r1.channels = c1 
a1.sinks.k1.channel = c1

2.2.3 运行

  1. 先启动bigdata02和bigdata03上的Agent,最后启动bigdata01上的Agent:
bin/flume-ng agent --name a1 --conf conf --conf-file conf/failover-101.conf -Dflume.root.logger=INFO,console
bin/flume-ng agent --name a1 --conf conf --conf-file conf/failover-102.conf -Dflume.root.logger=INFO,console
bin/flume-ng agent --name a1 --conf conf --conf-file conf/failover.conf -Dflume.root.logger=INFO,console
  1. 向指定端口发送数据,模拟输入两个数据test1test2
[root@bigdata01 apache-flume-1.9.0-bin]# telnet localhost 44444
Trying ::1...
Connected to localhost.
Escape character is '^]'.
test1
OK
test2
OK
  1. 查看HDFS中的保存的运行结果:

因为bigdata03的优先级高,可以看到两个数据都是由其写入。

[root@bigdata01 hadoop-3.3.5]# hdfs dfs -ls -R /
-rw-r--r--   2 root supergroup        175 2023-06-22 00:08 /README.txt
drwxr-xr-x   - root supergroup          0 2023-06-22 09:51 /failover
-rw-r--r--   2 root supergroup          7 2023-06-22 09:51 /failover/data102.1687398676525.log.tmp
drwxr-xr-x   - root supergroup          0 2023-06-22 00:52 /load_balance
-rw-r--r--   2 root supergroup          6 2023-06-22 00:52 /load_balance/data101.1687366028115.log
-rw-r--r--   2 root supergroup          6 2023-06-22 00:52 /load_balance/data102.1687366024769.log
[root@bigdata01 hadoop-3.3.5]# hdfs dfs -cat /failover/data102.1687398676525.log.tmp
test1
test2
  1. 关闭bigdata03,再输入测试数据test3
[root@bigdata01 apache-flume-1.9.0-bin]# telnet localhost 44444
Trying ::1...
Connected to localhost.
Escape character is '^]'.
test1
OK
test2
OK
test3
OK
  1. 查看HDFS中的保存的运行结果:

关闭bigdata03后,数据就由优先度较低的bigdata02写入,保证数据不丢失,达到故障转移的目的,此时若再次开启bigdata03,则数据就又会由优限度更高的bigdata03传输。

[root@bigdata01 hadoop-3.3.5]# hdfs dfs -ls -R /
-rw-r--r--   2 root supergroup        175 2023-06-22 00:08 /README.txt
drwxr-xr-x   - root supergroup          0 2023-06-22 09:54 /failover
-rw-r--r--   2 root supergroup          7 2023-06-22 09:54 /failover/data101.1687398846336.log.tmp
-rw-r--r--   2 root supergroup         14 2023-06-22 09:53 /failover/data102.1687398676525.log
drwxr-xr-x   - root supergroup          0 2023-06-22 00:52 /load_balance
-rw-r--r--   2 root supergroup          6 2023-06-22 00:52 /load_balance/data101.1687366028115.log
-rw-r--r--   2 root supergroup          6 2023-06-22 00:52 /load_balance/data102.1687366024769.log
[root@bigdata01 hadoop-3.3.5]# hdfs dfs -cat /failover/data101.1687398846336.log.tmp
test3

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/853.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

供应商索赔(金税数据)导入并创建凭证(ALV长篇备忘三)

情境/背景:供应商三包索赔款项源起QMS质量系统,联动金税系统完成发票开具,最终在SAP系统中创建完成财务凭证。该流程为手工操作,费时费力且效率低下容易出错。 目标/任务:把QMS供应商三包索赔业务搬上线,同SAP FI顾问梳理功能说明书&#xf…

星辰秘典:揭开Python项目的神秘密码——2048游戏

✨博主:命运之光 🌸专栏:Python星辰秘典 🐳专栏:web开发(html css js) ❤️专栏:Java经典程序设计 ☀️博主的其他文章:点击进入博主的主页 前言:你好&#x…

一步一步学OAK之七:通过OAK相机实现特征跟踪

目录 特征跟踪Setup 1: 创建文件Setup 2: 安装依赖Setup 3: 导入需要的包Setup 4: 定义FeatureTrackerDrawer类定义变量定义onTrackBar方法定义trackFeaturePath方法定义drawFeatures方法定义FeatureTrackerDrawer类的构造函数 Setup 5: 创建pipelineSetup 6: 创建节点创建相机…

GIT保存记录原理之commit对象

GIT 中提交对象非常的重要,我们通过它记录代码提交过程、进行文件保存、回退等操作,那么它是怎样帮助我们记录这些信息的呢?其实就是都保存在项目根目录的 .git 文件夹中。 新建空项目 gitDemo使用 git init初始化,在文件夹根目录…

图像去模糊:RSBlur 数据集以及模糊图像合成方法

本内容主要介绍图像去模糊数据集 RSBlur,以及逼真模糊图像合成方法。 论文:Realistic Blur Synthesis for Learning Image Deblurring 代码(官方):https://github.com/rimchang/RSBlur 1.1 背景 运动模糊是由曝光…

vue 组件简单实例及传参交互

前言:vue 可以比较灵活的使用 html的片段,并将html的片段进行数据隔离,参数也可以互相传递,组件与组件之间也可以进行数据的交互 合理的使用组件可以避免重复代码或者很方便的调用第三方组件库 vue组件 简单实例组件传参实际应用父子组件交互…

maven打包所有依赖,对外提供sdk.jar

maven打包所有依赖 <properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compile.source>1.8</maven.compile.source><maven.compile.target>1.8</maven.compile.target></properties><…

分布式计算模型详解:MapReduce、数据流、P2P、RPC、Agent

前言 本文隶属于专栏《大数据理论体系》&#xff0c;该专栏为笔者原创&#xff0c;引用请注明来源&#xff0c;不足和错误之处请在评论区帮忙指出&#xff0c;谢谢&#xff01; 本专栏目录结构和参考文献请见大数据理论体系 思维导图 MapReduce MapReduce 是一种分布式计算模…

在 Maya、ZBrush、Substance 3D 和 UE5 中创建理发椅

今天瑞云渲染小编给大家带来Kevin J. Coulman 分享的理发椅项目背后的工作流程&#xff0c;详细介绍了如何在 Maya 和 ZBrush 中为道具建模&#xff0c;分享了制作准确材质的技巧&#xff0c;并解释了为什么选择 UE5 进行渲染。 介绍 大家好! 我的名字是Mehdi Benmansour&…

layui实现选择框搜索(下拉搜索)功能

1.可以使用官方介绍的方法&#xff0c;适用于form表单内的下拉搜索&#xff0c;外层需要使用layui-form样式&#xff0c;select标签内添加lay-search“”&#xff0c;此方法若外层不添加layui-form无法实现搜索功能&#xff0c;如下所示&#xff1a; 2.下面是另一种形式的下拉选…

设计一个高流量高并发的系统需要关注哪些点

1、设计原则 1.1、系统设计原则 在设计一个系统之前&#xff0c;我们先要有一个统一且清晰的认知&#xff1a;不要想着一下就能设计出完美的系统&#xff0c;好的系统是迭代出来的。不要复杂化&#xff0c;要先解决核心问题。但是要有先行的规划&#xff0c;对现有的问题有方…

CABAC编解码原理分析

CABAC编解码原理分析 文章目录 CABAC编解码原理分析一、二进制算数编码二、CABAC编码三、CABAC编解码与普通的二元算术编码的区别四、 CABAC编解码中各个变量的计算&#xff1a;五、 一些其他问题&#xff1a;六、 总结&#xff1a;七、参考资料 一、二进制算数编码 cabac是一…