我们继续追踪 `pos_cli --dump --dir /root/ckpt --pid [your program pid]` 的 checkpoint 阶段。
phos的checkpoint在handle_dump函数(pos/cli/src/dump.cpp)中实现
函数开始阶段定义各种变量和初始化
我们先来看--pid
在匹配到 meta_type 为 kPOS_CliMeta_Pid 的规则(meta_name 为 "pid")后,cast_func 通过 std::stoull 将 pid 的值从字符串类型转换为无符号长整型,并存入 clio.metas.ckpt.pid
validate_and_cast_args(/* clio */ clio,/* rules */ {{/* meta_type */ kPOS_CliMeta_Pid,/* meta_name */ "pid",/* meta_desp */ "pid of the process to be migrated",/* cast_func */ [](pos_cli_options_t& clio, std::string& meta_val) -> pos_retval_t {pos_retval_t retval = POS_SUCCESS;clio.metas.ckpt.pid = std::stoull(meta_val);exit:return retval;},/* is_required */ true},
再来看--dir
{/* meta_type */ kPOS_CliMeta_Dir,/* meta_name */ "dir",/* meta_desp */ "directory to store the checkpoint files",/* cast_func */ [](pos_cli_options_t &clio, std::string& meta_val) -> pos_retval_t {pos_retval_t retval = POS_SUCCESS;std::filesystem::path absolute_path;std::string dump_dir;//将路径转化为绝对路径absolute_path = std::filesystem::absolute(meta_val);//校验路径长度是否超过最大长度if(absolute_path.string().size() >= oob_functions::cli_ckpt_dump::kCkptFilePathMaxLen){POS_WARN("ckpt file path too long: given(%lu), expected_max(%lu)",absolute_path.string().size(),oob_functions::cli_ckpt_dump::kCkptFilePathMaxLen);retval = POS_FAILED_INVALID_INPUT;goto exit;}//清空目标路径缓冲区,然后将转换后的路径拷贝进去memset(clio.metas.ckpt.ckpt_dir, 0, oob_functions::cli_ckpt_dump::kCkptFilePathMaxLen);memcpy(clio.metas.ckpt.ckpt_dir, absolute_path.string().c_str(), absolute_path.string().size());exit:return retval;},/* is_required */ true},
nb_targets 表示用户通过 --target 指定的要转储的资源类型(白名单)的数量。
nb_skip_targets 表示用户通过 --skip-target 指定的要跳过的资源类型(黑名单)的数量。
第一个unlikely : 检查是否同时指定了这两者,这是不允许的。如果两个都指定,会发出警告并返回错误。
第二个unlikely : 如果两者都没指定,意味着用户没有明确指定需要转储或需要跳过的资源类型。这里会默认转储所有资源,并通过 POS_WARN 发出一个提示(不会返回错误)。
/* collapse_rule */ [](pos_cli_options_t& clio) -> pos_retval_t {pos_retval_t retval = POS_SUCCESS;if(unlikely(clio.metas.ckpt.nb_targets > 0 && clio.metas.ckpt.nb_skip_targets > 0)){POS_WARN("you can't specified both the whitelist and blacklist of resource types to dump (use either '--target' or '--skip-target')");retval = POS_FAILED_INVALID_INPUT;goto exit;}if(unlikely(clio.metas.ckpt.nb_targets == 0 && clio.metas.ckpt.nb_skip_targets == 0)){POS_WARN("no target and skip-target specified, default to dump all kinds of resource");}exit:return retval;});
第一步
我们需要判断存储目录是否存在,并检查是否存在挂载文件
// step 1: make sure the directory exist and freshif (std::filesystem::exists(clio.metas.ckpt.ckpt_dir)) {try {
//if(std::filesystem::exists(mount_existance_file)){has_mount_before = true;}
清理旧文件
nb_removed_files = 0;for(auto& de : std::filesystem::directory_iterator(clio.metas.ckpt.ckpt_dir)) {// returns the number of deleted entities since c++17:nb_removed_files += std::filesystem::remove_all(de.path());}
如果删除过程中出现异常(如权限问题、目录被占用等),会记录错误并返回失败
POS_LOG("clean old assets under specified dump dir: dir(%s), nb_removed_files(%lu)",clio.metas.ckpt.ckpt_dir, nb_removed_files);POS_LOG("reuse dump dir: %s", clio.metas.ckpt.ckpt_dir);} catch (const std::exception& e) {POS_WARN("failed to remove old assets under specified dump dir: dir(%s), error(%s)",clio.metas.ckpt.ckpt_dir, e.what());retval = POS_FAILED;goto exit;}}
如果存储目录不存在,则创建一个
else {try {std::filesystem::create_directories(clio.metas.ckpt.ckpt_dir);} catch (const std::filesystem::filesystem_error& e) {POS_WARN("failed to create dump directory: dir(%s), error(%s)",clio.metas.ckpt.ckpt_dir, e.what());retval = POS_FAILED;goto exit;}POS_LOG("create dump dir: %s", clio.metas.ckpt.ckpt_dir);}
第二步
将 tmpfs(一个基于内存的文件系统)挂载到 dump 目录上,目的是加速 dump 操作,减少磁盘 I/O。挂载大小取系统可用内存的 80%;若可用内存不超过 128MB 则直接失败;若挂载命令本身失败,只发出警告并继续执行。
// step 2: mount the memory to tmpfs//若tmpfs 还没有挂载,需要执行挂载操作if(has_mount_before == false){// obtain available memory on the systemretval = POSUtilSystem::get_memory_info(total_mem_bytes, avail_mem_bytes);if(unlikely(retval != POS_SUCCESS)){POS_WARN("failed dump, failed to obtain memory information of the ststem");retval = POS_FAILED;goto exit;}if(unlikely(avail_mem_bytes <= MB(128))){POS_WARN("failed dump, not enough memory on the system: total(%lu bytes), avail(%lu bytes)",total_mem_bytes, avail_mem_bytes);retval = POS_FAILED;goto exit;}// execute mount cmdmount_cmd = std::string("mount -t tmpfs -o size=")+ POSUtilSystem::format_byte_number(avail_mem_bytes * 0.8)+ std::string(" tmpfs ") + std::string(clio.metas.ckpt.ckpt_dir);retval = POSUtil_Command_Caller::exec_sync(mount_cmd,mount_result,/* ignore_error */ false,/* print_stdout */ true,/* print_stderr */ true);if(unlikely(retval != POS_SUCCESS)){POS_WARN("failed to mount dump directory to tmpfs, the dump might be slowed down due to storage IO");} else {POS_LOG("mount dump dir to tmpfs: size(%s), dir(%s)",POSUtilSystem::format_byte_number(avail_mem_bytes * 0.8).c_str(),clio.metas.ckpt.ckpt_dir);}}
第三步
创建标记文件 mount_existance_file,并向其中写入 tmpfs 的挂载大小(可用内存的 80%),作为已挂载 tmpfs 的标志
// step 3: create mount existance filePOS_ASSERT(!std::filesystem::exists(mount_existance_file));mount_existance_file_stream.open(mount_existance_file);if(unlikely(!mount_existance_file_stream.is_open())){POS_WARN("failed to create mount existance file, yet still successfully mount to tmpfs: path(%s)",mount_existance_file.c_str());}mount_existance_file_stream << std::to_string(static_cast<int>(avail_mem_bytes * 0.8));mount_existance_file_stream.close();
第四步
gpu侧dump
准备 call_data 数据结构,作为向 GPU 端发起 dump 请求的参数。
数据包括:
- pid:需要 dump 的进程 ID。
- ckpt_dir:checkpoint 文件的目标目录,即刚才挂载到 tmpfs 的目录。
- targets 和 skip_targets:需要 dump 的对象以及需要跳过的对象。
- do_cow(copy-on-write)和 force_recompute:控制 dump 策略。
// step 4: GPU-side dump (sync)call_data.pid = clio.metas.ckpt.pid;memcpy(call_data.ckpt_dir,clio.metas.ckpt.ckpt_dir,oob_functions::cli_ckpt_dump::kCkptFilePathMaxLen);memcpy(call_data.targets,clio.metas.ckpt.targets,sizeof(call_data.targets));memcpy(call_data.skip_targets,clio.metas.ckpt.skip_targets,sizeof(call_data.skip_targets));call_data.nb_targets = clio.metas.ckpt.nb_targets;call_data.nb_skip_targets = clio.metas.ckpt.nb_skip_targets;call_data.do_cow = clio.metas.ckpt.do_cow;call_data.force_recompute = clio.metas.ckpt.force_recompute;retval = clio.local_oob_client->call(kPOS_OOB_Msg_CLI_Ckpt_Dump, &call_data);if(POS_SUCCESS != call_data.retval){POS_WARN("dump failed, gpu-side dump failed, %s", call_data.retmsg);goto exit;}
第五步
cpu侧dump
构造 criu(Checkpoint/Restore in Userspace)的命令,用于执行 CPU 侧 dump。
参数解释:
--images-dir:保存 dump 数据的目录,这里正是挂载的 tmpfs。
--shell-job:允许 dump 包含 shell 作业。
--display-stats:显示 dump 过程中的统计信息。
--tree:指定需要 dump 的进程树,以给定 pid 的进程作为进程树的根。
// step 5: CPU-side dump (sync)criu_cmd = std::string("criu dump")+ std::string(" --images-dir ") + std::string(clio.metas.ckpt.ckpt_dir)+ std::string(" --shell-job --display-stats")+ std::string(" --tree ") + std::to_string(clio.metas.ckpt.pid);//执行 criu dump 命令,将 CPU 侧数据 dump 到 tmpfs 中。retval = POSUtil_Command_Caller::exec_sync(criu_cmd, criu_result,/* ignore_error */ false,/* print_stdout */ true,/* print_stderr */ true);if(unlikely(retval != POS_SUCCESS)){POS_WARN("dump failed, failed to dump cpu-side: retval(%u)", retval);// POS_WARN("failed to execute CRIU");goto exit;}
详细看一下gpu 侧的 dump
//pos/src/oob/ckpt_dump.cpppos_retval_t sv(int fd, struct sockaddr_in* remote, POSOobMsg_t* msg, POSWorkspace* ws, POSOobServer* oob_server){pos_retval_t retval = POS_SUCCESS;oob_payload_t *payload;
//payload:存储请求负载的指针,包含 pid、ckpt_dir(存储 checkpoint 目录)、do_cow(是否启用 Copy-On-Write 机制)、force_recompute(是否强制重新计算)等信息。POSClient *client; //表示目标进程的 POS 客户端,用于管理 GPU 端的数据std::string retmsg;POSCommand_QE_t* cmd; //存储即将执行的命令对象std::vector<POSCommand_QE_t*> cmds;uint32_t i;typename std::map<pos_resource_typeid_t,std::string>::iterator map_iter;
//检查payload是否为空,确保信息负载有效POS_CHECK_POINTER(payload = (oob_payload_t*)msg->payload);// obtain client with specified pidclient = ws->get_client_by_pid(payload->pid);//如果找不到对应的 client,说明该进程不存在,返回 POS_FAILED_NOT_EXIST 并进入 response 处理流程if(unlikely(client == nullptr)){retmsg = "no client with specified pid was found";payload->retval = POS_FAILED_NOT_EXIST;memcpy(payload->retmsg, retmsg.c_str(), retmsg.size());goto response;}
创建cmd对象作为dump命令
// form cmdPOS_CHECK_POINTER(cmd = new POSCommand_QE_t);cmd->client_id = client->id;cmd->type = kPOS_Command_Oob2Parser_Dump;cmd->ckpt_dir = std::string(payload->ckpt_dir) + std::string("/phos");cmd->do_cow = payload->do_cow;cmd->force_recompute = payload->force_recompute;//force_recompute == true 时,必须开启 do_cow,否则触发 POS_ASSERT 断言if(cmd->force_recompute == true)POS_ASSERT(cmd->do_cow == true);
选择dump资源
//nb_targets > 0:指定目标资源。 //nb_skip_targets > 0:默认选择所有资源,并删除 skip_targets 指定的资源。//两者皆为 0:Dump 所有资源POS_ASSERT(!(payload->nb_targets > 0 && payload->nb_skip_targets > 0));if(payload->nb_targets > 0){for(i=0; i<payload->nb_targets; i++)cmd->target_resource_type_idx.insert(payload->targets[i]);} else if(payload->nb_skip_targets > 0){ for(map_iter = pos_resource_map.begin(); map_iter != pos_resource_map.end(); map_iter++){cmd->target_resource_type_idx.insert(map_iter->first);}for(i=0; i<payload->nb_skip_targets; i++)cmd->target_resource_type_idx.erase(payload->skip_targets[i]);} else { // payload->nb_targets == 0 && payload->nb_skip_targets == 0for(map_iter = pos_resource_map.begin(); map_iter != pos_resource_map.end(); map_iter++){cmd->target_resource_type_idx.insert(map_iter->first);}}
创建gpu dump目录
// create ckpt directory for GPU-sidePOS_ASSERT(std::filesystem::exists(payload->ckpt_dir));POS_ASSERT(!std::filesystem::exists(cmd->ckpt_dir));try {std::filesystem::create_directories(cmd->ckpt_dir);} catch (const std::filesystem::filesystem_error& e) {POS_WARN("failed dump, failed to create directory for GPU-side: dir(%s), error(%s)",cmd->ckpt_dir.c_str(), e.what());retmsg = "see posd log for more details";payload->retval = POS_FAILED;memcpy(payload->retmsg, retmsg.c_str(), retmsg.size());goto response;}POS_LOG("create dump dir for GPU-side: %s", cmd->ckpt_dir.c_str());
发送 Dump 命令到解析器
// send to parser//通过 client->push_q 发送 cmd 到 Oob2Parser 解析队列。retval = client->template push_q<kPOS_QueueDirection_Oob2Parser, kPOS_QueueType_Cmd_WQ>(cmd);if(unlikely(retval != POS_SUCCESS)){retmsg = "see posd log for more details";payload->retval = POS_FAILED;memcpy(payload->retmsg, retmsg.c_str(), retmsg.size());goto response;}
轮询 Oob2Parser 方向的 Cmd_CQ 队列,直到获取 Dump 结果(期望恰好收到一条类型为 kPOS_Command_Oob2Parser_Dump 的命令)。
// wait parser replycmds.clear();while(cmds.size() == 0){client->template poll_q<kPOS_QueueDirection_Oob2Parser, kPOS_QueueType_Cmd_CQ>(&cmds);}POS_ASSERT(cmds.size() == 1);POS_ASSERT(cmds[0]->type == kPOS_Command_Oob2Parser_Dump);
处理解析结果
// transfer error status//根据 cmds[0]->retval 判断 Dump 是否成功:payload->retval = cmds[0]->retval;if(unlikely(cmds[0]->retval != POS_SUCCESS)){//POS_FAILED_NOT_ENABLED:posd 未启用 checkpoint。if(cmds[0]->retval == POS_FAILED_NOT_ENABLED){retmsg = "posd doesn't enable ckpt support";//POS_FAILED_ALREADY_EXIST:Dump 过于频繁,发生冲突。} else if (cmds[0]->retval == POS_FAILED_ALREADY_EXIST){retmsg = "dump too frequent, conflict";//其他情况,打印错误日志。} else {retmsg = "see posd log for more details";}memcpy(payload->retmsg, retmsg.c_str(), retmsg.size());goto response;}
释放前先持久化客户端,防止数据丢失
// before remove client, we persist the state of the clientif(unlikely(POS_SUCCESS != (payload->retval = client->persist(cmd->ckpt_dir)))){POS_WARN("failed to persist the state of client");retmsg = "see posd log for more details";memcpy(payload->retmsg, retmsg.c_str(), retmsg.size());}
释放客户端
// remove client//如果 Dump 成功,移除 clientif(likely(cmds[0]->retval == POS_SUCCESS)){ws->remove_client(cmd->client_id);}response:POS_ASSERT(retmsg.size() < kServerRetMsgMaxLen);__POS_OOB_SEND();return retval;}