go语言并发实战——日志收集系统(六) 编写日志收集系统客户端

上节回顾

在上一篇文章中我们介绍了编写客户端的四个步骤,分别是:

  • 读取配置文件,寻找日志路径
  • 初始化服务
  • 根据日志路径l来收集日志
  • 将收集到的日志发送Kafka中
    关于上述的内容博主画了一个思维导图(有点丑,大家勉强看看,以前没画过):
    在这里插入图片描述
    对了,为了画这个思维导图昨天博主找了好久思维导图的软件,最后发现了Vscode上面有一个非常不错的插件:drawio,样子大概是这样的:
    在这里插入图片描述
    大家如果没有合适的思维导图绘制根据,可以试试这个。好了,话不多说,开始今天的内容。

读取配置信息,获取日志信息

前言

这里读取日志信息我们选择的是go-ini这一第三方包,具体的使用方法在我前面的博文这种有所介绍,大家不了解的话可以参考:
go语言并发实战——日志收集系统(五) 基于go-ini包读取日志收集服务的配置文件

需求分析

这里配置文件中我们主要要知道两个消息,一个Kafka的配置信息,一个是日志文件的路径,配置文件应该是这样的:

[kafka]
address=127.0.0.1:9092
topic=web.log
chan_size=100000[collect]
logfile_path:G:\goproject\-goroutine-\log-agent\log\log1

而为了方便我们利用反射来读取配置文件,我们来创建几个结构体来存储我们读到的配置信息:

  • Kafka结构体
type Kafkaddress struct {Addr        []string `ini:"address"`Topic       string   `ini:"topic"`MessageSize int64    `ini:"chan_size"`
}
  • tail结构体
type LogFilePath struct {Path string `ini:"logfile_path"`
}
  • 总的结构体

type Config struct {Kafakaddress Kafkaddress `ini:"kafka"`LogFilePath  LogFilePath `ini:"collect"`
}

然后读取配置信息放入结构体中:

	//读取配置文件,获取配置信息filename := "G:\\goproject\\-goroutine-\\log-agent\\conf\\config.ini"ConfigObj := new(Config)err := ini.MapTo(ConfigObj, filename)if err != nil {logrus.Error("%s Load failed,err:", filename, err)}

这样我们就获得我们所需要的配置消息了

初始化服务

前言

这里我们初始服务主要是初始化Kafka以及tail包,利用它们读取日志信息并将其发送Kafka中,具体介绍可以参考前面的几篇文章:
go语言并发实战——日志收集系统(三) 利用sarama包连接KafKa实现消息的生产与消费
go语言并发实战——日志收集系统(四) 利用tail包实现对日志文件的实时监控

Kafka的初始化

	//初始化Kafkaerr = Kafka.InitKafka(ConfigObj.Kafakaddress.Addr, ConfigObj.Kafakaddress.MessageSize)if err != nil {logrus.Error("InitKafka failed, err:%v", err)return}logrus.Infof("InitKafka success")

tail的初始化

func InitTail(filename string) (err error) {config := tail.Config{Follow:    true,ReOpen:    true,MustExist: true,Poll:      true,Location:  &tail.SeekInfo{Offset: 0, Whence: 2},}TailObj, err = tail.TailFile(filename, config)if err != nil {logrus.Error("tail create tailObj for path:%s,err:%v", filename, err)return}return
}

根据路径来读取日志

需求分析

一般我们常见的想法会是我们先将日志消息读取出来然后发送给Kafka但是这样的串行操作无疑会大大增加程序的运行时间,所以这里我们选择将读到的日志信息打包发送到管道中,然后再看起一个协程来发送数据,这样实现了读取与发送的一步操作,可以有效降低程序的运行时间,而上面出现的MessageSiz也就是我们设置的管道大小

func run(config *Config) (err error) {for {line, ok := <-tailFile.TailObj.Linesif !ok {logrus.Error("read from tail failed,err:", err)time.Sleep(2 * time.Second)continue}msg := &sarama.ProducerMessage{}msg.Topic = config.Kafakaddress.Topicmsg.Value = sarama.StringEncoder(line.Text)Kafka.MsgChan <- msg}
}

发送消息到KafKa

func SendMsg() {for {select {case msg := <-MsgChan:pid, offset, err := client.SendMessage(msg)if err != nil {logrus.Error("send msg to kafka failed,err:%v", err)return}logrus.Info("send msg to kafka success,pid:%d,offset:%d", pid, offset)}}
}

完整代码

  • main.go
