Skip to content

Commit

Permalink
multi: add terminator grpc server
Browse files Browse the repository at this point in the history
  • Loading branch information
carlaKC committed Jan 31, 2020
1 parent bb78ded commit a3cdc4b
Show file tree
Hide file tree
Showing 6 changed files with 262 additions and 47 deletions.
5 changes: 5 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ const (
defaultNetwork = "mainnet"
defaultMinimumMonitor = time.Hour * 24 * 7 * 4 // four weeks in hours
defaultDebugLevel = "info"
defaultRPCListen = "localhost:8419"
)

type config struct {
Expand Down Expand Up @@ -48,6 +49,9 @@ type config struct {
// DebugLevel is a string defining the log level for the service either
// for all subsystems the same or individual level by subsystem.
DebugLevel string `long:"debuglevel" description:"Debug level for termaintor and its subsystems."`

// RPCListen is the listen address for the terminator rpc server.
RPCListen string `long:"rpclisten" description:"Address to listen on for gRPC clients"`
}

// loadConfig starts with a skeleton default config, and reads in user provided
Expand All @@ -62,6 +66,7 @@ func loadConfig() (*config, error) {
MacaroonFile: defaultMacaroon,
MinimumMonitored: defaultMinimumMonitor,
DebugLevel: defaultDebugLevel,
RPCListen: defaultRPCListen,
}

// Parse command line options to obtain user specified values.
Expand Down
2 changes: 2 additions & 0 deletions log.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"github.com/btcsuite/btclog"
"github.com/lightninglabs/terminator/dataset"
"github.com/lightninglabs/terminator/recommend"
"github.com/lightninglabs/terminator/trmrpc"
"github.com/lightningnetwork/lnd/build"
)

Expand All @@ -24,6 +25,7 @@ func init() {
setSubLogger(Subsystem, log, nil)
addSubLogger(recommend.Subsystem, recommend.UseLogger)
addSubLogger(dataset.Subsystem, dataset.UseLogger)
addSubLogger(trmrpc.Subsystem, trmrpc.UseLogger)
}

// UseLogger uses a specified Logger to output package logging info.
Expand Down
65 changes: 18 additions & 47 deletions terminator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@
package terminator

import (
"context"
"fmt"

"github.com/lightninglabs/loop/lndclient"
"github.com/lightninglabs/terminator/recommend"
"github.com/lightningnetwork/lnd/lnrpc"
"github.com/lightninglabs/terminator/trmrpc"
"github.com/lightningnetwork/lnd/signal"
)

