-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathppo_apply.py
More file actions
121 lines (106 loc) · 4.64 KB
/
ppo_apply.py
File metadata and controls
121 lines (106 loc) · 4.64 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
import numpy as np
import argparse
import torch
import gymnasium as gym
print(f"gymnasium version: {gym.__version__}")
import torch.nn as nn
from torch.distributions.categorical import Categorical # 分布
'''
我的Agent继承自 nn.Module类, 所以这其中的方法都可以直接用, 比如load 训练好模型的函数:
agent.load_state_dict(torch.load('ppo_agent.pth'))
agent.eval(), disable Training mode, 而是开启 推理模式。两者在计算是有差别, 比如
Dropout层, BN层有差别
'''
class Agent(nn.Module):
def __init__(self, env):
super(Agent, self).__init__()
# *** 每一层都有参数, 这是要学习的参数
# 输入 env.single_observation_space.shape 的向量长度: 4
self.obs_size = np.array(env.observation_space.shape).prod()
# 输出 env.single_action_space.n 长度: 2
self.action_size = env.action_space.n
# *** PPO Agent 初始化 critic & actor
self.critic = nn.Sequential(
self.__layer(nn.Linear(self.obs_size, 64)),
nn.Tanh(),
self.__layer(nn.Linear(64, 64)),
nn.Tanh(),
self.__layer(nn.Linear(64, 1), std=1.0),
)
self.actor = nn.Sequential(
self.__layer(nn.Linear(self.obs_size, 64)),
nn.Tanh(),
self.__layer(nn.Linear(64, 64)),
nn.Tanh(),
self.__layer(nn.Linear(64, self.action_size), std=0.01),
)
# @staticmethod
def __layer(self, layer, std=np.sqrt(2), bias_const=0.0):
# 使用正交矩阵填充权重:正交初始化有助于保持激活值在网络中的传播过程中的方差不
# 变, 从而避免梯度消失或爆炸的问题;加速收敛
torch.nn.init.orthogonal_(layer.weight, std)
torch.nn.init.constant_(layer.bias, bias_const)
return layer
'''
这个 get_value() 返回到是什么东西? 根据:V(s_t) 是价值函数, 表示在状态 s_t 下的
期望回报。 即 Value 值
'''
def get_value(self, x):
return self.critic(x)
def get_action_and_value(self, x, action=None):
logits = self.actor(x)
probs = Categorical(logits=logits) # softmax 的概率输出
if action is None:
action = probs.sample() # 采样actions
return action, probs.log_prob(action), probs.entropy(), self.critic(x)
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument('--max-episode-steps',
dest="max_episode_steps",
type=int, default=500,
help='number of steps in an episode')
args = parser.parse_args()
# 应用训练好的Agent
# 1. 加载训练好的 Agent
# 假设你已经将训练好的 Agent 保存到 'ppo_agent.pth' 文件中
# max_episode_steps 没有默认值, 即某人没有限制, Agent可以表现的非常好,
# 那么这个step可以很多, 变现出来就是video时长很长
env = gym.wrappers.TimeLimit(
gym.make("CartPole-v1",
render_mode="rgb_array"),
max_episode_steps=args.max_episode_steps
)
env = gym.wrappers.RecordVideo(env, "video-infer",
name_prefix="ppo_inference")
observation, info = env.reset() # 重置环境
print("Observation space shape:", env.observation_space.shape)
print("Observation space low:", env.observation_space.low)
print("Observation space high:", env.observation_space.high)
print("Action space n:", env.action_space.n)
agent = Agent(env=env)
agent.load_state_dict(torch.load('ppo_agent.pth'))
# 设置为评估模式, 等价于 self.training = False
agent.eval()
# print("agent.state_dict: ",agent.state_dict())
# 3. 运行 Agent
total_reward = 0
terminated = False
truncated = False
while not terminated and not truncated:
# 将 observation 转换为 PyTorch 张量
# 增加一个 batch 维度
obs_tensor = torch.Tensor(observation).unsqueeze(0)
# 使用 Actor 网络选择动作
with torch.no_grad(): # 关闭梯度计算
logits = agent.actor(obs_tensor)
probs = torch.nn.functional.softmax(logits, dim=-1)
# 采样动作
action = torch.multinomial(probs, num_samples=1).item()
# 执行动作, 获取新的 observation、奖励、done 等信息
observation, reward, terminated, truncated, info = env.step(action)
total_reward += reward
# 渲染环境
env.render()
# 打印总奖励
print(f"Total reward: {total_reward}")
env.close()