From c7c2fade03b2e423286554a3c2fcbaec029491d0 Mon Sep 17 00:00:00 2001 From: Adeeb Shihadeh Date: Sat, 21 Feb 2026 10:35:13 -0800 Subject: [PATCH 1/6] add proper Python packaging with pyproject.toml Add build-system section, move dev deps to optional, add MANIFEST.in for C++/Cython source distribution, and export BASEDIR/INCLUDE_PATH from __init__.py for consumers. Co-Authored-By: Claude Opus 4.6 --- MANIFEST.in | 1 + msgq/__init__.py | 6 +++++- pyproject.toml | 43 ++++++++++++++++++++++--------------------- 3 files changed, 28 insertions(+), 22 deletions(-) create mode 100644 MANIFEST.in diff --git a/MANIFEST.in b/MANIFEST.in new file mode 100644 index 000000000..15db22e67 --- /dev/null +++ b/MANIFEST.in @@ -0,0 +1 @@ +recursive-include msgq *.h *.cc *.pyx *.pxd diff --git a/msgq/__init__.py b/msgq/__init__.py index 574e100a8..e7620248d 100644 --- a/msgq/__init__.py +++ b/msgq/__init__.py @@ -1,4 +1,8 @@ -# must be built with scons +import os + +BASEDIR = os.path.dirname(os.path.abspath(__file__)) +INCLUDE_PATH = os.path.abspath(os.path.join(BASEDIR, "../")) + from msgq.ipc_pyx import Context, Poller, SubSocket, PubSocket, SocketEventHandle, toggle_fake_events, \ set_fake_prefix, get_fake_prefix, delete_fake_prefix, wait_for_one_event from msgq.ipc_pyx import MultiplePublishersError, IpcError diff --git a/pyproject.toml b/pyproject.toml index efbb89ad7..05e208f74 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,33 +1,34 @@ [project] name = "msgq" -version = "0.0.1" -description = "Code powering the comma.ai panda" -readme = "README.md" +version = "0.1.0" +description = "Lock-free IPC message queue and VisionIPC" requires-python = ">=3.11,<3.13" license = {text = "MIT"} authors = [{name = "comma.ai"}] -classifiers = [ - "Natural Language :: English", - "Programming Language :: Python :: 3", - "Topic :: System :: Hardware", -] dependencies = [ - "setuptools", # for distutils - "Cython", - "scons", - "ruff", - "parameterized", - "coverage", "numpy", - "pytest", - "pytest-retry", - "cppcheck", - "cpplint", - "codespell", - "ty", - "lefthook", ] +[project.optional-dependencies] +dev = [ + "setuptools", "Cython", "scons", "ruff", + "parameterized", "coverage", "numpy", + "pytest", "pytest-retry", + "cppcheck", "cpplint", "codespell", "ty", "lefthook", +] + +[build-system] +requires = ["setuptools"] +build-backend = "setuptools.build_meta" + +[tool.setuptools] +include-package-data = true + +[tool.setuptools.package-data] +"msgq" = ["*.h", "*.cc", "*.pyx", "*.pxd"] +"msgq.visionipc" = ["*.h", "*.cc", "*.pyx", "*.pxd"] +"msgq.logger" = ["*.h"] + # https://beta.ruff.rs/docs/configuration/#using-pyprojecttoml [tool.ruff] lint.select = ["E", "F", "W", "PIE", "C4", "ISC", "RUF100", "A"] From d5fc68dfcf02d7c05b34f25a28615b0342bb6d27 Mon Sep 17 00:00:00 2001 From: Adeeb Shihadeh Date: Sat, 21 Feb 2026 10:45:44 -0800 Subject: [PATCH 2/6] guard Cython imports for build-time import compatibility Allow `import msgq` to succeed even when Cython extensions aren't built yet, so SConstruct can access BASEDIR/INCLUDE_PATH during build configuration. Co-Authored-By: Claude Opus 4.6 --- msgq/__init__.py | 93 +++++++++++++++++++++++++----------------------- 1 file changed, 48 insertions(+), 45 deletions(-) diff --git a/msgq/__init__.py b/msgq/__init__.py index e7620248d..b5e4e6e5a 100644 --- a/msgq/__init__.py +++ b/msgq/__init__.py @@ -3,63 +3,66 @@ BASEDIR = os.path.dirname(os.path.abspath(__file__)) INCLUDE_PATH = os.path.abspath(os.path.join(BASEDIR, "../")) -from msgq.ipc_pyx import Context, Poller, SubSocket, PubSocket, SocketEventHandle, toggle_fake_events, \ - set_fake_prefix, get_fake_prefix, delete_fake_prefix, wait_for_one_event -from msgq.ipc_pyx import MultiplePublishersError, IpcError +try: + from msgq.ipc_pyx import Context, Poller, SubSocket, PubSocket, SocketEventHandle, toggle_fake_events, \ + set_fake_prefix, get_fake_prefix, delete_fake_prefix, wait_for_one_event + from msgq.ipc_pyx import MultiplePublishersError, IpcError +except ImportError: + pass # Cython extensions not yet built (e.g., during scons build configuration) +else: + from typing import Optional, List, Union -from typing import Optional, List, Union + assert MultiplePublishersError + assert IpcError + assert toggle_fake_events + assert set_fake_prefix + assert get_fake_prefix + assert delete_fake_prefix + assert wait_for_one_event -assert MultiplePublishersError -assert IpcError -assert toggle_fake_events -assert set_fake_prefix -assert get_fake_prefix -assert delete_fake_prefix -assert wait_for_one_event + NO_TRAVERSAL_LIMIT = 2**64-1 -NO_TRAVERSAL_LIMIT = 2**64-1 + context = Context() -context = Context() + def fake_event_handle(endpoint: str, identifier: Optional[Union[str, bytes]] = None, override: bool = True, enable: bool = False) -> SocketEventHandle: + ident = identifier if identifier is not None else get_fake_prefix() + handle = SocketEventHandle(endpoint, ident, override) + if override: + handle.enabled = enable -def fake_event_handle(endpoint: str, identifier: Optional[Union[str, bytes]] = None, override: bool = True, enable: bool = False) -> SocketEventHandle: - ident = identifier if identifier is not None else get_fake_prefix() - handle = SocketEventHandle(endpoint, ident, override) - if override: - handle.enabled = enable + return handle - return handle + def pub_sock(endpoint: str, segment_size: int = 0) -> PubSocket: + sock = PubSocket() + sock.connect(context, endpoint, segment_size) + return sock -def pub_sock(endpoint: str, segment_size: int = 0) -> PubSocket: - sock = PubSocket() - sock.connect(context, endpoint, segment_size) - return sock + def sub_sock(endpoint: str, poller: Optional[Poller] = None, addr: str = "127.0.0.1", + conflate: bool = False, timeout: Optional[int] = None, segment_size: int = 0) -> SubSocket: + sock = SubSocket() + sock.connect(context, endpoint, addr.encode('utf8'), conflate, segment_size) -def sub_sock(endpoint: str, poller: Optional[Poller] = None, addr: str = "127.0.0.1", - conflate: bool = False, timeout: Optional[int] = None, segment_size: int = 0) -> SubSocket: - sock = SubSocket() - sock.connect(context, endpoint, addr.encode('utf8'), conflate, segment_size) + if timeout is not None: + sock.setTimeout(timeout) - if timeout is not None: - sock.setTimeout(timeout) + if poller is not None: + poller.registerSocket(sock) + return sock - if poller is not None: - poller.registerSocket(sock) - return sock + def drain_sock_raw(sock: SubSocket, wait_for_one: bool = False) -> List[bytes]: + """Receive all message currently available on the queue""" + ret: List[bytes] = [] + while 1: + if wait_for_one and len(ret) == 0: + dat = sock.receive() + else: + dat = sock.receive(non_blocking=True) -def drain_sock_raw(sock: SubSocket, wait_for_one: bool = False) -> List[bytes]: - """Receive all message currently available on the queue""" - ret: List[bytes] = [] - while 1: - if wait_for_one and len(ret) == 0: - dat = sock.receive() - else: - dat = sock.receive(non_blocking=True) + if dat is None: + break - if dat is None: - break + ret.append(dat) - ret.append(dat) - - return ret + return ret From 194c6c3f35dca8c9126f09a089094ce7f9357195 Mon Sep 17 00:00:00 2001 From: Adeeb Shihadeh Date: Sat, 21 Feb 2026 10:48:31 -0800 Subject: [PATCH 3/6] build Cython extensions during pip install Add setup.py with Extension definitions for ipc_pyx and visionipc_pyx Cython extensions so they compile during `pip install -e .`. Co-Authored-By: Claude Opus 4.6 --- pyproject.toml | 2 +- setup.py | 47 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 48 insertions(+), 1 deletion(-) create mode 100644 setup.py diff --git a/pyproject.toml b/pyproject.toml index 05e208f74..d75632f83 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -18,7 +18,7 @@ dev = [ ] [build-system] -requires = ["setuptools"] +requires = ["setuptools", "Cython", "numpy"] build-backend = "setuptools.build_meta" [tool.setuptools] diff --git a/setup.py b/setup.py new file mode 100644 index 000000000..3ebecb006 --- /dev/null +++ b/setup.py @@ -0,0 +1,47 @@ +import os +import platform +import numpy as np +from setuptools import setup +from Cython.Build import cythonize +from setuptools import Extension + +msgq_dir = os.path.dirname(os.path.abspath(__file__)) + +# Common C++ compile args +cpp_args = ["-std=c++17", "-fPIC", "-O2"] + +# msgq core C++ sources +msgq_cc_sources = [ + "msgq/ipc.cc", + "msgq/event.cc", + "msgq/impl_msgq.cc", + "msgq/impl_fake.cc", + "msgq/msgq.cc", +] + +# visionipc C++ sources +vipc_cc_sources = [ + "msgq/visionipc/visionipc.cc", + "msgq/visionipc/visionipc_server.cc", + "msgq/visionipc/visionipc_client.cc", + "msgq/visionipc/visionbuf.cc", +] + +extensions = [ + Extension( + "msgq.ipc_pyx", + sources=["msgq/ipc_pyx.pyx"] + msgq_cc_sources, + language="c++", + extra_compile_args=cpp_args, + include_dirs=[msgq_dir, np.get_include()], + ), + Extension( + "msgq.visionipc.visionipc_pyx", + sources=["msgq/visionipc/visionipc_pyx.pyx"] + vipc_cc_sources + msgq_cc_sources, + language="c++", + extra_compile_args=cpp_args, + include_dirs=[msgq_dir, np.get_include()], + ), +] + +setup(ext_modules=cythonize(extensions, language_level="3")) From 2ee626d1335ca27d178b3bd5c6a684f25b05fd90 Mon Sep 17 00:00:00 2001 From: Adeeb Shihadeh Date: Sat, 21 Feb 2026 12:34:29 -0800 Subject: [PATCH 4/6] fix: add ty suppression for Cython attribute warnings The try/except ImportError pattern in __init__.py (needed for scons build configuration) makes ty think Cython-exported symbols might not exist. Suppress possibly-missing-attribute since these are all from compiled Cython modules. Co-Authored-By: Claude Opus 4.6 --- pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/pyproject.toml b/pyproject.toml index d75632f83..27ea44e3e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -48,6 +48,7 @@ exclude = ["site_scons/"] [tool.ty.rules] # Cython modules are compiled at build time, not available for static analysis unresolved-import = "ignore" +possibly-missing-attribute = "ignore" [tool.pytest.ini_options] addopts = "--durations=10" From dee57264e4e615a861c274af956e1ca24dc15028 Mon Sep 17 00:00:00 2001 From: Adeeb Shihadeh Date: Sat, 21 Feb 2026 12:39:52 -0800 Subject: [PATCH 5/6] fix: remove unused platform import from setup.py Co-Authored-By: Claude Opus 4.6 --- setup.py | 1 - 1 file changed, 1 deletion(-) diff --git a/setup.py b/setup.py index 3ebecb006..d8a3e7931 100644 --- a/setup.py +++ b/setup.py @@ -1,5 +1,4 @@ import os -import platform import numpy as np from setuptools import setup from Cython.Build import cythonize From 32b2404f68eab3c9c0869777a055b272e1ebf0e5 Mon Sep 17 00:00:00 2001 From: Adeeb Shihadeh Date: Sat, 21 Feb 2026 13:25:29 -0800 Subject: [PATCH 6/6] clean up __init__.py: remove try/except ImportError guard Cython extensions are always built by pip install now, so the try/except guard is no longer needed. Co-Authored-By: Claude Opus 4.6 --- msgq/__init__.py | 86 +++++++++++++++++++++--------------------------- 1 file changed, 37 insertions(+), 49 deletions(-) diff --git a/msgq/__init__.py b/msgq/__init__.py index b5e4e6e5a..54cc1427a 100644 --- a/msgq/__init__.py +++ b/msgq/__init__.py @@ -1,68 +1,56 @@ import os +from typing import Optional, List, Union BASEDIR = os.path.dirname(os.path.abspath(__file__)) INCLUDE_PATH = os.path.abspath(os.path.join(BASEDIR, "../")) -try: - from msgq.ipc_pyx import Context, Poller, SubSocket, PubSocket, SocketEventHandle, toggle_fake_events, \ - set_fake_prefix, get_fake_prefix, delete_fake_prefix, wait_for_one_event - from msgq.ipc_pyx import MultiplePublishersError, IpcError -except ImportError: - pass # Cython extensions not yet built (e.g., during scons build configuration) -else: - from typing import Optional, List, Union +from msgq.ipc_pyx import Context, Poller, SubSocket, PubSocket, SocketEventHandle, toggle_fake_events, \ + set_fake_prefix, get_fake_prefix, delete_fake_prefix, wait_for_one_event +from msgq.ipc_pyx import MultiplePublishersError, IpcError - assert MultiplePublishersError - assert IpcError - assert toggle_fake_events - assert set_fake_prefix - assert get_fake_prefix - assert delete_fake_prefix - assert wait_for_one_event +NO_TRAVERSAL_LIMIT = 2**64-1 - NO_TRAVERSAL_LIMIT = 2**64-1 +context = Context() - context = Context() +def fake_event_handle(endpoint: str, identifier: Optional[Union[str, bytes]] = None, override: bool = True, enable: bool = False) -> SocketEventHandle: + ident = identifier if identifier is not None else get_fake_prefix() + handle = SocketEventHandle(endpoint, ident, override) + if override: + handle.enabled = enable - def fake_event_handle(endpoint: str, identifier: Optional[Union[str, bytes]] = None, override: bool = True, enable: bool = False) -> SocketEventHandle: - ident = identifier if identifier is not None else get_fake_prefix() - handle = SocketEventHandle(endpoint, ident, override) - if override: - handle.enabled = enable + return handle - return handle +def pub_sock(endpoint: str, segment_size: int = 0) -> PubSocket: + sock = PubSocket() + sock.connect(context, endpoint, segment_size) + return sock - def pub_sock(endpoint: str, segment_size: int = 0) -> PubSocket: - sock = PubSocket() - sock.connect(context, endpoint, segment_size) - return sock +def sub_sock(endpoint: str, poller: Optional[Poller] = None, addr: str = "127.0.0.1", + conflate: bool = False, timeout: Optional[int] = None, segment_size: int = 0) -> SubSocket: + sock = SubSocket() + sock.connect(context, endpoint, addr.encode('utf8'), conflate, segment_size) - def sub_sock(endpoint: str, poller: Optional[Poller] = None, addr: str = "127.0.0.1", - conflate: bool = False, timeout: Optional[int] = None, segment_size: int = 0) -> SubSocket: - sock = SubSocket() - sock.connect(context, endpoint, addr.encode('utf8'), conflate, segment_size) + if timeout is not None: + sock.setTimeout(timeout) - if timeout is not None: - sock.setTimeout(timeout) + if poller is not None: + poller.registerSocket(sock) + return sock - if poller is not None: - poller.registerSocket(sock) - return sock +def drain_sock_raw(sock: SubSocket, wait_for_one: bool = False) -> List[bytes]: + """Receive all message currently available on the queue""" + ret: List[bytes] = [] + while 1: + if wait_for_one and len(ret) == 0: + dat = sock.receive() + else: + dat = sock.receive(non_blocking=True) - def drain_sock_raw(sock: SubSocket, wait_for_one: bool = False) -> List[bytes]: - """Receive all message currently available on the queue""" - ret: List[bytes] = [] - while 1: - if wait_for_one and len(ret) == 0: - dat = sock.receive() - else: - dat = sock.receive(non_blocking=True) + if dat is None: + break - if dat is None: - break + ret.append(dat) - ret.append(dat) - - return ret + return ret