【大数据之Flume】七、Flume进阶之自定义Sink

(1)概述:
  Sink 不断地轮询 Channel 中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个 Flume Agent。

  Sink 是完全事务性的。在从 Channel 批量删除数据之前,每个 Sink 用 Channel 启动一个事务。批量事件一旦成功写出到存储系统或下一个 Flume Agent,Sink 就利用 Channel 提交事务。事务一旦被提交,该 Channel 从自己的内部缓冲区删除事件。

  Sink 组件目的地包括 hdfs、logger、avro、thrift、ipc、file、null、HBase、solr、自定义。官方提供的 Sink 类型已经很多,但是有时候并不能满足实际开发当中的需求,此时我们就需要根据实际需求自定义某些 Sink。

  自定义 sink 的接口:https://flume.apache.org/releases/content/1.11.0/FlumeDeveloperGuide.html#sink

  MySink 需要继承 AbstractSink 类并实现 Configurable 接口。

  实现相应方法:
    configure(Context context)//初始化 context(读取配置文件内容)
    process()//从 Channel 读取获取数据(event),这个方法将被循环调用。

  适用于:读取 Channel 数据写入 MySQL 或者其他文件系统。

(2)需求:
  使用 flume 接收数据,并在 Sink 端给每条数据添加前缀和后缀,输出到控制台。前后缀可在 flume 任务配置文件中配置。

(3)分析:
在这里插入图片描述

步骤:
(1)创建一个 maven 项目,并引入以下pom依赖。

<dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.9.0</version>
</dependency>

(2)自定义MySink ,继承 AbstractSink 类并实现 Configurable 接口,并打包,将jar包放到/opt/module/flume-1.9.0/lib目录下。

package com.study.sink;import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class MySink extends AbstractSink implements Configurable
{//声明前后缀private String perfix;private String subfix;//创建logger对象用于打印到控制台private Logger logger = LoggerFactory.getLogger(MySink.class);@Overridepublic void configure(Context context) {perfix = context.getString("per","per-");subfix = context.getString("sub","-sub");}@Overridepublic Status process() throws EventDeliveryException {//1.获取channel并开启事务Channel channel = getChannel();Transaction transaction = channel.getTransaction();transaction.begin();//2.从channel中抓取数据打印到控制台try {//2.1抓取数据//创建事件Event event;while(true){//防止空数据event  = channel.take();if (event != null)break;}//2.2处理事件logger.info(perfix+new String(event.getBody())+subfix);//2.3提交事务transaction.commit();return Status.READY;} catch (Exception e) {e.printStackTrace();//回滚transaction.rollback();return Status.BACKOFF;} finally {transaction.close();}}
}

(3)在/opt/module/flume-1.9.0/job下创建文件夹group6,在该文件夹下创建配置文件netcat-flume-mysink.conf。

# Name the components on this agent 
#组件声明
a1.sources = r1
a1.sinks = k1 
a1.channels = c1# Describe/configure the source
a1.sources.r1.type = netcat 
a1.sources.r1.bind = localhost 
a1.sources.r1.port =44444# Describe the sink 
a1.sinks.k1.type = com.study.sink.MySink
a1.sinks.per = per
a1.sinks.sub = sub# Use a channel whichbuffers events in memory
a1.channels.c1.type = memory 
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel 
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

(4)开启任务

bin/flume-ng agent -c conf/ -n a1 job/group6/netcat-flume-mysink.conf -Dflume.root.logger=INFO,console

(5)开启端口并发送消息

nc localhost 44444

(6)结果
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/60918.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于SpringBoot+LayUI的宿舍管理系统 001

项目简介 源码来源于网络&#xff0c;项目文档仅用于参考&#xff0c;请自行二次完善哦。 系统以MySQL 8.0.23为数据库&#xff0c;在Spring Boot SpringMVC MyBatis Layui框架下基于B/S架构设计开发而成。 系统中的用户分为三类&#xff0c;分别为学生、宿管、后勤。这三…

Kubernetes集群部署上篇(安装部署,但是集群网络未部署)

第四阶段 时 间&#xff1a;2023年8月9日 参加人&#xff1a;全班人员 内 容&#xff1a; Kubernetes集群部署上篇 目录 一、Kubernetes部署方式 &#xff08;一&#xff09;minikube &#xff08;二&#xff09;二进制包 &#xff08;三&#xff09;Kubeadm Kubea…

