Skip to content

Commit

Permalink
refactor: cleanup server logic (backport #15041)
Browse files Browse the repository at this point in the history
cleanup for easier review
  • Loading branch information
alexanderbez authored and yihuang committed May 18, 2023
1 parent 329d7d8 commit b19f0dc
Show file tree
Hide file tree
Showing 15 changed files with 267 additions and 191 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ Ref: https://keepachangelog.com/en/1.0.0/
* (x/staking) [#16068](https://github.com/cosmos/cosmos-sdk/pull/16068) Update simulation to allow non-EOA accounts to stake.
* (server) [#16142](https://github.com/cosmos/cosmos-sdk/pull/16142) Remove JSON Indentation from the GRPC to REST gateway's responses. (Saving bandwidth)
* (types) [#16145](https://github.com/cosmos/cosmos-sdk/pull/16145) Rename interface `ExtensionOptionI` back to `TxExtensionOptionI` to avoid breaking change.
* (server) [#15041](https://github.com/cosmos/cosmos-sdk/pull/15041) Remove unnecessary sleeps from gRPC and API server initiation. The servers will start and accept requests as soon as they're ready.

### Bug Fixes

Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ require (
github.com/tendermint/go-amino v0.16.0
github.com/tidwall/btree v1.6.0
golang.org/x/crypto v0.7.0
golang.org/x/exp v0.0.0-20230515195305-f3d0a9c9a5cc
golang.org/x/exp v0.0.0-20230310171629-522b1b587ee0
golang.org/x/sync v0.1.0
google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4
google.golang.org/grpc v1.55.0
google.golang.org/protobuf v1.30.0
Expand Down
5 changes: 3 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1025,8 +1025,8 @@ golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u0
golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM=
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU=
golang.org/x/exp v0.0.0-20200331195152-e8c3332aa8e5/go.mod h1:4M0jN8W1tt0AVLNr8HDosyJCDCDuyL9N9+3m7wDWgKw=
golang.org/x/exp v0.0.0-20230515195305-f3d0a9c9a5cc h1:mCRnTeVUjcrhlRmO0VK8a6k6Rrf6TF9htwo2pJVSjIU=
golang.org/x/exp v0.0.0-20230515195305-f3d0a9c9a5cc/go.mod h1:V1LtkGg67GoY2N1AnLN78QLrzxkLyJw7RJb1gzOOz9w=
golang.org/x/exp v0.0.0-20230310171629-522b1b587ee0 h1:LGJsf5LRplCck6jUCH3dBL2dmycNruWNF5xugkSlfXw=
golang.org/x/exp v0.0.0-20230310171629-522b1b587ee0/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc=
golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
Expand Down Expand Up @@ -1160,6 +1160,7 @@ golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220929204114-8fcdb60fdcc0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down
46 changes: 36 additions & 10 deletions server/api/server.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package api

import (
"context"
"fmt"
"net"
"net/http"
Expand Down Expand Up @@ -47,6 +48,7 @@ func CustomGRPCHeaderMatcher(key string) (string, bool) {
switch strings.ToLower(key) {
case grpctypes.GRPCBlockHeightHeader:
return grpctypes.GRPCBlockHeightHeader, true

default:
return runtime.DefaultHeaderMatcher(key)
}
Expand Down Expand Up @@ -83,9 +85,12 @@ func New(clientCtx client.Context, logger log.Logger) *Server {

// Start starts the API server. Internally, the API server leverages Tendermint's
// JSON RPC server. Configuration options are provided via config.APIConfig
// and are delegated to the Tendermint JSON RPC server. The process is
// non-blocking, so an external signal handler must be used.
func (s *Server) Start(cfg config.Config) error {
// and are delegated to the Tendermint JSON RPC server.
//
// Note, this creates a blocking process if the server is started successfully.
// Otherwise, an error is returned. The caller is expected to provide a Context
// that is properly canceled or closed to indicate the server should be stopped.
func (s *Server) Start(ctx context.Context, cfg config.Config) error {
s.mtx.Lock()

tmCfg := tmrpcserver.DefaultConfig()
Expand All @@ -102,17 +107,38 @@ func (s *Server) Start(cfg config.Config) error {

s.registerGRPCGatewayRoutes()
s.listener = listener
var h http.Handler = s.Router

s.mtx.Unlock()

if cfg.API.EnableUnsafeCORS {
allowAllCORS := handlers.CORS(handlers.AllowedHeaders([]string{"Content-Type"}))
return tmrpcserver.Serve(s.listener, allowAllCORS(h), s.logger, tmCfg)
}
errCh := make(chan error)

// Start the API in an external goroutine as Serve is blocking and will return
// an error upon failure, which we'll send on the error channel that will be
// consumed by the for block below.
go func(enableUnsafeCORS bool) {
s.logger.Info("starting API server...", "address", cfg.API.Address)

s.logger.Info("starting API server...")
return tmrpcserver.Serve(s.listener, s.Router, s.logger, tmCfg)
if enableUnsafeCORS {
allowAllCORS := handlers.CORS(handlers.AllowedHeaders([]string{"Content-Type"}))
errCh <- tmrpcserver.Serve(s.listener, allowAllCORS(s.Router), s.logger, tmCfg)
} else {
errCh <- tmrpcserver.Serve(s.listener, s.Router, s.logger, tmCfg)
}
}(cfg.API.EnableUnsafeCORS)

// Start a blocking select to wait for an indication to stop the server or that
// the server failed to start properly.
select {
case <-ctx.Done():
// The calling process cancelled or closed the provided context, so we must
// gracefully stop the API server.
s.logger.Info("stopping API server...", "address", cfg.API.Address)
return s.Close()

case err := <-errCh:
s.logger.Error("failed to start API server", "err", err)
return err
}
}

// Close closes the API server.
Expand Down
20 changes: 15 additions & 5 deletions server/grpc/grpc_web.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
package grpc

import (
"context"
"fmt"
"net/http"
"time"

"github.com/cometbft/cometbft/libs/log"
"github.com/improbable-eng/grpc-web/go/grpcweb"
"google.golang.org/grpc"

"github.com/cosmos/cosmos-sdk/server/config"
"github.com/cosmos/cosmos-sdk/server/types"
)

// StartGRPCWeb starts a gRPC-Web server on the given address.
func StartGRPCWeb(grpcSrv *grpc.Server, config config.Config) (*http.Server, error) {
func StartGRPCWeb(ctx context.Context, logger log.Logger, grpcSrv *grpc.Server, config config.Config) error {
var options []grpcweb.Option
if config.GRPCWeb.EnableUnsafeCORS {
options = append(options,
Expand All @@ -32,15 +33,24 @@ func StartGRPCWeb(grpcSrv *grpc.Server, config config.Config) (*http.Server, err

errCh := make(chan error)
go func() {
logger.Info("starting gRPC web server...", "address", config.GRPCWeb.Address)
if err := grpcWebSrv.ListenAndServe(); err != nil {
errCh <- fmt.Errorf("[grpc] failed to serve: %w", err)
}
}()

// Start a blocking select to wait for an indication to stop the server or that
// the server failed to start properly.
select {
case <-ctx.Done():
// The calling process cancelled or closed the provided context, so we must
// gracefully stop the gRPC-web server.
logger.Info("stopping gRPC web server...", "address", config.GRPCWeb.Address)
grpcWebSrv.Close()
return nil

case err := <-errCh:
return nil, err
case <-time.After(types.ServerStartTime): // assume server started successfully
return grpcWebSrv, nil
logger.Error("failed to start gRPC Web server", "err", err)
return err
}
}
49 changes: 35 additions & 14 deletions server/grpc/server.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package grpc

import (
"context"
"fmt"
"net"
"time"

"github.com/cometbft/cometbft/libs/log"
"google.golang.org/grpc"

"github.com/cosmos/cosmos-sdk/client"
Expand All @@ -17,8 +18,9 @@ import (
_ "github.com/cosmos/cosmos-sdk/types/tx/amino" // Import amino.proto file for reflection
)

// StartGRPCServer starts a gRPC server on the given address.
func StartGRPCServer(clientCtx client.Context, app types.Application, cfg config.GRPCConfig) (*grpc.Server, error) {
// NewGRPCServer returns a correctly configured and initialized gRPC server.
// Note, the caller is responsible for starting the server. See StartGRPCServer.
func NewGRPCServer(clientCtx client.Context, app types.Application, cfg config.GRPCConfig) (*grpc.Server, error) {
maxSendMsgSize := cfg.MaxSendMsgSize
if maxSendMsgSize == 0 {
maxSendMsgSize = config.DefaultGRPCMaxSendMsgSize
Expand Down Expand Up @@ -46,39 +48,58 @@ func StartGRPCServer(clientCtx client.Context, app types.Application, cfg config
for _, m := range clientCtx.TxConfig.SignModeHandler().Modes() {
modes[m.String()] = (int32)(m)
}

return modes
}(),
ChainID: clientCtx.ChainID,
SdkConfig: sdk.GetConfig(),
InterfaceRegistry: clientCtx.InterfaceRegistry,
})
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to register reflection service: %w", err)
}

// Reflection allows external clients to see what services and methods
// the gRPC server exposes.
gogoreflection.Register(grpcSrv)

return grpcSrv, nil
}

// StartGRPCServer starts the provided gRPC server on the address specified in cfg.
//
// Note, this creates a blocking process if the server is started successfully.
// Otherwise, an error is returned. The caller is expected to provide a Context
// that is properly canceled or closed to indicate the server should be stopped.
func StartGRPCServer(ctx context.Context, logger log.Logger, cfg config.GRPCConfig, grpcSrv *grpc.Server) error {
listener, err := net.Listen("tcp", cfg.Address)
if err != nil {
return nil, err
return fmt.Errorf("failed to listen on address %s: %w", cfg.Address, err)
}

errCh := make(chan error)

// Start the gRPC in an external goroutine as Serve is blocking and will return
// an error upon failure, which we'll send on the error channel that will be
// consumed by the for block below.
go func() {
err = grpcSrv.Serve(listener)
if err != nil {
errCh <- fmt.Errorf("failed to serve: %w", err)
}
logger.Info("starting gRPC server...", "address", cfg.Address)
errCh <- grpcSrv.Serve(listener)
}()

// Start a blocking select to wait for an indication to stop the server or that
// the server failed to start properly.
select {
case err := <-errCh:
return nil, err
case <-ctx.Done():
// The calling process cancelled or closed the provided context, so we must
// gracefully stop the gRPC server.
logger.Info("stopping gRPC server...", "address", cfg.Address)
grpcSrv.GracefulStop()

return nil

case <-time.After(types.ServerStartTime):
// assume server started successfully
return grpcSrv, nil
case err := <-errCh:
logger.Error("failed to start gRPC server", "err", err)
return err
}
}
Loading

0 comments on commit b19f0dc

Please sign in to comment.