1+ import multiprocessing as mp
2+ import time
13import typing
24from dataclasses import dataclass
3- import multiprocessing as mp
45from multiprocessing .shared_memory import SharedMemory
5- import time
6+
67import numpy as np
78import rtde_control
89import rtde_receive
9- from rcs import common
1010from rcs_ur5e import robotiq_gripper
1111
12+ from rcs import common
13+
1214
1315@dataclass (kw_only = True )
1416class UR5eConfig (common .RobotConfig ):
@@ -38,6 +40,7 @@ def __post_init__(self):
3840JOINT_MODE = 1
3941CARTESIAN_MODE = 2
4042
43+
4144def _control_robot (shm_name : str , ip : str , stop_queue : mp .Queue , config_queue : mp .Queue ) -> None :
4245 """
4346 Control loop for the robot, running in a separate process.
@@ -51,12 +54,13 @@ def _control_robot(shm_name: str, ip: str, stop_queue: mp.Queue, config_queue: m
5154 ur_receive = rtde_receive .RTDEReceiveInterface (ip )
5255
5356 if not ur_control .isConnected ():
54- raise ConnectionError (f"Could not connect to UR5e at { ip } ." )
57+ msg = f"Could not connect to UR5e at { ip } ."
58+ raise ConnectionError (msg )
5559
5660 # Attach to the shared memory segment
5761 shm = SharedMemory (name = shm_name )
5862 data_buffer = shm .buf
59-
63+
6064 # Define offsets for each field in the shared memory buffer
6165 offset_mode = 0
6266 offset_target_reached = offset_mode + 4
@@ -66,22 +70,14 @@ def _control_robot(shm_name: str, ip: str, stop_queue: mp.Queue, config_queue: m
6670 offset_cartesian_state = offset_joint_state + 48
6771
6872 # Create numpy views on the shared memory buffer
69- joint_target_view = np .ndarray (
70- (6 ,), dtype = np .float64 , buffer = data_buffer , offset = offset_joint_target
71- )
72- cartesian_target_view = np .ndarray (
73- (6 ,), dtype = np .float64 , buffer = data_buffer , offset = offset_cartesian_target
74- )
75- joint_state_view = np .ndarray (
76- (6 ,), dtype = np .float64 , buffer = data_buffer , offset = offset_joint_state
77- )
78- cartesian_state_view = np .ndarray (
79- (6 ,), dtype = np .float64 , buffer = data_buffer , offset = offset_cartesian_state
80- )
73+ joint_target_view = np .ndarray ((6 ,), dtype = np .float64 , buffer = data_buffer , offset = offset_joint_target )
74+ cartesian_target_view = np .ndarray ((6 ,), dtype = np .float64 , buffer = data_buffer , offset = offset_cartesian_target )
75+ joint_state_view = np .ndarray ((6 ,), dtype = np .float64 , buffer = data_buffer , offset = offset_joint_state )
76+ cartesian_state_view = np .ndarray ((6 ,), dtype = np .float64 , buffer = data_buffer , offset = offset_cartesian_state )
8177
8278 print ("Robot control process started." )
8379
84- dt = 1.0 / 500.0 # 2ms
80+ dt = 1.0 / 500.0 # 2ms
8581
8682 while stop_queue .empty ():
8783 if not config_queue .empty ():
@@ -91,11 +87,11 @@ def _control_robot(shm_name: str, ip: str, stop_queue: mp.Queue, config_queue: m
9187
9288 joint_state = np .array (ur_receive .getActualQ ())
9389 joint_state_view [:] = joint_state
94-
90+
9591 cartesian_state = np .array (ur_receive .getActualTCPPose ())
9692 cartesian_state_view [:] = cartesian_state
9793
98- if mode == JOINT_MODE :
94+ if mode == JOINT_MODE :
9995 diff = joint_target_view - joint_state_view
10096 if np .max (np .abs (diff )) < 0.01 :
10197 data_buffer [offset_target_reached ] = 1
@@ -107,7 +103,7 @@ def _control_robot(shm_name: str, ip: str, stop_queue: mp.Queue, config_queue: m
107103 target_q = joint_target_view .tolist ()
108104
109105 # current_qd = ur_receive.getActualQd()
110- # required_qd = diff / dt
106+ # required_qd = diff / dt
111107 # required_qdd = (required_qd - current_qd) / dt
112108
113109 # max_qdd = [1,1,1,1,1,1]
@@ -133,8 +129,8 @@ def _control_robot(shm_name: str, ip: str, stop_queue: mp.Queue, config_queue: m
133129 robot_config .lookahead_time ,
134130 robot_config .gain ,
135131 )
136-
137- elif mode == CARTESIAN_MODE :
132+
133+ elif mode == CARTESIAN_MODE :
138134 rotvec = common .RotVec (np .array (cartesian_target_view [3 :6 ]))
139135 # print("Target View: ", cartesian_target_view)
140136 a = common .Pose (quaternion = rotvec .as_quaternion_vector (), translation = cartesian_target_view [:3 ])
@@ -168,20 +164,21 @@ def _control_robot(shm_name: str, ip: str, stop_queue: mp.Queue, config_queue: m
168164 )
169165
170166 ur_control .waitPeriod (t_start )
171-
167+
172168 except Exception as e :
173169 print (f"Robot control process encountered an error: { e } " )
174-
170+
175171 finally :
176172 print ("Robot control process is shutting down..." )
177- if ' ur_control' in locals ():
173+ if " ur_control" in locals ():
178174 ur_control .servoStop ()
179175 ur_control .stopScript ()
180176 ur_control .disconnect ()
181- if ' shm' in locals ():
177+ if " shm" in locals ():
182178 shm .close ()
183179
184- class UR5e : # (common.Robot): # should inherit and implement common.Robot, but currently there is a bug that needs to be fixed
180+
181+ class UR5e : # (common.Robot): # should inherit and implement common.Robot, but currently there is a bug that needs to be fixed
185182 def __init__ (self , ip : str ):
186183 self .ik = common .RL (urdf_path = "/home/johannes/repos/rcs/assets/ur5e/urdf/ur5e.urdf" )
187184 self ._config = UR5eConfig ()
@@ -202,33 +199,39 @@ def __init__(self, ip: str):
202199 self ._offset_joint_state = self ._offset_cartesian_target + 48
203200 self ._offset_cartesian_state = self ._offset_joint_state + 48
204201
205- self ._joint_target_shm = np .ndarray ((6 ,), dtype = np .float64 , buffer = self ._shm_buffer , offset = self ._offset_joint_target )
206- self ._cartesian_target_shm = np .ndarray ((6 ,), dtype = np .float64 , buffer = self ._shm_buffer , offset = self ._offset_cartesian_target )
207- self ._joint_state_shm = np .ndarray ((6 ,), dtype = np .float64 , buffer = self ._shm_buffer , offset = self ._offset_joint_state )
208- self ._cartesian_state_shm = np .ndarray ((6 ,), dtype = np .float64 , buffer = self ._shm_buffer , offset = self ._offset_cartesian_state )
202+ self ._joint_target_shm = np .ndarray (
203+ (6 ,), dtype = np .float64 , buffer = self ._shm_buffer , offset = self ._offset_joint_target
204+ )
205+ self ._cartesian_target_shm = np .ndarray (
206+ (6 ,), dtype = np .float64 , buffer = self ._shm_buffer , offset = self ._offset_cartesian_target
207+ )
208+ self ._joint_state_shm = np .ndarray (
209+ (6 ,), dtype = np .float64 , buffer = self ._shm_buffer , offset = self ._offset_joint_state
210+ )
211+ self ._cartesian_state_shm = np .ndarray (
212+ (6 ,), dtype = np .float64 , buffer = self ._shm_buffer , offset = self ._offset_cartesian_state
213+ )
209214
210215 # Initialise with -10 to check
211216 self ._joint_state_shm [:] = - 10
212217
213218 # Start the robot control process
214219 self ._robot_process = mp .Process (
215- target = _control_robot ,
216- args = (SHM_NAME , self ._ip , self ._stop_queue , self ._config_queue )
220+ target = _control_robot , args = (SHM_NAME , self ._ip , self ._stop_queue , self ._config_queue )
217221 )
218- self ._robot_process .daemon = True # Kills process if main process exits
222+ self ._robot_process .daemon = True # Kills process if main process exits
219223 self ._robot_process .start ()
220224
221225 # Check for first update
222- while self ._joint_state_shm [0 ]== - 10 :
226+ while self ._joint_state_shm [0 ] == - 10 :
223227 print ("Waiting for first robot state to arrive.." )
224228 time .sleep (1 )
225229 print ("Robot Connection established." )
226230
227-
228231 def __del__ (self ):
229232 """Ensures resources are cleaned up when the object is garbage collected."""
230233 self .stop_control_process ()
231-
234+
232235 def stop_control_process (self ):
233236 """Safely stops the robot control process."""
234237 if self ._robot_process .is_alive ():
@@ -245,8 +248,7 @@ def get_cartesian_position(self) -> common.Pose:
245248 trans = ur_pose [0 :3 ]
246249 rotvec = common .RotVec (np .array (ur_pose [3 :6 ]))
247250 pose = common .Pose (quaternion = rotvec .as_quaternion_vector (), translation = trans )
248- pose = common .Pose (rpy_vector = [0 , 0 , np .deg2rad (180 )], translation = [0 , 0 , 0 ]).inverse () * pose
249- return pose
251+ return common .Pose (rpy_vector = [0 , 0 , np .deg2rad (180 )], translation = [0 , 0 , 0 ]).inverse () * pose
250252
251253 def get_ik (self ) -> common .IK | None :
252254
@@ -275,42 +277,45 @@ def set_cartesian_position(self, pose: common.Pose) -> None:
275277 self ._shm_buffer [self ._offset_target_reached ] = 0
276278 target_pose = (common .Pose (rpy_vector = [0 , 0 , np .deg2rad (180 )], translation = [0 , 0 , 0 ]) * pose ).rotvec ()
277279 self ._cartesian_target_shm [:] = target_pose
278- self ._shm_buffer [self ._offset_mode : self ._offset_target_reached ] = (CARTESIAN_MODE ).to_bytes (4 , "little" )
280+ self ._shm_buffer [self ._offset_mode : self ._offset_target_reached ] = (CARTESIAN_MODE ).to_bytes (4 , "little" )
279281 if not self ._config .async_control :
280282 while not self ._shm_buffer [self ._offset_target_reached ]:
281283 time .sleep (0.01 )
282284
283285 def set_joint_position (self , q : np .ndarray [tuple [typing .Literal [6 ]], np .dtype [np .float64 ]]) -> None :
284286 self ._shm_buffer [self ._offset_target_reached ] = 0
285287 self ._joint_target_shm [:] = q
286- self ._shm_buffer [self ._offset_mode : self ._offset_target_reached ] = (JOINT_MODE ).to_bytes (4 , "little" )
288+ self ._shm_buffer [self ._offset_mode : self ._offset_target_reached ] = (JOINT_MODE ).to_bytes (4 , "little" )
287289 if not self ._config .async_control :
288290 while not self ._shm_buffer [self ._offset_target_reached ]:
289291 time .sleep (0.01 )
290-
292+
291293 def move_home (self ) -> None :
292294 home = typing .cast (
293295 np .ndarray [tuple [typing .Literal [6 ]], np .dtype [np .float64 ]],
294296 common .robots_meta_config (common .RobotType .UR5e ).q_home ,
295297 )
296298 if np .any ((home < - 2 * np .pi ) | (home > 2 * np .pi )):
297- raise ValueError (f"Home position { home } is out of bounds." )
299+ msg = f"Home position { home } is out of bounds."
300+ raise ValueError (msg )
298301 print (f"Moving to home position: { home } " )
299302 self ._joint_target_shm [:] = home
300- self ._shm_buffer [self ._offset_mode : self ._offset_target_reached ] = (JOINT_MODE ).to_bytes (4 , "little" )
303+ self ._shm_buffer [self ._offset_mode : self ._offset_target_reached ] = (JOINT_MODE ).to_bytes (4 , "little" )
301304 while not self ._shm_buffer [self ._offset_target_reached ]:
302305 time .sleep (0.01 )
303306
304307 def reset (self ) -> None :
305308 pass
306309
310+
307311class RobotiQGripper : # (common.Gripper):
308312 def __init__ (self , ip ):
309313 self .gripper = robotiq_gripper .RobotiqGripper ()
310314 try :
311315 self .gripper .connect (ip , 63352 , socket_timeout = 3.0 ) # default port for Robotiq gripper
312316 except Exception as e :
313- raise RuntimeError (f"Failed to connect to Robotiq gripper at { ip } : { e } " )
317+ msg = f"Failed to connect to RobotiQ gripper at { ip } "
318+ raise RuntimeError (msg ) from e
314319 if not self .gripper .is_active ():
315320 self .gripper .activate ()
316321 print ("Gripper Connection established." )
0 commit comments