From 5e28c250b5aa89101df06c17ebf4f84af739db29 Mon Sep 17 00:00:00 2001 From: Steven Normore Date: Mon, 30 Mar 2026 01:11:01 -0400 Subject: [PATCH 1/2] admin: add sentinel commands for validator multicast publisher management Add sentinel subcommand to doublezero-admin with two commands: - find-validator-multicast-publishers: cross-reference onchain IBRL users with validator metadata to show publisher status - create-validator-multicast-publishers: bulk-create multicast publisher users for IBRL validators via 3-step onchain flow Shared library crate (doublezero-sentinel) provides readers, writers, and pure logic with unit tests. DZClient gets rpc_client() and payer_keypair() accessors for sentinel command integration. --- CHANGELOG.md | 1 + Cargo.lock | 151 ++++- Cargo.toml | 3 + controlplane/doublezero-admin/Cargo.toml | 1 + .../doublezero-admin/src/cli/command.rs | 5 +- controlplane/doublezero-admin/src/cli/mod.rs | 1 + .../doublezero-admin/src/cli/sentinel.rs | 601 ++++++++++++++++++ controlplane/doublezero-admin/src/main.rs | 9 + crates/sentinel/Cargo.toml | 28 + crates/sentinel/src/dz_ledger_reader.rs | 207 ++++++ crates/sentinel/src/dz_ledger_writer.rs | 195 ++++++ crates/sentinel/src/lib.rs | 6 + crates/sentinel/src/multicast_create.rs | 333 ++++++++++ crates/sentinel/src/multicast_find.rs | 162 +++++ crates/sentinel/src/output.rs | 40 ++ .../sentinel/src/validator_metadata_reader.rs | 93 +++ smartcontract/sdk/rs/src/client.rs | 8 + 17 files changed, 1839 insertions(+), 5 deletions(-) create mode 100644 controlplane/doublezero-admin/src/cli/sentinel.rs create mode 100644 crates/sentinel/Cargo.toml create mode 100644 crates/sentinel/src/dz_ledger_reader.rs create mode 100644 crates/sentinel/src/dz_ledger_writer.rs create mode 100644 crates/sentinel/src/lib.rs create mode 100644 crates/sentinel/src/multicast_create.rs create mode 100644 crates/sentinel/src/multicast_find.rs create mode 100644 crates/sentinel/src/output.rs create mode 100644 crates/sentinel/src/validator_metadata_reader.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index 7554cd8045..89cd06d80c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ All notable changes to this project will be documented in this file. - Allow incremental multicast group addition without disconnecting - Reset SIGPIPE to SIG_DFL at the start of main() in all 3 CLI binaries (doublezero, doublezero-geolocation, doublezero-admin) so the process exits silently like standard CLI tools - Support `--type outbound-icmp` in geolocation `user add-target`, `remove-target`, and `get` commands + - Add sentinel admin commands to find and create multicast publishers for IBRL validators - SDK - Add Go SDK for shred subscription program with read-only account deserialization (epoch state, seat assignments, pricing, settlement, validator client rewards), PDA derivation helpers, RPC fetchers, compatibility tests, and a fetch example CLI - Add `GeoLocationTargetTypeOutboundIcmp` to Go geolocation SDK with deserialization and round-trip test support diff --git a/Cargo.lock b/Cargo.lock index 220c1c4377..dd576f1b6a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1151,6 +1151,16 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7c74b8349d32d297c9134b8c88677813a227df8f779daa29bfc29c183fe3dca6" +[[package]] +name = "core-foundation" +version = "0.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91e195e091a93c46f7102ec7818a2aa394e1e1771c3ab4825963fa03e45afb8f" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "core-foundation" version = "0.10.1" @@ -1599,6 +1609,7 @@ dependencies = [ "console", "doublezero-config", "doublezero-program-common", + "doublezero-sentinel", "doublezero-serviceability", "doublezero_cli", "doublezero_sdk", @@ -1694,6 +1705,27 @@ dependencies = [ "thiserror 2.0.17", ] +[[package]] +name = "doublezero-sentinel" +version = "0.0.1" +dependencies = [ + "anyhow", + "async-trait", + "borsh 1.5.7", + "clap", + "doublezero-serviceability", + "doublezero_sdk", + "mockall", + "reqwest", + "serde", + "serde_json", + "solana-account-decoder", + "solana-client", + "solana-sdk", + "tabled", + "tokio", +] + [[package]] name = "doublezero-serviceability" version = "0.15.0" @@ -1908,6 +1940,15 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "34aa73646ffb006b8f5147f3dc182bd4bcb190227ce861fc4a4844bf8e3cb2c0" +[[package]] +name = "encoding_rs" +version = "0.8.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75030f3c4f45dafd7586dd6780965a8c7e8e285a5ecb86713e63a79c5b2766f3" +dependencies = [ + "cfg-if", +] + [[package]] name = "endian-type" version = "0.1.2" @@ -2613,6 +2654,22 @@ dependencies = [ "webpki-roots 1.0.5", ] +[[package]] +name = "hyper-tls" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70206fc6890eaca9fde8a0bf71caa2ddfc9fe045ac9e5c70df101a7dbde866e0" +dependencies = [ + "bytes", + "http-body-util", + "hyper", + "hyper-util", + "native-tls", + "tokio", + "tokio-native-tls", + "tower-service", +] + [[package]] name = "hyper-util" version = "0.1.17" @@ -2632,9 +2689,11 @@ dependencies = [ "percent-encoding", "pin-project-lite", "socket2 0.6.0", + "system-configuration", "tokio", "tower-service", "tracing", + "windows-registry", ] [[package]] @@ -3273,6 +3332,12 @@ dependencies = [ "sketches-ddsketch", ] +[[package]] +name = "mime" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" + [[package]] name = "minimal-lexical" version = "0.2.1" @@ -3347,6 +3412,23 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "native-tls" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87de3442987e9dbec73158d5c715e7ad9072fda936bb03d19d7fa10e00520f0e" +dependencies = [ + "libc", + "log", + "openssl", + "openssl-probe", + "openssl-sys", + "schannel", + "security-framework 2.11.1", + "security-framework-sys", + "tempfile", +] + [[package]] name = "nibble_vec" version = "0.1.0" @@ -4241,17 +4323,22 @@ checksum = "eddd3ca559203180a307f12d114c268abf583f59b03cb906fd0b3ff8646c1147" dependencies = [ "base64 0.22.1", "bytes", + "encoding_rs", "futures-channel", "futures-core", "futures-util", + "h2", "http 1.3.1", "http-body", "http-body-util", "hyper", "hyper-rustls", + "hyper-tls", "hyper-util", "js-sys", "log", + "mime", + "native-tls", "percent-encoding", "pin-project-lite", "quinn", @@ -4262,6 +4349,7 @@ dependencies = [ "serde_urlencoded", "sync_wrapper", "tokio", + "tokio-native-tls", "tokio-rustls 0.26.2", "tower", "tower-http", @@ -4401,7 +4489,7 @@ dependencies = [ "openssl-probe", "rustls-pki-types", "schannel", - "security-framework", + "security-framework 3.2.0", ] [[package]] @@ -4420,7 +4508,7 @@ version = "0.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "19787cda76408ec5404443dc8b31795c87cd8fec49762dc75fa727740d34acc1" dependencies = [ - "core-foundation", + "core-foundation 0.10.1", "core-foundation-sys", "jni", "log", @@ -4429,7 +4517,7 @@ dependencies = [ "rustls-native-certs", "rustls-platform-verifier-android", "rustls-webpki 0.103.3", - "security-framework", + "security-framework 3.2.0", "security-framework-sys", "webpki-root-certs 0.26.11", "windows-sys 0.59.0", @@ -4518,6 +4606,19 @@ dependencies = [ "untrusted", ] +[[package]] +name = "security-framework" +version = "2.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02" +dependencies = [ + "bitflags", + "core-foundation 0.9.4", + "core-foundation-sys", + "libc", + "security-framework-sys", +] + [[package]] name = "security-framework" version = "3.2.0" @@ -4525,7 +4626,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "271720403f46ca04f7ba6f55d438f8bd878d6b8ca0a1046e8228c4145bcbb316" dependencies = [ "bitflags", - "core-foundation", + "core-foundation 0.10.1", "core-foundation-sys", "libc", "security-framework-sys", @@ -8504,6 +8605,27 @@ dependencies = [ "syn 2.0.102", ] +[[package]] +name = "system-configuration" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c879d448e9d986b661742763247d3693ed13609438cf3d006f51f5368a5ba6b" +dependencies = [ + "bitflags", + "core-foundation 0.9.4", + "system-configuration-sys", +] + +[[package]] +name = "system-configuration-sys" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e1d1b10ced5ca923a1fcb8d03e96b8d3268065d724548c0211415ff6ac6bac4" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "tabled" version = "0.20.0" @@ -8763,6 +8885,16 @@ dependencies = [ "syn 2.0.102", ] +[[package]] +name = "tokio-native-tls" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2" +dependencies = [ + "native-tls", + "tokio", +] + [[package]] name = "tokio-rustls" version = "0.24.1" @@ -9458,6 +9590,17 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "45e46c0661abb7180e7b9c281db115305d49ca1709ab8242adf09666d2173c65" +[[package]] +name = "windows-registry" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b3bab093bdd303a1240bb99b8aba8ea8a69ee19d34c9e2ef9594e708a4878820" +dependencies = [ + "windows-link 0.1.1", + "windows-result", + "windows-strings", +] + [[package]] name = "windows-result" version = "0.3.4" diff --git a/Cargo.toml b/Cargo.toml index d3e4a374e6..70c7e88e12 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,6 +13,7 @@ members = [ "smartcontract/programs/doublezero-geolocation", "smartcontract/programs/common", "e2e/docker/ledger/fork-accounts", + "crates/sentinel", ] default-members = [] exclude = [ @@ -69,6 +70,7 @@ log = "0" metrics = "0" metrics-util = "0" mockall = "0" +reqwest = "0" regex = "1" serde = "1" serde_bytes = "0" @@ -98,6 +100,7 @@ tokio = { version = "1", default-features = false, features = [ "signal", ] } doublezero-config = { path = "config" } +doublezero-sentinel = { path = "crates/sentinel" } doublezero_cli = { path = "smartcontract/cli" } doublezero-program-common = { path = "smartcontract/programs/common" } doublezero_sdk = { path = "smartcontract/sdk/rs" } diff --git a/controlplane/doublezero-admin/Cargo.toml b/controlplane/doublezero-admin/Cargo.toml index ab5bb6c3c0..961af009ad 100644 --- a/controlplane/doublezero-admin/Cargo.toml +++ b/controlplane/doublezero-admin/Cargo.toml @@ -44,3 +44,4 @@ doublezero_cli.workspace = true doublezero-config.workspace = true doublezero-serviceability.workspace = true doublezero-program-common.workspace = true +doublezero-sentinel.workspace = true diff --git a/controlplane/doublezero-admin/src/cli/command.rs b/controlplane/doublezero-admin/src/cli/command.rs index 7c4ad46947..8d3826ed50 100644 --- a/controlplane/doublezero-admin/src/cli/command.rs +++ b/controlplane/doublezero-admin/src/cli/command.rs @@ -1,4 +1,4 @@ -use super::multicast::MulticastCliCommand; +use super::{multicast::MulticastCliCommand, sentinel::SentinelCliCommand}; use crate::cli::{ accesspass::AccessPassCliCommand, config::ConfigCliCommand, contributor::ContributorCliCommand, device::DeviceCliCommand, exchange::ExchangeCliCommand, globalconfig::GlobalConfigCliCommand, @@ -66,6 +66,9 @@ pub enum Command { /// Manage multicast #[command()] Multicast(MulticastCliCommand), + /// Sentinel admin commands + #[command()] + Sentinel(SentinelCliCommand), /// Export all data to files #[command()] Export(ExportCliCommand), diff --git a/controlplane/doublezero-admin/src/cli/mod.rs b/controlplane/doublezero-admin/src/cli/mod.rs index 02ca83a5a6..3985558bed 100644 --- a/controlplane/doublezero-admin/src/cli/mod.rs +++ b/controlplane/doublezero-admin/src/cli/mod.rs @@ -10,5 +10,6 @@ pub mod location; pub mod multicast; pub mod multicastgroup; pub mod permission; +pub mod sentinel; pub mod tenant; pub mod user; diff --git a/controlplane/doublezero-admin/src/cli/sentinel.rs b/controlplane/doublezero-admin/src/cli/sentinel.rs new file mode 100644 index 0000000000..61a8f536ea --- /dev/null +++ b/controlplane/doublezero-admin/src/cli/sentinel.rs @@ -0,0 +1,601 @@ +use std::{collections::HashMap, io::Write, net::Ipv4Addr}; + +use clap::{Args, Subcommand}; +use doublezero_sdk::{DZClient, UserType}; +use doublezero_sentinel::{ + dz_ledger_reader::{self, DzLedgerReader, DzUser, RpcDzLedgerReader}, + dz_ledger_writer::build_create_multicast_publisher_instructions, + multicast_create::{find_candidates, CandidateFilters}, + multicast_find::{apply_filters, FindFilters}, + output::{print_table, OutputOptions}, + validator_metadata_reader::{ + HttpValidatorMetadataReader, ValidatorMetadataReader, DEFAULT_VALIDATOR_METADATA_URL, + }, +}; +use doublezero_serviceability::pda::get_tenant_pda; +use serde::Serialize; +use solana_client::rpc_client::RpcClient; +use solana_sdk::{ + commitment_config::CommitmentConfig, + instruction::Instruction, + pubkey::Pubkey, + signature::{Keypair, Signer}, + transaction::Transaction, +}; +use tabled::Tabled; + +#[derive(Args, Debug)] +pub struct SentinelCliCommand { + #[command(subcommand)] + pub command: SentinelCommands, +} + +#[derive(Debug, Subcommand)] +pub enum SentinelCommands { + /// Find IBRL validators and their multicast publisher status. + FindValidatorMulticastPublishers(FindValidatorMulticastPublishersCommand), + /// Create multicast publisher users for IBRL validators that don't have one yet. + CreateValidatorMulticastPublishers(CreateValidatorMulticastPublishersCommand), +} + +// --------------------------------------------------------------------------- +// Find command +// --------------------------------------------------------------------------- + +#[derive(Serialize, Tabled)] +struct ValidatorPublisherRow { + #[tabled(rename = "OWNER")] + owner: String, + #[tabled(rename = "CLIENT IP")] + client_ip: String, + #[tabled(rename = "DEVICE")] + device: String, + #[tabled(rename = "VOTE ACCOUNT")] + vote_account: String, + #[tabled(rename = "STAKE (SOL)")] + stake_sol: String, + #[tabled(rename = "CLIENT")] + client: String, + #[tabled(rename = "VERSION")] + version: String, + #[tabled(rename = "PUB")] + is_publisher: String, +} + +#[derive(Serialize, Tabled)] +struct SummaryRow { + #[tabled(rename = "CLIENT")] + client: String, + #[tabled(rename = "VALIDATORS")] + validators: usize, + #[tabled(rename = "ON DZ")] + on_dz: usize, + #[tabled(rename = "NOT ON DZ")] + not_on_dz: usize, + #[tabled(rename = "PUB")] + publishers: usize, + #[tabled(rename = "NOT PUB")] + not_publishers: usize, +} + +/// Find IBRL validators and their multicast publisher status. +#[derive(Debug, Args)] +pub struct FindValidatorMulticastPublishersCommand { + /// Filter by multicast group (pubkey or code, e.g. "edge-solana-shreds"). + #[arg(long, value_name = "KEY_OR_CODE")] + multicast_group: Option, + + /// Only show validators that are already a publisher. + #[arg(long)] + is_publisher: bool, + + /// Only show validators that are NOT a publisher. + #[arg(long)] + not_publisher: bool, + + /// Minimum activated stake in SOL to include. + #[arg(long, value_name = "SOL")] + min_stake: Option, + + /// Maximum activated stake in SOL to include. + #[arg(long, value_name = "SOL")] + max_stake: Option, + + /// Filter by validator client name (e.g. "JitoLabs", "AgaveBam", "Frankendancer"). + #[arg(long, value_name = "NAME")] + client: Option, + + /// Include validators not yet connected to DZ. + #[arg(long)] + include_not_on_dz: bool, + + /// Show a summary breakdown by client type instead of per-validator rows. + #[arg(long)] + summary: bool, + + /// Validator metadata service URL. + #[arg(long, value_name = "URL", default_value = DEFAULT_VALIDATOR_METADATA_URL)] + validator_metadata_url: String, + + #[command(flatten)] + output: OutputOptions, +} + +impl FindValidatorMulticastPublishersCommand { + pub async fn execute(self, dzclient: &DZClient) -> eyre::Result<()> { + let program_id = *dzclient.get_program_id(); + let rpc_client = dzclient.rpc_client(); + + let codes = dz_ledger_reader::fetch_device_codes(rpc_client, &program_id).ok(); + + let validator_metadata = HttpValidatorMetadataReader { + api_url: self.validator_metadata_url.clone(), + }; + let dz_ledger = RpcDzLedgerReader::new( + RpcClient::new_with_commitment( + dzclient.get_rpc().clone(), + CommitmentConfig::confirmed(), + ), + program_id, + ); + + // Resolve multicast group filter (pubkey or code). + let multicast_group_pk = match &self.multicast_group { + Some(key_or_code) => Some( + dz_ledger_reader::resolve_multicast_group(key_or_code, &dz_ledger) + .await + .map_err(|e| eyre::eyre!(e))?, + ), + None => None, + }; + + // Derive the solana tenant PDA to scope user queries. + let (solana_tenant_pk, _) = get_tenant_pda(&program_id, "solana"); + let default_tenant_pk = Pubkey::default(); + + eprintln!("Fetching DZ Ledger users and validator metadata..."); + let (all_users_unfiltered, validators) = tokio::try_join!( + async { + dz_ledger + .fetch_all_dz_users() + .await + .map_err(|e| eyre::eyre!(e)) + }, + async { + validator_metadata + .fetch_validators() + .await + .map_err(|e| eyre::eyre!(e)) + }, + )?; + + // Scope to solana tenant (or default/unset tenant). + let all_users: Vec<_> = all_users_unfiltered + .into_iter() + .filter(|u| u.tenant_pk == solana_tenant_pk || u.tenant_pk == default_tenant_pk) + .collect(); + + let ibrl_users: Vec<_> = all_users + .iter() + .filter(|u| { + u.user_type == UserType::IBRL || u.user_type == UserType::IBRLWithAllocatedIP + }) + .collect(); + let ibrl_ips: std::collections::HashSet = + ibrl_users.iter().map(|u| u.client_ip).collect(); + + // Build per-IP set of multicast groups the IP publishes to. + let mut publisher_groups_by_ip: HashMap> = + HashMap::new(); + for u in all_users + .iter() + .filter(|u| u.user_type == UserType::Multicast) + { + for pk in &u.publishers { + publisher_groups_by_ip + .entry(u.client_ip) + .or_default() + .insert(*pk); + } + } + + // User type breakdown. + let ibrl_count = all_users + .iter() + .filter(|u| u.user_type == UserType::IBRL) + .count(); + let ibrl_ip_count = all_users + .iter() + .filter(|u| u.user_type == UserType::IBRLWithAllocatedIP) + .count(); + let multicast_count = all_users + .iter() + .filter(|u| u.user_type == UserType::Multicast) + .count(); + let edge_count = all_users + .iter() + .filter(|u| u.user_type == UserType::EdgeFiltering) + .count(); + let other_count = + all_users.len() - ibrl_count - ibrl_ip_count - multicast_count - edge_count; + + // "On DZ" = validator's gossip IP matches an IBRL user's client IP. + let dz_validator_count = validators.keys().filter(|ip| ibrl_ips.contains(ip)).count(); + let not_dz_count = validators.len() - dz_validator_count; + eprintln!( + "User accounts: {} total ({} IBRL, {} IBRL+IP, {} Multicast, {} EdgeFiltering{})", + all_users.len(), + ibrl_count, + ibrl_ip_count, + multicast_count, + edge_count, + if other_count > 0 { + format!(", {} other", other_count) + } else { + String::new() + }, + ); + eprintln!( + "IBRL users: {} | Validators: {} ({} on DZ, {} not on DZ)", + ibrl_users.len(), + validators.len(), + dz_validator_count, + not_dz_count, + ); + + let filters = FindFilters { + min_stake: self.min_stake, + max_stake: self.max_stake, + client: self.client.clone(), + is_publisher: self.is_publisher, + not_publisher: self.not_publisher, + }; + + // Cross-reference IBRL users with validators by IP. + let mut rows: Vec = Vec::new(); + + for user in &ibrl_users { + if let Some(val) = validators.get(&user.client_ip) { + let is_pub = publisher_groups_by_ip + .get(&user.client_ip) + .is_some_and(|groups| match &multicast_group_pk { + Some(group) => groups.contains(group), + None => !groups.is_empty(), + }); + + if !apply_filters(&filters, val, is_pub) { + continue; + } + + let device_label = codes + .as_ref() + .and_then(|c| c.device_codes.get(&user.device_pk).cloned()) + .unwrap_or_else(|| user.device_pk.to_string()); + + rows.push(ValidatorPublisherRow { + owner: user.owner.to_string(), + client_ip: user.client_ip.to_string(), + device: device_label, + vote_account: val.vote_account.clone(), + stake_sol: format!("{:.2}", val.activated_stake_sol), + client: val.software_client.clone(), + version: val.software_version.clone(), + is_publisher: if is_pub { "yes" } else { "no" }.to_string(), + }); + } + } + + // Include validators not on DZ for summary or when explicitly requested. + if self.include_not_on_dz || self.summary { + for val in validators.values() { + if ibrl_ips.contains(&val.gossip_ip) { + continue; // already included above + } + + if !apply_filters(&filters, val, false) { + continue; + } + + rows.push(ValidatorPublisherRow { + owner: String::new(), + client_ip: val.gossip_ip.to_string(), + device: String::new(), + vote_account: val.vote_account.clone(), + stake_sol: format!("{:.2}", val.activated_stake_sol), + client: val.software_client.clone(), + version: val.software_version.clone(), + is_publisher: "no".to_string(), + }); + } + } + + // Sort by stake descending. + rows.sort_by(|a, b| { + let sa: f64 = a.stake_sol.parse().unwrap_or(0.0); + let sb: f64 = b.stake_sol.parse().unwrap_or(0.0); + sb.partial_cmp(&sa).unwrap_or(std::cmp::Ordering::Equal) + }); + + if self.summary { + // (count, on_dz, publishers) + let mut by_client: HashMap = HashMap::new(); + for row in &rows { + let entry = by_client.entry(row.client.clone()).or_insert((0, 0, 0)); + entry.0 += 1; + if !row.owner.is_empty() { + entry.1 += 1; // on DZ + } + if row.is_publisher == "yes" { + entry.2 += 1; + } + } + + let total = rows.len(); + let total_on_dz = rows.iter().filter(|r| !r.owner.is_empty()).count(); + let total_pubs = rows.iter().filter(|r| r.is_publisher == "yes").count(); + + let mut summary_rows: Vec = by_client + .into_iter() + .map(|(client, (count, on_dz, pubs))| SummaryRow { + client, + validators: count, + on_dz, + not_on_dz: count - on_dz, + publishers: pubs, + not_publishers: on_dz - pubs, + }) + .collect(); + summary_rows.sort_by(|a, b| b.validators.cmp(&a.validators)); + summary_rows.push(SummaryRow { + client: "TOTAL".to_string(), + validators: total, + on_dz: total_on_dz, + not_on_dz: total - total_on_dz, + publishers: total_pubs, + not_publishers: total_on_dz - total_pubs, + }); + print_table(summary_rows, &self.output, &[1, 2, 3, 4, 5]); + } else { + if rows.is_empty() { + if self.output.json { + println!("[]"); + } else { + eprintln!("No IBRL validators found matching filters."); + } + return Ok(()); + } + + if !self.output.json { + eprintln!("\nFound {} IBRL validator(s)\n", rows.len()); + } + + // right-align: STAKE (SOL) is column index 4 + print_table(rows, &self.output, &[4]); + } + + Ok(()) + } +} + +// --------------------------------------------------------------------------- +// Create command +// --------------------------------------------------------------------------- + +/// Create multicast publisher users for IBRL validators that don't have one yet. +#[derive(Debug, Args)] +pub struct CreateValidatorMulticastPublishersCommand { + /// Multicast group (pubkey or code, e.g. "edge-solana-shreds"). Required. + #[arg(long, value_name = "KEY_OR_CODE")] + multicast_group: String, + + /// Maximum number of users to create in this run. + #[arg(long, value_name = "N")] + limit: Option, + + /// Minimum activated stake in SOL to include. + #[arg(long, value_name = "SOL")] + min_stake: Option, + + /// Maximum activated stake in SOL to include. + #[arg(long, value_name = "SOL")] + max_stake: Option, + + /// Filter by validator client name (e.g. "JitoLabs", "AgaveBam", "Frankendancer"). + #[arg(long, value_name = "NAME")] + client: Option, + + /// Validator metadata service URL. + #[arg(long, value_name = "URL", default_value = DEFAULT_VALIDATOR_METADATA_URL)] + validator_metadata_url: String, + + /// Simulate transactions without sending. + #[arg(long)] + dry_run: bool, +} + +impl CreateValidatorMulticastPublishersCommand { + pub async fn execute(self, dzclient: &DZClient) -> eyre::Result<()> { + let program_id = *dzclient.get_program_id(); + let rpc_client = dzclient.rpc_client(); + let payer = dzclient + .payer_keypair() + .ok_or_else(|| eyre::eyre!("No keypair configured. Use --keypair to specify one."))?; + let payer_pk = payer.pubkey(); + + let codes = dz_ledger_reader::fetch_device_codes(rpc_client, &program_id).ok(); + + let validator_metadata = HttpValidatorMetadataReader { + api_url: self.validator_metadata_url.clone(), + }; + let dz_ledger = RpcDzLedgerReader::new( + RpcClient::new_with_commitment( + dzclient.get_rpc().clone(), + CommitmentConfig::confirmed(), + ), + program_id, + ); + + // Resolve multicast group. + let multicast_group_pk = + dz_ledger_reader::resolve_multicast_group(&self.multicast_group, &dz_ledger) + .await + .map_err(|e| eyre::eyre!(e))?; + eprintln!( + "Multicast group: {} ({})", + multicast_group_pk, self.multicast_group + ); + + // Fetch users and validator data. + eprintln!("Fetching DZ Ledger users and validator metadata..."); + let (all_users, validators) = tokio::try_join!( + async { + dz_ledger + .fetch_all_dz_users() + .await + .map_err(|e| eyre::eyre!(e)) + }, + async { + validator_metadata + .fetch_validators() + .await + .map_err(|e| eyre::eyre!(e)) + }, + )?; + + let device_labels: HashMap = codes + .as_ref() + .map(|c| c.device_codes.clone()) + .unwrap_or_default(); + + let filters = CandidateFilters { + min_stake: self.min_stake, + max_stake: self.max_stake, + client: self.client, + limit: self.limit, + }; + + let candidates = find_candidates( + &all_users, + &validators, + &multicast_group_pk, + &filters, + &device_labels, + ); + + if candidates.is_empty() { + eprintln!("No candidates found — all matching validators already have a publisher."); + return Ok(()); + } + + // Display plan. + eprintln!( + "\nWill create {} multicast publisher user(s) on group {}:\n", + candidates.len(), + self.multicast_group, + ); + eprintln!( + " {:<44} {:<15} {:<24} {:<20} {:>14}", + "OWNER", "CLIENT IP", "DEVICE", "CLIENT", "STAKE (SOL)", + ); + eprintln!(" {}", "-".repeat(125)); + for c in &candidates { + eprintln!( + " {:<44} {:<15} {:<24} {:<20} {:>14.2}", + c.owner, c.client_ip, c.device_label, c.software_client, c.stake_sol, + ); + } + eprintln!(); + + if self.dry_run { + eprintln!("Dry run — no transactions sent."); + return Ok(()); + } + + // Confirmation prompt. + eprint!("Proceed? [y/N] "); + std::io::stderr().flush()?; + let mut input = String::new(); + std::io::stdin().read_line(&mut input)?; + let input = input.trim().to_lowercase(); + if input != "y" && input != "yes" { + eyre::bail!("Aborted"); + } + + // Execute: for each candidate, run the 3-step creation flow. + for (i, candidate) in candidates.iter().enumerate() { + eprintln!( + "\n[{}/{}] Creating multicast publisher for {} (ip: {}, device: {})...", + i + 1, + candidates.len(), + candidate.owner, + candidate.client_ip, + candidate.device_label, + ); + + let dz_user = DzUser { + owner: candidate.owner, + client_ip: candidate.client_ip, + device_pk: candidate.device_pk, + tenant_pk: Pubkey::default(), + user_type: doublezero_sdk::UserType::IBRL, + publishers: vec![], + }; + + let ixs = build_create_multicast_publisher_instructions( + &program_id, + &payer_pk, + &multicast_group_pk, + &dz_user, + ) + .map_err(|e| eyre::eyre!(e))?; + + send_instruction(rpc_client, payer, &[ixs.set_access_pass], "set_access_pass").await?; + send_instruction(rpc_client, payer, &[ixs.add_allowlist], "add_pub_allowlist").await?; + send_instruction( + rpc_client, + payer, + &[ixs.create_user], + "create_subscribe_user", + ) + .await?; + + eprintln!( + " Created multicast publisher for {} on {}", + candidate.client_ip, candidate.device_label, + ); + } + + eprintln!( + "\nDone — created {} multicast publisher(s).", + candidates.len() + ); + + Ok(()) + } +} + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +async fn send_instruction( + rpc_client: &RpcClient, + payer: &Keypair, + ixs: &[Instruction], + label: &str, +) -> eyre::Result<()> { + let blockhash = rpc_client + .get_latest_blockhash() + .map_err(|e| eyre::eyre!("failed to get blockhash for {label}: {e}"))?; + + let mut tx = Transaction::new_with_payer(ixs, Some(&payer.pubkey())); + tx.sign(&[payer], blockhash); + + let sig = rpc_client + .send_and_confirm_transaction(&tx) + .map_err(|e| eyre::eyre!("failed to send {label}: {e}"))?; + + eprintln!(" {label}: {sig}"); + + Ok(()) +} diff --git a/controlplane/doublezero-admin/src/main.rs b/controlplane/doublezero-admin/src/main.rs index a2dc1b0fd3..73f4986141 100644 --- a/controlplane/doublezero-admin/src/main.rs +++ b/controlplane/doublezero-admin/src/main.rs @@ -247,6 +247,15 @@ async fn main() -> eyre::Result<()> { }, }, + Command::Sentinel(args) => match args.command { + cli::sentinel::SentinelCommands::FindValidatorMulticastPublishers(cmd) => { + cmd.execute(&dzclient).await + } + cli::sentinel::SentinelCommands::CreateValidatorMulticastPublishers(cmd) => { + cmd.execute(&dzclient).await + } + }, + Command::Export(args) => args.execute(&client, &mut handle), Command::Keygen(args) => args.execute(&client, &mut handle), Command::Log(args) => args.execute(&dzclient, &mut handle), diff --git a/crates/sentinel/Cargo.toml b/crates/sentinel/Cargo.toml new file mode 100644 index 0000000000..fec968b219 --- /dev/null +++ b/crates/sentinel/Cargo.toml @@ -0,0 +1,28 @@ +[package] +name = "doublezero-sentinel" +version = "0.0.1" + +# Workspace inherited keys +edition.workspace = true +homepage.workspace = true +license.workspace = true +repository.workspace = true + +[dependencies] +anyhow.workspace = true +async-trait.workspace = true +borsh.workspace = true +clap.workspace = true +doublezero_sdk.workspace = true +doublezero-serviceability.workspace = true +reqwest.workspace = true +serde.workspace = true +serde_json.workspace = true +solana-account-decoder.workspace = true +solana-client.workspace = true +solana-sdk.workspace = true +tabled.workspace = true +tokio.workspace = true + +[dev-dependencies] +mockall.workspace = true diff --git a/crates/sentinel/src/dz_ledger_reader.rs b/crates/sentinel/src/dz_ledger_reader.rs new file mode 100644 index 0000000000..321af95a66 --- /dev/null +++ b/crates/sentinel/src/dz_ledger_reader.rs @@ -0,0 +1,207 @@ +use std::{collections::HashMap, net::Ipv4Addr}; + +use anyhow::{bail, Context, Result}; +use doublezero_sdk::{ + AccountData, AccountType, DeviceStatus, MulticastGroupStatus, UserStatus, UserType, +}; +use solana_account_decoder::UiAccountEncoding; +use solana_client::{ + rpc_client::RpcClient, + rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig}, + rpc_filter::{Memcmp, RpcFilterType}, +}; +use solana_sdk::pubkey::Pubkey; + +// --------------------------------------------------------------------------- +// Types +// --------------------------------------------------------------------------- + +/// DZ Ledger user info. +#[derive(Debug, Clone)] +pub struct DzUser { + pub owner: Pubkey, + pub client_ip: Ipv4Addr, + pub device_pk: Pubkey, + pub tenant_pk: Pubkey, + pub user_type: UserType, + pub publishers: Vec, +} + +/// Maps device pubkey → device code. +pub struct DzLedgerCodes { + pub device_codes: HashMap, +} + +// --------------------------------------------------------------------------- +// Trait +// --------------------------------------------------------------------------- + +/// Client for querying DZ Ledger onchain state. +#[cfg_attr(test, mockall::automock)] +#[async_trait::async_trait] +pub trait DzLedgerReader: Send + Sync { + /// Fetch all activated users from the DZ Ledger. + async fn fetch_all_dz_users(&self) -> Result>; + + /// Resolve a multicast group code to its onchain pubkey. + async fn resolve_multicast_group_code(&self, code: &str) -> Result>; +} + +// --------------------------------------------------------------------------- +// RPC implementation +// --------------------------------------------------------------------------- + +pub struct RpcDzLedgerReader { + client: RpcClient, + program_id: Pubkey, +} + +impl RpcDzLedgerReader { + pub fn new(client: RpcClient, program_id: Pubkey) -> Self { + Self { client, program_id } + } +} + +#[async_trait::async_trait] +impl DzLedgerReader for RpcDzLedgerReader { + async fn fetch_all_dz_users(&self) -> Result> { + let user_type_byte = AccountType::User as u8; + let accounts = self + .client + .get_program_accounts_with_config( + &self.program_id, + RpcProgramAccountsConfig { + filters: Some(vec![RpcFilterType::Memcmp(Memcmp::new_raw_bytes( + 0, + vec![user_type_byte], + ))]), + account_config: RpcAccountInfoConfig { + encoding: Some(UiAccountEncoding::Base64), + ..Default::default() + }, + ..Default::default() + }, + ) + .context("failed to fetch User accounts from DZ Ledger")?; + + let mut users = Vec::new(); + for (_pk, account) in accounts { + let Ok(ad) = AccountData::try_from(account.data.as_slice()) else { + continue; + }; + let Ok(user) = ad.get_user() else { + continue; + }; + if user.status != UserStatus::Activated { + continue; + } + users.push(DzUser { + owner: user.owner, + client_ip: user.client_ip, + device_pk: user.device_pk, + tenant_pk: user.tenant_pk, + user_type: user.user_type, + publishers: user.publishers.clone(), + }); + } + + Ok(users) + } + + async fn resolve_multicast_group_code(&self, code: &str) -> Result> { + let mgroup_type_byte = AccountType::MulticastGroup as u8; + let accounts = self + .client + .get_program_accounts_with_config( + &self.program_id, + RpcProgramAccountsConfig { + filters: Some(vec![RpcFilterType::Memcmp(Memcmp::new_raw_bytes( + 0, + vec![mgroup_type_byte], + ))]), + account_config: RpcAccountInfoConfig { + encoding: Some(UiAccountEncoding::Base64), + ..Default::default() + }, + ..Default::default() + }, + ) + .context("failed to fetch MulticastGroup accounts from DZ Ledger")?; + + for (pk, account) in accounts { + let Ok(ad) = AccountData::try_from(account.data.as_slice()) else { + continue; + }; + let Ok(group) = ad.get_multicastgroup() else { + continue; + }; + if group.code == code && group.status == MulticastGroupStatus::Activated { + return Ok(Some(pk)); + } + } + + Ok(None) + } +} + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +/// Fetch device codes from the serviceability program. +pub fn fetch_device_codes(client: &RpcClient, program_id: &Pubkey) -> Result { + let device_accounts = client + .get_program_accounts_with_config( + program_id, + RpcProgramAccountsConfig { + filters: Some(vec![RpcFilterType::Memcmp(Memcmp::new_raw_bytes( + 0, + vec![AccountType::Device as u8], + ))]), + account_config: RpcAccountInfoConfig { + encoding: Some(UiAccountEncoding::Base64), + ..Default::default() + }, + ..Default::default() + }, + ) + .context("failed to fetch Device accounts from DZ Ledger")?; + + let mut device_codes = HashMap::new(); + for (pk, account) in device_accounts { + let Ok(ad) = AccountData::try_from(account.data.as_slice()) else { + continue; + }; + let Ok(device) = ad.get_device() else { + continue; + }; + if device.status == DeviceStatus::Activated { + device_codes.insert(pk, device.code.clone()); + } + } + + Ok(DzLedgerCodes { device_codes }) +} + +/// Resolve a multicast group key-or-code to a pubkey. +pub async fn resolve_multicast_group( + key_or_code: &str, + dz_client: &dyn DzLedgerReader, +) -> Result { + if let Ok(pk) = key_or_code.parse::() { + return Ok(pk); + } + + match dz_client.resolve_multicast_group_code(key_or_code).await? { + Some(pk) => { + eprintln!("Resolved multicast group '{key_or_code}' -> {pk}"); + Ok(pk) + } + None => { + bail!( + "Multicast group not found: {key_or_code} \ + (not a valid pubkey or known group code)" + ); + } + } +} diff --git a/crates/sentinel/src/dz_ledger_writer.rs b/crates/sentinel/src/dz_ledger_writer.rs new file mode 100644 index 0000000000..f8d258da9b --- /dev/null +++ b/crates/sentinel/src/dz_ledger_writer.rs @@ -0,0 +1,195 @@ +use anyhow::{Context, Result}; +use doublezero_serviceability::{ + instructions::DoubleZeroInstruction, + pda::{get_accesspass_pda, get_globalstate_pda, get_user_pda}, + processors::{ + accesspass::set::SetAccessPassArgs, + multicastgroup::allowlist::publisher::add::AddMulticastGroupPubAllowlistArgs, + user::create_subscribe::UserCreateSubscribeArgs, + }, + state::{ + accesspass::AccessPassType, + user::{UserCYOA, UserType as SvcUserType}, + }, +}; +use solana_sdk::{ + instruction::{AccountMeta, Instruction}, + pubkey::Pubkey, +}; + +use super::dz_ledger_reader::DzUser; + +/// The three instructions needed to create a multicast publisher onchain. +pub struct CreateMulticastPublisherInstructions { + pub set_access_pass: Instruction, + pub add_allowlist: Instruction, + pub create_user: Instruction, +} + +/// Build the three instructions needed to create a multicast publisher for a user. +pub fn build_create_multicast_publisher_instructions( + program_id: &Pubkey, + payer: &Pubkey, + multicast_group_pk: &Pubkey, + user: &DzUser, +) -> Result { + let (accesspass_pda, _) = get_accesspass_pda(program_id, &user.client_ip, payer); + let (globalstate_pda, _) = get_globalstate_pda(program_id); + + // Step 1: set_access_pass + let set_access_pass = build_instruction( + program_id, + DoubleZeroInstruction::SetAccessPass(SetAccessPassArgs { + accesspass_type: AccessPassType::Prepaid, + client_ip: user.client_ip, + last_access_epoch: u64::MAX, + allow_multiple_ip: false, + }), + vec![ + AccountMeta::new(accesspass_pda, false), + AccountMeta::new_readonly(globalstate_pda, false), + AccountMeta::new(*payer, false), + AccountMeta::new(*payer, true), + AccountMeta::new_readonly(solana_sdk::system_program::ID, false), + ], + )?; + + // Step 2: add_multicast_publisher_allowlist + let add_allowlist = build_instruction( + program_id, + DoubleZeroInstruction::AddMulticastGroupPubAllowlist(AddMulticastGroupPubAllowlistArgs { + client_ip: user.client_ip, + user_payer: *payer, + }), + vec![ + AccountMeta::new(*multicast_group_pk, false), + AccountMeta::new(accesspass_pda, false), + AccountMeta::new_readonly(globalstate_pda, false), + AccountMeta::new(*payer, true), + AccountMeta::new_readonly(solana_sdk::system_program::ID, false), + ], + )?; + + // Step 3: create_subscribe_user (as publisher) + let (user_pda, _) = get_user_pda(program_id, &user.client_ip, SvcUserType::Multicast); + let create_user = build_instruction( + program_id, + DoubleZeroInstruction::CreateSubscribeUser(UserCreateSubscribeArgs { + user_type: SvcUserType::Multicast, + cyoa_type: UserCYOA::GREOverDIA, + client_ip: user.client_ip, + publisher: true, + subscriber: false, + tunnel_endpoint: std::net::Ipv4Addr::UNSPECIFIED, + dz_prefix_count: 0, + owner: Pubkey::default(), + }), + vec![ + AccountMeta::new(user_pda, false), + AccountMeta::new(user.device_pk, false), + AccountMeta::new(*multicast_group_pk, false), + AccountMeta::new(accesspass_pda, false), + AccountMeta::new_readonly(globalstate_pda, false), + AccountMeta::new(*payer, true), + AccountMeta::new_readonly(solana_sdk::system_program::ID, false), + ], + )?; + + Ok(CreateMulticastPublisherInstructions { + set_access_pass, + add_allowlist, + create_user, + }) +} + +fn build_instruction( + program_id: &Pubkey, + dz_ix: DoubleZeroInstruction, + accounts: Vec, +) -> Result { + let data = borsh::to_vec(&dz_ix).context("failed to serialize instruction")?; + Ok(Instruction { + program_id: *program_id, + accounts, + data, + }) +} + +#[cfg(test)] +mod tests { + use std::net::Ipv4Addr; + + use doublezero_sdk::UserType; + + use super::*; + + #[test] + fn build_instructions_returns_three_instructions() { + let program_id = Pubkey::new_unique(); + let payer = Pubkey::new_unique(); + let multicast_group = Pubkey::new_unique(); + let user = DzUser { + owner: Pubkey::new_unique(), + client_ip: Ipv4Addr::new(10, 0, 0, 1), + device_pk: Pubkey::new_unique(), + tenant_pk: Pubkey::default(), + user_type: UserType::IBRL, + publishers: vec![], + }; + + let ixs = build_create_multicast_publisher_instructions( + &program_id, + &payer, + &multicast_group, + &user, + ) + .unwrap(); + + // All three instructions target the correct program. + assert_eq!(ixs.set_access_pass.program_id, program_id); + assert_eq!(ixs.add_allowlist.program_id, program_id); + assert_eq!(ixs.create_user.program_id, program_id); + + // Each has non-empty data (serialized instruction). + assert!(!ixs.set_access_pass.data.is_empty()); + assert!(!ixs.add_allowlist.data.is_empty()); + assert!(!ixs.create_user.data.is_empty()); + + // set_access_pass: 5 accounts (accesspass_pda, globalstate, payer×2, system_program) + assert_eq!(ixs.set_access_pass.accounts.len(), 5); + // add_allowlist: 5 accounts (multicast_group, accesspass_pda, globalstate, payer, system_program) + assert_eq!(ixs.add_allowlist.accounts.len(), 5); + // create_user: 7 accounts (user_pda, device, multicast_group, accesspass_pda, globalstate, payer, system_program) + assert_eq!(ixs.create_user.accounts.len(), 7); + } + + #[test] + fn payer_is_signer_in_all_instructions() { + let program_id = Pubkey::new_unique(); + let payer = Pubkey::new_unique(); + let multicast_group = Pubkey::new_unique(); + let user = DzUser { + owner: Pubkey::new_unique(), + client_ip: Ipv4Addr::new(10, 0, 0, 1), + device_pk: Pubkey::new_unique(), + tenant_pk: Pubkey::default(), + user_type: UserType::IBRL, + publishers: vec![], + }; + + let ixs = build_create_multicast_publisher_instructions( + &program_id, + &payer, + &multicast_group, + &user, + ) + .unwrap(); + + for ix in [&ixs.set_access_pass, &ixs.add_allowlist, &ixs.create_user] { + assert!( + ix.accounts.iter().any(|a| a.pubkey == payer && a.is_signer), + "payer should be a signer" + ); + } + } +} diff --git a/crates/sentinel/src/lib.rs b/crates/sentinel/src/lib.rs new file mode 100644 index 0000000000..3a8f717bca --- /dev/null +++ b/crates/sentinel/src/lib.rs @@ -0,0 +1,6 @@ +pub mod dz_ledger_reader; +pub mod dz_ledger_writer; +pub mod multicast_create; +pub mod multicast_find; +pub mod output; +pub mod validator_metadata_reader; diff --git a/crates/sentinel/src/multicast_create.rs b/crates/sentinel/src/multicast_create.rs new file mode 100644 index 0000000000..7bbb991948 --- /dev/null +++ b/crates/sentinel/src/multicast_create.rs @@ -0,0 +1,333 @@ +use std::{ + collections::{HashMap, HashSet}, + net::Ipv4Addr, +}; + +use doublezero_sdk::UserType; +use solana_sdk::pubkey::Pubkey; + +use crate::{dz_ledger_reader::DzUser, validator_metadata_reader::ValidatorRecord}; + +/// A validator that needs a multicast publisher user created. +pub struct Candidate { + pub owner: Pubkey, + pub client_ip: Ipv4Addr, + pub device_pk: Pubkey, + pub vote_account: String, + pub stake_sol: f64, + pub software_client: String, + pub device_label: String, +} + +/// Filters for candidate selection. +pub struct CandidateFilters { + pub min_stake: Option, + pub max_stake: Option, + pub client: Option, + pub limit: Option, +} + +/// Find IBRL validators that need a multicast publisher created. +/// +/// Pure function: no I/O, no network calls. Returns candidates sorted by stake +/// descending, with limit applied. +pub fn find_candidates( + all_users: &[DzUser], + validators: &HashMap, + multicast_group_pk: &Pubkey, + filters: &CandidateFilters, + device_labels: &HashMap, +) -> Vec { + let ibrl_users: Vec<_> = all_users + .iter() + .filter(|u| u.user_type == UserType::IBRL || u.user_type == UserType::IBRLWithAllocatedIP) + .collect(); + + // Build set of IPs that already publish to this group. + let publisher_ips: HashSet = all_users + .iter() + .filter(|u| u.user_type == UserType::Multicast && u.publishers.contains(multicast_group_pk)) + .map(|u| u.client_ip) + .collect(); + + let mut candidates: Vec = Vec::new(); + for user in &ibrl_users { + if publisher_ips.contains(&user.client_ip) { + continue; + } + + let Some(val) = validators.get(&user.client_ip) else { + continue; + }; + + if let Some(min) = filters.min_stake { + if val.activated_stake_sol < min { + continue; + } + } + if let Some(max) = filters.max_stake { + if val.activated_stake_sol > max { + continue; + } + } + if let Some(ref client_filter) = filters.client { + if !val + .software_client + .to_lowercase() + .contains(&client_filter.to_lowercase()) + { + continue; + } + } + + let device_label = device_labels + .get(&user.device_pk) + .cloned() + .unwrap_or_else(|| user.device_pk.to_string()); + + candidates.push(Candidate { + owner: user.owner, + client_ip: user.client_ip, + device_pk: user.device_pk, + vote_account: val.vote_account.clone(), + stake_sol: val.activated_stake_sol, + software_client: val.software_client.clone(), + device_label, + }); + } + + // Sort by stake descending. + candidates.sort_by(|a, b| { + b.stake_sol + .partial_cmp(&a.stake_sol) + .unwrap_or(std::cmp::Ordering::Equal) + }); + + if let Some(limit) = filters.limit { + candidates.truncate(limit); + } + + candidates +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +#[cfg(test)] +mod tests { + use std::net::Ipv4Addr; + + use solana_sdk::pubkey::Pubkey; + + use super::*; + + fn make_ibrl_user(ip: [u8; 4], owner: Pubkey, device_pk: Pubkey) -> DzUser { + DzUser { + owner, + client_ip: Ipv4Addr::from(ip), + device_pk, + tenant_pk: Pubkey::default(), + user_type: UserType::IBRL, + publishers: vec![], + } + } + + fn make_multicast_user(ip: [u8; 4], groups: Vec) -> DzUser { + DzUser { + owner: Pubkey::new_unique(), + client_ip: Ipv4Addr::from(ip), + device_pk: Pubkey::new_unique(), + tenant_pk: Pubkey::default(), + user_type: UserType::Multicast, + publishers: groups, + } + } + + fn make_validator(ip: Ipv4Addr, stake: f64, client: &str) -> ValidatorRecord { + ValidatorRecord { + vote_account: Pubkey::new_unique().to_string(), + software_client: client.to_string(), + software_version: "1.0.0".to_string(), + activated_stake_sol: stake, + gossip_ip: ip, + } + } + + fn no_filters() -> CandidateFilters { + CandidateFilters { + min_stake: None, + max_stake: None, + client: None, + limit: None, + } + } + + #[test] + fn no_validators_returns_empty() { + let group = Pubkey::new_unique(); + let users = vec![make_ibrl_user( + [10, 0, 0, 1], + Pubkey::new_unique(), + Pubkey::new_unique(), + )]; + let validators = HashMap::new(); + + let result = find_candidates(&users, &validators, &group, &no_filters(), &HashMap::new()); + assert!(result.is_empty()); + } + + #[test] + fn no_ibrl_users_returns_empty() { + let group = Pubkey::new_unique(); + let ip = Ipv4Addr::new(10, 0, 0, 1); + let users = vec![make_multicast_user([10, 0, 0, 1], vec![])]; + let mut validators = HashMap::new(); + validators.insert(ip, make_validator(ip, 1000.0, "agave")); + + let result = find_candidates(&users, &validators, &group, &no_filters(), &HashMap::new()); + assert!(result.is_empty()); + } + + #[test] + fn existing_publishers_are_skipped() { + let group = Pubkey::new_unique(); + let ip = [10, 0, 0, 1]; + let ip_addr = Ipv4Addr::from(ip); + + let users = vec![ + make_ibrl_user(ip, Pubkey::new_unique(), Pubkey::new_unique()), + make_multicast_user(ip, vec![group]), + ]; + let mut validators = HashMap::new(); + validators.insert(ip_addr, make_validator(ip_addr, 1000.0, "agave")); + + let result = find_candidates(&users, &validators, &group, &no_filters(), &HashMap::new()); + assert!(result.is_empty()); + } + + #[test] + fn min_stake_filter() { + let group = Pubkey::new_unique(); + let ip1 = Ipv4Addr::new(10, 0, 0, 1); + let ip2 = Ipv4Addr::new(10, 0, 0, 2); + + let users = vec![ + make_ibrl_user([10, 0, 0, 1], Pubkey::new_unique(), Pubkey::new_unique()), + make_ibrl_user([10, 0, 0, 2], Pubkey::new_unique(), Pubkey::new_unique()), + ]; + let mut validators = HashMap::new(); + validators.insert(ip1, make_validator(ip1, 500.0, "agave")); + validators.insert(ip2, make_validator(ip2, 1500.0, "agave")); + + let filters = CandidateFilters { + min_stake: Some(1000.0), + ..no_filters() + }; + + let result = find_candidates(&users, &validators, &group, &filters, &HashMap::new()); + assert_eq!(result.len(), 1); + assert_eq!(result[0].client_ip, ip2); + } + + #[test] + fn max_stake_filter() { + let group = Pubkey::new_unique(); + let ip1 = Ipv4Addr::new(10, 0, 0, 1); + let ip2 = Ipv4Addr::new(10, 0, 0, 2); + + let users = vec![ + make_ibrl_user([10, 0, 0, 1], Pubkey::new_unique(), Pubkey::new_unique()), + make_ibrl_user([10, 0, 0, 2], Pubkey::new_unique(), Pubkey::new_unique()), + ]; + let mut validators = HashMap::new(); + validators.insert(ip1, make_validator(ip1, 500.0, "agave")); + validators.insert(ip2, make_validator(ip2, 1500.0, "agave")); + + let filters = CandidateFilters { + max_stake: Some(1000.0), + ..no_filters() + }; + + let result = find_candidates(&users, &validators, &group, &filters, &HashMap::new()); + assert_eq!(result.len(), 1); + assert_eq!(result[0].client_ip, ip1); + } + + #[test] + fn client_filter_case_insensitive() { + let group = Pubkey::new_unique(); + let ip1 = Ipv4Addr::new(10, 0, 0, 1); + let ip2 = Ipv4Addr::new(10, 0, 0, 2); + + let users = vec![ + make_ibrl_user([10, 0, 0, 1], Pubkey::new_unique(), Pubkey::new_unique()), + make_ibrl_user([10, 0, 0, 2], Pubkey::new_unique(), Pubkey::new_unique()), + ]; + let mut validators = HashMap::new(); + validators.insert(ip1, make_validator(ip1, 1000.0, "Jito-Solana")); + validators.insert(ip2, make_validator(ip2, 1000.0, "Agave")); + + let filters = CandidateFilters { + client: Some("jito".to_string()), + ..no_filters() + }; + + let result = find_candidates(&users, &validators, &group, &filters, &HashMap::new()); + assert_eq!(result.len(), 1); + assert_eq!(result[0].client_ip, ip1); + } + + #[test] + fn sorted_by_stake_descending() { + let group = Pubkey::new_unique(); + let ip1 = Ipv4Addr::new(10, 0, 0, 1); + let ip2 = Ipv4Addr::new(10, 0, 0, 2); + let ip3 = Ipv4Addr::new(10, 0, 0, 3); + + let users = vec![ + make_ibrl_user([10, 0, 0, 1], Pubkey::new_unique(), Pubkey::new_unique()), + make_ibrl_user([10, 0, 0, 2], Pubkey::new_unique(), Pubkey::new_unique()), + make_ibrl_user([10, 0, 0, 3], Pubkey::new_unique(), Pubkey::new_unique()), + ]; + let mut validators = HashMap::new(); + validators.insert(ip1, make_validator(ip1, 500.0, "agave")); + validators.insert(ip2, make_validator(ip2, 2000.0, "agave")); + validators.insert(ip3, make_validator(ip3, 1000.0, "agave")); + + let result = find_candidates(&users, &validators, &group, &no_filters(), &HashMap::new()); + assert_eq!(result.len(), 3); + assert_eq!(result[0].client_ip, ip2); // 2000 + assert_eq!(result[1].client_ip, ip3); // 1000 + assert_eq!(result[2].client_ip, ip1); // 500 + } + + #[test] + fn limit_applied_after_sort() { + let group = Pubkey::new_unique(); + let ip1 = Ipv4Addr::new(10, 0, 0, 1); + let ip2 = Ipv4Addr::new(10, 0, 0, 2); + let ip3 = Ipv4Addr::new(10, 0, 0, 3); + + let users = vec![ + make_ibrl_user([10, 0, 0, 1], Pubkey::new_unique(), Pubkey::new_unique()), + make_ibrl_user([10, 0, 0, 2], Pubkey::new_unique(), Pubkey::new_unique()), + make_ibrl_user([10, 0, 0, 3], Pubkey::new_unique(), Pubkey::new_unique()), + ]; + let mut validators = HashMap::new(); + validators.insert(ip1, make_validator(ip1, 500.0, "agave")); + validators.insert(ip2, make_validator(ip2, 2000.0, "agave")); + validators.insert(ip3, make_validator(ip3, 1000.0, "agave")); + + let filters = CandidateFilters { + limit: Some(2), + ..no_filters() + }; + + let result = find_candidates(&users, &validators, &group, &filters, &HashMap::new()); + assert_eq!(result.len(), 2); + // Top 2 by stake: ip2 (2000) and ip3 (1000) + assert_eq!(result[0].client_ip, ip2); + assert_eq!(result[1].client_ip, ip3); + } +} diff --git a/crates/sentinel/src/multicast_find.rs b/crates/sentinel/src/multicast_find.rs new file mode 100644 index 0000000000..0c59081cd9 --- /dev/null +++ b/crates/sentinel/src/multicast_find.rs @@ -0,0 +1,162 @@ +use crate::validator_metadata_reader::ValidatorRecord; + +/// Filter parameters for the find command. +pub struct FindFilters { + pub min_stake: Option, + pub max_stake: Option, + pub client: Option, + pub is_publisher: bool, + pub not_publisher: bool, +} + +/// Apply filters to a validator record. +pub fn apply_filters(filters: &FindFilters, val: &ValidatorRecord, is_pub: bool) -> bool { + if let Some(min) = filters.min_stake { + if val.activated_stake_sol < min { + return false; + } + } + if let Some(max) = filters.max_stake { + if val.activated_stake_sol > max { + return false; + } + } + if let Some(ref client_filter) = filters.client { + if !val + .software_client + .to_lowercase() + .contains(&client_filter.to_lowercase()) + { + return false; + } + } + if filters.is_publisher && !is_pub { + return false; + } + if filters.not_publisher && is_pub { + return false; + } + true +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +#[cfg(test)] +mod tests { + use std::net::Ipv4Addr; + + use super::*; + + fn make_validator(ip: Ipv4Addr, stake: f64, client: &str) -> ValidatorRecord { + ValidatorRecord { + vote_account: String::new(), + software_client: client.to_string(), + software_version: String::new(), + activated_stake_sol: stake, + gossip_ip: ip, + } + } + + fn base_filters() -> FindFilters { + FindFilters { + min_stake: None, + max_stake: None, + client: None, + is_publisher: false, + not_publisher: false, + } + } + + #[test] + fn filter_min_stake() { + let val = make_validator(Ipv4Addr::new(1, 2, 3, 4), 500.0, "agave"); + let filters = FindFilters { + min_stake: Some(1000.0), + ..base_filters() + }; + assert!(!apply_filters(&filters, &val, false)); + + let filters = FindFilters { + min_stake: Some(100.0), + ..base_filters() + }; + assert!(apply_filters(&filters, &val, false)); + } + + #[test] + fn filter_max_stake() { + let val = make_validator(Ipv4Addr::new(1, 2, 3, 4), 1500.0, "agave"); + let filters = FindFilters { + max_stake: Some(1000.0), + ..base_filters() + }; + assert!(!apply_filters(&filters, &val, false)); + + let filters = FindFilters { + max_stake: Some(2000.0), + ..base_filters() + }; + assert!(apply_filters(&filters, &val, false)); + } + + #[test] + fn filter_client_case_insensitive() { + let val = make_validator(Ipv4Addr::new(1, 2, 3, 4), 1000.0, "Jito-Solana"); + + let filters = FindFilters { + client: Some("jito".to_string()), + ..base_filters() + }; + assert!(apply_filters(&filters, &val, false)); + + let filters = FindFilters { + client: Some("agave".to_string()), + ..base_filters() + }; + assert!(!apply_filters(&filters, &val, false)); + } + + #[test] + fn filter_is_publisher() { + let val = make_validator(Ipv4Addr::new(1, 2, 3, 4), 1000.0, "agave"); + + let filters = FindFilters { + is_publisher: true, + ..base_filters() + }; + assert!(!apply_filters(&filters, &val, false)); + assert!(apply_filters(&filters, &val, true)); + } + + #[test] + fn filter_not_publisher() { + let val = make_validator(Ipv4Addr::new(1, 2, 3, 4), 1000.0, "agave"); + + let filters = FindFilters { + not_publisher: true, + ..base_filters() + }; + assert!(apply_filters(&filters, &val, false)); + assert!(!apply_filters(&filters, &val, true)); + } + + #[test] + fn combined_filters() { + let val = make_validator(Ipv4Addr::new(1, 2, 3, 4), 1500.0, "Jito-Solana"); + + // Passes all: stake in range, client matches, is publisher + let filters = FindFilters { + min_stake: Some(1000.0), + max_stake: Some(2000.0), + client: Some("jito".to_string()), + is_publisher: true, + not_publisher: false, + }; + assert!(apply_filters(&filters, &val, true)); + + // Fails: not a publisher but is_publisher required + assert!(!apply_filters(&filters, &val, false)); + } +} diff --git a/crates/sentinel/src/output.rs b/crates/sentinel/src/output.rs new file mode 100644 index 0000000000..999bcab144 --- /dev/null +++ b/crates/sentinel/src/output.rs @@ -0,0 +1,40 @@ +use clap::Args; +use serde::Serialize; +use tabled::{ + settings::{object::Columns, Alignment, Style}, + Table, Tabled, +}; + +#[derive(Debug, Args)] +pub struct OutputOptions { + /// Output as JSON instead of a table. + #[arg(long)] + pub json: bool, +} + +/// Print a collection of rows as a markdown table, or as JSON if `json` is set. +/// `right_aligned` specifies column indices that should be right-aligned. +pub fn print_table( + rows: Vec, + options: &OutputOptions, + right_aligned: &[usize], +) { + if options.json { + println!( + "{}", + serde_json::to_string_pretty(&rows).expect("JSON serialization") + ); + return; + } + + if rows.is_empty() { + return; + } + + let mut table = Table::new(rows); + table.with(Style::markdown()); + for &col in right_aligned { + table.modify(Columns::new(col..=col), Alignment::right()); + } + println!("{table}"); +} diff --git a/crates/sentinel/src/validator_metadata_reader.rs b/crates/sentinel/src/validator_metadata_reader.rs new file mode 100644 index 0000000000..3d1b35420e --- /dev/null +++ b/crates/sentinel/src/validator_metadata_reader.rs @@ -0,0 +1,93 @@ +use std::{collections::HashMap, net::Ipv4Addr}; + +use anyhow::{bail, Context, Result}; + +// --------------------------------------------------------------------------- +// Types +// --------------------------------------------------------------------------- + +/// Validator metadata from the validator metadata service (client name, version, etc.). +#[derive(Debug, Clone)] +pub struct ValidatorRecord { + pub vote_account: String, + pub software_client: String, + pub software_version: String, + pub activated_stake_sol: f64, + pub gossip_ip: Ipv4Addr, +} + +// --------------------------------------------------------------------------- +// Trait +// --------------------------------------------------------------------------- + +/// Source for validator metadata not available onchain (client name, version, etc.). +#[cfg_attr(test, mockall::automock)] +#[async_trait::async_trait] +pub trait ValidatorMetadataReader: Send + Sync { + /// Fetch active validators with their metadata, keyed by IP. + async fn fetch_validators(&self) -> Result>; +} + +// --------------------------------------------------------------------------- +// HTTP implementation +// --------------------------------------------------------------------------- + +pub const DEFAULT_VALIDATOR_METADATA_URL: &str = + "https://data.malbeclabs.com/api/v1/validators-metadata"; + +pub struct HttpValidatorMetadataReader { + pub api_url: String, +} + +#[derive(serde::Deserialize)] +struct ValidatorMetadataItem { + ip: String, + active_stake: i64, + vote_account: String, + software_client: String, + software_version: String, +} + +#[async_trait::async_trait] +impl ValidatorMetadataReader for HttpValidatorMetadataReader { + async fn fetch_validators(&self) -> Result> { + let client = reqwest::Client::new(); + let resp = client + .get(&self.api_url) + .send() + .await + .context("failed to fetch validator metadata")?; + + let status = resp.status(); + if !status.is_success() { + let body = resp.text().await.unwrap_or_default(); + bail!("validator metadata service returned {status}: {body}"); + } + + let items: Vec = resp + .json() + .await + .context("failed to parse validator metadata response")?; + + let mut map = HashMap::new(); + for item in items { + let gossip_ip: Ipv4Addr = match item.ip.parse() { + Ok(ip) => ip, + Err(_) => continue, + }; + + map.insert( + gossip_ip, + ValidatorRecord { + vote_account: item.vote_account, + software_client: item.software_client, + software_version: item.software_version, + activated_stake_sol: item.active_stake as f64 / 1_000_000_000.0, + gossip_ip, + }, + ); + } + + Ok(map) + } +} diff --git a/smartcontract/sdk/rs/src/client.rs b/smartcontract/sdk/rs/src/client.rs index ff243aaa09..a648f4af09 100644 --- a/smartcontract/sdk/rs/src/client.rs +++ b/smartcontract/sdk/rs/src/client.rs @@ -105,6 +105,14 @@ impl DZClient { &self.rpc_url } + pub fn rpc_client(&self) -> &RpcClient { + &self.client + } + + pub fn payer_keypair(&self) -> Option<&Keypair> { + self.payer.as_ref() + } + pub fn get_ws(&self) -> &String { &self.rpc_ws_url } From faefd710b77ce756c4951e76cbadcffd4a17e7e9 Mon Sep 17 00:00:00 2001 From: Steven Normore Date: Tue, 31 Mar 2026 17:03:41 -0400 Subject: [PATCH 2/2] admin: address sentinel PR review feedback Switch RpcDzLedgerReader to use the nonblocking Solana RPC client so tokio::try_join! actually runs fetches concurrently. Add solana tenant filtering to the create command to match the find command. --- .../doublezero-admin/src/cli/sentinel.rs | 20 +++++++++++++++---- crates/sentinel/src/dz_ledger_reader.rs | 7 +++++-- 2 files changed, 21 insertions(+), 6 deletions(-) diff --git a/controlplane/doublezero-admin/src/cli/sentinel.rs b/controlplane/doublezero-admin/src/cli/sentinel.rs index 61a8f536ea..aced019011 100644 --- a/controlplane/doublezero-admin/src/cli/sentinel.rs +++ b/controlplane/doublezero-admin/src/cli/sentinel.rs @@ -14,7 +14,9 @@ use doublezero_sentinel::{ }; use doublezero_serviceability::pda::get_tenant_pda; use serde::Serialize; -use solana_client::rpc_client::RpcClient; +use solana_client::{ + nonblocking::rpc_client::RpcClient as NonblockingRpcClient, rpc_client::RpcClient, +}; use solana_sdk::{ commitment_config::CommitmentConfig, instruction::Instruction, @@ -132,7 +134,7 @@ impl FindValidatorMulticastPublishersCommand { api_url: self.validator_metadata_url.clone(), }; let dz_ledger = RpcDzLedgerReader::new( - RpcClient::new_with_commitment( + NonblockingRpcClient::new_with_commitment( dzclient.get_rpc().clone(), CommitmentConfig::confirmed(), ), @@ -428,7 +430,7 @@ impl CreateValidatorMulticastPublishersCommand { api_url: self.validator_metadata_url.clone(), }; let dz_ledger = RpcDzLedgerReader::new( - RpcClient::new_with_commitment( + NonblockingRpcClient::new_with_commitment( dzclient.get_rpc().clone(), CommitmentConfig::confirmed(), ), @@ -445,9 +447,13 @@ impl CreateValidatorMulticastPublishersCommand { multicast_group_pk, self.multicast_group ); + // Derive the solana tenant PDA to scope user queries. + let (solana_tenant_pk, _) = get_tenant_pda(&program_id, "solana"); + let default_tenant_pk = Pubkey::default(); + // Fetch users and validator data. eprintln!("Fetching DZ Ledger users and validator metadata..."); - let (all_users, validators) = tokio::try_join!( + let (all_users_unfiltered, validators) = tokio::try_join!( async { dz_ledger .fetch_all_dz_users() @@ -462,6 +468,12 @@ impl CreateValidatorMulticastPublishersCommand { }, )?; + // Scope to solana tenant (or default/unset tenant). + let all_users: Vec<_> = all_users_unfiltered + .into_iter() + .filter(|u| u.tenant_pk == solana_tenant_pk || u.tenant_pk == default_tenant_pk) + .collect(); + let device_labels: HashMap = codes .as_ref() .map(|c| c.device_codes.clone()) diff --git a/crates/sentinel/src/dz_ledger_reader.rs b/crates/sentinel/src/dz_ledger_reader.rs index 321af95a66..9c3809abff 100644 --- a/crates/sentinel/src/dz_ledger_reader.rs +++ b/crates/sentinel/src/dz_ledger_reader.rs @@ -6,6 +6,7 @@ use doublezero_sdk::{ }; use solana_account_decoder::UiAccountEncoding; use solana_client::{ + nonblocking::rpc_client::RpcClient as NonblockingRpcClient, rpc_client::RpcClient, rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig}, rpc_filter::{Memcmp, RpcFilterType}, @@ -52,12 +53,12 @@ pub trait DzLedgerReader: Send + Sync { // --------------------------------------------------------------------------- pub struct RpcDzLedgerReader { - client: RpcClient, + client: NonblockingRpcClient, program_id: Pubkey, } impl RpcDzLedgerReader { - pub fn new(client: RpcClient, program_id: Pubkey) -> Self { + pub fn new(client: NonblockingRpcClient, program_id: Pubkey) -> Self { Self { client, program_id } } } @@ -82,6 +83,7 @@ impl DzLedgerReader for RpcDzLedgerReader { ..Default::default() }, ) + .await .context("failed to fetch User accounts from DZ Ledger")?; let mut users = Vec::new(); @@ -126,6 +128,7 @@ impl DzLedgerReader for RpcDzLedgerReader { ..Default::default() }, ) + .await .context("failed to fetch MulticastGroup accounts from DZ Ledger")?; for (pk, account) in accounts {