From 7ddee57d66caa60622784e9588c528adca6e1eef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristj=C3=A1n=20Valur=20J=C3=B3nsson?= Date: Tue, 2 Sep 2025 13:40:25 +0000 Subject: [PATCH 1/6] Convert "ensure_future" to "create_task" --- src/qasync/__init__.py | 42 +++++++++++++++++++++------------------- tests/test_qeventloop.py | 16 +++++++++------ 2 files changed, 32 insertions(+), 26 deletions(-) diff --git a/src/qasync/__init__.py b/src/qasync/__init__.py index 0e34c0b..d601af2 100644 --- a/src/qasync/__init__.py +++ b/src/qasync/__init__.py @@ -417,25 +417,24 @@ def run_until_complete(self, future): if self.__is_running: raise RuntimeError("Event loop already running") - self.__log_debug("Running %s until complete", future) - future = asyncio.ensure_future(future, loop=self) + async def wrapper(): + try: + return await future + finally: + self.stop() - def stop(*args): - self.stop() # noqa + self.__log_debug("Running %s until complete", future) + task = self.create_task(wrapper()) - future.add_done_callback(stop) - try: - self.run_forever() - finally: - future.remove_done_callback(stop) + self.run_forever() self.__app.eventDispatcher().processEvents( AllEvents ) # run loop one last time to process all the events - if not future.done(): + if not task.done(): raise RuntimeError("Event loop stopped before Future completed.") self.__log_debug("Future %s finished running", future) - return future.result() + return task.result() def stop(self): """Stop event loop.""" @@ -799,9 +798,15 @@ def asyncClose(fn): @functools.wraps(fn) def wrapper(*args, **kwargs): - f = asyncio.ensure_future(fn(*args, **kwargs)) - while not f.done(): - QApplication.instance().processEvents() + loop = asyncio.get_running_loop() + assert isinstance(loop, QEventLoop) + task = loop.create_task(fn(*args, **kwargs)) + while not task.done(): + QApplication.processEvents(AllEvents) + try: + return task.result() + except asyncio.CancelledError: + pass return wrapper @@ -809,13 +814,11 @@ def wrapper(*args, **kwargs): def asyncSlot(*args, **kwargs): """Make a Qt async slot run on asyncio loop.""" - def _error_handler(task): + async def _error_handler(fn, args, kwargs): try: - task.result() + await fn(*args, **kwargs) except Exception: sys.excepthook(*sys.exc_info()) - except asyncio.CancelledError: - pass def outer_decorator(fn): @Slot(*args, **kwargs) @@ -838,8 +841,7 @@ def wrapper(*args, **kwargs): "asyncSlot was not callable from Signal. Potential signature mismatch." ) else: - task = asyncio.ensure_future(fn(*args, **kwargs)) - task.add_done_callback(_error_handler) + task = asyncio.create_task(_error_handler(fn, args, kwargs)) return task return wrapper diff --git a/tests/test_qeventloop.py b/tests/test_qeventloop.py index e8ba213..ab28631 100644 --- a/tests/test_qeventloop.py +++ b/tests/test_qeventloop.py @@ -594,8 +594,8 @@ def cb3(): loop._add_reader(c_sock.fileno(), cb1) - _client_task = asyncio.ensure_future(client_coro()) - _server_task = asyncio.ensure_future(server_coro()) + _client_task = loop.create_task(client_coro()) + _server_task = loop.create_task(server_coro()) both_done = asyncio.gather(client_done, server_done) loop.run_until_complete(asyncio.wait_for(both_done, timeout=1.0)) @@ -639,8 +639,8 @@ async def client_coro(): loop._remove_reader(c_sock.fileno()) assert (await loop.sock_recv(c_sock, 3)) == b"foo" - client_done = asyncio.ensure_future(client_coro()) - server_done = asyncio.ensure_future(server_coro()) + client_done = loop.create_task(client_coro()) + server_done = loop.create_task(server_coro()) both_done = asyncio.wait( [server_done, client_done], return_when=asyncio.FIRST_EXCEPTION @@ -757,7 +757,7 @@ def exct_handler(loop, data): handler_called = True loop.set_exception_handler(exct_handler) - asyncio.ensure_future(future_except()) + loop.create_task(future_except()) loop.run_forever() assert coro_run @@ -774,7 +774,11 @@ def exct_handler(loop, data): loop.set_exception_handler(exct_handler) fut1 = asyncio.Future() fut1.set_exception(ExceptionTester()) - asyncio.ensure_future(fut1) + + async def coro(future): + await future + + loop.create_task(coro(fut1)) del fut1 loop.call_later(0.1, loop.stop) loop.run_forever() From fbe32d3380817c5031a1e6057d16a8a75125c681 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristj=C3=A1n=20Valur=20J=C3=B3nsson?= Date: Tue, 2 Sep 2025 17:55:33 +0000 Subject: [PATCH 2/6] Add unit tests for asyncSlot error and asyncClose --- tests/test_qeventloop.py | 78 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 78 insertions(+) diff --git a/tests/test_qeventloop.py b/tests/test_qeventloop.py index ab28631..1626bd3 100644 --- a/tests/test_qeventloop.py +++ b/tests/test_qeventloop.py @@ -14,6 +14,7 @@ import threading import time from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor +from unittest import mock import pytest @@ -801,6 +802,8 @@ def test_async_slot(loop): no_args_called = asyncio.Event() with_args_called = asyncio.Event() trailing_args_called = asyncio.Event() + error_called = asyncio.Event() + cancel_called = asyncio.Event() async def slot_no_args(): no_args_called.set() @@ -815,6 +818,14 @@ async def slot_trailing_args(flag: bool): async def slot_signature_mismatch(_: bool): ... + async def slot_with_error(): + error_called.set() + raise ValueError("Test") + + async def slot_with_cancel(): + cancel_called.set() + raise asyncio.CancelledError() + async def main(): # passing kwargs to the underlying Slot such as name, arguments, return sig = qasync._make_signaller(qasync.QtCore) @@ -839,6 +850,73 @@ async def main(): ) await asyncio.wait_for(all_done, timeout=1.0) + with mock.patch.object(sys, "excepthook") as excepthook: + sig3 = qasync._make_signaller(qasync.QtCore) + sig3.signal.connect(qasync.asyncSlot()(slot_with_error)) + sig3.signal.emit() + await asyncio.wait_for(error_called.wait(), timeout=1.0) + excepthook.assert_called_once() + assert isinstance(excepthook.call_args[0][1], ValueError) + + with mock.patch.object(sys, "excepthook") as excepthook: + sig4 = qasync._make_signaller(qasync.QtCore) + sig4.signal.connect(qasync.asyncSlot()(slot_with_cancel)) + sig4.signal.emit() + await asyncio.wait_for(cancel_called.wait(), timeout=1.0) + excepthook.assert_not_called() + + loop.run_until_complete(main()) + + +def test_async_close(loop, application): + close_called = asyncio.Event() + close_err_called = asyncio.Event() + close_hang_called = asyncio.Event() + + @qasync.asyncClose + async def close(): + close_called.set() + return 33 + + @qasync.asyncClose + async def close_err(): + close_err_called.set() + raise ValueError("Test") + + @qasync.asyncClose + async def close_hang(): + # do an actual cancel instead of directly raising, for completeness. + current = asyncio.current_task() + assert current is not None + + async def killer(): + await asyncio.sleep(0.001) + current.cancel() + + asyncio.create_task(killer()) + close_hang_called.set() + await asyncio.Event().wait() + assert False, "Should have been cancelled" + + # need to run in async context to have a running event loop + async def main(): + # close() is a synchronous top level call, need + # to wrap it to be able to enter event loop + + # test that a regular close works + assert await qasync.asyncWrap(close) == 33 + assert close_called.is_set() + + # test that an exception in the async close is propagated + with pytest.raises(ValueError) as err: + await qasync.asyncWrap(close_err) + assert err.value.args[0] == "Test" + assert close_err_called.is_set() + + # test that a CancelledError is not propagated + assert await qasync.asyncWrap(close_hang) is None + assert close_hang_called.is_set() + loop.run_until_complete(main()) From 7d8a923a1871f0f05224704bb7a8e4ee969be505 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristj=C3=A1n=20Valur=20J=C3=B3nsson?= Date: Wed, 3 Sep 2025 08:49:17 +0000 Subject: [PATCH 3/6] add test for loop.run* methods being non-re-entrant. --- tests/test_qeventloop.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/tests/test_qeventloop.py b/tests/test_qeventloop.py index 1626bd3..adf2fee 100644 --- a/tests/test_qeventloop.py +++ b/tests/test_qeventloop.py @@ -1013,6 +1013,24 @@ def test_run_forever_custom_exit_code(loop, application): application.exec_ = orig_exec +def test_loop_non_reentrant(loop): + async def noop(): + pass + + async def task(): + t = loop.create_task(noop()) + with pytest.raises(RuntimeError): + loop.run_forever() + + with pytest.raises(RuntimeError): + loop.run_until_complete(t) + return 43 + + t = loop.create_task(task()) + loop.run_until_complete(t) + assert t.result() == 43 + + def test_qeventloop_in_qthread(): class CoroutineExecutorThread(qasync.QtCore.QThread): def __init__(self, coro): From 3e0b13c01c8f299fc59495be78257e661bd631f0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristj=C3=A1n=20Valur=20J=C3=B3nsson?= Date: Thu, 4 Sep 2025 12:28:07 +0000 Subject: [PATCH 4/6] add test for run_until_complete for futures. --- tests/test_qeventloop.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/tests/test_qeventloop.py b/tests/test_qeventloop.py index adf2fee..813cbe7 100644 --- a/tests/test_qeventloop.py +++ b/tests/test_qeventloop.py @@ -996,6 +996,14 @@ async def coro(): assert loop.run_until_complete(asyncio.wait_for(coro(), timeout=1)) == 42 +def test_run_until_complete_future(loop): + """Test that run_until_complete accepts futures""" + + fut = asyncio.Future() + loop.call_soon(lambda: fut.set_result(42)) + assert loop.run_until_complete(fut) == 42 + + def test_run_forever_custom_exit_code(loop, application): if hasattr(application, "exec"): orig_exec = application.exec From 7c34999652dff6903ba6dc21defba7da85ff7c8a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristj=C3=A1n=20Valur=20J=C3=B3nsson?= Date: Thu, 4 Sep 2025 12:33:39 +0000 Subject: [PATCH 5/6] undo change to create_task in run_until_complete --- src/qasync/__init__.py | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/src/qasync/__init__.py b/src/qasync/__init__.py index d601af2..08141a2 100644 --- a/src/qasync/__init__.py +++ b/src/qasync/__init__.py @@ -417,24 +417,27 @@ def run_until_complete(self, future): if self.__is_running: raise RuntimeError("Event loop already running") - async def wrapper(): - try: - return await future - finally: - self.stop() - self.__log_debug("Running %s until complete", future) - task = self.create_task(wrapper()) - self.run_forever() + # future may actually be a coroutine. This ensures it is wrapped in a Task. + future = asyncio.ensure_future(future, loop=self) + + def stop(*args): + self.stop() # noqa + + future.add_done_callback(stop) + try: + self.run_forever() + finally: + future.remove_done_callback(stop) self.__app.eventDispatcher().processEvents( AllEvents ) # run loop one last time to process all the events - if not task.done(): + if not future.done(): raise RuntimeError("Event loop stopped before Future completed.") self.__log_debug("Future %s finished running", future) - return task.result() + return future.result() def stop(self): """Stop event loop.""" From 4f763894ed65824e79e55b44a0099819c8f35f7c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristj=C3=A1n=20Valur=20J=C3=B3nsson?= Date: Thu, 4 Sep 2025 12:42:14 +0000 Subject: [PATCH 6/6] Store strong reference to background tasks while they run. This is recommended by python documentation, event loop only stores weak references to running tasks. --- src/qasync/__init__.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/qasync/__init__.py b/src/qasync/__init__.py index 08141a2..92245c7 100644 --- a/src/qasync/__init__.py +++ b/src/qasync/__init__.py @@ -103,6 +103,9 @@ from ._common import with_logger # noqa +# strong references to running background tasks +background_tasks = set() + @with_logger class _QThreadWorker(QtCore.QThread): @@ -845,7 +848,9 @@ def wrapper(*args, **kwargs): ) else: task = asyncio.create_task(_error_handler(fn, args, kwargs)) - return task + background_tasks.add(task) + task.add_done_callback(background_tasks.discard) + return return wrapper