-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathplay.py
More file actions
executable file
·206 lines (156 loc) · 6.61 KB
/
play.py
File metadata and controls
executable file
·206 lines (156 loc) · 6.61 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
# Copyright (c) 2022-2025, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""Script to play and evaluate a trained policy from robomimic."""
"""Launch Isaac Sim Simulator first."""
import argparse
from isaaclab.app import AppLauncher
# add argparse arguments
parser = argparse.ArgumentParser(description="Evaluate robomimic policy for Isaac Lab environment.")
parser.add_argument(
"--disable_fabric", action="store_true", default=False, help="Disable fabric and use USD I/O operations."
)
parser.add_argument("--task", type=str, default=None, help="Overwrite model task.")
parser.add_argument("--checkpoint", type=str, default=None, help="Pytorch model checkpoint to load.")
parser.add_argument("--horizon", type=int, default=300, help="Step horizon of each rollout.")
parser.add_argument("--num_rollouts", type=int, default=1, help="Number of rollouts.")
parser.add_argument("--seed", type=int, default=101, help="Random seed.")
parser.add_argument("--num_envs", type=int, default=1, help="Number of parallel environments.")
parser.add_argument(
"--num_success_steps",
type=int,
default=1,
help="Number of continuous steps with task success for concluding a demo as successful. Default is 1.",
)
# append AppLauncher cli args
AppLauncher.add_app_launcher_args(parser)
# parse the arguments
args_cli = parser.parse_args()
# launch omniverse app
app_launcher = AppLauncher(args_cli)
simulation_app = app_launcher.app
"""Rest everything follows."""
from typing import TYPE_CHECKING
import dill
import gymnasium as gym
import hydra
import numpy as np
import robomimic.utils.torch_utils as TorchUtils
import torch
from diffusion_policy.common.pytorch_util import dict_apply
from isaaclab_tasks.utils import parse_env_cfg
import isaac_imitation_learning.tasks # noqa: F401
if TYPE_CHECKING:
from diffusion_policy.workspace.base_workspace import BaseWorkspace
def rollout(policy, env, horizon, abs_action, success_term, num_success_steps):
obs = env.reset()
step = 0
success_counter = 0
done = torch.zeros(env.unwrapped.num_envs, device=env.unwrapped.device)
success_steps = torch.zeros(env.unwrapped.num_envs, device=env.unwrapped.device)
while step < horizon and not torch.all(done):
# device transfer
obs_dict = dict_apply(obs, lambda x: x.to(device=policy.device))
# run policy
with torch.no_grad():
actions = policy.predict_action(obs_dict)
# device transfer
actions = dict_apply(actions, lambda x: x.detach().to(env.unwrapped.device))
actions = actions["action"]
if not torch.all(torch.isfinite(actions)):
print(actions)
raise RuntimeError("Nan or Inf action")
# step env
if abs_action:
actions = undo_transform_action(actions)
# swap axis (N_envs, Ta, Da) -> (Ta, N_envs, Da)
actions = torch.moveaxis(actions, 0, 1)
for act in actions:
# only execute action if env is not done
act = act * torch.logical_not(done)[:, None]
obs, rewards, terminations = env.step(act)
done = torch.logical_or(done, terminations)
if success_term is not None:
success_state = success_term.func(env.unwrapped, **success_term.params)
success_steps = (success_steps + 1) * success_state
success = success_steps >= num_success_steps
done = torch.logical_or(done, success)
success_counter += success_state.sum()
step += 1
return success_counter
def undo_transform_action(self, action):
raw_shape = action.shape
if raw_shape[-1] == 20:
# dual arm
action = action.reshape(-1, 2, 10)
d_rot = action.shape[-1] - 4
pos = action[..., :3]
rot = action[..., 3 : 3 + d_rot]
gripper = action[..., [-1]]
rot = self.rotation_transformer.inverse(rot)
uaction = np.concatenate([pos, rot, gripper], axis=-1)
if raw_shape[-1] == 20:
# dual arm
uaction = uaction.reshape(*raw_shape[:-1], 14)
return uaction
def main():
"""Run a trained policy from robomimic with Isaac Lab environment."""
# Load policy
payload = torch.load(open(args_cli.checkpoint, "rb"), pickle_module=dill) # noqa: SIM115
cfg = payload["cfg"]
cls = hydra.utils.get_class(cfg._target_)
workspace: BaseWorkspace = cls(cfg)
workspace.load_payload(payload, exclude_keys=None, include_keys=None)
# get policy from workspace
policy = workspace.model
if cfg.training.use_ema:
policy = workspace.ema_model
# parse configuration
# TODO: Check if task ID exists
task = args_cli.task if args_cli.task is not None else cfg.task.task_id
env_cfg = parse_env_cfg(
task, device=args_cli.device, num_envs=args_cli.num_envs, use_fabric=not args_cli.disable_fabric
)
# Set observations to dictionary mode for Robomimic
env_cfg.observations.policy.concatenate_terms = False
# Set termination conditions
env_cfg.terminations.time_out = None
# Disable recorder
env_cfg.recorders = None
# use history_length instead of a multi step wrapper
env_cfg.observations.policy.history_length = cfg.n_obs_steps
env_cfg.observations.policy.flatten_history_dim = False
# env_cfg.viewer.eye = (1.3, 0, 0.4)
env_cfg.viewer.resolution = (1920, 1080)
# Set seed
torch.manual_seed(args_cli.seed)
env_cfg.seed = args_cli.seed
# extract success checking function to invoke during rollout
success_term = None
if hasattr(env_cfg.terminations, "success"):
success_term = env_cfg.terminations.success
env_cfg.terminations.success = None
# Create environment
env = gym.make(task, cfg=env_cfg, render_mode="rgb_array").unwrapped
env = gym.wrappers.RecordVideo(env, video_folder="/home/timo/", video_length=200, disable_logger=True, fps=30)
env_wrapper = hydra.utils.instantiate(cfg.task.env_runner.env_wrapper)
env = env_wrapper(env=env)
# Acquire device
device = TorchUtils.get_torch_device(try_to_use_cuda=True)
device = torch.device(device)
policy.to(device)
policy.eval()
# Run policy
success = 0
for trial in range(args_cli.num_rollouts):
success += rollout(policy, env, args_cli.horizon, cfg.task.abs_action, success_term, args_cli.num_success_steps)
env.close()
print(
f"Success: {success} / {args_cli.num_rollouts * args_cli.num_envs} ({success / (args_cli.num_rollouts * args_cli.num_envs)})"
)
if __name__ == "__main__":
# run the main function
main()
# close sim app
simulation_app.close()