Skip to content

Commit

Permalink
sweep: decrease coin selection lock scope
Browse files Browse the repository at this point in the history
This commit changes how `WithCoinSelectLock` is used - previously the
lock is held when creating the input sets, now it's only be held after
the input sets have been created and explicitly signal they need wallet
inputs.
  • Loading branch information
yyforyongyu committed Jan 29, 2024
1 parent 22fad78 commit 92ead93
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 80 deletions.
53 changes: 11 additions & 42 deletions sweep/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"sort"

"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwallet/chainfee"
)

Expand Down Expand Up @@ -36,9 +34,8 @@ type inputCluster struct {
// the configured maximum number of inputs. Negative yield inputs are skipped.
// No input sets with a total value after fees below the dust limit are
// returned.
func (c *inputCluster) createInputSets(
wallet Wallet, maxFeeRate chainfee.SatPerKWeight,
maxInputs int) ([]InputSet, error) {
func (c *inputCluster) createInputSets(maxFeeRate chainfee.SatPerKWeight,
maxInputs int) []InputSet {

// Turn the inputs into a slice so we can sort them.
inputList := make([]*pendingInput, 0, len(c.inputs))
Expand Down Expand Up @@ -91,9 +88,7 @@ func (c *inputCluster) createInputSets(
// Start building a set of positive-yield tx inputs under the
// condition that the tx will be published with the specified
// fee rate.
txInputs := newTxInputSet(
wallet, c.sweepFeeRate, maxFeeRate, maxInputs,
)
txInputs := newTxInputSet(c.sweepFeeRate, maxFeeRate, maxInputs)

// From the set of sweepable inputs, keep adding inputs to the
// input set until the tx output value no longer goes up or the
Expand All @@ -103,27 +98,7 @@ func (c *inputCluster) createInputSets(
// If there are no positive yield inputs, we can stop here.
inputCount := len(txInputs.inputs)
if inputCount == 0 {
return sets, nil
}

// Check the current output value and add wallet utxos if
// needed to push the output value to the lower limit.
if err := txInputs.tryAddWalletInputsIfNeeded(); err != nil {
return nil, err
}

// If the output value of this block of inputs does not reach
// the dust limit, stop sweeping. Because of the sorting,
// continuing with the remaining inputs will only lead to sets
// with an even lower output value.
if !txInputs.enoughInput() {
// The change output is always a p2tr here.
dl := lnwallet.DustLimitForSize(input.P2TRSize)
log.Debugf("Input set value %v (required=%v, "+
"change=%v) below dust limit of %v",
txInputs.totalOutput(), txInputs.requiredOutput,
txInputs.changeOutput, dl)
return sets, nil
return sets
}

log.Infof("Candidate sweep set of size=%v (+%v wallet inputs),"+
Expand All @@ -136,15 +111,16 @@ func (c *inputCluster) createInputSets(
inputList = inputList[inputCount:]
}

return sets, nil
return sets
}

// UtxoAggregator defines an interface that takes a list of inputs and
// aggregate them into groups. Each group is used as the inputs to create a
// sweeping transaction.
type UtxoAggregator interface {
// ClusterInputs takes a list of inputs and groups them into clusters.
ClusterInputs(Wallet, pendingInputs) []InputSet
// ClusterInputs takes a list of inputs and groups them into input
// sets. Each input set will be used to create a sweeping transaction.
ClusterInputs(pendingInputs) []InputSet
}

type SimpleAggregator struct {
Expand Down Expand Up @@ -194,9 +170,7 @@ func NewSimpleUtxoAggregator(estimator chainfee.Estimator,
// inputs known by the UtxoSweeper. It clusters inputs by
// 1) Required tx locktime
// 2) Similar fee rates.
func (s *SimpleAggregator) ClusterInputs(
wallet Wallet, inputs pendingInputs) []InputSet {

func (s *SimpleAggregator) ClusterInputs(inputs pendingInputs) []InputSet {
// We start by getting the inputs clusters by locktime. Since the
// inputs commit to the locktime, they can only be clustered together
// if the locktime is equal.
Expand All @@ -218,14 +192,9 @@ func (s *SimpleAggregator) ClusterInputs(
// Now that we have the clusters, we can create the input sets.
var inputSets []InputSet
for _, cluster := range clusters {
sets, err := cluster.createInputSets(
wallet, s.MaxFeeRate, s.MaxInputsPerTx,
sets := cluster.createInputSets(
s.MaxFeeRate, s.MaxInputsPerTx,
)
if err != nil {
log.Errorf("Unable to create input sets: %v", err)
continue
}

inputSets = append(inputSets, sets...)
}

Expand Down
6 changes: 2 additions & 4 deletions sweep/mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,8 @@ type mockUtxoAggregator struct {
var _ UtxoAggregator = (*mockUtxoAggregator)(nil)

// ClusterInputs takes a list of inputs and groups them into clusters.
func (m *mockUtxoAggregator) ClusterInputs(wallet Wallet,
inputs pendingInputs) []InputSet {

args := m.Called(wallet, inputs)
func (m *mockUtxoAggregator) ClusterInputs(inputs pendingInputs) []InputSet {
args := m.Called(inputs)

return args.Get(0).([]InputSet)
}
56 changes: 34 additions & 22 deletions sweep/sweeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -1536,33 +1536,45 @@ func (s *UtxoSweeper) updateSweeperInputs() pendingInputs {
// sweepPendingInputs is called when the ticker fires. It will create clusters
// and attempt to create and publish the sweeping transactions.
func (s *UtxoSweeper) sweepPendingInputs(inputs pendingInputs) {
// Execute the sweep within a coin select lock. Otherwise the coins
// that we are going to spend may be selected for other transactions
// like funding of a channel.
//
// TODO(yy): decrease the lock scope - we need to remove the wallet
// used here, which means we need to ask the aggregator to return input
// sets and specifying whether wallet utoxs are needed or not. Then, by
// calling `TxInput.NeedWalletInput`, we can then lock and add the
// wallet input, creating a much smaller lock scope.
err := s.cfg.Wallet.WithCoinSelectLock(func() error {
// Cluster all of our inputs based on the specific Aggregator.
inputSets := s.cfg.Aggregator.ClusterInputs(
s.cfg.Wallet, inputs,
)
// Cluster all of our inputs based on the specific Aggregator.
sets := s.cfg.Aggregator.ClusterInputs(inputs)

// sweepWithLock is a helper closure that executes the sweep within a
// coin select lock to prevent the coins being selected for other
// transactions like funding of a channel.
sweepWithLock := func(set InputSet) {
err := s.cfg.Wallet.WithCoinSelectLock(func() error {
// Try to add inputs from our wallet.
err := set.AddWalletInputs(s.cfg.Wallet)
if err != nil {
return err
}

// Create sweeping transaction for each set.
for _, inputs := range inputSets {
err := s.sweep(inputs)
// Create sweeping transaction for each set.
err = s.sweep(set)
if err != nil {
log.Errorf("sweep new inputs: %v", err)
return err
}

return nil
})
if err != nil {
log.Errorf("input cluster sweep: %v", err)
}
}

return nil
})
if err != nil {
log.Errorf("input cluster sweep: %v", err)
for _, set := range sets {
if set.NeedWalletInput() {
// Sweep the set of inputs that need the wallet inputs.
sweepWithLock(set)
} else {
// Sweep the set of inputs that don't need the wallet
// inputs.
err := s.sweep(set)
if err != nil {
log.Errorf("sweep new inputs: %v", err)
}
}
}
}

Expand Down
57 changes: 49 additions & 8 deletions sweep/tx_input_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ const (
constraintsForce
)

var (
// ErrNotEnoughInputs is returned when there are not enough wallet
// inputs to construct a non-dust change output for an input set.
ErrNotEnoughInputs = fmt.Errorf("not enough inputs")
)

// InputSet defines an interface that's responsible for filtering a set of
// inputs that can be swept economically.
type InputSet interface {
Expand All @@ -38,6 +44,15 @@ type InputSet interface {

// FeeRate returns the fee rate that should be used for the tx.
FeeRate() chainfee.SatPerKWeight

// AddWalletInputs adds wallet inputs to the set until a non-dust
// change output can be made. Return an error if there are not enough
// wallet inputs.
AddWalletInputs(wallet Wallet) error

// NeedWalletInput returns true if the input set needs more wallet
// inputs.
NeedWalletInput() bool
}

type txInputSetState struct {
Expand Down Expand Up @@ -125,17 +140,13 @@ type txInputSet struct {
// maxInputs is the maximum number of inputs that will be accepted in
// the set.
maxInputs int

// wallet contains wallet functionality required by the input set to
// retrieve utxos.
wallet Wallet
}

// Compile-time constraint to ensure txInputSet implements InputSet.
var _ InputSet = (*txInputSet)(nil)

// newTxInputSet constructs a new, empty input set.
func newTxInputSet(wallet Wallet, feePerKW, maxFeeRate chainfee.SatPerKWeight,
func newTxInputSet(feePerKW, maxFeeRate chainfee.SatPerKWeight,
maxInputs int) *txInputSet {

state := txInputSetState{
Expand All @@ -145,7 +156,6 @@ func newTxInputSet(wallet Wallet, feePerKW, maxFeeRate chainfee.SatPerKWeight,

b := txInputSet{
maxInputs: maxInputs,
wallet: wallet,
txInputSetState: state,
}

Expand All @@ -162,6 +172,11 @@ func (t *txInputSet) FeeRate() chainfee.SatPerKWeight {
return t.feeRate
}

// NeedWalletInput returns true if the input set needs more wallet inputs.
func (t *txInputSet) NeedWalletInput() bool {
return !t.enoughInput()
}

// enoughInput returns true if we've accumulated enough inputs to pay the fees
// and have at least one output that meets the dust limit.
func (t *txInputSet) enoughInput() bool {
Expand Down Expand Up @@ -384,9 +399,35 @@ func (t *txInputSet) addPositiveYieldInputs(sweepableInputs []*pendingInput) {
// We managed to add all inputs to the set.
}

// AddWalletInputs adds wallet inputs to the set until a non-dust change output
// can be made. Return an error if there are not enough wallet inputs.
func (t *txInputSet) AddWalletInputs(wallet Wallet) error {
// Check the current output value and add wallet utxos if needed to
// push the output value to the lower limit.
if err := t.tryAddWalletInputsIfNeeded(wallet); err != nil {
return err
}

// If the output value of this block of inputs does not reach the dust
// limit, stop sweeping. Because of the sorting, continuing with the
// remaining inputs will only lead to sets with an even lower output
// value.
if !t.enoughInput() {
// The change output is always a p2tr here.
dl := lnwallet.DustLimitForSize(input.P2TRSize)
log.Debugf("Input set value %v (required=%v, change=%v) "+
"below dust limit of %v", t.totalOutput(),
t.requiredOutput, t.changeOutput, dl)

return ErrNotEnoughInputs
}

return nil
}

// tryAddWalletInputsIfNeeded retrieves utxos from the wallet and tries adding
// as many as required to bring the tx output value above the given minimum.
func (t *txInputSet) tryAddWalletInputsIfNeeded() error {
func (t *txInputSet) tryAddWalletInputsIfNeeded(wallet Wallet) error {
// If we've already have enough to pay the transaction fees and have at
// least one output materialize, no action is needed.
if t.enoughInput() {
Expand All @@ -396,7 +437,7 @@ func (t *txInputSet) tryAddWalletInputsIfNeeded() error {
// Retrieve wallet utxos. Only consider confirmed utxos to prevent
// problems around RBF rules for unconfirmed inputs. This currently
// ignores the configured coin selection strategy.
utxos, err := t.wallet.ListUnspentWitnessFromDefaultAccount(
utxos, err := wallet.ListUnspentWitnessFromDefaultAccount(
1, math.MaxInt32,
)
if err != nil {
Expand Down
8 changes: 4 additions & 4 deletions sweep/tx_input_set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func TestTxInputSet(t *testing.T) {
feeRate = 1000
maxInputs = 10
)
set := newTxInputSet(nil, feeRate, 0, maxInputs)
set := newTxInputSet(feeRate, 0, maxInputs)

// Create a 300 sat input. The fee to sweep this input to a P2WKH output
// is 439 sats. That means that this input yields -139 sats and we
Expand Down Expand Up @@ -65,7 +65,7 @@ func TestTxInputSetFromWallet(t *testing.T) {
)

wallet := &mockWallet{}
set := newTxInputSet(wallet, feeRate, 0, maxInputs)
set := newTxInputSet(feeRate, 0, maxInputs)

// Add a 500 sat input to the set. It yields positively, but doesn't
// reach the output dust limit.
Expand All @@ -86,7 +86,7 @@ func TestTxInputSetFromWallet(t *testing.T) {
t.Fatal("expected forced add to succeed")
}

err := set.tryAddWalletInputsIfNeeded()
err := set.AddWalletInputs(wallet)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -134,7 +134,7 @@ func TestTxInputSetRequiredOutput(t *testing.T) {
feeRate = 1000
maxInputs = 10
)
set := newTxInputSet(nil, feeRate, 0, maxInputs)
set := newTxInputSet(feeRate, 0, maxInputs)

// Attempt to add an input with a required txout below the dust limit.
// This should fail since we cannot trim such outputs.
Expand Down

0 comments on commit 92ead93

Please sign in to comment.