0301taildir-source报错-flume-大数据

1 基础环境简介

linux系统:centos,前置安装:jdk、hadoop、zookeeper、kafka,版本如下

软件版本描述
centos7linux系统发行版
jdk1.8java开发工具集
hadoop2.10.0大数据生态基础组件
zookeeper3.5.7分布式应用程序协调服务
kafka3.0分布式mq组件
flume1.9.0分布式采集传输组件

2 报错

  • 场景1:动态监控目录多个日志变化,通过flume采集传输到kafka

  • 报错日志

    org.apache.flume.FlumeException: Error creating positionFile parent directoriesat org.apache.flume.source.taildir.TaildirSource.configure(TaildirSource.java:170)at org.apache.flume.conf.Configurables.configure(Configurables.java:41)at org.apache.flume.node.AbstractConfigurationProvider.loadSources(AbstractConfigurationProvider.java:325)at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:105)at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:145)at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)at java.lang.Thread.run(Thread.java:750)
    Caused by: java.nio.file.FileAlreadyExistsException: /export/server/flumeat sun.nio.fs.UnixException.translateToIOException(UnixException.java:88)at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)at sun.nio.fs.UnixFileSystemProvider.createDirectory(UnixFileSystemProvider.java:384)at java.nio.file.Files.createDirectory(Files.java:674)at java.nio.file.Files.createAndCheckIsDirectory(Files.java:781)at java.nio.file.Files.createDirectories(Files.java:727)at org.apache.flume.source.taildir.TaildirSource.configure(TaildirSource.java:168)... 11 more
  • conf文件如下

    #定义组件
    a1.sources = r1
    a1.channels = c1#配置source
    a1.sources.r1.type = TAILDIR
    a1.sources.r1.filegroups = f1
    a1.sources.r1.filegroups.f1 = /opt/module/applog/log/app.*
    a1.sources.r1.positionFile = /export/server/flume/taildir_position.json#配置channel
    a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
    a1.channels.c1.kafka.bootstrap.servers = node1:9092,node2:9092
    a1.channels.c1.kafka.topic = topic_log01
    a1.channels.c1.parseAsFlumeEvent = false#组装 
    a1.sources.r1.channels = c1
    
  • 原因就是在创建positionFile的时候父目录已存在

  • 场景2:我们生成的日志文件app.log 每经过一天会按照日期重命名文件,然后生成新的app.log,此时flume会重新采集所有的日志信息,导致信息重复采集2次。

  • Taildir 说明: Taildir Source 维护了一个 json 格式的 position File,其会定期的往 position File中更新每个文件读取到的最新的位置,因此能够实现断点续传。Position File 的格式如下:

    {"inode":2496272,"pos":12,"file":"/opt/module/flume/files/file1.txt"}{"inode":2496275,"pos":12,"file":"/opt/module/flume/files2/log.txt"}
    
  • 而flume会同时判断Inode和file来确定是否同一文件

    注:Linux 中储存文件元数据的区域就叫做 inode,每个 inode 都有一个号码,操作系统

    用 inode 号码来识别不同的文件,Unix/Linux 系统内部不使用文件名,而使用 inode 号码来

    识别文件。

3 解决

