机器人模仿学习之动作分块ACT算法的代码剖析、部署训练

前言

本文最早是属于《斯坦福Mobile ALOHA背后的关键技术:动作分块ACT算法的原理解析》的第二、第三部分,涉及到动作分块ACT的代码剖析与部署训练

但因为想把ACT的代码逐行剖析的更细致些,加之为避免上一篇文章太过于长,故把动作分块ACT的代码剖析与部署实践这块独立出来成本文

第一部分 动作分块算法ACT的代码剖析

关于ACT的代码,我们可以重点研究下这个仓库:GitHub - tonyzhaozh/act,我司同事杜老师也于24年1.10日跑通了这份代码(如何跑通的教程见下文第二部分)

  • imitate_episodes.py,训练和评估 ACT
  • policy.py,An adaptor for ACT policy
  • detr,ACT 的模型定义 修改自 DETR
  • sim_env.py,具有 joint space control的 Mujoco + DM_Control 环境
  • ee_sim_env.py,具有EE space control的 Mujoco + DM_Control 环境
  • scripted_policy.py,模拟环境的脚本化策略
  • constants.py,跨文件共享的常量
  • utils.py,数据加载和辅助函数等实用程序
  • visualize_episodes.py,保存 .hdf5 数据集中的视频

1.1 ACT的训练与评估imitate_episodes.py

1.1.1 主程序

  1. 从命令行参数中获取模型训练和评估的相关配置
    def main(args):set_seed(1)  # 设置随机种子以保证结果可重现# 解析命令行参数is_eval = args["eval"]  # 是否为评估模式的布尔标志ckpt_dir = args["ckpt_dir"]  # 保存/加载检查点的目录policy_class = args["policy_class"]  # 使用的策略类onscreen_render = args["onscreen_render"]  # 是否进行屏幕渲染的标志task_name = args["task_name"]  # 任务名称batch_size_train = args["batch_size"]  # 训练批大小batch_size_val = args["batch_size"]  # 验证批大小num_epochs = args["num_epochs"]  # 训练的总周期数use_waypoint = args["use_waypoint"]  # 是否使用航点constant_waypoint = args["constant_waypoint"]  # 持续航点的设置# 根据是否使用航点打印相应信息if use_waypoint:print("Using waypoint")  # 使用航点if constant_waypoint is not None:print(f"Constant waypoint: {constant_waypoint}")  # 持续航点
  2. 根据任务名称和配置获取任务参数,例如数据集目录、任务类型等
        # 获取任务参数is_sim = True  # 硬编码为True以避免从aloha中查找常量# 如果是模拟任务,从constants导入SIM_TASK_CONFIGSif is_sim:from constants import SIM_TASK_CONFIGStask_config = SIM_TASK_CONFIGS[task_name]else:from aloha_scripts.constants import TASK_CONFIGStask_config = TASK_CONFIGS[task_name]# 从任务配置中获取相关参数dataset_dir = task_config["dataset_dir"]num_episodes = task_config["num_episodes"]episode_len = task_config["episode_len"]camera_names = task_config["camera_names"]
  3. 定义模型的架构和超参数,包括学习率、网络结构、层数等
       # 固定参数state_dim = 14  # 状态维度lr_backbone = 1e-5  # 主干网络的学习率backbone = "resnet18"  # 使用的主干网络类型
  4. 根据策略类别设置策略配置
        # 根据策略类别设置策略配置if policy_class == "ACT":# ACT策略的特定参数enc_layers = 4dec_layers = 7nheads = 8policy_config = {"lr": args["lr"],"num_queries": args["chunk_size"],"kl_weight": args["kl_weight"],"hidden_dim": args["hidden_dim"],"dim_feedforward": args["dim_feedforward"],"lr_backbone": lr_backbone,"backbone": backbone,"enc_layers": enc_layers,"dec_layers": dec_layers,"nheads": nheads,"camera_names": camera_names,}elif policy_class == "CNNMLP":# CNNMLP策略的特定参数policy_config = {"lr": args["lr"],"lr_backbone": lr_backbone,"backbone": backbone,"num_queries": 1,"camera_names": camera_names,}else:raise NotImplementedError
  5. 配置训练参数
        # 配置训练参数config = {"num_epochs": num_epochs,"ckpt_dir": ckpt_dir,"episode_len": episode_len,"state_dim": state_dim,"lr": args["lr"],"policy_class": policy_class,"onscreen_render": onscreen_render,"policy_config": policy_config,"task_name": task_name,"seed": args["seed"],"temporal_agg": args["temporal_agg"],"camera_names": camera_names,"real_robot": not is_sim,}
  6. 如果设置为评估模式,加载保存的模型权重并在验证集上评估模型性能,计算成功率和平均回报
        # 如果为评估模式,执行评估流程if is_eval:ckpt_names = [f"policy_best.ckpt"]results = []for ckpt_name in ckpt_names:success_rate, avg_return = eval_bc(config, ckpt_name, save_episode=True)results.append([ckpt_name, success_rate, avg_return])for ckpt_name, success_rate, avg_return in results:print(f"{ckpt_name}: {success_rate=} {avg_return=}")print()exit()# 加载数据train_dataloader, val_dataloader, stats, _ = load_data(dataset_dir,num_episodes,camera_names,batch_size_train,batch_size_val,use_waypoint,constant_waypoint,)# 保存数据集统计信息if not os.path.isdir(ckpt_dir):os.makedirs(ckpt_dir)stats_path = os.path.join(ckpt_dir, f"dataset_stats.pkl")with open(stats_path, "wb") as f:pickle.dump(stats, f)
    
  7. 最后,将结果打印出来
        # 训练并获取最佳检查点信息best_ckpt_info = train_bc(train_dataloader, val_dataloader, config)best_epoch, min_val_loss, best_state_dict = best_ckpt_info# 保存最佳检查点ckpt_path = os.path.join(ckpt_dir, f"policy_best.ckpt")torch.save(best_state_dict, ckpt_path)print(f"Best ckpt, val loss {min_val_loss:.6f} @ epoch{best_epoch}")

