Skip to content

Commit

Permalink
*: flashback/recover a table will clear its placement settings (#31670)
Browse files Browse the repository at this point in the history
close #31668
  • Loading branch information
lcwangchao authored Jan 14, 2022
1 parent cd56aba commit f637fed
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 59 deletions.
18 changes: 9 additions & 9 deletions ddl/placement_policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2181,11 +2181,11 @@ func (s *testDBSuite6) TestRecoverTableWithPlacementPolicy(c *C) {
tk.MustExec("recover table tp1")
tk.MustQuery("show create table tp1").Check(testkit.Rows("tp1 CREATE TABLE `tp1` (\n" +
" `id` int(11) DEFAULT NULL\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![placement] PRIMARY_REGION=\"r1\" REGIONS=\"r1,r2\" */\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin\n" +
"PARTITION BY RANGE (`id`)\n" +
"(PARTITION `p0` VALUES LESS THAN (100) /*T![placement] PRIMARY_REGION=\"r2\" REGIONS=\"r2,r3\" */,\n" +
"(PARTITION `p0` VALUES LESS THAN (100),\n" +
" PARTITION `p1` VALUES LESS THAN (1000),\n" +
" PARTITION `p2` VALUES LESS THAN (10000) /*T![placement] PRIMARY_REGION=\"r3\" REGIONS=\"r3,r4\" */)"))
" PARTITION `p2` VALUES LESS THAN (10000))"))
checkExistTableBundlesInPD(c, s.dom, "test", "tp1")

// test flashback
Expand All @@ -2201,11 +2201,11 @@ func (s *testDBSuite6) TestRecoverTableWithPlacementPolicy(c *C) {
tk.MustExec("flashback table tp2")
tk.MustQuery("show create table tp2").Check(testkit.Rows("tp2 CREATE TABLE `tp2` (\n" +
" `id` int(11) DEFAULT NULL\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![placement] PRIMARY_REGION=\"r1\" REGIONS=\"r1,r2\" */\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin\n" +
"PARTITION BY RANGE (`id`)\n" +
"(PARTITION `p0` VALUES LESS THAN (100) /*T![placement] PRIMARY_REGION=\"r2\" REGIONS=\"r2,r3\" */,\n" +
"(PARTITION `p0` VALUES LESS THAN (100),\n" +
" PARTITION `p1` VALUES LESS THAN (1000),\n" +
" PARTITION `p2` VALUES LESS THAN (10000) /*T![placement] PRIMARY_REGION=\"r3\" REGIONS=\"r3,r4\" */)"))
" PARTITION `p2` VALUES LESS THAN (10000))"))
checkExistTableBundlesInPD(c, s.dom, "test", "tp2")

// test recover after police drop
Expand All @@ -2217,10 +2217,10 @@ func (s *testDBSuite6) TestRecoverTableWithPlacementPolicy(c *C) {
tk.MustExec("flashback table tp2 to tp3")
tk.MustQuery("show create table tp3").Check(testkit.Rows("tp3 CREATE TABLE `tp3` (\n" +
" `id` int(11) DEFAULT NULL\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![placement] PRIMARY_REGION=\"r1\" REGIONS=\"r1,r2\" */\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin\n" +
"PARTITION BY RANGE (`id`)\n" +
"(PARTITION `p0` VALUES LESS THAN (100) /*T![placement] PRIMARY_REGION=\"r2\" REGIONS=\"r2,r3\" */,\n" +
"(PARTITION `p0` VALUES LESS THAN (100),\n" +
" PARTITION `p1` VALUES LESS THAN (1000),\n" +
" PARTITION `p2` VALUES LESS THAN (10000) /*T![placement] PRIMARY_REGION=\"r3\" REGIONS=\"r3,r4\" */)"))
" PARTITION `p2` VALUES LESS THAN (10000))"))
checkExistTableBundlesInPD(c, s.dom, "test", "tp3")
}
36 changes: 28 additions & 8 deletions ddl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,14 +378,8 @@ func (w *worker) onRecoverTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver in
job.Args[checkFlagIndexInJobArgs] = recoverTableCheckFlagDisableGC
}