场景1解决方案有两种:

  1. 既然是创建父目录已存在,我们可以吧positionFile位置重新配置。

  2. 修改源代码,我们通过源代码找下处理逻辑,下载1.9.0版本的flume源代码,官网地址:https://archive.apache.org/dist/flume/,找到TailSource 170行

     @Overridepublic synchronized void configure(Context context) {String fileGroups = context.getString(FILE_GROUPS);Preconditions.checkState(fileGroups != null, "Missing param: " + FILE_GROUPS);filePaths = selectByKeys(context.getSubProperties(FILE_GROUPS_PREFIX),fileGroups.split("\\s+"));Preconditions.checkState(!filePaths.isEmpty(),"Mapping for tailing files is empty or invalid: '" + FILE_GROUPS_PREFIX + "'");String homePath = System.getProperty("user.home").replace('\\', '/');positionFilePath = context.getString(POSITION_FILE, homePath + DEFAULT_POSITION_FILE);Path positionFile = Paths.get(positionFilePath);try {// 此处创建父目录,如果存在报错Files.createDirectories(positionFile.getParent());} catch (IOException e) {throw new FlumeException("Error creating positionFile parent directories", e);}headerTable = getTable(context, HEADERS_PREFIX);batchSize = context.getInteger(BATCH_SIZE, DEFAULT_BATCH_SIZE);skipToEnd = context.getBoolean(SKIP_TO_END, DEFAULT_SKIP_TO_END);byteOffsetHeader = context.getBoolean(BYTE_OFFSET_HEADER, DEFAULT_BYTE_OFFSET_HEADER);idleTimeout = context.getInteger(IDLE_TIMEOUT, DEFAULT_IDLE_TIMEOUT);writePosInterval = context.getInteger(WRITE_POS_INTERVAL, DEFAULT_WRITE_POS_INTERVAL);cachePatternMatching = context.getBoolean(CACHE_PATTERN_MATCHING,DEFAULT_CACHE_PATTERN_MATCHING);backoffSleepIncrement = context.getLong(PollableSourceConstants.BACKOFF_SLEEP_INCREMENT,PollableSourceConstants.DEFAULT_BACKOFF_SLEEP_INCREMENT);maxBackOffSleepInterval = context.getLong(PollableSourceConstants.MAX_BACKOFF_SLEEP,PollableSourceConstants.DEFAULT_MAX_BACKOFF_SLEEP);fileHeader = context.getBoolean(FILENAME_HEADER,DEFAULT_FILE_HEADER);fileHeaderKey = context.getString(FILENAME_HEADER_KEY,DEFAULT_FILENAME_HEADER_KEY);maxBatchCount = context.getLong(MAX_BATCH_COUNT, DEFAULT_MAX_BATCH_COUNT);if (maxBatchCount <= 0) {maxBatchCount = DEFAULT_MAX_BATCH_COUNT;logger.warn("Invalid maxBatchCount specified, initializing source "+ "default maxBatchCount of {}", maxBatchCount);}if (sourceCounter == null) {sourceCounter = new SourceCounter(getName());}}
    

    在这里插入图片描述

可以在创建父目录之前检测是否已存在,如果已存在,直接跳过创建即可,修改try代码块中内容如下

boolean exists = Files.exists(positionFile.getParent());if (!exists)Files.createDirectories(positionFile.getParent());

maven打包替换flume/lib/下 flume-taildir-source-1.9.0.jar 如图所示:在这里插入图片描述

重新运行,正常启动,如下图日志所示:在这里插入图片描述

kafka中新接收的数据如下图所示:在这里插入图片描述

场景2解决方案 把TailFile如下代码

  public boolean updatePos(String path, long inode, long pos) throws IOException {if (this.inode == inode && this.path.equals(path)) {setPos(pos);updateFilePos(pos);logger.info("Updated position, file: " + path + ", inode: " + inode + ", pos: " + pos);return true;}return false;}// 修改为public boolean updatePos(String path, long inode, long pos) throws IOException {if (this.inode == inode) {setPos(pos);updateFilePos(pos);logger.info("Updated position, file: " + path + ", inode: " + inode + ", pos: " + pos);return true;}return false;}

即不校验file只校验inode,具体这里不再去验证,有兴趣自己验证下

结语

如果小伙伴什么问题或者指教,欢迎交流。

❓QQ:806797785

参考链接:

[1]flume教学视频[CP/OL].2020-04-16.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/537971.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【深入理解设计模式】命令设计模式

命令设计模式&#xff1a; 命令模式&#xff08;Command Pattern&#xff09;是一种行为型设计模式&#xff0c;它将请求封装为一个对象&#xff0c;从而使你可以用不同的请求对客户端进行参数化&#xff0c;对请求排队或记录请求日志&#xff0c;以及支持可撤销的操作。 概述…

YOLOv9实例分割教程|(一)训练教程

专栏介绍&#xff1a;YOLOv9改进系列 | 包含深度学习最新创新&#xff0c;主力高效涨点&#xff01;&#xff01;&#xff01; 一、创建数据集及数据配置文件 创新一个文件夹存放分割数据集&#xff0c;包含一个images和labels文件夹。标签格式如下所示&#xff1a; 创新数据集…

可视化Relay IR

目标 为Relay IR生成图片形式的计算图。 实现方式 使用RelayVisualizer可视化Relay&#xff0c;RelayVisualizer定义了一组接口&#xff08;包括渲染器、解析器&#xff09;将IRModule可视化为节点和边&#xff0c;并且提供了默认解析器和渲染器。 首先需要安装依赖&#x…

基于PHP的数字化档案管理系统

有需要请加文章底部Q哦 可远程调试 基于PHP的数字化档案管理系统 一 介绍 此数字化档案管理系统基于原生PHP&#xff0c;MVC架构开发&#xff0c;数据库mysql&#xff0c;前端bootstrap。系统角色分为用户和管理员。 技术栈 php(mvc)mysqlbootstrapphpstudyvscode 二 功能 …

Js输入输出语句

输入语法 prompt("您想输入的是&#xff1f;")输出语法: 语法1: document.write(‘要出的内容’&#xff09; <body><script>document.write("你好")document.write("<h1>我是<h1>")</script> </body>作…

武汉星起航:秉承客户至上服务理念,为创业者打造坚实后盾

在跨境电商的激荡浪潮中&#xff0c;武汉星起航电子商务有限公司一直秉持着以客户为中心的发展理念&#xff0c;为跨境创业者提供了独特的支持和经验积累&#xff0c;公司通过多年的探索和实践&#xff0c;成功塑造了一个以卖家需求为导向的服务平台&#xff0c;为每一位创业者…

MongoDB从0到1:高效数据使用方法

MongoDB&#xff0c;作为一种流行的NoSQL数据库。从基础的文档存储到复杂的聚合查询&#xff0c;从索引优化到数据安全都有其独特之处。文末附MongoDB常用命令大全。 目录 1. 引言 MongoDB简介 MongoDB的优势和应用场景 2. 基础篇 安装和配置MongoDB MongoDB基本概念 使…

Android 学习之追踪应用的安装情况

先上结论&#xff0c;急用的话直接看结论 结论一、借助 API 读取安装信息&#xff0c;然后上报二、借助手动埋点&#xff0c;然后上报三、对比 前提过程 结论 一、借助 API 读取安装信息&#xff0c;然后上报 通过 PackageManager 的 API&#xff0c;我们可以得知自身应用安装…

Devin内测注册全攻略:一文带你快速体验最新AI软件工程师技术 ️

博主猫头虎的技术世界 &#x1f31f; 欢迎来到猫头虎的博客 — 探索技术的无限可能&#xff01; 专栏链接&#xff1a; &#x1f517; 精选专栏&#xff1a; 《面试题大全》 — 面试准备的宝典&#xff01;《IDEA开发秘籍》 — 提升你的IDEA技能&#xff01;《100天精通鸿蒙》 …

力扣刷题Days18-190颠倒二进制位(js)

目录 1&#xff0c;题目 2&#xff0c;代码 1&#xff0c;逐位颠倒 800001011 循环过程&#xff1a; 最终结果&#xff1a; 3&#xff0c;学习与总结 1&#xff0c;<< 位运算符 1&#xff0c;题目 颠倒给定的 32 位无符号整数的二进制位。 2&#xff0c;代码 1…

利用位运算符设置标志位

在写程序的过程中&#xff0c;会碰到需要修改标志位的情况。比如需要设置一个文件标识符可读或可写&#xff0c;首先想到的是利用int变量&#xff08;1表示不可读不可写 &#xff0c;2表示不可读可写&#xff0c;3表示可读不可写&#xff0c;4表示可读可写&#xff09;。但是这…

Challenge 6 - OSCP C

156 开放了161端口,直接snmpbulkwalk扫描 snmpbulkwalk -c public -v2c 192.168.243.156 NET-SNMP-EXTEND-MIB::nsExtendObjects得到账号密码jack:3PUKsX98BMupBiCf 试了那几个web端口,发现可以登录8083的vesta,但是需要用户名大写Jack 接着登录之后里面可以创建计划任务…