Skip to content

Commit

Permalink
Merge branch 'master' into dev/functions
Browse files Browse the repository at this point in the history
  • Loading branch information
LittleFall authored May 18, 2022
2 parents afdf11f + 9b1ef78 commit 5a1bb1a
Show file tree
Hide file tree
Showing 233 changed files with 4,646 additions and 3,208 deletions.
17 changes: 6 additions & 11 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ ut: tools/bin/ut tools/bin/xprog failpoint-enable
gotest: failpoint-enable
@echo "Running in native mode."
@export log_level=info; export TZ='Asia/Shanghai'; \
$(GOTEST) -ldflags '$(TEST_LDFLAGS)' $(EXTRA_TEST_ARGS) -timeout 20m -cover $(PACKAGES_TIDB_TESTS) -coverprofile=coverage.txt -check.p true > gotest.log || { $(FAILPOINT_DISABLE); cat 'gotest.log'; exit 1; }
$(GOTEST) -ldflags '$(TEST_LDFLAGS)' $(EXTRA_TEST_ARGS) -timeout 20m -cover $(PACKAGES_TIDB_TESTS) -coverprofile=coverage.txt > gotest.log || { $(FAILPOINT_DISABLE); cat 'gotest.log'; exit 1; }
@$(FAILPOINT_DISABLE)

gotest_in_verify_ci: tools/bin/xprog tools/bin/ut failpoint-enable
Expand All @@ -151,11 +151,6 @@ race: failpoint-enable
@$(FAILPOINT_DISABLE)
@$(CLEAN_UT_BINARY)

leak: failpoint-enable
@export log_level=debug; \
$(GOTEST) -tags leak $(PACKAGES) || { $(FAILPOINT_DISABLE); exit 1; }
@$(FAILPOINT_DISABLE)

server:
ifeq ($(TARGET), "")
CGO_ENABLED=1 $(GOBUILD) $(RACE_FLAG) -ldflags '$(LDFLAGS) $(CHECK_FLAG)' -o bin/tidb-server tidb-server/main.go
Expand Down Expand Up @@ -273,7 +268,7 @@ ifeq ("$(pkg)", "")
else
@echo "Running unit test for github.com/pingcap/tidb/$(pkg)"
@export log_level=fatal; export TZ='Asia/Shanghai'; \
$(GOTEST) -ldflags '$(TEST_LDFLAGS)' -cover github.com/pingcap/tidb/$(pkg) -check.p true -check.timeout 4s || { $(FAILPOINT_DISABLE); exit 1; }
$(GOTEST) -ldflags '$(TEST_LDFLAGS)' -cover github.com/pingcap/tidb/$(pkg) || { $(FAILPOINT_DISABLE); exit 1; }
endif
@$(FAILPOINT_DISABLE)

Expand Down Expand Up @@ -336,15 +331,15 @@ br_unit_test: export ARGS=$$($(BR_PACKAGES))
br_unit_test:
@make failpoint-enable
@export TZ='Asia/Shanghai';
$(GOTEST) $(RACE_FLAG) -ldflags '$(LDFLAGS)' -tags leak $(ARGS) -coverprofile=coverage.txt || ( make failpoint-disable && exit 1 )
$(GOTEST) $(RACE_FLAG) -ldflags '$(LDFLAGS)' $(ARGS) -coverprofile=coverage.txt || ( make failpoint-disable && exit 1 )
@make failpoint-disable
br_unit_test_in_verify_ci: export ARGS=$$($(BR_PACKAGES))
br_unit_test_in_verify_ci: tools/bin/gotestsum
@make failpoint-enable
@export TZ='Asia/Shanghai';
@mkdir -p $(TEST_COVERAGE_DIR)
CGO_ENABLED=1 tools/bin/gotestsum --junitfile "$(TEST_COVERAGE_DIR)/br-junit-report.xml" -- $(RACE_FLAG) -ldflags '$(LDFLAGS)' \
-tags leak $(ARGS) -coverprofile="$(TEST_COVERAGE_DIR)/br_cov.unit_test.out" || ( make failpoint-disable && exit 1 )
$(ARGS) -coverprofile="$(TEST_COVERAGE_DIR)/br_cov.unit_test.out" || ( make failpoint-disable && exit 1 )
@make failpoint-disable

br_integration_test: br_bins build_br build_for_br_integration_test
Expand Down Expand Up @@ -401,12 +396,12 @@ build_dumpling:

