Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .idea/.gitignore

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions lib_python3/origin/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#

from origin.origin_current_time import current_time
from origin.origin_data_types import data_types
from origin.origin_registration_validation import registration_validation

TIMESTAMP = "measurement_time"

from origin.client import *
22 changes: 22 additions & 0 deletions lib_python3/origin/client/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
class float_field:
pass


class integer_field:
pass


class string_field:
pass


class file_field:
pass


from origin.client import origin_server_connection
ServerConnection = origin_server_connection.ServerConnection
from origin.client import origin_server
Server = origin_server.Server
from origin.client import origin_subscriber
Subscriber = origin_subscriber.Subscriber
112 changes: 112 additions & 0 deletions lib_python3/origin/client/origin_reader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
"""
This module provides a client subscription class that holds all the basic API
methods for subscribing to a data stream.
"""

import json
try:
import receiver
except ModuleNotFoundError:
#TODO when python3 branch is merged to main, we will pip install origin (master)
#from the repo, and this import line should no longer be needed.
from origin.client import receiver


class Reader(receiver.Receiver):
"""!@brief A class representing a data stream reader to a data server.
This class handles asynchronous read events with a data server.
"""

def __init__(self, config, logger):
"""!@brief Initialize the subscriber
@param config is a ConfigParser object
"""
# call the parent class initialization
super(Reader, self).__init__(config, logger)
# we only need the read socket for this class
self.connect(self.read_sock, self.read_port)
# request the available streams from the server
self.get_available_streams()

def get_stream_data(self, stream,
start=None, stop=None, fields=[], raw=False):
"""!@brief Request raw stream data in time window from sever.
@param stream A string holding the stream name
@param start 32b Unix timestamp that defines the start of the data
window
@param stop 32b Unix timestamp that defines the end of the data window
@param fields A list of fields from the stream that should be returned
@return data A dictionary containing data for each field in
the time window
"""
if not self.is_stream(stream):
raise KeyError

request = {
'stream': stream.strip(),
'start': start,
'stop': stop,
'raw': raw,
}
if fields != []:
if self.is_fields(stream, fields):
request['fields'] = fields
else:
self.log.error('There was an issue with the specified fields.')
return {}
self.read_sock.send_string(json.dumps(request))
try:
msg = self.read_sock.recv()
data = json.loads(msg)
except:
msg = "There was an error communicating with the server"
self.log.exception(msg)
data = (1, {'error': msg, 'stream': {}})

if data[0] != 0:
msg = "The server responds to the request with error message: `{}`"
self.log.error(msg.format(data[1]["error"]))
known_streams = data[1]['stream']
if known_streams != {}:
self.log.info('Updating stream definitions from server.')
self.update_known_streams(known_streams)
return {}
else:
return data[1]

def get_stream_raw_data(self, stream, start=None, stop=None, fields=[]):
"""!@brief Request raw stream data in time window from sever.
@param stream A string holding the stream name
@param start 32b Unix timestamp that defines the start of the data
window
@param stop 32b Unix timestamp that defines the end of the data window
@param fields A list of fields from the stream that should be returned
@return data A dictionary containing raw data for each field in
the time window
"""
return self.get_stream_data(
stream,
start=start,
stop=stop,
fields=fields,
raw=True
)

def get_stream_stat_data(self, stream,
start=None, stop=None, fields=[]):
"""!@brief Request stream data statistics in time window from sever.
@param stream A string holding the stream name
@param start 32b Unix timestamp that defines the start of the data
window
@param stop 32b Unix timestamp that defines the end of the data window
@param fields A list of fields from the stream that should be returned
@return data A dictionary containing statistical data for each field in
the time window
"""
return self.get_stream_data(
stream,
start=start,
stop=stop,
fields=fields,
raw=False
)
146 changes: 146 additions & 0 deletions lib_python3/origin/client/origin_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
from origin.client import ServerConnection

from origin.client import float_field
from origin.client import integer_field
from origin.client import string_field
from origin import data_types, registration_validation


import zmq
import struct
import json


def decode(measurement_type: str) -> str:
"""
Decodes a measurement type of it happens to be a specific class
TODO is this necessary? Those classes are empty.
Args:
measurement_type : string describing the data type a measurement should be. If not in
data_types, checks and translates for a 'float_field', 'integer_field' or
'integer_field' class

Returns:
The data type description string. If measurement_type is a valid key for data_types,
the argument is returned, other wise the argument is translated (if valid) or a KeyError
is raised

Raises:
KeyError if measurement_type is not a valid data type
"""
try:
tmp = data_types[measurement_type]
except KeyError:
if measurement_type == float_field:
return "float"
if measurement_type == integer_field:
return "int"
if measurement_type == string_field:
return "string"
raise
else:
return measurement_type


def declaration_formatter(stream, records, key_order)-> bytes:
dec_str = [stream]
for key in key_order:
dec_str.append(':'.join([key, records[key]]))
return bytes(','.join(dec_str), encoding="utf8")


def format_stream_declaration(stream, records, key_order, encoding_format):
measurements = records.keys()
sent_dict = {}
for m in measurements:
try:
decoded_type = decode(records[m])
except KeyError as e:
print(f"{records[m]} is not a valid data type. Programming error. Error should be "
f"caught before this")
return None
else:
sent_dict[m] = decoded_type
if (encoding_format is not None) and (encoding_format.lower() == "json"):
if key_order is not None:
msg = (
"Warning: JSON formatting selected and a key order has been "
"defined. JSON object order is not gaurenteed therefore it is "
"not recommended to use binary data packets."
)
print(msg)
# make deterministic
return json.dumps((stream, sent_dict), sort_keys=True)
else:
return declaration_formatter(stream, sent_dict, key_order)


class Server:
def __init__(self, config):
self.config = config

def ping(self):
return True

def register_stream(self, stream, records, key_order=None, data_format=None, timeout=1000):
valid = registration_validation(stream, records, key_order)

if not valid:
print("invalid stream declaration")
return None

port = self.config.get('Server', "register_port")
msgport = self.config.get('Server', "measure_port")
if data_format == 'json':
port = self.config.get('Server', "json_register_port")
msgport = self.config.get('Server', "json_measure_port")

context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.setsockopt(zmq.RCVTIMEO, timeout)
socket.setsockopt(zmq.LINGER, 0)
host = self.config.get('Server', "ip")
socket.connect("tcp://%s:%s" % (host, port))

if (key_order is None) and (data_format is None):
key_order = records.keys()
registerComm = format_stream_declaration(stream, records, key_order, data_format)

if registerComm is None:
if data_format is None:
data_format = "comma-separated values"
print("can't format stream into {}".format(data_format))
return None

socket.send(registerComm, zmq.NOBLOCK)
try:
confirmation = socket.recv()
except Exception as e:
print(f"Problem registering stream: {stream}\nError = {e}")
print("Server did not respond in time") # TODO how do we know ??
return None
return_code, msg = bytes(confirmation).split(b',', 1)
print(return_code, msg)

if int(return_code) != 0:
print(f"Problem registering stream {stream}")
print(msg)
return None

streamID, version = struct.unpack("!II", msg)
print(f"successfully registered with streamID: {streamID}, version: {version}")
socket.close() # I 'm pretty sure we need this here

# error checking
socket_data = context.socket(zmq.PUSH)
socket_data.connect("tcp://%s:%s" % (host, msgport))
return ServerConnection(
self.config,
stream,
streamID,
key_order,
data_format,
records,
context,
socket_data
)
Loading