From 70a43eebc687d0d62cd2e09d3ff38a4058ac99cc Mon Sep 17 00:00:00 2001 From: Nathan Gill Date: Sat, 9 May 2026 21:33:48 +0100 Subject: [PATCH 1/5] fix: switch `shepherd-run/status` for `user/status` - `shepherd-run/status` should be used for watchdog --- crates/shepherd-common/src/config.rs | 6 ++++++ crates/shepherd-run/src/runner.rs | 4 ++-- crates/shepherd-ws/src/main.rs | 11 +++-------- 3 files changed, 11 insertions(+), 10 deletions(-) diff --git a/crates/shepherd-common/src/config.rs b/crates/shepherd-common/src/config.rs index 378fd01..e34e518 100644 --- a/crates/shepherd-common/src/config.rs +++ b/crates/shepherd-common/src/config.rs @@ -190,6 +190,8 @@ pub struct ChannelConfig { pub robot_log: String, #[serde(default = "default_channel_camera")] pub camera: String, + #[serde(default = "default_user_status")] + pub user_status: String, } fn default_channel_robot_control() -> String { @@ -201,6 +203,9 @@ fn default_channel_robot_log() -> String { fn default_channel_camera() -> String { "camera".to_string() } +fn default_user_status() -> String { + "user/status".to_string() +} impl Default for ChannelConfig { fn default() -> Self { @@ -208,6 +213,7 @@ impl Default for ChannelConfig { robot_control: default_channel_robot_control(), robot_log: default_channel_robot_log(), camera: default_channel_camera(), + user_status: default_user_status(), } } } diff --git a/crates/shepherd-run/src/runner.rs b/crates/shepherd-run/src/runner.rs index 8a2688d..68b1c97 100644 --- a/crates/shepherd-run/src/runner.rs +++ b/crates/shepherd-run/src/runner.rs @@ -3,7 +3,7 @@ use std::{path::PathBuf, sync::Arc, time::Duration}; use anyhow::{Result, anyhow}; use base64::Engine; use hopper::{Pipe, PipeMode}; -use shepherd_common::{Mode, RunState, Zone, config::Config, status_for}; +use shepherd_common::{Mode, RunState, Zone, config::Config}; use shepherd_mqtt::{ MqttAsyncClient, MqttClient, messages::{ControlMessage, ControlMessageType, RunStatusMessage}, @@ -184,7 +184,7 @@ impl Runner { // could be used to tell when robot is started/stopped mqttc .publish( - status_for(&self.config.run.service_id), + &self.config.channel.user_status, RunStatusMessage { state: next }, ) .await?; diff --git a/crates/shepherd-ws/src/main.rs b/crates/shepherd-ws/src/main.rs index b81bf40..42d00c6 100644 --- a/crates/shepherd-ws/src/main.rs +++ b/crates/shepherd-ws/src/main.rs @@ -62,19 +62,14 @@ async fn _main(config: Config) -> Result<()> { // set up subscription for all mqtt messages let mqtt_sender = msg_sender.clone(); let mqtt_log_handle = log_handle.clone(); + let mqtt_user_status = config.channel.user_status.clone(); mqtt_client .subscribe_raw("#", move |t, v| { let mqtt_sender = mqtt_sender.clone(); let mqtt_log_handle = mqtt_log_handle.clone(); + let mqtt_user_status = mqtt_user_status.clone(); async move { - dispatch_mqtt_message( - mqtt_sender, - mqtt_log_handle, - t, - "shepherd-run/status".to_string(), - v, - ) - .await + dispatch_mqtt_message(mqtt_sender, mqtt_log_handle, t, mqtt_user_status, v).await } }) .await?; From 048f226b5bac2044275cf0a7b5af0b5b87fadeca Mon Sep 17 00:00:00 2001 From: Nathan Gill Date: Sat, 9 May 2026 21:36:52 +0100 Subject: [PATCH 2/5] feat(shepherd-watch): init crate --- Cargo.lock | 12 ++++++++++++ Cargo.toml | 1 + crates/shepherd-watch/Cargo.toml | 14 ++++++++++++++ crates/shepherd-watch/src/main.rs | 3 +++ 4 files changed, 30 insertions(+) create mode 100644 crates/shepherd-watch/Cargo.toml create mode 100644 crates/shepherd-watch/src/main.rs diff --git a/Cargo.lock b/Cargo.lock index a79c1f9..6684a6e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1310,6 +1310,18 @@ dependencies = [ "tracing", ] +[[package]] +name = "shepherd-watch" +version = "0.1.0" +dependencies = [ + "anyhow", + "shepherd-common", + "shepherd-mqtt", + "tokio", + "tokio-tungstenite", + "tracing", +] + [[package]] name = "shepherd-ws" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index a9477f4..9836d8c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,6 +5,7 @@ members = [ "crates/shepherd-common", "crates/shepherd-mqtt", "crates/shepherd-run", + "crates/shepherd-watch", "crates/shepherd-ws", ] diff --git a/crates/shepherd-watch/Cargo.toml b/crates/shepherd-watch/Cargo.toml new file mode 100644 index 0000000..411b09b --- /dev/null +++ b/crates/shepherd-watch/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "shepherd-watch" +version = "0.1.0" +edition.workspace = true +authors.workspace = true +license.workspace = true + +[dependencies] +anyhow.workspace = true +shepherd-common = { path = "../shepherd-common" } +shepherd-mqtt = { path = "../shepherd-mqtt" } +tokio.workspace = true +tokio-tungstenite.workspace = true +tracing.workspace = true diff --git a/crates/shepherd-watch/src/main.rs b/crates/shepherd-watch/src/main.rs new file mode 100644 index 0000000..ad379d6 --- /dev/null +++ b/crates/shepherd-watch/src/main.rs @@ -0,0 +1,3 @@ +fn main() { + println!("hello world"); +} From 73129ac73f3c79b809bd0e94e565776fb28d7b71 Mon Sep 17 00:00:00 2001 From: Nathan Gill Date: Sat, 9 May 2026 22:07:34 +0100 Subject: [PATCH 3/5] feat(mqtt): send service status messages on connect/disconnect --- crates/shepherd-common/src/lib.rs | 8 ------ crates/shepherd-mqtt/src/client.rs | 42 ++++++++++++++++++++++++---- crates/shepherd-mqtt/src/lib.rs | 8 ++++++ crates/shepherd-mqtt/src/messages.rs | 12 ++++++++ crates/shepherd-watch/src/main.rs | 11 +++++++- 5 files changed, 67 insertions(+), 14 deletions(-) diff --git a/crates/shepherd-common/src/lib.rs b/crates/shepherd-common/src/lib.rs index d733c80..fb455e9 100644 --- a/crates/shepherd-common/src/lib.rs +++ b/crates/shepherd-common/src/lib.rs @@ -3,14 +3,6 @@ use serde::{Deserialize, Serialize}; pub mod args; pub mod config; -/// Generate a status channel name from a service ID -pub fn status_for(service_id: S) -> String -where - S: AsRef, -{ - format!("{}/status", service_id.as_ref()) -} - #[derive(Debug, Default, PartialEq, Serialize, Deserialize, Copy, Clone)] #[serde(rename_all = "lowercase")] pub enum RunState { diff --git a/crates/shepherd-mqtt/src/client.rs b/crates/shepherd-mqtt/src/client.rs index 404ee81..9ebbb18 100644 --- a/crates/shepherd-mqtt/src/client.rs +++ b/crates/shepherd-mqtt/src/client.rs @@ -2,11 +2,15 @@ use std::{collections::HashMap, pin::Pin, sync::Arc, time::Duration}; use bytes::Bytes; use futures::future::join_all; -use rumqttc::{AsyncClient, Event, EventLoop, MqttOptions, Packet, QoS}; +use rumqttc::{AsyncClient, Event, EventLoop, LastWill, MqttOptions, Packet, QoS}; use tokio::sync::Mutex; use tracing::{debug, warn}; -use crate::{Wildcard, messages::MqttMessage}; +use crate::{ + Wildcard, + messages::{MqttMessage, ServiceStatus, StatusMessage}, + status_for, +}; pub type MqttHandler = Box< dyn Fn(String, Bytes) -> Pin> + Send>> + Send + Sync, @@ -106,6 +110,8 @@ impl MqttAsyncClient { pub struct MqttEventLoop { event_loop: EventLoop, + client: AsyncClient, + service_id: String, registry: Arc>, } @@ -147,8 +153,23 @@ impl MqttEventLoop { Self::dispatch(registry, topic, payload).await; }); } - Event::Incoming(Packet::Connect(c)) => { - debug!("mqtt client connected with id '{}'", c.client_id); + Event::Incoming(Packet::ConnAck(_)) => { + debug!("mqtt client connected"); + + // generate a birth message + let birth_topic = status_for(&self.service_id); + let birth_message = serde_json::to_vec(&StatusMessage { + status: ServiceStatus::Online, + }) + .expect("birth message generation failed"); + + if let Err(e) = self + .client + .publish(birth_topic, QoS::AtLeastOnce, true, birth_message) + .await + { + warn!("failed to send birth message: {e}"); + } } Event::Incoming(Packet::Disconnect) => { debug!("mqtt client disconnected"); @@ -169,8 +190,17 @@ impl MqttClient { where S: AsRef, { + // generate a last will for this client + let last_will_topic = status_for(service_id.as_ref()); + let last_will_message = serde_json::to_vec(&StatusMessage { + status: ServiceStatus::Offline, + }) + .expect("last will generation failed"); // this should never be able to fail + let last_will = LastWill::new(last_will_topic, last_will_message, QoS::AtLeastOnce, true); + let mut mqttoptions = MqttOptions::new(service_id.as_ref(), hostname.as_ref(), port); mqttoptions.set_keep_alive(Duration::from_secs(5)); + mqttoptions.set_last_will(last_will); let (client, event_loop) = AsyncClient::new(mqttoptions, 10); @@ -179,12 +209,14 @@ impl MqttClient { debug!("initialised new mqtt client"); let wc = MqttAsyncClient { - client, + client: client.clone(), registry: registry.clone(), }; let we = MqttEventLoop { event_loop, + client, + service_id: service_id.as_ref().to_string(), registry: registry.clone(), }; diff --git a/crates/shepherd-mqtt/src/lib.rs b/crates/shepherd-mqtt/src/lib.rs index e26f555..4cd0821 100644 --- a/crates/shepherd-mqtt/src/lib.rs +++ b/crates/shepherd-mqtt/src/lib.rs @@ -4,3 +4,11 @@ mod util; pub use client::*; pub use util::*; + +/// Generate a status channel name from a service ID +pub fn status_for(service_id: S) -> String +where + S: AsRef, +{ + format!("{}/status", service_id.as_ref()) +} diff --git a/crates/shepherd-mqtt/src/messages.rs b/crates/shepherd-mqtt/src/messages.rs index 396a41a..3e1a4b3 100644 --- a/crates/shepherd-mqtt/src/messages.rs +++ b/crates/shepherd-mqtt/src/messages.rs @@ -23,3 +23,15 @@ pub struct ControlMessage { pub struct RunStatusMessage { pub state: shepherd_common::RunState, } + +#[derive(Debug, Serialize, Deserialize, PartialEq)] +#[serde(rename_all = "lowercase")] +pub enum ServiceStatus { + Online, + Offline, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct StatusMessage { + pub status: ServiceStatus, +} diff --git a/crates/shepherd-watch/src/main.rs b/crates/shepherd-watch/src/main.rs index ad379d6..dbc1477 100644 --- a/crates/shepherd-watch/src/main.rs +++ b/crates/shepherd-watch/src/main.rs @@ -1,3 +1,12 @@ -fn main() { +use anyhow::Result; +use shepherd_common::{args::call_with_args, config::Config}; + +async fn _main(config: Config) -> Result<()> { println!("hello world"); + Ok(()) +} + +#[tokio::main] +async fn main() { + call_with_args("shepherd-watch", _main).await; } From 904d1f70b719e3a874c1647dac7a2adf46052406 Mon Sep 17 00:00:00 2001 From: Nathan Gill Date: Sat, 9 May 2026 22:11:26 +0100 Subject: [PATCH 4/5] rename `user/status` to `user/state` for status wildcards --- crates/shepherd-common/src/config.rs | 6 +++--- crates/shepherd-run/src/runner.rs | 2 +- crates/shepherd-ws/src/main.rs | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/crates/shepherd-common/src/config.rs b/crates/shepherd-common/src/config.rs index e34e518..377b9ed 100644 --- a/crates/shepherd-common/src/config.rs +++ b/crates/shepherd-common/src/config.rs @@ -191,7 +191,7 @@ pub struct ChannelConfig { #[serde(default = "default_channel_camera")] pub camera: String, #[serde(default = "default_user_status")] - pub user_status: String, + pub user_state: String, } fn default_channel_robot_control() -> String { @@ -204,7 +204,7 @@ fn default_channel_camera() -> String { "camera".to_string() } fn default_user_status() -> String { - "user/status".to_string() + "user/state".to_string() } impl Default for ChannelConfig { @@ -213,7 +213,7 @@ impl Default for ChannelConfig { robot_control: default_channel_robot_control(), robot_log: default_channel_robot_log(), camera: default_channel_camera(), - user_status: default_user_status(), + user_state: default_user_status(), } } } diff --git a/crates/shepherd-run/src/runner.rs b/crates/shepherd-run/src/runner.rs index 68b1c97..e7e48dc 100644 --- a/crates/shepherd-run/src/runner.rs +++ b/crates/shepherd-run/src/runner.rs @@ -184,7 +184,7 @@ impl Runner { // could be used to tell when robot is started/stopped mqttc .publish( - &self.config.channel.user_status, + &self.config.channel.user_state, RunStatusMessage { state: next }, ) .await?; diff --git a/crates/shepherd-ws/src/main.rs b/crates/shepherd-ws/src/main.rs index 42d00c6..2c12a1c 100644 --- a/crates/shepherd-ws/src/main.rs +++ b/crates/shepherd-ws/src/main.rs @@ -62,7 +62,7 @@ async fn _main(config: Config) -> Result<()> { // set up subscription for all mqtt messages let mqtt_sender = msg_sender.clone(); let mqtt_log_handle = log_handle.clone(); - let mqtt_user_status = config.channel.user_status.clone(); + let mqtt_user_status = config.channel.user_state.clone(); mqtt_client .subscribe_raw("#", move |t, v| { let mqtt_sender = mqtt_sender.clone(); From 7d721d2fffcc7904fc9f92e78ed903f9d0cb1704 Mon Sep 17 00:00:00 2001 From: Nathan Gill Date: Sat, 9 May 2026 23:24:58 +0100 Subject: [PATCH 5/5] feat(watch): add prototype watchdog service --- Cargo.lock | 2 + crates/shepherd-app/src/control.rs | 6 +- crates/shepherd-app/src/upload.rs | 2 +- crates/shepherd-common/src/config.rs | 38 ++++++ crates/shepherd-mqtt/src/client.rs | 10 +- crates/shepherd-mqtt/src/messages.rs | 10 +- crates/shepherd-run/src/runner.rs | 1 + crates/shepherd-watch/Cargo.toml | 2 + crates/shepherd-watch/src/main.rs | 182 ++++++++++++++++++++++++++- 9 files changed, 242 insertions(+), 11 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6684a6e..3b0b4f2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1315,6 +1315,8 @@ name = "shepherd-watch" version = "0.1.0" dependencies = [ "anyhow", + "futures-util", + "serde_json", "shepherd-common", "shepherd-mqtt", "tokio", diff --git a/crates/shepherd-app/src/control.rs b/crates/shepherd-app/src/control.rs index d4952f6..bf19c8f 100644 --- a/crates/shepherd-app/src/control.rs +++ b/crates/shepherd-app/src/control.rs @@ -32,7 +32,7 @@ async fn start( state .mqttc - .publish(state.robot_control, msg) + .publish(state.robot_control, msg, false) .await .map_err(|e| { ShepherdError( @@ -53,7 +53,7 @@ async fn stop(State(state): State) -> ShepherdResult<()> { state .mqttc - .publish(state.robot_control, msg) + .publish(state.robot_control, msg, false) .await .map_err(|e| { ShepherdError( @@ -74,7 +74,7 @@ async fn reset(State(state): State) -> ShepherdResult<()> { state .mqttc - .publish(state.robot_control, msg) + .publish(state.robot_control, msg, false) .await .map_err(|e| { ShepherdError( diff --git a/crates/shepherd-app/src/upload.rs b/crates/shepherd-app/src/upload.rs index f0af70b..5307c39 100644 --- a/crates/shepherd-app/src/upload.rs +++ b/crates/shepherd-app/src/upload.rs @@ -141,7 +141,7 @@ async fn upload_file( state .mqttc - .publish(state.robot_control, msg) + .publish(state.robot_control, msg, false) .await .map_err(|e| { ShepherdError( diff --git a/crates/shepherd-common/src/config.rs b/crates/shepherd-common/src/config.rs index 377b9ed..8d888f5 100644 --- a/crates/shepherd-common/src/config.rs +++ b/crates/shepherd-common/src/config.rs @@ -19,6 +19,8 @@ pub struct Config { #[serde(default)] pub ws: WsConfig, #[serde(default)] + pub watch: WatchConfig, + #[serde(default)] pub channel: ChannelConfig, #[serde(default)] pub path: PathConfig, @@ -182,6 +184,36 @@ impl Default for WsConfig { } } +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct WatchConfig { + #[serde(default = "default_watch_service_id")] + pub service_id: String, + #[serde(default = "default_watch_host")] + pub host: String, + #[serde(default = "default_watch_port")] + pub port: u16, +} + +fn default_watch_service_id() -> String { + "shepherd-watch".to_string() +} +fn default_watch_host() -> String { + "0.0.0.0".to_string() +} +fn default_watch_port() -> u16 { + 1010 +} + +impl Default for WatchConfig { + fn default() -> Self { + Self { + service_id: default_watch_service_id(), + host: default_watch_host(), + port: default_watch_port(), + } + } +} + #[derive(Debug, Serialize, Deserialize, Clone)] pub struct ChannelConfig { #[serde(default = "default_channel_robot_control")] @@ -192,6 +224,8 @@ pub struct ChannelConfig { pub camera: String, #[serde(default = "default_user_status")] pub user_state: String, + #[serde(default = "default_status")] + pub status: String, } fn default_channel_robot_control() -> String { @@ -206,6 +240,9 @@ fn default_channel_camera() -> String { fn default_user_status() -> String { "user/state".to_string() } +fn default_status() -> String { + "status".to_string() +} impl Default for ChannelConfig { fn default() -> Self { @@ -214,6 +251,7 @@ impl Default for ChannelConfig { robot_log: default_channel_robot_log(), camera: default_channel_camera(), user_state: default_user_status(), + status: default_status(), } } } diff --git a/crates/shepherd-mqtt/src/client.rs b/crates/shepherd-mqtt/src/client.rs index 9ebbb18..a43c0a7 100644 --- a/crates/shepherd-mqtt/src/client.rs +++ b/crates/shepherd-mqtt/src/client.rs @@ -80,7 +80,7 @@ impl MqttAsyncClient { Ok(()) } - pub async fn publish(&self, topic: S, msg: T) -> anyhow::Result<()> + pub async fn publish(&self, topic: S, msg: T, retain: bool) -> anyhow::Result<()> where T: MqttMessage, S: AsRef, @@ -88,18 +88,18 @@ impl MqttAsyncClient { let b = serde_json::to_vec(&msg) .map_err(|e| anyhow::anyhow!("failed to serialize message: {e}"))?; - self.publish_raw(topic, b).await?; + self.publish_raw(topic, b, retain).await?; Ok(()) } - pub async fn publish_raw(&self, topic: S, msg: V) -> anyhow::Result<()> + pub async fn publish_raw(&self, topic: S, msg: V, retain: bool) -> anyhow::Result<()> where S: AsRef, V: Into>, { self.client - .publish(topic.as_ref(), QoS::AtLeastOnce, false, msg) + .publish(topic.as_ref(), QoS::AtLeastOnce, retain, msg) .await?; debug!("client published to topic '{}'", topic.as_ref()); @@ -159,6 +159,7 @@ impl MqttEventLoop { // generate a birth message let birth_topic = status_for(&self.service_id); let birth_message = serde_json::to_vec(&StatusMessage { + service: self.service_id.clone(), status: ServiceStatus::Online, }) .expect("birth message generation failed"); @@ -193,6 +194,7 @@ impl MqttClient { // generate a last will for this client let last_will_topic = status_for(service_id.as_ref()); let last_will_message = serde_json::to_vec(&StatusMessage { + service: service_id.as_ref().to_string(), status: ServiceStatus::Offline, }) .expect("last will generation failed"); // this should never be able to fail diff --git a/crates/shepherd-mqtt/src/messages.rs b/crates/shepherd-mqtt/src/messages.rs index 3e1a4b3..410850c 100644 --- a/crates/shepherd-mqtt/src/messages.rs +++ b/crates/shepherd-mqtt/src/messages.rs @@ -24,14 +24,20 @@ pub struct RunStatusMessage { pub state: shepherd_common::RunState, } -#[derive(Debug, Serialize, Deserialize, PartialEq)] +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] #[serde(rename_all = "lowercase")] pub enum ServiceStatus { Online, Offline, } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct StatusMessage { + pub service: String, pub status: ServiceStatus, } + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct StatusSummary { + pub statuses: Vec, +} diff --git a/crates/shepherd-run/src/runner.rs b/crates/shepherd-run/src/runner.rs index e7e48dc..58a162b 100644 --- a/crates/shepherd-run/src/runner.rs +++ b/crates/shepherd-run/src/runner.rs @@ -186,6 +186,7 @@ impl Runner { .publish( &self.config.channel.user_state, RunStatusMessage { state: next }, + false, ) .await?; diff --git a/crates/shepherd-watch/Cargo.toml b/crates/shepherd-watch/Cargo.toml index 411b09b..30289ca 100644 --- a/crates/shepherd-watch/Cargo.toml +++ b/crates/shepherd-watch/Cargo.toml @@ -7,6 +7,8 @@ license.workspace = true [dependencies] anyhow.workspace = true +futures-util.workspace = true +serde_json.workspace = true shepherd-common = { path = "../shepherd-common" } shepherd-mqtt = { path = "../shepherd-mqtt" } tokio.workspace = true diff --git a/crates/shepherd-watch/src/main.rs b/crates/shepherd-watch/src/main.rs index dbc1477..030edc0 100644 --- a/crates/shepherd-watch/src/main.rs +++ b/crates/shepherd-watch/src/main.rs @@ -1,8 +1,188 @@ +use std::{collections::HashMap, sync::Arc}; + use anyhow::Result; +use futures_util::{SinkExt, StreamExt}; use shepherd_common::{args::call_with_args, config::Config}; +use shepherd_mqtt::{ + MqttAsyncClient, MqttClient, + messages::{ServiceStatus, StatusMessage, StatusSummary}, +}; +use tokio::{ + net::{TcpListener, TcpStream}, + sync::{ + Mutex, + broadcast::{self, Receiver, Sender}, + }, +}; +use tokio_tungstenite::{accept_async, tungstenite::Message}; +use tracing::{debug, error, info, warn}; + +async fn create_summary(statuses: Arc>>) -> Result { + let statuses = statuses.lock().await; + let status_arr: Vec = statuses + .iter() + .map(|(service, status)| StatusMessage { + service: service.clone(), + status: status.clone(), + }) + .collect(); + Ok(serde_json::to_string(&StatusSummary { + statuses: status_arr, + })?) +} + +async fn handle_websocket( + stream: TcpStream, + summary: String, + mut status_receiver: Receiver, +) -> Result<()> { + let addr = stream.peer_addr()?; + debug!("new websocket connection from {:?}", addr); + let (mut ws_tx, mut ws_rx) = accept_async(stream).await?.split(); + + // send an initial summary + ws_tx.send(Message::text(summary)).await?; + + loop { + tokio::select! { + // forward summary messages + msg = status_receiver.recv() => { + match msg { + Ok(s) => ws_tx.send(Message::text(s)).await?, + Err(e) => return Err(e)?, + } + } + + msg = ws_rx.next() => { + // detect if connection has been closed + match msg { + Some(Ok(msg)) => if msg.is_close() { + info!("closed connection from {:?}", addr); + return Ok(()) + } + None => { + info!("closed connection from {:?}", addr); + return Ok(()) + } + _ => {} + } + } + } + } +} + +async fn handle_status_message( + statuses: Arc>>, + status_sender: Sender, + mqtt_client: MqttAsyncClient, + status_topic: String, + message: StatusMessage, +) -> Result<()> { + info!("status for {}: {:?}", message.service, message.status); + + // update status table, generate summary array + let status_arr: Vec = { + let mut statuses = statuses.lock().await; + statuses.insert(message.service, message.status); + + statuses + .iter() + .map(|(service, status)| StatusMessage { + service: service.clone(), + status: status.clone(), + }) + .collect() + + // drop the lock here before sending + }; + + let summary = StatusSummary { + statuses: status_arr, + }; + + match serde_json::to_string(&summary) { + Ok(summary) => { + let _ = status_sender.send(summary.clone()); + } + Err(e) => { + warn!("failed to serialise status summary: {e}"); + } + } + + let _ = mqtt_client.publish(status_topic, summary, true).await; + + Ok(()) +} async fn _main(config: Config) -> Result<()> { - println!("hello world"); + let (status_sender, _) = broadcast::channel::(64); + let statuses: Arc>> = Arc::new(Mutex::new(HashMap::new())); + + let (mut mqtt_client, mut mqtt_event_loop) = MqttClient::new( + &config.watch.service_id, + &config.mqtt.broker, + config.mqtt.port, + ); + + // TODO: wrap these in a context object + let mqtt_statuses = statuses.clone(); + let mqtt_status_sender = status_sender.clone(); + let mqtt_status = config.channel.status.clone(); + let mqtt_mqtt_client = mqtt_client.clone(); + mqtt_client + .subscribe("+/status", move |_, v| { + let mqtt_statuses = mqtt_statuses.clone(); + let mqtt_status_sender = mqtt_status_sender.clone(); + let mqtt_status = mqtt_status.clone(); + let mqtt_mqtt_client = mqtt_mqtt_client.clone(); + + async move { + handle_status_message( + mqtt_statuses, + mqtt_status_sender, + mqtt_mqtt_client, + mqtt_status, + v, + ) + .await + } + }) + .await?; + + // run mqtt event loop independently + let mqtt_loop = tokio::spawn(async move { + loop { + if let Err(e) = mqtt_event_loop.run().await { + error!("mqtt loop exited: {e}"); + } + } + }); + + let listener = + TcpListener::bind(format!("{}:{}", &config.watch.host, config.watch.port)).await?; + + tokio::select! { + res = async { + loop { + match listener.accept().await { + Ok((stream, _)) => { + // default to a blank string if serialisation failed + let summary = create_summary(statuses.clone()).await.unwrap_or("".to_string()); + tokio::spawn(handle_websocket(stream, summary, status_sender.subscribe()) ); + } + Err(e) => return Err(e), + } + } + } => { + warn!("websocket handler exited {:?}", res); + res? + } + + _ = mqtt_loop => { + error!("mqtt client exited?"); + } + } + Ok(()) }