A modern Python reference application demonstrating how to control a Universal Robot using three concurrent interfaces: RTDE for state monitoring, Interpreter Mode for remote script execution, and the Robot API for robot control.
- Python 3.10+
- Universal Robots controller running PolyScope X
- Robot in Remote Control mode (for Robot API commands)
- Network connectivity to the robot controller
- Clone the repository:
git clone https://github.com/yourusername/px_remote_operation.git
cd px_remote_operation- Create and activate a virtual environment:
python -m venv .venv
# Windows
.venv\Scripts\activate
# Linux/Mac
source .venv/bin/activate- Install dependencies:
pip install httpx # For REST API clientRun the application with the default scenario:
python main.py --host 192.168.0.1Options:
--host: Robot IP address (default:127.0.0.1)--port: RTDE port (default:30004)--config: Path to RTDE config file (default:rtde/rtdeState.xml)
Edit main.py line 14-16 to select your scenario:
# ===== SWAP SCENARIO HERE =====
import scenarios.simple_move as scenario
# import scenarios.protective_stop_test as scenario
# ==============================Available scenarios:
simple_move: Execute a single URScript file via Interpreter Modeprotective_stop_test: Continuous execution with protective stop detection and recovery
This application demonstrates how to use all three UR robot interfaces concurrently:
- Subscribes to robot state data at 125 Hz
- Provides pub/sub pattern for multiple consumers
- Dynamic field configuration via XML
- Fields:
robot_mode,runtime_state,safety_status, joint positions, TCP pose, etc.
- RESTful HTTP client for PolyScope X Robot API
- Commands: power on/off, brake release, play/pause/stop, unlock protective stop
- Async context manager for clean resource handling
- Remote URScript execution via TCP socket (port 30020)
- Background task for receiving acks and state responses
- Execution progress tracking (
statelastexecuted) - Control commands:
skipbuffer,abort,clear_interpreter()
px_remote_operation/
├── main.py # Application entry point
├── README.md # This file
├── .gitignore # Git ignore rules
│
├── interpreter/ # Interpreter Mode client
│ ├── __init__.py
│ ├── interpreter_client.py # Async TCP client for port 30020
│ ├── interpreter.py # Base helper for Interpreter mode
│ └── utils.py # Utility functions
│
├── robot_api/ # REST API client
│ ├── __init__.py
│ └── api_client.py # HTTP client for Robot API
│
├── rtde/ # RTDE (Real-Time Data Exchange)
│ ├── __init__.py
│ ├── rtde_hub.py # High-level async hub with pub/sub
│ ├── rtde.py # Low-level RTDE protocol implementation
│ ├── rtde_config.py # XML config parser
│ ├── rtdeState.xml # RTDE output field configuration
│ └── serialize.py # Binary serialization for RTDE protocol
│
├── scenarios/ # Scenario modules
│ ├── __init__.py
│ ├── simple_move.py # Basic movement scenario
│ └── protective_stop_test.py # Protective stop handling
│
└── urscript/ # URScript files
├── move_close.txt # Example move nodes.
├── move_close_frame.txt # Example move nodes plus sending a socket string to a server.
├── move_far.txt # Example move nodes.
└── move_home.txt # Home position script
Scenarios are self-contained modules that define robot behaviors. Create a new file in scenarios/:
# scenarios/my_custom_scenario.py
"""My custom robot behavior."""
import asyncio
from rtde import RTDEHub
from robot_api import RobotAPIClient
from interpreter.interpreter_client import InterpreterClient
async def run(hub: RTDEHub, api: RobotAPIClient, host: str) -> None:
"""Execute custom behavior.
Args:
hub: RTDE hub for monitoring robot state
api: Robot API client for control commands
host: Robot hostname/IP for interpreter connection
"""
# Wait for robot to be ready
q = hub.subscribe()
sample = await q.get()
while sample.robot_mode != 7: # Wait for Running mode
sample = await q.get()
# Your logic here - combine RTDE monitoring, API control, and interpreter execution
async with InterpreterClient(host, 30020) as intr:
await intr.send_file("urscript/my_script.txt")
await asyncio.sleep(1.0)
print("[Scenario] Custom behavior complete!")Then update main.py:
import scenarios.my_custom_scenario as scenarioExecute a predefined URScript file and monitor execution:
# Edit main.py to use simple_move scenario
python main.py --host 192.168.0.1Continuously execute moves until protective stop, then recover:
# Edit main.py to use protective_stop_test scenario
python main.py --host 192.168.0.1The robot will:
- Loop
move_close.txtcontinuously - Monitor
safety_statusevery 200ms - When protective stop detected:
- Pause execution
- Prompt user for confirmation
- Unlock protective stop via REST API
- Clear interpreter buffer
- Send robot to home position
Opens a socket to a remote server and sends the value of a frame called "my_frame" to the server. Assumes that you already have the frame "my_frame" defined in your Application settings. Modify the IP address in urscript/move_close_frame.txt to match your server.
# Edit main.py to use move_and_send_frame scenario
python main.py --host 192.168.0.1Edit rtde/rtdeState.xml to customize which data fields are monitored:
<?xml version="1.0"?>
<rtde_config>
<recipe key="state">
<field name="actual_TCP_pose" type="VECTOR6D"/>
<field name="actual_q" type="VECTOR6D"/>
<field name="robot_mode" type="INT32"/>
<field name="runtime_state" type="UINT32"/>
<field name="safety_status" type="INT32"/>
<!-- Add more fields as needed -->
</recipe>
</rtde_config>Step 1: Add field to rtdeState.xml:
<field name="actual_TCP_force" type="VECTOR6D"/>Step 2: Use it immediately in your code:
q = hub.subscribe()
sample = await q.get()
# The new field is automatically available
tcp_force = sample.actual_TCP_force # or sample.get("actual_TCP_force")
print(f"TCP Force: {tcp_force}")No changes to RTDEHub or any other Python code needed. The dynamic field system handles everything automatically.
The application logs three key robot states:
Program execution state:
0: Stopping1: Stopped2: Playing3: Pausing4: Paused5: Resuming6: Retracting
Overall robot operational mode:
3: Power Off5: Idle7: Running- (See
rtde/rtde_hub.pyfor complete list)
Safety system state:
1: NORMAL2: REDUCED3: PROTECTIVE_STOP5: SAFEGUARD_STOP6: SYSTEM_EMERGENCY_STOP7: ROBOT_EMERGENCY_STOP- (See
rtde/rtde_hub.pyfor complete list)
All clients use async with for automatic resource cleanup:
async with InterpreterClient(host, 30020) as intr:
await intr.send_file("script.txt")
# Connection automatically closedMultiple consumers can subscribe to the same RTDE stream:
q1 = hub.subscribe() # Main logic
q2 = hub.subscribe() # State watcher
q3 = hub.subscribe() # Safety monitorLong-running monitoring happens in background:
rtde_task = asyncio.create_task(hub.run(host, port))
watcher_task = asyncio.create_task(watch_states(hub))
# Runs concurrently with main scenario