[MIT6.824] Lab 3: Fault-tolerant Key/Value Service

[MIT6.824] Lab 3: Fault-tolerant Key/Value Service

目标

通过在Lab2中实现的Raft库,构建一个可容灾的KV数据库。

需要实现的服务有三种操作:

  • Put(key, value) key和value都是string,put设置指定key的value.

  • Append(key, arg) 将arg append到key对应的value。上,如果没有该key,相当于put操作。

  • Get(key) 返回值,如果没有该key,返回空字符串。

strong consistency

If called one at a time, the Get/Put/Append methods should act as if the system had only one copy of its state, and each call should observe the modifications to the state implied by the preceding sequence of calls.

需要实现linearizability.

实验介绍:

This lab has two parts. In part A, you will implement the service without worrying that the Raft log can grow without bound. In part B, you will implement snapshots

在A中实现基本功能, B中实现快照。

We supply you with skeleton code and tests in src/kvraft. You will need to modify kvraft/client.go, kvraft/server.go, and perhaps kvraft/common.go.

Part A: Key/value service without snapshots

Each of your key/value servers (“kvservers”) will have an associated Raft peer.

每个kv服务器对应一个raft peer。

Clerks send Put(), Append(), and Get() RPCs to the kvserver whose associated Raft is the leader.

clerk 将请求发送给raft leader之上的kvserver。然后kvserver将operation的log交给raft leader(通过之前的start)。

All of the kvservers execute operations from the Raft log in order, applying the operations to their key/value databases

回想之前的applymsg,kvserver通过raft,获取apply的log,并应用到kv数据库中。

A Clerk sometimes doesn’t know which kvserver is the Raft leader. If the Clerk sends an RPC to the wrong kvserver, or if it cannot reach the kvserver, the Clerk should re-try by sending to a different kvserver.

Clerk可能判断错误谁是raft的leader,如果发错了rpc,应当充实。

If the key/value service commits the operation to its Raft log (and hence applies the operation to the key/value state machine), the leader reports the result to the Clerk by responding to its RPC.

需要commit才能返回clerk的RPC.

If the operation failed to commit (for example, if the leader was replaced), the server reports an error, and the Clerk retries with a different server.

如果没有commit,应当report error(我认为此处是RPC携带错误信息)。

Your kvservers should not directly communicate; they should only interact with each other through Raft.

kvserver之前不应当直接交流,而是通过Raft库。(类似于网络分层)。

You’ll need to add RPC-sending code to the Clerk Put/Append/Get methods in client.go, and implement PutAppend() and Get() RPC handlers in server.go. These handlers should enter an Op in the Raft log using Start();

server.go中实现RPC的handler。

you should fill in the Op struct definition in server.go so that it describes a Put/Append/Get operation. Each server should execute Op commands as Raft commits them, i.e. as they appear on the applyCh. An RPC handler should notice when Raft commits its Op, and then reply to the RPC.

当raft提交op时,返回给RPC。

Client

You have completed this task when you reliably pass the first test in the test suite: “One client”.

在没有网络问题的情况下,首先开始实现只有一个客户端的测试了=。

首先看测试用例

func TestBasic3A(t *testing.T) {// Test: one client (3A) ...GenericTest(t, "3A", 1, 5, false, false, false, -1, false)
}		

开启一个客户端,5个服务器,没有RPC fail的情况,服务器不会崩溃,没有网络分区,不需要快照。

在测试里面,通过AppendGet(),Put()函数调用clerk的对应函数,并记录相应的log.

func Get(cfg *config, ck *Clerk, key string, log *OpLog, cli int) string {start := time.Now().UnixNano()v := ck.Get(key)end := time.Now().UnixNano()cfg.op()if log != nil {log.Append(porcupine.Operation{Input:    models.KvInput{Op: 0, Key: key},Output:   models.KvOutput{Value: v},Call:     start,Return:   end,ClientId: cli,})}return v
}

那么首先实现client.go/Clerk中的对应接口。

func (ck *Clerk) Get(key string) string {// You will have to modify this function.commandId := ck.lastAppliedCommandId + 1args := GetArgs{Key:       key,ClientId:  ck.clientId,CommandId: commandId,}serverId := ck.lastFoundLeaderserverNum := len(ck.servers)for ; ; serverId = (serverId + 1) % serverNum {var reply GetReplyok := ck.servers[serverId].Call("KVServer.Get", &args, &reply)if !ok || reply.Err == ErrWrongLeader || reply.Err == ErrTimeout {continue}ck.lastFoundLeader = serverIdck.lastAppliedCommandId = commandIdif reply.Err == ErrNoKey {return ""}return reply.Value}
}

