Go实现LogCollect:海量日志收集系统【下篇——开发LogTransfer】

Go实现LogAgent:海量日志收集系统【下篇】

0 前置文章

Go实现LogAgent:海量日志收集系统【上篇——LogAgent实现】

前面的章节我们已经完成了日志收集(LogAgent),接下来我们需要将日志写入到kafka中,然后将数据落地到Elasticsearch中。

项目架构图:
在这里插入图片描述
项目逻辑图:
在这里插入图片描述

1 docker搭建Elasticsearcsh、Kibana

如果没有docker环境的,可以在本机安装docker desktop

# 1 创建一个docker网络
docker network create es-net
# 查看本机网络
docker network ls
# 删除一个网络
docker network rm es-net# 2 拉取es、kibana镜像
docker pull elasticsearch:7.17.4
docker pull kibana:7.17.4# 3 创建es容器并挂在数据卷
mkdir -p /Users/xxx/docker-home/es-data/_data
mkdir -p /Users/xxx/docker-home/es-plugins
mkdir -p /Users/xxx/docker-home/es-config
mkdir -p /Users/xxx/docker-home/kibana-configtouch elasticsearch.yml
touch kibana.yml

1.需要保证要挂载的目录有读写权限,包括要挂载的配置文件。如果没有则用chmod 777命令
2.如果要挂载配置文件,则需要提前把配置文件内容写好,不能为空,否则可能会影响es和kibana运行。
3.如果只挂载到配置文件目录,不准备配置文件,会导致创建容器后没有配置文件。报错

elasticsearch.yml:

cluster.name: "docker-cluster"
network.host: 0.0.0.0

kibana.yml:

server.host: "0.0.0.0"
server.shutdownTimeout: "5s"
elasticsearch.hosts: [ "http://elasticsearch:9200" ]
monitoring.ui.container.elasticsearch.enabled: true

启动es:

docker run -d \--name es7.17.4 -p 9200:9200 -p 9300:9300 \-e "discovery.type=single-node" -e ES_JAVA_OPTS="-Xms64m -Xmx128m" \-v /Users/xxx/docker-home/es-data/_data:/usr/share/elasticsearch/data \-v  /Users/xxx/docker-home/es-plugins:/usr/share/elasticsearch/plugins \-v  /Users/xxx/docker-home/es-config/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml \--privileged \--network es-net \elasticsearch:7.17.4

启动Kibana:

docker run -d \
--name kibana17 \
--network=es-net \
-p 5601:5601 \
-e ELASTICSEARCH_HOSTS=http://es7.17.4:9200 \ 
kibana:7.17.4

-e ELASTICSEARCH_HOSTS=http://es7.17.4:9200 \ 其中,es7.17.4的名称为上面es容器的名称

结果:
在这里插入图片描述

2 golang操作es

执行下面代码在es中添加索引,然后到kibana页面创建索引

