关于Flume-Kafka-Flume的模式进行数据采集操作


       测试是否连接成功:

        在主节点flume目录下输入命令:

bin/flume-ng agent -n a1 -c conf/ -f job/file_to_kafka.conf -Dflume.root.logger=info,console
# 这个file_to_kafka.conf文件就是我们的配置文件

 

        然后在另一台节点输入命令进行消费数据:

 kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic topic_log

        然后再开一个主节点终端,在这个主节点上面在对应生成数据的文件追加数据

         

        这样就可以看见第一个主节点的终端和消费节点上面有数据变化了! 

 


         下面这个是配置拦截器,把json格式的内容进行消费,其他的进行拦截

        Flume采集数据到kafka的配置conf文件内容:

#定义组件

#1、定义source、channel、agent名称
a1.sources = r1
a1.channels = c1
#配置source

#2、描述source
a1.sources.r1.type = TAILDIR

#指定监控的组名
a1.sources.r1.filegroups = f1

#指定f1组监控的路径
a1.sources.r1.filegroups.f1 = /opt/software/applog/log/app.*

#指定断点续传的文件
a1.sources.r1.positionFile = /opt/software/flume/taildir_position.json
# 配置拦截器
a1.sources.r1.interceptors =  i1
a1.sources.r1.interceptors.i1.type = com.atguigu.gmall.flume.interceptor.ETLInterceptor$Builder
#配置channel

#3、描述channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel

#指定kafka集群
a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092

#指定数据写到kafka哪个topic
a1.channels.c1.kafka.topic = topic_log

#是否以Event对象的形式写入kafka
a1.channels.c1.parseAsFlumeEvent = false
#组装 

#4、关联source->channel
a1.sources.r1.channels = c1

         如果一开始测试我们flume和kafka是否能成功采集数据的时候,我们应该先把拦截器的两行配置先删除,后面再根据我们需要的内容进行拦截对应的内容。就比如:我们期望我们采集到数据是json格式的,如果不是json格式的话,我们就放弃这个数据。

     具体操作:

(1)创建Maven工程flume-interceptor

(2)创建包:com.gugu.gmall.flume.interceptor

(3)在pom.xml文件中添加如下配置

        

<dependencies>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.9.0</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.62</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.3.2</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

 在com.gugu.gmall.flume.utils包下创建JSONUtil类

package com.gugu.gmall.flume.utils;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.JSONException;public class JSONUtil {
/*
* 通过异常判断是否是json字符串
* 是:返回true  不是:返回false
* */public static boolean isJSONValidate(String log){try {JSONObject.parseObject(log);return true;}catch (JSONException e){return false;}}
}

在com.gugu.gmall.flume.interceptor包下创建ETLInterceptor类 

