kv/client: fix unstable unit test, where request id is not accurate (#1674) #1679

Closed
320 changes: 319 additions & 1 deletion cdc/kv/client_test.go
@@ -276,7 +276,7 @@ func newMockServiceSpecificAddr(

// waitRequestID waits until the current request ID is larger than the given allocated ID
func waitRequestID(c *check.C, allocatedID uint64) {
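// Assuming retry.Run(interval, maxRetries, fn) polls fn every interval for up to
// maxRetries attempts, the change below from 20ms*10 to 10ms*20 keeps the total
// wait around 200ms while checking the allocated request ID twice as often.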
- err := retry.Run(time.Millisecond*20, 10, func() error {
+ err := retry.Run(time.Millisecond*10, 20, func() error {
if currentRequestID() > allocatedID {
return nil
}
@@ -2387,3 +2387,321 @@ func (s *etcdSuite) TestFailRegionReentrant(c *check.C) {
time.Sleep(time.Second)
cancel()
}

// TestClientV1UnlockRangeReentrant tests that clientV1 handles region reconnection
// with an unstable TiKV store correctly. The test workflow is as follows:
// 1. The kv client establishes requests for two regions, named region-1 and
// region-2, which belong to the same TiKV store.
// 2. Region-1 is established first, while region-2 is delayed for a while after
// its region state is inserted into `pendingRegions`.
// 3. At this point the TiKV store crashes and `stream.Recv` returns an error. In
// the defer function of `receiveFromStream`, all pending regions are cleaned
// up, which means the region lock is unlocked once for each of these regions.
// 4. Region-2 from step-2 then continues to run. It can't find the store stream,
// which was deleted in step-3, so it creates a new stream, which fails because
// of the unstable TiKV store. At this point, the kv client should handle the
// pending region correctly.
func (s *etcdSuite) TestClientV1UnlockRangeReentrant(c *check.C) {
defer testleak.AfterTest(c)()
defer s.TearDownTest(c)

clientv2 := enableKVClientV2
enableKVClientV2 = false
defer func() {
enableKVClientV2 = clientv2
}()

ctx, cancel := context.WithCancel(context.Background())
wg := &sync.WaitGroup{}

ch1 := make(chan *cdcpb.ChangeDataEvent, 10)
srv1 := newMockChangeDataService(c, ch1)
server1, addr1 := newMockService(ctx, c, srv1, wg)

rpcClient, cluster, pdClient, err := mocktikv.NewTiKVAndPDClient("")
c.Assert(err, check.IsNil)
pdClient = &mockPDClient{Client: pdClient, versionGen: defaultVersionGen}
tiStore, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0)
c.Assert(err, check.IsNil)
kvStorage := newStorageWithCurVersionCache(tiStore, addr1)
defer kvStorage.Close() //nolint:errcheck

regionID3 := uint64(3)
regionID4 := uint64(4)
cluster.AddStore(1, addr1)
cluster.Bootstrap(regionID3, []uint64{1}, []uint64{4}, 4)
cluster.SplitRaw(regionID3, regionID4, []byte("b"), []uint64{5}, 5)

err = failpoint.Enable("github.com/pingcap/ticdc/cdc/kv/kvClientStreamRecvError", "1*return(\"injected stream recv error\")")
c.Assert(err, check.IsNil)
err = failpoint.Enable("github.com/pingcap/ticdc/cdc/kv/kvClientPendingRegionDelay", "1*sleep(0)->1*sleep(2000)")
c.Assert(err, check.IsNil)
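// Assumption about the failpoint term syntax above: "1*return(...)" injects the
// stream recv error exactly once, and "1*sleep(0)->1*sleep(2000)" lets the first
// pending region proceed immediately while delaying the second by 2s, so region-2
// is still pending when the store stream is torn down (step-3 of the workflow
// described in the test comment).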
defer func() {
_ = failpoint.Disable("github.com/pingcap/ticdc/cdc/kv/kvClientStreamRecvError")
_ = failpoint.Disable("github.com/pingcap/ticdc/cdc/kv/kvClientPendingRegionDelay")
}()
lockresolver := txnutil.NewLockerResolver(kvStorage)
isPullInit := &mockPullerInit{}
cdcClient := NewCDCClient(ctx, pdClient, kvStorage, &security.Credential{})
eventCh := make(chan *model.RegionFeedEvent, 10)
wg.Add(1)
go func() {
err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")}, 100, false, lockresolver, isPullInit, eventCh)
c.Assert(errors.Cause(err), check.Equals, context.Canceled)
cdcClient.Close() //nolint:errcheck
wg.Done()
}()

// wait until the second region is scheduled
time.Sleep(time.Millisecond * 500)
close(ch1)
server1.Stop()
// wait until the kvClientPendingRegionDelay failpoint ends and the second region is processed
time.Sleep(time.Second * 2)
cancel()
wg.Wait()
}

// TestClientErrNoPendingRegion follows a procedure similar to TestClientV1UnlockRangeReentrant.
// The difference is the point at which the delay is injected for region 2.
func (s *etcdSuite) TestClientErrNoPendingRegion(c *check.C) {
defer testleak.AfterTest(c)()
clientv2 := enableKVClientV2
enableKVClientV2 = false
defer func() {
enableKVClientV2 = clientv2
}()
// test for client v1
s.testClientErrNoPendingRegion(c)

enableKVClientV2 = true
// test for client v2
s.testClientErrNoPendingRegion(c)
}

func (s *etcdSuite) testClientErrNoPendingRegion(c *check.C) {
defer s.TearDownTest(c)

ctx, cancel := context.WithCancel(context.Background())
wg := &sync.WaitGroup{}

ch1 := make(chan *cdcpb.ChangeDataEvent, 10)
srv1 := newMockChangeDataService(c, ch1)
server1, addr1 := newMockService(ctx, c, srv1, wg)

rpcClient, cluster, pdClient, err := mocktikv.NewTiKVAndPDClient("")
c.Assert(err, check.IsNil)
pdClient = &mockPDClient{Client: pdClient, versionGen: defaultVersionGen}
tiStore, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0)
c.Assert(err, check.IsNil)
kvStorage := newStorageWithCurVersionCache(tiStore, addr1)
defer kvStorage.Close() //nolint:errcheck

regionID3 := uint64(3)
regionID4 := uint64(4)
cluster.AddStore(1, addr1)
cluster.Bootstrap(regionID3, []uint64{1}, []uint64{4}, 4)
cluster.SplitRaw(regionID3, regionID4, []byte("b"), []uint64{5}, 5)

err = failpoint.Enable("github.com/pingcap/ticdc/cdc/kv/kvClientStreamRecvError", "1*return(\"injected error\")")
c.Assert(err, check.IsNil)
err = failpoint.Enable("github.com/pingcap/ticdc/cdc/kv/kvClientPendingRegionDelay", "1*sleep(0)->2*sleep(1000)")
c.Assert(err, check.IsNil)
err = failpoint.Enable("github.com/pingcap/ticdc/cdc/kv/kvClientStreamCloseDelay", "sleep(2000)")
c.Assert(err, check.IsNil)
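// Assumed intent, inferred from the test name: the terms above delay region 2's
// requests twice by 1s ("1*sleep(0)->2*sleep(1000)") while kvClientStreamCloseDelay
// holds the stream close back by 2s, so the injected recv error is handled at a
// moment when no matching pending region exists.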
defer func() {
_ = failpoint.Disable("github.com/pingcap/ticdc/cdc/kv/kvClientStreamRecvError")
_ = failpoint.Disable("github.com/pingcap/ticdc/cdc/kv/kvClientPendingRegionDelay")
_ = failpoint.Disable("github.com/pingcap/ticdc/cdc/kv/kvClientStreamCloseDelay")
}()
lockresolver := txnutil.NewLockerResolver(kvStorage)
isPullInit := &mockPullerInit{}
cdcClient := NewCDCClient(ctx, pdClient, kvStorage, &security.Credential{})
eventCh := make(chan *model.RegionFeedEvent, 10)
wg.Add(1)
go func() {
err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")}, 100, false, lockresolver, isPullInit, eventCh)
c.Assert(errors.Cause(err), check.Equals, context.Canceled)
cdcClient.Close() //nolint:errcheck
wg.Done()
}()

baseAllocatedID := currentRequestID()
// wait until the second region is scheduled
time.Sleep(time.Millisecond * 500)
waitRequestID(c, baseAllocatedID+1)
initialized := mockInitializedEvent(regionID3, currentRequestID())
ch1 <- initialized
waitRequestID(c, baseAllocatedID+2)
initialized = mockInitializedEvent(regionID4, currentRequestID())
ch1 <- initialized
// wait until the kvClientPendingRegionDelay failpoint ends and the second region is processed
time.Sleep(time.Second * 2)
cancel()
close(ch1)
server1.Stop()
wg.Wait()
}

// testKVClientForceReconnect verifies that the kv client works correctly when the
// gRPC stream is forced to reconnect.
func (s *etcdSuite) testKVClientForceReconnect(c *check.C) {
ctx, cancel := context.WithCancel(context.Background())
wg := &sync.WaitGroup{}

server1Stopped := make(chan struct{})
ch1 := make(chan *cdcpb.ChangeDataEvent, 10)
srv1 := newMockChangeDataService(c, ch1)
server1, addr1 := newMockService(ctx, c, srv1, wg)
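// Override the mock server's receive loop: once the client tears down the stream
// (assumed to be triggered by the kvClientForceReconnect failpoint enabled below),
// server.Recv returns an error, and server1 stops itself and signals
// server1Stopped so the test can start server2 on the same address.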
srv1.recvLoop = func(server cdcpb.ChangeData_EventFeedServer) {
defer func() {
close(ch1)
server1.Stop()
server1Stopped <- struct{}{}
}()
for {
_, err := server.Recv()
if err != nil {
log.Error("mock server error", zap.Error(err))
break
}
}
}

rpcClient, cluster, pdClient, err := mocktikv.NewTiKVAndPDClient("")
c.Assert(err, check.IsNil)
pdClient = &mockPDClient{Client: pdClient, versionGen: defaultVersionGen}
tiStore, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0)
c.Assert(err, check.IsNil)
kvStorage := newStorageWithCurVersionCache(tiStore, addr1)
defer kvStorage.Close() //nolint:errcheck

regionID3 := uint64(3)
cluster.AddStore(1, addr1)
cluster.Bootstrap(regionID3, []uint64{1}, []uint64{4}, 4)

err = failpoint.Enable("github.com/pingcap/ticdc/cdc/kv/kvClientForceReconnect", "return(true)")
c.Assert(err, check.IsNil)
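// Assumption: with this failpoint enabled the kv client treats the established
// gRPC stream as stale and forces a reconnect, which is what eventually makes
// server1's receive loop return an error and stop the server.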
defer func() {
_ = failpoint.Disable("github.com/pingcap/ticdc/cdc/kv/kvClientForceReconnect")
}()
lockresolver := txnutil.NewLockerResolver(kvStorage)
isPullInit := &mockPullerInit{}
cdcClient := NewCDCClient(ctx, pdClient, kvStorage, &security.Credential{})
eventCh := make(chan *model.RegionFeedEvent, 10)
wg.Add(1)
go func() {
err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")}, 100, false, lockresolver, isPullInit, eventCh)
c.Assert(errors.Cause(err), check.Equals, context.Canceled)
cdcClient.Close() //nolint:errcheck
wg.Done()
}()

baseAllocatedID := currentRequestID()
waitRequestID(c, baseAllocatedID+1)
initialized := mockInitializedEvent(regionID3, currentRequestID())
ch1 <- initialized

<-server1Stopped
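// The replacement mock service below records the request ID it receives for each
// region, because (as noted before the retry loop further down) the client may
// retry and allocate a fresh request ID, so events must be echoed back with the
// ID the server actually observed.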

var requestIds sync.Map
ch2 := make(chan *cdcpb.ChangeDataEvent, 10)
srv2 := newMockChangeDataService(c, ch2)
srv2.recvLoop = func(server cdcpb.ChangeData_EventFeedServer) {
for {
req, err := server.Recv()
if err != nil {
log.Error("mock server error", zap.Error(err))
return
}
requestIds.Store(req.RegionId, req.RequestId)
}
}
// Reuse the same listen address as server 1 to simulate TiKV handling gRPC
// stream termination and reconnection.
server2, _ := newMockServiceSpecificAddr(ctx, c, srv2, addr1, wg)
defer func() {
close(ch2)
server2.Stop()
wg.Wait()
}()

// The second TiKV server could start up slowly, which may cause the kv client to
// retry the request to TiKV more than once, so we can't determine the correct
// request ID here; we must use the real request ID received by the TiKV server.
err = retry.Run(time.Millisecond*300, 10, func() error {
_, ok := requestIds.Load(regionID3)
if ok {
return nil
}
return errors.New("waiting for kv client requests received by server")
})
c.Assert(err, check.IsNil)
requestID, _ := requestIds.Load(regionID3)

initialized = mockInitializedEvent(regionID3, requestID.(uint64))
ch2 <- initialized

resolved := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{
{
RegionId: regionID3,
RequestId: requestID.(uint64),
Event: &cdcpb.Event_ResolvedTs{ResolvedTs: 135},
},
}}
ch2 <- resolved
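// Assumption about the expected events below: the resolved ts 100 appears twice
// because the region is initialized once on each stream (before and after the
// forced reconnect) and each initialization emits a resolved span at the feed's
// start ts (100); the final event carries the resolved ts 135 sent just above.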

expected := []*model.RegionFeedEvent{
{
Resolved: &model.ResolvedSpan{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")},
ResolvedTs: 100,
},
RegionID: regionID3,
},
{
Resolved: &model.ResolvedSpan{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")},
ResolvedTs: 100,
},
RegionID: regionID3,
},
{
Resolved: &model.ResolvedSpan{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")},
ResolvedTs: 135,
},
RegionID: regionID3,
},
}

for _, expectedEv := range expected {
select {
case event := <-eventCh:
c.Assert(event, check.DeepEquals, expectedEv)
case <-time.After(time.Second):
c.Errorf("expected event %v not received", expectedEv)
}
}

cancel()
}

func (s *etcdSuite) TestKVClientForceReconnect(c *check.C) {
defer testleak.AfterTest(c)()
defer s.TearDownTest(c)

clientv2 := enableKVClientV2
defer func() {
enableKVClientV2 = clientv2
}()

// test kv client v1
enableKVClientV2 = false
s.testKVClientForceReconnect(c)

enableKVClientV2 = true
s.testKVClientForceReconnect(c)
}