From 57517bcf584682cc59a8ffeeed232039797286fb Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Thu, 7 May 2026 14:03:23 -0700 Subject: [PATCH 01/12] graph/db: fix zero ChannelID in zombie ChannelEdgeInfo When FetchChannelEdgesByID hits a zombie edge, it constructs the partial ChannelEdgeInfo it returns alongside ErrZombieEdge with a hard-coded ChannelID of zero instead of the actual channel ID that was looked up. Callers such as the gossiper's processZombieUpdate receive this struct and may use the ChannelID field; returning zero is incorrect and could mask bugs in downstream code. Pass the looked-up chanID through to NewV1Channel / NewV2Channel so the zombie ChannelEdgeInfo carries the correct ChannelID. --- graph/db/graph_test.go | 1 + graph/db/kv_store.go | 2 +- graph/db/sql_store.go | 4 ++-- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/graph/db/graph_test.go b/graph/db/graph_test.go index 3d0156ff07..cc6cf22402 100644 --- a/graph/db/graph_test.go +++ b/graph/db/graph_test.go @@ -5510,6 +5510,7 @@ func testFetchZombieEdgeVersioning(t *testing.T, v lnwire.GossipVersion) { require.ErrorIs(t, err, ErrZombieEdge) require.NotNil(t, info) require.Equal(t, v, info.Version) + require.Equal(t, edge.ChannelID, info.ChannelID) require.Equal(t, edge.NodeKey1Bytes, info.NodeKey1Bytes) require.Equal(t, edge.NodeKey2Bytes, info.NodeKey2Bytes) } diff --git a/graph/db/kv_store.go b/graph/db/kv_store.go index 47c478c360..0f7d9cc4f6 100644 --- a/graph/db/kv_store.go +++ b/graph/db/kv_store.go @@ -4147,7 +4147,7 @@ func (c *KVStore) FetchChannelEdgesByID(_ context.Context, // party as this is the only information we have about // it and return an error signaling so. zombieEdge, err := models.NewV1Channel( - 0, chainhash.Hash{}, pubKey1, pubKey2, + chanID, chainhash.Hash{}, pubKey1, pubKey2, &models.ChannelV1Fields{}, ) if err != nil { diff --git a/graph/db/sql_store.go b/graph/db/sql_store.go index e705468a4d..5f75a90275 100644 --- a/graph/db/sql_store.go +++ b/graph/db/sql_store.go @@ -2521,12 +2521,12 @@ func (s *SQLStore) FetchChannelEdgesByID(ctx context.Context, switch v { case gossipV1: edge, err = models.NewV1Channel( - 0, chainhash.Hash{}, node1, + chanID, chainhash.Hash{}, node1, node2, &models.ChannelV1Fields{}, ) case gossipV2: edge, err = models.NewV2Channel( - 0, chainhash.Hash{}, node1, + chanID, chainhash.Hash{}, node1, node2, &models.ChannelV2Fields{}, ) } From c5a59ac8b9f2b6d2968e2767629832db48420d72 Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Tue, 7 Apr 2026 08:03:55 +0545 Subject: [PATCH 02/12] sqldb: add preferred-node and preferred-channel mapping tables Add two precomputed mapping tables that track the "best" gossip version for each unique node (pub_key) and channel (SCID): - graph_preferred_nodes: pub_key -> node_id - graph_preferred_channels: scid -> channel_id Priority for nodes: v2 announced > v1 announced > v2 shell > v1 shell. Priority for channels: v2 with policies > v1 with policies > v2 > v1. These tables enable simple indexed-join queries for cross-version traversal (ForEachNode, ForEachChannel, ForEachNodeDirectedChannel) without expensive per-row COALESCE subqueries. The tables are populated from existing data during the migration and maintained by upsert/delete queries on every write path (added in the next commit). --- sqldb/migrations.go | 5 + sqldb/sqlc/graph.sql.go | 281 ++++++++++++++++++ .../000016_graph_preferred_lookups.down.sql | 2 + .../000016_graph_preferred_lookups.up.sql | 83 ++++++ sqldb/sqlc/models.go | 10 + sqldb/sqlc/querier.go | 8 + sqldb/sqlc/queries/graph.sql | 105 +++++++ 7 files changed, 494 insertions(+) create mode 100644 sqldb/sqlc/migrations/000016_graph_preferred_lookups.down.sql create mode 100644 sqldb/sqlc/migrations/000016_graph_preferred_lookups.up.sql diff --git a/sqldb/migrations.go b/sqldb/migrations.go index 241e5c0d68..afee722029 100644 --- a/sqldb/migrations.go +++ b/sqldb/migrations.go @@ -136,6 +136,11 @@ var ( Version: 18, SchemaVersion: 15, }, + { + Name: "000016_graph_preferred_lookups", + Version: 19, + SchemaVersion: 16, + }, }, migrationAdditions...) // ErrMigrationMismatch is returned when a migrated record does not diff --git a/sqldb/sqlc/graph.sql.go b/sqldb/sqlc/graph.sql.go index 703afd8f45..61a769c24d 100644 --- a/sqldb/sqlc/graph.sql.go +++ b/sqldb/sqlc/graph.sql.go @@ -4217,6 +4217,236 @@ func (q *Queries) ListNodesPaginated(ctx context.Context, arg ListNodesPaginated return items, nil } +const listPreferredChannelsPaginated = `-- name: ListPreferredChannelsPaginated :many +SELECT + c.id, c.version, c.scid, c.node_id_1, c.node_id_2, c.outpoint, c.capacity, c.bitcoin_key_1, c.bitcoin_key_2, c.node_1_signature, c.node_2_signature, c.bitcoin_1_signature, c.bitcoin_2_signature, c.signature, c.funding_pk_script, c.merkle_root_hash, + + -- Join node pubkeys + n1.pub_key AS node1_pubkey, + n2.pub_key AS node2_pubkey, + + -- Node 1 policy + cp1.id AS policy_1_id, + cp1.node_id AS policy_1_node_id, + cp1.version AS policy_1_version, + cp1.timelock AS policy_1_timelock, + cp1.fee_ppm AS policy_1_fee_ppm, + cp1.base_fee_msat AS policy_1_base_fee_msat, + cp1.min_htlc_msat AS policy_1_min_htlc_msat, + cp1.max_htlc_msat AS policy_1_max_htlc_msat, + cp1.last_update AS policy_1_last_update, + cp1.disabled AS policy_1_disabled, + cp1.inbound_base_fee_msat AS policy1_inbound_base_fee_msat, + cp1.inbound_fee_rate_milli_msat AS policy1_inbound_fee_rate_milli_msat, + cp1.message_flags AS policy1_message_flags, + cp1.channel_flags AS policy1_channel_flags, + cp1.block_height AS policy1_block_height, + cp1.disable_flags AS policy1_disable_flags, + cp1.signature AS policy_1_signature, + + -- Node 2 policy + cp2.id AS policy_2_id, + cp2.node_id AS policy_2_node_id, + cp2.version AS policy_2_version, + cp2.timelock AS policy_2_timelock, + cp2.fee_ppm AS policy_2_fee_ppm, + cp2.base_fee_msat AS policy_2_base_fee_msat, + cp2.min_htlc_msat AS policy_2_min_htlc_msat, + cp2.max_htlc_msat AS policy_2_max_htlc_msat, + cp2.last_update AS policy_2_last_update, + cp2.disabled AS policy_2_disabled, + cp2.inbound_base_fee_msat AS policy2_inbound_base_fee_msat, + cp2.inbound_fee_rate_milli_msat AS policy2_inbound_fee_rate_milli_msat, + cp2.message_flags AS policy2_message_flags, + cp2.channel_flags AS policy2_channel_flags, + cp2.signature AS policy_2_signature, + cp2.block_height AS policy_2_block_height, + cp2.disable_flags AS policy_2_disable_flags + +FROM graph_preferred_channels pc +JOIN graph_channels c ON c.id = pc.channel_id +JOIN graph_nodes n1 ON c.node_id_1 = n1.id +JOIN graph_nodes n2 ON c.node_id_2 = n2.id +LEFT JOIN graph_channel_policies cp1 + ON cp1.channel_id = c.id AND cp1.node_id = c.node_id_1 AND cp1.version = c.version +LEFT JOIN graph_channel_policies cp2 + ON cp2.channel_id = c.id AND cp2.node_id = c.node_id_2 AND cp2.version = c.version +WHERE pc.scid > $1 +ORDER BY pc.scid +LIMIT $2 +` + +type ListPreferredChannelsPaginatedParams struct { + Scid []byte + Limit int32 +} + +type ListPreferredChannelsPaginatedRow struct { + GraphChannel GraphChannel + Node1Pubkey []byte + Node2Pubkey []byte + Policy1ID sql.NullInt64 + Policy1NodeID sql.NullInt64 + Policy1Version sql.NullInt16 + Policy1Timelock sql.NullInt32 + Policy1FeePpm sql.NullInt64 + Policy1BaseFeeMsat sql.NullInt64 + Policy1MinHtlcMsat sql.NullInt64 + Policy1MaxHtlcMsat sql.NullInt64 + Policy1LastUpdate sql.NullInt64 + Policy1Disabled sql.NullBool + Policy1InboundBaseFeeMsat sql.NullInt64 + Policy1InboundFeeRateMilliMsat sql.NullInt64 + Policy1MessageFlags sql.NullInt16 + Policy1ChannelFlags sql.NullInt16 + Policy1BlockHeight sql.NullInt64 + Policy1DisableFlags sql.NullInt16 + Policy1Signature []byte + Policy2ID sql.NullInt64 + Policy2NodeID sql.NullInt64 + Policy2Version sql.NullInt16 + Policy2Timelock sql.NullInt32 + Policy2FeePpm sql.NullInt64 + Policy2BaseFeeMsat sql.NullInt64 + Policy2MinHtlcMsat sql.NullInt64 + Policy2MaxHtlcMsat sql.NullInt64 + Policy2LastUpdate sql.NullInt64 + Policy2Disabled sql.NullBool + Policy2InboundBaseFeeMsat sql.NullInt64 + Policy2InboundFeeRateMilliMsat sql.NullInt64 + Policy2MessageFlags sql.NullInt16 + Policy2ChannelFlags sql.NullInt16 + Policy2Signature []byte + Policy2BlockHeight sql.NullInt64 + Policy2DisableFlags sql.NullInt16 +} + +func (q *Queries) ListPreferredChannelsPaginated(ctx context.Context, arg ListPreferredChannelsPaginatedParams) ([]ListPreferredChannelsPaginatedRow, error) { + rows, err := q.db.QueryContext(ctx, listPreferredChannelsPaginated, arg.Scid, arg.Limit) + if err != nil { + return nil, err + } + defer rows.Close() + var items []ListPreferredChannelsPaginatedRow + for rows.Next() { + var i ListPreferredChannelsPaginatedRow + if err := rows.Scan( + &i.GraphChannel.ID, + &i.GraphChannel.Version, + &i.GraphChannel.Scid, + &i.GraphChannel.NodeID1, + &i.GraphChannel.NodeID2, + &i.GraphChannel.Outpoint, + &i.GraphChannel.Capacity, + &i.GraphChannel.BitcoinKey1, + &i.GraphChannel.BitcoinKey2, + &i.GraphChannel.Node1Signature, + &i.GraphChannel.Node2Signature, + &i.GraphChannel.Bitcoin1Signature, + &i.GraphChannel.Bitcoin2Signature, + &i.GraphChannel.Signature, + &i.GraphChannel.FundingPkScript, + &i.GraphChannel.MerkleRootHash, + &i.Node1Pubkey, + &i.Node2Pubkey, + &i.Policy1ID, + &i.Policy1NodeID, + &i.Policy1Version, + &i.Policy1Timelock, + &i.Policy1FeePpm, + &i.Policy1BaseFeeMsat, + &i.Policy1MinHtlcMsat, + &i.Policy1MaxHtlcMsat, + &i.Policy1LastUpdate, + &i.Policy1Disabled, + &i.Policy1InboundBaseFeeMsat, + &i.Policy1InboundFeeRateMilliMsat, + &i.Policy1MessageFlags, + &i.Policy1ChannelFlags, + &i.Policy1BlockHeight, + &i.Policy1DisableFlags, + &i.Policy1Signature, + &i.Policy2ID, + &i.Policy2NodeID, + &i.Policy2Version, + &i.Policy2Timelock, + &i.Policy2FeePpm, + &i.Policy2BaseFeeMsat, + &i.Policy2MinHtlcMsat, + &i.Policy2MaxHtlcMsat, + &i.Policy2LastUpdate, + &i.Policy2Disabled, + &i.Policy2InboundBaseFeeMsat, + &i.Policy2InboundFeeRateMilliMsat, + &i.Policy2MessageFlags, + &i.Policy2ChannelFlags, + &i.Policy2Signature, + &i.Policy2BlockHeight, + &i.Policy2DisableFlags, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const listPreferredNodesPaginated = `-- name: ListPreferredNodesPaginated :many +SELECT n.id, n.version, n.pub_key, n.alias, n.last_update, n.color, n.signature, n.block_height +FROM graph_preferred_nodes pn +JOIN graph_nodes n ON n.id = pn.node_id +WHERE pn.pub_key > $1 +ORDER BY pn.pub_key +LIMIT $2 +` + +type ListPreferredNodesPaginatedParams struct { + PubKey []byte + Limit int32 +} + +type ListPreferredNodesPaginatedRow struct { + GraphNode GraphNode +} + +func (q *Queries) ListPreferredNodesPaginated(ctx context.Context, arg ListPreferredNodesPaginatedParams) ([]ListPreferredNodesPaginatedRow, error) { + rows, err := q.db.QueryContext(ctx, listPreferredNodesPaginated, arg.PubKey, arg.Limit) + if err != nil { + return nil, err + } + defer rows.Close() + var items []ListPreferredNodesPaginatedRow + for rows.Next() { + var i ListPreferredNodesPaginatedRow + if err := rows.Scan( + &i.GraphNode.ID, + &i.GraphNode.Version, + &i.GraphNode.PubKey, + &i.GraphNode.Alias, + &i.GraphNode.LastUpdate, + &i.GraphNode.Color, + &i.GraphNode.Signature, + &i.GraphNode.BlockHeight, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const nodeExists = `-- name: NodeExists :one SELECT EXISTS ( SELECT 1 @@ -4497,6 +4727,57 @@ func (q *Queries) UpsertNodeExtraType(ctx context.Context, arg UpsertNodeExtraTy return err } +const upsertPreferredChannel = `-- name: UpsertPreferredChannel :exec +/* ───────────────────────────────────────────── + graph_preferred_channels table queries + ───────────────────────────────────────────── +*/ + +INSERT INTO graph_preferred_channels (scid, channel_id) +SELECT c.scid, c.id +FROM graph_channels c +WHERE c.scid = $1 +ORDER BY + EXISTS ( + SELECT 1 FROM graph_channel_policies p + WHERE p.channel_id = c.id AND p.version = c.version + ) DESC, + c.version DESC +LIMIT 1 +ON CONFLICT (scid) DO UPDATE SET channel_id = EXCLUDED.channel_id +` + +// Recompute the preferred channel for a given SCID and upsert the result. +// Priority: v2 with policies > v1 with policies > v2 bare > v1 bare. +func (q *Queries) UpsertPreferredChannel(ctx context.Context, scid []byte) error { + _, err := q.db.ExecContext(ctx, upsertPreferredChannel, scid) + return err +} + +const upsertPreferredNode = `-- name: UpsertPreferredNode :exec +/* ───────────────────────────────────────────── + graph_preferred_nodes table queries + ───────────────────────────────────────────── +*/ + +INSERT INTO graph_preferred_nodes (pub_key, node_id) +SELECT n.pub_key, n.id +FROM graph_nodes n +WHERE n.pub_key = $1 +ORDER BY + (COALESCE(length(n.signature), 0) > 0) DESC, + n.version DESC +LIMIT 1 +ON CONFLICT (pub_key) DO UPDATE SET node_id = EXCLUDED.node_id +` + +// Recompute the preferred node for a given pub_key and upsert the result. +// Priority: v2 announced > v1 announced > v2 shell > v1 shell. +func (q *Queries) UpsertPreferredNode(ctx context.Context, pubKey []byte) error { + _, err := q.db.ExecContext(ctx, upsertPreferredNode, pubKey) + return err +} + const upsertPruneLogEntry = `-- name: UpsertPruneLogEntry :exec /* ───────────────────────────���───────────────── graph_prune_log table queries diff --git a/sqldb/sqlc/migrations/000016_graph_preferred_lookups.down.sql b/sqldb/sqlc/migrations/000016_graph_preferred_lookups.down.sql new file mode 100644 index 0000000000..2cfc0cd5af --- /dev/null +++ b/sqldb/sqlc/migrations/000016_graph_preferred_lookups.down.sql @@ -0,0 +1,2 @@ +DROP TABLE IF EXISTS graph_preferred_channels; +DROP TABLE IF EXISTS graph_preferred_nodes; diff --git a/sqldb/sqlc/migrations/000016_graph_preferred_lookups.up.sql b/sqldb/sqlc/migrations/000016_graph_preferred_lookups.up.sql new file mode 100644 index 0000000000..547e4971e2 --- /dev/null +++ b/sqldb/sqlc/migrations/000016_graph_preferred_lookups.up.sql @@ -0,0 +1,83 @@ +-- Preferred-node mapping: one row per unique pub_key pointing at the "best" +-- node row across gossip versions. Priority: v2 announced > v1 announced > +-- v2 shell > v1 shell. +CREATE TABLE IF NOT EXISTS graph_preferred_nodes ( + pub_key BLOB PRIMARY KEY, + node_id BIGINT NOT NULL REFERENCES graph_nodes(id) ON DELETE CASCADE +); + +-- Index on node_id so cascade deletes from graph_nodes can locate the +-- referencing rows without a sequential scan. +CREATE INDEX IF NOT EXISTS graph_preferred_nodes_node_id_idx + ON graph_preferred_nodes (node_id); + +-- Preferred-channel mapping: one row per unique SCID pointing at the "best" +-- channel row across gossip versions. Priority: v2 with policies > +-- v1 with policies > v2 bare > v1 bare. +CREATE TABLE IF NOT EXISTS graph_preferred_channels ( + scid BLOB PRIMARY KEY, + channel_id BIGINT NOT NULL REFERENCES graph_channels(id) ON DELETE CASCADE +); + +-- Index on channel_id so cascade deletes from graph_channels can locate +-- the referencing rows without a sequential scan. +CREATE INDEX IF NOT EXISTS graph_preferred_channels_channel_id_idx + ON graph_preferred_channels (channel_id); + +-- Populate graph_preferred_nodes from the graph_nodes rows that already +-- existed before this migration. The inner query ranks every node row within +-- each pub_key group. Announced nodes, identified by a non-empty signature, +-- outrank shell nodes, and higher gossip versions win within the same +-- announced/shell class. The outer INSERT keeps only rn = 1, leaving exactly +-- one preferred node_id per pub_key. +-- +-- The conflict clause makes this population step idempotent if the migration +-- is retried after the tables were created and partially populated. +INSERT INTO graph_preferred_nodes (pub_key, node_id) +SELECT sub.pub_key, sub.node_id +FROM ( + SELECT + n.pub_key, + n.id AS node_id, + ROW_NUMBER() OVER ( + PARTITION BY n.pub_key + ORDER BY + (COALESCE(length(n.signature), 0) > 0) DESC, + n.version DESC + ) AS rn + FROM graph_nodes n +) sub +WHERE sub.rn = 1 +ON CONFLICT (pub_key) DO UPDATE SET node_id = EXCLUDED.node_id +WHERE graph_preferred_nodes.node_id <> EXCLUDED.node_id; + +-- Populate graph_preferred_channels from the graph_channels rows that already +-- existed before this migration. The inner query ranks every channel row +-- within each SCID group. A channel version with at least one policy row +-- outranks a bare channel version, and higher gossip versions win within the +-- same policy/bare class. The outer INSERT keeps only rn = 1, leaving exactly +-- one preferred channel_id per SCID. +-- +-- The conflict clause makes this population step idempotent if the migration +-- is retried after the tables were created and partially populated. +INSERT INTO graph_preferred_channels (scid, channel_id) +SELECT sub.scid, sub.channel_id +FROM ( + SELECT + c.scid, + c.id AS channel_id, + ROW_NUMBER() OVER ( + PARTITION BY c.scid + ORDER BY + EXISTS ( + SELECT 1 FROM graph_channel_policies p + WHERE p.channel_id = c.id + AND p.version = c.version + ) DESC, + c.version DESC + ) AS rn + FROM graph_channels c +) sub +WHERE sub.rn = 1 +ON CONFLICT (scid) DO UPDATE SET channel_id = EXCLUDED.channel_id +WHERE graph_preferred_channels.channel_id <> EXCLUDED.channel_id; diff --git a/sqldb/sqlc/models.go b/sqldb/sqlc/models.go index ef9aa9006f..6df9fae7e3 100644 --- a/sqldb/sqlc/models.go +++ b/sqldb/sqlc/models.go @@ -123,6 +123,16 @@ type GraphNodeFeature struct { FeatureBit int32 } +type GraphPreferredChannel struct { + Scid []byte + ChannelID int64 +} + +type GraphPreferredNode struct { + PubKey []byte + NodeID int64 +} + type GraphPruneLog struct { BlockHeight int64 BlockHash []byte diff --git a/sqldb/sqlc/querier.go b/sqldb/sqlc/querier.go index 9b95a66991..6b8e0ad218 100644 --- a/sqldb/sqlc/querier.go +++ b/sqldb/sqlc/querier.go @@ -231,6 +231,8 @@ type Querier interface { ListChannelsWithPoliciesPaginated(ctx context.Context, arg ListChannelsWithPoliciesPaginatedParams) ([]ListChannelsWithPoliciesPaginatedRow, error) ListNodeIDsAndPubKeys(ctx context.Context, arg ListNodeIDsAndPubKeysParams) ([]ListNodeIDsAndPubKeysRow, error) ListNodesPaginated(ctx context.Context, arg ListNodesPaginatedParams) ([]GraphNode, error) + ListPreferredChannelsPaginated(ctx context.Context, arg ListPreferredChannelsPaginatedParams) ([]ListPreferredChannelsPaginatedRow, error) + ListPreferredNodesPaginated(ctx context.Context, arg ListPreferredNodesPaginatedParams) ([]ListPreferredNodesPaginatedRow, error) NextInvoiceSettleIndex(ctx context.Context) (int64, error) NodeExists(ctx context.Context, arg NodeExistsParams) (bool, error) OnAMPSubInvoiceCanceled(ctx context.Context, arg OnAMPSubInvoiceCanceledParams) error @@ -255,6 +257,12 @@ type Querier interface { UpsertNode(ctx context.Context, arg UpsertNodeParams) (int64, error) UpsertNodeAddress(ctx context.Context, arg UpsertNodeAddressParams) error UpsertNodeExtraType(ctx context.Context, arg UpsertNodeExtraTypeParams) error + // Recompute the preferred channel for a given SCID and upsert the result. + // Priority: v2 with policies > v1 with policies > v2 bare > v1 bare. + UpsertPreferredChannel(ctx context.Context, scid []byte) error + // Recompute the preferred node for a given pub_key and upsert the result. + // Priority: v2 announced > v1 announced > v2 shell > v1 shell. + UpsertPreferredNode(ctx context.Context, pubKey []byte) error UpsertPruneLogEntry(ctx context.Context, arg UpsertPruneLogEntryParams) error // We use a separate upsert for our own node since we want to be less strict // about the last_update field. For our own node, we always want to diff --git a/sqldb/sqlc/queries/graph.sql b/sqldb/sqlc/queries/graph.sql index a7683d1fbd..fef054d0c3 100644 --- a/sqldb/sqlc/queries/graph.sql +++ b/sqldb/sqlc/queries/graph.sql @@ -80,6 +80,14 @@ WHERE version = $1 AND id > $2 ORDER BY id LIMIT $3; +-- name: ListPreferredNodesPaginated :many +SELECT sqlc.embed(n) +FROM graph_preferred_nodes pn +JOIN graph_nodes n ON n.id = pn.node_id +WHERE pn.pub_key > $1 +ORDER BY pn.pub_key +LIMIT $2; + -- name: ListNodeIDsAndPubKeys :many SELECT id, pub_key FROM graph_nodes @@ -978,6 +986,64 @@ WHERE c.version = $1 AND c.id > $2 ORDER BY c.id LIMIT $3; +-- name: ListPreferredChannelsPaginated :many +SELECT + sqlc.embed(c), + + -- Join node pubkeys + n1.pub_key AS node1_pubkey, + n2.pub_key AS node2_pubkey, + + -- Node 1 policy + cp1.id AS policy_1_id, + cp1.node_id AS policy_1_node_id, + cp1.version AS policy_1_version, + cp1.timelock AS policy_1_timelock, + cp1.fee_ppm AS policy_1_fee_ppm, + cp1.base_fee_msat AS policy_1_base_fee_msat, + cp1.min_htlc_msat AS policy_1_min_htlc_msat, + cp1.max_htlc_msat AS policy_1_max_htlc_msat, + cp1.last_update AS policy_1_last_update, + cp1.disabled AS policy_1_disabled, + cp1.inbound_base_fee_msat AS policy1_inbound_base_fee_msat, + cp1.inbound_fee_rate_milli_msat AS policy1_inbound_fee_rate_milli_msat, + cp1.message_flags AS policy1_message_flags, + cp1.channel_flags AS policy1_channel_flags, + cp1.block_height AS policy1_block_height, + cp1.disable_flags AS policy1_disable_flags, + cp1.signature AS policy_1_signature, + + -- Node 2 policy + cp2.id AS policy_2_id, + cp2.node_id AS policy_2_node_id, + cp2.version AS policy_2_version, + cp2.timelock AS policy_2_timelock, + cp2.fee_ppm AS policy_2_fee_ppm, + cp2.base_fee_msat AS policy_2_base_fee_msat, + cp2.min_htlc_msat AS policy_2_min_htlc_msat, + cp2.max_htlc_msat AS policy_2_max_htlc_msat, + cp2.last_update AS policy_2_last_update, + cp2.disabled AS policy_2_disabled, + cp2.inbound_base_fee_msat AS policy2_inbound_base_fee_msat, + cp2.inbound_fee_rate_milli_msat AS policy2_inbound_fee_rate_milli_msat, + cp2.message_flags AS policy2_message_flags, + cp2.channel_flags AS policy2_channel_flags, + cp2.signature AS policy_2_signature, + cp2.block_height AS policy_2_block_height, + cp2.disable_flags AS policy_2_disable_flags + +FROM graph_preferred_channels pc +JOIN graph_channels c ON c.id = pc.channel_id +JOIN graph_nodes n1 ON c.node_id_1 = n1.id +JOIN graph_nodes n2 ON c.node_id_2 = n2.id +LEFT JOIN graph_channel_policies cp1 + ON cp1.channel_id = c.id AND cp1.node_id = c.node_id_1 AND cp1.version = c.version +LEFT JOIN graph_channel_policies cp2 + ON cp2.channel_id = c.id AND cp2.node_id = c.node_id_2 AND cp2.version = c.version +WHERE pc.scid > $1 +ORDER BY pc.scid +LIMIT $2; + -- name: ListChannelsWithPoliciesForCachePaginated :many SELECT c.id as id, @@ -1435,3 +1501,42 @@ ON CONFLICT (channel_id, node_id, version) channel_flags = EXCLUDED.channel_flags, signature = EXCLUDED.signature RETURNING id; + +/* ───────────────────────────────────────────── + graph_preferred_nodes table queries + ───────────────────────────────────────────── +*/ + +-- name: UpsertPreferredNode :exec +-- Recompute the preferred node for a given pub_key and upsert the result. +-- Priority: v2 announced > v1 announced > v2 shell > v1 shell. +INSERT INTO graph_preferred_nodes (pub_key, node_id) +SELECT n.pub_key, n.id +FROM graph_nodes n +WHERE n.pub_key = $1 +ORDER BY + (COALESCE(length(n.signature), 0) > 0) DESC, + n.version DESC +LIMIT 1 +ON CONFLICT (pub_key) DO UPDATE SET node_id = EXCLUDED.node_id; + +/* ───────────────────────────────────────────── + graph_preferred_channels table queries + ───────────────────────────────────────────── +*/ + +-- name: UpsertPreferredChannel :exec +-- Recompute the preferred channel for a given SCID and upsert the result. +-- Priority: v2 with policies > v1 with policies > v2 bare > v1 bare. +INSERT INTO graph_preferred_channels (scid, channel_id) +SELECT c.scid, c.id +FROM graph_channels c +WHERE c.scid = $1 +ORDER BY + EXISTS ( + SELECT 1 FROM graph_channel_policies p + WHERE p.channel_id = c.id AND p.version = c.version + ) DESC, + c.version DESC +LIMIT 1 +ON CONFLICT (scid) DO UPDATE SET channel_id = EXCLUDED.channel_id; From bb6427ecae3672310a7d2c566718b8ed7ed72768 Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Thu, 7 May 2026 13:17:13 -0700 Subject: [PATCH 03/12] graph/db: add preferred channel fetch methods Add version-agnostic channel fetch helpers that choose the preferred gossip-version row for a logical channel: the highest version with policy data, falling back to the highest bare version. Implement the SQL path inside a single read transaction instead of calling SQLStore methods recursively, preserve zombie edge info for SCID lookups, and keep KV behavior as v1-only delegation. Callers that need to know whether any version of a channel exists can rely on the ErrEdgeNotFound returned by the Preferred fetch path. Add coverage for preferred selection, missing channels, missing outpoints, and zombie edge info. --- graph/db/graph_test.go | 150 ++++++++++++++++++++++ graph/db/interfaces.go | 19 +++ graph/db/kv_store.go | 27 ++++ graph/db/sql_store.go | 275 +++++++++++++++++++++++++++++++++++++++++ 4 files changed, 471 insertions(+) diff --git a/graph/db/graph_test.go b/graph/db/graph_test.go index cc6cf22402..7e979251cd 100644 --- a/graph/db/graph_test.go +++ b/graph/db/graph_test.go @@ -6776,3 +6776,153 @@ func TestUpdateRangeValidateForVersion(t *testing.T) { }) } } + +// TestPreferredChannelFetch tests the two new Store methods: +// FetchChannelEdgesByIDPreferred and FetchChannelEdgesByOutpointPreferred. +func TestPreferredChannelFetch(t *testing.T) { + t.Parallel() + ctx := t.Context() + + graph := MakeTestGraph(t) + store := graph.db + + node1Priv, err := btcec.NewPrivateKey() + require.NoError(t, err) + node2Priv, err := btcec.NewPrivateKey() + require.NoError(t, err) + + node1V1 := createNode(t, lnwire.GossipVersion1, node1Priv) + node2V1 := createNode(t, lnwire.GossipVersion1, node2Priv) + + require.NoError(t, graph.AddNode(ctx, node1V1)) + require.NoError(t, graph.AddNode(ctx, node2V1)) + + // Create and add a v1 channel edge. + edgeInfo, scid := createEdge( + lnwire.GossipVersion1, 100, 1, 0, 1, node1V1, node2V1, + ) + require.NoError(t, graph.AddChannelEdge(ctx, edgeInfo)) + + chanID := scid.ToUint64() + op := edgeInfo.ChannelPoint + + // FetchChannelEdgesByIDPreferred should return the v1 channel. + info, _, _, err := store.FetchChannelEdgesByIDPreferred( + ctx, chanID, + ) + require.NoError(t, err) + require.Equal(t, chanID, info.ChannelID) + + // FetchChannelEdgesByOutpointPreferred should also return it. + info, _, _, err = store.FetchChannelEdgesByOutpointPreferred( + ctx, &op, + ) + require.NoError(t, err) + require.Equal(t, chanID, info.ChannelID) + + // Querying a non-existent channel should return ErrEdgeNotFound for + // both the SCID and outpoint lookups. + _, _, _, err = store.FetchChannelEdgesByIDPreferred(ctx, 999999) + require.ErrorIs(t, err, ErrEdgeNotFound) + + unknownOutpoint := wire.OutPoint{ + Hash: chainhash.Hash{1}, + Index: 99, + } + _, _, _, err = store.FetchChannelEdgesByOutpointPreferred( + ctx, &unknownOutpoint, + ) + require.ErrorIs(t, err, ErrEdgeNotFound) + + zombieChanID := uint64(888888) + err = store.MarkEdgeZombie( + ctx, lnwire.GossipVersion1, zombieChanID, + node1V1.PubKeyBytes, node2V1.PubKeyBytes, + ) + require.NoError(t, err) + + info, _, _, err = store.FetchChannelEdgesByIDPreferred( + ctx, zombieChanID, + ) + require.ErrorIs(t, err, ErrZombieEdge) + require.NotNil(t, info) + require.Equal(t, route.Vertex(node1V1.PubKeyBytes), info.NodeKey1Bytes) + require.Equal(t, route.Vertex(node2V1.PubKeyBytes), info.NodeKey2Bytes) + + if !isSQLDB { + return + } + + node1V2 := createNode(t, lnwire.GossipVersion2, node1Priv) + node2V2 := createNode(t, lnwire.GossipVersion2, node2Priv) + require.NoError(t, graph.AddNode(ctx, node1V2)) + require.NoError(t, graph.AddNode(ctx, node2V2)) + + // Add a duplicate v1/v2 channel and verify preferred lookup chooses + // the v2 edge for both the SCID and outpoint lookups. + dupV1, dupSCID := createEdge( + lnwire.GossipVersion1, 101, 1, 0, 2, node1V1, node2V1, + ) + dupV2, _ := createEdge( + lnwire.GossipVersion2, 101, 1, 0, 2, node1V2, node2V2, + ) + require.NoError(t, graph.AddChannelEdge(ctx, dupV1)) + require.NoError(t, graph.AddChannelEdge(ctx, dupV2)) + + dupChanID := dupSCID.ToUint64() + dupOutpoint := dupV1.ChannelPoint + + info, _, _, err = store.FetchChannelEdgesByIDPreferred( + ctx, dupChanID, + ) + require.NoError(t, err) + require.Equal(t, dupChanID, info.ChannelID) + require.Equal(t, lnwire.GossipVersion2, info.Version) + + info, _, _, err = store.FetchChannelEdgesByOutpointPreferred( + ctx, &dupOutpoint, + ) + require.NoError(t, err) + require.Equal(t, dupChanID, info.ChannelID) + require.Equal(t, lnwire.GossipVersion2, info.Version) + + // Add another duplicate v1/v2 channel where only the v1 version has a + // policy. Preferred lookup should return the lower version with usable + // policy data instead of the higher version shell. + policyPrefV1, policyPrefSCID := createEdge( + lnwire.GossipVersion1, 102, 1, 0, 3, node1V1, node2V1, + ) + policyPrefV2, _ := createEdge( + lnwire.GossipVersion2, 102, 1, 0, 3, node1V2, node2V2, + ) + require.NoError(t, graph.AddChannelEdge(ctx, policyPrefV1)) + require.NoError(t, graph.AddChannelEdge(ctx, policyPrefV2)) + + policyOnlyV1 := newEdgePolicy( + lnwire.GossipVersion1, policyPrefV1.ChannelID, 1000, true, + ) + policyOnlyV1.ToNode = node2V1.PubKeyBytes + policyOnlyV1.SigBytes = testSig.Serialize() + require.NoError(t, graph.UpdateEdgePolicy(ctx, policyOnlyV1)) + + policyPrefChanID := policyPrefSCID.ToUint64() + policyPrefOutpoint := policyPrefV1.ChannelPoint + + info, p1, p2, err := store.FetchChannelEdgesByIDPreferred( + ctx, policyPrefChanID, + ) + require.NoError(t, err) + require.Equal(t, policyPrefChanID, info.ChannelID) + require.Equal(t, lnwire.GossipVersion1, info.Version) + require.NotNil(t, p1) + require.Nil(t, p2) + + info, p1, p2, err = store.FetchChannelEdgesByOutpointPreferred( + ctx, &policyPrefOutpoint, + ) + require.NoError(t, err) + require.Equal(t, policyPrefChanID, info.ChannelID) + require.Equal(t, lnwire.GossipVersion1, info.Version) + require.NotNil(t, p1) + require.Nil(t, p2) +} diff --git a/graph/db/interfaces.go b/graph/db/interfaces.go index c126be014c..cc02ef8322 100644 --- a/graph/db/interfaces.go +++ b/graph/db/interfaces.go @@ -318,6 +318,25 @@ type Store interface { //nolint:interfacebloat *models.ChannelEdgeInfo, *models.ChannelEdgePolicy, *models.ChannelEdgePolicy, error) + // FetchChannelEdgesByIDPreferred behaves like FetchChannelEdgesByID + // but is version-agnostic: if the channel exists under multiple gossip + // versions it returns the preferred record. Preferred means the highest + // version with policies, falling back to the highest bare version. + FetchChannelEdgesByIDPreferred(ctx context.Context, + chanID uint64) ( + *models.ChannelEdgeInfo, *models.ChannelEdgePolicy, + *models.ChannelEdgePolicy, error) + + // FetchChannelEdgesByOutpointPreferred behaves like + // FetchChannelEdgesByOutpoint but is version-agnostic: if the channel + // exists under multiple gossip versions it returns the preferred + // record. Preferred means the highest version with policies, falling + // back to the highest bare version. + FetchChannelEdgesByOutpointPreferred(ctx context.Context, + op *wire.OutPoint) ( + *models.ChannelEdgeInfo, *models.ChannelEdgePolicy, + *models.ChannelEdgePolicy, error) + // ChannelView returns the verifiable edge information for each active // channel within the known channel graph for the given gossip version. // The set of UTXO's (along with their scripts) returned are the ones diff --git a/graph/db/kv_store.go b/graph/db/kv_store.go index 0f7d9cc4f6..e97cb7499b 100644 --- a/graph/db/kv_store.go +++ b/graph/db/kv_store.go @@ -4193,6 +4193,33 @@ func (c *KVStore) FetchChannelEdgesByID(_ context.Context, return edgeInfo, policy1, policy2, nil } +// FetchChannelEdgesByIDPreferred looks up the channel by ID. The KV store +// only supports gossip v1, so this simply delegates to the versioned fetch. +// +// NOTE: part of the Store interface. +func (c *KVStore) FetchChannelEdgesByIDPreferred(ctx context.Context, + chanID uint64) ( + *models.ChannelEdgeInfo, *models.ChannelEdgePolicy, + *models.ChannelEdgePolicy, error) { + + return c.FetchChannelEdgesByID(ctx, lnwire.GossipVersion1, chanID) +} + +// FetchChannelEdgesByOutpointPreferred looks up the channel by funding +// outpoint. The KV store only supports gossip v1, so this simply delegates to +// the versioned fetch. +// +// NOTE: part of the Store interface. +func (c *KVStore) FetchChannelEdgesByOutpointPreferred( + ctx context.Context, op *wire.OutPoint) ( + *models.ChannelEdgeInfo, *models.ChannelEdgePolicy, + *models.ChannelEdgePolicy, error) { + + return c.FetchChannelEdgesByOutpoint( + ctx, lnwire.GossipVersion1, op, + ) +} + // IsPublicNode is a helper method that determines whether the node with the // given public key is seen as a public node in the graph from the graph's // source node's point of view. diff --git a/graph/db/sql_store.go b/graph/db/sql_store.go index 5f75a90275..23e70525d6 100644 --- a/graph/db/sql_store.go +++ b/graph/db/sql_store.go @@ -2658,6 +2658,281 @@ func (s *SQLStore) FetchChannelEdgesByOutpoint(ctx context.Context, return edge, policy1, policy2, nil } +var preferredGossipVersionsDescending = []lnwire.GossipVersion{ + gossipV2, gossipV1, +} + +// FetchChannelEdgesByIDPreferred tries each known gossip version from highest +// to lowest and returns the first result that has at least one policy. If no +// version has policies, the highest version found is returned. This prevents a +// v2 channel with no policies from hiding a v1 channel that has valid policy +// data. +// +// If no live edge is found across versions but at least one version reports +// the channel as a zombie, ErrZombieEdge is returned with the zombie edge info +// populated so callers can resurrect it. +// +// NOTE: part of the Store interface. +func (s *SQLStore) FetchChannelEdgesByIDPreferred(ctx context.Context, + chanID uint64) ( + *models.ChannelEdgeInfo, *models.ChannelEdgePolicy, + *models.ChannelEdgePolicy, error) { + + var ( + bestInfo *models.ChannelEdgeInfo + bestP1 *models.ChannelEdgePolicy + bestP2 *models.ChannelEdgePolicy + bestZombie *models.ChannelEdgeInfo + chanIDB = channelIDToBytes(chanID) + buildLiveEdge = func(ctx context.Context, db SQLQueries, + row sqlc.GetChannelBySCIDWithPoliciesRow) ( + *models.ChannelEdgeInfo, *models.ChannelEdgePolicy, + *models.ChannelEdgePolicy, error) { + + node1, node2, err := buildNodeVertices( + row.GraphNode.PubKey, row.GraphNode_2.PubKey, + ) + if err != nil { + return nil, nil, nil, err + } + + edge, err := getAndBuildEdgeInfo( + ctx, s.cfg, db, row.GraphChannel, node1, node2, + ) + if err != nil { + return nil, nil, nil, fmt.Errorf("unable to "+ + "build channel info: %w", err) + } + + dbPol1, dbPol2, err := extractChannelPolicies(row) + if err != nil { + return nil, nil, nil, fmt.Errorf("unable to "+ + "extract channel policies: %w", err) + } + + policy1, policy2, err := getAndBuildChanPolicies( + ctx, s.cfg.QueryCfg, db, dbPol1, dbPol2, + edge.ChannelID, node1, node2, + ) + if err != nil { + return nil, nil, nil, fmt.Errorf("unable to "+ + "build channel policies: %w", err) + } + + return edge, policy1, policy2, nil + } + ) + + err := s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error { + for _, v := range preferredGossipVersionsDescending { + row, err := db.GetChannelBySCIDWithPolicies( + ctx, sqlc.GetChannelBySCIDWithPoliciesParams{ + Scid: chanIDB, + Version: int16(v), + }, + ) + if errors.Is(err, sql.ErrNoRows) { + zombie, err := db.GetZombieChannel( + ctx, sqlc.GetZombieChannelParams{ + Scid: chanIDB, + Version: int16(v), + }, + ) + if errors.Is(err, sql.ErrNoRows) { + continue + } + if err != nil { + return fmt.Errorf("unable to check if "+ + "channel is zombie: %w", err) + } + + if bestZombie == nil { + var err error + bestZombie, err = buildZombieEdge( + v, chanID, zombie.NodeKey1, + zombie.NodeKey2, + ) + if err != nil { + return err + } + } + + continue + } + if err != nil { + return fmt.Errorf("unable to fetch channel: %w", + err) + } + + info, p1, p2, err := buildLiveEdge(ctx, db, row) + if err != nil { + return err + } + + if p1 != nil || p2 != nil { + bestInfo, bestP1, bestP2 = info, p1, p2 + + return nil + } + + if bestInfo == nil { + bestInfo, bestP1, bestP2 = info, p1, p2 + } + } + + if bestInfo != nil { + return nil + } + + if bestZombie != nil { + return ErrZombieEdge + } + + return ErrEdgeNotFound + }, sqldb.NoOpReset) + if errors.Is(err, ErrZombieEdge) { + return bestZombie, nil, nil, ErrZombieEdge + } + if err != nil { + return nil, nil, nil, fmt.Errorf("could not fetch preferred "+ + "channel: %w", err) + } + + return bestInfo, bestP1, bestP2, nil +} + +// FetchChannelEdgesByOutpointPreferred tries each known gossip version from +// highest to lowest and returns the first result that has at least one policy. +// If no version has policies, the highest version found is returned. This +// prevents a v2 channel with no policies from hiding a v1 channel that has +// valid policy data. +// +// NOTE: part of the Store interface. +func (s *SQLStore) FetchChannelEdgesByOutpointPreferred( + ctx context.Context, op *wire.OutPoint) ( + *models.ChannelEdgeInfo, *models.ChannelEdgePolicy, + *models.ChannelEdgePolicy, error) { + + var ( + bestInfo *models.ChannelEdgeInfo + bestP1 *models.ChannelEdgePolicy + bestP2 *models.ChannelEdgePolicy + buildLiveEdge = func(ctx context.Context, db SQLQueries, + row sqlc.GetChannelByOutpointWithPoliciesRow) ( + *models.ChannelEdgeInfo, *models.ChannelEdgePolicy, + *models.ChannelEdgePolicy, error) { + + node1, node2, err := buildNodeVertices( + row.Node1Pubkey, row.Node2Pubkey, + ) + if err != nil { + return nil, nil, nil, err + } + + edge, err := getAndBuildEdgeInfo( + ctx, s.cfg, db, row.GraphChannel, node1, node2, + ) + if err != nil { + return nil, nil, nil, fmt.Errorf("unable to "+ + "build channel info: %w", err) + } + + dbPol1, dbPol2, err := extractChannelPolicies(row) + if err != nil { + return nil, nil, nil, fmt.Errorf("unable to "+ + "extract channel policies: %w", err) + } + + policy1, policy2, err := getAndBuildChanPolicies( + ctx, s.cfg.QueryCfg, db, dbPol1, dbPol2, + edge.ChannelID, node1, node2, + ) + if err != nil { + return nil, nil, nil, fmt.Errorf("unable to "+ + "build channel policies: %w", err) + } + + return edge, policy1, policy2, nil + } + ) + + err := s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error { + for _, v := range preferredGossipVersionsDescending { + params := sqlc.GetChannelByOutpointWithPoliciesParams{ + Outpoint: op.String(), + Version: int16(v), + } + row, err := db.GetChannelByOutpointWithPolicies( + ctx, params, + ) + if errors.Is(err, sql.ErrNoRows) { + continue + } + if err != nil { + return fmt.Errorf("unable to fetch channel: %w", + err) + } + + info, p1, p2, err := buildLiveEdge(ctx, db, row) + if err != nil { + return err + } + + if p1 != nil || p2 != nil { + bestInfo, bestP1, bestP2 = info, p1, p2 + + return nil + } + + if bestInfo == nil { + bestInfo, bestP1, bestP2 = info, p1, p2 + } + } + + if bestInfo != nil { + return nil + } + + return ErrEdgeNotFound + }, sqldb.NoOpReset) + if err != nil { + return nil, nil, nil, fmt.Errorf("could not fetch preferred "+ + "channel: %w", err) + } + + return bestInfo, bestP1, bestP2, nil +} + +func buildZombieEdge(v lnwire.GossipVersion, chanID uint64, nodeKey1, + nodeKey2 []byte) (*models.ChannelEdgeInfo, error) { + + node1, err := route.NewVertexFromBytes(nodeKey1) + if err != nil { + return nil, err + } + node2, err := route.NewVertexFromBytes(nodeKey2) + if err != nil { + return nil, err + } + + switch v { + case gossipV1: + return models.NewV1Channel( + chanID, chainhash.Hash{}, node1, node2, + &models.ChannelV1Fields{}, + ) + + case gossipV2: + return models.NewV2Channel( + chanID, chainhash.Hash{}, node1, node2, + &models.ChannelV2Fields{}, + ) + + default: + return nil, fmt.Errorf("unsupported gossip version: %d", v) + } +} + // HasV1ChannelEdge returns true if the database knows of a channel edge // with the passed channel ID, and false otherwise. If an edge with that ID // is found within the graph, then two time stamps representing the last time From 0e815c4ee6994c7ad23e6721a0b8ca5b9588093a Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Tue, 7 Apr 2026 08:35:10 +0545 Subject: [PATCH 04/12] graph/db: wire preferred-table maintenance into write paths Add UpsertPreferredNode and UpsertPreferredChannel calls to every Store write path so the preferred mapping tables stay consistent: - upsertSourceNode, upsertNode, maybeCreateShellNode, DeleteNode - insertChannel, updateChanEdgePolicy, DeleteChannelEdges - pruneGraphNodes CASCADE deletes on the underlying graph_nodes and graph_channels tables automatically clean up preferred entries when a version is removed, so PruneGraph and DisconnectBlockAtHeight (which delete every version of a SCID) need no explicit upsert call. --- graph/db/sql_store.go | 95 +++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 91 insertions(+), 4 deletions(-) diff --git a/graph/db/sql_store.go b/graph/db/sql_store.go index 23e70525d6..4d604c9f54 100644 --- a/graph/db/sql_store.go +++ b/graph/db/sql_store.go @@ -55,6 +55,7 @@ type SQLQueries interface { GetNodesByBlockHeightRange(ctx context.Context, arg sqlc.GetNodesByBlockHeightRangeParams) ([]sqlc.GraphNode, error) GetPublicNodesByLastUpdateRange(ctx context.Context, arg sqlc.GetPublicNodesByLastUpdateRangeParams) ([]sqlc.GraphNode, error) ListNodesPaginated(ctx context.Context, arg sqlc.ListNodesPaginatedParams) ([]sqlc.GraphNode, error) + UpsertPreferredNode(ctx context.Context, pubKey []byte) error ListNodeIDsAndPubKeys(ctx context.Context, arg sqlc.ListNodeIDsAndPubKeysParams) ([]sqlc.ListNodeIDsAndPubKeysRow, error) IsPublicV1Node(ctx context.Context, pubKey []byte) (bool, error) IsPublicV2Node(ctx context.Context, pubKey []byte) (bool, error) @@ -105,6 +106,7 @@ type SQLQueries interface { ListChannelsByNodeID(ctx context.Context, arg sqlc.ListChannelsByNodeIDParams) ([]sqlc.ListChannelsByNodeIDRow, error) ListChannelsForNodeIDs(ctx context.Context, arg sqlc.ListChannelsForNodeIDsParams) ([]sqlc.ListChannelsForNodeIDsRow, error) ListChannelsWithPoliciesPaginated(ctx context.Context, arg sqlc.ListChannelsWithPoliciesPaginatedParams) ([]sqlc.ListChannelsWithPoliciesPaginatedRow, error) + UpsertPreferredChannel(ctx context.Context, scid []byte) error ListChannelsWithPoliciesForCachePaginated(ctx context.Context, arg sqlc.ListChannelsWithPoliciesForCachePaginatedParams) ([]sqlc.ListChannelsWithPoliciesForCachePaginatedRow, error) ListChannelsPaginated(ctx context.Context, arg sqlc.ListChannelsPaginatedParams) ([]sqlc.ListChannelsPaginatedRow, error) ListChannelsPaginatedV2(ctx context.Context, arg sqlc.ListChannelsPaginatedV2Params) ([]sqlc.ListChannelsPaginatedV2Row, error) @@ -439,7 +441,13 @@ func (s *SQLStore) DeleteNode(ctx context.Context, v lnwire.GossipVersion, return fmt.Errorf("deleted %d rows, expected 1", rows) } - return err + // Recompute the preferred mapping. If another version of + // this node still exists, UpsertPreferredNode will point + // the mapping at it. If no version remains, the + // INSERT...SELECT is a no-op and the CASCADE on the FK + // already removed the mapping row when the node was + // deleted above. + return db.UpsertPreferredNode(ctx, pubKey[:]) }, sqldb.NoOpReset) if err != nil { return fmt.Errorf("unable to delete node: %w", err) @@ -2432,7 +2440,25 @@ func (s *SQLStore) DeleteChannelEdges(ctx context.Context, } } - return s.deleteChannels(ctx, db, chanIDsToDelete) + err = s.deleteChannels(ctx, db, chanIDsToDelete) + if err != nil { + return err + } + + // The CASCADE on graph_preferred_channels will have + // removed the mapping row for any deleted channel. If + // another version of the same SCID still exists, we + // need to re-insert the mapping. + for _, chanID := range chanIDs { + scidBytes := channelIDToBytes(chanID) + err = db.UpsertPreferredChannel(ctx, scidBytes) + if err != nil { + return fmt.Errorf("recalc preferred "+ + "channel(%d): %w", chanID, err) + } + } + + return nil }, func() { edges = nil @@ -3592,6 +3618,10 @@ func (s *SQLStore) PruneGraph(ctx context.Context, return err } + // Delete all matched channels. GetChannelsByOutpoints + // returns every version for a given outpoint, so all + // versions are deleted and the CASCADE on + // graph_preferred_channels handles cleanup. err = s.deleteChannels(ctx, db, chansToDelete) if err != nil { return fmt.Errorf("unable to delete channels: %w", err) @@ -3899,9 +3929,22 @@ func (s *SQLStore) pruneGraphNodes(ctx context.Context, "nodes: %w", err) } + // Recalc preferred node mappings for all affected pub_keys. The + // CASCADE may have removed some entries; if another version of the + // node still exists, UpsertPreferredNode will re-insert the mapping. + // If no version remains, the upsert is a no-op (the INSERT ... SELECT + // returns no rows). Note that nodeKeys may contain duplicates if a + // node existed in multiple gossip versions and was pruned in all of + // them; UpsertPreferredNode is idempotent, so this is harmless. prunedNodes := make([]route.Vertex, len(nodeKeys)) - for i, nodeKey := range nodeKeys { - pub, err := route.NewVertexFromBytes(nodeKey) + for i, key := range nodeKeys { + err = db.UpsertPreferredNode(ctx, key) + if err != nil { + return nil, fmt.Errorf("recalc preferred "+ + "node: %w", err) + } + + pub, err := route.NewVertexFromBytes(key) if err != nil { return nil, fmt.Errorf("unable to parse pubkey "+ "from bytes: %w", err) @@ -3976,6 +4019,10 @@ func (s *SQLStore) DisconnectBlockAtHeight(ctx context.Context, removedChans = channelEdges + // Delete all matched channels. GetChannelsBySCIDRange + // returns every version for a given SCID, so all versions + // are deleted and the CASCADE on + // graph_preferred_channels handles cleanup. err = s.deleteChannels(ctx, db, chanIDsToDelete) if err != nil { return fmt.Errorf("unable to delete channels: %w", err) @@ -4558,6 +4605,15 @@ func updateChanEdgePolicy(ctx context.Context, tx SQLQueries, "policy extra TLVs: %w", err) } + // Adding a policy may change which version is preferred for this + // SCID (a version with policies outranks one without). + scidBytes := channelIDToBytes(edge.ChannelID) + err = tx.UpsertPreferredChannel(ctx, scidBytes) + if err != nil { + return node1Pub, node2Pub, false, fmt.Errorf("upserting "+ + "preferred channel(%d): %w", edge.ChannelID, err) + } + return node1Pub, node2Pub, isNode1, nil } @@ -4911,6 +4967,13 @@ func upsertSourceNode(ctx context.Context, db SQLQueries, node.PubKeyBytes, err) } + // Recompute the preferred node mapping for this pub_key. + err = db.UpsertPreferredNode(ctx, node.PubKeyBytes[:]) + if err != nil { + return 0, fmt.Errorf("upserting preferred node(%x): %w", + node.PubKeyBytes, err) + } + // We can exit here if we don't have the announcement yet. if !node.HaveAnnouncement() { return nodeID, nil @@ -4947,6 +5010,15 @@ func upsertNode(ctx context.Context, db SQLQueries, err) } + // Recompute the preferred node mapping for this pub_key. Even a shell + // node may become the preferred entry (e.g. first v2 shell for a key + // that previously only existed as a v1 shell). + err = db.UpsertPreferredNode(ctx, node.PubKeyBytes[:]) + if err != nil { + return 0, fmt.Errorf("upserting preferred node(%x): %w", + node.PubKeyBytes, err) + } + // We can exit here if we don't have the announcement yet. if !node.HaveAnnouncement() { return nodeID, nil @@ -5429,6 +5501,14 @@ func insertChannel(ctx context.Context, db SQLQueries, } } + // Recompute the preferred channel mapping for this SCID. + scidBytes := channelIDToBytes(edge.ChannelID) + err = db.UpsertPreferredChannel(ctx, scidBytes) + if err != nil { + return fmt.Errorf("upserting preferred channel(%d): %w", + edge.ChannelID, err) + } + return nil } @@ -5462,6 +5542,13 @@ func maybeCreateShellNode(ctx context.Context, db SQLQueries, return 0, fmt.Errorf("unable to create shell node: %w", err) } + // Recompute the preferred node mapping for this pub_key. + err = db.UpsertPreferredNode(ctx, pubKey[:]) + if err != nil { + return 0, fmt.Errorf("upserting preferred node(%x): %w", + pubKey[:], err) + } + return id, nil } From 89ea0c30dcc6d95554cef29ce16c1130167c0fb9 Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Tue, 7 Apr 2026 11:06:17 +0545 Subject: [PATCH 05/12] graph/db: make ForEachNode and ForEachChannel cross-version Drop the GossipVersion parameter from ForEachNode and ForEachChannel. Both methods now iterate across all gossip versions, yielding one result per unique pub_key or SCID using the preferred mapping tables for pagination. Wire ChannelGraph.FetchChannelEdgesByID and FetchChannelEdgesByOutpoint through the Preferred variants so the public ChannelGraph API loses its GossipVersion parameter and becomes version-agnostic on read. --- graph/db/benchmark_test.go | 17 +-- graph/db/graph.go | 35 +++-- graph/db/graph_test.go | 286 ++++++++++++++++++++++++++++++++++- graph/db/interfaces.go | 33 ++-- graph/db/kv_store.go | 15 +- graph/db/sql_store.go | 159 ++++++++++++------- sqldb/sqlc/graph.sql.go | 46 ------ sqldb/sqlc/querier.go | 1 - sqldb/sqlc/queries/graph.sql | 7 - 9 files changed, 430 insertions(+), 169 deletions(-) diff --git a/graph/db/benchmark_test.go b/graph/db/benchmark_test.go index ff245d08b2..b8311beb75 100644 --- a/graph/db/benchmark_test.go +++ b/graph/db/benchmark_test.go @@ -371,7 +371,7 @@ func TestPopulateDBs(t *testing.T) { numPolicies = 0 ) err := graph.ForEachChannel( - ctx, lnwire.GossipVersion1, + ctx, func(info *models.ChannelEdgeInfo, policy, policy2 *models.ChannelEdgePolicy) error { @@ -499,7 +499,7 @@ func syncGraph(t *testing.T, src, dest *ChannelGraph) { } var wgChans sync.WaitGroup - err = src.ForEachChannel(ctx, lnwire.GossipVersion1, + err = src.ForEachChannel(ctx, func(info *models.ChannelEdgeInfo, policy1, policy2 *models.ChannelEdgePolicy) error { @@ -623,7 +623,7 @@ func BenchmarkGraphReadMethods(b *testing.B) { name: "ForEachNode", fn: func(b testing.TB, store Store) { err := store.ForEachNode( - ctx, lnwire.GossipVersion1, + ctx, func(_ *models.Node) error { // Increment the counter to // ensure the callback is doing @@ -639,12 +639,11 @@ func BenchmarkGraphReadMethods(b *testing.B) { { name: "ForEachChannel", fn: func(b testing.TB, store Store) { - //nolint:ll - err := store.ForEachChannel( - ctx, lnwire.GossipVersion1, + err := store.ForEachChannel(ctx, func(_ *models.ChannelEdgeInfo, + _, _ *models.ChannelEdgePolicy, - _ *models.ChannelEdgePolicy) error { + ) error { // Increment the counter to // ensure the callback is doing @@ -994,7 +993,7 @@ func BenchmarkFindOptimalSQLQueryConfig(b *testing.B) { ) err := store.ForEachNode( - ctx, lnwire.GossipVersion1, + ctx, func(_ *models.Node) error { numNodes++ @@ -1005,7 +1004,7 @@ func BenchmarkFindOptimalSQLQueryConfig(b *testing.B) { //nolint:ll err = store.ForEachChannel( - ctx, lnwire.GossipVersion1, + ctx, func(_ *models.ChannelEdgeInfo, _, _ *models.ChannelEdgePolicy) error { diff --git a/graph/db/graph.go b/graph/db/graph.go index a63b9d0e66..caaafb966d 100644 --- a/graph/db/graph.go +++ b/graph/db/graph.go @@ -679,13 +679,14 @@ func (c *ChannelGraph) HasV1Node(ctx context.Context, return c.db.HasV1Node(ctx, nodePub) } -// ForEachChannel iterates through all channel edges stored within the graph. +// ForEachChannel iterates through all channel edges stored within the graph +// across all gossip versions. func (c *ChannelGraph) ForEachChannel(ctx context.Context, - v lnwire.GossipVersion, cb func(*models.ChannelEdgeInfo, - *models.ChannelEdgePolicy, *models.ChannelEdgePolicy) error, + cb func(*models.ChannelEdgeInfo, *models.ChannelEdgePolicy, + *models.ChannelEdgePolicy) error, reset func()) error { - return c.db.ForEachChannel(ctx, v, cb, reset) + return c.db.ForEachChannel(ctx, cb, reset) } // DisabledChannelIDs returns the channel ids of disabled channels. @@ -817,26 +818,23 @@ func (c *ChannelGraph) FetchChanInfos(ctx context.Context, } // FetchChannelEdgesByOutpoint attempts to lookup directed edges by funding -// outpoint. +// outpoint, returning the highest available gossip version. func (c *ChannelGraph) FetchChannelEdgesByOutpoint(ctx context.Context, op *wire.OutPoint) ( *models.ChannelEdgeInfo, *models.ChannelEdgePolicy, *models.ChannelEdgePolicy, error) { - return c.db.FetchChannelEdgesByOutpoint( - ctx, lnwire.GossipVersion1, op, - ) + return c.db.FetchChannelEdgesByOutpointPreferred(ctx, op) } -// FetchChannelEdgesByID attempts to lookup directed edges by channel ID. +// FetchChannelEdgesByID attempts to lookup directed edges by channel ID, +// returning the highest available gossip version. func (c *ChannelGraph) FetchChannelEdgesByID(ctx context.Context, chanID uint64) ( *models.ChannelEdgeInfo, *models.ChannelEdgePolicy, *models.ChannelEdgePolicy, error) { - return c.db.FetchChannelEdgesByID( - ctx, lnwire.GossipVersion1, chanID, - ) + return c.db.FetchChannelEdgesByIDPreferred(ctx, chanID) } // PutClosedScid stores a SCID for a closed channel in the database. @@ -926,11 +924,14 @@ func (c *VersionedGraph) ForEachNodeCached(ctx context.Context, return c.ChannelGraph.ForEachNodeCached(ctx, c.v, cb, reset) } -// ForEachNode iterates through all stored vertices/nodes in the graph. +// ForEachNode iterates through all stored vertices/nodes in the graph across +// all gossip versions, returning the preferred version for each pub_key. Note +// that this intentionally ignores c.v — cross-version iteration is the desired +// behaviour for callers that enumerate graph topology. func (c *VersionedGraph) ForEachNode(ctx context.Context, cb func(*models.Node) error, reset func()) error { - return c.db.ForEachNode(ctx, c.v, cb, reset) + return c.db.ForEachNode(ctx, cb, reset) } // NumZombies returns the current number of zombie channels in the graph. @@ -1106,12 +1107,14 @@ func (c *VersionedGraph) ForEachNodeChannel(ctx context.Context, return c.db.ForEachNodeChannel(ctx, c.v, nodePub, cb, reset) } -// ForEachChannel iterates through all channel edges stored within the graph. +// ForEachChannel iterates through all channel edges stored within the graph +// across all gossip versions, returning the preferred version for each SCID. +// See ForEachNode for the rationale on ignoring c.v. func (c *VersionedGraph) ForEachChannel(ctx context.Context, cb func(*models.ChannelEdgeInfo, *models.ChannelEdgePolicy, *models.ChannelEdgePolicy) error, reset func()) error { - return c.db.ForEachChannel(ctx, c.v, cb, reset) + return c.db.ForEachChannel(ctx, cb, reset) } // ForEachNodeCacheable iterates through all stored vertices/nodes in the graph. diff --git a/graph/db/graph_test.go b/graph/db/graph_test.go index 7e979251cd..66fe10d99e 100644 --- a/graph/db/graph_test.go +++ b/graph/db/graph_test.go @@ -1806,7 +1806,7 @@ func TestGraphTraversal(t *testing.T) { // Iterate through all the known channels within the graph DB, once // again if the map is empty that indicates that all edges have // properly been reached. - err = graph.ForEachChannel(ctx, lnwire.GossipVersion1, + err = graph.ForEachChannel(ctx, func(ei *models.ChannelEdgeInfo, _ *models.ChannelEdgePolicy, _ *models.ChannelEdgePolicy) error { @@ -2161,7 +2161,7 @@ func assertPruneTip(t *testing.T, graph *ChannelGraph, func assertNumChans(t *testing.T, graph *ChannelGraph, n int) { numChans := 0 err := graph.ForEachChannel( - t.Context(), lnwire.GossipVersion1, + t.Context(), func(*models.ChannelEdgeInfo, *models.ChannelEdgePolicy, *models.ChannelEdgePolicy) error { @@ -6926,3 +6926,285 @@ func TestPreferredChannelFetch(t *testing.T) { require.NotNil(t, p1) require.Nil(t, p2) } + +// TestDeleteNodePreferredRecomputation verifies that deleting one gossip +// version of a dual-version node correctly recomputes the preferred-node +// mapping so the surviving version remains visible via ForEachNode. +func TestDeleteNodePreferredRecomputation(t *testing.T) { + t.Parallel() + + if !isSQLDB { + t.Skip("preferred lookup requires SQL backend") + } + + ctx := t.Context() + graph := MakeTestGraph(t) + store := graph.db + + // Create a node with both v1 and v2 announcements. + priv, err := btcec.NewPrivateKey() + require.NoError(t, err) + + nodeV1 := createNode(t, lnwire.GossipVersion1, priv) + nodeV2 := createNode(t, lnwire.GossipVersion2, priv) + + require.NoError(t, graph.AddNode(ctx, nodeV1)) + require.NoError(t, graph.AddNode(ctx, nodeV2)) + + // ForEachNode should return the node (v2 preferred). + var count int + err = store.ForEachNode(ctx, func(n *models.Node) error { + if n.PubKeyBytes == nodeV1.PubKeyBytes { + require.Equal(t, lnwire.GossipVersion2, n.Version) + count++ + } + + return nil + }, func() { count = 0 }) + require.NoError(t, err) + require.Equal(t, 1, count, "node should be visible before delete") + + // Delete the v2 version. + require.NoError(t, store.DeleteNode( + ctx, lnwire.GossipVersion2, nodeV1.PubKeyBytes, + )) + + // The node should still be visible via ForEachNode, now as v1. + count = 0 + err = store.ForEachNode(ctx, func(n *models.Node) error { + if n.PubKeyBytes == nodeV1.PubKeyBytes { + require.Equal(t, lnwire.GossipVersion1, n.Version) + count++ + } + + return nil + }, func() { count = 0 }) + require.NoError(t, err) + require.Equal(t, 1, count, + "node should still be visible after deleting one version") + + // Delete the remaining v1 version. + require.NoError(t, store.DeleteNode( + ctx, lnwire.GossipVersion1, nodeV1.PubKeyBytes, + )) + + // The node should now be gone. + count = 0 + err = store.ForEachNode(ctx, func(n *models.Node) error { + if n.PubKeyBytes == nodeV1.PubKeyBytes { + count++ + } + + return nil + }, func() { count = 0 }) + require.NoError(t, err) + require.Equal(t, 0, count, + "node should be gone after deleting all versions") +} + +// TestPreferredForEachNode verifies that SQLStore.ForEachNode returns one +// node per pubkey, preferring the highest announced version and otherwise +// falling back to the highest-version shell node. +func TestPreferredForEachNode(t *testing.T) { + t.Parallel() + + if !isSQLDB { + t.Skip("preferred lookup requires SQL backend") + } + + ctx := t.Context() + graph := MakeTestGraph(t) + store := graph.db + + v1Only := createTestVertex(t, lnwire.GossipVersion1) + v1Only.Alias = fn.Some("v1-only") + require.NoError(t, graph.AddNode(ctx, v1Only)) + + bothPriv, err := btcec.NewPrivateKey() + require.NoError(t, err) + + bothV1 := createNode(t, lnwire.GossipVersion1, bothPriv) + bothV1.Alias = fn.Some("both-v1") + require.NoError(t, graph.AddNode(ctx, bothV1)) + + bothV2 := createNode(t, lnwire.GossipVersion2, bothPriv) + bothV2.Alias = fn.Some("both-v2") + require.NoError(t, graph.AddNode(ctx, bothV2)) + + shellPriv, err := btcec.NewPrivateKey() + require.NoError(t, err) + + shellPub, err := route.NewVertexFromBytes( + shellPriv.PubKey().SerializeCompressed(), + ) + require.NoError(t, err) + + require.NoError(t, graph.AddNode( + ctx, models.NewShellNode(lnwire.GossipVersion1, shellPub), + )) + require.NoError(t, graph.AddNode( + ctx, models.NewShellNode(lnwire.GossipVersion2, shellPub), + )) + + var nodeCount int + nodesByPub := make(map[route.Vertex]*models.Node) + err = store.ForEachNode(ctx, func(node *models.Node) error { + nodesByPub[node.PubKeyBytes] = node + nodeCount++ + + return nil + }, func() { + clear(nodesByPub) + nodeCount = 0 + }) + require.NoError(t, err) + require.Len(t, nodesByPub, 3) + require.Equal(t, 3, nodeCount, "unexpected duplicate nodes") + + gotV1Only := nodesByPub[v1Only.PubKeyBytes] + require.NotNil(t, gotV1Only) + require.Equal(t, lnwire.GossipVersion1, gotV1Only.Version) + require.Equal(t, "v1-only", gotV1Only.Alias.UnwrapOr("")) + require.True(t, gotV1Only.HaveAnnouncement()) + + gotBoth := nodesByPub[bothV1.PubKeyBytes] + require.NotNil(t, gotBoth) + require.Equal(t, lnwire.GossipVersion2, gotBoth.Version) + require.Equal(t, "both-v2", gotBoth.Alias.UnwrapOr("")) + require.True(t, gotBoth.HaveAnnouncement()) + + gotShell := nodesByPub[shellPub] + require.NotNil(t, gotShell) + require.Equal(t, lnwire.GossipVersion2, gotShell.Version) + require.False(t, gotShell.HaveAnnouncement()) +} + +// TestPreferredForEachChannel verifies that SQLStore.ForEachChannel returns +// one channel per SCID, preferring a higher-version channel when both versions +// have policies, preserving lower-version policy data when the higher version +// has none, and otherwise falling back to the highest-version no-policy +// channel. +func TestPreferredForEachChannel(t *testing.T) { + t.Parallel() + + if !isSQLDB { + t.Skip("preferred lookup requires SQL backend") + } + + ctx := t.Context() + graph := MakeTestGraph(t) + store := graph.db + + node1Priv, err := btcec.NewPrivateKey() + require.NoError(t, err) + node2Priv, err := btcec.NewPrivateKey() + require.NoError(t, err) + + node1V1 := createNode(t, lnwire.GossipVersion1, node1Priv) + node1V2 := createNode(t, lnwire.GossipVersion2, node1Priv) + node2V1 := createNode(t, lnwire.GossipVersion1, node2Priv) + node2V2 := createNode(t, lnwire.GossipVersion2, node2Priv) + + require.NoError(t, graph.AddNode(ctx, node1V1)) + require.NoError(t, graph.AddNode(ctx, node1V2)) + require.NoError(t, graph.AddNode(ctx, node2V1)) + require.NoError(t, graph.AddNode(ctx, node2V2)) + + v1Only, _ := createEdge( + lnwire.GossipVersion1, 200, 0, 0, 1, node1V1, node2V1, + ) + require.NoError(t, graph.AddChannelEdge(ctx, v1Only)) + + policyPrefV1, _ := createEdge( + lnwire.GossipVersion1, 201, 0, 0, 2, node1V1, node2V1, + ) + policyPrefV2, _ := createEdge( + lnwire.GossipVersion2, 201, 0, 0, 2, node1V2, node2V2, + ) + require.NoError(t, graph.AddChannelEdge(ctx, policyPrefV1)) + require.NoError(t, graph.AddChannelEdge(ctx, policyPrefV2)) + + policyOnlyV1 := newEdgePolicy( + lnwire.GossipVersion1, policyPrefV1.ChannelID, 1000, true, + ) + policyOnlyV1.ToNode = node2V1.PubKeyBytes + policyOnlyV1.SigBytes = testSig.Serialize() + require.NoError(t, graph.UpdateEdgePolicy(ctx, policyOnlyV1)) + + versionPrefV1, _ := createEdge( + lnwire.GossipVersion1, 202, 0, 0, 3, node1V1, node2V1, + ) + versionPrefV2, _ := createEdge( + lnwire.GossipVersion2, 202, 0, 0, 3, node1V2, node2V2, + ) + require.NoError(t, graph.AddChannelEdge(ctx, versionPrefV1)) + require.NoError(t, graph.AddChannelEdge(ctx, versionPrefV2)) + + versionPolicyV1 := newEdgePolicy( + lnwire.GossipVersion1, versionPrefV1.ChannelID, 1001, true, + ) + versionPolicyV1.ToNode = node2V1.PubKeyBytes + versionPolicyV1.SigBytes = testSig.Serialize() + require.NoError(t, graph.UpdateEdgePolicy(ctx, versionPolicyV1)) + + versionPolicyV2 := newEdgePolicy( + lnwire.GossipVersion2, versionPrefV2.ChannelID, 1002, true, + ) + versionPolicyV2.ToNode = node2V2.PubKeyBytes + versionPolicyV2.SigBytes = testSig.Serialize() + require.NoError(t, graph.UpdateEdgePolicy(ctx, versionPolicyV2)) + + shellPrefV1, _ := createEdge( + lnwire.GossipVersion1, 203, 0, 0, 4, node1V1, node2V1, + ) + shellPrefV2, _ := createEdge( + lnwire.GossipVersion2, 203, 0, 0, 4, node1V2, node2V2, + ) + require.NoError(t, graph.AddChannelEdge(ctx, shellPrefV1)) + require.NoError(t, graph.AddChannelEdge(ctx, shellPrefV2)) + + type channelResult struct { + info *models.ChannelEdgeInfo + p1 *models.ChannelEdgePolicy + p2 *models.ChannelEdgePolicy + } + var chanCount int + channelsByID := make(map[uint64]channelResult) + err = store.ForEachChannel(ctx, func(info *models.ChannelEdgeInfo, + p1, p2 *models.ChannelEdgePolicy) error { + + channelsByID[info.ChannelID] = channelResult{ + info: info, + p1: p1, + p2: p2, + } + chanCount++ + + return nil + }, func() { + clear(channelsByID) + chanCount = 0 + }) + require.NoError(t, err) + require.Len(t, channelsByID, 4) + require.Equal(t, 4, chanCount, "unexpected duplicate channels") + + gotV1Only := channelsByID[v1Only.ChannelID] + require.Equal(t, lnwire.GossipVersion1, gotV1Only.info.Version) + require.Nil(t, gotV1Only.p1) + require.Nil(t, gotV1Only.p2) + + gotPolicyPref := channelsByID[policyPrefV1.ChannelID] + require.Equal(t, lnwire.GossipVersion1, gotPolicyPref.info.Version) + require.NotNil(t, gotPolicyPref.p1) + require.Nil(t, gotPolicyPref.p2) + + gotVersionPref := channelsByID[versionPrefV1.ChannelID] + require.Equal(t, lnwire.GossipVersion2, gotVersionPref.info.Version) + require.NotNil(t, gotVersionPref.p1) + + gotShellPref := channelsByID[shellPrefV1.ChannelID] + require.Equal(t, lnwire.GossipVersion2, gotShellPref.info.Version) + require.Nil(t, gotShellPref.p1) + require.Nil(t, gotShellPref.p2) +} diff --git a/graph/db/interfaces.go b/graph/db/interfaces.go index cc02ef8322..fe3785829b 100644 --- a/graph/db/interfaces.go +++ b/graph/db/interfaces.go @@ -89,11 +89,11 @@ type Store interface { //nolint:interfacebloat chans map[uint64]*DirectedChannel) error, reset func()) error - // ForEachNode iterates through all the stored vertices/nodes in the - // graph, executing the passed callback with each node encountered. If - // the callback returns an error, then the transaction is aborted and - // the iteration stops early. - ForEachNode(ctx context.Context, v lnwire.GossipVersion, + // ForEachNode iterates through all nodes in the graph across all + // gossip versions, yielding each unique node exactly once. The + // callback receives the best available Node (highest advertised + // version preferred, falling back to shell nodes). + ForEachNode(ctx context.Context, cb func(*models.Node) error, reset func()) error // ForEachNodeCacheable iterates through all the stored vertices/nodes @@ -156,21 +156,16 @@ type Store interface { //nolint:interfacebloat GraphSession(ctx context.Context, cb func(graph NodeTraverser) error, reset func()) error - // ForEachChannel iterates through all the channel edges stored within - // the graph and invokes the passed callback for each edge. The callback - // takes two edges as since this is a directed graph, both the in/out - // edges are visited. If the callback returns an error, then the - // transaction is aborted and the iteration stops early. - // - // NOTE: If an edge can't be found, or wasn't advertised, then a nil - // pointer for that particular channel edge routing policy will be - // passed into the callback. - // - // TODO(elle): add a cross-version iteration API and make this iterate - // over all versions. - ForEachChannel(ctx context.Context, v lnwire.GossipVersion, + // ForEachChannel iterates through all channel edges stored within the + // graph across all gossip versions, yielding each unique channel + // exactly once. The callback receives the edge info and both + // directional policies. When both versions are present, v2 is + // preferred. Nil pointers are passed for policies that haven't been + // advertised. + ForEachChannel(ctx context.Context, cb func(*models.ChannelEdgeInfo, *models.ChannelEdgePolicy, - *models.ChannelEdgePolicy) error, reset func()) error + *models.ChannelEdgePolicy) error, + reset func()) error // ForEachChannelCacheable iterates through all the channel edges stored // within the graph and invokes the passed callback for each edge. The diff --git a/graph/db/kv_store.go b/graph/db/kv_store.go index e97cb7499b..7cd99fd1e3 100644 --- a/graph/db/kv_store.go +++ b/graph/db/kv_store.go @@ -408,13 +408,10 @@ func (c *KVStore) AddrsForNode(ctx context.Context, v lnwire.GossipVersion, // NOTE: If an edge can't be found, or wasn't advertised, then a nil pointer // for that particular channel edge routing policy will be passed into the // callback. -func (c *KVStore) ForEachChannel(_ context.Context, v lnwire.GossipVersion, +func (c *KVStore) ForEachChannel(_ context.Context, cb func(*models.ChannelEdgeInfo, *models.ChannelEdgePolicy, - *models.ChannelEdgePolicy) error, reset func()) error { - - if v != lnwire.GossipVersion1 { - return ErrVersionNotSupportedForKVDB - } + *models.ChannelEdgePolicy) error, + reset func()) error { return forEachChannel(c.db, cb, reset) } @@ -837,13 +834,9 @@ func (c *KVStore) DisabledChannelIDs( // early. // // NOTE: this is part of the Store interface. -func (c *KVStore) ForEachNode(_ context.Context, v lnwire.GossipVersion, +func (c *KVStore) ForEachNode(_ context.Context, cb func(*models.Node) error, reset func()) error { - if v != lnwire.GossipVersion1 { - return ErrVersionNotSupportedForKVDB - } - return forEachNode(c.db, func(tx kvdb.RTx, node *models.Node) error { diff --git a/graph/db/sql_store.go b/graph/db/sql_store.go index 4d604c9f54..82ab5eded4 100644 --- a/graph/db/sql_store.go +++ b/graph/db/sql_store.go @@ -54,7 +54,7 @@ type SQLQueries interface { GetNodesByLastUpdateRange(ctx context.Context, arg sqlc.GetNodesByLastUpdateRangeParams) ([]sqlc.GraphNode, error) GetNodesByBlockHeightRange(ctx context.Context, arg sqlc.GetNodesByBlockHeightRangeParams) ([]sqlc.GraphNode, error) GetPublicNodesByLastUpdateRange(ctx context.Context, arg sqlc.GetPublicNodesByLastUpdateRangeParams) ([]sqlc.GraphNode, error) - ListNodesPaginated(ctx context.Context, arg sqlc.ListNodesPaginatedParams) ([]sqlc.GraphNode, error) + ListPreferredNodesPaginated(ctx context.Context, arg sqlc.ListPreferredNodesPaginatedParams) ([]sqlc.ListPreferredNodesPaginatedRow, error) UpsertPreferredNode(ctx context.Context, pubKey []byte) error ListNodeIDsAndPubKeys(ctx context.Context, arg sqlc.ListNodeIDsAndPubKeysParams) ([]sqlc.ListNodeIDsAndPubKeysRow, error) IsPublicV1Node(ctx context.Context, pubKey []byte) (bool, error) @@ -106,6 +106,7 @@ type SQLQueries interface { ListChannelsByNodeID(ctx context.Context, arg sqlc.ListChannelsByNodeIDParams) ([]sqlc.ListChannelsByNodeIDRow, error) ListChannelsForNodeIDs(ctx context.Context, arg sqlc.ListChannelsForNodeIDsParams) ([]sqlc.ListChannelsForNodeIDsRow, error) ListChannelsWithPoliciesPaginated(ctx context.Context, arg sqlc.ListChannelsWithPoliciesPaginatedParams) ([]sqlc.ListChannelsWithPoliciesPaginatedRow, error) + ListPreferredChannelsPaginated(ctx context.Context, arg sqlc.ListPreferredChannelsPaginatedParams) ([]sqlc.ListPreferredChannelsPaginatedRow, error) UpsertPreferredChannel(ctx context.Context, scid []byte) error ListChannelsWithPoliciesForCachePaginated(ctx context.Context, arg sqlc.ListChannelsWithPoliciesForCachePaginatedParams) ([]sqlc.ListChannelsWithPoliciesForCachePaginatedRow, error) ListChannelsPaginated(ctx context.Context, arg sqlc.ListChannelsPaginatedParams) ([]sqlc.ListChannelsPaginatedRow, error) @@ -1156,17 +1157,12 @@ func (s *SQLStore) ForEachSourceNodeChannel(ctx context.Context, // early. // // NOTE: part of the Store interface. -func (s *SQLStore) ForEachNode(ctx context.Context, v lnwire.GossipVersion, +func (s *SQLStore) ForEachNode(ctx context.Context, cb func(node *models.Node) error, reset func()) error { return s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error { - return forEachNodePaginated( - ctx, s.cfg.QueryCfg, db, - v, func(_ context.Context, _ int64, - node *models.Node) error { - - return cb(node) - }, + return forEachPreferredNodePaginated( + ctx, s.cfg.QueryCfg, db, cb, ) }, reset) } @@ -2008,16 +2004,14 @@ func (s *SQLStore) ForEachChannelCacheable(ctx context.Context, // // NOTE: part of the Store interface. func (s *SQLStore) ForEachChannel(ctx context.Context, - v lnwire.GossipVersion, cb func(*models.ChannelEdgeInfo, - *models.ChannelEdgePolicy, *models.ChannelEdgePolicy) error, + cb func(*models.ChannelEdgeInfo, *models.ChannelEdgePolicy, + *models.ChannelEdgePolicy) error, reset func()) error { - if !isKnownGossipVersion(v) { - return fmt.Errorf("unsupported gossip version: %d", v) - } - return s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error { - return forEachChannelWithPolicies(ctx, db, s.cfg, v, cb) + return forEachPreferredChannelWithPolicies( + ctx, db, s.cfg, cb, + ) }, reset) } @@ -4214,7 +4208,6 @@ func (s *sqlNodeTraverser) ForEachNodeDirectedChannel( ctx, s.db, lnwire.GossipVersion1, nodePub, cb, ) } - // FetchNodeFeatures returns the features of the given node. If the node is // unknown, assume no additional features are supported. // @@ -6320,6 +6313,54 @@ func extractChannelPolicies(row any) (*sqlc.GraphChannelPolicy, return policy1, policy2, nil + case sqlc.ListPreferredChannelsPaginatedRow: + if r.Policy1ID.Valid { + policy1 = &sqlc.GraphChannelPolicy{ + ID: r.Policy1ID.Int64, + Version: r.Policy1Version.Int16, + ChannelID: r.GraphChannel.ID, + NodeID: r.Policy1NodeID.Int64, + Timelock: r.Policy1Timelock.Int32, + FeePpm: r.Policy1FeePpm.Int64, + BaseFeeMsat: r.Policy1BaseFeeMsat.Int64, + MinHtlcMsat: r.Policy1MinHtlcMsat.Int64, + MaxHtlcMsat: r.Policy1MaxHtlcMsat, + LastUpdate: r.Policy1LastUpdate, + InboundBaseFeeMsat: r.Policy1InboundBaseFeeMsat, + InboundFeeRateMilliMsat: r.Policy1InboundFeeRateMilliMsat, + Disabled: r.Policy1Disabled, + MessageFlags: r.Policy1MessageFlags, + ChannelFlags: r.Policy1ChannelFlags, + Signature: r.Policy1Signature, + BlockHeight: r.Policy1BlockHeight, + DisableFlags: r.Policy1DisableFlags, + } + } + if r.Policy2ID.Valid { + policy2 = &sqlc.GraphChannelPolicy{ + ID: r.Policy2ID.Int64, + Version: r.Policy2Version.Int16, + ChannelID: r.GraphChannel.ID, + NodeID: r.Policy2NodeID.Int64, + Timelock: r.Policy2Timelock.Int32, + FeePpm: r.Policy2FeePpm.Int64, + BaseFeeMsat: r.Policy2BaseFeeMsat.Int64, + MinHtlcMsat: r.Policy2MinHtlcMsat.Int64, + MaxHtlcMsat: r.Policy2MaxHtlcMsat, + LastUpdate: r.Policy2LastUpdate, + InboundBaseFeeMsat: r.Policy2InboundBaseFeeMsat, + InboundFeeRateMilliMsat: r.Policy2InboundFeeRateMilliMsat, + Disabled: r.Policy2Disabled, + MessageFlags: r.Policy2MessageFlags, + ChannelFlags: r.Policy2ChannelFlags, + Signature: r.Policy2Signature, + BlockHeight: r.Policy2BlockHeight, + DisableFlags: r.Policy2DisableFlags, + } + } + + return policy1, policy2, nil + case sqlc.ListChannelsWithPoliciesPaginatedRow: if r.Policy1ID.Valid { policy1 = &sqlc.GraphChannelPolicy{ @@ -6889,32 +6930,32 @@ func batchLoadChannelPolicyExtrasHelper(ctx context.Context, ) } -// forEachNodePaginated executes a paginated query to process each node in the -// graph. It uses the provided SQLQueries interface to fetch nodes in batches -// and applies the provided processNode function to each node. -func forEachNodePaginated(ctx context.Context, cfg *sqldb.QueryConfig, - db SQLQueries, protocol lnwire.GossipVersion, - processNode func(context.Context, int64, - *models.Node) error) error { +// forEachPreferredNodePaginated executes a paginated query that yields one +// preferred node per pubkey across all gossip versions. +func forEachPreferredNodePaginated(ctx context.Context, cfg *sqldb.QueryConfig, + db SQLQueries, processNode func(*models.Node) error) error { - pageQueryFunc := func(ctx context.Context, lastID int64, - limit int32) ([]sqlc.GraphNode, error) { + pageQueryFunc := func(ctx context.Context, cursor []byte, + limit int32) ([]sqlc.ListPreferredNodesPaginatedRow, error) { - return db.ListNodesPaginated( - ctx, sqlc.ListNodesPaginatedParams{ - Version: int16(protocol), - ID: lastID, - Limit: limit, + return db.ListPreferredNodesPaginated( + ctx, sqlc.ListPreferredNodesPaginatedParams{ + PubKey: cursor, + Limit: limit, }, ) } - extractPageCursor := func(node sqlc.GraphNode) int64 { - return node.ID + extractPageCursor := func( + row sqlc.ListPreferredNodesPaginatedRow) []byte { + + return row.GraphNode.PubKey } - collectFunc := func(node sqlc.GraphNode) (int64, error) { - return node.ID, nil + collectFunc := func( + row sqlc.ListPreferredNodesPaginatedRow) (int64, error) { + + return row.GraphNode.ID, nil } batchQueryFunc := func(ctx context.Context, @@ -6923,29 +6964,32 @@ func forEachNodePaginated(ctx context.Context, cfg *sqldb.QueryConfig, return batchLoadNodeData(ctx, cfg, db, nodeIDs) } - processItem := func(ctx context.Context, dbNode sqlc.GraphNode, + processItem := func(_ context.Context, + row sqlc.ListPreferredNodesPaginatedRow, batchData *batchNodeData) error { + dbNode := row.GraphNode node, err := buildNodeWithBatchData(dbNode, batchData) if err != nil { - return fmt.Errorf("unable to build "+ - "node(id=%d): %w", dbNode.ID, err) + return fmt.Errorf("unable to build node(id=%d): %w", + dbNode.ID, err) } - return processNode(ctx, dbNode.ID, node) + return processNode(node) } return sqldb.ExecuteCollectAndBatchWithSharedDataQuery( - ctx, cfg, int64(-1), pageQueryFunc, extractPageCursor, + ctx, cfg, []byte{}, pageQueryFunc, extractPageCursor, collectFunc, batchQueryFunc, processItem, ) } -// forEachChannelWithPolicies executes a paginated query to process each channel -// with policies in the graph. -func forEachChannelWithPolicies(ctx context.Context, db SQLQueries, - cfg *SQLStoreConfig, v lnwire.GossipVersion, - processChannel func(*models.ChannelEdgeInfo, *models.ChannelEdgePolicy, +// forEachPreferredChannelWithPolicies executes a paginated query that yields +// one preferred channel per SCID across all gossip versions. +func forEachPreferredChannelWithPolicies(ctx context.Context, + db SQLQueries, cfg *SQLStoreConfig, + processChannel func(*models.ChannelEdgeInfo, + *models.ChannelEdgePolicy, *models.ChannelEdgePolicy) error) error { type channelBatchIDs struct { @@ -6953,33 +6997,32 @@ func forEachChannelWithPolicies(ctx context.Context, db SQLQueries, policyIDs []int64 } - pageQueryFunc := func(ctx context.Context, lastID int64, - limit int32) ([]sqlc.ListChannelsWithPoliciesPaginatedRow, + pageQueryFunc := func(ctx context.Context, cursor []byte, + limit int32) ([]sqlc.ListPreferredChannelsPaginatedRow, error) { - return db.ListChannelsWithPoliciesPaginated( - ctx, sqlc.ListChannelsWithPoliciesPaginatedParams{ - Version: int16(v), - ID: lastID, - Limit: limit, + return db.ListPreferredChannelsPaginated( + ctx, sqlc.ListPreferredChannelsPaginatedParams{ + Scid: cursor, + Limit: limit, }, ) } extractPageCursor := func( - row sqlc.ListChannelsWithPoliciesPaginatedRow) int64 { + row sqlc.ListPreferredChannelsPaginatedRow) []byte { - return row.GraphChannel.ID + return row.GraphChannel.Scid } - collectFunc := func(row sqlc.ListChannelsWithPoliciesPaginatedRow) ( + collectFunc := func( + row sqlc.ListPreferredChannelsPaginatedRow) ( channelBatchIDs, error) { ids := channelBatchIDs{ channelID: row.GraphChannel.ID, } - // Extract policy IDs from the row. dbPol1, dbPol2, err := extractChannelPolicies(row) if err != nil { return ids, err @@ -7015,7 +7058,7 @@ func forEachChannelWithPolicies(ctx context.Context, db SQLQueries, } processItem := func(ctx context.Context, - row sqlc.ListChannelsWithPoliciesPaginatedRow, + row sqlc.ListPreferredChannelsPaginatedRow, batchData *batchChannelData) error { node1, node2, err := buildNodeVertices( @@ -7050,7 +7093,7 @@ func forEachChannelWithPolicies(ctx context.Context, db SQLQueries, } return sqldb.ExecuteCollectAndBatchWithSharedDataQuery( - ctx, cfg.QueryCfg, int64(-1), pageQueryFunc, extractPageCursor, + ctx, cfg.QueryCfg, []byte{}, pageQueryFunc, extractPageCursor, collectFunc, batchDataFunc, processItem, ) } diff --git a/sqldb/sqlc/graph.sql.go b/sqldb/sqlc/graph.sql.go index 61a769c24d..48d0f3284e 100644 --- a/sqldb/sqlc/graph.sql.go +++ b/sqldb/sqlc/graph.sql.go @@ -4171,52 +4171,6 @@ func (q *Queries) ListNodeIDsAndPubKeys(ctx context.Context, arg ListNodeIDsAndP return items, nil } -const listNodesPaginated = `-- name: ListNodesPaginated :many -SELECT id, version, pub_key, alias, last_update, color, signature, block_height -FROM graph_nodes -WHERE version = $1 AND id > $2 -ORDER BY id -LIMIT $3 -` - -type ListNodesPaginatedParams struct { - Version int16 - ID int64 - Limit int32 -} - -func (q *Queries) ListNodesPaginated(ctx context.Context, arg ListNodesPaginatedParams) ([]GraphNode, error) { - rows, err := q.db.QueryContext(ctx, listNodesPaginated, arg.Version, arg.ID, arg.Limit) - if err != nil { - return nil, err - } - defer rows.Close() - var items []GraphNode - for rows.Next() { - var i GraphNode - if err := rows.Scan( - &i.ID, - &i.Version, - &i.PubKey, - &i.Alias, - &i.LastUpdate, - &i.Color, - &i.Signature, - &i.BlockHeight, - ); err != nil { - return nil, err - } - items = append(items, i) - } - if err := rows.Close(); err != nil { - return nil, err - } - if err := rows.Err(); err != nil { - return nil, err - } - return items, nil -} - const listPreferredChannelsPaginated = `-- name: ListPreferredChannelsPaginated :many SELECT c.id, c.version, c.scid, c.node_id_1, c.node_id_2, c.outpoint, c.capacity, c.bitcoin_key_1, c.bitcoin_key_2, c.node_1_signature, c.node_2_signature, c.bitcoin_1_signature, c.bitcoin_2_signature, c.signature, c.funding_pk_script, c.merkle_root_hash, diff --git a/sqldb/sqlc/querier.go b/sqldb/sqlc/querier.go index 6b8e0ad218..a49cb6d6b1 100644 --- a/sqldb/sqlc/querier.go +++ b/sqldb/sqlc/querier.go @@ -230,7 +230,6 @@ type Querier interface { ListChannelsWithPoliciesForCachePaginated(ctx context.Context, arg ListChannelsWithPoliciesForCachePaginatedParams) ([]ListChannelsWithPoliciesForCachePaginatedRow, error) ListChannelsWithPoliciesPaginated(ctx context.Context, arg ListChannelsWithPoliciesPaginatedParams) ([]ListChannelsWithPoliciesPaginatedRow, error) ListNodeIDsAndPubKeys(ctx context.Context, arg ListNodeIDsAndPubKeysParams) ([]ListNodeIDsAndPubKeysRow, error) - ListNodesPaginated(ctx context.Context, arg ListNodesPaginatedParams) ([]GraphNode, error) ListPreferredChannelsPaginated(ctx context.Context, arg ListPreferredChannelsPaginatedParams) ([]ListPreferredChannelsPaginatedRow, error) ListPreferredNodesPaginated(ctx context.Context, arg ListPreferredNodesPaginatedParams) ([]ListPreferredNodesPaginatedRow, error) NextInvoiceSettleIndex(ctx context.Context) (int64, error) diff --git a/sqldb/sqlc/queries/graph.sql b/sqldb/sqlc/queries/graph.sql index fef054d0c3..9d55e2fecd 100644 --- a/sqldb/sqlc/queries/graph.sql +++ b/sqldb/sqlc/queries/graph.sql @@ -73,13 +73,6 @@ FROM graph_nodes WHERE pub_key = $1 AND version = $2; --- name: ListNodesPaginated :many -SELECT * -FROM graph_nodes -WHERE version = $1 AND id > $2 -ORDER BY id -LIMIT $3; - -- name: ListPreferredNodesPaginated :many SELECT sqlc.embed(n) FROM graph_preferred_nodes pn From a07d3fe38f8403979bf0fa6181fcc3a83e38d0ad Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Tue, 7 Apr 2026 11:09:27 +0545 Subject: [PATCH 06/12] graph/db: implement cross-version node traversal Make FetchNodeFeatures version-agnostic by trying v2 first and falling back to v1 when v2 features are empty. The no-cache fallback for ForEachNodeDirectedChannel stays version-scoped (v1 only) because the SQL backend always runs with the in-memory cache enabled, and the cache already merges v1+v2 with v2 precedence. populateCache now skips empty v2 feature entries so they don't shadow a non-empty v1 feature set when the cache is rebuilt, matching the FetchNodeFeatures precedence rule. The cache iterates v1 then v2 so v2 data overwrites v1 on key collision. --- graph/db/graph.go | 41 +++++++++++++---- graph/db/graph_test.go | 102 +++++++++++++++++++++++++++++++++++++++++ graph/db/sql_store.go | 1 + 3 files changed, 134 insertions(+), 10 deletions(-) diff --git a/graph/db/graph.go b/graph/db/graph.go index caaafb966d..ccb8e09133 100644 --- a/graph/db/graph.go +++ b/graph/db/graph.go @@ -238,12 +238,21 @@ func (c *ChannelGraph) populateCache(ctx context.Context) error { for _, v := range []lnwire.GossipVersion{ gossipV1, gossipV2, } { - // TODO(elle): If we have both v1 and v2 entries for the same - // node/channel, prefer v2 when merging. + // We iterate v1 first, then v2. AddNodeFeatures and AddChannel + // overwrite on key collision, so v2 data takes precedence when + // both versions exist. For features specifically we + // additionally skip empty v2 entries so they don't shadow a + // non-empty v1 feature set; this matches the no-cache + // FetchNodeFeatures fallback rule that a non-empty + // lower-version vector wins over an empty higher-version one. err := c.db.ForEachNodeCacheable(ctx, v, func(node route.Vertex, features *lnwire.FeatureVector) error { + if v == gossipV2 && features.IsEmpty() { + return nil + } + cache.AddNodeFeatures(node, features) return nil @@ -299,9 +308,9 @@ func (c *ChannelGraph) ForEachNodeDirectedChannel(ctx context.Context, return c.cache.graphCache.ForEachChannel(node, cb) } - // TODO(elle): once the no-cache path needs to support - // pathfinding across gossip versions, this should iterate - // across all versions rather than defaulting to v1. + // Without the in-memory cache, traversal stays version-scoped. Both + // SQL and KV stores support v1 adjacency reads here; cross-version + // merging is reserved for the cache-backed path above. return c.db.ForEachNodeDirectedChannel( ctx, gossipV1, node, cb, reset, ) @@ -320,7 +329,22 @@ func (c *ChannelGraph) FetchNodeFeatures(ctx context.Context, return c.cache.graphCache.GetFeatures(node), nil } - return c.db.FetchNodeFeatures(ctx, lnwire.GossipVersion1, node) + // Try v2 first, fall back to v1 if the v2 features are empty. + for _, v := range []lnwire.GossipVersion{gossipV2, gossipV1} { + features, err := c.db.FetchNodeFeatures(ctx, v, node) + if errors.Is(err, ErrVersionNotSupportedForKVDB) { + continue + } + if err != nil { + return nil, err + } + + if !features.IsEmpty() { + return features, nil + } + } + + return lnwire.EmptyFeatureVector(), nil } // GraphSession will provide the call-back with access to a NodeTraverser @@ -969,7 +993,7 @@ func (c *VersionedGraph) ChannelView(ctx context.Context) ([]EdgePoint, // for performing queries against the channel graph. If the graph cache is // enabled, the callback receives the VersionedGraph directly (which implements // NodeTraverser using the cache). Otherwise a read-only database session is -// used. +// used, and that session remains v1-only for now. func (c *VersionedGraph) GraphSession(ctx context.Context, cb func(graph NodeTraverser) error, reset func()) error { @@ -977,9 +1001,6 @@ func (c *VersionedGraph) GraphSession(ctx context.Context, return cb(c) } - // TODO(elle): the underlying GraphSession currently creates a - // NodeTraverser that is hardcoded to GossipVersion1. This needs to be - // updated to pass the version through for v2 support. return c.db.GraphSession(ctx, cb, reset) } diff --git a/graph/db/graph_test.go b/graph/db/graph_test.go index 66fe10d99e..04b15d286e 100644 --- a/graph/db/graph_test.go +++ b/graph/db/graph_test.go @@ -7002,6 +7002,108 @@ func TestDeleteNodePreferredRecomputation(t *testing.T) { "node should be gone after deleting all versions") } +// TestPreferredNodeTraversal verifies that ChannelGraph's +// ForEachNodeDirectedChannel and FetchNodeFeatures correctly prefer v2 over v1 +// when the graph cache is disabled (exercising the no-cache code paths). +func TestPreferredNodeTraversal(t *testing.T) { + t.Parallel() + + if !isSQLDB { + t.Skip("preferred lookup requires SQL backend") + } + + ctx := t.Context() + + // Disable the cache so we exercise the no-cache code paths in + // ChannelGraph.ForEachNodeDirectedChannel and FetchNodeFeatures. + graph := MakeTestGraph(t, WithUseGraphCache(false)) + + // --- FetchNodeFeatures --- + + // Create a v1-only node and verify its features are returned. + privV1, err := btcec.NewPrivateKey() + require.NoError(t, err) + + nodeV1 := createNode(t, lnwire.GossipVersion1, privV1) + require.NoError(t, graph.AddNode(ctx, nodeV1)) + + features, err := graph.FetchNodeFeatures(ctx, nodeV1.PubKeyBytes) + require.NoError(t, err) + require.False(t, features.IsEmpty(), + "v1-only node should have features") + + // Create a v2-only node and verify its features are returned + // (exercises the v2 fallback). + privV2, err := btcec.NewPrivateKey() + require.NoError(t, err) + + nodeV2 := createNode(t, lnwire.GossipVersion2, privV2) + require.NoError(t, graph.AddNode(ctx, nodeV2)) + + features, err = graph.FetchNodeFeatures(ctx, nodeV2.PubKeyBytes) + require.NoError(t, err) + require.False(t, features.IsEmpty(), + "v2-only node should have features") + + // Create a node with both v1 and v2 announcements. + privBoth, err := btcec.NewPrivateKey() + require.NoError(t, err) + + nodeBothV1 := createNode(t, lnwire.GossipVersion1, privBoth) + v1Features := lnwire.NewFeatureVector( + lnwire.NewRawFeatureVector(lnwire.GossipQueriesRequired), + lnwire.Features, + ) + nodeBothV1.Features = v1Features + require.NoError(t, graph.AddNode(ctx, nodeBothV1)) + + nodeBothV2 := createNode(t, lnwire.GossipVersion2, privBoth) + v2Features := lnwire.NewFeatureVector( + lnwire.NewRawFeatureVector(lnwire.TLVOnionPayloadRequired), + lnwire.Features, + ) + nodeBothV2.Features = v2Features + require.NoError(t, graph.AddNode(ctx, nodeBothV2)) + + features, err = graph.FetchNodeFeatures( + ctx, nodeBothV1.PubKeyBytes, + ) + require.NoError(t, err) + require.Equal(t, v2Features, features) + require.NotEqual(t, v1Features, features) + + // --- ForEachNodeDirectedChannel --- + + // Add a v1 channel between nodeV1 and nodeBothV1. + edge, _ := createEdge( + lnwire.GossipVersion1, 100, 0, 0, 0, + nodeV1, nodeBothV1, + ) + require.NoError(t, graph.AddChannelEdge(ctx, edge)) + + pol := newEdgePolicy( + lnwire.GossipVersion1, edge.ChannelID, 1000, true, + ) + pol.ToNode = nodeBothV1.PubKeyBytes + pol.SigBytes = testSig.Serialize() + require.NoError(t, graph.UpdateEdgePolicy(ctx, pol)) + + // ForEachNodeDirectedChannel should find the channel. + var foundChannels int + err = graph.ForEachNodeDirectedChannel( + ctx, nodeV1.PubKeyBytes, + func(_ *DirectedChannel) error { + foundChannels++ + return nil + }, func() { + foundChannels = 0 + }, + ) + require.NoError(t, err) + require.Equal(t, 1, foundChannels, + "expected 1 channel for v1 node") +} + // TestPreferredForEachNode verifies that SQLStore.ForEachNode returns one // node per pubkey, preferring the highest announced version and otherwise // falling back to the highest-version shell node. diff --git a/graph/db/sql_store.go b/graph/db/sql_store.go index 82ab5eded4..9f56b44e34 100644 --- a/graph/db/sql_store.go +++ b/graph/db/sql_store.go @@ -4208,6 +4208,7 @@ func (s *sqlNodeTraverser) ForEachNodeDirectedChannel( ctx, s.db, lnwire.GossipVersion1, nodePub, cb, ) } + // FetchNodeFeatures returns the features of the given node. If the node is // unknown, assume no additional features are supported. // From 208daca24ec270587e07d0e76c99be0e97271afa Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Thu, 7 May 2026 14:43:57 -0700 Subject: [PATCH 07/12] graph/db: move ForEachNode/ForEachChannel off VersionedGraph MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit VersionedGraph wraps ChannelGraph for callers that want to operate against a specific gossip version. After the cross-version refactor, the ForEachNode and ForEachChannel methods on VersionedGraph silently ignored c.v and iterated across all versions — a surprising behaviour for a wrapper whose whole purpose is version-scoping. Move the cross-version iteration to ChannelGraph (where it belongs) and drop the foot-gun overrides from VersionedGraph. *VersionedGraph continues to expose these via the embedded *ChannelGraph, but now they are explicitly methods of the version-agnostic type. Add ChannelGraph.ForEachNode mirroring ChannelGraph.ForEachChannel, and update the only non-embedded callsite (rpcserver describeGraph) to take ChannelGraph directly. The two remaining test helpers also switch to *ChannelGraph since they only ever wanted a node count. --- graph/db/benchmark_test.go | 3 +-- graph/db/graph.go | 28 ++++++++-------------------- graph/db/graph_test.go | 3 +-- rpcserver.go | 9 ++++----- 4 files changed, 14 insertions(+), 29 deletions(-) diff --git a/graph/db/benchmark_test.go b/graph/db/benchmark_test.go index b8311beb75..06cd62cb2e 100644 --- a/graph/db/benchmark_test.go +++ b/graph/db/benchmark_test.go @@ -347,8 +347,7 @@ func TestPopulateDBs(t *testing.T) { // graph. countNodes := func(graph *ChannelGraph) int { numNodes := 0 - v1Graph := NewVersionedGraph(graph, lnwire.GossipVersion1) - err := v1Graph.ForEachNode( + err := graph.ForEachNode( ctx, func(node *models.Node) error { numNodes++ diff --git a/graph/db/graph.go b/graph/db/graph.go index ccb8e09133..5dd0b8767b 100644 --- a/graph/db/graph.go +++ b/graph/db/graph.go @@ -713,6 +713,14 @@ func (c *ChannelGraph) ForEachChannel(ctx context.Context, return c.db.ForEachChannel(ctx, cb, reset) } +// ForEachNode iterates through all stored vertices/nodes in the graph across +// all gossip versions. +func (c *ChannelGraph) ForEachNode(ctx context.Context, + cb func(*models.Node) error, reset func()) error { + + return c.db.ForEachNode(ctx, cb, reset) +} + // DisabledChannelIDs returns the channel ids of disabled channels. func (c *ChannelGraph) DisabledChannelIDs(ctx context.Context, v lnwire.GossipVersion) ( @@ -948,16 +956,6 @@ func (c *VersionedGraph) ForEachNodeCached(ctx context.Context, return c.ChannelGraph.ForEachNodeCached(ctx, c.v, cb, reset) } -// ForEachNode iterates through all stored vertices/nodes in the graph across -// all gossip versions, returning the preferred version for each pub_key. Note -// that this intentionally ignores c.v — cross-version iteration is the desired -// behaviour for callers that enumerate graph topology. -func (c *VersionedGraph) ForEachNode(ctx context.Context, - cb func(*models.Node) error, reset func()) error { - - return c.db.ForEachNode(ctx, cb, reset) -} - // NumZombies returns the current number of zombie channels in the graph. func (c *VersionedGraph) NumZombies(ctx context.Context) (uint64, error) { return c.db.NumZombies(ctx, c.v) @@ -1128,16 +1126,6 @@ func (c *VersionedGraph) ForEachNodeChannel(ctx context.Context, return c.db.ForEachNodeChannel(ctx, c.v, nodePub, cb, reset) } -// ForEachChannel iterates through all channel edges stored within the graph -// across all gossip versions, returning the preferred version for each SCID. -// See ForEachNode for the rationale on ignoring c.v. -func (c *VersionedGraph) ForEachChannel(ctx context.Context, - cb func(*models.ChannelEdgeInfo, *models.ChannelEdgePolicy, - *models.ChannelEdgePolicy) error, reset func()) error { - - return c.db.ForEachChannel(ctx, cb, reset) -} - // ForEachNodeCacheable iterates through all stored vertices/nodes in the graph. func (c *VersionedGraph) ForEachNodeCacheable(ctx context.Context, cb func(route.Vertex, *lnwire.FeatureVector) error, diff --git a/graph/db/graph_test.go b/graph/db/graph_test.go index 04b15d286e..0f923862d4 100644 --- a/graph/db/graph_test.go +++ b/graph/db/graph_test.go @@ -2178,8 +2178,7 @@ func assertNumChans(t *testing.T, graph *ChannelGraph, n int) { func assertNumNodes(t *testing.T, graph *ChannelGraph, n int) { numNodes := 0 - v1Graph := NewVersionedGraph(graph, lnwire.GossipVersion1) - err := v1Graph.ForEachNode(t.Context(), func(_ *models.Node) error { + err := graph.ForEachNode(t.Context(), func(_ *models.Node) error { numNodes++ return nil diff --git a/rpcserver.go b/rpcserver.go index 491bd8a142..e980db1324 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -7000,11 +7000,10 @@ func (r *rpcServer) DescribeGraph(ctx context.Context, } } - // Obtain the pointer to the V1 channel graph. This will provide a - // consistent view of the graph due to bolt db's transactional model. - // - // TODO(elle): switch to a cross-version graph view when available. - graph := r.server.v1Graph + // Obtain the pointer to the cross-version channel graph. This will + // provide a consistent view of the graph due to bolt db's + // transactional model. + graph := r.server.graphDB // First iterate through all the known nodes (connected or unconnected // within the graph), collating their current state into the RPC From 90ee209a5f9b0b88ffac2faaf1c2a9b9ecfba6a4 Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Thu, 7 May 2026 15:04:12 -0700 Subject: [PATCH 08/12] graph/db: thread version through ForEachNodeCached channel lookup MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit SQLStore.ForEachNodeCached takes a gossip version and uses it correctly when paginating through nodes, but the inner ListChannelsForNodeIDs call hardcoded GossipVersion1 instead of forwarding the requested version. Calling ForEachNodeCached(ctx, GossipVersion2, ...) therefore returned v2 nodes paired with v1 channels — silently inconsistent data. The bug was latent because every existing caller happens to request v1, but it would surface as soon as any v2-scoped caller appears. Add a regression test that creates a v2-only channel between two nodes that exist under both versions and asserts that ForEachNodeCached(ctx, GossipVersion2, ...) reports the channel for each endpoint. --- graph/db/graph_test.go | 62 ++++++++++++++++++++++++++++++++++++++++++ graph/db/sql_store.go | 2 +- 2 files changed, 63 insertions(+), 1 deletion(-) diff --git a/graph/db/graph_test.go b/graph/db/graph_test.go index 0f923862d4..b25f7341fc 100644 --- a/graph/db/graph_test.go +++ b/graph/db/graph_test.go @@ -7309,3 +7309,65 @@ func TestPreferredForEachChannel(t *testing.T) { require.Nil(t, gotShellPref.p1) require.Nil(t, gotShellPref.p2) } + +// TestForEachNodeCachedHonoursVersion verifies that +// SQLStore.ForEachNodeCached uses the requested gossip version when looking +// up channels for the iterated nodes, rather than silently falling back to +// v1 channels for v2 nodes. +func TestForEachNodeCachedHonoursVersion(t *testing.T) { + t.Parallel() + + if !isSQLDB { + t.Skip("test only meaningful for SQL backend") + } + + ctx := t.Context() + graph := MakeTestGraph(t, WithUseGraphCache(false)) + + // Create two nodes that exist under both v1 and v2. + priv1, err := btcec.NewPrivateKey() + require.NoError(t, err) + priv2, err := btcec.NewPrivateKey() + require.NoError(t, err) + + require.NoError(t, graph.AddNode( + ctx, createNode(t, lnwire.GossipVersion1, priv1), + )) + require.NoError(t, graph.AddNode( + ctx, createNode(t, lnwire.GossipVersion1, priv2), + )) + node1V2 := createNode(t, lnwire.GossipVersion2, priv1) + node2V2 := createNode(t, lnwire.GossipVersion2, priv2) + require.NoError(t, graph.AddNode(ctx, node1V2)) + require.NoError(t, graph.AddNode(ctx, node2V2)) + + // Add a channel only under v2 between the two nodes. If + // ForEachNodeCached honours its version parameter, callers asking for + // v2 nodes must see this channel; if it instead hardcodes v1 in the + // channel lookup, the channel will be missing. + edgeV2, _ := createEdge( + lnwire.GossipVersion2, 100, 1, 0, 1, node1V2, node2V2, + ) + require.NoError(t, graph.AddChannelEdge(ctx, edgeV2)) + + store := graph.db + chansSeen := 0 + err = store.ForEachNodeCached( + ctx, lnwire.GossipVersion2, + func(_ context.Context, n route.Vertex, + chans map[uint64]*DirectedChannel) error { + + if n == node1V2.PubKeyBytes || + n == node2V2.PubKeyBytes { + + chansSeen += len(chans) + } + + return nil + }, func() { chansSeen = 0 }, + ) + require.NoError(t, err) + require.Equal(t, 2, chansSeen, + "v2 ForEachNodeCached should report the v2 channel for "+ + "each endpoint") +} diff --git a/graph/db/sql_store.go b/graph/db/sql_store.go index 9f56b44e34..d172ba4b67 100644 --- a/graph/db/sql_store.go +++ b/graph/db/sql_store.go @@ -1769,7 +1769,7 @@ func (s *SQLStore) ForEachNodeCached(ctx context.Context, // page. allChannels, err := db.ListChannelsForNodeIDs( ctx, sqlc.ListChannelsForNodeIDsParams{ - Version: int16(lnwire.GossipVersion1), + Version: int16(v), Node1Ids: nodeIDs, Node2Ids: nodeIDs, }, From e14546c9fa8c03a23d60aff8a3930fe82453b395 Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Thu, 7 May 2026 15:12:16 -0700 Subject: [PATCH 09/12] graph/db: simplify no-cache fallback paths to v1-only The in-memory graph cache is only ever disabled on the bbolt KV backend in production, so the no-cache fallback paths in ChannelGraph.ForEachNodeDirectedChannel, ChannelGraph.FetchNodeFeatures and VersionedGraph.GraphSession will only run against a v1-only store. The v2-then-v1 loop in FetchNodeFeatures is therefore dead code in production, and the comments framing v1 as a temporary choice are misleading. Collapse FetchNodeFeatures to a direct v1 call, drop the "version-scoped"/"for now" comment language, and trim the TestPreferredNodeTraversal cases that were exercising the now-removed v2 fallback. Rename the test to TestNoCacheNodeTraversal to reflect what it actually covers (the v1 KV path). --- graph/db/graph.go | 26 +++---------- graph/db/graph_test.go | 84 +++++++++++------------------------------- 2 files changed, 28 insertions(+), 82 deletions(-) diff --git a/graph/db/graph.go b/graph/db/graph.go index 5dd0b8767b..e89c9ca2b1 100644 --- a/graph/db/graph.go +++ b/graph/db/graph.go @@ -308,9 +308,8 @@ func (c *ChannelGraph) ForEachNodeDirectedChannel(ctx context.Context, return c.cache.graphCache.ForEachChannel(node, cb) } - // Without the in-memory cache, traversal stays version-scoped. Both - // SQL and KV stores support v1 adjacency reads here; cross-version - // merging is reserved for the cache-backed path above. + // The no-cache path only runs against the KV backend, which is + // v1-only. return c.db.ForEachNodeDirectedChannel( ctx, gossipV1, node, cb, reset, ) @@ -329,22 +328,9 @@ func (c *ChannelGraph) FetchNodeFeatures(ctx context.Context, return c.cache.graphCache.GetFeatures(node), nil } - // Try v2 first, fall back to v1 if the v2 features are empty. - for _, v := range []lnwire.GossipVersion{gossipV2, gossipV1} { - features, err := c.db.FetchNodeFeatures(ctx, v, node) - if errors.Is(err, ErrVersionNotSupportedForKVDB) { - continue - } - if err != nil { - return nil, err - } - - if !features.IsEmpty() { - return features, nil - } - } - - return lnwire.EmptyFeatureVector(), nil + // The no-cache path only runs against the KV backend, which is + // v1-only. + return c.db.FetchNodeFeatures(ctx, gossipV1, node) } // GraphSession will provide the call-back with access to a NodeTraverser @@ -991,7 +977,7 @@ func (c *VersionedGraph) ChannelView(ctx context.Context) ([]EdgePoint, // for performing queries against the channel graph. If the graph cache is // enabled, the callback receives the VersionedGraph directly (which implements // NodeTraverser using the cache). Otherwise a read-only database session is -// used, and that session remains v1-only for now. +// used; the no-cache path only runs against the KV backend, which is v1-only. func (c *VersionedGraph) GraphSession(ctx context.Context, cb func(graph NodeTraverser) error, reset func()) error { diff --git a/graph/db/graph_test.go b/graph/db/graph_test.go index b25f7341fc..71ae4ec9fb 100644 --- a/graph/db/graph_test.go +++ b/graph/db/graph_test.go @@ -7001,98 +7001,58 @@ func TestDeleteNodePreferredRecomputation(t *testing.T) { "node should be gone after deleting all versions") } -// TestPreferredNodeTraversal verifies that ChannelGraph's -// ForEachNodeDirectedChannel and FetchNodeFeatures correctly prefer v2 over v1 -// when the graph cache is disabled (exercising the no-cache code paths). -func TestPreferredNodeTraversal(t *testing.T) { +// TestNoCacheNodeTraversal verifies that the no-cache fallback paths of +// ChannelGraph.FetchNodeFeatures and ChannelGraph.ForEachNodeDirectedChannel +// return v1 data correctly. The no-cache path is only ever reached on the KV +// backend in production (which is v1-only), so cross-version semantics are +// the cache path's job, not this one's. +func TestNoCacheNodeTraversal(t *testing.T) { t.Parallel() if !isSQLDB { - t.Skip("preferred lookup requires SQL backend") + t.Skip("test only meaningful for SQL backend") } ctx := t.Context() - // Disable the cache so we exercise the no-cache code paths in - // ChannelGraph.ForEachNodeDirectedChannel and FetchNodeFeatures. + // Disable the cache so we exercise the no-cache code paths. graph := MakeTestGraph(t, WithUseGraphCache(false)) - // --- FetchNodeFeatures --- - - // Create a v1-only node and verify its features are returned. - privV1, err := btcec.NewPrivateKey() - require.NoError(t, err) - - nodeV1 := createNode(t, lnwire.GossipVersion1, privV1) - require.NoError(t, graph.AddNode(ctx, nodeV1)) - - features, err := graph.FetchNodeFeatures(ctx, nodeV1.PubKeyBytes) + priv1, err := btcec.NewPrivateKey() require.NoError(t, err) - require.False(t, features.IsEmpty(), - "v1-only node should have features") - - // Create a v2-only node and verify its features are returned - // (exercises the v2 fallback). - privV2, err := btcec.NewPrivateKey() + priv2, err := btcec.NewPrivateKey() require.NoError(t, err) - nodeV2 := createNode(t, lnwire.GossipVersion2, privV2) - require.NoError(t, graph.AddNode(ctx, nodeV2)) + node1 := createNode(t, lnwire.GossipVersion1, priv1) + node2 := createNode(t, lnwire.GossipVersion1, priv2) + require.NoError(t, graph.AddNode(ctx, node1)) + require.NoError(t, graph.AddNode(ctx, node2)) - features, err = graph.FetchNodeFeatures(ctx, nodeV2.PubKeyBytes) + // FetchNodeFeatures should return the node's v1 features. + features, err := graph.FetchNodeFeatures(ctx, node1.PubKeyBytes) require.NoError(t, err) require.False(t, features.IsEmpty(), - "v2-only node should have features") + "v1 node should have features") - // Create a node with both v1 and v2 announcements. - privBoth, err := btcec.NewPrivateKey() - require.NoError(t, err) - - nodeBothV1 := createNode(t, lnwire.GossipVersion1, privBoth) - v1Features := lnwire.NewFeatureVector( - lnwire.NewRawFeatureVector(lnwire.GossipQueriesRequired), - lnwire.Features, - ) - nodeBothV1.Features = v1Features - require.NoError(t, graph.AddNode(ctx, nodeBothV1)) - - nodeBothV2 := createNode(t, lnwire.GossipVersion2, privBoth) - v2Features := lnwire.NewFeatureVector( - lnwire.NewRawFeatureVector(lnwire.TLVOnionPayloadRequired), - lnwire.Features, - ) - nodeBothV2.Features = v2Features - require.NoError(t, graph.AddNode(ctx, nodeBothV2)) - - features, err = graph.FetchNodeFeatures( - ctx, nodeBothV1.PubKeyBytes, - ) - require.NoError(t, err) - require.Equal(t, v2Features, features) - require.NotEqual(t, v1Features, features) - - // --- ForEachNodeDirectedChannel --- - - // Add a v1 channel between nodeV1 and nodeBothV1. + // Add a v1 channel and verify ForEachNodeDirectedChannel sees it. edge, _ := createEdge( - lnwire.GossipVersion1, 100, 0, 0, 0, - nodeV1, nodeBothV1, + lnwire.GossipVersion1, 100, 0, 0, 0, node1, node2, ) require.NoError(t, graph.AddChannelEdge(ctx, edge)) pol := newEdgePolicy( lnwire.GossipVersion1, edge.ChannelID, 1000, true, ) - pol.ToNode = nodeBothV1.PubKeyBytes + pol.ToNode = node2.PubKeyBytes pol.SigBytes = testSig.Serialize() require.NoError(t, graph.UpdateEdgePolicy(ctx, pol)) - // ForEachNodeDirectedChannel should find the channel. var foundChannels int err = graph.ForEachNodeDirectedChannel( - ctx, nodeV1.PubKeyBytes, + ctx, node1.PubKeyBytes, func(_ *DirectedChannel) error { foundChannels++ + return nil }, func() { foundChannels = 0 From 6c7548883d4be4305362bc07cc93239d5cf89f46 Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Thu, 14 May 2026 06:58:35 -0700 Subject: [PATCH 10/12] graph/db: drop redundant VersionedGraph.GraphSession override MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit VersionedGraph.GraphSession duplicated ChannelGraph.GraphSession with no observable difference: in the cache-loaded branch, both pass themselves to the callback, and the cache's NodeTraverser surface (GetFeatures, ForEachChannel) has no version concept — so receiving a *VersionedGraph vs a *ChannelGraph routes to the same cache lookups. In the no-cache branch, both delegate to c.db.GraphSession identically. Remove the override and fold its v1-only note into ChannelGraph's docstring so the production invariant (no-cache path is KV-only, which is v1-only) is preserved at the surviving callsite. This mirrors the hygiene from the earlier ForEachNode/ForEachChannel cleanup, where VersionedGraph overrides that ignored c.v were moved off the wrapper. --- graph/db/graph.go | 18 ++---------------- 1 file changed, 2 insertions(+), 16 deletions(-) diff --git a/graph/db/graph.go b/graph/db/graph.go index e89c9ca2b1..1a355d8324 100644 --- a/graph/db/graph.go +++ b/graph/db/graph.go @@ -336,7 +336,8 @@ func (c *ChannelGraph) FetchNodeFeatures(ctx context.Context, // GraphSession will provide the call-back with access to a NodeTraverser // instance which can be used to perform queries against the channel graph. If // the graph cache is not enabled, then the call-back will be provided with -// access to the graph via a consistent read-only transaction. +// access to the graph via a consistent read-only transaction; the no-cache +// path only runs against the KV backend, which is v1-only. func (c *ChannelGraph) GraphSession(ctx context.Context, cb func(graph NodeTraverser) error, reset func()) error { @@ -973,21 +974,6 @@ func (c *VersionedGraph) ChannelView(ctx context.Context) ([]EdgePoint, return c.db.ChannelView(ctx, c.v) } -// GraphSession provides the callback with access to a NodeTraverser instance -// for performing queries against the channel graph. If the graph cache is -// enabled, the callback receives the VersionedGraph directly (which implements -// NodeTraverser using the cache). Otherwise a read-only database session is -// used; the no-cache path only runs against the KV backend, which is v1-only. -func (c *VersionedGraph) GraphSession(ctx context.Context, - cb func(graph NodeTraverser) error, reset func()) error { - - if c.cache != nil && c.cache.isLoaded() { - return cb(c) - } - - return c.db.GraphSession(ctx, cb, reset) -} - // FetchNode attempts to look up a target node by its identity public key. func (c *VersionedGraph) FetchNode(ctx context.Context, nodePub route.Vertex) (*models.Node, error) { From 40d8c13bec8ab031da21693b0cdac8841563a65d Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Thu, 14 May 2026 07:02:58 -0700 Subject: [PATCH 11/12] graph/db: document why sqlNodeTraverser hardcodes v1 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit sqlNodeTraverser.ForEachNodeDirectedChannel and FetchNodeFeatures both pass lnwire.GossipVersion1 to their helpers without explanation. The reason this is correct is non-obvious: sqlNodeTraverser is only ever constructed by SQLStore.GraphSession, which is only reached when the in-memory graph cache is unavailable. Since the SQL backend always runs with the cache enabled in production, this fallback never executes at runtime — the cache-backed NodeTraverser (which already merges v1+v2) is what real callers see. Only tests exercise this code, and they operate on v1 data. Capture that rationale on the type doc so a future reader doesn't mistake the hardcoding for a bug. Also switch the call sites from lnwire.GossipVersion1 to the file-local gossipV1 alias to match the convention used elsewhere in this file. --- graph/db/sql_store.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/graph/db/sql_store.go b/graph/db/sql_store.go index d172ba4b67..9ec43af58a 100644 --- a/graph/db/sql_store.go +++ b/graph/db/sql_store.go @@ -4177,6 +4177,14 @@ func (s *SQLStore) GraphSession(ctx context.Context, // sqlNodeTraverser implements the NodeTraverser interface but with a backing // read only transaction for a consistent view of the graph. +// +// The read methods below hardcode gossipV1: this type is only constructed by +// SQLStore.GraphSession, which is itself only invoked when the in-memory graph +// cache is unavailable (see ChannelGraph.GraphSession). The SQL backend always +// runs with the cache enabled in production, so this fallback is never reached +// at runtime; the cache-backed NodeTraverser is what real callers use, and it +// already merges v1+v2 with v2 precedence. The only paths that exercise this +// code are tests, which operate on v1 data. type sqlNodeTraverser struct { db SQLQueries chain chainhash.Hash @@ -4205,7 +4213,7 @@ func (s *sqlNodeTraverser) ForEachNodeDirectedChannel( cb func(channel *DirectedChannel) error, _ func()) error { return forEachNodeDirectedChannel( - ctx, s.db, lnwire.GossipVersion1, nodePub, cb, + ctx, s.db, gossipV1, nodePub, cb, ) } @@ -4217,7 +4225,7 @@ func (s *sqlNodeTraverser) FetchNodeFeatures(ctx context.Context, nodePub route.Vertex) ( *lnwire.FeatureVector, error) { - return fetchNodeFeatures(ctx, s.db, lnwire.GossipVersion1, nodePub) + return fetchNodeFeatures(ctx, s.db, gossipV1, nodePub) } // forEachNodeDirectedChannel iterates through all channels of a given From a7df91341a41981894420683f4c3ce5212a9a5ce Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Tue, 7 Apr 2026 11:09:38 +0545 Subject: [PATCH 12/12] docs: add release note for cross-version graph Store --- docs/release-notes/release-notes-0.21.0.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/docs/release-notes/release-notes-0.21.0.md b/docs/release-notes/release-notes-0.21.0.md index 13d36c69f3..da4cfeba42 100644 --- a/docs/release-notes/release-notes-0.21.0.md +++ b/docs/release-notes/release-notes-0.21.0.md @@ -420,6 +420,12 @@ fallback](https://github.com/lightningnetwork/lnd/pull/10717) so that gossip channel filtering and zombie edge lookups use the correct gossip version instead of hardcoding v1. +* Make the [graph `Store` interface + cross-version](https://github.com/lightningnetwork/lnd/pull/10714) so that + `ForEachNode`, `ForEachChannel`, and `ForEachNodeDirectedChannel` work across + gossip v1 and v2. Add `Preferred` fetch helpers and `GetVersions` queries + so callers can retrieve channels without knowing which gossip version + announced them. * Updated waiting proof persistence for gossip upgrades by introducing typed waiting proof keys and payloads, with a DB migration to rewrite legacy waiting proof records to the new key/value format