From af48103293819c3541d9fc50d66e3d0fcf17f357 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Mon, 25 May 2026 21:04:21 +0800 Subject: [PATCH 1/6] sweep: probe rejected input sets without broadcast Add internal helpers that build subset sweep transactions and test them with `testmempoolaccept` only. Classify probe errors conservatively so construction, fee, and timelock failures do not identify bad inputs. --- sweep/fee_bumper.go | 215 +++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 211 insertions(+), 4 deletions(-) diff --git a/sweep/fee_bumper.go b/sweep/fee_bumper.go index e0d5d751616..f70351eddeb 100644 --- a/sweep/fee_bumper.go +++ b/sweep/fee_bumper.go @@ -47,6 +47,11 @@ var ( // ErrInputMissing is returned when a given input no longer exists, // e.g., spending from an orphan tx. ErrInputMissing = errors.New("input no longer exists") + + // errMempoolRejected marks errors that came from mempool acceptance + // checks. It is used internally to avoid probing unrelated construction + // or signing errors. + errMempoolRejected = errors.New("mempool rejected tx") ) var ( @@ -600,10 +605,6 @@ func (t *TxPublisher) createRBFCompliantTx( } } - // TODO(yy): suppose there's only one bad input, we can do a - // binary search to find out which input is causing this error - // by recreating a tx using half of the inputs and check its - // mempool acceptance. default: log.Debugf("Failed to create RBF-compliant tx: %v", err) return nil, err @@ -677,6 +678,212 @@ func (t *TxPublisher) createAndCheckTx(r *monitorRecord) (*sweepTxCtx, error) { sweepCtx.tx.TxHash(), err) } +// shouldDiagnoseBadInputs returns true if a mempool rejection should be +// isolated with no-broadcast subset probes. 
+func (t *TxPublisher) shouldDiagnoseBadInputs(r *monitorRecord, + err error) bool { + + if !errors.Is(err, errMempoolRejected) { + return false + } + + if len(r.req.Inputs) <= 1 { + return false + } + + if t.cfg.AuxSweeper.IsSome() { + log.Debugf( + "Skipping bad-input diagnosis for requestID=%v: aux "+ + "sweeper is active", r.requestID, + ) + + return false + } + + return true +} + +// badInputProbeLimit returns the maximum number of no-broadcast probe txns used +// to diagnose a failed batch. The limit is large enough to descend to all +// singletons in normal batches while still bounding pathological retries. +var badInputProbeLimit = defaultBadInputProbeLimit + +// defaultBadInputProbeLimit returns the default probe cap for a batch. +func defaultBadInputProbeLimit(numInputs int) int { + if numInputs <= 1 { + return 0 + } + + return 2 * numInputs +} + +// probeInputSet builds and mempool-tests a sweep transaction for the given +// inputs without publishing, storing, or monitoring it. +func (t *TxPublisher) probeInputSet(r *monitorRecord, + inputs []input.Input) error { + + sweepCtx, err := t.createSweepTx( + inputs, r.req.DeliveryAddress, r.feeFunction.FeeRate(), + ) + if err != nil { + return fmt.Errorf("create probe sweep tx: %w", err) + } + + if sweepCtx.fee > r.req.Budget { + return fmt.Errorf("%w: budget=%v, fee=%v", + ErrNotEnoughBudget, r.req.Budget, sweepCtx.fee) + } + + err = t.cfg.Wallet.CheckMempoolAcceptance(sweepCtx.tx) + if err == nil { + return nil + } + + if errors.Is(err, chain.ErrMissingInputs) { + log.Debugf("Probe tx %v missing inputs", sweepCtx.tx.TxHash()) + + return ErrInputMissing + } + + return fmt.Errorf("%w: probe tx=%v failed mempool check: %w", + errMempoolRejected, sweepCtx.tx.TxHash(), err) +} + +// probeFailureCanIdentifyBadInput returns true when a probe failure is evidence +// that the subset should be split further. Fee, missing-input, and construction +// failures are deliberately treated outside this category. 
+func probeFailureCanIdentifyBadInput(err error) bool { + if err == nil { + return false + } + + if errors.Is(err, errMempoolRejected) { + return true + } + + return false +} + +// probeFailureIndeterminate returns true when a probe failure does not prove +// that a subset contains a bad input. +func probeFailureIndeterminate(err error) bool { + switch { + case errors.Is(err, lnwallet.ErrMempoolFee), + errors.Is(err, chain.ErrInsufficientFee), + errors.Is(err, chain.ErrMinRelayFeeNotMet), + errors.Is(err, chain.ErrMempoolMinFeeNotMet), + errors.Is(err, rpcclient.ErrBackendVersion), + errors.Is(err, chain.ErrUnimplemented), + errors.Is(err, ErrTxNoOutput), + errors.Is(err, ErrNotEnoughBudget), + errors.Is(err, ErrNotEnoughInputs), + errors.Is(err, ErrLocktimeImmature), + errors.Is(err, ErrLocktimeConflict): + + return true + } + + return false +} + +// findBadInputs probes subsets of a failed input batch to find inputs that fail +// by themselves. The returned bool is true only when probing completed without +// indeterminate errors or hitting the probe cap. 
+func (t *TxPublisher) findBadInputs( + r *monitorRecord) ([]wire.OutPoint, bool, error) { + + var ( + inputs = r.req.Inputs + limit = badInputProbeLimit(len(inputs)) + probeCnt int + complete = true + badInputs []wire.OutPoint + ) + + var search func([]input.Input) error + search = func(inputs []input.Input) error { + if len(inputs) == 0 || !complete { + return nil + } + + if probeCnt >= limit { + log.Warnf( + "Stop bad-input diagnosis: requestID=%v, "+ + "probe cap %v hit", + r.requestID, limit, + ) + + complete = false + + return nil + } + + probeCnt++ + err := t.probeInputSet(r, inputs) + switch { + case err == nil: + return nil + + case errors.Is(err, ErrInputMissing): + return ErrInputMissing + + case probeFailureIndeterminate(err): + log.Warnf( + "Stop bad-input diagnosis: requestID=%v, "+ + "probe for %v inputs indeterminate: %v", + r.requestID, len(inputs), err, + ) + + complete = false + + return nil + + case !probeFailureCanIdentifyBadInput(err): + log.Warnf( + "Stop bad-input diagnosis: requestID=%v, "+ + "unexpected probe error: %v", + r.requestID, + err, + ) + + complete = false + + return nil + } + + if len(inputs) == 1 { + badInputs = append(badInputs, inputs[0].OutPoint()) + return nil + } + + mid := len(inputs) / 2 + if err := search(inputs[:mid]); err != nil { + return err + } + + return search(inputs[mid:]) + } + + mid := len(inputs) / 2 + if err := search(inputs[:mid]); err != nil { + return nil, false, err + } + + if err := search(inputs[mid:]); err != nil { + return nil, false, err + } + + if !complete { + return nil, false, nil + } + + if len(badInputs) == 0 { + badInputs = []wire.OutPoint{} + } + + return badInputs, true, nil +} + // handleMissingInputs handles the case when the chain backend reports back a // missing inputs error, which could happen when one of the input has been spent // in another tx, or the input is referencing an orphan. 
When the input is From de9a0755fe6c846367b6e8eef1bb57181a569dd9 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Mon, 25 May 2026 21:04:21 +0800 Subject: [PATCH 2/6] sweep: route mempool failures through probes Route multi-input mempool rejections through the probe helpers and report completed diagnoses with `BumpResult.BadInputs`. Singleton, aux, missing-input, and indeterminate cases keep their existing behavior. --- sweep/fee_bumper.go | 272 +++++++++++++++++++------------------------- 1 file changed, 119 insertions(+), 153 deletions(-) diff --git a/sweep/fee_bumper.go b/sweep/fee_bumper.go index f70351eddeb..c7f2478fbd6 100644 --- a/sweep/fee_bumper.go +++ b/sweep/fee_bumper.go @@ -52,6 +52,10 @@ var ( // checks. It is used internally to avoid probing unrelated construction // or signing errors. errMempoolRejected = errors.New("mempool rejected tx") + + // errBadInputNotFound is returned when bad-input diagnosis finishes + // without finding a singleton input that fails mempool acceptance. + errBadInputNotFound = errors.New("bad input not found") ) var ( @@ -286,6 +290,12 @@ type BumpResult struct { // current tx to be failed. SpentInputs map[wire.OutPoint]*wire.MsgTx + // BadInputs are inputs that failed a singleton mempool acceptance + // probe. The fee bumper only diagnoses one bad input per failed batch, + // so this slice contains at most one outpoint. A nil slice means no bad + // input was diagnosed. + BadInputs []wire.OutPoint + // requestID is the ID of the request that created this record. 
requestID uint64 } @@ -674,8 +684,8 @@ func (t *TxPublisher) createAndCheckTx(r *monitorRecord) (*sweepTxCtx, error) { return sweepCtx, ErrInputMissing } - return sweepCtx, fmt.Errorf("tx=%v failed mempool check: %w", - sweepCtx.tx.TxHash(), err) + return sweepCtx, fmt.Errorf("%w: tx=%v failed mempool check: %w", + errMempoolRejected, sweepCtx.tx.TxHash(), err) } // shouldDiagnoseBadInputs returns true if a mempool rejection should be @@ -683,16 +693,23 @@ func (t *TxPublisher) createAndCheckTx(r *monitorRecord) (*sweepTxCtx, error) { func (t *TxPublisher) shouldDiagnoseBadInputs(r *monitorRecord, err error) bool { + // Only mempool rejections can be diagnosed with mempool probes. Other + // errors happen during construction or signing and keep their existing + // fatal handling. if !errors.Is(err, errMempoolRejected) { return false } + // A singleton rejection already identifies the only input in the batch, + // so there is nothing to isolate with subset probes. if len(r.req.Inputs) <= 1 { return false } + // Aux sweepers may derive addresses or other sweep details from the + // full input set, so probing subsets would bypass their custom logic. if t.cfg.AuxSweeper.IsSome() { - log.Debugf( + log.Infof( "Skipping bad-input diagnosis for requestID=%v: aux "+ "sweeper is active", r.requestID, ) @@ -703,20 +720,6 @@ func (t *TxPublisher) shouldDiagnoseBadInputs(r *monitorRecord, return true } -// badInputProbeLimit returns the maximum number of no-broadcast probe txns used -// to diagnose a failed batch. The limit is large enough to descend to all -// singletons in normal batches while still bounding pathological retries. -var badInputProbeLimit = defaultBadInputProbeLimit - -// defaultBadInputProbeLimit returns the default probe cap for a batch. 
-func defaultBadInputProbeLimit(numInputs int) int { - if numInputs <= 1 { - return 0 - } - - return 2 * numInputs -} - // probeInputSet builds and mempool-tests a sweep transaction for the given // inputs without publishing, storing, or monitoring it. func (t *TxPublisher) probeInputSet(r *monitorRecord, @@ -729,159 +732,91 @@ func (t *TxPublisher) probeInputSet(r *monitorRecord, return fmt.Errorf("create probe sweep tx: %w", err) } - if sweepCtx.fee > r.req.Budget { - return fmt.Errorf("%w: budget=%v, fee=%v", - ErrNotEnoughBudget, r.req.Budget, sweepCtx.fee) - } - err = t.cfg.Wallet.CheckMempoolAcceptance(sweepCtx.tx) if err == nil { return nil } + // Missing-input failures have a dedicated handler that can inspect the + // spend and retry the unspent inputs. Keep that flow distinct from the + // generic mempool rejection sentinel below. if errors.Is(err, chain.ErrMissingInputs) { log.Debugf("Probe tx %v missing inputs", sweepCtx.tx.TxHash()) return ErrInputMissing } - return fmt.Errorf("%w: probe tx=%v failed mempool check: %w", - errMempoolRejected, sweepCtx.tx.TxHash(), err) -} + log.Infof("Probe tx=%v with %v inputs failed mempool check: %v", + sweepCtx.tx.TxHash(), len(inputs), err) -// probeFailureCanIdentifyBadInput returns true when a probe failure is evidence -// that the subset should be split further. Fee, missing-input, and construction -// failures are deliberately treated outside this category. -func probeFailureCanIdentifyBadInput(err error) bool { - if err == nil { - return false - } - - if errors.Is(err, errMempoolRejected) { - return true - } - - return false + return errMempoolRejected } -// probeFailureIndeterminate returns true when a probe failure does not prove -// that a subset contains a bad input. 
-func probeFailureIndeterminate(err error) bool { - switch { - case errors.Is(err, lnwallet.ErrMempoolFee), - errors.Is(err, chain.ErrInsufficientFee), - errors.Is(err, chain.ErrMinRelayFeeNotMet), - errors.Is(err, chain.ErrMempoolMinFeeNotMet), - errors.Is(err, rpcclient.ErrBackendVersion), - errors.Is(err, chain.ErrUnimplemented), - errors.Is(err, ErrTxNoOutput), - errors.Is(err, ErrNotEnoughBudget), - errors.Is(err, ErrNotEnoughInputs), - errors.Is(err, ErrLocktimeImmature), - errors.Is(err, ErrLocktimeConflict): - - return true - } - - return false -} - -// findBadInputs probes subsets of a failed input batch to find inputs that fail -// by themselves. The returned bool is true only when probing completed without -// indeterminate errors or hitting the probe cap. -func (t *TxPublisher) findBadInputs( - r *monitorRecord) ([]wire.OutPoint, bool, error) { - - var ( - inputs = r.req.Inputs - limit = badInputProbeLimit(len(inputs)) - probeCnt int - complete = true - badInputs []wire.OutPoint - ) - - var search func([]input.Input) error - search = func(inputs []input.Input) error { - if len(inputs) == 0 || !complete { - return nil - } - - if probeCnt >= limit { - log.Warnf( - "Stop bad-input diagnosis: requestID=%v, "+ - "probe cap %v hit", - r.requestID, limit, - ) - - complete = false - - return nil +// findBadInput binary-searches a rejected input batch with no-broadcast +// mempool probes to find a single input that fails by itself. Each iteration +// probes the left half of the current set. If that half is rejected, the search +// narrows left; otherwise it searches the right half. The search stops as +// soon as a singleton mempool rejection is found. If the final singleton is +// accepted, then no individual bad input could be identified. 
+func (t *TxPublisher) findBadInput(r *monitorRecord) (wire.OutPoint, error) { + var search func([]input.Input) (wire.OutPoint, error) + search = func(inputs []input.Input) (wire.OutPoint, error) { + switch len(inputs) { + case 0: + return wire.OutPoint{}, errBadInputNotFound + + case 1: + err := t.probeInputSet(r, inputs) + switch { + case err == nil: + log.Warnf( + "Bad-input diagnosis for requestID=%v "+ + "found no singleton bad input", + r.requestID, + ) + + return wire.OutPoint{}, errBadInputNotFound + + case errors.Is(err, ErrInputMissing): + return wire.OutPoint{}, ErrInputMissing + + case errors.Is(err, errMempoolRejected): + return inputs[0].OutPoint(), nil + + default: + log.Warnf( + "Stop diagnosis: requestID=%v, "+ + "singleton probe err: %v", + r.requestID, err, + ) + + return wire.OutPoint{}, err + } } - probeCnt++ - err := t.probeInputSet(r, inputs) + mid := len(inputs) / 2 + left := inputs[:mid] + err := t.probeInputSet(r, left) switch { case err == nil: - return nil + return search(inputs[mid:]) case errors.Is(err, ErrInputMissing): - return ErrInputMissing - - case probeFailureIndeterminate(err): - log.Warnf( - "Stop bad-input diagnosis: requestID=%v, "+ - "probe for %v inputs indeterminate: %v", - r.requestID, len(inputs), err, - ) - - complete = false + return wire.OutPoint{}, ErrInputMissing - return nil - - case !probeFailureCanIdentifyBadInput(err): - log.Warnf( - "Stop bad-input diagnosis: requestID=%v, "+ - "unexpected probe error: %v", - r.requestID, - err, - ) - - complete = false - - return nil - } + case errors.Is(err, errMempoolRejected): + return search(left) - if len(inputs) == 1 { - badInputs = append(badInputs, inputs[0].OutPoint()) - return nil - } + default: + log.Warnf("Stopping bad-input diagnosis for "+ + "requestID=%v: probe for %v inputs failed: %v", + r.requestID, len(left), err) - mid := len(inputs) / 2 - if err := search(inputs[:mid]); err != nil { - return err + return wire.OutPoint{}, err } - - return 
search(inputs[mid:]) - } - - mid := len(inputs) / 2 - if err := search(inputs[:mid]); err != nil { - return nil, false, err } - if err := search(inputs[mid:]); err != nil { - return nil, false, err - } - - if !complete { - return nil, false, nil - } - - if len(badInputs) == 0 { - badInputs = []wire.OutPoint{} - } - - return badInputs, true, nil + return search(r.req.Inputs) } // handleMissingInputs handles the case when the chain backend reports back a @@ -921,7 +856,7 @@ func (t *TxPublisher) handleMissingInputs(r *monitorRecord) *BumpResult { // current sweeping tx has been failed due to missing inputs, the // spending tx must be a different tx, thus it should NOT be matched. We // perform a sanity check here to catch the unexpected state. - if !t.isUnknownSpent(r, spends) { + if r.tx != nil && !t.isUnknownSpent(r, spends) { log.Errorf("Sweeping tx %v has missing inputs, yet the "+ "spending tx is the sweeping tx itself: %v", r.tx.TxHash(), r.spentInputs) @@ -1297,6 +1232,41 @@ func (t *TxPublisher) handleTxConfirmed(r *monitorRecord) { t.handleResult(result) } +// handleBadInputs handles a non-fee mempool rejection by trying to identify a +// single input that fails mempool acceptance by itself. +func (t *TxPublisher) handleBadInputs(r *monitorRecord, + err error) *BumpResult { + + result := &BumpResult{ + Err: err, + requestID: r.requestID, + } + + if !t.shouldDiagnoseBadInputs(r, err) { + result.Event = TxFatal + + return result + } + + badInput, probeErr := t.findBadInput(r) + if errors.Is(probeErr, ErrInputMissing) { + return t.handleMissingInputs(r) + } + + result.Event = TxFailed + if r.feeFunction != nil { + result.FeeRate = r.feeFunction.FeeRate() + } + + if probeErr != nil { + return result + } + + result.BadInputs = []wire.OutPoint{badInput} + + return result +} + // handleInitialTxError takes the error from `initializeTx` and decides the // bump event. It will construct a BumpResult and handles it. 
func (t *TxPublisher) handleInitialTxError(r *monitorRecord, err error) { @@ -1349,15 +1319,11 @@ func (t *TxPublisher) handleInitialTxError(r *monitorRecord, err error) { case errors.Is(err, ErrInputMissing): result = t.handleMissingInputs(r) - // Otherwise this is not a fee-related error and the tx cannot be - // retried. In that case we will fail ALL the inputs in this tx, which - // means they will be removed from the sweeper and never be tried - // again. - // - // TODO(yy): Find out which input is causing the failure and fail that - // one only. + // Otherwise this may be a non-fee mempool rejection. For multi-input + // batches, try to isolate singleton bad inputs before deciding whether + // the whole set is fatal. default: - result.Event = TxFatal + result = t.handleBadInputs(r, err) } t.handleResult(result) From 4f7722fff91c798fb804c90392a4ecffa6d15e0a Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Mon, 25 May 2026 21:04:21 +0800 Subject: [PATCH 3/6] sweep: apply bad input diagnosis Consume non-nil `BadInputs` on `TxFailed`: mark diagnosed inputs fatal and mark the complement publish-failed for normal retry. Empty diagnoses are retried as a whole and logged. --- sweep/sweeper.go | 46 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/sweep/sweeper.go b/sweep/sweeper.go index e2163f6373c..5a2abe5d0ca 100644 --- a/sweep/sweeper.go +++ b/sweep/sweeper.go @@ -971,6 +971,7 @@ func (s *UtxoSweeper) markInputsPublishFailed(set InputSet, // Reschedule sweep. for _, inp := range set.Inputs() { op := inp.OutPoint() + pi, ok := s.inputs[op] if !ok { // It could be that this input is an additional wallet @@ -1714,6 +1715,11 @@ func (s *UtxoSweeper) handleBumpEventTxFailed(resp *bumpResp) { err) } + if r.BadInputs != nil { + s.handleBumpEventBadInputs(resp) + return + } + // NOTE: When marking the inputs as failed, we are using the input set // instead of the inputs found in the tx. 
This is fine for current // version of the sweeper because we always create a tx using ALL of @@ -1723,6 +1729,46 @@ func (s *UtxoSweeper) handleBumpEventTxFailed(resp *bumpResp) { s.markInputsPublishFailed(resp.set, resp.result.FeeRate) } +// handleBumpEventBadInputs handles a failed tx after the fee bumper diagnosed +// singleton bad inputs using no-broadcast mempool probes. +func (s *UtxoSweeper) handleBumpEventBadInputs(resp *bumpResp) { + r := resp.result + inputs := resp.set.Inputs() + + log.Warnf("Fee bump attempt failed for requestID=%v after "+ + "bad-input diagnosis: %v; inputs:\n%v", r.requestID, r.Err, + inputTypeSummary(inputs)) + + if len(r.BadInputs) != 1 { + log.Errorf("Bad-input diagnosis for requestID=%v returned %v "+ + "bad inputs; retrying full set", r.requestID, + len(r.BadInputs)) + + s.markInputsPublishFailed(resp.set, r.FeeRate) + + return + } + + badInput := r.BadInputs[0] + s.markInputsPublishFailed(resp.set, r.FeeRate) + + pi, ok := s.inputs[badInput] + if !ok { + log.Tracef("Bad input %v not found; skip fatal", badInput) + + return + } + + if pi.terminated() { + log.Errorf("Skipped marking bad input=%v as fatal due to "+ + "unexpected state=%v", badInput, pi.state) + + return + } + + s.markInputFatal(pi, nil, r.Err) +} + // handleBumpEventTxReplaced handles the case where the sweeping tx has been // replaced by a new one. func (s *UtxoSweeper) handleBumpEventTxReplaced(resp *bumpResp) error { From a5f0276cb44b3c40fbae0ab0edd5cf849ee725a9 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Mon, 25 May 2026 21:04:21 +0800 Subject: [PATCH 4/6] sweep: test mempool rejection probing Cover publisher probing decisions and sweeper state application, including aux skip, probe missing-input abort, probe construction-error abort, singleton fatal behavior, and complement retries. 
--- sweep/fee_bumper_test.go | 478 ++++++++++++++++++++++++++++++++++++++- sweep/sweeper_test.go | 85 +++++++ 2 files changed, 560 insertions(+), 3 deletions(-) diff --git a/sweep/fee_bumper_test.go b/sweep/fee_bumper_test.go index d697f906b2a..b82587a6ca4 100644 --- a/sweep/fee_bumper_test.go +++ b/sweep/fee_bumper_test.go @@ -389,8 +389,11 @@ type mockers struct { feeFunc *MockFeeFunction } -// createTestPublisher creates a new tx publisher using the provided mockers. -func createTestPublisher(t *testing.T) (*TxPublisher, *mockers) { +// createTestPublisherWithAux creates a new tx publisher using the provided +// mockers and aux sweeper option. +func createTestPublisherWithAux(t *testing.T, + auxSweeper fn.Option[AuxSweeper]) (*TxPublisher, *mockers) { + // Create a mock fee estimator. estimator := &chainfee.MockEstimator{} @@ -428,12 +431,47 @@ func createTestPublisher(t *testing.T) (*TxPublisher, *mockers) { Signer: m.signer, Wallet: m.wallet, Notifier: m.notifier, - AuxSweeper: fn.Some[AuxSweeper](&MockAuxSweeper{}), + AuxSweeper: auxSweeper, }) return tp, m } +// createTestPublisher creates a new tx publisher using the provided mockers. +func createTestPublisher(t *testing.T) (*TxPublisher, *mockers) { + return createTestPublisherWithAux( + t, fn.Some[AuxSweeper](&MockAuxSweeper{}), + ) +} + +// createTestPublisherNoAux creates a new tx publisher without an aux sweeper. +func createTestPublisherNoAux(t *testing.T) (*TxPublisher, *mockers) { + return createTestPublisherWithAux(t, fn.None[AuxSweeper]()) +} + +// txSpendsInputs returns a matcher that checks that a tx spends the exact input +// set, regardless of ordering. 
+func txSpendsInputs(inputs ...input.Input) func(*wire.MsgTx) bool { + want := make(map[wire.OutPoint]struct{}, len(inputs)) + for _, inp := range inputs { + want[inp.OutPoint()] = struct{}{} + } + + return func(tx *wire.MsgTx) bool { + if tx == nil || len(tx.TxIn) != len(want) { + return false + } + + for _, txIn := range tx.TxIn { + if _, ok := want[txIn.PreviousOutPoint]; !ok { + return false + } + } + + return true + } +} + // TestCreateAndCheckTx checks `createAndCheckTx` behaves as expected. func TestCreateAndCheckTx(t *testing.T) { t.Parallel() @@ -706,6 +744,440 @@ func TestCreateRBFCompliantTx(t *testing.T) { } } +// createBadInputTestRequest creates a bump request with the given number of +// inputs and enough budget for subset probes. +func createBadInputTestRequest(numInputs int) *BumpRequest { + inputs := make([]input.Input, 0, numInputs) + for i := 0; i < numInputs; i++ { + inp := createTestInput(10_000, input.WitnessKeyHash) + inputs = append(inputs, &inp) + } + + return &BumpRequest{ + DeliveryAddress: changePkScript, + Inputs: inputs, + Budget: btcutil.Amount(10_000), + } +} + +// createBadInputTestRecord creates a monitored record and subscriber used by +// initial-broadcast error tests. +func createBadInputTestRecord(tp *TxPublisher, req *BumpRequest, + feeFunc FeeFunction) (*monitorRecord, chan *BumpResult) { + + const requestID = uint64(1) + record := &monitorRecord{ + requestID: requestID, + req: req, + feeFunction: feeFunc, + } + + subscriber := make(chan *BumpResult, 1) + tp.subscriberChans.Store(requestID, subscriber) + tp.records.Store(requestID, record) + + return record, subscriber +} + +// TestShouldDiagnoseBadInputsMempoolError checks that multi-input mempool +// rejections are eligible for bad-input diagnosis. +func TestShouldDiagnoseBadInputsMempoolError(t *testing.T) { + t.Parallel() + + // Arrange. 
+ tp, m := createTestPublisherNoAux(t) + req := createBadInputTestRequest(2) + record := &monitorRecord{ + requestID: 1, + req: req, + feeFunction: m.feeFunc, + } + err := fmt.Errorf("%w: %w", errMempoolRejected, errDummy) + + // Act. + diagnose := tp.shouldDiagnoseBadInputs(record, err) + + // Assert. + require.True(t, diagnose) +} + +// TestShouldDiagnoseBadInputsNonMempoolError checks that non-mempool errors +// keep their existing fatal handling. +func TestShouldDiagnoseBadInputsNonMempoolError(t *testing.T) { + t.Parallel() + + // Arrange. + tp, m := createTestPublisherNoAux(t) + req := createBadInputTestRequest(2) + record := &monitorRecord{ + requestID: 1, + req: req, + feeFunction: m.feeFunc, + } + + // Act. + diagnose := tp.shouldDiagnoseBadInputs(record, errDummy) + + // Assert. + require.False(t, diagnose) +} + +// TestShouldDiagnoseBadInputsSingleton checks that singleton rejections are not +// probed because the rejected input is already identified. +func TestShouldDiagnoseBadInputsSingleton(t *testing.T) { + t.Parallel() + + // Arrange. + tp, m := createTestPublisherNoAux(t) + req := createBadInputTestRequest(1) + record := &monitorRecord{ + requestID: 1, + req: req, + feeFunction: m.feeFunc, + } + err := fmt.Errorf("%w: %w", errMempoolRejected, errDummy) + + // Act. + diagnose := tp.shouldDiagnoseBadInputs(record, err) + + // Assert. + require.False(t, diagnose) +} + +// TestShouldDiagnoseBadInputsAuxSweeper checks that aux sweeps are not probed +// with subsets because the aux sweeper owns custom input-set logic. +func TestShouldDiagnoseBadInputsAuxSweeper(t *testing.T) { + t.Parallel() + + // Arrange. + tp, m := createTestPublisher(t) + req := createBadInputTestRequest(2) + record := &monitorRecord{ + requestID: 1, + req: req, + feeFunction: m.feeFunc, + } + err := fmt.Errorf("%w: %w", errMempoolRejected, errDummy) + + // Act. + diagnose := tp.shouldDiagnoseBadInputs(record, err) + + // Assert. 
+ require.False(t, diagnose) +} + +// TestProbeInputSetMempoolRejected checks that probe mempool failures are +// unified under the internal mempool rejection sentinel. +func TestProbeInputSetMempoolRejected(t *testing.T) { + t.Parallel() + + // Arrange. + tp, m := createTestPublisherNoAux(t) + req := createBadInputTestRequest(2) + record := &monitorRecord{ + requestID: 1, + req: req, + feeFunction: m.feeFunc, + } + + inputs := req.Inputs + m.feeFunc.On("FeeRate").Return(chainfee.SatPerKWeight(1000)) + m.signer.On("ComputeInputScript", mock.Anything, + mock.Anything).Return(&input.Script{}, nil).Times(2) + m.wallet.On( + "CheckMempoolAcceptance", + mock.MatchedBy(txSpendsInputs(inputs...)), + ).Return(errDummy).Once() + + // Act. + err := tp.probeInputSet(record, inputs) + + // Assert. + require.ErrorIs(t, err, errMempoolRejected) + m.wallet.AssertNotCalled(t, "PublishTransaction", mock.Anything, + mock.Anything) +} + +// TestFindBadInputIdentifiesSingleton checks that the binary search returns the +// first singleton input that fails a mempool probe. +func TestFindBadInputIdentifiesSingleton(t *testing.T) { + t.Parallel() + + // Arrange. + tp, m := createTestPublisherNoAux(t) + req := createBadInputTestRequest(4) + record := &monitorRecord{ + requestID: 1, + req: req, + feeFunction: m.feeFunc, + } + + m.feeFunc.On("FeeRate").Return(chainfee.SatPerKWeight(1000)) + m.signer.On("ComputeInputScript", mock.Anything, + mock.Anything).Return(&input.Script{}, nil).Times(4) + + inputs := req.Inputs + m.wallet.On( + "CheckMempoolAcceptance", + mock.MatchedBy(txSpendsInputs(inputs[0], inputs[1])), + ).Return( + errDummy, + ).Once() + m.wallet.On( + "CheckMempoolAcceptance", + mock.MatchedBy(txSpendsInputs(inputs[0])), + ).Return(nil).Once() + m.wallet.On( + "CheckMempoolAcceptance", + mock.MatchedBy(txSpendsInputs(inputs[1])), + ).Return(errDummy).Once() + + // Act. + badInput, err := tp.findBadInput(record) + + // Assert. 
+ require.NoError(t, err) + require.Equal(t, inputs[1].OutPoint(), badInput) + m.wallet.AssertNumberOfCalls(t, "CheckMempoolAcceptance", 3) +} + +// TestHandleBadInputsDiagnosesBadInput checks that a multi-input mempool script +// failure becomes TxFailed with the singleton input that fails its probe. +func TestHandleBadInputsDiagnosesBadInput(t *testing.T) { + t.Parallel() + + // Arrange. + tp, m := createTestPublisherNoAux(t) + req := createBadInputTestRequest(4) + record := &monitorRecord{ + requestID: 1, + req: req, + feeFunction: m.feeFunc, + } + + feeRate := chainfee.SatPerKWeight(1000) + m.feeFunc.On("FeeRate").Return(feeRate) + m.signer.On("ComputeInputScript", mock.Anything, + mock.Anything).Return(&input.Script{}, nil).Times(4) + + inputs := req.Inputs + badInput := inputs[1] + m.wallet.On( + "CheckMempoolAcceptance", + mock.MatchedBy(txSpendsInputs(inputs[0], inputs[1])), + ).Return( + errDummy, + ).Once() + m.wallet.On( + "CheckMempoolAcceptance", + mock.MatchedBy(txSpendsInputs(inputs[0])), + ).Return(nil).Once() + m.wallet.On( + "CheckMempoolAcceptance", + mock.MatchedBy(txSpendsInputs(inputs[1])), + ).Return(errDummy).Once() + + initialErr := fmt.Errorf("%w: %w", errMempoolRejected, errDummy) + + // Act. + result := tp.handleBadInputs(record, initialErr) + + // Assert. + require.Equal(t, TxFailed, result.Event) + require.ErrorIs(t, result.Err, errDummy) + require.Equal(t, []wire.OutPoint{badInput.OutPoint()}, + result.BadInputs) + require.Equal(t, feeRate, result.FeeRate) + m.wallet.AssertNotCalled(t, "PublishTransaction", mock.Anything, + mock.Anything) +} + +// TestHandleBadInputsNoSingletonBadInput checks that diagnosis keeps BadInputs +// nil when no singleton input fails by itself. +func TestHandleBadInputsNoSingletonBadInput(t *testing.T) { + t.Parallel() + + // Arrange. 
+ tp, m := createTestPublisherNoAux(t) + req := createBadInputTestRequest(4) + record := &monitorRecord{ + requestID: 1, + req: req, + feeFunction: m.feeFunc, + } + + feeRate := chainfee.SatPerKWeight(1000) + m.feeFunc.On("FeeRate").Return(feeRate) + m.signer.On("ComputeInputScript", mock.Anything, + mock.Anything).Return(&input.Script{}, nil).Times(4) + + inputs := req.Inputs + m.wallet.On( + "CheckMempoolAcceptance", + mock.MatchedBy(txSpendsInputs(inputs[0], inputs[1])), + ).Return(nil).Once() + m.wallet.On( + "CheckMempoolAcceptance", + mock.MatchedBy(txSpendsInputs(inputs[2])), + ).Return(nil).Once() + m.wallet.On( + "CheckMempoolAcceptance", + mock.MatchedBy(txSpendsInputs(inputs[3])), + ).Return(nil).Once() + + initialErr := fmt.Errorf("%w: %w", errMempoolRejected, errDummy) + + // Act. + result := tp.handleBadInputs(record, initialErr) + + // Assert. + require.Equal(t, TxFailed, result.Event) + require.ErrorIs(t, result.Err, errDummy) + require.Nil(t, result.BadInputs) + require.Equal(t, feeRate, result.FeeRate) +} + +// TestHandleBadInputsProbeConstructionError checks that non-mempool probe +// errors abort diagnosis and keep BadInputs nil. +func TestHandleBadInputsProbeConstructionError(t *testing.T) { + t.Parallel() + + // Arrange. + tp, m := createTestPublisherNoAux(t) + req := createBadInputTestRequest(4) + record := &monitorRecord{ + requestID: 1, + req: req, + feeFunction: m.feeFunc, + } + + feeRate := chainfee.SatPerKWeight(1000) + m.feeFunc.On("FeeRate").Return(feeRate) + m.signer.On("ComputeInputScript", mock.Anything, + mock.Anything).Return(nil, errDummy).Once() + + initialErr := fmt.Errorf("%w: %w", errMempoolRejected, errDummy) + + // Act. + result := tp.handleBadInputs(record, initialErr) + + // Assert. 
+ require.Equal(t, TxFailed, result.Event) + require.ErrorIs(t, result.Err, errDummy) + require.Nil(t, result.BadInputs) + require.Equal(t, feeRate, result.FeeRate) + m.wallet.AssertNumberOfCalls(t, "CheckMempoolAcceptance", 0) +} + +// TestHandleBadInputsProbeMissingInputs checks that a missing-input error +// during probing aborts diagnosis and uses the existing TxUnknownSpend flow. +func TestHandleBadInputsProbeMissingInputs(t *testing.T) { + t.Parallel() + + // Arrange. + tp, m := createTestPublisherNoAux(t) + req := createBadInputTestRequest(4) + record := &monitorRecord{ + requestID: 1, + req: req, + feeFunction: m.feeFunc, + } + + feeRate := chainfee.SatPerKWeight(1000) + m.feeFunc.On("FeeRate").Return(feeRate) + m.feeFunc.On("Increment").Return(true, nil).Once() + m.signer.On("ComputeInputScript", mock.Anything, + mock.Anything).Return(&input.Script{}, nil).Times(2) + + inputs := req.Inputs + m.wallet.On( + "CheckMempoolAcceptance", + mock.MatchedBy(txSpendsInputs(inputs[0], inputs[1])), + ).Return( + chain.ErrMissingInputs, + ).Once() + + spendingTx := &wire.MsgTx{LockTime: 1} + for i, inp := range inputs { + op := inp.OutPoint() + spendEvent := &chainntnfs.SpendEvent{ + Spend: make(chan *chainntnfs.SpendDetail), + Cancel: func() {}, + } + if i == 0 { + spendEvent = createTestSpendEvent(spendingTx) + } + + m.notifier.On("RegisterSpendNtfn", &op, mock.Anything, + mock.Anything).Return(spendEvent, nil).Once() + } + + initialErr := fmt.Errorf("%w: %w", errMempoolRejected, errDummy) + + // Act. + result := tp.handleBadInputs(record, initialErr) + + // Assert. + require.Equal(t, TxUnknownSpend, result.Event) + require.ErrorIs(t, result.Err, ErrUnknownSpent) + require.Nil(t, result.BadInputs) + require.Contains(t, result.SpentInputs, inputs[0].OutPoint()) + require.Equal(t, feeRate, result.FeeRate) +} + +// TestInitialMempoolFailureSingletonFatal checks that singleton non-fee mempool +// rejections keep the existing TxFatal behavior. 
+func TestInitialMempoolFailureSingletonFatal(t *testing.T) { + t.Parallel() + + // Arrange. + tp, m := createTestPublisherNoAux(t) + req := createBadInputTestRequest(1) + record, subscriber := createBadInputTestRecord(tp, req, m.feeFunc) + initialErr := fmt.Errorf("%w: %w", errMempoolRejected, errDummy) + + // Act. + tp.handleInitialTxError(record, initialErr) + + // Assert. + select { + case result := <-subscriber: + require.Equal(t, TxFatal, result.Event) + require.Nil(t, result.BadInputs) + + case <-time.After(time.Second): + t.Fatal("timeout waiting for fatal result") + } +} + +// TestInitialMempoolFailureAuxSweeperSkipsDiagnosis checks that aux-enabled +// sweep transactions preserve the existing whole-set fatal behavior and do not +// run subset probes. +func TestInitialMempoolFailureAuxSweeperSkipsDiagnosis(t *testing.T) { + t.Parallel() + + // Arrange. + tp, m := createTestPublisher(t) + req := createBadInputTestRequest(4) + record, subscriber := createBadInputTestRecord(tp, req, m.feeFunc) + initialErr := fmt.Errorf("%w: %w", errMempoolRejected, errDummy) + + // Act. + tp.handleInitialTxError(record, initialErr) + + // Assert. + select { + case result := <-subscriber: + require.Equal(t, TxFatal, result.Event) + require.Nil(t, result.BadInputs) + + case <-time.After(time.Second): + t.Fatal("timeout waiting for fatal result") + } + + m.wallet.AssertNumberOfCalls(t, "CheckMempoolAcceptance", 0) +} + // TestTxPublisherBroadcast checks the internal `broadcast` method behaves as // expected. func TestTxPublisherBroadcast(t *testing.T) { diff --git a/sweep/sweeper_test.go b/sweep/sweeper_test.go index d97fd992504..1b496f6374b 100644 --- a/sweep/sweeper_test.go +++ b/sweep/sweeper_test.go @@ -760,6 +760,91 @@ func TestHandleBumpEventTxFailed(t *testing.T) { require.NotContains(t, s.inputs, opNotExist) } +// TestHandleBumpEventTxFailedBadInputs checks that diagnosed bad inputs are +// marked fatal while the rest of the set is retried. 
+func TestHandleBumpEventTxFailedBadInputs(t *testing.T) { + t.Parallel() + + // Arrange. + set := &MockInputSet{} + defer set.AssertExpectations(t) + + s := New(&UtxoSweeperConfig{}) + + var ( + input1 = createMockInput(t, s, PendingPublish) + input2 = createMockInput(t, s, PendingPublish) + input3 = createMockInput(t, s, PendingPublish) + ) + + op1 := input1.OutPoint() + op2 := input2.OutPoint() + op3 := input3.OutPoint() + + set.On("Inputs").Return([]input.Input{input1, input2, input3}) + + feeRate := chainfee.SatPerKWeight(123) + br := &BumpResult{ + Event: TxFailed, + Err: errDummy, + FeeRate: feeRate, + BadInputs: []wire.OutPoint{op2}, + } + + // Act. + err := s.handleBumpEvent(&bumpResp{ + result: br, + set: set, + }) + + // Assert. + require.NoError(t, err) + require.Equal(t, PublishFailed, s.inputs[op1].state) + require.Equal(t, Fatal, s.inputs[op2].state) + require.Equal(t, PublishFailed, s.inputs[op3].state) + require.Equal(t, fn.Some(feeRate), s.inputs[op1].params.StartingFeeRate) + require.Equal(t, fn.Some(feeRate), s.inputs[op3].params.StartingFeeRate) +} + +// TestHandleBumpEventTxFailedNoBadInputs checks that an empty, non-nil +// BadInputs diagnosis retries the whole set without marking anything fatal. +func TestHandleBumpEventTxFailedNoBadInputs(t *testing.T) { + t.Parallel() + + // Arrange. + set := &MockInputSet{} + defer set.AssertExpectations(t) + + s := New(&UtxoSweeperConfig{}) + + var ( + input1 = createMockInput(t, s, PendingPublish) + input2 = createMockInput(t, s, PendingPublish) + ) + + op1 := input1.OutPoint() + op2 := input2.OutPoint() + + set.On("Inputs").Return([]input.Input{input1, input2}) + + br := &BumpResult{ + Event: TxFailed, + Err: errDummy, + BadInputs: []wire.OutPoint{}, + } + + // Act. + err := s.handleBumpEvent(&bumpResp{ + result: br, + set: set, + }) + + // Assert.
+ require.NoError(t, err) + require.Equal(t, PublishFailed, s.inputs[op1].state) + require.Equal(t, PublishFailed, s.inputs[op2].state) +} + // TestHandleBumpEventTxReplaced checks that the sweeper correctly handles the // case where the bump event tx is replaced. func TestHandleBumpEventTxReplaced(t *testing.T) { From fca864f682e8aa91c9f2819705f8d78f17e80b55 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Mon, 25 May 2026 21:07:47 +0800 Subject: [PATCH 5/6] sweep: document failure attribution roles Document that `TxPublisher` performs failure attribution while `UtxoSweeper` applies state changes. --- sweep/README.md | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/sweep/README.md b/sweep/README.md index 5c39fa3e860..b4a6b165403 100644 --- a/sweep/README.md +++ b/sweep/README.md @@ -111,6 +111,23 @@ perform an RBF again in the coming blocks. for the first time, unless its budget has been used up, `TxPublisher` will guarantee that the initial publish meets the RBF requirements. +`TxPublisher` is also responsible for diagnosing failures that require access to +the exact transaction candidate and the chain backend's mempool acceptance +oracle. If `testmempoolaccept` reports missing inputs, `TxPublisher` uses spend +notifications to identify inputs spent by another transaction and reports them +as `TxUnknownSpend`. If a multi-input batch is rejected with an unattributed +non-fee mempool or script error, `TxPublisher` can build no-broadcast probe +transactions for subsets of the original inputs to identify any input that fails +by itself. Probe transactions are only tested for mempool acceptance; they are +not published, stored, or monitored. + +`UtxoSweeper` owns pending input state and applies these diagnoses. Missing or +unknown-spent inputs are removed through the existing `TxUnknownSpend` flow. 
For +diagnosed mempool/script failures, only singleton inputs reported as bad are +marked fatal; the remaining inputs in the batch are marked publish-failed so +they can be retried by normal clustering. If probing is skipped or inconclusive, +the batch remains retryable unless the failure is a singleton fatal error. + #### `FeeFunction` `FeeFunction` is an interface that specifies a function over a starting fee @@ -210,4 +227,3 @@ outputs minus the sum of their budgets. By default, 50% of this value is used as the budget, to customize it, either use `--sweeper.budget.anchorcpfp` to specify sats, or use `--sweeper.budget.anchorcpfpratio` to specify a ratio. - From 75b9fe105b3ec9ea6df44ca1da7cf947d75ba44c Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Mon, 25 May 2026 21:07:58 +0800 Subject: [PATCH 6/6] docs: release-note sweeper input isolation Add the 0.21.1 release note for isolating singleton bad inputs from rejected sweeper batches. --- docs/release-notes/release-notes-0.21.1.md | 62 ++++++++++++++++++++++ 1 file changed, 62 insertions(+) create mode 100644 docs/release-notes/release-notes-0.21.1.md diff --git a/docs/release-notes/release-notes-0.21.1.md b/docs/release-notes/release-notes-0.21.1.md new file mode 100644 index 00000000000..fdf3ba14524 --- /dev/null +++ b/docs/release-notes/release-notes-0.21.1.md @@ -0,0 +1,62 @@ +# Release Notes +- [Bug Fixes](#bug-fixes) +- [New Features](#new-features) + - [Functional Enhancements](#functional-enhancements) + - [RPC Additions](#rpc-additions) + - [lncli Additions](#lncli-additions) +- [Improvements](#improvements) + - [Functional Updates](#functional-updates) + - [RPC Updates](#rpc-updates) + - [lncli Updates](#lncli-updates) + - [Breaking Changes](#breaking-changes) + - [Performance Improvements](#performance-improvements) + - [Deprecations](#deprecations) +- [Technical and Architectural Updates](#technical-and-architectural-updates) + - [BOLT Spec Updates](#bolt-spec-updates) + - [Testing](#testing) + - 
[Database](#database) + - [Code Health](#code-health) + - [Tooling and Documentation](#tooling-and-documentation) +- [Contributors (Alphabetical Order)](#contributors-alphabetical-order) + +# Bug Fixes + +* The sweeper now isolates singleton inputs from a rejected sweep batch when + `testmempoolaccept` returns an unattributed non-fee mempool or script error, + avoiding fatal failure of the entire batch when only one input is invalid. + +# New Features + +## Functional Enhancements + +## RPC Additions + +## lncli Additions + +# Improvements + +## Functional Updates + +## RPC Updates + +## lncli Updates + +## Breaking Changes + +## Performance Improvements + +## Deprecations + +# Technical and Architectural Updates + +## BOLT Spec Updates + +## Testing + +## Database + +## Code Health + +## Tooling and Documentation + +# Contributors (Alphabetical Order)