diff --git a/src/LocoMuJoCo/ros2/ros2_py/robot_controller.py b/src/LocoMuJoCo/ros2/ros2_py/robot_controller.py new file mode 100644 index 0000000000..b82a5de541 --- /dev/null +++ b/src/LocoMuJoCo/ros2/ros2_py/robot_controller.py @@ -0,0 +1,95 @@ +import sys +import select +import rclpy +from rclpy.node import Node +from my_interfaces.srv import RobotCtrl + +# 全局变量(提前初始化,避免未绑定) +current_action_key = 'stop' +action_switch_flag = False + + +class RobotController(Node): + def __init__(self, name): + super().__init__(name) + # 创建服务客户端,连接机器人控制服务 + self.client = self.create_client(RobotCtrl, 'robot_control') + # 等待服务上线 + while not self.client.wait_for_service(timeout_sec=1.0): + self.get_logger().info('等待服务 "robot_control" 启动...') + self.get_logger().info("操作提示:w=行走 | s=下蹲 | q=退出 | 其他键=停止") + + def result_callback(self, future): + """服务调用结果的回调函数""" + + response = future.result() + self.get_logger().info(f'指令执行成功:{response.message}') + + + def send_request(self, action): + """发送控制指令到服务端""" + + request = RobotCtrl.Request() + request.command = action # 传递动作指令 + self.client.call_async(request).add_done_callback(self.result_callback) + self.get_logger().info(f'已发送指令:{action}') + + + +def read_keypress_non_blocking(): + """非阻塞读取终端按键""" + if select.select([sys.stdin], [], [], 0) == ([sys.stdin], [], []): + key = sys.stdin.read(1) + return key.strip() + return None + + +def main(args=None): + # 关键:声明使用全局变量(修复核心错误) + global current_action_key, action_switch_flag + + rclpy.init(args=args) + robot_controller = RobotController("robot_controller") + + # 初始发送停止指令(此时current_action_key已绑定全局值'stop') + robot_controller.send_request(current_action_key) + + try: + # 主循环:非阻塞读取按键 + 处理ROS事件 + while rclpy.ok(): + # 1. 读取终端按键 + key = read_keypress_non_blocking() + if key: + # 按键逻辑 + if key == 'w' and current_action_key != "walk": + current_action_key = "walk" + action_switch_flag = True + elif key == 's' and current_action_key != "squat": + current_action_key = "squat" + action_switch_flag = True + elif key == 'q': # q键退出 + robot_controller.get_logger().info("收到退出指令,程序终止") + break + elif key not in ['w', 's']: # 其他键停止 + if current_action_key != "stop": + current_action_key = "stop" + action_switch_flag = True + + # 2. 检测动作切换并发送指令 + if action_switch_flag: + robot_controller.send_request(current_action_key) + action_switch_flag = False # 重置标记 + + # 3. 非阻塞处理ROS回调 + rclpy.spin_once(robot_controller, timeout_sec=0.01) + + except KeyboardInterrupt: + robot_controller.get_logger().info("用户强制退出") + finally: + robot_controller.destroy_node() + rclpy.shutdown() + print("程序已退出") + + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/src/LocoMuJoCo/test.py b/src/LocoMuJoCo/test.py new file mode 100644 index 0000000000..57d78354f5 --- /dev/null +++ b/src/LocoMuJoCo/test.py @@ -0,0 +1,66 @@ +import numpy as np +import mujoco +from mujoco import MjSpec +from loco_mujoco import ImitationFactory +from loco_mujoco import PATH_TO_MODELS +import time + +def main(): + # 配置环境参数 + env_name = "UnitreeH1" + task = "walk" + n_episodes = 10 + n_steps_per_episode = 500 + + # 创建自定义模型规格,添加几何体 + def create_custom_spec(): + # 加载UnitreeH1的默认模型 + h1_model_path = PATH_TO_MODELS / "unitree_h1/h1.xml" + spec = MjSpec.from_file(str(h1_model_path)) + + # 打印所有身体名称,确认地板身体的正确名称 + print("模型中所有身体名称:") + for body in spec.bodies: + print(body.name) + + # 尝试查找地板身体(根据打印结果,模型中没有floor/ground,使用world作为父节点) + floor_body = spec.find_body("floor") + if floor_body is None: + floor_body = spec.find_body("ground") + if floor_body is None: + # 若仍找不到,使用根节点"world"作为父节点(模型中存在"world"身体) + floor_body = spec.find_body("world") + print("使用根节点'world'作为几何体的父节点") + + # 定义圆柱体几何体属性(size改为3个元素,符合Mujoco要求) + cylinder_attr = dict( + type=mujoco.mjtGeom.mjGEOM_CYLINDER, + size=[0.3, 1, 0.0], # 修正:[半径, 半长度, 0](必须3个元素) + pos=[8, 0, 1.0], # 位置(相对于父节点world) + rgba=[1.0, 0.0, 0.0, 0.5], # 红色半透明 + contype=1, + conaffinity=1 + ) + + # 向父节点添加几何体 + floor_body.add_geom(name="custom_cylinder", **cylinder_attr) + + return spec + + # 创建带自定义几何体的环境 + custom_spec = create_custom_spec() + env = ImitationFactory.make( + env_name, + default_dataset_conf=dict(task=task), + spec=custom_spec # 传入自定义规格 + ) + + # 播放轨迹并渲染 + env.play_trajectory( + n_episodes=n_episodes, + n_steps_per_episode=n_steps_per_episode, + render=True + ) + +if __name__ == "__main__": + main() \ No newline at end of file