diff --git a/src/Neuro_Mujoco/README.md b/src/Neuro_Mujoco/README.md index 7cae468cb6..ed05b6ba35 100644 --- a/src/Neuro_Mujoco/README.md +++ b/src/Neuro_Mujoco/README.md @@ -44,10 +44,29 @@ python mujoco_utils.py visualize /path/to/your/model.xml --ros --policy /path/to # 目录自动加载(选目录内第一个模型文件) python mujoco_utils.py visualize /home/lan/桌面/nn/mujoco_menagerie/anybotics_anymal_b + +# 启动交互式模型选择菜单(支持PD控制、预设位置切换) +python mujoco_utils.py menu + 1.2 交互说明 窗口操作:鼠标拖拽旋转、滚轮缩放,按 ESC 键退出; 关节状态:仅发布非自由关节(排除 mjJNT_FREE 类型)的位置 / 速度; 策略映射:输出 [-1,1] 自动线性缩放至执行器 ctrlrange 范围,并强制裁剪至物理极限。 +菜单模式专属指令: +- 预设位置切换:直接输入预设名称(如home/up/left,根据模型不同有所差异) +- 自定义关节角度:set <关节号> <角度>(示例:set 0 0.5,单位rad) +- 仿真控制:pause(暂停)/ resume(继续)/ exit(退出) +- ROS键盘控制(需ROS环境):在新终端运行 `python keyboard_control.py <关节数>` + + +## 支持的预设模型 +工具内置4种常用机器人模型,可通过交互式菜单直接选择: +1. Franka Panda(机械臂):7关节,支持home/up/left/right预设位置 +2. UR5 机械臂:6关节,支持home/up/forward预设位置 +3. Franka Panda(带手爪):8关节(含手爪),支持home/open_gripper/up_open预设位置 +4. Walker2d 机器人:6关节,支持stand/walk_left/walk_right预设位置 + + ### 模型格式转换 2.1 运行命令 shell @@ -103,6 +122,17 @@ class PolicyNetwork(nn.Module): 输出:归一化控制指令([-1,1]),自动映射至模型 actuator_ctrlrange 范围: python 运行 + +## PD控制器说明 +适用于模型精确位置控制,核心参数与逻辑: +1. 控制参数:每个模型预配置KP(比例增益)和KD(微分增益),如Franka Panda默认KP=800.0、KD=60.0 +2. 控制逻辑: + ```python + # 核心PD计算 + pos_error = 目标关节位置 - 当前关节位置 + vel_error = -当前关节速度 + 关节力矩 = KP * pos_error + KD * vel_error + ### 核心映射逻辑 action = ctrl_min + (ctrl_max - ctrl_min) * (action + 1) / 2 action = np.clip(action, ctrl_min, ctrl_max) # 强制裁剪至物理极限 diff --git a/src/Neuro_Mujoco/cakin_ws/src/mujoco_ros/scripts/keyboard_control.py b/src/Neuro_Mujoco/cakin_ws/src/mujoco_ros/scripts/keyboard_control.py new file mode 100644 index 0000000000..2768741edd --- /dev/null +++ b/src/Neuro_Mujoco/cakin_ws/src/mujoco_ros/scripts/keyboard_control.py @@ -0,0 +1,114 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import rospy +from std_msgs.msg import Float32MultiArray +from pynput import keyboard +import sys +import argparse + +# 全局变量 +pub = None +delta_pos = None +joint_num = 0 +step_size = 0.05 # 每次按键的角度增量(弧度) + +def on_press(key): + """键盘按下事件处理""" + global delta_pos + try: + # 将 delta_pos 重置为全零 + delta_pos = [0.0] * joint_num + + # 根据按键设置对应的关节增量 + if key.char == '1': + delta_pos[0] = step_size + elif key.char == 'q': + delta_pos[0] = -step_size + elif key.char == '2': + delta_pos[1] = step_size + elif key.char == 'w': + delta_pos[1] = -step_size + elif key.char == '3' and joint_num > 2: + delta_pos[2] = step_size + elif key.char == 'e' and joint_num > 2: + delta_pos[2] = -step_size + elif key.char == '4' and joint_num > 3: + delta_pos[3] = step_size + elif key.char == 'r' and joint_num > 3: + delta_pos[3] = -step_size + elif key.char == '5' and joint_num > 4: + delta_pos[4] = step_size + elif key.char == 't' and joint_num > 4: + delta_pos[4] = -step_size + elif key.char == '6' and joint_num > 5: + delta_pos[5] = step_size + elif key.char == 'y' and joint_num > 5: + delta_pos[5] = -step_size + elif key.char == '7' and joint_num > 6: + delta_pos[6] = step_size + elif key.char == 'u' and joint_num > 6: + delta_pos[6] = -step_size + elif key.char == '8' and joint_num > 7: + delta_pos[7] = step_size + elif key.char == 'i' and joint_num > 7: + delta_pos[7] = -step_size + + # 如果有增量,则发布消息 + if any(d != 0 for d in delta_pos): + msg = Float32MultiArray(data=delta_pos) + pub.publish(msg) + rospy.loginfo(f"发布关节增量: {delta_pos}") + + except AttributeError: + # 处理特殊按键(如方向键),这里我们忽略 + pass + +def on_release(key): + """键盘释放事件处理""" + # 当按键释放时,发布一个全零的增量,让机器人停止在当前位置 + if key == keyboard.Key.esc: + # 按下ESC键退出 + rospy.loginfo("检测到ESC键,退出键盘控制...") + return False + +def main(): + global pub, joint_num, delta_pos + rospy.init_node('keyboard_controller', anonymous=True) + + # 使用 argparse 接收命令行参数 + parser = argparse.ArgumentParser(description='ROS Keyboard Controller for MuJoCo') + parser.add_argument('joint_num', type=int, help='Number of joints to control (e.g., 7 for Franka)') + parser.add_argument('--step', type=float, default=0.05, help='Step size in radians for each key press.') + args = parser.parse_args(rospy.myargv(argv=sys.argv)[1:]) # 处理ROS参数 + + joint_num = args.joint_num + step_size = args.step + delta_pos = [0.0] * joint_num + + pub = rospy.Publisher('joint_position_delta', Float32MultiArray, queue_size=10) + + rospy.loginfo("="*50) + rospy.loginfo(" ROS 键盘控制器已启动 ") + rospy.loginfo(f"控制关节数: {joint_num}, 步长: {step_size} rad") + rospy.loginfo("按键映射:") + rospy.loginfo(" 关节1: '1' (增加) / 'q' (减少)") + rospy.loginfo(" 关节2: '2' (增加) / 'w' (减少)") + rospy.loginfo(" 关节3: '3' (增加) / 'e' (减少)") + rospy.loginfo(" 关节4: '4' (增加) / 'r' (减少)") + rospy.loginfo(" 关节5: '5' (增加) / 't' (减少)") + rospy.loginfo(" 关节6: '6' (增加) / 'y' (减少)") + rospy.loginfo(" 关节7: '7' (增加) / 'u' (减少)") + rospy.loginfo(" 关节8: '8' (增加) / 'i' (减少)") + rospy.loginfo(" 退出: 按 'ESC' 键") + rospy.loginfo("="*50) + + # 启动键盘监听 + with keyboard.Listener(on_press=on_press, on_release=on_release) as listener: + listener.join() + +if __name__ == '__main__': + try: + main() + except rospy.ROSInterruptException: + pass \ No newline at end of file diff --git a/src/Neuro_Mujoco/main.py b/src/Neuro_Mujoco/main.py index e43bbd4182..32c0735296 100644 --- a/src/Neuro_Mujoco/main.py +++ b/src/Neuro_Mujoco/main.py @@ -9,6 +9,7 @@ import mujoco from mujoco import viewer +# ===================== 依赖导入 - ROS 1 ===================== # ===================== 核心路径配置(相对路径)===================== # 获取当前脚本所在目录(所有相对路径的基准) SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) @@ -26,11 +27,17 @@ except ImportError: logging.warning("ROS环境未检测到,ROS功能禁用") +# ===================== 动态路径配置 (已修改为相对路径) ===================== +# 获取当前脚本所在的目录 +SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) + # ===================== 多模型配置(修正预设指令 + 相对路径)===================== MODEL_CONFIGS = { 1: { "name": "Franka Panda(机械臂)", "key": "franka", + # 使用 os.path.join 拼接相对路径 + "path": os.path.join(SCRIPT_DIR, "mujoco_menagerie/franka_emika_panda/panda.xml"), "path": os.path.join(MODEL_ROOT, "franka_emika_panda/panda.xml"), "joint_num": 7, "pd_params": {"KP": 800.0, "KD": 60.0}, @@ -46,6 +53,7 @@ 2: { "name": "UR5 机械臂", "key": "ur5", + "path": os.path.join(SCRIPT_DIR, "mujoco_menagerie/universal_robots_ur5e/ur5e.xml"), "path": os.path.join(MODEL_ROOT, "universal_robots_ur5e/ur5e.xml"), "joint_num": 6, "pd_params": {"KP": 700.0, "KD": 50.0}, @@ -60,6 +68,7 @@ 3: { "name": "Franka Panda(带手爪)", "key": "franka_gripper", + "path": os.path.join(SCRIPT_DIR, "mujoco_menagerie/franka_emika_panda/panda_gripper.xml"), "path": os.path.join(MODEL_ROOT, "franka_emika_panda/panda_gripper.xml"), "joint_num": 8, "pd_params": {"KP": 800.0, "KD": 60.0}, @@ -74,6 +83,7 @@ 4: { "name": "Walker2d 机器人", "key": "walker2d", + "path": os.path.join(SCRIPT_DIR, "mujoco_menagerie/walker2d/walker2d.xml"), "path": os.path.join(MODEL_ROOT, "walker2d/walker2d.xml"), "joint_num": 6, "pd_params": {"KP": 1000.0, "KD": 80.0}, @@ -87,6 +97,7 @@ } } +# ===================== 全局变量 ===================== # ===================== 全局变量(精简版)===================== CURRENT_CONFIG = None TARGET_JOINT_POS = None @@ -96,6 +107,7 @@ SIMULATION_RUNNING = False CMD_LOCK = threading.Lock() +# ===================== 日志配置 ===================== # ===================== 日志配置(精简输出)===================== logging.basicConfig( level=logging.INFO, @@ -105,6 +117,9 @@ ) logger = logging.getLogger("mujoco_control_tool") +# ===================== 核心功能函数 ===================== +def load_mujoco_model(model_path: str) -> Tuple[Optional[mujoco.MjModel], Optional[mujoco.MjData]]: + """加载MuJoCo模型""" # ===================== 核心功能函数(精简版 + 路径调试)===================== def load_mujoco_model(model_path: str) -> Tuple[Optional[mujoco.MjModel], Optional[mujoco.MjData]]: """加载MuJoCo模型(增加路径调试信息)""" @@ -167,6 +182,83 @@ def pd_controller(model: mujoco.MjModel, data: mujoco.MjData): data.ctrl[:joint_num] = joint_torque +def simulation_worker(model, data, viewer_instance): + """仿真工作线程""" + global SIMULATION_RUNNING, SIMULATION_PAUSE + step_counter = 0 + sim_frequency = 50 + sim_interval = 1.0 / sim_frequency + + # 适配相机视角 + cam_distance = 2.0 if "机械臂" in CURRENT_CONFIG["name"] else 3.0 + viewer_instance.cam.distance = cam_distance + viewer_instance.cam.azimuth = 90 + viewer_instance.cam.elevation = -20 + viewer_instance.cam.lookat = [0.0, 0.0, 0.5] + + while SIMULATION_RUNNING and viewer_instance.is_running(): + if SIMULATION_PAUSE: + time.sleep(0.1) + continue + + loop_start = time.time() + + # 执行PD控制 + pd_controller(model, data) + + # 步进仿真 + mujoco.mj_step(model, data) + viewer_instance.sync() + + # 大幅降低打印频率 + if step_counter % 500 == 0: + print_simulation_status(data, step_counter) + step_counter += 1 + + # 控制仿真频率 + loop_duration = time.time() - loop_start + if loop_duration < sim_interval: + time.sleep(sim_interval - loop_duration) + +def cmd_listener_main(): + """终端指令监听(主线程)""" + global TARGET_JOINT_POS, SIMULATION_PAUSE, SIMULATION_RUNNING + joint_num = CURRENT_CONFIG["joint_num"] + preset_keys = list(CURRENT_CONFIG["presets"].keys()) + + + model_path = CURRENT_CONFIG["path"] + model, data = load_mujoco_model(model_path) + if model and data: + mujoco.mj_resetDataKeyframe(model, data, 0) + logger.info(f"\n✅ 成功加载模型:{CURRENT_CONFIG['name']}") + logger.info(f"📂 模型路径:{model_path}") + logger.info(f"🔧 控制关节数:{CURRENT_CONFIG['joint_num']} | PD参数:KP={KP}, KD={KD}") + return model, data + +def pd_controller(model: mujoco.MjModel, data: mujoco.MjData): + """PD控制器""" + joint_num = CURRENT_CONFIG["joint_num"] + if model.nq < joint_num or model.nu < joint_num: + logger.warning(f"⚠️ 模型自由度/控制维度不足") + return + + global TARGET_JOINT_POS + current_joint_pos = data.qpos[:joint_num].copy() + current_joint_vel = data.qvel[:joint_num].copy() + + # PD控制核心计算 + pos_error = TARGET_JOINT_POS - current_joint_pos + vel_error = -current_joint_vel + joint_torque = KP * pos_error + KD * vel_error + + # 力矩限位 + if model.actuator_forcerange.size >= joint_num: + torque_limit = model.actuator_forcerange[:, 1].copy()[:joint_num] + joint_torque = np.clip(joint_torque, -torque_limit, torque_limit) + + data.ctrl[:joint_num] = joint_torque + def simulation_worker(model, data, viewer_instance): """仿真工作线程""" global SIMULATION_RUNNING, SIMULATION_PAUSE @@ -218,10 +310,17 @@ def cmd_listener_main(): logger.info(f" 2. 自定义关节:set 关节号 角度(示例:set 0 0.5,单位rad)") logger.info(f" 3. 暂停/继续仿真:pause / resume") logger.info(f" 4. 退出仿真:exit") + if ROS_AVAILABLE: + logger.info(f" 5. ROS键盘控制:在新终端运行 `python keyboard_control.py <关节数>`") logger.info("="*50 + "\n") while SIMULATION_RUNNING: try: + cmd = input("\n👉 请输入控制指令:").strip().lower() + + with CMD_LOCK: + pass + # 输入提示符单独显示,更清晰 cmd = input("\n👉 请输入控制指令:").strip().lower() @@ -270,6 +369,7 @@ def cmd_listener_main(): logger.error(f"❌ 指令解析失败:{str(e)}", exc_info=True) def print_simulation_status(data: mujoco.MjData, step: int): + """打印仿真状态""" """打印仿真状态(精简输出)""" joint_num = CURRENT_CONFIG["joint_num"] current_pos = data.qpos[:joint_num].round(4) @@ -282,6 +382,62 @@ def print_simulation_status(data: mujoco.MjData, step: int): logger.info(f"📝 当前关节位置:{current_pos}") logger.info(f"📊 控制精度:平均误差={avg_error:.6f}rad | 最大误差={max_error:.6f}rad") +# ===================== ROS 功能 ===================== +def joint_control_callback(msg): + """ROS 关节控制话题回调函数""" + global TARGET_JOINT_POS + if not SIMULATION_RUNNING or SIMULATION_PAUSE: + return + + joint_num = CURRENT_CONFIG["joint_num"] + if len(msg.data) != joint_num: + logger.warning(f"ROS消息数据长度不匹配!期望 {joint_num}, 收到 {len(msg.data)}") + return + + # 将接收到的增量值加到目标关节位置上 + delta_pos = np.array(msg.data) + with CMD_LOCK: # 使用锁确保线程安全 + TARGET_JOINT_POS += delta_pos + + logger.info(f"\n🎮 接收到ROS控制指令:目标关节位置更新为 {TARGET_JOINT_POS.round(3)}") + +def ros_joint_control_subscriber(): + """ROS 订阅者节点""" + try: + rospy.init_node('mujoco_ros_controller', anonymous=True) + rospy.Subscriber('joint_position_delta', Float32MultiArray, joint_control_callback) + logger.info("✅ ROS节点已启动,正在监听 /joint_position_delta 话题...") + rospy.spin() # 保持节点运行,直到被关闭 + except rospy.ROSInterruptException: + logger.info("🛑 ROS节点被中断。") + except Exception as e: + logger.error(f"❌ ROS节点运行出错: {e}", exc_info=True) + +# ===================== 主程序逻辑 ===================== +def run_selected_model(): + """启动选中模型的可视化与控制""" + model, data = load_selected_model() + if not model or not data: + input("\n按回车键返回模型选择菜单...") + return + + global SIMULATION_PAUSE, SIMULATION_RUNNING + SIMULATION_PAUSE = False + SIMULATION_RUNNING = True + + logger.info(f"\n🖥️ 正在启动 {CURRENT_CONFIG['name']} 可视化窗口...") + logger.info("💡 提示:窗口弹出后,终端会显示输入提示符,可输入指令控制模型!") + + ros_thread = None + if ROS_AVAILABLE: + # 启动ROS订阅者线程 + ros_thread = threading.Thread(target=ros_joint_control_subscriber, daemon=True) + ros_thread.start() + time.sleep(1.0) # 等待ROS节点初始化 + + try: + with viewer.launch_passive(model, data) as viewer_instance: + def run_selected_model(): """启动选中模型的可视化与控制(精简版)""" # 加载模型 @@ -308,6 +464,8 @@ def run_selected_model(): ) sim_thread.start() + cmd_listener_main() + # 主线程运行指令监听 cmd_listener_main() @@ -320,6 +478,10 @@ def run_selected_model(): logger.error(f"\n❌ 可视化出错:{str(e)}", exc_info=True) SIMULATION_RUNNING = False + if ros_thread and ros_thread.is_alive(): + rospy.signal_shutdown("仿真结束,关闭ROS节点。") + ros_thread.join(timeout=2) + input("\n按回车键返回模型选择菜单...") def show_menu(): @@ -352,6 +514,7 @@ def main_menu(): sys.exit(0) elif choice in MODEL_CONFIGS: CURRENT_CONFIG = MODEL_CONFIGS[choice] + TARGET_JOINT_POS = CURRENT_CONFIG["presets"][CURRENT_CONFIG["default_preset"]].copy() TARGET_JOINT_POS = CURRENT_CONFIG["presets"][CURRENT_CONFIG["default_preset"]] KP = CURRENT_CONFIG["pd_params"]["KP"] KD = CURRENT_CONFIG["pd_params"]["KD"] @@ -367,10 +530,12 @@ def main(): logger.info(f"📌 模型根目录:{MODEL_ROOT}") parser = argparse.ArgumentParser( + description="MuJoCo模型控制工具(支持ROS键盘控制)", description="MuJoCo模型控制工具(精简版,无数据保存)", formatter_class=argparse.ArgumentDefaultsHelpFormatter ) subparsers = parser.add_subparsers(dest="subcommand", required=True, help="子命令列表") + menu_parser = subparsers.add_parser("menu", help="启动交互式模型选择菜单(PD控制)") # 只保留menu子命令(核心控制功能) menu_parser = subparsers.add_parser("menu", help="启动交互式模型选择菜单(PD控制)")