From b72b64da74200e543ece79d5fe9f6e4c56e94fda Mon Sep 17 00:00:00 2001 From: jjy Date: Wed, 3 Sep 2025 12:44:07 +0800 Subject: [PATCH 1/4] graph_channels read from memory instead of from DB --- Cargo.lock | 17 ++++---- crates/fiber-lib/Cargo.toml | 1 + crates/fiber-lib/src/fiber/graph.rs | 62 +++++++++-------------------- crates/fiber-lib/src/rpc/graph.rs | 23 +++++------ 4 files changed, 39 insertions(+), 64 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ecf1ebad5..c73200db9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2136,6 +2136,7 @@ dependencies = [ "hex", "home", "hyper 1.6.0", + "indexmap 2.11.0", "indicatif", "jsonrpsee", "lightning-invoice", @@ -2492,7 +2493,7 @@ dependencies = [ "futures-sink", "futures-util", "http 0.2.12", - "indexmap 2.10.0", + "indexmap 2.11.0", "slab", "tokio", "tokio-util", @@ -2511,7 +2512,7 @@ dependencies = [ "futures-core", "futures-sink", "http 1.3.1", - "indexmap 2.10.0", + "indexmap 2.11.0", "slab", "tokio", "tokio-util", @@ -3031,9 +3032,9 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.10.0" +version = "2.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fe4cd85333e22411419a0bcae1297d25e58c9443848b11dc6a86fefe8c78a661" +checksum = "f2481980430f9f78649238835720ddccc57e52df14ffce1c6f37391d61b563e9" dependencies = [ "equivalent", "hashbrown 0.15.4", @@ -3982,7 +3983,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b4c5cc86750666a3ed20bdaf5ca2a0344f9c67674cae0515bec2da16fbaa47db" dependencies = [ "fixedbitset", - "indexmap 2.10.0", + "indexmap 2.11.0", ] [[package]] @@ -5126,7 +5127,7 @@ dependencies = [ "chrono", "hex", "indexmap 1.9.3", - "indexmap 2.10.0", + "indexmap 2.11.0", "schemars 0.9.0", "schemars 1.0.4", "serde", @@ -5154,7 +5155,7 @@ version = "0.9.34+deprecated" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a8b1a1a2ebf674015cc02edccce75287f1a0130d394307b36743c2f5d504b47" dependencies = [ - "indexmap 2.10.0", + "indexmap 2.11.0", "itoa", "ryu", "serde", @@ -5831,7 +5832,7 @@ version = "0.22.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "41fe8c660ae4257887cf66394862d21dbca4a6ddd26f04a3560410406a2f819a" dependencies = [ - "indexmap 2.10.0", + "indexmap 2.11.0", "toml_datetime", "winnow", ] diff --git a/crates/fiber-lib/Cargo.toml b/crates/fiber-lib/Cargo.toml index a220f06cd..6f8561f24 100644 --- a/crates/fiber-lib/Cargo.toml +++ b/crates/fiber-lib/Cargo.toml @@ -57,6 +57,7 @@ thiserror = "1.0.58" tokio-util = {version = "0.7.10", features = ["rt"]} tracing = "0.1" tracing-subscriber = {version = "0.3", features = ["env-filter"]} +indexmap = "2.11.0" [target.'cfg(not(target_arch = "wasm32"))'.dependencies] biscuit-auth = "6.0.0-beta.3" diff --git a/crates/fiber-lib/src/fiber/graph.rs b/crates/fiber-lib/src/fiber/graph.rs index 5424b47f1..372533038 100644 --- a/crates/fiber-lib/src/fiber/graph.rs +++ b/crates/fiber-lib/src/fiber/graph.rs @@ -22,6 +22,7 @@ use crate::fiber::types::PaymentHopData; use crate::invoice::CkbInvoice; use crate::now_timestamp_as_millis_u64; use ckb_types::packed::{OutPoint, Script}; +use indexmap::IndexMap; use parking_lot::Mutex; use serde::{Deserialize, Serialize}; use serde_with::serde_as; @@ -85,6 +86,7 @@ impl From for NodeInfo { #[derive(Clone, Debug, PartialEq)] pub struct ChannelInfo { pub channel_outpoint: OutPoint, + pub is_public: bool, // The timestamp in the block header of the block that includes the funding transaction of the channel. pub timestamp: u64, @@ -195,8 +197,10 @@ impl TryFrom<&ChannelActorState> for ChannelInfo { Some(state.get_local_channel_update_info()), ) }; + let is_public = state.is_public(); Ok(Self { channel_outpoint, + is_public, timestamp, features: 0, node1, @@ -213,6 +217,7 @@ impl From<(u64, ChannelAnnouncement)> for ChannelInfo { fn from((timestamp, channel_announcement): (u64, ChannelAnnouncement)) -> Self { Self { channel_outpoint: channel_announcement.channel_outpoint, + is_public: true, timestamp, features: channel_announcement.features, node1: channel_announcement.node1_id, @@ -441,7 +446,7 @@ pub struct NetworkGraph { // The pubkey of the node that is running this instance of the network graph. source: Pubkey, // All the channels in the network. - pub(crate) channels: HashMap, + pub(crate) channels: IndexMap, // All the nodes in the network. nodes: HashMap, @@ -520,7 +525,7 @@ where #[cfg(any(test, feature = "bench"))] always_process_gossip_message: false, source, - channels: HashMap::new(), + channels: IndexMap::new(), channel_stats: Default::default(), nodes: HashMap::new(), latest_cursor: Cursor::default(), @@ -609,7 +614,7 @@ where .insert(channel_info.channel_outpoint.clone(), channel_info); } OwnedChannelUpdateEvent::Down(channel_outpoint) => { - self.channels.remove(&channel_outpoint); + self.channels.swap_remove(&channel_outpoint); self.channel_stats.lock().remove_channel(&channel_outpoint); } OwnedChannelUpdateEvent::Updated(channel_outpoint, node, channel_update) => { @@ -653,19 +658,6 @@ where self.history.reset(); } - fn load_channel_updates_from_store(&self, channel_info: &mut ChannelInfo) { - let channel_update_of_node1 = self - .store - .get_latest_channel_update(&channel_info.channel_outpoint, true) - .map(Into::into); - let channel_update_of_node2 = self - .store - .get_latest_channel_update(&channel_info.channel_outpoint, false) - .map(Into::into); - channel_info.update_of_node1 = channel_update_of_node1; - channel_info.update_of_node2 = channel_update_of_node2; - } - fn load_channel_info_mut(&mut self, channel_outpoint: &OutPoint) -> Option<&mut ChannelInfo> { if !self.channels.contains_key(channel_outpoint) { if let Some((timestamp, channel_announcement)) = @@ -907,35 +899,19 @@ where } } - pub fn get_channels_with_params( - &self, - limit: usize, - after: Option, - ) -> Vec { - let cursor = after.unwrap_or_default(); - self.store - .get_broadcast_messages_iter(&cursor) - .into_iter() - .filter_map(|message| match message { - BroadcastMessageWithTimestamp::ChannelAnnouncement( - timestamp, - channel_announcement, - ) => { - let mut channel_info = ChannelInfo::from((timestamp, channel_announcement)); - self.load_channel_updates_from_store(&mut channel_info); - - // assuming channel is closed if disabled from the both side - let is_closed = channel_info.update_of_node1.is_some_and(|u| !u.enabled) - && channel_info.update_of_node2.is_some_and(|u| !u.enabled); - if !is_closed { - Some(channel_info) - } else { - None - } + pub fn get_channels_with_params(&self, limit: usize, after: Option) -> Vec { + let after = after.unwrap_or_default(); + self.channels + .iter() + .skip(after as usize) + .take(limit) + .filter_map(|(_out_point, channel_info)| { + if channel_info.is_public { + Some(channel_info.to_owned()) + } else { + None } - _ => None, }) - .take(limit) .collect() } diff --git a/crates/fiber-lib/src/rpc/graph.rs b/crates/fiber-lib/src/rpc/graph.rs index b0f62bf60..13f7f3d48 100644 --- a/crates/fiber-lib/src/rpc/graph.rs +++ b/crates/fiber-lib/src/rpc/graph.rs @@ -357,20 +357,17 @@ where let default_max_limit = 500; let network_graph = self.network_graph.read().await; let limit = params.limit.unwrap_or(default_max_limit) as usize; - let cursor = params - .after - .as_ref() - .map(|cursor| Cursor::from_bytes(cursor.as_bytes())) - .transpose() - .map_err(|e| { - ErrorObjectOwned::owned(INVALID_PARAMS_CODE, e.to_string(), Some(params)) - })?; + let after = params.after.as_ref().map(|after| { + let buf: [u8; 8] = after.as_bytes().try_into().unwrap_or_default(); + u64::from_le_bytes(buf) + }); - let channels = network_graph.get_channels_with_params(limit, cursor); - let last_cursor = channels - .last() - .map(|node| JsonBytes::from_vec(node.cursor().to_bytes().into())) - .unwrap_or_default(); + let channels = network_graph.get_channels_with_params(limit, after); + let last_cursor = JsonBytes::from_vec( + (after.unwrap_or_default() + channels.len() as u64) + .to_le_bytes() + .to_vec(), + ); let channels = channels.into_iter().map(Into::into).collect(); Ok(GraphChannelsResult { From 67e5b68df03057190f53bd3be4f2eb20cc796589 Mon Sep 17 00:00:00 2001 From: jjy Date: Wed, 3 Sep 2025 14:05:43 +0800 Subject: [PATCH 2/4] graph_nodes read from memory; add total_count to graph RPC --- crates/fiber-lib/src/fiber/graph.rs | 34 ++++++++++-------- crates/fiber-lib/src/fiber/tests/rpc.rs | 33 +++++++++++++++++- crates/fiber-lib/src/rpc/graph.rs | 46 ++++++++++++++++--------- migrate/Cargo.lock | 17 ++++----- 4 files changed, 90 insertions(+), 40 deletions(-) diff --git a/crates/fiber-lib/src/fiber/graph.rs b/crates/fiber-lib/src/fiber/graph.rs index 372533038..c0222942c 100644 --- a/crates/fiber-lib/src/fiber/graph.rs +++ b/crates/fiber-lib/src/fiber/graph.rs @@ -445,10 +445,12 @@ pub struct NetworkGraph { pub always_process_gossip_message: bool, // The pubkey of the node that is running this instance of the network graph. source: Pubkey, + // The count of private channels + pub(crate) private_channels_count: usize, // All the channels in the network. pub(crate) channels: IndexMap, // All the nodes in the network. - nodes: HashMap, + pub(crate) nodes: IndexMap, // Channel stats map, used to track the attempts for each channel, // this information is used to HELP the path finding algorithm for better routing in two ways: @@ -525,9 +527,10 @@ where #[cfg(any(test, feature = "bench"))] always_process_gossip_message: false, source, + private_channels_count: 0, channels: IndexMap::new(), channel_stats: Default::default(), - nodes: HashMap::new(), + nodes: IndexMap::new(), latest_cursor: Cursor::default(), store: store.clone(), history: PaymentHistory::new(source, None, store), @@ -610,11 +613,18 @@ where // so we can just overwrite the old channel info. self.history .remove_channel_history(&channel_info.channel_outpoint); + if !channel_info.is_public { + self.private_channels_count += 1; + } self.channels .insert(channel_info.channel_outpoint.clone(), channel_info); } OwnedChannelUpdateEvent::Down(channel_outpoint) => { - self.channels.swap_remove(&channel_outpoint); + if let Some(channel_info) = self.channels.swap_remove(&channel_outpoint) { + if !channel_info.is_public { + self.private_channels_count -= 1; + } + } self.channel_stats.lock().remove_channel(&channel_outpoint); } OwnedChannelUpdateEvent::Updated(channel_outpoint, node, channel_update) => { @@ -841,18 +851,13 @@ where self.nodes.values() } - pub fn get_nodes_with_params(&self, limit: usize, after: Option) -> Vec { - let cursor = after.unwrap_or_default(); - self.store - .get_broadcast_messages_iter(&cursor) - .into_iter() - .filter_map(|message| match message { - BroadcastMessageWithTimestamp::NodeAnnouncement(node_announcement) => { - Some(NodeInfo::from(node_announcement)) - } - _ => None, - }) + pub fn get_nodes_with_params(&self, limit: usize, after: Option) -> Vec { + let after = after.unwrap_or_default(); + self.nodes + .iter() + .skip(after as usize) .take(limit) + .map(|(_pubkey, node)| node.to_owned()) .collect() } @@ -1089,6 +1094,7 @@ where self.channels.clear(); self.nodes.clear(); self.history.reset(); + self.private_channels_count = 0; } #[cfg(any(test, feature = "bench"))] diff --git a/crates/fiber-lib/src/fiber/tests/rpc.rs b/crates/fiber-lib/src/fiber/tests/rpc.rs index 5bdc50ed1..36169381c 100644 --- a/crates/fiber-lib/src/fiber/tests/rpc.rs +++ b/crates/fiber-lib/src/fiber/tests/rpc.rs @@ -4,6 +4,7 @@ use crate::gen_rand_sha256_hash; use crate::invoice::CkbInvoice; use crate::rpc::channel::{ChannelState, ShutdownChannelParams}; use crate::rpc::config::RpcConfig; +use crate::rpc::graph::{GraphChannelsParams, GraphChannelsResult}; use crate::rpc::info::NodeInfoResult; use crate::tests::*; use crate::{ @@ -251,7 +252,7 @@ async fn test_rpc_graph() { ( (0, 1), ChannelParameters { - public: true, + public: false, node_a_funding_amount: MIN_RESERVED_CKB + 10000000000, node_b_funding_amount: MIN_RESERVED_CKB, ..Default::default() @@ -277,6 +278,7 @@ async fn test_rpc_graph() { eprintln!("Graph nodes: {:#?}", graph_nodes); + assert_eq!(graph_nodes.total_count.value(), 2); assert!(!graph_nodes.nodes.is_empty()); assert!(graph_nodes.nodes.iter().any(|n| n.node_id == node_1.pubkey)); assert!(graph_nodes @@ -284,6 +286,35 @@ async fn test_rpc_graph() { .iter() .all(|n| n.version == *env!("CARGO_PKG_VERSION"))); assert!(!graph_nodes.nodes[0].features.is_empty()); + + let graph_nodes: GraphNodesResult = node_0 + .send_rpc_request( + "graph_nodes", + GraphNodesParams { + limit: Some(1), + after: None, + }, + ) + .await + .unwrap(); + + assert_eq!(graph_nodes.total_count.value(), 2); + assert_eq!(graph_nodes.nodes.len(), 1); + + let graph_channels: GraphChannelsResult = node_0 + .send_rpc_request( + "graph_channels", + GraphChannelsParams { + limit: None, + after: None, + }, + ) + .await + .unwrap(); + + // only public channels + assert_eq!(graph_channels.total_count.value(), 1); + assert_eq!(graph_channels.channels.len(), 1); } #[tokio::test] diff --git a/crates/fiber-lib/src/rpc/graph.rs b/crates/fiber-lib/src/rpc/graph.rs index 13f7f3d48..c5c3c6702 100644 --- a/crates/fiber-lib/src/rpc/graph.rs +++ b/crates/fiber-lib/src/rpc/graph.rs @@ -8,13 +8,15 @@ use crate::fiber::graph::{ChannelUpdateInfo, NetworkGraph, NetworkGraphStateStor use crate::fiber::network::get_chain_hash; use crate::fiber::serde_utils::EntityHex; use crate::fiber::serde_utils::{U128Hex, U64Hex}; -use crate::fiber::types::{Cursor, Hash256, Pubkey}; -use ckb_jsonrpc_types::{DepType, JsonBytes, OutPoint as OutPointWrapper, Script, ScriptHashType}; +use crate::fiber::types::{Hash256, Pubkey}; +use ckb_jsonrpc_types::{ + DepType, JsonBytes, OutPoint as OutPointWrapper, Script, ScriptHashType, Uint64, +}; use ckb_types::packed::OutPoint; use ckb_types::H256; #[cfg(not(target_arch = "wasm32"))] use jsonrpsee::proc_macros::rpc; -use jsonrpsee::{types::error::INVALID_PARAMS_CODE, types::ErrorObjectOwned}; +use jsonrpsee::types::ErrorObjectOwned; use serde::{Deserialize, Serialize}; use serde_with::serde_as; @@ -182,6 +184,8 @@ pub struct GraphNodesResult { pub nodes: Vec, /// The last cursor. pub last_cursor: JsonBytes, + /// The total count of nodes + pub total_count: Uint64, } #[serde_as] @@ -247,6 +251,8 @@ pub struct GraphChannelsResult { pub channels: Vec, /// The last cursor for pagination. pub last_cursor: JsonBytes, + /// The total count of channels + pub total_count: Uint64, } /// RPC module for graph management. @@ -332,22 +338,24 @@ where let network_graph = self.network_graph.read().await; let default_max_limit = 500; let limit = params.limit.unwrap_or(default_max_limit) as usize; - let cursor = params - .after - .as_ref() - .map(|cursor| Cursor::from_bytes(cursor.as_bytes())) - .transpose() - .map_err(|e| { - ErrorObjectOwned::owned(INVALID_PARAMS_CODE, e.to_string(), Some(params)) - })?; - let nodes = network_graph.get_nodes_with_params(limit, cursor); - let last_cursor = nodes - .last() - .map(|node| JsonBytes::from_vec(node.cursor().to_bytes().into())) - .unwrap_or_default(); + let after = params.after.as_ref().map(|after| { + let buf: [u8; 8] = after.as_bytes().try_into().unwrap_or_default(); + u64::from_le_bytes(buf) + }); + let nodes = network_graph.get_nodes_with_params(limit, after); + let last_cursor = JsonBytes::from_vec( + (after.unwrap_or_default() + nodes.len() as u64) + .to_le_bytes() + .to_vec(), + ); let nodes = nodes.into_iter().map(Into::into).collect(); + let total_count = (network_graph.nodes.len() as u64).into(); - Ok(GraphNodesResult { nodes, last_cursor }) + Ok(GraphNodesResult { + nodes, + last_cursor, + total_count, + }) } pub async fn graph_channels( @@ -370,9 +378,13 @@ where ); let channels = channels.into_iter().map(Into::into).collect(); + let public_channels_count = + network_graph.channels.len() - network_graph.private_channels_count; + let total_count = (public_channels_count as u64).into(); Ok(GraphChannelsResult { channels, last_cursor, + total_count, }) } } diff --git a/migrate/Cargo.lock b/migrate/Cargo.lock index e157f5cc9..4d3d3594c 100644 --- a/migrate/Cargo.lock +++ b/migrate/Cargo.lock @@ -1906,6 +1906,7 @@ dependencies = [ "hex", "home", "hyper 1.6.0", + "indexmap 2.11.0", "indicatif", "jsonrpsee", "lightning-invoice", @@ -2277,7 +2278,7 @@ dependencies = [ "futures-sink", "futures-util", "http 0.2.12", - "indexmap 2.10.0", + "indexmap 2.11.0", "slab", "tokio", "tokio-util", @@ -2296,7 +2297,7 @@ dependencies = [ "futures-core", "futures-sink", "http 1.3.1", - "indexmap 2.10.0", + "indexmap 2.11.0", "slab", "tokio", "tokio-util", @@ -2786,9 +2787,9 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.10.0" +version = "2.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fe4cd85333e22411419a0bcae1297d25e58c9443848b11dc6a86fefe8c78a661" +checksum = "f2481980430f9f78649238835720ddccc57e52df14ffce1c6f37391d61b563e9" dependencies = [ "equivalent", "hashbrown 0.15.4", @@ -3710,7 +3711,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b4c5cc86750666a3ed20bdaf5ca2a0344f9c67674cae0515bec2da16fbaa47db" dependencies = [ "fixedbitset", - "indexmap 2.10.0", + "indexmap 2.11.0", ] [[package]] @@ -4795,7 +4796,7 @@ dependencies = [ "chrono", "hex", "indexmap 1.9.3", - "indexmap 2.10.0", + "indexmap 2.11.0", "schemars 0.9.0", "schemars 1.0.4", "serde", @@ -4823,7 +4824,7 @@ version = "0.9.34+deprecated" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a8b1a1a2ebf674015cc02edccce75287f1a0130d394307b36743c2f5d504b47" dependencies = [ - "indexmap 2.10.0", + "indexmap 2.11.0", "itoa", "ryu", "serde", @@ -5491,7 +5492,7 @@ version = "0.22.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "41fe8c660ae4257887cf66394862d21dbca4a6ddd26f04a3560410406a2f819a" dependencies = [ - "indexmap 2.10.0", + "indexmap 2.11.0", "toml_datetime", "winnow", ] From e4a217005051d894e7d4296c4589ab1f5ddce583 Mon Sep 17 00:00:00 2001 From: jjy Date: Wed, 3 Sep 2025 14:47:06 +0800 Subject: [PATCH 3/4] update rpc README.md --- crates/fiber-lib/src/rpc/README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/crates/fiber-lib/src/rpc/README.md b/crates/fiber-lib/src/rpc/README.md index addd0fc47..bf1334030 100644 --- a/crates/fiber-lib/src/rpc/README.md +++ b/crates/fiber-lib/src/rpc/README.md @@ -463,6 +463,7 @@ Get the list of nodes in the network graph. * `nodes` - Vec<[NodeInfo](#type-nodeinfo)>, The list of nodes. * `last_cursor` - `JsonBytes`, The last cursor. +* `total_count` - `Uint64`, The total count of nodes --- @@ -482,6 +483,7 @@ Get the list of channels in the network graph. * `channels` - Vec<[ChannelInfo](#type-channelinfo)>, A list of channels. * `last_cursor` - `JsonBytes`, The last cursor for pagination. +* `total_count` - `Uint64`, The total count of channels --- From b44759ce45cd538861ab1a5a2c09fdbf270f648a Mon Sep 17 00:00:00 2001 From: jjy Date: Fri, 12 Sep 2025 20:01:48 +0800 Subject: [PATCH 4/4] Use BTreeMap to store graph nodes and channels --- Cargo.lock | 1 - crates/fiber-lib/Cargo.toml | 1 - crates/fiber-lib/src/fiber/graph.rs | 77 +++++++++++++++---------- crates/fiber-lib/src/fiber/tests/rpc.rs | 47 ++++++++++++++- crates/fiber-lib/src/rpc/graph.rs | 32 +++++----- migrate/Cargo.lock | 1 - 6 files changed, 111 insertions(+), 48 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c73200db9..5828fd58f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2136,7 +2136,6 @@ dependencies = [ "hex", "home", "hyper 1.6.0", - "indexmap 2.11.0", "indicatif", "jsonrpsee", "lightning-invoice", diff --git a/crates/fiber-lib/Cargo.toml b/crates/fiber-lib/Cargo.toml index 6f8561f24..a220f06cd 100644 --- a/crates/fiber-lib/Cargo.toml +++ b/crates/fiber-lib/Cargo.toml @@ -57,7 +57,6 @@ thiserror = "1.0.58" tokio-util = {version = "0.7.10", features = ["rt"]} tracing = "0.1" tracing-subscriber = {version = "0.3", features = ["env-filter"]} -indexmap = "2.11.0" [target.'cfg(not(target_arch = "wasm32"))'.dependencies] biscuit-auth = "6.0.0-beta.3" diff --git a/crates/fiber-lib/src/fiber/graph.rs b/crates/fiber-lib/src/fiber/graph.rs index c0222942c..40b63688b 100644 --- a/crates/fiber-lib/src/fiber/graph.rs +++ b/crates/fiber-lib/src/fiber/graph.rs @@ -22,11 +22,11 @@ use crate::fiber::types::PaymentHopData; use crate::invoice::CkbInvoice; use crate::now_timestamp_as_millis_u64; use ckb_types::packed::{OutPoint, Script}; -use indexmap::IndexMap; use parking_lot::Mutex; use serde::{Deserialize, Serialize}; use serde_with::serde_as; -use std::collections::{HashMap, HashSet}; +use std::collections::{BTreeMap, HashMap, HashSet}; +use std::ops::Bound; use std::sync::Arc; use tentacle::multiaddr::MultiAddr; use tentacle::secio::PeerId; @@ -448,9 +448,9 @@ pub struct NetworkGraph { // The count of private channels pub(crate) private_channels_count: usize, // All the channels in the network. - pub(crate) channels: IndexMap, + pub(crate) channels: BTreeMap, // All the nodes in the network. - pub(crate) nodes: IndexMap, + pub(crate) nodes: BTreeMap, // Channel stats map, used to track the attempts for each channel, // this information is used to HELP the path finding algorithm for better routing in two ways: @@ -528,9 +528,9 @@ where always_process_gossip_message: false, source, private_channels_count: 0, - channels: IndexMap::new(), + channels: BTreeMap::new(), channel_stats: Default::default(), - nodes: IndexMap::new(), + nodes: BTreeMap::new(), latest_cursor: Cursor::default(), store: store.clone(), history: PaymentHistory::new(source, None, store), @@ -620,7 +620,7 @@ where .insert(channel_info.channel_outpoint.clone(), channel_info); } OwnedChannelUpdateEvent::Down(channel_outpoint) => { - if let Some(channel_info) = self.channels.swap_remove(&channel_outpoint) { + if let Some(channel_info) = self.channels.remove(&channel_outpoint) { if !channel_info.is_public { self.private_channels_count -= 1; } @@ -851,14 +851,21 @@ where self.nodes.values() } - pub fn get_nodes_with_params(&self, limit: usize, after: Option) -> Vec { - let after = after.unwrap_or_default(); - self.nodes - .iter() - .skip(after as usize) - .take(limit) - .map(|(_pubkey, node)| node.to_owned()) - .collect() + pub fn get_nodes_with_params(&self, limit: usize, after: Option) -> Vec { + match after { + Some(after) => self + .nodes + .range((Bound::Excluded(after), Bound::Unbounded)) + .take(limit) + .map(|(_pubkey, node)| node.to_owned()) + .collect(), + None => self + .nodes + .iter() + .take(limit) + .map(|(_pubkey, node)| node.to_owned()) + .collect(), + } } pub fn get_node(&self, node_id: &Pubkey) -> Option<&NodeInfo> { @@ -904,20 +911,32 @@ where } } - pub fn get_channels_with_params(&self, limit: usize, after: Option) -> Vec { - let after = after.unwrap_or_default(); - self.channels - .iter() - .skip(after as usize) - .take(limit) - .filter_map(|(_out_point, channel_info)| { - if channel_info.is_public { - Some(channel_info.to_owned()) - } else { - None - } - }) - .collect() + pub fn get_channels_with_params( + &self, + limit: usize, + after: Option, + ) -> Vec { + let filter = |(_out_point, channel_info): (&OutPoint, &ChannelInfo)| { + if channel_info.is_public { + Some(channel_info.to_owned()) + } else { + None + } + }; + match after { + Some(after) => self + .channels + .range((Bound::Excluded(after), Bound::Unbounded)) + .take(limit) + .filter_map(filter) + .collect(), + None => self + .channels + .iter() + .take(limit) + .filter_map(filter) + .collect(), + } } pub fn get_channels_by_peer(&self, node_id: Pubkey) -> impl Iterator { diff --git a/crates/fiber-lib/src/fiber/tests/rpc.rs b/crates/fiber-lib/src/fiber/tests/rpc.rs index 36169381c..8a6a3642f 100644 --- a/crates/fiber-lib/src/fiber/tests/rpc.rs +++ b/crates/fiber-lib/src/fiber/tests/rpc.rs @@ -287,7 +287,8 @@ async fn test_rpc_graph() { .all(|n| n.version == *env!("CARGO_PKG_VERSION"))); assert!(!graph_nodes.nodes[0].features.is_empty()); - let graph_nodes: GraphNodesResult = node_0 + // query nodes by page + let graph_nodes_p1: GraphNodesResult = node_0 .send_rpc_request( "graph_nodes", GraphNodesParams { @@ -298,8 +299,33 @@ async fn test_rpc_graph() { .await .unwrap(); - assert_eq!(graph_nodes.total_count.value(), 2); - assert_eq!(graph_nodes.nodes.len(), 1); + assert_eq!(graph_nodes_p1.total_count.value(), 2); + assert_eq!(graph_nodes_p1.nodes.len(), 1); + + let graph_nodes_p2: GraphNodesResult = node_0 + .send_rpc_request( + "graph_nodes", + GraphNodesParams { + limit: Some(1), + after: Some(graph_nodes_p1.last_cursor), + }, + ) + .await + .unwrap(); + + assert_eq!(graph_nodes_p2.total_count.value(), 2); + assert_eq!(graph_nodes_p2.nodes.len(), 1); + + let mut nodes = graph_nodes_p1.nodes.clone(); + nodes.extend(graph_nodes_p2.nodes.clone()); + assert_eq!( + graph_nodes + .nodes + .iter() + .map(|n| n.node_id) + .collect::>(), + nodes.iter().map(|n| n.node_id).collect::>(), + ); let graph_channels: GraphChannelsResult = node_0 .send_rpc_request( @@ -315,6 +341,21 @@ async fn test_rpc_graph() { // only public channels assert_eq!(graph_channels.total_count.value(), 1); assert_eq!(graph_channels.channels.len(), 1); + + // next query is empty + let graph_channels: GraphChannelsResult = node_0 + .send_rpc_request( + "graph_channels", + GraphChannelsParams { + limit: Some(1), + after: Some(graph_channels.last_cursor), + }, + ) + .await + .unwrap(); + + assert_eq!(graph_channels.total_count.value(), 1); + assert!(graph_channels.channels.is_empty()); } #[tokio::test] diff --git a/crates/fiber-lib/src/rpc/graph.rs b/crates/fiber-lib/src/rpc/graph.rs index c5c3c6702..9084111c0 100644 --- a/crates/fiber-lib/src/rpc/graph.rs +++ b/crates/fiber-lib/src/rpc/graph.rs @@ -18,6 +18,7 @@ use ckb_types::H256; use jsonrpsee::proc_macros::rpc; use jsonrpsee::types::ErrorObjectOwned; +use molecule::prelude::Entity; use serde::{Deserialize, Serialize}; use serde_with::serde_as; use std::sync::Arc; @@ -338,15 +339,16 @@ where let network_graph = self.network_graph.read().await; let default_max_limit = 500; let limit = params.limit.unwrap_or(default_max_limit) as usize; - let after = params.after.as_ref().map(|after| { - let buf: [u8; 8] = after.as_bytes().try_into().unwrap_or_default(); - u64::from_le_bytes(buf) - }); + let after = params + .after + .as_ref() + .and_then(|after| Pubkey::from_slice(after.as_bytes()).ok()); let nodes = network_graph.get_nodes_with_params(limit, after); let last_cursor = JsonBytes::from_vec( - (after.unwrap_or_default() + nodes.len() as u64) - .to_le_bytes() - .to_vec(), + nodes + .last() + .map(|n| n.node_id.serialize().to_vec()) + .unwrap_or_default(), ); let nodes = nodes.into_iter().map(Into::into).collect(); let total_count = (network_graph.nodes.len() as u64).into(); @@ -365,16 +367,20 @@ where let default_max_limit = 500; let network_graph = self.network_graph.read().await; let limit = params.limit.unwrap_or(default_max_limit) as usize; - let after = params.after.as_ref().map(|after| { - let buf: [u8; 8] = after.as_bytes().try_into().unwrap_or_default(); - u64::from_le_bytes(buf) + let after = params.after.as_ref().and_then(|after| { + if after.is_empty() { + None + } else { + OutPoint::from_slice(after.as_bytes()).ok() + } }); let channels = network_graph.get_channels_with_params(limit, after); let last_cursor = JsonBytes::from_vec( - (after.unwrap_or_default() + channels.len() as u64) - .to_le_bytes() - .to_vec(), + channels + .last() + .map(|c| c.channel_outpoint.as_slice().to_vec()) + .unwrap_or_default(), ); let channels = channels.into_iter().map(Into::into).collect(); diff --git a/migrate/Cargo.lock b/migrate/Cargo.lock index 4d3d3594c..75f9fe703 100644 --- a/migrate/Cargo.lock +++ b/migrate/Cargo.lock @@ -1906,7 +1906,6 @@ dependencies = [ "hex", "home", "hyper 1.6.0", - "indexmap 2.11.0", "indicatif", "jsonrpsee", "lightning-invoice",