1.1.2 make_policy、make_optimizer、get_image

根据指定的policy_class(策略类别,目前支持两种类型:"ACT"和"CNNMLP"),和policy_config(策略配置)创建一个策略模型对象

def make_policy(policy_class, policy_config):if policy_class == 'ACT':policy = ACTPolicy(policy_config)  # 如果策略类是 ACT,创建 ACTPolicyelif policy_class == 'CNNMLP':policy = CNNMLPPolicy(policy_config)  # 如果策略类是 CNNMLP,创建 CNNMLPPolicyelse:raise NotImplementedError  # 如果不是以上两种类型,则抛出未实现错误return policy  # 返回创建的策略对象

make_optimizer用于创建策略模型的优化器(optimizer),并返回创建的优化器对象。优化器的作用是根据策略模型的损失函数来更新模型的参数,以使损失函数尽量减小

def make_optimizer(policy_class, policy):if policy_class == 'ACT':optimizer = policy.configure_optimizers()  # 如果策略类是 ACT,配置优化器elif policy_class == 'CNNMLP':optimizer = policy.configure_optimizers()  # 如果策略类是 CNNMLP,配置优化器else:raise NotImplementedError  # 如果不是以上两种类型,则抛出未实现错误return optimizer  # 返回配置的优化器

get_image的作用是获取一个时间步(ts)的图像数据。函数接受两个参数:tscamera_names

def get_image(ts, camera_names):curr_images = []for cam_name in camera_names:curr_image = rearrange(ts.observation['images'][cam_name], 'h w c -> c h w')  # 重排图像数组curr_images.append(curr_image)  # 将处理后的图像添加到列表中curr_image = np.stack(curr_images, axis=0)  # 将图像列表堆叠成数组curr_image = torch.from_numpy(curr_image / 255.0).float().cuda().unsqueeze(0)  # 将数组转换为 PyTorch 张量return curr_image  # 返回处理后的图像张量
  1. ts是一个时间步的数据,包含了多个相机(摄像头)拍摄的图像
    ts.observation["images"]包含了各个相机拍摄的图像数据,而camera_names是一个列表,包含了要获取的相机的名称
  2. 函数通过循环遍历camera_names中的相机名称,从ts.observation["images"]中获取对应相机的图像数据
    这些图像数据首先通过rearrange函数重新排列维度,将"height-width-channels"的顺序变为"channels-height-width",以适应PyTorch的数据格式
  3. 获取的图像数据被放入curr_images列表中
  4. 接下来,函数将curr_images列表中的所有图像数据堆叠成一个张量(tensor),np.stack(curr_images, axis=0)这一行代码实现了这个操作
  5. 接着,图像数据被归一化到[0, 1]的范围,然后转换为PyTorch的float类型,并移到GPU上(如果可用)。最后,图像数据被增加了一个额外的维度(unsqueeze(0)),以适应模型的输入要求

