强化学习_06_pytorch-TD3实践(CarRacing-v2)

0、TD3算法原理简介

详见笔者前一篇实践强化学习_06_pytorch-TD3实践(BipedalWalkerHardcore-v3)

1、CarRacing环境观察及调整

Action SpaceBox([-1. 0. 0.], 1.0, (3,), float32)
Observation SpaceBox(0, 255, (96, 96, 3), uint8)

动作空间是[-1~1, 0~1, 0~1], 状态空间是 96 × 96 × 3 96\times96\times3 96×96×3 的图片。

1.1 图片裁剪及跳帧

环境初始的时候有40-50帧是没有意义的,可能还会影响模型训练。同时图片下面黑色部分也是没有太多意义,所以可以直接对图片截取s = s[:84, 6:90]
在这里插入图片描述

对环境进行简单观察会发现,一个step是一帧,一帧很难捕捉动作产生的影响(移动量,奖励等)。所以我们进行跳帧观察(1个action进行n个step,期间累计奖励),从红线看,每隔5帧已经可以看出小车在移动。
在这里插入图片描述

1.2 车驶离赛道判断 & reward调整

我们可以看出在gymnasiumCarRacing-V2连续的环境中没有驶出赛道终止的设定,所以我们可以基于像素进行判断是否驶离赛道。观察三个channel,我们可以看出在第二个channel中可以基于大约75行左右的一行像素进行是否行驶出去的判断
经过试验我们可以直接用s[75, 35:50, 1] 前2个和后2个像素点来判断是否行驶到赛道外。
在这里插入图片描述

    def judge_out_of_route(self, obs):s = obs[:84, 6:90, :]out_sum = (s[75, 35:48, 1][:2] > 200).sum() + (s[75, 35:48, 1][-2:] > 200).sum()return out_sum == 4

在加入了是否行驶到赛道外的判断后,如果判断出了赛道则reward=-10

1.4 对多个输出进行通道叠加FrameStack

进行跳帧可以看出车辆的移动,但是只有多张的连续输入,CNN才能感知连续的动作。所以我们这两将4次跳帧组成一个observe,即最终20个step返回一个observe和叠加reward
在这里插入图片描述

1.5 最终环境构建python code

import gymnasium as gym
import torch
import numpy as np
from torchvision import transforms
from gymnasium.spaces import Box
from gymnasium.wrappers import FrameStackclass CarV2SkipFrame(gym.Wrapper):def __init__(self, env, skip: int):"""skip frameArgs:env (_type_): _description_skip (int): skip frames"""super().__init__(env)self._skip = skipdef step(self, action):tt_reward_list = []done = Falsetotal_reward = 0for i in range(self._skip):obs, reward, done, info, _ = self.env.step(action)out_done = self.judge_out_of_route(obs)done_f = done or out_donereward = -10 if out_done else reward# reward = -100 if out_done else reward# reward = reward * 10 if reward > 0 else rewardtotal_reward += rewardtt_reward_list.append(reward)if done_f:breakreturn obs[:84, 6:90, :], total_reward, done_f, info, _def judge_out_of_route(self, obs):s = obs[:84, 6:90, :]out_sum = (s[75, 35:48, 1][:2] > 200).sum() + (s[75, 35:48, 1][-2:] > 200).sum()return out_sum == 4def reset(self, seed=0, options=None):s, info = self.env.reset(seed=seed, options=options)# steering  gas  breakinga = np.array([0.0, 0.0, 0.0])for i in range(45):obs, reward, done, info, _ = self.env.step(a)return obs[:84, 6:90, :], infoclass SkipFrame(gym.Wrapper):def __init__(self, env, skip: int):"""skip frameArgs:env (_type_): _description_skip (int): skip frames"""super().__init__(env)self._skip = skipdef step(self, action):total_reward = 0.0done = Falsefor _ in range(self._skip):obs, reward, done, info, _ = self.env.step(action)total_reward += rewardif done:breakreturn obs, total_reward, done, info, _class GrayScaleObservation(gym.ObservationWrapper):def __init__(self, env):"""RGP -> Gray(high, width, channel) -> (1, high, width) """super().__init__(env)self.observation_space = Box(low=0, high=255, shape=self.observation_space.shape[:2], dtype=np.uint8)def observation(self, observation):tf = transforms.Grayscale()# channel firstreturn tf(torch.tensor(np.transpose(observation, (2, 0, 1)).copy(), dtype=torch.float))class ResizeObservation(gym.ObservationWrapper):def __init__(self, env, shape: int):"""reshape observeArgs:env (_type_): _description_shape (int): reshape size"""super().__init__(env)self.shape = (shape, shape)obs_shape = self.shape + self.observation_space.shape[2:]self.observation_space = Box(low=0, high=255, shape=obs_shape, dtype=np.uint8)def observation(self, observation):#  Normalize -> input[channel] - mean[channel]) / std[channel]transformations = transforms.Compose([transforms.Resize(self.shape), transforms.Normalize(0, 255)])return transformations(observation).squeeze(0)env_name = 'CarRacing-v2'
env = gym.make(env_name)
SKIP_N = 5
STACK_N = 4
env_ = FrameStack(ResizeObservation(GrayScaleObservation(CarV2SkipFrame(env, skip=SKIP_N)), shape=84), num_stack=STACK_N
)