package mainimport ("github.com/Shopify/sarama""github.com/Shopify/toxiproxy/Godeps/_workspace/src/github.com/Sirupsen/logrus""github.com/go-ini/ini""log-agent/Kafka""log-agent/tailFile""time"
)type Config struct {Kafakaddress Kafkaddress `ini:"kafka"`LogFilePath  LogFilePath `ini:"collect"`
}type Kafkaddress struct {Addr        []string `ini:"address"`Topic       string   `ini:"topic"`MessageSize int64    `ini:"chan_size"`
}type LogFilePath struct {Path string `ini:"logfile_path"`
}func run(config *Config) (err error) {for {line, ok := <-tailFile.TailObj.Linesif !ok {logrus.Error("read from tail failed,err:", err)time.Sleep(2 * time.Second)continue}msg := &sarama.ProducerMessage{}msg.Topic = config.Kafakaddress.Topicmsg.Value = sarama.StringEncoder(line.Text)Kafka.MsgChan <- msg}
}func main() {//读取配置文件,获取配置信息filename := "G:\\goproject\\-goroutine-\\log-agent\\conf\\config.ini"ConfigObj := new(Config)err := ini.MapTo(ConfigObj, filename)if err != nil {logrus.Error("%s Load failed,err:", filename, err)}//初始化Kafkaerr = Kafka.InitKafka(ConfigObj.Kafakaddress.Addr, ConfigObj.Kafakaddress.MessageSize)if err != nil {logrus.Error("InitKafka failed, err:%v", err)return}logrus.Infof("InitKafka success")//初始化tailerr = tailFile.InitTail(ConfigObj.LogFilePath.Path)if err != nil {logrus.Error("InitTail failed, err:%v", err)return}logrus.Infof("InitTail success")//利用sarama报发送消息到Kafka中err = run(ConfigObj)
}
  • Kafka.go
package Kafkaimport ("github.com/Shopify/sarama""github.com/Shopify/toxiproxy/Godeps/_workspace/src/github.com/Sirupsen/logrus"
)var (client  sarama.SyncProducerMsgChan chan *sarama.ProducerMessage
)func InitKafka(address []string, Chan_size int64) (err error) {//初始化MsgChanMsgChan = make(chan *sarama.ProducerMessage, Chan_size)//初始化configconfig := sarama.NewConfig()config.Producer.RequiredAcks = sarama.WaitForAllconfig.Producer.Partitioner = sarama.NewRandomPartitionerconfig.Producer.Return.Successes = true//连接Kafkaclient, err = sarama.NewSyncProducer(address, config)if err != nil {logrus.Error("kafka connect error,err:%v", err)return}go SendMsg()return
}func SendMsg() {for {select {case msg := <-MsgChan:pid, offset, err := client.SendMessage(msg)if err != nil {logrus.Error("send msg to kafka failed,err:%v", err)return}logrus.Info("send msg to kafka success,pid:%d,offset:%d", pid, offset)}}
}
  • tailFile.go
package tailFileimport ("github.com/Shopify/toxiproxy/Godeps/_workspace/src/github.com/Sirupsen/logrus""github.com/hpcloud/tail"
)var TailObj *tail.Tailfunc InitTail(filename string) (err error) {config := tail.Config{Follow:    true,ReOpen:    true,MustExist: true,Poll:      true,Location:  &tail.SeekInfo{Offset: 0, Whence: 2},}TailObj, err = tail.TailFile(filename, config)if err != nil {logrus.Error("tail create tailObj for path:%s,err:%v", filename, err)return}return
}

运行结果

在运行前打开ZooKeeper与Kafka,然后对日志文件进行操作,会出现:
在这里插入图片描述
出现

2024/04/22 20:26:34 Seeked G:\goproject\-goroutine-\log-agent\log\log1 - &{Offset:0 Whence:2}
INFO[0013] send msg to kafka success,pid:%d,offset:%d0 3 
INFO[0013] send msg to kafka success,pid:%d,offset:%d0 4

就代表运行成功了。

结语

今天的有关内容就到此为止啦,有问题的话欢迎在评论区评论,大家可以集思广益,如果你觉得博主的内容对你有帮助,欢迎三连一下和订阅专栏
如果博主文章里面有什么错误页欢迎斧正(毕竟博主页只是个小蒟蒻鸡),下篇文章我们要进入etcd的有关学习了,好了,大家下篇文章见!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/640994.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

AJAX——同步代码和异步代码

1.同步代码 同步代码&#xff1a;浏览器是按照我们书写代码的顺序一行一行地执行程序的。浏览器会等待代码的解析和工作&#xff0c;在上一行完成后才会执行下一行。这样做是很有必要的&#xff0c;因为每一行新的代码都是建立在前面代码的基础之上的。 这也使得它成为一个同步…

SLICEM是如何将查找表配置为分布式RAM/移位寄存器的

