# --- .idea/.gitignore (new file; IDE metadata, not Python) ---
# /workspace.xml
# .idea/

# --- lib_python3/origin/__init__.py ---

from origin.origin_current_time import current_time
from origin.origin_data_types import data_types
from origin.origin_registration_validation import registration_validation

# Field name under which a measurement's timestamp is sent to the server.
TIMESTAMP = "measurement_time"

# Wildcard import kept last so the client subpackage can import the names
# above without a circular-import failure.
from origin.client import *

# --- lib_python3/origin/client/__init__.py ---


class float_field:
    """Marker class: declares a stream field as a float (see data_types)."""
    pass


class integer_field:
    """Marker class: declares a stream field as an integer."""
    pass


class string_field:
    """Marker class: declares a stream field as a string."""
    pass


class file_field:
    """Marker class: reserved for file-typed fields (currently unused)."""
    pass


# Imported after the marker classes so the submodules below can import them
# from this package during their own import.
from origin.client import origin_server_connection
ServerConnection = origin_server_connection.ServerConnection
from origin.client import origin_server
Server = origin_server.Server
from origin.client import origin_subscriber
Subscriber = origin_subscriber.Subscriber

# --- lib_python3/origin/client/origin_reader.py ---

"""
This module provides a client reader class that holds all the basic API
methods for requesting windows of data from a data server.
"""

import json

try:
    import receiver
except ModuleNotFoundError:
    # TODO when python3 branch is merged to main, we will pip install origin
    # (master) from the repo, and this import line should no longer be needed.
    from origin.client import receiver


class Reader(receiver.Receiver):
    """!@brief A class representing a data stream reader to a data server.
    This class handles asynchronous read events with a data server.
    """

    def __init__(self, config, logger):
        """!@brief Initialize the reader.
        @param config is a ConfigParser object
        @param logger a python logging object
        """
        # call the parent class initialization
        super(Reader, self).__init__(config, logger)
        # we only need the read socket for this class
        self.connect(self.read_sock, self.read_port)
        # request the available streams from the server
        self.get_available_streams()

    def get_stream_data(self, stream,
                        start=None, stop=None, fields=None, raw=False):
        """!@brief Request stream data in a time window from the server.
        @param stream A string holding the stream name
        @param start 32b Unix timestamp that defines the start of the data
            window
        @param stop 32b Unix timestamp that defines the end of the data window
        @param fields A list of fields from the stream that should be
            returned; None or [] means all fields
        @param raw True for raw data, False for server-side statistics
        @return data A dictionary containing data for each field in
            the time window; {} on error
        @raise KeyError if `stream` is not a known stream
        """
        if not self.is_stream(stream):
            # BUG FIX: include the offending stream name in the exception
            raise KeyError(stream)

        request = {
            'stream': stream.strip(),
            'start': start,
            'stop': stop,
            'raw': raw,
        }
        # BUG FIX: default was a shared mutable list ([]); None is equivalent
        # for callers and cannot accumulate state across calls.
        if fields:
            if self.is_fields(stream, fields):
                request['fields'] = fields
            else:
                self.log.error('There was an issue with the specified fields.')
                return {}
        self.read_sock.send_string(json.dumps(request))
        try:
            msg = self.read_sock.recv()
            data = json.loads(msg)
        except Exception:
            # narrowed from a bare except (which also caught KeyboardInterrupt)
            msg = "There was an error communicating with the server"
            self.log.exception(msg)
            data = (1, {'error': msg, 'stream': {}})

        # server reply is (error_code, payload)
        if data[0] != 0:
            msg = "The server responds to the request with error message: `{}`"
            self.log.error(msg.format(data[1]["error"]))
            known_streams = data[1]['stream']
            if known_streams != {}:
                self.log.info('Updating stream definitions from server.')
                self.update_known_streams(known_streams)
            return {}
        else:
            return data[1]

    def get_stream_raw_data(self, stream, start=None, stop=None, fields=None):
        """!@brief Request raw stream data in time window from server.
        @param stream A string holding the stream name
        @param start 32b Unix timestamp that defines the start of the data
            window
        @param stop 32b Unix timestamp that defines the end of the data window
        @param fields A list of fields from the stream that should be returned
        @return data A dictionary containing raw data for each field in
            the time window
        """
        return self.get_stream_data(
            stream, start=start, stop=stop, fields=fields, raw=True
        )

    def get_stream_stat_data(self, stream, start=None, stop=None, fields=None):
        """!@brief Request stream data statistics in time window from server.
        @param stream A string holding the stream name
        @param start 32b Unix timestamp that defines the start of the data
            window
        @param stop 32b Unix timestamp that defines the end of the data window
        @param fields A list of fields from the stream that should be returned
        @return data A dictionary containing statistical data for each field
            in the time window
        """
        return self.get_stream_data(
            stream, start=start, stop=stop, fields=fields, raw=False
        )

# --- lib_python3/origin/client/origin_server.py (module head) ---

