diff --git a/integration_tests/interceptor_test.go b/integration_tests/interceptor_test.go new file mode 100644 index 0000000000..77241b9a65 --- /dev/null +++ b/integration_tests/interceptor_test.go @@ -0,0 +1,57 @@ +// Copyright 2021 TiKV Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tikv_test + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/tikv/client-go/v2/tikvrpc/interceptor" +) + +func TestInterceptor(t *testing.T) { + store := NewTestStore(t) + defer func() { + assert.NoError(t, store.Close()) + }() + manager := interceptor.MockInterceptorManager{} + + txn, err := store.Begin() + txn.SetRPCInterceptor(manager.CreateMockInterceptor("INTERCEPTOR-1")) + assert.NoError(t, err) + err = txn.Set([]byte("KEY-1"), []byte("VALUE-1")) + assert.NoError(t, err) + err = txn.Commit(context.Background()) + assert.NoError(t, err) + assert.Equal(t, 2, manager.BeginCount()) + assert.Equal(t, 2, manager.EndCount()) + assert.Len(t, manager.ExecLog(), 2) + assert.Equal(t, "INTERCEPTOR-1", manager.ExecLog()[0]) + assert.Equal(t, "INTERCEPTOR-1", manager.ExecLog()[1]) + manager.Reset() + + txn, err = store.Begin() + txn.SetRPCInterceptor(manager.CreateMockInterceptor("INTERCEPTOR-2")) + assert.NoError(t, err) + value, err := txn.Get(context.Background(), []byte("KEY-1")) + assert.NoError(t, err) + assert.Equal(t, []byte("VALUE-1"), value) + assert.Equal(t, 1, manager.BeginCount()) + assert.Equal(t, 1, manager.EndCount()) + assert.Len(t, manager.ExecLog(), 1) + assert.Equal(t, "INTERCEPTOR-2", manager.ExecLog()[0]) + manager.Reset() +} diff --git a/internal/client/client_interceptor.go b/internal/client/client_interceptor.go new file mode 100644 index 0000000000..36f2f1c758 --- /dev/null +++ b/internal/client/client_interceptor.go @@ -0,0 +1,43 @@ +// Copyright 2021 TiKV Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package client + +import ( + "context" + "time" + + "github.com/tikv/client-go/v2/tikvrpc" + "github.com/tikv/client-go/v2/tikvrpc/interceptor" +) + +var _ Client = interceptedClient{} + +type interceptedClient struct { + Client +} + +// NewInterceptedClient creates a Client which can execute interceptor. +func NewInterceptedClient(client Client) Client { + return interceptedClient{client} +} + +func (r interceptedClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) { + if it := interceptor.GetRPCInterceptorFromCtx(ctx); it != nil { + return it(func(target string, req *tikvrpc.Request) (*tikvrpc.Response, error) { + return r.Client.SendRequest(ctx, target, req, timeout) + })(addr, req) + } + return r.Client.SendRequest(ctx, addr, req, timeout) +} diff --git a/internal/client/client_interceptor_test.go b/internal/client/client_interceptor_test.go new file mode 100644 index 0000000000..08f1c05075 --- /dev/null +++ b/internal/client/client_interceptor_test.go @@ -0,0 +1,48 @@ +// Copyright 2021 TiKV Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package client + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/tikv/client-go/v2/tikvrpc" + "github.com/tikv/client-go/v2/tikvrpc/interceptor" +) + +type emptyClient struct{} + +func (c emptyClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) { + return nil, nil +} + +func (c emptyClient) Close() error { + return nil +} + +func TestInterceptedClient(t *testing.T) { + executed := false + client := NewInterceptedClient(emptyClient{}) + ctx := interceptor.WithRPCInterceptor(context.Background(), func(next interceptor.RPCInterceptorFunc) interceptor.RPCInterceptorFunc { + return func(target string, req *tikvrpc.Request) (*tikvrpc.Response, error) { + executed = true + return next(target, req) + } + }) + _, _ = client.SendRequest(ctx, "", nil, 0) + assert.True(t, executed) +} diff --git a/tikv/kv.go b/tikv/kv.go index 2cb3e9eaf0..caf84592f3 100644 --- a/tikv/kv.go +++ b/tikv/kv.go @@ -180,7 +180,7 @@ func NewKVStore(uuid string, pdClient pd.Client, spkv SafePointKV, tikvclient Cl ctx: ctx, cancel: cancel, } - store.clientMu.client = client.NewReqCollapse(tikvclient) + store.clientMu.client = client.NewReqCollapse(client.NewInterceptedClient(tikvclient)) store.lockResolver = txnlock.NewLockResolver(store) store.wg.Add(2) diff --git a/tikvrpc/interceptor/interceptor.go b/tikvrpc/interceptor/interceptor.go new file mode 100644 index 0000000000..30555d0ce7 --- /dev/null +++ b/tikvrpc/interceptor/interceptor.go @@ -0,0 +1,207 @@ +// Copyright 2021 TiKV Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package interceptor + +import ( + "context" + "sync/atomic" + + "github.com/tikv/client-go/v2/tikvrpc" +) + +// RPCInterceptor is used to decorate the RPC requests to TiKV. +// +// The definition of an interceptor is: Given an RPCInterceptorFunc, we will +// get the decorated RPCInterceptorFunc with additional logic before and after +// the execution of the given RPCInterceptorFunc. +// +// The decorated RPCInterceptorFunc will be executed before and after the real +// RPC request is initiated to TiKV. +// +// We can implement an RPCInterceptor like this: +// ``` +// func LogInterceptor(next InterceptorFunc) RPCInterceptorFunc { +// return func(target string, req *tikvrpc.Request) (*tikvrpc.Response, error) { +// log.Println("before") +// resp, err := next(target, req) +// log.Println("after") +// return resp, err +// } +// } +// txn.SetRPCInterceptor(LogInterceptor) +// ``` +// +// Or you want to inject some dependent modules: +// ``` +// func GetLogInterceptor(lg *log.Logger) RPCInterceptor { +// return func(next RPCInterceptorFunc) RPCInterceptorFunc { +// return func(target string, req *tikvrpc.Request) (*tikvrpc.Response, error) { +// lg.Println("before") +// resp, err := next(target, req) +// lg.Println("after") +// return resp, err +// } +// } +// } +// txn.SetRPCInterceptor(GetLogInterceptor()) +// ``` +// +// NOTE: Interceptor calls may not correspond one-to-one with the underlying gRPC requests. +// This is because there may be some exceptions, such as: request batched, no +// valid connection etc. If you have questions about the execution location of +// RPCInterceptor, please refer to: +// tikv/kv.go#NewKVStore() +// internal/client/client_interceptor.go#SendRequest. +type RPCInterceptor func(next RPCInterceptorFunc) RPCInterceptorFunc + +// RPCInterceptorFunc is a callable function used to initiate a request to TiKV. +// It is mainly used as the parameter and return value of RPCInterceptor. +type RPCInterceptorFunc func(target string, req *tikvrpc.Request) (*tikvrpc.Response, error) + +// RPCInterceptorChain is used to combine multiple interceptors into one. +// Multiple interceptors will be executed in the order of link time, but are more +// similar to the onion model: The earlier the interceptor is executed, the later +// it will return. +// +// We can use RPCInterceptorChain like this: +// ``` +// func Interceptor1(next InterceptorFunc) RPCInterceptorFunc { +// return func(target string, req *tikvrpc.Request) (*tikvrpc.Response, error) { +// fmt.Println("begin-interceptor-1") +// defer fmt.Println("end-interceptor-1") +// return next(target, req) +// } +// } +// func Interceptor2(next InterceptorFunc) RPCInterceptorFunc { +// return func(target string, req *tikvrpc.Request) (*tikvrpc.Response, error) { +// fmt.Println("begin-interceptor-2") +// defer fmt.Println("end-interceptor-2") +// return next(target, req) +// } +// } +// txn.SetRPCInterceptor(NewRPCInterceptorChain().Link(Interceptor1).Link(Interceptor2).Build()) +// ``` +// +// Then every time an RPC request is initiated, the following text will be printed: +// ``` +// begin-interceptor-1 +// begin-interceptor-2 +// /* do request & respond here */ +// end-interceptor-2 +// end-interceptor-1 +// ``` +type RPCInterceptorChain struct { + chain []RPCInterceptor +} + +// NewRPCInterceptorChain creates an empty RPCInterceptorChain. +func NewRPCInterceptorChain() *RPCInterceptorChain { + return &RPCInterceptorChain{} +} + +// Link is used to link the next RPCInterceptor. +// Multiple interceptors will be executed in the order of link time. +func (c *RPCInterceptorChain) Link(it RPCInterceptor) *RPCInterceptorChain { + c.chain = append(c.chain, it) + return c +} + +// Build merges the previously linked interceptors into one. +func (c *RPCInterceptorChain) Build() RPCInterceptor { + return func(next RPCInterceptorFunc) RPCInterceptorFunc { + for n := len(c.chain) - 1; n >= 0; n-- { + next = c.chain[n](next) + } + return next + } +} + +// ChainRPCInterceptors chains multiple RPCInterceptors into one. +// Multiple RPCInterceptors will be executed in the order of their parameters. +// See RPCInterceptorChain for more information. +func ChainRPCInterceptors(its ...RPCInterceptor) RPCInterceptor { + chain := NewRPCInterceptorChain() + for _, it := range its { + chain.Link(it) + } + return chain.Build() +} + +type interceptorCtxKeyType struct{} + +var interceptorCtxKey = interceptorCtxKeyType{} + +// WithRPCInterceptor is a helper function used to bind RPCInterceptor with ctx. +func WithRPCInterceptor(ctx context.Context, interceptor RPCInterceptor) context.Context { + return context.WithValue(ctx, interceptorCtxKey, interceptor) +} + +// GetRPCInterceptorFromCtx gets the RPCInterceptor bound by the previous call +// to WithRPCInterceptor, and returns nil if there is none. +func GetRPCInterceptorFromCtx(ctx context.Context) RPCInterceptor { + if v := ctx.Value(interceptorCtxKey); v != nil { + return v.(RPCInterceptor) + } + return nil +} + +/* Suite for testing */ + +// MockInterceptorManager can be used to create Interceptor and record the +// number of executions of the created Interceptor. +type MockInterceptorManager struct { + begin int32 + end int32 + execLog []string // interceptor name list +} + +// NewMockInterceptorManager creates an empty MockInterceptorManager. +func NewMockInterceptorManager() *MockInterceptorManager { + return &MockInterceptorManager{execLog: []string{}} +} + +// CreateMockInterceptor creates an RPCInterceptor for testing. +func (m *MockInterceptorManager) CreateMockInterceptor(name string) RPCInterceptor { + return func(next RPCInterceptorFunc) RPCInterceptorFunc { + return func(target string, req *tikvrpc.Request) (*tikvrpc.Response, error) { + m.execLog = append(m.execLog, name) + atomic.AddInt32(&m.begin, 1) + defer atomic.AddInt32(&m.end, 1) + return next(target, req) + } + } +} + +// Reset clear all counters. +func (m *MockInterceptorManager) Reset() { + atomic.StoreInt32(&m.begin, 0) + atomic.StoreInt32(&m.end, 0) + m.execLog = []string{} +} + +// BeginCount gets how many times the previously created Interceptor has been executed. +func (m *MockInterceptorManager) BeginCount() int { + return int(atomic.LoadInt32(&m.begin)) +} + +// EndCount gets how many times the previously created Interceptor has been returned. +func (m *MockInterceptorManager) EndCount() int { + return int(atomic.LoadInt32(&m.end)) +} + +// ExecLog gets execution log of all interceptors. +func (m *MockInterceptorManager) ExecLog() []string { + return m.execLog +} diff --git a/tikvrpc/interceptor/interceptor_test.go b/tikvrpc/interceptor/interceptor_test.go new file mode 100644 index 0000000000..c54ea7117e --- /dev/null +++ b/tikvrpc/interceptor/interceptor_test.go @@ -0,0 +1,40 @@ +// Copyright 2021 TiKV Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package interceptor + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/tikv/client-go/v2/tikvrpc" +) + +func TestInterceptor(t *testing.T) { + chain := NewRPCInterceptorChain() + manager := MockInterceptorManager{} + it := chain. + Link(manager.CreateMockInterceptor("INTERCEPTOR-1")). + Link(manager.CreateMockInterceptor("INTERCEPTOR-2")). + Build() + _, _ = it(func(target string, req *tikvrpc.Request) (*tikvrpc.Response, error) { + return nil, nil + })("", nil) + assert.Equal(t, 2, manager.BeginCount()) + assert.Equal(t, 2, manager.EndCount()) + execLog := manager.ExecLog() + assert.Len(t, execLog, 2) + assert.Equal(t, "INTERCEPTOR-1", execLog[0]) + assert.Equal(t, "INTERCEPTOR-2", execLog[1]) +} diff --git a/tikvrpc/interceptor/main_test.go b/tikvrpc/interceptor/main_test.go new file mode 100644 index 0000000000..1641c7d9a9 --- /dev/null +++ b/tikvrpc/interceptor/main_test.go @@ -0,0 +1,25 @@ +// Copyright 2021 TiKV Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package interceptor + +import ( + "testing" + + "go.uber.org/goleak" +) + +func TestMain(m *testing.M) { + goleak.VerifyTestMain(m) +} diff --git a/txnkv/transaction/txn.go b/txnkv/transaction/txn.go index 56cb7f293d..4ade6ecbce 100644 --- a/txnkv/transaction/txn.go +++ b/txnkv/transaction/txn.go @@ -59,6 +59,7 @@ import ( tikv "github.com/tikv/client-go/v2/kv" "github.com/tikv/client-go/v2/metrics" "github.com/tikv/client-go/v2/tikvrpc" + "github.com/tikv/client-go/v2/tikvrpc/interceptor" "github.com/tikv/client-go/v2/txnkv/txnsnapshot" "github.com/tikv/client-go/v2/txnkv/txnutil" "github.com/tikv/client-go/v2/util" @@ -113,6 +114,8 @@ type KVTxn struct { resourceGroupTagger tikvrpc.ResourceGroupTagger // use this when resourceGroupTag is nil diskFullOpt kvrpcpb.DiskFullOpt commitTSUpperBoundCheck func(uint64) bool + // interceptor is used to decorate the RPC request logic related to the txn. + interceptor interceptor.RPCInterceptor } // NewTiKVTxn creates a new KVTxn. @@ -243,6 +246,24 @@ func (txn *KVTxn) SetResourceGroupTagger(tagger tikvrpc.ResourceGroupTagger) { txn.GetSnapshot().SetResourceGroupTagger(tagger) } +// SetRPCInterceptor sets interceptor.RPCInterceptor for the transaction and its related snapshot. +// interceptor.RPCInterceptor will be executed before each RPC request is initiated. +// Note that SetRPCInterceptor will replace the previously set interceptor. +func (txn *KVTxn) SetRPCInterceptor(it interceptor.RPCInterceptor) { + txn.interceptor = it + txn.GetSnapshot().SetRPCInterceptor(it) +} + +// AddRPCInterceptor adds an interceptor, the order of addition is the order of execution. +func (txn *KVTxn) AddRPCInterceptor(it interceptor.RPCInterceptor) { + if txn.interceptor == nil { + txn.SetRPCInterceptor(it) + return + } + txn.interceptor = interceptor.ChainRPCInterceptors(txn.interceptor, it) + txn.GetSnapshot().AddRPCInterceptor(it) +} + // SetSchemaAmender sets an amender to update mutations after schema change. func (txn *KVTxn) SetSchemaAmender(sa SchemaAmender) { txn.schemaAmender = sa @@ -351,6 +372,13 @@ func (txn *KVTxn) Commit(ctx context.Context) error { sessionID = val.(uint64) } + if txn.interceptor != nil { + // User has called txn.SetRPCInterceptor() to explicitly set an interceptor, we + // need to bind it to ctx so that the internal client can perceive and execute + // it before initiating an RPC request. + ctx = interceptor.WithRPCInterceptor(ctx, txn.interceptor) + } + var err error // If the txn use pessimistic lock, committer is initialized. committer := txn.committer @@ -458,6 +486,12 @@ func (txn *KVTxn) rollbackPessimisticLocks() error { return nil } bo := retry.NewBackofferWithVars(context.Background(), cleanupMaxBackoff, txn.vars) + if txn.interceptor != nil { + // User has called txn.SetRPCInterceptor() to explicitly set an interceptor, we + // need to bind it to ctx so that the internal client can perceive and execute + // it before initiating an RPC request. + bo.SetCtx(interceptor.WithRPCInterceptor(bo.GetCtx(), txn.interceptor)) + } keys := txn.collectLockedKeys() return txn.committer.pessimisticRollbackMutations(bo, &PlainMutations{keys: keys}) } @@ -534,6 +568,12 @@ func (txn *KVTxn) LockKeysWithWaitTime(ctx context.Context, lockWaitTime int64, // LockKeys tries to lock the entries with the keys in KV store. // lockCtx is the context for lock, lockCtx.lockWaitTime in ms func (txn *KVTxn) LockKeys(ctx context.Context, lockCtx *tikv.LockCtx, keysInput ...[]byte) error { + if txn.interceptor != nil { + // User has called txn.SetRPCInterceptor() to explicitly set an interceptor, we + // need to bind it to ctx so that the internal client can perceive and execute + // it before initiating an RPC request. + ctx = interceptor.WithRPCInterceptor(ctx, txn.interceptor) + } // Exclude keys that are already locked. var err error keys := make([][]byte, 0, len(keysInput)) diff --git a/txnkv/txnsnapshot/scan.go b/txnkv/txnsnapshot/scan.go index c8959b804f..a73f7635f4 100644 --- a/txnkv/txnsnapshot/scan.go +++ b/txnkv/txnsnapshot/scan.go @@ -47,6 +47,7 @@ import ( "github.com/tikv/client-go/v2/internal/retry" "github.com/tikv/client-go/v2/kv" "github.com/tikv/client-go/v2/tikvrpc" + "github.com/tikv/client-go/v2/tikvrpc/interceptor" "github.com/tikv/client-go/v2/txnkv/txnlock" "go.uber.org/zap" ) @@ -118,6 +119,12 @@ func (s *Scanner) Next() error { if !s.valid { return errors.New("scanner iterator is invalid") } + if s.snapshot.interceptor != nil { + // User has called snapshot.SetRPCInterceptor() to explicitly set an interceptor, we + // need to bind it to ctx so that the internal client can perceive and execute + // it before initiating an RPC request. + bo.SetCtx(interceptor.WithRPCInterceptor(bo.GetCtx(), s.snapshot.interceptor)) + } var err error for { s.idx++ diff --git a/txnkv/txnsnapshot/snapshot.go b/txnkv/txnsnapshot/snapshot.go index 8f7039bb51..84401c3f5e 100644 --- a/txnkv/txnsnapshot/snapshot.go +++ b/txnkv/txnsnapshot/snapshot.go @@ -57,6 +57,7 @@ import ( "github.com/tikv/client-go/v2/metrics" "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/tikvrpc" + "github.com/tikv/client-go/v2/tikvrpc/interceptor" "github.com/tikv/client-go/v2/txnkv/txnlock" "github.com/tikv/client-go/v2/txnkv/txnutil" "github.com/tikv/client-go/v2/util" @@ -136,6 +137,8 @@ type KVSnapshot struct { resourceGroupTag []byte // resourceGroupTagger is use to set the kv request resource group tag if resourceGroupTag is nil. resourceGroupTagger tikvrpc.ResourceGroupTagger + // interceptor is used to decorate the RPC request logic related to the snapshot. + interceptor interceptor.RPCInterceptor } // NewTiKVSnapshot creates a snapshot of an TiKV store. @@ -203,6 +206,13 @@ func (s *KVSnapshot) BatchGet(ctx context.Context, keys [][]byte) (map[string][] ctx = context.WithValue(ctx, retry.TxnStartKey, s.version) bo := retry.NewBackofferWithVars(ctx, batchGetMaxBackoff, s.vars) + if s.interceptor != nil { + // User has called snapshot.SetRPCInterceptor() to explicitly set an interceptor, we + // need to bind it to ctx so that the internal client can perceive and execute + // it before initiating an RPC request. + bo.SetCtx(interceptor.WithRPCInterceptor(bo.GetCtx(), s.interceptor)) + } + // Create a map to collect key-values from region servers. var mu sync.Mutex err := s.batchGetKeysByRegions(bo, keys, func(k, v []byte) { @@ -468,6 +478,14 @@ func (s *KVSnapshot) Get(ctx context.Context, k []byte) ([]byte, error) { ctx = context.WithValue(ctx, retry.TxnStartKey, s.version) bo := retry.NewBackofferWithVars(ctx, getMaxBackoff, s.vars) + + if s.interceptor != nil { + // User has called snapshot.SetRPCInterceptor() to explicitly set an interceptor, we + // need to bind it to ctx so that the internal client can perceive and execute + // it before initiating an RPC request. + bo.SetCtx(interceptor.WithRPCInterceptor(bo.GetCtx(), s.interceptor)) + } + val, err := s.get(ctx, bo, k) s.recordBackoffInfo(bo) if err != nil { @@ -734,6 +752,22 @@ func (s *KVSnapshot) SetResourceGroupTagger(tagger tikvrpc.ResourceGroupTagger) s.resourceGroupTagger = tagger } +// SetRPCInterceptor sets interceptor.RPCInterceptor for the snapshot. +// interceptor.RPCInterceptor will be executed before each RPC request is initiated. +// Note that SetRPCInterceptor will replace the previously set interceptor. +func (s *KVSnapshot) SetRPCInterceptor(it interceptor.RPCInterceptor) { + s.interceptor = it +} + +// AddRPCInterceptor adds an interceptor, the order of addition is the order of execution. +func (s *KVSnapshot) AddRPCInterceptor(it interceptor.RPCInterceptor) { + if s.interceptor == nil { + s.SetRPCInterceptor(it) + return + } + s.interceptor = interceptor.ChainRPCInterceptors(s.interceptor, it) +} + // SnapCacheHitCount gets the snapshot cache hit count. Only for test. func (s *KVSnapshot) SnapCacheHitCount() int { return int(atomic.LoadInt64(&s.mu.hitCnt))