Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 21 additions & 19 deletions lib_python3/origin/client/origin_subscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,14 @@ def poller_loop(sub_addr, queue, level=logging.DEBUG):

if cmd['action'] == 'SUBSCRIBE':
msg = 'Subscribing with stream filter: [{}]'
stream_filter = cmd['stream_filter']
log.info(msg.format(stream_filter))
log.info(msg.format(cmd['stream_filter']))

# add the callback to the list of things to do for the stream
if stream_filter not in subscriptions:
subscriptions[stream_filter] = []
if cmd['stream_filter'] not in subscriptions:
subscriptions[cmd['stream_filter']] = []
#stream_filter is assigned as a key with an empty list
sub_sock.setsockopt_string(zmq.SUBSCRIBE, stream_filter)
subscriptions[stream_filter].append({
sub_sock.setsockopt_string(zmq.SUBSCRIBE, cmd['stream_filter'])
subscriptions[cmd['stream_filter']].append({
'callback': cmd['callback'],
'kwargs': cmd['kwargs'],
'state': {},
Expand All @@ -81,27 +80,27 @@ def poller_loop(sub_addr, queue, level=logging.DEBUG):
'kwargs': cmd['kwargs'],
'control': {'alert': True, 'pause': False}
}
log.info("subscriptions: {}".format(subscriptions[stream_filter]))
log.info("subscriptions: {}".format(subscriptions[cmd['stream_filter']]))

if cmd['action'] == 'UPDATE_KW':
msg = 'Updating channel...'
log.info(msg.format(cmd['stream_filter']))
for cb in subscriptions[stream_filter]:
for cb in subscriptions[cmd['stream_filter']]:
if cb['id'] == cmd['id']:
cb['kwargs'] = cmd['kwargs']
#sub_list = {1:{'kwargs':{kwargs}, 'control':{control}}
sub_list[cmd['id']]={
'control': cb['control'],
'kwargs': cmd['kwargs']
}
log.info("subscriptions: {}".format(subscriptions[stream_filter]))
log.info("subscriptions: {}".format(subscriptions[cmd['stream_filter']]))


if (cmd['action'] == 'UNSUBSCRIBE' or
cmd['action'] == 'REMOVE_ALL_CBS'):
msg = 'Unsubscribing to stream filter: [{}]'
log.info(msg.format(cmd['stream_filter']))
sub_sock.setsockopt_string(zmq.UNSUBSCRIBE, stream_filter)
sub_sock.setsockopt_string(zmq.UNSUBSCRIBE, cmd['stream_filter'])

if cmd['action'] == 'REMOVE_ALL_CBS':
msg = 'Removing all callbacks for stream filter: [{}]'
Expand All @@ -111,27 +110,27 @@ def poller_loop(sub_addr, queue, level=logging.DEBUG):
if cmd['action'] == 'RESET':
msg = 'Resetting channel'
log.info(msg.format(cmd['stream_filter']))
for cb in subscriptions[stream_filter]:
for cb in subscriptions[cmd['stream_filter']]:
#cb is a dict with all the info of each channel subscribed
#stream_filter is a list of all the cb dict.
if cb['id'] == cmd['id']:
cb['state']={}
log.info("subscriptions: {}".format(subscriptions[stream_filter]))
log.info("subscriptions: {}".format(subscriptions[cmd['stream_filter']]))

if cmd['action'] == 'RESET_ALL':
msg = 'Resetting all channels'
log.info(msg.format(cmd['stream_filter']))
for cb in subscriptions[stream_filter]:
for cb in subscriptions[cmd['stream_filter']]:
#cb is a dict with all the info of each channel subscribed
#stream_filter is a list of all the cb dict.
cb['state']={}
log.info("subscriptions: {}".format(subscriptions[stream_filter]))
log.info("subscriptions: {}".format(subscriptions[cmd['stream_filter']]))

if (cmd['action'] == 'MUTE' or
cmd['action'] == 'UNMUTE'):
msg = 'Muted channel alert'
log.info(msg.format(cmd['stream_filter']))
for cb in subscriptions[stream_filter]:
for cb in subscriptions[cmd['stream_filter']]:
if cb['id'] == cmd['id']:
cb['control']['alert'] = (cmd['action'] == 'UNMUTE')
sub_list[cmd['id']]={
Expand All @@ -143,7 +142,7 @@ def poller_loop(sub_addr, queue, level=logging.DEBUG):
cmd['action'] == 'UNMUTE_ALL'):
msg = 'Muted/Unmuted all channel alert'
log.info(msg.format(cmd['stream_filter']))
for cb in subscriptions[stream_filter]:
for cb in subscriptions[cmd['stream_filter']]:
cb['control']['alert'] = (cmd['action'] == 'UNMUTE_ALL')
sub_list[cb['id']]={
'control': cb['control'],
Expand All @@ -154,7 +153,7 @@ def poller_loop(sub_addr, queue, level=logging.DEBUG):
cmd['action'] == 'RESTART_ALL'):
msg = 'Paused/ Restarted all channels'
log.info(msg.format(cmd['stream_filter']))
for cb in subscriptions[stream_filter]:
for cb in subscriptions[cmd['stream_filter']]:
cb['control']['pause'] = (cmd['action'] == 'PAUSE_ALL')
sub_list[cb['id']]={
'control': cb['control'],
Expand All @@ -165,7 +164,7 @@ def poller_loop(sub_addr, queue, level=logging.DEBUG):
cmd['action'] == 'RESTART'):
msg = 'Paused/Restarted this channel'
log.info(msg.format(cmd['stream_filter']))
for cb in subscriptions[stream_filter]:
for cb in subscriptions[cmd['stream_filter']]:
if cb['id'] == cmd['id']:
cb['control']['pause'] = (cmd['action'] == 'PAUSE')
sub_list[cmd['id']]={
Expand All @@ -188,7 +187,10 @@ def poller_loop(sub_addr, queue, level=logging.DEBUG):
log.exception("error encountered")

try:
[streamID, content] = sub_sock.recv_multipart()
try:
[streamID, content] = sub_sock.recv_multipart()
except KeyboardInterrupt:
continue
try:
log.debug("new data")
for cb in subscriptions[streamID.decode('ascii')]:
Expand Down
5 changes: 5 additions & 0 deletions lib_python3/origin/client/receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ def get_available_streams(self):
self.read_sock.send_string('{}')
try:
err, known_streams = json.loads(self.read_sock.recv())
except KeyboardInterrupt as e:
self.log.warning("Canceling server conncetion")
for sock in self.sockets:
sock.close()
raise
except:
self.log.exception("Error connecting to data server")
else:
Expand Down