diff --git a/lib_python3/origin/client/origin_subscriber.py b/lib_python3/origin/client/origin_subscriber.py index 122a068..ef6e9f1 100644 --- a/lib_python3/origin/client/origin_subscriber.py +++ b/lib_python3/origin/client/origin_subscriber.py @@ -59,15 +59,14 @@ def poller_loop(sub_addr, queue, level=logging.DEBUG): if cmd['action'] == 'SUBSCRIBE': msg = 'Subscribing with stream filter: [{}]' - stream_filter = cmd['stream_filter'] - log.info(msg.format(stream_filter)) + log.info(msg.format(cmd['stream_filter'])) # add the callback to the list of things to do for the stream - if stream_filter not in subscriptions: - subscriptions[stream_filter] = [] + if cmd['stream_filter'] not in subscriptions: + subscriptions[cmd['stream_filter']] = [] #stream_filter is assigned as a key with an empty list - sub_sock.setsockopt_string(zmq.SUBSCRIBE, stream_filter) - subscriptions[stream_filter].append({ + sub_sock.setsockopt_string(zmq.SUBSCRIBE, cmd['stream_filter']) + subscriptions[cmd['stream_filter']].append({ 'callback': cmd['callback'], 'kwargs': cmd['kwargs'], 'state': {}, @@ -81,12 +80,12 @@ def poller_loop(sub_addr, queue, level=logging.DEBUG): 'kwargs': cmd['kwargs'], 'control': {'alert': True, 'pause': False} } - log.info("subscriptions: {}".format(subscriptions[stream_filter])) + log.info("subscriptions: {}".format(subscriptions[cmd['stream_filter']])) if cmd['action'] == 'UPDATE_KW': msg = 'Updating channel...' log.info(msg.format(cmd['stream_filter'])) - for cb in subscriptions[stream_filter]: + for cb in subscriptions[cmd['stream_filter']]: if cb['id'] == cmd['id']: cb['kwargs'] = cmd['kwargs'] #sub_list = {1:{'kwargs':{kwargs}, 'control':{control}} @@ -94,14 +93,14 @@ def poller_loop(sub_addr, queue, level=logging.DEBUG): 'control': cb['control'], 'kwargs': cmd['kwargs'] } - log.info("subscriptions: {}".format(subscriptions[stream_filter])) + log.info("subscriptions: {}".format(subscriptions[cmd['stream_filter']])) if (cmd['action'] == 'UNSUBSCRIBE' or cmd['action'] == 'REMOVE_ALL_CBS'): msg = 'Unsubscribing to stream filter: [{}]' log.info(msg.format(cmd['stream_filter'])) - sub_sock.setsockopt_string(zmq.UNSUBSCRIBE, stream_filter) + sub_sock.setsockopt_string(zmq.UNSUBSCRIBE, cmd['stream_filter']) if cmd['action'] == 'REMOVE_ALL_CBS': msg = 'Removing all callbacks for stream filter: [{}]' @@ -111,27 +110,27 @@ def poller_loop(sub_addr, queue, level=logging.DEBUG): if cmd['action'] == 'RESET': msg = 'Resetting channel' log.info(msg.format(cmd['stream_filter'])) - for cb in subscriptions[stream_filter]: + for cb in subscriptions[cmd['stream_filter']]: #cb is a dict with all the info of each channel subscribed #stream_filter is a list of all the cb dict. if cb['id'] == cmd['id']: cb['state']={} - log.info("subscriptions: {}".format(subscriptions[stream_filter])) + log.info("subscriptions: {}".format(subscriptions[cmd['stream_filter']])) if cmd['action'] == 'RESET_ALL': msg = 'Resetting all channels' log.info(msg.format(cmd['stream_filter'])) - for cb in subscriptions[stream_filter]: + for cb in subscriptions[cmd['stream_filter']]: #cb is a dict with all the info of each channel subscribed #stream_filter is a list of all the cb dict. cb['state']={} - log.info("subscriptions: {}".format(subscriptions[stream_filter])) + log.info("subscriptions: {}".format(subscriptions[cmd['stream_filter']])) if (cmd['action'] == 'MUTE' or cmd['action'] == 'UNMUTE'): msg = 'Muted channel alert' log.info(msg.format(cmd['stream_filter'])) - for cb in subscriptions[stream_filter]: + for cb in subscriptions[cmd['stream_filter']]: if cb['id'] == cmd['id']: cb['control']['alert'] = (cmd['action'] == 'UNMUTE') sub_list[cmd['id']]={ @@ -143,7 +142,7 @@ def poller_loop(sub_addr, queue, level=logging.DEBUG): cmd['action'] == 'UNMUTE_ALL'): msg = 'Muted/Unmuted all channel alert' log.info(msg.format(cmd['stream_filter'])) - for cb in subscriptions[stream_filter]: + for cb in subscriptions[cmd['stream_filter']]: cb['control']['alert'] = (cmd['action'] == 'UNMUTE_ALL') sub_list[cb['id']]={ 'control': cb['control'], @@ -154,7 +153,7 @@ def poller_loop(sub_addr, queue, level=logging.DEBUG): cmd['action'] == 'RESTART_ALL'): msg = 'Paused/ Restarted all channels' log.info(msg.format(cmd['stream_filter'])) - for cb in subscriptions[stream_filter]: + for cb in subscriptions[cmd['stream_filter']]: cb['control']['pause'] = (cmd['action'] == 'PAUSE_ALL') sub_list[cb['id']]={ 'control': cb['control'], @@ -165,7 +164,7 @@ def poller_loop(sub_addr, queue, level=logging.DEBUG): cmd['action'] == 'RESTART'): msg = 'Paused/Restarted this channel' log.info(msg.format(cmd['stream_filter'])) - for cb in subscriptions[stream_filter]: + for cb in subscriptions[cmd['stream_filter']]: if cb['id'] == cmd['id']: cb['control']['pause'] = (cmd['action'] == 'PAUSE') sub_list[cmd['id']]={ @@ -188,7 +187,10 @@ def poller_loop(sub_addr, queue, level=logging.DEBUG): log.exception("error encountered") try: - [streamID, content] = sub_sock.recv_multipart() + try: + [streamID, content] = sub_sock.recv_multipart() + except KeyboardInterrupt: + continue try: log.debug("new data") for cb in subscriptions[streamID.decode('ascii')]: diff --git a/lib_python3/origin/client/receiver.py b/lib_python3/origin/client/receiver.py index b2508ae..e419ce3 100644 --- a/lib_python3/origin/client/receiver.py +++ b/lib_python3/origin/client/receiver.py @@ -48,6 +48,11 @@ def get_available_streams(self): self.read_sock.send_string('{}') try: err, known_streams = json.loads(self.read_sock.recv()) + except KeyboardInterrupt as e: + self.log.warning("Canceling server conncetion") + for sock in self.sockets: + sock.close() + raise except: self.log.exception("Error connecting to data server") else: