From eade1eaa0eb9f2e2c3a89cb3651045fb297566c2 Mon Sep 17 00:00:00 2001 From: Michael D Date: Thu, 17 Jun 2021 14:22:05 -0400 Subject: [PATCH] Retry attempts on communication between parent and child Added in try blocks around get()/put() commands between worker and its parent tab. A timeout value of 30 seconds is added and a TimeoutError exception is captured to retry the message. Reason: When using secure communication zmq messages get lost/dropped/missed between the parent tab and its worker seemingly at random. The most often scenario of this occurring is when sending a new job to a worker. The parent will issue a job to the worker and proceed to wait for an acknowledgement. The worker never receives this message and continues to wait for a job from the parent. This causes the blacs tab to enter an infinite wait state, and the tab will error out with a 'Device has not responded for xx:xx time'. The tab will then sit in this error state counting up for hours, days, or weeks until the tab is restarted. Bringing any experiment to a halt. This seems to occur infrequently, but devices which send requests repeatedly in a short amount of time tend to encounter this error more often (such as the Pulseblaster with its 'check_status' job.) This error is not computer specific, and seems to plague all computers on the network, even those simply communicating over their localhost loopback interface. Adding in these retry blocks seem to get around this issue if turning off secure communication is not an option. So far only one retry is needed to restore communication between the tab and its worker. --- blacs/tab_base_classes.py | 41 ++++++++++++++++++++++++++++++++------- 1 file changed, 34 insertions(+), 7 deletions(-) diff --git a/blacs/tab_base_classes.py b/blacs/tab_base_classes.py index 590d4277..bbfe23d9 100644 --- a/blacs/tab_base_classes.py +++ b/blacs/tab_base_classes.py @@ -11,6 +11,7 @@ # # ##################################################################### from zprocess import Process, Interruptor, Interrupted +from zprocess.utils import TimeoutError import time import sys import threading @@ -809,16 +810,28 @@ def mainloop(self): # Send the command to the worker to_worker = workers[worker_process][1] from_worker = workers[worker_process][2] - to_worker.put(worker_arg_list) - self.state = '%s (%s)'%(worker_function,worker_process) - # Confirm that the worker got the message: - logger.debug('Waiting for worker to acknowledge job request') - success, message, results = from_worker.get() + try: + to_worker.put(worker_arg_list, 30) + self.state = '%s (%s)'%(worker_function,worker_process) + # Confirm that the worker got the message: + logger.debug('Waiting for worker to acknowledge job request') + success, message, results = from_worker.get(30) + except TimeoutError: + logger.info('Connection timed out. Trying again.') + try: + to_worker.put(worker_arg_list, 30) + self.state = '%s (%s)'%(worker_function,worker_process) + # Confirm that the worker got the message: + logger.debug('Waiting for worker to acknowledge job request') + success, message, results = from_worker.get(30) + except TimeoutError: + raise TimeoutError('BLACs Device thread timed out talking to worker.') if not success: logger.info('Worker reported failure to start job') raise Exception(message) # Wait for and get the results of the work: logger.debug('Worker reported job started, waiting for completion') + success,message,results = from_worker.get() if not success: logger.info('Worker reported exception during job') @@ -915,7 +928,14 @@ def mainloop(self): message = traceback.format_exc() self.logger.error('Couldn\'t start job:\n %s'%message) # Report to the parent whether method lookup was successful or not: - self.to_parent.put((success,message,None)) + try: + self.to_parent.put((success,message,None), 30) + except TimeoutError: + self.logger.info('Connection timed out. Trying again.') + try: + self.to_parent.put((success,message,None), 30) + except TimeoutError: + raise TimeoutError('Communication timed out in worker.') if success: # Try to do the requested work: self.logger.debug('Starting job %s'%funcname) @@ -942,7 +962,14 @@ def mainloop(self): results = None # Report to the parent whether work was successful or not, # and what the results were: - self.to_parent.put((success,message,results)) + try: + self.to_parent.put((success,message,results), 30) + except TimeoutError: + self.logger.info('Connection timed out. Trying again.') + try: + self.to_parent.put((success,message,results), 30) + except TimeoutError: + raise TimeoutError('Communication timed out in worker.') class PluginTab(object):