diff --git a/amqpy/connection.py b/amqpy/connection.py index 809bf63..b4ce7c3 100644 --- a/amqpy/connection.py +++ b/amqpy/connection.py @@ -1,5 +1,6 @@ """AMQP Connections """ +import datetime import logging import socket from array import array @@ -332,17 +333,29 @@ def close(self, reply_code=0, reply_text='', method_type=method_t(0, 0)): return self.wait_any([spec.Connection.Close, spec.Connection.CloseOk]) def _heartbeat_run(self): - # `is_alive()` sends heartbeats if the connection is alive - while self.is_alive(): - # `close` is set to true if the `close_event` is signalled - close = self._close_event.wait(self._heartbeat_final / 1.5) - if close: + maxdiff = datetime.timedelta(seconds=1.5*self._heartbeat_final) + while True: + closed = self._close_event.wait(self._heartbeat_final / 1.5) + transport = self.transport + if transport is None or closed: + break + try: + transport.send_heartbeat() + except socket.error as err: + log.error("Failed sending heartbeat to server: %s ; closing " + "connection..", err) break + diff = datetime.datetime.now() - transport.last_heartbeat_received + if diff > maxdiff: + log.warning("Heartbeat timeout: diff=%s ; closing " + "connection..", diff) + break + if not closed and transport is not None: + self._close() def _close(self): try: self.transport.close() - channels = [x for x in self.channels.values() if x is not self] for ch in channels: # noinspection PyProtectedMember diff --git a/amqpy/transport.py b/amqpy/transport.py index aeb2353..a1d81db 100644 --- a/amqpy/transport.py +++ b/amqpy/transport.py @@ -41,7 +41,7 @@ def __init__(self, host, port, connect_timeout, buf_size): #: :type: datetime.datetime self.last_heartbeat_sent = None #: :type: datetime.datetime - self.last_heartbeat_received = None + self.last_heartbeat_received = datetime.datetime.now() self.last_heartbeat_sent_monotonic = 0.0