二、智能体构建

因为是用的CNN,所以需要注意梯度消失的问题。

2.1 actor

主要架构就是CNN + MLP + maxMinScale

  • CNN: 因为环境比较简单第一层用MaxPool2d采样,第二层进行AvgPool2d平滑
    nn.Sequential(nn.Conv2d(in_channels=4, out_channels=16, kernel_size=4, stride=2),nn.ReLU(),nn.MaxPool2d(2, 2, 0),nn.Conv2d(in_channels=16, out_channels=32, kernel_size=4, stride=2),nn.ReLU(),nn.AvgPool2d(2, 2, 0),nn.Flatten()
    )
    
  • MLP
    • 对cnn提取的特征进行 LayerNorm (一定程度干预梯度消失)
    • 对最后层全连接层的输出进行 LayerNorm (一定程度干预梯度消失)
  • maxMinScale
    • 最后通过tanh激活层action全部归一化到[-1,1]之间
    • 基于环境的动作上线限,用maxMinScale方式将最终的输出映射到[动作下限,动作上限]

actor 网络

class TD3CNNPolicyNet(nn.Module):"""输入state, 输出action"""def __init__(self, state_dim: int, hidden_layers_dim: typ.List, action_dim: int, action_bound: typ.Union[float, gym.Env]=1.0, state_feature_share: bool=False):super(TD3CNNPolicyNet, self).__init__()self.state_feature_share = state_feature_shareself.low_high_flag = hasattr(action_bound, "action_space")print('action_bound=',action_bound)self.action_bound = action_boundif self.low_high_flag:self.action_high = torch.FloatTensor(action_bound.action_space.low)self.action_low = torch.FloatTensor(action_bound.action_space.high)self.cnn_feature = nn.Sequential(nn.Conv2d(in_channels=4, out_channels=16, kernel_size=4, stride=2),nn.ReLU(),nn.MaxPool2d(2, 2, 0),nn.Conv2d(in_channels=16, out_channels=32, kernel_size=4, stride=2),nn.ReLU(),nn.AvgPool2d(2, 2, 0),nn.Flatten())self.cnn_out_ln = nn.LayerNorm([512])self.features = nn.ModuleList()for idx, h in enumerate(hidden_layers_dim):self.features.append(nn.ModuleDict({'linear': nn.Linear(hidden_layers_dim[idx-1] if idx else 512, h),'linear_action': nn.ReLU()}))self.fc_out = nn.Linear(hidden_layers_dim[-1], action_dim)self.final_ln = nn.LayerNorm([action_dim])def max_min_scale(self, act):"""X_std = (X - X.min(axis=0)) / (X.max(axis=0) - X.min(axis=0))X_scaled = X_std * (max - min) + min"""# print("max_min_scale(", act, ")")device_ = act.deviceaction_range = self.action_high.to(device_) - self.action_low.to(device_)act_std = (act - -1.0) / 2.0return act_std * action_range.to(device_) + self.action_low.to(device_)def forward(self, state):if len(state.shape) == 3:state = state.unsqueeze(0)try:x = self.cnn_feature(state)except Exception as e:print(state.shape)state = state.permute(0, 3, 1, 2)x = self.cnn_feature(state)x = self.cnn_out_ln(x)for layer in self.features:x = layer['linear_action'](layer['linear'](x))device_ = x.deviceif self.low_high_flag:return self.max_min_scale(torch.tanh(self.final_ln(self.fc_out(x))))return torch.tanh(self.final_ln(self.fc_out(x)).clip(-6.0, 6.0)) * self.action_bound

