Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 78 additions & 0 deletions channeldb/forwarding_package.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,13 @@ type GlobalFwdPkgReader interface {
// channel.
LoadChannelFwdPkgs(tx kvdb.RTx,
source lnwire.ShortChannelID) ([]*FwdPkg, error)

// LoadChannelFwdPkgsSet loads all known forwarding packages for the
// given set of channels within a single read transaction. The result
// is a flat slice across all channels. Each returned FwdPkg carries
// its originating channel via its Source field.
LoadChannelFwdPkgsSet(tx kvdb.RTx,
sources []lnwire.ShortChannelID) ([]*FwdPkg, error)
}

// FwdOperator defines the interfaces for managing forwarding packages that are
Expand Down Expand Up @@ -433,6 +440,16 @@ func (*SwitchPackager) LoadChannelFwdPkgs(tx kvdb.RTx,
return loadChannelFwdPkgs(tx, source)
}

// LoadChannelFwdPkgsSet loads all forwarding packages for the provided set of
// channels using a single read transaction. The returned slice contains the
// fwd pkgs for all channels concatenated together; each FwdPkg carries its
// originating channel via its Source field.
func (*SwitchPackager) LoadChannelFwdPkgsSet(tx kvdb.RTx,
sources []lnwire.ShortChannelID) ([]*FwdPkg, error) {

return loadChannelFwdPkgsSet(tx, sources)
}

// FwdPackager supports all operations required to modify fwd packages, such as
// creation, updates, reading, and removal. The interfaces are broken down in
// this way to support future delegation of the subinterfaces.
Expand Down Expand Up @@ -607,6 +624,67 @@ func loadChannelFwdPkgs(tx kvdb.RTx, source lnwire.ShortChannelID) ([]*FwdPkg, e
return fwdPkgs, nil
}

// loadChannelFwdPkgsSet loads all forwarding packages for the provided set of
// channels using a single read transaction. The returned slice contains the
// fwd pkgs across all channels concatenated together; each FwdPkg carries its
// originating channel via its Source field.
//
// This is the batched equivalent of calling loadChannelFwdPkgs in a loop, each
// within its own db transaction. By amortizing the cost of opening a read
// transaction across many channels, this is materially faster when reading
// fwd pkgs for a large number of channels (1k+), particularly on remote-tx
// backends such as etcd or postgres where each transaction carries network
// round-trip overhead.
func loadChannelFwdPkgsSet(tx kvdb.RTx,
sources []lnwire.ShortChannelID) ([]*FwdPkg, error) {

fwdPkgBkt := tx.ReadBucket(fwdPackagesKey)
if fwdPkgBkt == nil {
return nil, nil
}

// Pre-size optimistically assuming most channels carry one fwd pkg.
// The slice will grow as needed but this avoids early reallocations
// for the common case.
result := make([]*FwdPkg, 0, len(sources))

// heights is reused across channels to avoid per-channel slice
// allocation churn.
var heights []uint64
for _, source := range sources {
sourceKey := makeLogKey(source.ToUint64())
sourceBkt := fwdPkgBkt.NestedReadBucket(sourceKey[:])
if sourceBkt == nil {
continue
}

heights = heights[:0]
err := sourceBkt.ForEach(func(k, _ []byte) error {
if len(k) != 8 {
return ErrCorruptedFwdPkg
}

heights = append(heights, byteOrder.Uint64(k))

return nil
})
if err != nil {
return nil, err
}

for _, height := range heights {
fwdPkg, err := loadFwdPkg(fwdPkgBkt, source, height)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The current implementation of loadChannelFwdPkgsSet calls loadFwdPkg for each height, which internally re-fetches the sourceBkt from fwdPkgBkt using source. Since sourceBkt is already retrieved in the outer loop (line 656), this results in redundant bucket lookups for every forwarding package. For nodes with many channels and packages, this can add up, especially on remote backends where even in-transaction lookups may have overhead. Consider refactoring loadFwdPkg to accept the sourceBkt directly to avoid this redundancy.

if err != nil {
return nil, err
}

result = append(result, fwdPkg)
}
}

return result, nil
}

