Skip to content

Commit

Permalink
Merge branch 'master' into issue-26178
Browse files Browse the repository at this point in the history
  • Loading branch information
AilinKid authored Aug 30, 2021
2 parents e98d0dd + 00662f4 commit bfba2bb
Show file tree
Hide file tree
Showing 59 changed files with 2,052 additions and 1,004 deletions.
20 changes: 0 additions & 20 deletions .github/workflows/issue_assigned.yml

This file was deleted.

15 changes: 8 additions & 7 deletions br/pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,10 +268,6 @@ func BuildBackupRangeAndSchema(
continue
}

idAlloc := autoid.NewAllocator(storage, dbInfo.ID, false, autoid.RowIDAllocType)
seqAlloc := autoid.NewAllocator(storage, dbInfo.ID, false, autoid.SequenceType)
randAlloc := autoid.NewAllocator(storage, dbInfo.ID, false, autoid.AutoRandomType)

tables, err := m.ListTables(dbInfo.ID)
if err != nil {
return nil, nil, errors.Trace(err)
Expand All @@ -294,14 +290,19 @@ func BuildBackupRangeAndSchema(
zap.String("table", tableInfo.Name.O),
)

tblVer := autoid.AllocOptionTableInfoVersion(tableInfo.Version)
idAlloc := autoid.NewAllocator(storage, dbInfo.ID, tableInfo.ID, false, autoid.RowIDAllocType, tblVer)
seqAlloc := autoid.NewAllocator(storage, dbInfo.ID, tableInfo.ID, false, autoid.SequenceType, tblVer)
randAlloc := autoid.NewAllocator(storage, dbInfo.ID, tableInfo.ID, false, autoid.AutoRandomType, tblVer)

var globalAutoID int64
switch {
case tableInfo.IsSequence():
globalAutoID, err = seqAlloc.NextGlobalAutoID(tableInfo.ID)
globalAutoID, err = seqAlloc.NextGlobalAutoID()
case tableInfo.IsView() || !utils.NeedAutoID(tableInfo):
// no auto ID for views or table without either rowID nor auto_increment ID.
default:
globalAutoID, err = idAlloc.NextGlobalAutoID(tableInfo.ID)
globalAutoID, err = idAlloc.NextGlobalAutoID()
}
if err != nil {
return nil, nil, errors.Trace(err)
Expand All @@ -311,7 +312,7 @@ func BuildBackupRangeAndSchema(
if tableInfo.PKIsHandle && tableInfo.ContainsAutoRandomBits() {
// this table has auto_random id, we need backup and rebase in restoration
var globalAutoRandID int64
globalAutoRandID, err = randAlloc.NextGlobalAutoID(tableInfo.ID)
globalAutoRandID, err = randAlloc.NextGlobalAutoID()
if err != nil {
return nil, nil, errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/kv/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func NewPanickingAllocators(base int64) autoid.Allocators {
}

// Rebase implements the autoid.Allocator interface
func (alloc *panickingAllocator) Rebase(tableID, newBase int64, allocIDs bool) error {
func (alloc *panickingAllocator) Rebase(newBase int64, allocIDs bool) error {
// CAS
for {
oldBase := atomic.LoadInt64(alloc.base)
Expand Down
11 changes: 6 additions & 5 deletions br/pkg/lightning/restore/check_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -594,7 +594,11 @@ func (rc *Controller) SchemaIsValid(ctx context.Context, tableInfo *mydump.MDTab
// tidb_rowid have a default value.
defaultCols[model.ExtraHandleName.String()] = struct{}{}

for _, dataFile := range tableInfo.DataFiles {
// only check the first file of this table.
if len(tableInfo.DataFiles) > 0 {
dataFile := tableInfo.DataFiles[0]
log.L().Info("datafile to check", zap.String("db", tableInfo.DB),
zap.String("table", tableInfo.Name), zap.String("path", dataFile.FileMeta.Path))
// get columns name from data file.
dataFileMeta := dataFile.FileMeta

Expand All @@ -608,7 +612,7 @@ func (rc *Controller) SchemaIsValid(ctx context.Context, tableInfo *mydump.MDTab
}
if colsFromDataFile == nil && colCountFromDataFile == 0 {
log.L().Info("file contains no data, skip checking against schema validity", zap.String("path", dataFileMeta.Path))
continue
return msgs, nil
}

if colsFromDataFile == nil {
Expand Down Expand Up @@ -669,9 +673,6 @@ func (rc *Controller) SchemaIsValid(ctx context.Context, tableInfo *mydump.MDTab
tableInfo.DB, tableInfo.Name, col, col))
}
}
if len(msgs) > 0 {
return msgs, nil
}
}
return msgs, nil
}
Expand Down
4 changes: 2 additions & 2 deletions br/pkg/lightning/restore/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -1452,12 +1452,12 @@ func (tr *TableRestore) restoreTable(
// rebase the allocator so it exceeds the number of rows.
if tr.tableInfo.Core.PKIsHandle && tr.tableInfo.Core.ContainsAutoRandomBits() {
cp.AllocBase = mathutil.MaxInt64(cp.AllocBase, tr.tableInfo.Core.AutoRandID)
if err := tr.alloc.Get(autoid.AutoRandomType).Rebase(tr.tableInfo.ID, cp.AllocBase, false); err != nil {
if err := tr.alloc.Get(autoid.AutoRandomType).Rebase(cp.AllocBase, false); err != nil {
return false, err
}
} else {
cp.AllocBase = mathutil.MaxInt64(cp.AllocBase, tr.tableInfo.Core.AutoIncID)
if err := tr.alloc.Get(autoid.RowIDAllocType).Rebase(tr.tableInfo.ID, cp.AllocBase, false); err != nil {
if err := tr.alloc.Get(autoid.RowIDAllocType).Rebase(cp.AllocBase, false); err != nil {
return false, err
}
}
Expand Down
59 changes: 59 additions & 0 deletions br/pkg/lightning/restore/restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2205,6 +2205,65 @@ func (s *tableRestoreSuite) TestSchemaIsValid(c *C) {
},
},
},
// Case 4:
// table4 has two datafiles for table. we only check the first file.
// we expect the check success.
{
[]*config.IgnoreColumns{
{
DB: "db1",
Table: "table2",
Columns: []string{"cola"},
},
},
"",
0,
true,
map[string]*checkpoints.TidbDBInfo{
"db1": {
Name: "db1",
Tables: map[string]*checkpoints.TidbTableInfo{
"table2": {
ID: 1,
DB: "db1",
Name: "table2",
Core: &model.TableInfo{
Columns: []*model.ColumnInfo{
{
// colB has the default value
Name: model.NewCIStr("colB"),
DefaultIsExpr: true,
},
},
},
},
},
},
},
&mydump.MDTableMeta{
DB: "db1",
Name: "table2",
DataFiles: []mydump.FileInfo{
{
FileMeta: mydump.SourceFileMeta{
FileSize: 1 * units.TiB,
Path: case2File,
Type: mydump.SourceTypeCSV,
},
},
{
FileMeta: mydump.SourceFileMeta{
FileSize: 1 * units.TiB,
Path: case2File,
// This type will make the check failed.
// but it's the second file for table.
// so it's unreachable so this case will success.
Type: mydump.SourceTypeIgnore,
},
},
},
},
},
}

for _, ca := range cases {
Expand Down
4 changes: 2 additions & 2 deletions br/pkg/restore/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ func (s *testRestoreSchemaSuite) TestRestoreAutoIncID(c *C) {
DB: dbInfo,
}
// Get the next AutoIncID
idAlloc := autoid.NewAllocator(s.mock.Storage, dbInfo.ID, false, autoid.RowIDAllocType)
globalAutoID, err := idAlloc.NextGlobalAutoID(table.Info.ID)
idAlloc := autoid.NewAllocator(s.mock.Storage, dbInfo.ID, table.Info.ID, false, autoid.RowIDAllocType)
globalAutoID, err := idAlloc.NextGlobalAutoID()
c.Assert(err, IsNil, Commentf("Error allocate next auto id"))
c.Assert(autoIncID, Equals, uint64(globalAutoID))
// Alter AutoIncID to the next AutoIncID + 100
Expand Down
12 changes: 0 additions & 12 deletions circle.yml

This file was deleted.

2 changes: 1 addition & 1 deletion ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -1652,7 +1652,7 @@ func applyNewAutoRandomBits(d *ddlCtx, m *meta.Meta, dbInfo *model.DBInfo,
if err != nil {
return errors.Trace(err)
}
err = autoRandAlloc.Rebase(tblInfo.ID, nextAutoIncID, false)
err = autoRandAlloc.Rebase(nextAutoIncID, false)
if err != nil {
return errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion ddl/column_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ func getCurrentTable(d *ddl, schemaID, tableID int64) (table.Table, error) {
if err != nil {
return nil, errors.Trace(err)
}
alloc := autoid.NewAllocator(d.store, schemaID, false, autoid.RowIDAllocType)
alloc := autoid.NewAllocator(d.store, schemaID, tblInfo.ID, false, autoid.RowIDAllocType)
tbl, err := table.TableFromMeta(autoid.NewAllocators(alloc), tblInfo)
if err != nil {
return nil, errors.Trace(err)
Expand Down
2 changes: 2 additions & 0 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ type DDL interface {
CreateSequence(ctx sessionctx.Context, stmt *ast.CreateSequenceStmt) error
DropSequence(ctx sessionctx.Context, tableIdent ast.Ident, ifExists bool) (err error)
AlterSequence(ctx sessionctx.Context, stmt *ast.AlterSequenceStmt) error
CreatePlacementPolicy(ctx sessionctx.Context, stmt *ast.CreatePlacementPolicyStmt) error
DropPlacementPolicy(ctx sessionctx.Context, stmt *ast.DropPlacementPolicyStmt) error

// CreateSchemaWithInfo creates a database (schema) given its database info.
//
Expand Down
97 changes: 93 additions & 4 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ import (
"github.com/pingcap/tidb/util/domainutil"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/mock"
"github.com/pingcap/tidb/util/placementpolicy"
"github.com/pingcap/tidb/util/set"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -2225,7 +2226,7 @@ func checkCharsetAndCollation(cs string, co string) error {
func (d *ddl) handleAutoIncID(tbInfo *model.TableInfo, schemaID int64, newEnd int64, tp autoid.AllocatorType) error {
allocs := autoid.NewAllocatorsFromTblInfo(d.store, schemaID, tbInfo)
if alloc := allocs.Get(tp); alloc != nil {
err := alloc.Rebase(tbInfo.ID, newEnd, false)
err := alloc.Rebase(newEnd, false)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -2683,7 +2684,7 @@ func adjustNewBaseToNextGlobalID(ctx sessionctx.Context, t table.Table, tp autoi
if alloc == nil {
return newBase, nil
}
autoID, err := alloc.NextGlobalAutoID(t.Meta().ID)
autoID, err := alloc.NextGlobalAutoID()
if err != nil {
return newBase, errors.Trace(err)
}
Expand Down Expand Up @@ -6078,7 +6079,6 @@ func (d *ddl) AlterTableAttributes(ctx sessionctx.Context, ident ast.Ident, spec
}
return ErrInvalidAttributesSpec.GenWithStackByArgs(err)
}

rule.Reset(meta.ID, schema.Name.L, meta.Name.L)

job := &model.Job{
Expand Down Expand Up @@ -6126,7 +6126,6 @@ func (d *ddl) AlterTablePartitionAttributes(ctx sessionctx.Context, ident ast.Id
}
return ErrInvalidAttributesSpec.GenWithStackByArgs(sb.String(), err)
}

rule.Reset(partitionID, schema.Name.L, meta.Name.L, spec.PartitionNames[0].L)

job := &model.Job{
Expand All @@ -6146,3 +6145,93 @@ func (d *ddl) AlterTablePartitionAttributes(ctx sessionctx.Context, ident ast.Id
err = d.callHookOnChanged(err)
return errors.Trace(err)
}

func buildPolicyInfo(stmt *ast.CreatePlacementPolicyStmt) (*placementpolicy.PolicyInfo, error) {
policyInfo := &placementpolicy.PolicyInfo{}
policyInfo.Name = stmt.PolicyName
for _, opt := range stmt.PlacementOptions {
switch opt.Tp {
case ast.PlacementOptionPrimaryRegion:
policyInfo.PrimaryRegion = opt.StrValue
case ast.PlacementOptionRegions:
policyInfo.Regions = opt.StrValue
case ast.PlacementOptionFollowerCount:
policyInfo.Followers = opt.UintValue
case ast.PlacementOptionVoterCount:
policyInfo.Voters = opt.UintValue
case ast.PlacementOptionLearnerCount:
policyInfo.Learners = opt.UintValue
case ast.PlacementOptionSchedule:
policyInfo.Schedule = opt.StrValue
case ast.PlacementOptionConstraints:
policyInfo.Constraints = opt.StrValue
case ast.PlacementOptionLearnerConstraints:
policyInfo.LearnerConstraints = opt.StrValue
case ast.PlacementOptionFollowerConstraints:
policyInfo.FollowerConstraints = opt.StrValue
case ast.PlacementOptionVoterConstraints:
policyInfo.VoterConstraints = opt.StrValue
case ast.PlacementOptionLeaderConstraints:
policyInfo.LeaderConstraints = opt.StrValue
default:
return nil, errors.Trace(errors.New("unknown placement policy option"))
}
}
return policyInfo, nil
}

func (d *ddl) CreatePlacementPolicy(ctx sessionctx.Context, stmt *ast.CreatePlacementPolicyStmt) (err error) {
policyName := stmt.PolicyName
is := d.GetInfoSchemaWithInterceptor(ctx)
// Check policy existence.
_, ok := is.PolicyByName(policyName)
if ok {
err = infoschema.ErrPlacementPolicyExists.GenWithStackByArgs(policyName)
if stmt.IfNotExists {
ctx.GetSessionVars().StmtCtx.AppendNote(err)
return nil
}
return err
}
// Auto fill the policyID when it is inserted.
policyInfo, err := buildPolicyInfo(stmt)
if err != nil {
return errors.Trace(err)
}

job := &model.Job{
SchemaName: policyInfo.Name.L,
Type: model.ActionCreatePlacementPolicy,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{policyInfo},
}
err = d.doDDLJob(ctx, job)
err = d.callHookOnChanged(err)
return errors.Trace(err)
}

func (d *ddl) DropPlacementPolicy(ctx sessionctx.Context, stmt *ast.DropPlacementPolicyStmt) (err error) {
policyName := stmt.PolicyName
is := d.GetInfoSchemaWithInterceptor(ctx)
// Check policy existence.
policy, ok := is.PolicyByName(policyName)
if !ok {
err = infoschema.ErrPlacementPolicyNotExists.GenWithStackByArgs(policyName)
if stmt.IfExists {
ctx.GetSessionVars().StmtCtx.AppendNote(err)
return nil
}
return err
}

job := &model.Job{
SchemaID: policy.ID,
SchemaName: policy.Name.L,
Type: model.ActionDropPlacementPolicy,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{policyName},
}
err = d.doDDLJob(ctx, job)
err = d.callHookOnChanged(err)
return errors.Trace(err)
}
4 changes: 4 additions & 0 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -834,6 +834,10 @@ func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64,
ver, err = onAlterTableAttributes(t, job)
case model.ActionAlterTablePartitionAttributes:
ver, err = onAlterTablePartitionAttributes(t, job)
case model.ActionCreatePlacementPolicy:
ver, err = onCreatePlacementPolicy(d, t, job)
case model.ActionDropPlacementPolicy:
ver, err = onDropPlacementPolicy(t, job)
default:
// Invalid job, cancel it.
job.State = model.JobStateCancelled
Expand Down
Loading

0 comments on commit bfba2bb

Please sign in to comment.