from origin.client import ServerConnection
from origin.client import float_field
from origin.client import integer_field
from origin.client import string_field
from origin import data_types, registration_validation

import zmq
import struct
import json


def decode(measurement_type: str) -> str:
    """
    Decode a measurement type if it happens to be one of the marker classes.
    TODO is this necessary? Those classes are empty.

    Args:
        measurement_type : string describing the data type a measurement
            should be. If not in data_types, checks and translates the
            'float_field', 'integer_field' or 'string_field' marker classes.

    Returns:
        The data type description string.

    Raises:
        KeyError if measurement_type is not a valid data type
    """
    if measurement_type in data_types:
        return measurement_type
    if measurement_type == float_field:
        return "float"
    if measurement_type == integer_field:
        return "int"
    if measurement_type == string_field:
        return "string"
    raise KeyError(measurement_type)
def declaration_formatter(stream, records, key_order) -> bytes:
    """Serialize a stream declaration as `stream,field:type,...` utf8 bytes.

    @param stream stream name string
    @param records dict mapping field name -> decoded type string
    @param key_order iterable giving the field ordering for the packet
    """
    dec_str = [stream]
    for key in key_order:
        dec_str.append(':'.join([key, records[key]]))
    return bytes(','.join(dec_str), encoding="utf8")


def format_stream_declaration(stream, records, key_order, encoding_format):
    """Format a stream registration message for the server.

    @param stream stream name string
    @param records dict mapping field name -> type (string or marker class)
    @param key_order field ordering (ignored/warned for JSON encoding)
    @param encoding_format "json" for JSON encoding, otherwise CSV-style bytes
    @return the encoded declaration (bytes), or None if a type is invalid
    """
    sent_dict = {}
    for m in records:
        try:
            sent_dict[m] = decode(records[m])
        except KeyError:
            print(f"{records[m]} is not a valid data type. Programming error. Error should be "
                  f"caught before this")
            return None
    if (encoding_format is not None) and (encoding_format.lower() == "json"):
        if key_order is not None:
            msg = (
                "Warning: JSON formatting selected and a key order has been "
                "defined. JSON object order is not gaurenteed therefore it is "
                "not recommended to use binary data packets."
            )
            print(msg)
        # sort_keys makes the payload deterministic.
        # BUG FIX: encode to bytes — pyzmq's Socket.send() rejects str
        # payloads, so the str returned by json.dumps could never be sent.
        return json.dumps((stream, sent_dict), sort_keys=True).encode("utf8")
    else:
        return declaration_formatter(stream, sent_dict, key_order)


class Server:
    """Client-side handle used to register data streams with an Origin server."""

    def __init__(self, config):
        """@param config a ConfigParser object with a [Server] section."""
        self.config = config

    def ping(self):
        """Trivial liveness check; always True (no network round trip)."""
        return True

    def register_stream(self, stream, records, key_order=None,
                        data_format=None, timeout=1000):
        """Register `stream` with the server and return a ServerConnection.

        @param stream stream name
        @param records dict of field name -> data type
        @param key_order field ordering for binary packets (None = dict order)
        @param data_format None for binary/CSV registration, 'json' for JSON
        @param timeout receive timeout in ms for the registration reply
        @return a ServerConnection on success, None on any failure
        """
        # BUG FIX: registration_validation returns a (bool, message) tuple;
        # the old `if not valid:` tested the tuple itself, which is always
        # truthy, so invalid declarations were never rejected.
        valid, err_msg = registration_validation(stream, records, key_order)
        if not valid:
            print("invalid stream declaration: {}".format(err_msg))
            return None

        port = self.config.get('Server', "register_port")
        msgport = self.config.get('Server', "measure_port")
        if data_format == 'json':
            port = self.config.get('Server', "json_register_port")
            msgport = self.config.get('Server', "json_measure_port")

        context = zmq.Context()
        socket = context.socket(zmq.REQ)
        socket.setsockopt(zmq.RCVTIMEO, timeout)
        socket.setsockopt(zmq.LINGER, 0)
        host = self.config.get('Server', "ip")
        socket.connect("tcp://%s:%s" % (host, port))

        if (key_order is None) and (data_format is None):
            key_order = records.keys()
        registerComm = format_stream_declaration(stream, records, key_order,
                                                 data_format)
        if registerComm is None:
            if data_format is None:
                data_format = "comma-separated values"
            print("can't format stream into {}".format(data_format))
            socket.close()  # BUG FIX: don't leak the REQ socket on failure
            return None

        socket.send(registerComm, zmq.NOBLOCK)
        try:
            confirmation = socket.recv()
        except Exception as e:
            print(f"Problem registering stream: {stream}\nError = {e}")
            print("Server did not respond in time")  # TODO how do we know ??
            socket.close()  # BUG FIX: don't leak the REQ socket on timeout
            return None
        return_code, msg = bytes(confirmation).split(b',', 1)
        print(return_code, msg)

        if int(return_code) != 0:
            print(f"Problem registering stream {stream}")
            print(msg)
            socket.close()  # BUG FIX: don't leak the REQ socket on rejection
            return None

        # on success the payload is two packed 32b unsigned ints
        streamID, version = struct.unpack("!II", msg)
        print(f"successfully registered with streamID: {streamID}, version: {version}")
        socket.close()  # registration REQ socket no longer needed

        socket_data = context.socket(zmq.PUSH)
        socket_data.connect("tcp://%s:%s" % (host, msgport))
        return ServerConnection(
            self.config,
            stream,
            streamID,
            key_order,
            data_format,
            records,
            context,
            socket_data
        )