因为不确定谁是leader,这里采用了循环的方式。如果响应超时,或者不是leader,尝试下一个服务器。

clerk调用了"KVServer.Get",接下来应当实现KVServer.Get

KVServer

type Op struct {// Your definitions here.// Field names must start with capital letters,// otherwise RPC will break.OpType    stringKey       stringValue     stringClientId  int64CommandId int
}

这里的Op用于交给Raft的log。

type ApplyResult struct {ErrMsg ErrValue  stringTerm   int
}

ApplyResult是KV层提交所返回的消息。term用于判断该msg是否有效。

type CommandContext struct {CommandId intReply     ApplyResult
}

CommandContext中,CommandId指示返回消息的指令序列号。

在KVServer中,先添加了三个字段

type KVServer struct {mu sync.Mutexme intrf *raft.RaftapplyCh chan raft.ApplyMsgdead    int32 // set by Kill()maxraftstate int // snapshot if log grows this big// Your definitions here.clientReply map[int64]CommandContextreplyChMap  map[int]chan ApplyResult // log的index对应的返回信息。lastApplied int
}

其中,clientReply存储每个client上一个已经执行完成操作的结果。

replyChMap的key是操作的index,对应一个接受消息的通道。

lastApplied是当前服务器的数据库上最后一次执行命令的index。

这里先判断Get Rpc中的Command是否已经被执行过。

