diff --git a/cmd/wardenkms/health_check_response.go b/cmd/wardenkms/health_check_response.go new file mode 100644 index 000000000..72cd77d78 --- /dev/null +++ b/cmd/wardenkms/health_check_response.go @@ -0,0 +1,23 @@ +package main + +type HealthCheckResponse struct { + // Online is the number of nodes that are online + Online uint `json:"online"` + + // Total is the total number of nodes + Total uint `json:"total"` + + // Threshold is the consensus threshold + Threshold uint8 `json:"threshold"` + + // Nodes is a node statuses collection + Nodes []NodeStatus `json:"nodes"` +} + +type NodeStatus struct { + // Address is the address of the node + Address string `json:"address"` + + // Status is the status of the node + Status string `json:"status"` +} diff --git a/cmd/wardenkms/wardenkms.go b/cmd/wardenkms/wardenkms.go index 0b7c8fda7..f098e0b2b 100644 --- a/cmd/wardenkms/wardenkms.go +++ b/cmd/wardenkms/wardenkms.go @@ -3,6 +3,7 @@ package main import ( "context" "encoding/binary" + "encoding/json" "fmt" "log" "log/slog" @@ -20,11 +21,10 @@ import ( ) type Config struct { - ChainID string `env:"CHAIN_ID, default=warden_1337-1"` - GRPCURL string `env:"GRPC_URL, default=localhost:9090"` - GRPCInsecure bool `env:"GRPC_INSECURE, default=true"` - Mnemonic string `env:"MNEMONIC, default=exclude try nephew main caught favorite tone degree lottery device tissue tent ugly mouse pelican gasp lava flush pen river noise remind balcony emerge"` - KeychainId uint64 `env:"KEYCHAIN_ID, default=1"` + ChainID string `env:"CHAIN_ID, default=warden_1337-1"` + GRPCURLs string `env:"GRPC_URLS, default=[{\"GRPCUrl\":\"localhost:9090\",\"GRPCInsecure\":true}]"` + Mnemonic string `env:"MNEMONIC, default=exclude try nephew main caught favorite tone degree lottery device tissue tent ugly mouse pelican gasp lava flush pen river noise remind balcony emerge"` + KeychainId uint64 `env:"KEYCHAIN_ID, default=1"` KeyringMnemonic string `env:"KEYRING_MNEMONIC, required"` KeyringPassword string `env:"KEYRING_PASSWORD, required"` @@ -38,6 +38,8 @@ type Config struct { HttpAddr string `env:"HTTP_ADDR, default=:8080"` LogLevel slog.Level `env:"LOG_LEVEL, default=debug"` + + ConsensusNodeThreshold uint8 `env:"CONSENSUS_NODE_THRESHOLD, default=1"` } func main() { @@ -56,18 +58,24 @@ func main() { return } + var grpcConfigs []keychain.GRPCNodeConfig + if err := json.Unmarshal([]byte(cfg.GRPCURLs), &grpcConfigs); err != nil { + logger.Error("failed to initialize grpc configs", "error", err) + return + } + app := keychain.NewApp(keychain.Config{ - Logger: logger, - ChainID: cfg.ChainID, - GRPCURL: cfg.GRPCURL, - GRPCInsecure: cfg.GRPCInsecure, - Mnemonic: cfg.Mnemonic, - KeychainID: cfg.KeychainId, - GasLimit: cfg.GasLimit, - BatchInterval: cfg.BatchInterval, - BatchSize: cfg.BatchSize, - TxTimeout: cfg.TxTimeout, - TxFees: sdk.NewCoins(sdk.NewCoin("award", math.NewInt(cfg.TxFee))), + Logger: logger, + ChainID: cfg.ChainID, + Mnemonic: cfg.Mnemonic, + KeychainID: cfg.KeychainId, + GasLimit: cfg.GasLimit, + BatchInterval: cfg.BatchInterval, + BatchSize: cfg.BatchSize, + TxTimeout: cfg.TxTimeout, + TxFees: sdk.NewCoins(sdk.NewCoin("award", math.NewInt(cfg.TxFee))), + Nodes: grpcConfigs, + ConsensusNodeThreshold: cfg.ConsensusNodeThreshold, }) app.SetKeyRequestHandler(func(w keychain.KeyResponseWriter, req *keychain.KeyRequest) { @@ -120,11 +128,41 @@ func main() { if cfg.HttpAddr != "" { logger.Info("starting HTTP server", "addr", cfg.HttpAddr) http.HandleFunc("/healthcheck", func(w http.ResponseWriter, r *http.Request) { - if app.ConnectionState() == connectivity.Ready { + connectionStates := app.ConnectionState() + + readyConnectionsCount := uint(0) + nodes := make([]NodeStatus, 0, len(connectionStates)) + + for url, state := range connectionStates { + if state == connectivity.Ready { + readyConnectionsCount += 1 + } + + nodes = append(nodes, NodeStatus{ + Address: url, + Status: state.String(), + }) + } + + bytes, err := json.Marshal(HealthCheckResponse{ + Online: readyConnectionsCount, + Total: uint(len(connectionStates)), + Nodes: nodes, + Threshold: cfg.ConsensusNodeThreshold, + }) + + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + + if readyConnectionsCount >= uint(cfg.ConsensusNodeThreshold) { w.WriteHeader(http.StatusOK) } else { w.WriteHeader(http.StatusServiceUnavailable) } + + _, _ = w.Write(bytes) }) go func() { _ = http.ListenAndServe(cfg.HttpAddr, nil) }() } diff --git a/keychain-sdk/config.go b/keychain-sdk/config.go index 72d45bce3..56e8e83a8 100644 --- a/keychain-sdk/config.go +++ b/keychain-sdk/config.go @@ -16,14 +16,6 @@ type Config struct { // ChainID is the chain ID of the chain to connect to. ChainID string - // GRPCURL is the URL of the gRPC server to connect to. - // e.g. "localhost:9090" - GRPCURL string - - // GRPCInsecure determines whether to allow an insecure connection to the - // gRPC server. - GRPCInsecure bool - // KeychainID is the ID of the keychain this instance will fetch requests // for. KeychainID uint64 @@ -57,4 +49,20 @@ type Config struct { // If the transaction isn't included in a block, it will be considered as // failed (but the blockchain might still include in a block later). TxTimeout time.Duration + + // ConsensusNodeThreshold represents the number of nodes required to execute a pending key/sign request. + ConsensusNodeThreshold uint8 + + // Nodes is the list of URLs of the gRPC server to connect to. + Nodes []GRPCNodeConfig +} + +type GRPCNodeConfig struct { + // Insecure determines whether to allow an insecure connection to the + // gRPC server. + Insecure bool + + // Host is the URL of the gRPC server to connect to. + // e.g. "localhost:9090" + Host string } diff --git a/keychain-sdk/example_keychain_test.go b/keychain-sdk/example_keychain_test.go index 501bf7368..64ec11852 100644 --- a/keychain-sdk/example_keychain_test.go +++ b/keychain-sdk/example_keychain_test.go @@ -21,18 +21,18 @@ func Main() { Logger: logger, // not required, but recommended // setup the connection to the Warden Protocol node - ChainID: "warden", - GRPCURL: "localhost:9090", - GRPCInsecure: true, + ChainID: "warden_1337-1", // setup the account used to write txs KeychainID: 1, Mnemonic: "virus boat radio apple pilot ask vault exhaust again state doll stereo slide exhibit scissors miss attack boat budget egg bird mask more trick", // setup throughput for batching responses - GasLimit: 400000, - BatchInterval: 8 * time.Second, - BatchSize: 10, + GasLimit: 400000, + BatchInterval: 8 * time.Second, + BatchSize: 10, + Nodes: []keychain.GRPCNodeConfig{{Host: "localhost:9090", Insecure: false}}, + ConsensusNodeThreshold: 1, }) app.SetKeyRequestHandler(func(w keychain.KeyResponseWriter, req *keychain.KeyRequest) { diff --git a/keychain-sdk/internal/tracker/tracker.go b/keychain-sdk/internal/tracker/tracker.go index e2652d848..87ce89129 100644 --- a/keychain-sdk/internal/tracker/tracker.go +++ b/keychain-sdk/internal/tracker/tracker.go @@ -1,35 +1,72 @@ package tracker import ( + "fmt" "sync" ) +type Action int + +const ( + ActionSkip Action = iota + ActionProcess +) + +type stringSet map[string]struct{} + +// add safely adds a string to the set +func (s stringSet) add(str string) bool { + if _, exists := s[str]; exists { + return false + } + s[str] = struct{}{} + return true +} + type T struct { - rw sync.RWMutex - ingested map[uint64]struct{} + threshold uint8 + rw sync.RWMutex + ingested map[uint64]stringSet } -func New() *T { +func New(threshold uint8) *T { return &T{ - ingested: make(map[uint64]struct{}), + threshold: threshold, + ingested: make(map[uint64]stringSet), } } -func (t *T) IsNew(id uint64) bool { - t.rw.RLock() - defer t.rw.RUnlock() - _, ok := t.ingested[id] - return !ok +func (t *T) ingestTracker(id uint64) stringSet { + value, ok := t.ingested[id] + if !ok { + t.ingested[id] = make(stringSet) + + return t.ingested[id] + } + + return value } -func (t *T) Ingested(id uint64) { +func (t *T) Ingest(id uint64, ingesterId string) (Action, error) { t.rw.Lock() defer t.rw.Unlock() - t.ingested[id] = struct{}{} + + value := t.ingestTracker(id) + + if !value.add(ingesterId) { + return ActionSkip, fmt.Errorf("already ingested") + } + + if uint64(len(value)) == uint64(t.threshold) { + return ActionProcess, nil + } + + return ActionSkip, nil } func (t *T) Done(id uint64) { t.rw.Lock() defer t.rw.Unlock() + delete(t.ingested, id) } diff --git a/keychain-sdk/internal/writer/writer.go b/keychain-sdk/internal/writer/writer.go index f26ef6356..3e9f8726c 100644 --- a/keychain-sdk/internal/writer/writer.go +++ b/keychain-sdk/internal/writer/writer.go @@ -18,9 +18,6 @@ type W struct { // included in a block after being broadcasted. TxTimeout time.Duration - // Client is the client used to send transactions to the chain. - Client *client.TxClient - Logger *slog.Logger GasLimit uint64 @@ -35,15 +32,18 @@ type W struct { batch Batch } +type SyncTxClient interface { + SendWaitTx(ctx context.Context, txBytes []byte) (string, error) + BuildTx(ctx context.Context, gasLimit uint64, fees sdk.Coins, msgers ...client.Msger) ([]byte, error) +} + func New( - client *client.TxClient, batchSize int, batchInterval time.Duration, txTimeout time.Duration, logger *slog.Logger, ) *W { return &W{ - Client: client, BatchInterval: batchInterval, TxTimeout: txTimeout, Logger: logger, @@ -51,7 +51,7 @@ func New( } } -func (w *W) Start(ctx context.Context, flushErrors chan error) error { +func (w *W) Start(ctx context.Context, client SyncTxClient, flushErrors chan error) error { w.Logger.Info("starting tx writer") for { select { @@ -62,9 +62,11 @@ func (w *W) Start(ctx context.Context, flushErrors chan error) error { if w.TxTimeout > 0 { ctx, cancel = context.WithTimeout(ctx, w.TxTimeout) } - if err := w.Flush(ctx); err != nil { + + if err := w.Flush(ctx, client); err != nil { flushErrors <- err } + cancel() time.Sleep(w.BatchInterval) } @@ -97,7 +99,7 @@ func (w *W) fees() sdk.Coins { return w.Fees } -func (w *W) Flush(ctx context.Context) error { +func (w *W) Flush(ctx context.Context, txClient SyncTxClient) error { msgs := w.batch.Clear() if len(msgs) == 0 { w.Logger.Debug("flushing batch", "empty", true) @@ -115,7 +117,7 @@ func (w *W) Flush(ctx context.Context) error { msgers[i] = item.Msger } - if err := w.sendWaitTx(ctx, msgers...); err != nil { + if err := w.sendWaitTx(ctx, txClient, msgers...); err != nil { for _, item := range msgs { item.Done <- err } @@ -125,18 +127,18 @@ func (w *W) Flush(ctx context.Context) error { return nil } -func (w *W) sendWaitTx(ctx context.Context, msgs ...client.Msger) error { +func (w *W) sendWaitTx(ctx context.Context, txClient SyncTxClient, msgs ...client.Msger) error { w.sendTxLock.Lock() defer w.sendTxLock.Unlock() w.Logger.Info("flushing batch", "count", len(msgs)) - tx, err := w.Client.BuildTx(ctx, w.gasLimit(), w.fees(), msgs...) + tx, err := txClient.BuildTx(ctx, w.gasLimit(), w.fees(), msgs...) if err != nil { return err } - hash, err := w.Client.SendWaitTx(ctx, tx) + hash, err := txClient.SendWaitTx(ctx, tx) if err != nil { return err } diff --git a/keychain-sdk/key_requests.go b/keychain-sdk/key_requests.go index 112cd870f..9ad41dd76 100644 --- a/keychain-sdk/key_requests.go +++ b/keychain-sdk/key_requests.go @@ -7,6 +7,7 @@ import ( "time" "github.com/warden-protocol/wardenprotocol/go-client" + "github.com/warden-protocol/wardenprotocol/keychain-sdk/internal/tracker" "github.com/warden-protocol/wardenprotocol/keychain-sdk/internal/writer" wardentypes "github.com/warden-protocol/wardenprotocol/warden/x/warden/types/v1beta3" ) @@ -56,23 +57,16 @@ func (w *keyResponseWriter) Reject(reason string) error { return err } -func (a *App) ingestKeyRequests(keyRequestsCh chan *wardentypes.KeyRequest) { +func (a *App) ingestKeyRequests(keyRequestsCh chan *wardentypes.KeyRequest, client *wardenClient) { for { reqCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - keyRequests, err := a.keyRequests(reqCtx) + keyRequests, err := client.keyRequests(reqCtx, a.config.BatchSize, a.config.KeychainID) cancel() if err != nil { a.logger().Error("failed to get key requests", "error", err) } else { for _, keyRequest := range keyRequests { - if !a.keyRequestTracker.IsNew(keyRequest.Id) { - a.logger().Debug("skipping key request", "id", keyRequest.Id) - continue - } - - a.logger().Info("got key request", "id", keyRequest.Id) - a.keyRequestTracker.Ingested(keyRequest.Id) - keyRequestsCh <- keyRequest + a.ingestRequest(keyRequestsCh, keyRequest, client) } } @@ -80,6 +74,26 @@ func (a *App) ingestKeyRequests(keyRequestsCh chan *wardentypes.KeyRequest) { } } +func (a *App) ingestRequest( + keyRequestsCh chan *wardentypes.KeyRequest, + keyRequest *wardentypes.KeyRequest, + client *wardenClient) { + action, err := a.keyRequestTracker.Ingest(keyRequest.Id, client.grpcURL) + if err != nil { + a.logger().Error("failed to ingest key request", "id", keyRequest.Id, "grpcUrl", client.grpcURL, "error", err) + return + } + + if action == tracker.ActionSkip { + a.logger().Debug("skipping key request", "id", keyRequest.Id, "grpcUrl", client.grpcURL) + return + } + + if action == tracker.ActionProcess { + keyRequestsCh <- keyRequest + } +} + func (a *App) handleKeyRequest(keyRequest *wardentypes.KeyRequest) { if a.keyRequestHandler == nil { a.logger().Error("key request handler not set") @@ -109,6 +123,6 @@ func (a *App) handleKeyRequest(keyRequest *wardentypes.KeyRequest) { }() } -func (a *App) keyRequests(ctx context.Context) ([]*wardentypes.KeyRequest, error) { - return a.query.PendingKeyRequests(ctx, &client.PageRequest{Limit: uint64(a.config.BatchSize)}, a.config.KeychainID) +func (a *wardenClient) keyRequests(ctx context.Context, batchSize int, keychainId uint64) ([]*wardentypes.KeyRequest, error) { + return a.query.PendingKeyRequests(ctx, &client.PageRequest{Limit: uint64(batchSize)}, keychainId) } diff --git a/keychain-sdk/keychain.go b/keychain-sdk/keychain.go index 071368d1f..474dc69d0 100644 --- a/keychain-sdk/keychain.go +++ b/keychain-sdk/keychain.go @@ -12,7 +12,6 @@ import ( "io" "log/slog" - "github.com/warden-protocol/wardenprotocol/go-client" "github.com/warden-protocol/wardenprotocol/keychain-sdk/internal/tracker" "github.com/warden-protocol/wardenprotocol/keychain-sdk/internal/writer" wardentypes "github.com/warden-protocol/wardenprotocol/warden/x/warden/types/v1beta3" @@ -27,18 +26,19 @@ type App struct { keyRequestHandler KeyRequestHandler signRequestHandler SignRequestHandler - query *client.QueryClient txWriter *writer.W keyRequestTracker *tracker.T signRequestTracker *tracker.T + + clientsPool *clientsPool } // NewApp creates a new Keychain application, using the given configuration. func NewApp(config Config) *App { return &App{ config: config, - keyRequestTracker: tracker.New(), - signRequestTracker: tracker.New(), + keyRequestTracker: tracker.New(config.ConsensusNodeThreshold), + signRequestTracker: tracker.New(config.ConsensusNodeThreshold), } } @@ -64,23 +64,37 @@ func (a *App) SetSignRequestHandler(handler SignRequestHandler) { func (a *App) Start(ctx context.Context) error { a.logger().Info("starting keychain", "keychain_id", a.config.KeychainID) - err := a.initConnections() - if err != nil { + clientsPool := newClientsPool(a.config) + if err := clientsPool.initConnections(a.logger()); err != nil { return fmt.Errorf("failed to init connections: %w", err) } + a.clientsPool = clientsPool + + a.txWriter = writer.New( + a.config.BatchSize, + a.config.BatchInterval, + a.config.TxTimeout, + a.logger()) + a.txWriter.Fees = a.config.TxFees + a.txWriter.GasLimit = a.config.GasLimit + keyRequestsCh := make(chan *wardentypes.KeyRequest) defer close(keyRequestsCh) - go a.ingestKeyRequests(keyRequestsCh) + for _, appClient := range a.clientsPool.clients { + go a.ingestKeyRequests(keyRequestsCh, appClient) + } signRequestsCh := make(chan *wardentypes.SignRequest) defer close(signRequestsCh) - go a.ingestSignRequests(signRequestsCh) + for _, appClient := range a.clientsPool.clients { + go a.ingestSignRequests(signRequestsCh, appClient) + } flushErrors := make(chan error) defer close(flushErrors) go func() { - if err := a.txWriter.Start(ctx, flushErrors); err != nil { + if err := a.txWriter.Start(ctx, a.clientsPool, flushErrors); err != nil { a.logger().Error("tx writer exited with error", "error", err) } }() @@ -100,31 +114,6 @@ func (a *App) Start(ctx context.Context) error { } // ConnectionState returns the current state of the gRPC connection. -func (a *App) ConnectionState() connectivity.State { - return a.query.Conn().GetState() -} - -func (a *App) initConnections() error { - a.logger().Info("connecting to Warden Protocol using gRPC", "url", a.config.GRPCURL, "insecure", a.config.GRPCInsecure) - query, err := client.NewQueryClient(a.config.GRPCURL, a.config.GRPCInsecure) - if err != nil { - return fmt.Errorf("failed to create query client: %w", err) - } - a.query = query - - conn := query.Conn() - - identity, err := client.NewIdentityFromSeed(a.config.Mnemonic) - if err != nil { - return fmt.Errorf("failed to create identity: %w", err) - } - - a.logger().Info("keychain writer identity", "address", identity.Address.String()) - - txClient := client.NewTxClient(identity, a.config.ChainID, conn, query) - a.txWriter = writer.New(txClient, a.config.BatchSize, a.config.BatchInterval, a.config.TxTimeout, a.logger()) - a.txWriter.Fees = a.config.TxFees - a.txWriter.GasLimit = a.config.GasLimit - - return nil +func (a *App) ConnectionState() map[string]connectivity.State { + return a.clientsPool.ConnectionState() } diff --git a/keychain-sdk/sign_requests.go b/keychain-sdk/sign_requests.go index 7a27c93bb..104c19236 100644 --- a/keychain-sdk/sign_requests.go +++ b/keychain-sdk/sign_requests.go @@ -8,11 +8,12 @@ import ( "github.com/warden-protocol/wardenprotocol/go-client" "github.com/warden-protocol/wardenprotocol/keychain-sdk/internal/enc" + "github.com/warden-protocol/wardenprotocol/keychain-sdk/internal/tracker" "github.com/warden-protocol/wardenprotocol/keychain-sdk/internal/writer" wardentypes "github.com/warden-protocol/wardenprotocol/warden/x/warden/types/v1beta3" ) -// SignaResponseWriter is the interface for writing responses to sign requests. +// SignResponseWriter is the interface for writing responses to sign requests. type SignResponseWriter interface { // Fulfil writes the signature to the sign request. Fulfil(signature []byte) error @@ -68,23 +69,16 @@ func (w *signResponseWriter) Reject(reason string) error { return err } -func (a *App) ingestSignRequests(signRequestsCh chan *wardentypes.SignRequest) { +func (a *App) ingestSignRequests(signRequestsCh chan *wardentypes.SignRequest, appClient *wardenClient) { for { reqCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - signRequests, err := a.signRequests(reqCtx) + signRequests, err := appClient.signRequests(reqCtx, a.config.BatchSize, a.config.KeychainID) cancel() if err != nil { a.logger().Error("failed to get sign requests", "error", err) } else { for _, signRequest := range signRequests { - if !a.signRequestTracker.IsNew(signRequest.Id) { - a.logger().Debug("skipping sign request", "id", signRequest.Id) - continue - } - - a.logger().Info("got sign request", "id", signRequest.Id) - a.signRequestTracker.Ingested(signRequest.Id) - signRequestsCh <- signRequest + a.ingestSignRequest(signRequestsCh, signRequest, appClient) } } @@ -92,6 +86,23 @@ func (a *App) ingestSignRequests(signRequestsCh chan *wardentypes.SignRequest) { } } +func (a *App) ingestSignRequest(signRequestsCh chan *wardentypes.SignRequest, signRequest *wardentypes.SignRequest, appClient *wardenClient) { + action, err := a.keyRequestTracker.Ingest(signRequest.Id, appClient.grpcURL) + if err != nil { + a.logger().Error("failed to ingest sign request", "id", signRequest.Id, "grpcUrl", appClient.grpcURL, "error", err) + return + } + + if action == tracker.ActionSkip { + a.logger().Debug("skipping sign request", "id", signRequest.Id, "grpcUrl", appClient.grpcURL) + return + } + + if action == tracker.ActionProcess { + signRequestsCh <- signRequest + } +} + func (a *App) handleSignRequest(signRequest *wardentypes.SignRequest) { if a.signRequestHandler == nil { a.logger().Error("sign request handler not set") @@ -129,6 +140,6 @@ func (a *App) handleSignRequest(signRequest *wardentypes.SignRequest) { }() } -func (a *App) signRequests(ctx context.Context) ([]*wardentypes.SignRequest, error) { - return a.query.PendingSignRequests(ctx, &client.PageRequest{Limit: uint64(a.config.BatchSize)}, a.config.KeychainID) +func (a *wardenClient) signRequests(ctx context.Context, batchSize int, keychainId uint64) ([]*wardentypes.SignRequest, error) { + return a.query.PendingSignRequests(ctx, &client.PageRequest{Limit: uint64(batchSize)}, keychainId) } diff --git a/keychain-sdk/tx_client_pool.go b/keychain-sdk/tx_client_pool.go new file mode 100644 index 000000000..6bce1b362 --- /dev/null +++ b/keychain-sdk/tx_client_pool.go @@ -0,0 +1,124 @@ +package keychain + +import ( + "context" + "fmt" + "log/slog" + + sdkTypes "github.com/cosmos/cosmos-sdk/types" + client "github.com/warden-protocol/wardenprotocol/go-client" + "google.golang.org/grpc/connectivity" +) + +type wardenClient struct { + grpcURL string + grpcInsecure bool + query *client.QueryClient + txClient *client.TxClient +} + +type clientsPool struct { + clients []*wardenClient + config Config +} + +func newClientsPool(config Config) *clientsPool { + pool := clientsPool{ + clients: make([]*wardenClient, 0), + config: config, + } + + return &pool +} + +func (cp *clientsPool) initConnections(logger *slog.Logger) error { + identity, err := client.NewIdentityFromSeed(cp.config.Mnemonic) + if err != nil { + return fmt.Errorf("failed to create identity: %w", err) + } + + for _, grpcURL := range cp.config.Nodes { + appClient, err := cp.initConnection(logger, grpcURL, cp.config.ChainID, identity) + if err != nil { + return err + } + + cp.clients = append(cp.clients, appClient) + } + + return nil +} + +func (cp *clientsPool) initConnection( + logger *slog.Logger, + grpcNodeConfig GRPCNodeConfig, + chainID string, + identity client.Identity) (*wardenClient, error) { + appClient := &wardenClient{ + grpcURL: grpcNodeConfig.Host, + grpcInsecure: grpcNodeConfig.Insecure, + } + + logger.Info("connecting to Warden Protocol using gRPC", "url", grpcNodeConfig.Host, "insecure", grpcNodeConfig.Insecure) + + query, err := client.NewQueryClient(grpcNodeConfig.Host, grpcNodeConfig.Insecure) + if err != nil { + return nil, fmt.Errorf("failed to create query client: %w", err) + } + appClient.query = query + + conn := query.Conn() + + logger.Info("keychain writer identity", "address", identity.Address.String()) + + appClient.txClient = client.NewTxClient(identity, chainID, conn, query) + + return appClient, nil +} + +func (cp *clientsPool) liveTxClient() (*client.TxClient, error) { + for _, appClient := range cp.clients { + if state := appClient.query.Conn().GetState(); isOnline(state) { + return appClient.txClient, nil + } + } + + return nil, fmt.Errorf("all node clients are down") +} + +func isOnline(state connectivity.State) bool { + return state == connectivity.Ready || state == connectivity.Idle +} + +func (cp *clientsPool) BuildTx( + ctx context.Context, + gasLimit uint64, + fees sdkTypes.Coins, + msgers ...client.Msger) ([]byte, error) { + liveClient, err := cp.liveTxClient() + if err != nil { + return nil, fmt.Errorf("failed to acquire live client for BuildTx: %w", err) + } + + return liveClient.BuildTx(ctx, gasLimit, fees, msgers...) +} + +func (cp *clientsPool) SendWaitTx(ctx context.Context, txBytes []byte) (string, error) { + liveClient, err := cp.liveTxClient() + if err != nil { + return "", fmt.Errorf("failed to acquire live client for SendWaitTx: %w", err) + } + + return liveClient.SendWaitTx(ctx, txBytes) +} + +func (cp *clientsPool) ConnectionState() map[string]connectivity.State { + statuses := make(map[string]connectivity.State) + + for _, appClient := range cp.clients { + state := appClient.query.Conn().GetState() + statuses[appClient.grpcURL] = state + } + + return statuses +}