From 80918462a1a2ad888c17bb531e7f58f813f6c153 Mon Sep 17 00:00:00 2001 From: Marko Date: Thu, 30 Sep 2021 08:17:33 +0000 Subject: [PATCH] fix!: remove grpc query routing through tendermint (#10045) ## Description Revert routing queries through tendermint. The reason this change was made is because of this error: ``` fatal error: concurrent map read and map write ``` The person who identified this error submitted these steps to reproduce ``` User sends a query with grpc Tendermint Commit new blocks and tries to Commit IAVL (which causing IAVL versions map to change) At the same time query tries to read from the same map (iavl.(*MutableTree).VersionExists) to check if requested version is exists Node exits with fatal error: concurrent map read and map write ``` With the recent changes to IAVL submitted by terra (cc @YunSuk-Yeo) the reason for why we need to route through tendermint is no longer present. We should revert it when 0.17.1 of IAVL is cut, which will be later today. --- ### Author Checklist *All items are required. Please add a note to the item if the item is not applicable and please add links to any relevant follow up issues.* I have... - [ ] included the correct [type prefix](https://github.com/commitizen/conventional-commit-types/blob/v3.0.0/index.json) in the PR title - [ ] added `!` to the type prefix if API or client breaking change - [ ] targeted the correct branch (see [PR Targeting](https://github.com/cosmos/cosmos-sdk/blob/master/CONTRIBUTING.md#pr-targeting)) - [ ] provided a link to the relevant issue or specification - [ ] followed the guidelines for [building modules](https://github.com/cosmos/cosmos-sdk/blob/master/docs/building-modules) - [ ] included the necessary unit and integration [tests](https://github.com/cosmos/cosmos-sdk/blob/master/CONTRIBUTING.md#testing) - [ ] added a changelog entry to `CHANGELOG.md` - [ ] included comments for [documenting Go code](https://blog.golang.org/godoc) - [ ] updated the relevant documentation or specification - [ ] reviewed "Files changed" and left comments if necessary - [ ] confirmed all CI checks have passed ### Reviewers Checklist *All items are required. Please add a note if the item is not applicable and please add your handle next to the items reviewed if you only reviewed selected items.* I have... - [ ] confirmed the correct [type prefix](https://github.com/commitizen/conventional-commit-types/blob/v3.0.0/index.json) in the PR title - [ ] confirmed `!` in the type prefix if API or client breaking change - [ ] confirmed all author checklist items have been addressed - [ ] reviewed state machine logic - [ ] reviewed API design and naming - [ ] reviewed documentation is accurate - [ ] reviewed tests and test coverage - [ ] manually tested (if applicable) --- CHANGELOG.md | 1 + baseapp/grpcrouter.go | 34 +--------------- baseapp/grpcserver.go | 82 +++++++++++++++++-------------------- client/grpc_query.go | 83 +++++++++++++++----------------------- server/grpc/server.go | 2 +- server/grpc/server_test.go | 6 +-- server/types/app.go | 2 +- 7 files changed, 77 insertions(+), 133 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 58bd34e82b98..a3de9c2736d8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -46,6 +46,7 @@ Ref: https://keepachangelog.com/en/1.0.0/ * [\#9533](https://github.com/cosmos/cosmos-sdk/pull/9533) Added a new gRPC method, `DenomOwners`, in `x/bank` to query for all account holders of a specific denomination. * (bank) [\#9618](https://github.com/cosmos/cosmos-sdk/pull/9618) Update bank.Metadata: add URI and URIHash attributes. * [\#9837](https://github.com/cosmos/cosmos-sdk/issues/9837) `--generate-only` flag will accept the keyname now. +* [\#10045](https://github.com/cosmos/cosmos-sdk/pull/10045) Revert [#8549](https://github.com/cosmos/cosmos-sdk/pull/8549). Do not route grpc queries through Tendermint. ### API Breaking Changes diff --git a/baseapp/grpcrouter.go b/baseapp/grpcrouter.go index 3570648d48ab..9c15b695176c 100644 --- a/baseapp/grpcrouter.go +++ b/baseapp/grpcrouter.go @@ -2,7 +2,6 @@ package baseapp import ( "fmt" - "reflect" "github.com/cosmos/cosmos-sdk/client/grpc/reflection" @@ -14,19 +13,13 @@ import ( codectypes "github.com/cosmos/cosmos-sdk/codec/types" sdk "github.com/cosmos/cosmos-sdk/types" - sdkerrors "github.com/cosmos/cosmos-sdk/types/errors" ) var protoCodec = encoding.GetCodec(proto.Name) // GRPCQueryRouter routes ABCI Query requests to GRPC handlers type GRPCQueryRouter struct { - routes map[string]GRPCQueryHandler - // returnTypes is a map of FQ method name => its return type. It is used - // for cache purposes: the first time a method handler is run, we save its - // return type in this map. Then, on subsequent method handler calls, we - // decode the ABCI response bytes using the cached return type. - returnTypes map[string]reflect.Type + routes map[string]GRPCQueryHandler interfaceRegistry codectypes.InterfaceRegistry serviceData []serviceData } @@ -42,8 +35,7 @@ var _ gogogrpc.Server = &GRPCQueryRouter{} // NewGRPCQueryRouter creates a new GRPCQueryRouter func NewGRPCQueryRouter() *GRPCQueryRouter { return &GRPCQueryRouter{ - returnTypes: map[string]reflect.Type{}, - routes: map[string]GRPCQueryHandler{}, + routes: map[string]GRPCQueryHandler{}, } } @@ -98,17 +90,8 @@ func (qrt *GRPCQueryRouter) RegisterService(sd *grpc.ServiceDesc, handler interf if qrt.interfaceRegistry != nil { return codectypes.UnpackInterfaces(i, qrt.interfaceRegistry) } - return nil }, nil) - - // If it's the first time we call this handler, then we save - // the return type of the handler in the `returnTypes` map. - // The return type will be used for decoding subsequent requests. - if _, found := qrt.returnTypes[fqName]; !found { - qrt.returnTypes[fqName] = reflect.TypeOf(res) - } - if err != nil { return abci.ResponseQuery{}, err } @@ -144,16 +127,3 @@ func (qrt *GRPCQueryRouter) SetInterfaceRegistry(interfaceRegistry codectypes.In reflection.NewReflectionServiceServer(interfaceRegistry), ) } - -// returnTypeOf returns the return type of a gRPC method handler. With the way the -// `returnTypes` cache map is set up, the return type of a method handler is -// guaranteed to be found if it's retrieved **after** the method handler ran at -// least once. If not, then a logic error is return. -func (qrt *GRPCQueryRouter) returnTypeOf(method string) (reflect.Type, error) { - returnType, found := qrt.returnTypes[method] - if !found { - return nil, sdkerrors.Wrapf(sdkerrors.ErrLogic, "cannot find %s return type", method) - } - - return returnType, nil -} diff --git a/baseapp/grpcserver.go b/baseapp/grpcserver.go index c1db08a555aa..68cc14e66545 100644 --- a/baseapp/grpcserver.go +++ b/baseapp/grpcserver.go @@ -2,78 +2,68 @@ package baseapp import ( "context" - "reflect" + "strconv" gogogrpc "github.com/gogo/protobuf/grpc" grpcmiddleware "github.com/grpc-ecosystem/go-grpc-middleware" grpcrecovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery" "google.golang.org/grpc" + "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" - "github.com/cosmos/cosmos-sdk/client" + sdk "github.com/cosmos/cosmos-sdk/types" sdkerrors "github.com/cosmos/cosmos-sdk/types/errors" - "github.com/cosmos/cosmos-sdk/types/tx" + grpctypes "github.com/cosmos/cosmos-sdk/types/grpc" ) // GRPCQueryRouter returns the GRPCQueryRouter of a BaseApp. func (app *BaseApp) GRPCQueryRouter() *GRPCQueryRouter { return app.grpcQueryRouter } // RegisterGRPCServer registers gRPC services directly with the gRPC server. -func (app *BaseApp) RegisterGRPCServer(clientCtx client.Context, server gogogrpc.Server) { - // Define an interceptor for all gRPC queries: this interceptor will route - // the query through the `clientCtx`, which itself queries Tendermint. - interceptor := func(grpcCtx context.Context, req interface{}, info *grpc.UnaryServerInfo, _ grpc.UnaryHandler) (interface{}, error) { - // Two things can happen here: - // 1. either we're broadcasting a Tx, in which case we call Tendermint's broadcast endpoint directly, - // 2. or we are querying for state, in which case we call ABCI's Query. +func (app *BaseApp) RegisterGRPCServer(server gogogrpc.Server) { + // Define an interceptor for all gRPC queries: this interceptor will create + // a new sdk.Context, and pass it into the query handler. + interceptor := func(grpcCtx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { + // If there's some metadata in the context, retrieve it. + md, ok := metadata.FromIncomingContext(grpcCtx) + if !ok { + return nil, status.Error(codes.Internal, "unable to retrieve metadata") + } - // Case 1. Broadcasting a Tx. - if reqProto, ok := req.(*tx.BroadcastTxRequest); ok { - if !ok { - return nil, sdkerrors.Wrapf(sdkerrors.ErrInvalidRequest, "expected %T, got %T", (*tx.BroadcastTxRequest)(nil), req) + // Get height header from the request context, if present. + var height int64 + if heightHeaders := md.Get(grpctypes.GRPCBlockHeightHeader); len(heightHeaders) == 1 { + height, err = strconv.ParseInt(heightHeaders[0], 10, 64) + if err != nil { + return nil, sdkerrors.Wrapf( + sdkerrors.ErrInvalidRequest, + "Baseapp.RegisterGRPCServer: invalid height header %q: %v", grpctypes.GRPCBlockHeightHeader, err) + } + if err := checkNegativeHeight(height); err != nil { + return nil, err } - - return client.TxServiceBroadcast(grpcCtx, clientCtx, reqProto) } - // Case 2. Querying state. - inMd, _ := metadata.FromIncomingContext(grpcCtx) - abciRes, outMd, err := client.RunGRPCQuery(clientCtx, grpcCtx, info.FullMethod, req, inMd) + // Create the sdk.Context. Passing false as 2nd arg, as we can't + // actually support proofs with gRPC right now. + sdkCtx, err := app.createQueryContext(height, false) if err != nil { return nil, err } - // We need to know the return type of the grpc method for - // unmarshalling abciRes.Value. - // - // When we call each method handler for the first time, we save its - // return type in the `returnTypes` map (see the method handler in - // `grpcrouter.go`). By this time, the method handler has already run - // at least once (in the RunGRPCQuery call), so we're sure the - // returnType maps is populated for this method. We're retrieving it - // for decoding. - returnType, err := app.GRPCQueryRouter().returnTypeOf(info.FullMethod) - if err != nil { - return nil, err + // Add relevant gRPC headers + if height == 0 { + height = sdkCtx.BlockHeight() // If height was not set in the request, set it to the latest } - // returnType is a pointer to a struct. Here, we're creating res which - // is a new pointer to the underlying struct. - res := reflect.New(returnType.Elem()).Interface() - - err = protoCodec.Unmarshal(abciRes.Value, res) - if err != nil { - return nil, err - } + // Attach the sdk.Context into the gRPC's context.Context. + grpcCtx = context.WithValue(grpcCtx, sdk.SdkContextKey, sdkCtx) - // Send the metadata header back. The metadata currently includes: - // - block height. - err = grpc.SendHeader(grpcCtx, outMd) - if err != nil { - return nil, err - } + md = metadata.Pairs(grpctypes.GRPCBlockHeightHeader, strconv.FormatInt(height, 10)) + grpc.SetHeader(grpcCtx, md) - return res, nil + return handler(grpcCtx, req) } // Loop through all services and methods, add the interceptor, and register diff --git a/client/grpc_query.go b/client/grpc_query.go index cbaba73caa2a..597b82985c22 100644 --- a/client/grpc_query.go +++ b/client/grpc_query.go @@ -29,17 +29,14 @@ func (ctx Context) Invoke(grpcCtx gocontext.Context, method string, req, reply i // 1. either we're broadcasting a Tx, in which call we call Tendermint's broadcast endpoint directly, // 2. or we are querying for state, in which case we call ABCI's Query. - // In both cases, we don't allow empty request req (it will panic unexpectedly). + // In both cases, we don't allow empty request args (it will panic unexpectedly). if reflect.ValueOf(req).IsNil() { return sdkerrors.Wrap(sdkerrors.ErrInvalidRequest, "request cannot be nil") } // Case 1. Broadcasting a Tx. if reqProto, ok := req.(*tx.BroadcastTxRequest); ok { - if !ok { - return sdkerrors.Wrapf(sdkerrors.ErrInvalidRequest, "expected %T, got %T", (*tx.BroadcastTxRequest)(nil), req) - } - resProto, ok := reply.(*tx.BroadcastTxResponse) + res, ok := reply.(*tx.BroadcastTxResponse) if !ok { return sdkerrors.Wrapf(sdkerrors.ErrInvalidRequest, "expected %T, got %T", (*tx.BroadcastTxResponse)(nil), req) } @@ -48,62 +45,26 @@ func (ctx Context) Invoke(grpcCtx gocontext.Context, method string, req, reply i if err != nil { return err } - *resProto = *broadcastRes + *res = *broadcastRes return err } // Case 2. Querying state. - inMd, _ := metadata.FromOutgoingContext(grpcCtx) - abciRes, outMd, err := RunGRPCQuery(ctx, grpcCtx, method, req, inMd) - if err != nil { - return err - } - - err = protoCodec.Unmarshal(abciRes.Value, reply) - if err != nil { - return err - } - - for _, callOpt := range opts { - header, ok := callOpt.(grpc.HeaderCallOption) - if !ok { - continue - } - - *header.HeaderAddr = outMd - } - - if ctx.InterfaceRegistry != nil { - return types.UnpackInterfaces(reply, ctx.InterfaceRegistry) - } - - return nil -} - -// NewStream implements the grpc ClientConn.NewStream method -func (Context) NewStream(gocontext.Context, *grpc.StreamDesc, string, ...grpc.CallOption) (grpc.ClientStream, error) { - return nil, fmt.Errorf("streaming rpc not supported") -} - -// RunGRPCQuery runs a gRPC query from the clientCtx, given all necessary -// arguments for the gRPC method, and returns the ABCI response. It is used -// to factorize code between client (Invoke) and server (RegisterGRPCServer) -// gRPC handlers. -func RunGRPCQuery(ctx Context, grpcCtx gocontext.Context, method string, req interface{}, md metadata.MD) (abci.ResponseQuery, metadata.MD, error) { reqBz, err := protoCodec.Marshal(req) if err != nil { - return abci.ResponseQuery{}, nil, err + return err } // parse height header + md, _ := metadata.FromOutgoingContext(grpcCtx) if heights := md.Get(grpctypes.GRPCBlockHeightHeader); len(heights) > 0 { height, err := strconv.ParseInt(heights[0], 10, 64) if err != nil { - return abci.ResponseQuery{}, nil, err + return err } if height < 0 { - return abci.ResponseQuery{}, nil, sdkerrors.Wrapf( + return sdkerrors.Wrapf( sdkerrors.ErrInvalidRequest, "client.Context.Invoke: height (%d) from %q must be >= 0", height, grpctypes.GRPCBlockHeightHeader) } @@ -117,9 +78,14 @@ func RunGRPCQuery(ctx Context, grpcCtx gocontext.Context, method string, req int Height: ctx.Height, } - abciRes, err := ctx.QueryABCI(abciReq) + res, err := ctx.QueryABCI(abciReq) + if err != nil { + return err + } + + err = protoCodec.Unmarshal(res.Value, reply) if err != nil { - return abci.ResponseQuery{}, nil, err + return err } // Create header metadata. For now the headers contain: @@ -127,7 +93,24 @@ func RunGRPCQuery(ctx Context, grpcCtx gocontext.Context, method string, req int // We then parse all the call options, if the call option is a // HeaderCallOption, then we manually set the value of that header to the // metadata. - md = metadata.Pairs(grpctypes.GRPCBlockHeightHeader, strconv.FormatInt(abciRes.Height, 10)) + md = metadata.Pairs(grpctypes.GRPCBlockHeightHeader, strconv.FormatInt(res.Height, 10)) + for _, callOpt := range opts { + header, ok := callOpt.(grpc.HeaderCallOption) + if !ok { + continue + } + + *header.HeaderAddr = md + } + + if ctx.InterfaceRegistry != nil { + return types.UnpackInterfaces(reply, ctx.InterfaceRegistry) + } - return abciRes, md, nil + return nil +} + +// NewStream implements the grpc ClientConn.NewStream method +func (Context) NewStream(gocontext.Context, *grpc.StreamDesc, string, ...grpc.CallOption) (grpc.ClientStream, error) { + return nil, fmt.Errorf("streaming rpc not supported") } diff --git a/server/grpc/server.go b/server/grpc/server.go index 40a3c7716d97..0b41a57cd323 100644 --- a/server/grpc/server.go +++ b/server/grpc/server.go @@ -17,7 +17,7 @@ import ( // StartGRPCServer starts a gRPC server on the given address. func StartGRPCServer(clientCtx client.Context, app types.Application, address string) (*grpc.Server, error) { grpcSrv := grpc.NewServer() - app.RegisterGRPCServer(clientCtx, grpcSrv) + app.RegisterGRPCServer(grpcSrv) // reflection allows consumers to build dynamic clients that can write // to any cosmos-sdk application without relying on application packages at compile time err := reflection.Register(grpcSrv, reflection.Config{ diff --git a/server/grpc/server_test.go b/server/grpc/server_test.go index 3a4afd45b238..8ea293a9169c 100644 --- a/server/grpc/server_test.go +++ b/server/grpc/server_test.go @@ -106,7 +106,7 @@ func (s *IntegrationTestSuite) TestGRPCServer_BankBalance() { ) s.Require().NoError(err) blockHeight = header.Get(grpctypes.GRPCBlockHeightHeader) - s.Require().NotEmpty(blockHeight[0]) // blockHeight is []string, first element is block height. + s.Require().Equal([]string{"1"}, blockHeight) } func (s *IntegrationTestSuite) TestGRPCServer_Reflection() { @@ -201,9 +201,9 @@ func (s *IntegrationTestSuite) TestGRPCServerInvalidHeaderHeights() { value string wantErr string }{ - {"-1", "\"x-cosmos-block-height\" must be >= 0"}, + {"-1", "height < 0"}, {"9223372036854775808", "value out of range"}, // > max(int64) by 1 - {"-10", "\"x-cosmos-block-height\" must be >= 0"}, + {"-10", "height < 0"}, {"18446744073709551615", "value out of range"}, // max uint64, which is > max(int64) {"-9223372036854775809", "value out of range"}, // Out of the range of for negative int64 } diff --git a/server/types/app.go b/server/types/app.go index cf6b6ad9a1ff..467f627c605f 100644 --- a/server/types/app.go +++ b/server/types/app.go @@ -43,7 +43,7 @@ type ( // RegisterGRPCServer registers gRPC services directly with the gRPC // server. - RegisterGRPCServer(client.Context, grpc.Server) + RegisterGRPCServer(grpc.Server) // RegisterTxService registers the gRPC Query service for tx (such as tx // simulation, fetching txs by hash...).