diff --git a/channeldb/forwarding_package.go b/channeldb/forwarding_package.go index c393a53b37f..331586d6e55 100644 --- a/channeldb/forwarding_package.go +++ b/channeldb/forwarding_package.go @@ -391,6 +391,13 @@ type GlobalFwdPkgReader interface { // channel. LoadChannelFwdPkgs(tx kvdb.RTx, source lnwire.ShortChannelID) ([]*FwdPkg, error) + + // LoadChannelFwdPkgsSet loads all known forwarding packages for the + // given set of channels within a single read transaction. The result + // is a flat slice across all channels. Each returned FwdPkg carries + // its originating channel via its Source field. + LoadChannelFwdPkgsSet(tx kvdb.RTx, + sources []lnwire.ShortChannelID) ([]*FwdPkg, error) } // FwdOperator defines the interfaces for managing forwarding packages that are @@ -433,6 +440,16 @@ func (*SwitchPackager) LoadChannelFwdPkgs(tx kvdb.RTx, return loadChannelFwdPkgs(tx, source) } +// LoadChannelFwdPkgsSet loads all forwarding packages for the provided set of +// channels using a single read transaction. The returned slice contains the +// fwd pkgs for all channels concatenated together; each FwdPkg carries its +// originating channel via its Source field. +func (*SwitchPackager) LoadChannelFwdPkgsSet(tx kvdb.RTx, + sources []lnwire.ShortChannelID) ([]*FwdPkg, error) { + + return loadChannelFwdPkgsSet(tx, sources) +} + // FwdPackager supports all operations required to modify fwd packages, such as // creation, updates, reading, and removal. The interfaces are broken down in // this way to support future delegation of the subinterfaces. @@ -607,6 +624,67 @@ func loadChannelFwdPkgs(tx kvdb.RTx, source lnwire.ShortChannelID) ([]*FwdPkg, e return fwdPkgs, nil } +// loadChannelFwdPkgsSet loads all forwarding packages for the provided set of +// channels using a single read transaction. The returned slice contains the +// fwd pkgs across all channels concatenated together; each FwdPkg carries its +// originating channel via its Source field. +// +// This is the batched equivalent of calling loadChannelFwdPkgs in a loop, each +// within its own db transaction. By amortizing the cost of opening a read +// transaction across many channels, this is materially faster when reading +// fwd pkgs for a large number of channels (1k+), particularly on remote-tx +// backends such as etcd or postgres where each transaction carries network +// round-trip overhead. +func loadChannelFwdPkgsSet(tx kvdb.RTx, + sources []lnwire.ShortChannelID) ([]*FwdPkg, error) { + + fwdPkgBkt := tx.ReadBucket(fwdPackagesKey) + if fwdPkgBkt == nil { + return nil, nil + } + + // Pre-size optimistically assuming most channels carry one fwd pkg. + // The slice will grow as needed but this avoids early reallocations + // for the common case. + result := make([]*FwdPkg, 0, len(sources)) + + // heights is reused across channels to avoid per-channel slice + // allocation churn. + var heights []uint64 + for _, source := range sources { + sourceKey := makeLogKey(source.ToUint64()) + sourceBkt := fwdPkgBkt.NestedReadBucket(sourceKey[:]) + if sourceBkt == nil { + continue + } + + heights = heights[:0] + err := sourceBkt.ForEach(func(k, _ []byte) error { + if len(k) != 8 { + return ErrCorruptedFwdPkg + } + + heights = append(heights, byteOrder.Uint64(k)) + + return nil + }) + if err != nil { + return nil, err + } + + for _, height := range heights { + fwdPkg, err := loadFwdPkg(fwdPkgBkt, source, height) + if err != nil { + return nil, err + } + + result = append(result, fwdPkg) + } + } + + return result, nil +} + // loadFwdPkg reads the packager's fwd pkg at a given height, and determines the // appropriate FwdState. func loadFwdPkg(fwdPkgBkt kvdb.RBucket, source lnwire.ShortChannelID, diff --git a/channeldb/forwarding_package_bench_test.go b/channeldb/forwarding_package_bench_test.go new file mode 100644 index 00000000000..7472c38ff8f --- /dev/null +++ b/channeldb/forwarding_package_bench_test.go @@ -0,0 +1,195 @@ +package channeldb_test + +import ( + "testing" + + "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/kvdb" + "github.com/lightningnetwork/lnd/lnwire" + "github.com/stretchr/testify/require" +) + +// benchFwdPkgScenario describes the shape of the synthetic fwd pkg DB used by +// the load benchmarks: numChannels independent channels, each carrying +// pkgsPerChannel forwarding packages with a mix of adds and settle/fails. +type benchFwdPkgScenario struct { + numChannels int + pkgsPerChannel int +} + +// populateFwdPkgs writes a synthetic fwd pkg database for the provided +// scenario. All channels are populated within a single write transaction so +// that benchmark setup is dominated by the work under test rather than by the +// fsync cost of one write tx per channel. +func populateFwdPkgs(b *testing.B, db kvdb.Backend, + scenario benchFwdPkgScenario) []lnwire.ShortChannelID { + + b.Helper() + + sources := make([]lnwire.ShortChannelID, 0, scenario.numChannels) + for i := 0; i < scenario.numChannels; i++ { + // Skip 0 to avoid collisions with hop.Source-style sentinels. + shortChanID := lnwire.NewShortChanIDFromInt(uint64(i + 1)) + sources = append(sources, shortChanID) + } + + err := kvdb.Update(db, func(tx kvdb.RwTx) error { + for _, shortChanID := range sources { + packager := channeldb.NewChannelPackager(shortChanID) + for h := 0; h < scenario.pkgsPerChannel; h++ { + fwdPkg := channeldb.NewFwdPkg( + shortChanID, uint64(h), + testAdds(), testSettleFails(), + ) + if err := packager.AddFwdPkg( + tx, fwdPkg, + ); err != nil { + return err + } + } + } + + return nil + }, func() {}) + require.NoError(b, err) + + return sources +} + +// benchScenarios is the matrix exercised by the load benchmarks. The 1k+ case +// matches the motivating production scenario. +var benchScenarios = []benchFwdPkgScenario{ + {numChannels: 100, pkgsPerChannel: 1}, + {numChannels: 500, pkgsPerChannel: 1}, + {numChannels: 1000, pkgsPerChannel: 1}, + {numChannels: 2000, pkgsPerChannel: 1}, + {numChannels: 1000, pkgsPerChannel: 4}, +} + +// BenchmarkLoadChannelFwdPkgsPerTx measures the prior strategy of opening one +// read transaction per channel to load that channel's forwarding packages. +func BenchmarkLoadChannelFwdPkgsPerTx(b *testing.B) { + for _, scenario := range benchScenarios { + scenario := scenario + name := benchName(scenario) + b.Run(name, func(b *testing.B) { + db := makeFwdPkgBenchDB(b) + sources := populateFwdPkgs(b, db, scenario) + + pkgr := channeldb.NewSwitchPackager() + + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + totalPkgs := 0 + for _, source := range sources { + var fwdPkgs []*channeldb.FwdPkg + err := kvdb.View(db, + func(tx kvdb.RTx) error { + var err error + fwdPkgs, err = pkgr.LoadChannelFwdPkgs( //nolint:lll + tx, source, + ) + return err + }, func() { + fwdPkgs = nil + }, + ) + if err != nil { + b.Fatal(err) + } + totalPkgs += len(fwdPkgs) + } + + if totalPkgs == 0 { + b.Fatal("expected fwd pkgs to be loaded") + } + } + }) + } +} + +// BenchmarkLoadChannelFwdPkgsSet measures the batched query which loads +// forwarding packages for the entire set of channels in a single read +// transaction. +func BenchmarkLoadChannelFwdPkgsSet(b *testing.B) { + for _, scenario := range benchScenarios { + scenario := scenario + name := benchName(scenario) + b.Run(name, func(b *testing.B) { + db := makeFwdPkgBenchDB(b) + sources := populateFwdPkgs(b, db, scenario) + + pkgr := channeldb.NewSwitchPackager() + + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + var fwdPkgs []*channeldb.FwdPkg + err := kvdb.View(db, func(tx kvdb.RTx) error { + var err error + fwdPkgs, err = pkgr.LoadChannelFwdPkgsSet( //nolint:lll + tx, sources, + ) + return err + }, func() { + fwdPkgs = nil + }) + if err != nil { + b.Fatal(err) + } + + if len(fwdPkgs) == 0 { + b.Fatal("expected fwd pkgs to be loaded") + } + } + }) + } +} + +// benchName produces a deterministic, parseable sub-benchmark name. This shape +// is important so that benchstat can pair the per-tx and batched variants when +// computing deltas. +func benchName(s benchFwdPkgScenario) string { + return "channels=" + itoa(s.numChannels) + + "/pkgs=" + itoa(s.pkgsPerChannel) +} + +// itoa is a tiny strconv.Itoa stand-in to avoid pulling strconv into the test +// file solely for benchmark naming. +func itoa(n int) string { + if n == 0 { + return "0" + } + + var buf [20]byte + i := len(buf) + for n > 0 { + i-- + buf[i] = byte('0' + n%10) + n /= 10 + } + + return string(buf[i:]) +} + +// makeFwdPkgBenchDB constructs a fresh on-disk fwd pkg database for the +// benchmark to operate on. We use the standard bolt backend here since this is +// the same backend used by lnd in practice for fwd pkg storage. +func makeFwdPkgBenchDB(b *testing.B) kvdb.Backend { + b.Helper() + + path := b.TempDir() + "/fwdpkg.db" + bdb, err := kvdb.Create( + kvdb.BoltBackendName, path, true, kvdb.DefaultDBTimeout, false, + ) + require.NoError(b, err, "unable to open boltdb") + + b.Cleanup(func() { + _ = bdb.Close() + }) + + return bdb +} diff --git a/channeldb/forwarding_package_test.go b/channeldb/forwarding_package_test.go index b11764bee93..4bb161105bb 100644 --- a/channeldb/forwarding_package_test.go +++ b/channeldb/forwarding_package_test.go @@ -851,6 +851,97 @@ func loadFwdPkgs(t *testing.T, db kvdb.Backend, return fwdPkgs } +// TestSwitchPackagerLoadChannelFwdPkgsSet verifies that the batched fwd pkg +// load returns the same fwd pkgs as the per-channel call, that channels with +// no fwd pkgs are silently skipped, and that pkgs from distinct channels are +// not mixed up. +func TestSwitchPackagerLoadChannelFwdPkgsSet(t *testing.T) { + t.Parallel() + + db := makeFwdPkgDB(t, "") + + // Channel 1: two pkgs (heights 0 and 1). + ch1 := lnwire.NewShortChanIDFromInt(1) + ch1Packager := channeldb.NewChannelPackager(ch1) + + // Channel 2: one pkg (height 7), distinct height to test multi-height + // ordering. + ch2 := lnwire.NewShortChanIDFromInt(2) + ch2Packager := channeldb.NewChannelPackager(ch2) + + // Channel 3: deliberately has no fwd pkgs on disk. It must be silently + // dropped from the result, not surface as an error. + ch3 := lnwire.NewShortChanIDFromInt(3) + + err := kvdb.Update(db, func(tx kvdb.RwTx) error { + ch1Pkg0 := channeldb.NewFwdPkg( + ch1, 0, testAdds(), testSettleFails(), + ) + if err := ch1Packager.AddFwdPkg(tx, ch1Pkg0); err != nil { + return err + } + + ch1Pkg1 := channeldb.NewFwdPkg( + ch1, 1, testAdds(), testSettleFails(), + ) + if err := ch1Packager.AddFwdPkg(tx, ch1Pkg1); err != nil { + return err + } + + ch2Pkg := channeldb.NewFwdPkg( + ch2, 7, testAdds(), testSettleFails(), + ) + return ch2Packager.AddFwdPkg(tx, ch2Pkg) + }, func() {}) + require.NoError(t, err) + + // Validate against the per-channel call. Construct the expected + // concatenation in the same order LoadChannelFwdPkgsSet processes its + // inputs. + switchPkgr := channeldb.NewSwitchPackager() + var perChannel []*channeldb.FwdPkg + err = kvdb.View(db, func(tx kvdb.RTx) error { + for _, source := range []lnwire.ShortChannelID{ch1, ch3, ch2} { + pkgs, err := switchPkgr.LoadChannelFwdPkgs(tx, source) + if err != nil { + return err + } + perChannel = append(perChannel, pkgs...) + } + return nil + }, func() { perChannel = nil }) + require.NoError(t, err) + + var batched []*channeldb.FwdPkg + err = kvdb.View(db, func(tx kvdb.RTx) error { + var err error + batched, err = switchPkgr.LoadChannelFwdPkgsSet( + tx, []lnwire.ShortChannelID{ch1, ch3, ch2}, + ) + return err + }, func() { batched = nil }) + require.NoError(t, err) + + require.Len(t, batched, 3, "expected 3 fwd pkgs across the channel set") + require.Equal(t, perChannel, batched, + "batched load must match per-channel concatenation") + + // Sources should track which channel each pkg belongs to. + require.Equal(t, ch1, batched[0].Source) + require.Equal(t, ch1, batched[1].Source) + require.Equal(t, ch2, batched[2].Source) + + // Empty input should yield no work and no error. + var empty []*channeldb.FwdPkg + err = kvdb.View(db, func(tx kvdb.RTx) error { + var err error + empty, err = switchPkgr.LoadChannelFwdPkgsSet(tx, nil) + return err + }, func() { empty = nil }) + require.NoError(t, err) + require.Empty(t, empty) +} + // makeFwdPkgDB initializes a test database for forwarding packages. If the // provided path is an empty, it will create a temp dir/file to use. func makeFwdPkgDB(t *testing.T, path string) kvdb.Backend { // nolint:unparam diff --git a/htlcswitch/switch.go b/htlcswitch/switch.go index a3aae809b93..50a4337f3da 100644 --- a/htlcswitch/switch.go +++ b/htlcswitch/switch.go @@ -1868,6 +1868,10 @@ func (s *Switch) reforwardResponses() error { return err } + // Collect the set of channels that may have outstanding responses in + // their forwarding packages. We skip locally-initiated payments and + // pending channels, since neither will have fwd pkgs to reload. + sources := make([]lnwire.ShortChannelID, 0, len(openChannels)) for _, openChannel := range openChannels { shortChanID := openChannel.ShortChanID() @@ -1882,34 +1886,43 @@ func (s *Switch) reforwardResponses() error { continue } - // Channels in open or waiting-close may still have responses in - // their forwarding packages. We will continue to reattempt - // forwarding on startup until the channel is fully-closed. - // - // Load this channel's forwarding packages, and deliver them to - // the switch. - fwdPkgs, err := s.loadChannelFwdPkgs(shortChanID) - if err != nil { - log.Errorf("unable to load forwarding "+ - "packages for %v: %v", shortChanID, err) - return err - } + sources = append(sources, shortChanID) + } + + if len(sources) == 0 { + return nil + } - s.reforwardSettleFails(fwdPkgs) + // Channels in open or waiting-close may still have responses in their + // forwarding packages. We will continue to reattempt forwarding on + // startup until the channel is fully-closed. + // + // Load all fwd pkgs across the active channel set in a single read + // transaction, which is materially faster than opening one tx per + // channel for nodes with a large channel set. + fwdPkgs, err := s.loadChannelFwdPkgsSet(sources) + if err != nil { + log.Errorf("unable to load forwarding packages: %v", err) + return err } + s.reforwardSettleFails(fwdPkgs) + return nil } -// loadChannelFwdPkgs loads all forwarding packages owned by the `source` short -// channel identifier. -func (s *Switch) loadChannelFwdPkgs(source lnwire.ShortChannelID) ([]*channeldb.FwdPkg, error) { +// loadChannelFwdPkgsSet loads all forwarding packages owned by the provided +// set of `sources` short channel identifiers using a single read transaction. +// The returned slice contains the fwd pkgs across all channels; each FwdPkg +// carries its originating channel via its Source field. +func (s *Switch) loadChannelFwdPkgsSet( + sources []lnwire.ShortChannelID) ([]*channeldb.FwdPkg, error) { var fwdPkgs []*channeldb.FwdPkg if err := kvdb.View(s.cfg.DB, func(tx kvdb.RTx) error { var err error - fwdPkgs, err = s.cfg.SwitchPackager.LoadChannelFwdPkgs( - tx, source, + fwdPkgs, err = s.cfg.SwitchPackager.LoadChannelFwdPkgsSet( + tx, sources, ) return err }, func() {