Merge branch 'master' into part-table-topn
winoros committed Nov 26, 2022
2 parents e1b4211 + 844ad32 commit e285406
Showing 94 changed files with 11,123 additions and 9,779 deletions.
52 changes: 30 additions & 22 deletions DEPS.bzl
@@ -760,8 +760,8 @@ def go_deps():
name = "com_github_dgraph_io_ristretto",
build_file_proto_mode = "disable_global",
importpath = "github.com/dgraph-io/ristretto",
sum = "h1:Wrc3UKTS+cffkOx0xRGFC+ZesNuTfn0ThvEC72N0krk=",
version = "v0.1.1-0.20220403145359-8e850b710d6d",
sum = "h1:6CWw5tJNgpegArSHpNHJKldNeq03FQCwYvfMVWajOK8=",
version = "v0.1.1",
)
go_repository(
name = "com_github_dgrijalva_jwt_go",
@@ -2880,8 +2880,8 @@ def go_deps():
name = "com_github_pingcap_check",
build_file_proto_mode = "disable_global",
importpath = "github.com/pingcap/check",
sum = "h1:iRtOAQ6FXkY/BGvst3CDfTva4nTqh6CL8WXvanLdbu0=",
version = "v0.0.0-20191107115940-caf2b9e6ccf4",
sum = "h1:R8gStypOBmpnHEx1qi//SaqxJVI4inOqljg/Aj5/390=",
version = "v0.0.0-20200212061837-5e12011dc712",
)
go_repository(
name = "com_github_pingcap_errors",
@@ -2915,8 +2915,8 @@ def go_deps():
name = "com_github_pingcap_kvproto",
build_file_proto_mode = "disable_global",
importpath = "github.com/pingcap/kvproto",
sum = "h1:HyWSOT/drBEtfXK2HLkWWR8dCO+rcf7OiRDRhBxAfU4=",
version = "v0.0.0-20221114102356-3debb6820e46",
sum = "h1:hnUlIU5nCH6PAO9DC5DhODX1cwqoTcXTNIODyvNI9q4=",
version = "v0.0.0-20221123043343-cdc67325f05f",
)
go_repository(
name = "com_github_pingcap_log",
@@ -3375,8 +3375,8 @@ def go_deps():
name = "com_github_spaolacci_murmur3",
build_file_proto_mode = "disable_global",
importpath = "github.com/spaolacci/murmur3",
sum = "h1:qLC7fQah7D6K1B0ujays3HV9gkFtllcxhzImRR7ArPQ=",
version = "v0.0.0-20180118202830-f09979ecbc72",
sum = "h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI=",
version = "v1.1.0",
)
go_repository(
name = "com_github_spf13_afero",
@@ -3519,8 +3519,8 @@ def go_deps():
name = "com_github_tikv_client_go_v2",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/client-go/v2",
sum = "h1:nFVdyTXcQYZwQQCdSJcFI1vBFyzG1hVuZ39MAK6wqK4=",
version = "v2.0.3-0.20221108030801-9c0835c80eba",
sum = "h1:5FFJAKukKDTsLqrEeeDgC89aDAteGEFXBHwKRa3wnnQ=",
version = "v2.0.3-0.20221125022819-f05c6886bbad",
)
go_repository(
name = "com_github_tikv_pd_client",
@@ -3861,6 +3861,14 @@ def go_deps():
sum = "h1:b1zWmYuuHz7gO9kDcM/EpHGr06UgsYNRpNJzI2kFiLM=",
version = "v1.5.0",
)
go_repository(
name = "com_google_cloud_go_compute_metadata",
build_file_proto_mode = "disable",
importpath = "cloud.google.com/go/compute/metadata",
sum = "h1:nBbNSZyDpkNlo3DepaaLKVuO7ClyifSAmNloSCZrHnQ=",
version = "v0.2.0",
)

go_repository(
name = "com_google_cloud_go_datastore",
build_file_proto_mode = "disable_global",
@@ -4406,22 +4414,22 @@ def go_deps():
name = "org_golang_x_mod",
build_file_proto_mode = "disable_global",
importpath = "golang.org/x/mod",
sum = "h1:b9gGHsz9/HhJ3HF5DHQytPpuwocVTChQJK3AvoLRD5I=",
version = "v0.6.0",
sum = "h1:LapD9S96VoQRhi/GrNTqeBJFrUjs5UHCAtTlgwA5oZA=",
version = "v0.7.0",
)
go_repository(
name = "org_golang_x_net",
build_file_proto_mode = "disable_global",
importpath = "golang.org/x/net",
sum = "h1:hZ/3BUoy5aId7sCpA/Tc5lt8DkFgdVS2onTpJsZ/fl0=",
version = "v0.1.0",
sum = "h1:sZfSu1wtKLGlWI4ZZayP0ck9Y73K1ynO6gqzTdBVdPU=",
version = "v0.2.0",
)
go_repository(
name = "org_golang_x_oauth2",
build_file_proto_mode = "disable_global",
importpath = "golang.org/x/oauth2",
sum = "h1:OSnWWcOd/CtWQC2cYSBgbTSJv3ciqd8r54ySIW2y3RE=",
version = "v0.0.0-20220411215720-9780585627b5",
sum = "h1:GtQkldQ9m7yvzCL1V+LrYow3Khe0eJH0w7RbX/VbaIU=",
version = "v0.2.0",
)
go_repository(
name = "org_golang_x_sync",
@@ -4434,15 +4442,15 @@
name = "org_golang_x_sys",
build_file_proto_mode = "disable_global",
importpath = "golang.org/x/sys",
sum = "h1:kunALQeHf1/185U1i0GOB/fy1IPRDDpuoOOqRReG57U=",
version = "v0.1.0",
sum = "h1:ljd4t30dBnAvMZaQCevtY0xLLD0A+bRZXbgLMLU1F/A=",
version = "v0.2.0",
)
go_repository(
name = "org_golang_x_term",
build_file_proto_mode = "disable_global",
importpath = "golang.org/x/term",
sum = "h1:g6Z6vPFA9dYBAF7DWcH6sCcOntplXsDKcliusYijMlw=",
version = "v0.1.0",
sum = "h1:z85xZCsEl7bi/KwbNADeBYoOP0++7W1ipu+aGnpwzRM=",
version = "v0.2.0",
)
go_repository(
name = "org_golang_x_text",
@@ -4455,8 +4463,8 @@
name = "org_golang_x_time",
build_file_proto_mode = "disable_global",
importpath = "golang.org/x/time",
sum = "h1:xYY+Bajn2a7VBmTM5GikTmnK8ZuX8YgnQCqZpbBNtmA=",
version = "v0.1.0",
sum = "h1:52I/1L54xyEQAYdtcSuxtiT84KGYTBGXwayxmIpNJhE=",
version = "v0.2.0",
)
go_repository(
name = "org_golang_x_tools",
6 changes: 3 additions & 3 deletions WORKSPACE
@@ -2,10 +2,10 @@ load("@bazel_tools//tools/build_defs/repo:http.bzl", "http_archive")

http_archive(
name = "io_bazel_rules_go",
sha256 = "099a9fb96a376ccbbb7d291ed4ecbdfd42f6bc822ab77ae6f1b5cb9e914e94fa",
sha256 = "ae013bf35bd23234d1dea46b079f1e05ba74ac0321423830119d3e787ec73483",
urls = [
"https://mirror.bazel.build/github.com/bazelbuild/rules_go/releases/download/v0.35.0/rules_go-v0.35.0.zip",
"https://github.com/bazelbuild/rules_go/releases/download/v0.35.0/rules_go-v0.35.0.zip",
"https://mirror.bazel.build/github.com/bazelbuild/rules_go/releases/download/v0.36.0/rules_go-v0.36.0.zip",
"https://github.com/bazelbuild/rules_go/releases/download/v0.36.0/rules_go-v0.36.0.zip",
],
)

17 changes: 17 additions & 0 deletions br/pkg/lightning/restore/table_restore.go
@@ -235,10 +235,12 @@ func (tr *TableRestore) restoreEngines(pCtx context.Context, rc *Controller, cp
// data-engines that need to be restored or imported. Otherwise, all data-engines should
// be finished already.

handleDataEngineThisRun := false
idxEngineCfg := &backend.EngineConfig{
TableInfo: tr.tableInfo,
}
if indexEngineCp.Status < checkpoints.CheckpointStatusClosed {
handleDataEngineThisRun = true
indexWorker := rc.indexWorkers.Apply()
defer rc.indexWorkers.Recycle(indexWorker)

@@ -370,11 +372,26 @@ func (tr *TableRestore) restoreEngines(pCtx context.Context, rc *Controller, cp
return errors.Trace(restoreErr)
}

// If the data engines were already handled in a previous run and we are resuming
// the import from a checkpoint, credit their chunks to the finished file size.
if !handleDataEngineThisRun {
for _, engine := range cp.Engines {
for _, chunk := range engine.Chunks {
rc.status.FinishedFileSize.Add(chunk.Chunk.EndOffset - chunk.Key.Offset)
}
}
}

if cp.Status < checkpoints.CheckpointStatusIndexImported {
var err error
if indexEngineCp.Status < checkpoints.CheckpointStatusImported {
err = tr.importKV(ctx, closedIndexEngine, rc, indexEngineID)
failpoint.Inject("FailBeforeIndexEngineImported", func() {
finished := rc.status.FinishedFileSize.Load()
total := rc.status.TotalFileSize.Load()
tr.logger.Warn("print lightning status",
zap.Int64("finished", finished),
zap.Int64("total", total),
zap.Bool("equal", finished == total))
panic("forcing failure due to FailBeforeIndexEngineImported")
})
}
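Note on the hunk above: when the data engines were already imported in an earlier run, the loop simply credits every chunk recorded in the table checkpoint toward the finished byte count, so the FailBeforeIndexEngineImported failpoint can observe finished == total. A minimal, self-contained sketch of that bookkeeping, using simplified stand-in types rather than lightning's real checkpoint and status structs:

    package main

    import (
        "fmt"
        "sync/atomic"
    )

    // Simplified stand-ins for lightning's checkpoint structures.
    type chunkCheckpoint struct {
        startOffset int64 // Key.Offset in the real code
        endOffset   int64 // Chunk.EndOffset in the real code
    }

    type engineCheckpoint struct {
        chunks []chunkCheckpoint
    }

    type tableCheckpoint struct {
        engines map[int32]*engineCheckpoint
    }

    // creditFinishedChunks adds the byte ranges of already-restored chunks to the
    // finished counter, mirroring what restoreEngines does when the data engines
    // were handled in a previous run and the import resumes from a checkpoint.
    func creditFinishedChunks(cp *tableCheckpoint, finished *atomic.Int64) {
        for _, engine := range cp.engines {
            for _, chunk := range engine.chunks {
                finished.Add(chunk.endOffset - chunk.startOffset)
            }
        }
    }

    func main() {
        cp := &tableCheckpoint{engines: map[int32]*engineCheckpoint{
            0: {chunks: []chunkCheckpoint{{0, 4096}, {4096, 8192}}},
            1: {chunks: []chunkCheckpoint{{0, 2048}}},
        }}

        var finished, total atomic.Int64
        total.Store(10240)
        creditFinishedChunks(cp, &finished)

        // With every data engine already imported, finished equals total, which is
        // exactly the condition the FailBeforeIndexEngineImported failpoint logs.
        fmt.Printf("finished=%d total=%d equal=%v\n",
            finished.Load(), total.Load(), finished.Load() == total.Load())
    }

The shell check added to br/tests/lightning_checkpoint/run.sh later in this diff greps the lightning log for equal=true to assert exactly this condition.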
4 changes: 4 additions & 0 deletions br/pkg/streamhelper/basic_lib_for_test.go
@@ -163,6 +163,10 @@ func (f *fakeStore) GetLastFlushTSOfRegion(ctx context.Context, in *logbackup.Ge
return resp, nil
}

func (f *fakeStore) SubscribeFlushEvent(ctx context.Context, in *logbackup.SubscribeFlushEventRequest, opts ...grpc.CallOption) (logbackup.LogBackup_SubscribeFlushEventClient, error) {
return nil, nil
}

// RegionScan gets a list of regions, starts from the region that contains key.
// Limit limits the maximum number of regions returned.
func (f *fakeCluster) RegionScan(ctx context.Context, key []byte, endKey []byte, limit int) ([]streamhelper.RegionWithLeader, error) {
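The SubscribeFlushEvent stub above presumably exists so that fakeStore keeps satisfying the log-backup client interface after the kvproto and client-go bumps in this merge added the method; returning (nil, nil) is enough for tests that never subscribe. A compile-checked sketch of the same pattern, with illustrative types in place of the real logbackup API:

    package main

    import "context"

    // Illustrative stand-ins; the real types live in the kvproto logbackup package.
    type flushEventClient interface{}

    type logBackupClient interface {
        GetLastFlushTSOfRegion(ctx context.Context, regionID uint64) (uint64, error)
        SubscribeFlushEvent(ctx context.Context) (flushEventClient, error) // newly required
    }

    type fakeStore struct{}

    func (f *fakeStore) GetLastFlushTSOfRegion(ctx context.Context, regionID uint64) (uint64, error) {
        return 0, nil
    }

    // Stub added only so the fake keeps compiling against the grown interface;
    // tests that never subscribe are unaffected by the nil return values.
    func (f *fakeStore) SubscribeFlushEvent(ctx context.Context) (flushEventClient, error) {
        return nil, nil
    }

    // Compile-time assertion that the fake still implements the client interface.
    var _ logBackupClient = (*fakeStore)(nil)

    func main() {}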
3 changes: 3 additions & 0 deletions br/tests/lightning_checkpoint/run.sh
@@ -79,6 +79,9 @@ for i in $(seq "$TABLE_COUNT"); do
done
set -e

# At the failure of the last table, all data engines have been imported, so finished == total
grep "print lightning status" "$TEST_DIR/lightning.log" | grep -q "equal=true"

export GO_FAILPOINTS="$SLOWDOWN_FAILPOINTS"
# After everything is done, there should be no longer new calls to ImportEngine
# (and thus `kill_lightning_after_one_import` will spare this final check)
13 changes: 8 additions & 5 deletions ddl/backfilling.go
@@ -725,7 +725,7 @@ func (b *backfillScheduler) adjustWorkerSize() error {
if b.copReqSenderPool != nil {
b.copReqSenderPool.adjustSize(len(b.workers))
}
return injectCheckBackfillWorkerNum(len(b.workers))
return injectCheckBackfillWorkerNum(len(b.workers), b.tp == typeAddIndexMergeTmpWorker)
}

func (b *backfillScheduler) initCopReqSenderPool() {
@@ -744,9 +744,9 @@ func (b *backfillScheduler) initCopReqSenderPool() {
logutil.BgLogger().Warn("[ddl-ingest] cannot init cop request sender", zap.Error(err))
return
}
copCtx := newCopContext(b.tbl.Meta(), indexInfo, sessCtx)
if copCtx == nil {
logutil.BgLogger().Warn("[ddl-ingest] cannot init cop request sender")
copCtx, err := newCopContext(b.tbl.Meta(), indexInfo, sessCtx)
if err != nil {
logutil.BgLogger().Warn("[ddl-ingest] cannot init cop request sender", zap.Error(err))
return
}
ver, err := sessCtx.GetStore().CurrentVersion(kv.GlobalTxnScope)
@@ -871,7 +871,10 @@ func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sessionPool, t table.Physic
return nil
}

func injectCheckBackfillWorkerNum(curWorkerSize int) error {
func injectCheckBackfillWorkerNum(curWorkerSize int, isMergeWorker bool) error {
if isMergeWorker {
return nil
}
failpoint.Inject("checkBackfillWorkerNum", func(val failpoint.Value) {
//nolint:forcetypeassert
if val.(bool) {
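Two small API shifts land in ddl/backfilling.go above: newCopContext now reports failure through an error instead of a nil result, and the backfill-worker-count failpoint check is skipped for the temp-index merge worker. A condensed sketch of both patterns, with simplified names and a hard-coded expectation standing in for the real failpoint value:

    package main

    import (
        "errors"
        "fmt"
    )

    type copContext struct{} // column/index metadata elided

    // Before this change the constructor returned nil on failure; returning an
    // error lets the caller log why the cop-request sender could not be set up.
    func newCopContext(hasVirtualColumn bool) (*copContext, error) {
        if hasVirtualColumn {
            return nil, errors.New("virtual columns are not supported")
        }
        return &copContext{}, nil
    }

    // Worker-count check in the spirit of injectCheckBackfillWorkerNum: the
    // temp-index merge worker is pooled differently, so it is exempted.
    func checkBackfillWorkerNum(curWorkerSize int, isMergeWorker bool) error {
        if isMergeWorker {
            return nil
        }
        const expected = 4 // stand-in for the failpoint-configured expectation
        if curWorkerSize != expected {
            return fmt.Errorf("expected %d backfill workers, got %d", expected, curWorkerSize)
        }
        return nil
    }

    func main() {
        if _, err := newCopContext(true); err != nil {
            fmt.Println("cannot init cop request sender:", err)
        }
        fmt.Println(checkBackfillWorkerNum(8, true))  // <nil>: merge workers are exempt
        fmt.Println(checkBackfillWorkerNum(8, false)) // error: count mismatch
    }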
2 changes: 0 additions & 2 deletions ddl/cancel_test.go
@@ -232,8 +232,6 @@ func TestCancel(t *testing.T) {

// Prepare schema.
tk.MustExec("use test")
// TODO: Will check why tidb_ddl_enable_fast_reorg could not default be on in another PR.
tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;")
tk.MustExec("drop table if exists t_partition;")
tk.MustExec(`create table t_partition (
c1 int, c2 int, c3 int
2 changes: 1 addition & 1 deletion ddl/db_change_test.go
@@ -1719,7 +1719,7 @@ func TestCreateUniqueExpressionIndex(t *testing.T) {

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
// TODO: will check why tidb_ddl_enable_fast_reorg could not default be on in another pr.pr.
// TODO: Will check why tidb_ddl_enable_fast_reorg could not default be on in another PR.
tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;")
tk.MustExec("create table t(a int default 0, b int default 0)")
tk.MustExec("insert into t values (1, 1), (2, 2), (3, 3), (4, 4)")
1 change: 0 additions & 1 deletion ddl/ddl.go
@@ -979,7 +979,6 @@ func (d *ddl) DoDDLJob(ctx sessionctx.Context, job *model.Job) error {
// Instead, we merge all the jobs into one pending job.
return appendToSubJobs(mci, job)
}

// Get a global job ID and put the DDL job in the queue.
setDDLJobQuery(ctx, job)
task := &limitJobTask{job, make(chan error)}
5 changes: 0 additions & 5 deletions ddl/failtest/fail_db_test.go
@@ -320,8 +320,6 @@ func TestRunDDLJobPanicEnableClusteredIndex(t *testing.T) {
s := createFailDBSuite(t)
testAddIndexWorkerNum(t, s, func(tk *testkit.TestKit) {
tk.Session().GetSessionVars().EnableClusteredIndex = variable.ClusteredIndexDefModeOn
// TODO: will check why tidb_ddl_enable_fast_reorg could not default be on in another pr.
tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;")
tk.MustExec("create table test_add_index (c1 bigint, c2 bigint, c3 bigint, primary key(c1, c3))")
})
}
@@ -330,8 +328,6 @@ func TestRunDDLJobPanicDisableClusteredIndex(t *testing.T) {
func TestRunDDLJobPanicDisableClusteredIndex(t *testing.T) {
s := createFailDBSuite(t)
testAddIndexWorkerNum(t, s, func(tk *testkit.TestKit) {
// TODO: will check why tidb_ddl_enable_fast_reorg could not default be on in another pr.
tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;")
tk.MustExec("create table test_add_index (c1 bigint, c2 bigint, c3 bigint, primary key(c1))")
})
}
@@ -424,7 +420,6 @@ func TestPartitionAddIndexGC(t *testing.T) {
s := createFailDBSuite(t)
tk := testkit.NewTestKit(t, s.store)
tk.MustExec("use test")
tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;")
tk.MustExec(`create table partition_add_idx (
id int not null,
hired date not null
10 changes: 6 additions & 4 deletions ddl/index.go
@@ -753,13 +753,15 @@ func IngestJobsNotExisted(ctx sessionctx.Context) bool {
}

// tryFallbackToTxnMerge changes the reorg type to txn-merge if the lightning backfill meets something wrong.
func tryFallbackToTxnMerge(job *model.Job, err error) {
func tryFallbackToTxnMerge(job *model.Job, err error) error {
if job.State != model.JobStateRollingback {
logutil.BgLogger().Info("[ddl] fallback to txn-merge backfill process", zap.Error(err))
job.ReorgMeta.ReorgTp = model.ReorgTypeTxnMerge
job.SnapshotVer = 0
job.RowCount = 0
return nil
}
return err
}

func doReorgWorkForCreateIndexMultiSchema(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job,
@@ -801,13 +803,13 @@ func doReorgWorkForCreateIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Jo
}
bc, err = ingest.LitBackCtxMgr.Register(w.ctx, indexInfo.Unique, job.ID, job.ReorgMeta.SQLMode)
if err != nil {
tryFallbackToTxnMerge(job, err)
err = tryFallbackToTxnMerge(job, err)
return false, ver, errors.Trace(err)
}
done, ver, err = runReorgJobAndHandleErr(w, d, t, job, tbl, indexInfo, false)
if err != nil {
ingest.LitBackCtxMgr.Unregister(job.ID)
tryFallbackToTxnMerge(job, err)
err = tryFallbackToTxnMerge(job, err)
return false, ver, errors.Trace(err)
}
if !done {
@@ -820,7 +822,7 @@ func doReorgWorkForCreateIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Jo
ver, err = convertAddIdxJob2RollbackJob(d, t, job, tbl.Meta(), indexInfo, err)
} else {
logutil.BgLogger().Warn("[ddl] lightning import error", zap.Error(err))
tryFallbackToTxnMerge(job, err)
err = tryFallbackToTxnMerge(job, err)
}
ingest.LitBackCtxMgr.Unregister(job.ID)
return false, ver, errors.Trace(err)
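The tryFallbackToTxnMerge change in ddl/index.go makes the error flow explicit: when the job can still fall back to the txn-merge backfill, the lightning error is absorbed and nil is returned; when the job is already rolling back, the original error is handed back so the caller still fails the job via errors.Trace(err). A stripped-down sketch of that control flow, with placeholder job states instead of the real model package:

    package main

    import (
        "errors"
        "fmt"
    )

    type jobState int

    const (
        stateRunning jobState = iota
        stateRollingback
    )

    type job struct {
        state   jobState
        reorgTp string
    }

    // fallbackToTxnMerge mirrors the new contract: return nil when the fallback
    // is taken (the lightning error is absorbed and the reorg restarts with
    // txn-merge), and return the original error when the job is rolling back.
    func fallbackToTxnMerge(j *job, cause error) error {
        if j.state != stateRollingback {
            j.reorgTp = "txn-merge" // resetting SnapshotVer/RowCount omitted here
            return nil
        }
        return cause
    }

    func main() {
        ingestErr := errors.New("lightning import error")

        running := &job{state: stateRunning, reorgTp: "lightning"}
        fmt.Println(fallbackToTxnMerge(running, ingestErr), running.reorgTp) // <nil> txn-merge

        rolling := &job{state: stateRollingback, reorgTp: "lightning"}
        fmt.Println(fallbackToTxnMerge(rolling, ingestErr)) // lightning import error
    }

This is why each call site now assigns err = tryFallbackToTxnMerge(job, err) before returning: a successful fallback no longer surfaces the import error.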