Skip to content
171 changes: 143 additions & 28 deletions src/commands/monitor.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use crate::utils::{config, horizon, notifications, print as p, soroban, stream::SorobanEventStream};
use crate::utils::{
config, horizon, notifications, print as p, soroban,
stream::{EventStreamFilters, SorobanEventStream},
};
use anyhow::Result;
use clap::Args;
use std::sync::{
Expand All @@ -20,6 +23,18 @@ pub struct MonitorArgs {
#[arg(long)]
pub follow: bool,

/// Soroban event type filter (contract, system, diagnostic)
#[arg(long = "type")]
pub event_type: Option<String>,

/// Topic filter: comma-separated segment matchers (* wildcards supported)
#[arg(long)]
pub topic: Option<String>,

/// Match emitted event value (substring match on JSON payload)
#[arg(long)]
pub value: Option<String>,

/// Wallet name from starforge config to monitor
#[arg(long)]
pub wallet: Option<String>,
Expand All @@ -28,6 +43,10 @@ pub struct MonitorArgs {
#[arg(long)]
pub threshold: Option<f64>,

/// Alert when wallet XLM balance drops below this amount (watchman)
#[arg(long)]
pub balance_alert: Option<f64>,

/// Network to use (overrides config)
#[arg(long)]
pub network: Option<String>,
Expand All @@ -49,60 +68,117 @@ pub fn handle(args: MonitorArgs) -> Result<()> {
println!();

match (&args.contract, &args.wallet) {
(Some(contract_id), None) => {
monitor_contract(contract_id, args.events.as_deref(), network, args.interval)
}
(None, Some(wallet_name)) => {
monitor_wallet(wallet_name, args.threshold, network, args.interval)
}
(Some(contract_id), None) => monitor_contract(
contract_id,
args.events.as_deref(),
args.event_type.as_deref(),
args.topic.as_deref(),
args.value.as_deref(),
network,
args.interval,
args.follow,
),
(None, Some(wallet_name)) => monitor_wallet(
wallet_name,
args.threshold,
args.balance_alert,
network,
args.interval,
),
_ => anyhow::bail!("Specify either --contract or --wallet (but not both)"),
}
}

fn monitor_contract(
contract_id: &str,
events_filter: Option<&str>,
event_type: Option<&str>,
topic: Option<&str>,
value: Option<&str>,
network: &str,
interval: u64,
follow: bool,
) -> Result<()> {
config::validate_contract_id(contract_id)?;

let filter_set: Option<Vec<String>> = events_filter.map(|s| {
let legacy_filter_set: Option<Vec<String>> = events_filter.map(|s| {
s.split(',')
.map(|x| x.trim().to_lowercase())
.filter(|x| !x.is_empty())
.collect()
});

let mut stream_filters = EventStreamFilters::default();
if let Some(t) = event_type {
let normalized = t.trim().to_lowercase();
if !normalized.is_empty() {
stream_filters.event_type = Some(normalized);
}
}
if let Some(topic_filter) = topic {
let segments: Vec<String> = topic_filter
.split(',')
.map(|s| s.trim().to_string())
.filter(|s| !s.is_empty())
.collect();
if !segments.is_empty() {
stream_filters.topic_segments = Some(segments);
}
}
if let Some(value_match) = value {
let trimmed = value_match.trim();
if !trimmed.is_empty() {
stream_filters.value_match = Some(trimmed.to_string());
}
}

let rpc_url = soroban::rpc_url(network);

notifications::info(&format!(
"Streaming contract events from {}.",
rpc_url
));

let mut stream =
SorobanEventStream::new(rpc_url, contract_id.to_string()).with_poll_interval(interval);
loop {
let batch = stream.next_batch()?;
for event in batch {
let as_text = event.value.to_string();
if let Some(ref filters) = filter_set {
let mut matches = false;
for f in filters {
if as_text.to_lowercase().contains(f) {
matches = true;
break;
let running = Arc::new(AtomicBool::new(true));
{
let running = Arc::clone(&running);
ctrlc::set_handler(move || {
running.store(false, Ordering::SeqCst);
})?;
}

let mut stream = SorobanEventStream::new(rpc_url, contract_id.to_string())
.with_poll_interval(interval)
.with_filters(stream_filters);

let mut printed_any = false;

while running.load(Ordering::SeqCst) {
match stream.next_batch() {
Ok(batch) => {
for event in batch {
let as_text = event.value.to_string();
let topic_text = event.topic.join(",");
let matches_legacy = legacy_filter_set.as_ref().is_none_or(|filters| {
filters.iter().any(|f| {
as_text.to_lowercase().contains(f)
|| topic_text.to_lowercase().contains(f)
})
});

if matches_legacy {
printed_any = true;
notifications::success(&format!(
"Ledger {} event {}: {}",
event.ledger, event.id, as_text
));
}
printed_any = true;
notifications::success(&format!(
"Ledger {} event {}: {}",
event.ledger, event.id, as_text
));
}

if !follow {
if !printed_any {
notifications::warn("No matching events in the latest batch.");
}
break;
}
stream.sleep();
Expand All @@ -126,6 +202,7 @@ fn monitor_contract(
fn monitor_wallet(
wallet_name: &str,
threshold: Option<f64>,
balance_alert: Option<f64>,
network: &str,
interval: u64,
) -> Result<()> {
Expand All @@ -137,8 +214,20 @@ fn monitor_wallet(
.ok_or_else(|| anyhow::anyhow!("Wallet '{}' not found", wallet_name))?;

let threshold = threshold.unwrap_or(0.0);
if threshold <= 0.0 {
notifications::warn("No --threshold provided; will print balance changes only.");
if threshold <= 0.0 && balance_alert.is_none() {
notifications::warn(
"No --threshold or --balance-alert provided; will print balance changes only.",
);
}

if let Some(alert_level) = balance_alert {
if alert_level <= 0.0 {
anyhow::bail!("--balance-alert must be greater than zero");
}
notifications::info(&format!(
"Watchman enabled: alert when balance drops below {:.7} XLM.",
alert_level
));
}

notifications::info(&format!(
Expand All @@ -147,7 +236,17 @@ fn monitor_wallet(
));

let mut last_balance: Option<f64> = None;
loop {
let mut low_balance_alerted = false;

let running = Arc::new(AtomicBool::new(true));
{
let running = Arc::clone(&running);
ctrlc::set_handler(move || {
running.store(false, Ordering::SeqCst);
})?;
}

while running.load(Ordering::SeqCst) {
let account = horizon::fetch_account(&wallet.public_key, network)?;
let native = account
.balances
Expand All @@ -171,6 +270,22 @@ fn monitor_wallet(
));
}

if let Some(alert_level) = balance_alert {
if native < alert_level {
if !low_balance_alerted {
notifications::alert(&format!(
"Balance {:.7} XLM dropped below watchman threshold {:.7} XLM",
native, alert_level
));
low_balance_alerted = true;
}
} else {
low_balance_alerted = false;
}
}

std::thread::sleep(std::time::Duration::from_secs(interval.max(1)));
}

Ok(())
}
33 changes: 33 additions & 0 deletions src/utils/notifications.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use colored::*;
use std::process::Command;

pub fn info(message: &str) {
println!(" {} {}", "•".bright_blue(), message);
Expand All @@ -11,3 +12,35 @@ pub fn success(message: &str) {
pub fn warn(message: &str) {
eprintln!(" {} {}", "!".yellow().bold(), message);
}

/// Terminal alert with optional OS notification (watchman).
pub fn alert(message: &str) {
eprintln!(
"\n {} {}\n",
"⚠ ALERT:".red().bold(),
message.bright_white().bold()
);
print!("\x07");
let _ = std::io::Write::flush(&mut std::io::stdout());
try_system_notification(message);
}

fn try_system_notification(message: &str) {
let escaped = message.replace('\\', "\\\\").replace('"', "\\\"");

#[cfg(target_os = "macos")]
{
let script = format!(
"display notification \"{}\" with title \"StarForge\"",
escaped
);
let _ = Command::new("osascript").args(["-e", &script]).status();
}

#[cfg(target_os = "linux")]
{
let _ = Command::new("notify-send")
.args(["StarForge", message])
.status();
}
}
Loading
Loading