# --- lib_python3/origin/client/origin_server_connection.py ---

import json
import sys
import zmq
import struct
import ctypes  # NOTE(review): unused in this module; kept in case external code relies on it
import traceback

from origin import data_types, TIMESTAMP


def make_format_string(config, key_order, records):
    """Build the struct format string and packed size for a stream record.

    @param config ConfigParser with an optional [Server] timestamp_type
    @param key_order field ordering for the packet
    @param records dict of field name -> data type string
    @return (format_string, data_length) tuple
    """
    # BUG FIX: ConfigParser.get raises NoOptionError (not KeyError) for a
    # missing option, so the old `except KeyError` fallback never fired;
    # use the fallback= keyword instead.
    ts_type = config.get('Server', "timestamp_type", fallback="uint")
    ts_size = data_types[ts_type]["size"]
    # '!' = network byte order; 'I' = 32b streamID, then the timestamp
    fstr = "!I" + data_types[ts_type]["format_char"]

    data_length = ts_size
    for entry in key_order:
        data_length += data_types[records[entry]]["size"]
        fstr += data_types[records[entry]]["format_char"]
    return fstr, data_length


class ServerConnection:
    """Holds the PUSH socket and packet layout for one registered stream."""

    def __init__(self, config, stream, streamID, key_order, data_format,
                 records, context, socket):
        self.config = config
        self.stream = stream
        self.streamID = streamID
        self.key_order = key_order
        try:
            self.data_format = data_format.lower()
        except AttributeError:
            # data_format is None -> binary packets
            self.data_format = None
        self.records = records
        self.context = context
        self.socket = socket

        self.socket.setsockopt(zmq.SNDTIMEO, 2000)

        if key_order is None:
            # JSON streams have no fixed binary layout
            self.format_string, self.data_size = (None, None)
        else:
            self.format_string, self.data_size = make_format_string(
                self.config, key_order, records)

    def send(self, **kwargs):
        """Send one measurement; field values passed as keyword arguments.

        The optional TIMESTAMP keyword carries the measurement time; if
        absent the server timestamps on arrival.
        """
        msg_data = [self.streamID]
        try:
            msg_data.append(kwargs[TIMESTAMP])
        except KeyError:
            # no timestamp specified, server will timestamp on arrival
            msg_data.append(0)

        if self.data_format is None:
            for k in self.key_order:
                msg_data.append(kwargs[k])
            try:
                self.socket.send(self.format_record(msg_data), zmq.NOBLOCK)
            except zmq.Again:
                print("Connection to Server Failed")
                sys.exit(1)
            except Exception as e:
                print(f"Uncaught exception: {e}")
                print('-' * 60)
                traceback.print_exc(file=sys.stdout)
                print('-' * 60)
                print("Exiting")
                sys.exit(1)

        elif self.data_format == "json":
            msg_data[0] = self.stream
            msg_map = {}
            for k in kwargs.keys():
                if k != TIMESTAMP:
                    msg_map[k] = kwargs[k]
            msg_data.append(msg_map)
            try:
                # BUG FIX: pyzmq's send() requires bytes; the old code passed
                # the str from json.dumps, which raises TypeError.
                self.socket.send_string(json.dumps(msg_data), zmq.NOBLOCK)
            except zmq.Again:
                print("Connection to Server Failed")
                sys.exit(1)
            except Exception as e:
                print(f"Uncaught exception: {e}")
                print('-' * 60)
                traceback.print_exc(file=sys.stdout)
                print('-' * 60)
                print("Exiting")
                sys.exit(1)

    def close(self):
        """Close the data PUSH socket."""
        print("closing socket")
        self.socket.close()

    def format_record(self, data):
        """Pack one record (streamID, timestamp, fields...) per the layout."""
        return struct.pack(self.format_string, *data)
# --- lib_python3/origin/client/origin_subscriber.py (module head) ---

"""
This module provides a client subscription class that holds all the basic API
methods for subscribing to a data stream.
"""

import zmq
import sys
import json

import origin.client.receiver as receiver

import multiprocessing

import requests
import logging


def sub_print(stream_id, data, state, log, ctrl):
    """!@brief Default stream data callback. Prints data.

    @param stream_id data stream id
    @param data data to be printed
    @param state per-subscription persistent state dict (returned unchanged)
    @param log logging object
    @param ctrl per-subscription control dict ({'alert', 'pause'}); unused here
    @return the (unmodified) state dict
    """
    log.info("[{}]: {}".format(stream_id, data))
    return state


