Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into enhance-restore-kvf…
Browse files Browse the repository at this point in the history
…iles
  • Loading branch information
joccau committed Nov 14, 2022
2 parents 8eb3c0a + ba6ae45 commit eda5a08
Show file tree
Hide file tree
Showing 66 changed files with 1,481 additions and 426 deletions.
12 changes: 6 additions & 6 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -2825,8 +2825,8 @@ def go_deps():
name = "com_github_pingcap_kvproto",
build_file_proto_mode = "disable_global",
importpath = "github.com/pingcap/kvproto",
sum = "h1:McYxPhA8SHqfUtLfQHHN0fQl4dy93IkhlX4Pp2MKIFA=",
version = "v0.0.0-20221014081430-26e28e6a281a",
sum = "h1:FYgKV9znRQmzVrrJDZ0gUfMIvKLAMU1tu1UKJib8bEQ=",
version = "v0.0.0-20221026112947-f8d61344b172",
)
go_repository(
name = "com_github_pingcap_log",
Expand Down Expand Up @@ -3429,15 +3429,15 @@ def go_deps():
name = "com_github_tikv_client_go_v2",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/client-go/v2",
sum = "h1:5KLqhDGLc/mtemdS/odfOP717rn8ttsTj3jzZ8TZn9A=",
version = "v2.0.1-0.20221017092635-91be9c6ce6c0",
sum = "h1:NvQHWk0GeXSLEBbmGMPnDMc0to0a3ogzgIRbTKw8MHI=",
version = "v2.0.1-0.20221031063202-30e803b7082c",
)
go_repository(
name = "com_github_tikv_pd_client",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/pd/client",
sum = "h1:REQOR1XraH1fT9BCoNBPZs1CAe+w7VPLU+d+si7DLYo=",
version = "v0.0.0-20221010134149-d50e5fe43f14",
sum = "h1:ckPpxKcl75mO2N6a4cJXiZH43hvcHPpqc9dh1TmH1nc=",
version = "v0.0.0-20221031025758-80f0d8ca4d07",
)
go_repository(
name = "com_github_timakin_bodyclose",
Expand Down
4 changes: 3 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -411,8 +411,10 @@ bazel_coverage_test: failpoint-enable bazel_ci_prepare

bazel_build: bazel_ci_prepare
mkdir -p bin
bazel $(BAZEL_GLOBAL_CONFIG) build $(BAZEL_CMD_CONFIG) \
bazel $(BAZEL_GLOBAL_CONFIG) build --remote_download_minimal $(BAZEL_CMD_CONFIG) \
//... --//build:with_nogo_flag=true
bazel $(BAZEL_GLOBAL_CONFIG) build $(BAZEL_CMD_CONFIG) \
//cmd/importer:importer //tidb-server:tidb-server //tidb-server:tidb-server-check --//build:with_nogo_flag=true
cp bazel-out/k8-fastbuild/bin/tidb-server/tidb-server_/tidb-server ./bin
cp bazel-out/k8-fastbuild/bin/cmd/importer/importer_/importer ./bin
cp bazel-out/k8-fastbuild/bin/tidb-server/tidb-server-check_/tidb-server-check ./bin
Expand Down
3 changes: 3 additions & 0 deletions br/pkg/restore/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ go_library(
"db.go",
"import.go",
"import_retry.go",
"log_client.go",
"merge.go",
"pipeline_items.go",
"range.go",
Expand Down Expand Up @@ -41,6 +42,7 @@ go_library(
"//br/pkg/stream",
"//br/pkg/summary",
"//br/pkg/utils",
"//br/pkg/utils/iter",
"//config",
"//ddl",
"//ddl/util",
Expand Down Expand Up @@ -131,6 +133,7 @@ go_test(
"//br/pkg/storage",
"//br/pkg/stream",
"//br/pkg/utils",
"//br/pkg/utils/iter",
"//infoschema",
"//kv",
"//meta/autoid",
Expand Down
195 changes: 25 additions & 170 deletions br/pkg/restore/log_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import (

var id uint64

type metaMaker = func(files ...*backuppb.DataFileInfo) *backuppb.Metadata

func wm(start, end, minBegin uint64) *backuppb.DataFileInfo {
i := wr(start, end, minBegin)
i.IsMeta = true
Expand Down Expand Up @@ -155,7 +157,7 @@ func (b *mockMetaBuilder) b(useV2 bool) (*storage.LocalStorage, string) {
return s, path
}

func TestReadMetaBetweenTS(t *testing.T) {
func testReadMetaBetweenTSWithVersion(t *testing.T, m metaMaker) {
log.SetLevel(zapcore.DebugLevel)
type Case struct {
items []*backuppb.Metadata
Expand Down Expand Up @@ -251,103 +253,12 @@ func TestReadMetaBetweenTS(t *testing.T) {
}
}

func TestReadMetaBetweenTSV2(t *testing.T) {
log.SetLevel(zapcore.DebugLevel)
type Case struct {
items []*backuppb.Metadata
startTS uint64
endTS uint64
expectedShiftTS uint64
expected []int
}

cases := []Case{
{
items: []*backuppb.Metadata{
m2(wr(4, 10, 3), wr(5, 13, 5)),
m2(dr(1, 3)),
m2(wr(10, 42, 9), dr(6, 9)),
},
startTS: 4,
endTS: 5,
expectedShiftTS: 3,
expected: []int{0, 1},
},
{
items: []*backuppb.Metadata{
m2(wr(1, 100, 1), wr(5, 13, 5), dr(1, 101)),
m2(wr(100, 200, 98), dr(100, 200)),
},
startTS: 50,
endTS: 99,
expectedShiftTS: 1,
expected: []int{0},
},
{
items: []*backuppb.Metadata{
m2(wr(1, 100, 1), wr(5, 13, 5), dr(1, 101)),
m2(wr(100, 200, 98), dr(100, 200)),
m2(wr(200, 300, 200), dr(200, 300)),
},
startTS: 150,
endTS: 199,
expectedShiftTS: 98,
expected: []int{1, 0},
},
{
items: []*backuppb.Metadata{
m2(wr(1, 100, 1), wr(5, 13, 5)),
m2(wr(101, 200, 101), dr(100, 200)),
m2(wr(200, 300, 200), dr(200, 300)),
},
startTS: 150,
endTS: 199,
expectedShiftTS: 101,
expected: []int{1},
},
}

run := func(t *testing.T, c Case) {
req := require.New(t)
ctx := context.Background()
loc, temp := (&mockMetaBuilder{
metas: c.items,
}).b(true)
defer func() {
t.Log("temp dir", temp)
if !t.Failed() {
os.RemoveAll(temp)
}
}()
init := LogFileManagerInit{
StartTS: c.startTS,
RestoreTS: c.endTS,
Storage: loc,
}
cli, err := CreateLogFileManager(ctx, init)
req.Equal(cli.ShiftTS(), c.expectedShiftTS)
req.NoError(err)
metas, err := cli.readStreamMeta(ctx)
req.NoError(err)
actualStoreIDs := make([]int64, 0, len(metas))
for _, meta := range metas {
actualStoreIDs = append(actualStoreIDs, meta.StoreId)
}
expectedStoreIDs := make([]int64, 0, len(c.expected))
for _, meta := range c.expected {
expectedStoreIDs = append(expectedStoreIDs, c.items[meta].StoreId)
}
req.ElementsMatch(actualStoreIDs, expectedStoreIDs)
}

for i, c := range cases {
t.Run(fmt.Sprintf("case#%d", i), func(t *testing.T) {
run(t, c)
})
}
func TestReadMetaBetweenTS(t *testing.T) {
t.Run("MetaV1", func(t *testing.T) { testReadMetaBetweenTSWithVersion(t, m) })
t.Run("MetaV2", func(t *testing.T) { testReadMetaBetweenTSWithVersion(t, m2) })
}

func TestReadFromMetadata(t *testing.T) {
func testReadFromMetadataWithVersion(t *testing.T, m metaMaker) {
type Case struct {
items []*backuppb.Metadata
untilTS uint64
Expand Down Expand Up @@ -413,70 +324,9 @@ func TestReadFromMetadata(t *testing.T) {
}
}

func TestReadFromMetadataV2(t *testing.T) {
type Case struct {
items []*backuppb.Metadata
untilTS uint64
expected []int
}

cases := []Case{
{
items: []*backuppb.Metadata{
m2(wr(4, 10, 3), wr(5, 13, 5)),
m2(dr(1, 3)),
m2(wr(10, 42, 9), dr(6, 9)),
},
untilTS: 10,
expected: []int{0, 1, 2},
},
{
items: []*backuppb.Metadata{
m2(wr(1, 100, 1), wr(5, 13, 5), dr(1, 101)),
m2(wr(100, 200, 98), dr(100, 200)),
},
untilTS: 99,
expected: []int{0},
},
}

run := func(t *testing.T, c Case) {
req := require.New(t)
ctx := context.Background()
loc, temp := (&mockMetaBuilder{
metas: c.items,
}).b(true)
defer func() {
t.Log("temp dir", temp)
if !t.Failed() {
os.RemoveAll(temp)
}
}()

meta := new(StreamMetadataSet)
meta.Helper = stream.NewMetadataHelper()
meta.LoadUntil(ctx, loc, c.untilTS)

var metas []*backuppb.Metadata
for _, m := range meta.metadata {
metas = append(metas, m)
}
actualStoreIDs := make([]int64, 0, len(metas))
for _, meta := range metas {
actualStoreIDs = append(actualStoreIDs, meta.StoreId)
}
expectedStoreIDs := make([]int64, 0, len(c.expected))
for _, meta := range c.expected {
expectedStoreIDs = append(expectedStoreIDs, c.items[meta].StoreId)
}
req.ElementsMatch(actualStoreIDs, expectedStoreIDs)
}

for i, c := range cases {
t.Run(fmt.Sprintf("case#%d", i), func(t *testing.T) {
run(t, c)
})
}
func TestReadFromMetadata(t *testing.T) {
t.Run("MetaV1", func(t *testing.T) { testReadFromMetadataWithVersion(t, m) })
t.Run("MetaV2", func(t *testing.T) { testReadFromMetadataWithVersion(t, m2) })
}

func dataFileInfoMatches(t *testing.T, listA []*backuppb.DataFileInfo, listB ...*backuppb.DataFileInfo) {
Expand Down Expand Up @@ -528,7 +378,7 @@ func formatL(l []*backuppb.DataFileInfo) string {
return "[" + strings.Join(r.Item, ", ") + "]"
}

func TestFileManager(t *testing.T) {
func testFileManagerWithMeta(t *testing.T, m metaMaker) {
type Case struct {
Metadata []*backuppb.Metadata
StartTS int
Expand All @@ -544,9 +394,9 @@ func TestFileManager(t *testing.T) {
cases := []Case{
{
Metadata: []*backuppb.Metadata{
m2(wm(5, 10, 1), dm(1, 8), dr(2, 6), wr(4, 5, 2)),
m2(wr(50, 54, 42), dr(42, 50), wr(70, 78, 0)),
m2(dr(100, 101), wr(102, 104, 100)),
m(wm(5, 10, 1), dm(1, 8), dr(2, 6), wr(4, 5, 2)),
m(wr(50, 54, 42), dr(42, 50), wr(70, 78, 0)),
m(dr(100, 101), wr(102, 104, 100)),
},
StartTS: 2,
RestoreTS: 60,
Expand All @@ -556,9 +406,9 @@ func TestFileManager(t *testing.T) {
},
{
Metadata: []*backuppb.Metadata{
m2(wm(4, 10, 1), dm(1, 8), dr(2, 6), wr(4, 5, 2)),
m2(wr(50, 54, 42), dr(42, 50), wr(70, 78, 0), wm(80, 81, 0), wm(90, 92, 0)),
m2(dr(100, 101), wr(102, 104, 100)),
m(wm(4, 10, 1), dm(1, 8), dr(2, 6), wr(4, 5, 2)),
m(wr(50, 54, 42), dr(42, 50), wr(70, 78, 0), wm(80, 81, 0), wm(90, 92, 0)),
m(dr(100, 101), wr(102, 104, 100)),
},
StartTS: 5,
RestoreTS: 80,
Expand All @@ -570,9 +420,9 @@ func TestFileManager(t *testing.T) {
},
{
Metadata: []*backuppb.Metadata{
m2(wm(5, 10, 1), dm(1, 8), dr(2, 6), wr(4, 5, 2)),
m2(wr(50, 54, 42), dr(42, 50), wr(70, 78, 0), wm(80, 81, 0), wm(90, 92, 0)),
m2(dr(100, 101), wr(102, 104, 100)),
m(wm(5, 10, 1), dm(1, 8), dr(2, 6), wr(4, 5, 2)),
m(wr(50, 54, 42), dr(42, 50), wr(70, 78, 0), wm(80, 81, 0), wm(90, 92, 0)),
m(dr(100, 101), wr(102, 104, 100)),
},
StartTS: 6,
RestoreTS: 80,
Expand Down Expand Up @@ -629,3 +479,8 @@ func TestFileManager(t *testing.T) {
t.Run(fmt.Sprintf("#%d", i), func(t *testing.T) { run(t, c) })
}
}

func TestFileManger(t *testing.T) {
t.Run("MetaV1", func(t *testing.T) { testFileManagerWithMeta(t, m) })
t.Run("MetaV2", func(t *testing.T) { testFileManagerWithMeta(t, m2) })
}
2 changes: 2 additions & 0 deletions br/pkg/storage/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ go_library(
"@com_github_aws_aws_sdk_go//service/s3/s3manager",
"@com_github_azure_azure_sdk_for_go_sdk_azidentity//:azidentity",
"@com_github_azure_azure_sdk_for_go_sdk_storage_azblob//:azblob",
"@com_github_golang_snappy//:snappy",
"@com_github_google_uuid//:uuid",
"@com_github_klauspost_compress//zstd",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_kvproto//pkg/brpb",
"@com_github_pingcap_log//:log",
Expand Down
30 changes: 30 additions & 0 deletions br/pkg/utils/iter/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "iter",
srcs = [
"combinator_types.go",
"combinators.go",
"iter.go",
"source.go",
"source_types.go",
],
importpath = "github.com/pingcap/tidb/br/pkg/utils/iter",
visibility = ["//visibility:public"],
deps = [
"//br/pkg/utils",
"@org_golang_x_exp//constraints",
"@org_golang_x_sync//errgroup",
],
)

go_test(
name = "iter_test",
srcs = ["combinator_test.go"],
flaky = True,
race = "on",
deps = [
":iter",
"@com_github_stretchr_testify//require",
],
)
3 changes: 2 additions & 1 deletion ddl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ go_test(
"//parser/terror",
"//parser/types",
"//planner/core",
"//server",
"//session",
"//sessionctx",
"//sessionctx/stmtctx",
Expand Down Expand Up @@ -256,7 +257,7 @@ go_test(
"@com_github_tikv_client_go_v2//testutils",
"@com_github_tikv_client_go_v2//tikv",
"@io_etcd_go_etcd_client_v3//:client",
"@io_etcd_go_etcd_tests_v3//integration",
"@org_golang_google_grpc//:grpc",
"@org_golang_x_exp//slices",
"@org_uber_go_atomic//:atomic",
"@org_uber_go_goleak//:goleak",
Expand Down
Loading

0 comments on commit eda5a08

Please sign in to comment.