Skip to content

Commit

Permalink
Merge pull request #7702 from ellemouton/towerClientMux
Browse files Browse the repository at this point in the history
wtclient: Tower Client Multiplexer
  • Loading branch information
ellemouton authored Dec 5, 2023
2 parents ad88396 + 59ebe02 commit 4fa483f
Show file tree
Hide file tree
Showing 14 changed files with 1,256 additions and 1,006 deletions.
4 changes: 4 additions & 0 deletions docs/release-notes/release-notes-0.18.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,10 @@
In particular, the complexity involved in the lifecycle loop has been
decoupled into logical steps, with each step having its own responsibility,
making it easier to reason about the payment flow.

* [Add a watchtower tower client
multiplexer](https://github.com/lightningnetwork/lnd/pull/7702) to manage
tower clients of different types.

## Breaking Changes
## Performance Improvements
Expand Down
2 changes: 1 addition & 1 deletion htlcswitch/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ type TowerClient interface {
// parameters within the client. This should be called during link
// startup to ensure that the client is able to support the link during
// operation.
RegisterChannel(lnwire.ChannelID) error
RegisterChannel(lnwire.ChannelID, channeldb.ChannelType) error

// BackupState initiates a request to back up a particular revoked
// state. If the method returns nil, the backup is guaranteed to be
Expand Down
4 changes: 3 additions & 1 deletion htlcswitch/link.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,9 @@ func (l *channelLink) Start() error {
// If the config supplied watchtower client, ensure the channel is
// registered before trying to use it during operation.
if l.cfg.TowerClient != nil {
err := l.cfg.TowerClient.RegisterChannel(l.ChanID())
err := l.cfg.TowerClient.RegisterChannel(
l.ChanID(), l.channel.State().ChanType,
)
if err != nil {
return err
}
Expand Down
10 changes: 3 additions & 7 deletions lnrpc/wtclientrpc/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,9 @@ type Config struct {
// Active indicates if the watchtower client is enabled.
Active bool

// Client is the backing watchtower client that we'll interact with
// through the watchtower RPC subserver.
Client wtclient.Client

// AnchorClient is the backing watchtower client for anchor channels that
// we'll interact through the watchtower RPC subserver.
AnchorClient wtclient.Client
// ClientMgr is a tower client manager that manages a set of tower
// clients.
ClientMgr wtclient.ClientManager

// Resolver is a custom resolver that will be used to resolve watchtower
// addresses to ensure we don't leak any information when running over
Expand Down
156 changes: 67 additions & 89 deletions lnrpc/wtclientrpc/wtclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ import (
"github.com/lightningnetwork/lnd/lnrpc"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/watchtower"
"github.com/lightningnetwork/lnd/watchtower/blob"
"github.com/lightningnetwork/lnd/watchtower/wtclient"
"github.com/lightningnetwork/lnd/watchtower/wtdb"
"github.com/lightningnetwork/lnd/watchtower/wtpolicy"
"google.golang.org/grpc"
"gopkg.in/macaroon-bakery.v2/bakery"
)
Expand Down Expand Up @@ -208,11 +208,7 @@ func (c *WatchtowerClient) AddTower(ctx context.Context,
Address: addr,
}

// TODO(conner): make atomic via multiplexed client
if err := c.cfg.Client.AddTower(towerAddr); err != nil {
return nil, err
}
if err := c.cfg.AnchorClient.AddTower(towerAddr); err != nil {
if err := c.cfg.ClientMgr.AddTower(towerAddr); err != nil {
return nil, err
}

Expand Down Expand Up @@ -247,12 +243,7 @@ func (c *WatchtowerClient) RemoveTower(ctx context.Context,
}
}

// TODO(conner): make atomic via multiplexed client
err = c.cfg.Client.RemoveTower(pubKey, addr)
if err != nil {
return nil, err
}
err = c.cfg.AnchorClient.RemoveTower(pubKey, addr)
err = c.cfg.ClientMgr.RemoveTower(pubKey, addr)
if err != nil {
return nil, err
}
Expand All @@ -272,44 +263,40 @@ func (c *WatchtowerClient) ListTowers(ctx context.Context,
req.IncludeSessions, req.ExcludeExhaustedSessions,
)

anchorTowers, err := c.cfg.AnchorClient.RegisteredTowers(opts...)
if err != nil {
return nil, err
}

// Collect all the anchor client towers.
rpcTowers := make(map[wtdb.TowerID]*Tower)
for _, tower := range anchorTowers {
rpcTower := marshallTower(
tower, PolicyType_ANCHOR, req.IncludeSessions,
ackCounts, committedUpdateCounts,
)

rpcTowers[tower.ID] = rpcTower
}

legacyTowers, err := c.cfg.Client.RegisteredTowers(opts...)
towersPerBlobType, err := c.cfg.ClientMgr.RegisteredTowers(opts...)
if err != nil {
return nil, err
}

// Collect all the legacy client towers. If it has any of the same
// towers that the anchors client has, then just add the session info
// for the legacy client to the existing tower.
for _, tower := range legacyTowers {
rpcTower := marshallTower(
tower, PolicyType_LEGACY, req.IncludeSessions,
ackCounts, committedUpdateCounts,
)

t, ok := rpcTowers[tower.ID]
if !ok {
rpcTowers[tower.ID] = rpcTower
continue
rpcTowers := make(map[wtdb.TowerID]*Tower)
for blobType, towers := range towersPerBlobType {
policyType := PolicyType_LEGACY
if blobType.IsAnchorChannel() {
policyType = PolicyType_ANCHOR
}

t.SessionInfo = append(t.SessionInfo, rpcTower.SessionInfo...)
t.Sessions = append(t.Sessions, rpcTower.Sessions...)
for _, tower := range towers {
rpcTower := marshallTower(
tower, policyType, req.IncludeSessions,
ackCounts, committedUpdateCounts,
)

t, ok := rpcTowers[tower.ID]
if !ok {
rpcTowers[tower.ID] = rpcTower
continue
}

t.SessionInfo = append(
t.SessionInfo, rpcTower.SessionInfo...,
)
t.Sessions = append(
t.Sessions, rpcTower.Sessions...,
)
}
}

towers := make([]*Tower, 0, len(rpcTowers))
Expand Down Expand Up @@ -337,40 +324,42 @@ func (c *WatchtowerClient) GetTowerInfo(ctx context.Context,
req.IncludeSessions, req.ExcludeExhaustedSessions,
)

// Get the tower and its sessions from anchors client.
tower, err := c.cfg.AnchorClient.LookupTower(pubKey, opts...)
towersPerBlobType, err := c.cfg.ClientMgr.LookupTower(pubKey, opts...)
if err != nil {
return nil, err
}
rpcTower := marshallTower(
tower, PolicyType_ANCHOR, req.IncludeSessions, ackCounts,
committedUpdateCounts,
)

// Get the tower and its sessions from legacy client.
tower, err = c.cfg.Client.LookupTower(pubKey, opts...)
if err != nil {
return nil, err
}
var resTower *Tower
for blobType, tower := range towersPerBlobType {
policyType := PolicyType_LEGACY
if blobType.IsAnchorChannel() {
policyType = PolicyType_ANCHOR
}

rpcLegacyTower := marshallTower(
tower, PolicyType_LEGACY, req.IncludeSessions, ackCounts,
committedUpdateCounts,
)
rpcTower := marshallTower(
tower, policyType, req.IncludeSessions,
ackCounts, committedUpdateCounts,
)

if !bytes.Equal(rpcTower.Pubkey, rpcLegacyTower.Pubkey) {
return nil, fmt.Errorf("legacy and anchor clients returned " +
"inconsistent results for the given tower")
}
if resTower == nil {
resTower = rpcTower
continue
}

rpcTower.SessionInfo = append(
rpcTower.SessionInfo, rpcLegacyTower.SessionInfo...,
)
rpcTower.Sessions = append(
rpcTower.Sessions, rpcLegacyTower.Sessions...,
)
if !bytes.Equal(rpcTower.Pubkey, resTower.Pubkey) {
return nil, fmt.Errorf("tower clients returned " +
"inconsistent results for the given tower")
}

return rpcTower, nil
resTower.SessionInfo = append(
resTower.SessionInfo, rpcTower.SessionInfo...,
)
resTower.Sessions = append(
resTower.Sessions, rpcTower.Sessions...,
)
}

return resTower, nil
}

// constructFunctionalOptions is a helper function that constructs a list of
Expand Down Expand Up @@ -422,30 +411,14 @@ func constructFunctionalOptions(includeSessions,
}

// Stats returns the in-memory statistics of the client since startup.
func (c *WatchtowerClient) Stats(ctx context.Context,
req *StatsRequest) (*StatsResponse, error) {
func (c *WatchtowerClient) Stats(_ context.Context,
_ *StatsRequest) (*StatsResponse, error) {

if err := c.isActive(); err != nil {
return nil, err
}

clientStats := []wtclient.ClientStats{
c.cfg.Client.Stats(),
c.cfg.AnchorClient.Stats(),
}

var stats wtclient.ClientStats
for i := range clientStats {
// Grab a reference to the slice index rather than copying bc
// ClientStats contains a lock which cannot be copied by value.
stat := &clientStats[i]

stats.NumTasksAccepted += stat.NumTasksAccepted
stats.NumTasksIneligible += stat.NumTasksIneligible
stats.NumTasksPending += stat.NumTasksPending
stats.NumSessionsAcquired += stat.NumSessionsAcquired
stats.NumSessionsExhausted += stat.NumSessionsExhausted
}
stats := c.cfg.ClientMgr.Stats()

return &StatsResponse{
NumBackups: uint32(stats.NumTasksAccepted),
Expand All @@ -464,17 +437,22 @@ func (c *WatchtowerClient) Policy(ctx context.Context,
return nil, err
}

var policy wtpolicy.Policy
var blobType blob.Type
switch req.PolicyType {
case PolicyType_LEGACY:
policy = c.cfg.Client.Policy()
blobType = blob.TypeAltruistCommit
case PolicyType_ANCHOR:
policy = c.cfg.AnchorClient.Policy()
blobType = blob.TypeAltruistAnchorCommit
default:
return nil, fmt.Errorf("unknown policy type: %v",
req.PolicyType)
}

policy, err := c.cfg.ClientMgr.Policy(blobType)
if err != nil {
return nil, err
}

return &PolicyResponse{
MaxUpdates: uint32(policy.MaxUpdates),
SweepSatPerVbyte: uint32(policy.SweepFeeRate.FeePerVByte()),
Expand Down
22 changes: 5 additions & 17 deletions peer/brontide.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,12 +268,8 @@ type Config struct {
// HtlcNotifier is used when creating a ChannelLink.
HtlcNotifier *htlcswitch.HtlcNotifier

// TowerClient is used by legacy channels to backup revoked states.
TowerClient wtclient.Client

// AnchorTowerClient is used by anchor channels to backup revoked
// states.
AnchorTowerClient wtclient.Client
// TowerClient is used to backup revoked states.
TowerClient wtclient.ClientManager

// DisconnectPeer is used to disconnect this peer if the cooperative close
// process fails.
Expand Down Expand Up @@ -1040,14 +1036,8 @@ func (p *Brontide) addLink(chanPoint *wire.OutPoint,
return p.cfg.ChainArb.NotifyContractUpdate(*chanPoint, update)
}

chanType := lnChan.State().ChanType

// Select the appropriate tower client based on the channel type. It's
// okay if the clients are disabled altogether and these values are nil,
// as the link will check for nilness before using either.
var towerClient htlcswitch.TowerClient
switch {
case chanType.IsTaproot():
var towerClient wtclient.ClientManager
if lnChan.ChanType().IsTaproot() {
// Leave the tower client as nil for now until the tower client
// has support for taproot channels.
//
Expand All @@ -1060,9 +1050,7 @@ func (p *Brontide) addLink(chanPoint *wire.OutPoint,
"are not yet taproot channel compatible",
chanPoint)
}
case chanType.HasAnchors():
towerClient = p.cfg.AnchorTowerClient
default:
} else {
towerClient = p.cfg.TowerClient
}

Expand Down
9 changes: 4 additions & 5 deletions rpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -744,11 +744,10 @@ func (r *rpcServer) addDeps(s *server, macService *macaroons.Service,
r.cfg, s.cc, r.cfg.networkDir, macService, atpl, invoiceRegistry,
s.htlcSwitch, r.cfg.ActiveNetParams.Params, s.chanRouter,
routerBackend, s.nodeSigner, s.graphDB, s.chanStateDB,
s.sweeper, tower, s.towerClient, s.anchorTowerClient,
r.cfg.net.ResolveTCPAddr, genInvoiceFeatures,
genAmpInvoiceFeatures, s.getNodeAnnouncement,
s.updateAndBrodcastSelfNode, parseAddr, rpcsLog,
s.aliasMgr.GetPeerAlias,
s.sweeper, tower, s.towerClientMgr, r.cfg.net.ResolveTCPAddr,
genInvoiceFeatures, genAmpInvoiceFeatures,
s.getNodeAnnouncement, s.updateAndBrodcastSelfNode, parseAddr,
rpcsLog, s.aliasMgr.GetPeerAlias,
)
if err != nil {
return err
Expand Down
Loading

0 comments on commit 4fa483f

Please sign in to comment.