From af573b472983d06c5e6824d3804d6e755809a64f Mon Sep 17 00:00:00 2001 From: Leavrth Date: Fri, 1 Sep 2023 13:30:04 +0800 Subject: [PATCH 1/4] fix restore metakv without default cf files Signed-off-by: Leavrth --- br/pkg/restore/__debug_bin1116519098 | 0 br/pkg/restore/client.go | 28 ++-- br/pkg/restore/client_test.go | 231 +++++++++++++++++++++++++-- 3 files changed, 233 insertions(+), 26 deletions(-) create mode 100644 br/pkg/restore/__debug_bin1116519098 diff --git a/br/pkg/restore/__debug_bin1116519098 b/br/pkg/restore/__debug_bin1116519098 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/br/pkg/restore/client.go b/br/pkg/restore/client.go index 5bb09c84695dc..1e97453a6eea5 100644 --- a/br/pkg/restore/client.go +++ b/br/pkg/restore/client.go @@ -2871,6 +2871,11 @@ func (rc *Client) RestoreMetaKVFiles( failpoint.Return(errors.New("failpoint: failed before id maps saved")) }) + log.Info("start to restore meta files", + zap.Int("total files", len(files)), + zap.Int("default files", len(filesInDefaultCF)), + zap.Int("write files", len(filesInWriteCF))) + if schemasReplace.NeedConstructIdMap() { // Preconstruct the map and save it into external storage. if err := rc.PreConstructAndSaveIDMap( @@ -2987,6 +2992,7 @@ func (rc *Client) RestoreMetaKVFilesWithBatchMethod( if i == 0 { rangeMax = f.MaxTs rangeMin = f.MinTs + batchSize = f.Length } else { if f.MinTs <= rangeMax && batchSize+f.Length <= MetaKVBatchSize { rangeMin = mathutil.Min(rangeMin, f.MinTs) @@ -3019,16 +3025,18 @@ func (rc *Client) RestoreMetaKVFilesWithBatchMethod( writeIdx = toWriteIdx } } - if i == len(defaultFiles)-1 { - _, err = restoreBatch(ctx, defaultFiles[defaultIdx:], schemasReplace, defaultKvEntries, math.MaxUint64, updateStats, progressInc, stream.DefaultCF) - if err != nil { - return errors.Trace(err) - } - _, err = restoreBatch(ctx, writeFiles[writeIdx:], schemasReplace, writeKvEntries, math.MaxUint64, updateStats, progressInc, stream.WriteCF) - if err != nil { - return errors.Trace(err) - } - } + } + + // restore the left meta kv files and entries + // Notice: restoreBatch needs to realize the parameter `files` and `kvEntries` might be empty + // Assert: defaultIdx <= len(defaultFiles) && writeIdx <= len(writeFiles) + _, err = restoreBatch(ctx, defaultFiles[defaultIdx:], schemasReplace, defaultKvEntries, math.MaxUint64, updateStats, progressInc, stream.DefaultCF) + if err != nil { + return errors.Trace(err) + } + _, err = restoreBatch(ctx, writeFiles[writeIdx:], schemasReplace, writeKvEntries, math.MaxUint64, updateStats, progressInc, stream.WriteCF) + if err != nil { + return errors.Trace(err) } return nil diff --git a/br/pkg/restore/client_test.go b/br/pkg/restore/client_test.go index 081145eb0566e..fe206f0b4b663 100644 --- a/br/pkg/restore/client_test.go +++ b/br/pkg/restore/client_test.go @@ -648,22 +648,32 @@ func MockEmptySchemasReplace() *stream.SchemasReplace { ) } -func TestRestoreMetaKVFilesWithBatchMethod1(t *testing.T) { +func TestRestoreBatchMetaKVFiles(t *testing.T) { + client := restore.MockClient(nil) files := []*backuppb.DataFileInfo{} + // test empty files and entries + next, err := client.RestoreBatchMetaKVFiles(context.Background(), files[0:], nil, make([]*restore.KvEntryWithTS, 0), math.MaxUint64, nil, nil, "") + require.NoError(t, err) + require.Equal(t, 0, len(next)) +} + +func TestRestoreMetaKVFilesWithBatchMethod1(t *testing.T) { + files_default := []*backuppb.DataFileInfo{} + files_write := []*backuppb.DataFileInfo{} batchCount := 0 client := restore.MockClient(nil) sr := MockEmptySchemasReplace() err := client.RestoreMetaKVFilesWithBatchMethod( context.Background(), - files, - files, + files_default, + files_write, sr, nil, nil, func( ctx context.Context, - defaultFiles []*backuppb.DataFileInfo, + files []*backuppb.DataFileInfo, schemasReplace *stream.SchemasReplace, entries []*restore.KvEntryWithTS, filterTS uint64, @@ -671,16 +681,19 @@ func TestRestoreMetaKVFilesWithBatchMethod1(t *testing.T) { progressInc func(), cf string, ) ([]*restore.KvEntryWithTS, error) { + require.Equal(t, 0, len(entries)) + require.Equal(t, 0, len(files)) batchCount++ return nil, nil }, ) require.Nil(t, err) - require.Equal(t, batchCount, 0) + require.Equal(t, batchCount, 2) } -func TestRestoreMetaKVFilesWithBatchMethod2(t *testing.T) { - files := []*backuppb.DataFileInfo{ +func TestRestoreMetaKVFilesWithBatchMethod2_default_empty(t *testing.T) { + files_default := []*backuppb.DataFileInfo{} + files_write := []*backuppb.DataFileInfo{ { Path: "f1", MinTs: 100, @@ -688,20 +701,66 @@ func TestRestoreMetaKVFilesWithBatchMethod2(t *testing.T) { }, } batchCount := 0 - result := make(map[int][]*backuppb.DataFileInfo) client := restore.MockClient(nil) sr := MockEmptySchemasReplace() err := client.RestoreMetaKVFilesWithBatchMethod( context.Background(), - files, + files_default, + files_write, + sr, + nil, nil, + func( + ctx context.Context, + files []*backuppb.DataFileInfo, + schemasReplace *stream.SchemasReplace, + entries []*restore.KvEntryWithTS, + filterTS uint64, + updateStats func(kvCount uint64, size uint64), + progressInc func(), + cf string, + ) ([]*restore.KvEntryWithTS, error) { + if len(entries) == 0 && len(files) == 0 { + require.Equal(t, stream.DefaultCF, cf) + batchCount++ + } else { + require.Equal(t, 0, len(entries)) + require.Equal(t, 1, len(files)) + require.Equal(t, uint64(100), files[0].MinTs) + require.Equal(t, stream.WriteCF, cf) + } + require.Equal(t, uint64(math.MaxUint64), filterTS) + return nil, nil + }, + ) + require.Nil(t, err) + require.Equal(t, batchCount, 1) +} + +func TestRestoreMetaKVFilesWithBatchMethod2_write_empty_1(t *testing.T) { + files_default := []*backuppb.DataFileInfo{ + { + Path: "f1", + MinTs: 100, + MaxTs: 120, + }, + } + files_write := []*backuppb.DataFileInfo{} + batchCount := 0 + + client := restore.MockClient(nil) + sr := MockEmptySchemasReplace() + err := client.RestoreMetaKVFilesWithBatchMethod( + context.Background(), + files_default, + files_write, sr, nil, nil, func( ctx context.Context, - fs []*backuppb.DataFileInfo, + files []*backuppb.DataFileInfo, schemasReplace *stream.SchemasReplace, entries []*restore.KvEntryWithTS, filterTS uint64, @@ -709,17 +768,157 @@ func TestRestoreMetaKVFilesWithBatchMethod2(t *testing.T) { progressInc func(), cf string, ) ([]*restore.KvEntryWithTS, error) { - if len(fs) > 0 { - result[batchCount] = fs + if len(entries) == 0 && len(files) == 0 { + require.Equal(t, stream.WriteCF, cf) batchCount++ + } else { + require.Equal(t, 0, len(entries)) + require.Equal(t, 1, len(files)) + require.Equal(t, uint64(100), files[0].MinTs) + require.Equal(t, stream.DefaultCF, cf) } + require.Equal(t, uint64(math.MaxUint64), filterTS) return nil, nil }, ) require.Nil(t, err) require.Equal(t, batchCount, 1) - require.Equal(t, len(result), 1) - require.Equal(t, result[0], files) +} + +func TestRestoreMetaKVFilesWithBatchMethod2_write_empty_2(t *testing.T) { + files_default := []*backuppb.DataFileInfo{ + { + Path: "f1", + MinTs: 100, + MaxTs: 120, + Length: restore.MetaKVBatchSize - 1000, + }, + { + Path: "f2", + MinTs: 110, + MaxTs: 1100, + Length: restore.MetaKVBatchSize, + }, + } + files_write := []*backuppb.DataFileInfo{} + emptyCount := 0 + batchCount := 0 + + client := restore.MockClient(nil) + sr := MockEmptySchemasReplace() + err := client.RestoreMetaKVFilesWithBatchMethod( + context.Background(), + files_default, + files_write, + sr, + nil, + nil, + func( + ctx context.Context, + files []*backuppb.DataFileInfo, + schemasReplace *stream.SchemasReplace, + entries []*restore.KvEntryWithTS, + filterTS uint64, + updateStats func(kvCount uint64, size uint64), + progressInc func(), + cf string, + ) ([]*restore.KvEntryWithTS, error) { + if len(entries) == 0 && len(files) == 0 { + // write - write + require.Equal(t, stream.WriteCF, cf) + emptyCount++ + if emptyCount == 1 { + require.Equal(t, uint64(110), filterTS) + } else { + require.Equal(t, uint64(math.MaxUint64), filterTS) + } + } else { + // default - default + batchCount++ + require.Equal(t, 1, len(files)) + require.Equal(t, stream.DefaultCF, cf) + if batchCount == 1 { + require.Equal(t, uint64(100), files[0].MinTs) + require.Equal(t, uint64(110), filterTS) + return nil, nil + } else { + require.Equal(t, 0, len(entries)) + } + } + return nil, nil + }, + ) + require.Nil(t, err) + require.Equal(t, batchCount, 2) + require.Equal(t, emptyCount, 2) +} + +func TestRestoreMetaKVFilesWithBatchMethod_with_entries(t *testing.T) { + files_default := []*backuppb.DataFileInfo{ + { + Path: "f1", + MinTs: 100, + MaxTs: 120, + Length: restore.MetaKVBatchSize - 1000, + }, + { + Path: "f2", + MinTs: 110, + MaxTs: 1100, + Length: restore.MetaKVBatchSize, + }, + } + files_write := []*backuppb.DataFileInfo{} + emptyCount := 0 + batchCount := 0 + + client := restore.MockClient(nil) + sr := MockEmptySchemasReplace() + err := client.RestoreMetaKVFilesWithBatchMethod( + context.Background(), + files_default, + files_write, + sr, + nil, + nil, + func( + ctx context.Context, + files []*backuppb.DataFileInfo, + schemasReplace *stream.SchemasReplace, + entries []*restore.KvEntryWithTS, + filterTS uint64, + updateStats func(kvCount uint64, size uint64), + progressInc func(), + cf string, + ) ([]*restore.KvEntryWithTS, error) { + if len(entries) == 0 && len(files) == 0 { + // write - write + require.Equal(t, stream.WriteCF, cf) + emptyCount++ + if emptyCount == 1 { + require.Equal(t, uint64(110), filterTS) + } else { + require.Equal(t, uint64(math.MaxUint64), filterTS) + } + } else { + // default - default + batchCount++ + require.Equal(t, 1, len(files)) + require.Equal(t, stream.DefaultCF, cf) + if batchCount == 1 { + require.Equal(t, uint64(100), files[0].MinTs) + require.Equal(t, uint64(110), filterTS) + return nil, nil + } else { + require.Equal(t, 0, len(entries)) + } + } + return nil, nil + }, + ) + require.Nil(t, err) + require.Equal(t, batchCount, 2) + require.Equal(t, emptyCount, 2) } func TestRestoreMetaKVFilesWithBatchMethod3(t *testing.T) { @@ -988,13 +1187,13 @@ func TestRestoreMetaKVFilesWithBatchMethod6(t *testing.T) { Path: "f1", MinTs: 100, MaxTs: 120, - Length: 1, + Length: 100, }, { Path: "f2", MinTs: 100, MaxTs: 120, - Length: restore.MetaKVBatchSize, + Length: restore.MetaKVBatchSize - 100, }, { Path: "f3", From fb66fa940aa1d930195af9cb39b05565dc5cf906 Mon Sep 17 00:00:00 2001 From: Leavrth Date: Fri, 1 Sep 2023 13:33:10 +0800 Subject: [PATCH 2/4] remove the debug file Signed-off-by: Leavrth --- br/pkg/restore/__debug_bin1116519098 | 0 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 br/pkg/restore/__debug_bin1116519098 diff --git a/br/pkg/restore/__debug_bin1116519098 b/br/pkg/restore/__debug_bin1116519098 deleted file mode 100644 index e69de29bb2d1d..0000000000000 From 77988d4bb7b0e7040f7eef924bb4114499357d0d Mon Sep 17 00:00:00 2001 From: Jianjun Liao <36503113+Leavrth@users.noreply.github.com> Date: Mon, 4 Sep 2023 17:06:30 +0800 Subject: [PATCH 3/4] Apply suggestions from code review Co-authored-by: BornChanger <97348524+BornChanger@users.noreply.github.com> --- br/pkg/restore/client_test.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/br/pkg/restore/client_test.go b/br/pkg/restore/client_test.go index fe206f0b4b663..9e450577bf3a0 100644 --- a/br/pkg/restore/client_test.go +++ b/br/pkg/restore/client_test.go @@ -841,9 +841,9 @@ func TestRestoreMetaKVFilesWithBatchMethod2_write_empty_2(t *testing.T) { require.Equal(t, uint64(100), files[0].MinTs) require.Equal(t, uint64(110), filterTS) return nil, nil - } else { - require.Equal(t, 0, len(entries)) } + + require.Equal(t, 0, len(entries)) } return nil, nil }, @@ -909,9 +909,10 @@ func TestRestoreMetaKVFilesWithBatchMethod_with_entries(t *testing.T) { require.Equal(t, uint64(100), files[0].MinTs) require.Equal(t, uint64(110), filterTS) return nil, nil - } else { - require.Equal(t, 0, len(entries)) } + + require.Equal(t, 0, len(entries)) + } return nil, nil }, From 5c63e62f22e57c10ca8e3906fed2362dfc6c702e Mon Sep 17 00:00:00 2001 From: Leavrth Date: Mon, 4 Sep 2023 17:29:19 +0800 Subject: [PATCH 4/4] go fmt Signed-off-by: Leavrth --- br/pkg/restore/client_test.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/br/pkg/restore/client_test.go b/br/pkg/restore/client_test.go index 9e450577bf3a0..d9722c42e3abb 100644 --- a/br/pkg/restore/client_test.go +++ b/br/pkg/restore/client_test.go @@ -842,7 +842,6 @@ func TestRestoreMetaKVFilesWithBatchMethod2_write_empty_2(t *testing.T) { require.Equal(t, uint64(110), filterTS) return nil, nil } - require.Equal(t, 0, len(entries)) } return nil, nil @@ -910,9 +909,7 @@ func TestRestoreMetaKVFilesWithBatchMethod_with_entries(t *testing.T) { require.Equal(t, uint64(110), filterTS) return nil, nil } - require.Equal(t, 0, len(entries)) - } return nil, nil },