From a50e06668b15d0473049c6149941f9e22498b917 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Wed, 17 Jan 2024 22:30:51 -0500 Subject: [PATCH] Use cached client for CheckThrottler tabletmanager RPC Signed-off-by: Matt Lord --- go/vt/vttablet/grpctmclient/client.go | 45 ++++++++++++++++++--------- 1 file changed, 31 insertions(+), 14 deletions(-) diff --git a/go/vt/vttablet/grpctmclient/client.go b/go/vt/vttablet/grpctmclient/client.go index 0068ed74706..2d1152dd621 100644 --- a/go/vt/vttablet/grpctmclient/client.go +++ b/go/vt/vttablet/grpctmclient/client.go @@ -55,7 +55,7 @@ var ( ) func registerFlags(fs *pflag.FlagSet) { - fs.IntVar(&concurrency, "tablet_manager_grpc_concurrency", concurrency, "concurrency to use to talk to a vttablet server for performance-sensitive RPCs (like ExecuteFetchAs{Dba,AllPrivs,App})") + fs.IntVar(&concurrency, "tablet_manager_grpc_concurrency", concurrency, "concurrency to use to talk to a vttablet server for performance-sensitive RPCs (like ExecuteFetchAs{Dba,App} and CheckThrottler)") fs.StringVar(&cert, "tablet_manager_grpc_cert", cert, "the cert to use to connect") fs.StringVar(&key, "tablet_manager_grpc_key", key, "the key to use to connect") fs.StringVar(&ca, "tablet_manager_grpc_ca", ca, "the server ca to use to validate servers when connecting") @@ -94,10 +94,9 @@ type tmc struct { // grpcClient implements both dialer and poolDialer. type grpcClient struct { - // This cache of connections is to maximize QPS for ExecuteFetch. - // Note we'll keep the clients open and close them upon Close() only. - // But that's OK because usually the tasks that use them are - // one-purpose only. + // This cache of connections is to maximize QPS for ExecuteFetchAs{Dba,App} and + // CheckThrottler. Note we'll keep the clients open and close them upon Close() only. + // But that's OK because usually the tasks that use them are one-purpose only. // The map is protected by the mutex. mu sync.Mutex rpcClientMap map[string]chan *tmc @@ -115,16 +114,17 @@ type poolDialer interface { // Client implements tmclient.TabletManagerClient. // // Connections are produced by the dialer implementation, which is either the -// grpcClient implementation, which reuses connections only for ExecuteFetch and -// otherwise makes single-purpose connections that are closed after use. +// grpcClient implementation, which reuses connections only for ExecuteFetchAs{Dba,App} +// and CheckThrottler, otherwise making single-purpose connections that are closed +// after use. // // In order to more efficiently use the underlying tcp connections, you can // instead use the cachedConnDialer implementation by specifying // -// -tablet_manager_protocol "grpc-cached" +// --tablet_manager_protocol "grpc-cached" // -// The cachedConnDialer keeps connections to up to -tablet_manager_grpc_connpool_size distinct -// tablets open at any given time, for faster per-RPC call time, and less +// The cachedConnDialer keeps connections to up to --tablet_manager_grpc_connpool_size +// distinct tablets open at any given time, for faster per-RPC call time, and less // connection churn. type Client struct { dialer dialer @@ -1002,12 +1002,29 @@ func (client *Client) Backup(ctx context.Context, tablet *topodatapb.Tablet, req } // CheckThrottler is part of the tmclient.TabletManagerClient interface. +// It always uses a cached client and dialer pool as this is called very +// frequently between tablets when the throttler is enabled in a keyspace +// and the overhead of creating a new gRPC connection/channel and dialing +// the other tablet every time is not practical. func (client *Client) CheckThrottler(ctx context.Context, tablet *topodatapb.Tablet, req *tabletmanagerdatapb.CheckThrottlerRequest) (*tabletmanagerdatapb.CheckThrottlerResponse, error) { - c, closer, err := client.dialer.dial(ctx, tablet) - if err != nil { - return nil, err + var c tabletmanagerservicepb.TabletManagerClient + var err error + if poolDialer, ok := client.dialer.(poolDialer); ok { + c, err = poolDialer.dialPool(ctx, tablet) + if err != nil { + return nil, err + } } - defer closer.Close() + + if c == nil { + var closer io.Closer + c, closer, err = client.dialer.dial(ctx, tablet) + if err != nil { + return nil, err + } + defer closer.Close() + } + response, err := c.CheckThrottler(ctx, req) if err != nil { return nil, err