rs_ctrl_os 是一个用于构建分布式节点控制系统的小型运行时库,提供:
- 节点发现:基于 UDP 多播的心跳机制(
Heartbeat+ServiceRegistry) - 消息通信:基于 ZeroMQ 的 pub/sub 抽象(
PubSubManager) - 配置管理:TOML 配置加载 + 动态热更新(
ConfigManager/load_config_typed) - 时间同步:简单的主从时钟同步(
TimeSynchronizer) - 统一错误/日志:
RsCtrlError+tracing日志初始化
适合需要在局域网内跑多进程/多节点,进行“互相发现 + 消息分发 + 动态配置”的系统。
| 能力 | 说明 |
|---|---|
| StaticBase 与 [static_config] | 节点 ID、host、port、是否 master、publishers/subscribers 拓扑、static_nodes(IP fallback)、publish_hz/subscribe_hz、dynamic_load_enable 等运行所需的基础配置 |
| 配置加载机制 | 从 TOML 解析 [static_config],提供 load_config_rcos、load_config_typed、ConfigManager 等 API |
| dynamic 热更新 | 当 dynamic_load_enable=true 时,监听配置文件变化并热重载 [dynamic] 内容 |
| 消息通道 | ZMQ pub/sub、发现、时间同步、频率限速、原始字节透传(publish_raw / try_recv_raw) |
| 边界 | 说明 |
|---|---|
| [dynamic] 的结构与语义 | 框架只负责「加载并热更新」[dynamic],不定义其字段。每个应用自行定义 D: Deserialize,例如 CAN 接口列表、电机参数、相机参数等 |
| 业务数据内容 | 图像、点云等大体量数据应通过 topic 传输,不应塞进 TOML。配置中只放「如何连接、参数、schema 版本」等元信息 |
| 业务协议与编码 | 消息 payload 的序列化方式(bincode / raw / JPEG 等)由应用选择;框架提供 publish_topic(bincode)、publish_raw(透传)两种能力 |
[static_config]:框架强依赖,必须存在。包含my_id、host、port、is_master、publish_hz、subscribe_hz、dynamic_load_enable、publishers、subscribers、static_nodes(可选)等。[dynamic]:业务自由定义。框架不解析其具体字段,只负责按你提供的D反序列化并(可选)热更新。
例如:can_bridge 定义interfaces、devices;相机节点定义camera_id、resolution;点云节点定义voxel_size等。
以下能力由框架自动完成,应用通常无需关心实现细节:
| 模块 | 后台行为 |
|---|---|
| 节点发现 | 启动两个线程:发送端每 1 秒向 224.0.0.100:9999 广播本节点 Heartbeat;接收端持续收取其它节点心跳,更新 ServiceRegistry,超过 10 秒未收到则从注册表剔除。 |
| 时间同步 | 接收端收到 is_master=true 的心跳时,提取其 clock_time_ms,计算本地与 master 的时钟偏移并低通滤波。now_corrected_ms() 内部使用该偏移修正当前时间。 |
| ConfigManager 热更新 | 当 dynamic_load_enable=true 时,通过 notify 监听配置文件。文件变化时自动重读并解析 [dynamic],更新内部 RwLock,get_dynamic_clone() 返回最新值。 |
| 发布频率控制 | 当 publish_hz > 0 时,publish_topic / publish_raw 内部按 topic_key 记录上次发送时间,超过最小间隔的请求会被静默丢弃(限频)。 |
| 订阅频率控制 | 当 subscribe_hz > 0 时,try_recv_raw / try_recv_specific 内部按 local_name 记录上次轮询时间,未到间隔则直接返回 None,避免过度轮询。 |
| 订阅连接建立 | 初始化时,若 discovery 和 static_nodes 均未提供目标地址,订阅进入 pending_subs。try_recv_raw 内部自动 tick(),优先从 ServiceRegistry 解析,其次从 static_nodes(node_id -> "host:port")fallback,无需手动调用 tick()。 |
| 子话题过滤 | 若通过 set_sub_topics 设置了白名单,框架在 try_recv_raw 中只返回白名单内的 sub_topic,其它消息静默丢弃。 |
- 安装 ZeroMQ 库(不同平台命令略有差异,以下是常见示例):
- Debian/Ubuntu:
sudo apt-get install libzmq3-dev - Fedora:
sudo dnf install zeromq zeromq-devel - macOS(Homebrew):
brew install zeromq
- Debian/Ubuntu:
- 安装 Rust 稳定版(建议用
rustup)。
克隆本项目:
git clone https://github.com/LycanW/rs_ctrl_os.git
cd rs_ctrl_os或者使用 cargo 添加依赖:
cargo add rs_ctrl_os准备一个最小的配置(你也可以用仓库里的 example_config.toml):
[static_config]
my_id = "node1"
host = "127.0.0.1"
port = 5555
is_master = true
publish_hz = 1000
subscribe_hz = 1000
dynamic_load_enable = true
[static_config.subscribers]
local_sub = "node1"
[static_config.publishers]
control = "self"
# 可选:当 discovery 未找到目标时,用此地址直连(适合无多播环境)
[static_config.static_nodes]
# node1 = "127.0.0.1:5555"
[dynamic]
message_prefix = "hello"
interval_ms = 200在项目根目录运行:
cargo run --example pub_node -- example_config.tomlpub_node 会持续往 control topic 发布消息。注意:当前 pub_node 示例仅发布、不订阅,因此不会打印收到的消息。若需同时收发,可参考 examples/ 自行扩展,或运行两个进程(见下文)。
如果你改动配置文件里的 [dynamic](比如改前缀、改间隔)并保存,进程会自动加载新的动态配置:
message_prefix会改变打印出来的文本前缀;interval_ms会改变发送/接收的频率。
有时候你想在两个不同的进程里测试 pub/sub。可以用仓库里的:
examples/pub_node.rsexamples/sub_node.rs
以及对应的 pub_config.toml、sub_config.toml。
配置说明:sub_config.toml 中 local_sub 指向的 target_node_id 需与 pub_node 的 my_id 一致,才能收到 pub 的消息。仓库中的 sub_config.toml 可能指向其他节点(如 gateway_node_01),用于不同场景。若要 pub/sub 互通,可将 [static_config.subscribers] 改为 local_sub = "pub_node",并配置 static_nodes 或依赖 discovery 解析地址。
先打开一个终端作为发布端:
cargo run --example pub_node -- pub_config.toml再打开另一个终端作为订阅端:
cargo run --example sub_node -- sub_config.toml此时:
pub_node会按照pub_config.toml的[dynamic]配置,持续往 ZeroMQ PUB socket 上发消息。sub_node通过 discovery 或static_nodes连接pub_node,从local_sub收消息并打印。
你可以动态修改 pub_config.toml 的 [dynamic],比如:
[dynamic]
message_prefix = "pub1"
interval_ms = 200改成:
[dynamic]
message_prefix = "PUB-UPDATED"
interval_ms = 1000保存之后,几百毫秒到一两秒内你会看到:
sub_node打印出的消息前缀从pub1变成PUB-UPDATED;- 输出频率从 200ms 一条变成大约 1 秒一条。
初始化 tracing 日志,默认 INFO 级别。应在 main 入口处调用一次。
use rs_ctrl_os::init_logging;
init_logging();从 TOML 加载配置,返回框架静态配置 + 原始 [dynamic](toml::Value)。
适用于需要手动反序列化 [dynamic] 或只需 static_config 的场景。
- path:配置文件路径(
impl AsRef<Path>) - 返回:
(StaticBase, toml::Value),[dynamic]缺失时返回空表
一次性加载配置,返回强类型 (StaticBase, D)。无文件监听,无热更新开销。
- D:需实现
Deserialize,对应[dynamic]结构 - 适用:不需要热重载的节点
带热重载的配置管理器。当 dynamic_load_enable=true 时,通过 notify 监听文件变化并自动重载 [dynamic]。
| 方法 | 签名 | 说明 |
|---|---|---|
new |
new(config_path: &Path) -> Result<Self> |
加载配置并(可选)启动文件监听 |
static_cfg |
static_cfg(&self) -> &StaticBase |
获取静态配置引用 |
get_dynamic_clone |
get_dynamic_clone(&self) -> D |
获取当前 [dynamic] 的克隆(热更新后为最新值) |
config_path |
config_path(&self) -> &Path |
配置文件路径 |
D 约束:Clone + Deserialize + Send + Sync + 'static
框架静态配置结构体,从 TOML [static_config] 解析。
| 字段 | 类型 | 默认 | 说明 |
|---|---|---|---|
my_id |
String |
- | 本节点唯一 ID |
host |
String |
- | 本节点监听地址(如 127.0.0.1) |
port |
u16 |
- | 本节点监听端口 |
is_master |
bool |
false |
是否作为时间同步 master |
publishers |
HashMap<String, String> |
{} |
topic_key -> "node_id" 或 "self"(本地绑定) |
subscribers |
HashMap<String, String> |
{} |
local_name -> target_node_id |
static_nodes |
HashMap<String, String> |
{} |
node_id -> "host:port",discovery 失败时的 fallback |
publish_hz |
i64 |
- | 发布频率上限:>0 限频,0 不限,<0 禁止发布 |
subscribe_hz |
i64 |
- | 订阅轮询频率:>0 限频,0 不限,<0 禁止订阅 |
dynamic_load_enable |
bool |
true |
是否启用 [dynamic] 热更新 |
pub fn start_discovery(
my_id: &str,
my_host: &str,
my_port: u16,
is_master: bool,
time_sync: Option<Arc<TimeSynchronizer>>,
) -> Result<ServiceRegistry>启动 UDP 多播发现(224.0.0.100:9999)。后台线程每 1 秒广播心跳,接收其他节点心跳并更新注册表。
- time_sync:传入
TimeSynchronizer时,会从 master 心跳中提取clock_time_ms进行时间同步 - 返回:共享的
ServiceRegistry,供PubSubManager解析订阅目标地址
节点注册表,内部维护 node_id -> (host, port, timestamp)。
| 方法 | 签名 | 说明 |
|---|---|---|
new |
new() -> Self |
创建空注册表 |
register |
register(&self, hb: &Heartbeat) |
注册/更新节点(框架内部使用) |
get_address |
get_address(&self, node_id: &str) -> Option<(String, u16)> |
根据 node_id 获取 (host, port) |
cleanup |
cleanup(&self, timeout_secs: u64) |
剔除超时未心跳的节点(框架内部每轮调用) |
心跳消息结构(JSON 序列化,用于发现协议)。通过 rs_ctrl_os::discovery::Heartbeat 访问(未在 crate 根重导出)。
pub struct Heartbeat {
pub node_id: String,
pub host: String,
pub port: u16,
pub timestamp: u64,
pub clock_time_ms: u64,
pub is_master: bool,
}pub fn new(static_cfg: &StaticBase, registry: ServiceRegistry) -> Result<Self>根据 static_config 的 publishers/subscribers 绑定 PUB、连接 SUB。target = "self" 的 topic 共用本机 PUB socket。
| 方法 | 签名 | 说明 |
|---|---|---|
set_publish_hz |
set_publish_hz(&mut self, hz: i64) |
覆盖发布频率。new() 已从 static_config 注入,通常无需调用;仅在运行时需修改时使用 |
set_subscribe_hz |
set_subscribe_hz(&mut self, hz: i64) |
覆盖订阅轮询频率。同上,一般依赖配置即可 |
set_sub_topics |
set_sub_topics(&mut self, local_name: &str, topics: &[S]) -> Result<()> |
为 local_name 设置 sub_topic 白名单,仅返回列表内的消息;空列表表示不过滤 |
tick |
tick(&mut self) -> Result<()> |
尝试为 pending_subs 建立连接;try_recv_raw 内部会自动调用,一般无需手动调用 |
| 方法 | 签名 | 说明 |
|---|---|---|
publish_topic |
publish_topic<T: Serialize>(&mut self, topic_key: &str, sub_topic: &str, data: &T) -> Result<()> |
Bincode 序列化后发送,适合结构化小消息 |
publish_raw |
publish_raw(&mut self, topic_key: &str, sub_topic: &str, payload: &[u8]) -> Result<()> |
透传原始字节,适合图像、点云等已编码数据 |
消息格式:ZMQ 三帧 multipart [节点ID, sub_topic, payload]
频率控制:当 publish_hz > 0 时,按 topic_key 限频,超频请求静默丢弃。
| 方法 | 签名 | 说明 |
|---|---|---|
try_recv_raw |
try_recv_raw(&mut self, local_name: &str) -> Result<Option<(String, Vec<u8>)>> |
非阻塞接收,返回 (sub_topic, payload);内部自动 tick() 并做频率限制 |
try_recv_specific |
try_recv_specific<T: Deserialize>(&mut self, local_name: &str, target_sub: &str) -> Result<Option<T>> |
仅当 sub_topic == target_sub 时用 bincode 反序列化为 T,否则返回 None |
频率控制:当 subscribe_hz > 0 时,按 local_name 限频,未到间隔返回 None。
主从时钟同步,从 is_master=true 的心跳中提取 clock_time_ms 计算偏移。
| 方法 | 签名 | 说明 |
|---|---|---|
new |
new() -> Self |
创建同步器,初始未同步 |
update_from_master |
update_from_master(&self, master_id: &str, master_ts_ms: u64) |
由发现模块内部调用,应用一般不直接使用 |
now_corrected_ms |
now_corrected_ms(&self) -> u64 |
返回经偏移修正的当前时间(毫秒) |
is_synced |
is_synced(&self) -> bool |
是否已与 master 同步 |
用法:将 Arc::new(TimeSynchronizer::new()) 传入 start_discovery,之后用 now_corrected_ms() 获取协调后的时间戳。
use rs_ctrl_os::{Result, RsCtrlError};| 变体 | 说明 |
|---|---|
Config(String) |
配置加载/解析错误 |
Comms(String) |
通信错误(如 topic 未找到) |
Serialization(String) |
序列化错误 |
Discovery(String) |
发现模块错误 |
NodeNotFound(String) |
注册表中无此 node_id |
Io(std::io::Error) |
IO 错误 |
Zmq(zmq::Error) |
ZeroMQ 错误 |
Bincode(Box<bincode::ErrorKind>) |
Bincode 序列化/反序列化错误 |
所有 API 返回 rs_ctrl_os::Result<T>,可用 ? 传播。
在你的 Cargo.toml 中添加依赖:
[dependencies]
rs_ctrl_os = "0.4.2"或者也可以
cargo add rs_ctrl_os你也可以通过路径依赖 / git 依赖方式在本地使用:
[dependencies]
rs_ctrl_os = { path = "./rs_ctrl_os" }下面是一个简单的单进程 pub/sub 示例,展示主要 API 的使用方式。
use rs_ctrl_os::init_logging;
fn main() {
init_logging();
// ...
}# example_config.toml
[static_config]
my_id = "node1"
host = "127.0.0.1"
port = 5555
is_master = true
publish_hz = 1000
subscribe_hz = 1000
dynamic_load_enable = true
[static_config.subscribers]
local_sub = "node1"
[static_config.publishers]
control = "self"
[dynamic]
message_prefix = "hello"方式一:需要热重载时,用 ConfigManager
use std::path::Path;
use serde::Deserialize;
use rs_ctrl_os::ConfigManager;
#[derive(Clone, Deserialize)]
struct DynamicCfg {
message_prefix: String,
interval_ms: u64,
}
fn main() -> rs_ctrl_os::Result<()> {
rs_ctrl_os::init_logging();
let manager: ConfigManager<DynamicCfg> =
ConfigManager::new(Path::new("example_config.toml"))?;
let static_cfg = manager.static_cfg().clone();
// 通过 manager.get_dynamic_clone() 获取最新 dynamic(文件变化时自动更新)
Ok(())
}方式二:不需要热重载时,用 load_config_typed
use serde::Deserialize;
use rs_ctrl_os::load_config_typed;
#[derive(Clone, Deserialize)]
struct DynamicCfg {
message_prefix: String,
}
fn main() -> rs_ctrl_os::Result<()> {
rs_ctrl_os::init_logging();
let (static_cfg, dynamic) = load_config_typed::<DynamicCfg>("example_config.toml")?;
// 一次性加载,无 watcher 开销
Ok(())
}use std::sync::Arc;
use rs_ctrl_os::{start_discovery, TimeSynchronizer};
fn main() -> rs_ctrl_os::Result<()> {
init_logging();
// ... 加载配置
let time_sync = Arc::new(TimeSynchronizer::new());
let registry = start_discovery(
&static_cfg.my_id,
&static_cfg.host,
static_cfg.port,
static_cfg.is_master,
Some(time_sync.clone()),
)?;
// registry 会在后台线程持续更新
Ok(())
}以下片段需与步骤 1(方式一 ConfigManager)、步骤 3 组合使用,才能获得 manager、static_cfg、registry、time_sync。
use rs_ctrl_os::PubSubManager;
use std::thread;
use std::time::Duration;
fn main() -> rs_ctrl_os::Result<()> {
init_logging();
// ... 步骤 1(ConfigManager)+ 步骤 3(start_discovery),得到 manager, static_cfg, registry, time_sync
let mut bus = PubSubManager::new(&static_cfg, registry)?;
loop {
let dyn_cfg = manager.get_dynamic_clone();
let ts_ms = time_sync.now_corrected_ms();
let payload = format!(
"{} from {} at {} ms",
dyn_cfg.message_prefix, static_cfg.my_id, ts_ms
);
// topic_key = "control",sub_topic = "demo"(bincode 序列化)
bus.publish_topic("control", "demo", &payload)?;
// 图像/点云等二进制可用 publish_raw 透传,无需 bincode
// bus.publish_raw("camera", "frame", &jpeg_bytes)?;
if let Some(received) = bus.try_recv_specific::<String>("local_sub", "demo")? {
println!("Received: {received}");
}
// 简单示例:按 subscribe_hz 驱动主循环节奏
let interval = if static_cfg.subscribe_hz > 0 {
Duration::from_secs_f64(1.0 / static_cfg.subscribe_hz as f64)
} else {
Duration::from_millis(100)
};
thread::sleep(interval);
}
}examples/pub_node.rs/examples/sub_node.rs:单 pub + 单 sub,pub 使用ConfigManager热重载,sub 使用load_config_typed一次性加载。examples/multi_pub_node.rs/examples/multi_sub_node.rs:多子话题 pub/sub,multi_sub_node使用set_sub_topics过滤子话题;两者均通过try_recv_raw接收原始 payload 并自行反序列化。
can_bridge:CAN 总线网关,将 Linux SocketCAN 与 ZeroMQ 打通,实现 CAN ↔ 分布式消息的双向桥接。基于 rs_ctrl_os 构建,典型用法包括:
- 使用
ConfigManager加载配置并热重载[dynamic](接口、设备、控制开关等) start_discovery+PubSubManager实现传感器数据发布(sensor_mit/sensor_dji/sensor_imu)与控制指令订阅(ctrl_mit/ctrl_dji)publish_topic发布解析后的传感器 JSON,try_recv_raw接收控制命令[static_config]完全遵循 rs_ctrl_os 规范
可作为将 rs_ctrl_os 应用于机器人/嵌入式桥接场景的参考。
运行示例(在项目根目录):
# 简单 pub/sub
cargo run --example pub_node -- example_config.toml
# 多 pub / 多 sub
cargo run --example multi_pub_node -- multi_pub_config.toml
cargo run --example multi_sub_node -- multi_sub_config.toml库统一使用:
use rs_ctrl_os::{Result, RsCtrlError};RsCtrlError 覆盖了:
- 配置错误:
Config(String) - 通信错误:
Comms(String) - 序列化错误:
Serialization(String) - 发现错误:
Discovery(String) - 节点未找到:
NodeNotFound(String) - IO 错误:
Io(std::io::Error) - ZeroMQ 错误:
Zmq(zmq::Error) - Bincode 序列化错误:
Bincode(Box<bincode::ErrorKind>)
绝大多数 API 都返回 rs_ctrl_os::Result<T>,便于在上层直接用 ? 传播。
本项目采用 MIT 许可证发布。
详见 LICENSE 文件。