1.首先说SliceM和SliceL如何配置为ROM的 一个SLICE包含4个六输入查找表&#xff0c;因此每个查找表就能存储64bit的数据&#xff0c;要实现128bit的ROM&#xff0c;只需要通过两个LUT就可实现&#xff0c;具体如下表: 2.如何配置成为分布式RAM SLICEM中的LUT如下图&#xff…

一文讲透彻Redis 持久化

文章目录 ⛄1.RDB持久化&#x1fa82;&#x1fa82;1.1.执行时机&#x1fa82;&#x1fa82;1.2.RDB原理&#x1fa82;&#x1fa82;1.3.小结 ⛄2.AOF持久化&#x1fa82;&#x1fa82;2.1.AOF原理&#x1fa82;&#x1fa82;2.2.AOF配置&#x1fa82;&#x1fa82;2.3.AOF文件…

springcloudgateway集成knife4j

上篇我们聊聊springboot是怎么继承knife4j的。springboot3 集成knife4j-CSDN博客 本次我们一起学习springcloudgateway集成knife4j。 环境介绍 java&#xff1a;17 SpringBoot&#xff1a;3.2.0 SpringCloud&#xff1a;2023.0.0 knife4j &#xff1a; 4.4.0 引入maven配置…

【JavaScriptThreejs】判断路径在二维平面上投影的方向顺逆时针

原理分析 可以将路径每个连续的两点向量叉乘相加&#xff0c;根据正负性判断路径顺逆时针性 当我们计算两个向量的叉积时&#xff0c;结果是一个新的向量&#xff0c;其方向垂直于这两个向量所在的平面&#xff0c;并且其大小与这两个向量构成的平行四边形的面积成正比。这个新…

MyBatisCodeHelperPro插件免激活安装

1、下载 MyBatisCodeHelperPro 插件包&#xff08;内部已经激活&#xff09; 链接: https://pan.baidu.com/s/1i2Nvlnaea92U1Jx5E8xJUA 提取码: jmms 2、安装&#xff0c;点开插件&#xff0c;选择本地安装&#xff0c;选择下载的MyBatisCodeHelper-Pro.zip即可完成安装。

面试十七、list和deque

一、 Deque Deque容器是连续的空间&#xff0c;至少逻辑上看来如此&#xff0c;连续现行空间总是令我们联想到array和vector,array无法成长&#xff0c;vector虽可成长&#xff0c;却只能向尾端成长&#xff0c;而且其成长其实是一个假象&#xff0c;事实上(1) 申请更大空间 (…

数据结构-循环队列和循环双端队列的多角度实现

文章目录 1. 循环队列的数组形式实现2. 循环队列的链表实现3. 循环双端队列的数组形式实现4. 循环双端队列的链表实现 在力扣的题面如下 1. 循环队列的数组形式实现 其实循环队列的数组形式只有下面要注意的点,只要掌握了下面的这几点,代码层面上就没有什么问题了 用数组模拟的…

图论基础知识 深度搜索(DFS,Depth First Search),广度搜索(BFS,Breathe First Search)

图论基础知识 学习记录自代码随想录 dfs 与 bfs 区别 dfs是沿着一个方向去搜&#xff0c;不到黄河不回头&#xff0c;直到搜不下去了&#xff0c;再换方向&#xff08;换方向的过程就涉及到了回溯&#xff09;。 bfs是先把本节点所连接的所有节点遍历一遍&#xff0c;走到下…

windows SDK编程 --- 消息(3)

前置知识 一、消息的分类 1. 鼠标消息 处理与鼠标交互相关的事件&#xff0c;比如移动、点击和滚动等。例如&#xff1a; WM_MOUSEMOVE: 当鼠标在窗口客户区内移动时发送。WM_LBUTTONDOWN: 当用户按下鼠标左键时发送。WM_LBUTTONUP: 当用户释放鼠标左键时发送。WM_RBUTTOND…

C语言--基础面试真题

1、局部变量和静态变量的区别 普通局部变量和静态局部变量区别 存储位置&#xff1a; 普通局部变量存储在栈上 静态局部变量存储在静态存储区 生命周期&#xff1a; 当函数执行完毕时&#xff0c;普通局部变量会被销毁 静态局部变量的生命周期则是整个程序运行期间&#…

Oracle 21 C 安装详细操作手册,并配置客户端连接

Oracle 21 C 安装详细操作手册 Win 11 Oracle 21C 下载&#xff1a; Database Software Downloads | Oracle 中国 云盘共享 链接&#xff1a;https://pan.baidu.com/s/12XCilnFYyLFnSVoU_ShaSA 提取码&#xff1a;nfwc Oracle 21C 配置与登陆&#xff1a; 开始菜单 NetMa…