(1)概述:
Sink 不断地轮询 Channel 中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个 Flume Agent。
Sink 是完全事务性的。在从 Channel 批量删除数据之前,每个 Sink 用 Channel 启动一个事务。批量事件一旦成功写出到存储系统或下一个 Flume Agent,Sink 就利用 Channel 提交事务。事务一旦被提交,该 Channel 从自己的内部缓冲区删除事件。
Sink 组件目的地包括 hdfs、logger、avro、thrift、ipc、file、null、HBase、solr、自定义。官方提供的 Sink 类型已经很多,但是有时候并不能满足实际开发当中的需求,此时我们就需要根据实际需求自定义某些 Sink。
自定义 sink 的接口:https://flume.apache.org/releases/content/1.11.0/FlumeDeveloperGuide.html#sink
MySink 需要继承 AbstractSink 类并实现 Configurable 接口。
实现相应方法:
configure(Context context)//初始化 context(读取配置文件内容)
process()//从 Channel 读取获取数据(event),这个方法将被循环调用。
适用于:读取 Channel 数据写入 MySQL 或者其他文件系统。
(2)需求:
使用 flume 接收数据,并在 Sink 端给每条数据添加前缀和后缀,输出到控制台。前后缀可在 flume 任务配置文件中配置。
(3)分析:
步骤:
(1)创建一个 maven 项目,并引入以下pom依赖。
<dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.9.0</version>
</dependency>
(2)自定义MySink ,继承 AbstractSink 类并实现 Configurable 接口,并打包,将jar包放到/opt/module/flume-1.9.0/lib目录下。
package com.study.sink;import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class MySink extends AbstractSink implements Configurable
{//声明前后缀private String perfix;private String subfix;//创建logger对象用于打印到控制台private Logger logger = LoggerFactory.getLogger(MySink.class);@Overridepublic void configure(Context context) {perfix = context.getString("per","per-");subfix = context.getString("sub","-sub");}@Overridepublic Status process() throws EventDeliveryException {//1.获取channel并开启事务Channel channel = getChannel();Transaction transaction = channel.getTransaction();transaction.begin();//2.从channel中抓取数据打印到控制台try {//2.1抓取数据//创建事件Event event;while(true){//防止空数据event = channel.take();if (event != null)break;}//2.2处理事件logger.info(perfix+new String(event.getBody())+subfix);//2.3提交事务transaction.commit();return Status.READY;} catch (Exception e) {e.printStackTrace();//回滚transaction.rollback();return Status.BACKOFF;} finally {transaction.close();}}
}
(3)在/opt/module/flume-1.9.0/job下创建文件夹group6,在该文件夹下创建配置文件netcat-flume-mysink.conf。
# Name the components on this agent
#组件声明
a1.sources = r1
a1.sinks = k1
a1.channels = c1# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port =44444# Describe the sink
a1.sinks.k1.type = com.study.sink.MySink
a1.sinks.per = per
a1.sinks.sub = sub# Use a channel whichbuffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
(4)开启任务
bin/flume-ng agent -c conf/ -n a1 job/group6/netcat-flume-mysink.conf -Dflume.root.logger=INFO,console
(5)开启端口并发送消息
nc localhost 44444
(6)结果