cdc,retry: fix leader missing by extending region retry duration #5269

Merged 6 commits on Apr 30, 2022
Changes from 2 commits
3 changes: 1 addition & 2 deletions cdc/entry/schema_storage.go
@@ -802,8 +802,7 @@ func (s *schemaStorageImpl) GetSnapshot(ctx context.Context, ts uint64) (*schema
 			logTime = now
 		}
 		return err
-	}, retry.WithBackoffBaseDelay(10), retry.WithInfiniteTries(),
-		retry.WithIsRetryableErr(isRetryable))
+	}, retry.WithBackoffBaseDelay(10), retry.WithIsRetryableErr(isRetryable))

 	return snap, err
 }
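Dropping retry.WithInfiniteTries() means GetSnapshot now gives up after the retry package's default try bound instead of looping forever on errors that isRetryable keeps approving. As a reference point, here is a minimal, self-contained sketch of the functional-options retry pattern these calls use; the option names mirror the calls above, but the defaults and internals are illustrative assumptions, not tiflow's actual pkg/retry implementation:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"math"
	"time"
)

type retryOptions struct {
	baseDelay   time.Duration
	maxTries    uint64
	isRetryable func(error) bool
}

// Option mirrors the retry.With* functional options used above.
type Option func(*retryOptions)

func WithBackoffBaseDelay(ms int64) Option {
	return func(o *retryOptions) { o.baseDelay = time.Duration(ms) * time.Millisecond }
}

func WithInfiniteTries() Option {
	return func(o *retryOptions) { o.maxTries = math.MaxUint64 }
}

func WithIsRetryableErr(f func(error) bool) Option {
	return func(o *retryOptions) { o.isRetryable = f }
}

// Do retries fn until it succeeds, returns a non-retryable error, or
// exhausts maxTries. The defaults below are assumptions for this sketch.
func Do(ctx context.Context, fn func() error, opts ...Option) error {
	o := &retryOptions{
		baseDelay:   100 * time.Millisecond,
		maxTries:    3, // assumed finite default; WithInfiniteTries lifted this
		isRetryable: func(error) bool { return true },
	}
	for _, opt := range opts {
		opt(o)
	}
	var err error
	for i := uint64(0); i < o.maxTries; i++ {
		if err = fn(); err == nil || !o.isRetryable(err) {
			return err
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(o.baseDelay):
		}
	}
	return err
}

func main() {
	attempts := 0
	err := Do(context.Background(), func() error {
		attempts++
		return errors.New("snapshot not ready")
	}, WithBackoffBaseDelay(10), WithIsRetryableErr(func(error) bool { return true }))
	fmt.Println(attempts, err) // bounded now: stops at the default try limit
}
```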
20 changes: 10 additions & 10 deletions cdc/kv/client.go
@@ -294,7 +294,6 @@ type CDCKVClient interface {
 	ctx context.Context,
 	span regionspan.ComparableSpan,
 	ts uint64,
-	enableOldValue bool,

Contributor:
Why remove the enableOldValue flag in this PR? Is there an issue related to this item?

Member (Author):
The enableOldValue param was deprecated months ago; this is a cleanup of dead code.

 	lockResolver txnutil.LockResolver,
 	isPullerInit PullerInitialization,
 	eventCh chan<- model.RegionFeedEvent,
@@ -308,6 +307,7 @@ var NewCDCKVClient = NewCDCClient
 type CDCClient struct {
 	pd pd.Client

+	config *config.KVClientConfig
 	clusterID uint64

 	grpcPool GrpcPool
@@ -329,11 +329,13 @@ func NewCDCClient(
 	regionCache *tikv.RegionCache,
 	pdClock pdtime.Clock,
 	changefeed string,
+	cfg *config.KVClientConfig,
 ) (c CDCKVClient) {
 	clusterID := pd.GetClusterID(ctx)

 	c = &CDCClient{
 		clusterID: clusterID,
+		config: cfg,
 		pd: pd,
 		kvStorage: kvStorage,
 		grpcPool: grpcPool,
@@ -404,14 +406,12 @@ type PullerInitialization interface {
 // The `Start` and `End` field in input span must be memcomparable encoded.
 func (c *CDCClient) EventFeed(
 	ctx context.Context, span regionspan.ComparableSpan, ts uint64,
-	enableOldValue bool,
 	lockResolver txnutil.LockResolver,
 	isPullerInit PullerInitialization,
 	eventCh chan<- model.RegionFeedEvent,
 ) error {
-	s := newEventFeedSession(ctx, c, span,
-		lockResolver, isPullerInit,
-		enableOldValue, ts, eventCh)
+	s := newEventFeedSession(
+		ctx, c, span, lockResolver, isPullerInit, ts, eventCh)
 	return s.eventFeed(ctx, ts)
 }

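After this change a consumer calls EventFeed without the old-value flag. A hedged sketch of such a caller; only the EventFeed signature is taken from the diff, while the helper function, channel size, and the txnutil import path are assumptions:

```go
package example

import (
	"context"

	"github.com/pingcap/tiflow/cdc/kv"
	"github.com/pingcap/tiflow/cdc/model"
	"github.com/pingcap/tiflow/pkg/regionspan"
	"github.com/pingcap/tiflow/pkg/txnutil"
)

// runFeed wires up a post-PR EventFeed call. The helper itself is
// hypothetical; note there is no enableOldValue argument anymore.
func runFeed(
	ctx context.Context,
	client kv.CDCKVClient,
	span regionspan.ComparableSpan,
	checkpointTs uint64,
	lockResolver txnutil.LockResolver,
	pullerInit kv.PullerInitialization,
) (<-chan model.RegionFeedEvent, func() error) {
	eventCh := make(chan model.RegionFeedEvent, 128)
	run := func() error {
		return client.EventFeed(ctx, span, checkpointTs, lockResolver, pullerInit, eventCh)
	}
	return eventCh, run
}
```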
@@ -475,25 +475,22 @@ func newEventFeedSession(
 	totalSpan regionspan.ComparableSpan,
 	lockResolver txnutil.LockResolver,
 	isPullerInit PullerInitialization,
-	enableOldValue bool,
 	startTs uint64,
 	eventCh chan<- model.RegionFeedEvent,
 ) *eventFeedSession {
 	id := strconv.FormatUint(allocID(), 10)
-	kvClientCfg := config.GetGlobalServerConfig().KVClient
 	rangeLock := regionspan.NewRegionRangeLock(
 		totalSpan.Start, totalSpan.End, startTs, client.changefeed)
 	return &eventFeedSession{
 		client: client,
 		totalSpan: totalSpan,
 		eventCh: eventCh,
-		regionRouter: NewSizedRegionRouter(ctx, kvClientCfg.RegionScanLimit),
+		regionRouter: NewSizedRegionRouter(ctx, client.config.RegionScanLimit),
 		regionCh: make(chan singleRegionInfo, defaultRegionChanSize),
 		errCh: make(chan regionErrorInfo, defaultRegionChanSize),
 		requestRangeCh: make(chan rangeRequestTask, defaultRegionChanSize),
 		rateLimitQueue: make([]regionErrorInfo, 0, defaultRegionRateLimitQueueSize),
 		rangeLock: rangeLock,
-		enableOldValue: enableOldValue,
 		lockResolver: lockResolver,
 		isPullerInit: isPullerInit,
 		id: id,
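Reading RegionScanLimit from the injected client.config rather than the global server config makes the limit an explicit dependency that tests can vary per client. For orientation, a sketch of what the threaded config might look like; the field names come from their uses in this diff (RegionScanLimit here, RegionRetryDuration below), but the types, tags, and wrapper are assumptions:

```go
package config

import "time"

// TomlDuration mirrors the explicit time.Duration(...) conversion seen
// in the retry call below, which suggests the duration is stored behind
// a wrapper like this; the wrapper and its tags are assumptions.
type TomlDuration time.Duration

// KVClientConfig sketches the tunables this PR threads into
// NewCDCClient. Types, tags, and any defaults are assumptions.
type KVClientConfig struct {
	// Caps concurrent region incremental scans per changefeed;
	// consumed by NewSizedRegionRouter in newEventFeedSession above.
	RegionScanLimit int `toml:"region-scan-limit" json:"region-scan-limit"`
	// Total wall-clock budget for retrying a region request, e.g.
	// while a region is between leaders.
	RegionRetryDuration TomlDuration `toml:"region-retry-duration" json:"region-retry-duration"`
}
```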
@@ -922,6 +919,8 @@ func (s *eventFeedSession) divideAndSendEventFeedToRegions(
 	limit := 20
 	nextSpan := span

+	// Max backoff 500ms.
+	scanRegionMaxBackoff := int64(500)
 	for {
 		var (
 			regions []*tikv.Region
@@ -961,7 +960,8 @@ func (s *eventFeedSession) divideAndSendEventFeedToRegions(
 				zap.Reflect("regions", metas),
 				zap.String("changefeed", s.client.changefeed))
 			return nil
-		}, retry.WithBackoffMaxDelay(50), retry.WithMaxTries(100), retry.WithIsRetryableErr(cerror.IsRetryableError))
+		}, retry.WithBackoffMaxDelay(scanRegionMaxBackoff),
+			retry.WithTotalRetryDuratoin(time.Duration(s.client.config.RegionRetryDuration)))
 		if retryErr != nil {
 			return retryErr
 		}
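This hunk is the core of the fix: the scan-region retry moves from a fixed try budget (100 tries with a 50ms max backoff, on the order of seconds overall) to a wall-clock budget taken from config, with the max backoff raised to 500ms, so a changefeed keeps retrying long enough for a missing region leader to be re-elected instead of failing early. A minimal sketch of a duration-bounded backoff loop in that spirit (assumed base delay and doubling policy; not tiflow's actual retry internals):

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// doWithBudget retries fn with a doubling backoff capped at maxDelay,
// giving up once the total elapsed time exceeds budget. It mirrors the
// WithBackoffMaxDelay + total-retry-duration pairing above in spirit
// only; the base delay and doubling policy are assumptions.
func doWithBudget(ctx context.Context, budget, maxDelay time.Duration, fn func() error) error {
	start := time.Now()
	delay := 10 * time.Millisecond // assumed base delay
	for {
		err := fn()
		if err == nil {
			return nil
		}
		if time.Since(start) >= budget {
			return fmt.Errorf("retry budget %v exhausted: %w", budget, err)
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(delay):
		}
		if delay *= 2; delay > maxDelay {
			delay = maxDelay
		}
	}
}

func main() {
	// Simulate a region whose leader reappears after 150ms: a duration
	// budget rides out the election, where a small fixed try count with
	// short backoff might not.
	leaderReady := time.Now().Add(150 * time.Millisecond)
	err := doWithBudget(context.Background(), time.Minute, 500*time.Millisecond, func() error {
		if time.Now().Before(leaderReady) {
			return errors.New("leader missing")
		}
		return nil
	})
	fmt.Println(err) // <nil>: retries outlasted the leader election
}
```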
13 changes: 9 additions & 4 deletions cdc/kv/client_bench_test.go
@@ -26,6 +26,7 @@ import (
 	"github.com/pingcap/log"
 	"github.com/pingcap/tidb/store/mockstore/mockcopr"
 	"github.com/pingcap/tiflow/cdc/model"
+	"github.com/pingcap/tiflow/pkg/config"
 	"github.com/pingcap/tiflow/pkg/pdtime"
 	"github.com/pingcap/tiflow/pkg/regionspan"
 	"github.com/pingcap/tiflow/pkg/retry"
@@ -199,13 +200,15 @@ func prepareBenchMultiStore(b *testing.B, storeNum, regionNum int) (
 	defer grpcPool.Close()
 	regionCache := tikv.NewRegionCache(pdClient)
 	defer regionCache.Close()
-	cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(), "")
+	cdcClient := NewCDCClient(
+		ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(), "",
+		config.GetDefaultServerConfig().KVClient)
 	eventCh := make(chan model.RegionFeedEvent, 1000000)
 	wg.Add(1)
 	go func() {
 		err := cdcClient.EventFeed(ctx,
 			regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
-			100, false, lockResolver, isPullInit, eventCh)
+			100, lockResolver, isPullInit, eventCh)
 		if errors.Cause(err) != context.Canceled {
 			b.Error(err)
 		}
@@ -291,13 +294,15 @@ func prepareBench(b *testing.B, regionNum int) (
 	defer grpcPool.Close()
 	regionCache := tikv.NewRegionCache(pdClient)
 	defer regionCache.Close()
-	cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(), "")
+	cdcClient := NewCDCClient(
+		ctx, pdClient, kvStorage, grpcPool, regionCache, pdtime.NewClock4Test(), "",
+		config.GetDefaultServerConfig().KVClient)
 	eventCh := make(chan model.RegionFeedEvent, 1000000)
 	wg.Add(1)
 	go func() {
 		err := cdcClient.EventFeed(ctx,
 			regionspan.ComparableSpan{Start: []byte("a"), End: []byte("z")},
-			100, false, lockResolver, isPullInit, eventCh)
+			100, lockResolver, isPullInit, eventCh)
 		if errors.Cause(err) != context.Canceled {
 			b.Error(err)
 		}