go语言并发实战——日志收集系统(九) 基于etcd的代码重构思考与初步实现

前言

在之前我们j基于sarama,tail还有go-ini实现了日志收集系统客户端的编写,但是我们梳理一下可以发现,该客户端还存在一些问题:

  • 客户端一次只能读取一个日志文件,无法同时读取多个分区
  • 无法管理日志存放的分区(topic)
    那我们一个如何去解决这个问题呢?在前两篇文章中我们介绍了etcd,它通过可以存储键值对并且通过watch操作来实现对键值对的实时监控,那我们能不能尝试用`etcd``来储存日志文件信息与对应分区信息?这就是我们今天这篇文章所探究的主题.

备注: etcd博文地址:
go语言并发实战——日志收集系统(七) etcd的介绍与简单使用
go语言并发实战——日志收集系统(八) go语言操作etcd以及利用watch实现对键值的监控

初步实现的流程

存储数据格式

这里为了存储数据方便,我们利用json格式来存储数据,示例如下:

[{"path": "G:/goproject/-goroutine-/log-agent/log/log1","topic": "web.log"},{"path": "G:/goproject/-goroutine-/log-agent/log/log2","topic": "s4.log"}
]

etcd初始化的编写

在之前有段etcd的博文中,我们已经介绍过etcd的基本使用,这里不做赘述,首先我们在log-agent文件夹下创建etcd文件夹,创建etcd.go文件,编写etcd的初始化:

func Init(address []string) (err error) {client, err = clientv3.New(clientv3.Config{Endpoints:   address,DialTimeout: 5 * time.Second,})if err != nil {logrus.Error("etcd client connect failed,err:%v", err)return}return
}

然后 在main.go文件中调用:

err = etcd.Init(ConfigObj.Etcdaddress.Addr)if err != nil {logrus.Error("InitEtcd failed, err:%v", err)return}logrus.Infof("InitEtcd success")

通过etcd拉取要收集文件的配置项

在初始化etcd后,我们就要通过etcd来拉取要收集文件的配置项了,首先定义一个结构体来接收信息:

type collectEntry struct {Path  string `json:"path"`Topic string `json:"topic"`
}

然后创建拉取配置项的函数:

func GetConf(key string) (err error, collectEntryList []collectEntry) {ctx, cancel := context.WithTimeout(context.Background(), time.Second)defer cancel()response, err := client.Get(ctx, key)if err != nil {logrus.Error("get conf from etcd failed,err:%v", err)return}if len(response.Kvs) == 0 {logrus.Warningf("get len:0 conf from etcd failed,err:%v", err)return}fmt.Println(response.Kvs[0].Value)                             //此时还是json字符串err = json.Unmarshal(response.Kvs[0].Value, &collectEntryList) //把值反序列化到collectEntryListif err != nil {logrus.Error("json unmarshal failed,err:%v", err)return}return
}

然后在main.go里面调用一下就可以了:

	//拉取要收集日志文件的配置项err, collectEntryList := etcd.GetConf(ConfigObj.Etcdaddress.Key)if err != nil {logrus.Error("GetConf failed, err:%v", err)return}fmt.Println(collectEntryList)

尝试用之前的demo设置一下配置文件中的key对应的value:

package mainimport ("context""fmt"clientv3 "go.etcd.io/etcd/client/v3""time"
)func main() {cli, err := clientv3.New(clientv3.Config{Endpoints:   []string{"127.0.0.1:2379"}, //服务端通信端口DialTimeout: 5 * time.Second,            //连接超时时间})//putctx, cancel := context.WithTimeout(context.Background(), time.Second)str := "[{\"path\":\"G:/goproject/-goroutine-/log-agent/log/log1\",\"topic\":\"web.log\"},{\"path\":\"G:/goproject/-goroutine-/log-agent/log/log2\",\"topic\":\"s4.log\"}]"_, err = cli.Put(ctx, "collect_log_conf", str)cancel()if err != nil {fmt.Println("put failed,err:%v", err)return}}

运行就可以看到我们接收到了配置项了:
在这里插入图片描述

涉及改动处的源代码

  • 配置文件
[kafka]
address=127.0.0.1:9092
topic=test1.log
chan_size=100000[etcd]
address=127.0.0.1:2379
collect_key=collect_log_conf[collect]
logfile_path:G:\goproject\-goroutine-\log-agent\log\log1
  • main.go
package mainimport ("fmt""github.com/Shopify/sarama""github.com/Shopify/toxiproxy/Godeps/_workspace/src/github.com/Sirupsen/logrus""github.com/go-ini/ini""log-agent/Kafka""log-agent/etcd""log-agent/tailFile""strings""time"
)type Config struct {Kafakaddress Kafkaddress `ini:"kafka"`LogFilePath  LogFilePath `ini:"collect"`Etcdaddress  EtcdAddress `ini:"etcd"`
}type Kafkaddress struct {Addr        []string `ini:"address"`Topic       string   `ini:"topic"`MessageSize int64    `ini:"chan_size"`
}type LogFilePath struct {Path string `ini:"logfile_path"`
}type EtcdAddress struct {Addr []string `ini:"address"`Key  string   `ini:"collect_key"`
}func run(config *Config) (err error) {for {line, ok := <-tailFile.TailObj.Linesif !ok {logrus.Error("read from tail failed,err:", err)time.Sleep(2 * time.Second)continue}if len(strings.Trim(line.Text, "\r")) == 0 {continue}msg := &sarama.ProducerMessage{}msg.Topic = config.Kafakaddress.Topicmsg.Value = sarama.StringEncoder(line.Text)Kafka.MesChan(msg)}
}func main() {//读取配置文件,获取配置信息filename := "G:\\goproject\\-goroutine-\\log-agent\\conf\\config.ini"ConfigObj := new(Config)err := ini.MapTo(ConfigObj, filename)if err != nil {logrus.Error("%s Load failed,err:", filename, err)}//初始化Kafkaerr = Kafka.InitKafka(ConfigObj.Kafakaddress.Addr, ConfigObj.Kafakaddress.MessageSize)if err != nil {logrus.Error("InitKafka failed, err:%v", err)return}logrus.Infof("InitKafka success")//初始化etcderr = etcd.Init(ConfigObj.Etcdaddress.Addr)if err != nil {logrus.Error("InitEtcd failed, err:%v", err)return}logrus.Infof("InitEtcd success")//拉取要收集日志文件的配置项err, collectEntryList := etcd.GetConf(ConfigObj.Etcdaddress.Key)if err != nil {logrus.Error("GetConf failed, err:%v", err)return}fmt.Println(collectEntryList)//初始化tailerr = tailFile.InitTail(ConfigObj.LogFilePath.Path)if err != nil {logrus.Error("InitTail failed, err:%v", err)return}logrus.Infof("InitTail success")//利用sarama报发送消息到Kafka中err = run(ConfigObj)if err != nil {logrus.Error("run failed, err:%v", err)return}
}
  • etcd.go
package etcdimport ("encoding/json""fmt""github.com/Shopify/toxiproxy/Godeps/_workspace/src/github.com/Sirupsen/logrus"clientv3 "go.etcd.io/etcd/client/v3""golang.org/x/net/context""time"
)var client *clientv3.Clienttype collectEntry struct {Path  string `json:"path"`Topic string `json:"topic"`
}func Init(address []string) (err error) {client, err = clientv3.New(clientv3.Config{Endpoints:   address,DialTimeout: 5 * time.Second,})if err != nil {logrus.Error("etcd client connect failed,err:%v", err)return}return
}func GetConf(key string) (err error, collectEntryList []collectEntry) {ctx, cancel := context.WithTimeout(context.Background(), time.Second)response, err := client.Get(ctx, key)cancel()if err != nil {logrus.Error("get conf from etcd failed,err:%v", err)return}if len(response.Kvs) == 0 {logrus.Warningf("get len:0 conf from etcd failed,err:%v", err)return}fmt.Println(response.Kvs[0].Value)                             //此时还是json字符串err = json.Unmarshal(response.Kvs[0].Value, &collectEntryList) //把值反序列化到collectEntryListif err != nil {logrus.Error("json unmarshal failed,err:%v", err)return}return
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/650812.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于DEAP数据集的四种机器学习方法的情绪分类

在机器学习领域&#xff0c;KNN&#xff08;K-Nearest Neighbors&#xff09;、SVM&#xff08;Support Vector Machine&#xff09;、决策树&#xff08;Decision Tree&#xff09;和随机森林&#xff08;Random Forest&#xff09;是常见且广泛应用的算法。 介绍 1. KNN&am…

1-内核开发环境ubuntu+virtualbox+mobaXterm搭建

内核开发环境 ubuntuvirtualboxmobaXterm搭建 目录 内核开发环境 ubuntuvirtualboxmobaXterm搭建 1.virtualbox 安装 2.ubuntu 安装 3.网络设置 4.虚拟机安装ssh 服务&#xff0c;更新ubuntu 源安装基本软件 5.mobaXterm 个人免费版本安装 6.总结 本课程教程从0-1开始教…

ASP.NET某企业信息管理系统的设计与实现

摘 要 信息管理系统就是我们常说的MIS(Management Information System),它是一个计算机软硬件资源以及数据库的人-机系统。经过对题目和内容的分析,选用了Microsoft公司的ASP.NET开发工具,由于它提供了用于从数据库中访问数据的强大工具集,使用它可以建立开发比较完善的数据库…

等保测评有那些流程?为什么要做等保

根据《网络安全法》规定&#xff0c;网络运营者应当按照国家的网络安全技术标准和要求&#xff0c;采取技术措施保障网络安全&#xff0c;避免网络安全事件的发生。而等保测评是国家对企事业单位进行信息系统安全等级评定的一项重要制度&#xff0c;通过等级测评&#xff0c;可…

Linux下怎么快速部署MySQL服务,并使用

下载镜像 [zrylocalhost ~]$ docker pull mysql Using default tag: latest latest: Pulling from library/mysql bce031bc522d: Pull complete cf7e9f463619: Pull complete 105f403783c7: Pull complete 878e53a613d8: Pull complete 2a362044e79f: Pull complete 6e4d…

C语言进阶:进阶指针(下)

一、 函数指针数组 我们都知道 数组是一个存放相同类型数据的存储空间 那我们已经学习了指针数组 那么函数有没有对应的指针数组呢&#xff1f; 如果有那应该怎么定义呢&#xff1f; 1. 函数指针数组的定义 我们说 函数指针数组的定义 应该遵循以下格式 int (*p[10])(); 首…

thinkphp5.0.23漏洞复现以及脚本编写

1 thinkphp5.0.23远程代码执行漏洞简介 ThinkPHP5.0.23漏洞主要涉及远程代码执行(RCE)的安全隐患。这一漏洞的存在是因为ThinkPHP框架在底层对控制器名的过滤不够严格,导致攻击者有可能通过特定的URL构造,调用到框架内部的敏感函数,进而执行任意命令。 2 thinphp5.0.23漏…

安装或者更新VMware提示:

the file is not avalid installation package for the product Microsoft Visual C 2015 x86 Minimum Runtime Try to find the installation pakcage vc_runtimeMinimum_x86.msi in a folder from which you can install Microsoft Visual C 2015 x86 Minimum Runtime 解决方…

Baidu comate智能编程助手评测

Baidu comate智能编程助手评测 作者&#xff1a;知孤云出岫 目录 一&#xff0e; 关于comate产品 二&#xff0e; 关于comate产品体验 三&#xff0e; 关于实际案例. 四&#xff0e; 关于baidu comate编程助手的实测体验感悟 五&#xff0e; …

与健康码一起落寞的,还有不被投资者待见的圣湘生物们

&#xff08;题图&#xff09; 文&#xff5c;新熔财经 作者&#xff5c;宏一 都2024年了&#xff0c;没想到还有用到健康码的一天。 前段时间&#xff0c;我咳嗽比较严重&#xff0c;去当地医院挂号做了个肺部CT&#xff0c;结果医生居然让我出示健康码&#xff0c;让我感…

力扣HOT100 - 994. 腐烂的橘子

解题思路&#xff1a; 因为要记录轮数&#xff08;分钟数&#xff09;&#xff0c;所以不能一口气遍历到底&#xff0c;所以不能用深搜&#xff08;bfs&#xff09;&#xff0c;而要用广搜&#xff08;bfs&#xff0c;层序遍历&#xff09;。 先记录下新鲜橘子数&#xff0c;…

2024年618有哪些数码家电值得入手?全网最省钱攻略指南

作为全年唯一设在夏季的大型电商狂欢节&#xff0c;618一直是很多人购置数码类、家电类的最好时间节点之一。但是问题来了&#xff0c;现在的数码家电行业“鱼龙混杂”&#xff0c;不仅越来越多新品牌涌入市场&#xff0c;而且各个大品牌为了抢占市场&#xff0c;旗下产品的品类…