2.2 critic

  • CNN: 设计同Actor
  • concat状态和action
    • 进行observe和action concat 之前对action进行线性变换(一定程度解决梯度消失 及 原地转圈)
class TD3CNNValueNet(nn.Module):"""输入[state, cation], 输出value"""def __init__(self, state_dim: int, action_dim: int, hidden_layers_dim: typ.List, state_feature_share=False):super(TD3CNNValueNet, self).__init__()self.state_feature_share = state_feature_shareself.q1_cnn_feature = nn.Sequential(nn.Conv2d(in_channels=4, out_channels=16, kernel_size=4, stride=2),nn.ReLU(),nn.MaxPool2d(2, 2, 0),nn.Conv2d(in_channels=16, out_channels=32, kernel_size=4, stride=2),nn.ReLU(),nn.AvgPool2d(2, 2, 0),nn.Flatten())self.q2_cnn_feature = nn.Sequential(nn.Conv2d(in_channels=4, out_channels=16, kernel_size=4, stride=2),nn.ReLU(),nn.MaxPool2d(2, 2, 0),nn.Conv2d(in_channels=16, out_channels=32, kernel_size=4, stride=2),nn.ReLU(),nn.AvgPool2d(2, 2, 0),nn.Flatten())self.features_q1 = nn.ModuleList()self.features_q2 = nn.ModuleList()for idx, h in enumerate(hidden_layers_dim + [action_dim]):self.features_q1.append(nn.ModuleDict({'linear': nn.Linear(hidden_layers_dim[idx-1] if idx else 512, h),'linear_activation': nn.ReLU()}))self.features_q2.append(nn.ModuleDict({'linear': nn.Linear(hidden_layers_dim[idx-1] if idx else 512, h),'linear_activation': nn.ReLU()}))self.act_q1_fc = nn.Linear(action_dim, action_dim)self.act_q2_fc = nn.Linear(action_dim, action_dim)self.head_q1_bf = nn.Linear(action_dim * 2, action_dim)self.head_q2_bf = nn.Linear(action_dim * 2, action_dim)self.head_q1 = nn.Linear(action_dim, 1)self.head_q2 = nn.Linear(action_dim, 1)def forward(self, state, action):if len(state.shape) == 3:state = state.unsqueeze(0)try:x1 = self.q1_cnn_feature(state)x2 = self.q2_cnn_feature(state)except Exception as e:state = state.permute(0, 3, 1, 2)x1 = self.q1_cnn_feature(state)x2 = self.q2_cnn_feature(state)for layer1, layer2 in zip(self.features_q1, self.features_q2):x1 = layer1['linear_activation'](layer1['linear'](x1))x2 = layer2['linear_activation'](layer2['linear'](x2))# 拼接状态和动作act1 = torch.relu(self.act_q1_fc(action.float()))act2 = torch.relu(self.act_q2_fc(action.float()))x1 = torch.relu( self.head_q1_bf(torch.cat([x1, act1], dim=-1).float()))# print("torch.cat([x1, action], dim=-1)=", torch.cat([x1, act1], dim=-1)[:5, :])x2 = torch.relu( self.head_q2_bf(torch.cat([x2, act2], dim=-1).float()))return self.head_q1(x1), self.head_q2(x2)def Q1(self, state, action):if len(state.shape) == 3:state = state.unsqueeze(0)try:x = self.q1_cnn_feature(state)except Exception as e:state = state.permute(0, 3, 1, 2)x = self.q1_cnn_feature(state)for layer in self.features_q1:x = layer['linear_activation'](layer['linear'](x))# 拼接状态和动作act1 = torch.relu(self.act_q1_fc(action.float()))x = torch.relu( self.head_q1_bf(torch.cat([x, act1], dim=-1).float()))return self.head_q1(x) 

