背景
对于一个文件服务器来说,对于同一文件,应该只保存一份在服务器上。基于这个原则,引发出本篇内容。
本篇仅阐述文件服务器在同一时间接收同一文件的并发问题,这种对于小体量的服务来说并不常见,但是最好还是要留意一下这种极端情况。
实现原理
常见的流程:数据库记录文件的基本属性:文件名、大小、哈希值、文件路径等,以哈希值作为唯一标志。当用户新上传文件时,先查询数据库,若已存在哈希值(客户端计算并传给服务端,客户端最常见的 spark-md5)相同的记录,则不保存文件,直接标记为上传成功,使用已存在的文件副本,即通常所说的秒传实现。
上述流程缺失的就是当数据库中不存在的文件,同一时间上传了多个相同文件时,如果不做处理,服务器上是会存在多个该文件副本。所以当一个用户上传文件时,可以将文件标记为锁定状态,其他用户若上传同一文件,需查看文件的锁定状态,待锁定解除后才能进行操作。
代码实现
文字表现力有点差,还是上代码吧!本例中的服务端使用的是 go
的 gin
框架,仅简单模拟了同一文件并发上传的情况。
文件目录结构
- go.mod- go.sum- hash_cache.go- main.go- spark-md5-min.js- upload.html
js 客户端
upload.html
选择文件后,可重复点击上传按钮测试并发,或者自己改下脚本。
<!DOCTYPE html>
<html>
<head><title>文件上传</title><script src="spark-md5.min.js"></script>
</head>
<body>
<h1>文件上传</h1>
<input id="file" type="file" name="file"/>
<button onclick="upload();">上传</button><script>var file_md5 = {};function upload() {if (!file_md5.md5) {alert("请先选择文件");return}var form = new FormData();form.append("md5", file_md5.md5);form.append("file", file_md5.file);var xhr = new XMLHttpRequest();var action = "/upload"; // 上传服务的接口地址xhr.open("POST", action);xhr.send(form); // 发送表单数据xhr.onreadystatechange = function () {if (xhr.readyState == 4 && xhr.status == 200) {var resultObj = JSON.parse(xhr.responseText);// 处理返回的数据......console.log(resultObj)}}}document.getElementById('file').addEventListener('change', function (event) {var blobSlice = File.prototype.slice || File.prototype.mozSlice || File.prototype.webkitSlice,file = this.files[0],chunkSize = 2097152, // Read in chunks of 2MBchunks = Math.ceil(file.size / chunkSize),currentChunk = 0,spark = new SparkMD5.ArrayBuffer(),fileReader = new FileReader();fileReader.onload = function (e) {console.log('read chunk nr', currentChunk + 1, 'of', chunks);spark.append(e.target.result); // Append array buffercurrentChunk++;if (currentChunk < chunks) {loadNext();} else {console.log('finished loading');var md5=spark.end()console.info('computed hash', md5); // Compute hashfile_md5 = {file:file,md5:md5}}};fileReader.onerror = function () {console.warn('oops, something went wrong.');};function loadNext() {var start = currentChunk * chunkSize,end = ((start + chunkSize) >= file.size) ? file.size : start + chunkSize;fileReader.readAsArrayBuffer(blobSlice.call(file, start, end));}loadNext();});
</script>
</body>
</html>
go gin 服务端
main.go
以 4780
为端口开放了一个 http
服务,可访问 http://127.0.0.1:4780/client/upload.html
来访问 html
页面。
为了模拟并发场景,服务端 /upload
接口中故意让其睡眠了 30s
。
package mainimport ("crypto/md5""embed""encoding/hex""errors""fmt""github.com/gin-gonic/gin""io""net/http""os""path/filepath""runtime""time"
)//go:embed upload.html spark-md5.min.js
var client embed.FSvar hashCache = NewHashCache()func main() {engine := gin.New()engine.StaticFS("/client", http.FS(client))engine.POST("/upload", doUpload)engine.Run(":4780")
}func doUpload(c *gin.Context) {printMem("start")clientMd5 := c.PostForm("md5")// 查询是否有其他正在上传,若有,则等待其上传完毕,根据返回值来做判断if hashCache.Has(clientMd5) {info, er := hashCache.Wait(clientMd5)if er != nil {c.String(http.StatusInternalServerError, er.Error())return}if info.Err == nil {c.String(http.StatusOK, "上传成功: "+info.SavedPath)return}// 若是出错了,则继续接收}hashCache.Set(clientMd5)// 模拟并发,这里睡一下time.Sleep(time.Second * 30)savedPath, err := doSaveFile(c, clientMd5)if err != nil {hashCache.SetDone(clientMd5, "", err)c.String(http.StatusInternalServerError, err.Error())return}hashCache.SetDone(clientMd5, savedPath, nil)c.String(http.StatusOK, "上传成功: "+savedPath)
} func doSaveFile(c *gin.Context, clientMd5 string) (savedPath string, err error) {fh, err := c.FormFile("file")if err != nil {return}fn := fmt.Sprintf("%s_%d", fh.Filename, time.Now().UnixMilli())savedPath = filepath.Join("uploaded", fn)err = c.SaveUploadedFile(fh, savedPath)if err != nil {return}md5Str, err := getFileMd5(savedPath)if err != nil {return}if clientMd5 != md5Str {os.Remove(savedPath)err = errors.New("哈希不匹配")return}return
}func getFileMd5(p string) (md5Str string, err error) {f, err := os.Open(p)if err != nil {return}defer f.Close()h := md5.New()_, err = io.Copy(h, f)if err != nil {return}md5Str = hex.EncodeToString(h.Sum(nil))return
}func printMem(prefix string) {var m runtime.MemStatsruntime.ReadMemStats(&m)fmt.Printf("%s: %d Kb\n", prefix, m.Alloc/1024)
}
hash_cache.go
维护了一个 map
用于判断是否有相同的文件上传。
package mainimport ("errors""sync"
)type HashCache struct {mutex sync.RWMutexm map[string]*HashCacheInfo
}func NewHashCache() *HashCache {return &HashCache{m: make(map[string]*HashCacheInfo),}
}type HashCacheInfo struct {Done chan struct{}SavedPath stringErr error
}func (this *HashCache) Set(md5Hash string) {this.mutex.Lock()defer this.mutex.Unlock()this.m[md5Hash] = &HashCacheInfo{Done: make(chan struct{}),}
}func (this *HashCache) SetDone(md5Hash, savedPath string, err error) error {this.mutex.Lock()defer this.mutex.Unlock()data, ok := this.m[md5Hash]if !ok {return errors.New("no hash: " + md5Hash)}data.SavedPath = savedPathdata.Err = errclose(data.Done)delete(this.m, md5Hash)//这里的 data 不能直接释放,wait 那里需要用,垃圾收集器自己去回收吧return nil
}func (this *HashCache) Has(md5Hash string) bool {this.mutex.RLock()defer this.mutex.RUnlock()_, has := this.m[md5Hash]return has
}func (this *HashCache) Wait(md5Hash string) (info HashCacheInfo, err error) {this.mutex.RLock()data, ok := this.m[md5Hash]if !ok {this.mutex.RUnlock()err = errors.New("no hash: " + md5Hash)return}this.mutex.RUnlock()<-data.Doneinfo = *datareturn
} 维护了一个 map 用于
服务端日志输出,可见每多提交一次请求,内存占用就会增加。
在此服务中,若是同一时间上传了大量相同的文件,会导致内存占用飙升(c.PostForm
解析 formdata
数据时,会将数据读入内存)。如果要解决该问题,需要自己去做数据的读取,如下:
doUpload1
方法的更改
package mainimport ("crypto/md5""embed""encoding/hex""errors""fmt""github.com/gin-gonic/gin""io""mime/multipart""net/http""os""path/filepath""runtime""time"
)//go:embed upload.html spark-md5.min.js
var client embed.FSvar hashCache = NewHashCache()func main() {engine := gin.New()engine.StaticFS("/client", http.FS(client))engine.POST("/upload", doUpload1)engine.Run(":4780")
}func doUpload(c *gin.Context) {printMem("start")clientMd5 := c.PostForm("md5")// 查询是否有其他正在上传,若有,则等待其上传完毕,根据返回值来做判断if hashCache.Has(clientMd5) {info, er := hashCache.Wait(clientMd5)if er != nil {c.String(http.StatusInternalServerError, er.Error())return}if info.Err == nil {c.String(http.StatusOK, "上传成功: "+info.SavedPath)return}// 若是出错了,则继续接收}hashCache.Set(clientMd5)// 模拟并发,这里睡一下//time.Sleep(time.Second * 30)savedPath, err := doSaveFile(c, clientMd5)if err != nil {hashCache.SetDone(clientMd5, "", err)c.String(http.StatusInternalServerError, err.Error())return}hashCache.SetDone(clientMd5, savedPath, nil)c.String(http.StatusOK, "上传成功: "+savedPath)
}func doSaveFile(c *gin.Context, clientMd5 string) (savedPath string, err error) {fh, err := c.FormFile("file")if err != nil {return}fn := fmt.Sprintf("%s_%d", fh.Filename, time.Now().UnixMilli())savedPath = filepath.Join("uploaded", fn)err = c.SaveUploadedFile(fh, savedPath)if err != nil {return}md5Str, err := getFileMd5(savedPath)if err != nil {return}if clientMd5 != md5Str {os.Remove(savedPath)err = errors.New("哈希不匹配")return}return
}func getFileMd5(p string) (md5Str string, err error) {f, err := os.Open(p)if err != nil {return}defer f.Close()h := md5.New()_, err = io.Copy(h, f)if err != nil {return}md5Str = hex.EncodeToString(h.Sum(nil))return
}func printMem(prefix string) {var m runtime.MemStatsruntime.ReadMemStats(&m)fmt.Printf("%s: %d Kb\n", prefix, m.Alloc/1024)
}func doUpload1(c *gin.Context) {printMem("start")reader, err := c.Request.MultipartReader()if err != nil {c.String(http.StatusBadRequest, err.Error())return}clientMd5, err := readMd5(reader) // 读 md5if err != nil {c.String(http.StatusBadRequest, err.Error())return}// 查询是否有其他正在上传,若有,则等待其上传完毕,根据返回值来做判断if hashCache.Has(clientMd5) {info, er := hashCache.Wait(clientMd5)if er != nil {c.String(http.StatusInternalServerError, er.Error())return}if info.Err == nil {er = closeReaderParts(reader)if er != nil {c.String(http.StatusInternalServerError, er.Error())} else {c.String(http.StatusOK, "上传成功: "+info.SavedPath)}return}}hashCache.Set(clientMd5)// 模拟并发,这里睡一下time.Sleep(time.Second * 30)savedPath, err := saveFilePart(reader, clientMd5)hashCache.SetDone(clientMd5, savedPath, err)if err != nil {c.String(http.StatusInternalServerError, err.Error())return}c.String(http.StatusOK, "上传成功: "+savedPath)
}func readMd5(reader *multipart.Reader) (md5Hash string, err error) {part, err := reader.NextPart() // 读 md5if err != nil {return}name := part.FormName()if name != "md5" {err = errors.New("first key is not match")return}buf, err := io.ReadAll(part)if err != nil {return}md5Hash = string(buf)return
}func closeReaderParts(reader *multipart.Reader) (err error) {for {p, er := reader.NextPart()if er == io.EOF {break}if er != nil {err = erreturn}p.Close()}return
}func saveFilePart(reader *multipart.Reader, clientMd5 string) (fp string, err error) {part, err := reader.NextPart() // 读 fileif err != nil {return}name := part.FormName()if name != "file" {err = errors.New("key not match")return}fn := fmt.Sprintf("%s_%d", part.FileName(), time.Now().UnixMilli())fp = filepath.Join("uploaded", fn)f, err := os.Create(fp)if err != nil {return}defer f.Close()_, err = io.Copy(f, part)if err != nil {return}md5Str, err := getFileMd5(fp)if err != nil {return}if clientMd5 != md5Str {os.Remove(fp)err = errors.New("哈希不匹配")return}returnreturn
}
服务端日志输出,可见内存已不像之前消耗的多了。
这里需要注意:客户端在对 formdata
中添加数据时,需要将 md5
放在第一位,不然逻辑会出错。还有一点就是服务端若是不将请求的 body
数据读完(closeReaderParts
就是做这个的),直接将 api
返回,也会导致 js
客户端请求出错(目前只是在上传文件比较大时碰到过,但是我用 go
的客户端测试是不会有问题的,应该是浏览器实现原因,有知晓的小伙伴可以评论留言)。
其他的实现方式,也可以将 md5
放在请求 url
中(http://127.0.0.1:4780/client/upload?md5=xxx
),然后做匹配(这里也像上述一样,如果请求的 body
不读完,客户端会报错)。
总结
本篇只是给个思路,抛砖引玉,介绍了如何实现客户端和服务器端的并发上传控制。通过示例代码,能够确保在并发上传时服务器中只存在一份文件副本。
在实际的生产环境中,可能需要进一步优化和增强这些代码,以满足性能、安全性和可靠性方面的需求。