diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 3955c1e..4388672 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,6 +1,10 @@ default_install_hook_types: [pre-commit] default_stages: [pre-commit] +default_language_version: + python: python3 + node: "20.18.0" # cspell requires >=20.18; full version needed for nodeenv prebuilt download + repos: - repo: https://github.com/pycqa/isort rev: 7.0.0 diff --git a/examples/2_collect_teleop_data_with_neuracore.py b/examples/2_collect_teleop_data_with_neuracore.py index d8972fb..d7143a1 100644 --- a/examples/2_collect_teleop_data_with_neuracore.py +++ b/examples/2_collect_teleop_data_with_neuracore.py @@ -12,9 +12,7 @@ import threading import time import traceback -from multiprocessing import Process, Queue from pathlib import Path -from queue import Empty import neuracore as nc import numpy as np @@ -55,134 +53,39 @@ from pink_ik_solver import PinkIKSolver from piper_controller import PiperController -NUM_LOGGING_WORKERS = 1 # Number of worker processes for logging -PROCESSES_LOGGING_FREQUENCY = 1000 # Hz - -def neuracore_logging_worker(queue: Queue, worker_id: int) -> None: - """Worker process that logs data to Neuracore from the queue. - - Args: - queue: Multiprocessing queue containing logging tasks - worker_id: Worker process identifier - """ - # Initialize Neuracore connection in this worker process +def log_to_neuracore_on_change_callback( + name: str, value: float, timestamp: float +) -> None: + """Log data to queue on change callback.""" + # Call appropriate Neuracore logging function try: - nc.login() - nc.connect_robot( - robot_name="AgileX PiPER", - urdf_path=str(URDF_PATH), - instance=worker_id, - overwrite=False, - ) + if name == "log_joint_positions": + data_value = np.radians(value) + data_dict = { + joint_name: angle for joint_name, angle in zip(JOINT_NAMES, data_value) + } + nc.log_joint_positions(data_dict, timestamp=timestamp) + elif name == "log_joint_target_positions": + data_value = np.radians(value) + data_dict = { + joint_name: angle for joint_name, angle in zip(JOINT_NAMES, data_value) + } + nc.log_joint_target_positions(data_dict, timestamp=timestamp) + elif name == "log_parallel_gripper_open_amounts": + data_dict = {GRIPPER_LOGGING_NAME: value} + nc.log_parallel_gripper_open_amounts(data_dict, timestamp=timestamp) + elif name == "log_parallel_gripper_target_open_amounts": + data_dict = {GRIPPER_LOGGING_NAME: value} + nc.log_parallel_gripper_target_open_amounts(data_dict, timestamp=timestamp) + elif name == "log_rgb": + camera_name = "rgb" + image_array = value + nc.log_rgb(camera_name, image_array, timestamp=timestamp) + else: + print(f"\n⚠️ Unknown logging function: {name}") except Exception as e: - print(f"\n⚠️ Worker {worker_id} failed to login to Neuracore: {e}") - return - - print(f"\n👷 Logging worker process {worker_id} started") - - while True: - try: - start_time = time.time() - # Get task from queue (blocking with timeout to allow periodic checks) - task = queue.get(timeout=1.0) - - # Check for shutdown - if task is None: - print( - f"\n👷 Logging worker process {worker_id} received shutdown signal" - ) - break - - # Unpack task: (function_name, args_tuple, timestamp) - function_name, data_value, timestamp = task - - # Call appropriate Neuracore logging function - try: - if function_name == "log_joint_positions": - data_value = np.radians(data_value) - data_dict = { - joint_name: angle - for joint_name, angle in zip(JOINT_NAMES, data_value) - } - nc.log_joint_positions(data_dict, timestamp=timestamp) - elif function_name == "log_joint_target_positions": - data_value = np.radians(data_value) - data_dict = { - joint_name: angle - for joint_name, angle in zip(JOINT_NAMES, data_value) - } - nc.log_joint_target_positions(data_dict, timestamp=timestamp) - elif function_name == "log_parallel_gripper_open_amounts": - data_dict = {GRIPPER_LOGGING_NAME: data_value} - nc.log_parallel_gripper_open_amounts(data_dict, timestamp=timestamp) - elif function_name == "log_parallel_gripper_target_open_amounts": - data_dict = {GRIPPER_LOGGING_NAME: data_value} - nc.log_parallel_gripper_target_open_amounts( - data_dict, timestamp=timestamp - ) - elif function_name == "log_rgb": - camera_name = "rgb" - image_array = data_value - nc.log_rgb(camera_name, image_array, timestamp=timestamp) - else: - print(f"\n⚠️ Unknown logging function: {function_name}") - except Exception as e: - print(f"\n⚠️ Failed to log {function_name} to Neuracore: {e}") - - # Sleep to maintain loop rate - elapsed = time.time() - start_time - sleep_time = 1.0 / PROCESSES_LOGGING_FREQUENCY - elapsed - time.sleep(max(0, sleep_time)) - - except Empty: - continue # Timeout - except KeyboardInterrupt: - print( - f"\n🔴 Logging worker process {worker_id} interrupted, shutting down!" - ) - break - except Exception as e: - print(f"\n❌ Logging worker {worker_id} error: {e}") - - -def shutdown_logging_workers(logging_queue: Queue, logging_workers: list) -> None: - """Gracefully shut down logging worker processes and their queue.""" - print("\n📝 Shutting down logging workers...") - if logging_queue is not None and logging_workers: - # Send shutdown sentinel to each worker - for _ in logging_workers: - logging_queue.put(None, timeout=1.0) - # Wait for workers to finish - for i, worker in enumerate(logging_workers, 1): - worker.join(timeout=2.0) - if worker.is_alive(): - worker.terminate() - worker.join(timeout=1.0) - print(f" ✓ Worker {i} shut down") - # Close the queue - logging_queue.close() - - # Join thread in background to prevent atexit hang - def join_queue() -> None: - """Join queue thread.""" - try: - logging_queue.join_thread() - except Exception: - pass - - threading.Thread(target=join_queue, daemon=True).start() - - -def log_to_queue_on_change_callback(name: str, value: float, timestamp: float) -> None: - """Log data to queue on change callback.""" - logging_queue.put( - ( - name, - value, - timestamp, - ) - ) + print(f"\n⚠️ Failed to call {name} to Neuracore: {e}") def on_button_a_pressed() -> None: @@ -288,21 +191,9 @@ def on_button_rj_pressed() -> None: description="Teleop data collection for Piper robot", ) - # Initialize logging queue and worker pool - print("\n📝 Initializing Neuracore logging queue and worker pool...") - logging_queue: Queue = Queue() - logging_workers = [] - - # Start worker processes - for i in range(NUM_LOGGING_WORKERS): - worker = Process(target=neuracore_logging_worker, args=(logging_queue, i)) - worker.start() - logging_workers.append(worker) - time.sleep(0.1) - # Initialize shared state data_manager = DataManager() - data_manager.set_on_change_callback(log_to_queue_on_change_callback) + data_manager.set_on_change_callback(log_to_neuracore_on_change_callback) data_manager.set_controller_filter_params( CONTROLLER_MIN_CUTOFF, CONTROLLER_BETA, @@ -434,6 +325,4 @@ def on_button_rj_pressed() -> None: camera_thread_obj.join() robot_controller.cleanup() - shutdown_logging_workers(logging_queue, logging_workers) - print("\n👋 Demo stopped.") diff --git a/examples/common/configs.py b/examples/common/configs.py index 371127c..d2ac818 100644 --- a/examples/common/configs.py +++ b/examples/common/configs.py @@ -35,7 +35,7 @@ # Thread rates (Hz) CONTROLLER_DATA_RATE = 50.0 # Controller input reading -IK_SOLVER_RATE = 500.0 # IK solving and robot commands +IK_SOLVER_RATE = 250.0 # IK solving and robot commands VISUALIZATION_RATE = 60.0 # GUI updates ROBOT_RATE = 100.0