2.3 TD3算法简单调整

  1. policy_noise: 分布调整为(mean=0, std=每个维度动作范围) * self.policy_noise
  2. expl_noise: 分布调整为(mean=0, std=每个维度动作范围) * self.train_noise

3、训练

整体训练脚本可以看笔者的github test_TD3.py : CarRacing_TD3_test()

  1. 对训练做了一些调整: 在训练的过程中增加测试阶段:每隔test_ep_freq进行测试
  2. 基于多次测试的奖励均值进行最佳模型参数保存
def CarRacing_TD3_test():env_name = 'CarRacing-v2'gym_env_desc(env_name)env = gym.make(env_name)env = FrameStack(ResizeObservation(GrayScaleObservation(CarV2SkipFrame(env, skip=5)), shape=84), num_stack=4)print("gym.__version__ = ", gym.__version__ )path_ = os.path.dirname(__file__)cfg = Config(env, # 环境参数save_path=os.path.join(path_, "test_models" ,'TD3_CarRacing-v2_test2-3'), seed=42,# 网络参数actor_hidden_layers_dim=[128], # 256critic_hidden_layers_dim=[128],# agent参数actor_lr=2.5e-4, #5.5e-5,critic_lr=1e-3, #7.5e-4,  gamma=0.99,# 训练参数num_episode=15000,sample_size=128,# 环境复杂多变,需要保存多一些bufferoff_buffer_size=1024*100,  off_minimal_size=256,max_episode_rewards=50000,max_episode_steps=1200, # 200# agent 其他参数TD3_kwargs={'CNN_env_flag': 1,'pic_shape': env.observation_space.shape,"env": env,'action_low': env.action_space.low,'action_high': env.action_space.high,# soft update parameters'tau': 0.05, # trick2: Delayed Policy Update'delay_freq': 1,# trick3: Target Policy Smoothing'policy_noise': 0.2,'policy_noise_clip': 0.5,# exploration noise'expl_noise': 0.5,# 探索的 noise 指数系数率减少 noise = expl_noise * expl_noise_exp_reduce_factor^t'expl_noise_exp_reduce_factor':  1 - 1e-4})agent = TD3(state_dim=cfg.state_dim,actor_hidden_layers_dim=cfg.actor_hidden_layers_dim,critic_hidden_layers_dim=cfg.critic_hidden_layers_dim,action_dim=cfg.action_dim,actor_lr=cfg.actor_lr,critic_lr=cfg.critic_lr,gamma=cfg.gamma,TD3_kwargs=cfg.TD3_kwargs,device=cfg.device)agent.train()train_off_policy(env, agent, cfg, done_add=False, train_without_seed=True, wandb_flag=False, test_ep_freq=100)agent.load_model(cfg.save_path)agent.eval()env = gym.make(env_name, render_mode='human') # env = FrameStack(ResizeObservation(GrayScaleObservation(CarV2SkipFrame(env, skip=5)), shape=84), num_stack=4)play(env, agent, cfg, episode_count=2)

4、训练结果观察及后续工作

由于上传大小限制5MB, 所以对较多直线部分进行了裁剪

最终训练的时候发现会突然陷入低分状态,可以考虑间隔n(可以设置较大比如2000)个episode和最佳的reward比较,分数低于x%个百分点,就重新载入最佳参数,以继续训练。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/300722.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Java新纪元:一探JDK 15的全新特性

欢迎来到我的博客,代码的世界里,每一行都是一个故事 Java新纪元:一探JDK 15的全新特性 前言Sealed ClassesSealed Classes的基本概念:Sealed Classes的优势:使用Sealed Classes的示例:注意事项:…

格密码:傅里叶矩阵

目录 一. 铺垫性介绍 1.1 傅里叶级数 1.2 傅里叶矩阵的来源 二. 格基与傅里叶矩阵 2.1 傅里叶矩阵详细解释 2.2 格基与傅里叶矩阵 写在前面:有关傅里叶变换的解释太多了,这篇博客主要总结傅里叶矩阵在格密码中的运用。对于有一定傅里叶变换基础的同…

Github远程仓库操作指南

目录 1 前言2 创建远程仓库3 创建远程仓库别名4 推送本地分支到远程仓库4.1 提交本地库4.2 远程仓库设置别名4.3 确认分支4.4 推送远程库 5 拉取远程库到本地5.1 确认拉取分支5.2 拉取到本地 6 克隆远程库到本地7 结语 1 前言 在软件开发过程中,使用Git来管理代码是…

C语言学习day10:while语句

while语句属于循环结构&#xff1b; while语句运行图&#xff1a; while语句表达式&#xff1a; while (表达式) {} 代码&#xff1a; int main() {//while (表达式) {//}int i 0;//死循环while (i < 10){printf("%d\n",i);i;}system("pause");ret…

SpringBoot整合JWT+Spring Security+Redis实现登录拦截(二)权限认证

上篇博文中我们已经实现了登录拦截&#xff0c;接下来我们继续补充代码&#xff0c;实现权限的认证 一、RBAC权限模型 什么事RBAC权限模型&#xff1f; RBAC权限模型&#xff08;Role-Based Access Control&#xff09;即&#xff1a;基于角色的权限访问控制。在RBAC中&#x…

分别使用OVP-UVP和OFP-UFP算法以及AFD检测算法实现反孤岛检测simulink建模与仿真

目录 1.课题概述 2.系统仿真结果 3.核心程序与模型 4.系统原理简介 4.1 OVP-UVP算法 4.2 OFP-UFP算法 4.3 AFD检测算法 5.完整工程文件 1.课题概述 分别使用OVP-UVP和OFP-UFP算法以及AFD检测算法实现反孤岛检测simulink建模与仿真。 2.系统仿真结果 3.核心程序与模型…

金和OA C6 gethomeinfo sql注入漏洞

产品介绍 金和网络是专业信息化服务商,为城市监管部门提供了互联网监管解决方案,为企事业单位提供组织协同OA系统开发平台,电子政务一体化平台,智慧电商平台等服务。 漏洞概述 金和 OA C6 gethomeinfo接口处存在SQL注入漏洞&#xff0c;攻击者除了可以利用 SQL 注入漏洞获取…

顺序表的基本操作(必学)

目录 线性表&#xff1a; 顺序表&#xff1a; 概念和结构&#xff1a; 动态顺序表常用操作实现&#xff1a; 头文件&#xff08;数组顺序表的声明&#xff09;&#xff1a; 各种基本操作总的声明&#xff1a; 顺序表的初始化&#xff1a; 顺序表的销毁 顺序表的打印 …

【中小型企业网络实战案例 二】配置网络互连互通

​【中小型企业网络实战案例 一】规划、需求和基本配置-CSDN博客 热门IT技术视频教程&#xff1a;https://xmws-it.blog.csdn.net/article/details/134398330?spm1001.2014.3001.5502 配置接入层交换机 1.以接入交换机ACC1为例&#xff0c;创建ACC1的业务VLAN 10和20。 <…

因吹斯汀!只需上传照片,GPT-4V精准识别食物的卡路里和摄入热量

健身和减肥的朋友有福啦&#xff01; 最近一篇文章探索了GPT-4V在膳食评估领域的强大能力&#xff0c;可以根据饮食图片精准判断食物的种类与重量&#xff0c;并给出营养成分的分析&#xff0c;包括碳水化合物、蛋白质、脂肪占比。 最最重要的是&#xff0c;它还能告诉我们这…

c语言的初始学习(练习)

##初学c语言---MOOC浙江大学翁恺先生学习c语言 那么我们先看看这个题目吧&#xff0c;这是初始语法的应用。 记住&#xff0c;我们的程序是按步骤执行的&#xff0c;并不是在不同的两行同时进行。 程序设计&#xff1a;1.了解题目的需要&#xff0c;几个变量需要用到&#x…

App应用如何在应用市场获得更多下载量?

App的转化率至关重要&#xff0c;App如何获得更多用户&#xff0c;提高应用的下载量&#xff1f; 据 Apple 称&#xff0c;每周有 6.5亿访问者访问应用商店&#xff0c;77%的应用下载来自 iOS 应用商店的自然搜索。随着 Apple 默认关闭了IDFA&#xff0c;自然搜索比以往任何时…