Unitree RL Code Analysis

Project Overview

The overall directory structure of the project is as follows:

#unitree_rl_gym
#----isaacgym # NVIDIA Isaac Gym; the legacy simulator, since superseded by Isaac Lab
#----rsl_rl # GPU-based reinforcement learning library
#----unitree_rl_gym # Unitree reinforcement learning environments
#--------deploy
#--------doc
#----legged_gym # Unitree reinforcement learning package
#--------envs # RL environment definitions: g1
#--------scripts # training and inference scripts
#--------utils # miscellaneous helper modules
#----resources
#----setup.py

Training Analysis

Training is launched with the following command:

# `--task`: required; one of (go2, g1, h1, h1_2)
# `--headless`: the viewer is launched by default; set to true to skip rendering (more efficient)
# `--resume`: resume training from a checkpoint in the logs
# `--experiment_name`: name of the experiment to run/load
# `--run_name`: name of the run to run/load
# `--load_run`: name of the run to load; defaults to the most recent run
# `--checkpoint`: checkpoint number; defaults to the most recent file
# `--num_envs`: number of environments trained in parallel
# `--seed`: random seed
# `--max_iterations`: maximum number of training iterations
# `--sim_device`: simulation device; use `--sim_device=cpu` for CPU
# `--rl_device`: RL computation device; use `--rl_device=cpu` for CPU
python legged_gym/scripts/train.py --task=g1

The command-line arguments are parsed by get_args() and passed down the call chain:

import os
import numpy as np
from datetime import datetime
import sys

import isaacgym
from legged_gym.envs import *
from legged_gym.utils import get_args, task_registry
import torch

def train(args):
    # build the reinforcement learning environment
    env, env_cfg = task_registry.make_env(name=args.task, args=args)

    # build the reinforcement learning algorithm
    ppo_runner, train_cfg = task_registry.make_alg_runner(env=env, name=args.task, args=args)

    # run the algorithm's learn loop
    ppo_runner.learn(num_learning_iterations=train_cfg.runner.max_iterations, init_at_random_ep_len=True)

if __name__ == '__main__':
    args = get_args()
    train(args)
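
For reference, get_args() is a thin wrapper around isaacgym's gymutil.parse_arguments(); the real flag list lives in legged_gym's utils module. The sketch below is illustrative only (the defaults and the selection of flags shown are assumptions, not the repo's exact code):

# Illustrative sketch of get_args(); not the repo's actual implementation.
from isaacgym import gymutil

def get_args():
    custom_parameters = [
        {"name": "--task", "type": str, "default": "go2", "help": "Task name, e.g. g1"},
        {"name": "--headless", "action": "store_true", "default": False, "help": "Disable rendering"},
        {"name": "--num_envs", "type": int, "help": "Number of parallel environments"},
        {"name": "--max_iterations", "type": int, "help": "Maximum training iterations"},
    ]
    # gymutil also adds the common flags such as --sim_device and the physics engine selection
    args = gymutil.parse_arguments(description="RL Policy", custom_parameters=custom_parameters)
    return args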

Tasks such as "g1" are registered in the legged_gym package's init script:

from legged_gym import LEGGED_GYM_ROOT_DIR, LEGGED_GYM_ENVS_DIR

from legged_gym.envs.go2.go2_config import GO2RoughCfg, GO2RoughCfgPPO
from legged_gym.envs.h1.h1_config import H1RoughCfg, H1RoughCfgPPO
from legged_gym.envs.h1.h1_env import H1Robot
from legged_gym.envs.h1_2.h1_2_config import H1_2RoughCfg, H1_2RoughCfgPPO
from legged_gym.envs.h1_2.h1_2_env import H1_2Robot
from legged_gym.envs.g1.g1_config import G1RoughCfg, G1RoughCfgPPO
from legged_gym.envs.g1.g1_env import G1Robot
from .base.legged_robot import LeggedRobot

from legged_gym.utils.task_registry import task_registry

task_registry.register( "go2", LeggedRobot, GO2RoughCfg(), GO2RoughCfgPPO())
task_registry.register( "h1", H1Robot, H1RoughCfg(), H1RoughCfgPPO())
task_registry.register( "h1_2", H1_2Robot, H1_2RoughCfg(), H1_2RoughCfgPPO())

# The environment class we care about: class G1Robot(LeggedRobot) in unitree_rl_gym/legged_gym/envs/g1/g1_env.py
# The robot/joint configuration: class G1RoughCfg(LeggedRobotCfg) in unitree_rl_gym/legged_gym/envs/g1/g1_config.py
# The training algorithm config: class G1RoughCfgPPO(LeggedRobotCfgPPO) in unitree_rl_gym/legged_gym/envs/g1/g1_config.py
task_registry.register( "g1", G1Robot, G1RoughCfg(), G1RoughCfgPPO())

The register call above lives in the task registry module, which also handles environment creation and algorithm creation:

# unitree_rl_gym/legged_gym/utils/task_registry.py

class TaskRegistry():
    def __init__(self):
        self.task_classes = {}
        self.env_cfgs = {}
        self.train_cfgs = {}

    # Register a task: task name, task class, environment config, PPO config
    # task_registry.register( "g1", G1Robot, G1RoughCfg(), G1RoughCfgPPO())
    def register(self, name: str, task_class: VecEnv, env_cfg: LeggedRobotCfg, train_cfg: LeggedRobotCfgPPO):
        self.task_classes[name] = task_class
        self.env_cfgs[name] = env_cfg
        self.train_cfgs[name] = train_cfg

make_env() creates the training environment and make_alg_runner() builds the algorithm runner:

# unitree_rl_gym/legged_gym/utils/task_registry.py
class TaskRegistry():
    def get_task_class(self, name: str) -> VecEnv:
        return self.task_classes[name]

    def get_cfgs(self, name) -> Tuple[LeggedRobotCfg, LeggedRobotCfgPPO]:
        train_cfg = self.train_cfgs[name]
        env_cfg = self.env_cfgs[name]
        # copy seed
        env_cfg.seed = train_cfg.seed
        return env_cfg, train_cfg

    def make_env(self, name, args=None, env_cfg=None) -> Tuple[VecEnv, LeggedRobotCfg]:
        # if no args passed get command line arguments
        if args is None:
            args = get_args()
        # check if there is a registered env with that name
        if name in self.task_classes:
            task_class = self.get_task_class(name)
        else:
            raise ValueError(f"Task with name: {name} was not registered")
        if env_cfg is None:
            # load config files
            env_cfg, _ = self.get_cfgs(name)
        # override cfg from args (if specified)
        env_cfg, _ = update_cfg_from_args(env_cfg, None, args)
        set_seed(env_cfg.seed)
        # parse sim params (convert to dict first)
        sim_params = {"sim": class_to_dict(env_cfg.sim)}
        sim_params = parse_sim_params(args, sim_params)
        # this effectively instantiates G1Robot
        env = task_class(   cfg=env_cfg,
                            sim_params=sim_params,
                            physics_engine=args.physics_engine,
                            sim_device=args.sim_device,
                            headless=args.headless)
        return env, env_cfg

    def make_alg_runner(self, env, name=None, args=None, train_cfg=None, log_root="default") -> Tuple[OnPolicyRunner, LeggedRobotCfgPPO]:
        # if no args passed get command line arguments
        if args is None:
            args = get_args()
        # if config files are passed use them, otherwise load from the name
        if train_cfg is None:
            if name is None:
                raise ValueError("Either 'name' or 'train_cfg' must be not None")
            # load config files
            _, train_cfg = self.get_cfgs(name)
        else:
            if name is not None:
                print(f"'train_cfg' provided -> Ignoring 'name={name}'")
        # override cfg from args (if specified)
        _, train_cfg = update_cfg_from_args(None, train_cfg, args)

        if log_root=="default":
            log_root = os.path.join(LEGGED_GYM_ROOT_DIR, 'logs', train_cfg.runner.experiment_name)
            log_dir = os.path.join(log_root, datetime.now().strftime('%b%d_%H-%M-%S') + '_' + train_cfg.runner.run_name)
        elif log_root is None:
            log_dir = None
        else:
            log_dir = os.path.join(log_root, datetime.now().strftime('%b%d_%H-%M-%S') + '_' + train_cfg.runner.run_name)

        # this creates the OnPolicyRunner, configured by G1RoughCfgPPO
        train_cfg_dict = class_to_dict(train_cfg)
        runner = OnPolicyRunner(env, train_cfg_dict, log_dir, device=args.rl_device)
        # save resume path before creating a new log_dir
        resume = train_cfg.runner.resume
        if resume:
            # load previously trained model
            resume_path = get_load_path(log_root, load_run=train_cfg.runner.load_run, checkpoint=train_cfg.runner.checkpoint)
            print(f"Loading model from: {resume_path}")
            runner.load(resume_path)
        return runner, train_cfg

# make global task registry
task_registry = TaskRegistry()
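
Because make_env() and make_alg_runner() accept explicit config objects, a script can also fetch the registered configs, tweak them, and pass them back in. A short usage sketch (the overridden values are arbitrary examples; it assumes the script is launched with --task=g1):

# Illustrative usage: build env and runner directly, overriding a couple of config fields first.
from legged_gym.envs import *  # triggers task registration
from legged_gym.utils import get_args, task_registry

args = get_args()
env_cfg, train_cfg = task_registry.get_cfgs(name="g1")
env_cfg.env.num_envs = 64              # small debug run
train_cfg.runner.max_iterations = 100

env, env_cfg = task_registry.make_env(name="g1", args=args, env_cfg=env_cfg)
ppo_runner, train_cfg = task_registry.make_alg_runner(env=env, args=args, train_cfg=train_cfg)
ppo_runner.learn(num_learning_iterations=train_cfg.runner.max_iterations, init_at_random_ep_len=True)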

With that, our focus narrows to the following classes:

  • VecEnv: the vectorized parallel environment
  • OnPolicyRunner: the on-policy reinforcement learning runner
  • LeggedRobotCfg: the RL environment configuration
  • LeggedRobotCfgPPO: the PPO configuration

The Reinforcement Learning Environment

rsl_rl's VecEnv is the abstract interface for vectorized parallel environments:

class VecEnv(ABC):
    num_envs: int
    num_obs: int
    num_privileged_obs: int
    num_actions: int
    max_episode_length: int
    privileged_obs_buf: torch.Tensor
    obs_buf: torch.Tensor
    rew_buf: torch.Tensor
    reset_buf: torch.Tensor
    episode_length_buf: torch.Tensor # current episode duration
    extras: dict
    device: torch.device
    @abstractmethod
    def step(self, actions: torch.Tensor) -> Tuple[torch.Tensor, Union[torch.Tensor, None], torch.Tensor, torch.Tensor, dict]:
        pass
    @abstractmethod
    def reset(self, env_ids: Union[list, torch.Tensor]):
        pass
    @abstractmethod
    def get_observations(self) -> torch.Tensor:
        pass
    @abstractmethod
    def get_privileged_observations(self) -> Union[torch.Tensor, None]:
        pass

Our robot task is itself a VecEnv (by Python duck typing):

class G1Robot(LeggedRobot):

class LeggedRobot(BaseTask):

class BaseTask():  # this class implements part of the VecEnv interface
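
In other words, none of these classes inherit from rsl_rl's VecEnv; they simply expose the same attributes and methods. To make that contract concrete, here is a minimal dummy environment (not part of the repo, with placeholder dynamics) that exposes the same interface the runner relies on:

# Minimal duck-typed "VecEnv" sketch: same attributes and methods, meaningless dynamics.
import torch

class DummyVecEnv:
    def __init__(self, num_envs=8, num_obs=11, num_actions=3, device="cpu"):
        self.num_envs = num_envs
        self.num_obs = num_obs
        self.num_privileged_obs = None
        self.num_actions = num_actions
        self.max_episode_length = 100
        self.device = device
        self.obs_buf = torch.zeros(num_envs, num_obs, device=device)
        self.rew_buf = torch.zeros(num_envs, device=device)
        self.reset_buf = torch.zeros(num_envs, dtype=torch.long, device=device)
        self.episode_length_buf = torch.zeros(num_envs, dtype=torch.long, device=device)
        self.extras = {}

    def step(self, actions):
        self.episode_length_buf += 1
        self.reset_buf = (self.episode_length_buf >= self.max_episode_length).long()
        self.obs_buf = torch.randn(self.num_envs, self.num_obs, device=self.device)
        self.rew_buf = -actions.square().sum(dim=1)  # placeholder reward
        return self.obs_buf, None, self.rew_buf, self.reset_buf, self.extras

    def reset(self, env_ids=None):
        self.episode_length_buf[:] = 0
        return self.obs_buf, None

    def get_observations(self):
        return self.obs_buf

    def get_privileged_observations(self):
        return None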

TaskRegistry.make_env creates a G1Robot object, and G1Robot is one kind of VecEnv:

env, env_cfg = task_registry.make_env(name=args.task, args=args)

G1Robot does not define its own constructor, so construction falls through to LeggedRobot and BaseTask, which initialize the training environment:

class LeggedRobot(BaseTask):
    def __init__(self, cfg: LeggedRobotCfg, sim_params, physics_engine, sim_device, headless):
        # 1. parse the configuration
        self.cfg = cfg
        self.sim_params = sim_params
        self.height_samples = None
        self.debug_viz = False
        self.init_done = False
        self._parse_cfg(self.cfg)

        # call BaseTask.__init__ to initialize the simulation environment
        super().__init__(self.cfg, sim_params, physics_engine, sim_device, headless)

        # if rendering is enabled, set up the viewer camera here
        if not self.headless:
            self.set_camera(self.cfg.viewer.pos, self.cfg.viewer.lookat)
        # initialize state buffers etc.
        self._init_buffers()
        self._prepare_reward_function()
        self.init_done = True


class BaseTask():
    def __init__(self, cfg, sim_params, physics_engine, sim_device, headless):
        # acquire the Isaac Gym interface
        self.gym = gymapi.acquire_gym()

        self.sim_params = sim_params
        self.physics_engine = physics_engine
        self.sim_device = sim_device
        sim_device_type, self.sim_device_id = gymutil.parse_device_str(self.sim_device)
        self.headless = headless

        # env device is GPU only if sim is on GPU and use_gpu_pipeline=True, otherwise returned tensors are copied to CPU by physX.
        if sim_device_type=='cuda' and sim_params.use_gpu_pipeline:
            self.device = self.sim_device
        else:
            self.device = 'cpu'

        # graphics device for rendering, -1 for no rendering
        self.graphics_device_id = self.sim_device_id
        if self.headless == True:
            self.graphics_device_id = -1

        self.num_envs = cfg.env.num_envs
        self.num_obs = cfg.env.num_observations
        self.num_privileged_obs = cfg.env.num_privileged_obs
        self.num_actions = cfg.env.num_actions

        # optimization flags for pytorch JIT
        torch._C._jit_set_profiling_mode(False)
        torch._C._jit_set_profiling_executor(False)

        # allocate buffers
        self.obs_buf = torch.zeros(self.num_envs, self.num_obs, device=self.device, dtype=torch.float)
        self.rew_buf = torch.zeros(self.num_envs, device=self.device, dtype=torch.float)
        self.reset_buf = torch.ones(self.num_envs, device=self.device, dtype=torch.long)
        self.episode_length_buf = torch.zeros(self.num_envs, device=self.device, dtype=torch.long)
        self.time_out_buf = torch.zeros(self.num_envs, device=self.device, dtype=torch.bool)
        if self.num_privileged_obs is not None:
            self.privileged_obs_buf = torch.zeros(self.num_envs, self.num_privileged_obs, device=self.device, dtype=torch.float)
        else:
            self.privileged_obs_buf = None
            # self.num_privileged_obs = self.num_obs

        self.extras = {}

        # create_sim() is overridden in LeggedRobot; its code is not shown here, but it:
        # 1. defines the z axis (index 2) as the up axis
        # 2. calls self.gym.create_sim to create the simulation
        # 3. calls self.gym.add_ground to create the ground plane
        # 4. creates the training environments:
        # 4.1. prepares the robot asset options (robot parameters)
        # 4.2. loads the robot asset with self.gym.load_asset
        # 4.3. queries robot and environment properties through self.gym
        # 4.4. calls self.gym.create_env to create the configured number of Isaac Gym envs
        # 4.5. calls self.gym.create_actor to create the agents
        # 4.6. configures the agents' joint DOF properties
        # 4.7. configures the agents' rigid-body properties
        # 4.8. stores the env and actor handles
        self.create_sim()
        # prepare the simulation
        self.gym.prepare_sim(self.sim)

        # todo: read from config
        self.enable_viewer_sync = True
        self.viewer = None

        # if running with a viewer, set up keyboard shortcuts and camera
        if self.headless == False:
            # subscribe to keyboard shortcuts
            self.viewer = self.gym.create_viewer(
                self.sim, gymapi.CameraProperties())
            self.gym.subscribe_viewer_keyboard_event(
                self.viewer, gymapi.KEY_ESCAPE, "QUIT")
            self.gym.subscribe_viewer_keyboard_event(
                self.viewer, gymapi.KEY_V, "toggle_viewer_sync")
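
The create_sim() steps summarized in the comments above map onto plain Isaac Gym API calls. The fragment below is an independent, simplified sketch of that flow (the asset path, option values, and env count are placeholders, not the repo's actual settings):

# Simplified, standalone sketch of the create_sim flow; not the repo's actual code.
from isaacgym import gymapi

gym = gymapi.acquire_gym()

# 2. create the simulation (compute device 0, graphics device 0, PhysX backend)
sim_params = gymapi.SimParams()
sim_params.up_axis = gymapi.UP_AXIS_Z              # 1. z is the up axis
sim_params.gravity = gymapi.Vec3(0.0, 0.0, -9.81)
sim = gym.create_sim(0, 0, gymapi.SIM_PHYSX, sim_params)

# 3. add a ground plane
plane_params = gymapi.PlaneParams()
plane_params.normal = gymapi.Vec3(0.0, 0.0, 1.0)
gym.add_ground(sim, plane_params)

# 4.1 / 4.2. load the robot asset (URDF path is a placeholder)
asset_options = gymapi.AssetOptions()
asset_options.fix_base_link = False
asset = gym.load_asset(sim, "resources/robots/g1", "g1.urdf", asset_options)

# 4.4 / 4.5. create envs and one actor per env
envs, actors = [], []
for i in range(4):
    env = gym.create_env(sim, gymapi.Vec3(-1, -1, 0), gymapi.Vec3(1, 1, 1), 2)
    pose = gymapi.Transform()
    pose.p = gymapi.Vec3(0.0, 0.0, 0.8)
    actor = gym.create_actor(env, asset, pose, "robot", i, 0)
    envs.append(env)
    actors.append(actor)

gym.prepare_sim(sim)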

Once the environment has been created, the code immediately constructs the OnPolicyRunner, which wraps the PPO algorithm:

ppo_runner, train_cfg = task_registry.make_alg_runner(env=env, name=args.task, args=args)

During construction, OnPolicyRunner creates an ActorCritic and, on top of it, the PPO algorithm class; both are defined in rsl_rl:

# Note that PPO is imported here, yet below the class is looked up again by name
# via eval(), which is somewhat redundant.
from rsl_rl.algorithms import PPO
from rsl_rl.modules import ActorCritic, ActorCriticRecurrent
from rsl_rl.env import VecEnv

class OnPolicyRunner:
    def __init__(self,
                 env: VecEnv,
                 train_cfg,
                 log_dir=None,
                 device='cpu'):
        # pull the sub-configs out of the training config
        self.cfg=train_cfg["runner"]
        self.alg_cfg = train_cfg["algorithm"]
        self.policy_cfg = train_cfg["policy"]
        self.device = device
        self.env = env
        if self.env.num_privileged_obs is not None:
            num_critic_obs = self.env.num_privileged_obs
        else:
            num_critic_obs = self.env.num_obs

        # policy class: create the ActorCritic
        actor_critic_class = eval(self.cfg["policy_class_name"]) # ActorCritic
        actor_critic: ActorCritic = actor_critic_class( self.env.num_obs,
                                                        num_critic_obs,
                                                        self.env.num_actions,
                                                        **self.policy_cfg).to(self.device)
        # algorithm class: create the PPO instance
        alg_class = eval(self.cfg["algorithm_class_name"]) # PPO
        self.alg: PPO = alg_class(actor_critic, device=self.device, **self.alg_cfg)
        self.num_steps_per_env = self.cfg["num_steps_per_env"]
        self.save_interval = self.cfg["save_interval"]

        # note: the rollout storage is sized here from the number of environments
        self.alg.init_storage(self.env.num_envs, self.num_steps_per_env, [self.env.num_obs], [self.env.num_privileged_obs], [self.env.num_actions])

        # Log
        self.log_dir = log_dir
        self.writer = None
        self.tot_timesteps = 0
        self.tot_time = 0
        self.current_learning_iteration = 0
        # reset the environments
        _, _ = self.env.reset()
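
On the "somewhat redundant" eval() lookup noted above: since PPO and ActorCriticRecurrent are already imported, an explicit mapping would resolve the configured names just as well. A small illustrative alternative (not how rsl_rl actually does it):

# Illustrative alternative to eval(): resolve configured class names through an explicit dict.
from rsl_rl.algorithms import PPO
from rsl_rl.modules import ActorCritic, ActorCriticRecurrent

POLICY_CLASSES = {"ActorCritic": ActorCritic, "ActorCriticRecurrent": ActorCriticRecurrent}
ALGORITHM_CLASSES = {"PPO": PPO}

def resolve_classes(runner_cfg):
    # runner_cfg is the train_cfg["runner"] dict, e.g. {"policy_class_name": "ActorCriticRecurrent", ...}
    policy_class = POLICY_CLASSES[runner_cfg["policy_class_name"]]
    alg_class = ALGORITHM_CLASSES[runner_cfg["algorithm_class_name"]]
    return policy_class, alg_class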

The configuration required to initialize the PPO algorithm is defined in g1_config.py. Both the actor and the critic use an LSTM network, with ActorCriticRecurrent as the policy class (it builds the LSTM with torch):

class G1RoughCfgPPO( LeggedRobotCfgPPO ):
    class policy:
        init_noise_std = 0.8 # initial action noise std
        actor_hidden_dims = [32] # seemingly unused here
        critic_hidden_dims = [32] # seemingly unused here
        activation = 'elu' # can be elu, relu, selu, crelu, lrelu, tanh, sigmoid
        # only for 'ActorCriticRecurrent':
        rnn_type = 'lstm' # use an LSTM
        rnn_hidden_size = 64 # hidden size of 64
        rnn_num_layers = 1 # a single layer

    class algorithm( LeggedRobotCfgPPO.algorithm ):
        entropy_coef = 0.01

    class runner( LeggedRobotCfgPPO.runner ):
        policy_class_name = "ActorCriticRecurrent"
        max_iterations = 10000 # maximum of 10000 training iterations
        run_name = ''
        experiment_name = 'g1'
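
For intuition, the recurrent policy configured above corresponds roughly to the following torch module: an LSTM memory followed by a small MLP head that outputs a Gaussian action distribution. This is a simplified sketch, not rsl_rl's actual ActorCriticRecurrent, and the observation/action sizes in the usage line are illustrative:

# Simplified stand-in for the recurrent actor: LSTM memory + small MLP head.
import torch
import torch.nn as nn

class TinyRecurrentActor(nn.Module):
    def __init__(self, num_obs, num_actions, rnn_hidden_size=64, rnn_num_layers=1, init_noise_std=0.8):
        super().__init__()
        self.memory = nn.LSTM(num_obs, rnn_hidden_size, num_layers=rnn_num_layers)
        self.head = nn.Sequential(nn.Linear(rnn_hidden_size, 32), nn.ELU(), nn.Linear(32, num_actions))
        self.std = nn.Parameter(init_noise_std * torch.ones(num_actions))

    def forward(self, obs, hidden=None):
        # obs: (seq_len, batch, num_obs); the hidden state carries memory across steps
        features, hidden = self.memory(obs, hidden)
        mean = self.head(features)
        return torch.distributions.Normal(mean, self.std), hidden

# usage: one step for 4096 parallel envs with 47-dim observations (numbers illustrative)
actor = TinyRecurrentActor(num_obs=47, num_actions=12)
dist, hidden = actor(torch.zeros(1, 4096, 47))
action = dist.sample()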

With both the environment and the algorithm constructed, the program can start the training process:

ppo_runner.learn(num_learning_iterations=train_cfg.runner.max_iterations, init_at_random_ep_len=True)

The reinforcement learning training loop is shown below, with explanations inline in the code (==TODO: the role of actions in the observation is still unclear==):

def learn(self, num_learning_iterations, init_at_random_ep_len=False):
    # create the logger / writer
    # used to report the training losses etc. in real time
    if self.log_dir is not None and self.writer is None:
        self.writer = SummaryWriter(log_dir=self.log_dir, flush_secs=10)

    # with init_at_random_ep_len=True (as passed from train.py), episode lengths start out randomized
    if init_at_random_ep_len:
        self.env.episode_length_buf = torch.randint_like(self.env.episode_length_buf, high=int(self.env.max_episode_length))
    # Fetch the observations; they are computed in LeggedRobot.compute_observations(), which returns a torch.Tensor containing:
    # 1. base linear velocity (scaled)
    # 2. base angular velocity (scaled)
    # 3. gravity projected into the base frame
    # 4. velocity commands
    # 5. joint positions (scaled)
    # 6. joint velocities (scaled)
    # 7. the previous actions
    # Noise can be added to the observations via the config.
    obs = self.env.get_observations()
    privileged_obs = self.env.get_privileged_observations()
    critic_obs = privileged_obs if privileged_obs is not None else obs

    # move the observation tensors to the target device and switch to training mode (needed by pytorch)
    obs, critic_obs = obs.to(self.device), critic_obs.to(self.device)
    self.alg.actor_critic.train() # switch to train mode (for dropout for example)

    ep_infos = []
    rewbuffer = deque(maxlen=100)
    lenbuffer = deque(maxlen=100)
    cur_reward_sum = torch.zeros(self.env.num_envs, dtype=torch.float, device=self.device)
    cur_episode_length = torch.zeros(self.env.num_envs, dtype=torch.float, device=self.device)

    tot_iter = self.current_learning_iteration + num_learning_iterations
    for it in range(self.current_learning_iteration, tot_iter):
        start = time.time()
        # interact with the environment to collect rewards and next states
        with torch.inference_mode():
            for i in range(self.num_steps_per_env): # num_steps_per_env = 24
                # query the algorithm for an action: the policy network produces the action,
                # the value network evaluates the state value
                actions = self.alg.act(obs, critic_obs)
                # step the environment to get the next observation, reward and done flag
                # LeggedRobot.step():
                # 1. calls self.render() to render the scene via gym
                # 2. runs the simulation; each policy dt = 4 * sim dt
                # 2.1. calls self._compute_torques(); for position control, joint torques are computed with a PD law and clipped
                # 2.2. calls self.gym.set_dof_actuation_force_tensor to apply them in the simulator
                # 2.3. calls self.gym.simulate(self.sim) to step the simulation
                # 2.4. calls self.gym.refresh_dof_state_tensor to refresh the state
                # 3. post-processing after the simulation step:
                # 3.1. calls self.gym.refresh_actor_root_state_tensor and self.gym.refresh_net_contact_force_tensor to refresh states
                # 3.2. converts the internal state into readable quantities: position, linear velocity, rpy, ...
                # 3.3. before computing rewards, resamples the vel_x, vel_y, ang_vel_yaw commands
                # 3.4. self.check_termination() checks whether to terminate, e.g. the tilt is too large and the robot is about to fall
                # 3.5. self.compute_reward() computes the reward: applies the reward functions and adds the termination reward
                # 3.6. self._push_robots() optionally pushes the robots randomly
                # 3.7. self.compute_observations() updates the observations
                obs, privileged_obs, rewards, dones, infos = self.env.step(actions)
                critic_obs = privileged_obs if privileged_obs is not None else obs
                # self.storage.add_transitions records the reward and stores the transition
                obs, critic_obs, rewards, dones = obs.to(self.device), critic_obs.to(self.device), rewards.to(self.device), dones.to(self.device)
                self.alg.process_env_step(rewards, dones, infos)

                if self.log_dir is not None:
                    # Book keeping
                    if 'episode' in infos:
                        ep_infos.append(infos['episode'])
                    cur_reward_sum += rewards
                    cur_episode_length += 1
                    new_ids = (dones > 0).nonzero(as_tuple=False)
                    rewbuffer.extend(cur_reward_sum[new_ids][:, 0].cpu().numpy().tolist())
                    lenbuffer.extend(cur_episode_length[new_ids][:, 0].cpu().numpy().tolist())
                    cur_reward_sum[new_ids] = 0
                    cur_episode_length[new_ids] = 0

            stop = time.time()
            collection_time = stop - start

            # compute the discounted returns
            start = stop
            self.alg.compute_returns(critic_obs)

        # run the PPO update
        mean_value_loss, mean_surrogate_loss = self.alg.update()
        stop = time.time()
        learn_time = stop - start
        if self.log_dir is not None:
            self.log(locals())
        if it % self.save_interval == 0:
            self.save(os.path.join(self.log_dir, 'model_{}.pt'.format(it)))
        ep_infos.clear()

    self.current_learning_iteration += num_learning_iterations
    self.save(os.path.join(self.log_dir, 'model_{}.pt'.format(self.current_learning_iteration)))
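
As a side note on step 2.1 above, the PD law that turns position-mode actions into joint torques has roughly the following shape. This is an illustrative sketch only; the actual gains, action scale, and torque limits come from the robot config:

# Illustrative PD law for position-control actions, as summarized in step 2.1 above.
import torch

def compute_torques_sketch(actions, dof_pos, dof_vel, default_dof_pos, p_gains, d_gains,
                           action_scale, torque_limits):
    # the action is interpreted as an offset from the default joint position
    target_pos = actions * action_scale + default_dof_pos
    torques = p_gains * (target_pos - dof_pos) - d_gains * dof_vel
    # clip to the actuator torque limits before sending to the simulator
    return torch.clip(torques, -torque_limits, torque_limits)

# toy usage with 2 envs x 12 joints (all numbers arbitrary)
num_envs, num_dof = 2, 12
torques = compute_torques_sketch(
    actions=torch.zeros(num_envs, num_dof), dof_pos=torch.zeros(num_envs, num_dof),
    dof_vel=torch.zeros(num_envs, num_dof), default_dof_pos=torch.zeros(num_dof),
    p_gains=torch.full((num_dof,), 100.0), d_gains=torch.full((num_dof,), 2.0),
    action_scale=0.25, torque_limits=torch.full((num_dof,), 88.0))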

The rewards provided by the environment are defined in LeggedRobot. The code uses a bit of Python reflection to collect every _reward_xxx method on the class; by encouraging or penalizing specific quantities, these reward functions guide the robot toward the intended motion:

#------------ reward functions----------------
def _reward_lin_vel_z(self):
    # Penalize z axis base linear velocity
    return torch.square(self.base_lin_vel[:, 2])

def _reward_ang_vel_xy(self):
    # Penalize xy axes base angular velocity
    return torch.sum(torch.square(self.base_ang_vel[:, :2]), dim=1)

def _reward_orientation(self):
    # Penalize non flat base orientation
    return torch.sum(torch.square(self.projected_gravity[:, :2]), dim=1)

def _reward_base_height(self):
    # Penalize base height away from target
    base_height = self.root_states[:, 2]
    return torch.square(base_height - self.cfg.rewards.base_height_target)

def _reward_torques(self):
    # Penalize torques
    return torch.sum(torch.square(self.torques), dim=1)

def _reward_dof_vel(self):
    # Penalize dof velocities
    return torch.sum(torch.square(self.dof_vel), dim=1)

def _reward_dof_acc(self):
    # Penalize dof accelerations
    return torch.sum(torch.square((self.last_dof_vel - self.dof_vel) / self.dt), dim=1)

def _reward_action_rate(self):
    # Penalize changes in actions
    return torch.sum(torch.square(self.last_actions - self.actions), dim=1)

def _reward_collision(self):
    # Penalize collisions on selected bodies
    return torch.sum(1.*(torch.norm(self.contact_forces[:, self.penalised_contact_indices, :], dim=-1) > 0.1), dim=1)

def _reward_termination(self):
    # Terminal reward / penalty
    return self.reset_buf * ~self.time_out_buf

def _reward_dof_pos_limits(self):
    # Penalize dof positions too close to the limit
    out_of_limits = -(self.dof_pos - self.dof_pos_limits[:, 0]).clip(max=0.) # lower limit
    out_of_limits += (self.dof_pos - self.dof_pos_limits[:, 1]).clip(min=0.)
    return torch.sum(out_of_limits, dim=1)

def _reward_dof_vel_limits(self):
    # Penalize dof velocities too close to the limit
    # clip to max error = 1 rad/s per joint to avoid huge penalties
    return torch.sum((torch.abs(self.dof_vel) - self.dof_vel_limits*self.cfg.rewards.soft_dof_vel_limit).clip(min=0., max=1.), dim=1)

def _reward_torque_limits(self):
    # penalize torques too close to the limit
    return torch.sum((torch.abs(self.torques) - self.torque_limits*self.cfg.rewards.soft_torque_limit).clip(min=0.), dim=1)

def _reward_tracking_lin_vel(self):
    # Tracking of linear velocity commands (xy axes)
    lin_vel_error = torch.sum(torch.square(self.commands[:, :2] - self.base_lin_vel[:, :2]), dim=1)
    return torch.exp(-lin_vel_error/self.cfg.rewards.tracking_sigma)

def _reward_tracking_ang_vel(self):
    # Tracking of angular velocity commands (yaw)
    ang_vel_error = torch.square(self.commands[:, 2] - self.base_ang_vel[:, 2])
    return torch.exp(-ang_vel_error/self.cfg.rewards.tracking_sigma)

def _reward_feet_air_time(self):
    # Reward long steps
    # Need to filter the contacts because the contact reporting of PhysX is unreliable on meshes
    contact = self.contact_forces[:, self.feet_indices, 2] > 1.
    contact_filt = torch.logical_or(contact, self.last_contacts)
    self.last_contacts = contact
    first_contact = (self.feet_air_time > 0.) * contact_filt
    self.feet_air_time += self.dt
    rew_airTime = torch.sum((self.feet_air_time - 0.5) * first_contact, dim=1) # reward only on first contact with the ground
    rew_airTime *= torch.norm(self.commands[:, :2], dim=1) > 0.1 #no reward for zero command
    self.feet_air_time *= ~contact_filt
    return rew_airTime

def _reward_stumble(self):
    # Penalize feet hitting vertical surfaces
    return torch.any(torch.norm(self.contact_forces[:, self.feet_indices, :2], dim=2) >\
                     5 *torch.abs(self.contact_forces[:, self.feet_indices, 2]), dim=1)

def _reward_stand_still(self):
    # Penalize motion at zero commands
    return torch.sum(torch.abs(self.dof_pos - self.default_dof_pos), dim=1) * (torch.norm(self.commands[:, :2], dim=1) < 0.1)

def _reward_feet_contact_forces(self):
    # penalize high contact forces
    return torch.sum((torch.norm(self.contact_forces[:, self.feet_indices, :], dim=-1) - self.cfg.rewards.max_contact_force).clip(min=0.), dim=1)
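
The "Python reflection" mentioned above is essentially name-based lookup: collect every method whose name starts with _reward_, pair it with its scale from the config, and sum the weighted terms each step. The following is a simplified, self-contained sketch of that mechanism, not the exact _prepare_reward_function / compute_reward code:

# Simplified illustration of collecting _reward_* methods by name and summing weighted terms.
import torch

class RewardMixerSketch:
    def __init__(self, num_envs, reward_scales):
        # reward_scales maps a reward name (without the _reward_ prefix) to its weight
        self.num_envs = num_envs
        self.reward_names = [name for name, scale in reward_scales.items() if scale != 0]
        self.reward_functions = [getattr(self, '_reward_' + name) for name in self.reward_names]
        self.reward_scales = reward_scales
        self.base_lin_vel = torch.zeros(num_envs, 3)
        self.torques = torch.zeros(num_envs, 12)

    def compute_reward(self):
        rew_buf = torch.zeros(self.num_envs)
        for name, func in zip(self.reward_names, self.reward_functions):
            rew_buf += func() * self.reward_scales[name]
        return rew_buf

    # example reward terms, mirroring the style of the functions above
    def _reward_lin_vel_z(self):
        return torch.square(self.base_lin_vel[:, 2])

    def _reward_torques(self):
        return torch.sum(torch.square(self.torques), dim=1)

mixer = RewardMixerSketch(num_envs=4, reward_scales={"lin_vel_z": -2.0, "torques": -1e-5})
print(mixer.compute_reward())  # tensor of per-env total rewards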

All of the machinery above exists to interact with the environment: collecting rewards, computing returns, and updating the robot's state. The data gathered from these interactions feeds the PPO update, so let's look directly at how the PPO update is implemented:

import torch
import torch.nn as nn
import torch.optim as optim

from rsl_rl.modules import ActorCritic
from rsl_rl.storage import RolloutStorage

class PPO:
    actor_critic: ActorCritic
    def __init__(self,
                 actor_critic,
                 num_learning_epochs=1,
                 num_mini_batches=1,
                 clip_param=0.2,
                 gamma=0.998,
                 lam=0.95,
                 value_loss_coef=1.0,
                 entropy_coef=0.0,
                 learning_rate=1e-3,
                 max_grad_norm=1.0,
                 use_clipped_value_loss=True,
                 schedule="fixed",
                 desired_kl=0.01,
                 device='cpu',
                 ):

        self.device = device

        self.desired_kl = desired_kl
        self.schedule = schedule
        self.learning_rate = learning_rate

        # PPO components
        self.actor_critic = actor_critic
        self.actor_critic.to(self.device)
        self.storage = None # initialized later
        self.optimizer = optim.Adam(self.actor_critic.parameters(), lr=learning_rate)
        self.transition = RolloutStorage.Transition()

        # PPO parameters
        self.clip_param = clip_param
        self.num_learning_epochs = num_learning_epochs
        self.num_mini_batches = num_mini_batches
        self.value_loss_coef = value_loss_coef
        self.entropy_coef = entropy_coef
        self.gamma = gamma
        self.lam = lam
        self.max_grad_norm = max_grad_norm
        self.use_clipped_value_loss = use_clipped_value_loss

    def update(self):
        mean_value_loss = 0
        mean_surrogate_loss = 0

        # choose the mini-batch generator depending on whether the policy is recurrent
        if self.actor_critic.is_recurrent:
            generator = self.storage.reccurent_mini_batch_generator(self.num_mini_batches, self.num_learning_epochs)
        else:
            generator = self.storage.mini_batch_generator(self.num_mini_batches, self.num_learning_epochs)

        # for every mini-batch
        for obs_batch, critic_obs_batch, actions_batch, target_values_batch, advantages_batch, returns_batch, old_actions_log_prob_batch, \
            old_mu_batch, old_sigma_batch, hid_states_batch, masks_batch in generator:

            # re-run the policy on the batch to get action log-probs, and evaluate the state values
            self.actor_critic.act(obs_batch, masks=masks_batch, hidden_states=hid_states_batch[0])
            actions_log_prob_batch = self.actor_critic.get_actions_log_prob(actions_batch)
            value_batch = self.actor_critic.evaluate(critic_obs_batch, masks=masks_batch, hidden_states=hid_states_batch[1])
            mu_batch = self.actor_critic.action_mean
            sigma_batch = self.actor_critic.action_std
            entropy_batch = self.actor_critic.entropy

            # adaptive schedule based on KL divergence
            if self.desired_kl != None and self.schedule == 'adaptive':
                with torch.inference_mode():
                    kl = torch.sum(
                        torch.log(sigma_batch / old_sigma_batch + 1.e-5) + (torch.square(old_sigma_batch) + torch.square(old_mu_batch - mu_batch))
                        / (2.0 * torch.square(sigma_batch)) - 0.5, axis=-1)
                    kl_mean = torch.mean(kl)
                    # adjust the learning rate according to the KL divergence
                    if kl_mean > self.desired_kl * 2.0:
                        self.learning_rate = max(1e-5, self.learning_rate / 1.5)
                    elif kl_mean < self.desired_kl / 2.0 and kl_mean > 0.0:
                        self.learning_rate = min(1e-2, self.learning_rate * 1.5)

                    for param_group in self.optimizer.param_groups:
                        param_group['lr'] = self.learning_rate


            # Surrogate loss
            # importance sampling ratio
            ratio = torch.exp(actions_log_prob_batch - torch.squeeze(old_actions_log_prob_batch))
            # importance ratio times advantage, with and without clipping
            surrogate = -torch.squeeze(advantages_batch) * ratio
            surrogate_clipped = -torch.squeeze(advantages_batch) * torch.clamp(ratio, 1.0 - self.clip_param,
                                                                               1.0 + self.clip_param)
            # because of the negated sign, taking the max here selects the pessimistic (clipped) objective
            surrogate_loss = torch.max(surrogate, surrogate_clipped).mean()

            # Value function loss
            if self.use_clipped_value_loss:
                value_clipped = target_values_batch + (value_batch - target_values_batch).clamp(-self.clip_param,
                                                                                                self.clip_param)
                value_losses = (value_batch - returns_batch).pow(2)
                value_losses_clipped = (value_clipped - returns_batch).pow(2)
                value_loss = torch.max(value_losses, value_losses_clipped).mean()
            else:
                value_loss = (returns_batch - value_batch).pow(2).mean()
            # total loss for the policy-gradient step
            loss = surrogate_loss + self.value_loss_coef * value_loss - self.entropy_coef * entropy_batch.mean()

            # backpropagate and update; note that torch performs gradient descent, hence the negated surrogate above
            self.optimizer.zero_grad()
            loss.backward()
            nn.utils.clip_grad_norm_(self.actor_critic.parameters(), self.max_grad_norm)
            self.optimizer.step()

            mean_value_loss += value_loss.item()
            mean_surrogate_loss += surrogate_loss.item()

        num_updates = self.num_learning_epochs * self.num_mini_batches
        mean_value_loss /= num_updates
        mean_surrogate_loss /= num_updates
        self.storage.clear()

        return mean_value_loss, mean_surrogate_loss
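
To make the clipped surrogate concrete, here is a tiny standalone example with hand-picked numbers, showing how the clipping caps the incentive once the importance ratio drifts outside the clip range:

# Toy example of the clipped surrogate term with hand-picked ratios and advantages.
import torch

clip_param = 0.2
ratio = torch.tensor([0.5, 0.9, 1.1, 1.6])        # importance ratios for 4 samples
advantages = torch.tensor([1.0, -1.0, 1.0, 1.0])  # toy advantage estimates

surrogate = -advantages * ratio
surrogate_clipped = -advantages * torch.clamp(ratio, 1.0 - clip_param, 1.0 + clip_param)
surrogate_loss = torch.max(surrogate, surrogate_clipped).mean()

# per-sample pessimistic terms: [-0.5, 0.9, -1.1, -1.2]
# the last sample's ratio 1.6 is clipped to 1.2, so any gain beyond the clip range is ignored
print(torch.max(surrogate, surrogate_clipped), surrogate_loss)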
