Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

117 changes: 59 additions & 58 deletions crates/fiber-lib/src/fiber/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ use ckb_types::packed::{OutPoint, Script};
use parking_lot::Mutex;
use serde::{Deserialize, Serialize};
use serde_with::serde_as;
use std::collections::{HashMap, HashSet};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::ops::Bound;
use std::sync::Arc;
use tentacle::multiaddr::MultiAddr;
use tentacle::secio::PeerId;
Expand Down Expand Up @@ -85,6 +86,7 @@ impl From<NodeAnnouncement> for NodeInfo {
#[derive(Clone, Debug, PartialEq)]
pub struct ChannelInfo {
pub channel_outpoint: OutPoint,
pub is_public: bool,
// The timestamp in the block header of the block that includes the funding transaction of the channel.
pub timestamp: u64,

Expand Down Expand Up @@ -195,8 +197,10 @@ impl TryFrom<&ChannelActorState> for ChannelInfo {
Some(state.get_local_channel_update_info()),
)
};
let is_public = state.is_public();
Ok(Self {
channel_outpoint,
is_public,
timestamp,
features: 0,
node1,
Expand All @@ -213,6 +217,7 @@ impl From<(u64, ChannelAnnouncement)> for ChannelInfo {
fn from((timestamp, channel_announcement): (u64, ChannelAnnouncement)) -> Self {
Self {
channel_outpoint: channel_announcement.channel_outpoint,
is_public: true,
timestamp,
features: channel_announcement.features,
node1: channel_announcement.node1_id,
Expand Down Expand Up @@ -440,10 +445,12 @@ pub struct NetworkGraph<S> {
pub always_process_gossip_message: bool,
// The pubkey of the node that is running this instance of the network graph.
source: Pubkey,
// The count of private channels
pub(crate) private_channels_count: usize,
// All the channels in the network.
pub(crate) channels: HashMap<OutPoint, ChannelInfo>,
pub(crate) channels: BTreeMap<OutPoint, ChannelInfo>,
// All the nodes in the network.
nodes: HashMap<Pubkey, NodeInfo>,
pub(crate) nodes: BTreeMap<Pubkey, NodeInfo>,

// Channel stats map, used to track the attempts for each channel,
// this information is used to HELP the path finding algorithm for better routing in two ways:
Expand Down Expand Up @@ -520,9 +527,10 @@ where
#[cfg(any(test, feature = "bench"))]
always_process_gossip_message: false,
source,
channels: HashMap::new(),
private_channels_count: 0,
channels: BTreeMap::new(),
channel_stats: Default::default(),
nodes: HashMap::new(),
nodes: BTreeMap::new(),
latest_cursor: Cursor::default(),
store: store.clone(),
history: PaymentHistory::new(source, None, store),
Expand Down Expand Up @@ -605,11 +613,18 @@ where
// so we can just overwrite the old channel info.
self.history
.remove_channel_history(&channel_info.channel_outpoint);
if !channel_info.is_public {
self.private_channels_count += 1;
}
self.channels
.insert(channel_info.channel_outpoint.clone(), channel_info);
}
OwnedChannelUpdateEvent::Down(channel_outpoint) => {
self.channels.remove(&channel_outpoint);
if let Some(channel_info) = self.channels.remove(&channel_outpoint) {
if !channel_info.is_public {
self.private_channels_count -= 1;
}
}
self.channel_stats.lock().remove_channel(&channel_outpoint);
}
OwnedChannelUpdateEvent::Updated(channel_outpoint, node, channel_update) => {
Expand Down Expand Up @@ -653,19 +668,6 @@ where
self.history.reset();
}

fn load_channel_updates_from_store(&self, channel_info: &mut ChannelInfo) {
let channel_update_of_node1 = self
.store
.get_latest_channel_update(&channel_info.channel_outpoint, true)
.map(Into::into);
let channel_update_of_node2 = self
.store
.get_latest_channel_update(&channel_info.channel_outpoint, false)
.map(Into::into);
channel_info.update_of_node1 = channel_update_of_node1;
channel_info.update_of_node2 = channel_update_of_node2;
}

fn load_channel_info_mut(&mut self, channel_outpoint: &OutPoint) -> Option<&mut ChannelInfo> {
if !self.channels.contains_key(channel_outpoint) {
if let Some((timestamp, channel_announcement)) =
Expand Down Expand Up @@ -849,19 +851,21 @@ where
self.nodes.values()
}

pub fn get_nodes_with_params(&self, limit: usize, after: Option<Cursor>) -> Vec<NodeInfo> {
let cursor = after.unwrap_or_default();
self.store
.get_broadcast_messages_iter(&cursor)
.into_iter()
.filter_map(|message| match message {
BroadcastMessageWithTimestamp::NodeAnnouncement(node_announcement) => {
Some(NodeInfo::from(node_announcement))
}
_ => None,
})
.take(limit)
.collect()
pub fn get_nodes_with_params(&self, limit: usize, after: Option<Pubkey>) -> Vec<NodeInfo> {
match after {
Some(after) => self
.nodes
.range((Bound::Excluded(after), Bound::Unbounded))
.take(limit)
.map(|(_pubkey, node)| node.to_owned())
.collect(),
None => self
.nodes
.iter()
.take(limit)
.map(|(_pubkey, node)| node.to_owned())
.collect(),
}
}

pub fn get_node(&self, node_id: &Pubkey) -> Option<&NodeInfo> {
Expand Down Expand Up @@ -910,33 +914,29 @@ where
pub fn get_channels_with_params(
&self,
limit: usize,
after: Option<Cursor>,
after: Option<OutPoint>,
) -> Vec<ChannelInfo> {
let cursor = after.unwrap_or_default();
self.store
.get_broadcast_messages_iter(&cursor)
.into_iter()
.filter_map(|message| match message {
BroadcastMessageWithTimestamp::ChannelAnnouncement(
timestamp,
channel_announcement,
) => {
let mut channel_info = ChannelInfo::from((timestamp, channel_announcement));
self.load_channel_updates_from_store(&mut channel_info);

// assuming channel is closed if disabled from the both side
let is_closed = channel_info.update_of_node1.is_some_and(|u| !u.enabled)
&& channel_info.update_of_node2.is_some_and(|u| !u.enabled);
if !is_closed {
Some(channel_info)
} else {
None
}
}
_ => None,
})
.take(limit)
.collect()
let filter = |(_out_point, channel_info): (&OutPoint, &ChannelInfo)| {
if channel_info.is_public {
Some(channel_info.to_owned())
} else {
None
}
};
match after {
Some(after) => self
.channels
.range((Bound::Excluded(after), Bound::Unbounded))
.take(limit)
.filter_map(filter)
.collect(),
None => self
.channels
.iter()
.take(limit)
.filter_map(filter)
.collect(),
}
}

pub fn get_channels_by_peer(&self, node_id: Pubkey) -> impl Iterator<Item = &ChannelInfo> {
Expand Down Expand Up @@ -1113,6 +1113,7 @@ where
self.channels.clear();
self.nodes.clear();
self.history.reset();
self.private_channels_count = 0;
}

#[cfg(any(test, feature = "bench"))]
Expand Down
74 changes: 73 additions & 1 deletion crates/fiber-lib/src/fiber/tests/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::gen_rand_sha256_hash;
use crate::invoice::CkbInvoice;
use crate::rpc::channel::{ChannelState, ShutdownChannelParams};
use crate::rpc::config::RpcConfig;
use crate::rpc::graph::{GraphChannelsParams, GraphChannelsResult};
use crate::rpc::info::NodeInfoResult;
use crate::tests::*;
use crate::{
Expand Down Expand Up @@ -251,7 +252,7 @@ async fn test_rpc_graph() {
(
(0, 1),
ChannelParameters {
public: true,
public: false,
node_a_funding_amount: MIN_RESERVED_CKB + 10000000000,
node_b_funding_amount: MIN_RESERVED_CKB,
..Default::default()
Expand All @@ -277,13 +278,84 @@ async fn test_rpc_graph() {

eprintln!("Graph nodes: {:#?}", graph_nodes);

assert_eq!(graph_nodes.total_count.value(), 2);
assert!(!graph_nodes.nodes.is_empty());
assert!(graph_nodes.nodes.iter().any(|n| n.node_id == node_1.pubkey));
assert!(graph_nodes
.nodes
.iter()
.all(|n| n.version == *env!("CARGO_PKG_VERSION")));
assert!(!graph_nodes.nodes[0].features.is_empty());

// query nodes by page
let graph_nodes_p1: GraphNodesResult = node_0
.send_rpc_request(
"graph_nodes",
GraphNodesParams {
limit: Some(1),
after: None,
},
)
.await
.unwrap();

assert_eq!(graph_nodes_p1.total_count.value(), 2);
assert_eq!(graph_nodes_p1.nodes.len(), 1);

let graph_nodes_p2: GraphNodesResult = node_0
.send_rpc_request(
"graph_nodes",
GraphNodesParams {
limit: Some(1),
after: Some(graph_nodes_p1.last_cursor),
},
)
.await
.unwrap();

assert_eq!(graph_nodes_p2.total_count.value(), 2);
assert_eq!(graph_nodes_p2.nodes.len(), 1);

let mut nodes = graph_nodes_p1.nodes.clone();
nodes.extend(graph_nodes_p2.nodes.clone());
assert_eq!(
graph_nodes
.nodes
.iter()
.map(|n| n.node_id)
.collect::<Vec<_>>(),
nodes.iter().map(|n| n.node_id).collect::<Vec<_>>(),
);

let graph_channels: GraphChannelsResult = node_0
.send_rpc_request(
"graph_channels",
GraphChannelsParams {
limit: None,
after: None,
},
)
.await
.unwrap();

// only public channels
assert_eq!(graph_channels.total_count.value(), 1);
assert_eq!(graph_channels.channels.len(), 1);

// next query is empty
let graph_channels: GraphChannelsResult = node_0
.send_rpc_request(
"graph_channels",
GraphChannelsParams {
limit: Some(1),
after: Some(graph_channels.last_cursor),
},
)
.await
.unwrap();

assert_eq!(graph_channels.total_count.value(), 1);
assert!(graph_channels.channels.is_empty());
}

#[tokio::test]
Expand Down
2 changes: 2 additions & 0 deletions crates/fiber-lib/src/rpc/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,7 @@ Get the list of nodes in the network graph.

* `nodes` - <em>Vec<[NodeInfo](#type-nodeinfo)></em>, The list of nodes.
* `last_cursor` - <em>`JsonBytes`</em>, The last cursor.
* `total_count` - <em>`Uint64`</em>, The total count of nodes

---

Expand All @@ -482,6 +483,7 @@ Get the list of channels in the network graph.

* `channels` - <em>Vec<[ChannelInfo](#type-channelinfo)></em>, A list of channels.
* `last_cursor` - <em>`JsonBytes`</em>, The last cursor for pagination.
* `total_count` - <em>`Uint64`</em>, The total count of channels

---

Expand Down
Loading
Loading