From ce6f5e1f0ddb4d45fcd10e601eb405ec803d4327 Mon Sep 17 00:00:00 2001 From: mornyx Date: Fri, 3 Dec 2021 17:11:52 +0800 Subject: [PATCH 01/16] Add interceptor mechanism for tikv RPC Signed-off-by: mornyx --- integration_tests/interceptor_test.go | 68 ++++++++++ internal/client/client.go | 11 ++ tikvrpc/interceptor.go | 186 ++++++++++++++++++++++++++ tikvrpc/interceptor_test.go | 59 ++++++++ txnkv/transaction/txn.go | 28 ++++ txnkv/txnsnapshot/scan.go | 6 + txnkv/txnsnapshot/snapshot.go | 23 ++++ 7 files changed, 381 insertions(+) create mode 100644 integration_tests/interceptor_test.go create mode 100644 tikvrpc/interceptor.go create mode 100644 tikvrpc/interceptor_test.go diff --git a/integration_tests/interceptor_test.go b/integration_tests/interceptor_test.go new file mode 100644 index 0000000000..1ac5e70077 --- /dev/null +++ b/integration_tests/interceptor_test.go @@ -0,0 +1,68 @@ +// Copyright 2021 TiKV Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tikv_test + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/tikv/client-go/v2/tikv" + "github.com/tikv/client-go/v2/tikvrpc" +) + +func TestInterceptor(t *testing.T) { + store := NewTestStore(t) + defer func() { + assert.NoError(t, store.Close()) + }() + store.SetTiKVClient(&mockRPCClient{store.GetTiKVClient()}) + manager := tikvrpc.MockInterceptorManager{} + + txn, err := store.Begin() + txn.SetInterceptor(manager.CreateMockInterceptor()) + assert.NoError(t, err) + err = txn.Set([]byte("KEY-1"), []byte("VALUE-1")) + assert.NoError(t, err) + err = txn.Commit(context.Background()) + assert.NoError(t, err) + assert.Equal(t, 2, manager.BeginCount()) + assert.Equal(t, 2, manager.EndCount()) + manager.Reset() + + txn, err = store.Begin() + txn.SetInterceptor(manager.CreateMockInterceptor()) + assert.NoError(t, err) + value, err := txn.Get(context.Background(), []byte("KEY-1")) + assert.NoError(t, err) + assert.Equal(t, []byte("VALUE-1"), value) + assert.Equal(t, 1, manager.BeginCount()) + assert.Equal(t, 1, manager.EndCount()) + manager.Reset() +} + +type mockRPCClient struct { + tikv.Client +} + +func (c *mockRPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) { + if interceptor := tikvrpc.GetInterceptorFromCtx(ctx); interceptor != nil { + return interceptor(func(target string, req *tikvrpc.Request) (*tikvrpc.Response, error) { + return c.Client.SendRequest(ctx, addr, req, timeout) + })(addr, req) + } + return c.Client.SendRequest(ctx, addr, req, timeout) +} diff --git a/internal/client/client.go b/internal/client/client.go index fb5eb2baea..d53d0ac8ae 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -368,7 +368,18 @@ func (c *RPCClient) updateTiKVSendReqHistogram(req *tikvrpc.Request, start time. } // SendRequest sends a Request to server and receives Response. +// If tikvrpc.Interceptor has been set in ctx, it will be used to wrap RPC action. func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) { + if interceptor := tikvrpc.GetInterceptorFromCtx(ctx); interceptor != nil { + return interceptor(func(target string, req *tikvrpc.Request) (*tikvrpc.Response, error) { + return c.sendRequest(ctx, target, req, timeout) + })(addr, req) + } + return c.sendRequest(ctx, addr, req, timeout) +} + +// sendRequest sends a Request to server and receives Response. +func (c *RPCClient) sendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) { if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { span1 := span.Tracer().StartSpan(fmt.Sprintf("rpcClient.SendRequest, region ID: %d, type: %s", req.RegionId, req.Type), opentracing.ChildOf(span.Context())) defer span1.Finish() diff --git a/tikvrpc/interceptor.go b/tikvrpc/interceptor.go new file mode 100644 index 0000000000..1465123b7b --- /dev/null +++ b/tikvrpc/interceptor.go @@ -0,0 +1,186 @@ +// Copyright 2021 TiKV Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tikvrpc + +import ( + "context" + "sync/atomic" +) + +// Interceptor is used to decorate the RPC requests to TiKV. +// +// The definition of an interceptor is: Given an InterceptorFunc, we will +// get the decorated InterceptorFunc with additional logic before and after +// the execution of the given InterceptorFunc. +// +// The decorated InterceptorFunc will be executed before and after the real +// RPC request is initiated to TiKV. +// +// We can implement an Interceptor like this: +// ``` +// func LogInterceptor(next InterceptorFunc) InterceptorFunc { +// return func(target string, req *Request) (*Response, error) { +// log.Println("before") +// resp, err := next(target, req) +// log.Println("after") +// return resp, err +// } +// } +// txn.SetInterceptor(LogInterceptor) +// ``` +// +// Or you want to inject some dependent modules: +// ``` +// func GetLogInterceptor(lg *log.Logger) Interceptor { +// return func(next InterceptorFunc) InterceptorFunc { +// return func(target string, req *Request) (*Response, error) { +// lg.Println("before") +// resp, err := next(target, req) +// lg.Println("after") +// return resp, err +// } +// } +// } +// txn.SetInterceptor(GetLogInterceptor()) +// ``` +type Interceptor func(next InterceptorFunc) InterceptorFunc + +// InterceptorFunc is a callable function used to initiate a request to TiKV. +// It is mainly used as the parameter and return value of Interceptor. +type InterceptorFunc func(target string, req *Request) (*Response, error) + +// InterceptorChain is used to combine multiple Interceptors into one. +// Multiple interceptors will be executed in the order of link time, but are more +// similar to the onion model: The earlier the interceptor is executed, the later +// it will return. +// +// We can use InterceptorChain like this: +// ``` +// func Interceptor1(next InterceptorFunc) InterceptorFunc { +// return func(target string, req *Request) (*Response, error) { +// fmt.Println("begin-interceptor-1") +// defer fmt.Println("end-interceptor-1") +// return next(target, req) +// } +// } +// func Interceptor2(next InterceptorFunc) InterceptorFunc { +// return func(target string, req *Request) (*Response, error) { +// fmt.Println("begin-interceptor-2") +// defer fmt.Println("end-interceptor-2") +// return next(target, req) +// } +// } +// txn.SetInterceptor(NewInterceptorChain().Link(Interceptor1).Link(Interceptor2).Build()) +// ``` +// Then every time an RPC request is initiated, it will be printed in the following order: +// ``` +// begin-interceptor-1 +// begin-interceptor-2 +// /* do request & respond here */ +// end-interceptor-2 +// end-interceptor-1 +// ``` +type InterceptorChain struct { + chain []Interceptor +} + +// NewInterceptorChain creates an empty InterceptorChain. +func NewInterceptorChain() *InterceptorChain { + return &InterceptorChain{} +} + +// Link is used to link the next Interceptor. +// Multiple interceptors will be executed in the order of link time. +func (c *InterceptorChain) Link(it Interceptor) *InterceptorChain { + c.chain = append(c.chain, it) + return c +} + +// Build merges the previously linked interceptors into one. +func (c *InterceptorChain) Build() Interceptor { + return func(next InterceptorFunc) InterceptorFunc { + for n := len(c.chain) - 1; n >= 0; n-- { + next = c.chain[n](next) + } + return next + } +} + +type interceptorCtxKeyType struct{} + +var interceptorCtxKey = interceptorCtxKeyType{} + +// SetInterceptorIntoCtx is a helper function used to write Interceptor into ctx. +// Different from the behavior of calling context.WithValue() directly, calling +// SetInterceptorIntoCtx multiple times will not bind multiple Interceptors, but +// will replace the original value each time. +// Be careful not to forget to use the returned ctx. +func SetInterceptorIntoCtx(ctx context.Context, interceptor Interceptor) context.Context { + if v := ctx.Value(interceptorCtxKey); v != nil { + v.(*atomic.Value).Store(interceptor) + return ctx + } + v := new(atomic.Value) + v.Store(interceptor) + return context.WithValue(ctx, interceptorCtxKey, v) +} + +// GetInterceptorFromCtx gets the Interceptor bound by the previous call to SetInterceptorIntoCtx, +// and returns nil if there is none. +func GetInterceptorFromCtx(ctx context.Context) Interceptor { + if v := ctx.Value(interceptorCtxKey); v != nil { + v := v.(*atomic.Value).Load() + if interceptor, ok := v.(Interceptor); ok && interceptor != nil { + return interceptor + } + } + return nil +} + +/* Suite for testing */ + +// MockInterceptorManager can be used to create Interceptor and record the +// number of executions of the created Interceptor. +type MockInterceptorManager struct { + begin int32 + end int32 +} + +// CreateMockInterceptor creates an Interceptor for testing. +func (m *MockInterceptorManager) CreateMockInterceptor() Interceptor { + return func(next InterceptorFunc) InterceptorFunc { + return func(target string, req *Request) (*Response, error) { + atomic.AddInt32(&m.begin, 1) + defer atomic.AddInt32(&m.end, 1) + return next(target, req) + } + } +} + +// Reset clear all counters. +func (m *MockInterceptorManager) Reset() { + atomic.StoreInt32(&m.begin, 0) + atomic.StoreInt32(&m.end, 0) +} + +// BeginCount gets how many times the previously created Interceptor has been executed. +func (m *MockInterceptorManager) BeginCount() int { + return int(atomic.LoadInt32(&m.begin)) +} + +// EndCount gets how many times the previously created Interceptor has been returned. +func (m *MockInterceptorManager) EndCount() int { + return int(atomic.LoadInt32(&m.end)) +} diff --git a/tikvrpc/interceptor_test.go b/tikvrpc/interceptor_test.go new file mode 100644 index 0000000000..ed073996fa --- /dev/null +++ b/tikvrpc/interceptor_test.go @@ -0,0 +1,59 @@ +// Copyright 2021 TiKV Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tikvrpc + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestInterceptorChain(t *testing.T) { + chain := NewInterceptorChain() + manager := MockInterceptorManager{} + it := chain. + Link(manager.CreateMockInterceptor()). + Link(manager.CreateMockInterceptor()). + Build() + _, _ = it(func(target string, req *Request) (*Response, error) { + return nil, nil + })("", nil) + assert.Equal(t, 2, manager.BeginCount()) + assert.Equal(t, 2, manager.EndCount()) +} + +func TestGetAndSetInterceptorCtx(t *testing.T) { + ctx := context.Background() + assert.Nil(t, GetInterceptorFromCtx(ctx)) + var it1 Interceptor = func(next InterceptorFunc) InterceptorFunc { + return next + } + ctx = SetInterceptorIntoCtx(ctx, it1) + it2 := GetInterceptorFromCtx(ctx) + assert.Equal(t, funcKey(it1), funcKey(it2)) + var it3 Interceptor = func(next InterceptorFunc) InterceptorFunc { + return next + } + assert.NotEqual(t, funcKey(it1), funcKey(it3)) + ctx = SetInterceptorIntoCtx(ctx, it3) + it4 := GetInterceptorFromCtx(ctx) + assert.Equal(t, funcKey(it3), funcKey(it4)) +} + +func funcKey(v interface{}) string { + return fmt.Sprintf("%v", v) +} diff --git a/txnkv/transaction/txn.go b/txnkv/transaction/txn.go index 35e2fc3f6d..50e281ad92 100644 --- a/txnkv/transaction/txn.go +++ b/txnkv/transaction/txn.go @@ -112,6 +112,8 @@ type KVTxn struct { resourceGroupTag []byte resourceGroupTagger tikvrpc.ResourceGroupTagger // use this when resourceGroupTag is nil diskFullOpt kvrpcpb.DiskFullOpt + // interceptor is used to decorate the RPC request logic related to the txn. + interceptor tikvrpc.Interceptor } // NewTiKVTxn creates a new KVTxn. @@ -242,6 +244,13 @@ func (txn *KVTxn) SetResourceGroupTagger(tagger tikvrpc.ResourceGroupTagger) { txn.GetSnapshot().SetResourceGroupTagger(tagger) } +// SetInterceptor sets tikvrpc.Interceptor for the transaction and its related snapshot. +// tikvrpc.Interceptor will be executed before each RPC request is initiated. +func (txn *KVTxn) SetInterceptor(interceptor tikvrpc.Interceptor) { + txn.interceptor = interceptor + txn.GetSnapshot().SetInterceptor(interceptor) +} + // SetSchemaAmender sets an amender to update mutations after schema change. func (txn *KVTxn) SetSchemaAmender(sa SchemaAmender) { txn.schemaAmender = sa @@ -343,6 +352,13 @@ func (txn *KVTxn) Commit(ctx context.Context) error { sessionID = val.(uint64) } + if txn.interceptor != nil { + // User has called txn.SetInterceptor() to explicitly set an interceptor, we + // need to bind it to ctx so that the internal client can perceive and execute + // it before initiating an RPC request. + ctx = tikvrpc.SetInterceptorIntoCtx(ctx, txn.interceptor) + } + var err error // If the txn use pessimistic lock, committer is initialized. committer := txn.committer @@ -450,6 +466,12 @@ func (txn *KVTxn) rollbackPessimisticLocks() error { return nil } bo := retry.NewBackofferWithVars(context.Background(), cleanupMaxBackoff, txn.vars) + if txn.interceptor != nil { + // User has called txn.SetInterceptor() to explicitly set an interceptor, we + // need to bind it to ctx so that the internal client can perceive and execute + // it before initiating an RPC request. + bo.SetCtx(tikvrpc.SetInterceptorIntoCtx(bo.GetCtx(), txn.interceptor)) + } keys := txn.collectLockedKeys() return txn.committer.pessimisticRollbackMutations(bo, &PlainMutations{keys: keys}) } @@ -526,6 +548,12 @@ func (txn *KVTxn) LockKeysWithWaitTime(ctx context.Context, lockWaitTime int64, // LockKeys tries to lock the entries with the keys in KV store. // lockCtx is the context for lock, lockCtx.lockWaitTime in ms func (txn *KVTxn) LockKeys(ctx context.Context, lockCtx *tikv.LockCtx, keysInput ...[]byte) error { + if txn.interceptor != nil { + // User has called txn.SetInterceptor() to explicitly set an interceptor, we + // need to bind it to ctx so that the internal client can perceive and execute + // it before initiating an RPC request. + ctx = tikvrpc.SetInterceptorIntoCtx(ctx, txn.interceptor) + } // Exclude keys that are already locked. var err error keys := make([][]byte, 0, len(keysInput)) diff --git a/txnkv/txnsnapshot/scan.go b/txnkv/txnsnapshot/scan.go index 354ca81956..6d437fc478 100644 --- a/txnkv/txnsnapshot/scan.go +++ b/txnkv/txnsnapshot/scan.go @@ -118,6 +118,12 @@ func (s *Scanner) Next() error { if !s.valid { return errors.New("scanner iterator is invalid") } + if s.snapshot.interceptor != nil { + // User has called snapshot.SetInterceptor() to explicitly set an interceptor, we + // need to bind it to ctx so that the internal client can perceive and execute + // it before initiating an RPC request. + bo.SetCtx(tikvrpc.SetInterceptorIntoCtx(bo.GetCtx(), s.snapshot.interceptor)) + } var err error for { s.idx++ diff --git a/txnkv/txnsnapshot/snapshot.go b/txnkv/txnsnapshot/snapshot.go index ef5b684217..4045738945 100644 --- a/txnkv/txnsnapshot/snapshot.go +++ b/txnkv/txnsnapshot/snapshot.go @@ -136,6 +136,8 @@ type KVSnapshot struct { resourceGroupTag []byte // resourceGroupTagger is use to set the kv request resource group tag if resourceGroupTag is nil. resourceGroupTagger tikvrpc.ResourceGroupTagger + // interceptor is used to decorate the RPC request logic related to the snapshot. + interceptor tikvrpc.Interceptor } // NewTiKVSnapshot creates a snapshot of an TiKV store. @@ -203,6 +205,13 @@ func (s *KVSnapshot) BatchGet(ctx context.Context, keys [][]byte) (map[string][] ctx = context.WithValue(ctx, retry.TxnStartKey, s.version) bo := retry.NewBackofferWithVars(ctx, batchGetMaxBackoff, s.vars) + if s.interceptor != nil { + // User has called txn.SetInterceptor() to explicitly set an interceptor, we + // need to bind it to ctx so that the internal client can perceive and execute + // it before initiating an RPC request. + ctx = tikvrpc.SetInterceptorIntoCtx(ctx, s.interceptor) + } + // Create a map to collect key-values from region servers. var mu sync.Mutex err := s.batchGetKeysByRegions(bo, keys, func(k, v []byte) { @@ -468,6 +477,14 @@ func (s *KVSnapshot) Get(ctx context.Context, k []byte) ([]byte, error) { ctx = context.WithValue(ctx, retry.TxnStartKey, s.version) bo := retry.NewBackofferWithVars(ctx, getMaxBackoff, s.vars) + + if s.interceptor != nil { + // User has called txn.SetInterceptor() to explicitly set an interceptor, we + // need to bind it to ctx so that the internal client can perceive and execute + // it before initiating an RPC request. + bo.SetCtx(tikvrpc.SetInterceptorIntoCtx(bo.GetCtx(), s.interceptor)) + } + val, err := s.get(ctx, bo, k) s.recordBackoffInfo(bo) if err != nil { @@ -734,6 +751,12 @@ func (s *KVSnapshot) SetResourceGroupTagger(tagger tikvrpc.ResourceGroupTagger) s.resourceGroupTagger = tagger } +// SetInterceptor sets tikvrpc.Interceptor for the snapshot. +// tikvrpc.Interceptor will be executed before each RPC request is initiated. +func (s *KVSnapshot) SetInterceptor(interceptor tikvrpc.Interceptor) { + s.interceptor = interceptor +} + // SnapCacheHitCount gets the snapshot cache hit count. Only for test. func (s *KVSnapshot) SnapCacheHitCount() int { return int(atomic.LoadInt64(&s.mu.hitCnt)) From 179f017199a1bdd356b0c14ce171897880713782 Mon Sep 17 00:00:00 2001 From: mornyx Date: Fri, 3 Dec 2021 17:14:02 +0800 Subject: [PATCH 02/16] Rename comment Signed-off-by: mornyx --- txnkv/txnsnapshot/snapshot.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/txnkv/txnsnapshot/snapshot.go b/txnkv/txnsnapshot/snapshot.go index 4045738945..37fbc6f66f 100644 --- a/txnkv/txnsnapshot/snapshot.go +++ b/txnkv/txnsnapshot/snapshot.go @@ -206,7 +206,7 @@ func (s *KVSnapshot) BatchGet(ctx context.Context, keys [][]byte) (map[string][] bo := retry.NewBackofferWithVars(ctx, batchGetMaxBackoff, s.vars) if s.interceptor != nil { - // User has called txn.SetInterceptor() to explicitly set an interceptor, we + // User has called snapshot.SetInterceptor() to explicitly set an interceptor, we // need to bind it to ctx so that the internal client can perceive and execute // it before initiating an RPC request. ctx = tikvrpc.SetInterceptorIntoCtx(ctx, s.interceptor) @@ -479,7 +479,7 @@ func (s *KVSnapshot) Get(ctx context.Context, k []byte) ([]byte, error) { bo := retry.NewBackofferWithVars(ctx, getMaxBackoff, s.vars) if s.interceptor != nil { - // User has called txn.SetInterceptor() to explicitly set an interceptor, we + // User has called snapshot.SetInterceptor() to explicitly set an interceptor, we // need to bind it to ctx so that the internal client can perceive and execute // it before initiating an RPC request. bo.SetCtx(tikvrpc.SetInterceptorIntoCtx(bo.GetCtx(), s.interceptor)) From 66444fdde259f689b08e854bd217733c7cda27f1 Mon Sep 17 00:00:00 2001 From: mornyx Date: Fri, 3 Dec 2021 17:23:16 +0800 Subject: [PATCH 03/16] Modify comments Signed-off-by: mornyx --- tikvrpc/interceptor.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tikvrpc/interceptor.go b/tikvrpc/interceptor.go index 1465123b7b..a6c2c79e23 100644 --- a/tikvrpc/interceptor.go +++ b/tikvrpc/interceptor.go @@ -84,7 +84,8 @@ type InterceptorFunc func(target string, req *Request) (*Response, error) // } // txn.SetInterceptor(NewInterceptorChain().Link(Interceptor1).Link(Interceptor2).Build()) // ``` -// Then every time an RPC request is initiated, it will be printed in the following order: +// +// Then every time an RPC request is initiated, the following text will be printed: // ``` // begin-interceptor-1 // begin-interceptor-2 @@ -122,7 +123,7 @@ type interceptorCtxKeyType struct{} var interceptorCtxKey = interceptorCtxKeyType{} -// SetInterceptorIntoCtx is a helper function used to write Interceptor into ctx. +// SetInterceptorIntoCtx is a helper function used to bind Interceptor into ctx. // Different from the behavior of calling context.WithValue() directly, calling // SetInterceptorIntoCtx multiple times will not bind multiple Interceptors, but // will replace the original value each time. From b750e28fabd6ad0813e8a3b4ac63ef67cfeae955 Mon Sep 17 00:00:00 2001 From: mornyx Date: Tue, 7 Dec 2021 14:03:05 +0800 Subject: [PATCH 04/16] Fix ineffectual assignment Signed-off-by: mornyx --- txnkv/txnsnapshot/snapshot.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/txnkv/txnsnapshot/snapshot.go b/txnkv/txnsnapshot/snapshot.go index 37fbc6f66f..da812b9884 100644 --- a/txnkv/txnsnapshot/snapshot.go +++ b/txnkv/txnsnapshot/snapshot.go @@ -209,7 +209,7 @@ func (s *KVSnapshot) BatchGet(ctx context.Context, keys [][]byte) (map[string][] // User has called snapshot.SetInterceptor() to explicitly set an interceptor, we // need to bind it to ctx so that the internal client can perceive and execute // it before initiating an RPC request. - ctx = tikvrpc.SetInterceptorIntoCtx(ctx, s.interceptor) + bo.SetCtx(tikvrpc.SetInterceptorIntoCtx(bo.GetCtx(), s.interceptor)) } // Create a map to collect key-values from region servers. From a8d82c1ce1ab47caba459449e61b83f28d8d9304 Mon Sep 17 00:00:00 2001 From: mornyx Date: Thu, 9 Dec 2021 17:59:52 +0800 Subject: [PATCH 05/16] add AddInterceptor for KVTxn and KVSnapshot Signed-off-by: mornyx --- txnkv/transaction/txn.go | 11 +++++++++++ txnkv/txnsnapshot/snapshot.go | 10 ++++++++++ 2 files changed, 21 insertions(+) diff --git a/txnkv/transaction/txn.go b/txnkv/transaction/txn.go index 50e281ad92..e2c85b0029 100644 --- a/txnkv/transaction/txn.go +++ b/txnkv/transaction/txn.go @@ -246,11 +246,22 @@ func (txn *KVTxn) SetResourceGroupTagger(tagger tikvrpc.ResourceGroupTagger) { // SetInterceptor sets tikvrpc.Interceptor for the transaction and its related snapshot. // tikvrpc.Interceptor will be executed before each RPC request is initiated. +// Note that SetInterceptor will replace the previously set interceptor. func (txn *KVTxn) SetInterceptor(interceptor tikvrpc.Interceptor) { txn.interceptor = interceptor txn.GetSnapshot().SetInterceptor(interceptor) } +// AddInterceptor adds an interceptor, the order of addition is the order of execution. +func (txn *KVTxn) AddInterceptor(interceptor tikvrpc.Interceptor) { + if txn.interceptor == nil { + txn.SetInterceptor(interceptor) + return + } + txn.interceptor = tikvrpc.NewInterceptorChain().Link(txn.interceptor).Link(interceptor).Build() + txn.GetSnapshot().AddInterceptor(interceptor) +} + // SetSchemaAmender sets an amender to update mutations after schema change. func (txn *KVTxn) SetSchemaAmender(sa SchemaAmender) { txn.schemaAmender = sa diff --git a/txnkv/txnsnapshot/snapshot.go b/txnkv/txnsnapshot/snapshot.go index 0a3c772124..41e0b45aa8 100644 --- a/txnkv/txnsnapshot/snapshot.go +++ b/txnkv/txnsnapshot/snapshot.go @@ -753,10 +753,20 @@ func (s *KVSnapshot) SetResourceGroupTagger(tagger tikvrpc.ResourceGroupTagger) // SetInterceptor sets tikvrpc.Interceptor for the snapshot. // tikvrpc.Interceptor will be executed before each RPC request is initiated. +// Note that SetInterceptor will replace the previously set interceptor. func (s *KVSnapshot) SetInterceptor(interceptor tikvrpc.Interceptor) { s.interceptor = interceptor } +// AddInterceptor adds an interceptor, the order of addition is the order of execution. +func (s *KVSnapshot) AddInterceptor(interceptor tikvrpc.Interceptor) { + if s.interceptor == nil { + s.SetInterceptor(interceptor) + return + } + s.interceptor = tikvrpc.NewInterceptorChain().Link(s.interceptor).Link(interceptor).Build() +} + // SnapCacheHitCount gets the snapshot cache hit count. Only for test. func (s *KVSnapshot) SnapCacheHitCount() int { return int(atomic.LoadInt64(&s.mu.hitCnt)) From e4a03694c56cb0765bb110014c0f7310e9150742 Mon Sep 17 00:00:00 2001 From: mornyx Date: Tue, 14 Dec 2021 16:23:07 +0800 Subject: [PATCH 06/16] Separate intercepor package Signed-off-by: mornyx --- integration_tests/interceptor_test.go | 11 +- internal/client/client.go | 5 +- tikvrpc/interceptor.go | 187 ------------------ tikvrpc/interceptor/interceptor.go | 185 +++++++++++++++++ tikvrpc/{ => interceptor}/interceptor_test.go | 33 +--- tikvrpc/interceptor/main_test.go | 25 +++ txnkv/transaction/txn.go | 37 ++-- txnkv/txnsnapshot/scan.go | 5 +- txnkv/txnsnapshot/snapshot.go | 29 +-- 9 files changed, 261 insertions(+), 256 deletions(-) delete mode 100644 tikvrpc/interceptor.go create mode 100644 tikvrpc/interceptor/interceptor.go rename tikvrpc/{ => interceptor}/interceptor_test.go (52%) create mode 100644 tikvrpc/interceptor/main_test.go diff --git a/integration_tests/interceptor_test.go b/integration_tests/interceptor_test.go index 1ac5e70077..d48bd2a0c8 100644 --- a/integration_tests/interceptor_test.go +++ b/integration_tests/interceptor_test.go @@ -22,6 +22,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/tikv/client-go/v2/tikv" "github.com/tikv/client-go/v2/tikvrpc" + "github.com/tikv/client-go/v2/tikvrpc/interceptor" ) func TestInterceptor(t *testing.T) { @@ -30,10 +31,10 @@ func TestInterceptor(t *testing.T) { assert.NoError(t, store.Close()) }() store.SetTiKVClient(&mockRPCClient{store.GetTiKVClient()}) - manager := tikvrpc.MockInterceptorManager{} + manager := interceptor.MockInterceptorManager{} txn, err := store.Begin() - txn.SetInterceptor(manager.CreateMockInterceptor()) + txn.SetRPCInterceptor(manager.CreateMockInterceptor()) assert.NoError(t, err) err = txn.Set([]byte("KEY-1"), []byte("VALUE-1")) assert.NoError(t, err) @@ -44,7 +45,7 @@ func TestInterceptor(t *testing.T) { manager.Reset() txn, err = store.Begin() - txn.SetInterceptor(manager.CreateMockInterceptor()) + txn.SetRPCInterceptor(manager.CreateMockInterceptor()) assert.NoError(t, err) value, err := txn.Get(context.Background(), []byte("KEY-1")) assert.NoError(t, err) @@ -59,8 +60,8 @@ type mockRPCClient struct { } func (c *mockRPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) { - if interceptor := tikvrpc.GetInterceptorFromCtx(ctx); interceptor != nil { - return interceptor(func(target string, req *tikvrpc.Request) (*tikvrpc.Response, error) { + if it := interceptor.GetRPCInterceptorFromCtx(ctx); it != nil { + return it(func(target string, req *tikvrpc.Request) (*tikvrpc.Response, error) { return c.Client.SendRequest(ctx, addr, req, timeout) })(addr, req) } diff --git a/internal/client/client.go b/internal/client/client.go index d53d0ac8ae..d2ff5fa12f 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -59,6 +59,7 @@ import ( "github.com/tikv/client-go/v2/internal/logutil" "github.com/tikv/client-go/v2/metrics" "github.com/tikv/client-go/v2/tikvrpc" + "github.com/tikv/client-go/v2/tikvrpc/interceptor" "github.com/tikv/client-go/v2/util" "google.golang.org/grpc" "google.golang.org/grpc/backoff" @@ -370,8 +371,8 @@ func (c *RPCClient) updateTiKVSendReqHistogram(req *tikvrpc.Request, start time. // SendRequest sends a Request to server and receives Response. // If tikvrpc.Interceptor has been set in ctx, it will be used to wrap RPC action. func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) { - if interceptor := tikvrpc.GetInterceptorFromCtx(ctx); interceptor != nil { - return interceptor(func(target string, req *tikvrpc.Request) (*tikvrpc.Response, error) { + if it := interceptor.GetRPCInterceptorFromCtx(ctx); it != nil { + return it(func(target string, req *tikvrpc.Request) (*tikvrpc.Response, error) { return c.sendRequest(ctx, target, req, timeout) })(addr, req) } diff --git a/tikvrpc/interceptor.go b/tikvrpc/interceptor.go deleted file mode 100644 index a6c2c79e23..0000000000 --- a/tikvrpc/interceptor.go +++ /dev/null @@ -1,187 +0,0 @@ -// Copyright 2021 TiKV Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package tikvrpc - -import ( - "context" - "sync/atomic" -) - -// Interceptor is used to decorate the RPC requests to TiKV. -// -// The definition of an interceptor is: Given an InterceptorFunc, we will -// get the decorated InterceptorFunc with additional logic before and after -// the execution of the given InterceptorFunc. -// -// The decorated InterceptorFunc will be executed before and after the real -// RPC request is initiated to TiKV. -// -// We can implement an Interceptor like this: -// ``` -// func LogInterceptor(next InterceptorFunc) InterceptorFunc { -// return func(target string, req *Request) (*Response, error) { -// log.Println("before") -// resp, err := next(target, req) -// log.Println("after") -// return resp, err -// } -// } -// txn.SetInterceptor(LogInterceptor) -// ``` -// -// Or you want to inject some dependent modules: -// ``` -// func GetLogInterceptor(lg *log.Logger) Interceptor { -// return func(next InterceptorFunc) InterceptorFunc { -// return func(target string, req *Request) (*Response, error) { -// lg.Println("before") -// resp, err := next(target, req) -// lg.Println("after") -// return resp, err -// } -// } -// } -// txn.SetInterceptor(GetLogInterceptor()) -// ``` -type Interceptor func(next InterceptorFunc) InterceptorFunc - -// InterceptorFunc is a callable function used to initiate a request to TiKV. -// It is mainly used as the parameter and return value of Interceptor. -type InterceptorFunc func(target string, req *Request) (*Response, error) - -// InterceptorChain is used to combine multiple Interceptors into one. -// Multiple interceptors will be executed in the order of link time, but are more -// similar to the onion model: The earlier the interceptor is executed, the later -// it will return. -// -// We can use InterceptorChain like this: -// ``` -// func Interceptor1(next InterceptorFunc) InterceptorFunc { -// return func(target string, req *Request) (*Response, error) { -// fmt.Println("begin-interceptor-1") -// defer fmt.Println("end-interceptor-1") -// return next(target, req) -// } -// } -// func Interceptor2(next InterceptorFunc) InterceptorFunc { -// return func(target string, req *Request) (*Response, error) { -// fmt.Println("begin-interceptor-2") -// defer fmt.Println("end-interceptor-2") -// return next(target, req) -// } -// } -// txn.SetInterceptor(NewInterceptorChain().Link(Interceptor1).Link(Interceptor2).Build()) -// ``` -// -// Then every time an RPC request is initiated, the following text will be printed: -// ``` -// begin-interceptor-1 -// begin-interceptor-2 -// /* do request & respond here */ -// end-interceptor-2 -// end-interceptor-1 -// ``` -type InterceptorChain struct { - chain []Interceptor -} - -// NewInterceptorChain creates an empty InterceptorChain. -func NewInterceptorChain() *InterceptorChain { - return &InterceptorChain{} -} - -// Link is used to link the next Interceptor. -// Multiple interceptors will be executed in the order of link time. -func (c *InterceptorChain) Link(it Interceptor) *InterceptorChain { - c.chain = append(c.chain, it) - return c -} - -// Build merges the previously linked interceptors into one. -func (c *InterceptorChain) Build() Interceptor { - return func(next InterceptorFunc) InterceptorFunc { - for n := len(c.chain) - 1; n >= 0; n-- { - next = c.chain[n](next) - } - return next - } -} - -type interceptorCtxKeyType struct{} - -var interceptorCtxKey = interceptorCtxKeyType{} - -// SetInterceptorIntoCtx is a helper function used to bind Interceptor into ctx. -// Different from the behavior of calling context.WithValue() directly, calling -// SetInterceptorIntoCtx multiple times will not bind multiple Interceptors, but -// will replace the original value each time. -// Be careful not to forget to use the returned ctx. -func SetInterceptorIntoCtx(ctx context.Context, interceptor Interceptor) context.Context { - if v := ctx.Value(interceptorCtxKey); v != nil { - v.(*atomic.Value).Store(interceptor) - return ctx - } - v := new(atomic.Value) - v.Store(interceptor) - return context.WithValue(ctx, interceptorCtxKey, v) -} - -// GetInterceptorFromCtx gets the Interceptor bound by the previous call to SetInterceptorIntoCtx, -// and returns nil if there is none. -func GetInterceptorFromCtx(ctx context.Context) Interceptor { - if v := ctx.Value(interceptorCtxKey); v != nil { - v := v.(*atomic.Value).Load() - if interceptor, ok := v.(Interceptor); ok && interceptor != nil { - return interceptor - } - } - return nil -} - -/* Suite for testing */ - -// MockInterceptorManager can be used to create Interceptor and record the -// number of executions of the created Interceptor. -type MockInterceptorManager struct { - begin int32 - end int32 -} - -// CreateMockInterceptor creates an Interceptor for testing. -func (m *MockInterceptorManager) CreateMockInterceptor() Interceptor { - return func(next InterceptorFunc) InterceptorFunc { - return func(target string, req *Request) (*Response, error) { - atomic.AddInt32(&m.begin, 1) - defer atomic.AddInt32(&m.end, 1) - return next(target, req) - } - } -} - -// Reset clear all counters. -func (m *MockInterceptorManager) Reset() { - atomic.StoreInt32(&m.begin, 0) - atomic.StoreInt32(&m.end, 0) -} - -// BeginCount gets how many times the previously created Interceptor has been executed. -func (m *MockInterceptorManager) BeginCount() int { - return int(atomic.LoadInt32(&m.begin)) -} - -// EndCount gets how many times the previously created Interceptor has been returned. -func (m *MockInterceptorManager) EndCount() int { - return int(atomic.LoadInt32(&m.end)) -} diff --git a/tikvrpc/interceptor/interceptor.go b/tikvrpc/interceptor/interceptor.go new file mode 100644 index 0000000000..2c8adcf9ce --- /dev/null +++ b/tikvrpc/interceptor/interceptor.go @@ -0,0 +1,185 @@ +// Copyright 2021 TiKV Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package interceptor + +import ( + "context" + "sync/atomic" + + "github.com/tikv/client-go/v2/tikvrpc" +) + +// RPCInterceptor is used to decorate the RPC requests to TiKV. +// +// The definition of an interceptor is: Given an RPCInterceptorFunc, we will +// get the decorated RPCInterceptorFunc with additional logic before and after +// the execution of the given RPCInterceptorFunc. +// +// The decorated RPCInterceptorFunc will be executed before and after the real +// RPC request is initiated to TiKV. +// +// We can implement an RPCInterceptor like this: +// ``` +// func LogInterceptor(next InterceptorFunc) RPCInterceptorFunc { +// return func(target string, req *tikvrpc.Request) (*tikvrpc.Response, error) { +// log.Println("before") +// resp, err := next(target, req) +// log.Println("after") +// return resp, err +// } +// } +// txn.SetRPCInterceptor(LogInterceptor) +// ``` +// +// Or you want to inject some dependent modules: +// ``` +// func GetLogInterceptor(lg *log.Logger) RPCInterceptor { +// return func(next RPCInterceptorFunc) RPCInterceptorFunc { +// return func(target string, req *tikvrpc.Request) (*tikvrpc.Response, error) { +// lg.Println("before") +// resp, err := next(target, req) +// lg.Println("after") +// return resp, err +// } +// } +// } +// txn.SetRPCInterceptor(GetLogInterceptor()) +// ``` +type RPCInterceptor func(next RPCInterceptorFunc) RPCInterceptorFunc + +// RPCInterceptorFunc is a callable function used to initiate a request to TiKV. +// It is mainly used as the parameter and return value of RPCInterceptor. +type RPCInterceptorFunc func(target string, req *tikvrpc.Request) (*tikvrpc.Response, error) + +// RPCInterceptorChain is used to combine multiple interceptors into one. +// Multiple interceptors will be executed in the order of link time, but are more +// similar to the onion model: The earlier the interceptor is executed, the later +// it will return. +// +// We can use RPCInterceptorChain like this: +// ``` +// func Interceptor1(next InterceptorFunc) RPCInterceptorFunc { +// return func(target string, req *tikvrpc.Request) (*tikvrpc.Response, error) { +// fmt.Println("begin-interceptor-1") +// defer fmt.Println("end-interceptor-1") +// return next(target, req) +// } +// } +// func Interceptor2(next InterceptorFunc) RPCInterceptorFunc { +// return func(target string, req *tikvrpc.Request) (*tikvrpc.Response, error) { +// fmt.Println("begin-interceptor-2") +// defer fmt.Println("end-interceptor-2") +// return next(target, req) +// } +// } +// txn.SetRPCInterceptor(NewRPCInterceptorChain().Link(Interceptor1).Link(Interceptor2).Build()) +// ``` +// +// Then every time an RPC request is initiated, the following text will be printed: +// ``` +// begin-interceptor-1 +// begin-interceptor-2 +// /* do request & respond here */ +// end-interceptor-2 +// end-interceptor-1 +// ``` +type RPCInterceptorChain struct { + chain []RPCInterceptor +} + +// NewRPCInterceptorChain creates an empty RPCInterceptorChain. +func NewRPCInterceptorChain() *RPCInterceptorChain { + return &RPCInterceptorChain{} +} + +// Link is used to link the next RPCInterceptor. +// Multiple interceptors will be executed in the order of link time. +func (c *RPCInterceptorChain) Link(it RPCInterceptor) *RPCInterceptorChain { + c.chain = append(c.chain, it) + return c +} + +// Build merges the previously linked interceptors into one. +func (c *RPCInterceptorChain) Build() RPCInterceptor { + return func(next RPCInterceptorFunc) RPCInterceptorFunc { + for n := len(c.chain) - 1; n >= 0; n-- { + next = c.chain[n](next) + } + return next + } +} + +// ChainRPCInterceptors chains multiple RPCInterceptors into one. +func ChainRPCInterceptors(its ...RPCInterceptor) RPCInterceptor { + chain := NewRPCInterceptorChain() + for _, it := range its { + chain.Link(it) + } + return chain.Build() +} + +type interceptorCtxKeyType struct{} + +var interceptorCtxKey = interceptorCtxKeyType{} + +// WithRPCInterceptor is a helper function used to bind RPCInterceptor with ctx. +func WithRPCInterceptor(ctx context.Context, interceptor RPCInterceptor) context.Context { + return context.WithValue(ctx, interceptorCtxKey, interceptor) +} + +// GetRPCInterceptorFromCtx gets the RPCInterceptor bound by the previous call +// to WithRPCInterceptor, and returns nil if there is none. +func GetRPCInterceptorFromCtx(ctx context.Context) RPCInterceptor { + if v := ctx.Value(interceptorCtxKey); v != nil { + return v.(RPCInterceptor) + } + return nil +} + +/* Suite for testing */ + +// MockInterceptorManager can be used to create Interceptor and record the +// number of executions of the created Interceptor. +type MockInterceptorManager struct { + begin int32 + end int32 +} + +// CreateMockInterceptor creates an RPCInterceptor for testing. +func (m *MockInterceptorManager) CreateMockInterceptor() RPCInterceptor { + return func(next RPCInterceptorFunc) RPCInterceptorFunc { + return func(target string, req *tikvrpc.Request) (*tikvrpc.Response, error) { + atomic.AddInt32(&m.begin, 1) + defer atomic.AddInt32(&m.end, 1) + return next(target, req) + } + } +} + +// Reset clear all counters. +func (m *MockInterceptorManager) Reset() { + atomic.StoreInt32(&m.begin, 0) + atomic.StoreInt32(&m.end, 0) +} + +// BeginCount gets how many times the previously created Interceptor has been executed. +func (m *MockInterceptorManager) BeginCount() int { + return int(atomic.LoadInt32(&m.begin)) +} + +// EndCount gets how many times the previously created Interceptor has been returned. +func (m *MockInterceptorManager) EndCount() int { + return int(atomic.LoadInt32(&m.end)) +} diff --git a/tikvrpc/interceptor_test.go b/tikvrpc/interceptor/interceptor_test.go similarity index 52% rename from tikvrpc/interceptor_test.go rename to tikvrpc/interceptor/interceptor_test.go index ed073996fa..4aa5806fbb 100644 --- a/tikvrpc/interceptor_test.go +++ b/tikvrpc/interceptor/interceptor_test.go @@ -12,48 +12,25 @@ // See the License for the specific language governing permissions and // limitations under the License. -package tikvrpc +package interceptor import ( - "context" - "fmt" "testing" "github.com/stretchr/testify/assert" + "github.com/tikv/client-go/v2/tikvrpc" ) -func TestInterceptorChain(t *testing.T) { - chain := NewInterceptorChain() +func TestInterceptor(t *testing.T) { + chain := NewRPCInterceptorChain() manager := MockInterceptorManager{} it := chain. Link(manager.CreateMockInterceptor()). Link(manager.CreateMockInterceptor()). Build() - _, _ = it(func(target string, req *Request) (*Response, error) { + _, _ = it(func(target string, req *tikvrpc.Request) (*tikvrpc.Response, error) { return nil, nil })("", nil) assert.Equal(t, 2, manager.BeginCount()) assert.Equal(t, 2, manager.EndCount()) } - -func TestGetAndSetInterceptorCtx(t *testing.T) { - ctx := context.Background() - assert.Nil(t, GetInterceptorFromCtx(ctx)) - var it1 Interceptor = func(next InterceptorFunc) InterceptorFunc { - return next - } - ctx = SetInterceptorIntoCtx(ctx, it1) - it2 := GetInterceptorFromCtx(ctx) - assert.Equal(t, funcKey(it1), funcKey(it2)) - var it3 Interceptor = func(next InterceptorFunc) InterceptorFunc { - return next - } - assert.NotEqual(t, funcKey(it1), funcKey(it3)) - ctx = SetInterceptorIntoCtx(ctx, it3) - it4 := GetInterceptorFromCtx(ctx) - assert.Equal(t, funcKey(it3), funcKey(it4)) -} - -func funcKey(v interface{}) string { - return fmt.Sprintf("%v", v) -} diff --git a/tikvrpc/interceptor/main_test.go b/tikvrpc/interceptor/main_test.go new file mode 100644 index 0000000000..1641c7d9a9 --- /dev/null +++ b/tikvrpc/interceptor/main_test.go @@ -0,0 +1,25 @@ +// Copyright 2021 TiKV Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package interceptor + +import ( + "testing" + + "go.uber.org/goleak" +) + +func TestMain(m *testing.M) { + goleak.VerifyTestMain(m) +} diff --git a/txnkv/transaction/txn.go b/txnkv/transaction/txn.go index e2c85b0029..888fb72157 100644 --- a/txnkv/transaction/txn.go +++ b/txnkv/transaction/txn.go @@ -59,6 +59,7 @@ import ( tikv "github.com/tikv/client-go/v2/kv" "github.com/tikv/client-go/v2/metrics" "github.com/tikv/client-go/v2/tikvrpc" + "github.com/tikv/client-go/v2/tikvrpc/interceptor" "github.com/tikv/client-go/v2/txnkv/txnsnapshot" "github.com/tikv/client-go/v2/txnkv/txnutil" "github.com/tikv/client-go/v2/util" @@ -113,7 +114,7 @@ type KVTxn struct { resourceGroupTagger tikvrpc.ResourceGroupTagger // use this when resourceGroupTag is nil diskFullOpt kvrpcpb.DiskFullOpt // interceptor is used to decorate the RPC request logic related to the txn. - interceptor tikvrpc.Interceptor + interceptor interceptor.RPCInterceptor } // NewTiKVTxn creates a new KVTxn. @@ -244,22 +245,22 @@ func (txn *KVTxn) SetResourceGroupTagger(tagger tikvrpc.ResourceGroupTagger) { txn.GetSnapshot().SetResourceGroupTagger(tagger) } -// SetInterceptor sets tikvrpc.Interceptor for the transaction and its related snapshot. -// tikvrpc.Interceptor will be executed before each RPC request is initiated. -// Note that SetInterceptor will replace the previously set interceptor. -func (txn *KVTxn) SetInterceptor(interceptor tikvrpc.Interceptor) { - txn.interceptor = interceptor - txn.GetSnapshot().SetInterceptor(interceptor) +// SetRPCInterceptor sets interceptor.RPCInterceptor for the transaction and its related snapshot. +// interceptor.RPCInterceptor will be executed before each RPC request is initiated. +// Note that SetRPCInterceptor will replace the previously set interceptor. +func (txn *KVTxn) SetRPCInterceptor(it interceptor.RPCInterceptor) { + txn.interceptor = it + txn.GetSnapshot().SetRPCInterceptor(it) } -// AddInterceptor adds an interceptor, the order of addition is the order of execution. -func (txn *KVTxn) AddInterceptor(interceptor tikvrpc.Interceptor) { +// AddRPCInterceptor adds an interceptor, the order of addition is the order of execution. +func (txn *KVTxn) AddRPCInterceptor(it interceptor.RPCInterceptor) { if txn.interceptor == nil { - txn.SetInterceptor(interceptor) + txn.SetRPCInterceptor(it) return } - txn.interceptor = tikvrpc.NewInterceptorChain().Link(txn.interceptor).Link(interceptor).Build() - txn.GetSnapshot().AddInterceptor(interceptor) + txn.interceptor = interceptor.ChainRPCInterceptors(txn.interceptor, it) + txn.GetSnapshot().AddRPCInterceptor(it) } // SetSchemaAmender sets an amender to update mutations after schema change. @@ -364,10 +365,10 @@ func (txn *KVTxn) Commit(ctx context.Context) error { } if txn.interceptor != nil { - // User has called txn.SetInterceptor() to explicitly set an interceptor, we + // User has called txn.SetRPCInterceptor() to explicitly set an interceptor, we // need to bind it to ctx so that the internal client can perceive and execute // it before initiating an RPC request. - ctx = tikvrpc.SetInterceptorIntoCtx(ctx, txn.interceptor) + ctx = interceptor.WithRPCInterceptor(ctx, txn.interceptor) } var err error @@ -478,10 +479,10 @@ func (txn *KVTxn) rollbackPessimisticLocks() error { } bo := retry.NewBackofferWithVars(context.Background(), cleanupMaxBackoff, txn.vars) if txn.interceptor != nil { - // User has called txn.SetInterceptor() to explicitly set an interceptor, we + // User has called txn.SetRPCInterceptor() to explicitly set an interceptor, we // need to bind it to ctx so that the internal client can perceive and execute // it before initiating an RPC request. - bo.SetCtx(tikvrpc.SetInterceptorIntoCtx(bo.GetCtx(), txn.interceptor)) + bo.SetCtx(interceptor.WithRPCInterceptor(bo.GetCtx(), txn.interceptor)) } keys := txn.collectLockedKeys() return txn.committer.pessimisticRollbackMutations(bo, &PlainMutations{keys: keys}) @@ -560,10 +561,10 @@ func (txn *KVTxn) LockKeysWithWaitTime(ctx context.Context, lockWaitTime int64, // lockCtx is the context for lock, lockCtx.lockWaitTime in ms func (txn *KVTxn) LockKeys(ctx context.Context, lockCtx *tikv.LockCtx, keysInput ...[]byte) error { if txn.interceptor != nil { - // User has called txn.SetInterceptor() to explicitly set an interceptor, we + // User has called txn.SetRPCInterceptor() to explicitly set an interceptor, we // need to bind it to ctx so that the internal client can perceive and execute // it before initiating an RPC request. - ctx = tikvrpc.SetInterceptorIntoCtx(ctx, txn.interceptor) + ctx = interceptor.WithRPCInterceptor(ctx, txn.interceptor) } // Exclude keys that are already locked. var err error diff --git a/txnkv/txnsnapshot/scan.go b/txnkv/txnsnapshot/scan.go index bce66fcf24..a73f7635f4 100644 --- a/txnkv/txnsnapshot/scan.go +++ b/txnkv/txnsnapshot/scan.go @@ -47,6 +47,7 @@ import ( "github.com/tikv/client-go/v2/internal/retry" "github.com/tikv/client-go/v2/kv" "github.com/tikv/client-go/v2/tikvrpc" + "github.com/tikv/client-go/v2/tikvrpc/interceptor" "github.com/tikv/client-go/v2/txnkv/txnlock" "go.uber.org/zap" ) @@ -119,10 +120,10 @@ func (s *Scanner) Next() error { return errors.New("scanner iterator is invalid") } if s.snapshot.interceptor != nil { - // User has called snapshot.SetInterceptor() to explicitly set an interceptor, we + // User has called snapshot.SetRPCInterceptor() to explicitly set an interceptor, we // need to bind it to ctx so that the internal client can perceive and execute // it before initiating an RPC request. - bo.SetCtx(tikvrpc.SetInterceptorIntoCtx(bo.GetCtx(), s.snapshot.interceptor)) + bo.SetCtx(interceptor.WithRPCInterceptor(bo.GetCtx(), s.snapshot.interceptor)) } var err error for { diff --git a/txnkv/txnsnapshot/snapshot.go b/txnkv/txnsnapshot/snapshot.go index 41e0b45aa8..84401c3f5e 100644 --- a/txnkv/txnsnapshot/snapshot.go +++ b/txnkv/txnsnapshot/snapshot.go @@ -57,6 +57,7 @@ import ( "github.com/tikv/client-go/v2/metrics" "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/tikvrpc" + "github.com/tikv/client-go/v2/tikvrpc/interceptor" "github.com/tikv/client-go/v2/txnkv/txnlock" "github.com/tikv/client-go/v2/txnkv/txnutil" "github.com/tikv/client-go/v2/util" @@ -137,7 +138,7 @@ type KVSnapshot struct { // resourceGroupTagger is use to set the kv request resource group tag if resourceGroupTag is nil. resourceGroupTagger tikvrpc.ResourceGroupTagger // interceptor is used to decorate the RPC request logic related to the snapshot. - interceptor tikvrpc.Interceptor + interceptor interceptor.RPCInterceptor } // NewTiKVSnapshot creates a snapshot of an TiKV store. @@ -206,10 +207,10 @@ func (s *KVSnapshot) BatchGet(ctx context.Context, keys [][]byte) (map[string][] bo := retry.NewBackofferWithVars(ctx, batchGetMaxBackoff, s.vars) if s.interceptor != nil { - // User has called snapshot.SetInterceptor() to explicitly set an interceptor, we + // User has called snapshot.SetRPCInterceptor() to explicitly set an interceptor, we // need to bind it to ctx so that the internal client can perceive and execute // it before initiating an RPC request. - bo.SetCtx(tikvrpc.SetInterceptorIntoCtx(bo.GetCtx(), s.interceptor)) + bo.SetCtx(interceptor.WithRPCInterceptor(bo.GetCtx(), s.interceptor)) } // Create a map to collect key-values from region servers. @@ -479,10 +480,10 @@ func (s *KVSnapshot) Get(ctx context.Context, k []byte) ([]byte, error) { bo := retry.NewBackofferWithVars(ctx, getMaxBackoff, s.vars) if s.interceptor != nil { - // User has called snapshot.SetInterceptor() to explicitly set an interceptor, we + // User has called snapshot.SetRPCInterceptor() to explicitly set an interceptor, we // need to bind it to ctx so that the internal client can perceive and execute // it before initiating an RPC request. - bo.SetCtx(tikvrpc.SetInterceptorIntoCtx(bo.GetCtx(), s.interceptor)) + bo.SetCtx(interceptor.WithRPCInterceptor(bo.GetCtx(), s.interceptor)) } val, err := s.get(ctx, bo, k) @@ -751,20 +752,20 @@ func (s *KVSnapshot) SetResourceGroupTagger(tagger tikvrpc.ResourceGroupTagger) s.resourceGroupTagger = tagger } -// SetInterceptor sets tikvrpc.Interceptor for the snapshot. -// tikvrpc.Interceptor will be executed before each RPC request is initiated. -// Note that SetInterceptor will replace the previously set interceptor. -func (s *KVSnapshot) SetInterceptor(interceptor tikvrpc.Interceptor) { - s.interceptor = interceptor +// SetRPCInterceptor sets interceptor.RPCInterceptor for the snapshot. +// interceptor.RPCInterceptor will be executed before each RPC request is initiated. +// Note that SetRPCInterceptor will replace the previously set interceptor. +func (s *KVSnapshot) SetRPCInterceptor(it interceptor.RPCInterceptor) { + s.interceptor = it } -// AddInterceptor adds an interceptor, the order of addition is the order of execution. -func (s *KVSnapshot) AddInterceptor(interceptor tikvrpc.Interceptor) { +// AddRPCInterceptor adds an interceptor, the order of addition is the order of execution. +func (s *KVSnapshot) AddRPCInterceptor(it interceptor.RPCInterceptor) { if s.interceptor == nil { - s.SetInterceptor(interceptor) + s.SetRPCInterceptor(it) return } - s.interceptor = tikvrpc.NewInterceptorChain().Link(s.interceptor).Link(interceptor).Build() + s.interceptor = interceptor.ChainRPCInterceptors(s.interceptor, it) } // SnapCacheHitCount gets the snapshot cache hit count. Only for test. From 3d3b93bd94b72242088f1114edbaadeb31722bc1 Mon Sep 17 00:00:00 2001 From: mornyx Date: Thu, 16 Dec 2021 11:25:40 +0800 Subject: [PATCH 07/16] Add comments Signed-off-by: mornyx --- tikvrpc/interceptor/interceptor.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tikvrpc/interceptor/interceptor.go b/tikvrpc/interceptor/interceptor.go index 2c8adcf9ce..f5cc64dfd4 100644 --- a/tikvrpc/interceptor/interceptor.go +++ b/tikvrpc/interceptor/interceptor.go @@ -122,6 +122,8 @@ func (c *RPCInterceptorChain) Build() RPCInterceptor { } // ChainRPCInterceptors chains multiple RPCInterceptors into one. +// Multiple RPCInterceptors will be executed in the order of their parameters. +// See RPCInterceptorChain for more information. func ChainRPCInterceptors(its ...RPCInterceptor) RPCInterceptor { chain := NewRPCInterceptorChain() for _, it := range its { From 7271bdaaa3cf1ce2bfde5e5bb80b6a25b12bbc90 Mon Sep 17 00:00:00 2001 From: mornyx Date: Thu, 16 Dec 2021 14:14:42 +0800 Subject: [PATCH 08/16] Add order test Signed-off-by: mornyx --- integration_tests/interceptor_test.go | 4 ++-- tikvrpc/interceptor/interceptor.go | 19 ++++++++++++++++--- tikvrpc/interceptor/interceptor_test.go | 8 ++++++-- 3 files changed, 24 insertions(+), 7 deletions(-) diff --git a/integration_tests/interceptor_test.go b/integration_tests/interceptor_test.go index d48bd2a0c8..1fdb4c53b8 100644 --- a/integration_tests/interceptor_test.go +++ b/integration_tests/interceptor_test.go @@ -34,7 +34,7 @@ func TestInterceptor(t *testing.T) { manager := interceptor.MockInterceptorManager{} txn, err := store.Begin() - txn.SetRPCInterceptor(manager.CreateMockInterceptor()) + txn.SetRPCInterceptor(manager.CreateMockInterceptor("INTERCEPTOR-1")) assert.NoError(t, err) err = txn.Set([]byte("KEY-1"), []byte("VALUE-1")) assert.NoError(t, err) @@ -45,7 +45,7 @@ func TestInterceptor(t *testing.T) { manager.Reset() txn, err = store.Begin() - txn.SetRPCInterceptor(manager.CreateMockInterceptor()) + txn.SetRPCInterceptor(manager.CreateMockInterceptor("INTERCEPTOR-2")) assert.NoError(t, err) value, err := txn.Get(context.Background(), []byte("KEY-1")) assert.NoError(t, err) diff --git a/tikvrpc/interceptor/interceptor.go b/tikvrpc/interceptor/interceptor.go index f5cc64dfd4..b46f86e512 100644 --- a/tikvrpc/interceptor/interceptor.go +++ b/tikvrpc/interceptor/interceptor.go @@ -155,14 +155,21 @@ func GetRPCInterceptorFromCtx(ctx context.Context) RPCInterceptor { // MockInterceptorManager can be used to create Interceptor and record the // number of executions of the created Interceptor. type MockInterceptorManager struct { - begin int32 - end int32 + begin int32 + end int32 + execLog []string // interceptor name list +} + +// NewMockInterceptorManager creates an empty MockInterceptorManager. +func NewMockInterceptorManager() *MockInterceptorManager { + return &MockInterceptorManager{execLog: []string{}} } // CreateMockInterceptor creates an RPCInterceptor for testing. -func (m *MockInterceptorManager) CreateMockInterceptor() RPCInterceptor { +func (m *MockInterceptorManager) CreateMockInterceptor(name string) RPCInterceptor { return func(next RPCInterceptorFunc) RPCInterceptorFunc { return func(target string, req *tikvrpc.Request) (*tikvrpc.Response, error) { + m.execLog = append(m.execLog, name) atomic.AddInt32(&m.begin, 1) defer atomic.AddInt32(&m.end, 1) return next(target, req) @@ -174,6 +181,7 @@ func (m *MockInterceptorManager) CreateMockInterceptor() RPCInterceptor { func (m *MockInterceptorManager) Reset() { atomic.StoreInt32(&m.begin, 0) atomic.StoreInt32(&m.end, 0) + m.execLog = []string{} } // BeginCount gets how many times the previously created Interceptor has been executed. @@ -185,3 +193,8 @@ func (m *MockInterceptorManager) BeginCount() int { func (m *MockInterceptorManager) EndCount() int { return int(atomic.LoadInt32(&m.end)) } + +// ExecLog gets execution log of all interceptors. +func (m *MockInterceptorManager) ExecLog() []string { + return m.execLog +} diff --git a/tikvrpc/interceptor/interceptor_test.go b/tikvrpc/interceptor/interceptor_test.go index 4aa5806fbb..c54ea7117e 100644 --- a/tikvrpc/interceptor/interceptor_test.go +++ b/tikvrpc/interceptor/interceptor_test.go @@ -25,12 +25,16 @@ func TestInterceptor(t *testing.T) { chain := NewRPCInterceptorChain() manager := MockInterceptorManager{} it := chain. - Link(manager.CreateMockInterceptor()). - Link(manager.CreateMockInterceptor()). + Link(manager.CreateMockInterceptor("INTERCEPTOR-1")). + Link(manager.CreateMockInterceptor("INTERCEPTOR-2")). Build() _, _ = it(func(target string, req *tikvrpc.Request) (*tikvrpc.Response, error) { return nil, nil })("", nil) assert.Equal(t, 2, manager.BeginCount()) assert.Equal(t, 2, manager.EndCount()) + execLog := manager.ExecLog() + assert.Len(t, execLog, 2) + assert.Equal(t, "INTERCEPTOR-1", execLog[0]) + assert.Equal(t, "INTERCEPTOR-2", execLog[1]) } From 47897209eab011e40721dff9eea1f44130497510 Mon Sep 17 00:00:00 2001 From: mornyx Date: Thu, 16 Dec 2021 15:41:16 +0800 Subject: [PATCH 09/16] Fix integration tests Signed-off-by: mornyx --- integration_tests/interceptor_test.go | 17 ----------------- 1 file changed, 17 deletions(-) diff --git a/integration_tests/interceptor_test.go b/integration_tests/interceptor_test.go index 1fdb4c53b8..12229b616d 100644 --- a/integration_tests/interceptor_test.go +++ b/integration_tests/interceptor_test.go @@ -17,11 +17,8 @@ package tikv_test import ( "context" "testing" - "time" "github.com/stretchr/testify/assert" - "github.com/tikv/client-go/v2/tikv" - "github.com/tikv/client-go/v2/tikvrpc" "github.com/tikv/client-go/v2/tikvrpc/interceptor" ) @@ -30,7 +27,6 @@ func TestInterceptor(t *testing.T) { defer func() { assert.NoError(t, store.Close()) }() - store.SetTiKVClient(&mockRPCClient{store.GetTiKVClient()}) manager := interceptor.MockInterceptorManager{} txn, err := store.Begin() @@ -54,16 +50,3 @@ func TestInterceptor(t *testing.T) { assert.Equal(t, 1, manager.EndCount()) manager.Reset() } - -type mockRPCClient struct { - tikv.Client -} - -func (c *mockRPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) { - if it := interceptor.GetRPCInterceptorFromCtx(ctx); it != nil { - return it(func(target string, req *tikvrpc.Request) (*tikvrpc.Response, error) { - return c.Client.SendRequest(ctx, addr, req, timeout) - })(addr, req) - } - return c.Client.SendRequest(ctx, addr, req, timeout) -} From cdaebf98fa7f34bcf638b7008f337fc81a2ed538 Mon Sep 17 00:00:00 2001 From: mornyx Date: Thu, 16 Dec 2021 15:49:32 +0800 Subject: [PATCH 10/16] Fix integration tests Signed-off-by: mornyx --- integration_tests/interceptor_test.go | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/integration_tests/interceptor_test.go b/integration_tests/interceptor_test.go index 12229b616d..80e5dae5b0 100644 --- a/integration_tests/interceptor_test.go +++ b/integration_tests/interceptor_test.go @@ -36,8 +36,13 @@ func TestInterceptor(t *testing.T) { assert.NoError(t, err) err = txn.Commit(context.Background()) assert.NoError(t, err) - assert.Equal(t, 2, manager.BeginCount()) - assert.Equal(t, 2, manager.EndCount()) + if len(manager.ExecLog()) > 0 { + assert.Equal(t, 2, manager.BeginCount()) + assert.Equal(t, 2, manager.EndCount()) + assert.Len(t, manager.ExecLog(), 2) + assert.Equal(t, "INTERCEPTOR-1", manager.ExecLog()[0]) + assert.Equal(t, "INTERCEPTOR-1", manager.ExecLog()[1]) + } manager.Reset() txn, err = store.Begin() @@ -46,7 +51,11 @@ func TestInterceptor(t *testing.T) { value, err := txn.Get(context.Background(), []byte("KEY-1")) assert.NoError(t, err) assert.Equal(t, []byte("VALUE-1"), value) - assert.Equal(t, 1, manager.BeginCount()) - assert.Equal(t, 1, manager.EndCount()) + if len(manager.ExecLog()) > 0 { + assert.Equal(t, 1, manager.BeginCount()) + assert.Equal(t, 1, manager.EndCount()) + assert.Len(t, manager.ExecLog(), 1) + assert.Equal(t, "INTERCEPTOR-2", manager.ExecLog()[0]) + } manager.Reset() } From a95a4efecfda902bb7995645b824c42a8d5564a7 Mon Sep 17 00:00:00 2001 From: mornyx Date: Thu, 16 Dec 2021 16:18:54 +0800 Subject: [PATCH 11/16] Move interceptor call from client to client_collapse Signed-off-by: mornyx --- integration_tests/interceptor_test.go | 22 +++++++++------------- internal/client/client.go | 12 ------------ internal/client/client_collapse.go | 10 ++++++++++ 3 files changed, 19 insertions(+), 25 deletions(-) diff --git a/integration_tests/interceptor_test.go b/integration_tests/interceptor_test.go index 80e5dae5b0..77241b9a65 100644 --- a/integration_tests/interceptor_test.go +++ b/integration_tests/interceptor_test.go @@ -36,13 +36,11 @@ func TestInterceptor(t *testing.T) { assert.NoError(t, err) err = txn.Commit(context.Background()) assert.NoError(t, err) - if len(manager.ExecLog()) > 0 { - assert.Equal(t, 2, manager.BeginCount()) - assert.Equal(t, 2, manager.EndCount()) - assert.Len(t, manager.ExecLog(), 2) - assert.Equal(t, "INTERCEPTOR-1", manager.ExecLog()[0]) - assert.Equal(t, "INTERCEPTOR-1", manager.ExecLog()[1]) - } + assert.Equal(t, 2, manager.BeginCount()) + assert.Equal(t, 2, manager.EndCount()) + assert.Len(t, manager.ExecLog(), 2) + assert.Equal(t, "INTERCEPTOR-1", manager.ExecLog()[0]) + assert.Equal(t, "INTERCEPTOR-1", manager.ExecLog()[1]) manager.Reset() txn, err = store.Begin() @@ -51,11 +49,9 @@ func TestInterceptor(t *testing.T) { value, err := txn.Get(context.Background(), []byte("KEY-1")) assert.NoError(t, err) assert.Equal(t, []byte("VALUE-1"), value) - if len(manager.ExecLog()) > 0 { - assert.Equal(t, 1, manager.BeginCount()) - assert.Equal(t, 1, manager.EndCount()) - assert.Len(t, manager.ExecLog(), 1) - assert.Equal(t, "INTERCEPTOR-2", manager.ExecLog()[0]) - } + assert.Equal(t, 1, manager.BeginCount()) + assert.Equal(t, 1, manager.EndCount()) + assert.Len(t, manager.ExecLog(), 1) + assert.Equal(t, "INTERCEPTOR-2", manager.ExecLog()[0]) manager.Reset() } diff --git a/internal/client/client.go b/internal/client/client.go index d2ff5fa12f..fb5eb2baea 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -59,7 +59,6 @@ import ( "github.com/tikv/client-go/v2/internal/logutil" "github.com/tikv/client-go/v2/metrics" "github.com/tikv/client-go/v2/tikvrpc" - "github.com/tikv/client-go/v2/tikvrpc/interceptor" "github.com/tikv/client-go/v2/util" "google.golang.org/grpc" "google.golang.org/grpc/backoff" @@ -369,18 +368,7 @@ func (c *RPCClient) updateTiKVSendReqHistogram(req *tikvrpc.Request, start time. } // SendRequest sends a Request to server and receives Response. -// If tikvrpc.Interceptor has been set in ctx, it will be used to wrap RPC action. func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) { - if it := interceptor.GetRPCInterceptorFromCtx(ctx); it != nil { - return it(func(target string, req *tikvrpc.Request) (*tikvrpc.Response, error) { - return c.sendRequest(ctx, target, req, timeout) - })(addr, req) - } - return c.sendRequest(ctx, addr, req, timeout) -} - -// sendRequest sends a Request to server and receives Response. -func (c *RPCClient) sendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) { if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { span1 := span.Tracer().StartSpan(fmt.Sprintf("rpcClient.SendRequest, region ID: %d, type: %s", req.RegionId, req.Type), opentracing.ChildOf(span.Context())) defer span1.Finish() diff --git a/internal/client/client_collapse.go b/internal/client/client_collapse.go index 11ea8b5beb..219aac398a 100644 --- a/internal/client/client_collapse.go +++ b/internal/client/client_collapse.go @@ -42,6 +42,7 @@ import ( "github.com/pkg/errors" "github.com/tikv/client-go/v2/tikvrpc" + "github.com/tikv/client-go/v2/tikvrpc/interceptor" "golang.org/x/sync/singleflight" ) @@ -65,6 +66,15 @@ func (r reqCollapse) Close() error { } func (r reqCollapse) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) { + if it := interceptor.GetRPCInterceptorFromCtx(ctx); it != nil { + return it(func(target string, req *tikvrpc.Request) (*tikvrpc.Response, error) { + return r.sendRequest(ctx, target, req, timeout) + })(addr, req) + } + return r.sendRequest(ctx, addr, req, timeout) +} + +func (r reqCollapse) sendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) { if r.Client == nil { panic("client should not be nil") } From 6da64bfa0a1ef686102b925f6309ef415b354398 Mon Sep 17 00:00:00 2001 From: mornyx Date: Thu, 16 Dec 2021 18:17:00 +0800 Subject: [PATCH 12/16] Add comments Signed-off-by: mornyx --- tikvrpc/interceptor/interceptor.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/tikvrpc/interceptor/interceptor.go b/tikvrpc/interceptor/interceptor.go index b46f86e512..c69ac3c5f2 100644 --- a/tikvrpc/interceptor/interceptor.go +++ b/tikvrpc/interceptor/interceptor.go @@ -56,6 +56,11 @@ import ( // } // } // txn.SetRPCInterceptor(GetLogInterceptor()) +// +// NOTE: Interceptor calls may not correspond one-to-one with the underlying gRPC requests. +// This is because there may be some exceptions, such as: request collapsed, request batched, +// no valid connection etc. If you have questions about the execution location of RPCInterceptor, +// please refer to: internal/client/client_collapse.go#SendRequest. // ``` type RPCInterceptor func(next RPCInterceptorFunc) RPCInterceptorFunc From 7197af621f632818f7ce569acdef09bcd35d2177 Mon Sep 17 00:00:00 2001 From: mornyx Date: Thu, 16 Dec 2021 18:19:48 +0800 Subject: [PATCH 13/16] Fix comment Signed-off-by: mornyx --- tikvrpc/interceptor/interceptor.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tikvrpc/interceptor/interceptor.go b/tikvrpc/interceptor/interceptor.go index c69ac3c5f2..9a210b1be3 100644 --- a/tikvrpc/interceptor/interceptor.go +++ b/tikvrpc/interceptor/interceptor.go @@ -56,12 +56,12 @@ import ( // } // } // txn.SetRPCInterceptor(GetLogInterceptor()) +// ``` // // NOTE: Interceptor calls may not correspond one-to-one with the underlying gRPC requests. // This is because there may be some exceptions, such as: request collapsed, request batched, // no valid connection etc. If you have questions about the execution location of RPCInterceptor, // please refer to: internal/client/client_collapse.go#SendRequest. -// ``` type RPCInterceptor func(next RPCInterceptorFunc) RPCInterceptorFunc // RPCInterceptorFunc is a callable function used to initiate a request to TiKV. From 00112005a413b87700795d89e4ab01694e62ec8b Mon Sep 17 00:00:00 2001 From: mornyx Date: Fri, 17 Dec 2021 17:26:41 +0800 Subject: [PATCH 14/16] Add client_interceptor Signed-off-by: mornyx --- internal/client/client_collapse.go | 10 ------- internal/client/client_interceptor.go | 43 +++++++++++++++++++++++++++ tikv/kv.go | 2 +- tikvrpc/interceptor/interceptor.go | 6 ++-- 4 files changed, 47 insertions(+), 14 deletions(-) create mode 100644 internal/client/client_interceptor.go diff --git a/internal/client/client_collapse.go b/internal/client/client_collapse.go index 219aac398a..11ea8b5beb 100644 --- a/internal/client/client_collapse.go +++ b/internal/client/client_collapse.go @@ -42,7 +42,6 @@ import ( "github.com/pkg/errors" "github.com/tikv/client-go/v2/tikvrpc" - "github.com/tikv/client-go/v2/tikvrpc/interceptor" "golang.org/x/sync/singleflight" ) @@ -66,15 +65,6 @@ func (r reqCollapse) Close() error { } func (r reqCollapse) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) { - if it := interceptor.GetRPCInterceptorFromCtx(ctx); it != nil { - return it(func(target string, req *tikvrpc.Request) (*tikvrpc.Response, error) { - return r.sendRequest(ctx, target, req, timeout) - })(addr, req) - } - return r.sendRequest(ctx, addr, req, timeout) -} - -func (r reqCollapse) sendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) { if r.Client == nil { panic("client should not be nil") } diff --git a/internal/client/client_interceptor.go b/internal/client/client_interceptor.go new file mode 100644 index 0000000000..36f2f1c758 --- /dev/null +++ b/internal/client/client_interceptor.go @@ -0,0 +1,43 @@ +// Copyright 2021 TiKV Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package client + +import ( + "context" + "time" + + "github.com/tikv/client-go/v2/tikvrpc" + "github.com/tikv/client-go/v2/tikvrpc/interceptor" +) + +var _ Client = interceptedClient{} + +type interceptedClient struct { + Client +} + +// NewInterceptedClient creates a Client which can execute interceptor. +func NewInterceptedClient(client Client) Client { + return interceptedClient{client} +} + +func (r interceptedClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) { + if it := interceptor.GetRPCInterceptorFromCtx(ctx); it != nil { + return it(func(target string, req *tikvrpc.Request) (*tikvrpc.Response, error) { + return r.Client.SendRequest(ctx, target, req, timeout) + })(addr, req) + } + return r.Client.SendRequest(ctx, addr, req, timeout) +} diff --git a/tikv/kv.go b/tikv/kv.go index 2cb3e9eaf0..caf84592f3 100644 --- a/tikv/kv.go +++ b/tikv/kv.go @@ -180,7 +180,7 @@ func NewKVStore(uuid string, pdClient pd.Client, spkv SafePointKV, tikvclient Cl ctx: ctx, cancel: cancel, } - store.clientMu.client = client.NewReqCollapse(tikvclient) + store.clientMu.client = client.NewReqCollapse(client.NewInterceptedClient(tikvclient)) store.lockResolver = txnlock.NewLockResolver(store) store.wg.Add(2) diff --git a/tikvrpc/interceptor/interceptor.go b/tikvrpc/interceptor/interceptor.go index 9a210b1be3..23c5164bc8 100644 --- a/tikvrpc/interceptor/interceptor.go +++ b/tikvrpc/interceptor/interceptor.go @@ -59,9 +59,9 @@ import ( // ``` // // NOTE: Interceptor calls may not correspond one-to-one with the underlying gRPC requests. -// This is because there may be some exceptions, such as: request collapsed, request batched, -// no valid connection etc. If you have questions about the execution location of RPCInterceptor, -// please refer to: internal/client/client_collapse.go#SendRequest. +// This is because there may be some exceptions, such as: request batched, no +// valid connection etc. If you have questions about the execution location of +// RPCInterceptor, please refer to: internal/client/client_collapse.go#SendRequest. type RPCInterceptor func(next RPCInterceptorFunc) RPCInterceptorFunc // RPCInterceptorFunc is a callable function used to initiate a request to TiKV. From 4430bf19c5e391fe6aa5a369d3898444ec9915d5 Mon Sep 17 00:00:00 2001 From: mornyx Date: Fri, 17 Dec 2021 17:32:46 +0800 Subject: [PATCH 15/16] Modify comment Signed-off-by: mornyx --- tikvrpc/interceptor/interceptor.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tikvrpc/interceptor/interceptor.go b/tikvrpc/interceptor/interceptor.go index 23c5164bc8..30555d0ce7 100644 --- a/tikvrpc/interceptor/interceptor.go +++ b/tikvrpc/interceptor/interceptor.go @@ -61,7 +61,9 @@ import ( // NOTE: Interceptor calls may not correspond one-to-one with the underlying gRPC requests. // This is because there may be some exceptions, such as: request batched, no // valid connection etc. If you have questions about the execution location of -// RPCInterceptor, please refer to: internal/client/client_collapse.go#SendRequest. +// RPCInterceptor, please refer to: +// tikv/kv.go#NewKVStore() +// internal/client/client_interceptor.go#SendRequest. type RPCInterceptor func(next RPCInterceptorFunc) RPCInterceptorFunc // RPCInterceptorFunc is a callable function used to initiate a request to TiKV. From 1315d9bb324ed8cb03b79f6732e5943f75d64515 Mon Sep 17 00:00:00 2001 From: mornyx Date: Fri, 17 Dec 2021 17:55:58 +0800 Subject: [PATCH 16/16] Add ut Signed-off-by: mornyx --- internal/client/client_interceptor_test.go | 48 ++++++++++++++++++++++ 1 file changed, 48 insertions(+) create mode 100644 internal/client/client_interceptor_test.go diff --git a/internal/client/client_interceptor_test.go b/internal/client/client_interceptor_test.go new file mode 100644 index 0000000000..08f1c05075 --- /dev/null +++ b/internal/client/client_interceptor_test.go @@ -0,0 +1,48 @@ +// Copyright 2021 TiKV Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package client + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/tikv/client-go/v2/tikvrpc" + "github.com/tikv/client-go/v2/tikvrpc/interceptor" +) + +type emptyClient struct{} + +func (c emptyClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) { + return nil, nil +} + +func (c emptyClient) Close() error { + return nil +} + +func TestInterceptedClient(t *testing.T) { + executed := false + client := NewInterceptedClient(emptyClient{}) + ctx := interceptor.WithRPCInterceptor(context.Background(), func(next interceptor.RPCInterceptorFunc) interceptor.RPCInterceptorFunc { + return func(target string, req *tikvrpc.Request) (*tikvrpc.Response, error) { + executed = true + return next(target, req) + } + }) + _, _ = client.SendRequest(ctx, "", nil, 0) + assert.True(t, executed) +}