func (kv *KVServer) Get(args *GetArgs, reply *GetReply) {if clientReply, ok := kv.clientReply[args.ClientId]; ok {if args.CommandId == clientReply.CommandId {reply.Value = clientReply.Reply.Valuereply.Err = clientReply.Reply.ErrMsgreturn} else if args.CommandId < clientReply.CommandId {reply.Err = ErrOutDate // 已经没有了之前执行的记录。}}

然后构造一个op,交给raft

op := Op{OpType:    GetOperation,Key:       args.Key,Value:     "",CommandId: args.CommandId,ClientId:  args.ClientId,
}
index, term, isLeader := kv.rf.Start(op)
if !isLeader {reply.Err = ErrWrongLeaderreturn
}

接下来思考,raft的start是立即返回的,如何知道该op已经apply呢?这就要使用channel来等待了。可以先创建一个goroutine,用于在kvserver层接收raft层的apply消息,然后通过该apply的index消息,传给在Handler中开启的channel

replyCh := make(chan ApplyMsg)
kv.replyChMap[index] = replyCh
select {
case replyMsg := <-replyCh:{if term != replyMsg.Term {// 已经进入之后的term,leader改变(当前server可能仍然是leader,但是已经是几个term之后了)reply.Err = ErrWrongLeaderreturn} else {reply.Value = replyMsg.Value}}
case <-time.After(KVTimeOut):{reply.Err = ErrTimeoutreturn}
}

这里解释为什么不能接受term != replyMsg.Term

考虑这么一种情况:有三个服务器:A、B、C

首先A在Term1收到GetA请求,并交给RAFT层,index=1。但是此时leader从A变到B,第一个请求可能已经commit了,也可能没有发送给B,B受到另外一个客户端的请求Put(A,1),并且在term2成功提交。此时A的管道接收到index=1的提交信息,但是term改变了。RAFT只能保证term和index都相同时log相同,所以此时发生错误。

接下来考虑:什么时候在handler中创建的管道应当接收到信息呢?应该是在raft发送applyCh的时候。所以我们在KV服务器开始时应当kick out一个协程,专门用于处理applyCh的信息。

func (kv *KVServer) handleApplyMsg() {for !kv.killed() {applyMsg := <-kv.applyChif applyMsg.CommandValid {go kv.applyCommand(applyMsg)}if applyMsg.SnapshotValid {go kv.applySnapshot(applyMsg)}_, err := DPrintf("Error Apply Msg: [%v]", applyMsg)if err != nil {return}}
}
func (kv *KVServer) applyCommand(msg raft.ApplyMsg) {
}func (kv *KVServer) applySnapshot(msg raft.ApplyMsg) {
}

接下来实现applycommand

func (kv *KVServer) applyCommand(msg raft.ApplyMsg) {var applyResult ApplyResultop := msg.Command.(Op)applyResult.Term = msg.CommandTermcommandContext, ok := kv.clientReply[op.ClientId]if ok && commandContext.CommandId >= op.CommandId {// 该指令已经被应用过。// applyResult = commandContext.Replyreturn}switch op.OpType {case GetOperation:{value, ok := kv.kvdb.Get(op.Key)if ok {applyResult.Value = value} else {applyResult.Value = ""applyResult.ErrMsg = ErrNoKey}}case PutOperation:{value := kv.kvdb.Put(op.Key, op.Value)applyResult.Value = value}case AppendOperation:{value := kv.kvdb.Append(op.Key, op.Value)applyResult.Value = value}default:DPrintf("Error Op Type %s", op.OpType)}ch, ok := kv.replyChMap[msg.CommandIndex]if ok {ch <- applyResult}kv.clientReply[op.ClientId] = CommandContext{CommandId: op.CommandId,Reply:     applyResult,}
}

其中,kvdb就是用map操作实现的kv内存数据库。

调试

关于心跳选择

在[MIT6.824] Spring2021 Lab 2: Raft - AntiO2’s Blog中,我的RAFT写法是当收到log时不触发心跳,而是在等待固定间隔时发送心跳。这样就会导致TestSpeed3A中的操作过慢,因为客户端在上一条操作成功后才会进行下一条,导致但客户端时一条操作的时间必然大于心跳间隔,但是测试中要求一个心跳间隔至少进行3次操作。

所以还是需要在收到log时发送一条信息,但是这样又会导致多个client同时append log时,发送的RPC过多。

于是我想了如下优化:

首先是在TestPersistConcurrent3A测试中,采用每次接收新log都要发送一轮新消息的形式,共发送32104条log

… Passed – 21.7 5 32104 1379

在raft的ticker中修改

func (rf *Raft) ticker() {rf.mu.Lock()rf.setElectionTime()rf.mu.Unlock()// for append new loggo func() {for rf.killed() == false {time.Sleep(rf.heartBeat / 30)rf.mu.Lock()if rf.status == leader {// 如果是leader状态,发送空包rf.setElectionTime()rf.mu.Unlock()rf.appendEntries(false)continue}rf.mu.Unlock()}}()for rf.killed() == false {// Your code here to check if a leader election should// be started and to randomize sleeping time using// time.Sleep().time.Sleep(rf.heartBeat)rf.mu.Lock()if rf.status == leader {// 如果是leader状态,发送空包rf.setElectionTime()rf.mu.Unlock()rf.appendEntries(true)continue}if time.Now().After(rf.electionTime) {// 如果已经超时, 开始选举go rf.startElection()}rf.mu.Unlock()}
}

相当于新增加了一个间隔为heartbeat/5的检测,如果有新的log就发送RPC请求,没有就不发送RPC(正常的heartbeat是无论有无新log都要发送)。

再次进行测试,

​ … Passed – 21.5 5 25873 1623

发送的RPC减少了,而且进行的操作数量也有所提升。(相当于进行了更高频的心跳,但是只有在有新log未同步时,才会触发这种高频心跳)

关于Data Race

在多客户端时,因为收到apply msg时,会启动一个apply command的go程,如果多个客户端同时get或put一个key,会导致data race.

解决方法:因为要严格按照log顺序apply,所以应当是直接调用apply command函数,而不是启动go程。

Part B: Key/value service with snapshots

The tester passes maxraftstate to your StartKVServer(). maxraftstate indicates the maximum allowed size of your persistent Raft state in bytes (including the log, but not including snapshots).

maxraftstate指示了Raft最大的logs大小。

You should compare maxraftstate to persister.RaftStateSize(). Whenever your key/value server detects that the Raft state size is approaching this threshold, it should save a snapshot using Snapshot, which in turn uses persister.SaveRaftState(). If maxraftstate is -1, you do not have to snapshot. maxraftstate applies to the GOB-encoded bytes your Raft passes to persister.SaveRaftState().

当raft的statesize快要接近maxraftstate时,使用snapshot制作快照。

首先实现状态机(KV数据库)生成和安装快照

func (kv *KvDataBase) GenSnapshot() []byte {w := new(bytes.Buffer)encode := gob.NewEncoder(w)err := encode.Encode(kv.KvData)if err != nil {return nil}return w.Bytes()
}
func (kv *KvDataBase) InstallSnapshot(data []byte) {if data == nil || len(data) < 1 { // bootstrap without any state?return}r := bytes.NewBuffer(data)encode := gob.NewDecoder(r)err := encode.Decode(&kv.KvData)if err != nil {panic(err.Error())}
}

在kvserver收到raft层的快照消息后,尝试将快照状态传递给raft,告诉它:我准备应用了。(如果是过期快照就不会应用),然后如果raft应用该快照,就安装。

func (kv *KVServer) applySnapshot(msg raft.ApplyMsg) {if kv.rf.CondInstallSnapshot(msg.SnapshotTerm, msg.SnapshotIndex, msg.Snapshot) {kv.lastAppliedIndex = msg.SnapshotIndexkv.kvdb.InstallSnapshot(msg.Snapshot)}
}

首先在内存的KV数据库中应用快照,然后告诉raft已经应用了快照。

接下来就要考虑生成快照的时机了。因为append log是并发的,在Start Op时判断是否生成快照感觉不太好,因为此时新的操作还没有被应用到数据库里面。

如果在apply里面生成快照,还能起到阻塞新的操作的作用。感觉是个挺好的选择。

我们还需要维护一个新的lastApplied int, 表示制作快照时已经应用的raft索引。kvserver在初始化时,应当将lastApplied初始化为snapshot所包含的最后一条索引。

kv.kvdb.Init(persister.ReadSnapshot())
kv.lastAppliedIndex = int(kv.rf.GetLastIncludeIndex())

然后在ApplyCommand中,增加制作快照。

kv.lastAppliedIndex = max(kv.lastAppliedIndex, msg.CommandIndex)
if kv.rf.GetRaftStateSize() > kv.maxraftstate {kv.rf.Snapshot(msg.CommandIndex, kv.kvdb.GenSnapshot())
}

调试

关于生成snapshot的时机

上面的几行代码已经足够通过3B的前几个测试,但是在TestSnapshotRecoverManyClients3B中出现了问题。

报错test_test.go:362: logs were not trimmed (56767 > 8*1000)

在多个客户端时,虽然成功生成了快照,但是出现了这么一种情况:

在apply一个新的操作后,发现raft的size过大,比如是10000,这个时候生成快照,但是新apply的index可能只往前推了几条,比如旧log有167条,其中只有5条被确认apply了,生成快照后还有162条在raft state中。

出现这种情况,可能是apply过慢, 因为snapshot本身要对数据库编码存储,需要耗费比较长的时间,所以不能每次apply都进行一次snapshot操作。

所以应当每隔一段时间进行snapshot操作,而不是每次applycommand都进行。比如目前commit index是100(假设超过了设定的max state),apply index 是0,使用定时器,就可以让kvserver快速apply command。而不是在每次apply之后进行snapshot操作。

需要修改raft代码

因为所有的kvserver都能独立生成snapshot,所以可能出现leader尝试发送的log已经不存在于follower的log中的情况,这种时候就需要判断哪些logs是有用的。

if args.PrevLogIndex < lastIncludeIndex {// 这里可能出现args.PrevLogIndex < lastIncludeIndex的情况// 如果所有的logs都很过时(已经包含在当前快照里面),就都不需要,否则保留新的。entryLen := len(args.Entries)if args.PrevLogIndex+IndexT(entryLen) <= lastIncludeIndex {rf.logger.Printf("[%d] receive outdated logs before snapshot", rf.me)reply.Success = truereply.Conflict = falserf.logLatch.RUnlock()return}trim := lastIncludeIndex - args.PrevLogIndex // 多余的部分// args.PrevLogIndex+IndexT(entryLen) > lastIncludeIndex//     IndexT(entryLen) > trimargs.PrevLogIndex = args.Entries[trim-1].Indexargs.PrevLogTerm = args.Entries[trim-1].Termargs.Entries = args.Entries[trim:]
}

因为这个问题在Lab2D中没有出现,所以当时没有发现这个bug。

snapshot的内容:

保存对client的历史回复也应当被保存在snapshot的快照中。

关于重复的Put/Append

当putappend重复时,也应当返回。这样client才能更新commandid

因为在不可靠网络环境下,可能已经进行了更新操作,但是没有及时返回,clerk再次尝试请求,此时结果已经存放好了。

image-20231006164207915

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/126006.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

快速了解Spring Cache

SpringCache是一个框架&#xff0c;实现了基于注解的缓存功能&#xff0c;只需要简单的加一个注解&#xff0c;就可以实现缓存功能。 SpringCache提供了一层抽象&#xff0c;底层可以切换不同的缓存实现。例如&#xff1a; EHChche Redis Caffeine 常用注解&#xff1a; Enabl…

正点原子嵌入式linux驱动开发——TF-A初探

上一篇笔记中&#xff0c;正点原子的文档简单讲解了一下什么是TF-A&#xff0c;并且也学习了如何编译TF-A。但是TF-A是如何运行的&#xff0c;它的一个运行流程并未涉及。TF-A的详细运行过程是很复杂的&#xff0c;涉及到很多ARM处理器底层知识&#xff0c;所以这一篇笔记的内容…

信看课堂-厘米GNSS定位

我们常常说GPS 定位&#xff0c;不过定位远不止GPS定位&#xff0c;通过本节课程&#xff0c;我们将会了解到&#xff0c;原来GPS只是定位的一种&#xff1a; GNSS概述 不同的GNSS系统使用不同的频段来传输导航信号。以下是一些主要的GNSS系统及其相应的频段&#xff0c;用表…

JMeter接口自动化测试(数据驱动)

之前我们的用例数据都是配置在HTTP请求中&#xff0c;每次需要增加&#xff0c;修改用例都需要打开JMeter重新编辑&#xff0c;当用例越来越多的时候&#xff0c;用例维护起来就越来越麻烦&#xff0c;有没有好的方法来解决这种情况呢&#xff1f;我们可以将用例的数据存放在cs…

【智能家居项目】裸机版本——字体子系统 | 显示子系统

&#x1f431;作者&#xff1a;一只大喵咪1201 &#x1f431;专栏&#xff1a;《智能家居项目》 &#x1f525;格言&#xff1a;你只管努力&#xff0c;剩下的交给时间&#xff01; 今天实现上图整个项目系统中的字体子系统和显示子系统。 目录 &#x1f004;设计思路&#x1…

学校项目培训之Carla仿真平台之Carla学习内容

一、Blender Blender入门&#xff1a;https://www.bilibili.com/video/BV1fb4y1e7PD/ Blender导入骨骼&#xff1a;https://www.bilibili.com/video/BV1hc41157nL 做一个车&#xff1a;https://www.bilibili.com/video/BV1hY411q7w2 收获&#xff1a; 学习Blender建模的使用…

OpenResty编译安装详解

文章目录 一、概述1、OpenResty是什么2、官方文档 二、cengos安装OpenResty1、从官网下载2、目录结构3、编译安装 一、概述 1、OpenResty是什么 OpenResty 是一个基于 Nginx 与 Lua 的高性能 Web 平台&#xff0c;其内部集成了大量精良的 Lua 库、第三方模块以及大多数的依赖…

Springboot使用Aop保存接口请求日志到mysql

1、添加aop依赖 <!-- aop日志 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-aop</artifactId></dependency> 2、新建接口保存数据库的实体类RequestLog.java package com.example…

【算法】关于排序你应该知道的一切(下)

和光同尘_我的个人主页 单程孤舟&#xff0c;出云入霞&#xff0c;如歌如吟。 --门孔 八大排序 &#x1f56f;️前言1. 常见排序算法2. 常见排序算法实现2.1. 冒泡排序2.1.1. 基本思想2.1.2. 代码实现2.1.3. 特性 2.2. 快速排序2.2.1. hoare法基本思想代码实现 2.2.2. 快速排…

学习搜狗的workflow,MacBook上如何编译

官网说可以在MacBook上也可以运行&#xff0c;但是编译的时候却有找不到openssl的错误&#xff1a; 看其他博客也有类似的错误&#xff0c;按照类似的思路去解决 问题原因和解决办法 cmake编译的时候&#xff0c;没有找到openssl的头文件&#xff0c;需要设置cmake编译环境下…

命令解释器-Shell

目录 1. 概述 1.1. 概念 1.2. 分类&#xff1a; 1.3. type 命令 1.4.命令执行原理 2. Linux 中的特殊符号 3. 命令别名 3.1. 查看设置的别名 3.2. 常用的别名 3.3. 删除别名 3.6. 注意&#xff08;alias永久化&#xff09;&#xff1a; 4. history 命令历史 例&a…

【状态估计】将变压器和LSTM与卡尔曼滤波器结合到EM算法中进行状态估计(Python代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…