dumpling_unit_test: export DUMPLING_ARGS=$$($(DUMPLING_PACKAGES))
dumpling_unit_test: failpoint-enable
$(DUMPLING_GOTEST) $(RACE_FLAG) -coverprofile=coverage.txt -covermode=atomic -tags leak $(DUMPLING_ARGS) || ( make failpoint-disable && exit 1 )
$(DUMPLING_GOTEST) $(RACE_FLAG) -coverprofile=coverage.txt -covermode=atomic $(DUMPLING_ARGS) || ( make failpoint-disable && exit 1 )
@make failpoint-disable
dumpling_unit_test_in_verify_ci: export DUMPLING_ARGS=$$($(DUMPLING_PACKAGES))
dumpling_unit_test_in_verify_ci: failpoint-enable tools/bin/gotestsum
@mkdir -p $(TEST_COVERAGE_DIR)
CGO_ENABLED=1 tools/bin/gotestsum --junitfile "$(TEST_COVERAGE_DIR)/dumpling-junit-report.xml" -- -tags leak $(DUMPLING_ARGS) \
CGO_ENABLED=1 tools/bin/gotestsum --junitfile "$(TEST_COVERAGE_DIR)/dumpling-junit-report.xml" -- $(DUMPLING_ARGS) \
$(RACE_FLAG) -coverprofile="$(TEST_COVERAGE_DIR)/dumpling_cov.unit_test.out" || ( make failpoint-disable && exit 1 )
@make failpoint-disable

Expand Down
4 changes: 2 additions & 2 deletions bindinfo/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ package bindinfo_test
import (
"testing"

"github.com/pingcap/tidb/util/testbridge"
"github.com/pingcap/tidb/testkit/testsetup"
"go.uber.org/goleak"
)

