使用Filebeat+Kafka+Logstash+Elasticsearch构建日志分析系统

        随着时间的积累,日志数据会越来越多,当您需要查看并分析庞杂的日志数据时,可通过Filebeat+Kafka+Logstash+Elasticsearch采集日志数据到Elasticsearch中,并通过Kibana进行可视化展示与分析。本文介绍具体的实现方法。

一、背景信息

Kafka是一种分布式、高吞吐、可扩展的消息队列服务,广泛用于日志收集、监控数据聚合、流式数据处理、在线和离线分析等大数据领域,已成为大数据生态中不可或缺的部分。

在实际应用场景中,为了满足大数据实时检索的需求,您可以使用Filebeat采集日志数据,并输出到Kafka中。Kafka实时接收Filebeat采集的数据,并输出到Logstash中。输出到Logstash中的数据在格式或内容上可能不能满足您的需求,此时可以通过Logstash的filter插件过滤数据。最后将满足需求的数据输出到Elasticsearch中进行分布式检索,并通过Kibana进行数据分析与展示。简单流程如下。

流程图

二、操作流程

1、准备工作

完成环境准备,包括创建Elasticsearch、Logstash、ECS和消息队列 Kafka 版实例、创建Topic和Consumer Group等。

2、步骤一:安装并配置Filebeat

  安装并配置Filebeat,设置input为系统日志,output为Kafka,将日志数据采集到Kafka的指定Topic中。

3、步骤二:配置Logstash管道

配置Logstash管道的input为Kafka,output为阿里云Elasticsearch,使用Logstash消费Topic中的数据并传输到阿里云Elasticsearch中。

4、步骤三:查看日志消费状态

在消息队列Kafka中查看日志数据的消费的状态,验证日志数据是否采集成功。

5、步骤四:通过Kibana过滤日志数据

在Kibana控制台的Discover页面,通过Filter过滤出Kafka相关的日志。