// loadFwdPkg reads the packager's fwd pkg at a given height, and determines the
// appropriate FwdState.
func loadFwdPkg(fwdPkgBkt kvdb.RBucket, source lnwire.ShortChannelID,
Expand Down
195 changes: 195 additions & 0 deletions channeldb/forwarding_package_bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
package channeldb_test

import (
"testing"

"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/kvdb"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/stretchr/testify/require"
)

// benchFwdPkgScenario describes the shape of the synthetic fwd pkg DB used by
// the load benchmarks: numChannels independent channels, each carrying
// pkgsPerChannel forwarding packages with a mix of adds and settle/fails.
type benchFwdPkgScenario struct {
numChannels int
pkgsPerChannel int
}

// populateFwdPkgs writes a synthetic fwd pkg database for the provided
// scenario. All channels are populated within a single write transaction so
// that benchmark setup is dominated by the work under test rather than by the
// fsync cost of one write tx per channel.
func populateFwdPkgs(b *testing.B, db kvdb.Backend,
scenario benchFwdPkgScenario) []lnwire.ShortChannelID {

b.Helper()

sources := make([]lnwire.ShortChannelID, 0, scenario.numChannels)
for i := 0; i < scenario.numChannels; i++ {
// Skip 0 to avoid collisions with hop.Source-style sentinels.
shortChanID := lnwire.NewShortChanIDFromInt(uint64(i + 1))
sources = append(sources, shortChanID)
}

err := kvdb.Update(db, func(tx kvdb.RwTx) error {
for _, shortChanID := range sources {
packager := channeldb.NewChannelPackager(shortChanID)
for h := 0; h < scenario.pkgsPerChannel; h++ {
fwdPkg := channeldb.NewFwdPkg(
shortChanID, uint64(h),
testAdds(), testSettleFails(),
)
if err := packager.AddFwdPkg(
tx, fwdPkg,
); err != nil {
return err
}
}
}

return nil
}, func() {})
require.NoError(b, err)

return sources
}

// benchScenarios is the matrix exercised by the load benchmarks. The 1k+ case
// matches the motivating production scenario.
var benchScenarios = []benchFwdPkgScenario{
{numChannels: 100, pkgsPerChannel: 1},
{numChannels: 500, pkgsPerChannel: 1},
{numChannels: 1000, pkgsPerChannel: 1},
{numChannels: 2000, pkgsPerChannel: 1},
{numChannels: 1000, pkgsPerChannel: 4},
}

// BenchmarkLoadChannelFwdPkgsPerTx measures the prior strategy of opening one
// read transaction per channel to load that channel's forwarding packages.
func BenchmarkLoadChannelFwdPkgsPerTx(b *testing.B) {
for _, scenario := range benchScenarios {
scenario := scenario
name := benchName(scenario)
b.Run(name, func(b *testing.B) {
db := makeFwdPkgBenchDB(b)
sources := populateFwdPkgs(b, db, scenario)

pkgr := channeldb.NewSwitchPackager()

b.ReportAllocs()
b.ResetTimer()

for i := 0; i < b.N; i++ {
totalPkgs := 0
for _, source := range sources {
var fwdPkgs []*channeldb.FwdPkg
err := kvdb.View(db,
func(tx kvdb.RTx) error {
var err error
fwdPkgs, err = pkgr.LoadChannelFwdPkgs( //nolint:lll

Check failure on line 91 in channeldb/forwarding_package_bench_test.go

View workflow job for this annotation

GitHub Actions / Lint code

the line is 108 characters long, which exceeds the maximum of 80 characters. (ll)
tx, source,
)
return err

Check failure on line 94 in channeldb/forwarding_package_bench_test.go

View workflow job for this annotation

GitHub Actions / Lint code

return with no blank line before (nlreturn)
}, func() {
fwdPkgs = nil
},
)
if err != nil {
b.Fatal(err)
}
totalPkgs += len(fwdPkgs)
}

if totalPkgs == 0 {
b.Fatal("expected fwd pkgs to be loaded")

Check failure on line 106 in channeldb/forwarding_package_bench_test.go

View workflow job for this annotation

GitHub Actions / Lint code

the line is 81 characters long, which exceeds the maximum of 80 characters. (ll)
}
}
})
}
}

// BenchmarkLoadChannelFwdPkgsSet measures the batched query which loads
// forwarding packages for the entire set of channels in a single read
// transaction.
func BenchmarkLoadChannelFwdPkgsSet(b *testing.B) {
for _, scenario := range benchScenarios {
scenario := scenario
name := benchName(scenario)
b.Run(name, func(b *testing.B) {
db := makeFwdPkgBenchDB(b)
sources := populateFwdPkgs(b, db, scenario)

pkgr := channeldb.NewSwitchPackager()

b.ReportAllocs()
b.ResetTimer()

for i := 0; i < b.N; i++ {
var fwdPkgs []*channeldb.FwdPkg
err := kvdb.View(db, func(tx kvdb.RTx) error {
var err error
fwdPkgs, err = pkgr.LoadChannelFwdPkgsSet( //nolint:lll

Check failure on line 133 in channeldb/forwarding_package_bench_test.go

View workflow job for this annotation

GitHub Actions / Lint code

the line is 95 characters long, which exceeds the maximum of 80 characters. (ll)
tx, sources,
)
return err

Check failure on line 136 in channeldb/forwarding_package_bench_test.go

View workflow job for this annotation

GitHub Actions / Lint code

return with no blank line before (nlreturn)
}, func() {
fwdPkgs = nil
})
if err != nil {
b.Fatal(err)
}

if len(fwdPkgs) == 0 {
b.Fatal("expected fwd pkgs to be loaded")

Check failure on line 145 in channeldb/forwarding_package_bench_test.go

View workflow job for this annotation

GitHub Actions / Lint code

the line is 81 characters long, which exceeds the maximum of 80 characters. (ll)
}
}
})
}
}

// benchName produces a deterministic, parseable sub-benchmark name. This shape
// is important so that benchstat can pair the per-tx and batched variants when
// computing deltas.
func benchName(s benchFwdPkgScenario) string {
return "channels=" + itoa(s.numChannels) +
"/pkgs=" + itoa(s.pkgsPerChannel)
}

// itoa is a tiny strconv.Itoa stand-in to avoid pulling strconv into the test
// file solely for benchmark naming.
func itoa(n int) string {
if n == 0 {
return "0"
}

var buf [20]byte
i := len(buf)
for n > 0 {
i--
buf[i] = byte('0' + n%10)
n /= 10
}

return string(buf[i:])
}
Comment on lines +162 to +176
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Reimplementing itoa is unnecessary as the standard library provides strconv.Itoa. Using the standard library is more idiomatic, robust (e.g., handling negative numbers), and easier to maintain. Since this is a test file, the overhead of an additional import is negligible and preferred over custom implementations of basic utilities.


// makeFwdPkgBenchDB constructs a fresh on-disk fwd pkg database for the
// benchmark to operate on. We use the standard bolt backend here since this is
// the same backend used by lnd in practice for fwd pkg storage.
func makeFwdPkgBenchDB(b *testing.B) kvdb.Backend {
b.Helper()

path := b.TempDir() + "/fwdpkg.db"
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Use filepath.Join instead of manual string concatenation for file paths. This ensures the code is cross-platform compatible and correctly handles path separators across different operating systems.

Suggested change
path := b.TempDir() + "/fwdpkg.db"
path := filepath.Join(b.TempDir(), "fwdpkg.db")

bdb, err := kvdb.Create(
kvdb.BoltBackendName, path, true, kvdb.DefaultDBTimeout, false,
)
require.NoError(b, err, "unable to open boltdb")

b.Cleanup(func() {
_ = bdb.Close()
})

return bdb
}
91 changes: 91 additions & 0 deletions channeldb/forwarding_package_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -851,6 +851,97 @@
return fwdPkgs
}

// TestSwitchPackagerLoadChannelFwdPkgsSet verifies that the batched fwd pkg
// load returns the same fwd pkgs as the per-channel call, that channels with
// no fwd pkgs are silently skipped, and that pkgs from distinct channels are
// not mixed up.
func TestSwitchPackagerLoadChannelFwdPkgsSet(t *testing.T) {
t.Parallel()

db := makeFwdPkgDB(t, "")

// Channel 1: two pkgs (heights 0 and 1).
ch1 := lnwire.NewShortChanIDFromInt(1)
ch1Packager := channeldb.NewChannelPackager(ch1)

// Channel 2: one pkg (height 7), distinct height to test multi-height
// ordering.
ch2 := lnwire.NewShortChanIDFromInt(2)
ch2Packager := channeldb.NewChannelPackager(ch2)

// Channel 3: deliberately has no fwd pkgs on disk. It must be silently
// dropped from the result, not surface as an error.
ch3 := lnwire.NewShortChanIDFromInt(3)

err := kvdb.Update(db, func(tx kvdb.RwTx) error {
ch1Pkg0 := channeldb.NewFwdPkg(
ch1, 0, testAdds(), testSettleFails(),
)
if err := ch1Packager.AddFwdPkg(tx, ch1Pkg0); err != nil {
return err
}

ch1Pkg1 := channeldb.NewFwdPkg(
ch1, 1, testAdds(), testSettleFails(),
)
if err := ch1Packager.AddFwdPkg(tx, ch1Pkg1); err != nil {
return err
}

ch2Pkg := channeldb.NewFwdPkg(
ch2, 7, testAdds(), testSettleFails(),
)
return ch2Packager.AddFwdPkg(tx, ch2Pkg)

Check failure on line 894 in channeldb/forwarding_package_test.go

View workflow job for this annotation

GitHub Actions / Lint code

return with no blank line before (nlreturn)
}, func() {})
require.NoError(t, err)

// Validate against the per-channel call. Construct the expected
// concatenation in the same order LoadChannelFwdPkgsSet processes its
// inputs.
switchPkgr := channeldb.NewSwitchPackager()
var perChannel []*channeldb.FwdPkg
err = kvdb.View(db, func(tx kvdb.RTx) error {
for _, source := range []lnwire.ShortChannelID{ch1, ch3, ch2} {
pkgs, err := switchPkgr.LoadChannelFwdPkgs(tx, source)
if err != nil {
return err
}
perChannel = append(perChannel, pkgs...)
}
return nil
}, func() { perChannel = nil })
require.NoError(t, err)

var batched []*channeldb.FwdPkg
err = kvdb.View(db, func(tx kvdb.RTx) error {
var err error
batched, err = switchPkgr.LoadChannelFwdPkgsSet(
tx, []lnwire.ShortChannelID{ch1, ch3, ch2},
)
return err
}, func() { batched = nil })
require.NoError(t, err)

require.Len(t, batched, 3, "expected 3 fwd pkgs across the channel set")
require.Equal(t, perChannel, batched,
"batched load must match per-channel concatenation")

// Sources should track which channel each pkg belongs to.
require.Equal(t, ch1, batched[0].Source)
require.Equal(t, ch1, batched[1].Source)
require.Equal(t, ch2, batched[2].Source)

// Empty input should yield no work and no error.
var empty []*channeldb.FwdPkg
err = kvdb.View(db, func(tx kvdb.RTx) error {
var err error
empty, err = switchPkgr.LoadChannelFwdPkgsSet(tx, nil)
return err
}, func() { empty = nil })
require.NoError(t, err)
require.Empty(t, empty)
}

// makeFwdPkgDB initializes a test database for forwarding packages. If the
// provided path is an empty, it will create a temp dir/file to use.
func makeFwdPkgDB(t *testing.T, path string) kvdb.Backend { // nolint:unparam
Expand Down
Loading
Loading