Skip to content

Commit

Permalink
*: cherry pick #37252 #37179 (#38481)
Browse files Browse the repository at this point in the history
ref #37171
  • Loading branch information
lcwangchao authored Oct 14, 2022
1 parent 5263a0a commit 852d31d
Show file tree
Hide file tree
Showing 11 changed files with 188 additions and 53 deletions.
15 changes: 15 additions & 0 deletions ddl/ddl_tiflash_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,9 @@ func (d *ddl) PollTiFlashRoutine() {
if err != nil {
logutil.BgLogger().Fatal("TiFlashManagement init failed", zap.Error(err))
}

hasSetTiFlashGroup := false
nextSetTiFlashGroupTime := time.Now()
for {
select {
case <-d.ctx.Done():
Expand All @@ -586,6 +589,18 @@ func (d *ddl) PollTiFlashRoutine() {
failpoint.Inject("BeforePollTiFlashReplicaStatusLoop", func() {
failpoint.Continue()
})

if !hasSetTiFlashGroup && !time.Now().Before(nextSetTiFlashGroupTime) {
// We should set tiflash rule group a higher index than other placement groups to forbid override by them.
// Once `SetTiFlashGroupConfig` succeed, we do not need to invoke it again. If failed, we should retry it util success.
if err = infosync.SetTiFlashGroupConfig(d.ctx); err != nil {
logutil.BgLogger().Warn("SetTiFlashGroupConfig failed", zap.Error(err))
nextSetTiFlashGroupTime = time.Now().Add(time.Minute)
} else {
hasSetTiFlashGroup = true
}
}

sctx, err := d.sessPool.get()
if err == nil {
if d.ownerManager.IsOwner() {
Expand Down
18 changes: 18 additions & 0 deletions ddl/ddl_tiflash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -947,3 +947,21 @@ func TestTiFlashBatchUnsupported(t *testing.T) {
require.Equal(t, "In total 2 tables: 1 succeed, 0 failed, 1 skipped", tk.Session().GetSessionVars().StmtCtx.GetMessage())
tk.MustGetErrCode("alter database information_schema set tiflash replica 1", 8200)
}

func TestTiFlashGroupIndexWhenStartup(t *testing.T) {
s, teardown := createTiFlashContext(t)
defer teardown()
_ = testkit.NewTestKit(t, s.store)
timeout := time.Now().Add(10 * time.Second)
errMsg := "time out"
for time.Now().Before(timeout) {
time.Sleep(100 * time.Millisecond)
if s.tiflash.GroupIndex != 0 {
errMsg = "invalid group index"
break
}
}
require.Equal(t, placement.RuleIndexTiFlash, s.tiflash.GroupIndex, errMsg)
require.Greater(t, s.tiflash.GroupIndex, placement.RuleIndexTable)
require.Greater(t, s.tiflash.GroupIndex, placement.RuleIndexPartition)
}
4 changes: 0 additions & 4 deletions ddl/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,10 +239,6 @@ func alterTablePartitionBundles(t *meta.Meta, tblInfo *model.TableInfo, addingDe
p.Definitions = append(tblInfo.Partition.Definitions, addingDefinitions...)
tblInfo.Partition = &p

if tblInfo.TiFlashReplica != nil && tblInfo.TiFlashReplica.Count > 0 && tableHasPlacementSettings(tblInfo) {
return nil, errors.Trace(dbterror.ErrIncompatibleTiFlashAndPlacement)
}

// bundle for table should be recomputed because it includes some default configs for partitions
tblBundle, err := placement.NewTableBundle(t, tblInfo)
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions ddl/placement/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
)

const (
// TiFlashRuleGroupID is the rule group id of tiflash
TiFlashRuleGroupID = "tiflash"
// BundleIDPrefix is the bundle prefix of all rule bundles from TiDB_DDL statements.
BundleIDPrefix = "TiDB_DDL_"
// PDBundleID is the bundle name of pd, the default bundle for all regions.
Expand Down
7 changes: 7 additions & 0 deletions ddl/placement/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@ const (
Learner PeerRoleType = "learner"
)

// RuleGroupConfig defines basic config of rule group
type RuleGroupConfig struct {
ID string `json:"id"`
Index int `json:"index"`
Override bool `json:"override"`
}

// Rule is the core placement rule struct. Check https://github.com/tikv/pd/blob/master/server/schedule/placement/rule.go.
type Rule struct {
GroupID string `json:"group_id"`
Expand Down
16 changes: 0 additions & 16 deletions ddl/placement_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,19 +417,3 @@ func checkPlacementPolicyNotUsedByTable(tblInfo *model.TableInfo, policy *model.

return nil
}

func tableHasPlacementSettings(tblInfo *model.TableInfo) bool {
if tblInfo.PlacementPolicyRef != nil {
return true
}

if tblInfo.Partition != nil {
for _, def := range tblInfo.Partition.Definitions {
if def.PlacementPolicyRef != nil {
return true
}
}
}

return false
}
80 changes: 65 additions & 15 deletions ddl/placement_sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/domain/infosync"
mysql "github.com/pingcap/tidb/errno"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/util/dbterror"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -583,8 +583,23 @@ func TestPlacementMode(t *testing.T) {

}

func checkTiflashReplicaSet(t *testing.T, do *domain.Domain, db, tb string, cnt uint64) {
tbl, err := do.InfoSchema().TableByName(model.NewCIStr(db), model.NewCIStr(tb))
require.NoError(t, err)

tiflashReplica := tbl.Meta().TiFlashReplica
if cnt == 0 {
require.Nil(t, tiflashReplica)
return
}

CheckPlacementRule(infosync.GetMockTiFlash(), *infosync.MakeNewRule(tbl.Meta().ID, 1, nil))
require.NotNil(t, tiflashReplica)
require.Equal(t, cnt, tiflashReplica.Count)
}

func TestPlacementTiflashCheck(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
store, do, clean := testkit.CreateMockStoreAndDomain(t)
defer clean()
tk := testkit.NewTestKit(t, store)
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/infoschema/mockTiFlashStoreCount", `return(true)`))
Expand All @@ -593,39 +608,65 @@ func TestPlacementTiflashCheck(t *testing.T) {
require.NoError(t, err)
}()

tiflash := infosync.NewMockTiFlash()
infosync.SetMockTiFlash(tiflash)
defer func() {
tiflash.Lock()
tiflash.StatusServer.Close()
tiflash.Unlock()
}()

tk.MustExec("use test")
tk.MustExec("drop placement policy if exists p1")
tk.MustExec("drop placement policy if exists p2")
tk.MustExec("drop table if exists tp")

tk.MustExec("create placement policy p1 primary_region='r1' regions='r1'")
defer tk.MustExec("drop placement policy if exists p1")

tk.MustExec("create placement policy p2 primary_region='r2' regions='r1,r2'")
defer tk.MustExec("drop placement policy if exists p2")

tk.MustExec(`CREATE TABLE tp (id INT) PARTITION BY RANGE (id) (
PARTITION p0 VALUES LESS THAN (100),
PARTITION p1 VALUES LESS THAN (1000)
)`)
defer tk.MustExec("drop table if exists tp")
checkTiflashReplicaSet(t, do, "test", "tp", 0)
tk.MustExec("alter table tp set tiflash replica 1")

err := tk.ExecToErr("alter table tp placement policy p1")
require.True(t, dbterror.ErrIncompatibleTiFlashAndPlacement.Equal(err))
err = tk.ExecToErr("alter table tp partition p0 placement policy p1")
require.True(t, dbterror.ErrIncompatibleTiFlashAndPlacement.Equal(err))
tk.MustExec("alter table tp placement policy p1")
checkExistTableBundlesInPD(t, do, "test", "tp")
checkTiflashReplicaSet(t, do, "test", "tp", 1)
tk.MustQuery("show create table tp").Check(testkit.Rows("" +
"tp CREATE TABLE `tp` (\n" +
" `id` int(11) DEFAULT NULL\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![placement] PLACEMENT POLICY=`p1` */\n" +
"PARTITION BY RANGE (`id`)\n" +
"(PARTITION `p0` VALUES LESS THAN (100),\n" +
" PARTITION `p1` VALUES LESS THAN (1000))"))

tk.MustExec("alter table tp partition p0 placement policy p2")
checkExistTableBundlesInPD(t, do, "test", "tp")
checkTiflashReplicaSet(t, do, "test", "tp", 1)
tk.MustQuery("show create table tp").Check(testkit.Rows("" +
"tp CREATE TABLE `tp` (\n" +
" `id` int(11) DEFAULT NULL\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![placement] PLACEMENT POLICY=`p1` */\n" +
"PARTITION BY RANGE (`id`)\n" +
"(PARTITION `p0` VALUES LESS THAN (100) /*T![placement] PLACEMENT POLICY=`p2` */,\n" +
" PARTITION `p1` VALUES LESS THAN (1000))"))

tk.MustExec("drop table tp")
tk.MustExec(`CREATE TABLE tp (id INT) placement policy p1 PARTITION BY RANGE (id) (
PARTITION p0 VALUES LESS THAN (100),
PARTITION p1 VALUES LESS THAN (1000)
)`)
err = tk.ExecToErr("alter table tp set tiflash replica 1")
require.True(t, dbterror.ErrIncompatibleTiFlashAndPlacement.Equal(err))
checkTiflashReplicaSet(t, do, "test", "tp", 0)

tk.MustExec("alter table tp set tiflash replica 1")
checkTiflashReplicaSet(t, do, "test", "tp", 1)
checkExistTableBundlesInPD(t, do, "test", "tp")
tk.MustQuery("show create table tp").Check(testkit.Rows("" +
"tp CREATE TABLE `tp` (\n" +
" `id` int(11) DEFAULT NULL\n" +
Expand All @@ -639,8 +680,11 @@ func TestPlacementTiflashCheck(t *testing.T) {
PARTITION p0 VALUES LESS THAN (100) placement policy p1 ,
PARTITION p1 VALUES LESS THAN (1000)
)`)
err = tk.ExecToErr("alter table tp set tiflash replica 1")
require.True(t, dbterror.ErrIncompatibleTiFlashAndPlacement.Equal(err))
checkTiflashReplicaSet(t, do, "test", "tp", 0)

tk.MustExec("alter table tp set tiflash replica 1")
checkExistTableBundlesInPD(t, do, "test", "tp")
checkTiflashReplicaSet(t, do, "test", "tp", 1)
tk.MustQuery("show create table tp").Check(testkit.Rows("" +
"tp CREATE TABLE `tp` (\n" +
" `id` int(11) DEFAULT NULL\n" +
Expand All @@ -654,8 +698,11 @@ func TestPlacementTiflashCheck(t *testing.T) {
PARTITION p0 VALUES LESS THAN (100),
PARTITION p1 VALUES LESS THAN (1000)
)`)
err = tk.ExecToErr("alter table tp set tiflash replica 1")
require.True(t, dbterror.ErrIncompatibleTiFlashAndPlacement.Equal(err))
checkTiflashReplicaSet(t, do, "test", "tp", 0)

tk.MustExec("alter table tp set tiflash replica 1")
checkExistTableBundlesInPD(t, do, "test", "tp")
checkTiflashReplicaSet(t, do, "test", "tp", 1)
tk.MustQuery("show create table tp").Check(testkit.Rows("" +
"tp CREATE TABLE `tp` (\n" +
" `id` int(11) DEFAULT NULL\n" +
Expand All @@ -669,8 +716,11 @@ func TestPlacementTiflashCheck(t *testing.T) {
PARTITION p0 VALUES LESS THAN (100) PLACEMENT POLICY p1,
PARTITION p1 VALUES LESS THAN (1000)
)`)
err = tk.ExecToErr("alter table tp set tiflash replica 1")
require.True(t, dbterror.ErrIncompatibleTiFlashAndPlacement.Equal(err))
checkTiflashReplicaSet(t, do, "test", "tp", 0)

tk.MustExec("alter table tp set tiflash replica 1")
checkExistTableBundlesInPD(t, do, "test", "tp")
checkTiflashReplicaSet(t, do, "test", "tp", 1)
tk.MustQuery("show create table tp").Check(testkit.Rows("" +
"tp CREATE TABLE `tp` (\n" +
" `id` int(11) DEFAULT NULL\n" +
Expand Down
15 changes: 0 additions & 15 deletions ddl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -1076,11 +1076,6 @@ func (w *worker) onSetTableFlashReplica(d *ddlCtx, t *meta.Meta, job *model.Job)
return ver, errors.Trace(err)
}

if replicaInfo.Count > 0 && tableHasPlacementSettings(tblInfo) {
job.State = model.JobStateCancelled
return ver, errors.Trace(dbterror.ErrIncompatibleTiFlashAndPlacement)
}

// Ban setting replica count for tables in system database.
if tidb_util.IsMemOrSysDB(job.SchemaName) {
return ver, errors.Trace(dbterror.ErrUnsupportedAlterReplicaForSysTable)
Expand Down Expand Up @@ -1419,11 +1414,6 @@ func onAlterTablePartitionPlacement(d *ddlCtx, t *meta.Meta, job *model.Job) (ve
return 0, err
}

if tblInfo.TiFlashReplica != nil && tblInfo.TiFlashReplica.Count > 0 {
job.State = model.JobStateCancelled
return 0, errors.Trace(dbterror.ErrIncompatibleTiFlashAndPlacement)
}

ptInfo := tblInfo.GetPartitionInfo()
var partitionDef *model.PartitionDefinition
definitions := ptInfo.Definitions
Expand Down Expand Up @@ -1489,11 +1479,6 @@ func onAlterTablePlacement(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64,
return 0, err
}

if tblInfo.TiFlashReplica != nil && tblInfo.TiFlashReplica.Count > 0 {
job.State = model.JobStateCancelled
return 0, errors.Trace(dbterror.ErrIncompatibleTiFlashAndPlacement)
}

if _, err = checkPlacementPolicyRefValidAndCanNonValidJob(t, job, policyRefInfo); err != nil {
return 0, errors.Trace(err)
}
Expand Down
10 changes: 10 additions & 0 deletions domain/infosync/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -973,6 +973,16 @@ func GetLabelRules(ctx context.Context, ruleIDs []string) (map[string]*label.Rul
return is.labelRuleManager.GetLabelRules(ctx, ruleIDs)
}

// SetTiFlashGroupConfig is a helper function to set tiflash rule group config
func SetTiFlashGroupConfig(ctx context.Context) error {
is, err := getGlobalInfoSyncer()
if err != nil {
return errors.Trace(err)
}
logutil.BgLogger().Info("SetTiFlashGroupConfig")
return is.tiflashPlacementManager.SetTiFlashGroupConfig(ctx)
}

// SetTiFlashPlacementRule is a helper function to set placement rule.
// It is discouraged to use SetTiFlashPlacementRule directly,
// use `ConfigureTiFlashPDForTable`/`ConfigureTiFlashPDForPartitions` instead.
Expand Down
Loading

0 comments on commit 852d31d

Please sign in to comment.