go语言并发实战——日志收集系统(十) 重构tailfile模块实现同时监控多个日志文件

前言

在上一篇文章中,我们实现了通过etcd来同时指定多个不同的有关分区与日志文件的路径,但是锁着一次读取配置的增多,不可避免的出现了一个问题:我们如何来监控多个日志文件,这样原来的tailFile模块相对于当下场景就显得有些捉襟见肘了,所以对tialFile模块进行重构就成了我们必须要做的事情了。

TailFiile模块的重构流程

储存数据结构体的重构

在上一篇博文中我们定义了collectEntry来储存我们从etcd中get到的信息,但是,这个获取的消息在tailFile模块也需要使用,所以这里我们再创建一个common模块来专门储存这个数据:

type CollectEntry struct {Path  string `json:"path"`Topic string `json:"topic"`
}

tailFile模块中也需要一个结构体来储存需要的信息:

type tailTask struct{path stringtopic stringTailObj *tail.Tail
}

tail初始化模块的重构

由于现在我们的配置信息全部储存到了 CollectEntry结构体中,它会给tail的初始化函数传递一个CollectEntry结构体数组,所以我们需要对之前的tail模块代码进行重构与细化,如下:

type tailTask struct {path    stringtopic   stringTailObj *tail.Tail
}func NewTailTask(path, topic string) (tt *tailTask) {tt = &tailTask{path:  path,topic: topic,}return tt
}func (task *tailTask) Init() (err error) {config := tail.Config{Follow:    true,ReOpen:    true,MustExist: true,Poll:      true,Location:  &tail.SeekInfo{Offset: 0, Whence: 2},}task.TailObj, err = tail.TailFile(task.path, config)if err != nil {logrus.Error("tail create tailObj for path:%s,err:%v", task.path, err)return}return
}func InitTail(collectEntryList []common.CollectEntry) (err error) {for _, entry := range collectEntryList {tt := NewTailTask(entry.Path, entry.Topic)err = tt.Init()if err != nil {logrus.Error("tail create tailObj for path:%s,err:%v", entry.Path, err)continue}go tt.run()}return
}

之前我们只有一个日志需要监控,所以主要的工作流程可以放在man.go中,但是现在会创建多个tailTask来监控,我们最好将他移动到tail模块中,最后tail模块的全部代码为:

package tailFileimport ("github.com/Shopify/sarama""github.com/Shopify/toxiproxy/Godeps/_workspace/src/github.com/Sirupsen/logrus""github.com/hpcloud/tail""log-agent/Kafka""log-agent/common""strings""time"
)type tailTask struct {path    stringtopic   stringTailObj *tail.Tail
}func NewTailTask(path, topic string) (tt *tailTask) {tt = &tailTask{path:  path,topic: topic,}return tt
}func (task *tailTask) Init() (err error) {config := tail.Config{Follow:    true,ReOpen:    true,MustExist: true,Poll:      true,Location:  &tail.SeekInfo{Offset: 0, Whence: 2},}task.TailObj, err = tail.TailFile(task.path, config)if err != nil {logrus.Error("tail create tailObj for path:%s,err:%v", task.path, err)return}return
}func InitTail(collectEntryList []common.CollectEntry) (err error) {for _, entry := range collectEntryList {tt := NewTailTask(entry.Path, entry.Topic)err = tt.Init()if err != nil {logrus.Error("tail create tailObj for path:%s,err:%v", entry.Path, err)continue}go tt.run()}return
}func (t *tailTask) run() {for {line, ok := <-t.TailObj.Linesif !ok {logrus.Warn("tailFile.TailObj.Lines channel closed,path:%s\n", t.path)time.Sleep(2 * time.Second)continue}if len(strings.Trim(line.Text, "\r")) == 0 {continue}msg := &sarama.ProducerMessage{}msg.Topic = t.topicmsg.Value = sarama.StringEncoder(line.Text)Kafka.MesChan(msg)}
}

修改模块的全部代码

  • main.go
