-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrecord_demos.py
More file actions
executable file
·304 lines (247 loc) · 11.5 KB
/
record_demos.py
File metadata and controls
executable file
·304 lines (247 loc) · 11.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
# Copyright (c) 2024-2025, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""
Script to record demonstrations with Isaac Lab environments using human teleoperation.
This script lets a user record demonstrations performed via human teleoperation for a specified task.
The recorded demonstrations are stored as episodes in an HDF5 file. Users can specify the task, teleoperation
device, dataset file path, and environment stepping rate through command-line arguments.
"""
"""Launch Isaac Sim Simulator first."""
import argparse
import os
from isaaclab.app import AppLauncher
# command-line interface: script-specific options first, AppLauncher options afterwards
parser = argparse.ArgumentParser(description="Record demonstrations for Isaac Lab environments.")
parser.add_argument("--task", type=str, default="IIL-Lift-Cube-v0-LowDim", help="Name of the task.")
parser.add_argument("--teleop_device", type=str, default="gamepad", help="Device for interacting with environment.")
parser.add_argument("--sensitivity", type=float, default=1, help="Sensitivity factor.")
parser.add_argument(
    "--dataset_file", type=str, default="./data/dataset.hdf5", help="File path to export recorded demos."
)
parser.add_argument("--step_hz", type=int, default=30, help="Environment stepping rate in Hz.")
parser.add_argument(
    "--num_demos", type=int, default=0, help="Number of demonstrations to record. Set to 0 for infinite."
)
parser.add_argument(
    "--num_success_steps",
    type=int,
    default=10,
    help="Number of continuous steps with task success for concluding a demo as successful. Default is 10.",
)
# a ``store_true`` flag already defaults to False when it is absent from the command line
parser.add_argument("--simpub", action="store_true", help="Enable SimPub.")
parser.add_argument("--seed", type=int, default=None, help="Seed for simulation.")

# let the AppLauncher register its own options on the same parser
AppLauncher.add_app_launcher_args(parser)

# resolve the final set of options
args_cli = parser.parse_args()

# hand tracking is started with the OpenXR experience file found under ISAACLAB_PATH
if args_cli.teleop_device.lower() == "handtracking":
    args_cli.experience = f"{os.environ['ISAACLAB_PATH']}/apps/isaaclab.python.xr.openxr.kit"

# any device name containing "simpub" switches SimPub on, even without the --simpub flag
args_cli.simpub = args_cli.simpub or "simpub" in args_cli.teleop_device.lower()

# start the simulator and keep a handle to the running application
app_launcher = AppLauncher(args_cli)
simulation_app = app_launcher.app
"""Rest everything follows."""
import contextlib
import time
import gymnasium as gym
import omni.log
import torch
from isaaclab.devices import Se3Gamepad, Se3HandTracking, Se3Keyboard, Se3SpaceMouse
from isaaclab.envs import ManagerBasedEnvCfg, ViewerCfg
from isaaclab.envs.mdp.recorders.recorders_cfg import ActionStateRecorderManagerCfg
from isaaclab.envs.ui import ViewportCameraController
from isaaclab.managers import EventTermCfg as EventTerm
from isaaclab_tasks.utils.parse_cfg import parse_env_cfg
import isaac_imitation_learning.tasks # noqa: F401
# only import simpub if needed, so user is not forced to install it
if args_cli.simpub:
from simpub.sim.isaacsim_publisher import IsaacSimPublisher
from isaac_imitation_learning.devices.SimPub.se3_SimPubHandTracking import Se3SimPubHandTrackingRel
class RateLimiter:
    """Convenience class for enforcing rates in loops.

    The limiter tracks the time of the previous tick in ``last_time`` and, on each call to
    :meth:`sleep`, waits until one period (``1 / hz`` seconds) after that tick. While waiting it
    repeatedly calls ``env.sim.render()``. Timing uses the monotonic clock, so adjustments of the
    system wall clock do not disturb the schedule.
    """

    def __init__(self, hz):
        """Initialize the limiter.

        Args:
            hz (int): frequency to enforce, in ticks per second. Must be strictly positive.

        Raises:
            ValueError: If ``hz`` is zero or negative. A negative value would otherwise make the
                catch-up loop in :meth:`sleep` run forever.
        """
        if hz <= 0:
            raise ValueError(f"RateLimiter frequency must be positive, got {hz}.")
        self.hz = hz
        self.last_time = time.monotonic()
        self.sleep_duration = 1.0 / hz
        # never wait longer than 33 ms between two render calls
        self.render_period = min(0.033, self.sleep_duration)

    def sleep(self, env):
        """Attempt to sleep at the specified rate in hz.

        Args:
            env: object exposing ``sim.render()``; it is called after every partial sleep.
        """
        next_wakeup_time = self.last_time + self.sleep_duration
        remaining = next_wakeup_time - time.monotonic()
        while remaining > 0.0:
            # sleep only for the time that is actually left so the wake-up is not overshot
            time.sleep(min(self.render_period, remaining))
            env.sim.render()
            remaining = next_wakeup_time - time.monotonic()

        self.last_time = next_wakeup_time

        # detect time jumping forwards (e.g. loop is too slow): skip the missed ticks
        now = time.monotonic()
        while self.last_time < now:
            self.last_time += self.sleep_duration
def pre_process_actions(delta_pose: torch.Tensor, gripper_command: bool) -> torch.Tensor:
    """Build the action tensor by appending a gripper column to the pose deltas.

    Args:
        delta_pose: 2D tensor with one row per environment.
        gripper_command: Gripper flag; a truthy value yields -1 in the extra column, otherwise +1.

    Returns:
        ``delta_pose`` with one additional trailing column holding the gripper value.
    """
    num_rows = delta_pose.shape[0]
    gripper_value = -1.0 if gripper_command else 1.0
    # one gripper entry per row, placed on the same device as the pose deltas
    gripper_column = torch.full((num_rows, 1), gripper_value, dtype=torch.float, device=delta_pose.device)
    return torch.cat((delta_pose, gripper_column), dim=1)
def main():
    """Collect demonstrations from the environment using teleop interfaces.

    Configuration is read from the module-level ``args_cli`` namespace. The loop runs until the
    simulator application stops running, the simulation is stopped, the requested number of
    successful demonstrations has been exported, or the user interrupts with Ctrl+C. The
    environment is closed before returning.

    Raises:
        ValueError: If ``args_cli.teleop_device`` is not one of the supported device names.
    """
    # if handtracking is selected, rate limiting is achieved via OpenXR
    rate_limiter = (
        None
        if args_cli.teleop_device.lower() == "handtracking" or args_cli.step_hz == 0
        else RateLimiter(args_cli.step_hz)
    )

    # get directory path and file name (without extension) from cli arguments
    output_dir = os.path.dirname(args_cli.dataset_file)
    output_file_name = os.path.splitext(os.path.basename(args_cli.dataset_file))[0]

    # create directory if it does not exist; a bare file name has an empty directory part,
    # which ``os.makedirs`` would reject, so there is nothing to create in that case
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    # parse configuration
    env_cfg = parse_env_cfg(task_name=args_cli.task, device=args_cli.device, num_envs=1)
    assert isinstance(env_cfg, ManagerBasedEnvCfg), "For recording direct env demos use record_direct_demos.py"
    env_cfg.env_name = args_cli.task
    env_cfg.seed = args_cli.seed

    # extract success checking function to invoke in the main loop
    success_term = None
    if hasattr(env_cfg.terminations, "success"):
        success_term = env_cfg.terminations.success
        env_cfg.terminations.success = None
    else:
        omni.log.warn(
            "No success termination term was found in the environment."
            " Will not be able to mark recorded demos as successful."
        )

    # modify configuration such that the environment runs indefinitely until
    # the goal is reached or other termination conditions are met
    env_cfg.terminations.time_out = None

    env_cfg.observations.policy.concatenate_terms = False

    env_cfg.recorders = ActionStateRecorderManagerCfg()
    env_cfg.recorders.dataset_export_dir_path = output_dir
    env_cfg.recorders.dataset_filename = output_file_name

    # reset teleop if env is reset; ``teleop_interface`` is bound further below, before the
    # first ``env.reset()`` call made by this function
    def teleop_reset(env, env_ids):
        teleop_interface.reset()

    env_cfg.events.reset_teleop = EventTerm(func=teleop_reset, mode="reset")

    # create environment
    env = gym.make(args_cli.task, cfg=env_cfg).unwrapped

    # add teleoperation key for reset current recording instance
    should_reset_recording_instance = False

    def reset_recording_instance():
        nonlocal should_reset_recording_instance
        should_reset_recording_instance = True

    # enable simpub if selected
    if args_cli.simpub and env.unwrapped.sim is not None and env.unwrapped.sim.stage is not None:
        print("parsing usd stage...")
        IsaacSimPublisher(host="127.0.0.1", stage=env.unwrapped.sim.stage)

    # create controller
    if args_cli.teleop_device.lower() == "keyboard":
        teleop_interface = Se3Keyboard(pos_sensitivity=0.2, rot_sensitivity=0.5)
        teleop_interface.add_callback("R", reset_recording_instance)
    elif args_cli.teleop_device.lower() == "spacemouse":
        teleop_interface = Se3SpaceMouse(pos_sensitivity=0.2, rot_sensitivity=0.5)
    elif args_cli.teleop_device.lower() == "gamepad":
        from carb.input import GamepadInput

        teleop_interface = Se3Gamepad(
            pos_sensitivity=0.1 * args_cli.sensitivity,
            rot_sensitivity=0.1 * args_cli.sensitivity,
            dead_zone=0.2,
        )
        teleop_interface.add_callback(GamepadInput.B, reset_recording_instance)
    elif args_cli.teleop_device.lower() == "handtracking":
        from isaacsim.xr.openxr import OpenXRSpec

        teleop_interface = Se3HandTracking(OpenXRSpec.XrHandEXT.XR_HAND_RIGHT_EXT, False, True)
        teleop_interface.add_callback("RESET", reset_recording_instance)
        viewer = ViewerCfg(eye=(-0.25, -0.3, 0.5), lookat=(0.6, 0, 0), asset_name="viewer")
        ViewportCameraController(env, viewer)
    elif args_cli.teleop_device.lower() == "simpub":
        teleop_interface = Se3SimPubHandTrackingRel(
            pos_sensitivity=(args_cli.sensitivity / env_cfg.decimation),
            rot_sensitivity=(args_cli.sensitivity / env_cfg.decimation),
        )
        teleop_interface.add_callback("Y", reset_recording_instance)
    else:
        raise ValueError(
            f"Invalid device interface '{args_cli.teleop_device}'. Supported: 'keyboard', 'spacemouse', 'handtracking', 'gamepad', 'simpub'."
        )

    print(teleop_interface)

    # reset before starting
    env.reset()

    # simulate environment -- run everything in inference mode
    current_recorded_demo_count = 0
    success_step_count = 0
    # NOTE: the two context managers must be comma-separated. ``a and b`` evaluates to ``b``
    # alone, which would leave KeyboardInterrupt unsuppressed and skip ``env.close()`` below.
    with contextlib.suppress(KeyboardInterrupt), torch.inference_mode():
        while simulation_app.is_running():
            # get teleoperation command
            delta_pose, gripper_command = teleop_interface.advance()
            # convert to torch
            delta_pose = torch.tensor(delta_pose, dtype=torch.float, device=env.device).repeat(env.num_envs, 1)
            # compute actions based on environment
            actions = pre_process_actions(delta_pose, gripper_command)

            # perform action on environment
            # TODO: Support for envs not using full range of SE3 input
            env.step(actions)

            if success_term is not None:
                if bool(success_term.func(env, **success_term.params)[0]):
                    success_step_count += 1
                    # export only after the success condition held for enough consecutive steps
                    if success_step_count >= args_cli.num_success_steps:
                        env.recorder_manager.record_pre_reset([0], force_export_or_skip=False)
                        env.recorder_manager.get_episode(0).seed = args_cli.seed
                        env.recorder_manager.set_success_to_episodes(
                            [0],
                            torch.tensor([[True]], dtype=torch.bool, device=env.device),
                        )
                        env.recorder_manager.export_episodes([0])
                        should_reset_recording_instance = True
                else:
                    # the streak of successful steps was broken
                    success_step_count = 0

            if should_reset_recording_instance:
                env.recorder_manager.reset()
                env.reset()
                should_reset_recording_instance = False
                success_step_count = 0

            # print out the current demo count if it has changed
            if env.recorder_manager.exported_successful_episode_count > current_recorded_demo_count:
                current_recorded_demo_count = env.recorder_manager.exported_successful_episode_count
                print(f"Recorded {current_recorded_demo_count} successful demonstrations.")

            if args_cli.num_demos > 0 and env.recorder_manager.exported_successful_episode_count >= args_cli.num_demos:
                print(f"All {args_cli.num_demos} demonstrations recorded. Exiting the app.")
                break

            # check that simulation is stopped or not
            if env.sim.is_stopped():
                break

            if rate_limiter:
                rate_limiter.sleep(env)

    env.close()
if __name__ == "__main__":
    # run the main function (the teleoperation recording loop)
    main()
    # close the simulator application that was launched at module import time
    simulation_app.close()