Merged
Changes from all commits
57 commits
81c2ead
Initial code commit
2088376133zh Jan 14, 2026
7bfc7fa
Commit the first chapter of the thesis
2088376133zh Jan 15, 2026
b490c72
Merge branch 'OpenHUTB:master' into master
ZH1024-Heng Apr 17, 2026
7483e6a
Merge branch 'OpenHUTB:master' into master
ZH1024-Heng Apr 17, 2026
f07ceaf
Initialize the project and upload base files
2088376133zh Apr 17, 2026
1114ad6
Add dependency configuration and environment notes
2088376133zh Apr 18, 2026
3702d52
Merge branch 'OpenHUTB:master' into master
ZH1024-Heng Apr 19, 2026
99653f8
Rename folder
2088376133zh Apr 19, 2026
adc7ed1
Add biomechanical muscle-actuated two-arm model and XML files
2088376133zh Apr 19, 2026
2046276
Merge branch 'OpenHUTB:master' into master
ZH1024-Heng Apr 19, 2026
05babf0
Merge branch 'OpenHUTB:master' into master
ZH1024-Heng Apr 19, 2026
285d64e
Add code and rename files
2088376133zh Apr 19, 2026
3a74583
Merge branch 'OpenHUTB:master' into master
ZH1024-Heng Apr 22, 2026
2d0de79
Remove old files and update project structure
2088376133zh Apr 22, 2026
a8a588b
Commit code files
2088376133zh Apr 22, 2026
83fdb97
Modify folder
2088376133zh Apr 22, 2026
481b33d
Commit code files
2088376133zh Apr 22, 2026
d7d9dda
Modify files
2088376133zh Apr 22, 2026
63b9d5f
Re-commit code files
2088376133zh Apr 22, 2026
4e961dc
Upload code files in batches
2088376133zh Apr 22, 2026
afd3b6f
Partial
2088376133zh Apr 22, 2026
f361622
Delete
2088376133zh Apr 22, 2026
f9fb3f7
Commit code
2088376133zh Apr 22, 2026
863961a
In batches
2088376133zh Apr 22, 2026
e96cdcd
Delete
2088376133zh Apr 22, 2026
06fb33d
Code
2088376133zh Apr 22, 2026
72dd0c2
Merge branch 'OpenHUTB:master' into master
ZH1024-Heng Apr 24, 2026
c833d3b
Commit
2088376133zh Apr 24, 2026
3c7f398
Partial commit
2088376133zh Apr 24, 2026
6e536de
Re-submit
2088376133zh Apr 24, 2026
491d38b
Partial commit
2088376133zh Apr 24, 2026
b3051e7
Merge branch 'OpenHUTB:master' into master
ZH1024-Heng Apr 27, 2026
a9920ea
Commit modified code
2088376133zh Apr 27, 2026
3862b2b
Commit modified code
2088376133zh Apr 27, 2026
0bf7afd
Merge branch 'OpenHUTB:master' into master
ZH1024-Heng Apr 28, 2026
9a10e8c
Fix unnecessary commits
2088376133zh Apr 28, 2026
6b7f7e8
Merge branch 'master' of https://github.com/ZH1024-Heng/sim
2088376133zh Apr 28, 2026
ba9859e
Commit changes
2088376133zh Apr 28, 2026
f29a394
Commit changes
2088376133zh Apr 28, 2026
0826c22
1
2088376133zh Apr 28, 2026
d8363e9
Commit model files and code
2088376133zh Apr 28, 2026
09f0a6d
Merge branch 'OpenHUTB:master' into master
ZH1024-Heng Apr 29, 2026
0f5517f
Commit code
2088376133zh Apr 30, 2026
466479f
Merge branch 'OpenHUTB:master' into master
ZH1024-Heng May 5, 2026
b8c7795
Add module files
2088376133zh May 5, 2026
0951084
Merge branch 'OpenHUTB:master' into master
ZH1024-Heng May 6, 2026
e61f953
Commit files
2088376133zh May 6, 2026
3c846b5
Modify files
2088376133zh May 6, 2026
32c9e73
Merge branch 'master' of https://github.com/ZH1024-Heng/sim
2088376133zh May 6, 2026
cb4f36f
Commit part of the code files
2088376133zh May 6, 2026
a3b7a42
Merge branch 'OpenHUTB:master' into master
ZH1024-Heng May 7, 2026
66b4d61
Add model and code
2088376133zh May 7, 2026
adce73b
Merge branch 'OpenHUTB:master' into master
ZH1024-Heng May 9, 2026
814e60d
Commit code
2088376133zh May 9, 2026
450c146
1
2088376133zh May 9, 2026
3885d51
Merge branch 'master' of https://github.com/ZH1024-Heng/sim
2088376133zh May 9, 2026
9f7bedc
Commit code
2088376133zh May 9, 2026
123 changes: 123 additions & 0 deletions hci/user-in-the-box/uitb/test/PhysicalAlignment/utils.py
@@ -0,0 +1,123 @@
import pandas as pd
import numpy as np
import mujoco
from scipy.spatial.transform import Rotation

## used for coordinate transformations
def transformation_matrix(pos, quat):
quat = np.roll(quat, -1)
matrix = Rotation.from_quat(quat).as_matrix()

# Create the matrix
T = np.eye(4)
T[:3, :3] = matrix
T[:3, 3] = pos
return T
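# Convention note: MuJoCo stores quaternions as (w, x, y, z), while
# scipy's Rotation.from_quat expects (x, y, z, w); np.roll(quat, -1)
# performs exactly that reordering. E.g. the MuJoCo identity quaternion
# (1, 0, 0, 0) becomes (0, 0, 0, 1), so
# transformation_matrix(pos=[0.1, 0.2, 0.3], quat=[1, 0, 0, 0]) returns
# an identity rotation block with [0.1, 0.2, 0.3] in the translation column.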


def ReachEnvelopeCheck(env,
use_vr_controller,
welded_body,
relpose,
endeffector_name,
trajectories_table_columns,
num_episodes,
video_output: bool,
figure_filename,
table_filename,
video_filename):

trajectories_table = pd.DataFrame(columns=trajectories_table_columns)
#with imageio.get_writer(video_filename, fps=int(1 / (env._model.opt.timestep))) as video:
video = []

for video_episode_index in range(num_episodes):
print('Video - Episode {}/{}'.format(video_episode_index + 1, num_episodes))
obs, info = env.reset()
print("Successfully reset.")
if video_output:
video.append(env.render())

nsteps_to_test = 100
##TODO: generalize to other biomechanical models (allow defining an initial posture and the list of joints to iterate over)
range_elevation_angle = np.linspace(
env._model.joint('elv_angle').range[0],
env._model.joint('elv_angle').range[1], nsteps_to_test)
range_elevation = np.linspace(
env._model.joint('shoulder_elv').range[0],
env._model.joint('shoulder_elv').range[1], nsteps_to_test)
_steps = 0
for i in range(nsteps_to_test):
for j in range(nsteps_to_test):
if (100*(_steps+1)/(nsteps_to_test**2) % 10) == 0:
print(f"{100*(_steps+1)/(nsteps_to_test**2)}% reached.")

# if use_vr_controller:
# obs, info = env.reset()
env._data.joint('elv_angle').qpos = range_elevation_angle[i]
env._data.joint('shoulder_elv').qpos = range_elevation[j]

# maximally extend the arm:
env._data.joint("elbow_flexion").qpos = env._model.joint("elbow_flexion").range[0]

# adjust virtual joints according to active constraints:
_eq_act = getattr(env._model, "eq_active", None)
if _eq_act is None:
_eq_act = np.asarray(getattr(env._model, "eq_active0", np.ones(env._model.neq)))
for (virtual_joint_id, physical_joint_id, poly_coefs) in zip(
env._model.eq_obj1id[
(env._model.eq_type == 2) & (_eq_act == 1)],
env._model.eq_obj2id[
(env._model.eq_type == 2) & (_eq_act == 1)],
env._model.eq_data[(env._model.eq_type == 2) &
(_eq_act == 1), 4::-1]):
if physical_joint_id >= 0:
env._data.joint(virtual_joint_id).qpos = np.polyval(poly_coefs, env._data.joint(physical_joint_id).qpos)
# qpos[env._model.joint_name2id('shoulder1_r2')] = -qpos[env._model.joint_name2id(
# 'elv_angle')] # joint equality constraint between "elv_angle" and "shoulder1_r2"

#env._data.qpos[:] = qpos
# if use_vr_controller:
# mujoco.mj_step(env._model, env._data)
# else:
# mujoco.mj_forward(env._model, env._data)
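# Note: mj_forward only recomputes the kinematics and other derived
# quantities for the manually set qpos, without advancing the dynamics;
# that is all that is needed to read out body positions below.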
mujoco.mj_forward(env._model, env._data)
if video_output:
video.append(env.render())
trajectories_table.loc[len(trajectories_table), :"shoulder_elv_pos"] = np.concatenate((env._data.joint('elv_angle').qpos, env._data.joint('shoulder_elv').qpos))
if use_vr_controller:
#manually compute VR controller position using "right_controller_relpose" from config file and current position of the aux body/welded body
T1 = transformation_matrix(pos=env._data.body(welded_body).xpos, quat=env._data.body(welded_body).xquat)
T2 = transformation_matrix(pos=relpose[:3], quat=relpose[3:])
T = np.matmul(T1, np.linalg.inv(T2))  # controller pose in the world frame
trajectories_table.loc[len(trajectories_table) - 1,
"end-effector_xpos_x":"end-effector_xpos_z"] = T[:3, 3]
# _controller_quat = np.roll(Rotation.from_matrix(T[:3, :3]).as_quat(), 1)
else:
trajectories_table.loc[len(trajectories_table) - 1,
"end-effector_xpos_x":"end-effector_xpos_z"] = env._data.body(endeffector_name).xpos
# print(env._data.body(endeffector_name).xpos)

_steps += 1

## -> Store csv file
trajectories_table.to_csv(table_filename)
print(f"ReachEnvelope data stored at '{table_filename}'.")


## -> Store mp4 file
print("\nCreating ReachEnvelope video...")
# temporarily override fps
_orig_fps = env._GUI_camera._fps
env._GUI_camera._fps = 100
env._GUI_camera.write_video_set_path(video_filename)
# Write the video
# simulator._camera.write_video(imgs, os.path.join(evaluate_dir, args.out_file))
for _img in video:
env._GUI_camera.write_video_add_frame(_img)
env._GUI_camera.write_video_close()
# reset to original fps from env
env._GUI_camera._fps = _orig_fps
print(f"ReachEnvelope video stored at '{video_filename}'.")
1 change: 1 addition & 0 deletions hci/user-in-the-box/uitb/test/__init__.py
@@ -0,0 +1 @@
from uitb.test.PhysicalAlignment.utils import ReachEnvelopeCheck
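With this export in place, the reach-envelope check might be driven roughly as follows. This is a hedged sketch: the simulator folder and the end-effector body name are placeholders, and the first column label is a guess (the remaining labels mirror the .loc slices used in utils.py above).

from uitb.simulator import Simulator
from uitb.test import ReachEnvelopeCheck

# Placeholder folder; any simulator exposing the MoBL-arms joints
# (elv_angle, shoulder_elv, elbow_flexion) should work.
simulator = Simulator.get("simulators/mobl_arms_pointing", render_mode="rgb_array_list")

ReachEnvelopeCheck(
    env=simulator,
    use_vr_controller=False,
    welded_body=None,                 # only used when use_vr_controller=True
    relpose=None,                     # ditto: pos (3 values) + quat (4 values) of the weld
    endeffector_name="hand_2distph",  # placeholder body name
    trajectories_table_columns=["elv_angle_pos", "shoulder_elv_pos",
                                "end-effector_xpos_x", "end-effector_xpos_y",
                                "end-effector_xpos_z"],
    num_episodes=1,
    video_output=False,
    figure_filename="reach_envelope.png",
    table_filename="reach_envelope.csv",
    video_filename="reach_envelope.mp4",
)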
269 changes: 269 additions & 0 deletions hci/user-in-the-box/uitb/test/evaluator.py
@@ -0,0 +1,269 @@
import os
import numpy as np
from stable_baselines3 import PPO
import re
import argparse
import scipy.ndimage
from collections import defaultdict
import matplotlib.pyplot as pp
import cv2

from uitb.utils.logger import StateLogger, ActionLogger
from uitb.utils import interaction_metrics
from uitb.simulator import Simulator

#from uitb.bm_models.effort_models import CumulativeFatigue3CCr, ConsumedEndurance


def natural_sort(l):
convert = lambda text: int(text) if text.isdigit() else text.lower()
alphanum_key = lambda key: [convert(c) for c in re.split('([0-9]+)', key)]
return sorted(l, key=alphanum_key)
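# e.g. natural_sort(["model_10.zip", "model_9.zip", "model_100.zip"])
#   -> ["model_9.zip", "model_10.zip", "model_100.zip"]
# (plain sorted() would put "model_10.zip" before "model_9.zip"), which
# is why the latest checkpoint can be picked as files[-1] further below.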


### DEPRECATED, use simulator.render() instead
# def grab_pip_image(simulator):
# # Grab an image from both 'for_testing' camera and 'oculomotor' camera, and display them 'picture-in-picture'
#
# # Grab images
# img, _ = simulator._GUI_camera.render()
#
# ocular_img = None
# for module in simulator.perception.perception_modules:
# if module.modality == "vision":
# # TODO would be better to have a class function that returns "human-viewable" rendering of the observation;
# # e.g. in case the vision model has two cameras, or returns a combination of rgb + depth images etc.
# ocular_img, _ = module._camera.render()
#
# if ocular_img is not None:
#
# # Resample
# resample_factor = 2
# resample_height = ocular_img.shape[0]*resample_factor
# resample_width = ocular_img.shape[1]*resample_factor
# resampled_img = np.zeros((resample_height, resample_width, 3), dtype=np.uint8)
# for channel in range(3):
# resampled_img[:, :, channel] = scipy.ndimage.zoom(ocular_img[:, :, channel], resample_factor, order=0)
#
# # Embed ocular image into free image
# i = simulator._GUI_camera.height - resample_height
# j = simulator._GUI_camera.width - resample_width
# img[i:, j:] = resampled_img
#
# return img


if __name__ == "__main__":

parser = argparse.ArgumentParser(description='Evaluate a policy.')
parser.add_argument('simulator_folder', type=str,
help='the simulation folder')
parser.add_argument('--action_sample_freq', type=float, default=20,
help='action sample frequency (how many times per second actions are sampled from policy, default: 20)')
parser.add_argument('--checkpoint', type=str, default=None,
help='filename of a specific checkpoint (default: None, latest checkpoint is used)')
parser.add_argument('--num_episodes', type=int, default=10,
help='how many episodes are evaluated (default: 10)')
parser.add_argument('--uncloned', dest="cloned", action='store_false', help='use source code instead of files from cloned simulator module')
parser.add_argument('--app_condition', type=str, default=None,
help="can be used to override the 'condition' argument passed to a Unity app")
parser.add_argument('--record', action='store_true', help='enable recording')
parser.add_argument('--human', action='store_true', help='enable real-time PyGame rendering window')
parser.add_argument('--out_file', type=str, default='evaluate.mp4',
help='output file for recording if recording is enabled (default: ./evaluate.mp4)')
parser.add_argument('--logging', action='store_true', help='enable logging')
parser.add_argument('--state_log_file', default='state_log',
help='output file for state log if logging is enabled (default: ./state_log)')
parser.add_argument('--action_log_file', default='action_log',
help='output file for action log if logging is enabled (default: ./action_log)')
parser.add_argument('--metrics', action='store_true',
help='summarize human-computer interaction accuracy metrics (pointing/tracking: fingertip-to-target distance RMSE etc.; remote driving: car-to-target distance and parking success rate)')
parser.add_argument('--deterministic', action='store_true',
help='use deterministic actions during policy evaluation (reduces variance, making control accuracy easier to compare)')
args = parser.parse_args()

# Define directories
checkpoint_dir = os.path.join(args.simulator_folder, 'checkpoints')
evaluate_dir = os.path.join(args.simulator_folder, 'evaluate')

# Make sure output dir exists
os.makedirs(evaluate_dir, exist_ok=True)

# Override run parameters
run_params = dict()
run_params["action_sample_freq"] = args.action_sample_freq
run_params["evaluate"] = True

run_params["unity_record_gameplay"] = args.record #False
run_params["unity_logging"] = True
run_params["unity_output_folder"] = evaluate_dir
if args.app_condition is not None:
run_params["app_args"] = ['-condition', args.app_condition]
# run_params["unity_random_seed"] = 123

# Embed visual observations into main mp4 or store as separate mp4 files
render_mode_perception = "separate" if run_params["unity_record_gameplay"] else "embed"

deterministic = bool(args.deterministic)

# Initialise simulator
_render_mode = "human" if args.human else "rgb_array_list"
simulator = Simulator.get(args.simulator_folder, render_mode=_render_mode, render_mode_perception=render_mode_perception, run_parameters=run_params, use_cloned=args.cloned)
task_name = simulator.task.__class__.__name__
metric_episodes = []

# ## Change effort model #TODO: delete
# simulator.bm_model._effort_model = CumulativeFatigue3CCr(simulator.bm_model, dt=simulator._run_parameters["dt"])

print(f"run parameters are: {simulator.run_parameters}")

# Load latest model if filename not given
_policy_loaded = False
if args.checkpoint is not None:
model_file = args.checkpoint
_policy_loaded = True
else:
try:
files = natural_sort(os.listdir(checkpoint_dir))
model_file = files[-1]
_policy_loaded = True
except (FileNotFoundError, IndexError):
print("No checkpoint found. Will continue evaluation with randomly sampled controls.")

if _policy_loaded:
# Load policy TODO should create a load method for uitb.rl.BaseRLModel
print(f'Loading model: {os.path.join(checkpoint_dir, model_file)}\n')
model = PPO.load(os.path.join(checkpoint_dir, model_file))

# Set callbacks to match the value used for this training point (if the simulator had any)
simulator.update_callbacks(model.num_timesteps)

if args.logging:
# Initialise log
state_logger = StateLogger(args.num_episodes, keys=simulator.get_state().keys())

# Actions are logged separately to make things easier
action_logger = ActionLogger(args.num_episodes)

# Visualise evaluations
# statistics = defaultdict(list)
for episode_idx in range(args.num_episodes):

print(f"Run episode {episode_idx + 1}/{args.num_episodes}.")

# Reset environment
obs, info = simulator.reset()
terminated = False
truncated = False
reward = 0

if args.metrics:
episode_dist_samples = []
episode_inside = []
episode_car = []

if args.logging:
state = simulator.get_state()
state_logger.log(episode_idx, state)

# Loop until episode ends
while not terminated and not truncated:
# #print(f"Episode {episode_idx}: {simulator.get_episode_statistics_str()}")
# print(reward)

if _policy_loaded:
# Get actions from policy
action, _internal_policy_state = model.predict(obs, deterministic=deterministic)
else:
# choose random action from action space
action = simulator.action_space.sample()

# Take a step
obs, r, terminated, truncated, info = simulator.step(action)
reward += r

if args.metrics:
st = simulator.get_state()
st.update(info)
if task_name in ("Pointing", "Tracking", "SmallObjectTouch"):
if "dist_to_target_center" in st:
episode_dist_samples.append(float(st["dist_to_target_center"]))
episode_inside.append(bool(info.get("inside_target", False)))
elif task_name == "RemoteDriving":
if "dist_car_to_target" in st:
episode_car.append(float(st["dist_car_to_target"]))

if args.logging:
action_logger.log(episode_idx,
{"steps": state["steps"], "timestep": state["timestep"], "action": action.copy(),
"reward": r})
state = simulator.get_state()
state.update(info)
state_logger.log(episode_idx, state)

if args.metrics:
st_end = simulator.get_state()
st_end.update(info)
if task_name in ("Pointing", "Tracking", "SmallObjectTouch"):
ep = {"dist_samples": list(episode_dist_samples),
"inside_target_frac": interaction_metrics.inside_fraction(episode_inside)}
if task_name in ("Pointing", "SmallObjectTouch"):
ep["targets_hit"] = int(st_end.get("targets_hit", 0))
metric_episodes.append(ep)
elif task_name == "RemoteDriving":
metric_episodes.append({
"dist_car_samples": list(episode_car),
"target_parking_success": bool(st_end.get("target_hit", False)),
"min_dist_car": float(min(episode_car)) if episode_car else float("nan"),
})

# print(f"Episode {episode_idx}: {simulator.get_episode_statistics_str()}")

# episode_statistics = simulator.get_episode_statistics()
# for key in episode_statistics:
# statistics[key].append(episode_statistics[key])

# print(f'Averages over {args.num_episodes} episodes (std in parenthesis):',
# ', '.join(['{}: {:.2f} ({:.2f})'.format(k, np.mean(v), np.std(v)) for k, v in statistics.items()]))

if args.metrics:
if task_name in ("Pointing", "Tracking", "SmallObjectTouch", "RemoteDriving"):
interaction_metrics.print_run_summary(task_name, metric_episodes)
else:
print(f"提示: 任务「{task_name}」暂无内置自动指标;请使用 --logging 记录状态后离线分析,"
f"或在 uitb/utils/interaction_metrics.py 中扩展。")

if args.logging:
# Output log
state_logger.save(os.path.join(evaluate_dir, args.state_log_file))
action_logger.save(os.path.join(evaluate_dir, args.action_log_file))
print(f'Log files have been saved to {os.path.join(evaluate_dir, args.state_log_file)}.pickle and '
f'{os.path.join(evaluate_dir, args.action_log_file)}.pickle')

if args.record:
simulator._GUI_camera.write_video_set_path(os.path.join(evaluate_dir, args.out_file))

# Write the video
# simulator._camera.write_video(imgs, os.path.join(evaluate_dir, args.out_file))
_imgs = simulator.render()
for _img in _imgs:
simulator._GUI_camera.write_video_add_frame(_img)

simulator._GUI_camera.write_video_close()
print(f'A recording has been saved to file {os.path.join(evaluate_dir, args.out_file)}')

# Write additional videos for each perception module camera (only if simulator._render_mode_perception == "separate")
if simulator._render_mode_perception == "separate":
_perception_imgs = simulator.get_render_stack_perception()
for _module_name, _imgs in _perception_imgs.items():
_out_file = os.path.splitext(args.out_file)[0] + f"_{_module_name.replace('/', '-')}" + os.path.splitext(args.out_file)[1]

fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(os.path.join(evaluate_dir, _out_file), fourcc, simulator._GUI_camera._fps, (_imgs[0].shape[1], _imgs[0].shape[0]))
# out.open(_out_file)
for img in _imgs:
out.write(cv2.cvtColor(np.uint8(img), cv2.COLOR_BGR2RGB))
out.release()
print(f'A recording has been saved to file {os.path.join(evaluate_dir, _out_file)}')

simulator.close()
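For reference, a minimal sketch of the two helpers this script expects from uitb.utils.interaction_metrics, assuming inside_fraction returns the fraction of steps spent inside the target and print_run_summary reports per-episode RMSE of the distance samples (the module's actual implementation may differ):

import numpy as np

def inside_fraction(inside_samples):
    # Fraction of steps with the end-effector inside the target.
    return float(np.mean(inside_samples)) if len(inside_samples) else float("nan")

def print_run_summary(task_name, episodes):
    # Aggregate the per-episode metric dicts collected during evaluation.
    rmses = [np.sqrt(np.mean(np.square(ep["dist_samples"])))
             for ep in episodes if ep.get("dist_samples")]
    if rmses:
        print(f"{task_name}: mean distance RMSE over {len(rmses)} episodes: "
              f"{np.mean(rmses):.4f} m")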