diff --git a/winloop/__init__.py b/winloop/__init__.py index c944b89..3210b62 100644 --- a/winloop/__init__.py +++ b/winloop/__init__.py @@ -1,5 +1,4 @@ import asyncio as __asyncio -import collections.abc as _collections_abc import typing as _typing import sys as _sys import warnings as _warnings @@ -9,7 +8,7 @@ from ._version import __version__ # NOQA -__all__: tuple[str, ...] = ("new_event_loop", "run") +__all__: _typing.Tuple[str, ...] = ('new_event_loop', 'run') _AbstractEventLoop = __asyncio.AbstractEventLoop @@ -26,16 +25,16 @@ def new_event_loop() -> Loop: if _typing.TYPE_CHECKING: - def run( - main: _collections_abc.Coroutine[_typing.Any, _typing.Any, _T], + main: _typing.Coroutine[_typing.Any, _typing.Any, _T], *, - loop_factory: _collections_abc.Callable[[], Loop] | None = new_event_loop, - debug: bool | None = None, + loop_factory: _typing.Optional[ + _typing.Callable[[], Loop] + ] = new_event_loop, + debug: _typing.Optional[bool]=None, ) -> _T: """The preferred way of running a coroutine with winloop.""" else: - def run(main, *, loop_factory=new_event_loop, debug=None, **run_kwargs): """The preferred way of running a coroutine with winloop.""" @@ -45,7 +44,7 @@ async def wrapper(): # is using `winloop.run()` intentionally. loop = __asyncio._get_running_loop() if not isinstance(loop, Loop): - raise TypeError("winloop.run() uses a non-winloop event loop") + raise TypeError('winloop.run() uses a non-winloop event loop') return await main vi = _sys.version_info[:2] @@ -55,11 +54,12 @@ async def wrapper(): if __asyncio._get_running_loop() is not None: raise RuntimeError( - "asyncio.run() cannot be called from a running event loop" - ) + "asyncio.run() cannot be called from a running event loop") if not __asyncio.iscoroutine(main): - raise ValueError("a coroutine was expected, got {!r}".format(main)) + raise ValueError( + "a coroutine was expected, got {!r}".format(main) + ) loop = loop_factory() try: @@ -71,8 +71,10 @@ async def wrapper(): try: _cancel_all_tasks(loop) loop.run_until_complete(loop.shutdown_asyncgens()) - if hasattr(loop, "shutdown_default_executor"): - loop.run_until_complete(loop.shutdown_default_executor()) + if hasattr(loop, 'shutdown_default_executor'): + loop.run_until_complete( + loop.shutdown_default_executor() + ) finally: __asyncio.set_event_loop(None) loop.close() @@ -80,18 +82,22 @@ async def wrapper(): elif vi == (3, 11): if __asyncio._get_running_loop() is not None: raise RuntimeError( - "asyncio.run() cannot be called from a running event loop" - ) + "asyncio.run() cannot be called from a running event loop") with __asyncio.Runner( - loop_factory=loop_factory, debug=debug, **run_kwargs + loop_factory=loop_factory, + debug=debug, + **run_kwargs ) as runner: return runner.run(wrapper()) else: assert vi >= (3, 12) return __asyncio.run( - wrapper(), loop_factory=loop_factory, debug=debug, **run_kwargs + wrapper(), + loop_factory=loop_factory, + debug=debug, + **run_kwargs ) @@ -105,22 +111,22 @@ def _cancel_all_tasks(loop: _AbstractEventLoop) -> None: for task in to_cancel: task.cancel() - loop.run_until_complete(__asyncio.gather(*to_cancel, return_exceptions=True)) + loop.run_until_complete( + __asyncio.gather(*to_cancel, return_exceptions=True) + ) for task in to_cancel: if task.cancelled(): continue if task.exception() is not None: - loop.call_exception_handler( - { - "message": "unhandled exception during asyncio.run() shutdown", - "exception": task.exception(), - "task": task, - } - ) + loop.call_exception_handler({ + 'message': 'unhandled exception during asyncio.run() shutdown', + 'exception': task.exception(), + 'task': task, + }) -_deprecated_names = ("install", "EventLoopPolicy") +_deprecated_names = ('install', 'EventLoopPolicy') if _sys.version_info[:2] < (3, 16): @@ -146,8 +152,8 @@ def install() -> None: """ if _sys.version_info[:2] >= (3, 12): _warnings.warn( - "winloop.install() is deprecated in favor of winloop.run() " - "starting with Python 3.12.", + 'winloop.install() is deprecated in favor of winloop.run() ' + 'starting with Python 3.12.', DeprecationWarning, stacklevel=1, ) @@ -155,7 +161,7 @@ def install() -> None: class EventLoopPolicy( # This is to avoid a mypy error about AbstractEventLoopPolicy - getattr(__asyncio, "AbstractEventLoopPolicy") # type: ignore[misc] + getattr(__asyncio, 'AbstractEventLoopPolicy') # type: ignore[misc] ): """Event loop policy for winloop. @@ -177,12 +183,16 @@ def _loop_factory(self) -> Loop: # marked as abstract in typeshed, we have to put them in so mypy # thinks the base methods are overridden. This is the same approach # taken for the Windows event loop policy classes in typeshed. - def get_child_watcher(self) -> _typing.NoReturn: ... + def get_child_watcher(self) -> _typing.NoReturn: + ... - def set_child_watcher(self, watcher: _typing.Any) -> _typing.NoReturn: ... + def set_child_watcher( + self, watcher: _typing.Any + ) -> _typing.NoReturn: + ... class _Local(threading.local): - _loop: _AbstractEventLoop | None = None + _loop: _typing.Optional[_AbstractEventLoop] = None def __init__(self) -> None: self._local = self._Local() @@ -194,13 +204,15 @@ def get_event_loop(self) -> _AbstractEventLoop: """ if self._local._loop is None: raise RuntimeError( - "There is no current event loop in thread %r." + 'There is no current event loop in thread %r.' % threading.current_thread().name ) return self._local._loop - def set_event_loop(self, loop: _AbstractEventLoop | None) -> None: + def set_event_loop( + self, loop: _typing.Optional[_AbstractEventLoop] + ) -> None: """Set the event loop.""" if loop is not None and not isinstance(loop, _AbstractEventLoop): raise TypeError( @@ -216,6 +228,6 @@ def new_event_loop(self) -> Loop: """ return self._loop_factory() - globals()["install"] = install - globals()["EventLoopPolicy"] = EventLoopPolicy + globals()['install'] = install + globals()['EventLoopPolicy'] = EventLoopPolicy return globals()[name] diff --git a/winloop/_testbase.py b/winloop/_testbase.py index ccd3ffb..29c90f8 100644 --- a/winloop/_testbase.py +++ b/winloop/_testbase.py @@ -1,6 +1,8 @@ """Test utilities. Don't use outside of the winloop project.""" + import asyncio +import asyncio.events import collections import contextlib import gc @@ -15,7 +17,6 @@ import tempfile import threading import time -import typing import unittest import winloop @@ -26,41 +27,39 @@ def __eq__(self, other): class TestCaseDict(collections.UserDict): + def __init__(self, name): super().__init__() self.name = name def __setitem__(self, key, value): if key in self.data: - raise RuntimeError("duplicate test {}.{}".format(self.name, key)) + raise RuntimeError('duplicate test {}.{}'.format( + self.name, key)) super().__setitem__(key, value) class BaseTestCaseMeta(type): + @classmethod def __prepare__(mcls, name, bases): return TestCaseDict(name) def __new__(mcls, name, bases, dct): for test_name in dct: - if not test_name.startswith("test_"): + if not test_name.startswith('test_'): continue for base in bases: if hasattr(base, test_name): raise RuntimeError( - "duplicate test {}.{} (also defined in {} parent class)".format( - name, test_name, base.__name__ - ) - ) + 'duplicate test {}.{} (also defined in {} ' + 'parent class)'.format( + name, test_name, base.__name__)) return super().__new__(mcls, name, bases, dict(dct)) class BaseTestCase(unittest.TestCase, metaclass=BaseTestCaseMeta): - if typing.TYPE_CHECKING: - loop: typing.Optional[asyncio.AbstractEventLoop] - - # TODO: See about making it so that all of these have typehints in the future. def new_loop(self): raise NotImplementedError @@ -80,7 +79,7 @@ async def wait_closed(self, obj): pass def is_asyncio_loop(self): - return type(self.loop).__module__.startswith("asyncio.") + return type(self.loop).__module__.startswith('asyncio.') def run_loop_briefly(self, *, delay=0.01): self.loop.run_until_complete(asyncio.sleep(delay)) @@ -104,9 +103,9 @@ def tearDown(self): self.loop.close() if self.__unhandled_exceptions: - print("Unexpected calls to loop.call_exception_handler():") + print('Unexpected calls to loop.call_exception_handler():') pprint.pprint(self.__unhandled_exceptions) - self.fail("unexpected calls to loop.call_exception_handler()") + self.fail('unexpected calls to loop.call_exception_handler()') return if not self._check_unclosed_resources_in_debug: @@ -117,7 +116,7 @@ def tearDown(self): gc.collect() gc.collect() - if getattr(self.loop, "_debug_cc", False): + if getattr(self.loop, '_debug_cc', False): gc.collect() gc.collect() gc.collect() @@ -125,38 +124,33 @@ def tearDown(self): self.assertEqual( self.loop._debug_uv_handles_total, self.loop._debug_uv_handles_freed, - "not all uv_handle_t handles were freed", - ) + 'not all uv_handle_t handles were freed') self.assertEqual( - self.loop._debug_cb_handles_count, - 0, - "not all callbacks (call_soon) are GCed", - ) + self.loop._debug_cb_handles_count, 0, + 'not all callbacks (call_soon) are GCed') self.assertEqual( - self.loop._debug_cb_timer_handles_count, - 0, - "not all timer callbacks (call_later) are GCed", - ) + self.loop._debug_cb_timer_handles_count, 0, + 'not all timer callbacks (call_later) are GCed') self.assertEqual( - self.loop._debug_stream_write_ctx_cnt, - 0, - "not all stream write contexts are GCed", - ) + self.loop._debug_stream_write_ctx_cnt, 0, + 'not all stream write contexts are GCed') for h_name, h_cnt in self.loop._debug_handles_current.items(): - with self.subTest("Alive handle after test", handle_name=h_name): - self.assertEqual(h_cnt, 0, "alive {} after test".format(h_name)) + with self.subTest('Alive handle after test', + handle_name=h_name): + self.assertEqual( + h_cnt, 0, + 'alive {} after test'.format(h_name)) for h_name, h_cnt in self.loop._debug_handles_total.items(): - with self.subTest("Total/closed handles", handle_name=h_name): + with self.subTest('Total/closed handles', + handle_name=h_name): self.assertEqual( - h_cnt, - self.loop._debug_handles_closed[h_name], - "total != closed for {}".format(h_name), - ) + h_cnt, self.loop._debug_handles_closed[h_name], + 'total != closed for {}'.format(h_name)) asyncio.set_event_loop(None) asyncio.set_event_loop_policy(None) @@ -165,30 +159,27 @@ def tearDown(self): def skip_unclosed_handles_check(self): self._check_unclosed_resources_in_debug = False - def tcp_server( - self, - server_prog, - *, - family=socket.AF_INET, - addr=None, - timeout=5, - backlog=1, - max_clients=10, - ): + def tcp_server(self, server_prog, *, + family=socket.AF_INET, + addr=None, + timeout=5, + backlog=1, + max_clients=10): + if addr is None: # Winloop comment: Windows has no Unix sockets if hasattr(socket, "AF_UNIX") and family == socket.AF_UNIX: with tempfile.NamedTemporaryFile() as tmp: addr = tmp.name else: - addr = ("127.0.0.1", 0) + addr = ('127.0.0.1', 0) sock = socket.socket(family, socket.SOCK_STREAM) if timeout is None: - raise RuntimeError("timeout is required") + raise RuntimeError('timeout is required') if timeout <= 0: - raise RuntimeError("only blocking sockets are supported") + raise RuntimeError('only blocking sockets are supported') sock.settimeout(timeout) try: @@ -198,18 +189,23 @@ def tcp_server( sock.close() raise ex - return TestThreadedServer(self, sock, server_prog, timeout, max_clients) + return TestThreadedServer( + self, sock, server_prog, timeout, max_clients) + + def tcp_client(self, client_prog, + family=socket.AF_INET, + timeout=10): - def tcp_client(self, client_prog, family=socket.AF_INET, timeout=10): sock = socket.socket(family, socket.SOCK_STREAM) if timeout is None: - raise RuntimeError("timeout is required") + raise RuntimeError('timeout is required') if timeout <= 0: - raise RuntimeError("only blocking sockets are supported") + raise RuntimeError('only blocking sockets are supported') sock.settimeout(timeout) - return TestThreadedClient(self, sock, client_prog, timeout) + return TestThreadedClient( + self, sock, client_prog, timeout) def unix_server(self, *args, **kwargs): return self.tcp_server(*args, family=socket.AF_UNIX, **kwargs) @@ -220,7 +216,7 @@ def unix_client(self, *args, **kwargs): @contextlib.contextmanager def unix_sock_name(self): with tempfile.TemporaryDirectory() as td: - fn = os.path.join(td, "sock") + fn = os.path.join(td, 'sock') try: yield fn finally: @@ -237,22 +233,21 @@ def _abort_socket_test(self, ex): def _cert_fullname(test_file_name, cert_file_name): - fullname = os.path.abspath( - os.path.join(os.path.dirname(test_file_name), "certs", cert_file_name) - ) + fullname = os.path.abspath(os.path.join( + os.path.dirname(test_file_name), 'certs', cert_file_name)) assert os.path.isfile(fullname) return fullname @contextlib.contextmanager def silence_long_exec_warning(): + class Filter(logging.Filter): def filter(self, record): - return not ( - record.msg.startswith("Executing") and record.msg.endswith("seconds") - ) + return not (record.msg.startswith('Executing') and + record.msg.endswith('seconds')) - logger = logging.getLogger("asyncio") + logger = logging.getLogger('asyncio') filter = Filter() logger.addFilter(filter) try: @@ -266,19 +261,20 @@ def find_free_port(start_from=50000): sock = socket.socket() with sock: try: - sock.bind(("", port)) + sock.bind(('', port)) except socket.error: continue else: return port - raise RuntimeError("could not find a free port") + raise RuntimeError('could not find a free port') class SSLTestCase: + def _create_server_ssl_context(self, certfile, keyfile=None): - if hasattr(ssl, "PROTOCOL_TLS_SERVER"): + if hasattr(ssl, 'PROTOCOL_TLS_SERVER'): sslcontext = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) - elif hasattr(ssl, "PROTOCOL_TLS"): + elif hasattr(ssl, 'PROTOCOL_TLS'): sslcontext = ssl.SSLContext(ssl.PROTOCOL_TLS) else: sslcontext = ssl.SSLContext(ssl.PROTOCOL_SSLv23) @@ -296,8 +292,8 @@ def _create_client_ssl_context(self, *, disable_verify=True): @contextlib.contextmanager def _silence_eof_received_warning(self): # TODO This warning has to be fixed in asyncio. - logger = logging.getLogger("asyncio") - filter = logging.Filter("has no effect when using ssl") + logger = logging.getLogger('asyncio') + filter = logging.Filter('has no effect when using ssl') logger.addFilter(filter) try: yield @@ -306,7 +302,8 @@ def _silence_eof_received_warning(self): class UVTestCase(BaseTestCase): - implementation = "winloop" + + implementation = 'winloop' def new_loop(self): return winloop.new_event_loop() @@ -316,18 +313,19 @@ def new_policy(self): class AIOTestCase(BaseTestCase): - implementation = "asyncio" + + implementation = 'asyncio' def setUp(self): super().setUp() - if sys.platform != "win32" and sys.version_info < (3, 12): + if sys.version_info < (3, 12) and sys.platform != "win32": watcher = asyncio.SafeChildWatcher() watcher.attach_loop(self.loop) asyncio.set_child_watcher(watcher) def tearDown(self): - if sys.platform != "win32" and sys.version_info < (3, 12): + if sys.version_info < (3, 12) and sys.platform != "win32": asyncio.set_child_watcher(None) super().tearDown() @@ -342,7 +340,7 @@ def has_IPv6(): server_sock = socket.socket(socket.AF_INET6) with server_sock: try: - server_sock.bind(("::1", 0)) + server_sock.bind(('::1', 0)) except OSError: return False else: @@ -358,34 +356,30 @@ def has_IPv6(): class TestSocketWrapper: + def __init__(self, sock): self.__sock = sock def recv_all(self, n): - buf = b"" + buf = b'' while len(buf) < n: data = self.recv(n - len(buf)) - if data == b"": + if data == b'': raise ConnectionAbortedError buf += data return buf - def starttls( - self, - ssl_context, - *, - server_side=False, - server_hostname=None, - do_handshake_on_connect=True, - ): + def starttls(self, ssl_context, *, + server_side=False, + server_hostname=None, + do_handshake_on_connect=True): + assert isinstance(ssl_context, ssl.SSLContext) ssl_sock = ssl_context.wrap_socket( - self.__sock, - server_side=server_side, + self.__sock, server_side=server_side, server_hostname=server_hostname, - do_handshake_on_connect=do_handshake_on_connect, - ) + do_handshake_on_connect=do_handshake_on_connect) if server_side: ssl_sock.do_handshake() @@ -397,10 +391,11 @@ def __getattr__(self, name): return getattr(self.__sock, name) def __repr__(self): - return "<{} {!r}>".format(type(self).__name__, self.__sock) + return '<{} {!r}>'.format(type(self).__name__, self.__sock) class SocketThread(threading.Thread): + def stop(self): self._active = False self.join() @@ -414,8 +409,9 @@ def __exit__(self, *exc): class TestThreadedClient(SocketThread): + def __init__(self, test, sock, prog, timeout): - threading.Thread.__init__(self, None, None, "test-client") + threading.Thread.__init__(self, None, None, 'test-client') self.daemon = True self._timeout = timeout @@ -434,8 +430,9 @@ def run(self): class TestThreadedServer(SocketThread): + def __init__(self, test, sock, prog, timeout, max_clients): - threading.Thread.__init__(self, None, None, "test-server") + threading.Thread.__init__(self, None, None, 'test-server') self.daemon = True self._clients = 0 @@ -456,7 +453,7 @@ def stop(self): try: if self._s2 and self._s2.fileno() != -1: try: - self._s2.send(b"stop") + self._s2.send(b'stop') except OSError: pass finally: @@ -476,7 +473,8 @@ def _run(self): if self._clients >= self._max_clients: return - r, w, x = select.select([self._sock, self._s1], [], [], self._timeout) + r, w, x = select.select( + [self._sock, self._s1], [], [], self._timeout) if self._s1 in r: return @@ -522,7 +520,6 @@ def addr(self): def run_briefly(loop): async def once(): pass - gen = once() t = loop.create_task(gen) # Don't log a warning if the task is not done after run_until_complete(). diff --git a/winloop/_version.py b/winloop/_version.py index 9c1905c..3116a2e 100644 --- a/winloop/_version.py +++ b/winloop/_version.py @@ -10,4 +10,4 @@ # supported platforms, publish the packages on PyPI, merge the PR # to the target branch, create a Git tag pointing to the commit. -__version__ = "0.5.0" +__version__ = '0.6.0' diff --git a/winloop/cbhandles.pxd b/winloop/cbhandles.pxd index 52a0439..757ae4d 100644 --- a/winloop/cbhandles.pxd +++ b/winloop/cbhandles.pxd @@ -11,7 +11,7 @@ cdef class Handle: object __weakref__ - readonly _source_traceback # type: ignore + readonly _source_traceback cdef inline _set_loop(self, Loop loop) cdef inline _set_context(self, object context) diff --git a/winloop/cbhandles.pyx b/winloop/cbhandles.pyx index 5655a4f..833b0da 100644 --- a/winloop/cbhandles.pyx +++ b/winloop/cbhandles.pyx @@ -9,8 +9,10 @@ cdef class Handle: cdef inline _set_loop(self, Loop loop): self.loop = loop if UVLOOP_DEBUG: - loop._debug_cb_handles_total += 1 - loop._debug_cb_handles_count += 1 + system.__atomic_fetch_add( + &loop._debug_cb_handles_total, 1, system.__ATOMIC_RELAXED) + system.__atomic_fetch_add( + &loop._debug_cb_handles_count, 1, system.__ATOMIC_RELAXED) if loop._debug: self._source_traceback = extract_stack() @@ -21,7 +23,8 @@ cdef class Handle: def __dealloc__(self): if UVLOOP_DEBUG and self.loop is not None: - self.loop._debug_cb_handles_count -= 1 + system.__atomic_fetch_sub( + &self.loop._debug_cb_handles_count, 1, system.__ATOMIC_RELAXED) if self.loop is None: raise RuntimeError('Handle.loop is None in Handle.__dealloc__') @@ -174,8 +177,10 @@ cdef class TimerHandle: self._cancelled = 0 if UVLOOP_DEBUG: - self.loop._debug_cb_timer_handles_total += 1 - self.loop._debug_cb_timer_handles_count += 1 + system.__atomic_fetch_add( + &self.loop._debug_cb_timer_handles_total, 1, system.__ATOMIC_RELAXED) + system.__atomic_fetch_add( + &self.loop._debug_cb_timer_handles_count, 1, system.__ATOMIC_RELAXED) if context is None: context = Context_CopyCurrent() @@ -205,7 +210,8 @@ cdef class TimerHandle: def __dealloc__(self): if UVLOOP_DEBUG: - self.loop._debug_cb_timer_handles_count -= 1 + system.__atomic_fetch_sub( + &self.loop._debug_cb_timer_handles_count, 1, system.__ATOMIC_RELAXED) if self.timer is not None: raise RuntimeError('active TimerHandle is deallacating') @@ -410,7 +416,7 @@ cdef new_MethodHandle3(Loop loop, str name, method3_t callback, object context, return handle -cdef object extract_stack(): +cdef extract_stack(): """Replacement for traceback.extract_stack() that only does the necessary work for asyncio debug mode. """ diff --git a/winloop/dns.pyx b/winloop/dns.pyx index d312078..e6afb25 100644 --- a/winloop/dns.pyx +++ b/winloop/dns.pyx @@ -154,7 +154,7 @@ cdef __convert_pyaddr_to_sockaddr(int family, object addr, (&ret.addr).sin6_flowinfo = flowinfo (&ret.addr).sin6_scope_id = scope_id - elif not system.PLATFORM_IS_WINDOWS and family == uv.AF_UNIX: + elif family == uv.AF_UNIX and (not system.PLATFORM_IS_WINDOWS): if isinstance(addr, str): addr = addr.encode(sys_getfilesystemencoding()) elif not isinstance(addr, bytes): @@ -352,16 +352,17 @@ cdef class AddrInfoRequest(UVRequest): if host is None: chost = NULL - elif host == b'' and sys.platform == 'darwin': + elif host == b'' and sys_platform == 'darwin': # It seems `getaddrinfo("", ...)` on macOS is equivalent to # `getaddrinfo("localhost", ...)`. This is inconsistent with # libuv 1.48 which treats empty nodename as EINVAL. chost = 'localhost' - elif host == b'' and sys.platform == 'win32': + elif host == b'' and sys_platform == "win32": # On Windows, `getaddrinfo("", ...)` is *almost* equivalent to # `getaddrinfo("..localmachine", ...)`. This is inconsistent with # libuv 1.48 which treats empty nodename as EINVAL. chost = '..localmachine' + else: chost = host @@ -397,7 +398,7 @@ cdef class AddrInfoRequest(UVRequest): # EAI_NONAME [ErrNo 10001] "No such host is known. ". # We replace the message with "getaddrinfo failed". # See also errors.pyx. - if sys.platform == 'win32': + if sys_platform == 'win32': msg = 'getaddrinfo failed' else: msg = system.gai_strerror(socket_EAI_NONAME).decode('utf-8') diff --git a/winloop/errors.pyx b/winloop/errors.pyx index 25051aa..2f5624e 100644 --- a/winloop/errors.pyx +++ b/winloop/errors.pyx @@ -1,19 +1,70 @@ + + +cdef str __strerr(int errno): + return strerror(errno).decode() + + cdef __convert_python_error(int uverr): # XXX Won't work for Windows: # From libuv docs: # Implementation detail: on Unix error codes are the # negated errno (or -errno), while on Windows they # are defined by libuv to arbitrary negative numbers. + + cdef int oserr + cdef object err + + if system.PLATFORM_IS_WINDOWS: + + # So Let's try converting them a different way if were using windows. + # Winloop has a smarter technique for showing these errors. + err = getattr(win_errno, uv.uv_err_name(uverr).decode(), uverr) + return OSError(err, uv.uv_strerror(uverr).decode()) + + oserr = -uverr + + + exc = OSError + + if uverr in (uv.UV_EACCES, uv.UV_EPERM): + exc = PermissionError + + elif uverr in (uv.UV_EAGAIN, uv.UV_EALREADY): + exc = BlockingIOError + + elif uverr in (uv.UV_EPIPE, uv.UV_ESHUTDOWN): + exc = BrokenPipeError + + elif uverr == uv.UV_ECONNABORTED: + exc = ConnectionAbortedError + + elif uverr == uv.UV_ECONNREFUSED: + exc = ConnectionRefusedError + + elif uverr == uv.UV_ECONNRESET: + exc = ConnectionResetError + + elif uverr == uv.UV_EEXIST: + exc = FileExistsError + + elif uverr == uv.UV_ENOENT: + exc = FileNotFoundError + + elif uverr == uv.UV_EINTR: + exc = InterruptedError + + elif uverr == uv.UV_EISDIR: + exc = IsADirectoryError + + elif uverr == uv.UV_ESRCH: + exc = ProcessLookupError + + elif uverr == uv.UV_ETIMEDOUT: + exc = TimeoutError + + return exc(oserr, __strerr(oserr)) - # Winloop comment: The following approach seems to work for Windows: - # translation from uverr, which is a negative number like -4088 or -4071 - # defined by libuv (as mentioned above), to error numbers obtained via - # the Python module errno. - err = getattr(errno, uv.uv_err_name(uverr).decode(), uverr) - return OSError(err, uv.uv_strerror(uverr).decode()) -# TODO: Create a switch block for dealing with this otherwise we're waiting on match blocks -# to be fully implemented cdef int __convert_socket_error(int uverr): cdef int sock_err = 0 @@ -80,10 +131,10 @@ cdef convert_error(int uverr): # EAI_FAMILY [ErrNo 10047] "An address incompatible with the requested protocol was used. " # EAI_NONAME [ErrNo 10001] "No such host is known. " # We replace these messages with "getaddrinfo failed" - if sys.platform == 'win32': + if sys_platform == "win32": if sock_err in (socket_EAI_FAMILY, socket_EAI_NONAME): msg = 'getaddrinfo failed' - return socket_gaierror(sock_err, msg) + return socket_gaierror(sock_err, msg) return __convert_python_error(uverr) diff --git a/winloop/handles/handle.pxd b/winloop/handles/handle.pxd index fe97e50..5af1c14 100644 --- a/winloop/handles/handle.pxd +++ b/winloop/handles/handle.pxd @@ -1,11 +1,3 @@ -cimport cython - -# NOTE: Uvloop expects you to use no_gc_clear -# Reason beind doing so has to do with the debug -# RuntimeError which hints at this and makes it -# very clear to use it. -# so please do not remove this wrapper, thank you :) -@cython.no_gc_clear cdef class UVHandle: cdef: uv.uv_handle_t *_handle @@ -21,26 +13,22 @@ cdef class UVHandle: # All "inline" methods are final - cdef inline int _start_init(self, Loop loop) except -1 - cdef inline int _abort_init(self) except -1 + cdef inline _start_init(self, Loop loop) + cdef inline _abort_init(self) cdef inline _finish_init(self) - cdef inline bint _is_alive(self) except -1 - cdef inline int _ensure_alive(self) except -1 + cdef inline bint _is_alive(self) + cdef inline _ensure_alive(self) cdef _error(self, exc, throw) - # in CPython it returns NULL on exception raised - # so let's define that an object of NONE is returning. - cdef object _fatal_error(self, exc, throw, reason=?) + cdef _fatal_error(self, exc, throw, reason=?) + cdef _warn_unclosed(self) - cdef void _free(self) noexcept - # TODO: Optimize to return an integer if - # exception handling of CPython can be better learned. + cdef _free(self) cdef _close(self) -@cython.no_gc_clear cdef class UVSocketHandle(UVHandle): cdef: # Points to a Python file-object that should be closed diff --git a/winloop/handles/handle.pyx b/winloop/handles/handle.pyx index c2c1de6..2c96458 100644 --- a/winloop/handles/handle.pyx +++ b/winloop/handles/handle.pyx @@ -1,5 +1,3 @@ - - cdef class UVHandle: """A base class for all libuv handles. @@ -70,7 +68,7 @@ cdef class UVHandle: self._closed = 1 self._free() - cdef void _free(self) noexcept: + cdef _free(self): if self._handle == NULL: return @@ -95,37 +93,29 @@ cdef class UVHandle: msg = 'unclosed resource {!r}; {}'.format(self, tb) else: msg = 'unclosed resource {!r}'.format(self) - - # There should be a better way to do this in CPython... warnings_warn(msg, ResourceWarning) - cdef inline int _abort_init(self) except -1: + cdef inline _abort_init(self): if self._handle is not NULL: self._free() - return 0 try: if UVLOOP_DEBUG: name = self.__class__.__name__ if self._inited: - PyErr_SetObject(RuntimeError, + raise RuntimeError( '_abort_init: {}._inited is set'.format(name)) - return -1 if self._closed: - PyErr_SetObject(RuntimeError, + raise RuntimeError( '_abort_init: {}._closed is set'.format(name)) - return -1 finally: self._closed = 1 - return 0 - + cdef inline _finish_init(self): self._inited = 1 if self._has_handle == 1: self._handle.data = self if self._loop._debug: - # extract-stack throws exception so _finish_init(self) - # can't currently be optimzied further... self._source_traceback = extract_stack() if UVLOOP_DEBUG: cls_name = self.__class__.__name__ @@ -133,59 +123,44 @@ cdef class UVHandle: self._loop._debug_handles_total.update([cls_name]) self._loop._debug_handles_current.update([cls_name]) - cdef inline int _start_init(self, Loop loop) except -1: + cdef inline _start_init(self, Loop loop): if UVLOOP_DEBUG: if self._loop is not None: - PyErr_SetObject(RuntimeError, + raise RuntimeError( '{}._start_init can only be called once'.format( self.__class__.__name__)) - return -1 - self._loop = loop - return 0 - # TODO: Better excpetion handling when UVLOOP_DEBUG is in effect.. - # we should be handling errors as if we were writing in CPython to minimize - # the amount of code being generated. Doing so leads to faster performance. - # There is a lot of areas that could use pleanty of internal refractoring. - # Same goes for users with uvloop. - Vizonex + self._loop = loop - # TODO change bint to int so that were allowing the optional exceptions bringing -1 can be handled appropreately. - cdef inline bint _is_alive(self) except -1: + cdef inline bint _is_alive(self): cdef bint res res = self._closed != 1 and self._inited == 1 if UVLOOP_DEBUG: if res and self._has_handle == 1: name = self.__class__.__name__ if self._handle is NULL: - PyErr_SetObject(RuntimeError, + raise RuntimeError( '{} is alive, but _handle is NULL'.format(name)) - return -1 if self._loop is None: - PyErr_SetObject(RuntimeError, + raise RuntimeError( '{} is alive, but _loop is None'.format(name)) - return -1 if self._handle.loop is not self._loop.uvloop: - PyErr_SetObject(RuntimeError, + raise RuntimeError( '{} is alive, but _handle.loop is not ' 'initialized'.format(name)) - return -1 if self._handle.data is not self: - PyErr_SetObject(RuntimeError, + raise RuntimeError( '{} is alive, but _handle.data is not ' 'initialized'.format(name)) - return -1 return res - - cdef inline int _ensure_alive(self) except -1: + cdef inline _ensure_alive(self): if not self._is_alive(): - PyErr_SetObject(RuntimeError, + raise RuntimeError( 'unable to perform operation on {!r}; ' 'the handler is closed'.format(self)) - return -1 - return 0 - cdef object _fatal_error(self, exc, throw, reason=None): + cdef _fatal_error(self, exc, throw, reason=None): # Fatal error means an error that was returned by the # underlying libuv handle function. We usually can't # recover from that, hence we just close the handle. @@ -322,9 +297,6 @@ cdef class UVSocketHandle(UVHandle): finally: self._fileobj = None - # TODO: PyErr_SetNone Followed by return -1 - # We should internally refractor socket portions, this - # way if possible to reduce the C Generated code-size. cdef _open(self, int sockfd): raise NotImplementedError diff --git a/winloop/handles/pipe.pyx b/winloop/handles/pipe.pyx index 5e37d9e..08481dd 100644 --- a/winloop/handles/pipe.pyx +++ b/winloop/handles/pipe.pyx @@ -1,5 +1,3 @@ -from libc.stdint cimport uintptr_t - cdef __pipe_init_uv_handle(UVStream handle, Loop loop): cdef int err @@ -27,7 +25,7 @@ cdef __pipe_init_uv_handle(UVStream handle, Loop loop): cdef __pipe_open(UVStream handle, int fd): cdef int err err = uv.uv_pipe_open(handle._handle, - fd) + fd) if err < 0: exc = convert_error(err) raise exc @@ -198,7 +196,7 @@ cdef class WriteUnixTransport(UVStream): cdef _new_socket(self): return __pipe_get_socket(self) - cdef _open(self, int sockfd): + cdef _open(self, uv.uv_os_fd_t sockfd): __pipe_open(self, sockfd) def pause_reading(self): diff --git a/winloop/handles/process.pyx b/winloop/handles/process.pyx index dc4d572..3aad40a 100644 --- a/winloop/handles/process.pyx +++ b/winloop/handles/process.pyx @@ -27,8 +27,8 @@ cdef class UVProcess(UVHandle): _stdin, _stdout, _stderr, # std* can be defined as macros in C pass_fds, debug_flags, preexec_fn, restore_signals): - global __forking + if not system.PLATFORM_IS_WINDOWS: global __forking_loop global __forkHandler @@ -59,6 +59,7 @@ cdef class UVProcess(UVHandle): try: self._init_options(args, env, cwd, start_new_session, _stdin, _stdout, _stderr, force_fork) + restore_inheritable = set() if pass_fds: for fd in pass_fds: @@ -80,10 +81,9 @@ cdef class UVProcess(UVHandle): self._errpipe_read, self._errpipe_write = os_pipe() fds_to_close = self._fds_to_close self._fds_to_close = None - # add the write pipe last so we can close it early fds_to_close.append(self._errpipe_read) + # add the write pipe last so we can close it early fds_to_close.append(self._errpipe_write) - try: os_set_inheritable(self._errpipe_write, True) @@ -91,6 +91,8 @@ cdef class UVProcess(UVHandle): self._restore_signals = restore_signals loop.active_process_handler = self + + if not system.PLATFORM_IS_WINDOWS: __forking = 1 __forking_loop = loop @@ -98,53 +100,25 @@ cdef class UVProcess(UVHandle): PyOS_BeforeFork() else: - # pass - # NOTE: There's a good change we might consider getting rid of gil related features in the future and - # instead try without to see if we can get the pids to correctly match. I'll save this for 0.2.1 however. py_gil_state = PyGILState_Ensure() - - # Also important to note... https://docs.libuv.org/en/v1.x/guide/processes.html#option-flags - # "Changing the UID/GID is only supported on Unix, uv_spawn will fail on Windows with UV_ENOTSUP." - Libuv Docs - # This means that we cannot use any flags with this setup - # Finding examples of how uv_spawn is used will be helful as well... - # https://docs.libuv.org/en/v1.x/process.html#c.uv_process_flags - - # (Winloop & Vizonex) Note: To make up for the loss of forking (which I saw as an enhancement on other systems) - # Try releasing the gil during the spawning phase which is what CPython does... - # in order to attempt to try and mimic forking behaviors but also to try and prevent the processes from blocking - - # I cannot seem to get with nogil to work here so this was my workaround... - # This might be our answer... - # https://github.com/saghul/pyuv/blob/39342fc2fd688f2fb2120d3092dd9cf52f537de2/src/process.c - - # This is similar to how CPython handles process spawning - # It releases the gil during spawntime and then brings it back in... - + err = uv.uv_spawn(loop.uvloop, - self._handle, - &self.options) - + self._handle, + &self.options) + + if not system.PLATFORM_IS_WINDOWS: __forking = 0 __forking_loop = None system.resetForkHandler() + PyOS_AfterFork_Parent() else: PyGILState_Release(py_gil_state) - # NOTE I brought the PyGILState_Release here instead of later - # so that we can have better control over the os module... - Vizonex - - # This GIL release couldn't be deferred to self._after_fork() anyway because the latter call - # only happens if forking is enabled (call via system.handleAtFork which is set - # using system.setForkHandler). - # Actually, since self._after_fork() is not called in this Windows version, - # and that call is the part executed with gil in the nonWindows version, see: - # https://github.com/MagicStack/uvloop/blob/6c770dc3fbdd281d15c2ad46588c139696f9269c/uvloop/loop.pyx#L3353-L3357 - # the question is why the GIL is needed at all here? - # (Answer) Cython does not like when class objects from python are being handled whenever the gil is being released. - + loop.active_process_handler = None + if err < 0: self._close_process_handle() self._abort_init() @@ -292,19 +266,19 @@ cdef class UVProcess(UVHandle): self.options.args = self.uv_opt_args if start_new_session: - # Try disable this... self.options.flags |= uv.UV_PROCESS_DETACHED - if system.PLATFORM_IS_WINDOWS: + # if system.PLATFORM_IS_WINDOWS: # TODO Forget these flags for right now until we have figured out/diagnosed the real issue... # "All of these flags have been set because they're all meaningful on windows systems... # see uv_process_fags for more reasons why I had to set all of these up this way" - Vizonex # https://docs.libuv.org/en/v1.x/process.html#c.uv_process_flags # enabling VERBATIM_ARGUMENTS is helpful here because we're not enabling children... - self.options.flags |= uv.UV_PROCESS_WINDOWS_VERBATIM_ARGUMENTS - pass + # self.options.flags |= uv.UV_PROCESS_WINDOWS_VERBATIM_ARGUMENTS + # pass + - # if force_fork: + if force_fork: # This is a hack to work around the change in libuv 1.44: # > macos: use posix_spawn instead of fork # where Python subprocess options like preexec_fn are @@ -316,8 +290,8 @@ cdef class UVProcess(UVHandle): # Based on current (libuv 1.46) behavior, setting # UV_PROCESS_SETUID or UV_PROCESS_SETGID would reliably make # libuv fallback to use fork, so let's just use it for now. - # self.options.flags |= uv.UV_PROCESS_SETUID - # self.options.uid = uv.getuid() + self.options.flags |= uv.UV_PROCESS_SETUID + self.options.uid = uv.getuid() if cwd is not None: cwd = os_fspath(cwd) @@ -465,6 +439,10 @@ cdef class UVProcessTransport(UVProcess): else: self._pending_calls.append((_CALL_PIPE_DATA_RECEIVED, fd, data)) + # TODO: https://github.com/Vizonex/Winloop/issues/126 bug fix for uvloop + # Might need a special implementation for subprocess.Popen._get_handles() + # but can't seem to wrap my head around how to go about doing it. + cdef _file_redirect_stdio(self, int fd): fd = os_dup(fd) os_set_inheritable(fd, True) diff --git a/winloop/handles/stream.pxd b/winloop/handles/stream.pxd index 8ca8743..71c8288 100644 --- a/winloop/handles/stream.pxd +++ b/winloop/handles/stream.pxd @@ -1,3 +1,9 @@ +cdef enum ProtocolType: + SIMPLE = 0 # User Protocol doesn't support asyncio.BufferedProtocol + BUFFERED = 1 # User Protocol supports asyncio.BufferedProtocol + SSL_PROTOCOL = 2 # Our own SSLProtocol + + cdef class UVStream(UVBaseTransport): cdef: uv.uv_shutdown_t _shutdown_req @@ -5,7 +11,7 @@ cdef class UVStream(UVBaseTransport): bint __reading bint __read_error_close - bint __buffered + ProtocolType __protocol_type object _protocol_get_buffer object _protocol_buffer_updated @@ -16,6 +22,8 @@ cdef class UVStream(UVBaseTransport): Py_buffer _read_pybuf bint _read_pybuf_acquired + cpdef write(self, object buf) + # All "inline" methods are final cdef inline _init(self, Loop loop, object protocol, Server server, @@ -39,8 +47,8 @@ cdef class UVStream(UVBaseTransport): # _exec_write() is the method that does the actual send, and _try_write() # is a fast-path used in _exec_write() to send a single chunk. - cdef inline _exec_write(self) - cdef inline _try_write(self, object data) + cdef inline bint _exec_write(self) except -1 + cdef inline Py_ssize_t _try_write(self, object data) except -2 cdef _close(self) diff --git a/winloop/handles/stream.pyx b/winloop/handles/stream.pyx index 310c828..974d6ab 100644 --- a/winloop/handles/stream.pyx +++ b/winloop/handles/stream.pyx @@ -213,14 +213,15 @@ cdef class UVStream(UVBaseTransport): self.__shutting_down = 0 self.__reading = 0 self.__read_error_close = 0 - self.__buffered = 0 - self._eof = 0 - self._buffer = [] - self._buffer_size = 0 + self.__protocol_type = ProtocolType.SIMPLE self._protocol_get_buffer = None self._protocol_buffer_updated = None + self._eof = 0 + self._buffer = [] + self._buffer_size = 0 + self._read_pybuf_acquired = False cdef _set_protocol(self, object protocol): @@ -229,22 +230,24 @@ cdef class UVStream(UVBaseTransport): UVBaseTransport._set_protocol(self, protocol) - if (hasattr(protocol, 'get_buffer') and + if isinstance(protocol, SSLProtocol): + self.__protocol_type = ProtocolType.SSL_PROTOCOL + elif (hasattr(protocol, 'get_buffer') and not isinstance(protocol, aio_Protocol)): try: self._protocol_get_buffer = protocol.get_buffer self._protocol_buffer_updated = protocol.buffer_updated - self.__buffered = 1 + self.__protocol_type = ProtocolType.BUFFERED except AttributeError: pass else: - self.__buffered = 0 + self.__protocol_type = ProtocolType.SIMPLE cdef _clear_protocol(self): UVBaseTransport._clear_protocol(self) self._protocol_get_buffer = None self._protocol_buffer_updated = None - self.__buffered = 0 + self.__protocol_type = ProtocolType.SIMPLE cdef inline _shutdown(self): cdef int err @@ -294,14 +297,14 @@ cdef class UVStream(UVBaseTransport): if self.__reading: return - if self.__buffered: - err = uv.uv_read_start(self._handle, - __uv_stream_buffered_alloc, - __uv_stream_buffered_on_read) - else: + if self.__protocol_type == ProtocolType.SIMPLE: err = uv.uv_read_start(self._handle, __loop_alloc_buffer, __uv_stream_on_read) + else: + err = uv.uv_read_start( self._handle, + __uv_stream_buffered_alloc, + __uv_stream_buffered_on_read) if err < 0: exc = convert_error(err) self._fatal_error(exc, True) @@ -341,16 +344,18 @@ cdef class UVStream(UVBaseTransport): else: self.__reading_stopped() - cdef inline _try_write(self, object data): + cdef inline Py_ssize_t _try_write(self, object data) except -2: + # Returns number of bytes written. + # -1 - in case of fatal errors cdef: - ssize_t written + Py_ssize_t written bint used_buf = 0 Py_buffer py_buf void* buf - size_t blen + Py_ssize_t blen int saved_errno int fd - + if system.PLATFORM_IS_WINDOWS: # Winloop comment: WSASend below does not work with pipes. # For pipes, using Writefile() from Windows fileapi.h would @@ -358,7 +363,7 @@ cdef class UVStream(UVBaseTransport): # FILE_FLAG_OVERLAPPED set, but we don't want to go that way here. # We detect pipes on Windows as pseudosockets. if self._get_socket().family == uv.AF_UNIX: - return -1 + return 0 # use zero instead of an error as this is not a problem. if (self._handle).write_queue_size != 0: raise RuntimeError( @@ -377,6 +382,8 @@ cdef class UVStream(UVBaseTransport): blen = py_buf.len if blen == 0: + if used_buf: + PyBuffer_Release(&py_buf) # Empty data, do nothing. return 0 @@ -384,8 +391,6 @@ cdef class UVStream(UVBaseTransport): # Use `unistd.h/write` directly, it's faster than # uv_try_write -- less layers of code. The error # checking logic is copied from libuv. - # Winloop comment: similarly, use WSASend directly - # on Windows via call to system.write. written = system.write(fd, buf, blen) if not system.PLATFORM_IS_WINDOWS: while written == -1 and ( @@ -404,27 +409,27 @@ cdef class UVStream(UVBaseTransport): PyBuffer_Release(&py_buf) if written < 0: - if saved_errno == errno.EAGAIN or \ - saved_errno == system.EWOULDBLOCK: - return -1 - else: + if saved_errno in (errno.EAGAIN, system.EWOULDBLOCK): + return 0 + elif system.PLATFORM_IS_WINDOWS: # Winloop comment: use uv_translate_sys_error for # correct results on all platforms as -saved_errno # only works for POSIX. exc = convert_error(uv.uv_translate_sys_error(saved_errno)) self._fatal_error(exc, True) - return + return -1 + else: + exc = convert_error(-saved_errno) + self._fatal_error(exc, True) + return -1 if UVLOOP_DEBUG: self._loop._debug_stream_write_tries += 1 - if written == blen: - return 0 - return written cdef inline _buffer_write(self, object data): - cdef int dlen + cdef Py_ssize_t dlen if not PyBytes_CheckExact(data): data = memoryview(data).cast('b') @@ -437,19 +442,20 @@ cdef class UVStream(UVBaseTransport): self._buffer.append(data) cdef inline _initiate_write(self): + cdef bint all_sent + if (not self._protocol_paused and - (self._handle).write_queue_size == 0 and - self._buffer_size > self._high_water): + (self._handle).write_queue_size == 0 and + self._buffer_size > self._high_water): # Fast-path. If: # - the protocol isn't yet paused, # - there is no data in libuv buffers for this stream, - # - the protocol will be paused if we continue to buffer data # # Then: # - Try to write all buffered data right now. all_sent = self._exec_write() if UVLOOP_DEBUG: - if self._buffer_size != 0 or self._buffer != []: + if self._buffer_size != 0 or self._buffer: raise RuntimeError( '_buffer_size is not 0 after a successful _exec_write') @@ -465,20 +471,23 @@ cdef class UVStream(UVBaseTransport): self._maybe_pause_protocol() self._loop._queue_write(self) - cdef inline _exec_write(self): + cdef inline bint _exec_write(self) except -1: + # Returns True if all data from self._buffers has been sent, + # False - otherwise cdef: int err - int buf_len + Py_ssize_t buf_len + Py_ssize_t sent _StreamWriteContext ctx = None if self._closed: # If the handle is closed, just return, it's too # late to do anything. - return + return False buf_len = len(self._buffer) if not buf_len: - return + return True if (self._handle).write_queue_size == 0: # libuv internal write buffers for this stream are empty. @@ -488,34 +497,16 @@ cdef class UVStream(UVBaseTransport): data = self._buffer[0] sent = self._try_write(data) - if sent is None: - # A `self._fatal_error` was called. - # It might not raise an exception under some - # conditions. - self._buffer_size = 0 - self._buffer.clear() - if not self._closing: - # This should never happen. - raise RuntimeError( - 'stream is open after UVStream._try_write ' - 'returned None') - return - - if sent == 0: - # All data was successfully written. + if sent == len(data): + # The most likely and latency sensitive outcome goes first, + # all data was successfully written. self._buffer_size = 0 self._buffer.clear() # on_write will call "maybe_resume_protocol". self._on_write() return True - if sent > 0: - if UVLOOP_DEBUG: - if sent == len(data): - raise RuntimeError( - '_try_write sent all data and returned ' - 'non-zero') - + elif sent > 0: if PyBytes_CheckExact(data): # Cast bytes to memoryview to avoid copying # data that wasn't sent. @@ -525,6 +516,19 @@ cdef class UVStream(UVBaseTransport): self._buffer_size -= sent self._buffer[0] = data + elif sent == -1: + # A `self._fatal_error` was called. + # It might not raise an exception under some + # conditions. + self._buffer_size = 0 + self._buffer.clear() + if not self._closing: + # This should never happen. + raise RuntimeError( + 'stream is open after UVStream._try_write ' + 'returned None') + return False + # At this point it's either data was sent partially, # or an EAGAIN has happened. @@ -558,7 +562,7 @@ cdef class UVStream(UVBaseTransport): self._fatal_error(ex, True) self._buffer.clear() self._buffer_size = 0 - return + return False elif err != uv.UV_EAGAIN: ctx.close() @@ -566,7 +570,7 @@ cdef class UVStream(UVBaseTransport): self._fatal_error(exc, True) self._buffer.clear() self._buffer_size = 0 - return + return False # fall through @@ -590,9 +594,10 @@ cdef class UVStream(UVBaseTransport): exc = convert_error(err) self._fatal_error(exc, True) - return + return False self._maybe_resume_protocol() + return False cdef size_t _get_write_buffer_size(self): if self._handle is NULL: @@ -686,10 +691,9 @@ cdef class UVStream(UVBaseTransport): self.__reading, id(self)) - def write(self, object buf): + cpdef write(self, object buf): self._ensure_alive() - - + if system.PLATFORM_IS_WINDOWS: # Winloop Comment: Winloop gets itself into trouble if this is # is not checked immediately, it's too costly to call the python function @@ -697,7 +701,7 @@ cdef class UVStream(UVBaseTransport): # SEE: https://github.com/Vizonex/Winloop/issues/84 if self._closing: raise RuntimeError("Cannot call write() when UVStream is closing") - + if self._eof: raise RuntimeError('Cannot call write() after write_eof()') if not buf: @@ -952,9 +956,24 @@ cdef void __uv_stream_buffered_alloc( "UVStream alloc buffer callback") == 0: return + cdef UVStream sc = stream.data + + # Fast pass for our own SSLProtocol + # avoid python calls, memoryviews, context enter/exit, etc + if sc.__protocol_type == ProtocolType.SSL_PROTOCOL: + try: + (sc._protocol).get_buffer_impl( + suggested_size, &uvbuf.base, &uvbuf.len) + return + except BaseException as exc: + # Can't call 'sc._fatal_error' or 'sc._close', libuv will SF. + # We'll do it later in __uv_stream_buffered_on_read when we + # receive UV_ENOBUFS. + uvbuf.len = 0 + uvbuf.base = NULL + return + cdef: - UVStream sc = stream.data - Loop loop = sc._loop Py_buffer* pybuf = &sc._read_pybuf int got_buf = 0 @@ -1015,7 +1034,12 @@ cdef void __uv_stream_buffered_on_read( return try: - if nread > 0 and not sc._read_pybuf_acquired: + # When our own SSLProtocol is used, we get buffer pointer directly, + # through SSLProtocol.get_buffer_impl, not through Py_Buffer interface. + # Therefore sc._read_pybuf_acquired is always False for SSLProtocol. + if (nread > 0 and + sc.__protocol_type == ProtocolType.BUFFERED and + not sc._read_pybuf_acquired): # From libuv docs: # nread is > 0 if there is data available or < 0 on error. When # we’ve reached EOF, nread will be set to UV_EOF. When @@ -1036,12 +1060,20 @@ cdef void __uv_stream_buffered_on_read( if UVLOOP_DEBUG: loop._debug_stream_read_cb_total += 1 - run_in_context1(sc.context, sc._protocol_buffer_updated, nread) + if sc.__protocol_type == ProtocolType.SSL_PROTOCOL: + Context_Enter(sc.context) + try: + (sc._protocol).buffer_updated_impl(nread) + finally: + Context_Exit(sc.context) + else: + run_in_context1(sc.context, sc._protocol_buffer_updated, nread) except BaseException as exc: if UVLOOP_DEBUG: loop._debug_stream_read_cb_errors_total += 1 sc._fatal_error(exc, False) finally: - sc._read_pybuf_acquired = 0 - PyBuffer_Release(pybuf) + if sc._read_pybuf_acquired: + sc._read_pybuf_acquired = 0 + PyBuffer_Release(pybuf) diff --git a/winloop/handles/udp.pyx b/winloop/handles/udp.pyx index ef20c3f..eac1bca 100644 --- a/winloop/handles/udp.pyx +++ b/winloop/handles/udp.pyx @@ -208,6 +208,10 @@ cdef class UDPTransport(UVBaseTransport): if addr is None: saddr = NULL else: + # resolve special hostname to the broadcast address before use + if self._family == uv.AF_INET and addr[0] == '': + addr = (b'255.255.255.255', addr[1]) + try: __convert_pyaddr_to_sockaddr(self._family, addr, &saddr_st) diff --git a/winloop/includes/__init__.py b/winloop/includes/__init__.py index 4b84e3e..2ccf9ca 100644 --- a/winloop/includes/__init__.py +++ b/winloop/includes/__init__.py @@ -12,12 +12,12 @@ import os import signal import socket +import subprocess import ssl import stat -import subprocess import sys import threading -import time import traceback +import time import warnings import weakref diff --git a/winloop/includes/compat.h b/winloop/includes/compat.h index ee7132f..d42a8f1 100644 --- a/winloop/includes/compat.h +++ b/winloop/includes/compat.h @@ -1,6 +1,3 @@ -#ifndef __WINLOOP_COMPAT_H__ -#define __WINLOOP_COMPAT_H__ - #include #include #include @@ -13,6 +10,7 @@ #include #include #endif + #include "Python.h" #include "uv.h" @@ -33,17 +31,6 @@ # include #else # define PLATFORM_IS_LINUX 0 -#endif - -#ifdef __APPLE__ -# define EPOLL_CTL_DEL 2 -struct epoll_event {}; -int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event) { - return 0; -}; -#endif - -#ifdef _WIN32 # define EPOLL_CTL_DEL 2 /* error C2016: C requires that a struct or union have at least one member on Windows with default compilation flags. Therefore put dummy field for now. */ @@ -53,6 +40,7 @@ int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event) { }; #endif + #ifdef _WIN32 int SIGCHLD = 0; int SO_REUSEPORT = 0; @@ -118,7 +106,6 @@ int getuid() { #endif -/* TODO: (Vizonex) add static keyword to these functions */ #if PY_VERSION_HEX < 0x03070100 PyObject * Context_CopyCurrent(void) { @@ -149,133 +136,6 @@ int Context_Exit(PyObject *ctx) { #endif - -/* Originally apart of loop.pyx via calling context.run -moved here so we can began optimizing more -reason why something like this is more costly is because -we have to find the .run method if these were C Functions instead -This Call would no longer be needed and we skip right to the -meat of the function (run) immediately however, we can go further -to optimize that code too. - -Before: -cdef inline run_in_context1(context, method, arg): - Py_INCREF(method) - try: - return context.run(method, arg) - finally: - Py_DECREF(method) - -After: -cdef inline run_in_context1(context, method, arg): - Py_INCREF(method) - try: - return Context_RunNoArgs(context, method) - finally: - Py_DECREF(method) -*/ - -/* Context.run is literally this code right here referenced from python 3.15.1 - -static PyObject * -context_run(PyObject *self, PyObject *const *args, - Py_ssize_t nargs, PyObject *kwnames) -{ - PyThreadState *ts = _PyThreadState_GET(); - - if (nargs < 1) { - _PyErr_SetString(ts, PyExc_TypeError, - "run() missing 1 required positional argument"); - return NULL; - } - - if (_PyContext_Enter(ts, self)) { - return NULL; - } - - PyObject *call_result = _PyObject_VectorcallTstate( - ts, args[0], args + 1, nargs - 1, kwnames); - - if (_PyContext_Exit(ts, self)) { - Py_XDECREF(call_result); - return NULL; - } - - return call_result; -} - -As we can see this code is not very expensive to maintain -at all and can be simply reproduced and improved upon -for our needs of being faster. - -Will name them after the different object calls made -to keep things less confusing. -We also eliminate needing to -find the run method in the ContextVar by doing so. -*/ - -static PyObject* Context_RunNoArgs(PyObject* context, PyObject* method){ - /* NOTE: Were looking for -1 but we can also say it's not - a no-zero value so we could even treat it as a true case... */ - if (Context_Enter(context)){ - return NULL; - } - - #if PY_VERSION_HEX >= 0x030a0000 - PyObject* call_result = PyObject_CallNoArgs(method); - #else - PyObject* call_result = PyObject_CallFunctionObjArgs(method, NULL); - #endif - - if (Context_Exit(context)){ - Py_XDECREF(call_result); - return NULL; - } - - return call_result; -} - -static PyObject* Context_RunOneArg(PyObject* context, PyObject* method, PyObject* arg){ - if (Context_Enter(context)){ - return NULL; - } - /* Introduced in 3.9 */ - /* NOTE: Kept around backwards compatability since the same features are planned for uvloop */ - #if PY_VERSION_HEX >= 0x03090000 - PyObject* call_result = PyObject_CallOneArg(method, arg); - #else /* verison < 3.9 */ - PyObject* call_result = PyObject_CallFunctionObjArgs(method, arg, NULL); - #endif - if (Context_Exit(context)){ - Py_XDECREF(call_result); - return NULL; - } - return call_result; -} - -static PyObject* Context_RunTwoArgs(PyObject* context, PyObject* method, PyObject* arg1, PyObject* arg2){ - /* Cython can't really do this PyObject array packing so writing this in C - has a really good advantage */ - if (Context_Enter(context)){ - return NULL; - } - #if PY_VERSION_HEX >= 0x03090000 - /* begin packing for call... */ - PyObject* args[2]; - args[0] = arg1; - args[1] = arg2; - PyObject* call_result = PyObject_Vectorcall(method, args, 2, NULL); - #else - PyObject* call_result = PyObject_CallFunctionObjArgs(method, arg1, arg2, NULL); - #endif - if (Context_Exit(context)){ - Py_XDECREF(call_result); - return NULL; - } - return call_result; -} - - /* inlined from cpython/Modules/signalmodule.c * https://github.com/python/cpython/blob/v3.13.0a6/Modules/signalmodule.c#L1931-L1951 * private _Py_RestoreSignals has been moved to CPython internals in Python 3.13 @@ -305,11 +165,17 @@ void PyOS_AfterFork_Parent() { void PyOS_AfterFork_Child() { return; } - #endif -// TODO: all versions of _PyEval_EvalFrameDefault so we can get rid of _noop.noop -// which would be a massive performance enhancement and allow pyinstaller to compile 3.9 -> 3.14 + +#ifdef _WIN32 +/* For some strange reason this structure does not want to show up + * when compiling in debug mode on 3.13+ on windows so lets redefine it as a macro */ + +/* IDK How big to make this so will just leave it at 1 incase somehow accidently exposed */ +#ifndef __Pyx_MonitoringEventTypes_CyGen_count +#define __Pyx_MonitoringEventTypes_CyGen_count 1 +#endif /* __Pyx_MonitoringEventTypes_CyGen_count */ +#endif -#endif // __WINLOOP_COMPAT_H__ \ No newline at end of file diff --git a/winloop/includes/consts.pxi b/winloop/includes/consts.pxi index 1ab112b..82f3c32 100644 --- a/winloop/includes/consts.pxi +++ b/winloop/includes/consts.pxi @@ -1,7 +1,4 @@ cdef enum: - -# There is some Good News, Cython plans not to deprecate -# DEF Constant Macros for now lets keep these enums but we will revert later.. UV_STREAM_RECV_BUF_SIZE = 256000 # 250kb FLOW_CONTROL_HIGH_WATER = 64 # KiB @@ -13,8 +10,10 @@ cdef enum: DEBUG_STACK_DEPTH = 10 + __PROCESS_DEBUG_SLEEP_AFTER_FORK = 1 + LOG_THRESHOLD_FOR_CONNLOST_WRITES = 5 SSL_READ_MAX_SIZE = 256 * 1024 diff --git a/winloop/includes/fork_handler.h b/winloop/includes/fork_handler.h index b0049c5..68873ba 100644 --- a/winloop/includes/fork_handler.h +++ b/winloop/includes/fork_handler.h @@ -53,4 +53,4 @@ int pthread_atfork( } #endif -#endif \ No newline at end of file +#endif diff --git a/winloop/includes/python.pxd b/winloop/includes/python.pxd index 521b212..94007e5 100644 --- a/winloop/includes/python.pxd +++ b/winloop/includes/python.pxd @@ -1,6 +1,3 @@ -from cpython.object cimport PyObject - - cdef extern from "Python.h": int PY_VERSION_HEX @@ -20,10 +17,6 @@ cdef extern from "Python.h": cdef enum: PyBUF_WRITE - # This is For noop._noop to optimize the time calling it - PyObject* PyObject_CallNoArgs(object func) - - cdef extern from "includes/compat.h": @@ -31,20 +24,8 @@ cdef extern from "includes/compat.h": int Context_Enter(object) except -1 int Context_Exit(object) except -1 - # Custom functions for making context.run faster. - # meaning more speed for all handle calls being made - object Context_RunNoArgs(object context, object method) - object Context_RunOneArg(object context, object method, object arg) - object Context_RunTwoArgs(object context, object method, object arg1, object arg2) - - void PyOS_BeforeFork() void PyOS_AfterFork_Parent() void PyOS_AfterFork_Child() void _Py_RestoreSignals() - - # TODO: Might consider our own version or duplicates - # of _PyEval_EvalFrameDefault() so we can stop using noop._noop - # which has been the only problem with compiling with pyinstaller currently... - # This way, no need for hooks! diff --git a/winloop/includes/stdlib.pxi b/winloop/includes/stdlib.pxi index 1611ceb..259842c 100644 --- a/winloop/includes/stdlib.pxi +++ b/winloop/includes/stdlib.pxi @@ -25,14 +25,6 @@ import warnings import weakref -from cpython.object cimport PyObject, PyTypeObject -from cpython.time cimport PyTime_AsSecondsDouble, PyTime_t -from libc.stdint cimport uintptr_t - - -# TODO: Request or Propose to CPython Maintainers to allow public use of many of these functions via Capsule -# for improved performance. - cdef aio_get_event_loop = asyncio.get_event_loop cdef aio_CancelledError = asyncio.CancelledError cdef aio_InvalidStateError = asyncio.InvalidStateError @@ -45,37 +37,37 @@ cdef aio_wait = asyncio.wait cdef aio_wrap_future = asyncio.wrap_future cdef aio_logger = asyncio.log.logger cdef aio_iscoroutine = asyncio.iscoroutine -cdef aio_iscoroutinefunction = asyncio.iscoroutinefunction cdef aio_BaseProtocol = asyncio.BaseProtocol - cdef aio_Protocol = asyncio.Protocol cdef aio_isfuture = getattr(asyncio, 'isfuture', None) cdef aio_get_running_loop = getattr(asyncio, '_get_running_loop', None) cdef aio_set_running_loop = getattr(asyncio, '_set_running_loop', None) cdef aio_debug_wrapper = getattr(asyncio.coroutines, 'debug_wrapper', None) -cdef aio_AbstractChildWatcher = getattr(asyncio, 'AbstractChildWatcher', None) +cdef aio_AbstractChildWatcher = getattr(asyncio, "AbstractChildWatcher", ()) cdef aio_Transport = asyncio.Transport cdef aio_FlowControlMixin = asyncio.transports._FlowControlMixin cdef col_deque = collections.deque cdef col_Iterable = collections.abc.Iterable cdef col_Counter = collections.Counter - -# cdef col_OrderedDict = collections.OrderedDict +cdef col_OrderedDict = collections.OrderedDict cdef cc_ThreadPoolExecutor = concurrent.futures.ThreadPoolExecutor cdef cc_Future = concurrent.futures.Future +# windows needs access to errno for exception handling. +cdef win_errno = errno + cdef errno_EBADF = errno.EBADF cdef errno_EINVAL = errno.EINVAL -# TODO: Maybe we should try hacking in the partial code into cython instead? cdef ft_partial = functools.partial cdef gc_disable = gc.disable cdef iter_chain = itertools.chain cdef inspect_isgenerator = inspect.isgenerator +cdef inspect_iscoroutinefunction = inspect.iscoroutinefunction cdef int has_IPV6_V6ONLY = hasattr(socket, 'IPV6_V6ONLY') cdef int IPV6_V6ONLY = getattr(socket, 'IPV6_V6ONLY', -1) @@ -85,18 +77,6 @@ cdef int SO_BROADCAST = getattr(socket, 'SO_BROADCAST') cdef int SOCK_NONBLOCK = getattr(socket, 'SOCK_NONBLOCK', -1) cdef int socket_AI_CANONNAME = getattr(socket, 'AI_CANONNAME') - -# NOTE: Recently Managed to hack these in with CPython's _socket.CAPI Capsule they are left in the code. -# it's avalible on all versions of python currently so we may move to using it soon. -# SEE: https://gist.github.com/Vizonex/d24b8d4c22027449b3ec175583a93aea -# WARNING: Idea is still theorical and not ready! - - -# it is very likely that an array.array utility hack will force it in correctly -# doing so will make any of these functions run faster & smoother. -# (Mainly typechecks and returntypes only) - - cdef socket_gaierror = socket.gaierror cdef socket_error = socket.error cdef socket_timeout = socket.timeout @@ -123,6 +103,8 @@ cdef int socket_EAI_SOCKTYPE = getattr(socket, 'EAI_SOCKTYPE', -1) cdef str os_name = os.name +cdef os_path_isabs = os.path.isabs +cdef os_path_join = os.path.join cdef os_environ = os.environ cdef os_dup = os.dup cdef os_set_inheritable = os.set_inheritable @@ -169,12 +151,11 @@ cdef int subprocess_STDOUT = subprocess.STDOUT cdef int subprocess_DEVNULL = subprocess.DEVNULL cdef subprocess_SubprocessError = subprocess.SubprocessError +cdef int signal_SIGABRT = signal.SIGABRT +cdef int signal_SIGINT = signal.SIGINT cdef int signal_NSIG = signal.NSIG cdef signal_signal = signal.signal -cdef signal_siginterrupt = getattr(signal, 'siginterrupt', None) -# "I'll use SIGABRT Unless some other developer finds problems with this" - Vizonex -cdef signal_SIGABRT = signal.SIGABRT -cdef signal_SIGINT = signal.SIGINT +cdef signal_siginterrupt = getattr(signal, "siginterrupt", None) cdef signal_set_wakeup_fd = signal.set_wakeup_fd cdef signal_default_int_handler = signal.default_int_handler cdef signal_SIG_DFL = signal.SIG_DFL @@ -195,8 +176,8 @@ cdef py_inf = float('inf') # Cython doesn't clean-up imported objects properly in Py3 mode, -# so we delete refs to all modules manually (except sys and errno) -del asyncio, concurrent, collections +# so we delete refs to all modules manually (except sys) +del asyncio, concurrent, collections, errno del functools, inspect, itertools, socket, os, threading del signal, subprocess, ssl del time, traceback, warnings, weakref diff --git a/winloop/includes/system.pxd b/winloop/includes/system.pxd index 14e0cc7..96554a7 100644 --- a/winloop/includes/system.pxd +++ b/winloop/includes/system.pxd @@ -1,6 +1,5 @@ from libc.stdint cimport int8_t, uint64_t - cdef extern from "includes/compat.h" nogil: int ntohl(int) @@ -81,3 +80,33 @@ cdef extern from "includes/fork_handler.h": void (*prepare)(), void (*parent)(), void (*child)()) + + +cdef extern from * nogil: + """ +#ifdef _WIN32 +static inline uint64_t +__win_atomic_fetch_add(uint64_t *ptr, uint64_t val){ + return *ptr = *(volatile uint64_t *)ptr + val; +} + +static inline uint64_t +__win_atomic_fetch_sub(uint64_t *ptr, uint64_t val){ + return *ptr = *(volatile uint64_t *)ptr - val; +} + +#define __atomic_fetch_add(ptr, val, memorder) \ + __win_atomic_fetch_add(ptr, val) + +#define __atomic_fetch_sub(ptr, val, memorder) \ + __win_atomic_fetch_sub(ptr, val) + +/* We need ATOMIC RELAXED still */ +#define __ATOMIC_RELAXED 0 +#endif /* _WIN32 */ + """ + uint64_t __atomic_fetch_add(uint64_t *ptr, uint64_t val, int memorder) + uint64_t __atomic_fetch_sub(uint64_t *ptr, uint64_t val, int memorder) + + cdef enum: + __ATOMIC_RELAXED diff --git a/winloop/includes/uv.pxd b/winloop/includes/uv.pxd index edd6a38..4ad45cb 100644 --- a/winloop/includes/uv.pxd +++ b/winloop/includes/uv.pxd @@ -1,14 +1,11 @@ -from libc.stdint cimport int64_t, uint16_t, uint32_t, uint64_t - +from libc.stdint cimport uint16_t, uint32_t, uint64_t, int64_t cdef extern from "includes/compat.h" nogil: int getuid() - int SIGCHLD int SO_REUSEPORT from . cimport system - # This is an internal enum UV_HANDLE_READABLE from uv-common.h, used only by # handles/pipe.pyx to temporarily workaround a libuv issue libuv/libuv#2058, # before there is a proper fix in libuv. In short, libuv disallowed feeding a @@ -59,11 +56,13 @@ cdef extern from "uv.h" nogil: cdef int UV_EAI_SERVICE cdef int UV_EAI_SOCKTYPE + # Need for windows's sake + cdef int SO_BROADCAST + cdef int SOL_SOCKET cdef int SO_ERROR cdef int SO_REUSEADDR # use has_SO_REUSEPORT and SO_REUSEPORT in stdlib.pxi instead - cdef int SO_BROADCAST cdef int AF_INET cdef int AF_INET6 cdef int AF_UNIX @@ -79,6 +78,7 @@ cdef extern from "uv.h" nogil: cdef int SIGINT cdef int SIGHUP + cdef int SIGCHLD cdef int SIGKILL cdef int SIGTERM @@ -231,8 +231,11 @@ cdef extern from "uv.h" nogil: const char* uv_strerror(int err) const char* uv_err_name(int err) + + # Needed on windows int uv_translate_sys_error(int sys_errno) + ctypedef void (*uv_walk_cb)(uv_handle_t* handle, void* arg) with gil ctypedef void (*uv_close_cb)(uv_handle_t* handle) with gil @@ -485,6 +488,7 @@ cdef extern from "uv.h" nogil: UV_WRITABLE_PIPE = 0x20, UV_NONBLOCK_PIPE = 0x40 + ctypedef union uv_stdio_container_data_u: uv_stream_t* stream int fd @@ -492,8 +496,7 @@ cdef extern from "uv.h" nogil: ctypedef struct uv_stdio_container_t: uv_stdio_flags flags uv_stdio_container_data_u data - - + ctypedef unsigned char uv_uid_t ctypedef unsigned char uv_gid_t @@ -506,7 +509,6 @@ cdef extern from "uv.h" nogil: unsigned int flags int stdio_count uv_stdio_container_t* stdio - # On the windows version (To my knowledge this was called uid_t on unix systems, but on windows it is called uv_uid_t) uv_uid_t uid uv_gid_t gid @@ -516,5 +518,4 @@ cdef extern from "uv.h" nogil: int uv_process_kill(uv_process_t* handle, int signum) unsigned int uv_version() - int uv_pipe(uv_file fds[2], int read_flags, int write_flags) diff --git a/winloop/loop.pxd b/winloop/loop.pxd index f025de2..7e58862 100644 --- a/winloop/loop.pxd +++ b/winloop/loop.pxd @@ -1,15 +1,16 @@ # cython: language_level=3 -from libc.stdint cimport int64_t, uint32_t, uint64_t +from .includes cimport uv +from .includes cimport system + +from libc.stdint cimport uint64_t, uint32_t, int64_t -from .includes cimport system, uv include "includes/consts.pxi" cdef extern from *: - # TODO: We can get rid of vint soon since volatile is now supported by Cython ctypedef int vint "volatile int" @@ -127,9 +128,6 @@ cdef class Loop: readonly uint64_t _debug_exception_handler_cnt - shlex _shlex_parser - - cdef _init_debug_fields(self) cdef _on_wake(self) @@ -145,11 +143,11 @@ cdef class Loop: cdef inline _queue_write(self, UVStream stream) cdef _exec_queued_writes(self) - cdef inline Handle _call_soon(self, object callback, object args, object context) + cdef inline _call_soon(self, object callback, object args, object context) cdef inline _append_ready_handle(self, Handle handle) cdef inline _call_soon_handle(self, Handle handle) - cdef TimerHandle _call_later(self, uint64_t delay, object callback, object args, + cdef _call_later(self, uint64_t delay, object callback, object args, object context) cdef void _handle_exception(self, object ex) @@ -157,9 +155,9 @@ cdef class Loop: cdef inline _is_main_thread(self) cdef inline _new_future(self) - cdef inline int _check_signal(self, sig) except -1 - cdef inline int _check_closed(self) except -1 - cdef inline int _check_thread(self) except -1 + cdef inline _check_signal(self, sig) + cdef inline _check_closed(self) + cdef inline _check_thread(self) cdef _getaddrinfo(self, object host, object port, int family, int type, @@ -172,8 +170,8 @@ cdef class Loop: cdef _fileobj_to_fd(self, fileobj) cdef _ensure_fd_no_transport(self, fd) - cdef int _track_process(self, UVProcess proc) except -1 - cdef int _untrack_process(self, UVProcess proc) except -1 + cdef _track_process(self, UVProcess proc) + cdef _untrack_process(self, UVProcess proc) cdef _add_reader(self, fd, Handle handle) cdef _has_reader(self, fd) @@ -199,7 +197,7 @@ cdef class Loop: cdef _handle_signal(self, sig) cdef _read_from_self(self) - cdef inline int _ceval_process_signals(self) except -1 + cdef inline _ceval_process_signals(self) cdef _invoke_signals(self, bytes data) cdef _set_coroutine_debug(self, bint enabled) @@ -223,8 +221,6 @@ include "handles/pipe.pxd" include "handles/process.pxd" include "handles/fsevent.pxd" -include "shlex.pxd" - include "request.pxd" include "sslproto.pxd" diff --git a/winloop/loop.pyi b/winloop/loop.pyi index bac9e75..9c8c462 100644 --- a/winloop/loop.pyi +++ b/winloop/loop.pyi @@ -1,41 +1,41 @@ import asyncio import ssl +import sys from socket import AddressFamily, SocketKind, _Address, _RetAddress, socket from typing import ( IO, Any, + Awaitable, Callable, + Dict, + Generator, + List, + Optional, + Sequence, + Tuple, TypeVar, + Union, overload, ) -from collections.abc import Awaitable, Generator, Sequence -_T = TypeVar("_T") -_Context = dict[str, Any] +_T = TypeVar('_T') +_Context = Dict[str, Any] _ExceptionHandler = Callable[[asyncio.AbstractEventLoop, _Context], Any] -_SSLContext = bool | None | ssl.SSLContext +_SSLContext = Union[bool, None, ssl.SSLContext] _ProtocolT = TypeVar("_ProtocolT", bound=asyncio.BaseProtocol) class Loop: def call_soon( - self, callback: Callable[..., Any], *args: Any, context: Any | None = ... + self, callback: Callable[..., Any], *args: Any, context: Optional[Any] = ... ) -> asyncio.Handle: ... def call_soon_threadsafe( - self, callback: Callable[..., Any], *args: Any, context: Any | None = ... + self, callback: Callable[..., Any], *args: Any, context: Optional[Any] = ... ) -> asyncio.Handle: ... def call_later( - self, - delay: float, - callback: Callable[..., Any], - *args: Any, - context: Any | None = ..., + self, delay: float, callback: Callable[..., Any], *args: Any, context: Optional[Any] = ... ) -> asyncio.TimerHandle: ... def call_at( - self, - when: float, - callback: Callable[..., Any], - *args: Any, - context: Any | None = ..., + self, when: float, callback: Callable[..., Any], *args: Any, context: Optional[Any] = ... ) -> asyncio.TimerHandle: ... def time(self) -> float: ... def stop(self) -> None: ... @@ -48,52 +48,52 @@ class Loop: def create_future(self) -> asyncio.Future[Any]: ... def create_task( self, - coro: Awaitable[_T] | Generator[Any, None, _T], + coro: Union[Awaitable[_T], Generator[Any, None, _T]], *, - name: str | None = ..., + name: Optional[str] = ..., ) -> asyncio.Task[_T]: ... def set_task_factory( self, - factory: Callable[ - [asyncio.AbstractEventLoop, Generator[Any, None, _T]], asyncio.Future[_T] - ] - | None, + factory: Optional[ + Callable[[asyncio.AbstractEventLoop, Generator[Any, None, _T]], asyncio.Future[_T]] + ], ) -> None: ... def get_task_factory( self, - ) -> ( - Callable[ - [asyncio.AbstractEventLoop, Generator[Any, None, _T]], asyncio.Future[_T] - ] - | None - ): ... + ) -> Optional[ + Callable[[asyncio.AbstractEventLoop, Generator[Any, None, _T]], asyncio.Future[_T]] + ]: ... @overload def run_until_complete(self, future: Generator[Any, None, _T]) -> _T: ... @overload def run_until_complete(self, future: Awaitable[_T]) -> _T: ... async def getaddrinfo( self, - host: str | bytes | None, - port: str | bytes | int | None, + host: Optional[Union[str, bytes]], + port: Optional[Union[str, bytes, int]], *, family: int = ..., type: int = ..., proto: int = ..., flags: int = ..., - ) -> list[ - tuple[ + ) -> List[ + Tuple[ AddressFamily, SocketKind, int, str, - tuple[str, int] | tuple[str, int, int, int], + Union[Tuple[str, int], Tuple[str, int, int, int]], ] ]: ... async def getnameinfo( self, - sockaddr: tuple[str, int] | tuple[str, int, int] | tuple[str, int, int, int], + sockaddr: Union[ + Tuple[str, int], + Tuple[str, int, int], + Tuple[str, int, int, int] + ], flags: int = ..., - ) -> tuple[str, str]: ... + ) -> Tuple[str, str]: ... async def start_tls( self, transport: asyncio.BaseTransport, @@ -101,15 +101,15 @@ class Loop: sslcontext: ssl.SSLContext, *, server_side: bool = ..., - server_hostname: str | None = ..., - ssl_handshake_timeout: float | None = ..., - ssl_shutdown_timeout: float | None = ..., + server_hostname: Optional[str] = ..., + ssl_handshake_timeout: Optional[float] = ..., + ssl_shutdown_timeout: Optional[float] = ..., ) -> asyncio.BaseTransport: ... @overload async def create_server( self, protocol_factory: asyncio.events._ProtocolFactory, - host: str | Sequence[str] | None = ..., + host: Optional[Union[str, Sequence[str]]] = ..., port: int = ..., *, family: int = ..., @@ -117,10 +117,10 @@ class Loop: sock: None = ..., backlog: int = ..., ssl: _SSLContext = ..., - reuse_address: bool | None = ..., - reuse_port: bool | None = ..., - ssl_handshake_timeout: float | None = ..., - ssl_shutdown_timeout: float | None = ..., + reuse_address: Optional[bool] = ..., + reuse_port: Optional[bool] = ..., + ssl_handshake_timeout: Optional[float] = ..., + ssl_shutdown_timeout: Optional[float] = ..., start_serving: bool = ..., ) -> asyncio.AbstractServer: ... @overload @@ -135,10 +135,10 @@ class Loop: sock: socket = ..., backlog: int = ..., ssl: _SSLContext = ..., - reuse_address: bool | None = ..., - reuse_port: bool | None = ..., - ssl_handshake_timeout: float | None = ..., - ssl_shutdown_timeout: float | None = ..., + reuse_address: Optional[bool] = ..., + reuse_port: Optional[bool] = ..., + ssl_handshake_timeout: Optional[float] = ..., + ssl_shutdown_timeout: Optional[float] = ..., start_serving: bool = ..., ) -> asyncio.AbstractServer: ... @overload @@ -153,10 +153,10 @@ class Loop: proto: int = ..., flags: int = ..., sock: None = ..., - local_addr: tuple[str, int] | None = ..., - server_hostname: str | None = ..., - ssl_handshake_timeout: float | None = ..., - ssl_shutdown_timeout: float | None = ..., + local_addr: Optional[Tuple[str, int]] = ..., + server_hostname: Optional[str] = ..., + ssl_handshake_timeout: Optional[float] = ..., + ssl_shutdown_timeout: Optional[float] = ..., ) -> tuple[asyncio.BaseProtocol, _ProtocolT]: ... @overload async def create_connection( @@ -171,36 +171,36 @@ class Loop: flags: int = ..., sock: socket, local_addr: None = ..., - server_hostname: str | None = ..., - ssl_handshake_timeout: float | None = ..., - ssl_shutdown_timeout: float | None = ..., + server_hostname: Optional[str] = ..., + ssl_handshake_timeout: Optional[float] = ..., + ssl_shutdown_timeout: Optional[float] = ..., ) -> tuple[asyncio.BaseProtocol, _ProtocolT]: ... async def create_unix_server( self, protocol_factory: asyncio.events._ProtocolFactory, - path: str | None = ..., + path: Optional[str] = ..., *, backlog: int = ..., - sock: socket | None = ..., + sock: Optional[socket] = ..., ssl: _SSLContext = ..., - ssl_handshake_timeout: float | None = ..., - ssl_shutdown_timeout: float | None = ..., + ssl_handshake_timeout: Optional[float] = ..., + ssl_shutdown_timeout: Optional[float] = ..., start_serving: bool = ..., ) -> asyncio.AbstractServer: ... async def create_unix_connection( self, protocol_factory: Callable[[], _ProtocolT], - path: str | None = ..., + path: Optional[str] = ..., *, ssl: _SSLContext = ..., - sock: socket | None = ..., - server_hostname: str | None = ..., - ssl_handshake_timeout: float | None = ..., - ssl_shutdown_timeout: float | None = ..., + sock: Optional[socket] = ..., + server_hostname: Optional[str] = ..., + ssl_handshake_timeout: Optional[float] = ..., + ssl_shutdown_timeout: Optional[float] = ..., ) -> tuple[asyncio.BaseProtocol, _ProtocolT]: ... def default_exception_handler(self, context: _Context) -> None: ... - def get_exception_handler(self) -> _ExceptionHandler | None: ... - def set_exception_handler(self, handler: _ExceptionHandler | None) -> None: ... + def get_exception_handler(self) -> Optional[_ExceptionHandler]: ... + def set_exception_handler(self, handler: Optional[_ExceptionHandler]) -> None: ... def call_exception_handler(self, context: _Context) -> None: ... def add_reader(self, fd: Any, callback: Callable[..., Any], *args: Any) -> None: ... def remove_reader(self, fd: Any) -> None: ... @@ -209,23 +209,19 @@ class Loop: async def sock_recv(self, sock: socket, nbytes: int) -> bytes: ... async def sock_recv_into(self, sock: socket, buf: bytearray) -> int: ... async def sock_sendall(self, sock: socket, data: bytes) -> None: ... - async def sock_accept(self, sock: socket) -> tuple[socket, _RetAddress]: ... + async def sock_accept(self, sock: socket) -> Tuple[socket, _RetAddress]: ... async def sock_connect(self, sock: socket, address: _Address) -> None: ... async def sock_recvfrom(self, sock: socket, bufsize: int) -> bytes: ... - async def sock_recvfrom_into( - self, sock: socket, buf: bytearray, nbytes: int = ... - ) -> int: ... - async def sock_sendto( - self, sock: socket, data: bytes, address: _Address - ) -> None: ... + async def sock_recvfrom_into(self, sock: socket, buf: bytearray, nbytes: int = ...) -> int: ... + async def sock_sendto(self, sock: socket, data: bytes, address: _Address) -> None: ... async def connect_accepted_socket( self, protocol_factory: Callable[[], _ProtocolT], sock: socket, *, ssl: _SSLContext = ..., - ssl_handshake_timeout: float | None = ..., - ssl_shutdown_timeout: float | None = ..., + ssl_handshake_timeout: Optional[float] = ..., + ssl_shutdown_timeout: Optional[float] = ..., ) -> tuple[asyncio.BaseProtocol, _ProtocolT]: ... async def run_in_executor( self, executor: Any, func: Callable[..., _T], *args: Any @@ -234,7 +230,7 @@ class Loop: async def subprocess_shell( self, protocol_factory: Callable[[], _ProtocolT], - cmd: bytes | str, + cmd: Union[bytes, str], *, stdin: Any = ..., stdout: Any = ..., @@ -263,21 +259,21 @@ class Loop: async def create_datagram_endpoint( self, protocol_factory: Callable[[], _ProtocolT], - local_addr: tuple[str, int] | None = ..., - remote_addr: tuple[str, int] | None = ..., + local_addr: Optional[Tuple[str, int]] = ..., + remote_addr: Optional[Tuple[str, int]] = ..., *, family: int = ..., proto: int = ..., flags: int = ..., - reuse_address: bool | None = ..., - reuse_port: bool | None = ..., - allow_broadcast: bool | None = ..., - sock: socket | None = ..., + reuse_address: Optional[bool] = ..., + reuse_port: Optional[bool] = ..., + allow_broadcast: Optional[bool] = ..., + sock: Optional[socket] = ..., ) -> tuple[asyncio.BaseProtocol, _ProtocolT]: ... async def shutdown_asyncgens(self) -> None: ... async def shutdown_default_executor( self, - timeout: float | None = ..., + timeout: Optional[float] = ..., ) -> None: ... # Loop doesn't implement these, but since they are marked as abstract in typeshed, # we have to put them in so mypy thinks the base methods are overridden @@ -286,7 +282,7 @@ class Loop: transport: asyncio.BaseTransport, file: IO[bytes], offset: int = ..., - count: int | None = ..., + count: Optional[int] = ..., *, fallback: bool = ..., ) -> int: ... @@ -295,7 +291,7 @@ class Loop: sock: socket, file: IO[bytes], offset: int = ..., - count: int | None = ..., + count: Optional[int] = ..., *, - fallback: bool = ..., + fallback: bool = ... ) -> int: ... diff --git a/winloop/loop.pyx b/winloop/loop.pyx index 5f2a7c0..47e7ea7 100644 --- a/winloop/loop.pyx +++ b/winloop/loop.pyx @@ -1,55 +1,50 @@ -# cython: language_level=3, embedsignature=True, freethreading_compatible = True +# cython: language_level=3, embedsignature=True, freethreading_compatible=True import asyncio - cimport cython -from cpython.buffer cimport (Py_buffer, PyBUF_SIMPLE, PyBUF_WRITABLE, - PyBuffer_Release, PyObject_GetBuffer) -from cpython.bytes cimport (PyBytes_AS_STRING, PyBytes_AsString, - PyBytes_AsStringAndSize, PyBytes_CheckExact) -from cpython.exc cimport PyErr_CheckSignals, PyErr_Occurred, PyErr_SetObject -from cpython.object cimport Py_SIZE, PyObject -from cpython.pycapsule cimport PyCapsule_GetPointer, PyCapsule_New -# (Winloop) We need some cleaver hacky techniques for + +from .includes.debug cimport UVLOOP_DEBUG +from .includes cimport uv +from .includes cimport system +from .includes.python cimport ( + PY_VERSION_HEX, + PyMem_RawMalloc, PyMem_RawFree, + PyMem_RawCalloc, PyMem_RawRealloc, + PyUnicode_EncodeFSDefault, + PyErr_SetInterrupt, + _Py_RestoreSignals, + Context_CopyCurrent, + Context_Enter, + Context_Exit, + PyMemoryView_FromMemory, PyBUF_WRITE, + PyMemoryView_FromObject, PyMemoryView_Check, + PyOS_AfterFork_Parent, PyOS_AfterFork_Child, + PyOS_BeforeFork, + PyUnicode_FromString +) +from .includes.flowcontrol cimport add_flowcontrol_defaults + +from libc.stdint cimport uint64_t +from libc.string cimport memset, strerror, memcpy +from libc cimport errno + +# Winloop Comment: We need some cleaver hacky techniques for # preventing slow spawnning processes for MSVC from cpython.pystate cimport (PyGILState_Ensure, PyGILState_Release, PyGILState_STATE) -from cpython.pythread cimport PyThread_get_thread_ident -from cpython.ref cimport Py_DECREF, Py_INCREF, Py_XDECREF, Py_XINCREF -from cpython.set cimport PySet_Add, PySet_Discard -from libc cimport errno -from libc.stdint cimport uint64_t, uintptr_t -from libc.string cimport memcpy, memset, strerror +from cpython cimport PyObject +from cpython cimport PyErr_CheckSignals, PyErr_Occurred +from cpython cimport PyThread_get_thread_ident +from cpython cimport Py_INCREF, Py_DECREF, Py_XDECREF, Py_XINCREF +from cpython cimport ( + PyObject_GetBuffer, PyBuffer_Release, PyBUF_SIMPLE, + Py_buffer, PyBytes_AsString, PyBytes_CheckExact, + PyBytes_AsStringAndSize, + Py_SIZE, PyBytes_AS_STRING, PyBUF_WRITABLE +) +from cpython.pycapsule cimport PyCapsule_New, PyCapsule_GetPointer -from .includes cimport system, uv -from .includes.debug cimport UVLOOP_DEBUG -from .includes.flowcontrol cimport add_flowcontrol_defaults -from .includes.python cimport (PY_VERSION_HEX, Context_CopyCurrent, - Context_Enter, Context_Exit, PyBUF_WRITE, - PyErr_SetInterrupt, PyMem_RawCalloc, - PyMem_RawFree, PyMem_RawMalloc, - PyMem_RawRealloc, PyMemoryView_Check, - PyMemoryView_FromMemory, - PyMemoryView_FromObject, PyObject_CallNoArgs, - PyOS_AfterFork_Child, PyOS_AfterFork_Parent, - PyOS_BeforeFork, PyUnicode_EncodeFSDefault, - PyUnicode_FromString, _Py_RestoreSignals, - Context_RunNoArgs, - Context_RunOneArg, - Context_RunTwoArgs - ) - -# NOTE: Keep if we need to revert at any point in time... -from ._noop import noop - -# NOTE: This has a theoretical chance of hepling to safely bypass the required _noop module... -# The only thing that will need simulations is hitting Ctrl+C on a keyboard which is not easy -# to simulate. For now I'll comment this out we can go back to it in a later winloop 0.2.XX version -# __noop_locals = {} -# exec("def noop(): return", {}, __noop_locals) -# cdef object noop = __noop_locals['noop'] -# # never need __noop_locals again... -# del __noop_locals +from . import _noop include "includes/stdlib.pxi" @@ -101,101 +96,28 @@ cdef inline socket_dec_io_ref(sock): cdef inline run_in_context(context, method): - # This method is internally used to workaround a reference issue that in - # certain circumstances, inlined context.run() will not hold a reference to - # the given method instance, which - if deallocated - will cause segfault. - # See also: edgedb/edgedb#2222 - Py_INCREF(method) + Context_Enter(context) try: - return Context_RunNoArgs(context, method) + return method() finally: - Py_DECREF(method) + Context_Exit(context) cdef inline run_in_context1(context, method, arg): - Py_INCREF(method) + Context_Enter(context) try: - return Context_RunOneArg(context, method, arg) + return method(arg) finally: - Py_DECREF(method) + Context_Exit(context) cdef inline run_in_context2(context, method, arg1, arg2): - Py_INCREF(method) + Context_Enter(context) try: - return Context_RunTwoArgs(context, method, arg1, arg2) + return method(arg1, arg2) finally: - Py_DECREF(method) - - -def list2cmdline(seq): - """ - Translate a sequence of arguments into a command line - string, using the same rules as the MS C runtime: - - 1) Arguments are delimited by white space, which is either a - space or a tab. - - 2) A string surrounded by double quotation marks is - interpreted as a single argument, regardless of white space - contained within. A quoted string can be embedded in an - argument. - - 3) A double quotation mark preceded by a backslash is - interpreted as a literal double quotation mark. - - 4) Backslashes are interpreted literally, unless they - immediately precede a double quotation mark. - - 5) If backslashes immediately precede a double quotation mark, - every pair of backslashes is interpreted as a literal - backslash. If the number of backslashes is odd, the last - backslash escapes the next double quotation mark as - described in rule 3. - """ - - # See - # http://msdn.microsoft.com/en-us/library/17w5ykft.aspx - # or search http://msdn.microsoft.com for - # "Parsing C++ Command-Line Arguments" - result = [] - needquote = False - for arg in map(os.fsdecode, seq): - bs_buf = [] - - # Add a space to separate this argument from the others - if result: - result.append(' ') - - needquote = (" " in arg) or ("\t" in arg) or not arg - if needquote: - result.append('"') - - for c in arg: - if c == '\\': - # Don't know if we need to double yet. - bs_buf.append(c) - elif c == '"': - # Double backslashes. - result.append('\\' * len(bs_buf)*2) - bs_buf = [] - result.append('\\"') - else: - # Normal char - if bs_buf: - result.extend(bs_buf) - bs_buf = [] - result.append(c) - - # Add remaining backslashes, if any. - if bs_buf: - result.extend(bs_buf) + Context_Exit(context) - if needquote: - result.extend(bs_buf) - result.append('"') - - return ''.join(result).replace('\\"', '"') # Used for deprecation and removal of `loop.create_datagram_endpoint()`'s # *reuse_address* parameter @@ -296,13 +218,6 @@ cdef class Loop: self._servers = set() - # Newer Class Member _shlex_parser was added to solve parsing shell related - # inputs, posix is set to false so that file-paths are not stripped away (Especially Windows Paths) - self._shlex_parser = shlex(posix=False) - self._shlex_parser.whitespace_split = True - self._shlex_parser.commenters = '' - - cdef inline _is_main_thread(self): cdef uint64_t main_thread_id = system.MAIN_THREAD_ID if system.MAIN_THREAD_ID_SET == 0: @@ -459,20 +374,13 @@ cdef class Loop: def __sighandler(self, signum, frame): self._signals.add(signum) - cdef inline int _ceval_process_signals(self) except -1: + cdef inline _ceval_process_signals(self): # Invoke CPython eval loop to let process signals. - if PyErr_CheckSignals() < 0: - return -1 - - # Might be gotten rid of soon, since we want to improve evaluation: - # SEE: https://github.com/Vizonex/Winloop/issues/58 - + PyErr_CheckSignals() # Calling a pure-Python function will invoke # _PyEval_EvalFrameDefault which will process # pending signal callbacks. - if PyObject_CallNoArgs(noop) == NULL: # Might raise ^C - return -1 - return 0 + _noop.noop() # Might raise ^C cdef _read_from_self(self): cdef bytes sigdata @@ -753,7 +661,7 @@ cdef class Loop: if len(self._queued_streams) == 0: self.handler_check__exec_writes.stop() - cdef inline Handle _call_soon(self, object callback, object args, object context): + cdef inline _call_soon(self, object callback, object args, object context): cdef Handle handle handle = new_Handle(self, callback, args, context) self._call_soon_handle(handle) @@ -768,7 +676,7 @@ cdef class Loop: if not self.handler_idle.running: self.handler_idle.start() - cdef TimerHandle _call_later(self, uint64_t delay, object callback, object args, + cdef _call_later(self, uint64_t delay, object callback, object args, object context): return TimerHandle(self, callback, args, delay, context) @@ -781,37 +689,29 @@ cdef class Loop: # Exit ASAP self._stop(None) - cdef inline int _check_signal(self, sig) except -1: - cdef int _sig + cdef inline _check_signal(self, sig): if not isinstance(sig, int): - PyErr_SetObject(TypeError, 'sig must be an int, not {!r}'.format(sig)) - return -1 - _sig = sig - if not (1 <= _sig < signal_NSIG): - PyErr_SetObject(ValueError, 'sig {} out of range(1, {})'.format(sig, signal_NSIG)) - return -1 - return 0 - - cdef inline int _check_closed(self) except -1: + raise TypeError('sig must be an int, not {!r}'.format(sig)) + + if not (1 <= sig < signal_NSIG): + raise ValueError( + 'sig {} out of range(1, {})'.format(sig, signal_NSIG)) + + cdef inline _check_closed(self): if self._closed == 1: - # code executes faster if thrown the CPython way... - PyErr_SetObject(RuntimeError, 'Event loop is closed') - return -1 - return 0 + raise RuntimeError('Event loop is closed') - cdef inline int _check_thread(self) except -1: + cdef inline _check_thread(self): if self._thread_id == 0: - return 0 + return cdef uint64_t thread_id thread_id = PyThread_get_thread_ident() if thread_id != self._thread_id: - PyErr_SetObject(RuntimeError, + raise RuntimeError( "Non-thread-safe operation invoked on an event loop other " "than the current one") - return -1 - return 0 cdef inline _new_future(self): return aio_Future(loop=self) @@ -819,11 +719,11 @@ cdef class Loop: cdef _track_transport(self, UVBaseTransport transport): self._transports[transport._fileno()] = transport - cdef int _track_process(self, UVProcess proc) except -1: - return PySet_Add(self._processes, proc) + cdef _track_process(self, UVProcess proc): + self._processes.add(proc) - cdef int _untrack_process(self, UVProcess proc) except -1: - return PySet_Discard(self._processes, proc) + cdef _untrack_process(self, UVProcess proc): + self._processes.discard(proc) cdef _fileobj_to_fd(self, fileobj): """Return a file descriptor from a file object. @@ -1248,7 +1148,7 @@ cdef class Loop: def _get_backend_id(self): """This method is used by uvloop tests and is not part of the API.""" - return int( uv.uv_backend_fd(self.uvloop)) + return uv.uv_backend_fd(self.uvloop) cdef _print_debug_info(self): cdef: @@ -1495,7 +1395,7 @@ cdef class Loop: def set_debug(self, enabled): self._debug = bool(enabled) if self.is_running(): - self.call_soon_threadsafe(self._set_coroutine_debug, self._debug) + self.call_soon_threadsafe(self._set_coroutine_debug, self._debug) def is_running(self): """Return whether the event loop is currently running.""" @@ -1713,16 +1613,6 @@ cdef class Loop: ssl_shutdown_timeout=ssl_shutdown_timeout, call_connection_made=False) - stream_buff = None - if hasattr(protocol, '_stream_reader'): - stream_reader = protocol._stream_reader - if stream_reader is not None: - stream_buff = getattr(stream_reader, '_buffer', None) - - if stream_buff is not None: - ssl_protocol._incoming.write(stream_buff) - stream_buff.clear() - # Pause early so that "ssl_protocol.data_received()" doesn't # have a chance to get called before "ssl_protocol.connection_made()". transport.pause_reading() @@ -2842,7 +2732,7 @@ cdef class Loop: return transport, protocol def run_in_executor(self, executor, func, *args): - if aio_iscoroutine(func) or aio_iscoroutinefunction(func): + if aio_iscoroutine(func) or inspect_iscoroutinefunction(func): raise TypeError("coroutines cannot be used with run_in_executor()") self._check_closed() @@ -2930,41 +2820,30 @@ cdef class Loop: shell=True, **kwargs): - cdef: - list args - object compsec - - # NOTE: shell always has to be true so no need to - # check it a second time. + cdef list args if not shell: raise ValueError("shell must be True") - args = [] - - if system.PLATFORM_IS_WINDOWS: - # CHANGED WINDOWS Shell see : https://github.com/libuv/libuv/pull/2627 for more details... - - # Winloop comment: args[0].split(' ') instead of args to pass some tests in test_process - + if not system.PLATFORM_IS_WINDOWS: + args = [cmd] + if shell: + args = [b'/bin/sh', b'-c'] + args + else: + # SEE: https://github.com/libuv/libuv/pull/2627 + # See subprocess.py for the mirror of this code. - comspec = os.environ.get("ComSpec") - if comspec: - system_root = os.environ.get("SystemRoot", '') - comspec = os.path.join(system_root, 'System32', 'cmd.exe') - if not os.path.isabs(comspec): + comspec = os_environ.get("ComSpec") + if not comspec: + system_root = os_environ.get("SystemRoot", '') + comspec = os_path_join(system_root, 'System32', 'cmd.exe') + if not os_path_isabs(comspec): raise FileNotFoundError('shell not found: neither %ComSpec% nor %SystemRoot% is set') - args.append(comspec) + args = [comspec] args.append('/c') - args.extend(self._shlex_parser.split(cmd)) - - else: - args.append(b'/bin/sh') - args.append(b'-c') - args.extend(self._shlex_parser.split(cmd)) - - return await self.__subprocess_run( - protocol_factory, args, shell=True, + args.append(cmd) + + return await self.__subprocess_run(protocol_factory, args, shell=True, **kwargs) @cython.iterable_coroutine @@ -3034,7 +2913,7 @@ cdef class Loop: return transp, proto def add_signal_handler(self, sig, callback, *args): - """Add a handler for a signal. + """Add a handler for a signal. UNIX only. Raise ValueError if the signal number is invalid or uncatchable. Raise RuntimeError if there is a problem setting up the handler. @@ -3048,7 +2927,7 @@ cdef class Loop: 'the main thread') if (aio_iscoroutine(callback) - or aio_iscoroutinefunction(callback)): + or inspect_iscoroutinefunction(callback)): raise TypeError( "coroutines cannot be used with add_signal_handler()") @@ -3082,15 +2961,16 @@ cdef class Loop: self._signal_handlers[sig] = h try: + # Register a dummy signal handler to ask Python to write the signal + # number in the wakeup file descriptor. if not system.PLATFORM_IS_WINDOWS: - # Register a dummy signal handler to ask Python to write the signal - # number in the wakeup file descriptor. signal_signal(sig, self.__sighandler) # Set SA_RESTART to limit EINTR occurrences. signal_siginterrupt(sig, False) else: - # XXX "WINDOWS DOESN'T Have a signal_siginterrupt so I'll do this until someone smarter than me wants a tackle at it" - Vizonex + # Windows doesn't have sig_interrupt function. + # Something else must be attempted instead. signal_signal(signal_SIGINT, self.__sighandler) except OSError as exc: @@ -3499,7 +3379,6 @@ include "handles/pipe.pyx" include "handles/process.pyx" include "handles/fsevent.pyx" -include "shlex.pyx" include "request.pyx" include "dns.pyx" include "sslproto.pyx" diff --git a/winloop/lru.pyx b/winloop/lru.pyx index 2bd64d4..ec27827 100644 --- a/winloop/lru.pyx +++ b/winloop/lru.pyx @@ -1,113 +1,14 @@ -from cpython.object cimport PyObject as PyObject - - -cdef extern from "Python.h": - """ -// Screw the compiler I'm hacking it in... -typedef struct _odictnode _ODictNode; - -struct _odictobject { - PyDictObject od_dict; /* the underlying dict */ - _ODictNode *od_first; /* first node in the linked list, if any */ - _ODictNode *od_last; /* last node in the linked list, if any */ - /* od_fast_nodes, od_fast_nodes_size and od_resize_sentinel are managed - * by _odict_resize(). - * Note that we rely on implementation details of dict for both. */ - _ODictNode **od_fast_nodes; /* hash table that mirrors the dict table */ - Py_ssize_t od_fast_nodes_size; - void *od_resize_sentinel; /* changes if odict should be resized */ - - size_t od_state; /* incremented whenever the LL changes */ - PyObject *od_inst_dict; /* OrderedDict().__dict__ */ - PyObject *od_weakreflist; /* holds weakrefs to the odict */ -}; - - - -struct _odictnode { - PyObject *key; - Py_hash_t hash; - _ODictNode *next; - _ODictNode *prev; -}; - -// Incase Removed in the future I'll hack in the Full header file as well. - -#ifndef Py_ODICTOBJECT_H -#define Py_ODICTOBJECT_H -#ifdef __cplusplus -extern "C" { -#endif - - -/* OrderedDict */ -/* This API is optional and mostly redundant. */ - -#ifndef Py_LIMITED_API - -typedef struct _odictobject PyODictObject; - -PyAPI_DATA(PyTypeObject) PyODict_Type; -PyAPI_DATA(PyTypeObject) PyODictIter_Type; -PyAPI_DATA(PyTypeObject) PyODictKeys_Type; -PyAPI_DATA(PyTypeObject) PyODictItems_Type; -PyAPI_DATA(PyTypeObject) PyODictValues_Type; - -#define PyODict_Check(op) PyObject_TypeCheck(op, &PyODict_Type) -#define PyODict_CheckExact(op) Py_IS_TYPE(op, &PyODict_Type) -#define PyODict_SIZE(op) PyDict_GET_SIZE((op)) - -PyAPI_FUNC(PyObject *) PyODict_New(void); -PyAPI_FUNC(int) PyODict_SetItem(PyObject *od, PyObject *key, PyObject *item); -PyAPI_FUNC(int) PyODict_DelItem(PyObject *od, PyObject *key); - -/* wrappers around PyDict* functions */ -#define PyODict_GetItem(od, key) PyDict_GetItem(_PyObject_CAST(od), key) -#define PyODict_GetItemWithError(od, key) \ - PyDict_GetItemWithError(_PyObject_CAST(od), key) -#define PyODict_Contains(od, key) PyDict_Contains(_PyObject_CAST(od), key) -#define PyODict_Size(od) PyDict_Size(_PyObject_CAST(od)) -#define PyODict_GetItemString(od, key) \ - PyDict_GetItemString(_PyObject_CAST(od), key) - -#endif - -#ifdef __cplusplus -} -#endif -#endif /* !Py_ODICTOBJECT_H */ - """ - # Sad how ODict never really caught anybody's attention - # but it's OrderedDict Under the hood... - - ctypedef struct PyODictObject: - pass - - ctypedef class _collections.OrderedDict [object PyODictObject, check_size ignore]: - pass - - # OrderedDict cannot be defined in any of it's C Methods due to GCC - # SEE: https://github.com/Vizonex/Winloop/issues/64 - - bint PyODict_Check(object op) - bint PyODict_CheckExact(object op) - Py_ssize_t PyODict_SIZE(object op) - int PyODict_Contains(object op, object key) except -1 - OrderedDict PyODict_New() - - int PyODict_SetItem(object od, object key, object item) except -1 - int PyODict_DelItem(object od, object key) except -1 - PyObject* PyODict_GetItem(object od, object key) - object PyODict_GetItemWithError(object od, object key) +cdef object _LRU_MARKER = object() @cython.final cdef class LruCache: cdef: - OrderedDict _dict - Py_ssize_t _maxsize + object _dict + int _maxsize object _dict_move_to_end + object _dict_get # We use an OrderedDict for LRU implementation. Operations: # @@ -127,28 +28,25 @@ cdef class LruCache: # entries dict, whereas the unused one will group in the # beginning of it. - def __init__(self, *, Py_ssize_t maxsize): + def __init__(self, *, maxsize): if maxsize <= 0: raise ValueError( f'maxsize is expected to be greater than 0, got {maxsize}') - self._dict = PyODict_New() - self._dict_move_to_end = getattr(self._dict, "move_to_end") + self._dict = col_OrderedDict() + self._dict_move_to_end = self._dict.move_to_end + self._dict_get = self._dict.get self._maxsize = maxsize - cdef get(self, object key, object default): - cdef PyObject* _o - cdef object o - _o = PyODict_GetItem(self._dict, key) - if _o == NULL: + cdef get(self, key, default): + o = self._dict_get(key, _LRU_MARKER) + if o is _LRU_MARKER: return default - o = _o - Py_INCREF(o) self._dict_move_to_end(key) # last=True return o - cdef inline bint needs_cleanup(self): - return PyODict_SIZE(self._dict) > self._maxsize + cdef inline needs_cleanup(self): + return len(self._dict) > self._maxsize cdef inline cleanup_one(self): k, _ = self._dict.popitem(last=False) @@ -161,21 +59,21 @@ cdef class LruCache: def __setitem__(self, key, o): if key in self._dict: - PyODict_SetItem(self._dict, key, o) + self._dict[key] = o self._dict_move_to_end(key) # last=True else: - PyODict_SetItem(self._dict, key, o) + self._dict[key] = o while self.needs_cleanup(): self.cleanup_one() def __delitem__(self, key): - PyODict_DelItem(self._dict, key) + del self._dict[key] def __contains__(self, key): - return PyODict_Contains(self._dict, key) + return key in self._dict def __len__(self): - return PyODict_SIZE(self._dict) + return len(self._dict) def __iter__(self): return iter(self._dict) diff --git a/winloop/server.pyx b/winloop/server.pyx index 845bcfd..dd69636 100644 --- a/winloop/server.pyx +++ b/winloop/server.pyx @@ -1,5 +1,3 @@ -import asyncio - cdef class Server: def __cinit__(self, Loop loop): @@ -113,7 +111,7 @@ cdef class Server: try: await self._serving_forever_fut - except asyncio.CancelledError: + except aio_CancelledError: try: self.close() await self.wait_closed() diff --git a/winloop/sslproto.pxd b/winloop/sslproto.pxd index 3da10f0..edc0f50 100644 --- a/winloop/sslproto.pxd +++ b/winloop/sslproto.pxd @@ -53,13 +53,13 @@ cdef class SSLProtocol: object _sslobj object _sslobj_read object _sslobj_write + object _sslobj_pending object _incoming object _incoming_write object _outgoing object _outgoing_read char* _ssl_buffer size_t _ssl_buffer_len - object _ssl_buffer_view SSLProtocolState _state size_t _conn_lost AppProtocolState _app_state @@ -84,55 +84,61 @@ cdef class SSLProtocol: object _handshake_timeout_handle object _shutdown_timeout_handle - cdef _set_app_protocol(self, app_protocol) - cdef _wakeup_waiter(self, exc=*) - cdef _get_extra_info(self, name, default=*) - cdef _set_state(self, SSLProtocolState new_state) + # Instead of doing python calls, c methods *_impl are called directly + # from stream.pyx + + cdef inline get_buffer_impl(self, size_t n, char** buf, size_t* buf_size) + cdef inline buffer_updated_impl(self, size_t nbytes) + + cdef inline _set_app_protocol(self, app_protocol) + cdef inline _wakeup_waiter(self, exc=*) + cdef inline _get_extra_info(self, name, default=*) + cdef inline _set_state(self, SSLProtocolState new_state) # Handshake flow - cdef _start_handshake(self) - cdef _check_handshake_timeout(self) - cdef _do_handshake(self) - cdef _on_handshake_complete(self, handshake_exc) + cdef inline _start_handshake(self) + cdef inline _check_handshake_timeout(self) + cdef inline _do_handshake(self) + cdef inline _on_handshake_complete(self, handshake_exc) # Shutdown flow - cdef _start_shutdown(self, object context=*) - cdef _check_shutdown_timeout(self) - cdef _do_read_into_void(self, object context) - cdef _do_flush(self, object context=*) - cdef _do_shutdown(self, object context=*) - cdef _on_shutdown_complete(self, shutdown_exc) - cdef _abort(self, exc) + cdef inline _start_shutdown(self, object context=*) + cdef inline _check_shutdown_timeout(self) + cdef inline _do_read_into_void(self, object context) + cdef inline _do_flush(self, object context=*) + cdef inline _do_shutdown(self, object context=*) + cdef inline _on_shutdown_complete(self, shutdown_exc) + cdef inline _abort(self, exc) # Outgoing flow - cdef _write_appdata(self, list_of_data, object context) - cdef _do_write(self) - cdef _process_outgoing(self) + cdef inline _write_appdata(self, list_of_data, object context) + cdef inline _do_write(self) + cdef inline _process_outgoing(self) # Incoming flow - cdef _do_read(self) - cdef _do_read__buffered(self) - cdef _do_read__copied(self) - cdef _call_eof_received(self, object context=*) + cdef inline _do_read(self) + cdef inline _do_read__buffered(self) + cdef inline _do_read__copied(self) + cdef inline _call_eof_received(self, object context=*) # Flow control for writes from APP socket - cdef _control_app_writing(self, object context=*) - cdef size_t _get_write_buffer_size(self) - cdef _set_write_buffer_limits(self, high=*, low=*) + cdef inline _control_app_writing(self, object context=*) + cdef inline size_t _get_write_buffer_size(self) + cdef inline _set_write_buffer_limits(self, high=*, low=*) # Flow control for reads to APP socket - cdef _pause_reading(self) - cdef _resume_reading(self, object context) + cdef inline _pause_reading(self) + cdef inline _resume_reading(self, object context) # Flow control for reads from SSL socket - cdef _control_ssl_reading(self) - cdef _set_read_buffer_limits(self, high=*, low=*) - cdef size_t _get_read_buffer_size(self) - cdef _fatal_error(self, exc, message=*) + cdef inline _control_ssl_reading(self) + cdef inline _set_read_buffer_limits(self, high=*, low=*) + cdef inline size_t _get_read_buffer_size(self) + cdef inline _fatal_error(self, exc, message=*) diff --git a/winloop/sslproto.pyx b/winloop/sslproto.pyx index 42bb764..f76474e 100644 --- a/winloop/sslproto.pyx +++ b/winloop/sslproto.pyx @@ -204,11 +204,8 @@ cdef class SSLProtocol: self._ssl_buffer = PyMem_RawMalloc(self._ssl_buffer_len) if not self._ssl_buffer: raise MemoryError() - self._ssl_buffer_view = PyMemoryView_FromMemory( - self._ssl_buffer, self._ssl_buffer_len, PyBUF_WRITE) def __dealloc__(self): - self._ssl_buffer_view = None PyMem_RawFree(self._ssl_buffer) self._ssl_buffer = NULL self._ssl_buffer_len = 0 @@ -358,7 +355,7 @@ cdef class SSLProtocol: self._handshake_timeout_handle.cancel() self._handshake_timeout_handle = None - def get_buffer(self, n): + cdef get_buffer_impl(self, size_t n, char** buf, size_t* buf_size): cdef size_t want = n if want > SSL_READ_MAX_SIZE: want = SSL_READ_MAX_SIZE @@ -367,11 +364,11 @@ cdef class SSLProtocol: if not self._ssl_buffer: raise MemoryError() self._ssl_buffer_len = want - self._ssl_buffer_view = PyMemoryView_FromMemory( - self._ssl_buffer, want, PyBUF_WRITE) - return self._ssl_buffer_view - def buffer_updated(self, nbytes): + buf[0] = self._ssl_buffer + buf_size[0] = self._ssl_buffer_len + + cdef buffer_updated_impl(self, size_t nbytes): self._incoming_write(PyMemoryView_FromMemory( self._ssl_buffer, nbytes, PyBUF_WRITE)) @@ -387,6 +384,18 @@ cdef class SSLProtocol: elif self._state == SHUTDOWN: self._do_shutdown() + def get_buffer(self, size_t n): + # This pure python call is still used by some very peculiar test cases + cdef: + char* buf + size_t buf_size + + self.get_buffer_impl(n, &buf, &buf_size) + return PyMemoryView_FromMemory(buf, buf_size, PyBUF_WRITE) + + def buffer_updated(self, size_t nbytes): + self.buffer_updated_impl(nbytes) + def eof_received(self): """Called when the other end of the low-level stream is half-closed. @@ -480,6 +489,7 @@ cdef class SSLProtocol: server_hostname=self._server_hostname) self._sslobj_read = self._sslobj.read self._sslobj_write = self._sslobj.write + self._sslobj_pending = self._sslobj.pending except Exception as ex: self._on_handshake_complete(ex) else: @@ -696,7 +706,10 @@ cdef class SSLProtocol: if not self._ssl_writing_paused: data = self._outgoing_read() if len(data): - self._transport.write(data) + if isinstance(self._transport, UVStream): + (self._transport).write(data) + else: + self._transport.write(data) # Incoming flow @@ -719,48 +732,91 @@ cdef class SSLProtocol: cdef _do_read__buffered(self): cdef: - Py_buffer pybuf - bint pybuf_inited = False - size_t wants, offset = 0 - int count = 1 - object buf + Py_ssize_t total_pending = (self._incoming.pending + + self._sslobj_pending()) + # Ask for a little extra in case when decrypted data is bigger + # than original + object app_buffer = self._app_protocol_get_buffer( + total_pending + 256) + Py_ssize_t app_buffer_size = len(app_buffer) + + if app_buffer_size == 0: + return - buf = self._app_protocol_get_buffer(self._get_read_buffer_size()) - wants = len(buf) + cdef: + Py_ssize_t last_bytes_read = -1 + Py_ssize_t total_bytes_read = 0 + Py_buffer pybuf + bint pybuf_initialized = False try: - count = self._sslobj_read(wants, buf) - - if count > 0: - offset = count - if offset < wants: - PyObject_GetBuffer(buf, &pybuf, PyBUF_WRITABLE) - pybuf_inited = True - while offset < wants: - buf = PyMemoryView_FromMemory( - (pybuf.buf) + offset, - wants - offset, + # SSLObject.read may not return all available data in one go. + # We have to keep calling read until it throw SSLWantReadError. + # However, throwing SSLWantReadError is very expensive even in + # the master trunk of cpython. + # See https://github.com/python/cpython/issues/123954 + + # One way to reduce reliance on SSLWantReadError is to check + # self._incoming.pending > 0 and SSLObject.pending() > 0. + # SSLObject.read may still throw SSLWantReadError even when + # self._incoming.pending > 0 and SSLObject.pending() == 0, + # but this should happen relatively rarely, only when ssl frame + # is partially received. + + # This optimization works really well especially for peers + # exchanging small messages and wanting to have minimal latency. + + # self._incoming.pending means how much data hasn't + # been processed by ssl yet (read: "still encrypted"). The final + # unencrypted data size maybe different. + + # self._sslobj.pending() means how much data has been already + # decrypted and can be directly read with SSLObject.read. + + # Run test_create_server_ssl_over_ssl to reproduce different cases + # for this method. + while total_pending > 0: + if total_bytes_read > 0: + if not pybuf_initialized: + PyObject_GetBuffer(app_buffer, &pybuf, PyBUF_WRITABLE) + pybuf_initialized = True + + app_buffer = PyMemoryView_FromMemory( + (pybuf.buf) + total_bytes_read, + app_buffer_size - total_bytes_read, PyBUF_WRITE) - count = self._sslobj_read(wants - offset, buf) - if count > 0: - offset += count - else: - break - else: + + last_bytes_read = self._sslobj_read( + app_buffer_size - total_bytes_read, app_buffer) + total_bytes_read += last_bytes_read + + if last_bytes_read == 0: + break + + # User buffer may not fit all available data. + if total_bytes_read == app_buffer_size: self._loop._call_soon_handle( new_MethodHandle(self._loop, "SSLProtocol._do_read", - self._do_read, + self._do_read, None, # current context is good self)) + break + + total_pending = (self._incoming.pending + + self._sslobj_pending()) except ssl_SSLAgainErrors as exc: pass finally: - if pybuf_inited: + if pybuf_initialized: PyBuffer_Release(&pybuf) - if offset > 0: - self._app_protocol_buffer_updated(offset) - if not count: + + if total_bytes_read > 0: + self._app_protocol_buffer_updated(total_bytes_read) + + # SSLObject.read() may return 0 instead of throwing SSLWantReadError + # This indicates that we reached EOF + if last_bytes_read == 0: # close_notify self._call_eof_received() self._start_shutdown() @@ -772,7 +828,8 @@ cdef class SSLProtocol: bint zero = True, one = False try: - while True: + while (self._incoming.pending > 0 or + self._sslobj_pending() > 0): chunk = self._sslobj_read(SSL_READ_MAX_SIZE) if not chunk: break