最终,函数返回包含时间步图像数据的PyTorch张量。这个图像数据可以被用于输入到神经网络模型中进行处理

1.1.3 eval_bc:评估一个行为克隆(behavior cloning)模型

  1. def eval_bc(config, ckpt_name, save_episode=True):set_seed(1000)  # 设置随机种子为 1000# 从配置中获取参数ckpt_dir = config['ckpt_dir']state_dim = config['state_dim']real_robot = config['real_robot']policy_class = config['policy_class']onscreen_render = config['onscreen_render']policy_config = config['policy_config']camera_names = config['camera_names']max_timesteps = config['episode_len']task_name = config['task_name']temporal_agg = config['temporal_agg']onscreen_cam = 'angle'# 加载策略和统计信息ckpt_path = os.path.join(ckpt_dir, ckpt_name)policy = make_policy(policy_class, policy_config)loading_status = policy.load_state_dict(torch.load(ckpt_path))print(loading_status)policy.cuda()policy.eval()print(f'Loaded: {ckpt_path}')stats_path = os.path.join(ckpt_dir, f'dataset_stats.pkl')with open(stats_path, 'rb') as f:stats = pickle.load(f)# 定义预处理和后处理函数pre_process = lambda s_qpos: (s_qpos - stats['qpos_mean']) / stats['qpos_std']post_process = lambda a: a * stats['action_std'] + stats['action_mean']
  2.     # 加载环境if real_robot:from aloha_scripts.robot_utils import move_grippers  # 从 aloha_scripts.robot_utils 导入 move_grippersfrom aloha_scripts.real_env import make_real_env  # 从 aloha_scripts.real_env 导入 make_real_envenv = make_real_env(init_node=True)  # 创建真实机器人环境env_max_reward = 0else:from sim_env import make_sim_env  # 从 sim_env 导入 make_sim_envenv = make_sim_env(task_name)  # 创建模拟环境env_max_reward = env.task.max_reward# 设置查询频率和时间聚合参数query_frequency = policy_config['num_queries']if temporal_agg:query_frequency = 1num_queries = policy_config['num_queries']# 设置最大时间步数max_timesteps = int(max_timesteps * 1)  # 可以根据实际任务调整最大时间步数
  3. 设置评估的循环次数(num_rollouts),每次循环都会进行一次评估
    在每次循环中,初始化环境,执行模型生成的动作并观测环境的响应
    将每个时间步的观测数据(包括图像、关节位置等)存储在相应的列表中
        # 设置回放次数和初始化结果列表num_rollouts = 50episode_returns = []highest_rewards = []# 回放循环for rollout_id in range(num_rollouts):rollout_id += 0# 设置任务if 'sim_transfer_cube' in task_name:BOX_POSE[0] = sample_box_pose()  # 在模拟重置中使用的 BOX_POSEelif 'sim_insertion' in task_name:BOX_POSE[0] = np.concatenate(sample_insertion_pose())  # 在模拟重置中使用的 BOX_POSEts = env.reset()  # 重置环境# 处理屏幕渲染if onscreen_render:ax = plt.subplot()plt_img = ax.imshow(env._physics.render(height=480, width=640, camera_id=onscreen_cam))plt.ion()# 评估循环if temporal_agg:all_time_actions = torch.zeros([max_timesteps, max_timesteps+num_queries, state_dim]).cuda()qpos_history = torch.zeros((1, max_timesteps, state_dim)).cuda()image_list = []  # 用于可视化的图像列表qpos_list = []target_qpos_list = []rewards = []# 在不计算梯度的模式下执行with torch.inference_mode():for t in range(max_timesteps):# 更新屏幕渲染和等待时间if onscreen_render:image = env._physics.render(height=480, width=640, camera_id=onscreen_cam)plt_img.set_data(image)plt.pause(DT)# 处理上一时间步的观测值以获取 qpos 和图像列表obs = ts.observationif 'images' in obs:image_list.append(obs['images'])else:image_list.append({'main': obs['image']})qpos_numpy = np.array(obs['qpos'])qpos = pre_process(qpos_numpy)qpos = torch.from_numpy(qpos).float().cuda().unsqueeze(0)qpos_history[:, t] = qposcurr_image = get_image(ts, camera_names)# 查询策略if config['policy_class'] == "ACT":if t % query_frequency == 0:all_actions = policy(qpos, curr_image)if temporal_agg:all_time_actions[[t], t:t+num_queries] = all_actionsactions_for_curr_step = all_time_actions[:, t]actions_populated = torch.all(actions_for_curr_step != 0, axis=1)actions_for_curr_step = actions_for_curr_step[actions_populated]k = 0.01exp_weights = np.exp(-k * np.arange(len(actions_for_curr_step)))exp_weights = exp_weights / exp_weights.sum()exp_weights = torch.from_numpy(exp_weights).cuda().unsqueeze(dim=1)raw_action = (actions_for_curr_step * exp_weights).sum(dim=0, keepdim=True)else:raw_action = all_actions[:, t % query_frequency]elif config['policy_class'] == "CNNMLP":raw_action = policy(qpos, curr_image)else:raise NotImplementedError# 后处理动作raw_action = raw_action.squeeze(0).cpu().numpy()action = post_process(raw_action)target_qpos = action# 步进环境ts = env.step(target_qpos)# 用于可视化的列表qpos_list.append(qpos_numpy)target_qpos_list.append(target_qpos)rewards.append(ts.reward)plt.close()  # 关闭绘图窗口if real_robot:move_grippers([env.puppet_bot_left, env.puppet_bot_right], [PUPPET_GRIPPER_JOINT_OPEN] * 2, move_time=0.5)  # 打开夹持器pass
    计算每次评估的总回报,以及每次评估的最高回报,并记录成功率
            # 计算回报和奖励rewards = np.array(rewards)episode_return = np.sum(rewards[rewards != None])episode_returns.append(episode_return)episode_highest_reward = np.max(rewards)highest_rewards.append(episode_highest_reward)print(f'Rollout {rollout_id}\n{episode_return=}, {episode_highest_reward=}, {env_max_reward=}, Success: {episode_highest_reward == env_max_reward}')
    如果指定了保存评估过程中的图像数据,将每次评估的图像数据保存为视频
            # 保存视频if save_episode:save_videos(image_list, DT, video_path=os.path.join(ckpt_dir, f'video{rollout_id}.mp4'))
  4. 输出评估结果,包括成功率、平均回报以及回报分布
    将评估结果保存到文本文件中
    # 计算成功率和平均回报# 计算成功率,即最高奖励的次数与环境最大奖励相等的比率success_rate = np.mean(np.array(highest_rewards) == env_max_reward)# 计算平均回报avg_return = np.mean(episode_returns)# 创建一个包含成功率和平均回报的摘要字符串summary_str = f'\n成功率: {success_rate}\n平均回报: {avg_return}\n\n'# 遍历奖励范围,计算每个奖励范围内的成功率for r in range(env_max_reward + 1):# 统计最高奖励大于等于 r 的次数more_or_equal_r = (np.array(highest_rewards) >= r).sum()# 计算成功率more_or_equal_r_rate = more_or_equal_r / num_rollouts# 将结果添加到摘要字符串中summary_str += f'奖励 >= {r}: {more_or_equal_r}/{num_rollouts} = {more_or_equal_r_rate*100}%\n'# 打印摘要字符串print(summary_str)# 将成功率保存到文本文件result_file_name = 'result_' + ckpt_name.split('.')[0] + '.txt'
    with open(os.path.join(ckpt_dir, result_file_name), 'w') as f:f.write(summary_str)  # 写入摘要字符串f.write(repr(episode_returns))  # 写入回报数据f.write('\n\n')f.write(repr(highest_rewards))  # 写入最高奖励数据# 返回成功率和平均回报return success_rate, avg_return

