From efc05e81226fdaa5180f40972e2ae8fb28a57aad Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Wed, 20 Nov 2024 19:52:18 +0400 Subject: [PATCH] refactor(server/v2/cometbft): add `codec.Codec` and clean-up APIs (#22566) --- server/v2/cometbft/abci.go | 96 +++++++------------------- server/v2/cometbft/abci_test.go | 17 +++-- server/v2/cometbft/commands.go | 2 +- server/v2/cometbft/grpc.go | 41 +++++++----- server/v2/cometbft/options.go | 11 ++- server/v2/cometbft/query.go | 8 +-- server/v2/cometbft/server.go | 111 ++++++++++++++++++++++--------- server/v2/cometbft/snapshots.go | 8 +-- server/v2/cometbft/streaming.go | 4 +- server/v2/cometbft/utils.go | 14 ++-- simapp/v2/simdv2/cmd/commands.go | 9 +-- 11 files changed, 174 insertions(+), 147 deletions(-) diff --git a/server/v2/cometbft/abci.go b/server/v2/cometbft/abci.go index e8c72f6ef34..c19110da0c4 100644 --- a/server/v2/cometbft/abci.go +++ b/server/v2/cometbft/abci.go @@ -6,14 +6,10 @@ import ( "errors" "fmt" "strings" - "sync" "sync/atomic" abci "github.com/cometbft/cometbft/abci/types" abciproto "github.com/cometbft/cometbft/api/cometbft/abci/v1" - codectypes "github.com/cosmos/cosmos-sdk/codec/types" - sdk "github.com/cosmos/cosmos-sdk/types" - txtypes "github.com/cosmos/cosmos-sdk/types/tx" gogoproto "github.com/cosmos/gogoproto/proto" protoreflect "google.golang.org/protobuf/reflect/protoreflect" "google.golang.org/protobuf/reflect/protoregistry" @@ -37,6 +33,11 @@ import ( "cosmossdk.io/server/v2/streaming" "cosmossdk.io/store/v2/snapshots" consensustypes "cosmossdk.io/x/consensus/types" + + "github.com/cosmos/cosmos-sdk/codec" + codectypes "github.com/cosmos/cosmos-sdk/codec/types" + sdk "github.com/cosmos/cosmos-sdk/types" + txtypes "github.com/cosmos/cosmos-sdk/types/tx" ) const ( @@ -45,22 +46,24 @@ const ( QueryPathStore = "store" ) -var _ abci.Application = (*Consensus[transaction.Tx])(nil) +var _ abci.Application = (*consensus[transaction.Tx])(nil) -type Consensus[T transaction.Tx] struct { +// consensus contains the implementation of the ABCI interface for CometBFT. +type consensus[T transaction.Tx] struct { logger log.Logger appName, version string app appmanager.AppManager[T] + appCodec codec.Codec txCodec transaction.Codec[T] store types.Store - streaming streaming.Manager listener *appdata.Listener snapshotManager *snapshots.Manager + streamingManager streaming.Manager mempool mempool.Mempool[T] cfg Config - indexedEvents map[string]struct{} chainID string + indexedEvents map[string]struct{} initialHeight uint64 // this is only available after this node has committed a block (in FinalizeBlock), @@ -81,60 +84,9 @@ type Consensus[T transaction.Tx] struct { getProtoRegistry func() (*protoregistry.Files, error) } -func NewConsensus[T transaction.Tx]( - logger log.Logger, - appName string, - app appmanager.AppManager[T], - mp mempool.Mempool[T], - indexedEvents map[string]struct{}, - queryHandlersMap map[string]appmodulev2.Handler, - store types.Store, - cfg Config, - txCodec transaction.Codec[T], - chainId string, -) *Consensus[T] { - return &Consensus[T]{ - appName: appName, - version: getCometBFTServerVersion(), - app: app, - cfg: cfg, - store: store, - logger: logger, - txCodec: txCodec, - streaming: streaming.Manager{}, - snapshotManager: nil, - mempool: mp, - lastCommittedHeight: atomic.Int64{}, - prepareProposalHandler: nil, - processProposalHandler: nil, - verifyVoteExt: nil, - extendVote: nil, - chainID: chainId, - indexedEvents: indexedEvents, - initialHeight: 0, - queryHandlersMap: queryHandlersMap, - getProtoRegistry: sync.OnceValues(gogoproto.MergedRegistry), - } -} - -// SetStreamingManager sets the streaming manager for the consensus module. -func (c *Consensus[T]) SetStreamingManager(sm streaming.Manager) { - c.streaming = sm -} - -// RegisterSnapshotExtensions registers the given extensions with the consensus module's snapshot manager. -// It allows additional snapshotter implementations to be used for creating and restoring snapshots. -func (c *Consensus[T]) RegisterSnapshotExtensions(extensions ...snapshots.ExtensionSnapshotter) error { - if err := c.snapshotManager.RegisterExtensions(extensions...); err != nil { - return fmt.Errorf("failed to register snapshot extensions: %w", err) - } - - return nil -} - // CheckTx implements types.Application. // It is called by cometbft to verify transaction validity -func (c *Consensus[T]) CheckTx(ctx context.Context, req *abciproto.CheckTxRequest) (*abciproto.CheckTxResponse, error) { +func (c *consensus[T]) CheckTx(ctx context.Context, req *abciproto.CheckTxRequest) (*abciproto.CheckTxResponse, error) { decodedTx, err := c.txCodec.Decode(req.Tx) if err != nil { return nil, err @@ -172,7 +124,7 @@ func (c *Consensus[T]) CheckTx(ctx context.Context, req *abciproto.CheckTxReques } // Info implements types.Application. -func (c *Consensus[T]) Info(ctx context.Context, _ *abciproto.InfoRequest) (*abciproto.InfoResponse, error) { +func (c *consensus[T]) Info(ctx context.Context, _ *abciproto.InfoRequest) (*abciproto.InfoResponse, error) { version, _, err := c.store.StateLatest() if err != nil { return nil, err @@ -212,7 +164,7 @@ func (c *Consensus[T]) Info(ctx context.Context, _ *abciproto.InfoRequest) (*abc // Query implements types.Application. // It is called by cometbft to query application state. -func (c *Consensus[T]) Query(ctx context.Context, req *abciproto.QueryRequest) (resp *abciproto.QueryResponse, err error) { +func (c *consensus[T]) Query(ctx context.Context, req *abciproto.QueryRequest) (resp *abciproto.QueryResponse, err error) { resp, isGRPC, err := c.maybeRunGRPCQuery(ctx, req) if isGRPC { return resp, err @@ -227,7 +179,7 @@ func (c *Consensus[T]) Query(ctx context.Context, req *abciproto.QueryRequest) ( switch path[0] { case QueryPathApp: - resp, err = c.handlerQueryApp(ctx, path, req) + resp, err = c.handleQueryApp(ctx, path, req) case QueryPathStore: resp, err = c.handleQueryStore(path, req) @@ -246,7 +198,7 @@ func (c *Consensus[T]) Query(ctx context.Context, req *abciproto.QueryRequest) ( return resp, nil } -func (c *Consensus[T]) maybeRunGRPCQuery(ctx context.Context, req *abci.QueryRequest) (resp *abciproto.QueryResponse, isGRPC bool, err error) { +func (c *consensus[T]) maybeRunGRPCQuery(ctx context.Context, req *abci.QueryRequest) (resp *abciproto.QueryResponse, isGRPC bool, err error) { // if this fails then we cannot serve queries anymore registry, err := c.getProtoRegistry() if err != nil { @@ -288,7 +240,7 @@ func (c *Consensus[T]) maybeRunGRPCQuery(ctx context.Context, req *abci.QueryReq txResult, _, err := c.app.Simulate(ctx, tx) if err != nil { - return nil, true, fmt.Errorf("%v with gas used: '%d'", err, txResult.GasUsed) + return nil, true, fmt.Errorf("failed with gas used: '%d': %w", txResult.GasUsed, err) } msgResponses := make([]*codectypes.Any, 0, len(txResult.Resp)) @@ -337,7 +289,7 @@ func (c *Consensus[T]) maybeRunGRPCQuery(ctx context.Context, req *abci.QueryReq } // InitChain implements types.Application. -func (c *Consensus[T]) InitChain(ctx context.Context, req *abciproto.InitChainRequest) (*abciproto.InitChainResponse, error) { +func (c *consensus[T]) InitChain(ctx context.Context, req *abciproto.InitChainRequest) (*abciproto.InitChainResponse, error) { c.logger.Info("InitChain", "initialHeight", req.InitialHeight, "chainID", req.ChainId) // store chainID to be used later on in execution @@ -421,7 +373,7 @@ func (c *Consensus[T]) InitChain(ctx context.Context, req *abciproto.InitChainRe // PrepareProposal implements types.Application. // It is called by cometbft to prepare a proposal block. -func (c *Consensus[T]) PrepareProposal( +func (c *consensus[T]) PrepareProposal( ctx context.Context, req *abciproto.PrepareProposalRequest, ) (resp *abciproto.PrepareProposalResponse, err error) { @@ -457,7 +409,7 @@ func (c *Consensus[T]) PrepareProposal( // ProcessProposal implements types.Application. // It is called by cometbft to process/verify a proposal block. -func (c *Consensus[T]) ProcessProposal( +func (c *consensus[T]) ProcessProposal( ctx context.Context, req *abciproto.ProcessProposalRequest, ) (*abciproto.ProcessProposalResponse, error) { @@ -491,7 +443,7 @@ func (c *Consensus[T]) ProcessProposal( // FinalizeBlock implements types.Application. // It is called by cometbft to finalize a block. -func (c *Consensus[T]) FinalizeBlock( +func (c *consensus[T]) FinalizeBlock( ctx context.Context, req *abciproto.FinalizeBlockRequest, ) (*abciproto.FinalizeBlockResponse, error) { @@ -581,7 +533,7 @@ func (c *Consensus[T]) FinalizeBlock( // Commit implements types.Application. // It is called by cometbft to notify the application that a block was committed. -func (c *Consensus[T]) Commit(ctx context.Context, _ *abciproto.CommitRequest) (*abciproto.CommitResponse, error) { +func (c *consensus[T]) Commit(ctx context.Context, _ *abciproto.CommitRequest) (*abciproto.CommitResponse, error) { lastCommittedHeight := c.lastCommittedHeight.Load() c.snapshotManager.SnapshotIfApplicable(lastCommittedHeight) @@ -599,7 +551,7 @@ func (c *Consensus[T]) Commit(ctx context.Context, _ *abciproto.CommitRequest) ( // Vote extensions // VerifyVoteExtension implements types.Application. -func (c *Consensus[T]) VerifyVoteExtension( +func (c *consensus[T]) VerifyVoteExtension( ctx context.Context, req *abciproto.VerifyVoteExtensionRequest, ) (*abciproto.VerifyVoteExtensionResponse, error) { @@ -641,7 +593,7 @@ func (c *Consensus[T]) VerifyVoteExtension( } // ExtendVote implements types.Application. -func (c *Consensus[T]) ExtendVote(ctx context.Context, req *abciproto.ExtendVoteRequest) (*abciproto.ExtendVoteResponse, error) { +func (c *consensus[T]) ExtendVote(ctx context.Context, req *abciproto.ExtendVoteRequest) (*abciproto.ExtendVoteResponse, error) { // If vote extensions are not enabled, as a safety precaution, we return an // error. cp, err := c.GetConsensusParams(ctx) diff --git a/server/v2/cometbft/abci_test.go b/server/v2/cometbft/abci_test.go index ab29139da33..c86ca226391 100644 --- a/server/v2/cometbft/abci_test.go +++ b/server/v2/cometbft/abci_test.go @@ -6,6 +6,7 @@ import ( "encoding/json" "io" "strings" + "sync" "testing" "time" @@ -637,7 +638,7 @@ func TestConsensus_Query(t *testing.T) { require.Equal(t, res.Value, []byte(nil)) } -func setUpConsensus(t *testing.T, gasLimit uint64, mempool mempool.Mempool[mock.Tx]) *Consensus[mock.Tx] { +func setUpConsensus(t *testing.T, gasLimit uint64, mempool mempool.Mempool[mock.Tx]) *consensus[mock.Tx] { t.Helper() msgRouterBuilder := getMsgRouterBuilder(t, func(ctx context.Context, msg *gogotypes.BoolValue) (*gogotypes.BoolValue, error) { @@ -699,9 +700,17 @@ func setUpConsensus(t *testing.T, gasLimit uint64, mempool mempool.Mempool[mock. nil, ) - return NewConsensus[mock.Tx](log.NewNopLogger(), "testing-app", am, - mempool, map[string]struct{}{}, nil, mockStore, - Config{AppTomlConfig: DefaultAppTomlConfig()}, mock.TxCodec{}, "test") + return &consensus[mock.Tx]{ + logger: log.NewNopLogger(), + appName: "testing-app", + app: am, + mempool: mempool, + store: mockStore, + cfg: Config{AppTomlConfig: DefaultAppTomlConfig()}, + txCodec: mock.TxCodec{}, + chainID: "test", + getProtoRegistry: sync.OnceValues(proto.MergedRegistry), + } } // Check target version same with store's latest version diff --git a/server/v2/cometbft/commands.go b/server/v2/cometbft/commands.go index d1bb2f25724..7cafa4a45d6 100644 --- a/server/v2/cometbft/commands.go +++ b/server/v2/cometbft/commands.go @@ -388,7 +388,7 @@ func (s *CometBFTServer[T]) BootstrapStateCmd() *cobra.Command { return err } if height == 0 { - height, err = s.Consensus.store.GetLatestVersion() + height, err = s.store.GetLatestVersion() if err != nil { return err } diff --git a/server/v2/cometbft/grpc.go b/server/v2/cometbft/grpc.go index 3e8f1855ba0..5a2d64d7d2a 100644 --- a/server/v2/cometbft/grpc.go +++ b/server/v2/cometbft/grpc.go @@ -3,7 +3,8 @@ package cometbft import ( "context" - v1 "github.com/cometbft/cometbft/api/cometbft/abci/v1" + abci "github.com/cometbft/cometbft/abci/types" + abciproto "github.com/cometbft/cometbft/api/cometbft/abci/v1" "github.com/cosmos/gogoproto/proto" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -12,8 +13,8 @@ import ( autocliv1 "cosmossdk.io/api/cosmos/autocli/v1" cmtv1beta1 "cosmossdk.io/api/cosmos/base/tendermint/v1beta1" "cosmossdk.io/core/server" + corestore "cosmossdk.io/core/store" "cosmossdk.io/core/transaction" - errorsmod "cosmossdk.io/errors/v2" "github.com/cosmos/cosmos-sdk/client" "github.com/cosmos/cosmos-sdk/client/grpc/cmtservice" @@ -23,17 +24,25 @@ import ( txtypes "github.com/cosmos/cosmos-sdk/types/tx" ) -// GRPCServiceRegistrar returns a function that registers the CometBFT gRPC service +type appSimulator[T transaction.Tx] interface { + Simulate(ctx context.Context, tx T) (server.TxResult, corestore.WriterMap, error) +} + +// gRPCServiceRegistrar returns a function that registers the CometBFT gRPC service // Those services are defined for backward compatibility. // Eventually, they will be removed in favor of the new gRPC services. -func (c *Consensus[T]) GRPCServiceRegistrar( +func gRPCServiceRegistrar[T transaction.Tx]( clientCtx client.Context, cfg server.ConfigMap, + cometBFTAppConfig *AppTomlConfig, + txCodec transaction.Codec[T], + consensus abci.Application, + app appSimulator[T], ) func(srv *grpc.Server) error { return func(srv *grpc.Server) error { - cmtservice.RegisterServiceServer(srv, cmtservice.NewQueryServer(clientCtx.Client, c.Query, clientCtx.ConsensusAddressCodec)) - txtypes.RegisterServiceServer(srv, txServer[T]{clientCtx, c}) - nodeservice.RegisterServiceServer(srv, nodeServer[T]{cfg, c}) + cmtservice.RegisterServiceServer(srv, cmtservice.NewQueryServer(clientCtx.Client, consensus.Query, clientCtx.ConsensusAddressCodec)) + txtypes.RegisterServiceServer(srv, txServer[T]{clientCtx, txCodec, app}) + nodeservice.RegisterServiceServer(srv, nodeServer[T]{cfg, cometBFTAppConfig, consensus}) return nil } @@ -86,7 +95,8 @@ var CometBFTAutoCLIDescriptor = &autocliv1.ServiceCommandDescriptor{ type txServer[T transaction.Tx] struct { clientCtx client.Context - consensus *Consensus[T] + txCodec transaction.Codec[T] + app appSimulator[T] } // BroadcastTx implements tx.ServiceServer. @@ -132,12 +142,12 @@ func (t txServer[T]) Simulate(ctx context.Context, req *txtypes.SimulateRequest) return nil, status.Errorf(codes.InvalidArgument, "empty txBytes is not allowed") } - tx, err := t.consensus.txCodec.Decode(txBytes) + tx, err := t.txCodec.Decode(txBytes) if err != nil { - return nil, errorsmod.Wrap(err, "failed to decode tx") + return nil, status.Errorf(codes.InvalidArgument, "failed to decode tx: %v", err) } - txResult, _, err := t.consensus.app.Simulate(ctx, tx) + txResult, _, err := t.app.Simulate(ctx, tx) if err != nil { return nil, status.Errorf(codes.Unknown, "%v with gas used: '%d'", err, txResult.GasUsed) } @@ -186,8 +196,9 @@ func (t txServer[T]) TxEncodeAmino(context.Context, *txtypes.TxEncodeAminoReques var _ txtypes.ServiceServer = txServer[transaction.Tx]{} type nodeServer[T transaction.Tx] struct { - cfg server.ConfigMap - consensus *Consensus[T] + cfg server.ConfigMap + cometBFTAppConfig *AppTomlConfig + consensus abci.Application } func (s nodeServer[T]) Config(ctx context.Context, _ *nodeservice.ConfigRequest) (*nodeservice.ConfigResponse, error) { @@ -201,12 +212,12 @@ func (s nodeServer[T]) Config(ctx context.Context, _ *nodeservice.ConfigRequest) MinimumGasPrice: minGasPricesStr, PruningKeepRecent: "ambiguous in v2", PruningInterval: "ambiguous in v2", - HaltHeight: s.consensus.cfg.AppTomlConfig.HaltHeight, + HaltHeight: s.cometBFTAppConfig.HaltHeight, }, nil } func (s nodeServer[T]) Status(ctx context.Context, _ *nodeservice.StatusRequest) (*nodeservice.StatusResponse, error) { - nodeInfo, err := s.consensus.Info(ctx, &v1.InfoRequest{}) + nodeInfo, err := s.consensus.Info(ctx, &abciproto.InfoRequest{}) if err != nil { return nil, err } diff --git a/server/v2/cometbft/options.go b/server/v2/cometbft/options.go index d5aa4872fff..b5936148b5a 100644 --- a/server/v2/cometbft/options.go +++ b/server/v2/cometbft/options.go @@ -8,6 +8,7 @@ import ( "cosmossdk.io/server/v2/cometbft/handlers" "cosmossdk.io/server/v2/cometbft/mempool" "cosmossdk.io/server/v2/cometbft/types" + "cosmossdk.io/server/v2/streaming" "cosmossdk.io/store/v2/snapshots" ) @@ -23,8 +24,14 @@ type ServerOptions[T transaction.Tx] struct { ExtendVoteHandler handlers.ExtendVoteHandler KeygenF keyGenF - Mempool func(cfg map[string]any) mempool.Mempool[T] + // Set mempool for the consensus module. + Mempool func(cfg map[string]any) mempool.Mempool[T] + // Set streaming manager for the consensus module. + StreamingManager streaming.Manager + // Set snapshot options for the consensus module. SnapshotOptions func(cfg map[string]any) snapshots.SnapshotOptions + // Allows additional snapshotter implementations to be used for creating and restoring snapshots. + SnapshotExtensions []snapshots.ExtensionSnapshotter AddrPeerFilter types.PeerFilter // filter peers by address and port IdPeerFilter types.PeerFilter // filter peers by node ID @@ -40,7 +47,9 @@ func DefaultServerOptions[T transaction.Tx]() ServerOptions[T] { VerifyVoteExtensionHandler: handlers.NoOpVerifyVoteExtensionHandler(), ExtendVoteHandler: handlers.NoOpExtendVote(), Mempool: func(cfg map[string]any) mempool.Mempool[T] { return mempool.NoOpMempool[T]{} }, + StreamingManager: streaming.Manager{}, SnapshotOptions: func(cfg map[string]any) snapshots.SnapshotOptions { return snapshots.NewSnapshotOptions(0, 0) }, + SnapshotExtensions: []snapshots.ExtensionSnapshotter{}, AddrPeerFilter: nil, IdPeerFilter: nil, KeygenF: func() (cmtcrypto.PrivKey, error) { return cmted22519.GenPrivKey(), nil }, diff --git a/server/v2/cometbft/query.go b/server/v2/cometbft/query.go index f032b280735..f7fbe811d1b 100644 --- a/server/v2/cometbft/query.go +++ b/server/v2/cometbft/query.go @@ -11,7 +11,7 @@ import ( cometerrors "cosmossdk.io/server/v2/cometbft/types/errors" ) -func (c *Consensus[T]) handleQueryP2P(path []string) (*abci.QueryResponse, error) { +func (c *consensus[T]) handleQueryP2P(path []string) (*abci.QueryResponse, error) { // "/p2p" prefix for p2p queries if len(path) < 4 { return nil, errorsmod.Wrap(cometerrors.ErrUnknownRequest, "path should be p2p filter ") @@ -34,14 +34,14 @@ func (c *Consensus[T]) handleQueryP2P(path []string) (*abci.QueryResponse, error return nil, errorsmod.Wrap(cometerrors.ErrUnknownRequest, "expected second parameter to be 'filter'") } -// handlerQueryApp handles the query requests for the application. +// handleQueryApp handles the query requests for the application. // It expects the path parameter to have at least two elements. // The second element of the path can be either 'simulate' or 'version'. // If the second element is 'simulate', it decodes the request data into a transaction, // simulates the transaction using the application, and returns the simulation result. // If the second element is 'version', it returns the version of the application. // If the second element is neither 'simulate' nor 'version', it returns an error indicating an unknown query. -func (c *Consensus[T]) handlerQueryApp(ctx context.Context, path []string, req *abci.QueryRequest) (*abci.QueryResponse, error) { +func (c *consensus[T]) handleQueryApp(ctx context.Context, path []string, req *abci.QueryRequest) (*abci.QueryResponse, error) { if len(path) < 2 { return nil, errorsmod.Wrap( cometerrors.ErrUnknownRequest, @@ -83,7 +83,7 @@ func (c *Consensus[T]) handlerQueryApp(ctx context.Context, path []string, req * return nil, errorsmod.Wrapf(cometerrors.ErrUnknownRequest, "unknown query: %s", path) } -func (c *Consensus[T]) handleQueryStore(path []string, req *abci.QueryRequest) (*abci.QueryResponse, error) { +func (c *consensus[T]) handleQueryStore(path []string, req *abci.QueryRequest) (*abci.QueryResponse, error) { req.Path = "/" + strings.Join(path[1:], "/") if req.Height <= 1 && req.Prove { return nil, errorsmod.Wrap( diff --git a/server/v2/cometbft/server.go b/server/v2/cometbft/server.go index 58eb7b44257..e844df3c80c 100644 --- a/server/v2/cometbft/server.go +++ b/server/v2/cometbft/server.go @@ -7,21 +7,27 @@ import ( "fmt" "os" "path/filepath" + "sync" + "sync/atomic" abciserver "github.com/cometbft/cometbft/abci/server" + abci "github.com/cometbft/cometbft/abci/types" cmtcmd "github.com/cometbft/cometbft/cmd/cometbft/commands" cmtcfg "github.com/cometbft/cometbft/config" "github.com/cometbft/cometbft/node" "github.com/cometbft/cometbft/p2p" pvm "github.com/cometbft/cometbft/privval" "github.com/cometbft/cometbft/proxy" + gogoproto "github.com/cosmos/gogoproto/proto" "github.com/spf13/cobra" "github.com/spf13/pflag" + "google.golang.org/grpc" appmodulev2 "cosmossdk.io/core/appmodule/v2" "cosmossdk.io/core/server" "cosmossdk.io/core/transaction" "cosmossdk.io/log" + "cosmossdk.io/schema/appdata" "cosmossdk.io/schema/decoding" "cosmossdk.io/schema/indexer" serverv2 "cosmossdk.io/server/v2" @@ -31,6 +37,8 @@ import ( "cosmossdk.io/server/v2/cometbft/types" "cosmossdk.io/store/v2/snapshots" + "github.com/cosmos/cosmos-sdk/client" + "github.com/cosmos/cosmos-sdk/codec" genutiltypes "github.com/cosmos/cosmos-sdk/x/genutil/types" ) @@ -44,32 +52,39 @@ var ( type CometBFTServer[T transaction.Tx] struct { Node *node.Node - Consensus *Consensus[T] + Consensus abci.Application - initTxCodec transaction.Codec[T] logger log.Logger serverOptions ServerOptions[T] config Config cfgOptions []CfgOption + + app appmanager.AppManager[T] + txCodec transaction.Codec[T] + store types.Store } func New[T transaction.Tx]( logger log.Logger, appName string, store types.Store, - appManager appmanager.AppManager[T], + app appmanager.AppManager[T], + appCodec codec.Codec, + txCodec transaction.Codec[T], queryHandlers map[string]appmodulev2.Handler, decoderResolver decoding.DecoderResolver, - txCodec transaction.Codec[T], - cfg server.ConfigMap, serverOptions ServerOptions[T], + cfg server.ConfigMap, cfgOptions ...CfgOption, ) (*CometBFTServer[T], error) { srv := &CometBFTServer[T]{ - initTxCodec: txCodec, serverOptions: serverOptions, cfgOptions: cfgOptions, + app: app, + txCodec: txCodec, + store: store, } + srv.logger = logger.With(log.ModuleKey, srv.Name()) home, _ := cfg[serverv2.FlagHome].(string) @@ -111,27 +126,6 @@ func New[T transaction.Tx]( indexEvents[e] = struct{}{} } - srv.logger = logger.With(log.ModuleKey, srv.Name()) - consensus := NewConsensus( - logger, - appName, - appManager, - srv.serverOptions.Mempool(cfg), - indexEvents, - queryHandlers, - store, - srv.config, - srv.initTxCodec, - chainID, - ) - consensus.prepareProposalHandler = srv.serverOptions.PrepareProposalHandler - consensus.processProposalHandler = srv.serverOptions.ProcessProposalHandler - consensus.checkTxHandler = srv.serverOptions.CheckTxHandler - consensus.verifyVoteExt = srv.serverOptions.VerifyVoteExtensionHandler - consensus.extendVote = srv.serverOptions.ExtendVoteHandler - consensus.addrPeerFilter = srv.serverOptions.AddrPeerFilter - consensus.idPeerFilter = srv.serverOptions.IdPeerFilter - ss := store.GetStateStorage().(snapshots.StorageSnapshotter) sc := store.GetStateCommitment().(snapshots.CommitSnapshotter) @@ -139,14 +133,11 @@ func New[T transaction.Tx]( if err != nil { return nil, err } - consensus.snapshotManager = snapshots.NewManager( - snapshotStore, srv.serverOptions.SnapshotOptions(cfg), sc, ss, nil, logger) - - srv.Consensus = consensus // initialize the indexer + var listener *appdata.Listener if indexerCfg := srv.config.AppTomlConfig.Indexer; len(indexerCfg.Target) > 0 { - listener, err := indexer.StartIndexing(indexer.IndexingOptions{ + indexingTarget, err := indexer.StartIndexing(indexer.IndexingOptions{ Config: indexerCfg, Resolver: decoderResolver, Logger: logger.With(log.ModuleKey, "indexer"), @@ -154,7 +145,51 @@ func New[T transaction.Tx]( if err != nil { return nil, fmt.Errorf("failed to start indexing: %w", err) } - consensus.listener = &listener.Listener + + listener = &indexingTarget.Listener + } + + // snapshot manager + snapshotManager := snapshots.NewManager( + snapshotStore, + srv.serverOptions.SnapshotOptions(cfg), + sc, + ss, + nil, // extensions snapshotter registered below + logger, + ) + if exts := serverOptions.SnapshotExtensions; len(exts) > 0 { + if err := snapshotManager.RegisterExtensions(serverOptions.SnapshotExtensions...); err != nil { + return nil, fmt.Errorf("failed to register snapshot extensions: %w", err) + } + } + + srv.Consensus = &consensus[T]{ + appName: appName, + version: getCometBFTServerVersion(), + app: app, + cfg: srv.config, + store: store, + logger: logger, + txCodec: txCodec, + appCodec: appCodec, + listener: listener, + snapshotManager: snapshotManager, + streamingManager: srv.serverOptions.StreamingManager, + mempool: srv.serverOptions.Mempool(cfg), + lastCommittedHeight: atomic.Int64{}, + prepareProposalHandler: srv.serverOptions.PrepareProposalHandler, + processProposalHandler: srv.serverOptions.ProcessProposalHandler, + verifyVoteExt: srv.serverOptions.VerifyVoteExtensionHandler, + checkTxHandler: srv.serverOptions.CheckTxHandler, + extendVote: srv.serverOptions.ExtendVoteHandler, + chainID: chainID, + indexedEvents: indexEvents, + initialHeight: 0, + queryHandlersMap: queryHandlers, + getProtoRegistry: sync.OnceValues(gogoproto.MergedRegistry), + addrPeerFilter: srv.serverOptions.AddrPeerFilter, + idPeerFilter: srv.serverOptions.IdPeerFilter, } return srv, nil @@ -333,3 +368,13 @@ func (s *CometBFTServer[T]) WriteCustomConfigAt(configPath string) error { cmtcfg.WriteConfigFile(filepath.Join(configPath, "config.toml"), cfg.ConfigTomlConfig) return nil } + +// gRPCServiceRegistrar returns a function that registers the CometBFT gRPC service +// Those services are defined for backward compatibility. +// Eventually, they will be removed in favor of the new gRPC services. +func (s *CometBFTServer[T]) GRPCServiceRegistrar( + clientCtx client.Context, + cfg server.ConfigMap, +) func(srv *grpc.Server) error { + return gRPCServiceRegistrar[T](clientCtx, cfg, s.Config().(*AppTomlConfig), s.txCodec, s.Consensus, s.app) +} diff --git a/server/v2/cometbft/snapshots.go b/server/v2/cometbft/snapshots.go index 5534712864d..e47312a469d 100644 --- a/server/v2/cometbft/snapshots.go +++ b/server/v2/cometbft/snapshots.go @@ -34,7 +34,7 @@ func GetSnapshotStore(rootDir string) (*snapshots.Store, error) { } // ApplySnapshotChunk implements types.Application. -func (c *Consensus[T]) ApplySnapshotChunk(_ context.Context, req *abci.ApplySnapshotChunkRequest) (*abci.ApplySnapshotChunkResponse, error) { +func (c *consensus[T]) ApplySnapshotChunk(_ context.Context, req *abci.ApplySnapshotChunkRequest) (*abci.ApplySnapshotChunkResponse, error) { if c.snapshotManager == nil { c.logger.Error("snapshot manager not configured") return &abci.ApplySnapshotChunkResponse{Result: abci.APPLY_SNAPSHOT_CHUNK_RESULT_ABORT}, nil @@ -65,7 +65,7 @@ func (c *Consensus[T]) ApplySnapshotChunk(_ context.Context, req *abci.ApplySnap } // ListSnapshots implements types.Application. -func (c *Consensus[T]) ListSnapshots(_ context.Context, ctx *abci.ListSnapshotsRequest) (*abci.ListSnapshotsResponse, error) { +func (c *consensus[T]) ListSnapshots(_ context.Context, ctx *abci.ListSnapshotsRequest) (*abci.ListSnapshotsResponse, error) { if c.snapshotManager == nil { return nil, nil } @@ -91,7 +91,7 @@ func (c *Consensus[T]) ListSnapshots(_ context.Context, ctx *abci.ListSnapshotsR } // LoadSnapshotChunk implements types.Application. -func (c *Consensus[T]) LoadSnapshotChunk(_ context.Context, req *abci.LoadSnapshotChunkRequest) (*abci.LoadSnapshotChunkResponse, error) { +func (c *consensus[T]) LoadSnapshotChunk(_ context.Context, req *abci.LoadSnapshotChunkRequest) (*abci.LoadSnapshotChunkResponse, error) { if c.snapshotManager == nil { return &abci.LoadSnapshotChunkResponse{}, nil } @@ -112,7 +112,7 @@ func (c *Consensus[T]) LoadSnapshotChunk(_ context.Context, req *abci.LoadSnapsh } // OfferSnapshot implements types.Application. -func (c *Consensus[T]) OfferSnapshot(_ context.Context, req *abci.OfferSnapshotRequest) (*abci.OfferSnapshotResponse, error) { +func (c *consensus[T]) OfferSnapshot(_ context.Context, req *abci.OfferSnapshotRequest) (*abci.OfferSnapshotResponse, error) { if c.snapshotManager == nil { c.logger.Error("snapshot manager not configured") return &abci.OfferSnapshotResponse{Result: abci.OFFER_SNAPSHOT_RESULT_ABORT}, nil diff --git a/server/v2/cometbft/streaming.go b/server/v2/cometbft/streaming.go index 65f52a002af..c9ca7fddcc7 100644 --- a/server/v2/cometbft/streaming.go +++ b/server/v2/cometbft/streaming.go @@ -12,7 +12,7 @@ import ( ) // streamDeliverBlockChanges will stream all the changes happened during deliver block. -func (c *Consensus[T]) streamDeliverBlockChanges( +func (c *consensus[T]) streamDeliverBlockChanges( ctx context.Context, height int64, txs [][]byte, @@ -40,7 +40,7 @@ func (c *Consensus[T]) streamDeliverBlockChanges( } } - for _, streamingListener := range c.streaming.Listeners { + for _, streamingListener := range c.streamingManager.Listeners { events, err := streaming.IntoStreamingEvents(events) if err != nil { return err diff --git a/server/v2/cometbft/utils.go b/server/v2/cometbft/utils.go index 2b1052bd195..09b32c3d00b 100644 --- a/server/v2/cometbft/utils.go +++ b/server/v2/cometbft/utils.go @@ -19,7 +19,7 @@ import ( "cosmossdk.io/core/server" "cosmossdk.io/core/transaction" errorsmod "cosmossdk.io/errors/v2" - consensus "cosmossdk.io/x/consensus/types" + "cosmossdk.io/x/consensus/types" sdk "github.com/cosmos/cosmos-sdk/types" ) @@ -268,7 +268,7 @@ func QueryResult(err error, debug bool) *abci.QueryResponse { } } -func (c *Consensus[T]) validateFinalizeBlockHeight(req *abci.FinalizeBlockRequest) error { +func (c *consensus[T]) validateFinalizeBlockHeight(req *abci.FinalizeBlockRequest) error { if req.Height < 1 { return fmt.Errorf("invalid height: %d", req.Height) } @@ -302,18 +302,18 @@ func (c *Consensus[T]) validateFinalizeBlockHeight(req *abci.FinalizeBlockReques // GetConsensusParams makes a query to the consensus module in order to get the latest consensus // parameters from committed state -func (c *Consensus[T]) GetConsensusParams(ctx context.Context) (*cmtproto.ConsensusParams, error) { +func (c *consensus[T]) GetConsensusParams(ctx context.Context) (*cmtproto.ConsensusParams, error) { latestVersion, err := c.store.GetLatestVersion() if err != nil { return nil, err } - res, err := c.app.Query(ctx, latestVersion, &consensus.QueryParamsRequest{}) + res, err := c.app.Query(ctx, latestVersion, &types.QueryParamsRequest{}) if err != nil { return nil, err } - if r, ok := res.(*consensus.QueryParamsResponse); !ok { + if r, ok := res.(*types.QueryParamsResponse); !ok { return nil, errors.New("failed to query consensus params") } else { // convert our params to cometbft params @@ -321,7 +321,7 @@ func (c *Consensus[T]) GetConsensusParams(ctx context.Context) (*cmtproto.Consen } } -func (c *Consensus[T]) GetBlockRetentionHeight(cp *cmtproto.ConsensusParams, commitHeight int64) int64 { +func (c *consensus[T]) GetBlockRetentionHeight(cp *cmtproto.ConsensusParams, commitHeight int64) int64 { // pruning is disabled if minRetainBlocks is zero if c.cfg.AppTomlConfig.MinRetainBlocks == 0 { return 0 @@ -376,7 +376,7 @@ func (c *Consensus[T]) GetBlockRetentionHeight(cp *cmtproto.ConsensusParams, com } // checkHalt checks if height or time exceeds halt-height or halt-time respectively. -func (c *Consensus[T]) checkHalt(height int64, time time.Time) error { +func (c *consensus[T]) checkHalt(height int64, time time.Time) error { var halt bool switch { case c.cfg.AppTomlConfig.HaltHeight > 0 && uint64(height) >= c.cfg.AppTomlConfig.HaltHeight: diff --git a/simapp/v2/simdv2/cmd/commands.go b/simapp/v2/simdv2/cmd/commands.go index 64c91f85c12..24fad3eaa53 100644 --- a/simapp/v2/simdv2/cmd/commands.go +++ b/simapp/v2/simdv2/cmd/commands.go @@ -40,7 +40,7 @@ type CommandDependencies[T transaction.Tx] struct { TxConfig client.TxConfig ModuleManager *runtimev2.MM[T] SimApp *simapp.SimApp[T] - // could be more generic with serverv2.ServerComponent[T] + // could generally be more generic with serverv2.ServerComponent[T] // however, we want to register extra grpc handlers ConsensusServer *cometbft.CometBFTServer[T] ClientContext client.Context @@ -106,11 +106,12 @@ func InitRootCmd[T transaction.Tx]( simApp.Name(), simApp.Store(), simApp.App.AppManager, + simApp.AppCodec(), + &genericTxDecoder[T]{deps.TxConfig}, simApp.App.QueryHandlers(), simApp.App.SchemaDecoderResolver(), - &genericTxDecoder[T]{deps.TxConfig}, - deps.GlobalConfig, initCometOptions[T](), + deps.GlobalConfig, ) if err != nil { return nil, err @@ -129,7 +130,7 @@ func InitRootCmd[T transaction.Tx]( simApp.Query, deps.GlobalConfig, grpcserver.WithExtraGRPCHandlers[T]( - deps.ConsensusServer.Consensus.GRPCServiceRegistrar( + deps.ConsensusServer.GRPCServiceRegistrar( deps.ClientContext, deps.GlobalConfig, ),