From 2301002e29203246b6e41ea4637f5d1322dfa90f Mon Sep 17 00:00:00 2001 From: OrestisTS <118257809+OrestisTS@users.noreply.github.com> Date: Wed, 22 May 2024 13:29:14 +0200 Subject: [PATCH 1/2] refactor<>:Remove ROS/Wireguard from project --- docker-compose.yml | 27 --------- pilot/pilot/app.py | 7 +-- pilot/pilot/core.py | 23 ++----- pilot/pilot/tests.py | 2 - rosnode/Dockerfile | 17 ------ rosnode/app.py | 139 ------------------------------------------- 6 files changed, 7 insertions(+), 208 deletions(-) delete mode 100644 rosnode/Dockerfile delete mode 100644 rosnode/app.py diff --git a/docker-compose.yml b/docker-compose.yml index c84fe675..f7606425 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,7 +1,6 @@ version: '2' volumes: volume_zerotier_config: - volume_wireguard_config: volume_ftpd_config: volume_mongodb_config: volume_mongodb_data: @@ -22,20 +21,6 @@ services: - CAP_SYS_RAWIO volumes: - volume_zerotier_config:/var/lib/zerotier-one:rw - wireguard: - cpuset: '0' - image: masipcat/wireguard-go - container_name: wireguard - restart: always - network_mode: host - devices: - - '/dev/net/tun' - cap_add: - - SYS_ADMIN - - NET_ADMIN - - CAP_SYS_RAWIO - volumes: - - volume_wireguard_config:/etc/wireguard:rw httpd: cpuset: '0' build: @@ -61,18 +46,6 @@ services: volumes: - volume_ftpd_config:/etc/pureftpd:rw - volume_byodr_sessions:/home/ftpuser:rw - rosnode: - cpuset: '0' - build: - context: . - dockerfile: rosnode/Dockerfile - restart: always - command: ['python3', 'app.py', '--name', 'rosnode'] - network_mode: host - stop_signal: SIGKILL - volumes: - - volume_byodr_sockets:/byodr:rw - - volume_byodr_config:/config:ro mongodb: cpuset: '0' build: diff --git a/pilot/pilot/app.py b/pilot/pilot/app.py index ef491b36..224f2ba3 100644 --- a/pilot/pilot/app.py +++ b/pilot/pilot/app.py @@ -47,7 +47,6 @@ def __init__(self, event, processor, relay, config_dir=os.getcwd(), hz=100): self.ipc_server = None self.ipc_chatter = None self.teleop = None - self.ros = None self.vehicle = None self.inference = None self._init(relay) @@ -101,7 +100,7 @@ def finish(self): def step(self): teleop = self.teleop() - commands = (teleop, self.ros(), self.vehicle(), self.inference()) + commands = (teleop, self.vehicle(), self.inference()) pilot = self._processor.next_action(*commands) # print(pilot) self._monitor.step(pilot, teleop) @@ -127,19 +126,17 @@ def main(): application = PilotApplication(quit_event, processor=CommandProcessor(route_store), relay=_relay, config_dir=args.config) teleop = json_collector(url="ipc:///byodr/teleop.sock", topic=b"aav/teleop/input", event=quit_event) - ros = json_collector(url="ipc:///byodr/ros.sock", topic=b"aav/ros/input", hwm=10, pop=True, event=quit_event) vehicle = json_collector(url="ipc:///byodr/vehicle.sock", topic=b"aav/vehicle/state", event=quit_event) inference = json_collector(url="ipc:///byodr/inference.sock", topic=b"aav/inference/state", event=quit_event) ipc_chatter = json_collector(url="ipc:///byodr/teleop_c.sock", topic=b"aav/teleop/chatter", pop=True, event=quit_event) application.teleop = lambda: teleop.get() - application.ros = lambda: ros.get() application.vehicle = lambda: vehicle.get() application.inference = lambda: inference.get() application.ipc_chatter = lambda: ipc_chatter.get() application.publisher = JSONPublisher(url="ipc:///byodr/pilot.sock", topic="aav/pilot/output") application.ipc_server = LocalIPCServer(url="ipc:///byodr/pilot_c.sock", name="pilot", event=quit_event) - threads = [teleop, ros, vehicle, inference, ipc_chatter, application.ipc_server, threading.Thread(target=application.run)] + threads = [teleop, vehicle, inference, ipc_chatter, application.ipc_server, threading.Thread(target=application.run)] if quit_event.is_set(): return 0 diff --git a/pilot/pilot/core.py b/pilot/pilot/core.py index 181281cb..55006654 100644 --- a/pilot/pilot/core.py +++ b/pilot/pilot/core.py @@ -842,25 +842,13 @@ def _cache_safe(self, key, func, *arguments): finally: self._cache[key] = 1 - def _process_ros(self, c_ros): - # It could be a collection of ros commands. - ros_messages = [] if c_ros is None else c_ros if (isinstance(c_ros, tuple) or isinstance(c_ros, list)) else [c_ros] - for _msg in ros_messages: - if "pilot.driver.set" in _msg: - self._cache_safe("ros switch driver", lambda: self._driver.switch_ctl(_msg.get("pilot.driver.set"))) - if "pilot.maximum.speed" in _msg: - self._cache_safe("ros set cruise speed", lambda: self._driver.set_cruise_speed(_msg.get("pilot.maximum.speed"))) - - def _process(self, c_teleop, c_ros, c_inference): + def _process(self, c_teleop, c_inference): # r_action = None if c_external is None else c_external.get('action', None) # if r_action == 'resume': # self._cache_safe('external api call', lambda: self._driver.switch_ctl('driver_mode.inference.dnn')) self._driver.process_navigation(c_teleop, c_inference) - # Continue with ros instructions which take precedence over a route. - self._process_ros(c_ros) - # Zero out max speed on any intervention as a safety rule for ap and to detect faulty controllers. # With the left button this behavior can be overridden to allow for steer corrections. c_teleop = {} if c_teleop is None else c_teleop @@ -889,7 +877,7 @@ def _process(self, c_teleop, c_ros, c_inference): elif c_teleop.get("arrow_down", 0) == 1: self._cache_safe("decrease cruise speed", lambda: self._driver.decrease_cruise_speed()) - def _unpack_commands(self, teleop, ros, vehicle, inference): + def _unpack_commands(self, teleop, vehicle, inference): _ts = timestamp() # Check if the incoming data is within the patience time limit. # print("Received teleop data:", teleop) @@ -897,13 +885,12 @@ def _unpack_commands(self, teleop, ros, vehicle, inference): teleop = teleop if teleop is not None else None vehicle = vehicle if vehicle is not None and (_ts - vehicle.get("time", 0) < self._patience_micro) else None inference = inference if inference is not None and (_ts - inference.get("time", 0) < self._patience_micro) else None - # ROS commands are processed each time as specified in your existing logic. - return teleop, ros, vehicle, inference + return teleop, vehicle, inference def next_action(self, *args): - teleop, ros, vehicle, inference = self._unpack_commands(*args) + teleop, vehicle, inference = self._unpack_commands(*args) # Handle instructions first. - self._process(teleop, ros, inference) + self._process(teleop, inference) # What to do on message timeout depends on which driver is active. _ctl = self._driver.get_driver_ctl() # Switch off autopilot on internal errors. diff --git a/pilot/pilot/tests.py b/pilot/pilot/tests.py index f02476a6..b2b97603 100644 --- a/pilot/pilot/tests.py +++ b/pilot/pilot/tests.py @@ -18,7 +18,6 @@ def test_create_and_setup(tmpdir): publisher = CollectPublisher() ipc_server = CollectServer() teleop = QueueReceiver() - ros = QueueReceiver() vehicle = QueueReceiver() ipc_chatter = QueueReceiver() route_store = ReloadableDataSource(FileSystemRouteDataSource(directory=directory, load_instructions=True)) @@ -26,7 +25,6 @@ def test_create_and_setup(tmpdir): app.publisher = publisher app.ipc_server = ipc_server app.teleop = lambda: teleop.get_latest() - app.ros = lambda: ros.get_latest() app.vehicle = lambda: vehicle.get_latest() app.inference = lambda: None app.ipc_chatter = lambda: ipc_chatter.get_latest() diff --git a/rosnode/Dockerfile b/rosnode/Dockerfile deleted file mode 100644 index b5723962..00000000 --- a/rosnode/Dockerfile +++ /dev/null @@ -1,17 +0,0 @@ -FROM ros:foxy - -RUN apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv-keys 4B63CF8FDE49746E98FA01DDAD19BAB3CBF125EA - -# Proceed with the rest of your setup -RUN apt-get update && apt-get install -y --no-install-recommends \ - python3-zmq \ - nano \ - wget - -COPY ./common common/ -COPY ./rosnode app/ -WORKDIR /app - -ENV PYTHONPATH "${PYTHONPATH}:/common" - -CMD ["python3", "app.py"] diff --git a/rosnode/app.py b/rosnode/app.py deleted file mode 100644 index be371c67..00000000 --- a/rosnode/app.py +++ /dev/null @@ -1,139 +0,0 @@ -import argparse -import glob -import logging -import os -from configparser import ConfigParser as SafeConfigParser - -import rclpy -from actionlib_msgs.msg import GoalID -from diagnostic_msgs.msg import DiagnosticArray, DiagnosticStatus, KeyValue -from rclpy.node import Node -from std_msgs.msg import Float32 - -from byodr.utils import Application, timestamp -from byodr.utils.ipc import json_collector, JSONPublisher - - -class Bridge(Node): - def __init__(self, node_name, get_pilot_message, publish_internal): - super().__init__('byodr1' if node_name is None else node_name) - self._get_pilot = get_pilot_message - self._publish_internal = publish_internal - self.create_subscription(GoalID, '{}/v10/pilot/set_mode'.format(self.get_name()), self._receive_mode, 10) - self.create_subscription(Float32, '{}/v10/pilot/set_maximum_speed'.format(self.get_name()), self._receive_max_speed, 10) - self._ros_publisher = self.create_publisher(DiagnosticArray, '{}/v10/pilot/diagnostics'.format(self.get_name()), 10) - # The timer period is specified in seconds. - self.timer = self.create_timer(1. / 10, self._publish_diagnostics) - - def _publish_diagnostics(self): - pilot = self._get_pilot() - if pilot: - status = DiagnosticStatus(name='mode', message='not_available') - status.values = [ - KeyValue(key='steer', value='{:+2.5f}'.format(pilot.get('steering'))), - KeyValue(key='throttle', value='{:+2.5f}'.format(pilot.get('throttle'))) - ] - driver_mode = pilot.get('driver') - if driver_mode == 'driver_mode.teleop.direct': - status.message = 'teleoperation' - elif driver_mode == 'driver_mode.inference.dnn': - status.message = 'autopilot' - status.values += [ - KeyValue(key='maximum_speed', value='{:+2.5f}'.format(pilot.get('cruise_speed'))), - KeyValue(key='desired_speed', value='{:+2.5f}'.format(pilot.get('desired_speed'))) - ] - diagnostics = DiagnosticArray() - diagnostics.header.stamp = self.get_clock().now().to_msg() - diagnostics.status = [status] - self._ros_publisher.publish(diagnostics) - - def _send_internal_command(self, cmd): - if cmd: - cmd['time'] = timestamp() - self._publish_internal(cmd) - - def _receive_mode(self, msg): - # self.get_logger().info('I heard: "%s"' % msg.data) - internal = dict() - if msg.id == 'teleoperation': - internal['pilot.driver.set'] = 'driver_mode.teleop.direct' - elif msg.id == 'autopilot': - internal['pilot.driver.set'] = 'driver_mode.inference.dnn' - self._send_internal_command(internal) - - def _receive_max_speed(self, msg): - internal = dict() - # The value may equal 0 (zero) and bool(0) evaluates to False. - if msg.data is not None: - internal['pilot.maximum.speed'] = msg.data - self._send_internal_command(internal) - - -class RosApplication(Application): - def __init__(self, config_dir=os.getcwd()): - # Just check for restarts - no need for high frequency processing. - super(RosApplication, self).__init__(run_hz=1) - self._config_dir = config_dir - self._bridge = None - self.pilot = None - self.publish = None - self.ipc_chatter = None - - def _config(self): - parser = SafeConfigParser() - [parser.read(_f) for _f in glob.glob(os.path.join(self._config_dir, '*.ini'))] - return dict(parser.items('ros2')) if parser.has_section('ros2') else {} - - def setup(self): - if self.active(): - if self._bridge is not None: - self._bridge.destroy_node() - bridge = Bridge(node_name=self._config().get('rover.node.name', 'rover1'), - get_pilot_message=self.pilot, - publish_internal=self.publish) - rclpy.spin(bridge) - self._bridge = bridge - - def finish(self): - if self._bridge is not None: - self._bridge.destroy_node() - - def step(self): - chat = self.ipc_chatter() - if chat and chat.get('command') == 'restart': - self.setup() - - -def main(): - parser = argparse.ArgumentParser(description='ROS2 rover node.') - parser.add_argument('--name', type=str, default='none', help='Process name.') - parser.add_argument('--config', type=str, default='/config', help='Config directory path.') - args = parser.parse_args() - - application = RosApplication(config_dir=args.config) - quit_event = application.quit_event - logger = application.logger - - publisher = JSONPublisher(url='ipc:///byodr/ros.sock', topic='aav/ros/input') - pilot = json_collector(url='ipc:///byodr/pilot.sock', topic=b'aav/pilot/output', event=quit_event) - ipc_chatter = json_collector(url='ipc:///byodr/teleop_c.sock', topic=b'aav/teleop/chatter', pop=True, event=quit_event) - application.publish = lambda m: publisher.publish(m) - application.pilot = lambda: pilot.get() - application.ipc_chatter = lambda: ipc_chatter.get() - threads = [pilot, ipc_chatter] - if quit_event.is_set(): - return 0 - - rclpy.init() - [t.start() for t in threads] - application.run() - - logger.info("Waiting on threads to stop.") - [t.join() for t in threads] - rclpy.shutdown() - - -if __name__ == "__main__": - logging.basicConfig(format='%(levelname)s: %(asctime)s %(filename)s %(funcName)s %(message)s', datefmt='%Y%m%d:%H:%M:%S %p %Z') - logging.getLogger().setLevel(logging.INFO) - main() From 71f709dc9c32966594a49ef7de4fd597a8d56768 Mon Sep 17 00:00:00 2001 From: Ahmed Mahfouz Date: Wed, 29 May 2024 15:38:28 +0200 Subject: [PATCH 2/2] refactor remove its occurrences the effected files are in VEH and service documentation --- Services_Documentation.txt | 7 +- docker-compose.carla.yml | 64 ----- teleop/htm/static/JS/Index/index_a_utils.js | 2 +- vehicles/carla09/Dockerfile | 20 -- vehicles/carla09/app.py | 230 ------------------ vehicles/carla09/config.template | 17 -- vehicles/carla09/vehicle.py | 257 -------------------- vehicles/carla09/video.py | 139 ----------- 8 files changed, 2 insertions(+), 734 deletions(-) delete mode 100644 docker-compose.carla.yml delete mode 100644 vehicles/carla09/Dockerfile delete mode 100644 vehicles/carla09/app.py delete mode 100644 vehicles/carla09/config.template delete mode 100644 vehicles/carla09/vehicle.py delete mode 100644 vehicles/carla09/video.py diff --git a/Services_Documentation.txt b/Services_Documentation.txt index 75dffcb7..3b2d3d98 100644 --- a/Services_Documentation.txt +++ b/Services_Documentation.txt @@ -146,12 +146,7 @@ It also hosts the site design files necessary to draw the Web App. 6) Vehicle -Input 1: Receives data in a JSON format from the Pilot service -Input 2: Receives data in a JSON format from the Teleop service --Function: This process sets up a server that connects to a CARLA simulator and communicates with it to control a self-driving car. Carla is an open-source simulator for autonomous driving research. It is used to simulate the robot’s behavior in a virtual environment. --Output 1: Sends the data to a server running an instance of CARLA. The data sent will properly represent a segment inside the simulation. --Output 2: Sends the data to a server running an instance of CARLA. The data sent will properly represent a segment inside the simulation. --Q1: Is this process exclusively used to send data to the CARLA simulator, and nothing else regarding the driving of the robot? --Q2: Where is the CARLA simulation hosted? --Q3: What do the video streams created in the server do exactly? +-Q1: What do the video streams created in the server do exactly? 7) ROS Node -Input 1: Receives data in a JSON format from the Pilot service diff --git a/docker-compose.carla.yml b/docker-compose.carla.yml deleted file mode 100644 index 102bd25d..00000000 --- a/docker-compose.carla.yml +++ /dev/null @@ -1,64 +0,0 @@ -version: '2' -services: - zerotier: - image: rwgrim/docker-noop:latest - restart: 'no' - command: ["/bin/true"] - wireguard: - image: rwgrim/docker-noop:latest - restart: 'no' - command: ["/bin/true"] - httpd: - restart: 'no' - volumes: - - ./common:/common - - ./httpd:/app - ftpd: - restart: 'no' - volumes: - - ./common:/common - - ./ftpd:/app - rosnode: - restart: 'no' - volumes: - - ./common:/common - - ./rosnode:/app - mongodb: - restart: 'no' - volumes: - - ./common:/common - - ./mongodb:/app - vehicle: - # Need extra cpu resources to run the h264 encoder sockets. - cpuset: "1,4,5" - build: - context: . - dockerfile: vehicles/carla09/Dockerfile - image: centipede2donald/byodr-ce:carla-099 - restart: 'no' - network_mode: host - volumes: - - ./common:/common - - ./vehicles/carla09:/app - teleop: - restart: 'no' - volumes: - - ./common:/common - - ./teleop:/app - pilot: - restart: 'no' - volumes: - - ./common:/common - - ./pilot:/app - inference: - build: - context: . - dockerfile: inference/runtime-cp36-x86.dockerfile - image: centipede2donald/byodr-ce:inference-carla - restart: 'no' - ipc: host - environment: - - NVIDIA_VISIBLE_DEVICES=all - volumes: - - ./common:/common - - ./inference:/app diff --git a/teleop/htm/static/JS/Index/index_a_utils.js b/teleop/htm/static/JS/Index/index_a_utils.js index db97ef46..7ac657b3 100644 --- a/teleop/htm/static/JS/Index/index_a_utils.js +++ b/teleop/htm/static/JS/Index/index_a_utils.js @@ -135,7 +135,7 @@ var page_utils = { if (dev_tools.is_develop()) { _instance._capabilities = { 'platform': { - 'vehicle': dev_tools._random_choice(['rover1', 'carla1']), + 'vehicle': 'rover1', 'video': { 'front': { 'ptz': 0 }, 'rear': { 'ptz': 0 } } } }; diff --git a/vehicles/carla09/Dockerfile b/vehicles/carla09/Dockerfile deleted file mode 100644 index d4bfbe6d..00000000 --- a/vehicles/carla09/Dockerfile +++ /dev/null @@ -1,20 +0,0 @@ -FROM carlasim/carla:0.9.9 AS builder - -FROM centipede2donald/ubuntu-bionic:python27-opencv32-gstreamer10 - -RUN pip install -r https://raw.githubusercontent.com/carla-simulator/carla/0.9.9/PythonAPI/carla/requirements.txt - -COPY ./common common/ -COPY ./vehicles/carla09 app/ -COPY --from=builder /home/carla/PythonAPI/ PythonAPI/ -WORKDIR /app - -RUN python -m easy_install --no-deps /PythonAPI/carla/dist/carla-0.9.9-py2.7-linux-x86_64.egg - -RUN pip install geographiclib - -RUN pip install "tornado==4.5.3" - -ENV PYTHONPATH "${PYTHONPATH}:/common" - -CMD ["python", "app.py"] \ No newline at end of file diff --git a/vehicles/carla09/app.py b/vehicles/carla09/app.py deleted file mode 100644 index 835a7fa1..00000000 --- a/vehicles/carla09/app.py +++ /dev/null @@ -1,230 +0,0 @@ -import argparse -import glob -import logging -import multiprocessing -import os -import shutil -import signal -import threading - -from ConfigParser import SafeConfigParser -from tornado import web, ioloop -from tornado.httpserver import HTTPServer - -from byodr.utils import Application -from byodr.utils import Configurable -from byodr.utils.ipc import JSONPublisher, ImagePublisher, LocalIPCServer, json_collector -from byodr.utils.option import parse_option -from byodr.utils.websocket import HttpLivePlayerVideoSocket -from vehicle import CarlaHandler -from video import NumpyImageVideoSource - -logger = logging.getLogger(__name__) - -io_loop = ioloop.IOLoop.instance() -signal.signal(signal.SIGINT, lambda sig, frame: io_loop.add_callback_from_signal(_interrupt)) -signal.signal(signal.SIGTERM, lambda sig, frame: io_loop.add_callback_from_signal(_interrupt)) - -quit_event = multiprocessing.Event() - - -def _interrupt(): - logger.info("Received interrupt, quitting.") - quit_event.set() - io_loop.stop() - - -class RoutingImagePublisher(object): - def __init__(self, cameras, streams): - self.front_camera = cameras[0] - self.rear_camera = cameras[1] - self.front_stream = streams[0] - self.rear_stream = streams[1] - - def quit(self): - self.front_stream.quit() - self.rear_stream.quit() - - def restart(self, **kwargs): - self.front_stream.restart(**kwargs) - self.rear_stream.restart(**kwargs) - return self.front_stream.get_errors() + self.rear_stream.get_errors() - - def publish(self, image, route=0): - if route == 0: - self.front_camera.publish(image) - self.front_stream.push(image) - else: - self.rear_camera.publish(image) - self.rear_stream.push(image) - - -class CarlaRunner(Configurable): - def __init__(self, image_publisher): - super(CarlaRunner, self).__init__() - self._process_frequency = 10 - self._patience_micro = 1000. - self._publisher = image_publisher - self._vehicle = CarlaHandler((lambda img, route: image_publisher.publish(img, route=route))) - - def get_process_frequency(self): - with self._lock: - return self._process_frequency - - def get_patience_micro(self): - with self._lock: - return self._patience_micro - - def internal_quit(self, restarting=False): - if not restarting: - self._vehicle.quit() - self._publisher.quit() - - def internal_start(self, **kwargs): - self._vehicle.restart(**kwargs) - _errors = self._vehicle.get_errors() - _errors += self._publisher.restart(**kwargs) - self._process_frequency = parse_option('clock.hz', int, 80, _errors, **kwargs) - self._patience_micro = parse_option('patience.ms', int, 100, _errors, **kwargs) * 1000. - return _errors - - def reset_agent(self): - self._vehicle.reset() - - def state(self): - return self._vehicle.state() - - def noop(self): - self._vehicle.noop() - - def drive(self, cmd): - self._vehicle.drive(cmd) - - -class CarlaApplication(Application): - def __init__(self, image_router, config_dir=os.getcwd()): - super(CarlaApplication, self).__init__(quit_event=quit_event) - self._runner = CarlaRunner(image_router) - self._config_dir = config_dir - self.publisher = None - self.ipc_chatter = None - self.pilot = None - self.teleop = None - self.ipc_server = None - - def _check_user_file(self): - # One user configuration file is optional and can be used to persist settings. - _candidates = glob.glob(os.path.join(self._config_dir, '*.ini')) - if len(_candidates) == 0: - shutil.copyfile('config.template', os.path.join(self._config_dir, 'config.ini')) - logger.info("Created a new user configuration file from template.") - - def _config(self): - parser = SafeConfigParser() - [parser.read(_f) for _f in glob.glob(os.path.join(self._config_dir, '*.ini'))] - cfg = dict(parser.items('platform')) if parser.has_section('platform') else {} - cfg.update(dict(parser.items('vehicle')) if parser.has_section('vehicle') else {}) - cfg.update(dict(parser.items('camera')) if parser.has_section('camera') else {}) - self.logger.info(cfg) - return cfg - - @staticmethod - def _capabilities(): - # The video dimensions are determined by the websocket services. - return { - 'vehicle': 'carla1', - 'video': {'front': {'ptz': 0}, 'rear': {'ptz': 0}} - } - - def setup(self): - if self.active(): - self._check_user_file() - _restarted = self._runner.restart(**self._config()) - if _restarted: - self.ipc_server.register_start(self._runner.get_errors(), self._capabilities()) - _process_frequency = self._runner.get_process_frequency() - _patience_micro = self._runner.get_patience_micro() - self.set_hz(_process_frequency) - self.logger.info("Processing at {} Hz and a patience of {} ms.".format(_process_frequency, _patience_micro / 1000)) - - def finish(self): - self._runner.quit() - - # def run(self): - # from byodr.utils import Profiler - # profiler = Profiler() - # with profiler(): - # super(CarlaApplication, self).run() - # profiler.dump_stats('/config/carla.stats') - - def step(self): - runner, pilot, teleop, ipc_chatter, ipc_server = self._runner, self.pilot, self.teleop, self.ipc_chatter, self.ipc_server - c_pilot = self._latest_or_none(pilot, patience=(runner.get_patience_micro())) - c_teleop = self._latest_or_none(teleop, patience=(runner.get_patience_micro())) - if c_teleop is not None and c_teleop.get('button_a', 0): - runner.reset_agent() - if c_pilot is not None: - runner.drive(c_pilot) - else: - runner.noop() - self.publisher.publish(runner.state()) - chat = self.ipc_chatter() - if chat and chat.get('command') == 'restart': - self.setup() - - -def main(): - parser = argparse.ArgumentParser(description='Carla vehicle client.') - parser.add_argument('--name', type=str, default='none', help='Process name.') - parser.add_argument('--config', type=str, default='/config', help='Config directory path.') - args = parser.parse_args() - - front_camera = ImagePublisher(url='ipc:///byodr/camera_0.sock', topic='aav/camera/0') - rear_camera = ImagePublisher(url='ipc:///byodr/camera_1.sock', topic='aav/camera/1') - - front_stream = NumpyImageVideoSource(name='front') - rear_stream = NumpyImageVideoSource(name='rear') - - image_router = RoutingImagePublisher(cameras=(front_camera, rear_camera), streams=(front_stream, rear_stream)) - application = CarlaApplication(image_router=image_router, config_dir=args.config) - - pilot = json_collector(url='ipc:///byodr/pilot.sock', topic=b'aav/pilot/output', event=quit_event) - teleop = json_collector(url='ipc:///byodr/teleop.sock', topic=b'aav/teleop/input', event=quit_event) - ipc_chatter = json_collector(url='ipc:///byodr/teleop_c.sock', topic=b'aav/teleop/chatter', pop=True, event=quit_event) - - application.publisher = JSONPublisher(url='ipc:///byodr/vehicle.sock', topic='aav/vehicle/state') - application.ipc_server = LocalIPCServer(url='ipc:///byodr/vehicle_c.sock', name='platform', event=quit_event) - application.pilot = lambda: pilot.get() - application.teleop = lambda: teleop.get() - application.ipc_chatter = lambda: ipc_chatter.get() - app_thread = threading.Thread(target=application.run) - - threads = [pilot, teleop, ipc_chatter, application.ipc_server, app_thread] - if quit_event.is_set(): - return 0 - - [t.start() for t in threads] - - try: - class_ref = HttpLivePlayerVideoSocket - front_app = web.Application([(r"/", class_ref, dict(video_source=front_stream, io_loop=io_loop))]) - front_server = HTTPServer(front_app, xheaders=True) - front_server.bind(9101) - front_server.start() - rear_app = web.Application([(r"/", class_ref, dict(video_source=rear_stream, io_loop=io_loop))]) - rear_server = HTTPServer(rear_app, xheaders=True) - rear_server.bind(9102) - rear_server.start() - logger.info("Video streams started on port 9101 and 9102.") - io_loop.start() - except KeyboardInterrupt: - quit_event.set() - - logger.info("Waiting on threads to stop.") - [t.join() for t in threads] - - -if __name__ == "__main__": - logging.basicConfig(format='%(levelname)s: %(asctime)s %(filename)s %(funcName)s %(message)s', datefmt='%Y%m%d:%H:%M:%S %p %Z') - logging.getLogger().setLevel(logging.INFO) - main() diff --git a/vehicles/carla09/config.template b/vehicles/carla09/config.template deleted file mode 100644 index 064a3535..00000000 --- a/vehicles/carla09/config.template +++ /dev/null @@ -1,17 +0,0 @@ -[platform] -host.location = localhost:2000 -weather.random.each.seconds = 300 -camera.image.input.shape = 640x480 -world.spawn.preferred.id = -1 -world.spawn.preferred.weather = ClearNoon - -[pilot] -driver.steering.teleop.scale = 0.40 -driver.cc.static.gear.step = 5 -driver.cc.static.speed.max = 200 -driver.cc.static.speed.min = 0.1 -driver.cc.stop.pid_controller.p = 1.0 -driver.cc.throttle.pid_controller.p = 0.2 -driver.cc.throttle.pid_controller.i = 1.0 -driver.cc.throttle.pid_controller.d = 0.0 -driver.handler.steering.low_pass.momentum = 0.95 \ No newline at end of file diff --git a/vehicles/carla09/vehicle.py b/vehicles/carla09/vehicle.py deleted file mode 100644 index d2301efb..00000000 --- a/vehicles/carla09/vehicle.py +++ /dev/null @@ -1,257 +0,0 @@ -import collections -import logging -import math -import multiprocessing -import re -import threading -import time - -import carla -import numpy as np -from carla import Transform, Location, Rotation - -from byodr.utils import timestamp, Configurable -from byodr.utils.location import GeoTracker -from byodr.utils.option import parse_option - -logger = logging.getLogger(__name__) - - -class GeoTrackerThread(threading.Thread): - def __init__(self, world, actor): - super(GeoTrackerThread, self).__init__() - self._world = world - self._actor = actor - self._quit_event = threading.Event() - self._tracker = GeoTracker() - self._queue = collections.deque(maxlen=1) - - def _location(self): - location = None if self._actor is None else self._actor.get_location() - return None if location is None else self._world.get_map().transform_to_geolocation(location) - - def _track(self): - # Getting the geolocation from carla takes 150ms on average. - c_geo = self._location() - if c_geo is not None: - latitude, longitude = (c_geo.latitude, c_geo.longitude) - self._queue.append(self._tracker.track((latitude, longitude))) - - def get(self): - # latitude, longitude, bearing - return self._queue[-1] if len(self._queue) > 0 else (0, 0, 0) - - def quit(self): - self._quit_event.set() - - def run(self): - while not self._quit_event.is_set(): - try: - self._track() - except RuntimeError: - pass - # Determines this thread's run frequency. - time.sleep(0.200) - - -class CarlaHandler(Configurable): - - def __init__(self, fn_on_image): - super(CarlaHandler, self).__init__() - self._camera_callback = fn_on_image - self._image_shape = (600, 800, 3) - self._tm_port = 8000 - self._rand_weather_seconds = -1 - self._spawn_preferred_id = -1 - self._spawn_preferred_weather = None - self._world = None - self._traffic_manager = None - self._actor = None - self._sensors = [] - self._geo_tracker = None - self._actor_lock = multiprocessing.Lock() - self._spawn_index = 1 - self._vehicle_tick = None - self._in_carla_autopilot = False - self._change_weather_time = 0 - self._in_reverse = False - - def internal_quit(self, restarting=False): - if self._vehicle_tick: - self._world.remove_on_tick(self._vehicle_tick) - if self._geo_tracker is not None: - self._geo_tracker.quit() - self._destroy() - - def internal_start(self, **kwargs): - _errors = [] - _remote = parse_option('host.location', str, '127.0.0.1:2000', _errors, **kwargs) - _img_wh = parse_option('camera.image.input.shape', str, '640x480', errors=_errors, **kwargs) - carla_host, carla_port = _remote, 2000 - if ':' in carla_host: - host, port = carla_host.split(':') - carla_host, carla_port = host, int(port) - carla_client = carla.Client(carla_host, carla_port) - carla_client.set_timeout(5.) - _shape = [int(x) for x in _img_wh.split('x')] - _shape = (_shape[1], _shape[0], 3) - self._image_shape = _shape - self._rand_weather_seconds = parse_option('weather.random.each.seconds', int, 180, _errors, **kwargs) - self._spawn_preferred_id = parse_option('world.spawn.preferred.id', int, -1, _errors, **kwargs) - self._spawn_preferred_weather = parse_option('world.spawn.preferred.weather', str, 'ClearNoon', _errors, **kwargs) - self._world = carla_client.get_world() - self._traffic_manager = carla_client.get_trafficmanager(self._tm_port) - self._traffic_manager.global_percentage_speed_difference(65) - self._change_weather_time = time.time() + self._rand_weather_seconds - self._vehicle_tick = self._world.on_tick(lambda x: self.tick(x)) - self.reset() - return _errors - - def _destroy(self): - if self._actor is not None and self._actor.is_alive: - self._actor.destroy() - for sensor in self._sensors: - if sensor.is_alive: - sensor.destroy() - - def _create_camera(self, sensor_type='sensor.camera.rgb'): - camera_bp = self._world.get_blueprint_library().find(sensor_type) - # Modify the attributes of the blueprint to set image resolution and field of view. - im_height, im_width = self._image_shape[:2] - camera_bp.set_attribute('image_size_x', '{}'.format(im_width)) - camera_bp.set_attribute('image_size_y', '{}'.format(im_height)) - # camera_bp.set_attribute('fov', '150') - # Set the time in seconds between sensor captures - camera_bp.set_attribute('sensor_tick', "{:2.2f}".format(1. / 50)) - return camera_bp - - def reset(self, attempt=0): - logger.info('Resetting ...') - self._in_carla_autopilot = False - self._in_reverse = False - self._destroy() - # - blueprint_library = self._world.get_blueprint_library() - vehicle_bp = blueprint_library.find('vehicle.tesla.model3') - spawn_points = self._world.get_map().get_spawn_points() - spawn_id = self._spawn_preferred_id if attempt == 0 and self._spawn_preferred_id >= 0 else np.random.randint(len(spawn_points)) - spawn_point = spawn_points[spawn_id] - try: - self._actor = self._world.spawn_actor(vehicle_bp, spawn_point) - except RuntimeError as e: - if attempt < 4: - self.reset(attempt + 1) - else: - raise e - logger.info("Spawn point is '{}' with id {}.".format(spawn_point, spawn_id)) - # Attach the camera's - defaults at https://carla.readthedocs.io/en/latest/cameras_and_sensors/. - # Provide the position of the sensor relative to the vehicle. - # Tell the world to spawn the sensor, don't forget to attach it to your vehicle actor. - camera_front = self._world.spawn_actor(self._create_camera(), - Transform(Location(x=1.25, z=1.4)), - attach_to=self._actor) - camera_rear = self._world.spawn_actor(self._create_camera(), - Transform(Location(x=-1.0, z=2.0), Rotation(pitch=180, roll=180)), - attach_to=self._actor) - self._sensors.append(camera_front) - self._sensors.append(camera_rear) - # Subscribe to the sensor stream by providing a callback function, - # this function is called each time a new image is generated by the sensor. - camera_front.listen(lambda data: self._on_camera(data, camera=0)) - camera_rear.listen(lambda data: self._on_camera(data, camera=1)) - self._traffic_manager.ignore_lights_percentage(self._actor, 100.) - self._set_weather(use_preset=True) - # The tracker need recreation through the actor dependency. - if self._geo_tracker is not None: - self._geo_tracker.quit() - self._geo_tracker = GeoTrackerThread(self._world, self._actor) - self._geo_tracker.start() - - def _on_camera(self, data, camera=0): - img = np.frombuffer(data.raw_data, dtype=np.dtype("uint8")) - _height, _width = self._image_shape[:2] - img = np.reshape(img, (_height, _width, 4)) # To bgr_a format. - img = img[:, :, :3] # The image standard is hwc bgr. - self._camera_callback(img, camera) - - def _carla_vel(self): - # Carla measures in meters and seconds. - velocity = self._actor.get_velocity() - return math.sqrt(velocity.x ** 2 + velocity.y ** 2 + velocity.z ** 2) - - # def _heading(self): - # return 0 if self._actor is None else self._actor.get_transform().rotation.yaw - - def _velocity(self): - return 0 if self._actor is None else self._carla_vel() - - def _drive(self, steering, throttle): - try: - _reverse = self._in_reverse - control = carla.VehicleControl() - control.reverse = _reverse - control.steer = steering - if _reverse: - control.throttle = abs(throttle) - elif throttle > 0: - control.throttle = throttle - else: - control.brake = abs(throttle) - self._actor.apply_control(control) - except Exception as e: - logger.error("{}".format(e)) - - def _track_autopilot(self, driver_mode): - if driver_mode in ('driver_mode.inference.dnn', 'driver_mode.automatic.backend'): - self._in_reverse = False - _autopilot = driver_mode == 'driver_mode.automatic.backend' - if self._in_carla_autopilot != _autopilot: - self._in_carla_autopilot = _autopilot - self._actor.set_autopilot(_autopilot, self._tm_port) - - def _track_reverse(self, command): - if self._in_reverse: - self._in_reverse = command.get('throttle') <= 0 - else: - self._in_reverse = self._velocity() < 1e-2 and command.get('throttle') < -.99 # and command.get('arrow_down', 0) == 1 - - def _set_weather(self, use_preset=False): - preset = self._spawn_preferred_weather if use_preset else None - if use_preset or (self._rand_weather_seconds > 0 and time.time() > self._change_weather_time): - presets = [x for x in dir(carla.WeatherParameters) if re.match('[A-Z].+', x)] - preset = preset if preset in presets else np.random.choice(presets) - logger.info("Setting the weather to '{}'.".format(preset)) - self._world.set_weather(getattr(carla.WeatherParameters, preset)) - self._change_weather_time = time.time() + self._rand_weather_seconds - - def state(self): - ap_active, ap_steering, ap_throttle = False, 0, 0 - if self._actor is not None and self._actor.is_alive and self._in_carla_autopilot: - ap_active = True - ap_steering = self._actor.get_control().steer - ap_throttle = self._actor.get_control().throttle - latitude, longitude, bearing = self._geo_tracker.get() - return dict(latitude_geo=latitude, - longitude_geo=longitude, - heading=bearing, - velocity=self._velocity(), - trust_velocity=1, - auto_active=ap_active, - auto_steering=ap_steering, - auto_throttle=ap_throttle, - time=timestamp()) - - def tick(self, _): - if self._actor is not None and self._actor.is_alive: - with self._actor_lock: - self._set_weather() - - def noop(self): - self._drive(steering=0, throttle=0) - - def drive(self, cmd): - if cmd is not None and self._actor is not None: - self._track_autopilot(cmd.get('driver')) - self._track_reverse(cmd) - if not self._in_carla_autopilot: - self._drive(steering=cmd.get('steering'), throttle=cmd.get('throttle')) diff --git a/vehicles/carla09/video.py b/vehicles/carla09/video.py deleted file mode 100644 index 5ba534ab..00000000 --- a/vehicles/carla09/video.py +++ /dev/null @@ -1,139 +0,0 @@ -#!/usr/bin/env python -import collections -import logging - -import gi - -from byodr.utils import Configurable -from byodr.utils.option import parse_option - -gi.require_version('Gst', '1.0') - -from gi.repository import GObject, Gst - -GObject.threads_init() -Gst.init(None) - -logger = logging.getLogger(__name__) - - -# noinspection PyUnusedLocal -def bus_message(bus, message, loop): - m_type = message.type - """ - Gstreamer Message Types and how to parse - https://lazka.github.io/pgi-docs/Gst-1.0/flags.html#Gst.MessageType - """ - if m_type == Gst.MessageType.EOS: - logger.info("End of stream") - loop.quit() - elif m_type == Gst.MessageType.ERROR: - err, debug = message.parse_error() - logger.error("{} {}".format(err, debug)) - loop.quit() - elif m_type == Gst.MessageType.WARNING: - err, debug = message.parse_warning() - logger.warning("{} {}".format(err, debug)) - return True - - -def parse_width_height(key, default_value, errors, **kwargs): - return [int(x) for x in parse_option(key, str, default_value, errors=errors, **kwargs).split('x')] - - -class NumpyImageVideoSource(Configurable): - def __init__(self, name='front'): - super(NumpyImageVideoSource, self).__init__() - self._stream_width, self._stream_height = 640, 480 - self._name = name - self._listeners = collections.deque() - self._pipeline = None - self._source = None - self._caps = None - - def get_width(self): - # The lock is used at (re)configuration. - with self._lock: - # This is from the output shape. - return self._stream_width - - def get_height(self): - # The lock is used at (re)configuration. - with self._lock: - # This is from the output shape. - return self._stream_height - - def add_listener(self, listener): - self._listeners.append(listener) - - def remove_listener(self, listener): - self._listeners.remove(listener) - - def _publish(self, sink): - buffer = sink.emit('pull-sample').get_buffer() - array = buffer.extract_dup(0, buffer.get_size()) - for listen in self._listeners: - listen(array) - return Gst.FlowReturn.OK - - def push(self, image): - with self._lock: - if self._source is not None: - buffer = Gst.Buffer.new_wrapped(image.tobytes()) - sample = Gst.Sample.new(buffer, self._caps, None, None) - self._source.emit('push-sample', sample) - - def _close(self): - if self._source is not None: - self._source.emit("end-of-stream") - if self._pipeline is not None: - self._pipeline.set_state(Gst.State.NULL) - self._pipeline = None - self._source = None - self._caps = None - - def internal_quit(self, restarting=False): - # This method runs under lock. - self._close() - - def internal_start(self, **kwargs): - # This method runs under lock. - self._close() - _errors = [] - input_width, input_height = parse_width_height('camera.image.input.shape', '640x480', _errors, **kwargs) - bitrate = parse_option(self._name + '.video.encoding.bitrate', int, 1024, errors=_errors, **kwargs) - self._stream_width, self._stream_height = parse_width_height(self._name + '.video.output.shape', '640x480', _errors, **kwargs) - if len(_errors) == 0: - _args = dict(input_width=input_width, - input_height=input_height, - bitrate=bitrate, - stream_width=self._stream_width, - stream_height=self._stream_height) - command = "appsrc name=source emit-signals=True is-live=True " \ - "caps=video/x-raw,format=BGR,width={input_width},height={input_height} ! " \ - "videoconvert ! queue ! " \ - "videoscale ! video/x-raw,width={stream_width},height={stream_height} ! " \ - "queue ! x264enc bframes=0 bitrate={bitrate} b-adapt=0 tune=zerolatency key-int-max=60 ! " \ - "video/x-h264,profile=baseline,stream-format=\"byte-stream\" ! queue ! " \ - "appsink name=sink emit-signals=true sync=false async=false max-buffers=1 drop=true".format(**_args) - pipeline = Gst.parse_launch(command) - loop = GObject.MainLoop() - bus = pipeline.get_bus() - bus.add_signal_watch() - bus.connect("message", bus_message, loop) - src = pipeline.get_by_name('source') - # src.set_property("format", Gst.Format.TIME) - # src.set_property("block", True) - src.set_property('format', 'time') - src.set_property('do-timestamp', True) - _caps = "video/x-raw,format=BGR,width={input_width},height={input_height},framerate={fps}/1".format( - **dict(input_width=input_width, input_height=input_height, fps=20) - ) - video_sink = pipeline.get_by_name('sink') - video_sink.connect('new-sample', self._publish) - pipeline.set_state(Gst.State.PLAYING) - self._caps = Gst.Caps.from_string(_caps) - self._pipeline = pipeline - self._source = src - logger.info("Setup video stream '{}' with input caps '{}'.".format(self._name, _caps)) - return _errors