diff --git a/can/broadcastmanager.py b/can/broadcastmanager.py index 1fea9ac50..6f01d91d3 100644 --- a/can/broadcastmanager.py +++ b/can/broadcastmanager.py @@ -22,6 +22,14 @@ from can import typechecking from can.message import Message +try: + from .precision_tools.precision_tools import ScheduledExecutor + + _has_precision_tools = True +except (ImportError, FileNotFoundError): + _has_precision_tools = False + + if TYPE_CHECKING: from can.bus import BusABC @@ -386,3 +394,103 @@ def _run(self) -> None: delay_ns = msg_due_time_ns - time.perf_counter_ns() if delay_ns > 0: time.sleep(delay_ns / NANOSECONDS_IN_SECOND) + + +class ScheduledExecutorCyclicSendTask( + LimitedDurationCyclicSendTaskABC, ModifiableCyclicTaskABC, RestartableCyclicTaskABC +): + """ + 使用PrecisionSleep.dll(通过 ScheduledExecutor)的循环发送任务。 + """ + + _executor: ScheduledExecutor | None = None + _executor_lock = threading.Lock() + + def __init__( + self, + bus: "BusABC", + lock: threading.Lock, + messages: Sequence[Message] | Message, + period: float, + duration: float | None = None, + on_error: Callable[[Exception], bool] | None = None, + autostart: bool = True, + modifier_callback: Callable[[Message], None] | None = None, + ) -> None: + super().__init__(messages, period, duration) + self.bus = bus + self.send_lock = lock + self.stopped = True + self.on_error = on_error + self.modifier_callback = modifier_callback + self._task_id: int | None = None + self._msg_index = 0 + + with ScheduledExecutorCyclicSendTask._executor_lock: + if ScheduledExecutorCyclicSendTask._executor is None: + # 为所有任务使用一个共享的执行器 + # 实测单线程比多线程周期发送更稳定 + ScheduledExecutorCyclicSendTask._executor = ScheduledExecutor(1) + + if autostart: + self.start() + + def _task_callback(self, task_id: int, user_data: object) -> None: + """由 ScheduledExecutor 执行的回调函数。""" + if self.stopped: + return + + if self.end_time is not None and time.perf_counter() >= self.end_time: + self.stop() + return + + try: + message = self.messages[self._msg_index] + if self.modifier_callback is not None: + self.modifier_callback(message) + with self.send_lock: + self.bus.send(message) + except Exception as exc: # pylint: disable=broad-except + log.exception(exc) + if self.on_error is None or not self.on_error(exc): + self.stop() + # 重新抛出异常以便可能被执行器的错误处理捕获 + raise + + self._msg_index = (self._msg_index + 1) % len(self.messages) + + def start(self) -> None: + if not self.stopped: + return # 已经在运行 + + self.stopped = False + self.end_time = ( + time.perf_counter() + self.duration if self.duration else None + ) + self._msg_index = 0 + + executor = ScheduledExecutorCyclicSendTask._executor + if executor is None or executor.is_shutdown(): + # 如果初始化正确,这种情况不应该发生 + raise RuntimeError("ScheduledExecutor 不可用。") + + period_ms = self.period * 1000 + if period_ms <= 0: + raise ValueError("周期必须为正数。") + + self._task_id = executor.schedule_at_fixed_rate( + self._task_callback, initial_delay_ms=0, period_ms=period_ms + ) + + def stop(self) -> None: + if self.stopped: + return + + self.stopped = True + executor = ScheduledExecutorCyclicSendTask._executor + if self._task_id is not None and executor and not executor.is_shutdown(): + executor.cancel_task(self._task_id) + self._task_id = None + + # 注意:我们不需要在这里关闭共享的执行器。 + # 它可以在更高的层级进行管理,例如,当总线关闭时。 diff --git a/can/bus.py b/can/bus.py index 03425caaa..719ddbae1 100644 --- a/can/bus.py +++ b/can/bus.py @@ -4,6 +4,7 @@ import contextlib import logging +import sys import threading from abc import ABC, abstractmethod from collections.abc import Callable, Iterator, Sequence @@ -17,9 +18,16 @@ from typing_extensions import Self import can.typechecking -from can.broadcastmanager import CyclicSendTaskABC, ThreadBasedCyclicSendTask +from can.broadcastmanager import ( + CyclicSendTaskABC, + ThreadBasedCyclicSendTask, + _has_precision_tools, +) from can.message import Message +if _has_precision_tools: + from can.broadcastmanager import ScheduledExecutorCyclicSendTask + LOG = logging.getLogger(__name__) @@ -319,11 +327,17 @@ def _send_periodic_internal( :meth:`~can.broadcastmanager.CyclicTask.stop` method. """ if not hasattr(self, "_lock_send_periodic"): - # Create a send lock for this bus, but not for buses which override this method - self._lock_send_periodic = ( # pylint: disable=attribute-defined-outside-init - threading.Lock() - ) - task = ThreadBasedCyclicSendTask( + # Create a lock for each bus instance to prevent issues with sharing the bus + # via multiple threads. + self._lock_send_periodic = threading.Lock() + + task_class: type[ThreadBasedCyclicSendTask] + if sys.platform == "win32" and _has_precision_tools: + task_class = ScheduledExecutorCyclicSendTask + else: + task_class = ThreadBasedCyclicSendTask + + task = task_class( bus=self, lock=self._lock_send_periodic, messages=msgs, diff --git a/can/precision_tools/PrecisionSleep.dll b/can/precision_tools/PrecisionSleep.dll new file mode 100644 index 000000000..1d950f05b Binary files /dev/null and b/can/precision_tools/PrecisionSleep.dll differ diff --git a/can/precision_tools/precision_tools.py b/can/precision_tools/precision_tools.py new file mode 100644 index 000000000..f8cbf2db2 --- /dev/null +++ b/can/precision_tools/precision_tools.py @@ -0,0 +1,307 @@ +# -*- coding: utf-8 -*- +""" +用于本地 ScheduledExecutor 和 PrecisionSleep DLL 的 Python 封装。 + +该模块提供两个主要类: +1. ScheduledExecutor: C++ 调度执行器的 Pythonic 封装, + 模仿了 Java 的 ScheduledThreadPoolExecutor 接口。 +2. PrecisionSleep: 高精度睡眠函数的封装。 + +注意:此模块依赖于名为 PrecisionSleep.dll 的本地 DLL。 +PrecisionSleep.dll仓库开源地址:https://github.com/fshoocn/precision_tools +""" + +import ctypes +import os +import sys +import threading +import time +from pathlib import Path +from typing import Callable, Dict, Optional + +""" +""" + +# 为任务回调定义函数指针类型 +# 第一个参数是任务ID (unsigned long long),第二个参数是用户数据 (py_object) +TaskCallbackFunc = ctypes.CFUNCTYPE(None, ctypes.c_ulonglong, ctypes.py_object) + +def _find_dll(dll_name: str = "PrecisionSleep.dll") -> Optional[Path]: + """ + 在通用编译目录或系统路径中查找 DLL。 + 在相对于当前脚本的典型 CMake 构建文件夹中搜索。 + """ + # 在脚本目录和父目录中搜索 + search_paths = [Path.cwd()] + for i in range(3): + search_paths.append(search_paths[-1].parent) + + # 常见的编译目录名称 + build_dirs = [ + "build", + "cmake-build-release", + "cmake-build-debug", + "cmake-build-release-visual-studio-amd64", + "out/build/x64-release", + "out/build/x64-debug", + # 当前precision_tools.py文件所在目录 + Path(__file__).parent + ] + + for base_path in search_paths: + # 检查基本路径本身 + if (base_path / dll_name).is_file(): + return base_path / dll_name + + # 检查通用编译目录 + for build_dir in build_dirs: + dll_path = base_path / build_dir / dll_name + if dll_path.is_file(): + return dll_path + + # 回退到系统范围搜索 + if sys.platform == "win32": + try: + # 使用系统的 DLL 搜索路径 + loader = ctypes.WinDLL(dll_name) + return Path(loader._name) + except (OSError, FileNotFoundError): + pass + + return None + + +class ScheduledExecutor: + """ + 一个用于本地 C++ 调度线程池执行器的 Python 封装。 + + 该类管理本地执行器的生命周期,提供调度任务的方法, + 并确保在 Python 和 C++ 之间安全地处理回调。 + """ + _dll_path = _find_dll() + if not _dll_path: + raise FileNotFoundError( + "找不到 PrecisionSleep.dll。 " + "请确保它位于系统 PATH 或常见的生成目录中。" + ) + + _dll = ctypes.CDLL(str(_dll_path)) + + def __init__(self, thread_pool_size: int = 4): + """ + 初始化调度执行器。 + :param thread_pool_size: 线程池中的线程数。 + """ + self._setup_function_signatures() + + self._executor = self._dll.ScheduledExecutor_create(thread_pool_size) + if not self._executor: + raise RuntimeError("创建本地调度执行器失败。") + + if not self._dll.ScheduledExecutor_initialize(self._executor): + self._dll.ScheduledExecutor_destroy(self._executor) + raise RuntimeError("初始化本地调度执行器失败。") + + self._active = True + # ctypes 的返回类型不能直接在类型提示中使用,因此我们使用 'object' + self._tasks: Dict[int, tuple] = {} + self._task_lock = threading.Lock() + + def _setup_function_signatures(self): + """为所有 DLL 函数设置 ctypes 的 argtypes 和 restypes。""" + # ScheduledExecutor 函数 + self._dll.ScheduledExecutor_create.argtypes = [ctypes.c_int] + self._dll.ScheduledExecutor_create.restype = ctypes.c_void_p + + self._dll.ScheduledExecutor_initialize.argtypes = [ctypes.c_void_p] + self._dll.ScheduledExecutor_initialize.restype = ctypes.c_bool + + self._dll.ScheduledExecutor_shutdown.argtypes = [ctypes.c_void_p] + self._dll.ScheduledExecutor_shutdown.restype = None + + self._dll.ScheduledExecutor_destroy.argtypes = [ctypes.c_void_p] + self._dll.ScheduledExecutor_destroy.restype = None + + self._dll.ScheduledExecutor_isShutdown.argtypes = [ctypes.c_void_p] + self._dll.ScheduledExecutor_isShutdown.restype = ctypes.c_bool + + self._dll.ScheduledExecutor_schedule.argtypes = [ctypes.c_void_p, TaskCallbackFunc, ctypes.py_object, ctypes.c_double] + self._dll.ScheduledExecutor_schedule.restype = ctypes.c_ulonglong + + self._dll.ScheduledExecutor_scheduleAtFixedRate.argtypes = [ctypes.c_void_p, TaskCallbackFunc, ctypes.py_object, ctypes.c_double, ctypes.c_double] + self._dll.ScheduledExecutor_scheduleAtFixedRate.restype = ctypes.c_ulonglong + + self._dll.ScheduledExecutor_cancelTask.argtypes = [ctypes.c_void_p, ctypes.c_ulonglong] + self._dll.ScheduledExecutor_cancelTask.restype = ctypes.c_bool + + self._dll.ScheduledExecutor_getThreadPoolSize.argtypes = [ctypes.c_void_p] + self._dll.ScheduledExecutor_getThreadPoolSize.restype = ctypes.c_int + + self._dll.ScheduledExecutor_getPendingTaskCount.argtypes = [ctypes.c_void_p] + self._dll.ScheduledExecutor_getPendingTaskCount.restype = ctypes.c_ulonglong + + def schedule(self, task: Callable[[int, object], None], delay_ms: float, user_data=None) -> int: + """ + 调度一个一次性任务,在给定延迟后运行。 + :param task: 要执行的可调用对象。应接受两个参数: task_id (int) 和 user_data (任意对象)。 + :param delay_ms: 延迟时间(毫秒)。 + :param user_data: 可选的用户自定义数据,将传递给回调函数。 + :return: 已调度任务的 ID。 + """ + if not self._active: + raise RuntimeError("执行器已关闭。") + + # 创建包装器,将任务ID与用户数据传递给用户回调 + def wrapper(task_id, _user_data): + try: + task(task_id, _user_data) + except Exception as e: + print(f"任务 {task_id} 执行出错: {e}") + + c_callback = TaskCallbackFunc(wrapper) + task_id = self._dll.ScheduledExecutor_schedule(self._executor, c_callback, user_data, delay_ms) + + with self._task_lock: + self._tasks[task_id] = (c_callback, user_data) + + return task_id + + def schedule_at_fixed_rate(self, task: Callable[[int, object], None], initial_delay_ms: float, period_ms: float, user_data=None) -> int: + """ + 调度一个周期性任务,在初始延迟后以固定周期运行。 + :param task: 要周期性执行的可调用对象。应接受两个参数: task_id (int) 和 user_data (任意对象)。 + :param initial_delay_ms: 初始延迟(毫秒)。 + :param period_ms: 周期(毫秒)。 + :param user_data: 可选的用户自定义数据,将传递给回调函数。 + :return: 已调度任务的 ID。 + """ + if not self._active: + raise RuntimeError("执行器已关闭。") + + # 创建包装器,将任务ID与用户数据传递给用户回调 + def wrapper(task_id, _user_data): + try: + task(task_id, _user_data) + except Exception as e: + print(f"任务 {task_id} 执行出错: {e}") + + c_callback = TaskCallbackFunc(wrapper) + task_id = self._dll.ScheduledExecutor_scheduleAtFixedRate(self._executor, c_callback, user_data, initial_delay_ms, period_ms) + + with self._task_lock: + self._tasks[task_id] = (c_callback, user_data) + + return task_id + + def cancel_task(self, task_id: int) -> bool: + """ + 取消一个已调度的任务。 + :param task_id: 要取消的任务的 ID。 + :return: 如果任务成功取消,则为 True,否则为 False。 + """ + if not self._active: + return False + + result = self._dll.ScheduledExecutor_cancelTask(self._executor, task_id) + if result: + with self._task_lock: + self._tasks.pop(task_id, None) + return result + + def get_thread_pool_size(self) -> int: + """返回配置的线程池大小。""" + return self._dll.ScheduledExecutor_getThreadPoolSize(self._executor) + + def get_pending_task_count(self) -> int: + """返回待处理任务的大约数量。""" + return self._dll.ScheduledExecutor_getPendingTaskCount(self._executor) + + def is_shutdown(self) -> bool: + """检查执行器是否已关闭。""" + return self._dll.ScheduledExecutor_isShutdown(self._executor) + + def shutdown(self): + """ + 关闭执行器,允许正在运行的任务完成。 + 不会接受新任务。 + """ + if self._active: + self._dll.ScheduledExecutor_shutdown(self._executor) + self._active = False + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.shutdown() + # 本地 C++ 析构函数将被调用,它应该等待任务完成。 + # 在这里销毁句柄。 + if self._executor: + self._dll.ScheduledExecutor_destroy(self._executor) + self._executor = None + with self._task_lock: + self._tasks.clear() + + def __del__(self): + if self._active: + self.shutdown() + if self._executor: + self._dll.ScheduledExecutor_destroy(self._executor) + + +class PrecisionSleep: + """ + 本地 PrecisionSleep 函数的 Python 封装。 + """ + _dll_path = _find_dll() + if not _dll_path: + raise FileNotFoundError( + "找不到 PrecisionSleep.dll。 " + "请确保它位于系统 PATH 或常见的生成目录中。" + ) + + _dll = ctypes.CDLL(str(_dll_path)) + _initialized = False + + def __init__(self): + """初始化 PrecisionSleep 系统。""" + self._setup_function_signatures() + if not PrecisionSleep._initialized: + if not self._dll.PrecisionSleep_initialize(): + raise RuntimeError("初始化 PrecisionSleep 失败。") + PrecisionSleep._initialized = True + + def _setup_function_signatures(self): + """为所有 DLL 函数设置 ctypes 的 argtypes 和 restypes。""" + self._dll.PrecisionSleep_initialize.argtypes = [] + self._dll.PrecisionSleep_initialize.restype = ctypes.c_bool + + self._dll.PrecisionSleep_cleanup.argtypes = [] + self._dll.PrecisionSleep_cleanup.restype = None + + self._dll.PrecisionSleep_sleep.argtypes = [ctypes.c_double] + self._dll.PrecisionSleep_sleep.restype = ctypes.c_bool + + @staticmethod + def sleep(milliseconds: float) -> bool: + """ + 以高精度睡眠指定的持续时间。 + :param milliseconds: 睡眠持续时间(毫秒)。 + :return: 成功时为 True,失败时为 False。 + """ + if not PrecisionSleep._initialized: + raise RuntimeError("PrecisionSleep 未初始化。") + return PrecisionSleep._dll.PrecisionSleep_sleep(milliseconds) + + @staticmethod + def cleanup(): + """清理 PrecisionSleep 系统资源。""" + if PrecisionSleep._initialized: + PrecisionSleep._dll.PrecisionSleep_cleanup() + PrecisionSleep._initialized = False + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.cleanup() \ No newline at end of file diff --git a/doc/changelog.d/1729.added.md b/doc/changelog.d/1729.added.md new file mode 100644 index 000000000..c5a1aed0c --- /dev/null +++ b/doc/changelog.d/1729.added.md @@ -0,0 +1 @@ +增加ScheduledExecutorCyclicSendTask,优化在Windows下发送周期消息的稳定性和准确性 \ No newline at end of file