Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(server/v2/cometbft): add codec.Codec and clean-up APIs #22566

Merged
merged 4 commits into from
Nov 20, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 23 additions & 71 deletions server/v2/cometbft/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,10 @@ import (
"errors"
"fmt"
"strings"
"sync"
"sync/atomic"

abci "github.com/cometbft/cometbft/abci/types"
abciproto "github.com/cometbft/cometbft/api/cometbft/abci/v1"
codectypes "github.com/cosmos/cosmos-sdk/codec/types"
sdk "github.com/cosmos/cosmos-sdk/types"
txtypes "github.com/cosmos/cosmos-sdk/types/tx"
gogoproto "github.com/cosmos/gogoproto/proto"
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/reflect/protoregistry"
Expand All @@ -37,6 +33,11 @@ import (
"cosmossdk.io/server/v2/streaming"
"cosmossdk.io/store/v2/snapshots"
consensustypes "cosmossdk.io/x/consensus/types"

"github.com/cosmos/cosmos-sdk/codec"
codectypes "github.com/cosmos/cosmos-sdk/codec/types"
sdk "github.com/cosmos/cosmos-sdk/types"
txtypes "github.com/cosmos/cosmos-sdk/types/tx"
)

const (
Expand All @@ -45,22 +46,24 @@ const (
QueryPathStore = "store"
)

var _ abci.Application = (*Consensus[transaction.Tx])(nil)
var _ abci.Application = (*consensus[transaction.Tx])(nil)

type Consensus[T transaction.Tx] struct {
// consensus contains the implementation of the ABCI interface for CometBFT.
type consensus[T transaction.Tx] struct {
logger log.Logger
appName, version string
app appmanager.AppManager[T]
appCodec codec.Codec
txCodec transaction.Codec[T]
store types.Store
streaming streaming.Manager
listener *appdata.Listener
snapshotManager *snapshots.Manager
streamingManager streaming.Manager
mempool mempool.Mempool[T]

cfg Config
indexedEvents map[string]struct{}
chainID string
indexedEvents map[string]struct{}

initialHeight uint64
// this is only available after this node has committed a block (in FinalizeBlock),
Expand All @@ -81,60 +84,9 @@ type Consensus[T transaction.Tx] struct {
getProtoRegistry func() (*protoregistry.Files, error)
}

func NewConsensus[T transaction.Tx](
logger log.Logger,
appName string,
app appmanager.AppManager[T],
mp mempool.Mempool[T],
indexedEvents map[string]struct{},
queryHandlersMap map[string]appmodulev2.Handler,
store types.Store,
cfg Config,
txCodec transaction.Codec[T],
chainId string,
) *Consensus[T] {
return &Consensus[T]{
appName: appName,
version: getCometBFTServerVersion(),
app: app,
cfg: cfg,
store: store,
logger: logger,
txCodec: txCodec,
streaming: streaming.Manager{},
snapshotManager: nil,
mempool: mp,
lastCommittedHeight: atomic.Int64{},
prepareProposalHandler: nil,
processProposalHandler: nil,
verifyVoteExt: nil,
extendVote: nil,
chainID: chainId,
indexedEvents: indexedEvents,
initialHeight: 0,
queryHandlersMap: queryHandlersMap,
getProtoRegistry: sync.OnceValues(gogoproto.MergedRegistry),
}
}

// SetStreamingManager sets the streaming manager for the consensus module.
func (c *Consensus[T]) SetStreamingManager(sm streaming.Manager) {
c.streaming = sm
}

// RegisterSnapshotExtensions registers the given extensions with the consensus module's snapshot manager.
// It allows additional snapshotter implementations to be used for creating and restoring snapshots.
func (c *Consensus[T]) RegisterSnapshotExtensions(extensions ...snapshots.ExtensionSnapshotter) error {
if err := c.snapshotManager.RegisterExtensions(extensions...); err != nil {
return fmt.Errorf("failed to register snapshot extensions: %w", err)
}

return nil
}

// CheckTx implements types.Application.
// It is called by cometbft to verify transaction validity
func (c *Consensus[T]) CheckTx(ctx context.Context, req *abciproto.CheckTxRequest) (*abciproto.CheckTxResponse, error) {
func (c *consensus[T]) CheckTx(ctx context.Context, req *abciproto.CheckTxRequest) (*abciproto.CheckTxResponse, error) {
decodedTx, err := c.txCodec.Decode(req.Tx)
if err != nil {
return nil, err
Expand Down Expand Up @@ -172,7 +124,7 @@ func (c *Consensus[T]) CheckTx(ctx context.Context, req *abciproto.CheckTxReques
}

// Info implements types.Application.
func (c *Consensus[T]) Info(ctx context.Context, _ *abciproto.InfoRequest) (*abciproto.InfoResponse, error) {
func (c *consensus[T]) Info(ctx context.Context, _ *abciproto.InfoRequest) (*abciproto.InfoResponse, error) {
version, _, err := c.store.StateLatest()
if err != nil {
return nil, err
Expand Down Expand Up @@ -212,7 +164,7 @@ func (c *Consensus[T]) Info(ctx context.Context, _ *abciproto.InfoRequest) (*abc

// Query implements types.Application.
// It is called by cometbft to query application state.
func (c *Consensus[T]) Query(ctx context.Context, req *abciproto.QueryRequest) (resp *abciproto.QueryResponse, err error) {
func (c *consensus[T]) Query(ctx context.Context, req *abciproto.QueryRequest) (resp *abciproto.QueryResponse, err error) {
resp, isGRPC, err := c.maybeRunGRPCQuery(ctx, req)
if isGRPC {
return resp, err
Expand Down Expand Up @@ -246,7 +198,7 @@ func (c *Consensus[T]) Query(ctx context.Context, req *abciproto.QueryRequest) (
return resp, nil
}

func (c *Consensus[T]) maybeRunGRPCQuery(ctx context.Context, req *abci.QueryRequest) (resp *abciproto.QueryResponse, isGRPC bool, err error) {
func (c *consensus[T]) maybeRunGRPCQuery(ctx context.Context, req *abci.QueryRequest) (resp *abciproto.QueryResponse, isGRPC bool, err error) {
// if this fails then we cannot serve queries anymore
registry, err := c.getProtoRegistry()
if err != nil {
Expand Down Expand Up @@ -288,7 +240,7 @@ func (c *Consensus[T]) maybeRunGRPCQuery(ctx context.Context, req *abci.QueryReq

txResult, _, err := c.app.Simulate(ctx, tx)
if err != nil {
return nil, true, fmt.Errorf("%v with gas used: '%d'", err, txResult.GasUsed)
return nil, true, fmt.Errorf("%w with gas used: '%d'", err, txResult.GasUsed)
julienrbrt marked this conversation as resolved.
Show resolved Hide resolved
}

msgResponses := make([]*codectypes.Any, 0, len(txResult.Resp))
Expand Down Expand Up @@ -337,7 +289,7 @@ func (c *Consensus[T]) maybeRunGRPCQuery(ctx context.Context, req *abci.QueryReq
}

// InitChain implements types.Application.
func (c *Consensus[T]) InitChain(ctx context.Context, req *abciproto.InitChainRequest) (*abciproto.InitChainResponse, error) {
func (c *consensus[T]) InitChain(ctx context.Context, req *abciproto.InitChainRequest) (*abciproto.InitChainResponse, error) {
c.logger.Info("InitChain", "initialHeight", req.InitialHeight, "chainID", req.ChainId)

// store chainID to be used later on in execution
Expand Down Expand Up @@ -421,7 +373,7 @@ func (c *Consensus[T]) InitChain(ctx context.Context, req *abciproto.InitChainRe

// PrepareProposal implements types.Application.
// It is called by cometbft to prepare a proposal block.
func (c *Consensus[T]) PrepareProposal(
func (c *consensus[T]) PrepareProposal(
ctx context.Context,
req *abciproto.PrepareProposalRequest,
) (resp *abciproto.PrepareProposalResponse, err error) {
Expand Down Expand Up @@ -457,7 +409,7 @@ func (c *Consensus[T]) PrepareProposal(

// ProcessProposal implements types.Application.
// It is called by cometbft to process/verify a proposal block.
func (c *Consensus[T]) ProcessProposal(
func (c *consensus[T]) ProcessProposal(
ctx context.Context,
req *abciproto.ProcessProposalRequest,
) (*abciproto.ProcessProposalResponse, error) {
Expand Down Expand Up @@ -491,7 +443,7 @@ func (c *Consensus[T]) ProcessProposal(

// FinalizeBlock implements types.Application.
// It is called by cometbft to finalize a block.
func (c *Consensus[T]) FinalizeBlock(
func (c *consensus[T]) FinalizeBlock(
ctx context.Context,
req *abciproto.FinalizeBlockRequest,
) (*abciproto.FinalizeBlockResponse, error) {
Expand Down Expand Up @@ -581,7 +533,7 @@ func (c *Consensus[T]) FinalizeBlock(

// Commit implements types.Application.
// It is called by cometbft to notify the application that a block was committed.
func (c *Consensus[T]) Commit(ctx context.Context, _ *abciproto.CommitRequest) (*abciproto.CommitResponse, error) {
func (c *consensus[T]) Commit(ctx context.Context, _ *abciproto.CommitRequest) (*abciproto.CommitResponse, error) {
lastCommittedHeight := c.lastCommittedHeight.Load()

c.snapshotManager.SnapshotIfApplicable(lastCommittedHeight)
Expand All @@ -599,7 +551,7 @@ func (c *Consensus[T]) Commit(ctx context.Context, _ *abciproto.CommitRequest) (
// Vote extensions

// VerifyVoteExtension implements types.Application.
func (c *Consensus[T]) VerifyVoteExtension(
func (c *consensus[T]) VerifyVoteExtension(
ctx context.Context,
req *abciproto.VerifyVoteExtensionRequest,
) (*abciproto.VerifyVoteExtensionResponse, error) {
Expand Down Expand Up @@ -641,7 +593,7 @@ func (c *Consensus[T]) VerifyVoteExtension(
}

// ExtendVote implements types.Application.
func (c *Consensus[T]) ExtendVote(ctx context.Context, req *abciproto.ExtendVoteRequest) (*abciproto.ExtendVoteResponse, error) {
func (c *consensus[T]) ExtendVote(ctx context.Context, req *abciproto.ExtendVoteRequest) (*abciproto.ExtendVoteResponse, error) {
// If vote extensions are not enabled, as a safety precaution, we return an
// error.
cp, err := c.GetConsensusParams(ctx)
Expand Down
17 changes: 13 additions & 4 deletions server/v2/cometbft/abci_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/json"
"io"
"strings"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -637,7 +638,7 @@ func TestConsensus_Query(t *testing.T) {
require.Equal(t, res.Value, []byte(nil))
}

func setUpConsensus(t *testing.T, gasLimit uint64, mempool mempool.Mempool[mock.Tx]) *Consensus[mock.Tx] {
func setUpConsensus(t *testing.T, gasLimit uint64, mempool mempool.Mempool[mock.Tx]) *consensus[mock.Tx] {
t.Helper()

msgRouterBuilder := getMsgRouterBuilder(t, func(ctx context.Context, msg *gogotypes.BoolValue) (*gogotypes.BoolValue, error) {
Expand Down Expand Up @@ -699,9 +700,17 @@ func setUpConsensus(t *testing.T, gasLimit uint64, mempool mempool.Mempool[mock.
nil,
)

return NewConsensus[mock.Tx](log.NewNopLogger(), "testing-app", am,
mempool, map[string]struct{}{}, nil, mockStore,
Config{AppTomlConfig: DefaultAppTomlConfig()}, mock.TxCodec{}, "test")
return &consensus[mock.Tx]{
logger: log.NewNopLogger(),
appName: "testing-app",
app: am,
mempool: mempool,
store: mockStore,
cfg: Config{AppTomlConfig: DefaultAppTomlConfig()},
txCodec: mock.TxCodec{},
chainID: "test",
getProtoRegistry: sync.OnceValues(proto.MergedRegistry),
}
}

// Check target version same with store's latest version
Expand Down
2 changes: 1 addition & 1 deletion server/v2/cometbft/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ func (s *CometBFTServer[T]) BootstrapStateCmd() *cobra.Command {
return err
}
if height == 0 {
height, err = s.Consensus.store.GetLatestVersion()
height, err = s.store.GetLatestVersion()
if err != nil {
return err
}
Expand Down
38 changes: 25 additions & 13 deletions server/v2/cometbft/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ package cometbft
import (
"context"

v1 "github.com/cometbft/cometbft/api/cometbft/abci/v1"
abci "github.com/cometbft/cometbft/abci/types"
abciproto "github.com/cometbft/cometbft/api/cometbft/abci/v1"
"github.com/cosmos/gogoproto/proto"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
Expand All @@ -12,6 +13,7 @@ import (
autocliv1 "cosmossdk.io/api/cosmos/autocli/v1"
cmtv1beta1 "cosmossdk.io/api/cosmos/base/tendermint/v1beta1"
"cosmossdk.io/core/server"
corestore "cosmossdk.io/core/store"
"cosmossdk.io/core/transaction"
errorsmod "cosmossdk.io/errors/v2"

Expand All @@ -23,17 +25,25 @@ import (
txtypes "github.com/cosmos/cosmos-sdk/types/tx"
)

// GRPCServiceRegistrar returns a function that registers the CometBFT gRPC service
type appSimulator[T transaction.Tx] interface {
Simulate(ctx context.Context, tx T) (server.TxResult, corestore.WriterMap, error)
}

// gRPCServiceRegistrar returns a function that registers the CometBFT gRPC service
// Those services are defined for backward compatibility.
// Eventually, they will be removed in favor of the new gRPC services.
func (c *Consensus[T]) GRPCServiceRegistrar(
func gRPCServiceRegistrar[T transaction.Tx](
clientCtx client.Context,
cfg server.ConfigMap,
cometBFTAppConfig *AppTomlConfig,
txCodec transaction.Codec[T],
consensus abci.Application,
app appSimulator[T],
) func(srv *grpc.Server) error {
return func(srv *grpc.Server) error {
cmtservice.RegisterServiceServer(srv, cmtservice.NewQueryServer(clientCtx.Client, c.Query, clientCtx.ConsensusAddressCodec))
txtypes.RegisterServiceServer(srv, txServer[T]{clientCtx, c})
nodeservice.RegisterServiceServer(srv, nodeServer[T]{cfg, c})
cmtservice.RegisterServiceServer(srv, cmtservice.NewQueryServer(clientCtx.Client, consensus.Query, clientCtx.ConsensusAddressCodec))
txtypes.RegisterServiceServer(srv, txServer[T]{clientCtx, txCodec, app})
nodeservice.RegisterServiceServer(srv, nodeServer[T]{cfg, cometBFTAppConfig, consensus})

return nil
}
Expand Down Expand Up @@ -86,7 +96,8 @@ var CometBFTAutoCLIDescriptor = &autocliv1.ServiceCommandDescriptor{

type txServer[T transaction.Tx] struct {
clientCtx client.Context
consensus *Consensus[T]
txCodec transaction.Codec[T]
app appSimulator[T]
}

// BroadcastTx implements tx.ServiceServer.
Expand Down Expand Up @@ -132,12 +143,12 @@ func (t txServer[T]) Simulate(ctx context.Context, req *txtypes.SimulateRequest)
return nil, status.Errorf(codes.InvalidArgument, "empty txBytes is not allowed")
}

tx, err := t.consensus.txCodec.Decode(txBytes)
tx, err := t.txCodec.Decode(txBytes)
if err != nil {
return nil, errorsmod.Wrap(err, "failed to decode tx")
julienrbrt marked this conversation as resolved.
Show resolved Hide resolved
}

txResult, _, err := t.consensus.app.Simulate(ctx, tx)
txResult, _, err := t.app.Simulate(ctx, tx)
if err != nil {
return nil, status.Errorf(codes.Unknown, "%v with gas used: '%d'", err, txResult.GasUsed)
julienrbrt marked this conversation as resolved.
Show resolved Hide resolved
}
Expand Down Expand Up @@ -186,8 +197,9 @@ func (t txServer[T]) TxEncodeAmino(context.Context, *txtypes.TxEncodeAminoReques
var _ txtypes.ServiceServer = txServer[transaction.Tx]{}

type nodeServer[T transaction.Tx] struct {
cfg server.ConfigMap
consensus *Consensus[T]
cfg server.ConfigMap
cometBFTAppConfig *AppTomlConfig
consensus abci.Application
}

func (s nodeServer[T]) Config(ctx context.Context, _ *nodeservice.ConfigRequest) (*nodeservice.ConfigResponse, error) {
Expand All @@ -201,12 +213,12 @@ func (s nodeServer[T]) Config(ctx context.Context, _ *nodeservice.ConfigRequest)
MinimumGasPrice: minGasPricesStr,
PruningKeepRecent: "ambiguous in v2",
PruningInterval: "ambiguous in v2",
HaltHeight: s.consensus.cfg.AppTomlConfig.HaltHeight,
HaltHeight: s.cometBFTAppConfig.HaltHeight,
julienrbrt marked this conversation as resolved.
Show resolved Hide resolved
}, nil
}

func (s nodeServer[T]) Status(ctx context.Context, _ *nodeservice.StatusRequest) (*nodeservice.StatusResponse, error) {
nodeInfo, err := s.consensus.Info(ctx, &v1.InfoRequest{})
nodeInfo, err := s.consensus.Info(ctx, &abciproto.InfoRequest{})
if err != nil {
return nil, err
}
Expand Down
11 changes: 10 additions & 1 deletion server/v2/cometbft/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"cosmossdk.io/server/v2/cometbft/handlers"
"cosmossdk.io/server/v2/cometbft/mempool"
"cosmossdk.io/server/v2/cometbft/types"
"cosmossdk.io/server/v2/streaming"
"cosmossdk.io/store/v2/snapshots"
)

Expand All @@ -23,8 +24,14 @@ type ServerOptions[T transaction.Tx] struct {
ExtendVoteHandler handlers.ExtendVoteHandler
KeygenF keyGenF

Mempool func(cfg map[string]any) mempool.Mempool[T]
// Set mempool for the consensus module.
Mempool func(cfg map[string]any) mempool.Mempool[T]
// Set streaming manager for the consensus module.
StreamingManager streaming.Manager
// Set snapshot options for the consensus module.
SnapshotOptions func(cfg map[string]any) snapshots.SnapshotOptions
// Allows additional snapshotter implementations to be used for creating and restoring snapshots.
SnapshotExtensions []snapshots.ExtensionSnapshotter

AddrPeerFilter types.PeerFilter // filter peers by address and port
IdPeerFilter types.PeerFilter // filter peers by node ID
Expand All @@ -40,7 +47,9 @@ func DefaultServerOptions[T transaction.Tx]() ServerOptions[T] {
VerifyVoteExtensionHandler: handlers.NoOpVerifyVoteExtensionHandler(),
ExtendVoteHandler: handlers.NoOpExtendVote(),
Mempool: func(cfg map[string]any) mempool.Mempool[T] { return mempool.NoOpMempool[T]{} },
StreamingManager: streaming.Manager{},
SnapshotOptions: func(cfg map[string]any) snapshots.SnapshotOptions { return snapshots.NewSnapshotOptions(0, 0) },
SnapshotExtensions: []snapshots.ExtensionSnapshotter{},
AddrPeerFilter: nil,
IdPeerFilter: nil,
KeygenF: func() (cmtcrypto.PrivKey, error) { return cmted22519.GenPrivKey(), nil },
Expand Down
Loading
Loading