Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
default_install_hook_types: [pre-commit]
default_stages: [pre-commit]

default_language_version:
python: python3
node: "20.18.0" # cspell requires >=20.18; full version needed for nodeenv prebuilt download

repos:
- repo: https://github.com/pycqa/isort
rev: 7.0.0
Expand Down
173 changes: 31 additions & 142 deletions examples/2_collect_teleop_data_with_neuracore.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,7 @@
import threading
import time
import traceback
from multiprocessing import Process, Queue
from pathlib import Path
from queue import Empty

import neuracore as nc
import numpy as np
Expand Down Expand Up @@ -55,134 +53,39 @@
from pink_ik_solver import PinkIKSolver
from piper_controller import PiperController

NUM_LOGGING_WORKERS = 1 # Number of worker processes for logging
PROCESSES_LOGGING_FREQUENCY = 1000 # Hz


def neuracore_logging_worker(queue: Queue, worker_id: int) -> None:
"""Worker process that logs data to Neuracore from the queue.

Args:
queue: Multiprocessing queue containing logging tasks
worker_id: Worker process identifier
"""
# Initialize Neuracore connection in this worker process
def log_to_neuracore_on_change_callback(
name: str, value: float, timestamp: float
) -> None:
"""Log data to queue on change callback."""
# Call appropriate Neuracore logging function
try:
nc.login()
nc.connect_robot(
robot_name="AgileX PiPER",
urdf_path=str(URDF_PATH),
instance=worker_id,
overwrite=False,
)
if name == "log_joint_positions":
data_value = np.radians(value)
data_dict = {
joint_name: angle for joint_name, angle in zip(JOINT_NAMES, data_value)
}
nc.log_joint_positions(data_dict, timestamp=timestamp)
elif name == "log_joint_target_positions":
data_value = np.radians(value)
data_dict = {
joint_name: angle for joint_name, angle in zip(JOINT_NAMES, data_value)
}
nc.log_joint_target_positions(data_dict, timestamp=timestamp)
elif name == "log_parallel_gripper_open_amounts":
data_dict = {GRIPPER_LOGGING_NAME: value}
nc.log_parallel_gripper_open_amounts(data_dict, timestamp=timestamp)
elif name == "log_parallel_gripper_target_open_amounts":
data_dict = {GRIPPER_LOGGING_NAME: value}
nc.log_parallel_gripper_target_open_amounts(data_dict, timestamp=timestamp)
elif name == "log_rgb":
camera_name = "rgb"
image_array = value
nc.log_rgb(camera_name, image_array, timestamp=timestamp)
else:
print(f"\n⚠️ Unknown logging function: {name}")
except Exception as e:
print(f"\n⚠️ Worker {worker_id} failed to login to Neuracore: {e}")
return

print(f"\n👷 Logging worker process {worker_id} started")

while True:
try:
start_time = time.time()
# Get task from queue (blocking with timeout to allow periodic checks)
task = queue.get(timeout=1.0)

# Check for shutdown
if task is None:
print(
f"\n👷 Logging worker process {worker_id} received shutdown signal"
)
break

# Unpack task: (function_name, args_tuple, timestamp)
function_name, data_value, timestamp = task

# Call appropriate Neuracore logging function
try:
if function_name == "log_joint_positions":
data_value = np.radians(data_value)
data_dict = {
joint_name: angle
for joint_name, angle in zip(JOINT_NAMES, data_value)
}
nc.log_joint_positions(data_dict, timestamp=timestamp)
elif function_name == "log_joint_target_positions":
data_value = np.radians(data_value)
data_dict = {
joint_name: angle
for joint_name, angle in zip(JOINT_NAMES, data_value)
}
nc.log_joint_target_positions(data_dict, timestamp=timestamp)
elif function_name == "log_parallel_gripper_open_amounts":
data_dict = {GRIPPER_LOGGING_NAME: data_value}
nc.log_parallel_gripper_open_amounts(data_dict, timestamp=timestamp)
elif function_name == "log_parallel_gripper_target_open_amounts":
data_dict = {GRIPPER_LOGGING_NAME: data_value}
nc.log_parallel_gripper_target_open_amounts(
data_dict, timestamp=timestamp
)
elif function_name == "log_rgb":
camera_name = "rgb"
image_array = data_value
nc.log_rgb(camera_name, image_array, timestamp=timestamp)
else:
print(f"\n⚠️ Unknown logging function: {function_name}")
except Exception as e:
print(f"\n⚠️ Failed to log {function_name} to Neuracore: {e}")

# Sleep to maintain loop rate
elapsed = time.time() - start_time
sleep_time = 1.0 / PROCESSES_LOGGING_FREQUENCY - elapsed
time.sleep(max(0, sleep_time))

except Empty:
continue # Timeout
except KeyboardInterrupt:
print(
f"\n🔴 Logging worker process {worker_id} interrupted, shutting down!"
)
break
except Exception as e:
print(f"\n❌ Logging worker {worker_id} error: {e}")


def shutdown_logging_workers(logging_queue: Queue, logging_workers: list) -> None:
"""Gracefully shut down logging worker processes and their queue."""
print("\n📝 Shutting down logging workers...")
if logging_queue is not None and logging_workers:
# Send shutdown sentinel to each worker
for _ in logging_workers:
logging_queue.put(None, timeout=1.0)
# Wait for workers to finish
for i, worker in enumerate(logging_workers, 1):
worker.join(timeout=2.0)
if worker.is_alive():
worker.terminate()
worker.join(timeout=1.0)
print(f" ✓ Worker {i} shut down")
# Close the queue
logging_queue.close()

# Join thread in background to prevent atexit hang
def join_queue() -> None:
"""Join queue thread."""
try:
logging_queue.join_thread()
except Exception:
pass

threading.Thread(target=join_queue, daemon=True).start()


def log_to_queue_on_change_callback(name: str, value: float, timestamp: float) -> None:
"""Log data to queue on change callback."""
logging_queue.put(
(
name,
value,
timestamp,
)
)
print(f"\n⚠️ Failed to call {name} to Neuracore: {e}")


def on_button_a_pressed() -> None:
Expand Down Expand Up @@ -288,21 +191,9 @@ def on_button_rj_pressed() -> None:
description="Teleop data collection for Piper robot",
)

# Initialize logging queue and worker pool
print("\n📝 Initializing Neuracore logging queue and worker pool...")
logging_queue: Queue = Queue()
logging_workers = []

# Start worker processes
for i in range(NUM_LOGGING_WORKERS):
worker = Process(target=neuracore_logging_worker, args=(logging_queue, i))
worker.start()
logging_workers.append(worker)
time.sleep(0.1)

# Initialize shared state
data_manager = DataManager()
data_manager.set_on_change_callback(log_to_queue_on_change_callback)
data_manager.set_on_change_callback(log_to_neuracore_on_change_callback)
data_manager.set_controller_filter_params(
CONTROLLER_MIN_CUTOFF,
CONTROLLER_BETA,
Expand Down Expand Up @@ -434,6 +325,4 @@ def on_button_rj_pressed() -> None:
camera_thread_obj.join()
robot_controller.cleanup()

shutdown_logging_workers(logging_queue, logging_workers)

print("\n👋 Demo stopped.")
2 changes: 1 addition & 1 deletion examples/common/configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@

# Thread rates (Hz)
CONTROLLER_DATA_RATE = 50.0 # Controller input reading
IK_SOLVER_RATE = 500.0 # IK solving and robot commands
IK_SOLVER_RATE = 250.0 # IK solving and robot commands
VISUALIZATION_RATE = 60.0 # GUI updates
ROBOT_RATE = 100.0

Expand Down