三、步骤一:安装并配置Filebeat

  1. 连接ECS服务器。

  2. 安装Filebeat。

    ​本文以6.8.5版本为例,安装命令如下,详细信息请参见Install Filebeat。

    curl -L -O https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-6.8.5-linux-x86_64.tar.gz
    tar xzvf filebeat-6.8.5-linux-x86_64.tar.gz
  3. 执行以下命令,进入Filebeat安装目录,创建并配置filebeat.kafka.yml文件。

    cd filebeat-6.8.5-linux-x86_64
    vi filebeat.kafka.yml

    filebeat.kafka.yml配置如下。

    filebeat.prospectors:- type: logenabled: truepaths:- /var/log/*.logoutput.kafka:hosts: ["alikafka-post-cn-zvp2n4v7****-1-vpc.alikafka.aliyuncs.com:9092"]topic: estestversion: 0.10.2

    重要

    当Filebeat为7.0及以上版本时,filebeat.prospectors需要替换为filebeat.inputs。

    参数

    说明

    type

    输入类型。设置为log,表示输入源为日志。

    enabled

    设置配置是否生效:

    • true:生效

    • false:不生效

    paths

    需要监控的日志文件的路径。多个日志可在当前路径下另起一行写入日志文件路径。

    hosts

    消息队列Kafka实例的单个接入点,可在实例详情页面获取,详情请参见查看接入点。由于本文使用的是VPC实例,因此使用默认接入点中的任意一个接入点。

    topic

    日志输出到消息队列Kafka的Topic,请指定为您已创建的Topic。

    version

    Kafka的版本,可在消息队列Kafka的实例详情页面获取。

    重要

    • 不配置此参数会报错。

    • 由于不同版本的Filebeat支持的Kafka版本不同,例如8.2及以上版本的Filebeat支持的Kafka版本为2.2.0,因此version需要设置为Filebeat支持的Kafka版本,否则会出现类似报错:Exiting: error initializing publisher: unknown/unsupported kafka vesion '2.2.0' accessing 'output.kafka.version' (source:'filebeat.kafka.yml'),详细信息请参见version。

  4. 启动Filebeat。

    ./filebeat -e -c filebeat.kafka.yml

四、步骤二:配置Logstash管道

  1. 进入阿里云Elasticsearch控制台的Logstash页面。
  2. 进入目标实例。
    1. 在顶部菜单栏处,选择地域。
    2. Logstash实例中单击目标实例ID。
  3. 在左侧导航栏,单击管道管理

  4. 单击创建管道

  5. 创建管道任务页面,输入管道ID并配置管道。

    本文使用的管道配置如下。

    input {kafka {bootstrap_servers => ["alikafka-post-cn-zvp2n4v7****-1-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-zvp2n4v7****-2-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-zvp2n4v7****-3-vpc.alikafka.aliyuncs.com:9092"]group_id => "es-test"topics => ["estest"]codec => json}
    }
    filter {}
    output {elasticsearch {hosts => "http://es-cn-n6w1o1x0w001c****.elasticsearch.aliyuncs.com:9200"user =>"elastic"password =>"<your_password>"index => "kafka‐%{+YYYY.MM.dd}"}
    }
    表 1. input参数说明

    参数

    说明

    bootstrap_servers

    消息队列Kafka实例的接入点,可在实例详情页面获取,详情请参见查看接入点。由于本文使用的是VPC实例,因此使用默认接入点。

    group_id

    指定为您已创建的Consumer Group的名称。

    topics

    指定为您已创建的Topic的名称,需要与Filebeat中配置的Topic名称保持一致。

    codec

    设置为json,表示解析JSON格式的字段,便于在Kibana中分析。

    表 2. output参数说明

    参数

    说明

    hosts

    阿里云Elasticsearch的访问地址,取值为http://<阿里云Elasticsearch实例的私网地址>:9200

    说明

    您可在阿里云Elasticsearch实例的基本信息页面获取其私网地址,详情请参见查看实例的基本信息。

    user

    访问阿里云Elasticsearch的用户名,默认为elastic。您也可以使用自建用户,详情请参见通过Elasticsearch X-Pack角色管理实现用户权限管控。

    password

    访问阿里云Elasticsearch的密码,在创建实例时设置。如果忘记密码,可进行重置,重置密码的注意事项及操作步骤请参见重置实例访问密码。

    index

    索引名称。设置为kafka‐%{+YYYY.MM.dd}表示索引名称以kafka为前缀,以日期为后缀,例如kafka-2020.05.27

    更多Config配置详情请参见Logstash配置文件说明。

    如果您有多topic的数据同步需求,需要在kafka中添加新的topic,然后在Logstash的管道配置中添加input。示例如下:

    input {kafka {bootstrap_servers => ["alikafka-post-cn-zvp2n4v7****-1-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-zvp2n4v7****-2-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-zvp2n4v7****-3-vpc.alikafka.aliyuncs.com:9092"]group_id => "es-test"topics => ["estest"]codec => json
    }kafka {bootstrap_servers => ["alikafka-post-cn-zvp2n4v7****-1-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-zvp2n4v7****-2-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-zvp2n4v7****-3-vpc.alikafka.aliyuncs.com:9092"]group_id => "es-test-2"topics => ["estest_2"]codec => json
    }
    }
  6. 单击下一步,配置管道参数。

    管道参数配置

    参数

    说明

    管道工作线程

    并行执行管道的Filter和Output的工作线程数量。当事件出现积压或CPU未饱和时,请考虑增大线程数,更好地使用CPU处理能力。默认值:实例的CPU核数。

    管道批大小

    单个工作线程在尝试执行Filter和Output前,可以从Input收集的最大事件数目。较大的管道批大小可能会带来较大的内存开销。您可以设置LS_HEAP_SIZE变量,来增大JVM堆大小,从而有效使用该值。默认值:125。

    管道批延迟

    创建管道事件批时,将过小的批分派给管道工作线程之前,要等候每个事件的时长,单位为毫秒。默认值:50ms。

    队列类型

    用于事件缓冲的内部排队模型。可选值:

    • MEMORY:默认值。基于内存的传统队列。

    • PERSISTED:基于磁盘的ACKed队列(持久队列)。

    队列最大字节数

    请确保该值小于您的磁盘总容量。默认值:1024 MB。

    队列检查点写入数

    启用持久性队列时,在强制执行检查点之前已写入事件的最大数目。设置为0,表示无限制。默认值:1024。

    警告

    配置完成后,需要保存并部署才能生效。保存并部署操作会触发实例重启,请在不影响业务的前提下,继续执行以下步骤。

  7. 单击保存或者保存并部署

    • 保存:将管道信息保存在Logstash里并触发实例变更,配置不会生效。保存后,系统会返回管道管理页面。可在管道列表区域,单击操作列下的立即部署,触发实例重启,使配置生效。

    • 保存并部署:保存并且部署后,会触发实例重启,使配置生效。

五、步骤三:查看日志消费状态

  1. ​进入消息队列Kafka控制台。

  2. 参见查看消费状态,查看详细消费状态。

    预期结果如下:

    查看消费详情

六、步骤四:通过Kibana过滤日志数据

  1. 登录目标阿里云Elasticsearch实例的Kibana控制台,根据页面提示进入Kibana主页。

  2. 创建一个索引模式。

    1. 在左侧导航栏,单击Management

    2. 在Kibana区域,单击Index Patterns

    3. 单击Create index pattern

    4. 输入Index pattern(本文使用kafka-*),单击Next step

      创建索引模式

    5. 选择Time Filter field name(本文选择@timestamp),单击Create index pattern

      Time Filter field name

  3. 在左侧导航栏,单击Discover

  4. 从页面左侧的下拉列表中,选择您已创建的索引模式(kafka-*)。

  5. 在页面右上角,选择一段时间,查看对应时间段内的Filebeat采集的日志数据。

    查看日志数据

  6. 单击Add a filter,在Add filter页面中设置过滤条件,查看符合对应过滤条件的日志数据。

    过滤日志数据

七、常见问题

Q:同步日志数据出现问题,管道一直在生效中,无法将数据导入Elasticsearch,如何解决?

A:查看Logstash实例的主日志是否有报错,根据报错判断原因,具体操作请参见查询日志。常见的原因及解决方法如下。

原因

解决方法

Kafka的接入点不正确。

参见查看接入点获取正确的接入点。完成后,修改管道配置替换错误接入点。

Logstash与Kafka不在同一VPC下。

重新购买同一VPC下的实例。购买后,修改现有管道配置。

说明

VPC实例只能通过专有网络VPC访问

云消息队列 Kafka 版

Kafka或Logstash集群的配置太低,例如使用了测试版集群。

升级集群规格,完成后,刷新实例,观察变更进度。升级Logstash实例规格的具体操作,请参见升配集群;升级Kafka实例规格的具体操作,请参见升级实例配置。

管道配置中包含了file_extend,但没有安裝logstash-output-file_extend插件。

选择以下任意一种方式处理:

  • 安装logstash-output-file_extend插件。具体操作,请参见 安装或卸载插件。

  • 中断变更,等到实例处于变更中断状态后,在管道配置中,去掉file_extend配置,触发重启恢复。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/174699.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

UWB基础——IEEE 802.15.4z中可选波形

在前面的文章&#xff1a;UWB基础——基带简介中介绍了关于UWB基带脉冲波形以及相关的定义&#xff0c;本文继续介绍在IEEE 802.15.4z-2020标准中新增的一些兼容脉冲形状。 1. 基带脉冲响应 传输脉冲形状p(t)受到与标准参考脉冲r(t)的互相关函数形状的限制。 两个脉冲之间归一…

基于连续Hopfield神经网络优化——旅行商问题优化计算

大家好&#xff0c;我是带我去滑雪&#xff01; 利用神经网络解决组合优化问题是神经网络应用的一个重要方面。所谓组合优化问题&#xff0c;就是在给定约束条件下&#xff0c;使目标函数极小&#xff08;或极大&#xff09;的变量组合问题。将Hopfield网络应用于求解组合优化问…

php+vue3实现点选验证码

buildadmin 中的点选验证码实现 验证码类 <?phpnamespace ba;use Throwable; use think\facade\Db; use think\facade\Lang; use think\facade\Config;/*** 点选文字验证码类*/ class ClickCaptcha {/*** 验证码过期时间(s)* var int*/private int $expire 600;/*** 可以…

Leetcode刷题详解——太平洋大西洋水流问题

1. 题目链接&#xff1a;417. 太平洋大西洋水流问题 2. 题目描述&#xff1a; 有一个 m n 的矩形岛屿&#xff0c;与 太平洋 和 大西洋 相邻。 “太平洋” 处于大陆的左边界和上边界&#xff0c;而 “大西洋” 处于大陆的右边界和下边界。 这个岛被分割成一个由若干方形单元格…

Spring Cloud学习(八)【RabbitMQ 服务异步通讯】

文章目录 初识 MQ同步通讯异步通讯MQ 常见框架 RabbitMQ 快速入门RabbitMQ 单机部署RabbitMQ概述常见消息模型 SpringAMQPSimpleQueue 模型WorkQueue 模型发布订阅模型发布订阅-Fanout Exchange发布订阅-DirectExchange发布订阅-TopicExchange消息转换器 初识 MQ 同步通讯 同步…

解密图像处理中的利器——直方图与均衡化

直方图与均衡化是数字图像处理中常用的重要工具&#xff0c;它们能够帮助我们更好地理解和改善图像的亮度分布。本文将首先介绍直方图的基本概念以及其在图像处理中的意义&#xff0c;接着详细阐述直方图均衡化的原理和算法。同时&#xff0c;文章将探讨直方图均衡化在图像增强…

基于Vue+SpringBoot的天然气工程运维系统 开源项目

目录 一、摘要1.1 项目介绍1.2 项目详细录屏 二、功能模块2.1 系统角色分类2.2 核心功能2.2.1 流程 12.2.2 流程 22.3 各角色功能2.3.1 系统管理员功能2.3.2 用户服务部功能2.3.3 分公司&#xff08;施工单位&#xff09;功能2.3.3.1 技术员角色功能2.3.3.2 材料员角色功能 2.3…

【Redis系列】Redis上设置key,value的时候出现NOAUTH Authentication required提示如何解决?

哈喽&#xff0c;大家好&#xff0c;我是小浪。相信大家在初学一门新的知识点的时候都会遇到各种各样的问题&#xff0c;在网上找了一大堆的解决方案&#xff0c;最后还是无功而返&#xff0c;那么今天博主就记录一下在进行Redis的一些操作中遇到的问题~ 当我们好不容易安装好R…

matlab simulink PSO算法优化simulink的PID参数

1、内容简介 略 13-可以交流、咨询、答疑 PSO算法优化simulink的PID参数 2、内容说明 标准的PSO算法优化simulink的PID参数 PSO、粒子群算法、simulink参数优化 3、仿真分析 4、参考论文 略 链接&#xff1a;https://pan.baidu.com/s/1yQ1yDfk-_Qnq7tGpa23L7g 提取码&…

计算机组成原理——指令系统题库1-20

1、以下有关指令系统的说法中错误的是什么。 A、 指令系统是一台机器硬件能执行的指令全体 B、 任何程序运行前都要先转化为机器语言 C、 指令系统是计算机软件、硬件的界面 D、 指令系统和机器语言是无关的。 2、在CPU执行指令的过程中&#xff0c;指令的地址由什么给出。…

线性模型拟合非线性数据中,如何找到最优的【分箱】数

具体的数据可以回看上一条博客。我们先来始化三个空列表&#xff0c;用于存储后续计算的预测得分、交叉验证得分的平均值和交叉验证得分的方差。 pred,score,var [], [], [] 2. 再定义一个列表&#xff0c;包含了我们想要尝试的分箱数量。 binsrange [2,5,10,15,20,30] 3…

Domino 14中安装配置使用OnTime团队日历组件

大家好&#xff0c;才是真的好。 在9月底发布的Domino 14 EA3中&#xff0c;包含了OnTime团队日历组件。OnTime是一款日历协作功能应用&#xff0c;侧重于团队日历&#xff0c;功能侧重于协作&#xff0c;界面较为简洁、现代等。 Domino如今越来越多地集成进合作伙伴的解决方…