func TestMain(m *testing.M) {
testbridge.SetupForCommonTest()
testsetup.SetupForCommonTest()
opts := []goleak.Option{
goleak.IgnoreTopFunction("github.com/golang/glog.(*loggingT).flushDaemon"),
goleak.IgnoreTopFunction("go.etcd.io/etcd/client/pkg/v3/logutil.(*MergeLogger).outputLoop"),
Expand Down
4 changes: 2 additions & 2 deletions br/cmd/br/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func newStreamCheckCommand() *cobra.Command {
Short: "get the metadata of log dir.",
Args: cobra.NoArgs,
RunE: func(cmd *cobra.Command, args []string) error {
return streamCommand(cmd, task.StreamCheck)
return streamCommand(cmd, task.StreamMetadata)
},
}
return command
Expand All @@ -171,7 +171,7 @@ func streamCommand(command *cobra.Command, cmdName string) error {
}

switch cmdName {
case task.StreamCheck:
case task.StreamMetadata:
{
// do nothing.
}
Expand Down
4 changes: 2 additions & 2 deletions br/pkg/backup/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ package backup
import (
"testing"

"github.com/pingcap/tidb/util/testbridge"
"github.com/pingcap/tidb/testkit/testsetup"
"go.uber.org/goleak"
)

func TestMain(m *testing.M) {
testbridge.SetupForCommonTest()
testsetup.SetupForCommonTest()
opts := []goleak.Option{
goleak.IgnoreTopFunction("github.com/golang/glog.(*loggingT).flushDaemon"),
goleak.IgnoreTopFunction("github.com/klauspost/compress/zstd.(*blockDec).startDecoder"),
Expand Down
4 changes: 2 additions & 2 deletions br/pkg/checksum/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ package checksum
import (
"testing"

"github.com/pingcap/tidb/util/testbridge"
"github.com/pingcap/tidb/testkit/testsetup"
"go.uber.org/goleak"
)

Expand All @@ -28,6 +28,6 @@ func TestMain(m *testing.M) {
goleak.IgnoreTopFunction("go.etcd.io/etcd/client/pkg/v3/logutil.(*MergeLogger).outputLoop"),
goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"),
}
testbridge.SetupForCommonTest()
testsetup.SetupForCommonTest()
goleak.VerifyTestMain(m, opts...)
}
4 changes: 2 additions & 2 deletions br/pkg/conn/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ package conn
import (
"testing"

"github.com/pingcap/tidb/util/testbridge"
"github.com/pingcap/tidb/testkit/testsetup"
"go.uber.org/goleak"
)

Expand All @@ -27,6 +27,6 @@ func TestMain(m *testing.M) {
goleak.IgnoreTopFunction("go.etcd.io/etcd/client/pkg/v3/logutil.(*MergeLogger).outputLoop"),
goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"),
}
testbridge.SetupForCommonTest()
testsetup.SetupForCommonTest()
goleak.VerifyTestMain(m, opts...)
}
4 changes: 2 additions & 2 deletions br/pkg/lightning/checkpoints/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ package checkpoints_test
import (
"testing"

"github.com/pingcap/tidb/util/testbridge"
"github.com/pingcap/tidb/testkit/testsetup"
"go.uber.org/goleak"
)

func TestMain(m *testing.M) {
testbridge.SetupForCommonTest()
testsetup.SetupForCommonTest()
opts := []goleak.Option{
goleak.IgnoreTopFunction("github.com/golang/glog.(*loggingT).flushDaemon"),
goleak.IgnoreTopFunction("github.com/klauspost/compress/zstd.(*blockDec).startDecoder"),
Expand Down
4 changes: 2 additions & 2 deletions br/pkg/lightning/common/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ package common_test
import (
"testing"

"github.com/pingcap/tidb/util/testbridge"
"github.com/pingcap/tidb/testkit/testsetup"
"go.uber.org/goleak"
)

func TestMain(m *testing.M) {
testbridge.SetupForCommonTest()
testsetup.SetupForCommonTest()
opts := []goleak.Option{
goleak.IgnoreTopFunction("github.com/golang/glog.(*loggingT).flushDaemon"),
goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"),
Expand Down
4 changes: 2 additions & 2 deletions br/pkg/lightning/mydump/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ package mydump
import (
"testing"

"github.com/pingcap/tidb/util/testbridge"
"github.com/pingcap/tidb/testkit/testsetup"
"go.uber.org/goleak"
)

func TestMain(m *testing.M) {
testbridge.SetupForCommonTest()
testsetup.SetupForCommonTest()
opts := []goleak.Option{
goleak.IgnoreTopFunction("github.com/golang/glog.(*loggingT).flushDaemon"),
goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"),
Expand Down
4 changes: 2 additions & 2 deletions br/pkg/metautil/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ package metautil
import (
"testing"

"github.com/pingcap/tidb/util/testbridge"
"github.com/pingcap/tidb/testkit/testsetup"
"go.uber.org/goleak"
)

Expand All @@ -26,6 +26,6 @@ func TestMain(m *testing.M) {
goleak.IgnoreTopFunction("github.com/golang/glog.(*loggingT).flushDaemon"),
goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"),
}
testbridge.SetupForCommonTest()
testsetup.SetupForCommonTest()
goleak.VerifyTestMain(m, opts...)
}
4 changes: 2 additions & 2 deletions br/pkg/pdutil/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ package pdutil
import (
"testing"

"github.com/pingcap/tidb/util/testbridge"
"github.com/pingcap/tidb/testkit/testsetup"
"go.uber.org/goleak"
)

func TestMain(m *testing.M) {
testbridge.SetupForCommonTest()
testsetup.SetupForCommonTest()
opts := []goleak.Option{
goleak.IgnoreTopFunction("github.com/golang/glog.(*loggingT).flushDaemon"),
goleak.IgnoreTopFunction("go.etcd.io/etcd/client/pkg/v3/logutil.(*MergeLogger).outputLoop"),
Expand Down
6 changes: 5 additions & 1 deletion br/pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1681,7 +1681,11 @@ func (rc *Client) RestoreKVFiles(
summary.CollectInt("File", 1)
log.Info("import files done", zap.String("name", file.Path), zap.Duration("take", time.Since(fileStart)))
}()
return rc.fileImporter.ImportKVFiles(ectx, file, rule, rc.restoreTS)
startTS := rc.startTS
if file.Cf == stream.DefaultCF {
startTS = rc.shiftStartTS
}
return rc.fileImporter.ImportKVFiles(ectx, file, rule, startTS, rc.restoreTS)
})
}
}
Expand Down
33 changes: 19 additions & 14 deletions br/pkg/restore/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,11 +335,12 @@ func (importer *FileImporter) ImportKVFileForRegion(
ctx context.Context,
file *backuppb.DataFileInfo,
rule *RewriteRules,
restoreTs uint64,
startTS uint64,
restoreTS uint64,
info *RegionInfo,
) RPCResult {
// Try to download file.
result := importer.downloadAndApplyKVFile(ctx, file, rule, info, restoreTs)
result := importer.downloadAndApplyKVFile(ctx, file, rule, info, startTS, restoreTS)
if !result.OK() {
errDownload := result.Err
for _, e := range multierr.Errors(errDownload) {
Expand Down Expand Up @@ -380,11 +381,13 @@ func (importer *FileImporter) ClearFiles(ctx context.Context, pdClient pd.Client
return nil
}

// ImportKVFiles restores the kv events.
func (importer *FileImporter) ImportKVFiles(
ctx context.Context,
file *backuppb.DataFileInfo,
rule *RewriteRules,
restoreTs uint64,
startTS uint64,
restoreTS uint64,
) error {
startTime := time.Now()
log.Debug("import kv files", zap.String("file", file.Path))
Expand All @@ -398,10 +401,10 @@ func (importer *FileImporter) ImportKVFiles(
logutil.Key("startKey", startKey),
logutil.Key("endKey", endKey))

rs := utils.InitialRetryState(16, 100*time.Millisecond, 4*time.Second)
ctl := OverRegionsInRange(startKey, endKey, importer.metaClient, rs)
rs := utils.InitialRetryState(32, 100*time.Millisecond, 8*time.Second)
ctl := OverRegionsInRange(startKey, endKey, importer.metaClient, &rs)
err = ctl.Run(ctx, func(ctx context.Context, r *RegionInfo) RPCResult {
return importer.ImportKVFileForRegion(ctx, file, rule, restoreTs, r)
return importer.ImportKVFileForRegion(ctx, file, rule, startTS, restoreTS, r)
})

log.Debug("download and apply file done",
Expand Down Expand Up @@ -801,7 +804,8 @@ func (importer *FileImporter) downloadAndApplyKVFile(
file *backuppb.DataFileInfo,
rules *RewriteRules,
regionInfo *RegionInfo,
restoreTs uint64,
startTS uint64,
restoreTS uint64,
) RPCResult {
leader := regionInfo.Leader
if leader == nil {
Expand All @@ -823,12 +827,13 @@ func (importer *FileImporter) downloadAndApplyKVFile(
Name: file.Path,
Cf: file.Cf,
// TODO fill the length
Length: 0,
IsDelete: file.Type == backuppb.FileType_Delete,
RestoreTs: restoreTs,
StartKey: regionInfo.Region.GetStartKey(),
EndKey: regionInfo.Region.GetEndKey(),
Sha256: file.GetSha256(),
Length: 0,
IsDelete: file.Type == backuppb.FileType_Delete,
StartSnapshotTs: startTS,
RestoreTs: restoreTS,
StartKey: regionInfo.Region.GetStartKey(),
EndKey: regionInfo.Region.GetEndKey(),
Sha256: file.GetSha256(),
}

reqCtx := &kvrpcpb.Context{
Expand All @@ -849,7 +854,7 @@ func (importer *FileImporter) downloadAndApplyKVFile(
return RPCResultFromError(errors.Trace(err))
}
if resp.GetError() != nil {
logutil.CL(ctx).Warn("backup meet error", zap.Stringer("error", resp.GetError()))
logutil.CL(ctx).Warn("import meet error", zap.Stringer("error", resp.GetError()))
return RPCResultFromPBError(resp.GetError())
}
return RPCResultOK()
Expand Down
7 changes: 3 additions & 4 deletions br/pkg/restore/import_retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@ type OverRegionsInRangeController struct {
metaClient SplitClient

errors error
rs utils.RetryState
rs *utils.RetryState
}

// OverRegionsInRange creates a controller that cloud be used to scan regions in a range and
// apply a function over these regions.
// You can then call the `Run` method for applying some functions.
func OverRegionsInRange(start, end []byte, metaClient SplitClient, retryStatus utils.RetryState) OverRegionsInRangeController {
func OverRegionsInRange(start, end []byte, metaClient SplitClient, retryStatus *utils.RetryState) OverRegionsInRangeController {
// IMPORTANT: we record the start/end key with TimeStamp.
// but scanRegion will drop the TimeStamp and the end key is exclusive.
// if we do not use PrefixNextKey. we might scan fewer regions than we expected.
Expand Down Expand Up @@ -84,15 +84,14 @@ func (o *OverRegionsInRangeController) tryFindLeader(ctx context.Context, region

// handleInRegionError handles the error happens internal in the region. Update the region info, and perform a suitable backoff.
func (o *OverRegionsInRangeController) handleInRegionError(ctx context.Context, result RPCResult, region *RegionInfo) (cont bool) {

if nl := result.StoreError.GetNotLeader(); nl != nil {
if nl.Leader != nil {
region.Leader = nl.Leader
// try the new leader immediately.
return true
}
// we retry manually, simply record the retry event.
o.rs.RecordRetry()
time.Sleep(o.rs.ExponentialBackoff())
// There may not be leader, waiting...
leader, err := o.tryFindLeader(ctx, region)
if err != nil {
Expand Down
Loading

0 comments on commit 5a1bb1a

Please sign in to comment.