bundles, err := placement.NewFullTableBundles(t, tblInfo)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

// Send the placement bundle to PD.
err = infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), bundles)
// Clear all placement when recover
err = clearTablePlacementAndBundles(tblInfo)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Wrapf(err, "failed to notify PD the placement rules")
Expand Down Expand Up @@ -465,6 +459,32 @@ func (w *worker) onRecoverTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver in
return ver, nil
}

func clearTablePlacementAndBundles(tblInfo *model.TableInfo) error {
var bundles []*placement.Bundle
if tblInfo.PlacementPolicyRef != nil || tblInfo.DirectPlacementOpts != nil {
tblInfo.PlacementPolicyRef = nil
tblInfo.DirectPlacementOpts = nil
bundles = append(bundles, placement.NewBundle(tblInfo.ID))
}

if tblInfo.Partition != nil {
for i := range tblInfo.Partition.Definitions {
par := &tblInfo.Partition.Definitions[i]
if par.PlacementPolicyRef != nil || par.DirectPlacementOpts != nil {
par.PlacementPolicyRef = nil
par.DirectPlacementOpts = nil
bundles = append(bundles, placement.NewBundle(par.ID))
}
}
}

if len(bundles) == 0 {
return nil
}

return infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), bundles)
}

// mockRecoverTableCommitErrOnce uses to make sure
// `mockRecoverTableCommitErr` only mock error once.
var mockRecoverTableCommitErrOnce uint32
Expand Down
42 changes: 0 additions & 42 deletions executor/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -621,10 +621,6 @@ func (e *DDLExec) executeRecoverTable(s *ast.RecoverTableStmt) error {
return err
}

if tblInfo, err = recoverTablePlacement(m, tblInfo); err != nil {
return err
}

recoverInfo := &ddl.RecoverInfo{
SchemaID: job.SchemaID,
TableInfo: tblInfo,
Expand All @@ -639,40 +635,6 @@ func (e *DDLExec) executeRecoverTable(s *ast.RecoverTableStmt) error {
return err
}

// recoverTablePlacement is used when recover/flashback table.
// It will replace the placement policy of table with the direct options because the original policy may be deleted
func recoverTablePlacement(snapshotMeta *meta.Meta, tblInfo *model.TableInfo) (*model.TableInfo, error) {
if ref := tblInfo.PlacementPolicyRef; ref != nil {
policy, err := snapshotMeta.GetPolicy(ref.ID)
if err != nil {
return nil, errors.Trace(err)
}

tblInfo.PlacementPolicyRef = nil
tblInfo.DirectPlacementOpts = policy.PlacementSettings
}

if tblInfo.Partition != nil {
for idx := range tblInfo.Partition.Definitions {
def := &tblInfo.Partition.Definitions[idx]
ref := def.PlacementPolicyRef
if ref == nil {
continue
}

policy, err := snapshotMeta.GetPolicy(ref.ID)
if err != nil {
return nil, errors.Trace(err)
}

def.PlacementPolicyRef = nil
def.DirectPlacementOpts = policy.PlacementSettings
}
}

return tblInfo, nil
}

func (e *DDLExec) getRecoverTableByJobID(s *ast.RecoverTableStmt, t *meta.Meta, dom *domain.Domain) (*model.Job, *model.TableInfo, error) {
job, err := t.GetHistoryDDLJob(s.JobID)
if err != nil {
Expand Down Expand Up @@ -799,10 +761,6 @@ func (e *DDLExec) executeFlashbackTable(s *ast.FlashBackTableStmt) error {
return err
}

if tblInfo, err = recoverTablePlacement(m, tblInfo); err != nil {
return err
}

recoverInfo := &ddl.RecoverInfo{
SchemaID: job.SchemaID,
TableInfo: tblInfo,
Expand Down

0 comments on commit f637fed

Please sign in to comment.