def poller_loop(sub_addr, queue, level=logging.DEBUG):
    """!@brief Run the subscription poll loop (intended as a worker process).

    Consumes command dicts from `queue` (SUBSCRIBE, UNSUBSCRIBE, MUTE, ...)
    and dispatches incoming stream data on a zmq SUB socket to the callbacks
    registered for each stream filter.

    @param sub_addr zmq address string of the server's PUB socket
    @param queue multiprocessing.Queue carrying command dicts
    @param level logging level for this process's logger
    """
    log = logging.getLogger('poller')
    log.setLevel(level)
    ch = logging.StreamHandler()
    ch.setLevel(level)
    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    ch.setFormatter(formatter)
    log.addHandler(ch)
    # callbacks to run when a message is received, keyed by stream filter;
    # each value is a list of {'callback', 'kwargs', 'state', 'control', 'id'}
    subscriptions = {}
    # per-channel bookkeeping pushed to the monitor endpoint:
    # {id: {'kwargs': {...}, 'control': {'alert': bool, 'pause': bool}}}
    sub_list = {}
    context = zmq.Context()
    sub_sock = context.socket(zmq.SUB)
    # listen for one second, before doing housekeeping
    sub_sock.setsockopt(zmq.RCVTIMEO, 1000)
    sub_sock.connect(sub_addr)
    while True:
        try:
            # get command from command queue
            cmd = queue.get_nowait()
            if cmd['action'] == 'SHUTDOWN':
                break

            # BUG FIX: stream_filter was only assigned inside the SUBSCRIBE
            # branch, so every other command operated on a stale (or
            # undefined) filter. Read it from the command itself.
            stream_filter = cmd.get('stream_filter')

            if cmd['action'] == 'SUBSCRIBE':
                log.info('Subscribing with stream filter: [{}]'.format(
                    stream_filter))
                if stream_filter not in subscriptions:
                    subscriptions[stream_filter] = []
                    # BUG FIX: subscribe the socket only once per filter —
                    # zmq stacks duplicate subscriptions, which would then
                    # require matching unsubscribes to take effect.
                    sub_sock.setsockopt_string(zmq.SUBSCRIBE, stream_filter)
                subscriptions[stream_filter].append({
                    'callback': cmd['callback'],
                    'kwargs': cmd['kwargs'],
                    'state': {},
                    'control': {'alert': True, 'pause': False},
                    'id': cmd['id']
                })
                sub_list[cmd['id']] = {
                    'kwargs': cmd['kwargs'],
                    'control': {'alert': True, 'pause': False}
                }
                log.info("subscriptions: {}".format(
                    subscriptions[stream_filter]))

            if cmd['action'] == 'UPDATE_KW':
                log.info('Updating channel [{}]...'.format(stream_filter))
                for cb in subscriptions.get(stream_filter, []):
                    if cb['id'] == cmd['id']:
                        cb['kwargs'] = cmd['kwargs']
                        sub_list[cmd['id']] = {
                            'control': cb['control'],
                            'kwargs': cmd['kwargs']
                        }
                log.info("subscriptions: {}".format(
                    subscriptions.get(stream_filter)))

            if (cmd['action'] == 'UNSUBSCRIBE' or
                    cmd['action'] == 'REMOVE_ALL_CBS'):
                msg = 'Unsubscribing to stream filter: [{}]'
                log.info(msg.format(stream_filter))
                sub_sock.setsockopt_string(zmq.UNSUBSCRIBE, stream_filter)

            if cmd['action'] == 'REMOVE_ALL_CBS':
                msg = 'Removing all callbacks for stream filter: [{}]'
                log.info(msg.format(stream_filter))
                subscriptions.pop(stream_filter, None)

            if cmd['action'] == 'RESET':
                log.info('Resetting channel [{}]'.format(stream_filter))
                for cb in subscriptions.get(stream_filter, []):
                    # cb holds all the info of one subscribed channel
                    if cb['id'] == cmd['id']:
                        cb['state'] = {}
                log.info("subscriptions: {}".format(
                    subscriptions.get(stream_filter)))

            if cmd['action'] == 'RESET_ALL':
                log.info('Resetting all channels [{}]'.format(stream_filter))
                for cb in subscriptions.get(stream_filter, []):
                    cb['state'] = {}
                log.info("subscriptions: {}".format(
                    subscriptions.get(stream_filter)))

            if (cmd['action'] == 'MUTE' or
                    cmd['action'] == 'UNMUTE'):
                log.info('Muted/Unmuted channel alert [{}]'.format(
                    stream_filter))
                for cb in subscriptions.get(stream_filter, []):
                    if cb['id'] == cmd['id']:
                        cb['control']['alert'] = (cmd['action'] == 'UNMUTE')
                        sub_list[cmd['id']] = {
                            'control': cb['control'],
                            'kwargs': cb['kwargs']
                        }

            if (cmd['action'] == 'MUTE_ALL' or
                    cmd['action'] == 'UNMUTE_ALL'):
                log.info('Muted/Unmuted all channel alerts [{}]'.format(
                    stream_filter))
                for cb in subscriptions.get(stream_filter, []):
                    cb['control']['alert'] = (cmd['action'] == 'UNMUTE_ALL')
                    sub_list[cb['id']] = {
                        'control': cb['control'],
                        'kwargs': cb['kwargs']
                    }

            if (cmd['action'] == 'PAUSE_ALL' or
                    cmd['action'] == 'RESTART_ALL'):
                log.info('Paused/Restarted all channels [{}]'.format(
                    stream_filter))
                for cb in subscriptions.get(stream_filter, []):
                    cb['control']['pause'] = (cmd['action'] == 'PAUSE_ALL')
                    sub_list[cb['id']] = {
                        'control': cb['control'],
                        'kwargs': cb['kwargs']
                    }

            if (cmd['action'] == 'PAUSE' or
                    cmd['action'] == 'RESTART'):
                log.info('Paused/Restarted channel [{}]'.format(stream_filter))
                for cb in subscriptions.get(stream_filter, []):
                    if cb['id'] == cmd['id']:
                        cb['control']['pause'] = (cmd['action'] == 'PAUSE')
                        sub_list[cmd['id']] = {
                            'control': cb['control'],
                            'kwargs': cb['kwargs']
                        }

            # NOTE(review): sub_list is serialized to a JSON string and then
            # passed through `json=`, which JSON-encodes it a second time —
            # presumably the monitor expects that; confirm against its API.
            sub_list_json = json.dumps(sub_list)
            try:
                requests.put('http://127.0.0.1:5000/monitor',
                             json=sub_list_json)
            except IOError:
                log.error('no monitor port found')

        except multiprocessing.queues.Empty:
            # no pending command this cycle; fall through to the recv below
            pass
        except IOError:
            log.exception('IOError, probably a broken pipe. Exiting..')
            sys.exit(1)
        except Exception:
            # narrowed from a bare except (which also caught SystemExit)
            log.exception("error encountered")

        try:
            [streamID, content] = sub_sock.recv_multipart()
            try:
                log.debug("new data")
                stream_key = streamID.decode('ascii')
                for cb in subscriptions[stream_key]:
                    if not cb['control']['pause']:
                        cb['state'] = cb['callback'](
                            stream_key, json.loads(content),
                            cb['state'], log, cb['control'], **cb['kwargs']
                        )
            except KeyError:
                msg = "An unrecognized streamID `{}` was encountered"
                log.error(msg.format(streamID))
                log.error(subscriptions)
        except zmq.ZMQError as e:
            # EAGAIN is just the 1 s receive timeout expiring
            if e.errno != zmq.EAGAIN:
                log.exception("zmq error encountered")
        except Exception:
            log.exception("error encountered")

    log.info('Shutting down poller loop.')
    sub_sock.close()
    context.term()
