Flume基础知识(十一):Flume自定义接口

1)案例需求

使用 Flume 采集服务器本地日志,需要按照日志类型的不同,将不同种类的日志发往不同的分析系统。

2)需求分析

在实际的开发中,一台服务器产生的日志类型可能有很多种,不同类型的日志可能需要 发送到不同的分析系统。此时会用到 Flume 拓扑结构中的 Multiplexing 结构,Multiplexing 的原理是,根据 event 中 Header 的某个 key 的值,将不同的 event 发送到不同的 Channel中,所以我们需要自定义一个 Interceptor,为不同类型的 event 的 Header 中的 key 赋予 不同的值。

在该案例中,我们以端口数据模拟日志,以是否包含”atguigu”模拟不同类型的日志, 我们需要自定义 interceptor 区分数据中是否包含”atguigu”,将其分别发往不同的分析 系统(Channel)。

3)实现步骤

(1)创建一个 maven 项目,并引入以下依赖。

<dependencies><dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.9.0</version></dependency>
</dependencies>

(2)定义 CustomInterceptor 类并实现 Interceptor 接口。

package com.atguigu.interceptor;
​
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
​
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
​
/*** @author:左泽林* @date:日期:2022-01-17-时间:14:47* @message:*/
public class TypeInterceptor implements Interceptor {
​/*声明一个集合,用于存放拦截器处理后的事件*/private List<Event> addHeaderEvents ;
​//初始化public void initialize() {/*初始化*/addHeaderEvents = new ArrayList<Event>();}
​//处理单个事件public Event intercept(Event event) {
​//1.获取header & bodyMap<String, String> headers = event.getHeaders();String body = new String(event.getBody());
​//2.根据body中是否包含“atguigu”添加不同的头信息if (body.contains("atguigu")){headers.put("type","atguigu");}else{headers.put("type","other");}
​/*返回数据*/return event;}
​/*批处理事件*/public List<Event> intercept(List<Event> events) {
​//1. 清空集合addHeaderEvents.clear();
​/*遍历events*/for (Event event : events) {addHeaderEvents.add(intercept(event));}
​/*返回数据*/return events;}public void close() {
​}public static class Builder implements Interceptor.Builder{
​public Interceptor build() {return new TypeInterceptor();}
​public void configure(Context context) {}}
}
打包,上传到服务器Flume的lib下,Flume会在启动时调用

(3)编辑 flume 配置文件

为 hadoop100 上的 Flume1 配置 1 个 netcat source,1 个 sink group(2 个 avro sink), 并配置相应的 ChannelSelector 和 interceptor。

# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.interceptors = i1
#拦截器
a1.sources.r1.interceptors.i1.type = com.atguigu.flume.interceptor.TypeInterceptor$Builder
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = type
#映射需要对应代码中的拦截类型,这里就是atguigu、other
a1.sources.r1.selector.mapping.atguigu = c1
a1.sources.r1.selector.mapping.other = c2
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop101
a1.sinks.k1.port = 4141
a1.sinks.k2.type=avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4242
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Use a channel which buffers events in memory
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

为 hadoop103 上的 Flume4 配置一个 avro source 和一个 logger sink。

a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop101
a1.sources.r1.port = 4141
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1

为 hadoop102 上的 Flume3 配置一个 avro source 和一个 logger sink。

a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop102
a1.sources.r1.port = 4242
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1

(4)分别在 hadoop100,hadoop101,hadoop102 上启动 flume 进程,注意先后顺序。

