diff --git a/cdc/entry/schema_storage.go b/cdc/entry/schema_storage.go index d99a1956b79..1aa0d3fe3a3 100644 --- a/cdc/entry/schema_storage.go +++ b/cdc/entry/schema_storage.go @@ -795,7 +795,7 @@ func (s *schemaStorageImpl) GetSnapshot(ctx context.Context, ts uint64) (*schema logTime = now } return err - }, retry.WithBackoffBaseDelay(10), retry.WithInfiniteTries(), retry.WithIsRetryableErr(isRetryable)) + }, retry.WithBackoffBaseDelay(10), retry.WithIsRetryableErr(isRetryable)) return snap, err } diff --git a/cdc/kv/client.go b/cdc/kv/client.go index 241fd715003..0edc10addef 100644 --- a/cdc/kv/client.go +++ b/cdc/kv/client.go @@ -293,7 +293,6 @@ type CDCKVClient interface { ctx context.Context, span regionspan.ComparableSpan, ts uint64, - enableOldValue bool, lockResolver txnutil.LockResolver, isPullerInit PullerInitialization, eventCh chan<- model.RegionFeedEvent, @@ -307,6 +306,7 @@ var NewCDCKVClient = NewCDCClient type CDCClient struct { pd pd.Client + config *config.KVClientConfig clusterID uint64 grpcPool GrpcPool @@ -318,7 +318,14 @@ type CDCClient struct { } // NewCDCClient creates a CDCClient instance -func NewCDCClient(ctx context.Context, pd pd.Client, kvStorage tikv.Storage, grpcPool GrpcPool, regionCache *tikv.RegionCache) (c CDCKVClient) { +func NewCDCClient( + ctx context.Context, + pd pd.Client, + kvStorage tikv.Storage, + grpcPool GrpcPool, + regionCache *tikv.RegionCache, + cfg *config.KVClientConfig, +) (c CDCKVClient) { clusterID := pd.GetClusterID(ctx) var store TiKVStorage @@ -333,6 +340,7 @@ func NewCDCClient(ctx context.Context, pd pd.Client, kvStorage tikv.Storage, grp c = &CDCClient{ clusterID: clusterID, + config: cfg, pd: pd, kvStorage: store, grpcPool: grpcPool, @@ -393,14 +401,12 @@ type PullerInitialization interface { // The `Start` and `End` field in input span must be memcomparable encoded. 
func (c *CDCClient) EventFeed( ctx context.Context, span regionspan.ComparableSpan, ts uint64, - enableOldValue bool, lockResolver txnutil.LockResolver, isPullerInit PullerInitialization, eventCh chan<- model.RegionFeedEvent, ) error { s := newEventFeedSession(ctx, c, c.regionCache, c.kvStorage, span, - lockResolver, isPullerInit, - enableOldValue, ts, eventCh) + lockResolver, isPullerInit, ts, eventCh) return s.eventFeed(ctx, ts) } @@ -441,8 +447,7 @@ type eventFeedSession struct { // The queue is used to store region that reaches limit rateLimitQueue []regionErrorInfo - rangeLock *regionspan.RegionRangeLock - enableOldValue bool + rangeLock *regionspan.RegionRangeLock // To identify metrics of different eventFeedSession id string @@ -468,25 +473,22 @@ func newEventFeedSession( totalSpan regionspan.ComparableSpan, lockResolver txnutil.LockResolver, isPullerInit PullerInitialization, - enableOldValue bool, startTs uint64, eventCh chan<- model.RegionFeedEvent, ) *eventFeedSession { id := strconv.FormatUint(allocID(), 10) - kvClientCfg := config.GetGlobalServerConfig().KVClient return &eventFeedSession{ client: client, regionCache: regionCache, kvStorage: kvStorage, totalSpan: totalSpan, eventCh: eventCh, - regionRouter: NewSizedRegionRouter(ctx, kvClientCfg.RegionScanLimit), + regionRouter: NewSizedRegionRouter(ctx, client.config.RegionScanLimit), regionCh: make(chan singleRegionInfo, defaultRegionChanSize), errCh: make(chan regionErrorInfo, defaultRegionChanSize), requestRangeCh: make(chan rangeRequestTask, defaultRegionChanSize), rateLimitQueue: make([]regionErrorInfo, 0, defaultRegionRateLimitQueueSize), rangeLock: regionspan.NewRegionRangeLock(totalSpan.Start, totalSpan.End, startTs), - enableOldValue: enableOldValue, lockResolver: lockResolver, isPullerInit: isPullerInit, id: id, @@ -693,11 +695,8 @@ func (s *eventFeedSession) requestRegionToStore( } requestID := allocID() - extraOp := kvrpcpb.ExtraOp_Noop - if s.enableOldValue { - extraOp = kvrpcpb.ExtraOp_ReadOldValue - } - + // Always read old value. + extraOp := kvrpcpb.ExtraOp_ReadOldValue rpcCtx := sri.rpcCtx regionID := rpcCtx.Meta.GetId() req := &cdcpb.ChangeDataRequest{ @@ -897,6 +896,8 @@ func (s *eventFeedSession) divideAndSendEventFeedToRegions( nextSpan := span captureAddr := util.CaptureAddrFromCtx(ctx) + // Max backoff 500ms. 
+ scanRegionMaxBackoff := int64(500) for { var ( regions []*tikv.Region @@ -926,7 +927,8 @@ func (s *eventFeedSession) divideAndSendEventFeedToRegions( } log.Debug("ScanRegions", zap.Stringer("span", nextSpan), zap.Reflect("regions", metas)) return nil - }, retry.WithBackoffMaxDelay(50), retry.WithMaxTries(100), retry.WithIsRetryableErr(cerror.IsRetryableError)) + }, retry.WithBackoffMaxDelay(scanRegionMaxBackoff), + retry.WithTotalRetryDuratoin(time.Duration(s.client.config.RegionRetryDuration))) if retryErr != nil { return retryErr } @@ -1330,7 +1332,7 @@ func (s *eventFeedSession) getStreamCancel(storeAddr string) (cancel context.Can return } -func assembleRowEvent(regionID uint64, entry *cdcpb.Event_Row, enableOldValue bool) (model.RegionFeedEvent, error) { +func assembleRowEvent(regionID uint64, entry *cdcpb.Event_Row) (model.RegionFeedEvent, error) { var opType model.OpType switch entry.GetOpType() { case cdcpb.Event_Row_DELETE: @@ -1350,14 +1352,10 @@ func assembleRowEvent(regionID uint64, entry *cdcpb.Event_Row, enableOldValue bo StartTs: entry.StartTs, CRTs: entry.CommitTs, RegionID: regionID, + OldValue: entry.GetOldValue(), }, } - // when old-value is disabled, it is still possible for the tikv to send a event containing the old value - // we need avoid a old-value sent to downstream when old-value is disabled - if enableOldValue { - revent.Val.OldValue = entry.GetOldValue() - } return revent, nil } diff --git a/cdc/kv/client_bench_test.go b/cdc/kv/client_bench_test.go index a6b6647ec50..8cc25026810 100644 --- a/cdc/kv/client_bench_test.go +++ b/cdc/kv/client_bench_test.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/log" "github.com/pingcap/tidb/store/mockstore/mockcopr" "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/pkg/config" "github.com/pingcap/tiflow/pkg/regionspan" "github.com/pingcap/tiflow/pkg/retry" "github.com/pingcap/tiflow/pkg/security" @@ -192,11 +193,11 @@ func prepareBenchMultiStore(b *testing.B, storeNum, regionNum int) ( defer grpcPool.Close() regionCache := tikv.NewRegionCache(pdClient) defer regionCache.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache) + cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, config.GetDefaultServerConfig().KVClient) eventCh := make(chan model.RegionFeedEvent, 1000000) wg.Add(1) go func() { - err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh) + err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, lockresolver, isPullInit, eventCh) if errors.Cause(err) != context.Canceled { b.Error(err) } @@ -283,11 +284,11 @@ func prepareBench(b *testing.B, regionNum int) ( defer grpcPool.Close() regionCache := tikv.NewRegionCache(pdClient) defer regionCache.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache) + cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, config.GetDefaultServerConfig().KVClient) eventCh := make(chan model.RegionFeedEvent, 1000000) wg.Add(1) go func() { - err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("z")}, 100, false, lockresolver, isPullInit, eventCh) + err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("z")}, 100, lockresolver, isPullInit, eventCh) if errors.Cause(err) != context.Canceled { b.Error(err) } diff --git a/cdc/kv/client_test.go b/cdc/kv/client_test.go 
index 057799318a1..c4566f11d7d 100644 --- a/cdc/kv/client_test.go +++ b/cdc/kv/client_test.go @@ -93,7 +93,7 @@ func (s *clientSuite) TestNewClient(c *check.C) { defer grpcPool.Close() regionCache := tikv.NewRegionCache(pdClient) defer regionCache.Close() - cli := NewCDCClient(context.Background(), pdClient, nil, grpcPool, regionCache) + cli := NewCDCClient(context.Background(), pdClient, nil, grpcPool, regionCache, config.GetDefaultServerConfig().KVClient) c.Assert(cli, check.NotNil) } @@ -101,11 +101,10 @@ func (s *clientSuite) TestAssembleRowEvent(c *check.C) { defer testleak.AfterTest(c)() defer s.TearDownTest(c) testCases := []struct { - regionID uint64 - entry *cdcpb.Event_Row - enableOldValue bool - expected model.RegionFeedEvent - err string + regionID uint64 + entry *cdcpb.Event_Row + expected model.RegionFeedEvent + err string }{{ regionID: 1, entry: &cdcpb.Event_Row{ @@ -115,7 +114,6 @@ func (s *clientSuite) TestAssembleRowEvent(c *check.C) { Value: []byte("v1"), OpType: cdcpb.Event_Row_PUT, }, - enableOldValue: false, expected: model.RegionFeedEvent{ RegionID: 1, Val: &model.RawKVEntry{ @@ -136,7 +134,6 @@ func (s *clientSuite) TestAssembleRowEvent(c *check.C) { Value: []byte("v2"), OpType: cdcpb.Event_Row_DELETE, }, - enableOldValue: false, expected: model.RegionFeedEvent{ RegionID: 2, Val: &model.RawKVEntry{ @@ -148,28 +145,6 @@ func (s *clientSuite) TestAssembleRowEvent(c *check.C) { RegionID: 2, }, }, - }, { - regionID: 3, - entry: &cdcpb.Event_Row{ - StartTs: 1, - CommitTs: 2, - Key: []byte("k2"), - Value: []byte("v2"), - OldValue: []byte("ov2"), - OpType: cdcpb.Event_Row_PUT, - }, - enableOldValue: false, - expected: model.RegionFeedEvent{ - RegionID: 3, - Val: &model.RawKVEntry{ - OpType: model.OpTypePut, - StartTs: 1, - CRTs: 2, - Key: []byte("k2"), - Value: []byte("v2"), - RegionID: 3, - }, - }, }, { regionID: 4, entry: &cdcpb.Event_Row{ @@ -180,7 +155,6 @@ func (s *clientSuite) TestAssembleRowEvent(c *check.C) { OldValue: []byte("ov3"), OpType: cdcpb.Event_Row_PUT, }, - enableOldValue: true, expected: model.RegionFeedEvent{ RegionID: 4, Val: &model.RawKVEntry{ @@ -202,12 +176,12 @@ func (s *clientSuite) TestAssembleRowEvent(c *check.C) { Value: []byte("v2"), OpType: cdcpb.Event_Row_UNKNOWN, }, - enableOldValue: false, - err: "[CDC:ErrUnknownKVEventType]unknown kv optype: UNKNOWN, entry: start_ts:1 commit_ts:2 key:\"k2\" value:\"v2\" ", + err: "[CDC:ErrUnknownKVEventType]unknown kv optype: UNKNOWN, entry: start_ts:1 " + + "commit_ts:2 key:\"k2\" value:\"v2\" ", }} for _, tc := range testCases { - event, err := assembleRowEvent(tc.regionID, tc.entry, tc.enableOldValue) + event, err := assembleRowEvent(tc.regionID, tc.entry) c.Assert(event, check.DeepEquals, tc.expected) if err != nil { c.Assert(err.Error(), check.Equals, tc.err) @@ -366,12 +340,12 @@ func (s *clientSuite) TestConnectOfflineTiKV(c *check.C) { defer grpcPool.Close() regionCache := tikv.NewRegionCache(pdClient) defer regionCache.Close() - cdcClient := NewCDCClient(context.Background(), pdClient, kvStorage, grpcPool, regionCache) + cdcClient := NewCDCClient(context.Background(), pdClient, kvStorage, grpcPool, regionCache, config.GetDefaultServerConfig().KVClient) eventCh := make(chan model.RegionFeedEvent, 10) wg.Add(1) go func() { defer wg.Done() - err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 1, false, lockresolver, isPullInit, eventCh) + err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 1, lockresolver, 
isPullInit, eventCh) c.Assert(errors.Cause(err), check.Equals, context.Canceled) }() @@ -466,12 +440,12 @@ func (s *clientSuite) TestRecvLargeMessageSize(c *check.C) { defer grpcPool.Close() regionCache := tikv.NewRegionCache(pdClient) defer regionCache.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache) + cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, config.GetDefaultServerConfig().KVClient) eventCh := make(chan model.RegionFeedEvent, 10) wg.Add(1) go func() { defer wg.Done() - err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 1, false, lockresolver, isPullInit, eventCh) + err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 1, lockresolver, isPullInit, eventCh) c.Assert(errors.Cause(err), check.Equals, context.Canceled) }() @@ -565,12 +539,12 @@ func (s *clientSuite) TestHandleError(c *check.C) { defer grpcPool.Close() regionCache := tikv.NewRegionCache(pdClient) defer regionCache.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache) + cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, config.GetDefaultServerConfig().KVClient) eventCh := make(chan model.RegionFeedEvent, 10) wg.Add(1) go func() { defer wg.Done() - err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("d")}, 100, false, lockresolver, isPullInit, eventCh) + err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("d")}, 100, lockresolver, isPullInit, eventCh) c.Assert(errors.Cause(err), check.Equals, context.Canceled) }() @@ -723,13 +697,13 @@ func (s *clientSuite) TestCompatibilityWithSameConn(c *check.C) { defer grpcPool.Close() regionCache := tikv.NewRegionCache(pdClient) defer regionCache.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache) + cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, config.GetDefaultServerConfig().KVClient) eventCh := make(chan model.RegionFeedEvent, 10) var wg2 sync.WaitGroup wg2.Add(1) go func() { defer wg2.Done() - err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockResolver, isPullInit, eventCh) + err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, lockResolver, isPullInit, eventCh) c.Assert(cerror.ErrVersionIncompatible.Equal(err), check.IsTrue) }() @@ -792,14 +766,14 @@ func (s *clientSuite) TestClusterIDMismatch(c *check.C) { defer grpcPool.Close() regionCache := tikv.NewRegionCache(pdClient) defer regionCache.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache) + cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, config.GetDefaultServerConfig().KVClient) eventCh := make(chan model.RegionFeedEvent, 10) var wg2 sync.WaitGroup wg2.Add(1) go func() { defer wg2.Done() - err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockResolver, isPullInit, eventCh) + err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, lockResolver, isPullInit, eventCh) c.Assert(cerror.ErrClusterIDMismatch.Equal(err), check.IsTrue) }() @@ -859,12 +833,12 @@ func (s *clientSuite) testHandleFeedEvent(c *check.C) { defer grpcPool.Close() regionCache := tikv.NewRegionCache(pdClient) defer regionCache.Close() - cdcClient := 
NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache) + cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, config.GetDefaultServerConfig().KVClient) eventCh := make(chan model.RegionFeedEvent, 10) wg.Add(1) go func() { defer wg.Done() - err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh) + err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, lockresolver, isPullInit, eventCh) c.Assert(errors.Cause(err), check.Equals, context.Canceled) }() @@ -1309,12 +1283,12 @@ func (s *clientSuite) TestStreamSendWithError(c *check.C) { defer grpcPool.Close() regionCache := tikv.NewRegionCache(pdClient) defer regionCache.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache) + cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, config.GetDefaultServerConfig().KVClient) eventCh := make(chan model.RegionFeedEvent, 10) wg.Add(1) go func() { defer wg.Done() - err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")}, 100, false, lockerResolver, isPullInit, eventCh) + err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")}, 100, lockerResolver, isPullInit, eventCh) c.Assert(errors.Cause(err), check.Equals, context.Canceled) }() @@ -1420,12 +1394,12 @@ func (s *clientSuite) testStreamRecvWithError(c *check.C, failpointStr string) { defer grpcPool.Close() regionCache := tikv.NewRegionCache(pdClient) defer regionCache.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache) + cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, config.GetDefaultServerConfig().KVClient) eventCh := make(chan model.RegionFeedEvent, 40) wg.Add(1) go func() { defer wg.Done() - err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh) + err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, lockresolver, isPullInit, eventCh) c.Assert(errors.Cause(err), check.Equals, context.Canceled) }() @@ -1550,13 +1524,13 @@ func (s *clientSuite) TestStreamRecvWithErrorAndResolvedGoBack(c *check.C) { defer grpcPool.Close() regionCache := tikv.NewRegionCache(pdClient) defer regionCache.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache) + cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, config.GetDefaultServerConfig().KVClient) eventCh := make(chan model.RegionFeedEvent, 10) wg.Add(1) go func() { defer wg.Done() defer close(eventCh) - err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh) + err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, lockresolver, isPullInit, eventCh) c.Assert(errors.Cause(err), check.Equals, context.Canceled) }() @@ -1760,13 +1734,13 @@ func (s *clientSuite) TestIncompatibleTiKV(c *check.C) { defer grpcPool.Close() regionCache := tikv.NewRegionCache(pdClient) defer regionCache.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache) + cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, config.GetDefaultServerConfig().KVClient) // NOTICE: eventCh may block the main logic of EventFeed eventCh := make(chan 
model.RegionFeedEvent, 128) wg.Add(1) go func() { defer wg.Done() - err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh) + err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, lockresolver, isPullInit, eventCh) c.Assert(errors.Cause(err), check.Equals, context.Canceled) }() @@ -1837,13 +1811,13 @@ func (s *clientSuite) TestNoPendingRegionError(c *check.C) { defer grpcPool.Close() regionCache := tikv.NewRegionCache(pdClient) defer regionCache.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache) + cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, config.GetDefaultServerConfig().KVClient) eventCh := make(chan model.RegionFeedEvent, 10) wg.Add(1) go func() { defer wg.Done() - err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh) + err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, lockresolver, isPullInit, eventCh) c.Assert(errors.Cause(err), check.Equals, context.Canceled) }() @@ -1915,12 +1889,12 @@ func (s *clientSuite) TestDropStaleRequest(c *check.C) { defer grpcPool.Close() regionCache := tikv.NewRegionCache(pdClient) defer regionCache.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache) + cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, config.GetDefaultServerConfig().KVClient) eventCh := make(chan model.RegionFeedEvent, 10) wg.Add(1) go func() { defer wg.Done() - err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh) + err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, lockresolver, isPullInit, eventCh) c.Assert(errors.Cause(err), check.Equals, context.Canceled) }() @@ -2025,12 +1999,12 @@ func (s *clientSuite) TestResolveLock(c *check.C) { defer grpcPool.Close() regionCache := tikv.NewRegionCache(pdClient) defer regionCache.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache) + cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, config.GetDefaultServerConfig().KVClient) eventCh := make(chan model.RegionFeedEvent, 10) wg.Add(1) go func() { defer wg.Done() - err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh) + err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, lockresolver, isPullInit, eventCh) c.Assert(errors.Cause(err), check.Equals, context.Canceled) }() @@ -2125,13 +2099,13 @@ func (s *clientSuite) testEventCommitTsFallback(c *check.C, events []*cdcpb.Chan defer grpcPool.Close() regionCache := tikv.NewRegionCache(pdClient) defer regionCache.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache) + cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, config.GetDefaultServerConfig().KVClient) eventCh := make(chan model.RegionFeedEvent, 10) var clientWg sync.WaitGroup clientWg.Add(1) go func() { defer clientWg.Done() - err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh) + err := cdcClient.EventFeed(ctx, 
regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, lockresolver, isPullInit, eventCh) c.Assert(err, check.Equals, errUnreachable) }() @@ -2273,12 +2247,12 @@ func (s *clientSuite) testEventAfterFeedStop(c *check.C) { defer grpcPool.Close() regionCache := tikv.NewRegionCache(pdClient) defer regionCache.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache) + cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, config.GetDefaultServerConfig().KVClient) eventCh := make(chan model.RegionFeedEvent, 10) wg.Add(1) go func() { defer wg.Done() - err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh) + err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, lockresolver, isPullInit, eventCh) c.Assert(errors.Cause(err), check.Equals, context.Canceled) }() @@ -2453,12 +2427,12 @@ func (s *clientSuite) TestOutOfRegionRangeEvent(c *check.C) { defer grpcPool.Close() regionCache := tikv.NewRegionCache(pdClient) defer regionCache.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache) + cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, config.GetDefaultServerConfig().KVClient) eventCh := make(chan model.RegionFeedEvent, 10) wg.Add(1) go func() { defer wg.Done() - err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh) + err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, lockresolver, isPullInit, eventCh) c.Assert(errors.Cause(err), check.Equals, context.Canceled) }() @@ -2668,12 +2642,12 @@ func (s *clientSuite) TestResolveLockNoCandidate(c *check.C) { defer grpcPool.Close() regionCache := tikv.NewRegionCache(pdClient) defer regionCache.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache) + cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, config.GetDefaultServerConfig().KVClient) eventCh := make(chan model.RegionFeedEvent, 10) wg.Add(1) go func() { defer wg.Done() - err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh) + err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, lockresolver, isPullInit, eventCh) c.Assert(errors.Cause(err), check.Equals, context.Canceled) }() @@ -2763,12 +2737,12 @@ func (s *clientSuite) TestFailRegionReentrant(c *check.C) { defer grpcPool.Close() regionCache := tikv.NewRegionCache(pdClient) defer regionCache.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage.(tikv.Storage), grpcPool, regionCache) + cdcClient := NewCDCClient(ctx, pdClient, kvStorage.(tikv.Storage), grpcPool, regionCache, config.GetDefaultServerConfig().KVClient) eventCh := make(chan model.RegionFeedEvent, 10) wg.Add(1) go func() { defer wg.Done() - err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh) + err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, lockresolver, isPullInit, eventCh) c.Assert(errors.Cause(err), check.Equals, context.Canceled) }() @@ -2845,12 +2819,12 @@ func (s *clientSuite) TestClientV1UnlockRangeReentrant(c *check.C) { defer grpcPool.Close() regionCache := 
tikv.NewRegionCache(pdClient) defer regionCache.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache) + cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, config.GetDefaultServerConfig().KVClient) eventCh := make(chan model.RegionFeedEvent, 10) wg.Add(1) go func() { defer wg.Done() - err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")}, 100, false, lockresolver, isPullInit, eventCh) + err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")}, 100, lockresolver, isPullInit, eventCh) c.Assert(errors.Cause(err), check.Equals, context.Canceled) }() @@ -2912,12 +2886,12 @@ func (s *clientSuite) testClientErrNoPendingRegion(c *check.C) { defer grpcPool.Close() regionCache := tikv.NewRegionCache(pdClient) defer regionCache.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache) + cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, config.GetDefaultServerConfig().KVClient) eventCh := make(chan model.RegionFeedEvent, 10) wg.Add(1) go func() { defer wg.Done() - err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")}, 100, false, lockresolver, isPullInit, eventCh) + err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")}, 100, lockresolver, isPullInit, eventCh) c.Assert(errors.Cause(err), check.Equals, context.Canceled) }() @@ -2990,12 +2964,12 @@ func (s *clientSuite) testKVClientForceReconnect(c *check.C) { defer grpcPool.Close() regionCache := tikv.NewRegionCache(pdClient) defer regionCache.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache) + cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, config.GetDefaultServerConfig().KVClient) eventCh := make(chan model.RegionFeedEvent, 10) wg.Add(1) go func() { defer wg.Done() - err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")}, 100, false, lockresolver, isPullInit, eventCh) + err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")}, 100, lockresolver, isPullInit, eventCh) c.Assert(errors.Cause(err), check.Equals, context.Canceled) }() @@ -3141,12 +3115,12 @@ func (s *clientSuite) TestConcurrentProcessRangeRequest(c *check.C) { defer grpcPool.Close() regionCache := tikv.NewRegionCache(pdClient) defer regionCache.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache) + cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, config.GetDefaultServerConfig().KVClient) eventCh := make(chan model.RegionFeedEvent, 100) wg.Add(1) go func() { defer wg.Done() - err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("z")}, 100, false, lockresolver, isPullInit, eventCh) + err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("z")}, 100, lockresolver, isPullInit, eventCh) c.Assert(errors.Cause(err), check.Equals, context.Canceled) }() @@ -3257,12 +3231,12 @@ func (s *clientSuite) TestEvTimeUpdate(c *check.C) { defer grpcPool.Close() regionCache := tikv.NewRegionCache(pdClient) defer regionCache.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache) + cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, config.GetDefaultServerConfig().KVClient) eventCh := make(chan model.RegionFeedEvent, 10) 
wg.Add(1) go func() { defer wg.Done() - err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh) + err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, lockresolver, isPullInit, eventCh) c.Assert(errors.Cause(err), check.Equals, context.Canceled) }() @@ -3377,12 +3351,12 @@ func (s *clientSuite) TestRegionWorkerExitWhenIsIdle(c *check.C) { defer grpcPool.Close() regionCache := tikv.NewRegionCache(pdClient) defer regionCache.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache) + cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, config.GetDefaultServerConfig().KVClient) eventCh := make(chan model.RegionFeedEvent, 10) wg.Add(1) go func() { defer wg.Done() - err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh) + err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, lockresolver, isPullInit, eventCh) c.Assert(errors.Cause(err), check.Equals, context.Canceled) }() @@ -3469,14 +3443,14 @@ func (s *clientSuite) TestPrewriteNotMatchError(c *check.C) { defer grpcPool.Close() regionCache := tikv.NewRegionCache(pdClient) defer regionCache.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache) + cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, config.GetDefaultServerConfig().KVClient) eventCh := make(chan model.RegionFeedEvent, 10) baseAllocatedID := currentRequestID() wg.Add(1) go func() { defer wg.Done() - err = cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")}, 100, false, lockResolver, isPullInit, eventCh) + err = cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")}, 100, lockResolver, isPullInit, eventCh) c.Assert(errors.Cause(err), check.Equals, context.Canceled) }() @@ -3549,14 +3523,13 @@ func (s *clientSuite) TestPrewriteNotMatchError(c *check.C) { func createFakeEventFeedSession(ctx context.Context) *eventFeedSession { return newEventFeedSession(ctx, - &CDCClient{regionLimiters: defaultRegionEventFeedLimiters}, + &CDCClient{regionLimiters: defaultRegionEventFeedLimiters, config: config.GetDefaultServerConfig().KVClient}, nil, /*regionCache*/ nil, /*kvStorage*/ regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, - nil, /*lockResolver*/ - nil, /*isPullerInit*/ - false, /*enableOldValue*/ - 100, /*startTs*/ + nil, /*lockResolver*/ + nil, /*isPullerInit*/ + 100, /*startTs*/ nil /*eventCh*/) } diff --git a/cdc/kv/region_worker.go b/cdc/kv/region_worker.go index f579821726b..42b57d83993 100644 --- a/cdc/kv/region_worker.go +++ b/cdc/kv/region_worker.go @@ -154,23 +154,20 @@ type regionWorker struct { metrics *regionWorkerMetrics - enableOldValue bool - storeAddr string + storeAddr string } func newRegionWorker(s *eventFeedSession, addr string) *regionWorker { - cfg := config.GetGlobalServerConfig().KVClient worker := ®ionWorker{ - session: s, - inputCh: make(chan *regionStatefulEvent, regionWorkerInputChanSize), - outputCh: s.eventCh, - errorCh: make(chan error, 1), - statesManager: newRegionStateManager(-1), - rtsManager: newRegionTsManager(), - rtsUpdateCh: make(chan *regionTsInfo, 1024), - enableOldValue: s.enableOldValue, - storeAddr: addr, - concurrent: cfg.WorkerConcurrent, + session: s, + inputCh: make(chan 
*regionStatefulEvent, regionWorkerInputChanSize), + outputCh: s.eventCh, + errorCh: make(chan error, 1), + statesManager: newRegionStateManager(-1), + rtsManager: newRegionTsManager(), + rtsUpdateCh: make(chan *regionTsInfo, 1024), + storeAddr: addr, + concurrent: s.client.config.WorkerConcurrent, } return worker } @@ -643,7 +640,7 @@ func (w *regionWorker) handleEventEntry( w.session.regionRouter.Release(state.sri.rpcCtx.Addr) cachedEvents := state.matcher.matchCachedRow(state.initialized) for _, cachedEvent := range cachedEvents { - revent, err := assembleRowEvent(regionID, cachedEvent, w.enableOldValue) + revent, err := assembleRowEvent(regionID, cachedEvent) if err != nil { return errors.Trace(err) } @@ -657,7 +654,7 @@ func (w *regionWorker) handleEventEntry( state.matcher.matchCachedRollbackRow(state.initialized) case cdcpb.Event_COMMITTED: w.metrics.metricPullEventCommittedCounter.Inc() - revent, err := assembleRowEvent(regionID, entry, w.enableOldValue) + revent, err := assembleRowEvent(regionID, entry) if err != nil { return errors.Trace(err) } @@ -701,7 +698,7 @@ func (w *regionWorker) handleEventEntry( entry.GetType(), entry.GetOpType()) } - revent, err := assembleRowEvent(regionID, entry, w.enableOldValue) + revent, err := assembleRowEvent(regionID, entry) if err != nil { return errors.Trace(err) } diff --git a/cdc/kv/region_worker_test.go b/cdc/kv/region_worker_test.go index b76d2c9c1d7..e19d1e2b8b7 100644 --- a/cdc/kv/region_worker_test.go +++ b/cdc/kv/region_worker_test.go @@ -163,7 +163,6 @@ func TestRegionWokerHandleEventEntryEventOutOfOrder(t *testing.T) { eventCh := make(chan model.RegionFeedEvent, 2) s := createFakeEventFeedSession(ctx) s.eventCh = eventCh - s.enableOldValue = true span := regionspan.Span{}.Hack() state := newRegionFeedState(newSingleRegionInfo( tikv.RegionVerID{}, diff --git a/cdc/kv/testing.go b/cdc/kv/testing.go index 6520f2559ce..97c72be603b 100644 --- a/cdc/kv/testing.go +++ b/cdc/kv/testing.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/tidb/store" "github.com/pingcap/tidb/store/driver" "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/pkg/config" "github.com/pingcap/tiflow/pkg/regionspan" "github.com/pingcap/tiflow/pkg/security" "github.com/pingcap/tiflow/pkg/txnutil" @@ -152,14 +153,14 @@ func TestSplit(t require.TestingT, pdCli pd.Client, storage tikv.Storage, kvStor grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{}) regionCache := tikv.NewRegionCache(pdCli) - cli := NewCDCClient(context.Background(), pdCli, storage, grpcPool, regionCache) + cli := NewCDCClient(context.Background(), pdCli, storage, grpcPool, regionCache, config.GetDefaultServerConfig().KVClient) startTS := mustGetTimestamp(t, storage) lockresolver := txnutil.NewLockerResolver(storage) isPullInit := &mockPullerInit{} go func() { - err := cli.EventFeed(ctx, regionspan.ComparableSpan{Start: nil, End: nil}, startTS, false, lockresolver, isPullInit, eventCh) + err := cli.EventFeed(ctx, regionspan.ComparableSpan{Start: nil, End: nil}, startTS, lockresolver, isPullInit, eventCh) require.Equal(t, err, context.Canceled) }() @@ -242,13 +243,13 @@ func TestGetKVSimple(t require.TestingT, pdCli pd.Client, storage tikv.Storage, grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{}) regionCache := tikv.NewRegionCache(pdCli) - cli := NewCDCClient(context.Background(), pdCli, storage, grpcPool, regionCache) + cli := NewCDCClient(context.Background(), pdCli, storage, grpcPool, regionCache, config.GetDefaultServerConfig().KVClient) startTS := mustGetTimestamp(t, 
storage) lockresolver := txnutil.NewLockerResolver(storage) isPullInit := &mockPullerInit{} go func() { - err := cli.EventFeed(ctx, regionspan.ComparableSpan{Start: nil, End: nil}, startTS, false, lockresolver, isPullInit, checker.eventCh) + err := cli.EventFeed(ctx, regionspan.ComparableSpan{Start: nil, End: nil}, startTS, lockresolver, isPullInit, checker.eventCh) require.Equal(t, err, context.Canceled) }() @@ -270,7 +271,7 @@ func TestGetKVSimple(t require.TestingT, pdCli pd.Client, storage tikv.Storage, if i == 1 { checker = newEventChecker(t) go func() { - err := cli.EventFeed(ctx, regionspan.ComparableSpan{Start: nil, End: nil}, startTS, false, lockresolver, isPullInit, checker.eventCh) + err := cli.EventFeed(ctx, regionspan.ComparableSpan{Start: nil, End: nil}, startTS, lockresolver, isPullInit, checker.eventCh) require.Equal(t, err, context.Canceled) }() } diff --git a/cdc/owner/ddl_puller.go b/cdc/owner/ddl_puller.go index 4105ee6e9f3..1ce895fb030 100644 --- a/cdc/owner/ddl_puller.go +++ b/cdc/owner/ddl_puller.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/puller" "github.com/pingcap/tiflow/cdc/sorter/memory" + "github.com/pingcap/tiflow/pkg/config" cdcContext "github.com/pingcap/tiflow/pkg/context" "github.com/pingcap/tiflow/pkg/filter" "github.com/pingcap/tiflow/pkg/regionspan" @@ -70,12 +71,13 @@ func newDDLPuller(ctx cdcContext.Context, startTs uint64) (DDLPuller, error) { if err != nil { return nil, errors.Trace(err) } + kvCfg := config.GetGlobalServerConfig().KVClient var plr puller.Puller kvStorage := ctx.GlobalVars().KVStorage // kvStorage can be nil only in the test if kvStorage != nil { plr = puller.NewPuller(ctx, pdCli, ctx.GlobalVars().GrpcPool, ctx.GlobalVars().RegionCache, kvStorage, startTs, - []regionspan.Span{regionspan.GetDDLSpan(), regionspan.GetAddIndexDDLSpan()}, false) + []regionspan.Span{regionspan.GetDDLSpan(), regionspan.GetAddIndexDDLSpan()}, kvCfg) } return &ddlPullerImpl{ diff --git a/cdc/processor/pipeline/puller.go b/cdc/processor/pipeline/puller.go index 653f9e2b7e8..930fd046c91 100644 --- a/cdc/processor/pipeline/puller.go +++ b/cdc/processor/pipeline/puller.go @@ -19,6 +19,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/puller" + "github.com/pingcap/tiflow/pkg/config" cdcContext "github.com/pingcap/tiflow/pkg/context" "github.com/pingcap/tiflow/pkg/pipeline" "github.com/pingcap/tiflow/pkg/regionspan" @@ -66,10 +67,12 @@ func (n *pullerNode) InitWithWaitGroup(ctx pipeline.NodeContext, wg *errgroup.Gr ctxC = util.PutTableInfoInCtx(ctxC, n.tableID, n.tableName) ctxC = util.PutCaptureAddrInCtx(ctxC, ctx.GlobalVars().CaptureInfo.AdvertiseAddr) ctxC = util.PutChangefeedIDInCtx(ctxC, ctx.ChangefeedVars().ID) + kvCfg := config.GetGlobalServerConfig().KVClient + ctxC = util.PutRoleInCtx(ctxC, util.RoleProcessor) // NOTICE: always pull the old value internally // See also: https://github.com/pingcap/tiflow/issues/2301. 
plr := puller.NewPuller(ctxC, ctx.GlobalVars().PDClient, ctx.GlobalVars().GrpcPool, ctx.GlobalVars().RegionCache, ctx.GlobalVars().KVStorage, - n.replicaInfo.StartTs, n.tableSpan(ctx), true) + n.replicaInfo.StartTs, n.tableSpan(ctx), kvCfg) n.wg.Go(func() error { ctx.Throw(errors.Trace(plr.Run(ctxC))) return nil diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go index b3ab2ec1c8c..206128820ab 100644 --- a/cdc/processor/processor.go +++ b/cdc/processor/processor.go @@ -664,6 +664,7 @@ func (p *processor) createAndDriveSchemaStorage(ctx cdcContext.Context) (entry.S kvStorage := ctx.GlobalVars().KVStorage ddlspans := []regionspan.Span{regionspan.GetDDLSpan(), regionspan.GetAddIndexDDLSpan()} checkpointTs := p.changefeed.Info.GetCheckpointTs(p.changefeed.Status) + kvCfg := config.GetGlobalServerConfig().KVClient stdCtx := util.PutTableInfoInCtx(ctx, -1, puller.DDLPullerTableName) stdCtx = util.PutChangefeedIDInCtx(stdCtx, ctx.ChangefeedVars().ID) ddlPuller := puller.NewPuller( @@ -672,7 +673,7 @@ func (p *processor) createAndDriveSchemaStorage(ctx cdcContext.Context) (entry.S ctx.GlobalVars().GrpcPool, ctx.GlobalVars().RegionCache, ctx.GlobalVars().KVStorage, - checkpointTs, ddlspans, false) + checkpointTs, ddlspans, kvCfg) meta, err := kv.GetSnapshotMeta(kvStorage, checkpointTs) if err != nil { return nil, errors.Trace(err) diff --git a/cdc/puller/puller.go b/cdc/puller/puller.go index 571f613a73e..e39cd5ec644 100644 --- a/cdc/puller/puller.go +++ b/cdc/puller/puller.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/tiflow/cdc/kv" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/puller/frontier" + "github.com/pingcap/tiflow/pkg/config" "github.com/pingcap/tiflow/pkg/regionspan" "github.com/pingcap/tiflow/pkg/txnutil" "github.com/pingcap/tiflow/pkg/util" @@ -52,15 +53,14 @@ type Puller interface { } type pullerImpl struct { - kvCli kv.CDCKVClient - kvStorage tikv.Storage - checkpointTs uint64 - spans []regionspan.ComparableSpan - outputCh chan *model.RawKVEntry - tsTracker frontier.Frontier - resolvedTs uint64 - initialized int64 - enableOldValue bool + kvCli kv.CDCKVClient + kvStorage tikv.Storage + checkpointTs uint64 + spans []regionspan.ComparableSpan + outputCh chan *model.RawKVEntry + tsTracker frontier.Frontier + resolvedTs uint64 + initialized int64 } // NewPuller create a new Puller fetch event start from checkpointTs @@ -73,7 +73,7 @@ func NewPuller( kvStorage tidbkv.Storage, checkpointTs uint64, spans []regionspan.Span, - enableOldValue bool, + cfg *config.KVClientConfig, ) Puller { tikvStorage, ok := kvStorage.(tikv.Storage) if !ok { @@ -87,17 +87,16 @@ func NewPuller( // the initial ts for frontier to 0. Once the puller level resolved ts // initialized, the ts should advance to a non-zero value. tsTracker := frontier.NewFrontier(0, comparableSpans...) 
- kvCli := kv.NewCDCKVClient(ctx, pdCli, tikvStorage, grpcPool, regionCache) + kvCli := kv.NewCDCKVClient(ctx, pdCli, tikvStorage, grpcPool, regionCache, cfg) p := &pullerImpl{ - kvCli: kvCli, - kvStorage: tikvStorage, - checkpointTs: checkpointTs, - spans: comparableSpans, - outputCh: make(chan *model.RawKVEntry, defaultPullerOutputChanSize), - tsTracker: tsTracker, - resolvedTs: checkpointTs, - initialized: 0, - enableOldValue: enableOldValue, + kvCli: kvCli, + kvStorage: tikvStorage, + checkpointTs: checkpointTs, + spans: comparableSpans, + outputCh: make(chan *model.RawKVEntry, defaultPullerOutputChanSize), + tsTracker: tsTracker, + resolvedTs: checkpointTs, + initialized: 0, } return p } @@ -114,7 +113,7 @@ func (p *pullerImpl) Run(ctx context.Context) error { span := span g.Go(func() error { - return p.kvCli.EventFeed(ctx, span, checkpointTs, p.enableOldValue, lockresolver, p, eventCh) + return p.kvCli.EventFeed(ctx, span, checkpointTs, lockresolver, p, eventCh) }) } diff --git a/cdc/puller/puller_test.go b/cdc/puller/puller_test.go index def434506fc..6d4fc1869d6 100644 --- a/cdc/puller/puller_test.go +++ b/cdc/puller/puller_test.go @@ -25,6 +25,7 @@ import ( "github.com/pingcap/tidb/store/mockstore" "github.com/pingcap/tiflow/cdc/kv" "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/pkg/config" cerrors "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/regionspan" "github.com/pingcap/tiflow/pkg/retry" @@ -63,6 +64,7 @@ func newMockCDCKVClient( kvStorage tikv.Storage, grpcPool kv.GrpcPool, regionCache *tikv.RegionCache, + cfg *config.KVClientConfig, ) kv.CDCKVClient { return &mockCDCKVClient{ expectations: make(chan model.RegionFeedEvent, 1024), @@ -73,7 +75,6 @@ func (mc *mockCDCKVClient) EventFeed( ctx context.Context, span regionspan.ComparableSpan, ts uint64, - enableOldValue bool, lockResolver txnutil.LockResolver, isPullerInit kv.PullerInitialization, eventCh chan<- model.RegionFeedEvent, @@ -116,7 +117,6 @@ func (s *pullerSuite) newPullerForTest( ctx, cancel := context.WithCancel(context.Background()) store, err := mockstore.NewMockStore() c.Assert(err, check.IsNil) - enableOldValue := true backupNewCDCKVClient := kv.NewCDCKVClient kv.NewCDCKVClient = newMockCDCKVClient defer func() { @@ -127,7 +127,7 @@ func (s *pullerSuite) newPullerForTest( defer grpcPool.Close() regionCache := tikv.NewRegionCache(pdCli) defer regionCache.Close() - plr := NewPuller(ctx, pdCli, grpcPool, regionCache, store, checkpointTs, spans, enableOldValue) + plr := NewPuller(ctx, pdCli, grpcPool, regionCache, store, checkpointTs, spans, config.GetDefaultServerConfig().KVClient) wg.Add(1) go func() { defer wg.Done() diff --git a/cdc/sink/mysql.go b/cdc/sink/mysql.go index 1bcc70b254d..688967d268e 100644 --- a/cdc/sink/mysql.go +++ b/cdc/sink/mysql.go @@ -296,7 +296,10 @@ func (s *mysqlSink) execDDLWithMaxRetries(ctx context.Context, ddl *model.DDLEve log.Warn("execute DDL with error, retry later", zap.String("query", ddl.Query), zap.Error(err)) } return err - }, retry.WithBackoffBaseDelay(backoffBaseDelayInMs), retry.WithBackoffMaxDelay(backoffMaxDelayInMs), retry.WithMaxTries(defaultDDLMaxRetryTime), retry.WithIsRetryableErr(cerror.IsRetryableError)) + }, retry.WithBackoffBaseDelay(backoffBaseDelayInMs), + retry.WithBackoffMaxDelay(backoffMaxDelayInMs), + retry.WithMaxTries(defaultDDLMaxRetry), + retry.WithIsRetryableErr(cerror.IsRetryableError)) } func (s *mysqlSink) execDDL(ctx context.Context, ddl *model.DDLEvent) error { @@ -604,7 +607,10 @@ func (s *mysqlSink) 
execDMLWithMaxRetries(ctx context.Context, dmls *preparedDML zap.Int("num of Rows", dmls.rowCount), zap.Int("bucket", bucket)) return nil - }, retry.WithBackoffBaseDelay(backoffBaseDelayInMs), retry.WithBackoffMaxDelay(backoffMaxDelayInMs), retry.WithMaxTries(defaultDMLMaxRetryTime), retry.WithIsRetryableErr(isRetryableDMLError)) + }, retry.WithBackoffBaseDelay(backoffBaseDelayInMs), + retry.WithBackoffMaxDelay(backoffMaxDelayInMs), + retry.WithMaxTries(defaultDMLMaxRetry), + retry.WithIsRetryableErr(isRetryableDMLError)) } type preparedDMLs struct { diff --git a/cdc/sink/mysql_params.go b/cdc/sink/mysql_params.go index b76a318c50d..3b85435dab4 100644 --- a/cdc/sink/mysql_params.go +++ b/cdc/sink/mysql_params.go @@ -36,8 +36,6 @@ const ( DefaultWorkerCount = 16 DefaultMaxTxnRow = 256 - defaultDMLMaxRetryTime = 8 - defaultDDLMaxRetryTime = 20 defaultTiDBTxnMode = "optimistic" defaultFlushInterval = time.Millisecond * 50 defaultBatchReplaceEnabled = true @@ -48,6 +46,11 @@ const ( defaultSafeMode = true ) +var ( + defaultDMLMaxRetry uint64 = 8 + defaultDDLMaxRetry uint64 = 20 +) + var defaultParams = &sinkParams{ workerCount: DefaultWorkerCount, maxTxnRow: DefaultMaxTxnRow, diff --git a/cdc/sink/mysql_test.go b/cdc/sink/mysql_test.go index ee1e61bf0f6..ae3de13c0d6 100644 --- a/cdc/sink/mysql_test.go +++ b/cdc/sink/mysql_test.go @@ -882,7 +882,7 @@ func TestExecDMLRollbackErrRetryable(t *testing.T) { // normal db db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) require.Nil(t, err) - for i := 0; i < defaultDMLMaxRetryTime; i++ { + for i := 0; i < int(defaultDMLMaxRetry); i++ { mock.ExpectBegin() mock.ExpectExec("REPLACE INTO `s1`.`t1`(`a`) VALUES (?),(?)"). WithArgs(1, 2). @@ -894,8 +894,11 @@ func TestExecDMLRollbackErrRetryable(t *testing.T) { } backupGetDBConn := GetDBConnImpl GetDBConnImpl = mockGetDBConnErrDatabaseNotExists + backupMaxRetry := defaultDMLMaxRetry + defaultDMLMaxRetry = 2 defer func() { GetDBConnImpl = backupGetDBConn + defaultDMLMaxRetry = backupMaxRetry }() ctx, cancel := context.WithCancel(context.Background()) diff --git a/errors.toml b/errors.toml index 043f2eb2605..faf9fcf0805 100755 --- a/errors.toml +++ b/errors.toml @@ -778,7 +778,7 @@ pulsar send message failed ["CDC:ErrReachMaxTry"] error = ''' -reach maximum try: %d +reach maximum try: %s, error: %s ''' ["CDC:ErrReactorFinished"] diff --git a/pkg/api/internal/rest/request.go b/pkg/api/internal/rest/request.go index c4d63c02f14..37d663ee328 100644 --- a/pkg/api/internal/rest/request.go +++ b/pkg/api/internal/rest/request.go @@ -57,7 +57,7 @@ type Request struct { // retry options backoffBaseDelay time.Duration backoffMaxDelay time.Duration - maxRetries int64 + maxRetries uint64 // output err error @@ -187,8 +187,8 @@ func (r *Request) WithBackoffMaxDelay(delay time.Duration) *Request { return r } -// MaxRetries specifies the maximum times a request will retry. -func (r *Request) WithMaxRetries(maxRetries int64) *Request { +// WithMaxRetries specifies the maximum times a request will retry. 
+func (r *Request) WithMaxRetries(maxRetries uint64) *Request { if r.err != nil { return r } } diff --git a/pkg/cmd/server/server_test.go index 915064aab65..4688c4b6d82 100644 --- a/pkg/cmd/server/server_test.go +++ b/pkg/cmd/server/server_test.go @@ -170,9 +170,10 @@ func TestParseCfg(t *testing.T) { }, PerTableMemoryQuota: 10 * 1024 * 1024, // 10M KVClient: &config.KVClientConfig{ - WorkerConcurrent: 8, - WorkerPoolSize: 0, - RegionScanLimit: 40, + WorkerConcurrent: 8, + WorkerPoolSize: 0, + RegionScanLimit: 40, + RegionRetryDuration: config.TomlDuration(25 * time.Second), }, Debug: &config.DebugConfig{ EnableTableActor: false, @@ -240,6 +241,9 @@ num-concurrent-worker = 4 num-workerpool-goroutine = 5 sort-dir = "/tmp/just_a_test" +[kv-client] +region-retry-duration = "3s" + [debug] enable-db-sorter = false [debug.db] @@ -309,9 +313,10 @@ server-worker-pool-size = 16 Security: &config.SecurityConfig{}, PerTableMemoryQuota: 10 * 1024 * 1024, // 10M KVClient: &config.KVClientConfig{ - WorkerConcurrent: 8, - WorkerPoolSize: 0, - RegionScanLimit: 40, + WorkerConcurrent: 8, + WorkerPoolSize: 0, + RegionScanLimit: 40, + RegionRetryDuration: config.TomlDuration(3 * time.Second), }, Debug: &config.DebugConfig{ EnableTableActor: false, @@ -446,9 +451,10 @@ cert-allowed-cn = ["dd","ee"] }, PerTableMemoryQuota: 10 * 1024 * 1024, // 10M KVClient: &config.KVClientConfig{ - WorkerConcurrent: 8, - WorkerPoolSize: 0, - RegionScanLimit: 40, + WorkerConcurrent: 8, + WorkerPoolSize: 0, + RegionScanLimit: 40, + RegionRetryDuration: config.TomlDuration(25 * time.Second), }, Debug: &config.DebugConfig{ EnableTableActor: false, diff --git a/pkg/config/config_test_data.go b/pkg/config/config_test_data.go index 2e6206bc0bd..d605a6efe9b 100644 --- a/pkg/config/config_test_data.go +++ b/pkg/config/config_test_data.go @@ -105,7 +105,8 @@ const ( "kv-client": { "worker-concurrent": 8, "worker-pool-size": 0, - "region-scan-limit": 40 + "region-scan-limit": 40, + "region-retry-duration": 25000000000 }, "debug": { "enable-table-actor": false, diff --git a/pkg/config/kvclient.go b/pkg/config/kvclient.go index 0df0261c367..9b28ef16d91 100644 --- a/pkg/config/kvclient.go +++ b/pkg/config/kvclient.go @@ -13,6 +13,8 @@ package config +import cerror "github.com/pingcap/tiflow/pkg/errors" + // KVClientConfig represents config for kv client type KVClientConfig struct { // how many workers will be used for a single region worker @@ -21,4 +23,23 @@ type KVClientConfig struct { WorkerPoolSize int `toml:"worker-pool-size" json:"worker-pool-size"` // region incremental scan limit for one table in a single store RegionScanLimit int `toml:"region-scan-limit" json:"region-scan-limit"` + // the total retry duration of connecting a region + RegionRetryDuration TomlDuration `toml:"region-retry-duration" json:"region-retry-duration"` +} + +// ValidateAndAdjust validates and adjusts the kv client configuration +func (c *KVClientConfig) ValidateAndAdjust() error { + if c.WorkerConcurrent <= 0 { + return cerror.ErrInvalidServerOption.GenWithStackByArgs( + "worker-concurrent should be at least 1") + } + if c.RegionScanLimit <= 0 { + return cerror.ErrInvalidServerOption.GenWithStackByArgs( + "region-scan-limit should be at least 1") + } + if c.RegionRetryDuration <= 0 { + return cerror.ErrInvalidServerOption.GenWithStackByArgs( + "region-retry-duration should be positive") + } + return nil } diff --git a/pkg/config/server_config.go b/pkg/config/server_config.go index 4873f74096b..ac21f666d47 100644 ---
a/pkg/config/server_config.go +++ b/pkg/config/server_config.go @@ -96,6 +96,9 @@ var defaultServerConfig = &ServerConfig{ WorkerConcurrent: 8, WorkerPoolSize: 0, // 0 will use NumCPU() * 2 RegionScanLimit: 40, + // The default TiKV region election timeout is [10s, 20s], + // Use 25 seconds to cover region leader missing. + RegionRetryDuration: TomlDuration(25 * time.Second), }, Debug: &DebugConfig{ EnableTableActor: false, @@ -247,11 +250,8 @@ func (c *ServerConfig) ValidateAndAdjust() error { if c.KVClient == nil { c.KVClient = defaultCfg.KVClient } - if c.KVClient.WorkerConcurrent <= 0 { - return cerror.ErrInvalidServerOption.GenWithStackByArgs("region-scan-limit should be at least 1") - } - if c.KVClient.RegionScanLimit <= 0 { - return cerror.ErrInvalidServerOption.GenWithStackByArgs("region-scan-limit should be at least 1") + if err = c.KVClient.ValidateAndAdjust(); err != nil { + return errors.Trace(err) } if c.Debug == nil { diff --git a/pkg/config/server_config_test.go b/pkg/config/server_config_test.go index b164defe61d..96478cff8bf 100644 --- a/pkg/config/server_config_test.go +++ b/pkg/config/server_config_test.go @@ -15,6 +15,7 @@ package config import ( "testing" + "time" "github.com/stretchr/testify/require" ) @@ -93,3 +94,14 @@ func TestDBConfigValidateAndAdjust(t *testing.T) { conf.CleanupSpeedLimit = 0 require.Error(t, conf.ValidateAndAdjust()) } + +func TestKVClientConfigValidateAndAdjust(t *testing.T) { + t.Parallel() + conf := GetDefaultServerConfig().Clone().KVClient + + require.Nil(t, conf.ValidateAndAdjust()) + conf.RegionRetryDuration = TomlDuration(time.Second) + require.Nil(t, conf.ValidateAndAdjust()) + conf.RegionRetryDuration = -TomlDuration(time.Second) + require.Error(t, conf.ValidateAndAdjust()) +} diff --git a/pkg/errors/errors.go b/pkg/errors/errors.go index 48efe058868..c272e84a941 100644 --- a/pkg/errors/errors.go +++ b/pkg/errors/errors.go @@ -261,7 +261,9 @@ var ( ErrFlowControllerEventLargerThanQuota = errors.Normalize("event is larger than the total memory quota, size: %d, quota: %d", errors.RFCCodeText("CDC:ErrFlowControllerEventLargerThanQuota")) // retry error - ErrReachMaxTry = errors.Normalize("reach maximum try: %d", errors.RFCCodeText("CDC:ErrReachMaxTry")) + ErrReachMaxTry = errors.Normalize("reach maximum try: %s, error: %s", + errors.RFCCodeText("CDC:ErrReachMaxTry"), + ) // tcp server error ErrTCPServerClosed = errors.Normalize("The TCP server has been closed", errors.RFCCodeText("CDC:ErrTCPServerClosed")) diff --git a/pkg/etcd/client.go b/pkg/etcd/client.go index 692e2da1e73..4296ac48455 100644 --- a/pkg/etcd/client.go +++ b/pkg/etcd/client.go @@ -64,7 +64,7 @@ var ( ) // set to var instead of const for mocking the value to speedup test -var maxTries int64 = 8 +var maxTries uint64 = 8 // Client is a simple wrapper that adds retry to etcd RPC type Client struct { diff --git a/pkg/pdutil/region_label.go b/pkg/pdutil/region_label.go index e7a53c4fdf9..48d9f9593ae 100644 --- a/pkg/pdutil/region_label.go +++ b/pkg/pdutil/region_label.go @@ -55,7 +55,7 @@ const ( }` ) -var defaultMaxRetry int64 = 3 +var defaultMaxRetry uint64 = 3 // pdAPIClient is api client of Placement Driver. 
type pdAPIClient struct { diff --git a/pkg/retry/options.go b/pkg/retry/options.go index 724195a3f0c..942f040253f 100644 --- a/pkg/retry/options.go +++ b/pkg/retry/options.go @@ -15,14 +15,16 @@ package retry import ( "math" + "time" ) const ( // defaultBackoffBaseInMs is the initial duration, in Millisecond defaultBackoffBaseInMs = 10.0 // defaultBackoffCapInMs is the max amount of duration, in Millisecond - defaultBackoffCapInMs = 100.0 - defaultMaxTries = 3 + defaultBackoffCapInMs = 100.0 + defaultMaxTries = math.MaxUint64 + defaultMaxRetryDuration = time.Duration(0) ) // Option ... @@ -32,18 +34,20 @@ type Option func(*retryOptions) type IsRetryable func(error) bool type retryOptions struct { - maxTries int64 - backoffBaseInMs float64 - backoffCapInMs float64 - isRetryable IsRetryable + totalRetryDuration time.Duration + maxTries uint64 + backoffBaseInMs float64 + backoffCapInMs float64 + isRetryable IsRetryable } func newRetryOptions() *retryOptions { return &retryOptions{ - maxTries: defaultMaxTries, - backoffBaseInMs: defaultBackoffBaseInMs, - backoffCapInMs: defaultBackoffCapInMs, - isRetryable: func(err error) bool { return true }, + totalRetryDuration: defaultMaxRetryDuration, + maxTries: defaultMaxTries, + backoffBaseInMs: defaultBackoffBaseInMs, + backoffCapInMs: defaultBackoffCapInMs, + isRetryable: func(err error) bool { return true }, } } @@ -65,19 +69,20 @@ func WithBackoffMaxDelay(delayInMs int64) Option { } } -// WithMaxTries configures maximum tries, if tries <= 0 "defaultMaxTries" will be used -func WithMaxTries(tries int64) Option { +// WithMaxTries configures maximum tries, if tries is 0, 1 will be used +func WithMaxTries(tries uint64) Option { return func(o *retryOptions) { - if tries > 0 { - o.maxTries = tries + if tries == 0 { + tries = 1 } + o.maxTries = tries } } -// WithInfiniteTries configures to retry forever (math.MaxInt64 times) till success or got canceled -func WithInfiniteTries() Option { +// WithTotalRetryDuratoin configures the total retry duration. 
+func WithTotalRetryDuratoin(retryDuration time.Duration) Option { return func(o *retryOptions) { - o.maxTries = math.MaxInt64 + o.totalRetryDuration = retryDuration } } diff --git a/pkg/retry/retry_test.go b/pkg/retry/retry_test.go index c7f9d5d8b6e..2080e7cb348 100644 --- a/pkg/retry/retry_test.go +++ b/pkg/retry/retry_test.go @@ -97,7 +97,7 @@ func TestDoCancelInfiniteRetry(t *testing.T) { return errors.New("test") } - err := Do(ctx, f, WithInfiniteTries(), WithBackoffBaseDelay(2), WithBackoffMaxDelay(10)) + err := Do(ctx, f, WithBackoffBaseDelay(2), WithBackoffMaxDelay(10)) require.Equal(t, errors.Cause(err), context.DeadlineExceeded) require.GreaterOrEqual(t, callCount, 1, "tries: %d", callCount) require.Less(t, callCount, math.MaxInt64) @@ -114,7 +114,7 @@ func TestDoCancelAtBeginning(t *testing.T) { return errors.New("test") } - err := Do(ctx, f, WithInfiniteTries(), WithBackoffBaseDelay(2), WithBackoffMaxDelay(10)) + err := Do(ctx, f, WithBackoffBaseDelay(2), WithBackoffMaxDelay(10)) require.Equal(t, errors.Cause(err), context.Canceled) require.Equal(t, callCount, 0, "tries:%d", callCount) } @@ -147,16 +147,58 @@ func TestDoCornerCases(t *testing.T) { require.Regexp(t, "test", errors.Cause(err)) require.Equal(t, callCount, 2) - var i int64 - for i = -10; i < 10; i++ { + var i uint64 + for i = 0; i < 10; i++ { callCount = 0 - err = Do(context.Background(), f, WithBackoffBaseDelay(i), WithBackoffMaxDelay(i), WithMaxTries(i)) + err = Do(context.Background(), f, + WithBackoffBaseDelay(int64(i)), WithBackoffMaxDelay(int64(i)), WithMaxTries(i)) require.Regexp(t, "test", errors.Cause(err)) require.Regexp(t, ".*CDC:ErrReachMaxTry.*", err) - if i > 0 { - require.Equal(t, int64(callCount), i) + if i == 0 { + require.Equal(t, 1, callCount) } else { - require.Equal(t, callCount, defaultMaxTries) + require.Equal(t, int(i), callCount) } } } + +func TestTotalRetryDuration(t *testing.T) { + t.Parallel() + + f := func() error { + return errors.New("test") + } + + start := time.Now() + err := Do( + context.Background(), f, + WithBackoffBaseDelay(math.MinInt64), + WithTotalRetryDuratoin(time.Second), + ) + require.Regexp(t, "test", errors.Cause(err)) + require.LessOrEqual(t, 1, int(math.Round(time.Since(start).Seconds()))) + + start = time.Now() + err = Do( + context.Background(), f, + WithBackoffBaseDelay(math.MinInt64), + WithTotalRetryDuratoin(2*time.Second), + ) + require.Regexp(t, "test", errors.Cause(err)) + require.LessOrEqual(t, 2, int(math.Round(time.Since(start).Seconds()))) +} + +func TestRetryError(t *testing.T) { + t.Parallel() + + f := func() error { + return errors.New("some error info") + } + + err := Do( + context.Background(), f, WithBackoffBaseDelay(math.MinInt64), WithMaxTries(2), + ) + require.Regexp(t, "some error info", errors.Cause(err)) + require.Regexp(t, ".*some error info.*", err.Error()) + require.Regexp(t, ".*CDC:ErrReachMaxTry.*", err.Error()) +} diff --git a/pkg/retry/retry_with_opt.go b/pkg/retry/retry_with_opt.go index c5482ebe130..8f6bc12507c 100644 --- a/pkg/retry/retry_with_opt.go +++ b/pkg/retry/retry_with_opt.go @@ -17,6 +17,7 @@ import ( "context" "math" "math/rand" + "strconv" "time" "github.com/pingcap/errors" @@ -26,7 +27,8 @@ import ( // Operation is the action need to retry type Operation func() error -// Do execute the specified function at most maxTries times until it succeeds or got canceled +// Do execute the specified function. +// By default, it retries infinitely until it succeeds or got canceled. 
func Do(ctx context.Context, operation Operation, opts ...Option) error { retryOption := setOptions(opts...) return run(ctx, operation, retryOption) @@ -48,7 +50,8 @@ func run(ctx context.Context, op Operation, retryOption *retryOptions) error { } var t *time.Timer - try := 0 + var start time.Time + try := uint64(0) backOff := time.Duration(0) for { err := op() @@ -61,8 +64,17 @@ func run(ctx context.Context, op Operation, retryOption *retryOptions) error { } try++ - if int64(try) >= retryOption.maxTries { - return cerror.ErrReachMaxTry.Wrap(err).GenWithStackByArgs(retryOption.maxTries) + if try >= retryOption.maxTries { + return cerror.ErrReachMaxTry. + Wrap(err).GenWithStackByArgs(strconv.Itoa(int(retryOption.maxTries)), err) + } + if retryOption.totalRetryDuration > 0 { + if start.IsZero() { + start = time.Now() + } else if time.Since(start) > retryOption.totalRetryDuration { + return cerror.ErrReachMaxTry. + Wrap(err).GenWithStackByArgs(retryOption.totalRetryDuration, err) + } } backOff = getBackoffInMs(retryOption.backoffBaseInMs, retryOption.backoffCapInMs, float64(try)) diff --git a/pkg/txnutil/gc/gc_service_test.go b/pkg/txnutil/gc/gc_service_test.go index d32214a8b3e..a4211130513 100644 --- a/pkg/txnutil/gc/gc_service_test.go +++ b/pkg/txnutil/gc/gc_service_test.go @@ -62,7 +62,8 @@ func (s *gcServiceSuite) TestCheckSafetyOfStartTs(c *check.C) { s.pdCli.retryCount = 0 err = EnsureChangefeedStartTsSafety(ctx, s.pdCli, "changefeed2", TTL, 65) c.Assert(err, check.NotNil) - c.Assert(err.Error(), check.Equals, "[CDC:ErrReachMaxTry]reach maximum try: 9: not pd leader") + c.Assert(err.Error(), check.Equals, + "[CDC:ErrReachMaxTry]reach maximum try: 9, error: not pd leader: not pd leader") s.pdCli.retryThreshold = 3 s.pdCli.retryCount = 0
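Taken together, the changes above move the kv client's region retry bound into server configuration (kv-client.region-retry-duration, 25s by default) and rework pkg/retry around an optional total retry duration instead of WithInfiniteTries. Below is a minimal standalone sketch, not part of this patch, of how the new pieces compose, assuming the pkg/config and pkg/retry packages as modified here; the always-failing operation and the one-second retry bound are illustrative only.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/pingcap/tiflow/pkg/config"
	"github.com/pingcap/tiflow/pkg/retry"
)

func main() {
	// The [kv-client] section now carries region-retry-duration, defaulting
	// to 25s to cover the TiKV region election timeout of [10s, 20s].
	kvCfg := config.GetDefaultServerConfig().Clone().KVClient
	fmt.Println(time.Duration(kvCfg.RegionRetryDuration)) // 25s

	// A non-positive duration is rejected by the new ValidateAndAdjust,
	// mirroring TestKVClientConfigValidateAndAdjust.
	kvCfg.RegionRetryDuration = -config.TomlDuration(time.Second)
	fmt.Println(kvCfg.ValidateAndAdjust()) // reports ErrInvalidServerOption

	// A stand-in operation that always fails, to exercise the retry loop.
	op := func() error { return errors.New("region leader missing") }

	// Without WithMaxTries, retry.Do now retries until the context is
	// canceled or the total retry duration elapses; the kv client passes
	// time.Duration(cfg.RegionRetryDuration) here when it retries
	// ScanRegions. A one-second bound keeps this demo short.
	err := retry.Do(context.Background(), op,
		retry.WithBackoffBaseDelay(10), // milliseconds
		retry.WithBackoffMaxDelay(500), // milliseconds
		retry.WithTotalRetryDuratoin(time.Second),
		retry.WithIsRetryableErr(func(err error) bool { return true }),
	)
	// Wrapped as CDC:ErrReachMaxTry, which now also carries the last
	// operation error in its message.
	fmt.Println(err)
}

The retry loop now stops on whichever comes first: context cancellation, WithMaxTries when it is given, or the configured total retry duration.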