class Subscriber(receiver.Receiver):
    """!@brief A data stream subscription client.

    Spawns a `poller_loop` worker process and communicates with it via a
    multiprocessing queue of command dicts.
    """

    def __init__(self, config, logger, loop=poller_loop, **kwargs):
        """!@brief Initialize the subscriber.

        @param config configuration object
        @param logger python logging object
        @param loop custom poller loop
        @param kwargs of loop function
        """
        # call the parent class initialization
        super(Subscriber, self).__init__(config, logger)
        # we need the read socket for this class so we can get stream defs
        self.connect(self.read_sock, self.read_port)
        # request the available streams from the server
        self.get_available_streams()
        # queue for inter-process communication with the poller process
        self.queue = multiprocessing.Queue()
        sub_addr = "tcp://{}:{}".format(self.ip, self.sub_port)
        # the poller loop runs in its own process for the subscriber lifetime
        self.loop = multiprocessing.Process(
            target=loop,
            args=(sub_addr, self.queue),
            kwargs=kwargs
        )
        self.loop.start()
        # ids already handed out and the highest candidate tried so far
        self.id_list = []
        self.last_index = 0

    def close(self):
        """Close local sockets and ask the poller process to shut down."""
        super(Subscriber, self).close()
        self.send_command({'action': 'SHUTDOWN'})

    def subscribe(self, stream, callback=None, **kwargs):
        """!@brief Subscribe to a data stream and assign a callback.

        You can subscribe to multiple data streams simultaneously using the
        same socket — just call subscribe again with a new filter. You can
        also register multiple callbacks for the same stream by calling
        subscribe again.

        The callback has to be a standalone function, not a class method, or
        it will throw an error (it is sent to another process).

        @param stream A string holding the stream name
        @param callback A callback function that expects a python dict with
            data (defaults to sub_print)
        @return success True if the data stream subscription was requested,
            False otherwise
        """
        try:
            stream_filter = self.get_stream_filter(stream)
        except KeyError:
            msg = "No stream matching string: `{}` found."
            self.log.error(msg.format(stream))
            return False
        # BUG FIX: get_id() was previously called twice, so the id stored on
        # the instance differed from the id sent to the poller process.
        self.id = self.get_id()
        msg = "Subscribing to stream: {} [{}]"
        self.log.info(msg.format(stream, stream_filter))

        # default is a function that just prints the data as it comes in
        if callback is None:
            callback = sub_print

        # kwargs holds all extra keyword parameters for the callback
        cmd = {
            'action': 'SUBSCRIBE',
            'stream_filter': stream_filter,
            'callback': callback,
            'kwargs': kwargs,
            'id': self.id,
        }
        self.log.info('sending cmd to process: {}'.format(cmd))
        self.send_command(cmd)
        # BUG FIX: the docstring promises a success flag but the old code
        # fell off the end (returning None, which is falsy).
        return True

    def get_stream_filter(self, stream):
        """!@brief Make the appropriate stream filter to subscribe to a stream.

        @param stream A string holding the stream name
        @return stream_filter A zero-padded stream-id string of filter_len
        @raise KeyError if stream is not in known_streams
        """
        stream_id = str(self.known_streams[stream]['id'])
        stream_id = stream_id.zfill(self.filter_len)
        self.log.info(stream_id)
        return stream_id

    def remove_callbacks(self, stream):
        """Remove all callbacks associated with the given stream.

        @param stream A string holding the stream name
        """
        stream_filter = self.get_stream_filter(stream)
        self.send_command({
            'action': 'REMOVE_ALL_CBS',
            'stream_filter': stream_filter,
        })

    def unsubscribe(self, stream, i):
        """Unsubscribe from stream at the publisher.

        Calling this leaves the callbacks associated with the data stream.
        Call remove_callbacks if you want to remove the callbacks.

        @param stream A string holding the stream name
        @param i unused; kept for interface compatibility
        """
        stream_filter = self.get_stream_filter(stream)
        self.send_command({
            'action': 'UNSUBSCRIBE',
            'stream_filter': stream_filter,
        })

    def reset(self, stream, id):
        """Clear the persistent callback state of one channel."""
        stream_filter = self.get_stream_filter(stream)
        self.send_command({
            'action': 'RESET',
            'stream_filter': stream_filter,
            'id': id
        })

    def reset_all(self, stream):
        """Clear the persistent callback state of all channels on a stream."""
        stream_filter = self.get_stream_filter(stream)
        self.send_command({
            'action': 'RESET_ALL',
            'stream_filter': stream_filter,
        })

    def update(self, stream, id, **kwargs):
        """Replace the callback kwargs of one channel."""
        stream_filter = self.get_stream_filter(stream)
        self.send_command({
            'action': 'UPDATE_KW',
            'stream_filter': stream_filter,
            'kwargs': kwargs,
            'id': id
        })

    def mute(self, stream, id):
        """Disable alerts for one channel."""
        stream_filter = self.get_stream_filter(stream)
        self.send_command({
            'action': 'MUTE',
            'stream_filter': stream_filter,
            'id': id
        })

    def unmute(self, stream, id):
        """Re-enable alerts for one channel."""
        stream_filter = self.get_stream_filter(stream)
        self.send_command({
            'action': 'UNMUTE',
            'stream_filter': stream_filter,
            'id': id,
        })

    def mute_all(self, stream):
        """Disable alerts for every channel on a stream."""
        stream_filter = self.get_stream_filter(stream)
        self.send_command({
            'action': 'MUTE_ALL',
            'stream_filter': stream_filter,
        })

    def unmute_all(self, stream):
        """Re-enable alerts for every channel on a stream."""
        stream_filter = self.get_stream_filter(stream)
        self.send_command({
            'action': 'UNMUTE_ALL',
            'stream_filter': stream_filter,
        })

    def pause(self, stream, id):
        """Stop dispatching data to one channel's callback."""
        stream_filter = self.get_stream_filter(stream)
        self.send_command({
            'action': 'PAUSE',
            'stream_filter': stream_filter,
            'id': id,
        })

    def pause_all(self, stream):
        """Stop dispatching data to all channels on a stream."""
        stream_filter = self.get_stream_filter(stream)
        self.send_command({
            'action': 'PAUSE_ALL',
            'stream_filter': stream_filter,
        })

    def restart(self, stream, id):
        """Resume dispatching data to one channel's callback."""
        stream_filter = self.get_stream_filter(stream)
        self.send_command({
            'action': 'RESTART',
            'stream_filter': stream_filter,
            'id': id,
        })

    def restart_all(self, stream):
        """Resume dispatching data to all channels on a stream."""
        stream_filter = self.get_stream_filter(stream)
        self.send_command({
            'action': 'RESTART_ALL',
            'stream_filter': stream_filter,
        })

    def send_command(self, cmd):
        """Put a command dict on the queue consumed by the poller process."""
        self.queue.put(cmd)

    def get_id(self):
        """Allocate and record the next unused integer channel id."""
        while self.last_index in self.id_list:
            self.last_index += 1
        self.id_list.append(self.last_index)
        return self.id_list[-1]

