Commit

Merge branch 'master' of https://github.com/pingcap/tidb into fk-check-var-enable-default
crazycs520 committed Dec 7, 2022
2 parents 97efb9c + d7d059c commit a353b7a
Showing 12 changed files with 279 additions and 88 deletions.
1 change: 1 addition & 0 deletions br/pkg/streamhelper/BUILD.bazel
@@ -71,6 +71,7 @@ go_test(
"//br/pkg/logutil",
"//br/pkg/redact",
"//br/pkg/storage",
"//br/pkg/streamhelper/config",
"//br/pkg/streamhelper/spans",
"//br/pkg/utils",
"//kv",
8 changes: 5 additions & 3 deletions br/pkg/streamhelper/advancer.go
@@ -382,15 +382,17 @@ func (c *CheckpointAdvancer) tick(ctx context.Context) error {
log.Debug("No tasks yet, skipping advancing.")
return nil
}
cx, cancel := context.WithTimeout(ctx, c.Config().TickTimeout())
defer cancel()

threshold := c.Config().GetDefaultStartPollThreshold()
if err := c.subscribeTick(ctx); err != nil {
if err := c.subscribeTick(cx); err != nil {
log.Warn("[log backup advancer] Subscriber meet error, would polling the checkpoint.", logutil.ShortError(err))
threshold = c.Config().GetSubscriberErrorStartPollThreshold()
}

err := c.advanceCheckpointBy(ctx, func(ctx context.Context) (uint64, error) {
return c.CalculateGlobalCheckpointLight(ctx, threshold)
err := c.advanceCheckpointBy(cx, func(cx context.Context) (uint64, error) {
return c.CalculateGlobalCheckpointLight(cx, threshold)
})
if err != nil {
return err
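Note: the change above wraps each tick in a context whose deadline is derived from the tick interval, so a subscriber or checkpoint calculation that hangs no longer stalls the advancer forever. A minimal, self-contained sketch of the same pattern; the runTick helper and its names are illustrative, not the advancer's real API:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// runTick is a hypothetical helper: it bounds one unit of work with a deadline
// derived from the tick interval, mirroring c.Config().TickTimeout() above.
func runTick(ctx context.Context, tick time.Duration, work func(context.Context) error) error {
	cx, cancel := context.WithTimeout(ctx, 10*tick)
	defer cancel()
	return work(cx)
}

func main() {
	err := runTick(context.Background(), 10*time.Millisecond, func(cx context.Context) error {
		// Simulate a dependency that never responds; only the deadline can fire.
		select {
		case <-cx.Done():
			return cx.Err()
		case <-time.After(time.Hour):
			return nil
		}
	})
	fmt.Println(errors.Is(err, context.DeadlineExceeded)) // true, after ~100ms
}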
36 changes: 36 additions & 0 deletions br/pkg/streamhelper/advancer_test.go
@@ -9,8 +9,11 @@ import (
"testing"
"time"

"github.com/pingcap/errors"
logbackup "github.com/pingcap/kvproto/pkg/logbackuppb"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/streamhelper"
"github.com/pingcap/tidb/br/pkg/streamhelper/config"
"github.com/pingcap/tidb/kv"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zapcore"
@@ -219,3 +222,36 @@ func TestTaskRangesWithSplit(t *testing.T) {
shouldFinishInTime(t, 10*time.Second, "second advancing", func() { require.NoError(t, adv.OnTick(ctx)) })
require.Greater(t, env.getCheckpoint(), fstCheckpoint)
}

func TestBlocked(t *testing.T) {
log.SetLevel(zapcore.DebugLevel)
c := createFakeCluster(t, 4, true)
ctx := context.Background()
req := require.New(t)
c.splitAndScatter("0012", "0034", "0048")
marked := false
for _, s := range c.stores {
s.clientMu.Lock()
s.onGetRegionCheckpoint = func(glftrr *logbackup.GetLastFlushTSOfRegionRequest) error {
// Block the thread.
// This may happen when TiKV goes down or is too busy.
<-(chan struct{})(nil)
return nil
}
s.clientMu.Unlock()
marked = true
}
req.True(marked, "failed to mark the cluster: ")
env := &testEnv{fakeCluster: c, testCtx: t}
adv := streamhelper.NewCheckpointAdvancer(env)
adv.StartTaskListener(ctx)
adv.UpdateConfigWith(func(c *config.Config) {
// ... So the tick timeout would be 100ms
c.TickDuration = 10 * time.Millisecond
})
var err error
shouldFinishInTime(t, time.Second, "ticking", func() {
err = adv.OnTick(ctx)
})
req.ErrorIs(errors.Cause(err), context.DeadlineExceeded)
}
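Note: the receive from a nil channel, <-(chan struct{})(nil), blocks forever; that is the idiom the test uses to simulate a store that never answers, so the per-tick deadline is what eventually surfaces as context.DeadlineExceeded. A small standalone illustration of the idiom under a guarding timeout (names here are illustrative):

package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	done := make(chan struct{})
	go func() {
		var blocked chan struct{} // a nil channel: receiving from it never returns
		<-blocked
		close(done) // never reached; the goroutine stays parked on purpose
	}()

	select {
	case <-done:
		fmt.Println("finished")
	case <-ctx.Done():
		fmt.Println(ctx.Err()) // context deadline exceeded
	}
}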
15 changes: 11 additions & 4 deletions br/pkg/streamhelper/basic_lib_for_test.go
@@ -77,10 +77,11 @@ type fakeStore struct {
id uint64
regions map[uint64]*region

clientMu sync.Mutex
supportsSub bool
bootstrapAt uint64
fsub func(logbackup.SubscribeFlushEventResponse)
clientMu sync.Mutex
supportsSub bool
bootstrapAt uint64
fsub func(logbackup.SubscribeFlushEventResponse)
onGetRegionCheckpoint func(*logbackup.GetLastFlushTSOfRegionRequest) error
}

type fakeCluster struct {
@@ -184,6 +185,12 @@ func (f *fakeStore) SetSupportFlushSub(b bool) {
}

func (f *fakeStore) GetLastFlushTSOfRegion(ctx context.Context, in *logbackup.GetLastFlushTSOfRegionRequest, opts ...grpc.CallOption) (*logbackup.GetLastFlushTSOfRegionResponse, error) {
if f.onGetRegionCheckpoint != nil {
err := f.onGetRegionCheckpoint(in)
if err != nil {
return nil, err
}
}
resp := &logbackup.GetLastFlushTSOfRegionResponse{
Checkpoints: []*logbackup.RegionCheckpoint{},
}
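Note: onGetRegionCheckpoint is a per-store test hook: when non-nil it runs before the fake RPC is served, so a test can inject an error or block indefinitely. A generic sketch of the same failure-injection pattern, using placeholder types rather than the fake cluster's API:

package main

import (
	"errors"
	"fmt"
)

type request struct{ key string }
type response struct{ value string }

// fakeServer is a placeholder for a test double such as fakeStore.
type fakeServer struct {
	// onHandle, when non-nil, runs before the normal handling path,
	// letting a test inject an error (or block) for a specific request.
	onHandle func(*request) error
}

func (s *fakeServer) Handle(in *request) (*response, error) {
	if s.onHandle != nil {
		if err := s.onHandle(in); err != nil {
			return nil, err
		}
	}
	return &response{value: "ok:" + in.key}, nil
}

func main() {
	s := &fakeServer{}
	s.onHandle = func(*request) error { return errors.New("injected failure") }
	_, err := s.Handle(&request{key: "0012"})
	fmt.Println(err) // injected failure
}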
6 changes: 6 additions & 0 deletions br/pkg/streamhelper/config/advancer_conf.go
@@ -78,3 +78,9 @@ func (conf Config) GetDefaultStartPollThreshold() time.Duration {
func (conf Config) GetSubscriberErrorStartPollThreshold() time.Duration {
return conf.TryAdvanceThreshold / 5
}

// TickTimeout returns the max duration for each tick.
func (conf Config) TickTimeout() time.Duration {
// If a tick blocks 10x the interval of ticking, we may need to break it and retry.
return 10 * conf.TickDuration
}
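Note: TickTimeout derives the per-tick deadline from the tick interval, which is why TestBlocked sets TickDuration to 10ms and expects a 100ms timeout. A tiny sketch of the arithmetic, assuming a plain struct in place of the real config package:

package main

import (
	"fmt"
	"time"
)

// conf is a stand-in for the real config.Config; only the relevant field is kept.
type conf struct {
	TickDuration time.Duration
}

// TickTimeout mirrors the rule above: a tick that blocks for ten intervals
// is considered stuck and gets cancelled.
func (c conf) TickTimeout() time.Duration {
	return 10 * c.TickDuration
}

func main() {
	c := conf{TickDuration: 10 * time.Millisecond} // the value TestBlocked sets
	fmt.Println(c.TickTimeout())                   // 100ms
}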
2 changes: 1 addition & 1 deletion br/tests/lightning_fail_fast/run.sh
@@ -25,7 +25,7 @@ for CFG in chunk engine; do
! run_lightning --backend tidb --enable-checkpoint=0 --log-file "$TEST_DIR/lightning-tidb.log" --config "tests/$TEST_NAME/$CFG.toml"
[ $? -eq 0 ]

tail -n 10 $TEST_DIR/lightning-tidb.log | grep "ERROR" | tail -n 1 | grep -Fq "Error 1062: Duplicate entry '1-1' for key 'tb.uq'"
tail -n 10 $TEST_DIR/lightning-tidb.log | grep "ERROR" | tail -n 1 | grep -Fq "Error 1062 (23000): Duplicate entry '1-1' for key 'tb.uq'"

! grep -Fq "restore file completed" $TEST_DIR/lightning-tidb.log
[ $? -eq 0 ]
38 changes: 20 additions & 18 deletions config/config.go
@@ -658,14 +658,15 @@ type Performance struct {
// Deprecated
MemProfileInterval string `toml:"-" json:"-"`

IndexUsageSyncLease string `toml:"index-usage-sync-lease" json:"index-usage-sync-lease"`
PlanReplayerGCLease string `toml:"plan-replayer-gc-lease" json:"plan-replayer-gc-lease"`
GOGC int `toml:"gogc" json:"gogc"`
EnforceMPP bool `toml:"enforce-mpp" json:"enforce-mpp"`
StatsLoadConcurrency uint `toml:"stats-load-concurrency" json:"stats-load-concurrency"`
StatsLoadQueueSize uint `toml:"stats-load-queue-size" json:"stats-load-queue-size"`
AnalyzePartitionConcurrencyQuota uint `toml:"analyze-partition-concurrency-quota" json:"analyze-partition-concurrency-quota"`
EnableStatsCacheMemQuota bool `toml:"enable-stats-cache-mem-quota" json:"enable-stats-cache-mem-quota"`
IndexUsageSyncLease string `toml:"index-usage-sync-lease" json:"index-usage-sync-lease"`
PlanReplayerGCLease string `toml:"plan-replayer-gc-lease" json:"plan-replayer-gc-lease"`
GOGC int `toml:"gogc" json:"gogc"`
EnforceMPP bool `toml:"enforce-mpp" json:"enforce-mpp"`
StatsLoadConcurrency uint `toml:"stats-load-concurrency" json:"stats-load-concurrency"`
StatsLoadQueueSize uint `toml:"stats-load-queue-size" json:"stats-load-queue-size"`
AnalyzePartitionConcurrencyQuota uint `toml:"analyze-partition-concurrency-quota" json:"analyze-partition-concurrency-quota"`
PlanReplayerDumpWorkerConcurrency uint `toml:"plan-replayer-dump-worker-concurrency" json:"plan-replayer-dump-worker-concurrency"`
EnableStatsCacheMemQuota bool `toml:"enable-stats-cache-mem-quota" json:"enable-stats-cache-mem-quota"`
// The following items are deprecated. We need to keep them here temporarily
// to support the upgrade process. They can be removed in future.

@@ -923,16 +924,17 @@ var defaultConf = Config{
CommitterConcurrency: defTiKVCfg.CommitterConcurrency,
MaxTxnTTL: defTiKVCfg.MaxTxnTTL, // 1hour
// TODO: set indexUsageSyncLease to 60s.
IndexUsageSyncLease: "0s",
GOGC: 100,
EnforceMPP: false,
PlanReplayerGCLease: "10m",
StatsLoadConcurrency: 5,
StatsLoadQueueSize: 1000,
AnalyzePartitionConcurrencyQuota: 16,
EnableStatsCacheMemQuota: false,
RunAutoAnalyze: true,
EnableLoadFMSketch: false,
IndexUsageSyncLease: "0s",
GOGC: 100,
EnforceMPP: false,
PlanReplayerGCLease: "10m",
StatsLoadConcurrency: 5,
StatsLoadQueueSize: 1000,
AnalyzePartitionConcurrencyQuota: 16,
PlanReplayerDumpWorkerConcurrency: 1,
EnableStatsCacheMemQuota: false,
RunAutoAnalyze: true,
EnableLoadFMSketch: false,
},
ProxyProtocol: ProxyProtocol{
Networks: "",
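Note: the new plan-replayer-dump-worker-concurrency knob follows the existing kebab-case tag convention and defaults to 1. A quick self-contained check of how such a tag maps the field, using encoding/json and a trimmed stand-in struct rather than the full Performance section:

package main

import (
	"encoding/json"
	"fmt"
)

// perf is a trimmed stand-in for the Performance section, keeping only the new knob.
type perf struct {
	PlanReplayerDumpWorkerConcurrency uint `json:"plan-replayer-dump-worker-concurrency"`
}

func main() {
	p := perf{PlanReplayerDumpWorkerConcurrency: 1} // the default set in the change above
	out, _ := json.Marshal(p)
	fmt.Println(string(out)) // {"plan-replayer-dump-worker-concurrency":1}
}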
34 changes: 23 additions & 11 deletions domain/domain.go
@@ -1573,17 +1573,31 @@ func (do *Domain) TelemetryRotateSubWindowLoop(ctx sessionctx.Context) {
}

// SetupPlanReplayerHandle setup plan replayer handle
func (do *Domain) SetupPlanReplayerHandle(collectorSctx, dumperSctx sessionctx.Context) {
func (do *Domain) SetupPlanReplayerHandle(collectorSctx sessionctx.Context, workersSctxs []sessionctx.Context) {
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
do.planReplayerHandle = &planReplayerHandle{}
do.planReplayerHandle.planReplayerTaskCollectorHandle = &planReplayerTaskCollectorHandle{
ctx: ctx,
sctx: collectorSctx,
}
taskCH := make(chan *PlanReplayerDumpTask, 16)
taskStatus := &planReplayerDumpTaskStatus{}
taskStatus.finishedTaskMu.finishedTask = map[PlanReplayerTaskKey]struct{}{}
taskStatus.runningTaskMu.runningTasks = map[PlanReplayerTaskKey]struct{}{}

do.planReplayerHandle.planReplayerTaskDumpHandle = &planReplayerTaskDumpHandle{
ctx: ctx,
sctx: dumperSctx,
taskCH: make(chan *PlanReplayerDumpTask, 16),
taskCH: taskCH,
status: taskStatus,
}
do.planReplayerHandle.planReplayerTaskDumpHandle.workers = make([]*planReplayerTaskDumpWorker, 0)
for i := 0; i < len(workersSctxs); i++ {
worker := &planReplayerTaskDumpWorker{
ctx: ctx,
sctx: workersSctxs[i],
taskCH: taskCH,
status: taskStatus,
}
do.planReplayerHandle.planReplayerTaskDumpHandle.workers = append(do.planReplayerHandle.planReplayerTaskDumpHandle.workers, worker)
}
}

@@ -1598,6 +1612,7 @@ func (do *Domain) SetupHistoricalStatsWorker(ctx sessionctx.Context) {
// SetupDumpFileGCChecker setup sctx
func (do *Domain) SetupDumpFileGCChecker(ctx sessionctx.Context) {
do.dumpFileGcChecker.setupSctx(ctx)
do.dumpFileGcChecker.planReplayerTaskStatus = do.planReplayerHandle.status
}

var planReplayerHandleLease atomic.Uint64
@@ -1650,14 +1665,11 @@ func (do *Domain) StartPlanReplayerHandle() {
logutil.BgLogger().Info("PlanReplayerTaskDumpHandle exited.")
util.Recover(metrics.LabelDomain, "PlanReplayerTaskDumpHandle", nil, false)
}()
for {
select {
case <-do.exit:
return
case task := <-do.planReplayerHandle.planReplayerTaskDumpHandle.taskCH:
do.planReplayerHandle.HandlePlanReplayerDumpTask(task)
}
for _, worker := range do.planReplayerHandle.planReplayerTaskDumpHandle.workers {
go worker.run()
}
<-do.exit
do.planReplayerHandle.planReplayerTaskDumpHandle.Close()
}()
}

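Note: the dump handle now fans tasks out to a pool of workers that all read from one shared task channel, and the old single select loop is replaced by starting each worker and closing the handle when the domain exits. A compact sketch of that fan-out shape, with placeholder task and worker types rather than the real planReplayerTaskDumpWorker:

package main

import (
	"fmt"
	"sync"
)

type task struct{ id int }

// worker is a placeholder for planReplayerTaskDumpWorker: every worker drains
// the same shared channel until it is closed.
type worker struct {
	id     int
	taskCh <-chan task
}

func (w worker) run(wg *sync.WaitGroup) {
	defer wg.Done()
	for t := range w.taskCh {
		fmt.Printf("worker %d handled task %d\n", w.id, t.id)
	}
}

func main() {
	taskCh := make(chan task, 16)
	var wg sync.WaitGroup

	// Fan out: several workers share one channel, like the dump handle's workers.
	for i := 0; i < 3; i++ {
		w := worker{id: i, taskCh: taskCh}
		wg.Add(1)
		go w.run(&wg)
	}

	for i := 0; i < 5; i++ {
		taskCh <- task{id: i}
	}
	close(taskCh) // the handle's Close() plays this role when the domain exits
	wg.Wait()
}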