(八)穿越多媒体奇境:探索Streamlit的图像、音频与视频魔法

文章目录 1 前言2 st.image&#xff1a;嵌入图像内容2.1 图像展示与描述2.2 调整图像尺寸2.3 使用本地文件或URL 3 st.audio&#xff1a;嵌入音频内容3.1 播放音频文件3.2 生成音频数据播放 4 st.video&#xff1a;嵌入视频内容4.1 播放视频文件4.2 嵌入在线视频 5 结语&#x…

E. Power of Points - 思维

分析&#xff1a; 题意本质就是找点在数组中任意一个位置时和所有的端点之间的距离和&#xff0c;但是直接暴力会超时&#xff0c;可以对数组排个序&#xff0c;设当前遍历的是xi&#xff0c;那么此时求的到各端点的距离就是j从1 ~ i - 1的所有端点与xi的距离之和&#xff0c;也…

章节4:JavaScript操作Cookie

章节4&#xff1a;JavaScript操作Cookie 直接利用Cookie登录 JavaScript语法 获取&#xff1a;document.cookie; 设置&#xff1a;document.cookie“usernamexx”; 删除&#xff1a;document.cookie“usernamexx;expiresThu, 01 Jan 1970 00:00:00 GMT”;

《爬虫》爬取页面图片并保存

爬虫 前言代码效果 简单的爬取图片 前言 这几天打算整理与迁移一下博客。因为 CSDN 的 Markdown 编辑器很好用 &#xff0c;所以全部文章与相关图片都保存在 CSDN。而且 CSDN 支持一键导出自己的文章为 markdown 文件。但导出的文件中图片的连接依旧是 url 连接。为了方便将图…

Neo4j笔记-数据迁移(导出/导入)

这里先说明以下几点&#xff1a; Neo4j在4.0下版本默认的库名是&#xff1a;graph.db Neo4j在4.0上版本默认的库名是&#xff1a;neo4j.db 不管是Neo4j&#xff0c;还是Neo4j Desktop&#xff0c;都会在bin目录下有neo4j、neo4j-admin软件。在conf目录下&#xff0c;有neo4j.…

【计算机网络】概述及数据链路层

每一层只依赖于下一层所提供的服务&#xff0c;使得各层之间相互独立、灵活性好&#xff0c;已于实现和维护&#xff0c;并能促进标准化工作。 应用层&#xff1a;通过应用进程间的交互完成特定的网络应用&#xff0c;HTTP、FTP、DNS&#xff0c;应用层交互的数据单元被称为报…

《合成孔径雷达成像算法与实现》Figure3.7

代码复现如下&#xff1a; clc clear all close all%参数设置 TBP 100; %时间带宽积 T 10e-6; %脉冲持续时间%参数计算 B TBP/T; …

linux系统虚拟主机开启支持SourceGuardian(sg11)加密组件

注意&#xff1a;sg11我司只支持linux系统虚拟主机自主安装。支持php5.3及以上版本。 1、登陆主机控制面板&#xff0c;找到【远程文件下载】这个功能。 2、远程下载文件填写http://download.myhostadmin.net/vps/sg11_for_linux.zip 下载保存的路径填写/others/ 3、点击控制…

剑指 Offer 15. 二进制中1的个数

题目描述 编写一个函数&#xff0c;输入是一个无符号整数&#xff08;以二进制串的形式&#xff09;&#xff0c;返回其二进制表达式中数字位数为 ‘1’ 的个数&#xff08;也被称为 汉明重量).&#xff09;。 提示&#xff1a; 请注意&#xff0c;在某些语言&#xff08;如…

研究:ChatGPT在生成代码方面的准确率比抛硬币还低!

文章目录 ChatGPT真的能帮助程序员&#xff1f;使用ChatGPT时需要注意的事情ChatGPT的局限性如何有效地使用ChatGPT ✍创作者&#xff1a;全栈弄潮儿 &#x1f3e1; 个人主页&#xff1a; 全栈弄潮儿的个人主页 &#x1f3d9;️ 个人社区&#xff0c;欢迎你的加入&#xff1a;全…