package mainimport ("fmt""github.com/Shopify/toxiproxy/Godeps/_workspace/src/github.com/Sirupsen/logrus""github.com/go-ini/ini""log-agent/Kafka""log-agent/etcd""log-agent/tailFile"
)type Config struct {Kafakaddress Kafkaddress `ini:"kafka"`LogFilePath  LogFilePath `ini:"collect"`Etcdaddress  EtcdAddress `ini:"etcd"`
}type Kafkaddress struct {Addr        []string `ini:"address"`Topic       string   `ini:"topic"`MessageSize int64    `ini:"chan_size"`
}type LogFilePath struct {Path string `ini:"logfile_path"`
}type EtcdAddress struct {Addr []string `ini:"address"`Key  string   `ini:"collect_key"`
}func run() {select {}
}func main() {//读取配置文件,获取配置信息filename := "G:\\goproject\\-goroutine-\\log-agent\\conf\\config.ini"ConfigObj := new(Config)err := ini.MapTo(ConfigObj, filename)if err != nil {logrus.Error("%s Load failed,err:", filename, err)}//初始化Kafkaerr = Kafka.InitKafka(ConfigObj.Kafakaddress.Addr, ConfigObj.Kafakaddress.MessageSize)if err != nil {logrus.Error("InitKafka failed, err:%v", err)return}logrus.Infof("InitKafka success")//初始化etcderr = etcd.Init(ConfigObj.Etcdaddress.Addr)if err != nil {logrus.Error("InitEtcd failed, err:%v", err)return}logrus.Infof("InitEtcd success")//拉取要收集日志文件的配置项err, collectEntryList := etcd.GetConf(ConfigObj.Etcdaddress.Key)if err != nil {logrus.Error("GetConf failed, err:%v", err)return}fmt.Println(collectEntryList)//初始化tailerr = tailFile.InitTail(collectEntryList)if err != nil {logrus.Error("InitTail failed, err:%v", err)return}logrus.Infof("InitTail success")run()
}
  • common.go
package commontype CollectEntry struct {Path  string `json:"path"`Topic string `json:"topic"`
}
  • tailFile.go
package tailFileimport ("github.com/Shopify/sarama""github.com/Shopify/toxiproxy/Godeps/_workspace/src/github.com/Sirupsen/logrus""github.com/hpcloud/tail""log-agent/Kafka""log-agent/common""strings""time"
)type tailTask struct {path    stringtopic   stringTailObj *tail.Tail
}func NewTailTask(path, topic string) (tt *tailTask) {tt = &tailTask{path:  path,topic: topic,}return tt
}func (task *tailTask) Init() (err error) {config := tail.Config{Follow:    true,ReOpen:    true,MustExist: true,Poll:      true,Location:  &tail.SeekInfo{Offset: 0, Whence: 2},}task.TailObj, err = tail.TailFile(task.path, config)if err != nil {logrus.Error("tail create tailObj for path:%s,err:%v", task.path, err)return}return
}func InitTail(collectEntryList []common.CollectEntry) (err error) {for _, entry := range collectEntryList {tt := NewTailTask(entry.Path, entry.Topic)err = tt.Init()if err != nil {logrus.Error("tail create tailObj for path:%s,err:%v", entry.Path, err)continue}go tt.run()}return
}func (t *tailTask) run() {for {line, ok := <-t.TailObj.Linesif !ok {logrus.Warn("tailFile.TailObj.Lines channel closed,path:%s\n", t.path)time.Sleep(2 * time.Second)continue}if len(strings.Trim(line.Text, "\r")) == 0 {continue}msg := &sarama.ProducerMessage{}msg.Topic = t.topicmsg.Value = sarama.StringEncoder(line.Text)Kafka.MesChan(msg)}
}

运行结果

在这里插入图片描述
当你对不同日志文件修改都有反馈时就代表运行成功啦!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/651853.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

EasyRecovery数据恢复软件2025激活码及下载使用步骤教程

EasyRecovery数据恢复软件是一款功能强大且用户友好的数据恢复工具&#xff0c;专为帮助用户找回因各种原因丢失的数据而设计。该软件由全球知名的数据恢复技术公司开发&#xff0c;经过多年的技术积累和更新迭代&#xff0c;已经成为行业内备受推崇的数据恢复解决方案。 EasyR…

WildCard开通GitHub Copilot

更多AI内容请关注我的专栏&#xff1a;《体验AI》 期待您的点赞&#x1f44d;收藏⭐评论✍ WildCard开通GitHub Copilot GitHub Copilot 简介主要功能工作原理 开通过程1、注册Github账号2、准备一张信用卡或虚拟卡3、进入github copilot页4、选择试用5、选择支付方式6、填写卡…