# --- lib_python3/origin/client/receiver.py ---

"""
This module provides a client generic receiver class that is intended to be
extended by reader and subscriber classes.
"""

import zmq
import json


class Receiver(object):
    """!@brief A class representing a data stream reader to a data server.
    This class handles asynchronous read events with a data server.
    """

    def __init__(self, config, logger):
        """!@brief Initialize the receiver.
        @param config is a ConfigParser object
        @param logger a python logging object
        """
        self.config = config
        self.known_streams = {}
        self.stream_list = []

        self.log = logger
        self.setup()

    def close(self):
        """@!brief Close all sockets and terminate the zmq context."""
        for sock in self.sockets:
            sock.close()
        self.context.term()

    def connect(self, socket, port):
        """!@brief Open a connection to the data server on the socket.
        @param socket The zmq socket object to connect
        @param port The port to connect on
        """
        try:
            socket.connect("tcp://{}:{}".format(self.ip, port))
        except Exception:
            # narrowed from a bare except
            self.log.exception("Error connecting to data server")

    def get_available_streams(self):
        """!@brief Request the knownStreams object from the server.
        @return knownStreams
        """
        # Sending an empty JSON object requests an object containing the
        # available streams
        self.read_sock.send_string('{}')
        try:
            err, known_streams = json.loads(self.read_sock.recv())
        except Exception:
            self.log.exception("Error connecting to data server")
        else:
            self.update_known_streams(known_streams['streams'])
        return self.known_streams

    def is_fields(self, stream, fields):
        """!@brief Check that all the requested fields exist in the stream.
        @param stream A string holding the stream name
        @param fields A list of strings holding the field names
        @return True if fields are defined in stream, False otherwise
        """
        ok = True
        for field in fields:
            # BUG FIX: the original tested `field in ...`, i.e. it rejected
            # fields that DO exist and accepted ones that don't.
            # NOTE(review): this checks top-level keys of the stream entry;
            # if field definitions live under a sub-key (e.g. 'definition'),
            # that key should be used instead — TODO confirm server schema.
            if field not in self.known_streams[stream]:
                ok = False
                msg = "field: `{}` not listed in known_streams['{}']"
                self.log.warning(msg.format(field, stream))
        return ok

    def is_stream(self, stream):
        """!@brief Check that the requested stream exists on the server.
        @param stream A string holding the stream name
        @return True if stream is in known_streams, False otherwise
        """
        return stream.strip() in self.stream_list

    def setup(self):
        """!@brief Extract configuration settings from the config object.
        Child classes should define the necessary sockets after this.
        """
        self.ip = self.config.get('Server', 'ip')
        # save all ports, we dont want to expose the JSON ports
        self.read_port = self.config.getint('Server', 'read_port')
        self.sub_port = self.config.getint('Server', 'pub_port')
        self.alert_port = self.config.getint('Server', 'alert_port')
        self.register_port = self.config.getint('Server', 'register_port')
        self.measure_port = self.config.getint('Server', 'measure_port')
        self.timeout = self.config.getint('Reader', 'timeout')
        self.filter_len = self.config.getint('Subscriber', 'filter_len')
        # initialize the possible sockets
        self.context = zmq.Context()
        self.read_sock = self.context.socket(zmq.REQ)
        self.sub_sock = self.context.socket(zmq.SUB)
        self.alert_sock = self.context.socket(zmq.SUB)
        self.reg_sock = self.context.socket(zmq.REQ)
        self.meas_sock = self.context.socket(zmq.PUSH)
        # make a list of sockets for convenience (used by close())
        self.sockets = [
            self.read_sock,
            self.sub_sock,
            self.alert_sock,
            self.reg_sock,
            self.meas_sock
        ]

    def update_known_streams(self, known_streams):
        """@!brief Update the known_streams definition with new data.
        @param known_streams A dictionary containing the streams that exist
            on the server.
        """
        self.known_streams = known_streams
        # materialize as a list so membership tests don't alias a dict view
        self.stream_list = list(self.known_streams.keys())
