From ed86ba38edcfc6c534863d080f29492589b69539 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Wed, 20 May 2026 16:52:12 -0700 Subject: [PATCH 1/3] channeldb: add LoadChannelFwdPkgsSet for batched fwd pkg loading In this commit, we extend `GlobalFwdPkgReader` (and its `SwitchPackager` implementation) with a new `LoadChannelFwdPkgsSet` method that loads fwd pkgs for a set of channels within a single read transaction. The existing `LoadChannelFwdPkgs` is per-channel, so callers that want to read fwd pkgs for the entire active channel set end up opening one read tx per channel. On nodes with 1k+ channels, this gets expensive fast, especially on remote-tx backends (etcd, postgres) where each tx carries network round-trip overhead. The new query takes a `[]lnwire.ShortChannelID`, walks the `fwd-packages` root bucket once, and returns a flat `[]*FwdPkg` across the requested set. Each `FwdPkg` already carries its originating channel via its `Source` field, so callers that only need to operate on the pkgs (e.g. the switch's reforward path) don't need a keyed map. Channels with no fwd pkgs on disk are silently skipped. --- channeldb/forwarding_package.go | 78 ++++++++++++++++++++++++ channeldb/forwarding_package_test.go | 91 ++++++++++++++++++++++++++++ 2 files changed, 169 insertions(+) diff --git a/channeldb/forwarding_package.go b/channeldb/forwarding_package.go index c393a53b37f..331586d6e55 100644 --- a/channeldb/forwarding_package.go +++ b/channeldb/forwarding_package.go @@ -391,6 +391,13 @@ type GlobalFwdPkgReader interface { // channel. LoadChannelFwdPkgs(tx kvdb.RTx, source lnwire.ShortChannelID) ([]*FwdPkg, error) + + // LoadChannelFwdPkgsSet loads all known forwarding packages for the + // given set of channels within a single read transaction. The result + // is a flat slice across all channels. Each returned FwdPkg carries + // its originating channel via its Source field. + LoadChannelFwdPkgsSet(tx kvdb.RTx, + sources []lnwire.ShortChannelID) ([]*FwdPkg, error) } // FwdOperator defines the interfaces for managing forwarding packages that are @@ -433,6 +440,16 @@ func (*SwitchPackager) LoadChannelFwdPkgs(tx kvdb.RTx, return loadChannelFwdPkgs(tx, source) } +// LoadChannelFwdPkgsSet loads all forwarding packages for the provided set of +// channels using a single read transaction. The returned slice contains the +// fwd pkgs for all channels concatenated together; each FwdPkg carries its +// originating channel via its Source field. +func (*SwitchPackager) LoadChannelFwdPkgsSet(tx kvdb.RTx, + sources []lnwire.ShortChannelID) ([]*FwdPkg, error) { + + return loadChannelFwdPkgsSet(tx, sources) +} + // FwdPackager supports all operations required to modify fwd packages, such as // creation, updates, reading, and removal. The interfaces are broken down in // this way to support future delegation of the subinterfaces. @@ -607,6 +624,67 @@ func loadChannelFwdPkgs(tx kvdb.RTx, source lnwire.ShortChannelID) ([]*FwdPkg, e return fwdPkgs, nil } +// loadChannelFwdPkgsSet loads all forwarding packages for the provided set of +// channels using a single read transaction. The returned slice contains the +// fwd pkgs across all channels concatenated together; each FwdPkg carries its +// originating channel via its Source field. +// +// This is the batched equivalent of calling loadChannelFwdPkgs in a loop, each +// within its own db transaction. By amortizing the cost of opening a read +// transaction across many channels, this is materially faster when reading +// fwd pkgs for a large number of channels (1k+), particularly on remote-tx +// backends such as etcd or postgres where each transaction carries network +// round-trip overhead. +func loadChannelFwdPkgsSet(tx kvdb.RTx, + sources []lnwire.ShortChannelID) ([]*FwdPkg, error) { + + fwdPkgBkt := tx.ReadBucket(fwdPackagesKey) + if fwdPkgBkt == nil { + return nil, nil + } + + // Pre-size optimistically assuming most channels carry one fwd pkg. + // The slice will grow as needed but this avoids early reallocations + // for the common case. + result := make([]*FwdPkg, 0, len(sources)) + + // heights is reused across channels to avoid per-channel slice + // allocation churn. + var heights []uint64 + for _, source := range sources { + sourceKey := makeLogKey(source.ToUint64()) + sourceBkt := fwdPkgBkt.NestedReadBucket(sourceKey[:]) + if sourceBkt == nil { + continue + } + + heights = heights[:0] + err := sourceBkt.ForEach(func(k, _ []byte) error { + if len(k) != 8 { + return ErrCorruptedFwdPkg + } + + heights = append(heights, byteOrder.Uint64(k)) + + return nil + }) + if err != nil { + return nil, err + } + + for _, height := range heights { + fwdPkg, err := loadFwdPkg(fwdPkgBkt, source, height) + if err != nil { + return nil, err + } + + result = append(result, fwdPkg) + } + } + + return result, nil +} + // loadFwdPkg reads the packager's fwd pkg at a given height, and determines the // appropriate FwdState. func loadFwdPkg(fwdPkgBkt kvdb.RBucket, source lnwire.ShortChannelID, diff --git a/channeldb/forwarding_package_test.go b/channeldb/forwarding_package_test.go index b11764bee93..4bb161105bb 100644 --- a/channeldb/forwarding_package_test.go +++ b/channeldb/forwarding_package_test.go @@ -851,6 +851,97 @@ func loadFwdPkgs(t *testing.T, db kvdb.Backend, return fwdPkgs } +// TestSwitchPackagerLoadChannelFwdPkgsSet verifies that the batched fwd pkg +// load returns the same fwd pkgs as the per-channel call, that channels with +// no fwd pkgs are silently skipped, and that pkgs from distinct channels are +// not mixed up. +func TestSwitchPackagerLoadChannelFwdPkgsSet(t *testing.T) { + t.Parallel() + + db := makeFwdPkgDB(t, "") + + // Channel 1: two pkgs (heights 0 and 1). + ch1 := lnwire.NewShortChanIDFromInt(1) + ch1Packager := channeldb.NewChannelPackager(ch1) + + // Channel 2: one pkg (height 7), distinct height to test multi-height + // ordering. + ch2 := lnwire.NewShortChanIDFromInt(2) + ch2Packager := channeldb.NewChannelPackager(ch2) + + // Channel 3: deliberately has no fwd pkgs on disk. It must be silently + // dropped from the result, not surface as an error. + ch3 := lnwire.NewShortChanIDFromInt(3) + + err := kvdb.Update(db, func(tx kvdb.RwTx) error { + ch1Pkg0 := channeldb.NewFwdPkg( + ch1, 0, testAdds(), testSettleFails(), + ) + if err := ch1Packager.AddFwdPkg(tx, ch1Pkg0); err != nil { + return err + } + + ch1Pkg1 := channeldb.NewFwdPkg( + ch1, 1, testAdds(), testSettleFails(), + ) + if err := ch1Packager.AddFwdPkg(tx, ch1Pkg1); err != nil { + return err + } + + ch2Pkg := channeldb.NewFwdPkg( + ch2, 7, testAdds(), testSettleFails(), + ) + return ch2Packager.AddFwdPkg(tx, ch2Pkg) + }, func() {}) + require.NoError(t, err) + + // Validate against the per-channel call. Construct the expected + // concatenation in the same order LoadChannelFwdPkgsSet processes its + // inputs. + switchPkgr := channeldb.NewSwitchPackager() + var perChannel []*channeldb.FwdPkg + err = kvdb.View(db, func(tx kvdb.RTx) error { + for _, source := range []lnwire.ShortChannelID{ch1, ch3, ch2} { + pkgs, err := switchPkgr.LoadChannelFwdPkgs(tx, source) + if err != nil { + return err + } + perChannel = append(perChannel, pkgs...) + } + return nil + }, func() { perChannel = nil }) + require.NoError(t, err) + + var batched []*channeldb.FwdPkg + err = kvdb.View(db, func(tx kvdb.RTx) error { + var err error + batched, err = switchPkgr.LoadChannelFwdPkgsSet( + tx, []lnwire.ShortChannelID{ch1, ch3, ch2}, + ) + return err + }, func() { batched = nil }) + require.NoError(t, err) + + require.Len(t, batched, 3, "expected 3 fwd pkgs across the channel set") + require.Equal(t, perChannel, batched, + "batched load must match per-channel concatenation") + + // Sources should track which channel each pkg belongs to. + require.Equal(t, ch1, batched[0].Source) + require.Equal(t, ch1, batched[1].Source) + require.Equal(t, ch2, batched[2].Source) + + // Empty input should yield no work and no error. + var empty []*channeldb.FwdPkg + err = kvdb.View(db, func(tx kvdb.RTx) error { + var err error + empty, err = switchPkgr.LoadChannelFwdPkgsSet(tx, nil) + return err + }, func() { empty = nil }) + require.NoError(t, err) + require.Empty(t, empty) +} + // makeFwdPkgDB initializes a test database for forwarding packages. If the // provided path is an empty, it will create a temp dir/file to use. func makeFwdPkgDB(t *testing.T, path string) kvdb.Backend { // nolint:unparam From ef4c4320524ba173e804132f557a897e4dab7e2e Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Wed, 20 May 2026 16:52:30 -0700 Subject: [PATCH 2/3] channeldb: benchmark fwd pkg loading per-tx vs batched MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit In this commit, we add a pair of benchmarks that exercise the prior per-channel-tx path against the new batched `LoadChannelFwdPkgsSet` query, across a matrix of channel-count + pkgs-per-channel scenarios (up to 2k channels). The bench names are structured so they can be paired up under a common name and fed to benchstat. On a local bolt backend the per-iteration time is roughly at parity for both variants (bolt read tx setup is cheap, so amortizing it across channels doesn't move the needle much), while the batched path consistently drops ~10% of allocs/op and ~4% of B/op across the matrix. The bigger win lands on the remote-tx backends (etcd, postgres) where the per-tx network round-trip dominates and the batched query collapses N round-trips into one. Sample benchstat output on darwin/arm64 (M4 Max), `-benchtime=2s -count=6`: ``` │ old (per-tx) │ new (batched) │ │ sec/op │ sec/op vs base │ LoadChannelFwdPkgs/channels=100/pkgs=1-16 352.4µ ± 5% 341.9µ ± 3% ~ LoadChannelFwdPkgs/channels=500/pkgs=1-16 1.954m ± 3% 1.959m ± 3% ~ LoadChannelFwdPkgs/channels=1000/pkgs=1-16 4.036m ± 6% 3.990m ± 7% ~ LoadChannelFwdPkgs/channels=2000/pkgs=1-16 8.819m ± 18% 8.120m ± 44% ~ LoadChannelFwdPkgs/channels=1000/pkgs=4-16 15.86m ± 4% 17.11m ± 32% ~ geomean 3.296m 3.265m -0.93% │ B/op │ B/op vs base │ LoadChannelFwdPkgs/channels=1000/pkgs=1-16 11.57Mi ± 0% 11.04Mi ± 0% -4.61% geomean 9.478Mi 9.107Mi -3.91% │ allocs/op │ allocs/op vs base │ LoadChannelFwdPkgs/channels=1000/pkgs=1-16 160.4k ± 0% 144.4k ± 0% -9.97% geomean 123.3k 112.5k -8.74% ``` --- channeldb/forwarding_package_bench_test.go | 195 +++++++++++++++++++++ 1 file changed, 195 insertions(+) create mode 100644 channeldb/forwarding_package_bench_test.go diff --git a/channeldb/forwarding_package_bench_test.go b/channeldb/forwarding_package_bench_test.go new file mode 100644 index 00000000000..7472c38ff8f --- /dev/null +++ b/channeldb/forwarding_package_bench_test.go @@ -0,0 +1,195 @@ +package channeldb_test + +import ( + "testing" + + "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/kvdb" + "github.com/lightningnetwork/lnd/lnwire" + "github.com/stretchr/testify/require" +) + +// benchFwdPkgScenario describes the shape of the synthetic fwd pkg DB used by +// the load benchmarks: numChannels independent channels, each carrying +// pkgsPerChannel forwarding packages with a mix of adds and settle/fails. +type benchFwdPkgScenario struct { + numChannels int + pkgsPerChannel int +} + +// populateFwdPkgs writes a synthetic fwd pkg database for the provided +// scenario. All channels are populated within a single write transaction so +// that benchmark setup is dominated by the work under test rather than by the +// fsync cost of one write tx per channel. +func populateFwdPkgs(b *testing.B, db kvdb.Backend, + scenario benchFwdPkgScenario) []lnwire.ShortChannelID { + + b.Helper() + + sources := make([]lnwire.ShortChannelID, 0, scenario.numChannels) + for i := 0; i < scenario.numChannels; i++ { + // Skip 0 to avoid collisions with hop.Source-style sentinels. + shortChanID := lnwire.NewShortChanIDFromInt(uint64(i + 1)) + sources = append(sources, shortChanID) + } + + err := kvdb.Update(db, func(tx kvdb.RwTx) error { + for _, shortChanID := range sources { + packager := channeldb.NewChannelPackager(shortChanID) + for h := 0; h < scenario.pkgsPerChannel; h++ { + fwdPkg := channeldb.NewFwdPkg( + shortChanID, uint64(h), + testAdds(), testSettleFails(), + ) + if err := packager.AddFwdPkg( + tx, fwdPkg, + ); err != nil { + return err + } + } + } + + return nil + }, func() {}) + require.NoError(b, err) + + return sources +} + +// benchScenarios is the matrix exercised by the load benchmarks. The 1k+ case +// matches the motivating production scenario. +var benchScenarios = []benchFwdPkgScenario{ + {numChannels: 100, pkgsPerChannel: 1}, + {numChannels: 500, pkgsPerChannel: 1}, + {numChannels: 1000, pkgsPerChannel: 1}, + {numChannels: 2000, pkgsPerChannel: 1}, + {numChannels: 1000, pkgsPerChannel: 4}, +} + +// BenchmarkLoadChannelFwdPkgsPerTx measures the prior strategy of opening one +// read transaction per channel to load that channel's forwarding packages. +func BenchmarkLoadChannelFwdPkgsPerTx(b *testing.B) { + for _, scenario := range benchScenarios { + scenario := scenario + name := benchName(scenario) + b.Run(name, func(b *testing.B) { + db := makeFwdPkgBenchDB(b) + sources := populateFwdPkgs(b, db, scenario) + + pkgr := channeldb.NewSwitchPackager() + + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + totalPkgs := 0 + for _, source := range sources { + var fwdPkgs []*channeldb.FwdPkg + err := kvdb.View(db, + func(tx kvdb.RTx) error { + var err error + fwdPkgs, err = pkgr.LoadChannelFwdPkgs( //nolint:lll + tx, source, + ) + return err + }, func() { + fwdPkgs = nil + }, + ) + if err != nil { + b.Fatal(err) + } + totalPkgs += len(fwdPkgs) + } + + if totalPkgs == 0 { + b.Fatal("expected fwd pkgs to be loaded") + } + } + }) + } +} + +// BenchmarkLoadChannelFwdPkgsSet measures the batched query which loads +// forwarding packages for the entire set of channels in a single read +// transaction. +func BenchmarkLoadChannelFwdPkgsSet(b *testing.B) { + for _, scenario := range benchScenarios { + scenario := scenario + name := benchName(scenario) + b.Run(name, func(b *testing.B) { + db := makeFwdPkgBenchDB(b) + sources := populateFwdPkgs(b, db, scenario) + + pkgr := channeldb.NewSwitchPackager() + + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + var fwdPkgs []*channeldb.FwdPkg + err := kvdb.View(db, func(tx kvdb.RTx) error { + var err error + fwdPkgs, err = pkgr.LoadChannelFwdPkgsSet( //nolint:lll + tx, sources, + ) + return err + }, func() { + fwdPkgs = nil + }) + if err != nil { + b.Fatal(err) + } + + if len(fwdPkgs) == 0 { + b.Fatal("expected fwd pkgs to be loaded") + } + } + }) + } +} + +// benchName produces a deterministic, parseable sub-benchmark name. This shape +// is important so that benchstat can pair the per-tx and batched variants when +// computing deltas. +func benchName(s benchFwdPkgScenario) string { + return "channels=" + itoa(s.numChannels) + + "/pkgs=" + itoa(s.pkgsPerChannel) +} + +// itoa is a tiny strconv.Itoa stand-in to avoid pulling strconv into the test +// file solely for benchmark naming. +func itoa(n int) string { + if n == 0 { + return "0" + } + + var buf [20]byte + i := len(buf) + for n > 0 { + i-- + buf[i] = byte('0' + n%10) + n /= 10 + } + + return string(buf[i:]) +} + +// makeFwdPkgBenchDB constructs a fresh on-disk fwd pkg database for the +// benchmark to operate on. We use the standard bolt backend here since this is +// the same backend used by lnd in practice for fwd pkg storage. +func makeFwdPkgBenchDB(b *testing.B) kvdb.Backend { + b.Helper() + + path := b.TempDir() + "/fwdpkg.db" + bdb, err := kvdb.Create( + kvdb.BoltBackendName, path, true, kvdb.DefaultDBTimeout, false, + ) + require.NoError(b, err, "unable to open boltdb") + + b.Cleanup(func() { + _ = bdb.Close() + }) + + return bdb +} From 5321e1c0587ec095caa123d627530b2e9e388b6a Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Wed, 20 May 2026 16:52:41 -0700 Subject: [PATCH 3/3] htlcswitch: load reforward fwd pkgs in a single read tx In this commit, we rework `reforwardResponses` to load fwd pkgs for every active channel in a single read transaction via the new `SwitchPackager.LoadChannelFwdPkgsSet` query, instead of opening one read tx per channel. The prior loop opened a fresh `kvdb.View` for each channel returned by `FetchAllChannels` so that it could call `LoadChannelFwdPkgs(source)`. On nodes with 1k+ channels this adds up to 1k+ read txs at startup, which is particularly painful on remote-tx backends (etcd, postgres) where each tx is a network round-trip. We now collect the set of non-pending, non-local sources up front and hand the whole set to `LoadChannelFwdPkgsSet`, which walks the `fwd-packages` bucket once and returns a flat slice of fwd pkgs across all channels. `reforwardSettleFails` already keys off `fwdPkg.Source` for its ack/forward routing, so flattening the result doesn't lose any information: each pkg still knows the channel it came from. --- htlcswitch/switch.go | 49 ++++++++++++++++++++++++++++---------------- 1 file changed, 31 insertions(+), 18 deletions(-) diff --git a/htlcswitch/switch.go b/htlcswitch/switch.go index a3aae809b93..50a4337f3da 100644 --- a/htlcswitch/switch.go +++ b/htlcswitch/switch.go @@ -1868,6 +1868,10 @@ func (s *Switch) reforwardResponses() error { return err } + // Collect the set of channels that may have outstanding responses in + // their forwarding packages. We skip locally-initiated payments and + // pending channels, since neither will have fwd pkgs to reload. + sources := make([]lnwire.ShortChannelID, 0, len(openChannels)) for _, openChannel := range openChannels { shortChanID := openChannel.ShortChanID() @@ -1882,34 +1886,43 @@ func (s *Switch) reforwardResponses() error { continue } - // Channels in open or waiting-close may still have responses in - // their forwarding packages. We will continue to reattempt - // forwarding on startup until the channel is fully-closed. - // - // Load this channel's forwarding packages, and deliver them to - // the switch. - fwdPkgs, err := s.loadChannelFwdPkgs(shortChanID) - if err != nil { - log.Errorf("unable to load forwarding "+ - "packages for %v: %v", shortChanID, err) - return err - } + sources = append(sources, shortChanID) + } + + if len(sources) == 0 { + return nil + } - s.reforwardSettleFails(fwdPkgs) + // Channels in open or waiting-close may still have responses in their + // forwarding packages. We will continue to reattempt forwarding on + // startup until the channel is fully-closed. + // + // Load all fwd pkgs across the active channel set in a single read + // transaction, which is materially faster than opening one tx per + // channel for nodes with a large channel set. + fwdPkgs, err := s.loadChannelFwdPkgsSet(sources) + if err != nil { + log.Errorf("unable to load forwarding packages: %v", err) + return err } + s.reforwardSettleFails(fwdPkgs) + return nil } -// loadChannelFwdPkgs loads all forwarding packages owned by the `source` short -// channel identifier. -func (s *Switch) loadChannelFwdPkgs(source lnwire.ShortChannelID) ([]*channeldb.FwdPkg, error) { +// loadChannelFwdPkgsSet loads all forwarding packages owned by the provided +// set of `sources` short channel identifiers using a single read transaction. +// The returned slice contains the fwd pkgs across all channels; each FwdPkg +// carries its originating channel via its Source field. +func (s *Switch) loadChannelFwdPkgsSet( + sources []lnwire.ShortChannelID) ([]*channeldb.FwdPkg, error) { var fwdPkgs []*channeldb.FwdPkg if err := kvdb.View(s.cfg.DB, func(tx kvdb.RTx) error { var err error - fwdPkgs, err = s.cfg.SwitchPackager.LoadChannelFwdPkgs( - tx, source, + fwdPkgs, err = s.cfg.SwitchPackager.LoadChannelFwdPkgsSet( + tx, sources, ) return err }, func() {