From 00112005a413b87700795d89e4ab01694e62ec8b Mon Sep 17 00:00:00 2001 From: mornyx Date: Fri, 17 Dec 2021 17:26:41 +0800 Subject: [PATCH] Add client_interceptor Signed-off-by: mornyx --- internal/client/client_collapse.go | 10 ------- internal/client/client_interceptor.go | 43 +++++++++++++++++++++++++++ tikv/kv.go | 2 +- tikvrpc/interceptor/interceptor.go | 6 ++-- 4 files changed, 47 insertions(+), 14 deletions(-) create mode 100644 internal/client/client_interceptor.go diff --git a/internal/client/client_collapse.go b/internal/client/client_collapse.go index 219aac398a..11ea8b5beb 100644 --- a/internal/client/client_collapse.go +++ b/internal/client/client_collapse.go @@ -42,7 +42,6 @@ import ( "github.com/pkg/errors" "github.com/tikv/client-go/v2/tikvrpc" - "github.com/tikv/client-go/v2/tikvrpc/interceptor" "golang.org/x/sync/singleflight" ) @@ -66,15 +65,6 @@ func (r reqCollapse) Close() error { } func (r reqCollapse) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) { - if it := interceptor.GetRPCInterceptorFromCtx(ctx); it != nil { - return it(func(target string, req *tikvrpc.Request) (*tikvrpc.Response, error) { - return r.sendRequest(ctx, target, req, timeout) - })(addr, req) - } - return r.sendRequest(ctx, addr, req, timeout) -} - -func (r reqCollapse) sendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) { if r.Client == nil { panic("client should not be nil") } diff --git a/internal/client/client_interceptor.go b/internal/client/client_interceptor.go new file mode 100644 index 0000000000..36f2f1c758 --- /dev/null +++ b/internal/client/client_interceptor.go @@ -0,0 +1,43 @@ +// Copyright 2021 TiKV Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package client + +import ( + "context" + "time" + + "github.com/tikv/client-go/v2/tikvrpc" + "github.com/tikv/client-go/v2/tikvrpc/interceptor" +) + +var _ Client = interceptedClient{} + +type interceptedClient struct { + Client +} + +// NewInterceptedClient creates a Client which can execute interceptor. +func NewInterceptedClient(client Client) Client { + return interceptedClient{client} +} + +func (r interceptedClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) { + if it := interceptor.GetRPCInterceptorFromCtx(ctx); it != nil { + return it(func(target string, req *tikvrpc.Request) (*tikvrpc.Response, error) { + return r.Client.SendRequest(ctx, target, req, timeout) + })(addr, req) + } + return r.Client.SendRequest(ctx, addr, req, timeout) +} diff --git a/tikv/kv.go b/tikv/kv.go index 2cb3e9eaf0..caf84592f3 100644 --- a/tikv/kv.go +++ b/tikv/kv.go @@ -180,7 +180,7 @@ func NewKVStore(uuid string, pdClient pd.Client, spkv SafePointKV, tikvclient Cl ctx: ctx, cancel: cancel, } - store.clientMu.client = client.NewReqCollapse(tikvclient) + store.clientMu.client = client.NewReqCollapse(client.NewInterceptedClient(tikvclient)) store.lockResolver = txnlock.NewLockResolver(store) store.wg.Add(2) diff --git a/tikvrpc/interceptor/interceptor.go b/tikvrpc/interceptor/interceptor.go index 9a210b1be3..23c5164bc8 100644 --- a/tikvrpc/interceptor/interceptor.go +++ b/tikvrpc/interceptor/interceptor.go @@ -59,9 +59,9 @@ import ( // ``` // // NOTE: Interceptor calls may not correspond one-to-one with the underlying gRPC requests. -// This is because there may be some exceptions, such as: request collapsed, request batched, -// no valid connection etc. If you have questions about the execution location of RPCInterceptor, -// please refer to: internal/client/client_collapse.go#SendRequest. +// This is because there may be some exceptions, such as: request batched, no +// valid connection etc. If you have questions about the execution location of +// RPCInterceptor, please refer to: internal/client/client_collapse.go#SendRequest. type RPCInterceptor func(next RPCInterceptorFunc) RPCInterceptorFunc // RPCInterceptorFunc is a callable function used to initiate a request to TiKV.