From 7b6d2d2217afc10139e32518fc1032d06cb42392 Mon Sep 17 00:00:00 2001 From: Aleksandr Tretiakov Date: Tue, 10 Sep 2024 00:40:48 +0300 Subject: [PATCH 01/17] Added support for multiple node urls --- cmd/wardenkms/wardenkms.go | 48 +++++++------ keychain-sdk/config.go | 19 +++-- keychain-sdk/example_keychain_test.go | 29 ++++---- keychain-sdk/internal/tracker/tracker.go | 39 ++++++++-- keychain-sdk/internal/writer/writer.go | 27 +++---- keychain-sdk/key_requests.go | 31 ++++---- keychain-sdk/keychain.go | 90 +++++++++++++++++++----- keychain-sdk/sign_requests.go | 33 +++++---- 8 files changed, 215 insertions(+), 101 deletions(-) diff --git a/cmd/wardenkms/wardenkms.go b/cmd/wardenkms/wardenkms.go index 0b7c8fda7..7c7a6b30b 100644 --- a/cmd/wardenkms/wardenkms.go +++ b/cmd/wardenkms/wardenkms.go @@ -20,11 +20,11 @@ import ( ) type Config struct { - ChainID string `env:"CHAIN_ID, default=warden_1337-1"` - GRPCURL string `env:"GRPC_URL, default=localhost:9090"` - GRPCInsecure bool `env:"GRPC_INSECURE, default=true"` - Mnemonic string `env:"MNEMONIC, default=exclude try nephew main caught favorite tone degree lottery device tissue tent ugly mouse pelican gasp lava flush pen river noise remind balcony emerge"` - KeychainId uint64 `env:"KEYCHAIN_ID, default=1"` + ChainID string `env:"CHAIN_ID, default=warden"` + GRPCURLs []string `env:"GRPC_URLS, default=localhost:9090"` + GRPCInsecure bool `env:"GRPC_INSECURE, default=true"` + Mnemonic string `env:"MNEMONIC, default=exclude try nephew main caught favorite tone degree lottery device tissue tent ugly mouse pelican gasp lava flush pen river noise remind balcony emerge"` + KeychainId uint64 `env:"KEYCHAIN_ID, default=1"` KeyringMnemonic string `env:"KEYRING_MNEMONIC, required"` KeyringPassword string `env:"KEYRING_PASSWORD, required"` @@ -38,6 +38,8 @@ type Config struct { HttpAddr string `env:"HTTP_ADDR, default=:8080"` LogLevel slog.Level `env:"LOG_LEVEL, default=debug"` + + ConsensusNodeThreshold uint `env:"CONSENSUS_NODE_THRESHOLD, default=1"` } func main() { @@ -57,17 +59,20 @@ func main() { } app := keychain.NewApp(keychain.Config{ - Logger: logger, - ChainID: cfg.ChainID, - GRPCURL: cfg.GRPCURL, - GRPCInsecure: cfg.GRPCInsecure, - Mnemonic: cfg.Mnemonic, - KeychainID: cfg.KeychainId, - GasLimit: cfg.GasLimit, - BatchInterval: cfg.BatchInterval, - BatchSize: cfg.BatchSize, - TxTimeout: cfg.TxTimeout, - TxFees: sdk.NewCoins(sdk.NewCoin("award", math.NewInt(cfg.TxFee))), + BasicConfig: keychain.BasicConfig{ + Logger: logger, + ChainID: cfg.ChainID, + GRPCInsecure: cfg.GRPCInsecure, + Mnemonic: cfg.Mnemonic, + KeychainID: cfg.KeychainId, + GasLimit: cfg.GasLimit, + BatchInterval: cfg.BatchInterval, + BatchSize: cfg.BatchSize, + TxTimeout: cfg.TxTimeout, + TxFees: sdk.NewCoins(sdk.NewCoin("uward", math.NewInt(cfg.TxFee))), + }, + GRPCURLs: cfg.GRPCURLs, + ConsensusNodeThreshold: cfg.ConsensusNodeThreshold, }) app.SetKeyRequestHandler(func(w keychain.KeyResponseWriter, req *keychain.KeyRequest) { @@ -120,11 +125,14 @@ func main() { if cfg.HttpAddr != "" { logger.Info("starting HTTP server", "addr", cfg.HttpAddr) http.HandleFunc("/healthcheck", func(w http.ResponseWriter, r *http.Request) { - if app.ConnectionState() == connectivity.Ready { - w.WriteHeader(http.StatusOK) - } else { - w.WriteHeader(http.StatusServiceUnavailable) + for _, state := range app.ConnectionState() { + if state == connectivity.Ready { + w.WriteHeader(http.StatusOK) + return + } } + + w.WriteHeader(http.StatusServiceUnavailable) }) go func() { _ = http.ListenAndServe(cfg.HttpAddr, nil) }() } diff --git a/keychain-sdk/config.go b/keychain-sdk/config.go index 72d45bce3..74347aa90 100644 --- a/keychain-sdk/config.go +++ b/keychain-sdk/config.go @@ -7,8 +7,7 @@ import ( sdk "github.com/cosmos/cosmos-sdk/types" ) -// Config is the configuration for the Keychain. -type Config struct { +type BasicConfig struct { // Logger is the logger to use for the Keychain. // If nil, no logging will be done. Logger *slog.Logger @@ -16,10 +15,6 @@ type Config struct { // ChainID is the chain ID of the chain to connect to. ChainID string - // GRPCURL is the URL of the gRPC server to connect to. - // e.g. "localhost:9090" - GRPCURL string - // GRPCInsecure determines whether to allow an insecure connection to the // gRPC server. GRPCInsecure bool @@ -58,3 +53,15 @@ type Config struct { // failed (but the blockchain might still include in a block later). TxTimeout time.Duration } + +// Config is the configuration for the Keychain. +type Config struct { + BasicConfig + + // ConsensusNodeThreshold represents the number of nodes required to execute a pending key/sign request. + ConsensusNodeThreshold uint + + // GRPCURLs is the list of URLs of the gRPC server to connect to. + // e.g. "localhost:9090" + GRPCURLs []string +} diff --git a/keychain-sdk/example_keychain_test.go b/keychain-sdk/example_keychain_test.go index 501bf7368..ac3b420b5 100644 --- a/keychain-sdk/example_keychain_test.go +++ b/keychain-sdk/example_keychain_test.go @@ -18,21 +18,26 @@ func Main() { })) app := keychain.NewApp(keychain.Config{ - Logger: logger, // not required, but recommended + BasicConfig: keychain.BasicConfig{ + Logger: logger, // not required, but recommended - // setup the connection to the Warden Protocol node - ChainID: "warden", - GRPCURL: "localhost:9090", - GRPCInsecure: true, + // setup the connection to the Warden Protocol node + ChainID: "warden", - // setup the account used to write txs - KeychainID: 1, - Mnemonic: "virus boat radio apple pilot ask vault exhaust again state doll stereo slide exhibit scissors miss attack boat budget egg bird mask more trick", + GRPCInsecure: true, - // setup throughput for batching responses - GasLimit: 400000, - BatchInterval: 8 * time.Second, - BatchSize: 10, + // setup the account used to write txs + KeychainID: 1, + Mnemonic: "virus boat radio apple pilot ask vault exhaust again state doll stereo slide exhibit scissors miss attack boat budget egg bird mask more trick", + + // setup throughput for batching responses + GasLimit: 400000, + BatchInterval: 8 * time.Second, + BatchSize: 10, + }, + + GRPCURLs: []string{"localhost:9090"}, + ConsensusNodeThreshold: 1, }) app.SetKeyRequestHandler(func(w keychain.KeyResponseWriter, req *keychain.KeyRequest) { diff --git a/keychain-sdk/internal/tracker/tracker.go b/keychain-sdk/internal/tracker/tracker.go index e2652d848..1061cd7f4 100644 --- a/keychain-sdk/internal/tracker/tracker.go +++ b/keychain-sdk/internal/tracker/tracker.go @@ -4,28 +4,53 @@ import ( "sync" ) +type hashSet map[string]struct{} + type T struct { rw sync.RWMutex - ingested map[uint64]struct{} + ingested map[uint64]hashSet } func New() *T { return &T{ - ingested: make(map[uint64]struct{}), + ingested: make(map[uint64]hashSet), + } +} + +func (t *T) IsNew(id uint64, nodeUrl string) bool { + t.rw.RLock() + defer t.rw.RUnlock() + value, exists := t.ingested[id] + if !exists { + return true } + + _, alreadySeen := value[nodeUrl] + return !alreadySeen } -func (t *T) IsNew(id uint64) bool { +func (t *T) HasReachedConsensus(id uint64, threshold uint) bool { t.rw.RLock() defer t.rw.RUnlock() - _, ok := t.ingested[id] - return !ok + value, exists := t.ingested[id] + if !exists { + return false + } + + return len(value) >= int(threshold) } -func (t *T) Ingested(id uint64) { +func (t *T) Ingested(id uint64, nodeUrl string) { t.rw.Lock() defer t.rw.Unlock() - t.ingested[id] = struct{}{} + + value, ok := t.ingested[id] + if ok == false { + t.ingested[id] = make(hashSet) + value = t.ingested[id] + } + + value[nodeUrl] = struct{}{} } func (t *T) Done(id uint64) { diff --git a/keychain-sdk/internal/writer/writer.go b/keychain-sdk/internal/writer/writer.go index 7540a0839..5a429d00e 100644 --- a/keychain-sdk/internal/writer/writer.go +++ b/keychain-sdk/internal/writer/writer.go @@ -18,9 +18,6 @@ type W struct { // included in a block after being broadcasted. TxTimeout time.Duration - // Client is the client used to send transactions to the chain. - Client *client.TxClient - Logger *slog.Logger GasLimit uint64 @@ -36,14 +33,12 @@ type W struct { } func New( - client *client.TxClient, batchSize int, batchInterval time.Duration, txTimeout time.Duration, logger *slog.Logger, ) *W { return &W{ - Client: client, BatchInterval: batchInterval, TxTimeout: txTimeout, Logger: logger, @@ -51,7 +46,9 @@ func New( } } -func (w *W) Start(ctx context.Context, flushErrors chan error) error { +type FnResolveLiveClient func() (*client.TxClient, error) + +func (w *W) Start(ctx context.Context, liveClientFn FnResolveLiveClient, flushErrors chan error) error { w.Logger.Info("starting tx writer") for { select { @@ -62,9 +59,15 @@ func (w *W) Start(ctx context.Context, flushErrors chan error) error { if w.TxTimeout > 0 { ctx, cancel = context.WithTimeout(ctx, w.TxTimeout) } - if err := w.Flush(ctx); err != nil { + + if txClient, err := liveClientFn(); err != nil { flushErrors <- err + } else { + if err := w.Flush(ctx, txClient); err != nil { + flushErrors <- err + } } + cancel() time.Sleep(w.BatchInterval) } @@ -97,7 +100,7 @@ func (w *W) fees() sdk.Coins { return w.Fees } -func (w *W) Flush(ctx context.Context) error { +func (w *W) Flush(ctx context.Context, txClient *client.TxClient) error { msgs := w.batch.Clear() if len(msgs) == 0 { w.Logger.Debug("flushing batch", "empty", true) @@ -115,7 +118,7 @@ func (w *W) Flush(ctx context.Context) error { msgers[i] = item.Msger } - if err := w.sendWaitTx(ctx, msgers...); err != nil { + if err := w.sendWaitTx(ctx, txClient, msgers...); err != nil { for _, item := range msgs { item.Done <- err } @@ -125,18 +128,18 @@ func (w *W) Flush(ctx context.Context) error { return nil } -func (w *W) sendWaitTx(ctx context.Context, msgs ...client.Msger) error { +func (w *W) sendWaitTx(ctx context.Context, txClient *client.TxClient, msgs ...client.Msger) error { w.sendTxLock.Lock() defer w.sendTxLock.Unlock() w.Logger.Info("flushing batch", "count", len(msgs)) - tx, err := w.Client.BuildTx(ctx, w.gasLimit(), w.fees(), msgs...) + tx, err := txClient.BuildTx(ctx, w.gasLimit(), w.fees(), msgs...) if err != nil { return err } - if err = w.Client.SendWaitTx(ctx, tx); err != nil { + if err = txClient.SendWaitTx(ctx, tx); err != nil { return err } diff --git a/keychain-sdk/key_requests.go b/keychain-sdk/key_requests.go index 112cd870f..f690d9c77 100644 --- a/keychain-sdk/key_requests.go +++ b/keychain-sdk/key_requests.go @@ -56,23 +56,30 @@ func (w *keyResponseWriter) Reject(reason string) error { return err } -func (a *App) ingestKeyRequests(keyRequestsCh chan *wardentypes.KeyRequest) { +func (a *App) ingestKeyRequests(keyRequestsCh chan *wardentypes.KeyRequest, client *AppClient) { + ingestRequest := func(keyRequest *wardentypes.KeyRequest) { + if !a.keyRequestTracker.IsNew(keyRequest.Id, client.grpcUrl) { + a.logger().Debug("skipping key request", "id", keyRequest.Id, "grpcUrl", client.grpcUrl) + return + } + + a.logger().Info("got key request", "id", keyRequest.Id, "grpcUrl", client.grpcUrl) + a.keyRequestTracker.Ingested(keyRequest.Id, client.grpcUrl) + + if a.keyRequestTracker.HasReachedConsensus(keyRequest.Id, a.config.ConsensusNodeThreshold) { + keyRequestsCh <- keyRequest + } + } + for { reqCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - keyRequests, err := a.keyRequests(reqCtx) + keyRequests, err := client.keyRequests(reqCtx) cancel() if err != nil { a.logger().Error("failed to get key requests", "error", err) } else { for _, keyRequest := range keyRequests { - if !a.keyRequestTracker.IsNew(keyRequest.Id) { - a.logger().Debug("skipping key request", "id", keyRequest.Id) - continue - } - - a.logger().Info("got key request", "id", keyRequest.Id) - a.keyRequestTracker.Ingested(keyRequest.Id) - keyRequestsCh <- keyRequest + ingestRequest(keyRequest) } } @@ -109,6 +116,6 @@ func (a *App) handleKeyRequest(keyRequest *wardentypes.KeyRequest) { }() } -func (a *App) keyRequests(ctx context.Context) ([]*wardentypes.KeyRequest, error) { - return a.query.PendingKeyRequests(ctx, &client.PageRequest{Limit: uint64(a.config.BatchSize)}, a.config.KeychainID) +func (a *AppClient) keyRequests(ctx context.Context) ([]*wardentypes.KeyRequest, error) { + return a.query.PendingKeyRequests(ctx, &client.PageRequest{Limit: uint64(a.batchSize)}, a.keychainId) } diff --git a/keychain-sdk/keychain.go b/keychain-sdk/keychain.go index 071368d1f..3c3a1ff6b 100644 --- a/keychain-sdk/keychain.go +++ b/keychain-sdk/keychain.go @@ -27,10 +27,19 @@ type App struct { keyRequestHandler KeyRequestHandler signRequestHandler SignRequestHandler - query *client.QueryClient txWriter *writer.W keyRequestTracker *tracker.T signRequestTracker *tracker.T + + clients []*AppClient +} + +type AppClient struct { + grpcUrl string + batchSize int + keychainId uint64 + query *client.QueryClient + txClient *client.TxClient } // NewApp creates a new Keychain application, using the given configuration. @@ -39,6 +48,7 @@ func NewApp(config Config) *App { config: config, keyRequestTracker: tracker.New(), signRequestTracker: tracker.New(), + clients: []*AppClient{}, } } @@ -71,16 +81,20 @@ func (a *App) Start(ctx context.Context) error { keyRequestsCh := make(chan *wardentypes.KeyRequest) defer close(keyRequestsCh) - go a.ingestKeyRequests(keyRequestsCh) + for _, appClient := range a.clients { + go a.ingestKeyRequests(keyRequestsCh, appClient) + } signRequestsCh := make(chan *wardentypes.SignRequest) defer close(signRequestsCh) - go a.ingestSignRequests(signRequestsCh) + for _, appClient := range a.clients { + go a.ingestSignRequests(signRequestsCh, appClient) + } flushErrors := make(chan error) defer close(flushErrors) go func() { - if err := a.txWriter.Start(ctx, flushErrors); err != nil { + if err := a.txWriter.Start(ctx, a.liveTxClient, flushErrors); err != nil { a.logger().Error("tx writer exited with error", "error", err) } }() @@ -99,30 +113,68 @@ func (a *App) Start(ctx context.Context) error { } } +func (a *App) liveTxClient() (*client.TxClient, error) { + for _, appClient := range a.clients { + if state := appClient.query.Conn().GetState(); state == connectivity.Ready || state == connectivity.Idle { + return appClient.txClient, nil + } + } + + return nil, fmt.Errorf("all node clients are down") +} + // ConnectionState returns the current state of the gRPC connection. -func (a *App) ConnectionState() connectivity.State { - return a.query.Conn().GetState() +func (a *App) ConnectionState() map[string]connectivity.State { + statuses := make(map[string]connectivity.State) + + for _, appClient := range a.clients { + state := appClient.query.Conn().GetState() + statuses[appClient.grpcUrl] = state + } + + return statuses } func (a *App) initConnections() error { - a.logger().Info("connecting to Warden Protocol using gRPC", "url", a.config.GRPCURL, "insecure", a.config.GRPCInsecure) - query, err := client.NewQueryClient(a.config.GRPCURL, a.config.GRPCInsecure) - if err != nil { - return fmt.Errorf("failed to create query client: %w", err) - } - a.query = query + initConnection := func(logger *slog.Logger, grpcUlr string, config BasicConfig) (*AppClient, error) { + appClient := &AppClient{ + batchSize: config.BatchSize, + keychainId: config.KeychainID, + grpcUrl: grpcUlr, + } - conn := query.Conn() + logger.Info("connecting to Warden Protocol using gRPC", "url", grpcUlr, "insecure", config.GRPCInsecure) - identity, err := client.NewIdentityFromSeed(a.config.Mnemonic) - if err != nil { - return fmt.Errorf("failed to create identity: %w", err) + query, err := client.NewQueryClient(grpcUlr, config.GRPCInsecure) + if err != nil { + return nil, fmt.Errorf("failed to create query client: %w", err) + } + appClient.query = query + + conn := query.Conn() + + identity, err := client.NewIdentityFromSeed(a.config.Mnemonic) + if err != nil { + return nil, fmt.Errorf("failed to create identity: %w", err) + } + + logger.Info("keychain writer identity", "address", identity.Address.String()) + + appClient.txClient = client.NewTxClient(identity, config.ChainID, conn, query) + + return appClient, nil } - a.logger().Info("keychain writer identity", "address", identity.Address.String()) + for _, grpcUrl := range a.config.GRPCURLs { + appClient, err := initConnection(a.logger(), grpcUrl, a.config.BasicConfig) + if err != nil { + return err + } + + a.clients = append(a.clients, appClient) + } - txClient := client.NewTxClient(identity, a.config.ChainID, conn, query) - a.txWriter = writer.New(txClient, a.config.BatchSize, a.config.BatchInterval, a.config.TxTimeout, a.logger()) + a.txWriter = writer.New(a.config.BatchSize, a.config.BatchInterval, a.config.TxTimeout, a.logger()) a.txWriter.Fees = a.config.TxFees a.txWriter.GasLimit = a.config.GasLimit diff --git a/keychain-sdk/sign_requests.go b/keychain-sdk/sign_requests.go index 7a27c93bb..d29644416 100644 --- a/keychain-sdk/sign_requests.go +++ b/keychain-sdk/sign_requests.go @@ -12,7 +12,7 @@ import ( wardentypes "github.com/warden-protocol/wardenprotocol/warden/x/warden/types/v1beta3" ) -// SignaResponseWriter is the interface for writing responses to sign requests. +// SignResponseWriter is the interface for writing responses to sign requests. type SignResponseWriter interface { // Fulfil writes the signature to the sign request. Fulfil(signature []byte) error @@ -68,23 +68,30 @@ func (w *signResponseWriter) Reject(reason string) error { return err } -func (a *App) ingestSignRequests(signRequestsCh chan *wardentypes.SignRequest) { +func (a *App) ingestSignRequests(signRequestsCh chan *wardentypes.SignRequest, appClient *AppClient) { + ingestRequest := func(signRequest *wardentypes.SignRequest) { + if !a.signRequestTracker.IsNew(signRequest.Id, appClient.grpcUrl) { + a.logger().Debug("skipping sign request", "id", signRequest.Id, "grpcUrl", appClient.grpcUrl) + return + } + + a.logger().Info("got sign request", "id", signRequest.Id, "grpcUrl", appClient.grpcUrl) + a.signRequestTracker.Ingested(signRequest.Id, appClient.grpcUrl) + + if a.signRequestTracker.HasReachedConsensus(signRequest.Id, a.config.ConsensusNodeThreshold) { + signRequestsCh <- signRequest + } + } + for { reqCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - signRequests, err := a.signRequests(reqCtx) + signRequests, err := appClient.signRequests(reqCtx) cancel() if err != nil { a.logger().Error("failed to get sign requests", "error", err) } else { for _, signRequest := range signRequests { - if !a.signRequestTracker.IsNew(signRequest.Id) { - a.logger().Debug("skipping sign request", "id", signRequest.Id) - continue - } - - a.logger().Info("got sign request", "id", signRequest.Id) - a.signRequestTracker.Ingested(signRequest.Id) - signRequestsCh <- signRequest + ingestRequest(signRequest) } } @@ -129,6 +136,6 @@ func (a *App) handleSignRequest(signRequest *wardentypes.SignRequest) { }() } -func (a *App) signRequests(ctx context.Context) ([]*wardentypes.SignRequest, error) { - return a.query.PendingSignRequests(ctx, &client.PageRequest{Limit: uint64(a.config.BatchSize)}, a.config.KeychainID) +func (a *AppClient) signRequests(ctx context.Context) ([]*wardentypes.SignRequest, error) { + return a.query.PendingSignRequests(ctx, &client.PageRequest{Limit: uint64(a.batchSize)}, a.keychainId) } From 8e488215d22bde613b062f90d35bbec5f62c7d11 Mon Sep 17 00:00:00 2001 From: Aleksandr Tretiakov Date: Tue, 10 Sep 2024 00:52:27 +0300 Subject: [PATCH 02/17] Changed config --- cmd/wardenkms/wardenkms.go | 11 +++++++++-- keychain-sdk/config.go | 16 +++++++++++----- keychain-sdk/keychain.go | 26 ++++++++++++++------------ 3 files changed, 34 insertions(+), 19 deletions(-) diff --git a/cmd/wardenkms/wardenkms.go b/cmd/wardenkms/wardenkms.go index 7c7a6b30b..7ad9768fc 100644 --- a/cmd/wardenkms/wardenkms.go +++ b/cmd/wardenkms/wardenkms.go @@ -58,11 +58,18 @@ func main() { return } + grpcConfigs := make([]keychain.GrpcNodeConfig, 0) + for url, grpcInsecure := range cfg.GRPCURLs { + grpcConfigs = append(grpcConfigs, keychain.GrpcNodeConfig{ + GRPCInsecure: grpcInsecure, + GRPCURL: url, + }) + } + app := keychain.NewApp(keychain.Config{ BasicConfig: keychain.BasicConfig{ Logger: logger, ChainID: cfg.ChainID, - GRPCInsecure: cfg.GRPCInsecure, Mnemonic: cfg.Mnemonic, KeychainID: cfg.KeychainId, GasLimit: cfg.GasLimit, @@ -71,7 +78,7 @@ func main() { TxTimeout: cfg.TxTimeout, TxFees: sdk.NewCoins(sdk.NewCoin("uward", math.NewInt(cfg.TxFee))), }, - GRPCURLs: cfg.GRPCURLs, + GRPCConfigs: grpcConfigs, ConsensusNodeThreshold: cfg.ConsensusNodeThreshold, }) diff --git a/keychain-sdk/config.go b/keychain-sdk/config.go index 74347aa90..13ade5615 100644 --- a/keychain-sdk/config.go +++ b/keychain-sdk/config.go @@ -15,10 +15,6 @@ type BasicConfig struct { // ChainID is the chain ID of the chain to connect to. ChainID string - // GRPCInsecure determines whether to allow an insecure connection to the - // gRPC server. - GRPCInsecure bool - // KeychainID is the ID of the keychain this instance will fetch requests // for. KeychainID uint64 @@ -63,5 +59,15 @@ type Config struct { // GRPCURLs is the list of URLs of the gRPC server to connect to. // e.g. "localhost:9090" - GRPCURLs []string + GRPCConfigs []GrpcNodeConfig +} + +type GrpcNodeConfig struct { + // GRPCInsecure determines whether to allow an insecure connection to the + // gRPC server. + GRPCInsecure bool + + // GRPCURLs is the list of URLs of the gRPC server to connect to. + // e.g. "localhost:9090" + GRPCURL string } diff --git a/keychain-sdk/keychain.go b/keychain-sdk/keychain.go index 3c3a1ff6b..2091946ae 100644 --- a/keychain-sdk/keychain.go +++ b/keychain-sdk/keychain.go @@ -35,11 +35,12 @@ type App struct { } type AppClient struct { - grpcUrl string - batchSize int - keychainId uint64 - query *client.QueryClient - txClient *client.TxClient + grpcUrl string + grpcInsecure bool + batchSize int + keychainId uint64 + query *client.QueryClient + txClient *client.TxClient } // NewApp creates a new Keychain application, using the given configuration. @@ -136,16 +137,17 @@ func (a *App) ConnectionState() map[string]connectivity.State { } func (a *App) initConnections() error { - initConnection := func(logger *slog.Logger, grpcUlr string, config BasicConfig) (*AppClient, error) { + initConnection := func(logger *slog.Logger, grpcNodeConfig GrpcNodeConfig, config BasicConfig) (*AppClient, error) { appClient := &AppClient{ - batchSize: config.BatchSize, - keychainId: config.KeychainID, - grpcUrl: grpcUlr, + batchSize: config.BatchSize, + keychainId: config.KeychainID, + grpcUrl: grpcNodeConfig.GRPCURL, + grpcInsecure: grpcNodeConfig.GRPCInsecure, } - logger.Info("connecting to Warden Protocol using gRPC", "url", grpcUlr, "insecure", config.GRPCInsecure) + logger.Info("connecting to Warden Protocol using gRPC", "url", grpcNodeConfig, "insecure", grpcNodeConfig.GRPCInsecure) - query, err := client.NewQueryClient(grpcUlr, config.GRPCInsecure) + query, err := client.NewQueryClient(grpcNodeConfig.GRPCURL, grpcNodeConfig.GRPCInsecure) if err != nil { return nil, fmt.Errorf("failed to create query client: %w", err) } @@ -165,7 +167,7 @@ func (a *App) initConnections() error { return appClient, nil } - for _, grpcUrl := range a.config.GRPCURLs { + for _, grpcUrl := range a.config.GRPCConfigs { appClient, err := initConnection(a.logger(), grpcUrl, a.config.BasicConfig) if err != nil { return err From eefa6f1c2ae1f85e9cce367fb61e7e79a1e3f5fb Mon Sep 17 00:00:00 2001 From: Aleksandr Tretiakov Date: Tue, 10 Sep 2024 00:56:39 +0300 Subject: [PATCH 03/17] Fixed test --- keychain-sdk/example_keychain_test.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/keychain-sdk/example_keychain_test.go b/keychain-sdk/example_keychain_test.go index ac3b420b5..0ae840c1e 100644 --- a/keychain-sdk/example_keychain_test.go +++ b/keychain-sdk/example_keychain_test.go @@ -24,8 +24,6 @@ func Main() { // setup the connection to the Warden Protocol node ChainID: "warden", - GRPCInsecure: true, - // setup the account used to write txs KeychainID: 1, Mnemonic: "virus boat radio apple pilot ask vault exhaust again state doll stereo slide exhibit scissors miss attack boat budget egg bird mask more trick", @@ -36,7 +34,7 @@ func Main() { BatchSize: 10, }, - GRPCURLs: []string{"localhost:9090"}, + GRPCConfigs: []keychain.GrpcNodeConfig{{GRPCURL: "localhost:9090", GRPCInsecure: false}}, ConsensusNodeThreshold: 1, }) From cd821e65d735aaffade1370a92af4b9e0c630d49 Mon Sep 17 00:00:00 2001 From: Aleksandr Tretiakov Date: Thu, 24 Oct 2024 15:51:37 +0300 Subject: [PATCH 04/17] Updated config for wardenkms Lint fixes Merge conflicts Added ClientsPool abstraction Build fixes Lint fix Lint fix Update keychain-sdk/tx_client_pool.go Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> Code review fixes Merge branch 'feature/683-keychain-sdk-configure-multiple-nodes-endpoints' of github.com:warden-protocol/wardenprotocol into feature/683-keychain-sdk-configure-multiple-nodes-endpoints Added removal of processed requests Lint fix --- cmd/wardenkms/wardenkms.go | 57 ++++++-- keychain-sdk/config.go | 2 +- .../internal/tracker/status_tracker.go | 53 ++++++++ keychain-sdk/internal/tracker/tracker.go | 56 ++++---- keychain-sdk/internal/writer/writer.go | 19 ++- keychain-sdk/key_requests.go | 43 +++--- keychain-sdk/keychain.go | 103 +++------------ keychain-sdk/sign_requests.go | 40 +++--- keychain-sdk/tx_client_pool.go | 124 ++++++++++++++++++ 9 files changed, 326 insertions(+), 171 deletions(-) create mode 100644 keychain-sdk/internal/tracker/status_tracker.go create mode 100644 keychain-sdk/tx_client_pool.go diff --git a/cmd/wardenkms/wardenkms.go b/cmd/wardenkms/wardenkms.go index 7ad9768fc..2d9480ab8 100644 --- a/cmd/wardenkms/wardenkms.go +++ b/cmd/wardenkms/wardenkms.go @@ -3,6 +3,7 @@ package main import ( "context" "encoding/binary" + "encoding/json" "fmt" "log" "log/slog" @@ -20,11 +21,10 @@ import ( ) type Config struct { - ChainID string `env:"CHAIN_ID, default=warden"` - GRPCURLs []string `env:"GRPC_URLS, default=localhost:9090"` - GRPCInsecure bool `env:"GRPC_INSECURE, default=true"` - Mnemonic string `env:"MNEMONIC, default=exclude try nephew main caught favorite tone degree lottery device tissue tent ugly mouse pelican gasp lava flush pen river noise remind balcony emerge"` - KeychainId uint64 `env:"KEYCHAIN_ID, default=1"` + ChainID string `env:"CHAIN_ID, default=warden_1337-1"` + GRPCURLs GrpcNodeConfigDecoder `env:"GRPC_URLS, default=[{\"GRPCUrl\":\"localhost:9090\",\"GRPCInsecure\":true}] "` + Mnemonic string `env:"MNEMONIC, default=exclude try nephew main caught favorite tone degree lottery device tissue tent ugly mouse pelican gasp lava flush pen river noise remind balcony emerge"` + KeychainId uint64 `env:"KEYCHAIN_ID, default=1"` KeyringMnemonic string `env:"KEYRING_MNEMONIC, required"` KeyringPassword string `env:"KEYRING_PASSWORD, required"` @@ -39,7 +39,26 @@ type Config struct { LogLevel slog.Level `env:"LOG_LEVEL, default=debug"` - ConsensusNodeThreshold uint `env:"CONSENSUS_NODE_THRESHOLD, default=1"` + ConsensusNodeThreshold uint8 `env:"CONSENSUS_NODE_THRESHOLD, default=1"` +} + +type GrpcNodeConfig struct { + GRPCUrl string + GRPCInsecure bool +} + +type GrpcNodeConfigDecoder []GrpcNodeConfig + +func (sd *GrpcNodeConfigDecoder) Decode(value string) error { + nodeConfigs := make([]GrpcNodeConfig, 0) + + if err := json.Unmarshal([]byte(value), &nodeConfigs); err != nil { + return fmt.Errorf("invalid map json: %w", err) + } + + *sd = nodeConfigs + + return nil } func main() { @@ -58,12 +77,10 @@ func main() { return } - grpcConfigs := make([]keychain.GrpcNodeConfig, 0) - for url, grpcInsecure := range cfg.GRPCURLs { - grpcConfigs = append(grpcConfigs, keychain.GrpcNodeConfig{ - GRPCInsecure: grpcInsecure, - GRPCURL: url, - }) + grpcConfigs, err := mapGrpcConfig(cfg.GRPCURLs) + if err != nil { + logger.Error("failed to initialize grpc configs", "error", err) + return } app := keychain.NewApp(keychain.Config{ @@ -159,3 +176,19 @@ func bigEndianBytesFromUint32(n uint64) ([4]byte, error) { binary.BigEndian.PutUint32(b, uint32(n)) return [4]byte(b), nil } + +func mapGrpcConfig(value GrpcNodeConfigDecoder) ([]keychain.GrpcNodeConfig, error) { + if len(value) == 0 { + return nil, fmt.Errorf("GRPCUrls must be specified") + } + + result := make([]keychain.GrpcNodeConfig, 0) + for _, item := range value { + result = append(result, keychain.GrpcNodeConfig{ + GRPCInsecure: item.GRPCInsecure, + GRPCURL: item.GRPCUrl, + }) + } + + return result, nil +} diff --git a/keychain-sdk/config.go b/keychain-sdk/config.go index 13ade5615..e74e172ba 100644 --- a/keychain-sdk/config.go +++ b/keychain-sdk/config.go @@ -55,7 +55,7 @@ type Config struct { BasicConfig // ConsensusNodeThreshold represents the number of nodes required to execute a pending key/sign request. - ConsensusNodeThreshold uint + ConsensusNodeThreshold uint8 // GRPCURLs is the list of URLs of the gRPC server to connect to. // e.g. "localhost:9090" diff --git a/keychain-sdk/internal/tracker/status_tracker.go b/keychain-sdk/internal/tracker/status_tracker.go new file mode 100644 index 000000000..b4fdeca10 --- /dev/null +++ b/keychain-sdk/internal/tracker/status_tracker.go @@ -0,0 +1,53 @@ +package tracker + +import ( + "fmt" +) + +type status int + +const ( + statusSeen status = iota + statusProcessig = iota +) + +type statusTracker struct { + threshold uint8 + seensBy stringSet + status status +} + +type stringSet map[string]struct{} + +// add safely adds a string to the set +func (s stringSet) add(str string) bool { + if _, exists := s[str]; exists { + return false + } + s[str] = struct{}{} + return true +} + +func NewStatusTracker(threshold uint8) *statusTracker { + return &statusTracker{ + threshold: threshold, + seensBy: stringSet{}, + status: statusSeen, + } +} + +func (sh *statusTracker) MarkSeen(seenBy string) error { + if !sh.seensBy.add(seenBy) { + return fmt.Errorf("cannot mark as seen: already seen by %s", seenBy) + } + + if sh.status == statusProcessig { + return fmt.Errorf("is already in processing state") + } + + if uint64(len(sh.seensBy)) >= uint64(sh.threshold) { + sh.status = statusProcessig + } + + return nil +} diff --git a/keychain-sdk/internal/tracker/tracker.go b/keychain-sdk/internal/tracker/tracker.go index 1061cd7f4..b9eef17a0 100644 --- a/keychain-sdk/internal/tracker/tracker.go +++ b/keychain-sdk/internal/tracker/tracker.go @@ -4,57 +4,57 @@ import ( "sync" ) -type hashSet map[string]struct{} +type Action int + +const ( + ActionSkip Action = iota + ActionProcess +) type T struct { - rw sync.RWMutex - ingested map[uint64]hashSet + threshold uint8 + rw sync.RWMutex + ingested map[uint64]*statusTracker } -func New() *T { +func New(threshold uint8) *T { return &T{ - ingested: make(map[uint64]hashSet), + threshold: threshold, + ingested: make(map[uint64]*statusTracker), } } -func (t *T) IsNew(id uint64, nodeUrl string) bool { - t.rw.RLock() - defer t.rw.RUnlock() - value, exists := t.ingested[id] - if !exists { - return true +func (t *T) statusTracker(id uint64) *statusTracker { + value, ok := t.ingested[id] + if !ok { + t.ingested[id] = NewStatusTracker(t.threshold) + + return t.ingested[id] } - _, alreadySeen := value[nodeUrl] - return !alreadySeen + return value } -func (t *T) HasReachedConsensus(id uint64, threshold uint) bool { +func (t *T) Ingest(id uint64, ingesterId string) (Action, error) { t.rw.RLock() defer t.rw.RUnlock() - value, exists := t.ingested[id] - if !exists { - return false - } - return len(value) >= int(threshold) -} + value := t.statusTracker(id) -func (t *T) Ingested(id uint64, nodeUrl string) { - t.rw.Lock() - defer t.rw.Unlock() + if err := value.MarkSeen(ingesterId); err != nil { + return ActionSkip, err + } - value, ok := t.ingested[id] - if ok == false { - t.ingested[id] = make(hashSet) - value = t.ingested[id] + if value.status == statusProcessig { + return ActionProcess, nil } - value[nodeUrl] = struct{}{} + return ActionSkip, nil } func (t *T) Done(id uint64) { t.rw.Lock() defer t.rw.Unlock() + delete(t.ingested, id) } diff --git a/keychain-sdk/internal/writer/writer.go b/keychain-sdk/internal/writer/writer.go index 5a429d00e..aeacdb083 100644 --- a/keychain-sdk/internal/writer/writer.go +++ b/keychain-sdk/internal/writer/writer.go @@ -32,6 +32,11 @@ type W struct { batch Batch } +type SyncTxClient interface { + SendWaitTx(ctx context.Context, txBytes []byte) error + BuildTx(ctx context.Context, gasLimit uint64, fees sdk.Coins, msgers ...client.Msger) ([]byte, error) +} + func New( batchSize int, batchInterval time.Duration, @@ -46,9 +51,7 @@ func New( } } -type FnResolveLiveClient func() (*client.TxClient, error) - -func (w *W) Start(ctx context.Context, liveClientFn FnResolveLiveClient, flushErrors chan error) error { +func (w *W) Start(ctx context.Context, client SyncTxClient, flushErrors chan error) error { w.Logger.Info("starting tx writer") for { select { @@ -60,12 +63,8 @@ func (w *W) Start(ctx context.Context, liveClientFn FnResolveLiveClient, flushEr ctx, cancel = context.WithTimeout(ctx, w.TxTimeout) } - if txClient, err := liveClientFn(); err != nil { + if err := w.Flush(ctx, client); err != nil { flushErrors <- err - } else { - if err := w.Flush(ctx, txClient); err != nil { - flushErrors <- err - } } cancel() @@ -100,7 +99,7 @@ func (w *W) fees() sdk.Coins { return w.Fees } -func (w *W) Flush(ctx context.Context, txClient *client.TxClient) error { +func (w *W) Flush(ctx context.Context, txClient SyncTxClient) error { msgs := w.batch.Clear() if len(msgs) == 0 { w.Logger.Debug("flushing batch", "empty", true) @@ -128,7 +127,7 @@ func (w *W) Flush(ctx context.Context, txClient *client.TxClient) error { return nil } -func (w *W) sendWaitTx(ctx context.Context, txClient *client.TxClient, msgs ...client.Msger) error { +func (w *W) sendWaitTx(ctx context.Context, txClient SyncTxClient, msgs ...client.Msger) error { w.sendTxLock.Lock() defer w.sendTxLock.Unlock() diff --git a/keychain-sdk/key_requests.go b/keychain-sdk/key_requests.go index f690d9c77..254f38bac 100644 --- a/keychain-sdk/key_requests.go +++ b/keychain-sdk/key_requests.go @@ -7,6 +7,7 @@ import ( "time" "github.com/warden-protocol/wardenprotocol/go-client" + "github.com/warden-protocol/wardenprotocol/keychain-sdk/internal/tracker" "github.com/warden-protocol/wardenprotocol/keychain-sdk/internal/writer" wardentypes "github.com/warden-protocol/wardenprotocol/warden/x/warden/types/v1beta3" ) @@ -57,29 +58,15 @@ func (w *keyResponseWriter) Reject(reason string) error { } func (a *App) ingestKeyRequests(keyRequestsCh chan *wardentypes.KeyRequest, client *AppClient) { - ingestRequest := func(keyRequest *wardentypes.KeyRequest) { - if !a.keyRequestTracker.IsNew(keyRequest.Id, client.grpcUrl) { - a.logger().Debug("skipping key request", "id", keyRequest.Id, "grpcUrl", client.grpcUrl) - return - } - - a.logger().Info("got key request", "id", keyRequest.Id, "grpcUrl", client.grpcUrl) - a.keyRequestTracker.Ingested(keyRequest.Id, client.grpcUrl) - - if a.keyRequestTracker.HasReachedConsensus(keyRequest.Id, a.config.ConsensusNodeThreshold) { - keyRequestsCh <- keyRequest - } - } - for { reqCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - keyRequests, err := client.keyRequests(reqCtx) + keyRequests, err := client.keyRequests(reqCtx, a.config.BatchSize, a.config.KeychainID) cancel() if err != nil { a.logger().Error("failed to get key requests", "error", err) } else { for _, keyRequest := range keyRequests { - ingestRequest(keyRequest) + a.ingestRequest(keyRequestsCh, keyRequest, client) } } @@ -87,6 +74,26 @@ func (a *App) ingestKeyRequests(keyRequestsCh chan *wardentypes.KeyRequest, clie } } +func (a *App) ingestRequest( + keyRequestsCh chan *wardentypes.KeyRequest, + keyRequest *wardentypes.KeyRequest, + client *AppClient) { + action, err := a.keyRequestTracker.Ingest(keyRequest.Id, client.grpcUrl) + if err != nil { + a.logger().Error("failed to ingest key request", "id", keyRequest.Id, "grpcUrl", client.grpcUrl, "error", err) + return + } + + if action == tracker.ActionSkip { + a.logger().Debug("skipping key request", "id", keyRequest.Id, "grpcUrl", client.grpcUrl) + return + } + + if action == tracker.ActionProcess { + keyRequestsCh <- keyRequest + } +} + func (a *App) handleKeyRequest(keyRequest *wardentypes.KeyRequest) { if a.keyRequestHandler == nil { a.logger().Error("key request handler not set") @@ -116,6 +123,6 @@ func (a *App) handleKeyRequest(keyRequest *wardentypes.KeyRequest) { }() } -func (a *AppClient) keyRequests(ctx context.Context) ([]*wardentypes.KeyRequest, error) { - return a.query.PendingKeyRequests(ctx, &client.PageRequest{Limit: uint64(a.batchSize)}, a.keychainId) +func (a *AppClient) keyRequests(ctx context.Context, batchSize int, keychainId uint64) ([]*wardentypes.KeyRequest, error) { + return a.query.PendingKeyRequests(ctx, &client.PageRequest{Limit: uint64(batchSize)}, keychainId) } diff --git a/keychain-sdk/keychain.go b/keychain-sdk/keychain.go index 2091946ae..e3039cc75 100644 --- a/keychain-sdk/keychain.go +++ b/keychain-sdk/keychain.go @@ -12,7 +12,6 @@ import ( "io" "log/slog" - "github.com/warden-protocol/wardenprotocol/go-client" "github.com/warden-protocol/wardenprotocol/keychain-sdk/internal/tracker" "github.com/warden-protocol/wardenprotocol/keychain-sdk/internal/writer" wardentypes "github.com/warden-protocol/wardenprotocol/warden/x/warden/types/v1beta3" @@ -31,25 +30,15 @@ type App struct { keyRequestTracker *tracker.T signRequestTracker *tracker.T - clients []*AppClient -} - -type AppClient struct { - grpcUrl string - grpcInsecure bool - batchSize int - keychainId uint64 - query *client.QueryClient - txClient *client.TxClient + clientsPool *ClientsPool } // NewApp creates a new Keychain application, using the given configuration. func NewApp(config Config) *App { return &App{ config: config, - keyRequestTracker: tracker.New(), - signRequestTracker: tracker.New(), - clients: []*AppClient{}, + keyRequestTracker: tracker.New(config.ConsensusNodeThreshold), + signRequestTracker: tracker.New(config.ConsensusNodeThreshold), } } @@ -75,27 +64,37 @@ func (a *App) SetSignRequestHandler(handler SignRequestHandler) { func (a *App) Start(ctx context.Context) error { a.logger().Info("starting keychain", "keychain_id", a.config.KeychainID) - err := a.initConnections() - if err != nil { + clientsPool := NewClientsPool(a.config) + if err := clientsPool.initConnections(a.logger()); err != nil { return fmt.Errorf("failed to init connections: %w", err) } + a.clientsPool = clientsPool + + a.txWriter = writer.New( + a.config.BatchSize, + a.config.BatchInterval, + a.config.TxTimeout, + a.logger()) + a.txWriter.Fees = a.config.TxFees + a.txWriter.GasLimit = a.config.GasLimit + keyRequestsCh := make(chan *wardentypes.KeyRequest) defer close(keyRequestsCh) - for _, appClient := range a.clients { + for _, appClient := range a.clientsPool.clients { go a.ingestKeyRequests(keyRequestsCh, appClient) } signRequestsCh := make(chan *wardentypes.SignRequest) defer close(signRequestsCh) - for _, appClient := range a.clients { + for _, appClient := range a.clientsPool.clients { go a.ingestSignRequests(signRequestsCh, appClient) } flushErrors := make(chan error) defer close(flushErrors) go func() { - if err := a.txWriter.Start(ctx, a.liveTxClient, flushErrors); err != nil { + if err := a.txWriter.Start(ctx, a.clientsPool, flushErrors); err != nil { a.logger().Error("tx writer exited with error", "error", err) } }() @@ -114,71 +113,7 @@ func (a *App) Start(ctx context.Context) error { } } -func (a *App) liveTxClient() (*client.TxClient, error) { - for _, appClient := range a.clients { - if state := appClient.query.Conn().GetState(); state == connectivity.Ready || state == connectivity.Idle { - return appClient.txClient, nil - } - } - - return nil, fmt.Errorf("all node clients are down") -} - // ConnectionState returns the current state of the gRPC connection. func (a *App) ConnectionState() map[string]connectivity.State { - statuses := make(map[string]connectivity.State) - - for _, appClient := range a.clients { - state := appClient.query.Conn().GetState() - statuses[appClient.grpcUrl] = state - } - - return statuses -} - -func (a *App) initConnections() error { - initConnection := func(logger *slog.Logger, grpcNodeConfig GrpcNodeConfig, config BasicConfig) (*AppClient, error) { - appClient := &AppClient{ - batchSize: config.BatchSize, - keychainId: config.KeychainID, - grpcUrl: grpcNodeConfig.GRPCURL, - grpcInsecure: grpcNodeConfig.GRPCInsecure, - } - - logger.Info("connecting to Warden Protocol using gRPC", "url", grpcNodeConfig, "insecure", grpcNodeConfig.GRPCInsecure) - - query, err := client.NewQueryClient(grpcNodeConfig.GRPCURL, grpcNodeConfig.GRPCInsecure) - if err != nil { - return nil, fmt.Errorf("failed to create query client: %w", err) - } - appClient.query = query - - conn := query.Conn() - - identity, err := client.NewIdentityFromSeed(a.config.Mnemonic) - if err != nil { - return nil, fmt.Errorf("failed to create identity: %w", err) - } - - logger.Info("keychain writer identity", "address", identity.Address.String()) - - appClient.txClient = client.NewTxClient(identity, config.ChainID, conn, query) - - return appClient, nil - } - - for _, grpcUrl := range a.config.GRPCConfigs { - appClient, err := initConnection(a.logger(), grpcUrl, a.config.BasicConfig) - if err != nil { - return err - } - - a.clients = append(a.clients, appClient) - } - - a.txWriter = writer.New(a.config.BatchSize, a.config.BatchInterval, a.config.TxTimeout, a.logger()) - a.txWriter.Fees = a.config.TxFees - a.txWriter.GasLimit = a.config.GasLimit - - return nil + return a.clientsPool.ConnectionState() } diff --git a/keychain-sdk/sign_requests.go b/keychain-sdk/sign_requests.go index d29644416..9651ff94e 100644 --- a/keychain-sdk/sign_requests.go +++ b/keychain-sdk/sign_requests.go @@ -8,6 +8,7 @@ import ( "github.com/warden-protocol/wardenprotocol/go-client" "github.com/warden-protocol/wardenprotocol/keychain-sdk/internal/enc" + "github.com/warden-protocol/wardenprotocol/keychain-sdk/internal/tracker" "github.com/warden-protocol/wardenprotocol/keychain-sdk/internal/writer" wardentypes "github.com/warden-protocol/wardenprotocol/warden/x/warden/types/v1beta3" ) @@ -69,29 +70,15 @@ func (w *signResponseWriter) Reject(reason string) error { } func (a *App) ingestSignRequests(signRequestsCh chan *wardentypes.SignRequest, appClient *AppClient) { - ingestRequest := func(signRequest *wardentypes.SignRequest) { - if !a.signRequestTracker.IsNew(signRequest.Id, appClient.grpcUrl) { - a.logger().Debug("skipping sign request", "id", signRequest.Id, "grpcUrl", appClient.grpcUrl) - return - } - - a.logger().Info("got sign request", "id", signRequest.Id, "grpcUrl", appClient.grpcUrl) - a.signRequestTracker.Ingested(signRequest.Id, appClient.grpcUrl) - - if a.signRequestTracker.HasReachedConsensus(signRequest.Id, a.config.ConsensusNodeThreshold) { - signRequestsCh <- signRequest - } - } - for { reqCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - signRequests, err := appClient.signRequests(reqCtx) + signRequests, err := appClient.signRequests(reqCtx, a.config.BatchSize, a.config.KeychainID) cancel() if err != nil { a.logger().Error("failed to get sign requests", "error", err) } else { for _, signRequest := range signRequests { - ingestRequest(signRequest) + a.ingestSignRequest(signRequestsCh, signRequest, appClient) } } @@ -99,6 +86,23 @@ func (a *App) ingestSignRequests(signRequestsCh chan *wardentypes.SignRequest, a } } +func (a *App) ingestSignRequest(signRequestsCh chan *wardentypes.SignRequest, signRequest *wardentypes.SignRequest, appClient *AppClient) { + action, err := a.keyRequestTracker.Ingest(signRequest.Id, appClient.grpcUrl) + if err != nil { + a.logger().Error("failed to ingest sign request", "id", signRequest.Id, "grpcUrl", appClient.grpcUrl, "error", err) + return + } + + if action == tracker.ActionSkip { + a.logger().Debug("skipping sign request", "id", signRequest.Id, "grpcUrl", appClient.grpcUrl) + return + } + + if action == tracker.ActionProcess { + signRequestsCh <- signRequest + } +} + func (a *App) handleSignRequest(signRequest *wardentypes.SignRequest) { if a.signRequestHandler == nil { a.logger().Error("sign request handler not set") @@ -136,6 +140,6 @@ func (a *App) handleSignRequest(signRequest *wardentypes.SignRequest) { }() } -func (a *AppClient) signRequests(ctx context.Context) ([]*wardentypes.SignRequest, error) { - return a.query.PendingSignRequests(ctx, &client.PageRequest{Limit: uint64(a.batchSize)}, a.keychainId) +func (a *AppClient) signRequests(ctx context.Context, batchSize int, keychainId uint64) ([]*wardentypes.SignRequest, error) { + return a.query.PendingSignRequests(ctx, &client.PageRequest{Limit: uint64(batchSize)}, keychainId) } diff --git a/keychain-sdk/tx_client_pool.go b/keychain-sdk/tx_client_pool.go new file mode 100644 index 000000000..ba16199eb --- /dev/null +++ b/keychain-sdk/tx_client_pool.go @@ -0,0 +1,124 @@ +package keychain + +import ( + "context" + "fmt" + "log/slog" + + sdkTypes "github.com/cosmos/cosmos-sdk/types" + client "github.com/warden-protocol/wardenprotocol/go-client" + "google.golang.org/grpc/connectivity" +) + +type AppClient struct { + grpcUrl string + grpcInsecure bool + query *client.QueryClient + txClient *client.TxClient +} + +type ClientsPool struct { + clients []*AppClient + config Config +} + +func NewClientsPool(config Config) *ClientsPool { + pool := ClientsPool{ + clients: make([]*AppClient, 0), + config: config, + } + + return &pool +} + +func (a *ClientsPool) initConnections(logger *slog.Logger) error { + identity, err := client.NewIdentityFromSeed(a.config.Mnemonic) + if err != nil { + return fmt.Errorf("failed to create identity: %w", err) + } + + for _, grpcUrl := range a.config.GRPCConfigs { + appClient, err := a.initConnection(logger, grpcUrl, a.config.BasicConfig, identity) + if err != nil { + return err + } + + a.clients = append(a.clients, appClient) + } + + return nil +} + +func (a *ClientsPool) initConnection( + logger *slog.Logger, + grpcNodeConfig GrpcNodeConfig, + config BasicConfig, + identity client.Identity) (*AppClient, error) { + appClient := &AppClient{ + grpcUrl: grpcNodeConfig.GRPCURL, + grpcInsecure: grpcNodeConfig.GRPCInsecure, + } + + logger.Info("connecting to Warden Protocol using gRPC", "url", grpcNodeConfig.GRPCURL, "insecure", grpcNodeConfig.GRPCInsecure) + + query, err := client.NewQueryClient(grpcNodeConfig.GRPCURL, grpcNodeConfig.GRPCInsecure) + if err != nil { + return nil, fmt.Errorf("failed to create query client: %w", err) + } + appClient.query = query + + conn := query.Conn() + + logger.Info("keychain writer identity", "address", identity.Address.String()) + + appClient.txClient = client.NewTxClient(identity, config.ChainID, conn, query) + + return appClient, nil +} + +func (a *ClientsPool) liveTxClient() (*client.TxClient, error) { + for _, appClient := range a.clients { + if state := appClient.query.Conn().GetState(); isOnline(state) { + return appClient.txClient, nil + } + } + + return nil, fmt.Errorf("all node clients are down") +} + +func isOnline(state connectivity.State) bool { + return state == connectivity.Ready || state == connectivity.Idle +} + +func (p *ClientsPool) BuildTx( + ctx context.Context, + gasLimit uint64, + fees sdkTypes.Coins, + msgers ...client.Msger) ([]byte, error) { + liveClient, err := p.liveTxClient() + if err != nil { + return nil, fmt.Errorf("failed to aquire live client for BuildTx: %w", err) + } + + return liveClient.BuildTx(ctx, gasLimit, fees, msgers...) +} + +func (p *ClientsPool) SendWaitTx(ctx context.Context, txBytes []byte) error { + liveClient, err := p.liveTxClient() + if err != nil { + return fmt.Errorf("failed to aquire live client for SendWaitTx: %w", err) + } + + return liveClient.SendWaitTx(ctx, txBytes) +} + +func (a *ClientsPool) ConnectionState() map[string]connectivity.State { + statuses := make(map[string]connectivity.State) + + for _, appClient := range a.clients { + state := appClient.query.Conn().GetState() + statuses[appClient.grpcUrl] = state + } + + return statuses +} From dc209fd44f64a787a21e1204680bff4d2eb83fd2 Mon Sep 17 00:00:00 2001 From: Aleksandr Tretiakov Date: Thu, 24 Oct 2024 16:33:06 +0300 Subject: [PATCH 05/17] Fixed locking --- keychain-sdk/internal/tracker/tracker.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/keychain-sdk/internal/tracker/tracker.go b/keychain-sdk/internal/tracker/tracker.go index b9eef17a0..fa9d575e1 100644 --- a/keychain-sdk/internal/tracker/tracker.go +++ b/keychain-sdk/internal/tracker/tracker.go @@ -36,8 +36,8 @@ func (t *T) statusTracker(id uint64) *statusTracker { } func (t *T) Ingest(id uint64, ingesterId string) (Action, error) { - t.rw.RLock() - defer t.rw.RUnlock() + t.rw.Lock() + defer t.rw.Unlock() value := t.statusTracker(id) From 90d8ca51460690d0a1917be576dc3c073cebd8b1 Mon Sep 17 00:00:00 2001 From: Aleksandr Tretiakov Date: Thu, 24 Oct 2024 16:38:03 +0300 Subject: [PATCH 06/17] Typo --- keychain-sdk/internal/tracker/status_tracker.go | 8 ++++---- keychain-sdk/internal/tracker/tracker.go | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/keychain-sdk/internal/tracker/status_tracker.go b/keychain-sdk/internal/tracker/status_tracker.go index b4fdeca10..32457a991 100644 --- a/keychain-sdk/internal/tracker/status_tracker.go +++ b/keychain-sdk/internal/tracker/status_tracker.go @@ -7,8 +7,8 @@ import ( type status int const ( - statusSeen status = iota - statusProcessig = iota + statusSeen status = iota + statusProcessing ) type statusTracker struct { @@ -41,12 +41,12 @@ func (sh *statusTracker) MarkSeen(seenBy string) error { return fmt.Errorf("cannot mark as seen: already seen by %s", seenBy) } - if sh.status == statusProcessig { + if sh.status == statusProcessing { return fmt.Errorf("is already in processing state") } if uint64(len(sh.seensBy)) >= uint64(sh.threshold) { - sh.status = statusProcessig + sh.status = statusProcessing } return nil diff --git a/keychain-sdk/internal/tracker/tracker.go b/keychain-sdk/internal/tracker/tracker.go index fa9d575e1..defbc86db 100644 --- a/keychain-sdk/internal/tracker/tracker.go +++ b/keychain-sdk/internal/tracker/tracker.go @@ -45,7 +45,7 @@ func (t *T) Ingest(id uint64, ingesterId string) (Action, error) { return ActionSkip, err } - if value.status == statusProcessig { + if value.status == statusProcessing { return ActionProcess, nil } From ab221264a3d8d33a825f972a39cd8a8a49ef7dac Mon Sep 17 00:00:00 2001 From: Aleksandr Tretiakov Date: Thu, 24 Oct 2024 16:50:31 +0300 Subject: [PATCH 07/17] Simplified a bit --- .../internal/tracker/status_tracker.go | 53 ------------------- keychain-sdk/internal/tracker/tracker.go | 28 +++++++--- 2 files changed, 20 insertions(+), 61 deletions(-) delete mode 100644 keychain-sdk/internal/tracker/status_tracker.go diff --git a/keychain-sdk/internal/tracker/status_tracker.go b/keychain-sdk/internal/tracker/status_tracker.go deleted file mode 100644 index 32457a991..000000000 --- a/keychain-sdk/internal/tracker/status_tracker.go +++ /dev/null @@ -1,53 +0,0 @@ -package tracker - -import ( - "fmt" -) - -type status int - -const ( - statusSeen status = iota - statusProcessing -) - -type statusTracker struct { - threshold uint8 - seensBy stringSet - status status -} - -type stringSet map[string]struct{} - -// add safely adds a string to the set -func (s stringSet) add(str string) bool { - if _, exists := s[str]; exists { - return false - } - s[str] = struct{}{} - return true -} - -func NewStatusTracker(threshold uint8) *statusTracker { - return &statusTracker{ - threshold: threshold, - seensBy: stringSet{}, - status: statusSeen, - } -} - -func (sh *statusTracker) MarkSeen(seenBy string) error { - if !sh.seensBy.add(seenBy) { - return fmt.Errorf("cannot mark as seen: already seen by %s", seenBy) - } - - if sh.status == statusProcessing { - return fmt.Errorf("is already in processing state") - } - - if uint64(len(sh.seensBy)) >= uint64(sh.threshold) { - sh.status = statusProcessing - } - - return nil -} diff --git a/keychain-sdk/internal/tracker/tracker.go b/keychain-sdk/internal/tracker/tracker.go index defbc86db..87ce89129 100644 --- a/keychain-sdk/internal/tracker/tracker.go +++ b/keychain-sdk/internal/tracker/tracker.go @@ -1,6 +1,7 @@ package tracker import ( + "fmt" "sync" ) @@ -11,23 +12,34 @@ const ( ActionProcess ) +type stringSet map[string]struct{} + +// add safely adds a string to the set +func (s stringSet) add(str string) bool { + if _, exists := s[str]; exists { + return false + } + s[str] = struct{}{} + return true +} + type T struct { threshold uint8 rw sync.RWMutex - ingested map[uint64]*statusTracker + ingested map[uint64]stringSet } func New(threshold uint8) *T { return &T{ threshold: threshold, - ingested: make(map[uint64]*statusTracker), + ingested: make(map[uint64]stringSet), } } -func (t *T) statusTracker(id uint64) *statusTracker { +func (t *T) ingestTracker(id uint64) stringSet { value, ok := t.ingested[id] if !ok { - t.ingested[id] = NewStatusTracker(t.threshold) + t.ingested[id] = make(stringSet) return t.ingested[id] } @@ -39,13 +51,13 @@ func (t *T) Ingest(id uint64, ingesterId string) (Action, error) { t.rw.Lock() defer t.rw.Unlock() - value := t.statusTracker(id) + value := t.ingestTracker(id) - if err := value.MarkSeen(ingesterId); err != nil { - return ActionSkip, err + if !value.add(ingesterId) { + return ActionSkip, fmt.Errorf("already ingested") } - if value.status == statusProcessing { + if uint64(len(value)) == uint64(t.threshold) { return ActionProcess, nil } From 9b4c3a7888ae3ed95f109b6ce8e63b6a254d4f38 Mon Sep 17 00:00:00 2001 From: Aleksandr Tretiakov Date: Thu, 24 Oct 2024 16:52:00 +0300 Subject: [PATCH 08/17] Code review --- keychain-sdk/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/keychain-sdk/config.go b/keychain-sdk/config.go index e74e172ba..748e780e4 100644 --- a/keychain-sdk/config.go +++ b/keychain-sdk/config.go @@ -67,7 +67,7 @@ type GrpcNodeConfig struct { // gRPC server. GRPCInsecure bool - // GRPCURLs is the list of URLs of the gRPC server to connect to. + // GRPCURL is the URL of the gRPC server to connect to. // e.g. "localhost:9090" GRPCURL string } From b1fe1deeb682fcc57247d008def784cfbf3e971f Mon Sep 17 00:00:00 2001 From: Aleksandr Tretiakov Date: Thu, 24 Oct 2024 16:53:47 +0300 Subject: [PATCH 09/17] Code review --- cmd/wardenkms/wardenkms.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/cmd/wardenkms/wardenkms.go b/cmd/wardenkms/wardenkms.go index 2d9480ab8..fd2a1496f 100644 --- a/cmd/wardenkms/wardenkms.go +++ b/cmd/wardenkms/wardenkms.go @@ -178,12 +178,17 @@ func bigEndianBytesFromUint32(n uint64) ([4]byte, error) { } func mapGrpcConfig(value GrpcNodeConfigDecoder) ([]keychain.GrpcNodeConfig, error) { - if len(value) == 0 { + var nodesLength = len(value) + if nodesLength == 0 { return nil, fmt.Errorf("GRPCUrls must be specified") } - result := make([]keychain.GrpcNodeConfig, 0) + result := make([]keychain.GrpcNodeConfig, 0, nodesLength) for _, item := range value { + if item.GRPCUrl == "" { + return nil, fmt.Errorf("GRPCUrl must be specified") + } + result = append(result, keychain.GrpcNodeConfig{ GRPCInsecure: item.GRPCInsecure, GRPCURL: item.GRPCUrl, From fcd81672cc58baaa2f8a9269f485a68bd920955f Mon Sep 17 00:00:00 2001 From: Aleksandr Tretiakov Date: Thu, 24 Oct 2024 17:02:14 +0300 Subject: [PATCH 10/17] Changed status check --- cmd/wardenkms/wardenkms.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/cmd/wardenkms/wardenkms.go b/cmd/wardenkms/wardenkms.go index fd2a1496f..70e3313c0 100644 --- a/cmd/wardenkms/wardenkms.go +++ b/cmd/wardenkms/wardenkms.go @@ -149,13 +149,19 @@ func main() { if cfg.HttpAddr != "" { logger.Info("starting HTTP server", "addr", cfg.HttpAddr) http.HandleFunc("/healthcheck", func(w http.ResponseWriter, r *http.Request) { + + readyConnectionsCount := uint(0) for _, state := range app.ConnectionState() { if state == connectivity.Ready { - w.WriteHeader(http.StatusOK) - return + readyConnectionsCount += 1 } } + if readyConnectionsCount >= uint(cfg.ConsensusNodeThreshold) { + w.WriteHeader(http.StatusOK) + return + } + w.WriteHeader(http.StatusServiceUnavailable) }) go func() { _ = http.ListenAndServe(cfg.HttpAddr, nil) }() From 31bbb3a16a348677fb354b4656728574ea920638 Mon Sep 17 00:00:00 2001 From: Aleksandr Tretiakov Date: Fri, 25 Oct 2024 12:07:42 +0300 Subject: [PATCH 11/17] Fixed --- cmd/wardenkms/health_check_response.go | 23 ++++++++++++++++++++ cmd/wardenkms/wardenkms.go | 29 ++++++++++++++++++++++---- docker-compose.yml | 2 +- keychain-sdk/tx_client_pool.go | 28 ++++++++++++------------- warden/x/act/client/actions.go | 2 +- 5 files changed, 64 insertions(+), 20 deletions(-) create mode 100644 cmd/wardenkms/health_check_response.go diff --git a/cmd/wardenkms/health_check_response.go b/cmd/wardenkms/health_check_response.go new file mode 100644 index 000000000..65b19a98e --- /dev/null +++ b/cmd/wardenkms/health_check_response.go @@ -0,0 +1,23 @@ +package main + +type HealthCheckResponse struct { + // The number of nodes that are online + Online uint `json:"online"` + + // The total number of nodes + Total uint `json:"total"` + + // The consensus threshold + Threshold uint8 `json:"threshold"` + + // Node statuses + Nodes []NodeStatus `json:"nodes"` +} + +type NodeStatus struct { + // The address of the node + Address string `json:"address"` + + // The status of the node + Status string `json:"status"` +} diff --git a/cmd/wardenkms/wardenkms.go b/cmd/wardenkms/wardenkms.go index 70e3313c0..c469ed353 100644 --- a/cmd/wardenkms/wardenkms.go +++ b/cmd/wardenkms/wardenkms.go @@ -93,7 +93,7 @@ func main() { BatchInterval: cfg.BatchInterval, BatchSize: cfg.BatchSize, TxTimeout: cfg.TxTimeout, - TxFees: sdk.NewCoins(sdk.NewCoin("uward", math.NewInt(cfg.TxFee))), + TxFees: sdk.NewCoins(sdk.NewCoin("award", math.NewInt(cfg.TxFee))), }, GRPCConfigs: grpcConfigs, ConsensusNodeThreshold: cfg.ConsensusNodeThreshold, @@ -149,20 +149,41 @@ func main() { if cfg.HttpAddr != "" { logger.Info("starting HTTP server", "addr", cfg.HttpAddr) http.HandleFunc("/healthcheck", func(w http.ResponseWriter, r *http.Request) { + connectionStates := app.ConnectionState() readyConnectionsCount := uint(0) - for _, state := range app.ConnectionState() { + nodes := make([]NodeStatus, 0, len(connectionStates)) + + for url, state := range connectionStates { if state == connectivity.Ready { readyConnectionsCount += 1 } + + nodes = append(nodes, NodeStatus{ + Address: url, + Status: state.String(), + }) + } + + bytes, err := json.Marshal(HealthCheckResponse{ + Online: readyConnectionsCount, + Total: uint(len(connectionStates)), + Nodes: nodes, + Threshold: cfg.ConsensusNodeThreshold, + }) + + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return } if readyConnectionsCount >= uint(cfg.ConsensusNodeThreshold) { w.WriteHeader(http.StatusOK) - return + } else { + w.WriteHeader(http.StatusServiceUnavailable) } - w.WriteHeader(http.StatusServiceUnavailable) + w.Write(bytes) }) go func() { _ = http.ListenAndServe(cfg.HttpAddr, nil) }() } diff --git a/docker-compose.yml b/docker-compose.yml index 9b24f856c..7131100b2 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -15,7 +15,7 @@ services: WARDEND_GRPC_ADDRESS: 0.0.0.0:9090 WARDEND_RPC_LADDR: tcp://0.0.0.0:26657 WARDEND_RPC_CORS_ALLOWED_ORIGINS: "*" - WARDEND_MINIMUM_GAS_PRICES: "0uward" + WARDEND_MINIMUM_GAS_PRICES: "0award" faucet: build: diff --git a/keychain-sdk/tx_client_pool.go b/keychain-sdk/tx_client_pool.go index ba16199eb..856a9c291 100644 --- a/keychain-sdk/tx_client_pool.go +++ b/keychain-sdk/tx_client_pool.go @@ -31,25 +31,25 @@ func NewClientsPool(config Config) *ClientsPool { return &pool } -func (a *ClientsPool) initConnections(logger *slog.Logger) error { - identity, err := client.NewIdentityFromSeed(a.config.Mnemonic) +func (cp *ClientsPool) initConnections(logger *slog.Logger) error { + identity, err := client.NewIdentityFromSeed(cp.config.Mnemonic) if err != nil { return fmt.Errorf("failed to create identity: %w", err) } - for _, grpcUrl := range a.config.GRPCConfigs { - appClient, err := a.initConnection(logger, grpcUrl, a.config.BasicConfig, identity) + for _, grpcUrl := range cp.config.GRPCConfigs { + appClient, err := cp.initConnection(logger, grpcUrl, cp.config.BasicConfig, identity) if err != nil { return err } - a.clients = append(a.clients, appClient) + cp.clients = append(cp.clients, appClient) } return nil } -func (a *ClientsPool) initConnection( +func (cp *ClientsPool) initConnection( logger *slog.Logger, grpcNodeConfig GrpcNodeConfig, config BasicConfig, @@ -76,8 +76,8 @@ func (a *ClientsPool) initConnection( return appClient, nil } -func (a *ClientsPool) liveTxClient() (*client.TxClient, error) { - for _, appClient := range a.clients { +func (cp *ClientsPool) liveTxClient() (*client.TxClient, error) { + for _, appClient := range cp.clients { if state := appClient.query.Conn().GetState(); isOnline(state) { return appClient.txClient, nil } @@ -90,12 +90,12 @@ func isOnline(state connectivity.State) bool { return state == connectivity.Ready || state == connectivity.Idle } -func (p *ClientsPool) BuildTx( +func (cp *ClientsPool) BuildTx( ctx context.Context, gasLimit uint64, fees sdkTypes.Coins, msgers ...client.Msger) ([]byte, error) { - liveClient, err := p.liveTxClient() + liveClient, err := cp.liveTxClient() if err != nil { return nil, fmt.Errorf("failed to aquire live client for BuildTx: %w", err) } @@ -103,8 +103,8 @@ func (p *ClientsPool) BuildTx( return liveClient.BuildTx(ctx, gasLimit, fees, msgers...) } -func (p *ClientsPool) SendWaitTx(ctx context.Context, txBytes []byte) error { - liveClient, err := p.liveTxClient() +func (cp *ClientsPool) SendWaitTx(ctx context.Context, txBytes []byte) error { + liveClient, err := cp.liveTxClient() if err != nil { return fmt.Errorf("failed to aquire live client for SendWaitTx: %w", err) } @@ -112,10 +112,10 @@ func (p *ClientsPool) SendWaitTx(ctx context.Context, txBytes []byte) error { return liveClient.SendWaitTx(ctx, txBytes) } -func (a *ClientsPool) ConnectionState() map[string]connectivity.State { +func (cp *ClientsPool) ConnectionState() map[string]connectivity.State { statuses := make(map[string]connectivity.State) - for _, appClient := range a.clients { + for _, appClient := range cp.clients { state := appClient.query.Conn().GetState() statuses[appClient.grpcUrl] = state } diff --git a/warden/x/act/client/actions.go b/warden/x/act/client/actions.go index 998aa7ba5..e4347c48f 100644 --- a/warden/x/act/client/actions.go +++ b/warden/x/act/client/actions.go @@ -141,7 +141,7 @@ func addFlagsFromMsg(msg sdk.Msg, cmd *cobra.Command) { cmd.Flags().BytesBase64(flagName, nil, "") case reflect.Struct: if v.Field(i).Type().Elem().AssignableTo(reflect.TypeOf(sdk.Coin{})) { - cmd.Flags().StringSlice(flagName, nil, "(e.g. 10uward)") + cmd.Flags().StringSlice(flagName, nil, "(e.g. 10award)") } else { panic(fmt.Sprintf("unsupported slice type %v (for field %s)", v.Field(i).Type().Elem().Kind(), fieldName)) } From 3980febda2f035350fcebc4378220a685c641a14 Mon Sep 17 00:00:00 2001 From: Aleksandr Tretiakov Date: Fri, 25 Oct 2024 13:34:02 +0300 Subject: [PATCH 12/17] Omit err check --- cmd/wardenkms/wardenkms.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/wardenkms/wardenkms.go b/cmd/wardenkms/wardenkms.go index c469ed353..329270b9f 100644 --- a/cmd/wardenkms/wardenkms.go +++ b/cmd/wardenkms/wardenkms.go @@ -183,7 +183,7 @@ func main() { w.WriteHeader(http.StatusServiceUnavailable) } - w.Write(bytes) + _, _ = w.Write(bytes) }) go func() { _ = http.ListenAndServe(cfg.HttpAddr, nil) }() } From b5923b8dfad464917a5bcd845cafe016d88f043b Mon Sep 17 00:00:00 2001 From: Aleksandr Tretiakov Date: Wed, 20 Nov 2024 17:14:35 +0300 Subject: [PATCH 13/17] uward revert --- warden/x/act/client/actions.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/warden/x/act/client/actions.go b/warden/x/act/client/actions.go index e4347c48f..998aa7ba5 100644 --- a/warden/x/act/client/actions.go +++ b/warden/x/act/client/actions.go @@ -141,7 +141,7 @@ func addFlagsFromMsg(msg sdk.Msg, cmd *cobra.Command) { cmd.Flags().BytesBase64(flagName, nil, "") case reflect.Struct: if v.Field(i).Type().Elem().AssignableTo(reflect.TypeOf(sdk.Coin{})) { - cmd.Flags().StringSlice(flagName, nil, "(e.g. 10award)") + cmd.Flags().StringSlice(flagName, nil, "(e.g. 10uward)") } else { panic(fmt.Sprintf("unsupported slice type %v (for field %s)", v.Field(i).Type().Elem().Kind(), fieldName)) } From d4c3937cb232e28ae9f4ac59d20f22a7bee4c98f Mon Sep 17 00:00:00 2001 From: Aleksandr Tretiakov Date: Wed, 20 Nov 2024 17:14:50 +0300 Subject: [PATCH 14/17] Fixed wardenkms startup --- cmd/wardenkms/wardenkms.go | 52 +++++--------------------------------- 1 file changed, 6 insertions(+), 46 deletions(-) diff --git a/cmd/wardenkms/wardenkms.go b/cmd/wardenkms/wardenkms.go index 329270b9f..576ffd330 100644 --- a/cmd/wardenkms/wardenkms.go +++ b/cmd/wardenkms/wardenkms.go @@ -21,10 +21,10 @@ import ( ) type Config struct { - ChainID string `env:"CHAIN_ID, default=warden_1337-1"` - GRPCURLs GrpcNodeConfigDecoder `env:"GRPC_URLS, default=[{\"GRPCUrl\":\"localhost:9090\",\"GRPCInsecure\":true}] "` - Mnemonic string `env:"MNEMONIC, default=exclude try nephew main caught favorite tone degree lottery device tissue tent ugly mouse pelican gasp lava flush pen river noise remind balcony emerge"` - KeychainId uint64 `env:"KEYCHAIN_ID, default=1"` + ChainID string `env:"CHAIN_ID, default=warden_1337-1"` + GRPCURLs string `env:"GRPC_URLS, default=[{\"GRPCUrl\":\"localhost:9090\",\"GRPCInsecure\":true}]"` + Mnemonic string `env:"MNEMONIC, default=exclude try nephew main caught favorite tone degree lottery device tissue tent ugly mouse pelican gasp lava flush pen river noise remind balcony emerge"` + KeychainId uint64 `env:"KEYCHAIN_ID, default=1"` KeyringMnemonic string `env:"KEYRING_MNEMONIC, required"` KeyringPassword string `env:"KEYRING_PASSWORD, required"` @@ -42,25 +42,6 @@ type Config struct { ConsensusNodeThreshold uint8 `env:"CONSENSUS_NODE_THRESHOLD, default=1"` } -type GrpcNodeConfig struct { - GRPCUrl string - GRPCInsecure bool -} - -type GrpcNodeConfigDecoder []GrpcNodeConfig - -func (sd *GrpcNodeConfigDecoder) Decode(value string) error { - nodeConfigs := make([]GrpcNodeConfig, 0) - - if err := json.Unmarshal([]byte(value), &nodeConfigs); err != nil { - return fmt.Errorf("invalid map json: %w", err) - } - - *sd = nodeConfigs - - return nil -} - func main() { var cfg Config if err := envconfig.Process(context.Background(), &cfg); err != nil { @@ -77,8 +58,8 @@ func main() { return } - grpcConfigs, err := mapGrpcConfig(cfg.GRPCURLs) - if err != nil { + var grpcConfigs []keychain.GrpcNodeConfig + if err := json.Unmarshal([]byte(cfg.GRPCURLs), &grpcConfigs); err != nil { logger.Error("failed to initialize grpc configs", "error", err) return } @@ -203,24 +184,3 @@ func bigEndianBytesFromUint32(n uint64) ([4]byte, error) { binary.BigEndian.PutUint32(b, uint32(n)) return [4]byte(b), nil } - -func mapGrpcConfig(value GrpcNodeConfigDecoder) ([]keychain.GrpcNodeConfig, error) { - var nodesLength = len(value) - if nodesLength == 0 { - return nil, fmt.Errorf("GRPCUrls must be specified") - } - - result := make([]keychain.GrpcNodeConfig, 0, nodesLength) - for _, item := range value { - if item.GRPCUrl == "" { - return nil, fmt.Errorf("GRPCUrl must be specified") - } - - result = append(result, keychain.GrpcNodeConfig{ - GRPCInsecure: item.GRPCInsecure, - GRPCURL: item.GRPCUrl, - }) - } - - return result, nil -} From 57bd353fcfcab4654264a6c18ef735bd0435969e Mon Sep 17 00:00:00 2001 From: backsapc Date: Fri, 22 Nov 2024 17:49:35 +0300 Subject: [PATCH 15/17] Update keychain-sdk/tx_client_pool.go Co-authored-by: Antonio Pitasi --- keychain-sdk/tx_client_pool.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/keychain-sdk/tx_client_pool.go b/keychain-sdk/tx_client_pool.go index 856a9c291..f3977a80c 100644 --- a/keychain-sdk/tx_client_pool.go +++ b/keychain-sdk/tx_client_pool.go @@ -106,7 +106,7 @@ func (cp *ClientsPool) BuildTx( func (cp *ClientsPool) SendWaitTx(ctx context.Context, txBytes []byte) error { liveClient, err := cp.liveTxClient() if err != nil { - return fmt.Errorf("failed to aquire live client for SendWaitTx: %w", err) + return fmt.Errorf("failed to acquire live client for SendWaitTx: %w", err) } return liveClient.SendWaitTx(ctx, txBytes) From 6a545f3c769a9208521d2bc598ea1cbd176b24e0 Mon Sep 17 00:00:00 2001 From: backsapc Date: Fri, 22 Nov 2024 17:50:22 +0300 Subject: [PATCH 16/17] Update keychain-sdk/example_keychain_test.go Co-authored-by: Antonio Pitasi --- keychain-sdk/example_keychain_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/keychain-sdk/example_keychain_test.go b/keychain-sdk/example_keychain_test.go index 0ae840c1e..8a17ffabd 100644 --- a/keychain-sdk/example_keychain_test.go +++ b/keychain-sdk/example_keychain_test.go @@ -22,7 +22,7 @@ func Main() { Logger: logger, // not required, but recommended // setup the connection to the Warden Protocol node - ChainID: "warden", + ChainID: "warden_1337-1", // setup the account used to write txs KeychainID: 1, From 3642a15b2892d0106b9b4faeceea74201249b384 Mon Sep 17 00:00:00 2001 From: Aleksandr Tretiakov Date: Wed, 27 Nov 2024 15:20:06 +0300 Subject: [PATCH 17/17] Code review comments --- cmd/wardenkms/health_check_response.go | 12 +++--- cmd/wardenkms/wardenkms.go | 24 ++++++------ docker-compose.yml | 2 +- keychain-sdk/config.go | 23 +++++------ keychain-sdk/example_keychain_test.go | 25 ++++++------ keychain-sdk/internal/writer/writer.go | 2 +- keychain-sdk/key_requests.go | 12 +++--- keychain-sdk/keychain.go | 4 +- keychain-sdk/sign_requests.go | 12 +++--- keychain-sdk/tx_client_pool.go | 54 +++++++++++++------------- 10 files changed, 80 insertions(+), 90 deletions(-) diff --git a/cmd/wardenkms/health_check_response.go b/cmd/wardenkms/health_check_response.go index 65b19a98e..72cd77d78 100644 --- a/cmd/wardenkms/health_check_response.go +++ b/cmd/wardenkms/health_check_response.go @@ -1,23 +1,23 @@ package main type HealthCheckResponse struct { - // The number of nodes that are online + // Online is the number of nodes that are online Online uint `json:"online"` - // The total number of nodes + // Total is the total number of nodes Total uint `json:"total"` - // The consensus threshold + // Threshold is the consensus threshold Threshold uint8 `json:"threshold"` - // Node statuses + // Nodes is a node statuses collection Nodes []NodeStatus `json:"nodes"` } type NodeStatus struct { - // The address of the node + // Address is the address of the node Address string `json:"address"` - // The status of the node + // Status is the status of the node Status string `json:"status"` } diff --git a/cmd/wardenkms/wardenkms.go b/cmd/wardenkms/wardenkms.go index 576ffd330..f098e0b2b 100644 --- a/cmd/wardenkms/wardenkms.go +++ b/cmd/wardenkms/wardenkms.go @@ -58,25 +58,23 @@ func main() { return } - var grpcConfigs []keychain.GrpcNodeConfig + var grpcConfigs []keychain.GRPCNodeConfig if err := json.Unmarshal([]byte(cfg.GRPCURLs), &grpcConfigs); err != nil { logger.Error("failed to initialize grpc configs", "error", err) return } app := keychain.NewApp(keychain.Config{ - BasicConfig: keychain.BasicConfig{ - Logger: logger, - ChainID: cfg.ChainID, - Mnemonic: cfg.Mnemonic, - KeychainID: cfg.KeychainId, - GasLimit: cfg.GasLimit, - BatchInterval: cfg.BatchInterval, - BatchSize: cfg.BatchSize, - TxTimeout: cfg.TxTimeout, - TxFees: sdk.NewCoins(sdk.NewCoin("award", math.NewInt(cfg.TxFee))), - }, - GRPCConfigs: grpcConfigs, + Logger: logger, + ChainID: cfg.ChainID, + Mnemonic: cfg.Mnemonic, + KeychainID: cfg.KeychainId, + GasLimit: cfg.GasLimit, + BatchInterval: cfg.BatchInterval, + BatchSize: cfg.BatchSize, + TxTimeout: cfg.TxTimeout, + TxFees: sdk.NewCoins(sdk.NewCoin("award", math.NewInt(cfg.TxFee))), + Nodes: grpcConfigs, ConsensusNodeThreshold: cfg.ConsensusNodeThreshold, }) diff --git a/docker-compose.yml b/docker-compose.yml index 7131100b2..9b24f856c 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -15,7 +15,7 @@ services: WARDEND_GRPC_ADDRESS: 0.0.0.0:9090 WARDEND_RPC_LADDR: tcp://0.0.0.0:26657 WARDEND_RPC_CORS_ALLOWED_ORIGINS: "*" - WARDEND_MINIMUM_GAS_PRICES: "0award" + WARDEND_MINIMUM_GAS_PRICES: "0uward" faucet: build: diff --git a/keychain-sdk/config.go b/keychain-sdk/config.go index 748e780e4..56e8e83a8 100644 --- a/keychain-sdk/config.go +++ b/keychain-sdk/config.go @@ -7,7 +7,8 @@ import ( sdk "github.com/cosmos/cosmos-sdk/types" ) -type BasicConfig struct { +// Config is the configuration for the Keychain. +type Config struct { // Logger is the logger to use for the Keychain. // If nil, no logging will be done. Logger *slog.Logger @@ -48,26 +49,20 @@ type BasicConfig struct { // If the transaction isn't included in a block, it will be considered as // failed (but the blockchain might still include in a block later). TxTimeout time.Duration -} - -// Config is the configuration for the Keychain. -type Config struct { - BasicConfig // ConsensusNodeThreshold represents the number of nodes required to execute a pending key/sign request. ConsensusNodeThreshold uint8 - // GRPCURLs is the list of URLs of the gRPC server to connect to. - // e.g. "localhost:9090" - GRPCConfigs []GrpcNodeConfig + // Nodes is the list of URLs of the gRPC server to connect to. + Nodes []GRPCNodeConfig } -type GrpcNodeConfig struct { - // GRPCInsecure determines whether to allow an insecure connection to the +type GRPCNodeConfig struct { + // Insecure determines whether to allow an insecure connection to the // gRPC server. - GRPCInsecure bool + Insecure bool - // GRPCURL is the URL of the gRPC server to connect to. + // Host is the URL of the gRPC server to connect to. // e.g. "localhost:9090" - GRPCURL string + Host string } diff --git a/keychain-sdk/example_keychain_test.go b/keychain-sdk/example_keychain_test.go index 8a17ffabd..64ec11852 100644 --- a/keychain-sdk/example_keychain_test.go +++ b/keychain-sdk/example_keychain_test.go @@ -18,23 +18,20 @@ func Main() { })) app := keychain.NewApp(keychain.Config{ - BasicConfig: keychain.BasicConfig{ - Logger: logger, // not required, but recommended + Logger: logger, // not required, but recommended - // setup the connection to the Warden Protocol node - ChainID: "warden_1337-1", + // setup the connection to the Warden Protocol node + ChainID: "warden_1337-1", - // setup the account used to write txs - KeychainID: 1, - Mnemonic: "virus boat radio apple pilot ask vault exhaust again state doll stereo slide exhibit scissors miss attack boat budget egg bird mask more trick", + // setup the account used to write txs + KeychainID: 1, + Mnemonic: "virus boat radio apple pilot ask vault exhaust again state doll stereo slide exhibit scissors miss attack boat budget egg bird mask more trick", - // setup throughput for batching responses - GasLimit: 400000, - BatchInterval: 8 * time.Second, - BatchSize: 10, - }, - - GRPCConfigs: []keychain.GrpcNodeConfig{{GRPCURL: "localhost:9090", GRPCInsecure: false}}, + // setup throughput for batching responses + GasLimit: 400000, + BatchInterval: 8 * time.Second, + BatchSize: 10, + Nodes: []keychain.GRPCNodeConfig{{Host: "localhost:9090", Insecure: false}}, ConsensusNodeThreshold: 1, }) diff --git a/keychain-sdk/internal/writer/writer.go b/keychain-sdk/internal/writer/writer.go index 1cd1ca4b8..3e9f8726c 100644 --- a/keychain-sdk/internal/writer/writer.go +++ b/keychain-sdk/internal/writer/writer.go @@ -33,7 +33,7 @@ type W struct { } type SyncTxClient interface { - SendWaitTx(ctx context.Context, txBytes []byte) error + SendWaitTx(ctx context.Context, txBytes []byte) (string, error) BuildTx(ctx context.Context, gasLimit uint64, fees sdk.Coins, msgers ...client.Msger) ([]byte, error) } diff --git a/keychain-sdk/key_requests.go b/keychain-sdk/key_requests.go index 254f38bac..9ad41dd76 100644 --- a/keychain-sdk/key_requests.go +++ b/keychain-sdk/key_requests.go @@ -57,7 +57,7 @@ func (w *keyResponseWriter) Reject(reason string) error { return err } -func (a *App) ingestKeyRequests(keyRequestsCh chan *wardentypes.KeyRequest, client *AppClient) { +func (a *App) ingestKeyRequests(keyRequestsCh chan *wardentypes.KeyRequest, client *wardenClient) { for { reqCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) keyRequests, err := client.keyRequests(reqCtx, a.config.BatchSize, a.config.KeychainID) @@ -77,15 +77,15 @@ func (a *App) ingestKeyRequests(keyRequestsCh chan *wardentypes.KeyRequest, clie func (a *App) ingestRequest( keyRequestsCh chan *wardentypes.KeyRequest, keyRequest *wardentypes.KeyRequest, - client *AppClient) { - action, err := a.keyRequestTracker.Ingest(keyRequest.Id, client.grpcUrl) + client *wardenClient) { + action, err := a.keyRequestTracker.Ingest(keyRequest.Id, client.grpcURL) if err != nil { - a.logger().Error("failed to ingest key request", "id", keyRequest.Id, "grpcUrl", client.grpcUrl, "error", err) + a.logger().Error("failed to ingest key request", "id", keyRequest.Id, "grpcUrl", client.grpcURL, "error", err) return } if action == tracker.ActionSkip { - a.logger().Debug("skipping key request", "id", keyRequest.Id, "grpcUrl", client.grpcUrl) + a.logger().Debug("skipping key request", "id", keyRequest.Id, "grpcUrl", client.grpcURL) return } @@ -123,6 +123,6 @@ func (a *App) handleKeyRequest(keyRequest *wardentypes.KeyRequest) { }() } -func (a *AppClient) keyRequests(ctx context.Context, batchSize int, keychainId uint64) ([]*wardentypes.KeyRequest, error) { +func (a *wardenClient) keyRequests(ctx context.Context, batchSize int, keychainId uint64) ([]*wardentypes.KeyRequest, error) { return a.query.PendingKeyRequests(ctx, &client.PageRequest{Limit: uint64(batchSize)}, keychainId) } diff --git a/keychain-sdk/keychain.go b/keychain-sdk/keychain.go index e3039cc75..474dc69d0 100644 --- a/keychain-sdk/keychain.go +++ b/keychain-sdk/keychain.go @@ -30,7 +30,7 @@ type App struct { keyRequestTracker *tracker.T signRequestTracker *tracker.T - clientsPool *ClientsPool + clientsPool *clientsPool } // NewApp creates a new Keychain application, using the given configuration. @@ -64,7 +64,7 @@ func (a *App) SetSignRequestHandler(handler SignRequestHandler) { func (a *App) Start(ctx context.Context) error { a.logger().Info("starting keychain", "keychain_id", a.config.KeychainID) - clientsPool := NewClientsPool(a.config) + clientsPool := newClientsPool(a.config) if err := clientsPool.initConnections(a.logger()); err != nil { return fmt.Errorf("failed to init connections: %w", err) } diff --git a/keychain-sdk/sign_requests.go b/keychain-sdk/sign_requests.go index 9651ff94e..104c19236 100644 --- a/keychain-sdk/sign_requests.go +++ b/keychain-sdk/sign_requests.go @@ -69,7 +69,7 @@ func (w *signResponseWriter) Reject(reason string) error { return err } -func (a *App) ingestSignRequests(signRequestsCh chan *wardentypes.SignRequest, appClient *AppClient) { +func (a *App) ingestSignRequests(signRequestsCh chan *wardentypes.SignRequest, appClient *wardenClient) { for { reqCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) signRequests, err := appClient.signRequests(reqCtx, a.config.BatchSize, a.config.KeychainID) @@ -86,15 +86,15 @@ func (a *App) ingestSignRequests(signRequestsCh chan *wardentypes.SignRequest, a } } -func (a *App) ingestSignRequest(signRequestsCh chan *wardentypes.SignRequest, signRequest *wardentypes.SignRequest, appClient *AppClient) { - action, err := a.keyRequestTracker.Ingest(signRequest.Id, appClient.grpcUrl) +func (a *App) ingestSignRequest(signRequestsCh chan *wardentypes.SignRequest, signRequest *wardentypes.SignRequest, appClient *wardenClient) { + action, err := a.keyRequestTracker.Ingest(signRequest.Id, appClient.grpcURL) if err != nil { - a.logger().Error("failed to ingest sign request", "id", signRequest.Id, "grpcUrl", appClient.grpcUrl, "error", err) + a.logger().Error("failed to ingest sign request", "id", signRequest.Id, "grpcUrl", appClient.grpcURL, "error", err) return } if action == tracker.ActionSkip { - a.logger().Debug("skipping sign request", "id", signRequest.Id, "grpcUrl", appClient.grpcUrl) + a.logger().Debug("skipping sign request", "id", signRequest.Id, "grpcUrl", appClient.grpcURL) return } @@ -140,6 +140,6 @@ func (a *App) handleSignRequest(signRequest *wardentypes.SignRequest) { }() } -func (a *AppClient) signRequests(ctx context.Context, batchSize int, keychainId uint64) ([]*wardentypes.SignRequest, error) { +func (a *wardenClient) signRequests(ctx context.Context, batchSize int, keychainId uint64) ([]*wardentypes.SignRequest, error) { return a.query.PendingSignRequests(ctx, &client.PageRequest{Limit: uint64(batchSize)}, keychainId) } diff --git a/keychain-sdk/tx_client_pool.go b/keychain-sdk/tx_client_pool.go index f3977a80c..6bce1b362 100644 --- a/keychain-sdk/tx_client_pool.go +++ b/keychain-sdk/tx_client_pool.go @@ -10,35 +10,35 @@ import ( "google.golang.org/grpc/connectivity" ) -type AppClient struct { - grpcUrl string +type wardenClient struct { + grpcURL string grpcInsecure bool query *client.QueryClient txClient *client.TxClient } -type ClientsPool struct { - clients []*AppClient +type clientsPool struct { + clients []*wardenClient config Config } -func NewClientsPool(config Config) *ClientsPool { - pool := ClientsPool{ - clients: make([]*AppClient, 0), +func newClientsPool(config Config) *clientsPool { + pool := clientsPool{ + clients: make([]*wardenClient, 0), config: config, } return &pool } -func (cp *ClientsPool) initConnections(logger *slog.Logger) error { +func (cp *clientsPool) initConnections(logger *slog.Logger) error { identity, err := client.NewIdentityFromSeed(cp.config.Mnemonic) if err != nil { return fmt.Errorf("failed to create identity: %w", err) } - for _, grpcUrl := range cp.config.GRPCConfigs { - appClient, err := cp.initConnection(logger, grpcUrl, cp.config.BasicConfig, identity) + for _, grpcURL := range cp.config.Nodes { + appClient, err := cp.initConnection(logger, grpcURL, cp.config.ChainID, identity) if err != nil { return err } @@ -49,19 +49,19 @@ func (cp *ClientsPool) initConnections(logger *slog.Logger) error { return nil } -func (cp *ClientsPool) initConnection( +func (cp *clientsPool) initConnection( logger *slog.Logger, - grpcNodeConfig GrpcNodeConfig, - config BasicConfig, - identity client.Identity) (*AppClient, error) { - appClient := &AppClient{ - grpcUrl: grpcNodeConfig.GRPCURL, - grpcInsecure: grpcNodeConfig.GRPCInsecure, + grpcNodeConfig GRPCNodeConfig, + chainID string, + identity client.Identity) (*wardenClient, error) { + appClient := &wardenClient{ + grpcURL: grpcNodeConfig.Host, + grpcInsecure: grpcNodeConfig.Insecure, } - logger.Info("connecting to Warden Protocol using gRPC", "url", grpcNodeConfig.GRPCURL, "insecure", grpcNodeConfig.GRPCInsecure) + logger.Info("connecting to Warden Protocol using gRPC", "url", grpcNodeConfig.Host, "insecure", grpcNodeConfig.Insecure) - query, err := client.NewQueryClient(grpcNodeConfig.GRPCURL, grpcNodeConfig.GRPCInsecure) + query, err := client.NewQueryClient(grpcNodeConfig.Host, grpcNodeConfig.Insecure) if err != nil { return nil, fmt.Errorf("failed to create query client: %w", err) } @@ -71,12 +71,12 @@ func (cp *ClientsPool) initConnection( logger.Info("keychain writer identity", "address", identity.Address.String()) - appClient.txClient = client.NewTxClient(identity, config.ChainID, conn, query) + appClient.txClient = client.NewTxClient(identity, chainID, conn, query) return appClient, nil } -func (cp *ClientsPool) liveTxClient() (*client.TxClient, error) { +func (cp *clientsPool) liveTxClient() (*client.TxClient, error) { for _, appClient := range cp.clients { if state := appClient.query.Conn().GetState(); isOnline(state) { return appClient.txClient, nil @@ -90,34 +90,34 @@ func isOnline(state connectivity.State) bool { return state == connectivity.Ready || state == connectivity.Idle } -func (cp *ClientsPool) BuildTx( +func (cp *clientsPool) BuildTx( ctx context.Context, gasLimit uint64, fees sdkTypes.Coins, msgers ...client.Msger) ([]byte, error) { liveClient, err := cp.liveTxClient() if err != nil { - return nil, fmt.Errorf("failed to aquire live client for BuildTx: %w", err) + return nil, fmt.Errorf("failed to acquire live client for BuildTx: %w", err) } return liveClient.BuildTx(ctx, gasLimit, fees, msgers...) } -func (cp *ClientsPool) SendWaitTx(ctx context.Context, txBytes []byte) error { +func (cp *clientsPool) SendWaitTx(ctx context.Context, txBytes []byte) (string, error) { liveClient, err := cp.liveTxClient() if err != nil { - return fmt.Errorf("failed to acquire live client for SendWaitTx: %w", err) + return "", fmt.Errorf("failed to acquire live client for SendWaitTx: %w", err) } return liveClient.SendWaitTx(ctx, txBytes) } -func (cp *ClientsPool) ConnectionState() map[string]connectivity.State { +func (cp *clientsPool) ConnectionState() map[string]connectivity.State { statuses := make(map[string]connectivity.State) for _, appClient := range cp.clients { state := appClient.query.Conn().GetState() - statuses[appClient.grpcUrl] = state + statuses[appClient.grpcURL] = state } return statuses