// 待更

第二部分 Mobile Aloha或Aloha软件层面代码的跑通与部署

// 待更

参考文献与推荐阅读

  1. Learning Fine-Grained Bimanual Manipulation with Low-Cost Hardware
  2. Learning Fine-Grained Bimanual Manipulation with Low-Cost Hardware(阅读笔记)
  3. Aloha 机械臂的学习记录2——AWE:AWE + ACT
  4. ..

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/345230.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

vue3 - 自定义弹框组件

写了一个弹框组件 <template><transition name"modal-fade"><div v-if"showFlag" class"myModal"><div class"content"><div class"topBox"><div class"leftTitle"><spa…

AUTO SEG-LOSS SEARCHING METRIC SURROGATES FOR SEMANTIC SEGMENTATION

AUTO SEG-LOSS: 搜索度量替代语义分割 论文链接&#xff1a;https://arxiv.org/abs/2010.07930 项目链接&#xff1a;https://github.com/fundamentalvision/Auto-Seg-Loss ABSTRACT 设计合适的损失函数是训练深度网络的关键。特别是在语义分割领域&#xff0c;针对不同的场…

苹果手机怎么恢复备份?详细攻略为你整理好了!

随着智能手机和互联网的普及&#xff0c;手机中存储的个人信息、照片、视频、聊天记录等数据会变得越来越多。一旦手机丢失、损坏或系统出现问题&#xff0c;我们很可能会面临数据丢失的风险。因此&#xff0c;越来越多的人开始意识到保护手机数据的重要性。 当苹果手机数据丢…