AM解调 FPGA(寻找复刻电赛电赛D题的)

设计平台 Quartus II10.3mif产生工具modelsimSE &#xff08;仿真用&#xff09; DDS&#xff08;直接数字式频率合成器&#xff09; 从前面的内容可知&#xff0c;我们需要产生一个载波&#xff0c;并且在仿真时&#xff0c;我们还需要一个较低频率的正弦波信号来充当我们的…

中电金信:深度解析|数字化营销运营体系搭建

如何更好更快地梳理好体系搭建思路&#xff0c;稳步实现落地&#xff1f;下文将为大家明确搭建的推进步骤、执行要点&#xff0c;帮助商业银行理顺数字化营销运营体系的“点”“线”“面”~ 与所有转型的曲折、阵痛等特征一样&#xff0c;商业银行构建数字化营销运营体系过程中…

泛微E9开发 如何自定义流程标题

1、功能背景 主表中有“选择类别”下拉框字段&#xff0c;用户可以根据需求来选择申请类别&#xff0c;一般多个相似流程的申请可以合并成一个&#xff0c;但是为了区分&#xff0c;我们可以通过将标题修改的方式来使整个显示页面更明确。 2、展示效果 3、实现方法 注意&…

【算法基础实验】图论-构建无向图

构建无向图 前提 JAVA实验环境 理论 无向图的数据结构为邻接表数组&#xff0c;每个数组中保存一个Bag抽象数据类型&#xff08;Bag类型需要专门讲解&#xff09; 实验数据 我们的实验数据是13个节点和13条边组成的无向图&#xff0c;由一个txt文件来保存&#xff0c;本…

php 编译安装oracel扩展

第一步安装Oracle客户端 1&#xff0c;需要下载基础包和sdk oracle客户端下载链接&#xff1a;Oracle Instant Client Downloads for Linux x86-64 (64-bit) https://www.oracle.com/database/technologies/instant-client/linux-x86-64-downloads.html 选择最新版本 versi…

[C++基础学习]----03-程序流程结构之循环结构详解

前言 在C程序中&#xff0c;循环结构在用于重复执行一段代码块&#xff0c;直到满足某个条件为止。循环结构有多种形式&#xff0c;包括while循环、do-while循环和for循环。 正文 01-循环结构简介 1、while循环语句&#xff1a; while循环在每次循环开始前检查条件是否为真&a…

clickhouse与oracle传输数据

参考 https://github.com/ClickHouse/clickhouse-jdbc-bridge https://github.com/ClickHouse/clickhouse-jdbc-bridge/blob/master/docker/README.md clickhouse官方提供了一种方式&#xff0c;可以实现clickhouse与oracle之间传输数据&#xff0c;不仅仅是oracle&#xff0…

python 使用flask_httpauth和pyjwt实现登录权限控制

最近需要用到&#xff0c;学习了一下记录 首先安装依赖 pip install Flask-HTTPAuth pyjwt passlib Welcome to Flask-HTTPAuth’s documentation! — Flask-HTTPAuth documentation Welcome to PyJWT — PyJWT 2.8.0 documentation Passlib 1.7.4 documentation — Passl…

【北京迅为】《iTOP龙芯2K1000开发指南》-第四部分 ubuntu开发环境搭建

龙芯2K1000处理器集成2个64位GS264处理器核&#xff0c;主频1GHz&#xff0c;以及各种系统IO接口&#xff0c;集高性能与高配置于一身。支持4G模块、GPS模块、千兆以太网、16GB固态硬盘、双路UART、四路USB、WIFI蓝牙二合一模块、MiniPCIE等接口、双路CAN总线、RS485总线&#…

Linux驱动开发——(七)Linux阻塞和非阻塞IO

目录 一、阻塞和非阻塞IO简介 二、等待队列 2.1 等待队列头 2.2 等待队列项 2.3 将队列项添加/移除等待队列头 2.4 等待唤醒 2.5 等待事件 三、轮询 四、驱动代码 4.1 阻塞IO 4.2 非阻塞IO 一、阻塞和非阻塞IO简介 IO指的是Input/Output&#xff0c;也就是输入/输…