Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

introduce RPC interceptor mechanism #389

Merged
merged 19 commits into from
Dec 18, 2021
Merged

introduce RPC interceptor mechanism #389

merged 19 commits into from
Dec 18, 2021

Conversation

mornyx
Copy link
Contributor

@mornyx mornyx commented Dec 6, 2021

What's changed

Introduce RPC Interceptor mechanism. This is originally designed for TopSQL functionality, but its design is general purpose.

Original PR which introduced this feature:

What is RPCInterceptor

RPCInterceptor is used to decorate the RPC requests to TiKV.

type RPCInterceptor func(next RPCInterceptorFunc) RPCInterceptorFunc

type RPCInterceptorFunc func(target string, req *tikvrpc.Request) (*tikvrpc.Response, error)

The definition of an interceptor is: Given an RPCInterceptorFunc, we will get the decorated RPCInterceptorFunc with additional logic before and after the execution of the given RPCInterceptorFunc.

How to implement an RPCInterceptor

We can implement an RPCInterceptor like this:

func LogInterceptor(next RPCInterceptorFunc) RPCInterceptorFunc {
    return func(target string, req *tikvrpc.Request) (*tikvrpc.Response, error) {
        log.Println("before")
        resp, err := next(target, req)
        log.Println("after")
        return resp, err
    }
}

txn.AddRPCInterceptor(LogInterceptor)

Or you want to inject some dependent modules:

func GetLogInterceptor(lg *log.Logger) RPCInterceptor {
    return func(next RPCInterceptorFunc) RPCInterceptorFunc {
        return func(target string, req *tikvrpc.Request) (*tikvrpc.Response, error) {
            lg.Println("before")
            resp, err := next(target, req)
            lg.Println("after")
            return resp, err
        }
    }
}

txn.AddRPCInterceptor(GetLogInterceptor())

Signed-off-by: mornyx mornyx.z@gmail.com

Signed-off-by: mornyx <mornyx.z@gmail.com>
Signed-off-by: mornyx <mornyx.z@gmail.com>
Signed-off-by: mornyx <mornyx.z@gmail.com>
Signed-off-by: mornyx <mornyx.z@gmail.com>
@mornyx mornyx marked this pull request as ready for review December 7, 2021 06:03
Copy link
Contributor

@crazycs520 crazycs520 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@mornyx mornyx changed the title add interceptor mechanism introduce RPC interceptor mechanism Dec 9, 2021
Signed-off-by: mornyx <mornyx.z@gmail.com>
@breezewish
Copy link
Member

@zhongzc PTAL

Copy link
Member

@breezewish breezewish left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the rest LGTM

