Skip to content

Commit

Permalink
ddl: support truncate table for label rules (#26932)
Browse files Browse the repository at this point in the history
  • Loading branch information
rleungx authored Aug 23, 2021
1 parent 9cd3c35 commit be4cdb2
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 11 deletions.
22 changes: 12 additions & 10 deletions ddl/label/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,18 +93,20 @@ func (r *Rule) Reset(id int64, dbName, tableName string, partName ...string) *Ru
}

var hasDBKey, hasTableKey, hasPartitionKey bool
for _, label := range r.Labels {
if label.Key == dbKey {
label.Value = dbName
for i := range r.Labels {
switch r.Labels[i].Key {
case dbKey:
r.Labels[i].Value = dbName
hasDBKey = true
}
if label.Key == tableKey {
label.Value = tableName
case tableKey:
r.Labels[i].Value = tableName
hasTableKey = true
}
if isPartition && label.Key == partitionKey {
label.Value = partName[0]
hasPartitionKey = true
case partitionKey:
if isPartition {
r.Labels[i].Value = partName[0]
hasPartitionKey = true
}
default:
}
}

Expand Down
12 changes: 11 additions & 1 deletion ddl/label/rule_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (t *testRuleSuite) TestApplyAttributesSpec(c *C) {
c.Assert(rule.Labels[1].Key, Equals, "attr2")
}

func (t *testRuleSuite) TestResetID(c *C) {
func (t *testRuleSuite) TestReset(c *C) {
rule := NewRule()
rule.Reset(1, "db1", "t1")
c.Assert(rule.ID, Equals, "schema/db1/t1")
Expand All @@ -46,4 +46,14 @@ func (t *testRuleSuite) TestResetID(c *C) {

r1 := rule.Clone()
c.Assert(rule, DeepEquals, r1)

r2 := rule.Reset(2, "db2", "t2", "p2")
c.Assert(r2.ID, Equals, "schema/db2/t2/p2")
c.Assert(r2.Labels, HasLen, 3)
c.Assert(rule.Labels[0].Value, Equals, "db2")
c.Assert(rule.Labels[1].Value, Equals, "t2")
c.Assert(rule.Labels[2].Value, Equals, "p2")
r = r2.Rule.(map[string]string)
c.Assert(r["start_key"], Equals, "7480000000000000ff025f720000000000fa")
c.Assert(r["end_key"], Equals, "7480000000000000ff035f720000000000fa")
}
39 changes: 39 additions & 0 deletions ddl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,45 @@ func onTruncateTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ erro
return 0, errors.Wrapf(err, "failed to notify PD the placement rules")
}

tableRuleID := fmt.Sprintf(label.TableIDFormat, label.IDPrefix, job.SchemaName, tblInfo.Name.L)
ids := []string{tableRuleID}
if tblInfo.GetPartitionInfo() != nil {
for _, def := range tblInfo.GetPartitionInfo().Definitions {
ids = append(ids, fmt.Sprintf(label.PartitionIDFormat, label.IDPrefix, job.SchemaName, tblInfo.Name.L, def.Name.L))
}
}

oldRules, err := infosync.GetLabelRules(context.TODO(), ids)
if err != nil {
job.State = model.JobStateCancelled
return 0, errors.Wrapf(err, "failed to get PD the label rule")
}

var newRules []*label.Rule
for _, r := range oldRules {
if r.ID == tableRuleID {
newRules = append(newRules, r.Clone().Reset(newTableID, job.SchemaName, tblInfo.Name.L))
}
}

if tblInfo.GetPartitionInfo() != nil {
for _, r := range oldRules {
for _, def := range tblInfo.GetPartitionInfo().Definitions {
if r.ID == fmt.Sprintf(label.PartitionIDFormat, label.IDPrefix, job.SchemaName, tblInfo.Name.L, def.Name.L) {
newRules = append(newRules, r.Clone().Reset(def.ID, job.SchemaName, tblInfo.Name.L, def.Name.L))
}
}
}
}

// update the key range with same rule id.
patch := label.NewRulePatch(newRules, nil)
err = infosync.UpdateLabelRules(context.TODO(), patch)
if err != nil {
job.State = model.JobStateCancelled
return 0, errors.Wrapf(err, "failed to notify PD the label rules")
}

// Clear the tiflash replica available status.
if tblInfo.TiFlashReplica != nil {
tblInfo.TiFlashReplica.AvailablePartitionIDs = nil
Expand Down
35 changes: 35 additions & 0 deletions domain/infosync/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -892,3 +892,38 @@ func GetAllLabelRules(ctx context.Context) ([]*label.Rule, error) {
}
return rules, err
}

// GetLabelRules gets the label rules according to the given IDs from PD.
func GetLabelRules(ctx context.Context, ruleIDs []string) ([]*label.Rule, error) {
if len(ruleIDs) == 0 {
return nil, nil
}

is, err := getGlobalInfoSyncer()
if err != nil {
return nil, err
}

if is.etcdCli == nil {
return nil, nil
}

addrs := is.etcdCli.Endpoints()

if len(addrs) == 0 {
return nil, errors.Errorf("pd unavailable")
}

ids, err := json.Marshal(ruleIDs)
if err != nil {
return nil, err
}

rules := []*label.Rule{}
res, err := doRequest(ctx, addrs, path.Join(pdapi.Config, "region-label", "rules", "ids"), "GET", bytes.NewReader(ids))

if err == nil && res != nil {
err = json.Unmarshal(res, &rules)
}
return rules, err
}

0 comments on commit be4cdb2

Please sign in to comment.