go读写文件总结

别人的经验:

如今任何计算机系统每天都会产生大量的日志或数据。随着系统的增长,将调试数据存储到数据库中是不可行的,因为它们是不可变的,主要用于分析和解决故障的目的。因此,企业倾向于将其存储在文件中,并保存在本地磁盘中。

我们将使用Golang从大小为16 GB的.txt或.log文件中提取日志,该文件有数百万行。

直接上代码,首先打开文件,将使用标准Go os.File来读取文件IO:

f, err := os.Open(fileName)if err != nil {fmt.Println("cannot able to read the file", err)return}
// UPDATE: close after checking error
defer file.Close()  //Do not forget to close the file

一旦打开文件后,我们有以下两个选择进行下一步处理。
1、逐行读取文件,这有助于减少系统内存压力,但会在IO中花费更多时间。
2、一次性将整个文件都读入内存并处理该文件,这会消耗很大内存,但会节约时间。
由于文件太大例如16GB,无法将整个文件加载到内存中。但是第一个选择也不可行,因为我们希望几秒钟内处理该文件。

但是你猜怎么着,还有第三种选择。不是加载整个文件到内存,而是使用bufio.NewReader()以块的形式加载文件。

r := bufio.NewReader(f)
for {
buf := make([]byte,4*1024) //the chunk size
n, err := r.Read(buf) //loading chunk into bufferbuf = buf[:n]
if n == 0 {if err != nil {fmt.Println(err)break}if err == io.EOF {break}return err}
}

一旦我们读取到文件块,我们将fork一个线程,也就是Go协程,来同时处理每个块。上面的代码将改为如下:

//sync pools to reuse the memory and decrease the preassure on //Garbage Collector
linesPool := sync.Pool{New: func() interface{} {lines := make([]byte, 500*1024)return lines
}}
stringPool := sync.Pool{New: func() interface{} {lines := ""return lines
}}
slicePool := sync.Pool{New: func() interface{} {lines := make([]string, 100)return lines
}}
r := bufio.NewReader(f)
var wg sync.WaitGroup //wait group to keep track off all threads
for {buf := linesPool.Get().([]byte)n, err := r.Read(buf)buf = buf[:n]
if n == 0 {if err != nil {fmt.Println(err)break}if err == io.EOF {break}return err}
nextUntillNewline, err := r.ReadBytes('\n')//read entire lineif err != io.EOF {buf = append(buf, nextUntillNewline...)}wg.Add(1)go func() { //process each chunk concurrently//start -> log start time, end -> log end timeProcessChunk(buf, &linesPool, &stringPool, &slicePool,     start, end)
wg.Done()}()
}
wg.Wait()
}

上面的代码引入了两个新的优化:
1、sync.Pool是一个强大的实例池,可以重用实例来减少垃圾收集器的压力。我们将重用分配的内存片。这将减少内存消耗,使代码优化更高效。
2、Go协程并发处理缓冲区块,大大提高了处理速度。
下面来实现ProcessChunk函数,处理如下格式的日志信息:

2020-01-31T20:12:38.1234Z, Some Field, Other Field, And so on, Till new line,...\n

我们将根据命令行提供的时间戳提取日志:

func ProcessChunk(chunk []byte, linesPool *sync.Pool, stringPool *sync.Pool, slicePool *sync.Pool, start time.Time, end time.Time) {
//another wait group to process every chunk further                             var wg2 sync.WaitGroup
logs := stringPool.Get().(string)
logs = string(chunk)
linesPool.Put(chunk) //put back the chunk in pool
//split the string by "\n", so that we have slice of logslogsSlice := strings.Split(logs, "\n")
stringPool.Put(logs) //put back the string pool
chunkSize := 100 //process the bunch of 100 logs in thread
n := len(logsSlice)
noOfThread := n / chunkSize
if n%chunkSize != 0 { //check for overflow noOfThread++}
length := len(logsSlice)
//traverse the chunkfor i := 0; i < length; i += chunkSize {wg2.Add(1)
//process each chunk in saperate chunkgo func(s int, e int) {for i:= s; i<e;i++{text := logsSlice[i]
if len(text) == 0 {continue}logParts := strings.SplitN(text, ",", 2)logCreationTimeString := logParts[0]logCreationTime, err := time.Parse("2006-01-  02T15:04:05.0000Z", logCreationTimeString)
if err != nil {fmt.Printf("\n Could not able to parse the time :%s       for log : %v", logCreationTimeString, text)return}
// check if log's timestamp is inbetween our desired periodif logCreationTime.After(start) && logCreationTime.Before(end) {fmt.Println(text)}}textSlice = nilwg2.Done()}(i*chunkSize, int(math.Min(float64((i+1)*chunkSize), float64(len(logsSlice)))))//passing the indexes for processing
}  wg2.Wait() //wait for a chunk to finishlogsSlice = nil
}

