diff --git a/cdc/entry/schema_storage.go b/cdc/entry/schema_storage.go index f0d58352cf5..4581fc0d051 100644 --- a/cdc/entry/schema_storage.go +++ b/cdc/entry/schema_storage.go @@ -800,8 +800,7 @@ func (s *schemaStorageImpl) GetSnapshot(ctx context.Context, ts uint64) (*schema logTime = now } return err - }, retry.WithBackoffBaseDelay(10), retry.WithInfiniteTries(), - retry.WithIsRetryableErr(isRetryable)) + }, retry.WithBackoffBaseDelay(10), retry.WithIsRetryableErr(isRetryable)) return snap, err } diff --git a/cdc/kv/client.go b/cdc/kv/client.go index bafbfbfa07c..54c733e8205 100644 --- a/cdc/kv/client.go +++ b/cdc/kv/client.go @@ -294,7 +294,6 @@ type CDCKVClient interface { ctx context.Context, span regionspan.ComparableSpan, ts uint64, - enableOldValue bool, lockResolver txnutil.LockResolver, isPullerInit PullerInitialization, eventCh chan<- model.RegionFeedEvent, @@ -308,6 +307,7 @@ var NewCDCKVClient = NewCDCClient type CDCClient struct { pd pd.Client + config *config.KVClientConfig clusterID uint64 grpcPool GrpcPool @@ -329,11 +329,13 @@ func NewCDCClient( regionCache *tikv.RegionCache, pdClock pdtime.Clock, changefeed string, + cfg *config.KVClientConfig, ) (c CDCKVClient) { clusterID := pd.GetClusterID(ctx) c = &CDCClient{ clusterID: clusterID, + config: cfg, pd: pd, kvStorage: kvStorage, grpcPool: grpcPool, @@ -404,14 +406,12 @@ type PullerInitialization interface { // The `Start` and `End` field in input span must be memcomparable encoded. func (c *CDCClient) EventFeed( ctx context.Context, span regionspan.ComparableSpan, ts uint64, - enableOldValue bool, lockResolver txnutil.LockResolver, isPullerInit PullerInitialization, eventCh chan<- model.RegionFeedEvent, ) error { - s := newEventFeedSession(ctx, c, span, - lockResolver, isPullerInit, - enableOldValue, ts, eventCh) + s := newEventFeedSession( + ctx, c, span, lockResolver, isPullerInit, ts, eventCh) return s.eventFeed(ctx, ts) } @@ -450,8 +450,7 @@ type eventFeedSession struct { // The queue is used to store region that reaches limit rateLimitQueue []regionErrorInfo - rangeLock *regionspan.RegionRangeLock - enableOldValue bool + rangeLock *regionspan.RegionRangeLock // To identify metrics of different eventFeedSession id string @@ -475,25 +474,22 @@ func newEventFeedSession( totalSpan regionspan.ComparableSpan, lockResolver txnutil.LockResolver, isPullerInit PullerInitialization, - enableOldValue bool, startTs uint64, eventCh chan<- model.RegionFeedEvent, ) *eventFeedSession { id := strconv.FormatUint(allocID(), 10) - kvClientCfg := config.GetGlobalServerConfig().KVClient rangeLock := regionspan.NewRegionRangeLock( totalSpan.Start, totalSpan.End, startTs, client.changefeed) return &eventFeedSession{ client: client, totalSpan: totalSpan, eventCh: eventCh, - regionRouter: NewSizedRegionRouter(ctx, kvClientCfg.RegionScanLimit), + regionRouter: NewSizedRegionRouter(ctx, client.config.RegionScanLimit), regionCh: make(chan singleRegionInfo, defaultRegionChanSize), errCh: make(chan regionErrorInfo, defaultRegionChanSize), requestRangeCh: make(chan rangeRequestTask, defaultRegionChanSize), rateLimitQueue: make([]regionErrorInfo, 0, defaultRegionRateLimitQueueSize), rangeLock: rangeLock, - enableOldValue: enableOldValue, lockResolver: lockResolver, isPullerInit: isPullerInit, id: id, @@ -708,11 +704,8 @@ func (s *eventFeedSession) requestRegionToStore( } requestID := allocID() - extraOp := kvrpcpb.ExtraOp_Noop - if s.enableOldValue { - extraOp = kvrpcpb.ExtraOp_ReadOldValue - } - + // Always read old value. 
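// Illustrative sketch, not part of the patch: the enableOldValue branch removed
// above collapses to a constant, so TiKV is always asked for old values. The
// extraOp type below is a local stand-in for the kvrpcpb.ExtraOp enum used in
// the surrounding hunk.
package main

import "fmt"

type extraOp int

const (
	extraOpNoop extraOp = iota
	extraOpReadOldValue
)

// before: the request op depended on the per-session enableOldValue flag.
func extraOpBefore(enableOldValue bool) extraOp {
	if enableOldValue {
		return extraOpReadOldValue
	}
	return extraOpNoop
}

// after: old values are always requested; any filtering happens downstream.
func extraOpAfter() extraOp {
	return extraOpReadOldValue
}

func main() {
	fmt.Println(extraOpBefore(false), extraOpBefore(true), extraOpAfter())
}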
+ extraOp := kvrpcpb.ExtraOp_ReadOldValue rpcCtx := sri.rpcCtx regionID := rpcCtx.Meta.GetId() req := &cdcpb.ChangeDataRequest{ @@ -922,6 +915,8 @@ func (s *eventFeedSession) divideAndSendEventFeedToRegions( limit := 20 nextSpan := span + // Max backoff 500ms. + scanRegionMaxBackoff := int64(500) for { var ( regions []*tikv.Region @@ -961,7 +956,8 @@ func (s *eventFeedSession) divideAndSendEventFeedToRegions( zap.Reflect("regions", metas), zap.String("changefeed", s.client.changefeed)) return nil - }, retry.WithBackoffMaxDelay(50), retry.WithMaxTries(100), retry.WithIsRetryableErr(cerror.IsRetryableError)) + }, retry.WithBackoffMaxDelay(scanRegionMaxBackoff), + retry.WithTotalRetryDuratoin(time.Duration(s.client.config.RegionRetryDuration))) if retryErr != nil { return retryErr } @@ -1385,7 +1381,7 @@ func (s *eventFeedSession) getStreamCancel(storeAddr string) (cancel context.Can return } -func assembleRowEvent(regionID uint64, entry *cdcpb.Event_Row, enableOldValue bool) (model.RegionFeedEvent, error) { +func assembleRowEvent(regionID uint64, entry *cdcpb.Event_Row) (model.RegionFeedEvent, error) { var opType model.OpType switch entry.GetOpType() { case cdcpb.Event_Row_DELETE: @@ -1405,14 +1401,10 @@ func assembleRowEvent(regionID uint64, entry *cdcpb.Event_Row, enableOldValue bo StartTs: entry.StartTs, CRTs: entry.CommitTs, RegionID: regionID, + OldValue: entry.GetOldValue(), }, } - // when old-value is disabled, it is still possible for the tikv to send a event containing the old value - // we need avoid a old-value sent to downstream when old-value is disabled - if enableOldValue { - revent.Val.OldValue = entry.GetOldValue() - } return revent, nil } diff --git a/cdc/kv/client_bench_test.go b/cdc/kv/client_bench_test.go index d42ea3d76b3..383b2848c06 100644 --- a/cdc/kv/client_bench_test.go +++ b/cdc/kv/client_bench_test.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/log" "github.com/pingcap/tidb/store/mockstore/mockcopr" "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/pkg/config" "github.com/pingcap/tiflow/pkg/pdtime" "github.com/pingcap/tiflow/pkg/regionspan" "github.com/pingcap/tiflow/pkg/retry" @@ -193,13 +194,15 @@ func prepareBenchMultiStore(b *testing.B, storeNum, regionNum int) ( defer grpcPool.Close() regionCache := tikv.NewRegionCache(pdClient) defer regionCache.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(), "") + cdcClient := NewCDCClient( + ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(), "", + config.GetDefaultServerConfig().KVClient) eventCh := make(chan model.RegionFeedEvent, 1000000) wg.Add(1) go func() { err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, - 100, false, lockResolver, isPullInit, eventCh) + 100, lockResolver, isPullInit, eventCh) if errors.Cause(err) != context.Canceled { b.Error(err) } @@ -285,13 +288,15 @@ func prepareBench(b *testing.B, regionNum int) ( defer grpcPool.Close() regionCache := tikv.NewRegionCache(pdClient) defer regionCache.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(), "") + cdcClient := NewCDCClient( + ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(), "", + config.GetDefaultServerConfig().KVClient) eventCh := make(chan model.RegionFeedEvent, 1000000) wg.Add(1) go func() { err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("z")}, - 100, false, lockResolver, 
isPullInit, eventCh) + 100, lockResolver, isPullInit, eventCh) if errors.Cause(err) != context.Canceled { b.Error(err) } diff --git a/cdc/kv/client_test.go b/cdc/kv/client_test.go index 6778ce47e4d..25559f48fe5 100644 --- a/cdc/kv/client_test.go +++ b/cdc/kv/client_test.go @@ -70,17 +70,18 @@ func TestNewClient(t *testing.T) { defer grpcPool.Close() regionCache := tikv.NewRegionCache(pdClient) defer regionCache.Close() - cli := NewCDCClient(context.Background(), pdClient, nil, grpcPool, regionCache, pdtime.NewClock4Test(), "") + cli := NewCDCClient( + context.Background(), pdClient, nil, grpcPool, regionCache, pdtime.NewClock4Test(), "", + config.GetDefaultServerConfig().KVClient) require.NotNil(t, cli) } func TestAssembleRowEvent(t *testing.T) { testCases := []struct { - regionID uint64 - entry *cdcpb.Event_Row - enableOldValue bool - expected model.RegionFeedEvent - err string + regionID uint64 + entry *cdcpb.Event_Row + expected model.RegionFeedEvent + err string }{{ regionID: 1, entry: &cdcpb.Event_Row{ @@ -90,7 +91,6 @@ func TestAssembleRowEvent(t *testing.T) { Value: []byte("v1"), OpType: cdcpb.Event_Row_PUT, }, - enableOldValue: false, expected: model.RegionFeedEvent{ RegionID: 1, Val: &model.RawKVEntry{ @@ -111,7 +111,6 @@ func TestAssembleRowEvent(t *testing.T) { Value: []byte("v2"), OpType: cdcpb.Event_Row_DELETE, }, - enableOldValue: false, expected: model.RegionFeedEvent{ RegionID: 2, Val: &model.RawKVEntry{ @@ -123,28 +122,6 @@ func TestAssembleRowEvent(t *testing.T) { RegionID: 2, }, }, - }, { - regionID: 3, - entry: &cdcpb.Event_Row{ - StartTs: 1, - CommitTs: 2, - Key: []byte("k2"), - Value: []byte("v2"), - OldValue: []byte("ov2"), - OpType: cdcpb.Event_Row_PUT, - }, - enableOldValue: false, - expected: model.RegionFeedEvent{ - RegionID: 3, - Val: &model.RawKVEntry{ - OpType: model.OpTypePut, - StartTs: 1, - CRTs: 2, - Key: []byte("k2"), - Value: []byte("v2"), - RegionID: 3, - }, - }, }, { regionID: 4, entry: &cdcpb.Event_Row{ @@ -155,7 +132,6 @@ func TestAssembleRowEvent(t *testing.T) { OldValue: []byte("ov3"), OpType: cdcpb.Event_Row_PUT, }, - enableOldValue: true, expected: model.RegionFeedEvent{ RegionID: 4, Val: &model.RawKVEntry{ @@ -177,12 +153,12 @@ func TestAssembleRowEvent(t *testing.T) { Value: []byte("v2"), OpType: cdcpb.Event_Row_UNKNOWN, }, - enableOldValue: false, - err: "[CDC:ErrUnknownKVEventType]unknown kv optype: UNKNOWN, entry: start_ts:1 commit_ts:2 key:\"k2\" value:\"v2\" ", + err: "[CDC:ErrUnknownKVEventType]unknown kv optype: UNKNOWN, entry: start_ts:1 " + + "commit_ts:2 key:\"k2\" value:\"v2\" ", }} for _, tc := range testCases { - event, err := assembleRowEvent(tc.regionID, tc.entry, tc.enableOldValue) + event, err := assembleRowEvent(tc.regionID, tc.entry) require.Equal(t, tc.expected, event) if err != nil { require.Equal(t, tc.err, err.Error()) @@ -342,7 +318,9 @@ func TestConnectOfflineTiKV(t *testing.T) { defer grpcPool.Close() regionCache := tikv.NewRegionCache(pdClient) defer regionCache.Close() - cdcClient := NewCDCClient(context.Background(), pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(), "") + cdcClient := NewCDCClient( + context.Background(), pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(), + "", config.GetDefaultServerConfig().KVClient) // Take care of the eventCh, it's used to output resolvedTs event or kv event // It will stuck the normal routine eventCh := make(chan model.RegionFeedEvent, 50) @@ -351,7 +329,7 @@ func TestConnectOfflineTiKV(t *testing.T) { defer wg.Done() err := 
cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, - 1, false, lockResolver, isPullInit, eventCh) + 1, lockResolver, isPullInit, eventCh) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -442,14 +420,16 @@ func TestRecvLargeMessageSize(t *testing.T) { defer grpcPool.Close() regionCache := tikv.NewRegionCache(pdClient) defer regionCache.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(), "") + cdcClient := NewCDCClient( + ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(), "", + config.GetDefaultServerConfig().KVClient) eventCh := make(chan model.RegionFeedEvent, 50) wg.Add(1) go func() { defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, - 1, false, lockResolver, isPullInit, eventCh) + 1, lockResolver, isPullInit, eventCh) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -540,14 +520,16 @@ func TestHandleError(t *testing.T) { defer grpcPool.Close() regionCache := tikv.NewRegionCache(pdClient) defer regionCache.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(), "") + cdcClient := NewCDCClient( + ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(), "", + config.GetDefaultServerConfig().KVClient) eventCh := make(chan model.RegionFeedEvent, 50) wg.Add(1) go func() { defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("d")}, - 100, false, lockResolver, isPullInit, eventCh) + 100, lockResolver, isPullInit, eventCh) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -697,7 +679,9 @@ func TestCompatibilityWithSameConn(t *testing.T) { defer grpcPool.Close() regionCache := tikv.NewRegionCache(pdClient) defer regionCache.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(), "") + cdcClient := NewCDCClient( + ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(), "", + config.GetDefaultServerConfig().KVClient) eventCh := make(chan model.RegionFeedEvent, 50) var wg2 sync.WaitGroup wg2.Add(1) @@ -705,7 +689,7 @@ func TestCompatibilityWithSameConn(t *testing.T) { defer wg2.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, - 100, false, lockResolver, isPullInit, eventCh) + 100, lockResolver, isPullInit, eventCh) require.True(t, cerror.ErrVersionIncompatible.Equal(err)) }() @@ -763,7 +747,9 @@ func TestClusterIDMismatch(t *testing.T) { defer grpcPool.Close() regionCache := tikv.NewRegionCache(pdClient) defer regionCache.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(), "") + cdcClient := NewCDCClient( + ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(), "", + config.GetDefaultServerConfig().KVClient) eventCh := make(chan model.RegionFeedEvent, 50) var wg2 sync.WaitGroup @@ -772,7 +758,7 @@ func TestClusterIDMismatch(t *testing.T) { defer wg2.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, - 100, false, lockResolver, isPullInit, eventCh) + 100, lockResolver, isPullInit, eventCh) require.True(t, cerror.ErrClusterIDMismatch.Equal(err)) }() @@ -830,14 +816,16 @@ func testHandleFeedEvent(t *testing.T) { defer grpcPool.Close() regionCache := tikv.NewRegionCache(pdClient) defer regionCache.Close() - cdcClient := 
NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(), "") + cdcClient := NewCDCClient( + ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(), "", + config.GetDefaultServerConfig().KVClient) eventCh := make(chan model.RegionFeedEvent, 50) wg.Add(1) go func() { defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, - 100, false, lockResolver, isPullInit, eventCh) + 100, lockResolver, isPullInit, eventCh) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -1284,14 +1272,16 @@ func TestStreamSendWithError(t *testing.T) { defer grpcPool.Close() regionCache := tikv.NewRegionCache(pdClient) defer regionCache.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(), "") + cdcClient := NewCDCClient( + ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(), "", + config.GetDefaultServerConfig().KVClient) eventCh := make(chan model.RegionFeedEvent, 50) wg.Add(1) go func() { defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")}, - 100, false, lockerResolver, isPullInit, eventCh) + 100, lockerResolver, isPullInit, eventCh) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -1395,14 +1385,16 @@ func testStreamRecvWithError(t *testing.T, failpointStr string) { defer grpcPool.Close() regionCache := tikv.NewRegionCache(pdClient) defer regionCache.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(), "") + cdcClient := NewCDCClient( + ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(), "", + config.GetDefaultServerConfig().KVClient) eventCh := make(chan model.RegionFeedEvent, 50) wg.Add(1) go func() { defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, - 100, false, lockResolver, isPullInit, eventCh) + 100, lockResolver, isPullInit, eventCh) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -1524,7 +1516,9 @@ func TestStreamRecvWithErrorAndResolvedGoBack(t *testing.T) { defer grpcPool.Close() regionCache := tikv.NewRegionCache(pdClient) defer regionCache.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(), "") + cdcClient := NewCDCClient( + ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(), "", + config.GetDefaultServerConfig().KVClient) eventCh := make(chan model.RegionFeedEvent, 50) wg.Add(1) go func() { @@ -1532,7 +1526,7 @@ func TestStreamRecvWithErrorAndResolvedGoBack(t *testing.T) { defer close(eventCh) err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, - 100, false, lockResolver, isPullInit, eventCh) + 100, lockResolver, isPullInit, eventCh) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -1730,7 +1724,9 @@ func TestIncompatibleTiKV(t *testing.T) { defer grpcPool.Close() regionCache := tikv.NewRegionCache(pdClient) defer regionCache.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(), "") + cdcClient := NewCDCClient( + ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(), "", + config.GetDefaultServerConfig().KVClient) // NOTICE: eventCh may block the main logic of EventFeed eventCh := make(chan model.RegionFeedEvent, 128) wg.Add(1) @@ -1738,7 +1734,7 @@ func TestIncompatibleTiKV(t *testing.T) { 
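// Sketch of how a caller supplies the *config.KVClientConfig argument that the
// NewCDCClient calls in these tests now require. Only identifiers visible in
// this patch are used (GetDefaultServerConfig, Clone, KVClient,
// RegionRetryDuration, TomlDuration); compiling it standalone assumes the
// tiflow pkg/config package from this repository is importable.
package main

import (
	"fmt"
	"time"

	"github.com/pingcap/tiflow/pkg/config"
)

func main() {
	// Start from the server defaults, then override the per-changefeed region
	// retry budget, mirroring the new [kv-client] region-retry-duration option.
	kvCfg := config.GetDefaultServerConfig().Clone().KVClient
	kvCfg.RegionRetryDuration = config.TomlDuration(25 * time.Second)

	// kvCfg is what gets passed as the trailing argument of NewCDCClient
	// throughout client_test.go in this patch.
	fmt.Println(kvCfg.WorkerConcurrent, kvCfg.RegionScanLimit,
		time.Duration(kvCfg.RegionRetryDuration))
}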
defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, - 100, false, lockResolver, isPullInit, eventCh) + 100, lockResolver, isPullInit, eventCh) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -1806,7 +1802,9 @@ func TestNoPendingRegionError(t *testing.T) { defer grpcPool.Close() regionCache := tikv.NewRegionCache(pdClient) defer regionCache.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(), "") + cdcClient := NewCDCClient( + ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(), "", + config.GetDefaultServerConfig().KVClient) eventCh := make(chan model.RegionFeedEvent, 50) wg.Add(1) @@ -1814,7 +1812,7 @@ func TestNoPendingRegionError(t *testing.T) { defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, - 100, false, lockResolver, isPullInit, eventCh) + 100, lockResolver, isPullInit, eventCh) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -1883,14 +1881,16 @@ func TestDropStaleRequest(t *testing.T) { defer grpcPool.Close() regionCache := tikv.NewRegionCache(pdClient) defer regionCache.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(), "") + cdcClient := NewCDCClient( + ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(), "", + config.GetDefaultServerConfig().KVClient) eventCh := make(chan model.RegionFeedEvent, 50) wg.Add(1) go func() { defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, - 100, false, lockResolver, isPullInit, eventCh) + 100, lockResolver, isPullInit, eventCh) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -1992,14 +1992,16 @@ func TestResolveLock(t *testing.T) { defer grpcPool.Close() regionCache := tikv.NewRegionCache(pdClient) defer regionCache.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(), "") + cdcClient := NewCDCClient( + ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(), "", + config.GetDefaultServerConfig().KVClient) eventCh := make(chan model.RegionFeedEvent, 50) wg.Add(1) go func() { defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, - 100, false, lockResolver, isPullInit, eventCh) + 100, lockResolver, isPullInit, eventCh) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -2093,7 +2095,9 @@ func testEventCommitTsFallback(t *testing.T, events []*cdcpb.ChangeDataEvent) { defer grpcPool.Close() regionCache := tikv.NewRegionCache(pdClient) defer regionCache.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(), "") + cdcClient := NewCDCClient( + ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(), "", + config.GetDefaultServerConfig().KVClient) eventCh := make(chan model.RegionFeedEvent, 50) var clientWg sync.WaitGroup clientWg.Add(1) @@ -2101,7 +2105,7 @@ func testEventCommitTsFallback(t *testing.T, events []*cdcpb.ChangeDataEvent) { defer clientWg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, - 100, false, lockResolver, isPullInit, eventCh) + 100, lockResolver, isPullInit, eventCh) require.Equal(t, errUnreachable, err) }() @@ -2244,14 +2248,16 @@ func testEventAfterFeedStop(t *testing.T) { defer grpcPool.Close() 
regionCache := tikv.NewRegionCache(pdClient) defer regionCache.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(), "") + cdcClient := NewCDCClient( + ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(), "", + config.GetDefaultServerConfig().KVClient) eventCh := make(chan model.RegionFeedEvent, 50) wg.Add(1) go func() { defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, - 100, false, lockResolver, isPullInit, eventCh) + 100, lockResolver, isPullInit, eventCh) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -2423,14 +2429,16 @@ func TestOutOfRegionRangeEvent(t *testing.T) { defer grpcPool.Close() regionCache := tikv.NewRegionCache(pdClient) defer regionCache.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(), "") + cdcClient := NewCDCClient( + ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(), "", + config.GetDefaultServerConfig().KVClient) eventCh := make(chan model.RegionFeedEvent, 50) wg.Add(1) go func() { defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, - 100, false, lockResolver, isPullInit, eventCh) + 100, lockResolver, isPullInit, eventCh) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -2637,14 +2645,16 @@ func TestResolveLockNoCandidate(t *testing.T) { defer grpcPool.Close() regionCache := tikv.NewRegionCache(pdClient) defer regionCache.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(), "") + cdcClient := NewCDCClient( + ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(), "", + config.GetDefaultServerConfig().KVClient) eventCh := make(chan model.RegionFeedEvent, 50) wg.Add(1) go func() { defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, - 100, false, lockResolver, isPullInit, eventCh) + 100, lockResolver, isPullInit, eventCh) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -2731,14 +2741,16 @@ func TestFailRegionReentrant(t *testing.T) { defer grpcPool.Close() regionCache := tikv.NewRegionCache(pdClient) defer regionCache.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(), "") + cdcClient := NewCDCClient( + ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(), "", + config.GetDefaultServerConfig().KVClient) eventCh := make(chan model.RegionFeedEvent, 50) wg.Add(1) go func() { defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, - 100, false, lockResolver, isPullInit, eventCh) + 100, lockResolver, isPullInit, eventCh) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -2811,14 +2823,16 @@ func TestClientV1UnlockRangeReentrant(t *testing.T) { defer grpcPool.Close() regionCache := tikv.NewRegionCache(pdClient) defer regionCache.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(), "") + cdcClient := NewCDCClient( + ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(), "", + config.GetDefaultServerConfig().KVClient) eventCh := make(chan model.RegionFeedEvent, 50) wg.Add(1) go func() { defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")}, - 100, false, 
lockResolver, isPullInit, eventCh) + 100, lockResolver, isPullInit, eventCh) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -2876,14 +2890,16 @@ func testClientErrNoPendingRegion(t *testing.T) { defer grpcPool.Close() regionCache := tikv.NewRegionCache(pdClient) defer regionCache.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(), "") + cdcClient := NewCDCClient( + ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(), "", + config.GetDefaultServerConfig().KVClient) eventCh := make(chan model.RegionFeedEvent, 50) wg.Add(1) go func() { defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")}, - 100, false, lockResolver, isPullInit, eventCh) + 100, lockResolver, isPullInit, eventCh) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -2952,14 +2968,16 @@ func testKVClientForceReconnect(t *testing.T) { defer grpcPool.Close() regionCache := tikv.NewRegionCache(pdClient) defer regionCache.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(), "") + cdcClient := NewCDCClient( + ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(), "", + config.GetDefaultServerConfig().KVClient) eventCh := make(chan model.RegionFeedEvent, 50) wg.Add(1) go func() { defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")}, - 100, false, lockResolver, isPullInit, eventCh) + 100, lockResolver, isPullInit, eventCh) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -3099,14 +3117,16 @@ func TestConcurrentProcessRangeRequest(t *testing.T) { defer grpcPool.Close() regionCache := tikv.NewRegionCache(pdClient) defer regionCache.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(), "") + cdcClient := NewCDCClient( + ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(), "", + config.GetDefaultServerConfig().KVClient) eventCh := make(chan model.RegionFeedEvent, 100) wg.Add(1) go func() { defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("z")}, - 100, false, lockResolver, isPullInit, eventCh) + 100, lockResolver, isPullInit, eventCh) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -3214,14 +3234,16 @@ func TestEvTimeUpdate(t *testing.T) { defer grpcPool.Close() regionCache := tikv.NewRegionCache(pdClient) defer regionCache.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(), "") + cdcClient := NewCDCClient( + ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(), "", + config.GetDefaultServerConfig().KVClient) eventCh := make(chan model.RegionFeedEvent, 50) wg.Add(1) go func() { defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, - 100, false, lockResolver, isPullInit, eventCh) + 100, lockResolver, isPullInit, eventCh) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -3334,14 +3356,16 @@ func TestRegionWorkerExitWhenIsIdle(t *testing.T) { defer grpcPool.Close() regionCache := tikv.NewRegionCache(pdClient) defer regionCache.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(), "") + cdcClient := NewCDCClient( + ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(), "", + 
config.GetDefaultServerConfig().KVClient) eventCh := make(chan model.RegionFeedEvent, 50) wg.Add(1) go func() { defer wg.Done() err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, - 100, false, lockResolver, isPullInit, eventCh) + 100, lockResolver, isPullInit, eventCh) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -3424,7 +3448,9 @@ func TestPrewriteNotMatchError(t *testing.T) { defer grpcPool.Close() regionCache := tikv.NewRegionCache(pdClient) defer regionCache.Close() - cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(), "") + cdcClient := NewCDCClient( + ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(), "", + config.GetDefaultServerConfig().KVClient) eventCh := make(chan model.RegionFeedEvent, 50) baseAllocatedID := currentRequestID() @@ -3433,7 +3459,7 @@ func TestPrewriteNotMatchError(t *testing.T) { defer wg.Done() err = cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")}, - 100, false, lockResolver, isPullInit, eventCh) + 100, lockResolver, isPullInit, eventCh) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -3506,12 +3532,14 @@ func TestPrewriteNotMatchError(t *testing.T) { func createFakeEventFeedSession(ctx context.Context) *eventFeedSession { return newEventFeedSession(ctx, - &CDCClient{regionLimiters: defaultRegionEventFeedLimiters}, + &CDCClient{ + regionLimiters: defaultRegionEventFeedLimiters, + config: config.GetDefaultServerConfig().KVClient, + }, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, - nil, /*lockResolver*/ - nil, /*isPullerInit*/ - false, /*enableOldValue*/ - 100, /*startTs*/ + nil, /*lockResolver*/ + nil, /*isPullerInit*/ + 100, /*startTs*/ nil /*eventCh*/) } diff --git a/cdc/kv/region_worker.go b/cdc/kv/region_worker.go index 35912609566..20e266af017 100644 --- a/cdc/kv/region_worker.go +++ b/cdc/kv/region_worker.go @@ -154,23 +154,20 @@ type regionWorker struct { metrics *regionWorkerMetrics - enableOldValue bool - storeAddr string + storeAddr string } func newRegionWorker(s *eventFeedSession, addr string) *regionWorker { - cfg := config.GetGlobalServerConfig().KVClient worker := ®ionWorker{ - session: s, - inputCh: make(chan *regionStatefulEvent, regionWorkerInputChanSize), - outputCh: s.eventCh, - errorCh: make(chan error, 1), - statesManager: newRegionStateManager(-1), - rtsManager: newRegionTsManager(), - rtsUpdateCh: make(chan *regionTsInfo, 1024), - enableOldValue: s.enableOldValue, - storeAddr: addr, - concurrent: cfg.WorkerConcurrent, + session: s, + inputCh: make(chan *regionStatefulEvent, regionWorkerInputChanSize), + outputCh: s.eventCh, + errorCh: make(chan error, 1), + statesManager: newRegionStateManager(-1), + rtsManager: newRegionTsManager(), + rtsUpdateCh: make(chan *regionTsInfo, 1024), + storeAddr: addr, + concurrent: s.client.config.WorkerConcurrent, } return worker } @@ -653,7 +650,7 @@ func (w *regionWorker) handleEventEntry( w.session.regionRouter.Release(state.sri.rpcCtx.Addr) cachedEvents := state.matcher.matchCachedRow() for _, cachedEvent := range cachedEvents { - revent, err := assembleRowEvent(regionID, cachedEvent, w.enableOldValue) + revent, err := assembleRowEvent(regionID, cachedEvent) if err != nil { return errors.Trace(err) } @@ -666,7 +663,7 @@ func (w *regionWorker) handleEventEntry( } case cdcpb.Event_COMMITTED: w.metrics.metricPullEventCommittedCounter.Inc() - revent, err := assembleRowEvent(regionID, entry, 
w.enableOldValue) + revent, err := assembleRowEvent(regionID, entry) if err != nil { return errors.Trace(err) } @@ -710,7 +707,7 @@ func (w *regionWorker) handleEventEntry( entry.GetType(), entry.GetOpType()) } - revent, err := assembleRowEvent(regionID, entry, w.enableOldValue) + revent, err := assembleRowEvent(regionID, entry) if err != nil { return errors.Trace(err) } diff --git a/cdc/owner/ddl_puller.go b/cdc/owner/ddl_puller.go index 3b425b45081..ae9dd4da874 100644 --- a/cdc/owner/ddl_puller.go +++ b/cdc/owner/ddl_puller.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/puller" "github.com/pingcap/tiflow/cdc/sorter/memory" + "github.com/pingcap/tiflow/pkg/config" cdcContext "github.com/pingcap/tiflow/pkg/context" "github.com/pingcap/tiflow/pkg/filter" "github.com/pingcap/tiflow/pkg/regionspan" @@ -71,6 +72,7 @@ func newDDLPuller(ctx cdcContext.Context, startTs uint64) (DDLPuller, error) { if err != nil { return nil, errors.Trace(err) } + kvCfg := config.GetGlobalServerConfig().KVClient var plr puller.Puller kvStorage := ctx.GlobalVars().KVStorage // kvStorage can be nil only in the test @@ -84,7 +86,9 @@ func newDDLPuller(ctx cdcContext.Context, startTs uint64) (DDLPuller, error) { // Add "_ddl_puller" to make it different from table pullers. ctx.ChangefeedVars().ID+"_ddl_puller", startTs, - []regionspan.Span{regionspan.GetDDLSpan(), regionspan.GetAddIndexDDLSpan()}, false) + []regionspan.Span{regionspan.GetDDLSpan(), regionspan.GetAddIndexDDLSpan()}, + kvCfg, + ) } return &ddlPullerImpl{ diff --git a/cdc/processor/pipeline/puller.go b/cdc/processor/pipeline/puller.go index 48c8793dc06..733647e49d5 100644 --- a/cdc/processor/pipeline/puller.go +++ b/cdc/processor/pipeline/puller.go @@ -19,6 +19,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/puller" + "github.com/pingcap/tiflow/pkg/config" cdcContext "github.com/pingcap/tiflow/pkg/context" "github.com/pingcap/tiflow/pkg/pipeline" "github.com/pingcap/tiflow/pkg/regionspan" @@ -71,6 +72,7 @@ func (n *pullerNode) start(ctx pipeline.NodeContext, wg *errgroup.Group, isActor ctxC = util.PutCaptureAddrInCtx(ctxC, ctx.GlobalVars().CaptureInfo.AdvertiseAddr) ctxC = util.PutChangefeedIDInCtx(ctxC, ctx.ChangefeedVars().ID) ctxC = util.PutRoleInCtx(ctxC, util.RoleProcessor) + kvCfg := config.GetGlobalServerConfig().KVClient // NOTICE: always pull the old value internally // See also: https://github.com/pingcap/tiflow/issues/2301. 
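// newRegionWorker above no longer calls config.GetGlobalServerConfig(); the
// kv-client settings are read once at the puller/processor level (kvCfg above)
// and injected through the CDCClient. A minimal sketch of that dependency
// injection, using simplified local types rather than the real CDCClient and
// regionWorker:
package main

import "fmt"

type kvClientConfig struct {
	WorkerConcurrent int
	RegionScanLimit  int
}

type client struct {
	config *kvClientConfig // injected once at construction time
}

type regionWorker struct {
	concurrent int
}

// newRegionWorker derives its settings from the owning client instead of a
// process-wide global, so tests and different changefeeds can use different
// configurations without mutating shared state.
func newRegionWorker(c *client) *regionWorker {
	return &regionWorker{concurrent: c.config.WorkerConcurrent}
}

func main() {
	c := &client{config: &kvClientConfig{WorkerConcurrent: 8, RegionScanLimit: 40}}
	fmt.Println(newRegionWorker(c).concurrent)
}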
plr := puller.NewPuller( @@ -81,7 +83,10 @@ func (n *pullerNode) start(ctx pipeline.NodeContext, wg *errgroup.Group, isActor ctx.GlobalVars().KVStorage, ctx.GlobalVars().PDClock, n.changefeed, - n.replicaInfo.StartTs, n.tableSpan(ctx), true) + n.replicaInfo.StartTs, + n.tableSpan(ctx), + kvCfg, + ) n.wg.Go(func() error { ctx.Throw(errors.Trace(plr.Run(ctxC))) return nil diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go index e70f1d96cb3..64bdd47fbca 100644 --- a/cdc/processor/processor.go +++ b/cdc/processor/processor.go @@ -674,6 +674,7 @@ func (p *processor) createAndDriveSchemaStorage(ctx cdcContext.Context) (entry.S kvStorage := ctx.GlobalVars().KVStorage ddlspans := []regionspan.Span{regionspan.GetDDLSpan(), regionspan.GetAddIndexDDLSpan()} checkpointTs := p.changefeed.Info.GetCheckpointTs(p.changefeed.Status) + kvCfg := config.GetGlobalServerConfig().KVClient stdCtx := util.PutTableInfoInCtx(ctx, -1, puller.DDLPullerTableName) stdCtx = util.PutChangefeedIDInCtx(stdCtx, ctx.ChangefeedVars().ID) stdCtx = util.PutRoleInCtx(stdCtx, util.RoleProcessor) @@ -685,7 +686,10 @@ func (p *processor) createAndDriveSchemaStorage(ctx cdcContext.Context) (entry.S ctx.GlobalVars().KVStorage, ctx.GlobalVars().PDClock, ctx.ChangefeedVars().ID, - checkpointTs, ddlspans, false) + checkpointTs, + ddlspans, + kvCfg, + ) meta, err := kv.GetSnapshotMeta(kvStorage, checkpointTs) if err != nil { return nil, errors.Trace(err) diff --git a/cdc/puller/puller.go b/cdc/puller/puller.go index 2754e5f76e5..fd5cf2f466e 100644 --- a/cdc/puller/puller.go +++ b/cdc/puller/puller.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/tiflow/cdc/kv" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/puller/frontier" + "github.com/pingcap/tiflow/pkg/config" "github.com/pingcap/tiflow/pkg/pdtime" "github.com/pingcap/tiflow/pkg/regionspan" "github.com/pingcap/tiflow/pkg/txnutil" @@ -53,15 +54,14 @@ type Puller interface { } type pullerImpl struct { - kvCli kv.CDCKVClient - kvStorage tikv.Storage - checkpointTs uint64 - spans []regionspan.ComparableSpan - outputCh chan *model.RawKVEntry - tsTracker frontier.Frontier - resolvedTs uint64 - initialized int64 - enableOldValue bool + kvCli kv.CDCKVClient + kvStorage tikv.Storage + checkpointTs uint64 + spans []regionspan.ComparableSpan + outputCh chan *model.RawKVEntry + tsTracker frontier.Frontier + resolvedTs uint64 + initialized int64 } // NewPuller create a new Puller fetch event start from checkpointTs @@ -76,7 +76,7 @@ func NewPuller( changefeed string, checkpointTs uint64, spans []regionspan.Span, - enableOldValue bool, + cfg *config.KVClientConfig, ) Puller { tikvStorage, ok := kvStorage.(tikv.Storage) if !ok { @@ -90,17 +90,17 @@ func NewPuller( // the initial ts for frontier to 0. Once the puller level resolved ts // initialized, the ts should advance to a non-zero value. tsTracker := frontier.NewFrontier(0, comparableSpans...) 
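// NewPuller here (and NewCDCKVClient below) now accepts a *config.KVClientConfig
// where it previously took the enableOldValue bool. A simplified before/after
// sketch of that signature change with local stand-in types, showing why a
// config struct is preferred: new knobs such as RegionRetryDuration ride along
// without touching every call site again.
package main

import (
	"fmt"
	"time"
)

type kvClientConfig struct {
	RegionScanLimit     int
	RegionRetryDuration time.Duration
}

// before (simplified): behaviour was steered by a single boolean flag.
func newPullerBefore(checkpointTs uint64, enableOldValue bool) string {
	return fmt.Sprintf("ts=%d oldValue=%v", checkpointTs, enableOldValue)
}

// after (simplified): old values are always pulled, and the kv-client
// configuration travels with the constructor instead.
func newPullerAfter(checkpointTs uint64, cfg *kvClientConfig) string {
	return fmt.Sprintf("ts=%d retryBudget=%s", checkpointTs, cfg.RegionRetryDuration)
}

func main() {
	fmt.Println(newPullerBefore(100, true))
	fmt.Println(newPullerAfter(100,
		&kvClientConfig{RegionScanLimit: 40, RegionRetryDuration: 25 * time.Second}))
}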
- kvCli := kv.NewCDCKVClient(ctx, pdCli, tikvStorage, grpcPool, regionCache, pdClock, changefeed) + kvCli := kv.NewCDCKVClient( + ctx, pdCli, tikvStorage, grpcPool, regionCache, pdClock, changefeed, cfg) p := &pullerImpl{ - kvCli: kvCli, - kvStorage: tikvStorage, - checkpointTs: checkpointTs, - spans: comparableSpans, - outputCh: make(chan *model.RawKVEntry, defaultPullerOutputChanSize), - tsTracker: tsTracker, - resolvedTs: checkpointTs, - initialized: 0, - enableOldValue: enableOldValue, + kvCli: kvCli, + kvStorage: tikvStorage, + checkpointTs: checkpointTs, + spans: comparableSpans, + outputCh: make(chan *model.RawKVEntry, defaultPullerOutputChanSize), + tsTracker: tsTracker, + resolvedTs: checkpointTs, + initialized: 0, } return p } @@ -118,8 +118,7 @@ func (p *pullerImpl) Run(ctx context.Context) error { span := span g.Go(func() error { - return p.kvCli.EventFeed(ctx, span, checkpointTs, p.enableOldValue, - lockResolver, p, eventCh) + return p.kvCli.EventFeed(ctx, span, checkpointTs, lockResolver, p, eventCh) }) } diff --git a/cdc/puller/puller_test.go b/cdc/puller/puller_test.go index 19310a67814..f82173a18d9 100644 --- a/cdc/puller/puller_test.go +++ b/cdc/puller/puller_test.go @@ -25,6 +25,7 @@ import ( "github.com/pingcap/tidb/store/mockstore" "github.com/pingcap/tiflow/cdc/kv" "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/pkg/config" cerrors "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/pdtime" "github.com/pingcap/tiflow/pkg/regionspan" @@ -62,6 +63,7 @@ func newMockCDCKVClient( regionCache *tikv.RegionCache, pdClock pdtime.Clock, changefeed string, + cfg *config.KVClientConfig, ) kv.CDCKVClient { return &mockCDCKVClient{ expectations: make(chan model.RegionFeedEvent, 1024), @@ -72,7 +74,6 @@ func (mc *mockCDCKVClient) EventFeed( ctx context.Context, span regionspan.ComparableSpan, ts uint64, - enableOldValue bool, lockResolver txnutil.LockResolver, isPullerInit kv.PullerInitialization, eventCh chan<- model.RegionFeedEvent, @@ -115,7 +116,6 @@ func newPullerForTest( ctx, cancel := context.WithCancel(context.Background()) store, err := mockstore.NewMockStore() require.Nil(t, err) - enableOldValue := true backupNewCDCKVClient := kv.NewCDCKVClient kv.NewCDCKVClient = newMockCDCKVClient defer func() { @@ -128,7 +128,7 @@ func newPullerForTest( defer regionCache.Close() plr := NewPuller( ctx, pdCli, grpcPool, regionCache, store, pdtime.NewClock4Test(), "", - checkpointTs, spans, enableOldValue) + checkpointTs, spans, config.GetDefaultServerConfig().KVClient) wg.Add(1) go func() { defer wg.Done() diff --git a/cdc/sink/mysql.go b/cdc/sink/mysql.go index a55ebdb12e1..8813640e413 100644 --- a/cdc/sink/mysql.go +++ b/cdc/sink/mysql.go @@ -315,7 +315,7 @@ func (s *mysqlSink) execDDLWithMaxRetries(ctx context.Context, ddl *model.DDLEve return err }, retry.WithBackoffBaseDelay(backoffBaseDelayInMs), retry.WithBackoffMaxDelay(backoffMaxDelayInMs), - retry.WithMaxTries(defaultDDLMaxRetryTime), + retry.WithMaxTries(defaultDDLMaxRetry), retry.WithIsRetryableErr(cerror.IsRetryableError)) } @@ -609,7 +609,10 @@ func (s *mysqlSink) execDMLWithMaxRetries(ctx context.Context, dmls *preparedDML zap.Int("num of Rows", dmls.rowCount), zap.Int("bucket", bucket)) return nil - }, retry.WithBackoffBaseDelay(backoffBaseDelayInMs), retry.WithBackoffMaxDelay(backoffMaxDelayInMs), retry.WithMaxTries(defaultDMLMaxRetryTime), retry.WithIsRetryableErr(isRetryableDMLError)) + }, retry.WithBackoffBaseDelay(backoffBaseDelayInMs), + 
retry.WithBackoffMaxDelay(backoffMaxDelayInMs), + retry.WithMaxTries(defaultDMLMaxRetry), + retry.WithIsRetryableErr(isRetryableDMLError)) } type preparedDMLs struct { diff --git a/cdc/sink/mysql_params.go b/cdc/sink/mysql_params.go index 21164a75245..20f48011c28 100644 --- a/cdc/sink/mysql_params.go +++ b/cdc/sink/mysql_params.go @@ -52,6 +52,15 @@ const ( defaultDialTimeout = "2m" defaultSafeMode = true defaultTxnIsolationRC = "READ-COMMITTED" +<<<<<<< HEAD:cdc/sink/mysql_params.go +======= + defaultCharacterSet = "utf8mb4" +) + +var ( + defaultDMLMaxRetry uint64 = 8 + defaultDDLMaxRetry uint64 = 20 +>>>>>>> 5476c8b55 (cdc,retry: fix leader missing by extending region retry duration (#5269)):cdc/sink/mysql/mysql_params.go ) var defaultParams = &sinkParams{ diff --git a/cdc/sink/mysql_test.go b/cdc/sink/mysql_test.go index 509457e9b21..88b91af900d 100644 --- a/cdc/sink/mysql_test.go +++ b/cdc/sink/mysql_test.go @@ -889,7 +889,11 @@ func TestExecDMLRollbackErrRetryable(t *testing.T) { // normal db db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) require.Nil(t, err) +<<<<<<< HEAD:cdc/sink/mysql_test.go for i := 0; i < defaultDMLMaxRetryTime; i++ { +======= + for i := 0; i < int(defaultDMLMaxRetry); i++ { +>>>>>>> 5476c8b55 (cdc,retry: fix leader missing by extending region retry duration (#5269)):cdc/sink/mysql/mysql_test.go mock.ExpectBegin() mock.ExpectExec("REPLACE INTO `s1`.`t1`(`a`) VALUES (?),(?)"). WithArgs(1, 2). @@ -901,8 +905,16 @@ func TestExecDMLRollbackErrRetryable(t *testing.T) { } backupGetDBConn := GetDBConnImpl GetDBConnImpl = mockGetDBConnErrDatabaseNotExists +<<<<<<< HEAD:cdc/sink/mysql_test.go defer func() { GetDBConnImpl = backupGetDBConn +======= + backupMaxRetry := defaultDMLMaxRetry + defaultDMLMaxRetry = 2 + defer func() { + GetDBConnImpl = backupGetDBConn + defaultDMLMaxRetry = backupMaxRetry +>>>>>>> 5476c8b55 (cdc,retry: fix leader missing by extending region retry duration (#5269)):cdc/sink/mysql/mysql_test.go }() ctx, cancel := context.WithCancel(context.Background()) diff --git a/errors.toml b/errors.toml index 5573fdbd506..9eea0559ba5 100755 --- a/errors.toml +++ b/errors.toml @@ -793,7 +793,7 @@ pulsar send message failed ["CDC:ErrReachMaxTry"] error = ''' -reach maximum try: %d +reach maximum try: %s, error: %s ''' ["CDC:ErrReactorFinished"] diff --git a/pkg/api/internal/rest/request.go b/pkg/api/internal/rest/request.go index d4ede6120c0..e1afe4ae2b2 100644 --- a/pkg/api/internal/rest/request.go +++ b/pkg/api/internal/rest/request.go @@ -57,7 +57,7 @@ type Request struct { // retry options backoffBaseDelay time.Duration backoffMaxDelay time.Duration - maxRetries int64 + maxRetries uint64 // output err error @@ -188,7 +188,7 @@ func (r *Request) WithBackoffMaxDelay(delay time.Duration) *Request { } // WithMaxRetries specifies the maximum times a request will retry. 
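// The setter that follows narrows maxRetries from int64 to uint64, in line with
// the retry package changes further down; a negative retry count was never
// meaningful. A small stand-alone sketch of the same builder-style setter with
// local types (not the real rest.Request, which also carries HTTP state):
package main

import (
	"fmt"
	"time"
)

type request struct {
	backoffMaxDelay time.Duration
	maxRetries      uint64
	err             error
}

// WithMaxRetries mirrors the patched setter: unsigned, so callers cannot ask
// for a negative number of attempts.
func (r *request) WithMaxRetries(maxRetries uint64) *request {
	if r.err != nil {
		return r
	}
	r.maxRetries = maxRetries
	return r
}

func (r *request) WithBackoffMaxDelay(delay time.Duration) *request {
	if r.err != nil {
		return r
	}
	r.backoffMaxDelay = delay
	return r
}

func main() {
	r := (&request{}).WithBackoffMaxDelay(100 * time.Millisecond).WithMaxRetries(8)
	fmt.Println(r.maxRetries, r.backoffMaxDelay)
}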
-func (r *Request) WithMaxRetries(maxRetries int64) *Request { +func (r *Request) WithMaxRetries(maxRetries uint64) *Request { if r.err != nil { return r } diff --git a/pkg/cmd/server/server_test.go b/pkg/cmd/server/server_test.go index 7aaa5f67947..9d2115dda1a 100644 --- a/pkg/cmd/server/server_test.go +++ b/pkg/cmd/server/server_test.go @@ -170,9 +170,10 @@ func TestParseCfg(t *testing.T) { }, PerTableMemoryQuota: 10 * 1024 * 1024, // 10M KVClient: &config.KVClientConfig{ - WorkerConcurrent: 8, - WorkerPoolSize: 0, - RegionScanLimit: 40, + WorkerConcurrent: 8, + WorkerPoolSize: 0, + RegionScanLimit: 40, + RegionRetryDuration: config.TomlDuration(25 * time.Second), }, Debug: &config.DebugConfig{ EnableTableActor: false, @@ -244,6 +245,9 @@ num-concurrent-worker = 4 num-workerpool-goroutine = 5 sort-dir = "/tmp/just_a_test" +[kv-client] +region-retry-duration = "3s" + [debug] enable-db-sorter = false [debug.db] @@ -313,9 +317,10 @@ server-worker-pool-size = 16 Security: &config.SecurityConfig{}, PerTableMemoryQuota: 10 * 1024 * 1024, // 10M KVClient: &config.KVClientConfig{ - WorkerConcurrent: 8, - WorkerPoolSize: 0, - RegionScanLimit: 40, + WorkerConcurrent: 8, + WorkerPoolSize: 0, + RegionScanLimit: 40, + RegionRetryDuration: config.TomlDuration(3 * time.Second), }, Debug: &config.DebugConfig{ EnableTableActor: false, @@ -454,9 +459,10 @@ cert-allowed-cn = ["dd","ee"] }, PerTableMemoryQuota: 10 * 1024 * 1024, // 10M KVClient: &config.KVClientConfig{ - WorkerConcurrent: 8, - WorkerPoolSize: 0, - RegionScanLimit: 40, + WorkerConcurrent: 8, + WorkerPoolSize: 0, + RegionScanLimit: 40, + RegionRetryDuration: config.TomlDuration(25 * time.Second), }, Debug: &config.DebugConfig{ EnableTableActor: false, diff --git a/pkg/config/config_test_data.go b/pkg/config/config_test_data.go index 78cac8a84b0..8219ad8dea1 100644 --- a/pkg/config/config_test_data.go +++ b/pkg/config/config_test_data.go @@ -105,7 +105,8 @@ const ( "kv-client": { "worker-concurrent": 8, "worker-pool-size": 0, - "region-scan-limit": 40 + "region-scan-limit": 40, + "region-retry-duration": 25000000000 }, "debug": { "enable-table-actor": false, diff --git a/pkg/config/kvclient.go b/pkg/config/kvclient.go index 0df0261c367..9b28ef16d91 100644 --- a/pkg/config/kvclient.go +++ b/pkg/config/kvclient.go @@ -13,6 +13,8 @@ package config +import cerror "github.com/pingcap/tiflow/pkg/errors" + // KVClientConfig represents config for kv client type KVClientConfig struct { // how many workers will be used for a single region worker @@ -21,4 +23,23 @@ type KVClientConfig struct { WorkerPoolSize int `toml:"worker-pool-size" json:"worker-pool-size"` // region incremental scan limit for one table in a single store RegionScanLimit int `toml:"region-scan-limit" json:"region-scan-limit"` + // the total retry duration of connecting a region + RegionRetryDuration TomlDuration `toml:"region-retry-duration" json:"region-retry-duration"` +} + +// ValidateAndAdjust validates and adjusts the kv client configuration +func (c *KVClientConfig) ValidateAndAdjust() error { + if c.WorkerConcurrent <= 0 { + return cerror.ErrInvalidServerOption.GenWithStackByArgs( + "region-scan-limit should be at least 1") + } + if c.RegionScanLimit <= 0 { + return cerror.ErrInvalidServerOption.GenWithStackByArgs( + "region-scan-limit should be at least 1") + } + if c.RegionRetryDuration <= 0 { + return cerror.ErrInvalidServerOption.GenWithStackByArgs( + "region-scan-limit should be positive") + } + return nil } diff --git a/pkg/config/server_config.go 
b/pkg/config/server_config.go index fcd2e31414d..84f415d79ae 100644 --- a/pkg/config/server_config.go +++ b/pkg/config/server_config.go @@ -96,6 +96,9 @@ var defaultServerConfig = &ServerConfig{ WorkerConcurrent: 8, WorkerPoolSize: 0, // 0 will use NumCPU() * 2 RegionScanLimit: 40, + // The default TiKV region election timeout is [10s, 20s], + // Use 25 seconds to cover region leader missing. + RegionRetryDuration: TomlDuration(25 * time.Second), }, Debug: &DebugConfig{ EnableTableActor: false, @@ -250,11 +253,8 @@ func (c *ServerConfig) ValidateAndAdjust() error { if c.KVClient == nil { c.KVClient = defaultCfg.KVClient } - if c.KVClient.WorkerConcurrent <= 0 { - return cerror.ErrInvalidServerOption.GenWithStackByArgs("region-scan-limit should be at least 1") - } - if c.KVClient.RegionScanLimit <= 0 { - return cerror.ErrInvalidServerOption.GenWithStackByArgs("region-scan-limit should be at least 1") + if err = c.KVClient.ValidateAndAdjust(); err != nil { + return errors.Trace(err) } if c.Debug == nil { diff --git a/pkg/config/server_config_test.go b/pkg/config/server_config_test.go index 4c979365e1c..26dd8970ee7 100644 --- a/pkg/config/server_config_test.go +++ b/pkg/config/server_config_test.go @@ -15,6 +15,7 @@ package config import ( "testing" + "time" "github.com/stretchr/testify/require" ) @@ -91,3 +92,14 @@ func TestDBConfigValidateAndAdjust(t *testing.T) { conf.Compression = "invalid" require.Error(t, conf.ValidateAndAdjust()) } + +func TestKVClientConfigValidateAndAdjust(t *testing.T) { + t.Parallel() + conf := GetDefaultServerConfig().Clone().KVClient + + require.Nil(t, conf.ValidateAndAdjust()) + conf.RegionRetryDuration = TomlDuration(time.Second) + require.Nil(t, conf.ValidateAndAdjust()) + conf.RegionRetryDuration = -TomlDuration(time.Second) + require.Error(t, conf.ValidateAndAdjust()) +} diff --git a/pkg/errors/errors.go b/pkg/errors/errors.go index 9495827d43b..9dc7c6a563d 100644 --- a/pkg/errors/errors.go +++ b/pkg/errors/errors.go @@ -886,7 +886,7 @@ var ( ) // retry error - ErrReachMaxTry = errors.Normalize("reach maximum try: %d", + ErrReachMaxTry = errors.Normalize("reach maximum try: %s, error: %s", errors.RFCCodeText("CDC:ErrReachMaxTry"), ) diff --git a/pkg/etcd/client.go b/pkg/etcd/client.go index 5d15d7ff39d..cfe52c9290d 100644 --- a/pkg/etcd/client.go +++ b/pkg/etcd/client.go @@ -66,7 +66,7 @@ var ( ) // set to var instead of const for mocking the value to speedup test -var maxTries int64 = 8 +var maxTries uint64 = 8 // Client is a simple wrapper that adds retry to etcd RPC type Client struct { diff --git a/pkg/retry/options.go b/pkg/retry/options.go index 724195a3f0c..942f040253f 100644 --- a/pkg/retry/options.go +++ b/pkg/retry/options.go @@ -15,14 +15,16 @@ package retry import ( "math" + "time" ) const ( // defaultBackoffBaseInMs is the initial duration, in Millisecond defaultBackoffBaseInMs = 10.0 // defaultBackoffCapInMs is the max amount of duration, in Millisecond - defaultBackoffCapInMs = 100.0 - defaultMaxTries = 3 + defaultBackoffCapInMs = 100.0 + defaultMaxTries = math.MaxUint64 + defaultMaxRetryDuration = time.Duration(0) ) // Option ... 
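// The next hunk adds totalRetryDuration to retryOptions and flips the default
// maxTries to math.MaxUint64, so a plain retry.Do now retries until it succeeds,
// is canceled, or hits an explicit budget. A condensed stand-alone mock of that
// options shape (field names follow the patch; the real package also carries
// the backoff settings):
package main

import (
	"fmt"
	"math"
	"time"
)

type retryOptions struct {
	totalRetryDuration time.Duration
	maxTries           uint64
}

type option func(*retryOptions)

func newRetryOptions() *retryOptions {
	return &retryOptions{
		totalRetryDuration: 0,              // zero means no wall-clock limit
		maxTries:           math.MaxUint64, // effectively infinite attempts
	}
}

// withMaxTries clamps 0 to 1, matching the patched WithMaxTries.
func withMaxTries(tries uint64) option {
	return func(o *retryOptions) {
		if tries == 0 {
			tries = 1
		}
		o.maxTries = tries
	}
}

// withTotalRetryDuration bounds the wall-clock time spent retrying.
func withTotalRetryDuration(d time.Duration) option {
	return func(o *retryOptions) { o.totalRetryDuration = d }
}

func main() {
	o := newRetryOptions()
	for _, apply := range []option{withMaxTries(0), withTotalRetryDuration(25 * time.Second)} {
		apply(o)
	}
	fmt.Println(o.maxTries, o.totalRetryDuration)
}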
@@ -32,18 +34,20 @@ type Option func(*retryOptions) type IsRetryable func(error) bool type retryOptions struct { - maxTries int64 - backoffBaseInMs float64 - backoffCapInMs float64 - isRetryable IsRetryable + totalRetryDuration time.Duration + maxTries uint64 + backoffBaseInMs float64 + backoffCapInMs float64 + isRetryable IsRetryable } func newRetryOptions() *retryOptions { return &retryOptions{ - maxTries: defaultMaxTries, - backoffBaseInMs: defaultBackoffBaseInMs, - backoffCapInMs: defaultBackoffCapInMs, - isRetryable: func(err error) bool { return true }, + totalRetryDuration: defaultMaxRetryDuration, + maxTries: defaultMaxTries, + backoffBaseInMs: defaultBackoffBaseInMs, + backoffCapInMs: defaultBackoffCapInMs, + isRetryable: func(err error) bool { return true }, } } @@ -65,19 +69,20 @@ func WithBackoffMaxDelay(delayInMs int64) Option { } } -// WithMaxTries configures maximum tries, if tries <= 0 "defaultMaxTries" will be used -func WithMaxTries(tries int64) Option { +// WithMaxTries configures maximum tries, if tries is 0, 1 will be used +func WithMaxTries(tries uint64) Option { return func(o *retryOptions) { - if tries > 0 { - o.maxTries = tries + if tries == 0 { + tries = 1 } + o.maxTries = tries } } -// WithInfiniteTries configures to retry forever (math.MaxInt64 times) till success or got canceled -func WithInfiniteTries() Option { +// WithTotalRetryDuratoin configures the total retry duration. +func WithTotalRetryDuratoin(retryDuration time.Duration) Option { return func(o *retryOptions) { - o.maxTries = math.MaxInt64 + o.totalRetryDuration = retryDuration } } diff --git a/pkg/retry/retry_test.go b/pkg/retry/retry_test.go index c7f9d5d8b6e..2080e7cb348 100644 --- a/pkg/retry/retry_test.go +++ b/pkg/retry/retry_test.go @@ -97,7 +97,7 @@ func TestDoCancelInfiniteRetry(t *testing.T) { return errors.New("test") } - err := Do(ctx, f, WithInfiniteTries(), WithBackoffBaseDelay(2), WithBackoffMaxDelay(10)) + err := Do(ctx, f, WithBackoffBaseDelay(2), WithBackoffMaxDelay(10)) require.Equal(t, errors.Cause(err), context.DeadlineExceeded) require.GreaterOrEqual(t, callCount, 1, "tries: %d", callCount) require.Less(t, callCount, math.MaxInt64) @@ -114,7 +114,7 @@ func TestDoCancelAtBeginning(t *testing.T) { return errors.New("test") } - err := Do(ctx, f, WithInfiniteTries(), WithBackoffBaseDelay(2), WithBackoffMaxDelay(10)) + err := Do(ctx, f, WithBackoffBaseDelay(2), WithBackoffMaxDelay(10)) require.Equal(t, errors.Cause(err), context.Canceled) require.Equal(t, callCount, 0, "tries:%d", callCount) } @@ -147,16 +147,58 @@ func TestDoCornerCases(t *testing.T) { require.Regexp(t, "test", errors.Cause(err)) require.Equal(t, callCount, 2) - var i int64 - for i = -10; i < 10; i++ { + var i uint64 + for i = 0; i < 10; i++ { callCount = 0 - err = Do(context.Background(), f, WithBackoffBaseDelay(i), WithBackoffMaxDelay(i), WithMaxTries(i)) + err = Do(context.Background(), f, + WithBackoffBaseDelay(int64(i)), WithBackoffMaxDelay(int64(i)), WithMaxTries(i)) require.Regexp(t, "test", errors.Cause(err)) require.Regexp(t, ".*CDC:ErrReachMaxTry.*", err) - if i > 0 { - require.Equal(t, int64(callCount), i) + if i == 0 { + require.Equal(t, 1, callCount) } else { - require.Equal(t, callCount, defaultMaxTries) + require.Equal(t, int(i), callCount) } } } + +func TestTotalRetryDuration(t *testing.T) { + t.Parallel() + + f := func() error { + return errors.New("test") + } + + start := time.Now() + err := Do( + context.Background(), f, + WithBackoffBaseDelay(math.MinInt64), + WithTotalRetryDuratoin(time.Second), 
+ ) + require.Regexp(t, "test", errors.Cause(err)) + require.LessOrEqual(t, 1, int(math.Round(time.Since(start).Seconds()))) + + start = time.Now() + err = Do( + context.Background(), f, + WithBackoffBaseDelay(math.MinInt64), + WithTotalRetryDuratoin(2*time.Second), + ) + require.Regexp(t, "test", errors.Cause(err)) + require.LessOrEqual(t, 2, int(math.Round(time.Since(start).Seconds()))) +} + +func TestRetryError(t *testing.T) { + t.Parallel() + + f := func() error { + return errors.New("some error info") + } + + err := Do( + context.Background(), f, WithBackoffBaseDelay(math.MinInt64), WithMaxTries(2), + ) + require.Regexp(t, "some error info", errors.Cause(err)) + require.Regexp(t, ".*some error info.*", err.Error()) + require.Regexp(t, ".*CDC:ErrReachMaxTry.*", err.Error()) +} diff --git a/pkg/retry/retry_with_opt.go b/pkg/retry/retry_with_opt.go index c5482ebe130..8f6bc12507c 100644 --- a/pkg/retry/retry_with_opt.go +++ b/pkg/retry/retry_with_opt.go @@ -17,6 +17,7 @@ import ( "context" "math" "math/rand" + "strconv" "time" "github.com/pingcap/errors" @@ -26,7 +27,8 @@ import ( // Operation is the action need to retry type Operation func() error -// Do execute the specified function at most maxTries times until it succeeds or got canceled +// Do execute the specified function. +// By default, it retries infinitely until it succeeds or got canceled. func Do(ctx context.Context, operation Operation, opts ...Option) error { retryOption := setOptions(opts...) return run(ctx, operation, retryOption) @@ -48,7 +50,8 @@ func run(ctx context.Context, op Operation, retryOption *retryOptions) error { } var t *time.Timer - try := 0 + var start time.Time + try := uint64(0) backOff := time.Duration(0) for { err := op() @@ -61,8 +64,17 @@ func run(ctx context.Context, op Operation, retryOption *retryOptions) error { } try++ - if int64(try) >= retryOption.maxTries { - return cerror.ErrReachMaxTry.Wrap(err).GenWithStackByArgs(retryOption.maxTries) + if try >= retryOption.maxTries { + return cerror.ErrReachMaxTry. + Wrap(err).GenWithStackByArgs(strconv.Itoa(int(retryOption.maxTries)), err) + } + if retryOption.totalRetryDuration > 0 { + if start.IsZero() { + start = time.Now() + } else if time.Since(start) > retryOption.totalRetryDuration { + return cerror.ErrReachMaxTry. + Wrap(err).GenWithStackByArgs(retryOption.totalRetryDuration, err) + } } backOff = getBackoffInMs(retryOption.backoffBaseInMs, retryOption.backoffCapInMs, float64(try)) diff --git a/pkg/txnutil/gc/gc_service_test.go b/pkg/txnutil/gc/gc_service_test.go index d32214a8b3e..a4211130513 100644 --- a/pkg/txnutil/gc/gc_service_test.go +++ b/pkg/txnutil/gc/gc_service_test.go @@ -62,7 +62,8 @@ func (s *gcServiceSuite) TestCheckSafetyOfStartTs(c *check.C) { s.pdCli.retryCount = 0 err = EnsureChangefeedStartTsSafety(ctx, s.pdCli, "changefeed2", TTL, 65) c.Assert(err, check.NotNil) - c.Assert(err.Error(), check.Equals, "[CDC:ErrReachMaxTry]reach maximum try: 9: not pd leader") + c.Assert(err.Error(), check.Equals, + "[CDC:ErrReachMaxTry]reach maximum try: 9, error: not pd leader: not pd leader") s.pdCli.retryThreshold = 3 s.pdCli.retryCount = 0
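// End-to-end usage sketch of the retry semantics this patch introduces, written
// against the tiflow pkg/retry API as it appears above (including the
// WithTotalRetryDuratoin spelling used in the diff). Compiling it assumes the
// patched module is importable; the error text is only an example.
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/pingcap/tiflow/pkg/retry"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	attempts := 0
	err := retry.Do(ctx, func() error {
		attempts++
		return errors.New("region leader missing")
	},
		retry.WithBackoffBaseDelay(10),            // milliseconds, as in the kv client
		retry.WithBackoffMaxDelay(500),            // mirrors scanRegionMaxBackoff above
		retry.WithTotalRetryDuratoin(time.Second), // a time budget instead of WithMaxTries(100)
		retry.WithIsRetryableErr(func(error) bool { return true }),
	)

	// Once the one-second budget is exhausted, Do returns ErrReachMaxTry wrapping
	// the last error, whose message is now included per the errors.toml change.
	fmt.Println(attempts, err)
}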