diff --git a/src/qasync/__init__.py b/src/qasync/__init__.py index fac08c8..c9908ea 100644 --- a/src/qasync/__init__.py +++ b/src/qasync/__init__.py @@ -103,6 +103,9 @@ from ._common import with_logger # noqa +# strong references to running background tasks +background_tasks = set() + @with_logger class _QThreadWorker(QtCore.QThread): @@ -437,6 +440,8 @@ def run_until_complete(self, future): raise RuntimeError("Event loop already running") self.__log_debug("Running %s until complete", future) + + # future may actually be a coroutine. This ensures it is wrapped in a Task. future = asyncio.ensure_future(future, loop=self) def stop(*args): @@ -855,9 +860,15 @@ def asyncClose(fn): @functools.wraps(fn) def wrapper(*args, **kwargs): - f = asyncio.ensure_future(fn(*args, **kwargs)) - while not f.done(): - QApplication.instance().processEvents() + loop = asyncio.get_running_loop() + assert isinstance(loop, QEventLoop) + task = loop.create_task(fn(*args, **kwargs)) + while not task.done(): + QApplication.processEvents(AllEvents) + try: + return task.result() + except asyncio.CancelledError: + pass return wrapper @@ -865,13 +876,11 @@ def wrapper(*args, **kwargs): def asyncSlot(*args, **kwargs): """Make a Qt async slot run on asyncio loop.""" - def _error_handler(task): + async def _error_handler(fn, args, kwargs): try: - task.result() + await fn(*args, **kwargs) except Exception: sys.excepthook(*sys.exc_info()) - except asyncio.CancelledError: - pass def outer_decorator(fn): @Slot(*args, **kwargs) @@ -894,9 +903,10 @@ def wrapper(*args, **kwargs): "asyncSlot was not callable from Signal. Potential signature mismatch." ) else: - task = asyncio.ensure_future(fn(*args, **kwargs)) - task.add_done_callback(_error_handler) - return task + task = asyncio.create_task(_error_handler(fn, args, kwargs)) + background_tasks.add(task) + task.add_done_callback(background_tasks.discard) + return return wrapper diff --git a/tests/test_qeventloop.py b/tests/test_qeventloop.py index 4acb73a..c5fb848 100644 --- a/tests/test_qeventloop.py +++ b/tests/test_qeventloop.py @@ -14,6 +14,7 @@ import threading import time from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor +from unittest import mock import pytest @@ -595,8 +596,8 @@ def cb3(): loop._add_reader(c_sock.fileno(), cb1) - _client_task = asyncio.ensure_future(client_coro()) - _server_task = asyncio.ensure_future(server_coro()) + _client_task = loop.create_task(client_coro()) + _server_task = loop.create_task(server_coro()) both_done = asyncio.gather(client_done, server_done) loop.run_until_complete(asyncio.wait_for(both_done, timeout=1.0)) @@ -640,8 +641,8 @@ async def client_coro(): loop._remove_reader(c_sock.fileno()) assert (await loop.sock_recv(c_sock, 3)) == b"foo" - client_done = asyncio.ensure_future(client_coro()) - server_done = asyncio.ensure_future(server_coro()) + client_done = loop.create_task(client_coro()) + server_done = loop.create_task(server_coro()) both_done = asyncio.wait( [server_done, client_done], return_when=asyncio.FIRST_EXCEPTION @@ -758,7 +759,7 @@ def exct_handler(loop, data): handler_called = True loop.set_exception_handler(exct_handler) - asyncio.ensure_future(future_except()) + loop.create_task(future_except()) loop.run_forever() assert coro_run @@ -775,7 +776,11 @@ def exct_handler(loop, data): loop.set_exception_handler(exct_handler) fut1 = asyncio.Future() fut1.set_exception(ExceptionTester()) - asyncio.ensure_future(fut1) + + async def coro(future): + await future + + loop.create_task(coro(fut1)) del fut1 loop.call_later(0.1, loop.stop) loop.run_forever() @@ -798,6 +803,8 @@ def test_async_slot(loop): no_args_called = asyncio.Event() with_args_called = asyncio.Event() trailing_args_called = asyncio.Event() + error_called = asyncio.Event() + cancel_called = asyncio.Event() async def slot_no_args(): no_args_called.set() @@ -812,6 +819,14 @@ async def slot_trailing_args(flag: bool): async def slot_signature_mismatch(_: bool): ... + async def slot_with_error(): + error_called.set() + raise ValueError("Test") + + async def slot_with_cancel(): + cancel_called.set() + raise asyncio.CancelledError() + async def main(): # passing kwargs to the underlying Slot such as name, arguments, return sig = qasync._make_signaller(qasync.QtCore) @@ -836,6 +851,73 @@ async def main(): ) await asyncio.wait_for(all_done, timeout=1.0) + with mock.patch.object(sys, "excepthook") as excepthook: + sig3 = qasync._make_signaller(qasync.QtCore) + sig3.signal.connect(qasync.asyncSlot()(slot_with_error)) + sig3.signal.emit() + await asyncio.wait_for(error_called.wait(), timeout=1.0) + excepthook.assert_called_once() + assert isinstance(excepthook.call_args[0][1], ValueError) + + with mock.patch.object(sys, "excepthook") as excepthook: + sig4 = qasync._make_signaller(qasync.QtCore) + sig4.signal.connect(qasync.asyncSlot()(slot_with_cancel)) + sig4.signal.emit() + await asyncio.wait_for(cancel_called.wait(), timeout=1.0) + excepthook.assert_not_called() + + loop.run_until_complete(main()) + + +def test_async_close(loop, application): + close_called = asyncio.Event() + close_err_called = asyncio.Event() + close_hang_called = asyncio.Event() + + @qasync.asyncClose + async def close(): + close_called.set() + return 33 + + @qasync.asyncClose + async def close_err(): + close_err_called.set() + raise ValueError("Test") + + @qasync.asyncClose + async def close_hang(): + # do an actual cancel instead of directly raising, for completeness. + current = asyncio.current_task() + assert current is not None + + async def killer(): + await asyncio.sleep(0.001) + current.cancel() + + asyncio.create_task(killer()) + close_hang_called.set() + await asyncio.Event().wait() + assert False, "Should have been cancelled" + + # need to run in async context to have a running event loop + async def main(): + # close() is a synchronous top level call, need + # to wrap it to be able to enter event loop + + # test that a regular close works + assert await qasync.asyncWrap(close) == 33 + assert close_called.is_set() + + # test that an exception in the async close is propagated + with pytest.raises(ValueError) as err: + await qasync.asyncWrap(close_err) + assert err.value.args[0] == "Test" + assert close_err_called.is_set() + + # test that a CancelledError is not propagated + assert await qasync.asyncWrap(close_hang) is None + assert close_hang_called.is_set() + loop.run_until_complete(main()) @@ -915,6 +997,14 @@ async def coro(): assert loop.run_until_complete(asyncio.wait_for(coro(), timeout=1)) == 42 +def test_run_until_complete_future(loop): + """Test that run_until_complete accepts futures""" + + fut = asyncio.Future() + loop.call_soon(lambda: fut.set_result(42)) + assert loop.run_until_complete(fut) == 42 + + def test_run_forever_custom_exit_code(loop, application): if hasattr(application, "exec"): orig_exec = application.exec @@ -932,6 +1022,24 @@ def test_run_forever_custom_exit_code(loop, application): application.exec_ = orig_exec +def test_loop_non_reentrant(loop): + async def noop(): + pass + + async def task(): + t = loop.create_task(noop()) + with pytest.raises(RuntimeError): + loop.run_forever() + + with pytest.raises(RuntimeError): + loop.run_until_complete(t) + return 43 + + t = loop.create_task(task()) + loop.run_until_complete(t) + assert t.result() == 43 + + @pytest.mark.parametrize("qtparent", [False, True]) def test_qeventloop_in_qthread(qtparent): class CoroutineExecutorThread(qasync.QtCore.QThread):