package com.gugu.gmall.flume.interceptor;import com.atguigu.gmall.flume.utils.JSONUtil;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;public class ETLInterceptor implements Interceptor {@Overridepublic void initialize() {}@Overridepublic Event intercept(Event event) {//1、获取body当中的数据并转成字符串byte[] body = event.getBody();String log = new String(body, StandardCharsets.UTF_8);//2、判断字符串是否是一个合法的json,是:返回当前event;不是:返回nullif (JSONUtil.isJSONValidate(log)) {return event;} else {return null;}}@Overridepublic List<Event> intercept(List<Event> list) {Iterator<Event> iterator = list.iterator();while (iterator.hasNext()){Event next = iterator.next();if(intercept(next)==null){iterator.remove();}}return list;}public static class Builder implements Interceptor.Builder{@Overridepublic Interceptor build() {return new ETLInterceptor();}@Overridepublic void configure(Context context) {}}@Overridepublic void close() {}
}

        然后进行打包,复制到我们flume下的lib目录下就可以了!

        然后再和上面测试一样进行测试连接,是否成功把非json格式的数据拦截成功!


感谢各位的观看,创作不易,能不能给哥们来一个点赞呢!!!

好了,今天的分享就这么多了,有什么不清楚或者我写错的地方,请多多指教!

私信,评论我呗!!!!!!

关注我下一篇不迷路哦!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/173921.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Windows conan环境搭建

Windows conan环境搭建 1 安装conan1.1 安装依赖软件1.1.1 python安装1.1.2 git bash安装1.1.3 安装Visual Studio Community 20191.1.3.1 选择安装的组件1.1.3.2 选择要支持的工具以及对应的SDK 1.1.4 vscode安装 1.3 验证conan功能1.4 查看conancenter是否包含poco包1.5 查看…

3DMAX渲染AO图的三种方法

3DMAX渲染AO图的三种方法 使用Mental Ray渲染AO 1. 我为这个演示制作了一个非常简单的场景。该场景包含一个茶壶、一个盒子和一个球体。我还应用了一些材质&#xff0c;并将渲染引擎设置为Mental Ray。 2. 我还在场景中添加并定位了几个泛光灯。 3. 我选择了Mental Ra…

算法笔记-其他高效的技巧与算法(未处理完)

算法笔记-其他高效的技巧与算法 前缀和 前缀和 #include <cstdio> #include <vector> using namespace std; const int MAXN 10000; int n, a[MAXN]; int sum[MAXN] { 0 };int main() {scanf("%d", &n);for (int i 0; i < n; i) {scanf("…

weblogic控制台登陆console的时候慢

我们在搭建完weblogic后&#xff0c;登录控制台时&#xff0c;会出现等待很长时间的情况。 如下图&#xff1a;怎么解决呢 连接所属服务器,.找到jdk的安装路径 [rootlocalhost lib]# echo $JAVA_HOME/ /usr/java/jdk1.8.0_161/ 进入jre下的lib目录下的security目录&#xff0…

如何使用PHPStudy本地快速搭建网站并实现远程访问

文章目录 [toc]使用工具1. 本地搭建web网站1.1 下载phpstudy后解压并安装1.2 打开默认站点&#xff0c;测试1.3 下载静态演示站点1.4 打开站点根目录1.5 复制演示站点到站网根目录1.6 在浏览器中&#xff0c;查看演示效果。 2. 将本地web网站发布到公网2.1 安装cpolar内网穿透2…

java实现选择排序

算法步骤 首先在未排序序列中找到最小&#xff08;大&#xff09;元素&#xff0c;存放到排序序列的起始位置再从剩余未排序元素中继续寻找最小&#xff08;大&#xff09;元素&#xff0c;然后放到已排序序列的末尾。重复第二步&#xff0c;直到所有元素均排序完毕。 动图演…

MacOS下VMware Fusion配置静态IP

前言 在虚拟机安装系统后&#xff0c;默认是通过DHCP动态分配的IP&#xff0c;这会导致每次重启虚拟机ip都可能会改变&#xff0c;使用起来会有很多不便。 配置静态IP 查看主机网关地址 cat /Library/Preferences/VMware\ Fusion/vmnet8/nat.conf 查看主机DNS&#xff0c;m…

【数据结构】堆详解!(图解+源码)

&#x1f3a5; 屿小夏 &#xff1a; 个人主页 &#x1f525;个人专栏 &#xff1a; 数据结构解析 &#x1f304; 莫道桑榆晚&#xff0c;为霞尚满天&#xff01; 文章目录 &#x1f324;️前言&#x1f324;️堆的理论☁️二叉树的顺序存储☁️堆的概念 &#x1f324;️堆的实现…

聚势启新,KaiwuDB 生态联盟沙龙首站落地长春

11月9日&#xff0c;由 KaiwuDB 联合和润集团、致远互联主办的“KaiwuDB 生态联盟沙龙”首站活动在吉林长春顺利举办。沙龙以“聚势&#xff0c;启新”为主题&#xff0c;邀请基础软硬件、应用软件、信息安全等产业链上下游伙伴企业到场&#xff0c;共同就产业数智化趋势下的新…

【milkv】2、mpu6050驱动添加及测试

前言 本章介绍mpu6050的驱动添加以及测试。 其中驱动没有采用sdk提供的驱动&#xff0c;一方面需要配置irq&#xff0c;另一方面可以学习下如何通过ko方式添加驱动。 一、参考文章 驱动及测试文件编译流程&#xff1a; https://community.milkv.io/t/risc-v-milk-v-lsm6ds…

低代码平台是什么?具备哪些特性?

目录 一、低代码开发概念 二、低代码开发和零代码开发的区别 三、低代码和零代码的开发优势 四、低代码开发平台介绍 JNPF开发平台 1&#xff09;产品功能点 2&#xff09;产品功能模块 五、小结 低代码开发平台近两年发展迅猛&#xff0c;并迅速渗透到各个细分领域。本文简要介…

Arduino ESP8266使用AliyunIoTSDK.h连接阿里云物联网平台

文章目录 1、AliyunIoTSDK简介2、相关库安装3、阿里云创建产品&#xff0c;订阅发布4、对开源的Arduino ESP8266源代码修改5、使用阿里云点亮一个LED灯6、设备向阿里云上传温度数据7、项目源码 1、AliyunIoTSDK简介 AliyunIoTSDK是arduino的一个库&#xff0c;可以在arduino的…