Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 108 additions & 0 deletions can/broadcastmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,14 @@
from can import typechecking
from can.message import Message

try:
from .precision_tools.precision_tools import ScheduledExecutor

_has_precision_tools = True
except (ImportError, FileNotFoundError):
_has_precision_tools = False


if TYPE_CHECKING:
from can.bus import BusABC

Expand Down Expand Up @@ -386,3 +394,103 @@ def _run(self) -> None:
delay_ns = msg_due_time_ns - time.perf_counter_ns()
if delay_ns > 0:
time.sleep(delay_ns / NANOSECONDS_IN_SECOND)


class ScheduledExecutorCyclicSendTask(
LimitedDurationCyclicSendTaskABC, ModifiableCyclicTaskABC, RestartableCyclicTaskABC
):
"""
使用PrecisionSleep.dll(通过 ScheduledExecutor)的循环发送任务。
"""

_executor: ScheduledExecutor | None = None
_executor_lock = threading.Lock()

def __init__(
self,
bus: "BusABC",
lock: threading.Lock,
messages: Sequence[Message] | Message,
period: float,
duration: float | None = None,
on_error: Callable[[Exception], bool] | None = None,
autostart: bool = True,
modifier_callback: Callable[[Message], None] | None = None,
) -> None:
super().__init__(messages, period, duration)
self.bus = bus
self.send_lock = lock
self.stopped = True
self.on_error = on_error
self.modifier_callback = modifier_callback
self._task_id: int | None = None
self._msg_index = 0

with ScheduledExecutorCyclicSendTask._executor_lock:
if ScheduledExecutorCyclicSendTask._executor is None:
# 为所有任务使用一个共享的执行器
# 实测单线程比多线程周期发送更稳定
ScheduledExecutorCyclicSendTask._executor = ScheduledExecutor(1)

if autostart:
self.start()

def _task_callback(self, task_id: int, user_data: object) -> None:
"""由 ScheduledExecutor 执行的回调函数。"""
if self.stopped:
return

if self.end_time is not None and time.perf_counter() >= self.end_time:
self.stop()
return

try:
message = self.messages[self._msg_index]
if self.modifier_callback is not None:
self.modifier_callback(message)
with self.send_lock:
self.bus.send(message)
except Exception as exc: # pylint: disable=broad-except
log.exception(exc)
if self.on_error is None or not self.on_error(exc):
self.stop()
# 重新抛出异常以便可能被执行器的错误处理捕获
raise

self._msg_index = (self._msg_index + 1) % len(self.messages)

def start(self) -> None:
if not self.stopped:
return # 已经在运行

self.stopped = False
self.end_time = (
time.perf_counter() + self.duration if self.duration else None
)
self._msg_index = 0

executor = ScheduledExecutorCyclicSendTask._executor
if executor is None or executor.is_shutdown():
# 如果初始化正确,这种情况不应该发生
raise RuntimeError("ScheduledExecutor 不可用。")

period_ms = self.period * 1000
if period_ms <= 0:
raise ValueError("周期必须为正数。")

self._task_id = executor.schedule_at_fixed_rate(
self._task_callback, initial_delay_ms=0, period_ms=period_ms
)

def stop(self) -> None:
if self.stopped:
return

self.stopped = True
executor = ScheduledExecutorCyclicSendTask._executor
if self._task_id is not None and executor and not executor.is_shutdown():
executor.cancel_task(self._task_id)
self._task_id = None

# 注意:我们不需要在这里关闭共享的执行器。
# 它可以在更高的层级进行管理,例如,当总线关闭时。
26 changes: 20 additions & 6 deletions can/bus.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import contextlib
import logging
import sys
import threading
from abc import ABC, abstractmethod
from collections.abc import Callable, Iterator, Sequence
Expand All @@ -17,9 +18,16 @@
from typing_extensions import Self

import can.typechecking
from can.broadcastmanager import CyclicSendTaskABC, ThreadBasedCyclicSendTask
from can.broadcastmanager import (
CyclicSendTaskABC,
ThreadBasedCyclicSendTask,
_has_precision_tools,
)
from can.message import Message

if _has_precision_tools:
from can.broadcastmanager import ScheduledExecutorCyclicSendTask

LOG = logging.getLogger(__name__)


Expand Down Expand Up @@ -319,11 +327,17 @@ def _send_periodic_internal(
:meth:`~can.broadcastmanager.CyclicTask.stop` method.
"""
if not hasattr(self, "_lock_send_periodic"):
# Create a send lock for this bus, but not for buses which override this method
self._lock_send_periodic = ( # pylint: disable=attribute-defined-outside-init
threading.Lock()
)
task = ThreadBasedCyclicSendTask(
# Create a lock for each bus instance to prevent issues with sharing the bus
# via multiple threads.
self._lock_send_periodic = threading.Lock()

task_class: type[ThreadBasedCyclicSendTask]
if sys.platform == "win32" and _has_precision_tools:
task_class = ScheduledExecutorCyclicSendTask
else:
task_class = ThreadBasedCyclicSendTask

task = task_class(
bus=self,
lock=self._lock_send_periodic,
messages=msgs,
Expand Down
Binary file added can/precision_tools/PrecisionSleep.dll
Binary file not shown.
Loading