tikvrpc/interceptor.go Outdated Show resolved Hide resolved
tikvrpc/interceptor.go Outdated Show resolved Hide resolved
tikvrpc/interceptor.go Outdated Show resolved Hide resolved
tikvrpc/interceptor.go Outdated Show resolved Hide resolved
tikvrpc/interceptor.go Outdated Show resolved Hide resolved
txnkv/transaction/txn.go Outdated Show resolved Hide resolved
txnkv/transaction/txn.go Outdated Show resolved Hide resolved
@@ -203,6 +205,13 @@ func (s *KVSnapshot) BatchGet(ctx context.Context, keys [][]byte) (map[string][]
ctx = context.WithValue(ctx, retry.TxnStartKey, s.version)
bo := retry.NewBackofferWithVars(ctx, batchGetMaxBackoff, s.vars)

if s.interceptor != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems that there are some places missed calling interceptor, for example, batch coprocessor. Although currently we don't need it, it would be great to have that from the perspective of client-go. If don't know how to deal with it correctly, then at least add a TODO.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think users need to know that they are using KVTxn or KVSnapshot, and the effective area of Interceptor is limited to a specific KVTxn or KVSnapshot.
So when users don't need to use abstract transactions or snapshots, but need to send requests directly (e.g. copr), they need to explicitly add the Interceptor to the ctx associated with the request. That's why we need to provide CtxWithInterceptor API to users.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the implementation of SQL Exec Count, we have done this to count the distribution of copr requests.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean that actually batch coprocessor is not handled by client-go? Then I think it would be fine.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All coprocessor requests will eventually be processed by ClientHelper in client-go, but ClientHelper is just a simple wrap of inner client, and only provide a SendReq method. In this pattern, I think it is reasonable to let user explicitly call WithCtxInterceptor on ctx.

The ctx of transaction and snapshot is constructed internally, so we provide high-level AddInterceptor() for transaction and snapshot; The ctx requested by the coprocessor is constructed by the user and passed in through ClientHelper.SendReq(ctx), so we let user to call WithCtxInterceptor explicitly.

There is a unified principle here: Where the context.Context constructed, where the Interceptor bound.

@@ -118,6 +118,12 @@ func (s *Scanner) Next() error {
if !s.valid {
return errors.New("scanner iterator is invalid")
}
if s.snapshot.interceptor != nil {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder what's the principle to place these interceptor settings.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The principle is: trace the origin of the ctx associated with the inner gRPC request.

In this example, ctx is created on L117:

bo := retry.NewBackofferWithVars(context.WithValue(context.Background(), retry.TxnStartKey, s.snapshot.version), scannerNextMaxBackoff, s.snapshot.vars)

So we immediately bind the Interceptor on L121:

if s.snapshot.interceptor != nil {
	// User has called snapshot.SetInterceptor() to explicitly set an interceptor, we
	// need to bind it to ctx so that the internal client can perceive and execute
	// it before initiating an RPC request.
	bo.SetCtx(tikvrpc.SetInterceptorIntoCtx(bo.GetCtx(), s.snapshot.interceptor))
}

Signed-off-by: mornyx <mornyx.z@gmail.com>
Signed-off-by: mornyx <mornyx.z@gmail.com>
Copy link
Member

@breezewish breezewish left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The rest LGTM. @disksing @sticnarf @andylokandy PTAL, thanks!

tikvrpc/interceptor/interceptor.go Show resolved Hide resolved
Signed-off-by: mornyx <mornyx.z@gmail.com>
Signed-off-by: mornyx <mornyx.z@gmail.com>
Copy link
Collaborator

@sticnarf sticnarf left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@mornyx
Copy link
Contributor Author

mornyx commented Dec 16, 2021

The integration test of this PR always fails. Can someone help me confirm if it is caused by my integration test case (integration_tests/interceptor_test.go#TestInterceptor)? I'm not familiar with this yet, thanks!

@breezewish
Copy link
Member

breezewish commented Dec 16, 2021

The integration test of this PR always fails. Can someone help me confirm if it is caused by my integration test case (integration_tests/interceptor_test.go#TestInterceptor)? I'm not familiar with this yet, thanks!

Yeah, or course. The failure output indicate that it's the interceptor test that failed:

2021-12-16T07:28:53.1102516Z --- FAIL: TestInterceptor (0.01s)
2021-12-16T07:28:53.1104432Z     interceptor_test.go:43: 
2021-12-16T07:28:53.1105140Z         	Error Trace:	interceptor_test.go:43
2021-12-16T07:28:53.1105735Z         	Error:      	Not equal: 
2021-12-16T07:28:53.1106289Z         	            	expected: 2
2021-12-16T07:28:53.1106809Z         	            	actual  : 4
2021-12-16T07:28:53.1107361Z         	Test:       	TestInterceptor
2021-12-16T07:28:53.1107973Z     interceptor_test.go:44: 
2021-12-16T07:28:53.1108607Z         	Error Trace:	interceptor_test.go:44
2021-12-16T07:28:53.1109198Z         	Error:      	Not equal: 
2021-12-16T07:28:53.1109736Z         	            	expected: 2
2021-12-16T07:28:53.1110237Z         	            	actual  : 4
2021-12-16T07:28:53.1110799Z         	Test:       	TestInterceptor
2021-12-16T07:28:53.1111411Z     interceptor_test.go:53: 
2021-12-16T07:28:53.1112024Z         	Error Trace:	interceptor_test.go:53
2021-12-16T07:28:53.1112626Z         	Error:      	Not equal: 
2021-12-16T07:28:53.1113162Z         	            	expected: 1
2021-12-16T07:28:53.1113664Z         	            	actual  : 2
2021-12-16T07:28:53.1114433Z         	Test:       	TestInterceptor
2021-12-16T07:28:53.1115100Z     interceptor_test.go:54: 
2021-12-16T07:28:53.1115780Z         	Error Trace:	interceptor_test.go:54
2021-12-16T07:28:53.1116712Z         	Error:      	Not equal: 
2021-12-16T07:28:53.1117151Z         	            	expected: 1
2021-12-16T07:28:53.1117550Z         	            	actual  : 2
2021-12-16T07:28:53.1118000Z         	Test:       	TestInterceptor

There are many "[ERROR]" logs in the wild, but they are unrelated and are produced in normal test process. You need to expand the full output log to see the actual failure cases.

@sticnarf
Copy link
Collaborator

The integration test of this PR always fails. Can someone help me confirm if it is caused by my integration test case (integration_tests/interceptor_test.go#TestInterceptor)? I'm not familiar with this yet, thanks!

goroutine 89 [running]:
runtime/debug.Stack()
	/usr/lib/go/src/runtime/debug/stack.go:24 +0x65
runtime/debug.PrintStack()
	/usr/lib/go/src/runtime/debug/stack.go:16 +0x19
github.com/tikv/client-go/v2/tikvrpc/interceptor.(*MockInterceptorManager).CreateMockInterceptor.func1.1({0xc00005a830, 0xf}, 0x34dc728)
	/home/yilin/Code/client-go/tikvrpc/interceptor/interceptor.go:174 +0x125
github.com/tikv/client-go/v2/internal/client.(*RPCClient).SendRequest(0xc00057bd80, {0x1e31578, 0xc0003d65d0}, {0xc00005a830, 0xf}, 0xc0003fa000, 0x6fc23ac00)
	/home/yilin/Code/client-go/internal/client/client.go:377 +0x10e
github.com/tikv/client-go/v2/internal/client.reqCollapse.SendRequest({{0x1e121a8, 0xc00057bd80}}, {0x1e31578, 0xc0003d65d0}, {0xc00005a830, 0xf}, 0xbd6, 0xbd6)
	/home/yilin/Code/client-go/internal/client/client_collapse.go:74 +0xd6
integration_tests_test.(*mockRPCClient).SendRequest.func1({0x18a4500, 0x0}, 0x28)
	/home/yilin/Code/client-go/integration_tests/interceptor_test.go:65 +0x3f
github.com/tikv/client-go/v2/tikvrpc/interceptor.(*MockInterceptorManager).CreateMockInterceptor.func1.1({0xc00005a830, 0xf}, 0x34dc728)
	/home/yilin/Code/client-go/tikvrpc/interceptor/interceptor.go:177 +0x1a4
integration_tests_test.(*mockRPCClient).SendRequest(0xc000164e50, {0x1e31578, 0xc0003d65d0}, {0xc00005a830, 0x1}, 0xc0001209e0, 0x6fc23ac00)
	/home/yilin/Code/client-go/integration_tests/interceptor_test.go:66 +0x139
github.com/tikv/client-go/v2/internal/locate.(*RegionRequestSender).sendReqToRegion(0xc00017aa60, 0xc000162d80, 0xc0003125b0, 0xc0001ad180, 0xc00017a338)
	/home/yilin/Code/client-go/internal/locate/region_request.go:1130 +0x8a4
github.com/tikv/client-go/v2/internal/locate.(*RegionRequestSender).SendReqCtx(0xc00017aa60, 0xc000162d80, 0xc0001ad180, {0x2, 0x1, 0x1}, 0x6fc23ac00, 0x0, {0x0, 0x0, ...})
	/home/yilin/Code/client-go/internal/locate/region_request.go:982 +0x1090
github.com/tikv/client-go/v2/internal/locate.(*RegionRequestSender).SendReq(...)
	/home/yilin/Code/client-go/internal/locate/region_request.go:231
github.com/tikv/client-go/v2/txnkv/transaction.actionPrewrite.handleSingleBatch({0x0}, 0xc0002c4a80, 0xc000162d80, {{0x2, 0x1, 0x1}, {0x1e47ff0, 0xc00012ecc0}, 0x1})
	/home/yilin/Code/client-go/txnkv/transaction/prewrite.go:212 +0xb67
github.com/tikv/client-go/v2/txnkv/transaction.(*twoPhaseCommitter).doActionOnBatches(0xc0002c4a80, 0xc000162d80, {0x1e1d848, 0xc0003e4080}, {0xc0003d66c0, 0xc00012eca0, 0xc00017aec0})
	/home/yilin/Code/client-go/txnkv/transaction/2pc.go:824 +0x31b
github.com/tikv/client-go/v2/txnkv/transaction.(*twoPhaseCommitter).doActionOnGroupMutations(0xc0002c4a80, 0xc000162d80, {0x1e1d848, 0xc0003e4080}, {0xc0003d6660, 0x1, 0xc0003e4080})
	/home/yilin/Code/client-go/txnkv/transaction/2pc.go:805 +0xcaf
github.com/tikv/client-go/v2/txnkv/transaction.(*twoPhaseCommitter).doActionOnMutations(0xc0002c4a80, 0xc0003d65d0, {0x1e1d848, 0xc0003e4080}, {0x1e47ff0, 0xc00012ec60})
	/home/yilin/Code/client-go/txnkv/transaction/2pc.go:588 +0xc7
github.com/tikv/client-go/v2/txnkv/transaction.(*twoPhaseCommitter).prewriteMutations(0x8, 0xc000162d80, {0x1e47ff0, 0xc00012ec60})
	/home/yilin/Code/client-go/txnkv/transaction/prewrite.go:370 +0x23e
github.com/tikv/client-go/v2/txnkv/transaction.(*twoPhaseCommitter).execute(0xc0002c4a80, {0x1e31578, 0xc0003d65d0})
	/home/yilin/Code/client-go/txnkv/transaction/2pc.go:1243 +0x4c5
github.com/tikv/client-go/v2/txnkv/transaction.(*KVTxn).Commit(0xc0001acb40, {0x1e31508, 0xc00005a180})
	/home/yilin/Code/client-go/txnkv/transaction/txn.go:427 +0xcff
integration_tests_test.TestInterceptor(0xc000582d00)
	/home/yilin/Code/client-go/integration_tests/interceptor_test.go:41 +0x265
testing.tRunner(0xc000582d00, 0x1c1c530)
	/usr/lib/go/src/testing/testing.go:1259 +0x102
created by testing.(*T).Run
	/usr/lib/go/src/testing/testing.go:1306 +0x35a

From the stack, we can see the interceptor is called twice in the test. One is in the mockRPCClient.SendRequest and the other is around the actual RPC to TiKV.

Signed-off-by: mornyx <mornyx.z@gmail.com>
Signed-off-by: mornyx <mornyx.z@gmail.com>
@sticnarf
Copy link
Collaborator

Emm, I don't think it a good idea that this interceptor integration test is only effective with TiKV. People usually don't start a TiKV to run these integration tests during development.

What do you think about moving the interceptor for SendRequest from RPCClient to reqCollapse?

Signed-off-by: mornyx <mornyx.z@gmail.com>
@mornyx
Copy link
Contributor Author

mornyx commented Dec 16, 2021

Emm, I don't think it a good idea that this interceptor integration test is only effective with TiKV. People usually don't start a TiKV to run these integration tests during development.

What do you think about moving the interceptor for SendRequest from RPCClient to reqCollapse?

Looks good, I have updated~

Copy link
Collaborator

@sticnarf sticnarf left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just come up with another question... When a transaction fails to commit, it will launch a background goroutine in twoPhaseCommit.cleanup to remove some prewritten locks. Do you think interceptors should be set to these requests?

internal/client/client_collapse.go Outdated Show resolved Hide resolved
Signed-off-by: mornyx <mornyx.z@gmail.com>
Signed-off-by: mornyx <mornyx.z@gmail.com>
@mornyx
Copy link
Contributor Author

mornyx commented Dec 16, 2021

/run-all-tests

@zhongzc
Copy link

zhongzc commented Dec 17, 2021

Can you reproduce the following error locally?

  go mod edit -replace=github.com/tikv/client-go/v2=../client-go
  go mod tidy
  go build cmd/go-ycsb/*


db/tikv/txn.go:37:13: undefined: tikv.NewTxnClient

@mornyx
Copy link
Contributor Author

mornyx commented Dec 17, 2021

Can you reproduce the following error locally?

  go mod edit -replace=github.com/tikv/client-go/v2=../client-go
  go mod tidy
  go build cmd/go-ycsb/*


db/tikv/txn.go:37:13: undefined: tikv.NewTxnClient

Yes, I reproduced it. It seems that this problem has existed before, all open PRs have this error.

Signed-off-by: mornyx <mornyx.z@gmail.com>
Signed-off-by: mornyx <mornyx.z@gmail.com>
@breezewish
Copy link
Member

Please have a test to cover the change from 0011200

Signed-off-by: mornyx <mornyx.z@gmail.com>
@mornyx
Copy link
Contributor Author

mornyx commented Dec 17, 2021

Please have a test to cover the change from 0011200

done for this

@mornyx
Copy link
Contributor Author

mornyx commented Dec 17, 2021

Hi @sticnarf , could you please help to merge this PR? Thank you

@zhongzc zhongzc merged commit 6165dba into tikv:master Dec 18, 2021
@mornyx mornyx deleted the interceptor branch May 25, 2022 04:16
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants