From 08cb9da4d08211c9d96bb9af46f350721381274b Mon Sep 17 00:00:00 2001 From: blank885 <1160821687@qq.com> Date: Thu, 18 Dec 2025 21:09:36 +0800 Subject: [PATCH 01/10] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E6=A8=A1=E5=9E=8B?= =?UTF-8?q?=E6=96=87=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/LocoMuJoCo/rolling_log.xml | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) create mode 100644 src/LocoMuJoCo/rolling_log.xml diff --git a/src/LocoMuJoCo/rolling_log.xml b/src/LocoMuJoCo/rolling_log.xml new file mode 100644 index 0000000000..2ddb876930 --- /dev/null +++ b/src/LocoMuJoCo/rolling_log.xml @@ -0,0 +1,28 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file From 317b0323235d352260895300eff0a26a35b4c104 Mon Sep 17 00:00:00 2001 From: blank885 <1160821687@qq.com> Date: Fri, 19 Dec 2025 15:36:43 +0800 Subject: [PATCH 02/10] update --- src/LocoMuJoCo/rolling_log.py | 222 ++++++++++++++++++++++++++++++++++ 1 file changed, 222 insertions(+) create mode 100644 src/LocoMuJoCo/rolling_log.py diff --git a/src/LocoMuJoCo/rolling_log.py b/src/LocoMuJoCo/rolling_log.py new file mode 100644 index 0000000000..8e60c20fe6 --- /dev/null +++ b/src/LocoMuJoCo/rolling_log.py @@ -0,0 +1,222 @@ +import numpy as np +import jax +import mujoco +from loco_mujoco.task_factories import ImitationFactory, DefaultDatasetConf +from loco_mujoco.trajectory import TrajectoryInfo, TrajectoryModel, TrajectoryData +import matplotlib.pyplot as plt +import time + +class LogMotionSynchronizer: + """滚木运动同步器:空中下落+落地反馈+直线滚动(无冗余代码,无报错)""" + def __init__(self, env, log_init_pos=[0.0, 0.0, 1.5]): + self.env = env + self.model = env.get_model() + self.data = env.get_data() + + # 滚木状态标记 + self.is_landed = False + self.land_time = None + self.roll_torque = 10.0 # 滚动扭矩 + + # 添加滚木模型(极简版) + self._add_log_to_model(log_init_pos) + + # 关节索引(直接定义,无需通过关节名查找ID) + self.log_pos_ids = slice(0, 3) # x/y/z位置 + self.log_rot_ids = slice(3, 6) # rx/ry/rz姿态 + self.log_vel_ids = slice(0, 6) # 速度/角速度 + + # 数据记录 + self.log_pos_history = [] + self.log_vel_history = [] + self.time_history = [] + + def _add_log_to_model(self, init_pos): + """添加滚木模型(无自定义assets,兼容旧版Mujoco)""" + spec = self.env._mjspec + + # 1. 滚木主体(自由关节) + log_body = spec.worldbody.add_body( + name="log", + pos=init_pos # 空中初始位置:z=1.5m + ) + + # 自由关节(6自由度:下落+滚动) + log_body.add_joint( + name="log_root", + type=mujoco.mjtJoint.mjJNT_FREE + ) + + # 2. 胶囊体滚木(直接设置物理属性) + # 核心滚木 + log_body.add_geom( + name="log_core", + type=mujoco.mjtGeom.mjGEOM_CAPSULE, + fromto=(-0.5, 0.0, 0.0, 0.5, 0.0, 0.0), + size=(0.18, 0.0, 0.0), + rgba=(0.72, 0.5, 0.3, 1.0), + friction=(0.8, 0.1, 0.1), + density=1200.0, + condim=3, + margin=0.001 + ) + # 外层树皮 + log_body.add_geom( + name="bark_outer", + type=mujoco.mjtGeom.mjGEOM_CAPSULE, + fromto=(-0.5, 0.0, 0.0, 0.5, 0.0, 0.0), + size=(0.19, 0.0, 0.0), + rgba=(0.6, 0.4, 0.22, 1.0), + friction=(0.85, 0.1, 0.1), + density=1300.0, + condim=3, + margin=0.001 + ) + + # 编译模型 + self.env._model = spec.compile() + self.env._data = mujoco.MjData(self.env._model) + + def check_landing(self): + """检测滚木落地""" + if self.is_landed: + return True + + # 遍历所有接触点 + for i in range(self.data.ncon): + contact = self.data.contact[i] + # 获取接触的几何名称 + geom1_name = mujoco.mj_id2name(self.model, mujoco.mjtObj.mjOBJ_GEOM, contact.geom1) or "" + geom2_name = mujoco.mj_id2name(self.model, mujoco.mjtObj.mjOBJ_GEOM, contact.geom2) or "" + + # 判断滚木与地面接触 + if ("log" in geom1_name or "log" in geom2_name) and "floor" in (geom1_name + geom2_name): + self.is_landed = True + self.land_time = time.time() + log_pos = self.data.qpos[self.log_pos_ids] + print(f"\n【落地反馈】滚木于 {self.land_time - self.start_time:.2f}s 落地!") + print(f"落地位置:x={log_pos[0]:.2f}, y={log_pos[1]:.2f}, z={log_pos[2]:.2f}") + return True + return False + + def apply_roll_force(self): + """落地后施加扭矩,让滚木沿x轴滚动""" + if not self.is_landed: + return + + # 绕y轴施加扭矩 + mujoco.mj_applyFT( + self.model, + self.data, + np.array([0.0, 0.0, 0.0], dtype=np.float64), + np.array([0.0, self.roll_torque, 0.0], dtype=np.float64), + self.data.body("log").xpos, + self.data.body("log").id + ) + + def update(self, step_time): + """每步更新滚木状态""" + self.check_landing() + self.apply_roll_force() + + # 记录数据 + self.log_pos_history.append(self.data.qpos[self.log_pos_ids].copy().astype(np.float64)) + self.log_vel_history.append(self.data.qvel[self.log_vel_ids].copy().astype(np.float64)) + self.time_history.append(step_time) + + def plot_motion_curve(self): + """绘制滚木运动曲线""" + if not self.time_history: + print("无运动数据可绘制!") + return + + pos_array = np.array(self.log_pos_history) + vel_array = np.array(self.log_vel_history) + time_array = np.array(self.time_history) + + fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 8)) + + # 位置曲线 + ax1.plot(time_array, pos_array[:, 0], label="X轴位置(滚动方向)", color="blue") + ax1.plot(time_array, pos_array[:, 2], label="Z轴位置(高度)", color="red") + if self.land_time: + ax1.axvline(x=self.land_time - self.start_time, + color="green", linestyle="--", label="落地时刻") + ax1.set_title("滚木位置变化") + ax1.set_xlabel("时间 (s)") + ax1.set_ylabel("位置 (m)") + ax1.legend() + ax1.grid(True) + + # 速度曲线 + ax2.plot(time_array, vel_array[:, 0], label="X轴速度", color="blue") + ax2.plot(time_array, vel_array[:, 5], label="绕Y轴角速度(滚动)", color="orange") + if self.land_time: + ax2.axvline(x=self.land_time - self.start_time, + color="green", linestyle="--", label="落地时刻") + ax2.set_title("滚木速度变化") + ax2.set_xlabel("时间 (s)") + ax2.set_ylabel("速度 (m/s) / 角速度 (rad/s)") + ax2.legend() + ax2.grid(True) + + plt.tight_layout() + plt.show() + +def main(): + # 1. 创建LocoMuJoCo环境 + env = ImitationFactory.make( + "UnitreeH1", + n_substeps=20, + default_dataset_conf=DefaultDatasetConf(["walk", "squat"]) + ) + # 核心修复:移除return_info=True,适配旧版LocoEnv + env.reset(jax.random.PRNGKey(0)) + + # 构造机器人直立动作(不依赖info,直接适配H1关节) + # 方案:使用"微小力矩+位置保持",避免机器人因重力倒下 + stand_action = np.zeros(env.action_dim, dtype=np.float64) + # 针对UnitreeH1机器人,为核心关节施加平衡力矩(适配所有动作维度) + # 即使维度不匹配,也不会报错,仅核心关节生效 + torque_bias = 0.6 # 基础平衡力矩,可微调 + for j in range(min(12, env.action_dim)): + # 髋/膝/踝关节(前12个关节)施加平衡力矩 + stand_action[j] = torque_bias if j % 3 != 1 else torque_bias * 1.2 # 膝关节力矩稍大 + + # 初始化滚木(空中位置:z=1.5m) + log_sync = LogMotionSynchronizer(env, log_init_pos=[0.0, 0.0, 1.5]) + log_sync.start_time = time.time() + + # 运行仿真(1000步,约5秒) + total_steps = 1000 + print("开始仿真:机器人直立静止,滚木从空中下落...") + try: + for step in range(total_steps): + # 执行直立动作,保持机器人不倒 + env.step(stand_action) + + # 更新滚木状态(完全不变) + log_sync.update(step * env.dt) + + # 渲染画面 + env.render() + + # 每50步打印状态 + if step % 50 == 0: + log_pos = log_sync.data.qpos[log_sync.log_pos_ids] + log_vel = log_sync.data.qvel[log_sync.log_vel_ids] + print(f"\n第 {step} 步 - 状态:") + print(f" 机器人:直立静止") + print(f" 滚木位置:x={log_pos[0]:.2f}, y={log_pos[1]:.2f}, z={log_pos[2]:.2f}") + print(f" 滚木速度:x={log_vel[0]:.2f} m/s, 绕Y轴角速度={log_vel[5]:.2f} rad/s") + print(f" 滚木落地状态:{'已落地' if log_sync.is_landed else '下落中'}") + + print("\n仿真结束!") + log_sync.plot_motion_curve() + + except KeyboardInterrupt: + print("\n仿真被中断!") + log_sync.plot_motion_curve() + +if __name__ == "__main__": + main() \ No newline at end of file From f3cc47597536bda260ba50a7be44e6a963cc3678 Mon Sep 17 00:00:00 2001 From: blank885 <1160821687@qq.com> Date: Fri, 19 Dec 2025 22:55:15 +0800 Subject: [PATCH 03/10] update --- src/LocoMuJoCo/rolling_log.xml | 63 ++++++++++++++++++++++++---------- 1 file changed, 45 insertions(+), 18 deletions(-) diff --git a/src/LocoMuJoCo/rolling_log.xml b/src/LocoMuJoCo/rolling_log.xml index 2ddb876930..6a68b0555e 100644 --- a/src/LocoMuJoCo/rolling_log.xml +++ b/src/LocoMuJoCo/rolling_log.xml @@ -1,28 +1,55 @@ - - - - - + + + + - - + + + + - - + + + + + + - - - + + + + + - - - - + name="log_core" + type="capsule" + fromto="-0.5 0 0 0.5 0 0" + size="0.18 0" + material="wood_mat" + density="800" + friction="0.8 0.1 + solref="0.02 1" + + + + name="bark_outer" + type="capsule" + fromto="-0.5 0 0 0.5 0 0" + size="0.19 0" + material="bark_mat" + density="850" + friction="0.85 0.1 0.1" + solref="0.02 1" - - + + + \ No newline at end of file From 92d18dd1190135081a393861ab88ce3c1eefb23f Mon Sep 17 00:00:00 2001 From: blank885 <1160821687@qq.com> Date: Sat, 20 Dec 2025 19:28:47 +0800 Subject: [PATCH 04/10] update --- src/LocoMuJoCo/triple.py | 131 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 131 insertions(+) create mode 100644 src/LocoMuJoCo/triple.py diff --git a/src/LocoMuJoCo/triple.py b/src/LocoMuJoCo/triple.py new file mode 100644 index 0000000000..f326d71489 --- /dev/null +++ b/src/LocoMuJoCo/triple.py @@ -0,0 +1,131 @@ +import numpy as np +import jax +import mujoco +from loco_mujoco.task_factories import ImitationFactory, DefaultDatasetConf, LAFAN1DatasetConf +from loco_mujoco.trajectory import Trajectory, TrajectoryInfo, TrajectoryModel, TrajectoryData +from loco_mujoco.core.utils.mujoco import mj_jntname2qposid + +# ===================== 1. 初始化环境+获取帧率参数 ===================== +env = ImitationFactory.make( + "UnitreeH1", + n_substeps=20, + default_dataset_conf=DefaultDatasetConf(["walk", "squat"]), # 行走和下蹲属于默认数据集 + lafan1_dataset_conf=LAFAN1DatasetConf(["dance2_subject4"]) # 舞蹈属于LAFAN1数据集 +) +env.reset(jax.random.PRNGKey(0)) +ENV_DT = env.dt # 环境时间步长(默认0.02s) +FPS = int(1 / ENV_DT) +print(f"环境参数:dt={ENV_DT}s | FPS={FPS}") + +# ===================== 2. 配置各阶段时长及循环参数 ===================== +WALK_DURATION = 8 # 行走8秒 +STAY_DURATION = 1 # 停留1秒 +SQUAT_DURATION = 10 # 下蹲10秒 +DANCE_DURATION = 10 # 跳舞10秒 +CYCLE_TIMES = 3 # 循环次数(可调整) + +# 计算各阶段步数 +WALK_STEPS = int(WALK_DURATION * FPS) +STAY_STEPS = int(STAY_DURATION * FPS) +SQUAT_STEPS = int(SQUAT_DURATION * FPS) +DANCE_STEPS = int(DANCE_DURATION * FPS) +SINGLE_CYCLE_STEPS = WALK_STEPS + STAY_STEPS + SQUAT_STEPS + DANCE_STEPS +TOTAL_STEPS = SINGLE_CYCLE_STEPS * CYCLE_TIMES + +# ===================== 3. 轨迹片段生成函数 ===================== +def get_trajectory_segment(env, dataset_type, dataset_name, target_steps): + """获取指定数据集的轨迹片段并调整到目标步数""" + if dataset_type == "default": + temp_env = ImitationFactory.make( + "UnitreeH1", + default_dataset_conf=DefaultDatasetConf([dataset_name]), + n_substeps=env._n_substeps + ) + elif dataset_type == "lafan1": + temp_env = ImitationFactory.make( + "UnitreeH1", + lafan1_dataset_conf=LAFAN1DatasetConf([dataset_name]), + n_substeps=env._n_substeps + ) + else: + raise ValueError(f"不支持的数据集类型: {dataset_type}") + + temp_env.reset(jax.random.PRNGKey(0)) + raw_qpos = np.array(temp_env.th.traj.data.qpos) + raw_qvel = np.array(temp_env.th.traj.data.qvel) + + if len(raw_qpos) == 0: + raise ValueError(f"数据集 {dataset_name} 无有效轨迹数据!") + + # 循环填充轨迹至目标步数 + traj_qpos = [raw_qpos[i % len(raw_qpos)] for i in range(target_steps)] + traj_qvel = [raw_qvel[i % len(raw_qvel)] for i in range(target_steps)] + return np.array(traj_qpos), np.array(traj_qvel) + +# ===================== 4. 预生成单周期轨迹片段 ===================== +model = env.get_model() +root_joint_ind = mj_jntname2qposid("root", model) # 获取根关节索引 + +# 4.1 行走阶段 +walk_qpos, walk_qvel = get_trajectory_segment( + env, "default", "walk", WALK_STEPS +) +print(f"行走阶段:{WALK_DURATION}秒 | {WALK_STEPS}步") + +# 4.2 停留阶段(复用行走结束姿态,速度归零) +last_walk_qpos = walk_qpos[-1].copy() +last_walk_qvel = np.zeros_like(walk_qvel[0]) +stay_qpos = np.tile(last_walk_qpos, (STAY_STEPS, 1)) +stay_qvel = np.tile(last_walk_qvel, (STAY_STEPS, 1)) +print(f"停留阶段:{STAY_DURATION}秒 | {STAY_STEPS}步") + +# 4.3 下蹲阶段(固定根位置为行走结束位置) +squat_qpos, squat_qvel = get_trajectory_segment( + env, "default", "squat", SQUAT_STEPS +) +root_pos_walk_end = walk_qpos[-1, root_joint_ind[:2]] # 获取行走结束时的x/y位置 +squat_qpos[:, root_joint_ind[:2]] = root_pos_walk_end # 固定下蹲时的根位置 +print(f"下蹲阶段:{SQUAT_DURATION}秒 | {SQUAT_STEPS}步") + +# 4.4 跳舞阶段(固定根位置为行走结束位置) +dance_qpos, dance_qvel = get_trajectory_segment( + env, "lafan1", "dance2_subject4", DANCE_STEPS +) +dance_qpos[:, root_joint_ind[:2]] = root_pos_walk_end # 固定跳舞时的根位置 +print(f"跳舞阶段:{DANCE_DURATION}秒 | {DANCE_STEPS}步") + +# ===================== 5. 生成多循环完整轨迹 ===================== +full_qpos = [] +full_qvel = [] + +for cycle in range(CYCLE_TIMES): + print(f"生成第 {cycle+1}/{CYCLE_TIMES} 个循环轨迹...") + # 按顺序拼接单循环的四个阶段 + full_qpos.extend([walk_qpos, stay_qpos, squat_qpos, dance_qpos]) + full_qvel.extend([walk_qvel, stay_qvel, squat_qvel, dance_qvel]) + +# 合并为numpy数组 +full_qpos = np.concatenate(full_qpos, axis=0) +full_qvel = np.concatenate(full_qvel, axis=0) + +# 验证总时长 +total_duration = len(full_qpos) / FPS +print(f"总轨迹:{CYCLE_TIMES}次循环 | {total_duration:.1f}秒 | {len(full_qpos)}步") + +# ===================== 6. 加载并播放轨迹 ===================== +jnt_names = [mujoco.mj_id2name(model, mujoco.mjtObj.mjOBJ_JOINT, i) for i in range(model.njnt)] +traj_info = TrajectoryInfo( + jnt_names, + model=TrajectoryModel(model.njnt, jax.numpy.array(model.jnt_type)), + frequency=FPS +) +traj_data = TrajectoryData( + jax.numpy.array(full_qpos), + jax.numpy.array(full_qvel), + split_points=jax.numpy.array([0, len(full_qpos)]) +) +traj = Trajectory(traj_info, traj_data) +env.load_trajectory(traj) + +# 播放完整轨迹 +env.play_trajectory(n_steps_per_episode=len(full_qpos), render=True) \ No newline at end of file From acf3522491806266ff0f6fbb037fa3f692487fe8 Mon Sep 17 00:00:00 2001 From: blank885 <1160821687@qq.com> Date: Sun, 21 Dec 2025 09:56:05 +0800 Subject: [PATCH 05/10] =?UTF-8?q?=E6=9B=B4=E6=96=B0=E4=BA=86triple.py?= =?UTF-8?q?=E4=B8=ADdance=E7=9A=84=E6=95=88=E6=9E=9C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/LocoMuJoCo/triple.py | 137 ++++++++++++++++++++++++++++++++------- 1 file changed, 114 insertions(+), 23 deletions(-) diff --git a/src/LocoMuJoCo/triple.py b/src/LocoMuJoCo/triple.py index f326d71489..9539f4f821 100644 --- a/src/LocoMuJoCo/triple.py +++ b/src/LocoMuJoCo/triple.py @@ -9,20 +9,20 @@ env = ImitationFactory.make( "UnitreeH1", n_substeps=20, - default_dataset_conf=DefaultDatasetConf(["walk", "squat"]), # 行走和下蹲属于默认数据集 - lafan1_dataset_conf=LAFAN1DatasetConf(["dance2_subject4"]) # 舞蹈属于LAFAN1数据集 + default_dataset_conf=DefaultDatasetConf(["walk", "squat"]), + lafan1_dataset_conf=LAFAN1DatasetConf(["dance1_subject3"]) # 替换为dance1_subject3 ) env.reset(jax.random.PRNGKey(0)) -ENV_DT = env.dt # 环境时间步长(默认0.02s) +ENV_DT = env.dt FPS = int(1 / ENV_DT) print(f"环境参数:dt={ENV_DT}s | FPS={FPS}") # ===================== 2. 配置各阶段时长及循环参数 ===================== -WALK_DURATION = 8 # 行走8秒 -STAY_DURATION = 1 # 停留1秒 -SQUAT_DURATION = 10 # 下蹲10秒 -DANCE_DURATION = 10 # 跳舞10秒 -CYCLE_TIMES = 3 # 循环次数(可调整) +WALK_DURATION = 7 +STAY_DURATION = 1 +SQUAT_DURATION = 8 +DANCE_DURATION = 15 +CYCLE_TIMES = 2 # 计算各阶段步数 WALK_STEPS = int(WALK_DURATION * FPS) @@ -32,7 +32,7 @@ SINGLE_CYCLE_STEPS = WALK_STEPS + STAY_STEPS + SQUAT_STEPS + DANCE_STEPS TOTAL_STEPS = SINGLE_CYCLE_STEPS * CYCLE_TIMES -# ===================== 3. 轨迹片段生成函数 ===================== +# ===================== 3. 轨迹片段生成与增强函数 ===================== def get_trajectory_segment(env, dataset_type, dataset_name, target_steps): """获取指定数据集的轨迹片段并调整到目标步数""" if dataset_type == "default": @@ -62,9 +62,73 @@ def get_trajectory_segment(env, dataset_type, dataset_name, target_steps): traj_qvel = [raw_qvel[i % len(raw_qvel)] for i in range(target_steps)] return np.array(traj_qpos), np.array(traj_qvel) +def enhance_dance_movement(q_data, model, is_velocity=False, intensity=1.2, speed_factor=1.0): + """ + 增强舞蹈动作:放大关节运动幅度并调整速度 + :param q_data: qpos(位置)或 qvel(速度)数组 + :param model: mujoco模型 + :param is_velocity: 是否是速度数据(qvel) + :param intensity: 动作幅度增强系数 + :param speed_factor: 速度调整系数 + :return: 增强后的轨迹数据 + """ + # 1. 先获取有效维度,避免索引越界 + max_dim = q_data.shape[1] # 获取q_data的第二维大小(比如25) + print(f"当前数据维度:{q_data.shape},最大有效索引:{max_dim-1}") + + # 2. 识别关键运动关节(只保留模型中存在的关节) + motion_joints = [ + "left_shoulder", "right_shoulder", + "left_elbow", "right_elbow", + "left_hip", "right_hip", + "left_knee", "right_knee" + ] + + # 3. 安全获取关节索引(过滤掉不存在/越界的索引) + joint_indices = [] + for jnt_name in motion_joints: + try: + # 获取关节索引(处理返回列表/单个值的情况) + idx = mj_jntname2qposid(jnt_name, model) + if isinstance(idx, list): + joint_indices.extend(idx) + else: + joint_indices.append(idx) + except: + continue # 忽略不存在的关节 + + # 4. 过滤掉超出维度范围的索引(核心修复点) + joint_indices = [idx for idx in joint_indices if idx < max_dim] + print(f"有效关节索引:{joint_indices}") + + # 5. 增强关节运动幅度(只处理角度关节,跳过前7个根关节) + enhanced_data = q_data.copy() + for idx in joint_indices: + if idx >= 7: # 跳过根关节(位置/旋转),只增强肢体关节 + enhanced_data[:, idx] = q_data[:, idx] * intensity + + # 6. 速度调整(仅对位置数据qpos生效,避免qvel重复调整) + if not is_velocity and speed_factor != 1.0: + orig_length = len(enhanced_data) + new_length = int(orig_length * speed_factor) + # 线性插值重采样 + time_orig = np.linspace(0, 1, orig_length) + time_new = np.linspace(0, 1, new_length) + enhanced_data = np.array([ + np.interp(time_new, time_orig, enhanced_data[:, i]) + for i in range(enhanced_data.shape[1]) + ]).T + + return enhanced_data + +def blend_trajectories(traj1, traj2, blend_ratio=0.5): + """混合两个轨迹,实现平滑过渡""" + min_len = min(len(traj1), len(traj2)) + return traj1[:min_len] * (1 - blend_ratio) + traj2[:min_len] * blend_ratio + # ===================== 4. 预生成单周期轨迹片段 ===================== model = env.get_model() -root_joint_ind = mj_jntname2qposid("root", model) # 获取根关节索引 +root_joint_ind = mj_jntname2qposid("root", model) # 4.1 行走阶段 walk_qpos, walk_qvel = get_trajectory_segment( @@ -72,27 +136,52 @@ def get_trajectory_segment(env, dataset_type, dataset_name, target_steps): ) print(f"行走阶段:{WALK_DURATION}秒 | {WALK_STEPS}步") -# 4.2 停留阶段(复用行走结束姿态,速度归零) +# 4.2 停留阶段 last_walk_qpos = walk_qpos[-1].copy() last_walk_qvel = np.zeros_like(walk_qvel[0]) stay_qpos = np.tile(last_walk_qpos, (STAY_STEPS, 1)) stay_qvel = np.tile(last_walk_qvel, (STAY_STEPS, 1)) print(f"停留阶段:{STAY_DURATION}秒 | {STAY_STEPS}步") -# 4.3 下蹲阶段(固定根位置为行走结束位置) +# 4.3 下蹲阶段 squat_qpos, squat_qvel = get_trajectory_segment( env, "default", "squat", SQUAT_STEPS ) -root_pos_walk_end = walk_qpos[-1, root_joint_ind[:2]] # 获取行走结束时的x/y位置 -squat_qpos[:, root_joint_ind[:2]] = root_pos_walk_end # 固定下蹲时的根位置 +root_pos_walk_end = walk_qpos[-1, root_joint_ind[:2]] +squat_qpos[:, root_joint_ind[:2]] = root_pos_walk_end print(f"下蹲阶段:{SQUAT_DURATION}秒 | {SQUAT_STEPS}步") -# 4.4 跳舞阶段(固定根位置为行走结束位置) -dance_qpos, dance_qvel = get_trajectory_segment( - env, "lafan1", "dance2_subject4", DANCE_STEPS +# 4.4 增强舞蹈阶段(使用dance1_subject3数据集) +# 获取dance1_subject3的轨迹(可使用同一数据集的不同片段或重复使用) +dance1_qpos, dance1_qvel = get_trajectory_segment( + env, "lafan1", "dance1_subject2", DANCE_STEPS # 替换为dance1_subject2 ) -dance_qpos[:, root_joint_ind[:2]] = root_pos_walk_end # 固定跳舞时的根位置 -print(f"跳舞阶段:{DANCE_DURATION}秒 | {DANCE_STEPS}步") +dance2_qpos, dance2_qvel = get_trajectory_segment( + env, "lafan1", "dance1_subject2", DANCE_STEPS # 替换为dance1_subject2(用于混合) +) + +# 混合同一数据集的轨迹(可增加多样性) +blended_qpos = blend_trajectories(dance1_qpos, dance2_qpos, blend_ratio=0.6) +blended_qvel = blend_trajectories(dance1_qvel, dance2_qvel, blend_ratio=0.6) + +# 分阶段增强 +mid_point = DANCE_STEPS // 2 +# 处理qpos(位置数据) +dance_qpos = np.vstack([ + enhance_dance_movement(blended_qpos[:mid_point], model, is_velocity=False, intensity=1.3, speed_factor=1.0), + enhance_dance_movement(blended_qpos[mid_point:], model, is_velocity=False, intensity=1.5, speed_factor=1.2) +]) +# 处理qvel(速度数据) +dance_qvel = np.vstack([ + enhance_dance_movement(blended_qvel[:mid_point], model, is_velocity=True, intensity=1.0, speed_factor=1.0), + enhance_dance_movement(blended_qvel[mid_point:], model, is_velocity=True, intensity=1.0, speed_factor=1.2) +]) + +# 确保长度正确并固定根位置 +dance_qpos = dance_qpos[:DANCE_STEPS] +dance_qvel = dance_qvel[:DANCE_STEPS] +dance_qpos[:, root_joint_ind[:2]] = root_pos_walk_end +print(f"增强舞蹈阶段:{DANCE_DURATION}秒 | {DANCE_STEPS}步") # ===================== 5. 生成多循环完整轨迹 ===================== full_qpos = [] @@ -100,15 +189,17 @@ def get_trajectory_segment(env, dataset_type, dataset_name, target_steps): for cycle in range(CYCLE_TIMES): print(f"生成第 {cycle+1}/{CYCLE_TIMES} 个循环轨迹...") - # 按顺序拼接单循环的四个阶段 - full_qpos.extend([walk_qpos, stay_qpos, squat_qpos, dance_qpos]) + # 每个循环使用不同的混合比例增加多样性 + cycle_blend = 0.5 + 0.3 * np.sin(cycle * np.pi / 2) + cycle_dance_qpos = blend_trajectories(dance1_qpos, dance2_qpos, cycle_blend)[:DANCE_STEPS] + cycle_dance_qpos[:, root_joint_ind[:2]] = root_pos_walk_end + + full_qpos.extend([walk_qpos, stay_qpos, squat_qpos, cycle_dance_qpos]) full_qvel.extend([walk_qvel, stay_qvel, squat_qvel, dance_qvel]) -# 合并为numpy数组 full_qpos = np.concatenate(full_qpos, axis=0) full_qvel = np.concatenate(full_qvel, axis=0) -# 验证总时长 total_duration = len(full_qpos) / FPS print(f"总轨迹:{CYCLE_TIMES}次循环 | {total_duration:.1f}秒 | {len(full_qpos)}步") From b539c3b58883a2ac38c73a84e1c32727b033c845 Mon Sep 17 00:00:00 2001 From: blank885 <1160821687@qq.com> Date: Sun, 21 Dec 2025 12:00:14 +0800 Subject: [PATCH 06/10] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E6=96=B0=E7=9A=84doubl?= =?UTF-8?q?e=5Fact=E6=96=87=E4=BB=B6=EF=BC=8C=E6=94=AF=E6=8C=81=E6=9C=BA?= =?UTF-8?q?=E5=99=A8=E4=BA=BA=E5=AE=9E=E7=8E=B0=E2=80=98=E6=8A=98=E8=BF=94?= =?UTF-8?q?=E8=B7=91+=E8=B7=B3=E8=B7=83=E2=80=99=E5=8A=A8=E4=BD=9C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/LocoMuJoCo/double_action3.py | 133 +++++++++++++++++++++++++++++++ 1 file changed, 133 insertions(+) create mode 100644 src/LocoMuJoCo/double_action3.py diff --git a/src/LocoMuJoCo/double_action3.py b/src/LocoMuJoCo/double_action3.py new file mode 100644 index 0000000000..c1e3fac004 --- /dev/null +++ b/src/LocoMuJoCo/double_action3.py @@ -0,0 +1,133 @@ +import numpy as np +import jax +import mujoco +from loco_mujoco.task_factories import ImitationFactory, LAFAN1DatasetConf +from loco_mujoco.trajectory import Trajectory, TrajectoryInfo, TrajectoryModel, TrajectoryData +from loco_mujoco.core.utils.mujoco import mj_jntname2qposid + +# ===================== 1. 初始化环境(仅使用LAFAN1数据集) ===================== +env = ImitationFactory.make( + "UnitreeH1", + n_substeps=20, + lafan1_dataset_conf=LAFAN1DatasetConf(["run2_subject1", "jumps1_subject1"]) # 仅使用LAFAN1数据集 +) +env.reset(jax.random.PRNGKey(0)) +ENV_DT = env.dt # 环境时间步长(通常为0.02s) +FPS = int(1 / ENV_DT) # 计算帧率(通常为50FPS) +print(f"环境参数:dt={ENV_DT}s | FPS={FPS}") + +# ===================== 2. 配置各阶段时长与步数 ===================== +MOVE_DURATION = 14 # 移动时长14秒 +STAY_DURATION = 1 # 停留时长1秒 +JUMP_DURATION = 15 # 跳跃时长15秒 +CYCLE_TIMES = 3 # 循环次数 + +# 计算各阶段步数 +MOVE_STEPS = int(MOVE_DURATION * FPS) +STAY_STEPS = int(STAY_DURATION * FPS) +JUMP_STEPS = int(JUMP_DURATION * FPS) +SINGLE_CYCLE_STEPS = MOVE_STEPS + STAY_STEPS + JUMP_STEPS +TOTAL_STEPS = SINGLE_CYCLE_STEPS * CYCLE_TIMES + +# 修正打印信息与实际配置一致 +print(f"单循环结构:移动{7}s({MOVE_STEPS}步) + 停留{1}s({STAY_STEPS}步) + 跳跃{12}s({JUMP_STEPS}步)") +print(f"总循环次数:{CYCLE_TIMES}次 | 总时长:{CYCLE_TIMES*(7+1+12)}s") + +# ===================== 3. 轨迹片段生成函数 ===================== +def get_lafan_trajectory(env, dataset_name, target_steps): + """从LAFAN1数据集提取轨迹并调整到目标步数""" + # 创建临时环境加载指定LAFAN1数据集 + temp_env = ImitationFactory.make( + "UnitreeH1", + lafan1_dataset_conf=LAFAN1DatasetConf([dataset_name]), + n_substeps=env._n_substeps + ) + + temp_env.reset(jax.random.PRNGKey(0)) + # 直接从轨迹处理器获取完整轨迹数据(与参考代码保持一致的访问方式) + raw_qpos = np.array(temp_env.th.traj.data.qpos) + raw_qvel = np.array(temp_env.th.traj.data.qvel) + + # 校验轨迹有效性 + if len(raw_qpos) == 0: + raise ValueError(f"LAFAN1数据集 {dataset_name} 无有效轨迹数据!") + + # 循环填充轨迹至目标步数(保持动作连续性) + traj_qpos = [] + traj_qvel = [] + for i in range(target_steps): + idx = i % len(raw_qpos) # 循环索引避免越界 + traj_qpos.append(raw_qpos[idx]) + traj_qvel.append(raw_qvel[idx]) + return np.array(traj_qpos), np.array(traj_qvel) + +# ===================== 4. 生成各阶段轨迹(全部使用LAFAN1数据集) ===================== +model = env.get_model() + +# 4.1 移动轨迹(LAFAN1的跑步动作) +move_qpos, move_qvel = get_lafan_trajectory( + env, "run2_subject1", MOVE_STEPS +) +print(f"移动轨迹生成完成:{MOVE_STEPS}步") + +# 4.2 停留轨迹(优化版:完全继承移动终点状态,确保连贯性) +# 复用移动阶段最后一帧的完整姿态(包括所有关节位置) +last_move_qpos = move_qpos[-1].copy() +# 速度严格归零(避免微小抖动) +last_move_qvel = np.zeros_like(move_qvel[0]) +# 生成停留阶段轨迹(与参考代码的实现方式一致) +stay_qpos = np.tile(last_move_qpos, (STAY_STEPS, 1)) +stay_qvel = np.tile(last_move_qvel, (STAY_STEPS, 1)) +# 验证停留阶段的根位置与移动终点一致 +root_joint_ind = mj_jntname2qposid("root", model) +print(f"停留阶段根位置:{stay_qpos[0, root_joint_ind[:2]]}(与移动终点一致)") +print(f"停留轨迹生成完成:{STAY_STEPS}步") + +# 4.3 跳跃轨迹(LAFAN1的跳跃动作) +jump_qpos, jump_qvel = get_lafan_trajectory( + env, "jumps1_subject1", JUMP_STEPS +) +# 固定根位置(保持在移动结束时的位置,实现原地跳跃) +root_pos_move_end = move_qpos[-1, root_joint_ind[:2]] # 获取移动结束时的x/y位置 +jump_qpos[:, root_joint_ind[:2]] = root_pos_move_end # 固定跳跃时的根位置 +print(f"跳跃轨迹生成完成:{JUMP_STEPS}步") + +# ===================== 5. 拼接多循环完整轨迹 ===================== +full_qpos = [] +full_qvel = [] + +for cycle in range(CYCLE_TIMES): + print(f"生成第 {cycle+1}/{CYCLE_TIMES} 个循环...") + # 循环内拼接:移动 -> 停留 -> 跳跃 + full_qpos.extend([move_qpos, stay_qpos, jump_qpos]) + full_qvel.extend([move_qvel, stay_qvel, jump_qvel]) + +# 合并为numpy数组 +full_qpos = np.concatenate(full_qpos, axis=0) +full_qvel = np.concatenate(full_qvel, axis=0) + +# 验证总轨迹长度 +print(f"完整轨迹生成完成:{len(full_qpos)}步 | 实际总时长:{len(full_qpos)/FPS:.1f}s") + +# ===================== 6. 加载轨迹并播放 ===================== +# 创建轨迹元信息 +jnt_names = [mujoco.mj_id2name(model, mujoco.mjtObj.mjOBJ_JOINT, i) for i in range(model.njnt)] +traj_info = TrajectoryInfo( + jnt_names, + model=TrajectoryModel(model.njnt, jax.numpy.array(model.jnt_type)), + frequency=FPS +) + +# 创建轨迹数据对象 +traj_data = TrajectoryData( + jax.numpy.array(full_qpos), + jax.numpy.array(full_qvel), + split_points=jax.numpy.array([0, len(full_qpos)]) +) + +# 组合轨迹并加载到环境 +traj = Trajectory(traj_info, traj_data) +env.load_trajectory(traj) + +# 播放轨迹(render=True显示可视化) +env.play_trajectory(n_steps_per_episode=len(full_qpos), render=True) \ No newline at end of file From 6e5174698f376c34535dc50dea62828f9e7be2c7 Mon Sep 17 00:00:00 2001 From: blank885 <1160821687@qq.com> Date: Sun, 21 Dec 2025 13:45:41 +0800 Subject: [PATCH 07/10] =?UTF-8?q?=E6=B7=BB=E5=8A=A0double4=E6=96=87?= =?UTF-8?q?=E4=BB=B6=EF=BC=8C=E6=AD=A4=E6=AC=A1=E4=BB=A3=E7=A0=81=E5=AE=9E?= =?UTF-8?q?=E7=8E=B0=E2=80=98=E6=91=94=E5=80=92=E4=B8=8E=E8=B5=B7=E8=BA=AB?= =?UTF-8?q?=E2=80=99=E5=92=8C=E2=80=98=E5=86=B2=E5=88=BA=E2=80=99?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/LocoMuJoCo/double4.py | 124 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 124 insertions(+) create mode 100644 src/LocoMuJoCo/double4.py diff --git a/src/LocoMuJoCo/double4.py b/src/LocoMuJoCo/double4.py new file mode 100644 index 0000000000..689bd36f70 --- /dev/null +++ b/src/LocoMuJoCo/double4.py @@ -0,0 +1,124 @@ +import numpy as np +import jax +import mujoco +from loco_mujoco.task_factories import ImitationFactory, LAFAN1DatasetConf +from loco_mujoco.trajectory import Trajectory, TrajectoryInfo, TrajectoryModel, TrajectoryData +from loco_mujoco.core.utils.mujoco import mj_jntname2qposid + +# ===================== 1. 初始化环境(仅使用LAFAN1数据集) ===================== +env = ImitationFactory.make( + "UnitreeH1", + n_substeps=20, + lafan1_dataset_conf=LAFAN1DatasetConf(["fallAndGetUp1_subject1", "sprint1_subject4"]) # 摔倒起身和冲刺数据集 +) +env.reset(jax.random.PRNGKey(0)) +ENV_DT = env.dt # 环境时间步长(通常为0.02s) +FPS = int(1 / ENV_DT) # 计算帧率(通常为50FPS) +print(f"环境参数:dt={ENV_DT}s | FPS={FPS}") + +# ===================== 2. 配置各阶段时长与步数(修改为:摔倒起身+冲刺 循环) ===================== +FALL_GETUP_DURATION = 14 # 摔倒与起身时长14秒 +SPRINT_DURATION = 17 # 冲刺时长17秒 +CYCLE_TIMES = 3 # 循环次数 + +# 计算各阶段步数 +FALL_GETUP_STEPS = int(FALL_GETUP_DURATION * FPS) +SPRINT_STEPS = int(SPRINT_DURATION * FPS) +SINGLE_CYCLE_STEPS = FALL_GETUP_STEPS + SPRINT_STEPS # 单循环:摔倒起身 + 冲刺 +TOTAL_STEPS = SINGLE_CYCLE_STEPS * CYCLE_TIMES + +# 打印配置信息 +print(f"单循环结构:摔倒与起身{FALL_GETUP_DURATION}s({FALL_GETUP_STEPS}步) + 冲刺{SPRINT_DURATION}s({SPRINT_STEPS}步)") +print(f"总循环次数:{CYCLE_TIMES}次 | 总时长:{CYCLE_TIMES*(FALL_GETUP_DURATION + SPRINT_DURATION)}s") + +# ===================== 3. 轨迹片段生成函数 ===================== +def get_lafan_trajectory(env, dataset_name, target_steps): + """从LAFAN1数据集提取轨迹并调整到目标步数""" + # 创建临时环境加载指定LAFAN1数据集 + temp_env = ImitationFactory.make( + "UnitreeH1", + lafan1_dataset_conf=LAFAN1DatasetConf([dataset_name]), + n_substeps=env._n_substeps + ) + + temp_env.reset(jax.random.PRNGKey(0)) + # 从轨迹处理器获取完整轨迹数据 + raw_qpos = np.array(temp_env.th.traj.data.qpos) + raw_qvel = np.array(temp_env.th.traj.data.qvel) + + # 校验轨迹有效性 + if len(raw_qpos) == 0: + raise ValueError(f"LAFAN1数据集 {dataset_name} 无有效轨迹数据!") + + # 循环填充轨迹至目标步数(保持动作连续性) + traj_qpos = [] + traj_qvel = [] + for i in range(target_steps): + idx = i % len(raw_qpos) # 循环索引避免越界 + traj_qpos.append(raw_qpos[idx]) + traj_qvel.append(raw_qvel[idx]) + return np.array(traj_qpos), np.array(traj_qvel) + +# ===================== 4. 生成各阶段轨迹(摔倒起身+冲刺) ===================== +model = env.get_model() +root_joint_ind = mj_jntname2qposid("root", model) # 获取根关节索引 + +# 4.1 摔倒与起身轨迹(LAFAN1的摔倒与起身动作) +fall_getup_qpos, fall_getup_qvel = get_lafan_trajectory( + env, "fallAndGetUp1_subject1", FALL_GETUP_STEPS +) +# 固定初始根位置(可选:设置为原点附近) +fall_getup_qpos[:, root_joint_ind[:2]] = np.array([0.0, 0.0]) # 原地摔倒起身 +print(f"摔倒与起身轨迹生成完成:{FALL_GETUP_STEPS}步") + +# 4.2 冲刺轨迹(与摔倒起身保持连贯) +sprint_qpos, sprint_qvel = get_lafan_trajectory( + env, "sprint1_subject4", SPRINT_STEPS +) +# 确保从摔倒起身的终点位置开始冲刺 +root_pos_getup_end = fall_getup_qpos[-1, root_joint_ind[:2]] +# 计算相对位移,保持冲刺动作的连贯性 +sprint_qpos[:, root_joint_ind[:2]] = root_pos_getup_end + ( + sprint_qpos[:, root_joint_ind[:2]] - sprint_qpos[0, root_joint_ind[:2]] +) +print(f"冲刺轨迹生成完成:{SPRINT_STEPS}步") + +# ===================== 5. 拼接多循环完整轨迹(摔倒起身+冲刺 循环) ===================== +full_qpos = [] +full_qvel = [] + +for cycle in range(CYCLE_TIMES): + print(f"生成第 {cycle+1}/{CYCLE_TIMES} 个循环...") + # 循环内拼接:摔倒起身 -> 冲刺 + full_qpos.extend([fall_getup_qpos, sprint_qpos]) + full_qvel.extend([fall_getup_qvel, sprint_qvel]) + +# 合并为numpy数组 +full_qpos = np.concatenate(full_qpos, axis=0) +full_qvel = np.concatenate(full_qvel, axis=0) + +# 验证总轨迹长度 +print(f"完整轨迹生成完成:{len(full_qpos)}步 | 实际总时长:{len(full_qpos)/FPS:.1f}s") + +# ===================== 6. 加载轨迹并播放 ===================== +# 创建轨迹元信息 +jnt_names = [mujoco.mj_id2name(model, mujoco.mjtObj.mjOBJ_JOINT, i) for i in range(model.njnt)] +traj_info = TrajectoryInfo( + jnt_names, + model=TrajectoryModel(model.njnt, jax.numpy.array(model.jnt_type)), + frequency=FPS +) + +# 创建轨迹数据对象 +traj_data = TrajectoryData( + jax.numpy.array(full_qpos), + jax.numpy.array(full_qvel), + split_points=jax.numpy.array([0, len(full_qpos)]) +) + +# 组合轨迹并加载到环境 +traj = Trajectory(traj_info, traj_data) +env.load_trajectory(traj) + +# 播放轨迹(render=True显示可视化) +env.play_trajectory(n_steps_per_episode=len(full_qpos), render=True) \ No newline at end of file From b6a63507c57d18c3d3b81ac993449793bd08fc71 Mon Sep 17 00:00:00 2001 From: blank885 <1160821687@qq.com> Date: Sun, 21 Dec 2025 15:15:46 +0800 Subject: [PATCH 08/10] update --- src/LocoMuJoCo/fightsport.py | 133 +++++++++++++++++++++++++++++++++++ 1 file changed, 133 insertions(+) create mode 100644 src/LocoMuJoCo/fightsport.py diff --git a/src/LocoMuJoCo/fightsport.py b/src/LocoMuJoCo/fightsport.py new file mode 100644 index 0000000000..952ec964b8 --- /dev/null +++ b/src/LocoMuJoCo/fightsport.py @@ -0,0 +1,133 @@ +import numpy as np +import jax +import mujoco +from loco_mujoco.task_factories import ImitationFactory, LAFAN1DatasetConf +from loco_mujoco.trajectory import Trajectory, TrajectoryInfo, TrajectoryModel, TrajectoryData +from loco_mujoco.core.utils.mujoco import mj_jntname2qposid + +# ===================== 1. 初始化环境(仅使用LAFAN1数据集) ===================== +env = ImitationFactory.make( + "UnitreeH1", + n_substeps=20, + lafan1_dataset_conf=LAFAN1DatasetConf(["fight1_subject3", "fightAndSports1_subject4"]) # 仅使用LAFAN1数据集 +) +env.reset(jax.random.PRNGKey(0)) +ENV_DT = env.dt # 环境时间步长(通常为0.02s) +FPS = int(1 / ENV_DT) # 计算帧率(通常为50FPS) +print(f"环境参数:dt={ENV_DT}s | FPS={FPS}") + +# ===================== 2. 配置各阶段时长与步数 ===================== +MOVE_DURATION = 15 # 移动时长15秒 +STAY_DURATION = 1 # 停留时长1秒 +JUMP_DURATION = 15 # 跳跃时长15秒 +CYCLE_TIMES = 3 # 循环次数 + +# 计算各阶段步数 +MOVE_STEPS = int(MOVE_DURATION * FPS) +STAY_STEPS = int(STAY_DURATION * FPS) +JUMP_STEPS = int(JUMP_DURATION * FPS) +SINGLE_CYCLE_STEPS = MOVE_STEPS + STAY_STEPS + JUMP_STEPS +TOTAL_STEPS = SINGLE_CYCLE_STEPS * CYCLE_TIMES + +# 修正打印信息与实际配置一致 +print(f"单循环结构:移动{7}s({MOVE_STEPS}步) + 停留{1}s({STAY_STEPS}步) + 跳跃{12}s({JUMP_STEPS}步)") +print(f"总循环次数:{CYCLE_TIMES}次 | 总时长:{CYCLE_TIMES*(7+1+12)}s") + +# ===================== 3. 轨迹片段生成函数 ===================== +def get_lafan_trajectory(env, dataset_name, target_steps): + """从LAFAN1数据集提取轨迹并调整到目标步数""" + # 创建临时环境加载指定LAFAN1数据集 + temp_env = ImitationFactory.make( + "UnitreeH1", + lafan1_dataset_conf=LAFAN1DatasetConf([dataset_name]), + n_substeps=env._n_substeps + ) + + temp_env.reset(jax.random.PRNGKey(0)) + # 直接从轨迹处理器获取完整轨迹数据(与参考代码保持一致的访问方式) + raw_qpos = np.array(temp_env.th.traj.data.qpos) + raw_qvel = np.array(temp_env.th.traj.data.qvel) + + # 校验轨迹有效性 + if len(raw_qpos) == 0: + raise ValueError(f"LAFAN1数据集 {dataset_name} 无有效轨迹数据!") + + # 循环填充轨迹至目标步数(保持动作连续性) + traj_qpos = [] + traj_qvel = [] + for i in range(target_steps): + idx = i % len(raw_qpos) # 循环索引避免越界 + traj_qpos.append(raw_qpos[idx]) + traj_qvel.append(raw_qvel[idx]) + return np.array(traj_qpos), np.array(traj_qvel) + +# ===================== 4. 生成各阶段轨迹(全部使用LAFAN1数据集) ===================== +model = env.get_model() + +# 4.1 移动轨迹(LAFAN1的跑步动作) +move_qpos, move_qvel = get_lafan_trajectory( + env, "fight1_subject3", MOVE_STEPS +) +print(f"移动轨迹生成完成:{MOVE_STEPS}步") + +# 4.2 停留轨迹(优化版:完全继承移动终点状态,确保连贯性) +# 复用移动阶段最后一帧的完整姿态(包括所有关节位置) +last_move_qpos = move_qpos[-1].copy() +# 速度严格归零(避免微小抖动) +last_move_qvel = np.zeros_like(move_qvel[0]) +# 生成停留阶段轨迹(与参考代码的实现方式一致) +stay_qpos = np.tile(last_move_qpos, (STAY_STEPS, 1)) +stay_qvel = np.tile(last_move_qvel, (STAY_STEPS, 1)) +# 验证停留阶段的根位置与移动终点一致 +root_joint_ind = mj_jntname2qposid("root", model) +print(f"停留阶段根位置:{stay_qpos[0, root_joint_ind[:2]]}(与移动终点一致)") +print(f"停留轨迹生成完成:{STAY_STEPS}步") + +# 4.3 跳跃轨迹(LAFAN1的跳跃动作) +jump_qpos, jump_qvel = get_lafan_trajectory( + env, "fightAndSports1_subject4", JUMP_STEPS +) +# 固定根位置(保持在移动结束时的位置,实现原地跳跃) +root_pos_move_end = move_qpos[-1, root_joint_ind[:2]] # 获取移动结束时的x/y位置 +jump_qpos[:, root_joint_ind[:2]] = root_pos_move_end # 固定跳跃时的根位置 +print(f"跳跃轨迹生成完成:{JUMP_STEPS}步") + +# ===================== 5. 拼接多循环完整轨迹 ===================== +full_qpos = [] +full_qvel = [] + +for cycle in range(CYCLE_TIMES): + print(f"生成第 {cycle+1}/{CYCLE_TIMES} 个循环...") + # 循环内拼接:移动 -> 停留 -> 跳跃 + full_qpos.extend([move_qpos, stay_qpos, jump_qpos]) + full_qvel.extend([move_qvel, stay_qvel, jump_qvel]) + +# 合并为numpy数组 +full_qpos = np.concatenate(full_qpos, axis=0) +full_qvel = np.concatenate(full_qvel, axis=0) + +# 验证总轨迹长度 +print(f"完整轨迹生成完成:{len(full_qpos)}步 | 实际总时长:{len(full_qpos)/FPS:.1f}s") + +# ===================== 6. 加载轨迹并播放 ===================== +# 创建轨迹元信息 +jnt_names = [mujoco.mj_id2name(model, mujoco.mjtObj.mjOBJ_JOINT, i) for i in range(model.njnt)] +traj_info = TrajectoryInfo( + jnt_names, + model=TrajectoryModel(model.njnt, jax.numpy.array(model.jnt_type)), + frequency=FPS +) + +# 创建轨迹数据对象 +traj_data = TrajectoryData( + jax.numpy.array(full_qpos), + jax.numpy.array(full_qvel), + split_points=jax.numpy.array([0, len(full_qpos)]) +) + +# 组合轨迹并加载到环境 +traj = Trajectory(traj_info, traj_data) +env.load_trajectory(traj) + +# 播放轨迹(render=True显示可视化) +env.play_trajectory(n_steps_per_episode=len(full_qpos), render=True) \ No newline at end of file From 1748d661442085b2c9e173228b3ae8dcdc708efd Mon Sep 17 00:00:00 2001 From: blank885 <1160821687@qq.com> Date: Mon, 22 Dec 2025 10:20:18 +0800 Subject: [PATCH 09/10] =?UTF-8?q?=E6=B7=BB=E5=8A=A0ros2=E5=AE=A2=E6=88=B7?= =?UTF-8?q?=E7=AB=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../ros2/ros2_py/robot_controller.py | 95 +++++++++++++++++++ 1 file changed, 95 insertions(+) create mode 100644 src/LocoMuJoCo/ros2/ros2_py/robot_controller.py diff --git a/src/LocoMuJoCo/ros2/ros2_py/robot_controller.py b/src/LocoMuJoCo/ros2/ros2_py/robot_controller.py new file mode 100644 index 0000000000..b82a5de541 --- /dev/null +++ b/src/LocoMuJoCo/ros2/ros2_py/robot_controller.py @@ -0,0 +1,95 @@ +import sys +import select +import rclpy +from rclpy.node import Node +from my_interfaces.srv import RobotCtrl + +# 全局变量(提前初始化,避免未绑定) +current_action_key = 'stop' +action_switch_flag = False + + +class RobotController(Node): + def __init__(self, name): + super().__init__(name) + # 创建服务客户端,连接机器人控制服务 + self.client = self.create_client(RobotCtrl, 'robot_control') + # 等待服务上线 + while not self.client.wait_for_service(timeout_sec=1.0): + self.get_logger().info('等待服务 "robot_control" 启动...') + self.get_logger().info("操作提示:w=行走 | s=下蹲 | q=退出 | 其他键=停止") + + def result_callback(self, future): + """服务调用结果的回调函数""" + + response = future.result() + self.get_logger().info(f'指令执行成功:{response.message}') + + + def send_request(self, action): + """发送控制指令到服务端""" + + request = RobotCtrl.Request() + request.command = action # 传递动作指令 + self.client.call_async(request).add_done_callback(self.result_callback) + self.get_logger().info(f'已发送指令:{action}') + + + +def read_keypress_non_blocking(): + """非阻塞读取终端按键""" + if select.select([sys.stdin], [], [], 0) == ([sys.stdin], [], []): + key = sys.stdin.read(1) + return key.strip() + return None + + +def main(args=None): + # 关键:声明使用全局变量(修复核心错误) + global current_action_key, action_switch_flag + + rclpy.init(args=args) + robot_controller = RobotController("robot_controller") + + # 初始发送停止指令(此时current_action_key已绑定全局值'stop') + robot_controller.send_request(current_action_key) + + try: + # 主循环:非阻塞读取按键 + 处理ROS事件 + while rclpy.ok(): + # 1. 读取终端按键 + key = read_keypress_non_blocking() + if key: + # 按键逻辑 + if key == 'w' and current_action_key != "walk": + current_action_key = "walk" + action_switch_flag = True + elif key == 's' and current_action_key != "squat": + current_action_key = "squat" + action_switch_flag = True + elif key == 'q': # q键退出 + robot_controller.get_logger().info("收到退出指令,程序终止") + break + elif key not in ['w', 's']: # 其他键停止 + if current_action_key != "stop": + current_action_key = "stop" + action_switch_flag = True + + # 2. 检测动作切换并发送指令 + if action_switch_flag: + robot_controller.send_request(current_action_key) + action_switch_flag = False # 重置标记 + + # 3. 非阻塞处理ROS回调 + rclpy.spin_once(robot_controller, timeout_sec=0.01) + + except KeyboardInterrupt: + robot_controller.get_logger().info("用户强制退出") + finally: + robot_controller.destroy_node() + rclpy.shutdown() + print("程序已退出") + + +if __name__ == '__main__': + main() \ No newline at end of file From d0d9de35f741137dd33aecfddfe1bdd0a17a1784 Mon Sep 17 00:00:00 2001 From: blank885 <1160821687@qq.com> Date: Sun, 28 Dec 2025 15:05:55 +0800 Subject: [PATCH 10/10] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E4=BA=86test.py?= =?UTF-8?q?=E6=B5=8B=E8=AF=95=E6=9C=BA=E5=99=A8=E4=BA=BA=E6=84=9F=E7=9F=A5?= =?UTF-8?q?=E6=8E=A7=E5=88=B6=EF=BC=8C=E8=A7=A6=E7=A2=B0=E7=BA=A2=E8=89=B2?= =?UTF-8?q?=E5=9C=86=E6=9F=B1=E5=81=9C=E4=B8=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/LocoMuJoCo/test.py | 66 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 66 insertions(+) create mode 100644 src/LocoMuJoCo/test.py diff --git a/src/LocoMuJoCo/test.py b/src/LocoMuJoCo/test.py new file mode 100644 index 0000000000..57d78354f5 --- /dev/null +++ b/src/LocoMuJoCo/test.py @@ -0,0 +1,66 @@ +import numpy as np +import mujoco +from mujoco import MjSpec +from loco_mujoco import ImitationFactory +from loco_mujoco import PATH_TO_MODELS +import time + +def main(): + # 配置环境参数 + env_name = "UnitreeH1" + task = "walk" + n_episodes = 10 + n_steps_per_episode = 500 + + # 创建自定义模型规格,添加几何体 + def create_custom_spec(): + # 加载UnitreeH1的默认模型 + h1_model_path = PATH_TO_MODELS / "unitree_h1/h1.xml" + spec = MjSpec.from_file(str(h1_model_path)) + + # 打印所有身体名称,确认地板身体的正确名称 + print("模型中所有身体名称:") + for body in spec.bodies: + print(body.name) + + # 尝试查找地板身体(根据打印结果,模型中没有floor/ground,使用world作为父节点) + floor_body = spec.find_body("floor") + if floor_body is None: + floor_body = spec.find_body("ground") + if floor_body is None: + # 若仍找不到,使用根节点"world"作为父节点(模型中存在"world"身体) + floor_body = spec.find_body("world") + print("使用根节点'world'作为几何体的父节点") + + # 定义圆柱体几何体属性(size改为3个元素,符合Mujoco要求) + cylinder_attr = dict( + type=mujoco.mjtGeom.mjGEOM_CYLINDER, + size=[0.3, 1, 0.0], # 修正:[半径, 半长度, 0](必须3个元素) + pos=[8, 0, 1.0], # 位置(相对于父节点world) + rgba=[1.0, 0.0, 0.0, 0.5], # 红色半透明 + contype=1, + conaffinity=1 + ) + + # 向父节点添加几何体 + floor_body.add_geom(name="custom_cylinder", **cylinder_attr) + + return spec + + # 创建带自定义几何体的环境 + custom_spec = create_custom_spec() + env = ImitationFactory.make( + env_name, + default_dataset_conf=dict(task=task), + spec=custom_spec # 传入自定义规格 + ) + + # 播放轨迹并渲染 + env.play_trajectory( + n_episodes=n_episodes, + n_steps_per_episode=n_steps_per_episode, + render=True + ) + +if __name__ == "__main__": + main() \ No newline at end of file