// Main is the real entry point for terminator. It is required to ensure that
Expand All @@ -18,9 +17,6 @@ func Main() error {
return fmt.Errorf("error loading config: %v", err)
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// NewBasicClient get a lightning rpc client with
client, err := lndclient.NewBasicClient(
config.RPCServer,
Expand All @@ -30,53 +26,28 @@ func Main() error {
lndclient.MacFilename(config.MacaroonFile),
)
if err != nil {
return fmt.Errorf("cannot connect to lightning client: %v", err)
return fmt.Errorf("cannot connect to lightning client: %v",
err)
}

// Get channel close recommendations for the current set of open public
// channels.
report, err := recommend.CloseRecommendations(
&recommend.CloseRecommendationConfig{
// OpenChannels provides all of the open, public channels for the
// node.
OpenChannels: func() (channels []*lnrpc.Channel, e error) {
resp, err := client.ListChannels(ctx,
&lnrpc.ListChannelsRequest{
PublicOnly: true,
})
if err != nil {
return nil, err
}

return resp.Channels, nil
},

// For the first iteration of the terminator, we set a
// multiplier which will only detect extreme values so
// that we conservatively recommend closes.
OutlierMultiplier: recommend.DefaultOutlierMultiplier,
// Instantiate the terminator gRPC server.
server := trmrpc.NewRPCServer(
&trmrpc.Config{
LightningClient: client,
RPCListen: config.RPCListen,
},
)

// Set the minimum monitor time to the value provided in our config.
MinimumMonitored: config.MinimumMonitored,
})
if err != nil {
return fmt.Errorf("could not get close recommendations: %v", err)
if err := server.Start(); err != nil {
return err
}

log.Infof("Considering: %v channels for closure from a "+
"total of: %v.", report.ConsideredChannels,
report.TotalChannels)

log.Info("Outlier Recommendations:")
for channel, rec := range report.OutlierRecommendations {
log.Infof("%v: Value: %v, Recommend Close: %v", channel,
rec.Value, rec.RecommendClose)
}
// Run until the user terminates.
<-signal.ShutdownChannel()
log.Infof("Received shutdown signal.")

log.Info("Threshold Recommendations:")
for channel, rec := range report.ThresholdRecommendations {
log.Infof("%v: Value: %v, Recommend Close: %v", channel,
rec.Value, rec.RecommendClose)
if err := server.Stop(); err != nil {
return err
}

log.Info("That's all for now. I will be back.")
Expand Down
66 changes: 66 additions & 0 deletions trmrpc/close_recommendations.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package trmrpc

import (
"context"
"time"

"github.com/lightninglabs/terminator/recommend"
)

// parseRequest parses a rpc close recommendation request and returns the
// close recommendation config that the request requires.
func parseRequest(ctx context.Context, cfg *Config,
req *CloseRecommendationsRequest) *recommend.CloseRecommendationConfig {

// Create a close recommendations config with the minimum monitored
// value provided in the request and the default outlier multiplier.
recConfig := &recommend.CloseRecommendationConfig{
OpenChannels: cfg.wrapListChannels(ctx, true),
MinimumMonitored: time.Second *
time.Duration(req.MinimumMonitored),
OutlierMultiplier: recommend.DefaultOutlierMultiplier,
}

// If a non-zero outlier multiple was provided, set it on the config.
if req.OutlierMultiplier != 0 {
recConfig.OutlierMultiplier = float64(req.OutlierMultiplier)
}

threshold, ok := req.Threshold.(*CloseRecommendationsRequest_UptimeThreshold)
if ok {
recConfig.UptimeThreshold = float64(threshold.UptimeThreshold)
}

return recConfig
}

// parseResponse parses the response obtained getting a close recommendation
// and converts it to a close recommendation response.
func parseResponse(report *recommend.Report) *CloseRecommendationsResponse {
resp := &CloseRecommendationsResponse{
TotalChannels: int32(report.TotalChannels),
ConsideredChannels: int32(report.ConsideredChannels),
}

for chanPoint, rec := range report.OutlierRecommendations {
resp.OutlierRecommendations = append(
resp.OutlierRecommendations, &Recommendation{
ChanPoint: chanPoint,
Value: float32(rec.Value),
RecommendClose: rec.RecommendClose,
},
)
}

for chanPoint, rec := range report.ThresholdRecommendations {
resp.ThresholdRecommendations = append(
resp.ThresholdRecommendations, &Recommendation{
ChanPoint: chanPoint,
Value: float32(rec.Value),
RecommendClose: rec.RecommendClose,
},
)
}

return resp
}
26 changes: 26 additions & 0 deletions trmrpc/log.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package trmrpc

import (
"github.com/btcsuite/btclog"
"github.com/lightningnetwork/lnd/build"
)

// Subsystem defines the logging code for this subsystem.
const Subsystem = "TRPC"

// log is a logger that is initialized with no output filters. This
// means the package will not perform any logging by default until the
// caller requests it.
var log btclog.Logger

// The default amount of logging is none.
func init() {
UseLogger(build.NewSubLogger(Subsystem, nil))
}

// UseLogger uses a specified Logger to output package logging info.
// This should be used in preference to SetLogWriter if the caller is also
// using btclog.
func UseLogger(logger btclog.Logger) {
log = logger
}
145 changes: 145 additions & 0 deletions trmrpc/rpcserver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
// Package trmrpc contains the proto files, generated code and server logic
// for the terminator's grpc server which serves requests for close
// recommendations.
//
// The Terminator server interface is implemented by the RPCServer struct.
// To keep this file readable, each function implemented by the interface
// has a file named after the function call which contains rpc parsing
// code for the request and response. If the call requires extensive
// additional logic, and unexported function with the same name should
// be created in this file as well.
package trmrpc

import (
"context"
"fmt"
"net"
"sync"
"sync/atomic"

"github.com/lightninglabs/terminator/recommend"
"github.com/lightningnetwork/lnd/lnrpc"
"google.golang.org/grpc"
)

// RPCServer implements the terminator service, serving requests over grpc.
type RPCServer struct {
// To be used atomically.
started int32

// To be used atomically.
stopped int32

// cfg contains closures and settings required for operation.
cfg *Config

// grpcServer is the main gRPC RPCServer that this RPC server will
// register itself with and accept client requests from.
grpcServer *grpc.Server

// rpcListener is the to use when starting the grpc server.
rpcListener net.Listener

wg sync.WaitGroup
}

// Config provides closures and settings required to run the rpc server.
type Config struct {
// LightningClient is a client which can be used to query lnd.
LightningClient lnrpc.LightningClient

// RPCListen is the address:port that the rpc server should listen
// on.
RPCListen string
}

// wrapListChannels wraps the listchannels call to lnd, with a publicOnly bool
// that can be used to toggle whether private channels are included.
func (c *Config) wrapListChannels(ctx context.Context,
publicOnly bool) func() ([]*lnrpc.Channel, error) {

return func() (channels []*lnrpc.Channel, e error) {
resp, err := c.LightningClient.ListChannels(
ctx,
&lnrpc.ListChannelsRequest{
PublicOnly: publicOnly,
},
)
if err != nil {
return nil, err
}

return resp.Channels, nil
}
}

// NewRPCServer returns a server which will listen for rpc requests on the
// rpc listen address provided. Note that the server returned is not running,
// and should be started using Start().
func NewRPCServer(cfg *Config) *RPCServer {
var opts []grpc.ServerOption
grpcServer := grpc.NewServer(opts...)

return &RPCServer{
cfg: cfg,
grpcServer: grpcServer,
}
}

// Start starts the listener and server.
func (s *RPCServer) Start() error {
if atomic.AddInt32(&s.started, 1) != 1 {
return nil
}

// Start the gRPC RPCServer listening for HTTP/2 connections.
log.Info("Starting gRPC listener")
grpcListener, err := net.Listen("tcp", s.cfg.RPCListen)
if err != nil {
return fmt.Errorf("RPC RPCServer unable to listen on %v",
s.cfg.RPCListen)

}
s.rpcListener = grpcListener

RegisterTerminatorServerServer(s.grpcServer, s)

s.wg.Add(1)
go func() {
defer s.wg.Done()
if err := s.grpcServer.Serve(s.rpcListener); err != nil {
log.Errorf("could not serve grpc server: %v", err)
}
}()

return nil
}

// Stop stops the grpc listener and server.
func (s *RPCServer) Stop() error {
if atomic.AddInt32(&s.stopped, 1) != 1 {
return nil
}

// Stop the grpc server and wait for all go routines to terminate.
s.grpcServer.Stop()
s.wg.Wait()

return nil
}

// CloseRecommendations provides a set of close recommendations for the
// current set of open channels.
func (s *RPCServer) CloseRecommendations(ctx context.Context,
req *CloseRecommendationsRequest) (*CloseRecommendationsResponse,
error) {

cfg := parseRequest(ctx, s.cfg, req)

report, err := recommend.CloseRecommendations(cfg)
if err != nil {
return nil, err
}

return parseResponse(report), nil
}

0 comments on commit a3cdc4b

Please sign in to comment.