[root@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group4/flume3.conf -Dflume.root.logger=INFO,console
[root@hadoop101 flume]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group4/flume2.conf -Dflume.root.logger=INFO,console
[root@hadoop100 flume]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group4/flume1.conf

(5)在 hadoop100 使用 netcat 向 localhost:44444 发送字母和数字。

(6)观察 hadoop101 和 hadoop102 打印的日志。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/330128.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

代码随想录刷题第四十二天| 01背包问题,你该了解这些! ● 01背包问题,你该了解这些! 滚动数组 ● 416. 分割等和子集

代码随想录刷题第四十二天 今天是0-1背包问题&#xff0c;掌握了套路就不难了~~~ 0-1背包问题理论基础&#xff08;二维数组篇&#xff09;卡码网第46题 题目思路&#xff1a; 代码实现&#xff1a; input_line input() # 读取一行输入 mn input_line.split() m, n int…

Spanner on a modern columnar storage engine 中文翻译

文章目录 0. 摘要1. 存储引擎2. 存储引擎迁移的挑战2.1 可靠性、可用性和数据完整性2.2 性能和成本2.3 复杂性 3. 迁移可靠性的系统原则方法3.1 可靠性原则和自动化架构3.2 迁移方案和按周迁移3.3 客户 部署感知 调度3.4 管理可靠性、可用性和性能 4. 项目管理和驱动指标概括 0…

NetWorkX之社会网络分析

NetWorkX之社会网络分析 文章目录 NetWorkX之社会网络分析netwokx社会网络分析简介简单的案例使用networkx分析恋情关系总结 netwokx社会网络分析简介 networkx 是 Python 中一个非常强大的模块&#xff0c;用于创建、操作和研究图结构的网络。在社会网络分析中&#xff0c;它…

RK3399平台入门到精通系列讲解(实验篇)自定义工作队列的使用

🚀返回总目录 文章目录 一、自定义工作队列介绍1.1、工作队列相关结构体1.2、工作队列相关接口函数二、自定义共享队列案例2.1、Makefile2.2、驱动案例共享队列是由内核管理的全局工作队列,自定义工作队列是由内核或驱动程序创建的特定工作队列,用于处理特定的任务。 一、…

高校电力能耗监测精细化管理系统,提升能源利用效率的利器

电力是高校不可离开的重要能源&#xff0c;为学校相关管理人员提供在线用能查询统计等服务。通过对学校照明用电、空调用电等数据的采集、监控、分析&#xff0c;为学校电能管理制定合理的能源政策提供参考。同时&#xff0c;也可以培养学生的节能意识&#xff0c;学校后勤电力…

K8S-应用部署

1 应用管理解读 2 应用部署实践 资源对象管理关系 资源对象管理实践 手工方式&#xff1a; kubectl run pod名称 --imageimage地址资源清单方式: apiVersion: v1 kind: Pod metadata:labels:run: my-podname: my-pod spec:containers:- image: kubernetes-register.sswang.co…

[VUE]2-vue的基本使用

目录 vue基本使用方式 1、vue 组件 2、文本插值 3、属性绑定 4、事件绑定 5、双向绑定 6、条件渲染 7、axios 8、⭐跨域问题 &#x1f343;作者介绍&#xff1a;双非本科大三网络工程专业在读&#xff0c;阿里云专家博主&#xff0c;专注于Java领域学习&#xff0c;擅…

vueRouter 配合 keep-alive 不生效的问题

文章目录 问题说明案例复现demo 结构问题复现和解决 其实这个不生效的问题根本也不算一个问题&#xff0c;犯的错和写错单词差不多&#xff0c;但是也是一时上头没发现&#xff0c;所以记录一下&#xff0c;如果遇到同样的问题&#xff0c;也希望可以帮助你早点看到这个哭笑不得…

js逆向第13例:猿人学第6题js混淆-回溯赛

文章目录 m是加密字符串怎么来的?浏览器环境检测本地运行的js代码任务六:采集全部5页的彩票数据,计算全部中奖的总金额(包含一、二、三等奖) 此题总体难度低于第5题,老规矩还是查看控制台请求地址https://match.yuanrenxue.cn/api/match/6?m=rPRDgpbV3Wd%252FyPfURQAkxK…

冠军团队!第二届百度搜索创新大赛AI方案

Datawhale干货 作者&#xff1a;李柯辰&#xff0c;Datawhale成员 写在前面 大家好&#xff0c;我们是2023年第二届百度搜索创新大赛 赛道三——AI应用设计赛道的冠军团队——“肝到凌晨”&#xff0c;很高兴能与大家分享我们这次比赛的经验&#xff0c;同时也希望以后有机会可…

【前沿技术】超级稳定的视频卡通画方案

Git clone项目到本地 git clone gitgithub.com:Artiprocher/DiffSynth-Studio.git 基本原理 使用了stable diffusion稳定扩散模型和controlnet来控制图像生成的轮廓&#xff0c;animatediff控制视频帧与帧之间的连续性&#xff0c;最后使用RIFE技术平滑整个生成后的视频。 …

NI基于PC的测量和控制系统

基于PC的测量和控制系统为工程师提供了电气和物理测量功能&#xff0c;使其能够以可自定义、准确且经济实惠的方式进行台式测量. 什么是基于PC的测量和控制系统&#xff1f; 在基于PC的测量和控制系统中&#xff0c;NI硬件产品通过USB或以太网连接到PC或笔记本电脑。这种系统具…