From 504d468e306a60c465980399880661a682d6cb5b Mon Sep 17 00:00:00 2001 From: qupeng Date: Wed, 24 Jul 2019 12:10:37 +0800 Subject: [PATCH 1/3] tikv: forbid to try to get a connection forever (#11391) --- store/tikv/client_batch.go | 19 +++++++++++++++---- store/tikv/client_test.go | 23 +++++++++++++++++++++++ 2 files changed, 38 insertions(+), 4 deletions(-) diff --git a/store/tikv/client_batch.go b/store/tikv/client_batch.go index 937f8221e2bef..36318dd9b8e1f 100644 --- a/store/tikv/client_batch.go +++ b/store/tikv/client_batch.go @@ -408,15 +408,26 @@ func (a *batchConn) batchSendLoop(cfg config.TiKVClient) { func (a *batchConn) getClientAndSend(entries []*batchCommandsEntry, requests []*tikvpb.BatchCommandsRequest_Request, requestIDs []uint64) { // Choose a connection by round-robbin. - var cli *batchCommandsClient - for { + var cli *batchCommandsClient = nil + var target string = "" + for i := 0; i < len(a.batchCommandsClients); i++ { a.index = (a.index + 1) % uint32(len(a.batchCommandsClients)) - cli = a.batchCommandsClients[a.index] + target = a.batchCommandsClients[a.index].target // The lock protects the batchCommandsClient from been closed while it's inuse. - if cli.tryLockForSend() { + if a.batchCommandsClients[a.index].tryLockForSend() { + cli = a.batchCommandsClients[a.index] break } } + if cli == nil { + logutil.Logger(context.Background()).Warn("no available connections", zap.String("target", target)) + for _, entry := range entries { + // Please ensure the error is handled in region cache correctly. + entry.err = errors.New("no available connections") + close(entry.res) + } + return + } defer cli.unlockForSend() maxBatchID := atomic.AddUint64(&cli.idAlloc, uint64(len(requests))) diff --git a/store/tikv/client_test.go b/store/tikv/client_test.go index ab6a84d87b9ba..54c0d49fcfa33 100644 --- a/store/tikv/client_test.go +++ b/store/tikv/client_test.go @@ -15,6 +15,7 @@ package tikv import ( "context" + "fmt" "testing" "time" @@ -22,6 +23,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/tikvpb" "github.com/pingcap/tidb/config" + "github.com/pingcap/tidb/store/tikv/tikvrpc" ) func TestT(t *testing.T) { @@ -96,3 +98,24 @@ func (s *testClientSuite) TestCancelTimeoutRetErr(c *C) { _, err = sendBatchRequest(context.Background(), "", a, req, 0) c.Assert(errors.Cause(err), Equals, context.DeadlineExceeded) } + +func (s *testClientSuite) TestSendWhenReconnect(c *C) { + server, port := startMockTikvService() + c.Assert(port > 0, IsTrue) + + rpcClient := newRPCClient(config.Security{}) + addr := fmt.Sprintf("%s:%d", "127.0.0.1", port) + conn, err := rpcClient.getConnArray(addr) + c.Assert(err, IsNil) + + // Suppose all connections are re-establishing. + for _, client := range conn.batchConn.batchCommandsClients { + client.lockForRecreate() + } + + req := tikvrpc.NewRequest(tikvrpc.CmdEmpty, &tikvpb.BatchCommandsEmptyRequest{}) + _, err = rpcClient.SendRequest(context.Background(), addr, req, 100*time.Second) + c.Assert(err.Error() == "no available connections", IsTrue) + conn.Close() + server.Stop() +} From dbc9fd510a69aadfc4c7a0d060c53a8bf0c54eda Mon Sep 17 00:00:00 2001 From: Shuaipeng Yu Date: Fri, 2 Aug 2019 13:26:41 +0800 Subject: [PATCH 2/3] fix CI Signed-off-by: Shuaipeng Yu --- store/tikv/client_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/store/tikv/client_test.go b/store/tikv/client_test.go index 54c0d49fcfa33..c8c4bbd1f92ea 100644 --- a/store/tikv/client_test.go +++ b/store/tikv/client_test.go @@ -113,7 +113,7 @@ func (s *testClientSuite) TestSendWhenReconnect(c *C) { client.lockForRecreate() } - req := tikvrpc.NewRequest(tikvrpc.CmdEmpty, &tikvpb.BatchCommandsEmptyRequest{}) + req := &tikvrpc.Request{Type: tikvrpc.CmdEmpty, Empty: &tikvpb.BatchCommandsEmptyRequest{}} _, err = rpcClient.SendRequest(context.Background(), addr, req, 100*time.Second) c.Assert(err.Error() == "no available connections", IsTrue) conn.Close() From b954e72d41d9396082dc10a614961c478230d5cf Mon Sep 17 00:00:00 2001 From: Shuaipeng Yu Date: Fri, 2 Aug 2019 15:07:39 +0800 Subject: [PATCH 3/3] make the case stable Signed-off-by: Shuaipeng Yu --- store/tikv/client_fail_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/store/tikv/client_fail_test.go b/store/tikv/client_fail_test.go index 96f64bd571629..ad49b5040da1b 100644 --- a/store/tikv/client_fail_test.go +++ b/store/tikv/client_fail_test.go @@ -61,6 +61,7 @@ func (s *testClientFailSuite) TestPanicInRecvLoop(c *C) { time.Sleep(time.Second) c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/gotErrorInRecvLoop"), IsNil) c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/panicInFailPendingRequests"), IsNil) + time.Sleep(time.Second) req := &tikvrpc.Request{ Type: tikvrpc.CmdEmpty,