Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions docs/release-notes/release-notes-0.22.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,15 @@
[clarifies](https://github.com/lightningnetwork/lnd/issues/10568) the ZMQ
port-mismatch warnings so they no longer suggest that the connection failed.

* The sweeper [now leases wallet UTXOs](https://github.com/lightningnetwork/lnd/pull/10816)
it claims for fee-paying inputs, preventing concurrent sweeps within a
single node from contending for the same UTXO. It also no longer
ratchets the starting fee rate on failures that are not fee-related
(e.g. when there are no UTXOs available to cover the fees), so inputs
whose intrinsic budget cannot accommodate a higher rate are not
stranded. This resolves [#7397](https://github.com/lightningnetwork/lnd/issues/7397)
and addresses item 3 of [#8680](https://github.com/lightningnetwork/lnd/issues/8680).

# New Features

## Functional Enhancements
Expand Down Expand Up @@ -71,3 +80,4 @@

* Boris Nagaev
* Erick Cestari
* Jared Tobin
72 changes: 44 additions & 28 deletions sweep/fee_bumper.go
Original file line number Diff line number Diff line change
Expand Up @@ -1114,20 +1114,13 @@ func (t *TxPublisher) handleInitialTxError(r *monitorRecord, err error) {

// When the error is due to budget being used up, we'll send a TxFailed
// so these inputs can be retried with a different group in the next
// block.
// block. The error is fee-related (the linear fee function has run
// past its budget), so we still calculate a retry fee rate.
case errors.Is(err, ErrMaxPosition):
fallthrough

// If the tx doesn't not have enough budget, or if the inputs amounts
// are not sufficient to cover the budget, we will return a TxFailed
// event so the sweeper can handle it by re-clustering the utxos.
case errors.Is(err, ErrNotEnoughInputs),
errors.Is(err, ErrNotEnoughBudget):

result.Event = TxFailed

// Calculate the starting fee rate to be used when retry
// sweeping these inputs.
// Calculate the starting fee rate to be used when retrying
// to sweep these inputs.
feeRate, err := t.calculateRetryFeeRate(r)
if err != nil {
result.Event = TxFatal
Expand All @@ -1137,6 +1130,21 @@ func (t *TxPublisher) handleInitialTxError(r *monitorRecord, err error) {
// Attach the new fee rate.
result.FeeRate = feeRate

// If the tx doesn't have enough budget, or if there aren't enough
// wallet inputs available to cover fees, we return a TxFailed event
// so the sweeper can re-cluster and retry on the next block. We
// intentionally do NOT call calculateRetryFeeRate here: these
// failures have no fee-rate dimension to them (an absent wallet
// UTXO is a resource problem, not a fee problem), and ratcheting
// the starting fee rate upward on each retry can permanently
// strand an input whose intrinsic budget cannot cover the higher
// starting rate. Leaving FeeRate at zero signals the sweeper to
// preserve the input's existing starting fee rate.
case errors.Is(err, ErrNotEnoughInputs),
errors.Is(err, ErrNotEnoughBudget):

result.Event = TxFailed

// When there are missing inputs, we'll create a TxUnknownSpend bump
// result here so the rest of the inputs can be retried.
case errors.Is(err, ErrInputMissing):
Expand Down Expand Up @@ -1857,19 +1865,38 @@ func (t *TxPublisher) handleReplacementTxError(r *monitorRecord,
return fn.Some(*bumpResult)
}

// Return a failed event to retry the sweep.
event := TxFailed
// If the tx doesn't have enough budget, or if there aren't enough
// wallet inputs available to cover fees, we return a TxFailed event
// so the sweeper can re-cluster and retry on the next block. We
// intentionally do NOT call calculateRetryFeeRate for these failure
// modes: they have no fee-rate dimension to them (an absent wallet
// UTXO is a resource problem, not a fee problem). Leaving FeeRate
// at zero signals the sweeper to preserve the input's existing
// starting fee rate; otherwise repeated resource failures would
// ratchet the rate past the input's intrinsic budget and strand it.
if errors.Is(err, ErrNotEnoughBudget) ||
errors.Is(err, ErrNotEnoughInputs) {

// Calculate the next fee rate for the retry.
log.Warnf("Fail to fee bump tx %v: %v", oldTx.TxHash(), err)
return fn.Some(BumpResult{
Event: TxFailed,
Tx: oldTx,
Err: err,
requestID: r.requestID,
})
}

// For all other fee-bump failures, treat them as fee-related: ask
// the fee function for the next rate and carry it on the result so
// the sweeper can use it as the starting rate on retry.
event := TxFailed
feeRate, ferr := t.calculateRetryFeeRate(r)
if ferr != nil {
// If there's an error with the fee calculation, we need to
// abort the sweep.
event = TxFatal
}
Comment on lines 1894 to 1898
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

For consistency with handleInitialTxError (lines 1124-1128), when calculateRetryFeeRate returns an error, the err variable should be updated to the calculation error. This ensures that the BumpResult and the subsequent error log (line 1909) reflect the reason why the sweep was aborted with a TxFatal event.

Suggested change
if ferr != nil {
// If there's an error with the fee calculation, we need to
// abort the sweep.
event = TxFatal
}
if ferr != nil {
// If there's an error with the fee calculation, we need to
// abort the sweep.
event = TxFatal
err = ferr
}


// If the error is not fee related, we will return a `TxFailed` event so
// this input can be retried.
result := fn.Some(BumpResult{
Event: event,
Tx: oldTx,
Expand All @@ -1878,18 +1905,7 @@ func (t *TxPublisher) handleReplacementTxError(r *monitorRecord,
FeeRate: feeRate,
})

// If the tx doesn't not have enough budget, or if the inputs amounts
// are not sufficient to cover the budget, we will return a result so
// the sweeper can handle it by re-clustering the utxos.
if errors.Is(err, ErrNotEnoughBudget) ||
errors.Is(err, ErrNotEnoughInputs) {

log.Warnf("Fail to fee bump tx %v: %v", oldTx.TxHash(), err)
return result
}

// Otherwise, an unexpected error occurred, we will log an error and let
// the sweeper retry the whole process.
// Log an error and let the sweeper retry the whole process.
log.Errorf("Failed to bump tx %v: %v", oldTx.TxHash(), err)

return result
Expand Down
76 changes: 75 additions & 1 deletion sweep/fee_bumper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1135,7 +1135,14 @@ func TestCreateAndPublishFail(t *testing.T) {
// Create a test feerate and return it from the mock fee function.
feerate := chainfee.SatPerKWeight(1000)
m.feeFunc.On("FeeRate").Return(feerate)
m.feeFunc.On("Increment").Return(true, nil).Once()

// Note: we deliberately do not expect Increment() to be called for
// ErrNotEnoughBudget. Ratcheting the starting fee rate on a non-fee
// failure mode would only strand inputs whose intrinsic budget can't
// accommodate a higher rate; the patch in createAndPublishTx (and
// the matching path in handleInitialTxError) skips the fee-function
// increment for ErrNotEnoughBudget and ErrNotEnoughInputs precisely
// for that reason.

// Create a testing monitor record.
req := createTestBumpRequest()
Expand Down Expand Up @@ -1168,6 +1175,10 @@ func TestCreateAndPublishFail(t *testing.T) {
require.ErrorIs(t, result.Err, ErrNotEnoughBudget)
require.Equal(t, requestID, result.requestID)

// The result must NOT carry a fee rate; ErrNotEnoughBudget is not a
// fee-related failure and ratcheting would be wrong.
require.Zero(t, result.FeeRate)

// Increase the budget and call it again. This time we will mock an
// error to be returned from CheckMempoolAcceptance.
req.Budget = 1000
Expand Down Expand Up @@ -1988,6 +1999,69 @@ func TestHandleInitialBroadcastFail(t *testing.T) {
require.Equal(t, 0, tp.subscriberChans.Len())
}

// TestHandleInitialTxErrorNoRatchetOnResourceFailure asserts that
// handleInitialTxError does not call the fee function's Increment method
// when the failure is ErrNotEnoughInputs or ErrNotEnoughBudget. Those
// failure modes carry no fee-rate signal; ratcheting the rate on each
// retry would only strand inputs whose intrinsic budget cannot accommodate
// the increased starting rate.
func TestHandleInitialTxErrorNoRatchetOnResourceFailure(t *testing.T) {
t.Parallel()

cases := []struct {
name string
err error
}{
{"ErrNotEnoughInputs", ErrNotEnoughInputs},
{"ErrNotEnoughBudget", ErrNotEnoughBudget},
}

for _, tc := range cases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()

tp, _ := createTestPublisher(t)

// We intentionally do not stub Increment() on the
// mock fee function: if the code under test calls
// it on a resource failure, testify will panic and
// the test will fail loudly.

inp := createTestInput(1000, input.WitnessKeyHash)
req := &BumpRequest{
DeliveryAddress: changePkScript,
Inputs: []input.Input{&inp},
Budget: btcutil.Amount(1000),
MaxFeeRate: chainfee.SatPerKWeight(10000),
DeadlineHeight: 10,
}

rec := &monitorRecord{
requestID: 1,
req: req,
}

// Subscribe so we can observe the bump result.
subChan := make(chan *BumpResult, 1)
tp.subscriberChans.Store(uint64(1), subChan)

tp.handleInitialTxError(rec, tc.err)

select {
case result := <-subChan:
require.Equal(t, TxFailed, result.Event)
require.ErrorIs(t, result.Err, tc.err)
require.Zero(t, result.FeeRate,
"FeeRate must be zero for "+
"resource failures")
case <-time.After(time.Second):
t.Fatal("timeout waiting for result")
}
})
}
}

// TestHasInputsSpent checks the expected outpoint:tx map is returned.
func TestHasInputsSpent(t *testing.T) {
t.Parallel()
Expand Down
15 changes: 15 additions & 0 deletions sweep/interface.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package sweep

import (
"time"

"github.com/btcsuite/btcd/btcutil"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcwallet/wtxmgr"
"github.com/lightningnetwork/lnd/fn/v2"
"github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/keychain"
Expand Down Expand Up @@ -60,6 +63,18 @@ type Wallet interface {
// which could be e.g. btcd, bitcoind, neutrino, or another consensus
// service.
BackEnd() string

// LeaseOutput leases a wallet output for the given lock ID and
// duration, preventing it from being returned by subsequent coin
// selection calls (including ListUnspentWitnessFromDefaultAccount).
// The sweeper uses this so that wallet UTXOs it selects as fee inputs
// for one InputSet are not also selected for a sibling InputSet
// processed under the same coin-select lock cycle. The lease's expiry
// reclaims the UTXO if a sweep is abandoned, so no companion release
// API is needed here -- add one if and when an explicit-release
// caller emerges.
LeaseOutput(id wtxmgr.LockID, op wire.OutPoint,
duration time.Duration) (time.Time, error)
}

// SweepOutput is an output used to sweep funds from a channel output.
Expand Down
12 changes: 12 additions & 0 deletions sweep/mock_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package sweep

import (
"time"

"github.com/btcsuite/btcd/btcutil"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcwallet/wtxmgr"
"github.com/lightningnetwork/lnd/fn/v2"
"github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/keychain"
Expand Down Expand Up @@ -204,6 +207,15 @@ func (m *MockWallet) GetTransactionDetails(txHash *chainhash.Hash) (
return args.Get(0).(*lnwallet.TransactionDetail), args.Error(1)
}

// LeaseOutput leases a wallet output for the given lock ID and duration.
func (m *MockWallet) LeaseOutput(id wtxmgr.LockID, op wire.OutPoint,
duration time.Duration) (time.Time, error) {

args := m.Called(id, op, duration)

return args.Get(0).(time.Time), args.Error(1)
}

// MockInputSet is a mock implementation of the InputSet interface.
type MockInputSet struct {
mock.Mock
Expand Down
14 changes: 14 additions & 0 deletions sweep/sweeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -995,6 +995,20 @@ func (s *UtxoSweeper) markInputsPublishFailed(set InputSet,
// Update the input's state.
pi.state = PublishFailed

// Only ratchet the starting fee rate when the BumpResult
// carries an actual rate. Failures with no fee dimension to
// them (e.g. ErrNotEnoughInputs from the wallet) leave
// FeeRate at zero; preserving the input's existing starting
// rate in that case avoids stranding inputs whose intrinsic
// budget can't accommodate a higher rate.
if feeRate == 0 {
log.Debugf("Input(%v): preserving starting fee rate "+
"%v across non-fee failure", op,
pi.params.StartingFeeRate)

continue
}

log.Debugf("Input(%v): updating params: starting fee rate "+
"[%v -> %v]", op, pi.params.StartingFeeRate,
feeRate)
Expand Down
40 changes: 40 additions & 0 deletions sweep/sweeper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,46 @@ func TestMarkInputsPublishFailed(t *testing.T) {
mockStore.AssertExpectations(t)
}

// TestMarkInputsPublishFailedPreservesRateOnZero asserts that when
// markInputsPublishFailed is invoked with feeRate=0, the input's existing
// StartingFeeRate is preserved instead of being overwritten. This is the
// sweeper-side contract that resource-failure callers (handleInitialTxError
// and handleReplacementTxError for ErrNotEnoughInputs / ErrNotEnoughBudget)
// rely on: those failures carry no fee-rate signal, so ratcheting the
// starting rate upward on each retry would be incorrect.
func TestMarkInputsPublishFailedPreservesRateOnZero(t *testing.T) {
t.Parallel()

mockStore := NewMockSweeperStore()
s := New(&UtxoSweeperConfig{Store: mockStore})

// Stage an input in PendingPublish with an existing starting fee
// rate. The rate we set here must not be touched by the call below.
existingRate := chainfee.SatPerKWeight(2500)
inp := createMockInput(t, s, PendingPublish)
s.inputs[inp.OutPoint()].params.StartingFeeRate =
fn.Some(existingRate)

set := &MockInputSet{}
defer set.AssertExpectations(t)
set.On("Inputs").Return([]input.Input{inp})

// Mark the input as publish-failed with feeRate=0, signalling a
// non-fee failure (e.g. ErrNotEnoughInputs).
s.markInputsPublishFailed(set, 0)

// The input must still be in PublishFailed (state transition is
// independent of the rate signal) but its StartingFeeRate must
// remain the value we set above, not Some(0).
pi := s.inputs[inp.OutPoint()]
require.Equal(t, PublishFailed, pi.state)
require.True(t, pi.params.StartingFeeRate.IsSome())
require.Equal(t, existingRate,
pi.params.StartingFeeRate.UnsafeFromSome())

mockStore.AssertExpectations(t)
}

// TestMarkInputsSwept checks that given a list of inputs with different
// states, only the non-terminal state will be marked as `Swept`.
func TestMarkInputsSwept(t *testing.T) {
Expand Down
Loading
Loading