Skip to content

Latest commit

 

History

History
540 lines (360 loc) · 26.4 KB

File metadata and controls

540 lines (360 loc) · 26.4 KB

rs_ctrl_os

rs_ctrl_os 是一个用于构建分布式节点控制系统的小型运行时库,提供:

  • 节点发现:基于 UDP 多播的心跳机制(Heartbeat + ServiceRegistry
  • 消息通信:基于 ZeroMQ 的 pub/sub 抽象(PubSubManager
  • 配置管理:TOML 配置加载 + 动态热更新(ConfigManager / load_config_typed
  • 时间同步:简单的主从时钟同步(TimeSynchronizer
  • 统一错误/日志RsCtrlError + tracing 日志初始化

适合需要在局域网内跑多进程/多节点,进行“互相发现 + 消息分发 + 动态配置”的系统。

目录


快速上手(Rust)

0. 准备环境

  • 安装 ZeroMQ 库(不同平台命令略有差异,以下是常见示例):
    • Debian/Ubuntu: sudo apt-get install libzmq3-dev
    • Fedora: sudo dnf install zeromq zeromq-devel
    • macOS(Homebrew): brew install zeromq
  • 安装 Rust 稳定版(建议用 rustup)。

1. 安装 crate

在你的 Cargo.toml 中添加依赖:

[dependencies]
rs_ctrl_os = "0.5.0"

或:

cargo add rs_ctrl_os

2. 跑一个最简单的 pub/sub 示例

准备一个最小的配置(也可以直接用仓库里的 example_config.toml):

[static_config]
my_id = "node1"
host = "127.0.0.1"
port = 5555
is_master = true
publish_hz = 1000
subscribe_hz = 1000
dynamic_load_enable = true

[static_config.subscribers]
local_sub = "node1"

[static_config.publishers]
control = "self"

# 可选:当 discovery 未找到目标时,用此地址直连(适合无多播环境)
[static_config.static_nodes]
# node1 = "127.0.0.1:5555"

[dynamic]
message_prefix = "hello"
interval_ms = 200

在项目根目录运行:

cargo run --example pub_node -- example_config.toml

如果你改动配置文件里的 [dynamic](比如改前缀、改间隔)并保存,进程会自动加载新的动态配置:

  • message_prefix 会改变打印出来的文本前缀;
  • interval_ms 会改变发送/接收的频率。

3. 跑两个进程:一个 pub,一个 sub

仓库自带:

  • examples/pub_node.rs
  • examples/sub_node.rs
  • pub_config.tomlsub_config.toml

先开一个终端作为发布端:

cargo run --example pub_node -- pub_config.toml

再开另一个终端作为订阅端:

cargo run --example sub_node -- sub_config.toml

C / C++ API(预编译静态库)

v0.5.0 起,本库提供 稳定 C ABIinclude/rs_ctrl_os.h)与 staticlib 产物 librs_ctrl_os.a,便于在 C/C++ 中链接。官方 Release 不发布 librs_ctrl_os.so;需要动态库时请自行在 Cargo.toml[lib] crate-type 中加入 "cdylib" 后从源码编译。

预编译包(GitHub Releases)

推送 v* 形式的 tag 后,CI 会上传 glibc 下的两个压缩包:

资产 说明
rs_ctrl_os-<版本>-glibc-x86_64.tar.gz x86_64-unknown-linux-gnu
rs_ctrl_os-<版本>-glibc-aarch64.tar.gz aarch64-unknown-linux-gnu

每个包内含:librs_ctrl_os.ainclude/rs_ctrl_os.hSHA256SUMS不包含 libzmq(由系统/SDK 提供 -lzmq)。

glibc:制品在 Ubuntu 22.04/24.04 类环境构建;若目标机 glibc 更旧,请在本机 cargo build --release 自行生成 .a

集成流程(建议顺序)

  1. rs_ctrl_os_init_logging()(可选,便于看 tracing 日志)。
  2. rs_ctrl_os_config_open(path) — 打开含 [static_config][dynamic] 的 TOML;失败时查 rs_ctrl_os_last_error
  3. rs_ctrl_os_config_get_* 读静态字段;用 rs_ctrl_os_config_get_dynamic_toml[dynamic] 表对应的 TOML 文本(与磁盘风格一致,不含 [dynamic] 行;每次调用分配新串,用完 rs_ctrl_os_str_free)。
  4. rs_ctrl_os_time_sync_new(可选)→ rs_ctrl_os_discovery_start(可传入 time sync 指针)。
  5. rs_ctrl_os_pubsub_new(cfg, registry)一调用就接管 registry 所有权(内部 Box::from_raw):无论成功或失败,都不要再对原 registry 指针调用 rs_ctrl_os_registry_destroy;失败时该指针已无效,需重新 discovery_start 再试。
  6. 循环内:publish_raw / try_recv_raw;收到消息时释放 str_free / payload_free
  7. 退出:pubsub_destroytime_sync_destroyconfig_destroy(顺序与创建相反亦可,但不要对已交给 pubsub 的 registry 再 destroy)。

[dynamic] 与热更新:谁在做什么?

已经在 rs_ctrl_os(FFI 内部)做完的(与 C 是否“会 TOML”无关):

  • rs_ctrl_os_config_open 创建 ConfigManager<toml::Value>,等价于 Rust 侧的 ConfigManager::new
  • dynamic_load_enable=true 时,Rust 里 notify 监听配置文件;文件变更后 重新读取并解析 TOML 的 [dynamic],更新内存中的 toml::Value
    C 不需要、也不应该自己去 fopen / 解析整份 TOML 来做热重载。

C 侧仍然要做的(仅此一层,和“重载”是两件事):

  • 框架 只负责「存好并热更新 [dynamic] 这一段表」,不定义你业务里有哪些键(电机参数、相机参数等),因此 没有「按字段名的 C getter」这种强类型 API。
  • 你的进程若要根据 message_prefixinterval_ms驱动自己的逻辑,需要 轮询 get_dynamic_toml,拿到 当前快照的 TOML 文本,再用 tomlc99 / libtoml 或手写解析读出字段。
    这是在读 业务配置,不是在做 文件监听与重载;后者 已全部在 rcos 里实现

一句话:TOML 的 监听 + 重载 + 解析进内存 = rcos;C 只负责 从快照里取出自己关心的字段

链接(C 程序)

librs_ctrl_os.a 内含 bundled libzmq 的 C++ 对象,用 gcc 链纯 C 时需显式 stdc++

gcc -std=c11 -O2 -o myapp myapp.c \
  ./librs_ctrl_os.a \
  -lzmq -lstdc++ -lpthread -ldl -lm

示例工程 c_examples/(CMake)

仓库 c_examples/ 提供 CMake 工程,默认构建两个可执行文件:

目标 说明
rcos_minimal C++11,最短链路(minimal.cpp
rcos_tutorial_c C11,较完整:get_dynamic_toml、简易 TOML 行解析、publish_raw、轮询 try_recv_raw、时间戳(tutorial_node.c
rcos_pubsub_chat C11,双进程收发:pubsub_chat.cpub/sub 两种模式),配套 pubsub_chat_*.toml

在仓库根先 cargo build --release,再:

cd c_examples
cmake -S . -B build -DRCOS_ROOT=.. -DRCOS_LIB=../target/release/librs_ctrl_os.a
cmake --build build
./build/rcos_minimal ../example_config.toml
./build/rcos_tutorial_c ../example_config.toml 120   # 第二个参数:主循环 tick 次数(默认 600)

自测动态配置:终端 A 运行 ./build/rcos_tutorial_c ../example_config.toml;终端 B 编辑同一文件里 [dynamic]message_prefixinterval_ms 并保存;当 dynamic_load_enable=true 时,A 的日志/输出会反映新的 TOML 片段。

CMake 变量RCOS_ROOT(含 include/)、RCOS_LIB.a 路径);未设 RCOS_LIB 时会尝试 RCOS_ROOT/target/releasedebug。Linux 上 CMake 会尽量选 gcc/g++,且对示例目标链接 stdc++

从源码构建 Rust 库

cargo build --release
# 静态库:target/release/librs_ctrl_os.a

.cargo/config.tomlCXX=g++ / CC=gcc 用于稳定编译 zmq-sys;可按本机环境覆盖。

C API 速查

主题 函数
日志 rs_ctrl_os_init_logging
配置 config_open / config_destroy / config_get_dynamic_toml / config_get_my_id
时间 time_sync_new / now_ms / is_synced / destroy
发现 discovery_start / registry_destroy(仅失败或未交给 pubsub 时)
总线 pubsub_new / destroy / publish_raw / try_recv_raw / set_sub_topics / set_*_hz
内存 str_free / payload_free;错误串 last_error

示例

Rust examples/

  • examples/pub_node.rs / examples/sub_node.rs:单 pub + 单 sub,pub 使用 ConfigManager 热重载,sub 使用 load_config_typed 一次性加载。
  • examples/multi_pub_node.rs / examples/multi_sub_node.rs:多子话题 pub/sub,multi_sub_node 使用 set_sub_topics 过滤子话题;两者均通过 try_recv_raw 接收原始 payload 并自行反序列化。

实际项目参考:

  • can_bridge:CAN 总线网关,将 Linux SocketCAN 与 ZeroMQ 打通,实现 CAN ↔ 分布式消息的双向桥接。基于 rs_ctrl_os 构建,典型用法包括 ConfigManager 热重载、start_discovery + PubSubManager 收发、以及用 publish_topic / publish_raw 传输不同 payload。

运行示例(在项目根目录):

# 简单 pub/sub
cargo run --example pub_node -- example_config.toml

# 多 pub / 多 sub
cargo run --example multi_pub_node -- multi_pub_config.toml
cargo run --example multi_sub_node -- multi_sub_config.toml

C 示例工程 c_examples/(CMake)

cargo build --release
cmake -S c_examples -B c_examples/build -DCMAKE_BUILD_TYPE=Release
cmake --build c_examples/build

其中 rcos_pubsub_chat 可以用两进程验证收发:

# 终端 1
./c_examples/build/rcos_pubsub_chat pub c_examples/pubsub_chat_pub.toml 500

# 终端 2
./c_examples/build/rcos_pubsub_chat sub c_examples/pubsub_chat_sub.toml

框架能力与边界

框架负责什么

能力 说明
StaticBase 与 [static_config] 节点 ID、host、port、是否 master、publishers/subscribers 拓扑、static_nodes(IP fallback)、publish_hz/subscribe_hz、dynamic_load_enable 等运行所需的基础配置
配置加载机制 从 TOML 解析 [static_config],提供 load_config_rcosload_config_typedConfigManager 等 API
dynamic 热更新 dynamic_load_enable=true 时,监听配置文件变化并热重载 [dynamic] 内容
消息通道 ZMQ pub/sub、发现、时间同步、频率限速、原始字节透传(publish_raw / try_recv_raw

框架不负责什么

边界 说明
[dynamic] 的结构与语义 框架只负责「加载并热更新」[dynamic]不定义其字段。每个应用自行定义 D: Deserialize,例如 CAN 接口列表、电机参数、相机参数等
业务数据内容 图像、点云等大体量数据应通过 topic 传输,不应塞进 TOML。配置中只放「如何连接、参数、schema 版本」等元信息
业务协议与编码 消息 payload 的序列化方式(bincode / raw / JPEG 等)由应用选择;框架提供 publish_topic(bincode)、publish_raw(透传)两种能力

如何区分框架配置与业务配置

  • **[static_config]**:框架强依赖,必须存在。包含 my_idhostportis_masterpublish_hzsubscribe_hzdynamic_load_enablepublisherssubscribersstatic_nodes(可选)等。
  • **[dynamic]**:业务自由定义。框架不解析其具体字段,只负责按你提供的 D 反序列化并(可选)热更新。
    例如:can_bridge 定义 interfacesdevices;相机节点定义 camera_idresolution;点云节点定义 voxel_size 等。

框架在背后完成的工作

以下能力由框架自动完成,应用通常无需关心实现细节:

模块 后台行为
节点发现 启动两个线程:发送端每 1 秒向 224.0.0.100:9999 广播本节点 Heartbeat;接收端持续收取其它节点心跳,更新 ServiceRegistry,超过 10 秒未收到则从注册表剔除。
时间同步 接收端收到 is_master=true 的心跳时,提取其 clock_time_ms,计算本地与 master 的时钟偏移并低通滤波。now_corrected_ms() 内部使用该偏移修正当前时间。
ConfigManager 热更新 dynamic_load_enable=true 时,通过 notify 监听配置文件。文件变化时自动重读并解析 [dynamic],更新内部 RwLockget_dynamic_clone() 返回最新值。
发布频率控制 publish_hz > 0 时,publish_topic / publish_raw 内部按 topic_key 记录上次发送时间,超过最小间隔的请求会被静默丢弃(限频)。
订阅频率控制 subscribe_hz > 0 时,try_recv_raw / try_recv_specific 内部按 local_name 记录上次轮询时间,未到间隔则直接返回 None,避免过度轮询。
订阅连接建立 初始化时,若 discovery 和 static_nodes 均未提供目标地址,订阅进入 pending_substry_recv_raw 内部自动 tick(),优先从 ServiceRegistry 解析,其次从 static_nodesnode_id -> "host:port")fallback,无需手动调用 tick()
子话题过滤 若通过 set_sub_topics 设置了白名单,框架在 try_recv_raw 中只返回白名单内的 sub_topic,其它消息静默丢弃。

API 参考(详细)

1. 初始化

init_logging()

初始化 tracing 日志,默认 INFO 级别。应在 main 入口处调用一次。

use rs_ctrl_os::init_logging;
init_logging();

2. 配置管理

load_config_rcos(path) -> Result<(StaticBase, toml::Value)>

从 TOML 加载配置,返回框架静态配置 + 原始 [dynamic]toml::Value)。
适用于需要手动反序列化 [dynamic] 或只需 static_config 的场景。

  • path:配置文件路径(impl AsRef<Path>
  • 返回(StaticBase, toml::Value)[dynamic] 缺失时返回空表

load_config_typed::<D>(path) -> Result<(StaticBase, D)>

一次性加载配置,返回强类型 (StaticBase, D)。无文件监听,无热更新开销。

  • D:需实现 Deserialize,对应 [dynamic] 结构
  • 适用:不需要热重载的节点

ConfigManager<D>

带热重载的配置管理器。当 dynamic_load_enable=true 时,通过 notify 监听文件变化并自动重载 [dynamic]

方法 签名 说明
new new(config_path: &Path) -> Result<Self> 加载配置并(可选)启动文件监听
static_cfg static_cfg(&self) -> &StaticBase 获取静态配置引用
get_dynamic_clone get_dynamic_clone(&self) -> D 获取当前 [dynamic] 的克隆(热更新后为最新值)
config_path config_path(&self) -> &Path 配置文件路径

D 约束Clone + Deserialize + Send + Sync + 'static

StaticBase

框架静态配置结构体,从 TOML [static_config] 解析。

字段 类型 默认 说明
my_id String - 本节点唯一 ID
host String - 本节点监听地址(如 127.0.0.1
port u16 - 本节点监听端口
is_master bool false 是否作为时间同步 master
publishers HashMap<String, String> {} topic_key -> "node_id""self"(本地绑定)
subscribers HashMap<String, String> {} local_name -> target_node_id
static_nodes HashMap<String, String> {} node_id -> "host:port",discovery 失败时的 fallback
publish_hz i64 - 发布频率上限:>0 限频,0 不限,<0 禁止发布
subscribe_hz i64 - 订阅轮询频率:>0 限频,0 不限,<0 禁止订阅
dynamic_load_enable bool true 是否启用 [dynamic] 热更新

3. 节点发现

start_discovery(...) -> Result<ServiceRegistry>

pub fn start_discovery(
    my_id: &str,
    my_host: &str,
    my_port: u16,
    is_master: bool,
    time_sync: Option<Arc<TimeSynchronizer>>,
) -> Result<ServiceRegistry>

启动 UDP 多播发现(224.0.0.100:9999)。后台线程每 1 秒广播心跳,接收其他节点心跳并更新注册表。

  • time_sync:传入 TimeSynchronizer 时,会从 master 心跳中提取 clock_time_ms 进行时间同步
  • 返回:共享的 ServiceRegistry,供 PubSubManager 解析订阅目标地址

ServiceRegistry

节点注册表,内部维护 node_id -> (host, port, timestamp)

方法 签名 说明
new new() -> Self 创建空注册表
register register(&self, hb: &Heartbeat) 注册/更新节点(框架内部使用)
get_address get_address(&self, node_id: &str) -> Option<(String, u16)> 根据 node_id 获取 (host, port)
cleanup cleanup(&self, timeout_secs: u64) 剔除超时未心跳的节点(框架内部每轮调用)

Heartbeat

心跳消息结构(JSON 序列化,用于发现协议)。通过 rs_ctrl_os::discovery::Heartbeat 访问(未在 crate 根重导出)。

pub struct Heartbeat {
    pub node_id: String,
    pub host: String,
    pub port: u16,
    pub timestamp: u64,
    pub clock_time_ms: u64,
    pub is_master: bool,
}

4. ZeroMQ Pub/Sub(PubSubManager)

创建

pub fn new(static_cfg: &StaticBase, registry: ServiceRegistry) -> Result<Self>

根据 static_configpublishers/subscribers 绑定 PUB、连接 SUB。target = "self" 的 topic 共用本机 PUB socket。

频率与过滤

方法 签名 说明
set_publish_hz set_publish_hz(&mut self, hz: i64) 覆盖发布频率。new() 已从 static_config 注入,通常无需调用;仅在运行时需修改时使用
set_subscribe_hz set_subscribe_hz(&mut self, hz: i64) 覆盖订阅轮询频率。同上,一般依赖配置即可
set_sub_topics set_sub_topics(&mut self, local_name: &str, topics: &[S]) -> Result<()> local_name 设置 sub_topic 白名单,仅返回列表内的消息;空列表表示不过滤
tick tick(&mut self) -> Result<()> 尝试为 pending_subs 建立连接;try_recv_raw 内部会自动调用,一般无需手动调用

发布

方法 签名 说明
publish_topic publish_topic<T: Serialize>(&mut self, topic_key: &str, sub_topic: &str, data: &T) -> Result<()> Bincode 序列化后发送,适合结构化小消息
publish_raw publish_raw(&mut self, topic_key: &str, sub_topic: &str, payload: &[u8]) -> Result<()> 透传原始字节,适合图像、点云等已编码数据

消息格式:ZMQ 三帧 multipart [节点ID, sub_topic, payload]

频率控制:当 publish_hz > 0 时,按 topic_key 限频,超频请求静默丢弃。

接收

方法 签名 说明
try_recv_raw try_recv_raw(&mut self, local_name: &str) -> Result<Option<(String, Vec<u8>)>> 非阻塞接收,返回 (sub_topic, payload);内部自动 tick() 并做频率限制
try_recv_specific try_recv_specific<T: Deserialize>(&mut self, local_name: &str, target_sub: &str) -> Result<Option<T>> 仅当 sub_topic == target_sub 时用 bincode 反序列化为 T,否则返回 None

频率控制:当 subscribe_hz > 0 时,按 local_name 限频,未到间隔返回 None


5. 时间同步(TimeSynchronizer)

主从时钟同步,从 is_master=true 的心跳中提取 clock_time_ms 计算偏移。

方法 签名 说明
new new() -> Self 创建同步器,初始未同步
update_from_master update_from_master(&self, master_id: &str, master_ts_ms: u64) 由发现模块内部调用,应用一般不直接使用
now_corrected_ms now_corrected_ms(&self) -> u64 返回经偏移修正的当前时间(毫秒)
is_synced is_synced(&self) -> bool 是否已与 master 同步

用法:将 Arc::new(TimeSynchronizer::new()) 传入 start_discovery,之后用 now_corrected_ms() 获取协调后的时间戳。


6. 错误类型

use rs_ctrl_os::{Result, RsCtrlError};
变体 说明
Config(String) 配置加载/解析错误
Comms(String) 通信错误(如 topic 未找到)
Serialization(String) 序列化错误
Discovery(String) 发现模块错误
NodeNotFound(String) 注册表中无此 node_id
Io(std::io::Error) IO 错误
Zmq(zmq::Error) ZeroMQ 错误
Bincode(Box<bincode::ErrorKind>) Bincode 序列化/反序列化错误

所有 API 返回 rs_ctrl_os::Result<T>,可用 ? 传播。


(附)路径依赖 / git 依赖

你也可以通过路径依赖 / git 依赖方式在本地使用:

[dependencies]
rs_ctrl_os = { path = "./rs_ctrl_os" }

错误处理

库统一使用:

use rs_ctrl_os::{Result, RsCtrlError};

RsCtrlError 覆盖了:

  • 配置错误:Config(String)
  • 通信错误:Comms(String)
  • 序列化错误:Serialization(String)
  • 发现错误:Discovery(String)
  • 节点未找到:NodeNotFound(String)
  • IO 错误:Io(std::io::Error)
  • ZeroMQ 错误:Zmq(zmq::Error)
  • Bincode 序列化错误:Bincode(Box<bincode::ErrorKind>)

绝大多数 API 都返回 rs_ctrl_os::Result<T>,便于在上层直接用 ? 传播。


许可证

本项目采用 MIT 许可证发布。
详见 LICENSE 文件。