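1 Introduction
There are many cache eviction algorithms. This article covers FIFO (first in, first out), which evicts entries in the order they were added, oldest first.
FIFO's drawback is easy to see: some entries were added early but are still accessed frequently. Under this policy they are evicted anyway, re-added on the next miss, and evicted again shortly after, so the cache hit rate suffers.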
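2 Core principles
- Entries are stored in a queue with a bounded capacity. To keep lookups fast, a hash table also maps each key to its queue Element.
- So that removing an entry can reach its neighbors on both sides, the queue is kept as a doubly linked list.
- Set: look the key up first, then take one of two paths.
  - New key: append the entry at the tail of the queue.
  - Existing key: update the value in place.
- Get:
  - Look the key up in the hash table.
- Delete:
  - Find the entry, remove it from both the linked list and the hash table, and update the cache stats.
- Stat: return the current storage stats.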
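3 Implementation
3.1 Using the standard library's doubly linked list
3.1.1 Entity design
3.1.2 Cache stats
The overall cache state is tracked in a struct defined in stat.go:

    type Stat struct {
        // number of keys
        keyCount int64
        // maximum capacity, in bytes
        maxBytes int64
        // bytes currently used by keys and values
        usedBytes int64
    }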
3.1.3 Core interface
The basic capabilities are defined as an interface in cache.go:

    type Cache interface {
        Set(key string, value []byte)
        Get(key string) []byte
        Del(key string)
        Stat() Stat
    }
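3.1.4 Entry wrapper
Each entry holds a key and a value. Values are always stored as a byte slice ([]byte) rather than a concrete data structure, so that the cache stays abstract and can hold any type.
The cost of []byte is that values have to be converted back into a struct after they are read, which burns CPU. So why not use interface{}? One answer found online is that keeping a large number of interface{} values in memory puts noticeable pressure on the GC; see "Getting to Go: The Journey of Go's Garbage Collector" on The Go Programming Language blog.

    type entry struct {
        key   string
        value []byte
    }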
3.1.5 Full implementation
In the implementation, entries are stored in a queue with a bounded capacity. To keep lookups fast, a hash table also maps each key to its queue Element.

    type fifo struct {
        // index for finding an entry quickly by key
        cache map[string]*list.Element
        // entries in insertion order, oldest at the front
        head *list.List
        // cache stats
        stat Stat
    }

    func New(maxBytes int64) Cache {
        return &fifo{
            cache: map[string]*list.Element{},
            head:  list.New(),
            stat: Stat{
                keyCount:  0,
                maxBytes:  maxBytes,
                usedBytes: 0,
            },
        }
    }
    import (
        "container/list"
    )

    type fifo struct {
        // index for finding an entry quickly by key
        cache map[string]*list.Element
        // entries in insertion order, oldest at the front
        head *list.List
        // cache stats
        stat Stat
    }

    func (f *fifo) Set(key string, value []byte) {
        if e, ok := f.cache[key]; ok && e != nil {
            f.setDataExisted(e, value)
        } else {
            f.setDataNotExisted(key, value)
        }
    }

    func (f *fifo) setDataNotExisted(key string, value []byte) {
        newEntry := &entry{
            key:   key,
            value: value,
        }
        if f.stat.maxBytes > 0 {
            // evict the oldest entries until the new one fits; stop once the
            // list is empty so an oversized entry cannot loop forever
            for f.head.Len() > 0 && f.stat.usedBytes+int64(len(key)+len(value)) > f.stat.maxBytes {
                f.removeOldestElement()
            }
        }
        e := f.head.PushBack(newEntry)
        f.cache[key] = e
        f.stat.usedBytes = f.stat.usedBytes + int64(len(key)+len(value))
        f.stat.keyCount++
    }

    func (f *fifo) setDataExisted(e *list.Element, value []byte) {
        originEntry := e.Value.(*entry)
        originSize := len(originEntry.value)
        beRemoved := false
        if f.stat.maxBytes > 0 {
            // evict the oldest entries until the size increase fits
            for f.head.Len() > 0 && int64(len(value))-int64(originSize) > f.stat.maxBytes-f.stat.usedBytes {
                if f.head.Front() == e {
                    // the entry being updated is itself the oldest one
                    beRemoved = true
                }
                f.removeOldestElement()
            }
        }
        if beRemoved {
            // the entry was evicted, so insert it again as a new entry
            f.setDataNotExisted(originEntry.key, value)
            return
        }
        originEntry.value = value
        f.stat.usedBytes = f.stat.usedBytes + int64(len(value)) - int64(originSize)
    }

    func (f *fifo) removeOldestElement() {
        f.removeElement(f.head.Front())
    }

    func (f *fifo) removeElement(e *list.Element) {
        if e == nil {
            return
        }
        // remove from the doubly linked list
        f.head.Remove(e)
        originEntry := e.Value.(*entry)
        f.stat.keyCount--
        // recompute the space in use
        f.stat.usedBytes = f.stat.usedBytes - int64(len(originEntry.key)+len(originEntry.value))
        // remove from the hash table
        delete(f.cache, originEntry.key)
    }

    func (f *fifo) Get(key string) []byte {
        if e, ok := f.cache[key]; ok {
            return e.Value.(*entry).value
        }
        return nil
    }

    func (f *fifo) Del(key string) {
        if e, ok := f.cache[key]; ok {
            f.removeElement(e)
        }
    }

    func (f *fifo) Stat() Stat {
        return f.stat
    }

    func New(maxBytes int64) Cache {
        return &fifo{
            cache: map[string]*list.Element{},
            head:  list.New(),
            stat: Stat{
                keyCount:  0,
                maxBytes:  maxBytes,
                usedBytes: 0,
            },
        }
    }
To test it, write a test file, cache_test.go:

    import (
        "Go-Cache/00_fifo/character01"
        "fmt"
        "testing"

        . "github.com/smartystreets/goconvey/convey"
    )

    func TestCache(t *testing.T) {
        Convey("TestApplyFunc", t, func() {
            cache := character01.New(100)
            stat := fmt.Sprintf("stat:%v", cache.Stat())
            So(stat, ShouldEqual, "stat:{0 100 0}")

            cache.Set("hello", []byte("world"))
            cache.Set("hello2", []byte("world2"))
            stat = fmt.Sprintf("stat:%v", cache.Stat())
            So(stat, ShouldEqual, "stat:{2 100 22}")

            // updating an existing key only changes the value size
            cache.Set("hello2", []byte("changeWorld2"))
            value := cache.Get("hello2")
            So(string(value), ShouldEqual, "changeWorld2")
            stat = fmt.Sprintf("stat:%v", cache.Stat())
            So(stat, ShouldEqual, "stat:{2 100 28}")

            // 28+76 > 100, so the oldest entry (hello) is evicted
            cache.Set("k1", []byte("longlonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglongV1"))
            stat = fmt.Sprintf("stat:%v", cache.Stat())
            So(stat, ShouldEqual, "stat:{2 100 94}")

            // hello2 is now the oldest entry; growing it evicts hello2 and k1,
            // then hello2 is inserted again as a new entry
            cache.Set("hello2", []byte("newHelloWorld2newHelloWorld2"))
            value = cache.Get("hello2")
            So(string(value), ShouldEqual, "newHelloWorld2newHelloWorld2")
            stat = fmt.Sprintf("stat:%v", cache.Stat())
            So(stat, ShouldEqual, "stat:{1 100 34}")

            cache.Set("num", character01.IntToBytes(1))
            num := cache.Get("num")
            So(character01.BytesToInt(num), ShouldEqual, 1)
            stat = fmt.Sprintf("stat:%v", cache.Stat())
            So(stat, ShouldEqual, "stat:{2 100 45}")

            cache.Set("num", nil)
            So(cache.Get("num"), ShouldEqual, nil)
            stat = fmt.Sprintf("stat:%v", cache.Stat())
            So(stat, ShouldEqual, "stat:{2 100 37}")

            cache.Del("num")
            stat = fmt.Sprintf("stat:%v", cache.Stat())
            So(stat, ShouldEqual, "stat:{1 100 34}")
        })
    }
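3.2 Adding a lock
The code above has an obvious problem, though: it is not safe for concurrent use. With enough goroutines writing at once, the runtime aborts with a concurrent map access error:

    func TestNoLock(t *testing.T) {
        // the package is the one from the previous subsection
        cache := no_lock.New(100000)
        num := 1000
        var wg sync.WaitGroup
        for i := 0; i < num; i++ {
            index := i
            wg.Add(1)
            go func() {
                cache.Set("hello"+string(rune(index)), []byte("world"+string(rune(index))))
                wg.Done()
            }()
        }
        wg.Wait()
        fmt.Printf("stat:%v\n", cache.Stat())
    }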
The error output:

    fatal error: concurrent map read and map write

    goroutine 40 [running]:
    runtime.throw({0x1125cae?, 0x1?})
        /usr/local/go/src/runtime/panic.go:992 +0x71 fp=0xc000133698 sp=0xc000133668 pc=0x1033791
    runtime.mapaccess2_faststr(0x2?, 0x0?, {0xc0000960a8, 0x6})
        /usr/local/go/src/runtime/map_faststr.go:117 +0x3d4 fp=0xc000133700 sp=0xc000133698 pc=0x1011c74
    Go-Cache/00_fifo/character01.(*fifo).Set(0xc00006e4b0, {0xc0000960a8, 0x6}, {0xc0000960a0, 0x6, 0x8})
The fix is to add a mutex to the struct:

    type fifo struct {
        // index for finding an entry quickly by key
        cache map[string]*list.Element
        // entries in insertion order, oldest at the front
        head *list.List
        // cache stats
        stat  Stat
        mutex sync.Mutex
    }

Then take the lock before every operation that may modify the data, for example Set:

    func (f *fifo) Set(key string, value []byte) {
        f.mutex.Lock()
        defer f.mutex.Unlock()
        if e, ok := f.cache[key]; ok && e != nil {
            f.setDataExisted(e, value)
        } else {
            f.setDataNotExisted(key, value)
        }
    }
    import (
        "container/list"
        "sync"
    )

    type fifo struct {
        // index for finding an entry quickly by key
        cache map[string]*list.Element
        // entries in insertion order, oldest at the front
        head *list.List
        // cache stats
        stat  Stat
        mutex sync.Mutex
    }

    func (f *fifo) Set(key string, value []byte) {
        f.mutex.Lock()
        defer f.mutex.Unlock()
        if e, ok := f.cache[key]; ok && e != nil {
            f.setDataExisted(e, value)
        } else {
            f.setDataNotExisted(key, value)
        }
    }

    // The unexported helpers below assume the caller already holds the lock.

    func (f *fifo) setDataNotExisted(key string, value []byte) {
        newEntry := &entry{
            key:   key,
            value: value,
        }
        if f.stat.maxBytes > 0 {
            // evict the oldest entries until the new one fits; stop once the
            // list is empty so an oversized entry cannot loop forever
            for f.head.Len() > 0 && f.stat.usedBytes+int64(len(key)+len(value)) > f.stat.maxBytes {
                f.removeOldestElement()
            }
        }
        e := f.head.PushBack(newEntry)
        f.cache[key] = e
        f.stat.usedBytes = f.stat.usedBytes + int64(len(key)+len(value))
        f.stat.keyCount++
    }

    func (f *fifo) setDataExisted(e *list.Element, value []byte) {
        originEntry := e.Value.(*entry)
        originSize := len(originEntry.value)
        beRemoved := false
        if f.stat.maxBytes > 0 {
            // evict the oldest entries until the size increase fits
            for f.head.Len() > 0 && int64(len(value))-int64(originSize) > f.stat.maxBytes-f.stat.usedBytes {
                if f.head.Front() == e {
                    // the entry being updated is itself the oldest one
                    beRemoved = true
                }
                f.removeOldestElement()
            }
        }
        if beRemoved {
            // the entry was evicted, so insert it again as a new entry
            f.setDataNotExisted(originEntry.key, value)
            return
        }
        originEntry.value = value
        f.stat.usedBytes = f.stat.usedBytes + int64(len(value)) - int64(originSize)
    }

    func (f *fifo) removeOldestElement() {
        f.removeElement(f.head.Front())
    }

    func (f *fifo) removeElement(e *list.Element) {
        if e == nil {
            return
        }
        // remove from the doubly linked list
        f.head.Remove(e)
        originEntry := e.Value.(*entry)
        f.stat.keyCount--
        // recompute the space in use
        f.stat.usedBytes = f.stat.usedBytes - int64(len(originEntry.key)+len(originEntry.value))
        // remove from the hash table
        delete(f.cache, originEntry.key)
    }

    func (f *fifo) Get(key string) []byte {
        f.mutex.Lock()
        defer f.mutex.Unlock()
        if e, ok := f.cache[key]; ok {
            return e.Value.(*entry).value
        }
        return nil
    }

    func (f *fifo) Del(key string) {
        f.mutex.Lock()
        defer f.mutex.Unlock()
        if e, ok := f.cache[key]; ok {
            f.removeElement(e)
        }
    }

    func (f *fifo) Stat() Stat {
        // reading stat while a writer updates it would be a data race
        f.mutex.Lock()
        defer f.mutex.Unlock()
        return f.stat
    }

    func New(maxBytes int64) Cache {
        return &fifo{
            cache: map[string]*list.Element{},
            head:  list.New(),
            stat: Stat{
                keyCount:  0,
                maxBytes:  maxBytes,
                usedBytes: 0,
            },
        }
    }
Run the test again and everything works:

    func TestCache(t *testing.T) {
        cache := character02.New(100000)
        num := 100
        var wg sync.WaitGroup
        for i := 0; i < num; i++ {
            index := i
            wg.Add(1)
            go func() {
                cache.Set("hello"+string(rune(index)), []byte("world"+string(rune(index))))
                wg.Done()
            }()
        }
        wg.Wait()
        fmt.Printf("stat:%v\n", cache.Stat())
    }
3.3 Read-write lock

    import (
        "container/list"
        "sync"
    )

    type fifo struct {
        // index for finding an entry quickly by key
        cache map[string]*list.Element
        // entries in insertion order, oldest at the front
        head *list.List
        // cache stats
        stat  Stat
        mutex sync.RWMutex
    }

    func (f *fifo) Set(key string, value []byte) {
        f.mutex.Lock()
        defer f.mutex.Unlock()
        if e, ok := f.cache[key]; ok && e != nil {
            f.setDataExisted(e, value)
        } else {
            f.setDataNotExisted(key, value)
        }
    }

    // The unexported helpers below assume the caller already holds the write lock.

    func (f *fifo) setDataNotExisted(key string, value []byte) {
        newEntry := &entry{
            key:   key,
            value: value,
        }
        if f.stat.maxBytes > 0 {
            // evict the oldest entries until the new one fits; stop once the
            // list is empty so an oversized entry cannot loop forever
            for f.head.Len() > 0 && f.stat.usedBytes+int64(len(key)+len(value)) > f.stat.maxBytes {
                f.removeOldestElement()
            }
        }
        e := f.head.PushBack(newEntry)
        f.cache[key] = e
        f.stat.usedBytes = f.stat.usedBytes + int64(len(key)+len(value))
        f.stat.keyCount++
    }

    func (f *fifo) setDataExisted(e *list.Element, value []byte) {
        originEntry := e.Value.(*entry)
        originSize := len(originEntry.value)
        beRemoved := false
        if f.stat.maxBytes > 0 {
            // evict the oldest entries until the size increase fits
            for f.head.Len() > 0 && int64(len(value))-int64(originSize) > f.stat.maxBytes-f.stat.usedBytes {
                if f.head.Front() == e {
                    // the entry being updated is itself the oldest one
                    beRemoved = true
                }
                f.removeOldestElement()
            }
        }
        if beRemoved {
            // the entry was evicted, so insert it again as a new entry
            f.setDataNotExisted(originEntry.key, value)
            return
        }
        originEntry.value = value
        f.stat.usedBytes = f.stat.usedBytes + int64(len(value)) - int64(originSize)
    }

    func (f *fifo) removeOldestElement() {
        f.removeElement(f.head.Front())
    }

    func (f *fifo) removeElement(e *list.Element) {
        if e == nil {
            return
        }
        // remove from the doubly linked list
        f.head.Remove(e)
        originEntry := e.Value.(*entry)
        f.stat.keyCount--
        // recompute the space in use
        f.stat.usedBytes = f.stat.usedBytes - int64(len(originEntry.key)+len(originEntry.value))
        // remove from the hash table
        delete(f.cache, originEntry.key)
    }

    func (f *fifo) Get(key string) []byte {
        // readers share the lock, so concurrent Gets do not block each other
        f.mutex.RLock()
        defer f.mutex.RUnlock()
        if e, ok := f.cache[key]; ok {
            return e.Value.(*entry).value
        }
        return nil
    }

    func (f *fifo) Del(key string) {
        f.mutex.Lock()
        defer f.mutex.Unlock()
        if e, ok := f.cache[key]; ok {
            f.removeElement(e)
        }
    }

    func (f *fifo) Stat() Stat {
        // reading stat while a writer updates it would be a data race
        f.mutex.RLock()
        defer f.mutex.RUnlock()
        return f.stat
    }

    func New(maxBytes int64) Cache {
        return &fifo{
            cache: map[string]*list.Element{},
            head:  list.New(),
            stat: Stat{
                keyCount:  0,
                maxBytes:  maxBytes,
                usedBytes: 0,
            },
        }
    }
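Suppose reading from the cache is slow. We can simulate that by adding a delay to Get (this needs the "time" import):

    func (f *fifo) Get(key string) []byte {
        f.mutex.RLock()
        defer f.mutex.RUnlock()
        // simulate a slow read: 1,000,000 ns = 1 ms
        time.Sleep(time.Millisecond)
        if e, ok := f.cache[key]; ok {
            return e.Value.(*entry).value
        }
        return nil
    }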
Timing 10,000 concurrent Get calls with each kind of lock gives:
- Read-write lock: cost time: 4.560145ms
- Exclusive lock: cost time: 12.012624518s

    // cost time: 4.560145ms
    func TestRWLock(t *testing.T) {
        cache := character03.New(100000000000)
        num := 10000
        var wg sync.WaitGroup
        // fill the cache first
        for i := 0; i < num; i++ {
            index := i
            wg.Add(1)
            go func() {
                cache.Set("hello"+string(rune(index)), []byte("world"+string(rune(index))))
                wg.Done()
            }()
        }
        wg.Wait()

        // then time the concurrent reads
        startTime := time.Now()
        for i := 0; i < num; i++ {
            index := i
            wg.Add(1)
            go func() {
                cache.Get("hello" + string(rune(index)))
                wg.Done()
            }()
        }
        wg.Wait()
        endTime := time.Now()
        fmt.Printf("stat:%v\n", cache.Stat())
        fmt.Printf("cost time:%v\n", endTime.Sub(startTime))
    }

    // cost time: 12.012624518s
    func TestLock(t *testing.T) {
        cache := lock.New(100000000000)
        num := 10000
        var wg sync.WaitGroup
        // fill the cache first
        for i := 0; i < num; i++ {
            index := i
            wg.Add(1)
            go func() {
                cache.Set("hello"+string(rune(index)), []byte("world"+string(rune(index))))
                wg.Done()
            }()
        }
        wg.Wait()

        // then time the concurrent reads
        startTime := time.Now()
        for i := 0; i < num; i++ {
            index := i
            wg.Add(1)
            go func() {
                cache.Get("hello" + string(rune(index)))
                wg.Done()
            }()
        }
        wg.Wait()
        endTime := time.Now()
        fmt.Printf("stat:%v\n", cache.Stat())
        fmt.Printf("cost time:%v\n", endTime.Sub(startTime))
    }
3.4 A hand-written doubly linked queue

    type List struct {
        head *Node // first node
        tail *Node // last node
        len  int   // number of nodes
    }

    type Node struct {
        next, prev *Node
        list       *List // the list this node belongs to
        Value      any
    }

    func NewList() *List {
        return &List{}
    }

    func NewNode(v any) *Node {
        return &Node{
            Value: v,
        }
    }

    func (l *List) Front() *Node {
        return l.head
    }

    // Remove unlinks node from l. Nodes that do not belong to l are ignored.
    // The node.list back-pointer verifies membership without scanning the
    // list, so removal stays O(1).
    func (l *List) Remove(node *Node) {
        if node == nil || node.list != l {
            return
        }
        if node.prev == nil {
            l.head = node.next
        } else {
            node.prev.next = node.next
        }
        if node.next == nil {
            l.tail = node.prev
        } else {
            node.next.prev = node.prev
        }
        node.prev, node.next, node.list = nil, nil, nil
        l.len--
    }

    // Find returns the zero-based position of node in l, or -1 if it is absent.
    func (l *List) Find(node *Node) int {
        if l.len == 0 || node == nil {
            return -1
        }
        index := 0
        for cur := l.head; cur != nil; cur = cur.next {
            if cur == node {
                return index
            }
            index++
        }
        return -1
    }

    // PushBack appends v at the tail and returns the new node.
    func (l *List) PushBack(v any) *Node {
        node := NewNode(v)
        node.list = l
        if l.len == 0 {
            l.head = node
            l.tail = node
        } else {
            node.prev = l.tail
            l.tail.next = node
            l.tail = node
        }
        l.len++
        return node
    }
Everywhere the standard library's List and Element were used, swap in our own List and Node:

    import (
        "errors"
        "log"
        "sync"
    )

    type fifo struct {
        // index for finding an entry quickly by key
        cache map[string]*Node
        // entries in insertion order, oldest at the front
        list *List
        // cache stats
        stat  Stat
        mutex sync.RWMutex
    }

    func (f *fifo) Set(key string, value []byte) {
        f.mutex.Lock()
        defer f.mutex.Unlock()
        err := f.checkMaxBytes(key, value)
        if err != nil {
            // log and skip the entry; log.Fatalf would exit the whole process
            log.Printf("checkMaxBytes err:%v\n", err)
            return
        }
        if e, ok := f.cache[key]; ok && e != nil {
            f.setDataExisted(e, value)
        } else {
            f.setDataNotExisted(key, value)
        }
    }

    // checkMaxBytes rejects an entry that could never fit, even in an empty cache.
    func (f *fifo) checkMaxBytes(key string, value []byte) error {
        if f.stat.maxBytes > 0 {
            if f.stat.maxBytes < int64(len(key)+len(value)) {
                return errors.New("over maxBytes")
            }
        }
        return nil
    }

    func (f *fifo) setDataNotExisted(key string, value []byte) {
        newEntry := &entry{
            key:   key,
            value: value,
        }
        if f.stat.maxBytes > 0 {
            // evict the oldest entries until the new one fits
            for f.stat.usedBytes+int64(len(key)+len(value)) > f.stat.maxBytes {
                f.removeOldestElement()
            }
        }
        e := f.list.PushBack(newEntry)
        f.cache[key] = e
        f.stat.usedBytes = f.stat.usedBytes + int64(len(key)+len(value))
        f.stat.keyCount++
    }

    func (f *fifo) setDataExisted(e *Node, value []byte) {
        originEntry := e.Value.(*entry)
        originSize := len(originEntry.value)
        beRemoved := false
        if f.stat.maxBytes > 0 {
            // evict the oldest entries until the size increase fits
            for int64(len(value))-int64(originSize) > f.stat.maxBytes-f.stat.usedBytes {
                if f.list.Front() == e {
                    // the entry being updated is itself the oldest one
                    beRemoved = true
                }
                f.removeOldestElement()
            }
        }
        if beRemoved {
            // the entry was evicted, so insert it again as a new entry
            f.setDataNotExisted(originEntry.key, value)
            return
        }
        originEntry.value = value
        f.stat.usedBytes = f.stat.usedBytes + int64(len(value)) - int64(originSize)
    }

    func (f *fifo) removeOldestElement() {
        f.removeElement(f.list.Front())
    }

    func (f *fifo) removeElement(e *Node) {
        if e == nil {
            return
        }
        // remove from the doubly linked list
        f.list.Remove(e)
        originEntry := e.Value.(*entry)
        f.stat.keyCount--
        // recompute the space in use
        f.stat.usedBytes = f.stat.usedBytes - int64(len(originEntry.key)+len(originEntry.value))
        // remove from the hash table
        delete(f.cache, originEntry.key)
    }

    func (f *fifo) Get(key string) []byte {
        f.mutex.RLock()
        defer f.mutex.RUnlock()
        if e, ok := f.cache[key]; ok {
            return e.Value.(*entry).value
        }
        return nil
    }

    func (f *fifo) Del(key string) {
        f.mutex.Lock()
        defer f.mutex.Unlock()
        if e, ok := f.cache[key]; ok {
            f.removeElement(e)
        }
    }

    func (f *fifo) Stat() Stat {
        // reading stat while a writer updates it would be a data race
        f.mutex.RLock()
        defer f.mutex.RUnlock()
        return f.stat
    }

    func New(maxBytes int64) Cache {
        return &fifo{
            cache: map[string]*Node{},
            list:  NewList(),
            stat: Stat{
                keyCount:  0,
                maxBytes:  maxBytes,
                usedBytes: 0,
            },
        }
    }
    func TestMyQueue(t *testing.T) {
        Convey("TestApplyFunc", t, func() {
            cache := character04.New(100)
            stat := fmt.Sprintf("stat:%v", cache.Stat())
            So(stat, ShouldEqual, "stat:{0 100 0}")

            cache.Set("hello", []byte("world"))
            cache.Set("hello2", []byte("world2"))
            stat = fmt.Sprintf("stat:%v", cache.Stat())
            So(stat, ShouldEqual, "stat:{2 100 22}")

            cache.Set("hello2", []byte("changeWorld2"))
            value := cache.Get("hello2")
            So(string(value), ShouldEqual, "changeWorld2")
            stat = fmt.Sprintf("stat:%v", cache.Stat())
            So(stat, ShouldEqual, "stat:{2 100 28}")

            cache.Set("k1", []byte("longlonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglongV1"))
            stat = fmt.Sprintf("stat:%v", cache.Stat())
            So(stat, ShouldEqual, "stat:{2 100 94}")

            cache.Set("hello2", []byte("newHelloWorld2newHelloWorld2"))
            value = cache.Get("hello2")
            So(string(value), ShouldEqual, "newHelloWorld2newHelloWorld2")
            stat = fmt.Sprintf("stat:%v", cache.Stat())
            So(stat, ShouldEqual, "stat:{1 100 34}")

            cache.Set("num", character01.IntToBytes(1))
            num := cache.Get("num")
            So(character01.BytesToInt(num), ShouldEqual, 1)
            stat = fmt.Sprintf("stat:%v", cache.Stat())
            So(stat, ShouldEqual, "stat:{2 100 45}")

            cache.Set("num", nil)
            So(cache.Get("num"), ShouldEqual, nil)
            stat = fmt.Sprintf("stat:%v", cache.Stat())
            So(stat, ShouldEqual, "stat:{2 100 37}")

            cache.Del("num")
            stat = fmt.Sprintf("stat:%v", cache.Stat())
            So(stat, ShouldEqual, "stat:{1 100 34}")
        })
    }
3.5 Adding HTTP support
Once a cache is up and running, it can usually be operated over HTTP as well as directly from code. Ours should offer that too, so let's add it.
First wrap the cache in a server that listens on port 12345. Cache operations and stat queries are handled by cacheHandler and statHandler respectively:

    import (
        "Go-Cache/00_fifo/character05"
        "log"
        "net/http"
    )

    type Server struct {
        cache character05.Cache
    }

    func New(c character05.Cache) *Server {
        return &Server{
            cache: c,
        }
    }

    func (s *Server) Listen() {
        // note the trailing slash: "/cache/" also matches every path below it
        http.Handle("/cache/", s.cacheHandler())
        http.Handle("/stat", s.statHandler())
        // ListenAndServe only returns on failure, so report the error
        log.Fatal(http.ListenAndServe(":12345", nil))
    }

    func (s *Server) cacheHandler() *cacheHandler {
        return &cacheHandler{
            Server: s,
        }
    }

    func (s *Server) statHandler() *statHandler {
        return &statHandler{
            Server: s,
        }
    }
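The stat query handler:

    import (
        "encoding/json"
        "log"
        "net/http"
    )

    type statHandler struct {
        *Server
    }

    func (h *statHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
        if r.Method != http.MethodGet {
            w.WriteHeader(http.StatusMethodNotAllowed)
            return
        }
        // json.Marshal only encodes exported fields, so Stat's fields must be
        // exported (KeyCount, MaxBytes, UsedBytes) to appear in the response
        b, e := json.Marshal(h.cache.Stat())
        if e != nil {
            w.WriteHeader(http.StatusInternalServerError)
            return
        }
        res, err := w.Write(b)
        if err != nil {
            // a failed write affects one client only; do not exit the server
            log.Printf("write stat response: %v", err)
            return
        }
        log.Printf("wrote %d bytes", res)
    }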
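One detail is easy to miss here. The Stat struct in 3.1.2 uses lowercase field names, while the curl output in this section shows the keys KeyCount, MaxBytes and UsedBytes. encoding/json skips unexported fields, so the fields have to be exported for the stat endpoint to return anything useful. The sketch below shows the difference; privateStat and publicStat are hypothetical types made up for the comparison.

```go
package main

import (
    "encoding/json"
    "fmt"
)

// privateStat mirrors the lowercase field names from 3.1.2.
type privateStat struct {
    keyCount  int64
    maxBytes  int64
    usedBytes int64
}

// publicStat uses exported fields, matching the keys in the curl output.
type publicStat struct {
    KeyCount  int64
    MaxBytes  int64
    UsedBytes int64
}

// marshal returns v encoded as a JSON string.
func marshal(v any) string {
    b, err := json.Marshal(v)
    if err != nil {
        return "error: " + err.Error()
    }
    return string(b)
}

func main() {
    fmt.Println(marshal(privateStat{1, 0, 10})) // {}
    fmt.Println(marshal(publicStat{1, 0, 10}))  // {"KeyCount":1,"MaxBytes":0,"UsedBytes":10}
}
```

If lowercase JSON keys are preferred, struct tags such as `json:"keyCount"` on the exported fields would do it.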
The cache operation handler:

    import (
        "io"
        "net/http"
        "strings"
    )

    type cacheHandler struct {
        *Server
    }

    func (h *cacheHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
        // the path looks like /cache/<key>, so the key is the third segment
        key := strings.Split(r.URL.EscapedPath(), "/")[2]
        if len(key) == 0 {
            w.WriteHeader(http.StatusBadRequest)
            return
        }
        m := r.Method
        if m == http.MethodGet {
            b := h.cache.Get(key)
            if len(b) == 0 {
                w.WriteHeader(http.StatusNotFound)
                return
            }
            w.Write(b)
            return
        }
        if m == http.MethodPut {
            b, err := io.ReadAll(r.Body)
            if err != nil {
                w.WriteHeader(http.StatusInternalServerError)
                return
            }
            if len(b) != 0 {
                h.cache.Set(key, b)
            }
            return
        }
        if m == http.MethodDelete {
            h.cache.Del(key)
            return
        }
        w.WriteHeader(http.StatusMethodNotAllowed)
    }
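Start the server from main:

    import (
        "Go-Cache/00_fifo/character05"
        "Go-Cache/00_fifo/character05/server"
    )

    func main() {
        // a maxBytes of 0 means no size limit
        c := character05.New(0)
        server.New(c).Listen()
    }

Once it is running, open http://localhost:12345/stat in a browser. If the cache stats come back as JSON, the server started successfully.
The same operations from the command line with curl: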
    ~ curl -X PUT -d "world" http://localhost:12345/cache/hello
    ~ curl http://localhost:12345/stat
    {"KeyCount":1,"MaxBytes":0,"UsedBytes":10}%
    ~ curl http://localhost:12345/cache/hello
    world%
    ~ curl -X PUT -d "world2" http://localhost:12345/cache/hello
    ~ curl http://localhost:12345/cache/hello
    world2%
    ~ curl -X PUT -d "smart" http://localhost:12345/cache/xiaoming
    ~ curl http://localhost:12345/cache/xiaoming
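4 Summary
Overall, FIFO is an extremely simple cache eviction algorithm with a clear idea behind it, which makes it a good exercise for beginners. It is not efficient, but it is the simplest and bluntest approach: first in, first out, with every entry treated the same.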