-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathspawn_actors.py
More file actions
497 lines (396 loc) · 19.8 KB
/
spawn_actors.py
File metadata and controls
497 lines (396 loc) · 19.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
"""This script contains methods to spawn various types of actors, including
sensors and vehicles. These methods should be called before starting the
main loop of the CARLA simulator. Each method returns one or multiple actors,
which should be appended to an actor list, and despawned after the game loop is
finished."""
import logging
import numpy as np
import numpy.random as random
import os
import logging
import time
import shutil
import carla
from utils.tum_file_parser import tum_append_right_handed_carla_transform
from utils.misc import clear_directory
import config.global_config_parser as global_config_parser
# Make sure to initialize the global configuration parser before importing this
# script, otherwise the following line will throw an error
# This should be done with
# `global_config_parser.init_config("<path/to/config.ini>")`
# in the calling script
global_config = global_config_parser.get_config()
def valid_spawn_transform(world, transform, lane_type=carla.LaneType.Any):
"""
Projects the given transform to the nearest valid spawn point on the road
Note that the following values are set to fixed standard values for valid spawns:
- Z coordinate: 0.6
- pitch: 0.0
- roll: 0.0
X and Y are taken from the given transform, but projected to specified lane type.
Yaw is taken as-is from the given transform (free choice of direction).
Args:
world (carla.World): The CARLA world object
lane_type (carla.LaneType): The type of lane to project to. Available lane types: https://carla.readthedocs.io/en/latest/python_api/#carla.LaneType
transform (carla.Transform): The transform to project
Returns:
carla.Transform: A valid spawn point transform projected to the road
"""
# map-specific z coordinate that avoids collision (with ground, for example) at spawn
fixed_z = 0.6
fixed_pitch = 0.0
fixed_roll = 0.0
waypoint = world.get_map().get_waypoint(transform.location, project_to_road=True, lane_type=lane_type)
if waypoint == None:
raise RuntimeError(f'no valid spawn position found near given coordinates with LaneType={lane_type}')
spawn_point = carla.Transform(
carla.Location(x=waypoint.transform.location.x,
y=waypoint.transform.location.y,
z=fixed_z),
carla.Rotation(pitch=fixed_pitch,
roll=fixed_roll,
yaw=transform.rotation.yaw)
)
return spawn_point
def _try_get_spawn_points(world, number=1, seed=None):
"""
Get a list of spawn points from the CARLA world.
If the number of requested spawn points is less than the available spawn points, a warning is logged and all available spawn points are returned. If no spawn points are available, a critical error is logged and an empty list is returned.
Args:
world (carla.World): The CARLA world object
number (int): Number of spawn points to return
seed (int, optional): Random seed for reproducibility
Returns:
list: A list of spawn points (carla.Transform objects)
"""
if seed is not None:
random.seed(seed)
spawn_points = world.get_map().get_spawn_points()
if not spawn_points:
logging.critical("No spawn points available in the map.")
return []
number_of_spawn_points = len(spawn_points)
# If we have enough spawn points, randomly select the requested number
# Essentially shuffles points if n_points is equal to number_of_spawn_points
if number <= number_of_spawn_points:
# Use replace=False to ensure unique spawn points
spawn_points = random.choice(
spawn_points, size=number, replace=False).tolist()
# If we have fewer spawn points than requested, return all available spawn
# points and log a warning
elif number > number_of_spawn_points:
logging.warning(
f"Requested {number} spawn points, but only {number_of_spawn_points} are available. Returning all available spawn points."
)
return spawn_points
def _try_get_random_vehicles(world, number=1, filter="vehicle.*.*", type="car", seed=None):
"""
Get a list of random vehicles from the CARLA world.
If the number of requested vehicles is less than the available blueprints, a warning is logged and all available blueprints are returned. If no blueprints are available, a critical error is logged and an empty list is returned.
Args:
world (carla.World): The CARLA world object
number (int): Number of vehicles to return
filter (str): Blueprint filter for vehicles
type (str): Type of vehicle to spawn (e.g., "car")
seed (int, optional): Random seed for reproducibility
Returns:
list: A list of vehicle blueprints (carla.Blueprint objects)
"""
if seed is not None:
random.seed(seed)
bp_library = world.get_blueprint_library()
vehicle_bps = bp_library.filter(filter)
if type:
vehicle_bps = [i for i in vehicle_bps if i.get_attribute("base_type") == type]
if not vehicle_bps:
logging.critical(f"No valid blueprints found with filter={filter} and type={type}")
return []
logging.debug(f"Returning {number} random vehicles from {len(vehicle_bps)} blueprints matching filter '{filter}' and type '{type}'.")
# Duplicates are OK
return random.choice(vehicle_bps, size=number).tolist()
def _try_spawn_vehicle(world, vehicle_bp, transform, traffic_manager, project_to_road=False, lane_type=carla.LaneType.Any, autopilot=True):
"""
Attempts to spawn a vehicle in the CARLA world at the specified transform.
If the vehicle cannot be spawned, a critical error is logged and None is returned.
Args:
world (carla.World): The CARLA world object
vehicle_bp (carla.Blueprint): The vehicle blueprint to spawn
transform (carla.Transform): The transform to spawn the vehicle at
traffic_manager (carla.TrafficManager): The traffic manager object
project_to_road (bool): Whether to project the spawn point to the road
lane_type (carla.LaneType): The type of lane to project to (if project_to_road is True)
Returns:
carla.Actor: The spawned vehicle actor, or None if spawning failed.
"""
if project_to_road:
# Project the transform to the nearest valid spawn point on the road
transform = valid_spawn_transform(world, transform, lane_type)
vehicle = world.try_spawn_actor(vehicle_bp, transform)
if vehicle is None:
logging.error(f"Vehicle {vehicle_bp.id} could not be spawned at location {transform}")
return None
if autopilot:
# Set to automatic control
vehicle.set_autopilot(True, traffic_manager.get_port())
traffic_manager.update_vehicle_lights(vehicle, True)
logging.debug(f"Vehicle {vehicle_bp.id} was spawned at {transform.location} and {transform.rotation}")
return vehicle
def _save_right_handed_ply(lidar_data, filename):
"""
Save the LiDAR data to a PLY file in right-handed coordinate system.
:param lidar_data: LiDAR data to save
:param filename: Name of the file to save the data
"""
# Convert the point cloud to a numpy array
points = np.frombuffer(lidar_data.raw_data, dtype=np.float32).copy()
points = points.reshape((-1, 4))
# Flip Y-axis to convert to right-handed coordinate system
points[:, 1] *= -1
# Save to ASCII .ply file
with open(filename, 'w') as f:
f.write('ply\n')
f.write('format ascii 1.0\n')
f.write(f'element vertex {points.shape[0]}\n')
f.write('property float32 x\n')
f.write('property float32 y\n')
f.write('property float32 z\n')
f.write('property float32 I\n')
f.write('end_header\n')
np.savetxt(f, points, fmt='%.4f %.4f %.4f %.4f', delimiter=' ')
def create_lidar_callback(data_dir):
"""
Creates a LiDAR callback function that saves point cloud data and corresponding poses
to disk, maintaining a separate frame counter per callback instance.
Args:
data_dir (str): Path to the directory where LiDAR frames and pose files will be saved.
Returns:
function: A callback function that takes a `point_cloud` object as input and
performs the following:
- Increments and tracks its own frame count.
- Saves the point cloud to a .ply file.
- Appends the vehicle pose to a TUM-format pose file.
"""
# Use simulation time added to base time. If we just use time.time() within
# the callback, the timestamps will not be consistent across different
# sensors because simulation time is not synchronized with real time.
start_time = time.time()
frame_counter = 0
ply_dir = os.path.join(data_dir, 'frames')
os.makedirs(ply_dir, exist_ok=True)
tum_file = os.path.join(data_dir, f'ground_truth_poses_tum.txt')
def lidar_callback(point_cloud):
"""
Callback function for processing and saving LiDAR point cloud data.
Args:
point_cloud: An object representing a LiDAR point cloud. Expected to have the following attributes:
- frame_number (int): Identifier for the frame.
- timestamp (float): Timestamp of the frame.
- transform (matrix or pose object): The vehicle's pose at this timestamp.
Side Effects:
- Writes a PLY file for the point cloud to `<data_dir>/frames/<frame_number>.ply`.
- Appends a line to `<data_dir>/ground_truth_poses_tum.txt` in TUM pose format.
"""
nonlocal frame_counter
nonlocal ply_dir
nonlocal tum_file
frame_number = f"{frame_counter:06d}"
logging.debug(
f"Saving LiDAR data for frame: {frame_number}, point count: {(point_cloud)}"
)
# Format with zero-padded frame number
ply_path = os.path.join(ply_dir, f'{frame_number}.ply')
# Save the point cloud to a file in PLY format
_save_right_handed_ply(point_cloud, ply_path)
# Number of seconds since Unix epoch, according to TUM format
timestamp = start_time + point_cloud.timestamp
# Save the vehicle pose to a file in TUM format
tum_append_right_handed_carla_transform(tum_file, point_cloud.transform,
timestamp)
# increment frame number
frame_counter += 1
return lidar_callback
def create_camera_callback(data_dir):
"""
Creates a camera callback function that saves camera images and corresponding poses.
Args:
data_dir (str): Path to the directory where camera frames and pose files will be saved.
Returns:
function: A callback function that takes a `camera_image` object as input and
performs the following:
- Saves the camera image to a file.
- Appends the vehicle pose to a TUM-format pose file.
"""
# Use simulation time added to base time. If we just use time.time() within
# the callback, the timestamps will not be consistent across different
# sensors because simulation time is not synchronized with real time.
start_time = time.time()
frame_counter = 0
camera_dir = os.path.join(data_dir, 'frames')
os.makedirs(camera_dir, exist_ok=True)
tum_file = os.path.join(data_dir, f'ground_truth_poses_tum.txt')
def camera_callback(camera_image):
"""
Callback function for processing and saving camera images.
Args:
camera_image: An object representing a camera image. Expected to have the following attributes:
- frame_number (int): Identifier for the frame.
- timestamp (float): Timestamp of the frame.
- transform (matrix or pose object): The vehicle's pose at this timestamp.
- raw_data (bytes): The raw image data.
Side Effects:
- Saves the camera image to `<data_dir>/frames/<frame_number>.jpg`.
- Appends a line to `<data_dir>/ground_truth_poses_tum.txt` in TUM pose format.
"""
nonlocal frame_counter
nonlocal camera_dir
nonlocal tum_file
frame_number = f"{frame_counter:06d}"
logging.debug(
f"Saving camera image for frame: {frame_number}, timestamp: {camera_image.timestamp}"
)
# Save the camera image to a file
image_path = os.path.join(camera_dir, f'{frame_number}.png')
camera_image.save_to_disk(image_path)
# Number of seconds since Unix epoch, according to TUM format
timestamp = start_time + camera_image.timestamp
# Save the vehicle pose to a file in TUM format
tum_append_right_handed_carla_transform(tum_file, camera_image.transform,
timestamp)
# increment frame number
frame_counter += 1
return camera_callback
def spawn_lidar(output_dir, world, transform, attach_to=None):
lidar_bp = world.get_blueprint_library().find('sensor.lidar.ray_cast')
lidar_bp.set_attribute('channels',
str(global_config.carla_lidar.channels))
lidar_bp.set_attribute('range',
str(global_config.carla_lidar.range))
lidar_bp.set_attribute('points_per_second',
str(global_config.carla_lidar.points_per_second))
lidar_bp.set_attribute('rotation_frequency',
str(global_config.carla_lidar.rotation_frequency))
lidar_bp.set_attribute('upper_fov',
str(global_config.carla_lidar.upper_fov))
lidar_bp.set_attribute('lower_fov',
str(global_config.carla_lidar.lower_fov))
lidar_bp.set_attribute('horizontal_fov',
str(global_config.carla_lidar.horizontal_fov))
lidar_bp.set_attribute('atmosphere_attenuation_rate',
str(global_config.carla_lidar.atmosphere_attenuation_rate))
lidar_bp.set_attribute('dropoff_general_rate',
str(global_config.carla_lidar.dropoff_general_rate))
lidar_bp.set_attribute('dropoff_intensity_limit',
str(global_config.carla_lidar.dropoff_intensity_limit))
lidar_bp.set_attribute('dropoff_zero_intensity',
str(global_config.carla_lidar.dropoff_zero_intensity))
lidar_bp.set_attribute('sensor_tick',
str(global_config.carla_lidar.sensor_tick))
lidar_bp.set_attribute('noise_stddev',
str(global_config.carla_lidar.noise_stddev))
lidar = None
if attach_to is not None:
# Attach LiDAR to the vehicle
lidar = world.spawn_actor(lidar_bp, transform, attach_to=attach_to,
attachment_type=carla.AttachmentType.Rigid)
else:
lidar = world.spawn_actor(lidar_bp, transform)
lidar.listen(create_lidar_callback(output_dir))
return lidar
def spawn_camera(output_dir, world, transform, attach_to=None):
camera_bp = world.get_blueprint_library().find('sensor.camera.rgb')
camera_bp.set_attribute('bloom_intensity',
str(global_config.carla_camera.bloom_intensity))
camera_bp.set_attribute('fov',
str(global_config.carla_camera.fov))
camera_bp.set_attribute('fstop',
str(global_config.carla_camera.fstop))
camera_bp.set_attribute('image_size_x',
str(global_config.carla_camera.image_size_x))
camera_bp.set_attribute('image_size_y',
str(global_config.carla_camera.image_size_y))
camera_bp.set_attribute('iso',
str(global_config.carla_camera.iso))
camera_bp.set_attribute('gamma',
str(global_config.carla_camera.gamma))
camera_bp.set_attribute('lens_flare_intensity',
str(global_config.carla_camera.lens_flare_intensity))
camera_bp.set_attribute('sensor_tick',
str(global_config.carla_camera.sensor_tick))
camera_bp.set_attribute('shutter_speed',
str(global_config.carla_camera.shutter_speed))
# Attach camera to the vehicle
camera = world.spawn_actor(camera_bp, transform, attach_to=attach_to)
camera.listen(create_camera_callback(output_dir))
return camera
def spawn_vehicles(world,
traffic_manager,
transforms=None,
filter="vehicle.*.*",
type="car",
seed=None,
number=1,
project_to_road=False,
lane_type=carla.LaneType.Any):
"""
Spawns vehicles in the CARLA world at specified transforms or random spawn points.
Args:
world: The CARLA world object.
traffic_manager: The CARLA traffic manager object.
transforms (list): List of transforms to spawn vehicles at. If None, random spawn points are used.
filter (str): Blueprint filter for vehicles.
type (str): Type of vehicle to spawn (e.g., "car").
seed (int): Random seed for reproducibility.
number (int): Number of vehicles to spawn.
project_to_road (bool): Whether to project the spawn points to the road.
lane_type (carla.LaneType): The type of lane to project to (if project_to_road is True).
Returns:
list: A list of spawned vehicle actors.
"""
actor_list = []
if transforms is None:
transforms = _try_get_spawn_points(world, number=number, seed=seed)
if len(transforms) < number:
logging.warning(
f"Requested {number} vehicles, but only {len(transforms)} spawn points available. Spawning at available spawn points."
)
number = len(transforms)
vehicle_bps = _try_get_random_vehicles(world, number=number, filter=filter, type=type, seed=seed)
for vehicle_bp, transform in zip(vehicle_bps, transforms):
vehicle = _try_spawn_vehicle(world, vehicle_bp, transform, traffic_manager, project_to_road=project_to_road, lane_type=lane_type, autopilot=True)
if vehicle is not None:
actor_list.append(vehicle)
logging.debug(f"Vehicle {vehicle_bp.id} was spawned at {transform.location}")
return actor_list
def spawn_vehicle(world,
traffic_manager,
transform=None,
filter="vehicle.*.*",
type="car",
seed=None,
project_to_road=False,
lane_type=carla.LaneType.Any,
autopilot=True):
"""
Spawns a single vehicle in the CARLA world at the specified transform random spawn point.
Args:
world: The CARLA world object.
traffic_manager: The CARLA traffic manager object.
transform: Transform to spawn the vehicle at. If None, a random spawn point is used.
filter (str): Blueprint filter for the vehicle.
type (str): Type of vehicle to spawn (e.g., "car").
seed (int): Random seed for reproducibility.
project_to_road (bool): Whether to project the spawn point to the road.
lane_type (carla.LaneType): The type of lane to project to (if project_to_road is True).
Returns:
vehicle: The spawned vehicle actor, or None if spawning failed.
"""
if transform is None:
transforms = _try_get_spawn_points(world, number=1, seed=seed)
if not transforms:
return None
transform = transforms[0]
vehicle_bps = _try_get_random_vehicles(world, number=1, filter=filter, type=type, seed=seed)
if not vehicle_bps:
return None
vehicle_bp = vehicle_bps[0]
return _try_spawn_vehicle(world, vehicle_bp, transform, traffic_manager, project_to_road=project_to_road, lane_type=lane_type, autopilot=autopilot)