大数据-玩转数据-Flume

一、Flume简介

    1. Flume提供一个分布式的,可靠的,对大数据量的日志进行高效收集、聚集、移动的服务,Flume只能在Unix环境下运行。
    1. Flume基于流式架构,容错性强,也很灵活简单。
    1. Flume、Kafka用来实时进行数据收集,Spark、Flink用来实时处理数据,impala用来实时查询。

二、Flume角色

2.1、Source

用于采集数据,Source是产生数据流的地方,同时Source会将产生的数据流传输到Channel,这个有点类似于Java IO部分的Channel。

2.2、Channel

用于桥接Sources和Sinks,类似于一个队列。

2.3、Sink

从Channel收集数据,将数据写到目标源(可以是下一个Source,也可以是HDFS或者HBase)。

2.4、Event

传输单元,Flume数据传输的基本单元,以事件的形式将数据从源头送至目的地。

三、Flume传输过程

source监控某个文件或数据流,数据源产生新的数据,拿到该数据后,将数据封装在一个Event中,并put到channel后commit提交,channel队列先进先出,sink去channel队列中拉取数据,然后写入到HDFS中。

四、Flume部署及使用

4.1 采集架构

在这里插入图片描述

4.2 Flume安装

4.2.1 下载

apache-flume-1.6.0-bin.tar.gz
链接:https://pan.baidu.com/s/1ySmEEObFtKtyT7GsEldnfA
提取码:436t

4.2.2 安装

Flume的安装非常简单,只需要解压即可
tar -zxvf apache-flume-1.6.0-bin.tar.gz
然后进入flume的目录,修改conf下的flume-env.sh,在里面配置JAVA_HOME

在这里,我们使用集群模式,因此,需要把在master节点部署的flume分发到slave节点上:
]# scp -rp apache-flume-1.7.0-bin slave1:KaTeX parse error: Expected 'EOF', got '#' at position 6: PWD ]#̲ scp -rp apache…PWD

4.2.3 测试

采集配置:

vi netcat-logger.conf
# 定义这个agent中各组件的名字
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 描述和配置source组件:r1
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# 描述和配置sink组件:k1
a1.sinks.k1.type = logger
# 描述和配置channel组件,此处使用是内存缓存的方式
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 描述和配置source  channel   sink之间的连接关系
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

启动agent去采集数据
启动命令:

bin/flume-ng agent -c conf -f conf/netcat-logger.conf -n a1 -Dflume.root.logger=INFO,console
-c conf   指定flume自身的配置文件所在目录
-f conf/netcat-logger.con  指定我们所描述的采集方案
-n a1  指定我们这个agent的名字

在这里插入图片描述
先要往agent采集监听的端口上发送数据,让agent有数据可采
发送命令:

安装telnet:

]# yum install telnet
]# telnet anget-hostname port (telnet localhost 44444)

测试输入输出如下图:
在这里插入图片描述
在这里插入图片描述

4.3 Flume配置

1)Flume 配置分析
在这里插入图片描述
Flume 直接读 log 日志的数据,log 日志的格式是 app-yyyy-mm-dd.log。
2)Flume 的具体配置如下:
(1)在/opt/module/flume/conf 目录下创建 file-flume-kafka.conf 文件

vim file-flume-kafka.conf
a1.sources=r1
a1.channels=c1 c2
#configure source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /usr/local/src/apache-flume-1.7.0-bin/test/log_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /tmp/log/2020-11-03/app.*.log
a1.sources.r1.fileHeader = true
a1.sources.r1.channels = c1 c2
#interceptor
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = com.zgjy.flume.interceptor.LogETLInterceptor$Builder
a1.sources.r1.interceptors.i2.type = com.zgjy.flume.interceptor.LogTypeInterceptor$Builder
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = topic
a1.sources.r1.selector.mapping.topic_resource = c1
a1.sources.r1.selector.mapping.topic_action = c2
# configure channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = master:9092,slave1:9092,slave2:9092
a1.channels.c1.kafka.topic = topic_resource
a1.channels.c1.parseAsFlumeEvent = false
a1.channels.c1.kafka.consumer.group.id = flume-consumer
# configure channe2
a1.channels.c2.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c2.kafka.bootstrap.servers = master:9092,slave1:9092,slave2:9092
a1.channels.c2.kafka.topic = topic_action
a1.channels.c2.parseAsFlumeEvent = false
a1.channels.c2.kafka.consumer.group.id = flume-consumer

测试日志:
在这里插入图片描述
配置说明如下:
在这里插入图片描述

4.4 Flume 的 ETL 和分类型拦截器

本项目中自定义了两个拦截器,分别是:ETL 拦截器、日志类型区分拦截器。
ETL 拦截器主要作用:过滤时间戳不合法和 Json 数据不完整的日志
日志类型区分拦截器主要作用:将启动日志和事件日志区分开来,方便发往 Kafka 的不 同 Topic。

1)创建 Maven 工程 flume-interceptor
2)创建包名:com.zgjy.flume.interceptor
3)在 pom.xml 文件中添加如下配置

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.zgjy</groupId><artifactId>flume-interceptor</artifactId><version>1.0-SNAPSHOT</version><dependencies><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.1.41</version></dependency><dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.7.0</version></dependency></dependencies><build><plugins><plugin><artifactId>maven-compiler-plugin</artifactId><configuration><source>1.8</source><target>1.8</target></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>2.5.3</version><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration></plugin></plugins></build></project>

