diff --git a/stan/aio/client.py b/stan/aio/client.py index 860c5bc..e2865be 100644 --- a/stan/aio/client.py +++ b/stan/aio/client.py @@ -562,13 +562,17 @@ async def _close(self): except: continue self._sub_map = {} - + async def _close_due_to_ping(self, err): - await self._close() - if self._conn_lost_cb is not None: - await self._conn_lost_cb(err) - self._conn_lost_cb = None + + async def _shield_close_due_to_ping(): + await self._close() + if self._conn_lost_cb is not None: + await self._conn_lost_cb(err) + self._conn_lost_cb = None + await asyncio.shield(_shield_close_due_to_ping(), loop=self._loop) + async def close(self): """ Close terminates a session with NATS Streaming. diff --git a/tests/client_test.py b/tests/client_test.py index 78e31f1..d2de3ea 100644 --- a/tests/client_test.py +++ b/tests/client_test.py @@ -560,7 +560,7 @@ async def cb(msg): self.assertEqual(len(msgs), 10) await sc.close() - + @async_test async def test_ping_responses_trigger_conn_lost_cb(self): nc = NATS() @@ -576,6 +576,10 @@ async def _process_heartbeats(self, msg): received_error_str = "" future = asyncio.Future(loop=self.loop) async def conn_lost_cb(err): + # Added an await on something here, to illustrate that + # cancelling the ping-task, used to also cancel this callback. + await asyncio.sleep(0.1, loop=self.loop) + nonlocal received_error_str received_error_str = str(err) future.set_result(True) @@ -619,6 +623,10 @@ async def _process_ping_response(self, msg): received_error_str = "" future = asyncio.Future(loop=self.loop) async def conn_lost_cb(err): + # Added an await on something here, to illustrate that + # cancelling the ping-task, used to also cancel this callback. + await asyncio.sleep(0.1, loop=self.loop) + nonlocal received_error_str received_error_str = str(err) future.set_result(True)