From adce1044ad7bdd8b95099b2643af5c89009cb5cc Mon Sep 17 00:00:00 2001 From: Piotr Goczal Date: Wed, 29 Aug 2018 15:29:31 +0200 Subject: [PATCH 1/3] Added traffic mirroring option --- mlat-client | 24 ++++- mlat/client/coordinator_mirror.py | 146 ++++++++++++++++++++++++++++++ mlat/client/mirror.py | 47 ++++++++++ mlat/client/receiver_mirror.py | 94 +++++++++++++++++++ mlat/client/version.py | 2 +- 5 files changed, 308 insertions(+), 5 deletions(-) create mode 100644 mlat/client/coordinator_mirror.py create mode 100644 mlat/client/mirror.py create mode 100644 mlat/client/receiver_mirror.py diff --git a/mlat-client b/mlat-client index 859529e..97bda73 100755 --- a/mlat-client +++ b/mlat-client @@ -23,8 +23,11 @@ import mlat.client.version from mlat.client.util import log from mlat.client.receiver import ReceiverConnection +from mlat.client.receiver_mirror import ReceiverConnection_mirror +from mlat.client.mirror import MirrorReceiverConnection from mlat.client.jsonclient import JsonServerConnection from mlat.client.coordinator import Coordinator +from mlat.client.coordinator_mirror import Coordinator_mirror from mlat.client import options @@ -65,6 +68,9 @@ location pin from the coverage maps.""", help="host:port of the multilateration server to connect to", type=options.hostport, default=('mlat.mutability.co.uk', 40147)) + server.add_argument('--mirror_server', + help="host:port for mirrored traffic", + type=options.hostport) server.add_argument('--no-udp', dest='udp', help="Don't offer to use UDP transport for sync/mlat messages", @@ -77,8 +83,6 @@ location pin from the coverage maps.""", outputs = options.build_outputs(args) - receiver = ReceiverConnection(host=args.input_connect[0], port=args.input_connect[1], - mode=options.connection_mode(args)) server = JsonServerConnection(host=args.server[0], port=args.server[1], handshake_data={'lat': args.lat, 'lon': args.lon, @@ -92,8 +96,20 @@ location pin from the coverage maps.""", offer_udp=args.udp, return_results=(len(outputs) > 0)) - coordinator = Coordinator(receiver=receiver, server=server, outputs=outputs, freq=options.clock_frequency(args), - allow_anon=args.allow_anon_results, allow_modeac=args.allow_modeac_results) + if args.mirror_server is not None: + + mirror_receiver = MirrorReceiverConnection(host=args.mirror_server[0], port=args.mirror_server[1]) + + receiver = ReceiverConnection_mirror(host=args.input_connect[0], port=args.input_connect[1], + mode=options.connection_mode(args)) + + coordinator = Coordinator_mirror(receiver=receiver, mirror_receiver=mirror_receiver, server=server, outputs=outputs, freq=options.clock_frequency(args), + allow_anon=args.allow_anon_results, allow_modeac=args.allow_modeac_results) + else: + receiver = ReceiverConnection(host=args.input_connect[0], port=args.input_connect[1], + mode=options.connection_mode(args)) + coordinator = Coordinator(receiver=receiver, server=server, outputs=outputs, freq=options.clock_frequency(args), + allow_anon=args.allow_anon_results, allow_modeac=args.allow_modeac_results) server.start() coordinator.run_forever() diff --git a/mlat/client/coordinator_mirror.py b/mlat/client/coordinator_mirror.py new file mode 100644 index 0000000..51f6e5f --- /dev/null +++ b/mlat/client/coordinator_mirror.py @@ -0,0 +1,146 @@ +# -*- mode: python; indent-tabs-mode: nil -*- + +# Part of mlat-client - an ADS-B multilateration client. +# Copyright 2015, Oliver Jowett +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +""" +Core of the client: track aircraft and send data to the server as needed - mirror server version. +""" + +import asyncore +import time + +import _modes +import mlat.profile +from mlat.client.util import monotonic_time, log +from mlat.client.stats import global_stats +from mlat.client.coordinator import Coordinator + +class Coordinator_mirror(Coordinator): + + def __init__(self, receiver, mirror_receiver, server, outputs, freq, allow_anon, allow_modeac): + super().__init__(receiver, server, outputs, freq, allow_anon, allow_modeac) + + self.mirror_receiver = mirror_receiver + mirror_receiver.coordinator = self + + # internals + + def run_until(self, termination_condition): + try: + next_heartbeat = monotonic_time() + 0.5 + while not termination_condition(): + # maybe there are no active sockets and + # we're just waiting on a timeout + if asyncore.socket_map: + asyncore.loop(timeout=0.1, count=5) + else: + time.sleep(0.5) + + now = monotonic_time() + if now >= next_heartbeat: + next_heartbeat = now + 0.5 + self.heartbeat(now) + + finally: + self.receiver.disconnect('Client shutting down') + self.mirror_receiver.disconnect('Client mirror shutting down') + self.server.disconnect('Client shutting down') + for o in self.outputs: + o.disconnect('Client shutting down') + + def heartbeat(self, now): + self.receiver.heartbeat(now) + self.mirror_receiver.heartbeat(now) + self.server.heartbeat(now) + for o in self.outputs: + o.heartbeat(now) + + if now >= self.next_profile: + self.next_profile = now + 30.0 + mlat.profile.dump_cpu_profiles() + + if now >= self.next_aircraft_update: + self.next_aircraft_update = now + self.update_interval + self.update_aircraft(now) + + # piggyback reporting on regular updates + # as the reporting uses data produced by the update + if self.next_report and now >= self.next_report: + self.next_report = now + self.report_interval + self.send_aircraft_report() + self.send_rate_report(now) + + if now >= self.next_stats: + self.next_stats = now + self.stats_interval + self.periodic_stats(now) + + def periodic_stats(self, now): + log('Receiver status: {0}', self.receiver.state) + log('Mirror receiver status: {0}', self.mirror_receiver.state) + log('Server status: {0}', self.server.state) + global_stats.log_and_reset() + + adsb_req = adsb_total = modes_req = modes_total = 0 + now = monotonic_time() + for ac in self.aircraft.values(): + if ac.messages < 2: + continue + + if now - ac.last_position_time < self.position_expiry_age: + adsb_total += 1 + if ac.requested: + adsb_req += 1 + else: + modes_total += 1 + if ac.requested: + modes_req += 1 + + log('Aircraft: {modes_req} of {modes_total} Mode S, {adsb_req} of {adsb_total} ADS-B used', + modes_req=modes_req, + modes_total=modes_total, + adsb_req=adsb_req, + adsb_total=adsb_total) + + if self.recent_jumps > 0: + log('Out-of-order timestamps: {recent}', recent=self.recent_jumps) + self.recent_jumps = 0 + + # callbacks from server connection + + def server_connected(self): + self.requested_traffic = set() + self.requested_modeac = set() + self.newly_seen = set() + self.aircraft = {} + self.reported = set() + self.next_report = monotonic_time() + self.report_interval + if self.receiver.state != 'ready': + self.receiver.reconnect() + if self.mirror_receiver.state != 'ready': + self.mirror_receiver.reconnect() + + def server_disconnected(self): + self.receiver.disconnect('Lost connection to multilateration server, no need for input data') + self.mirror_receiver.disconnect('Lost connection to multilateration server, no need for input data') + self.next_report = None + self.next_rate_report = None + self.next_expiry = None + + @mlat.profile.trackcpu + + def copy_received_messages(self, messages): + self.mirror_receiver.send_to_mirror(messages) diff --git a/mlat/client/mirror.py b/mlat/client/mirror.py new file mode 100644 index 0000000..d6ae049 --- /dev/null +++ b/mlat/client/mirror.py @@ -0,0 +1,47 @@ +# -*- mode: python; indent-tabs-mode: nil -*- + +# Part of mlat-client - an ADS-B multilateration client. +# Copyright 2015, Oliver Jowett +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +""" +Copy received Mode S messages to mirror server. +""" + +import socket +import errno + +import _modes +import mlat.profile +from mlat.client.stats import global_stats +from mlat.client.net import ReconnectingConnection +from mlat.client.util import log, monotonic_time + +class MirrorReceiverConnection(ReconnectingConnection): + reconnect_interval = 15.0 + + def __init__(self, host, port): + if host != 'null': + ReconnectingConnection.__init__(self, host, port) + self.reset_connection() + + def start_connection(self): + log('Mirror connected to {0}:{1}', self.host, self.port) + self.state = 'connected' + + @mlat.profile.trackcpu + + def send_to_mirror(self,messages): + self.send(messages) diff --git a/mlat/client/receiver_mirror.py b/mlat/client/receiver_mirror.py new file mode 100644 index 0000000..f9042fa --- /dev/null +++ b/mlat/client/receiver_mirror.py @@ -0,0 +1,94 @@ +# -*- mode: python; indent-tabs-mode: nil -*- + +# Part of mlat-client - an ADS-B multilateration client. +# Copyright 2015, Oliver Jowett +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +""" +Handles receiving Mode S messages from receivers using various formats - mirror server version. +""" + +import socket +import errno + +import _modes +import mlat.profile +from mlat.client.stats import global_stats +from mlat.client.net import ReconnectingConnection +from mlat.client.util import log, monotonic_time +from mlat.client.receiver import ReceiverConnection + +class ReceiverConnection_mirror(ReceiverConnection): + + def __init__(self, host, port, mode): + super().__init__(host, port, mode) + self.mirror_receiver = None + + @mlat.profile.trackcpu + def handle_read(self): + try: + moredata = self.recv(16384) + except socket.error as e: + if e.errno == errno.EAGAIN: + return + raise + + if not moredata: + self.close() + return + + global_stats.receiver_rx_bytes += len(moredata) + + self.coordinator.copy_received_messages(moredata) + + if self.residual: + moredata = self.residual + moredata + + self.last_data_received = monotonic_time() + + try: + consumed, messages, pending_error = self.feed(moredata) + except ValueError as e: + log("Parsing receiver data failed: {e}", e=str(e)) + self.reconnect_interval = 5.0 + self.close() + return + + if consumed < len(moredata): + self.residual = moredata[consumed:] + if len(self.residual) > 5120: + raise RuntimeError('parser broken - buffer not being consumed') + else: + self.residual = None + + global_stats.receiver_rx_messages += self.reader.received_messages + global_stats.receiver_rx_filtered += self.reader.suppressed_messages + self.reader.received_messages = self.reader.suppressed_messages = 0 + + if messages: + self.coordinator.input_received_messages(messages) + + if pending_error: + # call it again to get the exception + # now that we've handled all the messages + try: + if self.residual is None: + self.feed(b'') + else: + self.feed(self.residual) + except ValueError as e: + log("Parsing receiver data failed: {e}", e=str(e)) + self.close() + return \ No newline at end of file diff --git a/mlat/client/version.py b/mlat/client/version.py index 84c98da..50fdcdf 100644 --- a/mlat/client/version.py +++ b/mlat/client/version.py @@ -18,4 +18,4 @@ """Just a version constant!""" -CLIENT_VERSION = "0.2.10" +CLIENT_VERSION = "0.2.11" From 95149a847665ec0670bbdc93233cccf8cefb09e2 Mon Sep 17 00:00:00 2001 From: Piotr Goczal Date: Fri, 31 Aug 2018 13:28:20 +0200 Subject: [PATCH 2/3] Some code optimalization and cleanup --- mlat/client/coordinator_mirror.py | 93 +++---------------------------- mlat/client/mirror.py | 10 ++-- mlat/client/receiver_mirror.py | 2 +- 3 files changed, 16 insertions(+), 89 deletions(-) diff --git a/mlat/client/coordinator_mirror.py b/mlat/client/coordinator_mirror.py index 51f6e5f..d73b37c 100644 --- a/mlat/client/coordinator_mirror.py +++ b/mlat/client/coordinator_mirror.py @@ -29,7 +29,7 @@ from mlat.client.stats import global_stats from mlat.client.coordinator import Coordinator -class Coordinator_mirror(Coordinator): +class CoordinatorMirror(Coordinator): def __init__(self, receiver, mirror_receiver, server, outputs, freq, allow_anon, allow_modeac): super().__init__(receiver, server, outputs, freq, allow_anon, allow_modeac) @@ -41,106 +41,31 @@ def __init__(self, receiver, mirror_receiver, server, outputs, freq, allow_anon, def run_until(self, termination_condition): try: - next_heartbeat = monotonic_time() + 0.5 - while not termination_condition(): - # maybe there are no active sockets and - # we're just waiting on a timeout - if asyncore.socket_map: - asyncore.loop(timeout=0.1, count=5) - else: - time.sleep(0.5) - - now = monotonic_time() - if now >= next_heartbeat: - next_heartbeat = now + 0.5 - self.heartbeat(now) - + super().run_until(termination_condition) finally: - self.receiver.disconnect('Client shutting down') - self.mirror_receiver.disconnect('Client mirror shutting down') - self.server.disconnect('Client shutting down') - for o in self.outputs: - o.disconnect('Client shutting down') + self.mirror_receiver.disconnect('Client mirror shutting down') def heartbeat(self, now): - self.receiver.heartbeat(now) self.mirror_receiver.heartbeat(now) - self.server.heartbeat(now) - for o in self.outputs: - o.heartbeat(now) - - if now >= self.next_profile: - self.next_profile = now + 30.0 - mlat.profile.dump_cpu_profiles() - - if now >= self.next_aircraft_update: - self.next_aircraft_update = now + self.update_interval - self.update_aircraft(now) - - # piggyback reporting on regular updates - # as the reporting uses data produced by the update - if self.next_report and now >= self.next_report: - self.next_report = now + self.report_interval - self.send_aircraft_report() - self.send_rate_report(now) - - if now >= self.next_stats: - self.next_stats = now + self.stats_interval - self.periodic_stats(now) + super().heartbeat(now) def periodic_stats(self, now): - log('Receiver status: {0}', self.receiver.state) + super().periodic_stats(now) log('Mirror receiver status: {0}', self.mirror_receiver.state) - log('Server status: {0}', self.server.state) - global_stats.log_and_reset() - - adsb_req = adsb_total = modes_req = modes_total = 0 - now = monotonic_time() - for ac in self.aircraft.values(): - if ac.messages < 2: - continue - - if now - ac.last_position_time < self.position_expiry_age: - adsb_total += 1 - if ac.requested: - adsb_req += 1 - else: - modes_total += 1 - if ac.requested: - modes_req += 1 - - log('Aircraft: {modes_req} of {modes_total} Mode S, {adsb_req} of {adsb_total} ADS-B used', - modes_req=modes_req, - modes_total=modes_total, - adsb_req=adsb_req, - adsb_total=adsb_total) - - if self.recent_jumps > 0: - log('Out-of-order timestamps: {recent}', recent=self.recent_jumps) - self.recent_jumps = 0 # callbacks from server connection def server_connected(self): - self.requested_traffic = set() - self.requested_modeac = set() - self.newly_seen = set() - self.aircraft = {} - self.reported = set() - self.next_report = monotonic_time() + self.report_interval - if self.receiver.state != 'ready': - self.receiver.reconnect() + super().server_connected() if self.mirror_receiver.state != 'ready': self.mirror_receiver.reconnect() def server_disconnected(self): - self.receiver.disconnect('Lost connection to multilateration server, no need for input data') + super().server_disconnected() self.mirror_receiver.disconnect('Lost connection to multilateration server, no need for input data') - self.next_report = None - self.next_rate_report = None - self.next_expiry = None @mlat.profile.trackcpu def copy_received_messages(self, messages): - self.mirror_receiver.send_to_mirror(messages) +# self.mirror_receiver.send_to_mirror(messages) + self.mirror_receiver.send(messages) diff --git a/mlat/client/mirror.py b/mlat/client/mirror.py index d6ae049..576140b 100644 --- a/mlat/client/mirror.py +++ b/mlat/client/mirror.py @@ -33,15 +33,17 @@ class MirrorReceiverConnection(ReconnectingConnection): reconnect_interval = 15.0 def __init__(self, host, port): - if host != 'null': - ReconnectingConnection.__init__(self, host, port) - self.reset_connection() + ReconnectingConnection.__init__(self, host, port) + self.reset_connection() + + @mlat.profile.trackcpu def start_connection(self): log('Mirror connected to {0}:{1}', self.host, self.port) self.state = 'connected' - @mlat.profile.trackcpu +""" def send_to_mirror(self,messages): self.send(messages) +""" diff --git a/mlat/client/receiver_mirror.py b/mlat/client/receiver_mirror.py index f9042fa..69c82ac 100644 --- a/mlat/client/receiver_mirror.py +++ b/mlat/client/receiver_mirror.py @@ -30,7 +30,7 @@ from mlat.client.util import log, monotonic_time from mlat.client.receiver import ReceiverConnection -class ReceiverConnection_mirror(ReceiverConnection): +class ReceiverConnectionMirror(ReceiverConnection): def __init__(self, host, port, mode): super().__init__(host, port, mode) From 1bc1a7835586786fe8d2716e6e869ddca413d5c5 Mon Sep 17 00:00:00 2001 From: Piotr Goczal Date: Fri, 31 Aug 2018 13:30:10 +0200 Subject: [PATCH 3/3] Class name chenge --- mlat-client | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/mlat-client b/mlat-client index 97bda73..5ce1700 100755 --- a/mlat-client +++ b/mlat-client @@ -23,11 +23,11 @@ import mlat.client.version from mlat.client.util import log from mlat.client.receiver import ReceiverConnection -from mlat.client.receiver_mirror import ReceiverConnection_mirror +from mlat.client.receiver_mirror import ReceiverConnectionMirror from mlat.client.mirror import MirrorReceiverConnection from mlat.client.jsonclient import JsonServerConnection from mlat.client.coordinator import Coordinator -from mlat.client.coordinator_mirror import Coordinator_mirror +from mlat.client.coordinator_mirror import CoordinatorMirror from mlat.client import options @@ -100,10 +100,10 @@ location pin from the coverage maps.""", mirror_receiver = MirrorReceiverConnection(host=args.mirror_server[0], port=args.mirror_server[1]) - receiver = ReceiverConnection_mirror(host=args.input_connect[0], port=args.input_connect[1], + receiver = ReceiverConnectionMirror(host=args.input_connect[0], port=args.input_connect[1], mode=options.connection_mode(args)) - coordinator = Coordinator_mirror(receiver=receiver, mirror_receiver=mirror_receiver, server=server, outputs=outputs, freq=options.clock_frequency(args), + coordinator = CoordinatorMirror(receiver=receiver, mirror_receiver=mirror_receiver, server=server, outputs=outputs, freq=options.clock_frequency(args), allow_anon=args.allow_anon_results, allow_modeac=args.allow_modeac_results) else: receiver = ReceiverConnection(host=args.input_connect[0], port=args.input_connect[1],