cornerstone中raft_server源码解析

news/2025/2/27 8:16:04/文章来源:https://www.cnblogs.com/Tomgeller/p/18559434

1.概述

cornerstone中核心即为raft_server的实现。
在raft里面有follower,leader,candidate三种角色,且角色身份还可以相互切换。
写三个类follower,leader,candidate显得没必要,因为三个类可以共享许多成员变量,如term,log_store等等。因此在cornerstone中抽象出raft_server这一个类,而raft_server的角色可以在三种状态相互切换。
下图为cornerstone中关于管理三种角色的示意图。

在本文中我们先解析单个raft_server节点中角色变化的过程,再关注leader与follower的通信。

2. raft_server节点中角色变化

2.1.1 逻辑概览

示意图如下

  • 1.当election_timeout事件发生后,followerA便按照上图的顺序先向自己的peer-follower发送prevote,得到半数以上的同意后开始下一步。
  • 2.followerA通过prevote知道自己网络状态良好,成为candidate,然后发送正式的request_vote请求,得到半数以上的同意后开始下一步。
  • 3.followerA调用become_leader,成为leader

2.1.2 election_timeout代码解析:

void raft_server::handle_election_timeout()
{recur_lock(lock_);if (steps_to_down_ > 0){if (--steps_to_down_ == 0){l_->info("no hearing further news from leader, remove this server from cluster and step down");for (std::list<ptr<srv_config>>::iterator it = config_->get_servers().begin();it != config_->get_servers().end();++it){if ((*it)->get_id() == id_){config_->get_servers().erase(it);ctx_->state_mgr_->save_config(*config_);break;}}ctx_->state_mgr_->system_exit(-1);return;}l_->info(sstrfmt("stepping down (cycles left: %d), skip this election timeout event").fmt(steps_to_down_));restart_election_timer();return;}if (catching_up_){// this is a new server for the cluster, will not send out vote req until conf that includes this srv is// committedl_->info("election timeout while joining the cluster, ignore it.");restart_election_timer();return;}if (role_ == srv_role::leader){l_->err("A leader should never encounter election timeout, illegal application state, stop the application");ctx_->state_mgr_->system_exit(-1);return;}if (ctx_->params_->prevote_enabled_ && role_ == srv_role::follower){if (prevote_state_ && !prevote_state_->empty()){l_->debug("Election timeout, but there is already a prevote ongoing, ignore this event");}else{l_->debug("Election timeout, start prevoting");request_prevote();}}else{l_->debug("Election timeout, change to Candidate");become_candidate();}
}
  • 1.首先steps_to_down_--,判断steps_to_down_是否减为0了,为0则继续下一步,不为0则不处理,重置election_timer。
  • 2.判断是不是新加入的server在catching-up集群的log_entry及相应配置信息,是则不处理,重置election_timer,否则继续下一步。
  • 3.判断进行了prevote没有,进行了就become_candidate,否则就去prevote。

知识点:
采用step_down机制,给server可能因偶然网络故障一次缓冲的机会,初始化step_down为2,先给step_down--,如果是偶然故障减为1依然还有1次机会。

2.1.3 request_prevote源码解析

void raft_server::request_prevote()
{l_->info(sstrfmt("prevote started with term %llu").fmt(state_->get_term()));bool change_to_candidate(false);{read_lock(peers_lock_);if (peers_.size() == 0){change_to_candidate = true;}}if (change_to_candidate){l_->info("prevote done, change to candidate and start voting");become_candidate();return;}if (!prevote_state_){prevote_state_ = std::make_unique<prevote_state>();}prevote_state_->inc_accepted_votes();prevote_state_->add_voted_server(id_);{read_lock(peers_lock_);for (peer_itor it = peers_.begin(); it != peers_.end(); ++it){ptr<req_msg> req(cs_new<req_msg>(state_->get_term(),msg_type::prevote_request,id_,it->second->get_id(),term_for_log(log_store_->next_slot() - 1),log_store_->next_slot() - 1,quick_commit_idx_));l_->debug(sstrfmt("send %s to server %d with term %llu").fmt(__msg_type_str[req->get_type()], it->second->get_id(), state_->get_term()));it->second->send_req(req, ex_resp_handler_);}}
}
  • 1.特判peer的大小是否为0,为0直接跳过prevote与vote阶段,直接become_candidate,否则继续。
  • 2.遍历每一个peer,向peer发送req_msg,类型为msg_type::prevote_request,req_msg里面包含自身的log_store中entry的last_idx,last_term,commit_idx情况给peer决定是否投票。

知识点:
为什么peer的大小为0就直接become_candidate而不是报持follower状态呢?

2.1.4 request_vote源码解析

void raft_server::request_vote()
{l_->info(sstrfmt("requestVote started with term %llu").fmt(state_->get_term()));state_->set_voted_for(id_);ctx_->state_mgr_->save_state(*state_);votes_granted_ += 1;voted_servers_.insert(id_);bool change_to_leader(false);{read_lock(peers_lock_);// is this the only server?if (votes_granted_ > (int32)(peers_.size() + 1) / 2){election_completed_ = true;change_to_leader = true;}else{for (peer_itor it = peers_.begin(); it != peers_.end(); ++it){ptr<req_msg> req(cs_new<req_msg>(state_->get_term(),msg_type::vote_request,id_,it->second->get_id(),term_for_log(log_store_->next_slot() - 1),log_store_->next_slot() - 1,quick_commit_idx_));l_->debug(sstrfmt("send %s to server %d with term %llu").fmt(__msg_type_str[req->get_type()], it->second->get_id(), state_->get_term()));it->second->send_req(req, resp_handler_);}}}if (change_to_leader){become_leader();}
}
  • 整体与prevote类似,关键点在于计算是否有一半以上的节点支持的技巧:if (votes_granted_ > (int32)(peers_.size() + 1) / 2)。不管奇数还是偶数,一半以上都是⌊(x + 1) / 2⌋。

2.1.5 become_leader源码解析

void raft_server::become_leader()
{stop_election_timer();role_ = srv_role::leader;leader_ = id_;srv_to_join_.reset();ptr<snapshot> nil_snp;{read_lock(peers_lock_);for (peer_itor it = peers_.begin(); it != peers_.end(); ++it){it->second->set_next_log_idx(log_store_->next_slot());it->second->set_snapshot_in_sync(nil_snp);it->second->set_free();enable_hb_for_peer(*(it->second));}}if (config_->get_log_idx() == 0){config_->set_log_idx(log_store_->next_slot());bufptr conf_buf = config_->serialize();ptr<log_entry> entry(cs_new<log_entry>(state_->get_term(), std::move(conf_buf), log_val_type::conf));log_store_->append(entry);l_->info("save initial config to log store");config_changing_ = true;}if (ctx_->event_listener_){ctx_->event_listener_->on_event(raft_event::become_leader);}request_append_entries();
}
  • 1.把election_timer给停了,同时更新自身的role等属性。
  • 2.清空每一个peer原有leader的信息,同时给每个peer设置hb来宣示自己主权。
  • 3.如果config_为空,更新config_

知识点:
这里的election_timeout事件其实不发生在election里面,而是在正常任期内发生的,用于触发election。follower在给定时间内没收到leader消息那么就启动vote,就是通过election_timer来实现的,如果收到了leader消息就restart_election_timer继续定时。

3.leader向follower发送消息

3.1 request_append_entries源码解析

void raft_server::request_append_entries()
{read_lock(peers_lock_);if (peers_.size() == 0){commit(log_store_->next_slot() - 1);return;}for (peer_itor it = peers_.begin(); it != peers_.end(); ++it){request_append_entries(*it->second);}
}bool raft_server::request_append_entries(peer& p)
{if (p.make_busy()){ptr<req_msg> msg = create_append_entries_req(p);p.send_req(msg, resp_handler_);return true;}l_->debug(sstrfmt("Server %d is busy, skip the request").fmt(p.get_id()));return false;
}
ptr<req_msg> raft_server::create_append_entries_req(peer& p)
{ulong cur_nxt_idx(0L);ulong commit_idx(0L);ulong last_log_idx(0L);ulong term(0L);ulong starting_idx(1L);{recur_lock(lock_);starting_idx = log_store_->start_index();cur_nxt_idx = log_store_->next_slot();commit_idx = quick_commit_idx_;term = state_->get_term();}{std::lock_guard<std::mutex> guard(p.get_lock());if (p.get_next_log_idx() == 0L){p.set_next_log_idx(cur_nxt_idx);}last_log_idx = p.get_next_log_idx() - 1;}if (last_log_idx >= cur_nxt_idx){l_->err(sstrfmt("Peer's lastLogIndex is too large %llu v.s. %llu, server exits").fmt(last_log_idx, cur_nxt_idx));ctx_->state_mgr_->system_exit(-1);return ptr<req_msg>();}// for syncing the snapshots, for starting_idx - 1, we can check with last snapshotif (last_log_idx > 0 && last_log_idx < starting_idx - 1){return create_sync_snapshot_req(p, last_log_idx, term, commit_idx);}ulong last_log_term = term_for_log(last_log_idx);ulong end_idx = std::min(cur_nxt_idx, last_log_idx + 1 + ctx_->params_->max_append_size_);ptr<std::vector<ptr<log_entry>>> log_entries((last_log_idx + 1) >= cur_nxt_idx ? ptr<std::vector<ptr<log_entry>>>(): log_store_->log_entries(last_log_idx + 1, end_idx));l_->debug(lstrfmt("An AppendEntries Request for %d with LastLogIndex=%llu, LastLogTerm=%llu, EntriesLength=%d, ""CommitIndex=%llu and Term=%llu").fmt(p.get_id(), last_log_idx, last_log_term, log_entries ? log_entries->size() : 0, commit_idx, term));ptr<req_msg> req(cs_new<req_msg>(term, msg_type::append_entries_request, id_, p.get_id(), last_log_term, last_log_idx, commit_idx));std::vector<ptr<log_entry>>& v = req->log_entries();if (log_entries){v.insert(v.end(), log_entries->begin(), log_entries->end());}return req;
}
  • 1.cornerstone无处不体现封装隔离的思想,将append-entry向所有peer的请求的实现下放到更小粒度的针对单个peer的append-entry,而即使是针对单个peer的append-entry,依然把底层的发送请求与对peer的状态管理分隔开来。
  • 2.create_append_entries_req才是底层的发送请求,这里要分三种情况讨论
    (1).follower的last_log_idx >= leader的cur_nxt_idx,说明follower
    (2).last_log_idx > 0 && last_log_idx < starting_idx - 1,说明follower的log_store差太多,直接给follower安装snapshot而不是按传统发送leader的log_store。
    (3).最后一种情况说明follower与leader的log_store有重合,选出非重合的log_store发送给follower。

知识点:
follower的日志落后很多的时候,可以直接发送snapshot加快同步速度。

3.2 create_sync_snapshot_req源码解析

ptr<req_msg> raft_server::create_sync_snapshot_req(peer& p, ulong last_log_idx, ulong term, ulong commit_idx)
{std::lock_guard<std::mutex> guard(p.get_lock());ptr<snapshot_sync_ctx> sync_ctx = p.get_snapshot_sync_ctx();ptr<snapshot> snp;if (sync_ctx != nilptr){snp = sync_ctx->get_snapshot();}if (!snp || (last_snapshot_ && last_snapshot_->get_last_log_idx() > snp->get_last_log_idx())){snp = last_snapshot_;if (snp == nilptr || last_log_idx > snp->get_last_log_idx()){l_->err(lstrfmt("system is running into fatal errors, failed to find a snapshot for peer %d(snapshot null: ""%d, snapshot doesn't contais lastLogIndex: %d").fmt(p.get_id(), snp == nilptr ? 1 : 0, last_log_idx > snp->get_last_log_idx() ? 1 : 0));ctx_->state_mgr_->system_exit(-1);return ptr<req_msg>();}if (snp->size() < 1L){l_->err("invalid snapshot, this usually means a bug from state machine implementation, stop the system to ""prevent further errors");ctx_->state_mgr_->system_exit(-1);return ptr<req_msg>();}l_->info(sstrfmt("trying to sync snapshot with last index %llu to peer %d").fmt(snp->get_last_log_idx(), p.get_id()));p.set_snapshot_in_sync(snp);}ulong offset = p.get_snapshot_sync_ctx()->get_offset();int32 sz_left = (int32)(snp->size() - offset);int32 blk_sz = get_snapshot_sync_block_size();bufptr data = buffer::alloc((size_t)(std::min(blk_sz, sz_left)));int32 sz_rd = state_machine_->read_snapshot_data(*snp, offset, *data);if ((size_t)sz_rd < data->size()){l_->err(lstrfmt("only %d bytes could be read from snapshot while %d bytes are expected, must be something wrong, exit.").fmt(sz_rd, data->size()));ctx_->state_mgr_->system_exit(-1);return ptr<req_msg>();}bool done = (offset + (ulong)data->size()) >= snp->size();std::unique_ptr<snapshot_sync_req> sync_req(new snapshot_sync_req(snp, offset, std::move(data), done));ptr<req_msg> req(cs_new<req_msg>(term,msg_type::install_snapshot_request,id_,p.get_id(),snp->get_last_log_term(),snp->get_last_log_idx(),commit_idx));req->log_entries().push_back(cs_new<log_entry>(term, sync_req->serialize(), log_val_type::snp_sync_req));return req;
}
  • 1.首先获取旧的snapshot,判断是否能更新,能的话就更新。
  • 2.把snapshot绑定到peer身上,因为snapshot挺大,需要分段发,所以要绑定到peer身上。
  • 3.offset记录snapshot发送到哪里了,bool done就是记录是否发送完了snapshot。
  • 4.发送snapshot_req。

知识点:
即使使用了offset记录发送的偏移,但是根据这里的代码很明显只发送了一次,那怎么能做到分段发送呢?
答案在cornerstone对于resp的处理里面,因为客户端接受snapshot,安装snapshot需要一定时间。不可能leader发送完一段snapshot紧跟着又发送下一段,leader需要等待follower处理完当前一段snapshot发送ack过来后再发送下一段,收到follower的resp后leader会再次调用这个函数,实现分段发送。

4.集群cluster的变更

4.1 cluster添加server

ptr<async_result<bool>> raft_server::add_srv(const srv_config& srv)
{bufptr buf(srv.serialize());ptr<log_entry> log(cs_new<log_entry>(0, std::move(buf), log_val_type::cluster_server));ptr<req_msg> req(cs_new<req_msg>((ulong)0, msg_type::add_server_request, 0, 0, (ulong)0, (ulong)0, (ulong)0));req->log_entries().push_back(log);return send_msg_to_leader(req);
}ptr<async_result<bool>> raft_server::send_msg_to_leader(ptr<req_msg>& req)
{typedef std::unordered_map<int32, ptr<rpc_client>>::const_iterator rpc_client_itor;int32 leader_id = leader_;ptr<cluster_config> cluster = config_;bool result(false);if (leader_id == -1){return cs_new<async_result<bool>>(result);}if (leader_id == id_){ptr<resp_msg> resp = process_req(*req);result = resp->get_accepted();return cs_new<async_result<bool>>(result);}ptr<rpc_client> rpc_cli;{auto_lock(rpc_clients_lock_);rpc_client_itor itor = rpc_clients_.find(leader_id);if (itor == rpc_clients_.end()){ptr<srv_config> srv_conf = config_->get_server(leader_id);if (!srv_conf){return cs_new<async_result<bool>>(result);}rpc_cli = ctx_->rpc_cli_factory_->create_client(srv_conf->get_endpoint());rpc_clients_.insert(std::make_pair(leader_id, rpc_cli));}else{rpc_cli = itor->second;}}if (!rpc_cli){return cs_new<async_result<bool>>(result);}ptr<async_result<bool>> presult(cs_new<async_result<bool>>());rpc_handler handler = [presult](ptr<resp_msg>& resp, const ptr<rpc_exception>& err) -> void{bool rpc_success(false);ptr<std::exception> perr;if (err){perr = err;}else{rpc_success = resp && resp->get_accepted();}presult->set_result(rpc_success, perr);};rpc_cli->send(req, handler);return presult;
}
  • add_srv先生成一个req,把变更的srv信息存到req附带的log里面。由于不是用于follower与leader之间的log_store同步,所以原来的last_log_idx,last_log_term,commit_idx全部为0。
  • 调用send_msg_to_leader向leader发送变更srv的信息

4.2 cluster移除server

ptr<async_result<bool>> raft_server::remove_srv(const int srv_id)
{bufptr buf(buffer::alloc(sz_int));buf->put(srv_id);buf->pos(0);ptr<log_entry> log(cs_new<log_entry>(0, std::move(buf), log_val_type::cluster_server));ptr<req_msg> req(cs_new<req_msg>((ulong)0, msg_type::remove_server_request, 0, 0, (ulong)0, (ulong)0, (ulong)0));req->log_entries().push_back(log);return send_msg_to_leader(req);
}
  • 同add_srv的分析。

5.总结

  • 1.合理架构raft中各角色关系,采用一个server外加peers的组合,server内部可follower,candidate,leader相互转换。
  • 2.采用step_down机制,给server可能因偶然网络故障一次缓冲的机会。
  • 3.计算是否有一半以上的节点支持的技巧:if (votes_granted_ > (int32)(peers_.size() + 1) / 2)。不管奇数还是偶数,一半以上都是⌊(x + 1) / 2⌋。
  • 4.follower的日志落后很多的时候,可以直接发送snapshot加快同步速度。
  • 5.发送大文件采用offset机制分段传送。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/839793.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

封装、权限修饰符、封装的案例

1.面向对象的三大特征其中继承和多态放在后面讲,今天只聊封装 . 2.封装的介绍举例如果要我们处理一堆学生的数据,我们可以直接设计一个类,将我们需要处理的数据封装到类当中变为成员变量,然后我们直接创建两个对象,再将数据封装到对象中即可 . 3.封装的好处 1.便于我们维护…

【Atcoder训练记录】AtCoder Beginner Contest 381

训练情况赛后反思 简单题A题做红温了,怒吃 6 罚时,C题双指针其实差不多想出来了,但是对于判断字符串合法其实可以只判断两个端点,不需要全部遍历,中途还想了二分做法(?),然而写到最后发现并没有二分单调性。 A题 记得判断字符串的长度必须是奇数,\(1 \sim \frac{n+1}…

putty中设置白色背景、黑色字体

001、 002、 003、 前景色全部改为0 004、 背景色全部改为255 。

【牛客训练记录】牛客小白月赛105

训练情况赛后反思 看十佳歌手去了,比较晚回来,只开了A题,B题不看数据范围直接一眼丁真直接WA了一发 A题 两个数相乘进行比较,再判断即可。 #include <bits/stdc++.h> #define int long long #define endl \nusing namespace std;void solve(){int a1,b1,a2,b2; cin&g…

Win 11史诗升级,终于能完美兼容安卓生态

曾几何时,Windows与安卓设备之间的壁垒让人头疼不已。如今,微软再一次引领变革!Windows 11迎来了“史诗级”升级,安卓生态的完美兼容终于成为现实。这次升级将如何改变我们的使用体验? Win 11 的安卓兼容性升级具体实现了哪些突破?它对用户工作和娱乐方式有怎样的深远影响…

MPU6050 原理及驱动

MPU6050 姿态传感器原理分析MPU6050 是一款由 InvenSense 设计的低成本紧凑型 6 轴姿态传感器。该传感器可提供三轴角速度、三轴加速度和芯片温度数据。主设备可通过 I2C 与之进行通讯,从而获取并解算测量数据。MPU6050 技术参数3 轴陀螺仪(16位):量程范围为 250、500、100…

1.11--01:查找最接近的元素

http://noi.openjudge.cn/ch0111/01/查找最接近的元素 题目传送门 思路 可以看到数据范围,如果我们两层循环mn,10000*100000,时间复杂度绝对爆炸 所以我们就需要用到二分查找 什么是二分呢? 通俗的讲,就是每次砍半 假设我们在10个数据中查找接近7的元素 \(1 3 4 8 10 12 3…

一行代码搞定Windows/Office,史上最简单操作

曾几何时,安装和激活 Windows 或 Office 是让小白头疼的大工程,繁琐的操作让人心力交瘁。而今天,我们只需要一行代码就能轻松搞定!是不是听起来很不可思议? 一行代码究竟如何实现 Windows 和 Office 的快速激活?它是否既高效又安全?如今,无论是学习还是办公,都离不开 …

触想工业显示器应用于光伏自动化设备,助力绿色低碳能源发展

一、行业发展背景受能源危机影响及全球碳中和政策利好,近年来,我国光伏产业呈现快速扩张趋势。作为光伏组件核心发电设备,电池片的市场供需被同步拉高,也带动了电池片制造设备等周边产业的迅速成长。△光伏产业快速成长与此同时,旺盛需求带来的激烈竞争格局不断加剧产业“…

Drools与动态加载规则文件

Drools简介Drools是一款基于Java的开源规则引擎,将规则与业务代码解耦。规则以脚本的形式存储在一个文件中,使规则的变化不需要修改代码,重新启动机器即可在线上环境中生效。规则引擎实现了业务决策从应用程序代码中分离出来,并使用预定义的语义模块编写业务决策。接受数据…

Drools决策表实践运用

决策表使用方式在Drools中,决策表通常是以Excel(.xls或.xlsx)或者CSV格式存储的,这些格式的决策表并不能直接被Drools执行。它们需要被转换成Drools规则语言(DRL)的文本格式,之后才能被编译和执行。Drools提供了工具来帮助完成这一转换,主要使用org.drools.decisiontab…

Economies of Scale

规模经济(Economies of Scale) 规模经济指的是随着生产规模的扩大,单位产品的平均成本下降的现象。这通常发生在企业生产规模增加到一定程度时,由于固定成本(如厂房、机器设备等)被更多的产品分摊,单位产品的固定成本降低,同时由于规模的扩大,企业可能会享受到采购原材…