4)在 com.zgjy.flume.interceptor 包下创建 LogETLInterceptor 类名
Flume ETL 拦截器 LogETLInterceptor实现代码如下:

package 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/170911.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【阿里云】任务2-OSS对象存储教程(找我参加活动可获得京东卡奖励)

目录 前言说明第一步第二步第三步&#xff1a;开通并使用OSS传输加速三、清理第四步-提交作品第五步-提交记录到小程序 前言 本次任务是阿里云官方发出的&#xff0c;每个任务30软妹币&#xff0c;欢迎大家加入我的活动群&#xff0c;门槛很低&#xff0c;所有人都可以参加&…

实验一 Anaconda安装和使用(Python程序设计实验报告)

实验一 Anaconda安装和使用 一、实验环境 Python集成开发环境IDLE/Anaconda 二、实验目的 1&#xff0e;掌握Windows下Anaconda的安装和配置。 2. 掌握Windows下Anaconda的简单使用&#xff0c;包括IDLE、Jupyter Notebook、Spyder工具的使用。 3. 掌握使用pip管理Python扩展库…

《数据结构、算法与应用C++语言描述》-队列的应用-工厂仿真

工厂仿真 完整可编译运行代码见&#xff1a;Github::Data-Structures-Algorithms-and-Applications/_19Factory simulation/ 问题描述 一个工厂有m台机器。工厂的每项任务都需要若干道工序才能完成。每台机器都执行一道工序&#xff0c;不同的机器执行不同的工序。一台机器一…

文件上传 [SUCTF 2019]CheckIn1

打开题目 我们用cmd curl --head url 查看网站使用的是什么服务器 此题用的是openresty&#xff0c;OpenResty 是一个基于 Nginx 与 Lua 的高性能 Web 平台 我们上传php&#xff0c;phtml的一句话木马都显示不合法 那我们试试传a.jpg的一句话木马 显示我们一句话木马内容里面…

leetCode 25.K 个一组翻转链表

给你链表的头节点 head &#xff0c;每 k 个节点一组进行翻转&#xff0c;请你返回修改后的链表。k 是一个正整数&#xff0c;它的值小于 或 等于链表的长度。如果节点总数不是 k 的整数倍&#xff0c;那么请将最后剩余的节点保持原有顺序。你不能只是单纯的改变节点内部的值&a…

asp.net core自定义异常过滤器并记录到Log4Net日志

1.创建异常过滤器特性 using Log4Net.Controllers; using Microsoft.AspNetCore.Mvc; using Microsoft.AspNetCore.Mvc.Filters;namespace Log4NetTest {public class CustomerExceptionFilterAttribute : Attribute, IExceptionFilter{private readonly ILogger<CustomerE…

Mysql 和 Redis 数据如何保持一致

先阐明一下Mysql和Redis的关系&#xff1a;Mysql是数据库&#xff0c;用来持久化数据&#xff0c;一定程度上保证数据的可靠性&#xff1b;Redis是用来当缓存&#xff0c;用来提升数据访问的性能。 关于如何保证Mysql和Redis中的数据一致&#xff08;即缓存一致性问题&#xf…

Apache DolphinScheduler如何完全设置东八区?

默认情况 为了兼容全世界不同时区&#xff0c;Apache DolphinScheduler 使用的是 UTC 0 时区&#xff0c;包括保存到数据库表中的数据时区&#xff0c;以及展示到页面上的时区。 如果我们想在页面上看到东八区时间&#xff0c;则需要在页面上手动选择上海时区&#xff0c;如下…

威海广泰-002111 三季报分析(20231109)

威海广泰-002111 基本情况 公司名称&#xff1a;威海广泰空港设备股份有限公司 A股简称&#xff1a;威海广泰 成立日期&#xff1a;2002-08-30 上市日期&#xff1a;2007-01-26 所属行业&#xff1a;专用设备制造业 周期性&#xff1a;0 主营业务&#xff1a;航空产业、消防产业…

POWER APPS:必填项功能

Power apps的Forms组件中&#xff0c;它的卡片有个属性为Required。作用为提醒此项为必填&#xff0c;且在submitform时检查此项是否有值&#xff0c;没有值就无法正常提交&#xff0c;且会有提示。 另外可对提交按钮的属性displaymode做一次判断&#xff0c;只允许按钮在窗体中…

2023nacos源码解读第2集——nacos-server的启动

nacos 是一个典型的server-client中间件&#xff0c;server这里安装最新的nacos-server 2.3.0-BETA版本 1.docker启动nacos-server 镜像详情参考nacos-docker项目的readme &#xff0c;很方便&#xff0c;但是官方提供的nacos-server镜像往往可能滞后&#xff0c;且不便于后续…

安卓RadioButton设置图片大小

RadioButton都不陌生&#xff0c;一般我们都会设置图片在里面&#xff0c;这就涉及一个问题&#xff0c;图片的大小。如果图片过大&#xff0c;效果很不理想。搜了很多方法&#xff0c;都不理想。无奈只能自己研究了 代码如下&#xff1a; 1&#xff0c;一个简单的 RadioButt…