package mainimport ("context""fmt""github.com/olivere/elastic/v7"
)type Tweet struct {User    stringMessage string
}func main() {client, err := elastic.NewClient(elastic.SetSniff(false), elastic.SetURL("http://localhost:9200/"))if err != nil {fmt.Println("connect es error", err)return}fmt.Println("conn es succ")tweet := Tweet{User: "haohan", Message: "This is a test"}_, err = client.Index().Index("twitter").Id("1").BodyJson(tweet).Do(context.Background())if err != nil {// Handle errorpanic(err)return}fmt.Println("insert succ")
}
# 执行上面的go代码执行,控制台输出如下表明插入成功
conn es succ
insert succ

然后我们手动到kibana中添加对应的index即可搜索出对应数据

在这里插入图片描述
在这里插入图片描述

3 开发LogTransfer:从kafka中读取数据并写入es

在前面的开发中,我们已经将日志写入到了kafka。接下来我们要做的就是从kafka中消费数据,然后写入到es中。LogTransfer做的就是这个工作。

3.1 项目结构

├─config
│      logTransfer.conf
│
├─es
│      elasticsearch.go
│   
├─logs
│      my.log
│
└─mainkafka.goconfig.golog.gomain.go

在这里插入图片描述

3.2 项目代码

①LogTransfer/main/main.go

package mainimport ("github.com/astaxie/beego/logs"
)func main() {// 初始化配置err := InitConfig("ini", "/Users/xxx/GolandProjects/LogCollect/LogTransfer/config/log_transfer.conf")if err != nil {panic(err)return}logs.Debug("初始化配置成功")//初始化日志模块err = initLogger(logConfig.LogPath, logConfig.LogLevel)if err != nil {panic(err)return}logs.Debug("初始化日志模块成功")// 初始化Kafkaerr = InitKafka(logConfig.KafkaAddr, logConfig.KafkaTopic)if err != nil {logs.Error("初始化Kafka失败, err:", err)return}logs.Debug("初始化Kafka成功")
}

②LogTransfer/main/log.go

package mainimport ("encoding/json""fmt""github.com/astaxie/beego/logs"
)func convertLogLevel(level string) int {switch level {case "debug":return logs.LevelDebugcase "warn":return logs.LevelWarncase "info":return logs.LevelInfocase "trace":return logs.LevelTrace}return logs.LevelDebug
}func initLogger(logPath string, logLevel string) (err error) {config := make(map[string]interface{})config["filename"] = logPathconfig["level"] = convertLogLevel(logLevel)configStr, err := json.Marshal(config)if err != nil {fmt.Println("初始化日志, 序列化失败:", err)return}_ = logs.SetLogger(logs.AdapterFile, string(configStr))return
}

③LogTransfer/main/kafka.go

package mainimport ("github.com/IBM/sarama""github.com/astaxie/beego/logs""strings"
)type KafkaClient struct {client sarama.Consumeraddr   stringtopic  string
}var (kafkaClient *KafkaClient
)func InitKafka(addr string, topic string) (err error) {kafkaClient = &KafkaClient{}consumer, err := sarama.NewConsumer(strings.Split(addr, ","), nil)if err != nil {logs.Error("启动Kafka消费者错误: %s", err)return nil}kafkaClient.client = consumerkafkaClient.addr = addrkafkaClient.topic = topicreturn
}

④LogTransfer/main/config.go

package mainimport ("fmt""github.com/astaxie/beego/config"
)type LogConfig struct {KafkaAddr  stringKafkaTopic stringEsAddr     stringLogPath    stringLogLevel   string
}var (logConfig *LogConfig
)func InitConfig(confType string, filename string) (err error) {conf, err := config.NewConfig(confType, filename)if err != nil {fmt.Printf("初始化配置文件出错:%v\n", err)return}// 导入配置信息logConfig = &LogConfig{}// 日志级别logConfig.LogLevel = conf.String("logs::log_level")if len(logConfig.LogLevel) == 0 {logConfig.LogLevel = "debug"}// 日志输出路径logConfig.LogPath = conf.String("logs::log_path")if len(logConfig.LogPath) == 0 {logConfig.LogPath = "/Users/xxx/GolandProjects/LogCollect/LogTransfer/logs/log_transfer.log"}// KafkalogConfig.KafkaAddr = conf.String("kafka::server_addr")if len(logConfig.KafkaAddr) == 0 {err = fmt.Errorf("初识化Kafka addr失败")return}logConfig.KafkaTopic = conf.String("kafka::topic")if len(logConfig.KafkaAddr) == 0 {err = fmt.Errorf("初识化Kafka topic失败")return}// EslogConfig.EsAddr = conf.String("elasticsearch::addr")if len(logConfig.EsAddr) == 0 {err = fmt.Errorf("初识化Es addr失败")return}return
}

④LogTransfer/config/log_transfer.conf

[logs]
log_level = debug
log_path = "/Users/xxx/GolandProjects/LogCollect/LogTransfer/logs/log_transfer.log"[kafka]
server_addr = localhost:9092
topic = nginx_log[elasticsearch]
addr = http://localhost:9200/

⑤LogTransfer/es/es.go

package mainimport ("context""fmt""github.com/olivere/elastic/v7"
)type Tweet struct {User    stringMessage string
}func main() {client, err := elastic.NewClient(elastic.SetSniff(false), elastic.SetURL("http://localhost:9200/"))if err != nil {fmt.Println("connect es error", err)return}fmt.Println("conn es succ")tweet := Tweet{User: "haohan", Message: "This is a test"}_, err = client.Index().Index("twitter").Id("1").BodyJson(tweet).Do(context.Background())if err != nil {// Handle errorpanic(err)return}fmt.Println("insert succ")
}

结果

LogTransfer的运行日志在LogTransfer/logs/log_transfer.log中

logs/log_transfer.log:

2023/09/02 19:55:29.037 [D]  初始化日志模块成功
2023/09/02 19:55:29.074 [D]  初始化Kafka成功

在这里插入图片描述

4 完成LogTransfer:将日志入库到es并通过kibana展示

前面我们将LogTransfer的配置初始化成功了,下面我们将从Kafka中消费数据,然后将日志入库到es,最后通过kibana展示。

在这里插入图片描述

4.1 将日志保存到es

在LogTransfer/main/main.go中添加初始化InitEs函数

①main.go中添加InitEs函数

LogTransfer/main/main.go:

package mainimport ("github.com/astaxie/beego/logs""logtransfer.com/es"
)func main() {// 初始化配置err := InitConfig("ini", "/Users/xxx/GolandProjects/LogCollect/LogTransfer/config/log_transfer.conf")if err != nil {panic(err)return}logs.Debug("初始化配置成功")//初始化日志模块err = initLogger(logConfig.LogPath, logConfig.LogLevel)if err != nil {panic(err)return}logs.Debug("初始化日志模块成功")// 初始化Kafkaerr = InitKafka(logConfig.KafkaAddr, logConfig.KafkaTopic)if err != nil {logs.Error("初始化Kafka失败, err:", err)return}logs.Debug("初始化Kafka成功")// 初始化Eserr = es.InitEs(logConfig.EsAddr)if err != nil {logs.Error("初始化Elasticsearch失败, err:", err)return}logs.Debug("初始化Es成功")}

运行LogTransfer下的main.go可以发现log_transfer.log中输出的日志信息
在这里插入图片描述

②LogTransfer/es/es.go

package esimport ("fmt""github.com/olivere/elastic/v7"
)type Tweet struct {User    stringMessage string
}var (esClient *elastic.Client
)func InitEs(addr string) (err error) {client, err := elastic.NewClient(elastic.SetSniff(false), elastic.SetURL(addr))if err != nil {fmt.Println("connect es error", err)return nil}esClient = clientreturn
}

运行LogTransfer/main下的main函数

  • 可以从logs/log_transfer.log中看到打印初始化es、kafka等成功

③添加run.go:消费kafka中的数据

在main函数中添加run函数, 用于运行kafka消费数据到Es

package mainimport ("github.com/Shopify/sarama""github.com/astaxie/beego/logs"
)func run() (err error) {partitionList, err := kafkaClient.Client.Partitions(kafkaClient.Topic)if err != nil {logs.Error("Failed to get the list of partitions: ", err)return}for partition := range partitionList {pc, errRet := kafkaClient.Client.ConsumePartition(kafkaClient.Topic, int32(partition), sarama.OffsetNewest)if errRet != nil {err = errRetlogs.Error("Failed to start consumer for partition %d: %s\n", partition, err)return}defer pc.AsyncClose()kafkaClient.wg.Add(1)go func(pc sarama.PartitionConsumer) {for msg := range pc.Messages() {logs.Debug("Partition:%d, Offset:%d, Key:%s, Value:%s", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))err = es.SendToES(kafkaClient.topic, msg.Value)if err != nil {logs.Warn("send to es failed, err:%v", err)}}kafkaClient.wg.Done()}(pc)}kafkaClient.wg.Wait()return
}

④main.go中添加SendToES函数

package mainimport ("github.com/astaxie/beego/logs""logtransfer.com/es"
)func main() {// 初始化配置err := InitConfig("ini", "/Users/xxx/GolandProjects/LogCollect/LogTransfer/config/log_transfer.conf")if err != nil {panic(err)return}logs.Debug("初始化配置成功")//初始化日志模块err = initLogger(logConfig.LogPath, logConfig.LogLevel)if err != nil {panic(err)return}logs.Debug("初始化日志模块成功")// 初始化Kafkaerr = InitKafka(logConfig.KafkaAddr, logConfig.KafkaTopic)if err != nil {logs.Error("初始化Kafka失败, err:", err)return}logs.Debug("初始化Kafka成功")// 初始化Eserr = es.InitEs(logConfig.EsAddr)if err != nil {logs.Error("初始化Elasticsearch失败, err:", err)return}logs.Debug("初始化Es成功")// 运行err = run()if err != nil {logs.Error("运行错误, err:", err)return}select {}
}

5 联调

5.1 运行LogAgent:采集数据并存储到kafka

# 用于向docker中的etcd写入对应key
docker exec etcd1 etcdctl put /backend/logagent/config/192.168.0.103 "[{\"logpath\":\"/Users/xxx/GolandProjects/LogCollect/LogAgent/mysql_log.log\",\"topic\":\"mysql_log\"},{\"logpath\":\"/Users/xxx/GolandProjects/LogCollect/LogAgent/nginx_log.log\",\"topic\":\"nginx_log\"}]"

通过上面的命令,用于向etcd中写入对应key,etcd的watcher监视到后会对应更新配置

在这里插入图片描述

查看LogAgent的运行日志:
在这里插入图片描述

5.2 运行LogTransfer:消费kafka数据并存到es

选中LogTransfer下main文件夹下的所有go文件,鼠标右击运行,查看控制台输出

在这里插入图片描述
查看LogTransfer的运行日志:
在这里插入图片描述

5.3 在kibana创建index并查看

Management - Stack Management - Kibana - Index Patterns ,根据kafka中的topic创建对应的索引。以nginx_log为例:

在这里插入图片描述
回到overview,根据nginx_log这个index搜索信息:
在这里插入图片描述

可以看到成功读取到日志信息,至此该项目已开发完成

参考文章:https://blog.csdn.net/qq_43442524/article/details/105072952

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/95216.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【C++入门】命名空间、缺省参数、函数重载、引用、内联函数

​👻内容专栏: C/C编程 🐨本文概括: C入门学习必备语法 🐼本文作者: 阿四啊 🐸发布时间:2023.9.3 前言 C是在C的基础之上,容纳进去了面向对象编程思想,并增加…

【C++技能树】继承概念与解析

Halo,这里是Ppeua。平时主要更新C,数据结构算法,Linux与ROS…感兴趣就关注我bua! 继承 0. 继承概念0.1 继承访问限定符 1. 基类和派生类对象赋值兼容转换2. 继承中的作用域3. 派生类中的默认成员函数4.友元5.继承中的静态成员6.菱…

如何使用Puppeteer进行新闻网站数据抓取和聚合

导语 Puppeteer是一个基于Node.js的库,它提供了一个高级的API来控制Chrome或Chromium浏览器。通过Puppeteer,我们可以实现各种自动化任务,如网页截图、PDF生成、表单填写、网络监控等。本文将介绍如何使用Puppeteer进行新闻网站数据抓取和聚…

Linux挖矿程序清除

1. 找到挖矿进程 2.找到病毒的文件地址 ls -l /proc/进程ID/exe3.删除文件命令 rm -rf 文件地址4.杀死挖矿进程 kill -9 进程ID

Python入门学习——Day2-控制流程

一、Python 控制流程 什么是控制流程: 在Python中,控制流程指的是根据不同的条件或规则来控制程序的执行顺序和逻辑。Python提供了多种控制流程的语句和结构,可以根据条件进行分支判断和循环迭代。 1.1 条件语句(if-elif-else&…

Kafka知识点总结

常见名词 生产者和消费者 同一个消费组下的消费者订阅同一个topic时,只能有一个消费者收到消息 要想让订阅同一个topic的消费者都能收到信息,需将它们放到不同的组中 分区机制 启动方法 生成者和消费者监听客户端

Docker consul 容器服务自动发现和更新

目录 一、什么是服务注册与发现 二、Docker-consul集群 1.Docker-consul consul提供的一些关键特性 2.registrator 3.Consul-template 三、Docker-consul实现过程 以配置nginx负载均衡为例 先配置consul-agent ,有两种模式server和client 四、Docker-cons…

自然语言处理(五):子词嵌入(fastText模型)

子词嵌入 在英语中,“helps”“helped”和“helping”等单词都是同一个词“help”的变形形式。“dog”和“dogs”之间的关系与“cat”和“cats”之间的关系相同,“boy”和“boyfriend”之间的关系与“girl”和“girlfriend”之间的关系相同。在法语和西…

WebRTC 安全之一

WebRTC 的安全需要满足三个基本需求 Authentication 用户访问需要认证Authorization 用户访问需要授权Audit 用户的访问应该可被追踪和审查 其中前两项也可以归结为 CIA Confidentiality 机密性:信息需要保密, 访问权限也需要控制Integrity 完整性&#…

解决:在宝塔站点上添加域名(8080,888等端口)显示“端口范围不合法“

在宝塔上给站点添加域名访问时,有时候需要部署站点的端口为8080或者888端口。但是添加之后显示: 解决方法 点击宝塔上的文件 切换到根目录搜索 public.py 包含子目录 选择这个: 修改其中的checkport函数: 最后,重启面…

数据库备份

数据库备份,数据库为school,素材如下 1.创建student和score表 目录 数据库备份,数据库为school,素材如下 1.创建student和score表 2.为student表和score表增加记录 3.备份数据库school到/backup目录 4.备份MySQL数据库为带…

html5——前端笔记

html 一、html51.1、理解html结构1.2、h1 - h6 (标题标签)1.3、p (段落和换行标签)1.4、br 换行标签1.5、文本格式化1.6、div 和 span 标签1.7、img 图像标签1.8、a 超链接标签1.9、table表格标签1.9.1、表格标签1.9.2、表格结构标签1.9.3、合并单元格 1.10、列表1.10.1、ul无序…