+ """ + self.known_streams = known_streams + self.stream_list = self.known_streams.keys() \ No newline at end of file diff --git a/lib_python3/origin/origin_current_time.py b/lib_python3/origin/origin_current_time.py new file mode 100644 index 0000000..3dbbe30 --- /dev/null +++ b/lib_python3/origin/origin_current_time.py @@ -0,0 +1,18 @@ +import calendar +import time +from configparser import ConfigParser + + +def current_time(config: ConfigParser): + """ + Figures out the current time in the format that origin wants + Args: + config : config object + Returns: + time in format desired by origin + """ + + #Unix time (in UTC) + if config.get('Server', "timestamp_type") == "uint64": + return int(time.time()*2**32) + return calendar.timegm(time.gmtime()) diff --git a/lib_python3/origin/origin_data_types.py b/lib_python3/origin/origin_data_types.py new file mode 100644 index 0000000..7a3eb64 --- /dev/null +++ b/lib_python3/origin/origin_data_types.py @@ -0,0 +1,123 @@ +import origin + +# dict of datatypes recognized by the server +# entries keys are the server string and contain: the mysql data type, the format char for the +# python struct library, and if the type is allowed in the binary data format + +# for the python struct ilbrary see: +data_types = {} + +# integer types ==================================================================================== +# 32b - default +data_types["int"] = { + "mysql": "INT", + "numpy": "int32", + "format_char": "i", + "binary_allowed": True, + "size": 4, + "type": int +} +data_types["uint"] = { + "mysql": "INT UNSIGNED", + "numpy": "uint32", + "format_char": "I", + "binary_allowed": True, + "size": 4, + "type": int +} +data_types["int32"] = data_types["int"] +data_types["uint32"] = data_types["uint"] +# 64b +data_types["int64"] = { + "mysql": "BIGINT", + "numpy": "int64", + "format_char": "q", + "binary_allowed": True, + "size": 8, + "type": int +} +data_types["uint64"] = { + "mysql": "BIGINT UNSIGNED", + "numpy": "uint64", + 
"format_char": "Q", + "binary_allowed": True, + "size": 8, + "type": int +} +# 8b +data_types["int8"] = { + "mysql": "TINYINT", + "numpy": "int8", + "format_char": "b", + "binary_allowed": True, + "size": 1, + "type": int +} +data_types["uint8"] = { + "mysql": "TINYINT UNSIGNED", + "numpy": "uint8", + "format_char": "B", + "binary_allowed": True, + "size": 1 , + "type": int +} +# 16b +data_types["int16"] = { + "mysql": "SMALLINT", + "numpy": "int16", + "format_char": "h", + "binary_allowed": True, + "size": 2, + "type": int +} +data_types["uint16"] = { + "mysql": "SMALLINT UNSIGNED", + "numpy": "uint16", + "format_char": "H", + "binary_allowed": True, + "size": 2, + "type": int +} + +# floating point types ============================================================================= +# 32b +data_types["float"] = { + "mysql": "FLOAT", + "numpy": "float32", + "format_char": "f", + "binary_allowed": True, + "size": 4, + "type": float +} +# 64b +data_types["double"] = { + "mysql": "DOUBLE", + "numpy": "float64", + "format_char": "d", + "binary_allowed": True, + "size": 8, + "type": float +} +data_types["float64"] = data_types["double"] +data_types["float32"] = data_types["float"] + +# boolean types ==================================================================================== +data_types["bool"] = { + "mysql": "BOOL", + "numpy": "bool_", + "format_char": "?", + "binary_allowed": True, + "size": 1, + "type": bool +} + +# other types ====================================================================================== +# max string size is fixed for hdf5 files, if you want to change it change "S10" to "Sx" +data_types["string"] = { + "mysql": "TEXT", + "numpy": "S10", + "format_char": "s", + "binary_allowed": False, + "size": None, + "type": str +} diff --git a/lib_python3/origin/origin_registration_validation.py b/lib_python3/origin/origin_registration_validation.py new file mode 100644 index 0000000..aec6442 --- /dev/null +++ 
# --- lib_python3/origin/origin_registration_validation.py ---

"""
Function for making sure the stream registration conforms to the standard
"""

from origin import data_types
import string
from typing import Dict, List, Optional, Tuple


def simple_string(string_input: str) -> int:
    """
    Check for punctuation in the string, underscore is ok.

    Args:
        string_input: string to be checked
    Returns:
        1 if string contains invalid characters
        0 if string is ok for use
    """
    invalid_chars = set(string.punctuation.replace("_", ""))
    if any(char in invalid_chars for char in string_input):
        return 1
    return 0


def registration_validation(
    stream_name: str,
    template: Dict[str, str],
    key_order: Optional[List[str]] = None
) -> Tuple[bool, str]:
    """
    Making sure the registration conforms to the standard.
    Returns True if the registration is valid.

    Checks made:
        stream_name is valid (doesn't have punctuation except for '_')
        the data types outlined in template are recognized by origin
        field names outlined in template are valid (as in stream_name)
        all the field names in template match the field names in key_order

    Args:
        stream_name : name of the stream to be registered
        template : dict of fields to be registered in the stream, and the
            data type of each field
        key_order : order in which fields will be registered. If None the
            order is arbitrary

    Returns:
        valid, error_message:
            valid : True if the arguments are valid
            error_message : an error message related to the failure mode,
                '' if stream is valid
    """
    fields = template.keys()
    for field, data_type in template.items():
        # membership test instead of a discarded try/except lookup
        if data_type not in data_types:
            return False, f"type '{data_type}' not recognized"
        if simple_string(field) != 0:
            return False, f"Invalid field name: '{field}'"
    if simple_string(stream_name) != 0:
        return False, f"Invalid stream name: '{stream_name}'"

    if key_order is not None:
        if len(key_order) != len(fields):
            return False, "Fields and key_order do not have the same length."
        for field in template:
            if field not in key_order:
                # (removed a stray no-op .format() on this f-string)
                return False, f"Field `{field}` in registration is not present in key_order"

    return True, ''