爬虫—响应页面乱码问题解决方法

爬虫—响应页面乱码问题解决方法 案例&#xff1a;腾牛网图片抓取 源代码如下&#xff1a; import requestsurl https://www.qqtn.com/wm/meinvtp_1.html headers {user-agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) …

[易语言]易语言部署yolox的onnx模型

【官方框架地址】 https://github.com/Megvii-BaseDetection/YOLOX 【算法介绍】 YOLOX是YOLO系列目标检测算法的进一步演变和优化。它由Megvii Technology的研究团队开发&#xff0c;是一个高性能、可扩展的对象检测器。YOLOX在保留快速处理速度的同时&#xff0c;通过引入一…

NX二次开发PK获取对象类型

PK_ENTITY_ask_class(),获取对象类型建议用这个函数&#xff0c;比较通用&#xff0c;包含所有对象类型&#xff0c;可以替代UF_MODL_ask_edge_type(),UF_MODL_ask_body_type(),UF_MODL_ask_face_type()等函数 PK_ENTITY_t entity; PK_CLASS_t PK_TYPE; PK_ENTITY_ask_class(e…

html的全选反选

一、实验题目 html实现选择框的全选和反选 二、实验代码 <!DOCTYPE html> <html><head><meta charset"utf-8"><title>全选和反选</title></head><body><ul>兴趣爱好</ul><input id"all"…

怎么取消开机密码?4个必备方法!

“每次我开机都要输入密码&#xff0c;感觉有点麻烦&#xff0c;有什么方可以快速取消开机密码的吗&#xff1f;快给我推荐推荐吧&#xff01;” 为电脑设置开机密码&#xff0c;可以更好地保护电脑中的重要数据。但是用户需要在每次开机时都输入密码。这对于部分用户来说可能是…

【重明】机器视觉QT/C++实现工业相机二次开发框架

工业相机二次开发是机器视觉行业必不可少的技能之一。 而如何实现一个框架&#xff0c;能够兼容所有工业相机二次开发&#xff0c;从而支持多种类型的工业相机&#xff0c;就是机器视觉行业的进阶技能了。 重明工业相机二次开发项目就是在实现相机二开框架的基础上&#xff0c…

NetApp E系列(E-Series)OEM产品介绍以及如何收集日志和保存配置信息

NetApp E系列是NetApp收购LSI存储后建立的一条新的产品线&#xff0c;由于LSI存储的历史悠久&#xff0c;所以这条产品线给NetApp带来了很多的OEM产品&#xff0c;可以说E系列是世界上OEM给最多公司的存储产品线也不为过&#xff0c;因为最早LSI的产品销售测率就是OEM&#xff…

阿尔泰科技——PXIe8912/8914/8916高速数据采集卡

阿尔泰科技PXIe8912/8914/8916高速数据采集卡是2通道同步采样数字化仪&#xff0c;专为输入信号高达 100M 的高频和高动态范围的信号而设计。 与Labview无缝连接&#xff0c;提供图形化API函数。模拟输入范围可以通过软件编程设置为1V 或者5V。配备了容量高达 2GB的板载内存。…