|
| 1 | +import os |
| 2 | +from typing import * |
| 3 | +import numpy as np |
| 4 | +import gymnasium as gym |
| 5 | + |
| 6 | +from config import * |
| 7 | + |
| 8 | + |
| 9 | +class WebotsSimulationGymEnvironment(gym.Env): |
| 10 | + """ |
| 11 | + One environment for each vehicle |
| 12 | +
|
| 13 | + n: index of the vehicle |
| 14 | + supervisor: the supervisor of the simulation |
| 15 | + """ |
| 16 | + |
| 17 | + def __init__(self, simulation_rank: int, vehicle_rank: int): |
| 18 | + |
| 19 | + |
| 20 | + super().__init__() |
| 21 | + self.simulation_rank = simulation_rank |
| 22 | + self.vehicle_rank = vehicle_rank |
| 23 | + |
| 24 | + self.handler = logging.FileHandler(f"/tmp/autotech/Voiture_{self.simulation_rank}_{self.vehicle_rank}.log") |
| 25 | + self.handler.setFormatter(FORMATTER) |
| 26 | + self.log = logging.getLogger("SERVER") |
| 27 | + self.log.setLevel(level=LOG_LEVEL) |
| 28 | + self.log.addHandler(self.handler) |
| 29 | + |
| 30 | + |
| 31 | + self.log.info("Initialisation started") |
| 32 | + |
| 33 | + # this is only true if lidar_horizontal_resolution = camera_horizontal_resolution |
| 34 | + box_min = np.zeros([2, context_size, lidar_horizontal_resolution], dtype=np.float32) |
| 35 | + box_max = np.ones([2, context_size, lidar_horizontal_resolution], dtype=np.float32) * 30 |
| 36 | + |
| 37 | + self.observation_space = gym.spaces.Box(box_min, box_max, dtype=np.float32) |
| 38 | + self.action_space = gym.spaces.MultiDiscrete([n_actions_steering, n_actions_speed]) |
| 39 | + |
| 40 | + if not os.path.exists("/tmp/autotech"): |
| 41 | + os.mkdir("/tmp/autotech") |
| 42 | + |
| 43 | + self.log.debug(f"Creation of the pipes") |
| 44 | + |
| 45 | + os.mkfifo(f"/tmp/autotech/{simulation_rank}_{vehicle_rank}toserver.pipe") |
| 46 | + os.mkfifo(f"/tmp/autotech/serverto{simulation_rank}_{vehicle_rank}.pipe") |
| 47 | + os.mkfifo(f"/tmp/autotech/{simulation_rank}_{vehicle_rank}tosupervisor.pipe") |
| 48 | + |
| 49 | + # --mode=fast --minimize --no-rendering --batch --stdout |
| 50 | + if vehicle_rank == 0 : |
| 51 | + os.system(f""" |
| 52 | + webots {__file__.rsplit('/', 1)[0]}/worlds/piste{simulation_rank % n_map}.wbt --mode=fast --minimize --batch --stdout & |
| 53 | + echo $! {simulation_rank}_{vehicle_rank} >>/tmp/autotech/simulationranks |
| 54 | + """) |
| 55 | + |
| 56 | + self.log.debug("Connection to the vehicle") |
| 57 | + self.fifo_w = open(f"/tmp/autotech/serverto{simulation_rank}_{vehicle_rank}.pipe", "wb") |
| 58 | + self.log.debug("Connection to the supervisor") |
| 59 | + self.fifo_r = open(f"/tmp/autotech/{simulation_rank}_{vehicle_rank}toserver.pipe", "rb") |
| 60 | + |
| 61 | + self.log.info("Initialisation finished\n") |
| 62 | + |
| 63 | + def reset(self, seed=0): |
| 64 | + # basically useless function |
| 65 | + |
| 66 | + # lidar data |
| 67 | + # this is true for lidar_horizontal_resolution = camera_horizontal_resolution |
| 68 | + self.context = obs = np.zeros([2, context_size, lidar_horizontal_resolution], dtype=np.float32) |
| 69 | + info = {} |
| 70 | + self.log.info(f"reset finished\n") |
| 71 | + return obs, info |
| 72 | + |
| 73 | + def step(self, action): |
| 74 | + |
| 75 | + self.log.info("Starting step") |
| 76 | + self.log.info(f"sending {action=}") |
| 77 | + self.fifo_w.write(action.tobytes()) |
| 78 | + self.fifo_w.flush() |
| 79 | + |
| 80 | + # communication with the supervisor |
| 81 | + self.log.debug("trying to get info from supervisor") |
| 82 | + cur_state = np.frombuffer(self.fifo_r.read(np.dtype(np.float32).itemsize * (n_sensors + lidar_horizontal_resolution + camera_horizontal_resolution)), dtype=np.float32) |
| 83 | + self.log.info(f"received {cur_state=}") |
| 84 | + reward = np.frombuffer(self.fifo_r.read(np.dtype(np.float32).itemsize), dtype=np.float32)[0] # scalar |
| 85 | + self.log.info(f"received {reward=}") |
| 86 | + done = np.frombuffer(self.fifo_r.read(np.dtype(np.bool).itemsize), dtype=np.bool)[0] # scalar |
| 87 | + self.log.info(f"received {done=}") |
| 88 | + truncated = np.frombuffer(self.fifo_r.read(np.dtype(np.bool).itemsize), dtype=np.bool)[0] # scalar |
| 89 | + self.log.info(f"received {truncated=}") |
| 90 | + info = {} |
| 91 | + |
| 92 | + cur_state = np.nan_to_num(cur_state[n_sensors:], nan=0., posinf=30.) |
| 93 | + |
| 94 | + lidar_obs = cur_state[:lidar_horizontal_resolution] |
| 95 | + camera_obs = cur_state[lidar_horizontal_resolution:] |
| 96 | + |
| 97 | + self.context = obs = np.concatenate([ |
| 98 | + self.context[:, 1:], |
| 99 | + [lidar_obs[None], camera_obs[None]] |
| 100 | + ], axis=1) |
| 101 | + |
| 102 | + self.log.info("step over") |
| 103 | + |
| 104 | + return obs, reward, done, truncated, info |
| 105 | + |
| 106 | + |
0 commit comments