上面的代码使用16GB的日志文件进行基础测试,提取日志所花费的时间约25秒。
下面是完成代码:

func main() {s := time.Now()args := os.Args[1:]if len(args) != 6 { // for format  LogExtractor.exe -f "From Time" -t "To Time" -i "Log file directory location"fmt.Println("Please give proper command line arguments")return}startTimeArg := args[1]finishTimeArg := args[3]fileName := args[5]file, err := os.Open(fileName)if err != nil {fmt.Println("cannot able to read the file", err)return}defer file.Close() //close after checking errqueryStartTime, err := time.Parse("2006-01-02T15:04:05.0000Z", startTimeArg)if err != nil {fmt.Println("Could not able to parse the start time", startTimeArg)return}queryFinishTime, err := time.Parse("2006-01-02T15:04:05.0000Z", finishTimeArg)if err != nil {fmt.Println("Could not able to parse the finish time", finishTimeArg)return}filestat, err := file.Stat()if err != nil {fmt.Println("Could not able to get the file stat")return}fileSize := filestat.Size()offset := fileSize - 1lastLineSize := 0for {b := make([]byte, 1)n, err := file.ReadAt(b, offset)if err != nil {fmt.Println("Error reading file ", err)break}char := string(b[0])if char == "\n" {break}offset--lastLineSize += n}lastLine := make([]byte, lastLineSize)_, err = file.ReadAt(lastLine, offset+1)if err != nil {fmt.Println("Could not able to read last line with offset", offset, "and lastline size", lastLineSize)return}logSlice := strings.SplitN(string(lastLine), ",", 2)logCreationTimeString := logSlice[0]lastLogCreationTime, err := time.Parse("2006-01-02T15:04:05.0000Z", logCreationTimeString)if err != nil {fmt.Println("can not able to parse time : ", err)}if lastLogCreationTime.After(queryStartTime) && lastLogCreationTime.Before(queryFinishTime) {Process(file, queryStartTime, queryFinishTime)}fmt.Println("\nTime taken - ", time.Since(s))
}func Process(f *os.File, start time.Time, end time.Time) error {linesPool := sync.Pool{New: func() interface{} {lines := make([]byte, 250*1024)return lines}}stringPool := sync.Pool{New: func() interface{} {lines := ""return lines}}r := bufio.NewReader(f)var wg sync.WaitGroupfor {buf := linesPool.Get().([]byte)n, err := r.Read(buf)buf = buf[:n]if n == 0 {if err != nil {fmt.Println(err)break}if err == io.EOF {break}return err}nextUntillNewline, err := r.ReadBytes('\n')if err != io.EOF {buf = append(buf, nextUntillNewline...)}wg.Add(1)go func() {ProcessChunk(buf, &linesPool, &stringPool, start, end)wg.Done()}()}wg.Wait()return nil
}func ProcessChunk(chunk []byte, linesPool *sync.Pool, stringPool *sync.Pool, start time.Time, end time.Time) {var wg2 sync.WaitGrouplogs := stringPool.Get().(string)logs = string(chunk)linesPool.Put(chunk)logsSlice := strings.Split(logs, "\n")stringPool.Put(logs)chunkSize := 300n := len(logsSlice)noOfThread := n / chunkSizeif n%chunkSize != 0 {noOfThread++}for i := 0; i < (noOfThread); i++ {wg2.Add(1)go func(s int, e int) {defer wg2.Done() //to avaoid deadlocksfor i := s; i < e; i++ {text := logsSlice[i]if len(text) == 0 {continue}logSlice := strings.SplitN(text, ",", 2)logCreationTimeString := logSlice[0]logCreationTime, err := time.Parse("2006-01-02T15:04:05.0000Z", logCreationTimeString)if err != nil {fmt.Printf("\n Could not able to parse the time :%s for log : %v", logCreationTimeString, text)return}if logCreationTime.After(start) && logCreationTime.Before(end) {//fmt.Println(text)}}}(i*chunkSize, int(math.Min(float64((i+1)*chunkSize), float64(len(logsSlice)))))}wg2.Wait()logsSlice = nil
}

自己实践

server/server.go

package mainimport ("bufio""fmt""net""os""sync"//"time"
)func SendFile(con net.Conn,fileName string){file, err := os.Open(fileName)if err != nil {fmt.Println("cannot able to read the file", err)return}defer file.Close() filestat, err := file.Stat()if err != nil {fmt.Println("Could not able to get the file stat")return}fileSize := filestat.Size()bufPool := sync.Pool{New: func() interface{} {buf := make([]byte, 100*1024*1024) //100M per block to memreturn buf}}r := bufio.NewReader(file)var wg sync.WaitGroupblock := 0var sendSize int = 0for {buf := bufPool.Get().([]byte)n, err := r.Read(buf)buf = buf[:n]if err != nil {fmt.Println(err)break}// nextUntillNewline, err := r.ReadBytes('\n')// if err != io.EOF {//     buf = append(buf, nextUntillNewline...)// }wg.Add(1) //just test wg not multi goroutinego func() {defer wg.Done()n1, err := con.Write(buf)if err != nil{fmt.Println(err)}else if n1 != n{fmt.Printf("send len n1 =%d != n %d\n",n1,n)}return}()wg.Wait()block++sendSize += nfmt.Printf("send block num = %d\n",block)}fmt.Printf("file size = %d ,send size = %d\n",fileSize,sendSize)}//处理客户端连接请求
func process(coon net.Conn) {defer coon.Close()//定义接收信息的字节数组var buf [1024]byte//读取数据n, err := coon.Read(buf[:])if err != nil {fmt.Println("获取信息失败,err:", err)return}filename := string(buf[:n])fmt.Printf("对方获取文件是:%s", filename)SendFile(coon, filename)fmt.Printf("发送文件文件完毕:%s", filename)
}//TCP服务端配置
func main() {//1:启用监听listener, err := net.Listen("tcp", "127.0.0.1:20000")//连接失败处理if err != nil {fmt.Println("启动服务失败,err:", err)return}//程序退出时释放端口defer listener.Close()for {conn, err := listener.Accept() //2.建立连接if err != nil {fmt.Println("接收客户连接失败,err:", err)continue}//3.启动一个人goroutine处理客户端连接go process(conn)}
}

client/client.go

package mainimport ("fmt""net""os""bufio""sync""time"
)//TCP客户端
func main() {args := os.Args[1:]if len(args) != 1 { // for format  clien fileNamefmt.Println("Please give proper command line arguments")return}fileName := args[0]f, err := os.Create(fileName) //创建文件if err != nil{fmt.Println("create file fail" + fileName)}defer f.Close()//1:拨号方式建立与服务端连接conn, err := net.Dial("tcp", "127.0.0.1:20000")if err != nil {fmt.Println("连接服务端失败,err:", err)return}//注意:关闭连接位置,不能写在连接失败判断上面defer conn.Close()//2:向服务器发送信息_, err = conn.Write([]byte(fileName))if err != nil {fmt.Println("发送信息失败,err:", err)return}s := time.Now()bufPool := sync.Pool{New: func() interface{} {buf := make([]byte, 100*1024*1024) //100M per block to memreturn buf}}w := bufio.NewWriterSize(f,100*1024*1024)block := 0var reveiveSize int = 0for {buf := bufPool.Get().([]byte)n, err := conn.Read(buf)if err != nil {fmt.Println("获取结束", err)break}buf = buf[:n]wn , err := w.Write(buf)if err != nil{fmt.Println(err)break}else if wn != n{fmt.Printf("write len wn =%d != n %d\n",wn,n)break}block++reveiveSize += nfmt.Printf("reveive block num = %d,size = %d\n",block,reveiveSize)}w.Flush()spend := time.Since(s)fmt.Printf("receive end size = %d \n",reveiveSize)fmt.Println("time spend ",spend)
}

测试2.4G本地go tcp 发送接收花费17秒

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/3980.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

使用conda虚拟环境,Jupyter Notebook 链接不上 kernel

1&#xff0c;检查 ipykernel 和 ipython 是否一致 输入pip list 或者conda list检查一下相应库的版本是不一致 不一致的话&#xff0c;可以更新这两个库的版本&#xff1a;pip install --upgrade 库名 2&#xff0c;看控制台的报错&#xff0c;如果是报404&#xff0c;内核找不…

【Linux】软硬链接与动静态库

系列文章 收录于【Linux】文件系统 专栏 关于文件描述符与文件重定向的相关内容可以移步 文件描述符与重定向操作。 可以到 浅谈文件原理与操作 了解文件操作的系统接口。 想进一步理解文件系统还可以看看文件缓冲区和文件系统。 目录 系列文章 软硬链接 软链接 硬链接…

vue(脚手架创建)代理解决跨域问题

目录 为什么会出现跨域问题 什么是跨域 Vue CLI Vue2解决跨域问题 不重写路径 重写路径 vue.config.js代码 Vue3解决跨域问题 ViteVue解决跨域问题 vite.config.ts代码 总结 为什么会出现跨域问题 出于浏览器的同源策略的限制。同源策略是一种约定&#xff0c;它是…

Linux网络环境配置

第一种方式&#xff08;自动获取&#xff09;&#xff1a; 说明&#xff1a;登陆后&#xff0c;通过界面的来设置自动获取IP 特点&#xff1a;Linux启动后会自动获取IP 缺点&#xff1a;是每次自动获取的IP地址可能不一样 第二种方法&#xff08;指定IP)&#xff1a; 1、说明…

科技资讯|2023Q1中国电动汽车销量增长 29%,充电桩市场持续增长

根据市场调查机构公布的 2023 年第 1 季度中国国内电动汽车市场报告&#xff0c;比亚迪继续引领竞争日益激烈的电动汽车市场。 报告称 2023 年第 1 季度中国乘用电动汽车销量同比增长 29%&#xff0c;其中纯电动汽车&#xff08;BEV&#xff09;占销售额的近 70%、插电式混合…

Java——《面试题——网络篇》

前文 java——《面试题——基础篇》 Java——《面试题——JVM篇》 Java——《面试题——多线程&并发篇》 Java——《面试题——Spring篇》 Java——《面试题——SpringBoot篇》 Java——《面试题——MySQL篇》​​​​​​ Java——《面试题——SpringCloud》 Java…

Python笔记-1

Python安装问题 1.python是一门解释性的计算机程序语言。 2.IDLE就是我们写Python程序的地方&#xff08;小型的集成开发环境&#xff0c;编辑器&#xff09;。 3.Pycharm是一个大型的集成开发环境&#xff08;IDLE的扩展&#xff0c;不仅可以写&#xff0c;还能管理、调试&am…

基于PyQt5的桌面图像调试仿真平台开发(1)环境搭建

系列文章目录 基于PyQt5的桌面图像调试仿真平台开发(1)环境搭建 基于PyQt5的桌面图像调试仿真平台开发(2)UI设计和控件绑定 基于PyQt5的桌面图像调试仿真平台开发(3)黑电平处理 基于PyQt5的桌面图像调试仿真平台开发(4)白平衡处理 基于PyQt5的桌面图像调试仿真平台开发(5)…

HCIP(HCIA回顾)

OSI/RM 七层 应用层 表示层 会话层 传输层 区分不同的流量&#xff0c;定义传输方式。 端口号由16位二进制构成&#xff0c;范围为0~65535(其中0不作为传输层的端口使用)&#xff0c;所以真实取值范围为1~65535&#xff1b;其中&#xff0c;1~1023称为知名端口号。 1、可靠…

【Docker】Docker的优势、与虚拟机技术的区别、三个重要概念和架构及工作原理的详细讲解

前言 Docker 是一个开源的应用容器引擎&#xff0c;让开发者可以打包他们的应用以及依赖包到一个可移植的容器中,然后发布到任何流行的Linux或Windows操作系统的机器上,也可以实现虚拟化,容器是完全使用沙箱机制,相互之间不会有任何接口。 &#x1f4d5;作者简介&#xff1a;热…

Ubuntu查看显卡信息

查看显卡信息&#xff0c;终端输入 lspci | grep VGA 输出结果 0000:65:00.0 VGA compatible controller: NVIDIA Corporation Device 24b0 (rev a1) 发现是十六进制码&#xff0c;进入网址PCI Devices查询&#xff0c;输入 24b0 并点击 Jump&#xff0c;得到结果 显卡型号…

软考A计划-系统集成项目管理工程师-项目整体管理-中

点击跳转专栏>Unity3D特效百例点击跳转专栏>案例项目实战源码点击跳转专栏>游戏脚本-辅助自动化点击跳转专栏>Android控件全解手册点击跳转专栏>Scratch编程案例点击跳转>软考全系列 &#x1f449;关于作者 专注于Android/Unity和各种游戏开发技巧&#xff…