diff --git a/ddl/ddl.go b/ddl/ddl.go index 433336fb0abac..e6cf06eb8e7cb 100644 --- a/ddl/ddl.go +++ b/ddl/ddl.go @@ -357,13 +357,9 @@ type ddlCtx struct { *waitSchemaSyncedController *schemaVersionManager - // recording the running jobs. - runningJobs struct { - sync.RWMutex - ids map[int64]struct{} - } - // It holds the running DDL jobs ID. - runningJobIDs []string + + runningJobs *runningJobs + // reorgCtx is used for reorganization. reorgCtx struct { sync.RWMutex @@ -626,7 +622,7 @@ func newDDL(ctx context.Context, options ...Option) *ddl { autoidCli: opt.AutoIDClient, schemaVersionManager: newSchemaVersionManager(), waitSchemaSyncedController: newWaitSchemaSyncedController(), - runningJobIDs: make([]string, 0, jobRecordCapacity), + runningJobs: newRunningJobs(), } ddlCtx.reorgCtx.reorgCtxMap = make(map[int64]*reorgCtx) ddlCtx.jobCtx.jobCtxMap = make(map[int64]*JobContext) @@ -634,8 +630,11 @@ func newDDL(ctx context.Context, options ...Option) *ddl { ddlCtx.mu.interceptor = &BaseInterceptor{} ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnDDL) ddlCtx.ctx, ddlCtx.cancel = context.WithCancel(ctx) +<<<<<<< HEAD:ddl/ddl.go ddlCtx.runningJobs.ids = make(map[int64]struct{}) ddlCtx.waiting = atomicutil.NewBool(false) +======= +>>>>>>> 2dfbaa8264f (ddl: set jobs dependency by schema and table name (#49699)):pkg/ddl/ddl.go d := &ddl{ ddlCtx: ddlCtx, diff --git a/ddl/ddl_api.go b/ddl/ddl_api.go index 97b74a694ce01..7c010e269c257 100644 --- a/ddl/ddl_api.go +++ b/ddl/ddl_api.go @@ -465,7 +465,8 @@ func (d *ddl) AlterTablePlacement(ctx sessionctx.Context, ident ast.Ident, place return nil } - if tb.Meta().TempTableType != model.TempTableNone { + tblInfo := tb.Meta() + if tblInfo.TempTableType != model.TempTableNone { return errors.Trace(dbterror.ErrOptOnTemporaryTable.GenWithStackByArgs("placement")) } @@ -476,9 +477,9 @@ func (d *ddl) AlterTablePlacement(ctx sessionctx.Context, ident ast.Ident, place job := &model.Job{ SchemaID: schema.ID, - TableID: tb.Meta().ID, + TableID: tblInfo.ID, SchemaName: schema.Name.L, - TableName: tb.Meta().Name.L, + TableName: tblInfo.Name.L, Type: model.ActionAlterTablePlacement, BinlogInfo: &model.HistoryInfo{}, Args: []interface{}{placementPolicyRef}, @@ -641,6 +642,10 @@ func (d *ddl) RecoverSchema(ctx sessionctx.Context, recoverSchemaInfo *RecoverSc Type: model.ActionRecoverSchema, BinlogInfo: &model.HistoryInfo{}, Args: []interface{}{recoverSchemaInfo, recoverCheckFlagNone}, + InvolvingSchemaInfo: []model.InvolvingSchemaInfo{{ + Database: recoverSchemaInfo.Name.L, + Table: model.InvolvingAll, + }}, } err := d.DoDDLJob(ctx, job) err = d.callHookOnChanged(job, err) @@ -2633,6 +2638,11 @@ func (d *ddl) BatchCreateTableWithInfo(ctx sessionctx.Context, return errors.Trace(fmt.Errorf("except table info")) } args = append(args, info) + jobs.InvolvingSchemaInfo = append(jobs.InvolvingSchemaInfo, + model.InvolvingSchemaInfo{ + Database: dbName.L, + Table: info.Name.L, + }) } if len(args) == 0 { return nil @@ -2697,6 +2707,11 @@ func (d *ddl) CreatePlacementPolicyWithInfo(ctx sessionctx.Context, policy *mode Type: model.ActionCreatePlacementPolicy, BinlogInfo: &model.HistoryInfo{}, Args: []interface{}{policy, onExist == OnExistReplace}, + // CREATE PLACEMENT does not affect any schemas or tables. 
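+		// Declaring InvolvingNone keeps the job out of the schema/table conflict
+		// check in runningJobs.checkRunnable (added in ddl_running_jobs.go below),
+		// so it neither blocks nor is blocked by jobs scoped to a specific schema
+		// or table.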
+ InvolvingSchemaInfo: []model.InvolvingSchemaInfo{{ + Database: model.InvolvingNone, + Table: model.InvolvingNone, + }}, } err = d.DoDDLJob(ctx, job) err = d.callHookOnChanged(job, err) @@ -2757,7 +2772,18 @@ func (d *ddl) FlashbackCluster(ctx sessionctx.Context, flashbackTS uint64) error variable.Off, /* tidb_super_read_only */ 0, /* totalRegions */ 0, /* startTS */ +<<<<<<< HEAD:ddl/ddl_api.go 0 /* commitTS */}, +======= + 0, /* commitTS */ + variable.On, /* tidb_ttl_job_enable */ + []kv.KeyRange{} /* flashback key_ranges */}, + // FLASHBACK CLUSTER affects all schemas and tables. + InvolvingSchemaInfo: []model.InvolvingSchemaInfo{{ + Database: model.InvolvingAll, + Table: model.InvolvingAll, + }}, +>>>>>>> 2dfbaa8264f (ddl: set jobs dependency by schema and table name (#49699)):pkg/ddl/ddl_api.go } err = d.DoDDLJob(ctx, job) err = d.callHookOnChanged(job, err) @@ -3506,10 +3532,10 @@ func (d *ddl) RebaseAutoID(ctx sessionctx.Context, ident ast.Ident, newBase int6 if err != nil { return errors.Trace(err) } + tbInfo := t.Meta() var actionType model.ActionType switch tp { case autoid.AutoRandomType: - tbInfo := t.Meta() pkCol := tbInfo.GetPkColInfo() if tbInfo.AutoRandomBits == 0 || pkCol == nil { return errors.Trace(dbterror.ErrInvalidAutoRandom.GenWithStackByArgs(autoid.AutoRandomRebaseNotApplicable)) @@ -3543,9 +3569,9 @@ func (d *ddl) RebaseAutoID(ctx sessionctx.Context, ident ast.Ident, newBase int6 } job := &model.Job{ SchemaID: schema.ID, - TableID: t.Meta().ID, + TableID: tbInfo.ID, SchemaName: schema.Name.L, - TableName: t.Meta().Name.L, + TableName: tbInfo.Name.L, Type: actionType, BinlogInfo: &model.HistoryInfo{}, Args: []interface{}{newBase, force}, @@ -3578,14 +3604,15 @@ func (d *ddl) ShardRowID(ctx sessionctx.Context, tableIdent ast.Ident, uVal uint if err != nil { return errors.Trace(err) } - if t.Meta().TempTableType != model.TempTableNone { + tbInfo := t.Meta() + if tbInfo.TempTableType != model.TempTableNone { return dbterror.ErrOptOnTemporaryTable.GenWithStackByArgs("shard_row_id_bits") } - if uVal == t.Meta().ShardRowIDBits { + if uVal == tbInfo.ShardRowIDBits { // Nothing need to do. 
return nil } - if uVal > 0 && t.Meta().HasClusteredIndex() { + if uVal > 0 && tbInfo.HasClusteredIndex() { return dbterror.ErrUnsupportedShardRowIDBits } err = verifyNoOverflowShardBits(d.sessPool, t, uVal) @@ -3595,9 +3622,9 @@ func (d *ddl) ShardRowID(ctx sessionctx.Context, tableIdent ast.Ident, uVal uint job := &model.Job{ Type: model.ActionShardRowID, SchemaID: schema.ID, - TableID: t.Meta().ID, + TableID: tbInfo.ID, SchemaName: schema.Name.L, - TableName: t.Meta().Name.L, + TableName: tbInfo.Name.L, BinlogInfo: &model.HistoryInfo{}, Args: []interface{}{uVal}, } @@ -3755,6 +3782,7 @@ func (d *ddl) AddColumn(ctx sessionctx.Context, ti ast.Ident, spec *ast.AlterTab if err != nil { return errors.Trace(err) } + tbInfo := t.Meta() if err = checkAddColumnTooManyColumns(len(t.Cols()) + 1); err != nil { return errors.Trace(err) } @@ -3766,16 +3794,16 @@ func (d *ddl) AddColumn(ctx sessionctx.Context, ti ast.Ident, spec *ast.AlterTab if col == nil { return nil } - err = CheckAfterPositionExists(t.Meta(), spec.Position) + err = CheckAfterPositionExists(tbInfo, spec.Position) if err != nil { return errors.Trace(err) } job := &model.Job{ SchemaID: schema.ID, - TableID: t.Meta().ID, + TableID: tbInfo.ID, SchemaName: schema.Name.L, - TableName: t.Meta().Name.L, + TableName: tbInfo.Name.L, Type: model.ActionAddColumn, BinlogInfo: &model.HistoryInfo{}, Args: []interface{}{col, spec.Position, 0, spec.IfNotExists}, @@ -4258,6 +4286,10 @@ func (d *ddl) ExchangeTablePartition(ctx sessionctx.Context, ident ast.Ident, sp BinlogInfo: &model.HistoryInfo{}, Args: []interface{}{defID, ptSchema.ID, ptMeta.ID, partName, spec.WithValidation}, CtxVars: []interface{}{[]int64{ntSchema.ID, ptSchema.ID}, []int64{ntMeta.ID, ptMeta.ID}}, + InvolvingSchemaInfo: []model.InvolvingSchemaInfo{ + {Database: ptSchema.Name.L, Table: ptMeta.Name.L}, + {Database: ntSchema.Name.L, Table: ntMeta.Name.L}, + }, } err = d.DoDDLJob(ctx, job) @@ -5991,6 +6023,10 @@ func (d *ddl) renameTable(ctx sessionctx.Context, oldIdent, newIdent ast.Ident, BinlogInfo: &model.HistoryInfo{}, Args: []interface{}{schemas[0].ID, newIdent.Name, schemas[0].Name}, CtxVars: []interface{}{[]int64{schemas[0].ID, schemas[1].ID}, []int64{tableID}}, + InvolvingSchemaInfo: []model.InvolvingSchemaInfo{ + {Database: schemas[0].Name.L, Table: oldIdent.Name.L}, + {Database: schemas[1].Name.L, Table: newIdent.Name.L}, + }, } err = d.DoDDLJob(ctx, job) @@ -6006,6 +6042,7 @@ func (d *ddl) renameTables(ctx sessionctx.Context, oldIdents, newIdents []ast.Id newSchemaIDs := make([]int64, 0, len(oldIdents)) tableIDs := make([]int64, 0, len(oldIdents)) oldSchemaNames := make([]*model.CIStr, 0, len(oldIdents)) + involveSchemaInfo := make([]model.InvolvingSchemaInfo, 0, len(oldIdents)*2) var schemas []*model.DBInfo var tableID int64 @@ -6030,16 +6067,22 @@ func (d *ddl) renameTables(ctx sessionctx.Context, oldIdents, newIdents []ast.Id oldSchemaIDs = append(oldSchemaIDs, schemas[0].ID) newSchemaIDs = append(newSchemaIDs, schemas[1].ID) oldSchemaNames = append(oldSchemaNames, &schemas[0].Name) + involveSchemaInfo = append(involveSchemaInfo, model.InvolvingSchemaInfo{ + Database: schemas[0].Name.L, Table: oldIdents[i].Name.L, + }, model.InvolvingSchemaInfo{ + Database: schemas[1].Name.L, Table: newIdents[i].Name.L, + }) } job := &model.Job{ - SchemaID: schemas[1].ID, - TableID: tableIDs[0], - SchemaName: schemas[1].Name.L, - Type: model.ActionRenameTables, - BinlogInfo: &model.HistoryInfo{}, - Args: []interface{}{oldSchemaIDs, newSchemaIDs, tableNames, tableIDs, oldSchemaNames, 
oldTableNames}, - CtxVars: []interface{}{append(oldSchemaIDs, newSchemaIDs...), tableIDs}, + SchemaID: schemas[1].ID, + TableID: tableIDs[0], + SchemaName: schemas[1].Name.L, + Type: model.ActionRenameTables, + BinlogInfo: &model.HistoryInfo{}, + Args: []interface{}{oldSchemaIDs, newSchemaIDs, tableNames, tableIDs, oldSchemaNames, oldTableNames}, + CtxVars: []interface{}{append(oldSchemaIDs, newSchemaIDs...), tableIDs}, + InvolvingSchemaInfo: involveSchemaInfo, } err = d.DoDDLJob(ctx, job) @@ -6930,6 +6973,7 @@ func (d *ddl) LockTables(ctx sessionctx.Context, stmt *ast.LockTablesStmt) error SessionID: ctx.GetSessionVars().ConnectionID, } uniqueTableID := make(map[int64]struct{}) + involveSchemaInfo := make([]model.InvolvingSchemaInfo, 0, len(stmt.TableLocks)) // Check whether the table was already locked by another. for _, tl := range stmt.TableLocks { tb := tl.Table @@ -6954,6 +6998,10 @@ func (d *ddl) LockTables(ctx sessionctx.Context, stmt *ast.LockTablesStmt) error } uniqueTableID[t.Meta().ID] = struct{}{} lockTables = append(lockTables, model.TableLockTpInfo{SchemaID: schema.ID, TableID: t.Meta().ID, Tp: tl.Type}) + involveSchemaInfo = append(involveSchemaInfo, model.InvolvingSchemaInfo{ + Database: schema.Name.L, + Table: t.Meta().Name.L, + }) } unlockTables := ctx.GetAllTableLocks() @@ -6968,6 +7016,8 @@ func (d *ddl) LockTables(ctx sessionctx.Context, stmt *ast.LockTablesStmt) error Type: model.ActionLockTable, BinlogInfo: &model.HistoryInfo{}, Args: []interface{}{arg}, + + InvolvingSchemaInfo: involveSchemaInfo, } // AddTableLock here is avoiding this job was executed successfully but the session was killed before return. ctx.AddTableLock(lockTables) @@ -7561,6 +7611,159 @@ func checkIgnorePlacementDDL(ctx sessionctx.Context) bool { return false } +<<<<<<< HEAD:ddl/ddl_api.go +======= +// AddResourceGroup implements the DDL interface, creates a resource group. 
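Multi-object statements have to enumerate every schema/table pair they touch, otherwise the dispatcher would only see the single SchemaName/TableName recorded on the job. A short sketch of the declaration side, reusing the field and constant names from the parser/model/ddl.go hunk later in this patch; the database and table names are illustrative:

package main

import (
	"fmt"

	"github.com/pingcap/tidb/pkg/parser/model"
)

func main() {
	// RENAME TABLE db1.told TO db2.tnew touches both the source and the
	// destination, so both pairs are declared on the job; names are stored
	// in lower case.
	job := &model.Job{
		Type: model.ActionRenameTable,
		InvolvingSchemaInfo: []model.InvolvingSchemaInfo{
			{Database: "db1", Table: "told"},
			{Database: "db2", Table: "tnew"},
		},
	}
	// Jobs that predate this field fall back to SchemaName/TableName via
	// GetInvolvingSchemaInfo, so old and new jobs are compared uniformly.
	for _, info := range job.GetInvolvingSchemaInfo() {
		fmt.Printf("%s.%s\n", info.Database, info.Table)
	}
}
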
+func (d *ddl) AddResourceGroup(ctx sessionctx.Context, stmt *ast.CreateResourceGroupStmt) (err error) { + groupName := stmt.ResourceGroupName + groupInfo := &model.ResourceGroupInfo{Name: groupName, ResourceGroupSettings: model.NewResourceGroupSettings()} + groupInfo, err = buildResourceGroup(groupInfo, stmt.ResourceGroupOptionList) + if err != nil { + return err + } + + if _, ok := d.GetInfoSchemaWithInterceptor(ctx).ResourceGroupByName(groupName); ok { + if stmt.IfNotExists { + err = infoschema.ErrResourceGroupExists.FastGenByArgs(groupName) + ctx.GetSessionVars().StmtCtx.AppendNote(err) + return nil + } + return infoschema.ErrResourceGroupExists.GenWithStackByArgs(groupName) + } + + if err := d.checkResourceGroupValidation(groupInfo); err != nil { + return err + } + + logutil.BgLogger().Debug("create resource group", zap.String("name", groupName.O), zap.Stringer("resource group settings", groupInfo.ResourceGroupSettings)) + groupIDs, err := d.genGlobalIDs(1) + if err != nil { + return err + } + groupInfo.ID = groupIDs[0] + + job := &model.Job{ + SchemaName: groupName.L, + Type: model.ActionCreateResourceGroup, + BinlogInfo: &model.HistoryInfo{}, + Args: []interface{}{groupInfo, false}, + InvolvingSchemaInfo: []model.InvolvingSchemaInfo{{ + Database: model.InvolvingNone, + Table: model.InvolvingNone, + }}, + } + err = d.DoDDLJob(ctx, job) + err = d.callHookOnChanged(job, err) + return err +} + +func (*ddl) checkResourceGroupValidation(groupInfo *model.ResourceGroupInfo) error { + _, err := resourcegroup.NewGroupFromOptions(groupInfo.Name.L, groupInfo.ResourceGroupSettings) + return err +} + +// DropResourceGroup implements the DDL interface. +func (d *ddl) DropResourceGroup(ctx sessionctx.Context, stmt *ast.DropResourceGroupStmt) (err error) { + groupName := stmt.ResourceGroupName + if groupName.L == rg.DefaultResourceGroupName { + return resourcegroup.ErrDroppingInternalResourceGroup + } + is := d.GetInfoSchemaWithInterceptor(ctx) + // Check group existence. 
+ group, ok := is.ResourceGroupByName(groupName) + if !ok { + err = infoschema.ErrResourceGroupNotExists.GenWithStackByArgs(groupName) + if stmt.IfExists { + ctx.GetSessionVars().StmtCtx.AppendNote(err) + return nil + } + return err + } + + // check to see if some user has dependency on the group + checker := privilege.GetPrivilegeManager(ctx) + if checker == nil { + return errors.New("miss privilege checker") + } + user, matched := checker.MatchUserResourceGroupName(groupName.L) + if matched { + err = errors.Errorf("user [%s] depends on the resource group to drop", user) + return err + } + + job := &model.Job{ + SchemaID: group.ID, + SchemaName: group.Name.L, + Type: model.ActionDropResourceGroup, + BinlogInfo: &model.HistoryInfo{}, + Args: []interface{}{groupName}, + InvolvingSchemaInfo: []model.InvolvingSchemaInfo{{ + Database: model.InvolvingNone, + Table: model.InvolvingNone, + }}, + } + err = d.DoDDLJob(ctx, job) + err = d.callHookOnChanged(job, err) + return err +} + +func buildResourceGroup(oldGroup *model.ResourceGroupInfo, options []*ast.ResourceGroupOption) (*model.ResourceGroupInfo, error) { + groupInfo := &model.ResourceGroupInfo{Name: oldGroup.Name, ID: oldGroup.ID, ResourceGroupSettings: model.NewResourceGroupSettings()} + if oldGroup.ResourceGroupSettings != nil { + *groupInfo.ResourceGroupSettings = *oldGroup.ResourceGroupSettings + } + for _, opt := range options { + err := SetDirectResourceGroupSettings(groupInfo, opt) + if err != nil { + return nil, err + } + } + groupInfo.ResourceGroupSettings.Adjust() + return groupInfo, nil +} + +// AlterResourceGroup implements the DDL interface. +func (d *ddl) AlterResourceGroup(ctx sessionctx.Context, stmt *ast.AlterResourceGroupStmt) (err error) { + groupName := stmt.ResourceGroupName + is := d.GetInfoSchemaWithInterceptor(ctx) + // Check group existence. 
+ group, ok := is.ResourceGroupByName(groupName) + if !ok { + err := infoschema.ErrResourceGroupNotExists.GenWithStackByArgs(groupName) + if stmt.IfExists { + ctx.GetSessionVars().StmtCtx.AppendNote(err) + return nil + } + return err + } + newGroupInfo, err := buildResourceGroup(group, stmt.ResourceGroupOptionList) + if err != nil { + return errors.Trace(err) + } + + if err := d.checkResourceGroupValidation(newGroupInfo); err != nil { + return err + } + + logutil.BgLogger().Debug("alter resource group", zap.String("name", groupName.L), zap.Stringer("new resource group settings", newGroupInfo.ResourceGroupSettings)) + + job := &model.Job{ + SchemaID: newGroupInfo.ID, + SchemaName: newGroupInfo.Name.L, + Type: model.ActionAlterResourceGroup, + BinlogInfo: &model.HistoryInfo{}, + Args: []interface{}{newGroupInfo}, + InvolvingSchemaInfo: []model.InvolvingSchemaInfo{{ + Database: model.InvolvingNone, + Table: model.InvolvingNone, + }}, + } + err = d.DoDDLJob(ctx, job) + err = d.callHookOnChanged(job, err) + return err +} + +>>>>>>> 2dfbaa8264f (ddl: set jobs dependency by schema and table name (#49699)):pkg/ddl/ddl_api.go func (d *ddl) CreatePlacementPolicy(ctx sessionctx.Context, stmt *ast.CreatePlacementPolicyStmt) (err error) { if checkIgnorePlacementDDL(ctx) { return nil @@ -7615,6 +7818,10 @@ func (d *ddl) DropPlacementPolicy(ctx sessionctx.Context, stmt *ast.DropPlacemen Type: model.ActionDropPlacementPolicy, BinlogInfo: &model.HistoryInfo{}, Args: []interface{}{policyName}, + InvolvingSchemaInfo: []model.InvolvingSchemaInfo{{ + Database: model.InvolvingNone, + Table: model.InvolvingNone, + }}, } err = d.DoDDLJob(ctx, job) err = d.callHookOnChanged(job, err) @@ -7649,6 +7856,10 @@ func (d *ddl) AlterPlacementPolicy(ctx sessionctx.Context, stmt *ast.AlterPlacem Type: model.ActionAlterPlacementPolicy, BinlogInfo: &model.HistoryInfo{}, Args: []interface{}{newPolicyInfo}, + InvolvingSchemaInfo: []model.InvolvingSchemaInfo{{ + Database: model.InvolvingNone, + Table: model.InvolvingNone, + }}, } err = d.DoDDLJob(ctx, job) err = d.callHookOnChanged(job, err) @@ -7789,3 +8000,155 @@ func checkTooBigFieldLengthAndTryAutoConvert(tp *types.FieldType, colName string } return nil } +<<<<<<< HEAD:ddl/ddl_api.go +======= + +func (d *ddl) CreateCheckConstraint(ctx sessionctx.Context, ti ast.Ident, constrName model.CIStr, constr *ast.Constraint) error { + schema, t, err := d.getSchemaAndTableByIdent(ctx, ti) + if err != nil { + return errors.Trace(err) + } + if constraintInfo := t.Meta().FindConstraintInfoByName(constrName.L); constraintInfo != nil { + return infoschema.ErrCheckConstraintDupName.GenWithStackByArgs(constrName.L) + } + + // allocate the temporary constraint name for dependency-check-error-output below. + constrNames := map[string]bool{} + for _, constr := range t.Meta().Constraints { + constrNames[constr.Name.L] = true + } + setEmptyCheckConstraintName(t.Meta().Name.L, constrNames, []*ast.Constraint{constr}) + + // existedColsMap can be used to check the existence of depended. + existedColsMap := make(map[string]struct{}) + cols := t.Cols() + for _, v := range cols { + existedColsMap[v.Name.L] = struct{}{} + } + // check expression if supported + if ok, err := table.IsSupportedExpr(constr); !ok { + return err + } + + dependedColsMap := findDependentColsInExpr(constr.Expr) + dependedCols := make([]model.CIStr, 0, len(dependedColsMap)) + for k := range dependedColsMap { + if _, ok := existedColsMap[k]; !ok { + // The table constraint depended on a non-existed column. 
+ return dbterror.ErrBadField.GenWithStackByArgs(k, "check constraint "+constr.Name+" expression") + } + dependedCols = append(dependedCols, model.NewCIStr(k)) + } + + // build constraint meta info. + tblInfo := t.Meta() + + // check auto-increment column + if table.ContainsAutoIncrementCol(dependedCols, tblInfo) { + return dbterror.ErrCheckConstraintRefersAutoIncrementColumn.GenWithStackByArgs(constr.Name) + } + // check foreign key + if err := table.HasForeignKeyRefAction(tblInfo.ForeignKeys, nil, constr, dependedCols); err != nil { + return err + } + constraintInfo, err := buildConstraintInfo(tblInfo, dependedCols, constr, model.StateNone) + if err != nil { + return errors.Trace(err) + } + // check if the expression is bool type + if err := table.IfCheckConstraintExprBoolType(constraintInfo, tblInfo); err != nil { + return err + } + job := &model.Job{ + SchemaID: schema.ID, + TableID: tblInfo.ID, + SchemaName: schema.Name.L, + TableName: tblInfo.Name.L, + Type: model.ActionAddCheckConstraint, + BinlogInfo: &model.HistoryInfo{}, + Args: []interface{}{constraintInfo}, + Priority: ctx.GetSessionVars().DDLReorgPriority, + } + + err = d.DoDDLJob(ctx, job) + err = d.callHookOnChanged(job, err) + return errors.Trace(err) +} + +func (d *ddl) DropCheckConstraint(ctx sessionctx.Context, ti ast.Ident, constrName model.CIStr) error { + is := d.infoCache.GetLatest() + schema, ok := is.SchemaByName(ti.Schema) + if !ok { + return errors.Trace(infoschema.ErrDatabaseNotExists) + } + t, err := is.TableByName(ti.Schema, ti.Name) + if err != nil { + return errors.Trace(infoschema.ErrTableNotExists.GenWithStackByArgs(ti.Schema, ti.Name)) + } + tblInfo := t.Meta() + + constraintInfo := tblInfo.FindConstraintInfoByName(constrName.L) + if constraintInfo == nil { + return dbterror.ErrConstraintNotFound.GenWithStackByArgs(constrName) + } + + job := &model.Job{ + SchemaID: schema.ID, + TableID: tblInfo.ID, + SchemaName: schema.Name.L, + TableName: tblInfo.Name.L, + Type: model.ActionDropCheckConstraint, + BinlogInfo: &model.HistoryInfo{}, + Args: []interface{}{constrName}, + } + + err = d.DoDDLJob(ctx, job) + err = d.callHookOnChanged(job, err) + return errors.Trace(err) +} + +func (d *ddl) AlterCheckConstraint(ctx sessionctx.Context, ti ast.Ident, constrName model.CIStr, enforced bool) error { + is := d.infoCache.GetLatest() + schema, ok := is.SchemaByName(ti.Schema) + if !ok { + return errors.Trace(infoschema.ErrDatabaseNotExists) + } + t, err := is.TableByName(ti.Schema, ti.Name) + if err != nil { + return errors.Trace(infoschema.ErrTableNotExists.GenWithStackByArgs(ti.Schema, ti.Name)) + } + tblInfo := t.Meta() + + constraintInfo := tblInfo.FindConstraintInfoByName(constrName.L) + if constraintInfo == nil { + return dbterror.ErrConstraintNotFound.GenWithStackByArgs(constrName) + } + + job := &model.Job{ + SchemaID: schema.ID, + TableID: tblInfo.ID, + SchemaName: schema.Name.L, + TableName: tblInfo.Name.L, + Type: model.ActionAlterCheckConstraint, + BinlogInfo: &model.HistoryInfo{}, + Args: []interface{}{constrName, enforced}, + } + + err = d.DoDDLJob(ctx, job) + err = d.callHookOnChanged(job, err) + return errors.Trace(err) +} + +// NewDDLReorgMeta create a DDL ReorgMeta. 
+func NewDDLReorgMeta(ctx sessionctx.Context) *model.DDLReorgMeta { + tzName, tzOffset := ddlutil.GetTimeZone(ctx) + return &model.DDLReorgMeta{ + SQLMode: ctx.GetSessionVars().SQLMode, + Warnings: make(map[errors.ErrorID]*terror.Error), + WarningsCount: make(map[errors.ErrorID]int64), + Location: &model.TimeZoneLocation{Name: tzName, Offset: tzOffset}, + ResourceGroupName: ctx.GetSessionVars().StmtCtx.ResourceGroupName, + Version: model.CurrentReorgMetaVersion, + } +} +>>>>>>> 2dfbaa8264f (ddl: set jobs dependency by schema and table name (#49699)):pkg/ddl/ddl_api.go diff --git a/ddl/ddl_api_test.go b/ddl/ddl_api_test.go index f4010015f5456..88a366763ec2b 100644 --- a/ddl/ddl_api_test.go +++ b/ddl/ddl_api_test.go @@ -16,6 +16,7 @@ package ddl_test import ( "context" +<<<<<<< HEAD:ddl/ddl_api_test.go "testing" "github.com/pingcap/tidb/ddl" @@ -24,6 +25,21 @@ import ( "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/session" "github.com/pingcap/tidb/testkit" +======= + "fmt" + "slices" + "sync" + "testing" + + "github.com/pingcap/failpoint" + "github.com/pingcap/tidb/pkg/ddl" + "github.com/pingcap/tidb/pkg/ddl/util/callback" + "github.com/pingcap/tidb/pkg/kv" + "github.com/pingcap/tidb/pkg/parser/model" + sessiontypes "github.com/pingcap/tidb/pkg/session/types" + "github.com/pingcap/tidb/pkg/testkit" + "github.com/pingcap/tidb/pkg/util/chunk" +>>>>>>> 2dfbaa8264f (ddl: set jobs dependency by schema and table name (#49699)):pkg/ddl/ddl_api_test.go "github.com/stretchr/testify/require" "golang.org/x/exp/slices" ) @@ -213,3 +229,66 @@ func enQueueDDLJobs(t *testing.T, sess session.Session, txn kv.Transaction, jobT require.NoError(t, err) } } + +func TestCreateDropCreateTable(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk1 := testkit.NewTestKit(t, store) + tk1.MustExec("use test") + + tk.MustExec("create table t (a int);") + + wg := sync.WaitGroup{} + var createErr error + var fpErr error + var createTable bool + + originHook := dom.DDL().GetHook() + onJobUpdated := func(job *model.Job) { + if job.Type == model.ActionDropTable && job.SchemaState == model.StateWriteOnly && !createTable { + fpErr = failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/mockOwnerCheckAllVersionSlow", fmt.Sprintf("return(%d)", job.ID)) + wg.Add(1) + go func() { + _, createErr = tk1.Exec("create table t (b int);") + wg.Done() + }() + createTable = true + } + } + hook := &callback.TestDDLCallback{} + hook.OnJobUpdatedExported.Store(&onJobUpdated) + dom.DDL().SetHook(hook) + tk.MustExec("drop table t;") + dom.DDL().SetHook(originHook) + + wg.Wait() + require.NoError(t, createErr) + require.NoError(t, fpErr) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/mockOwnerCheckAllVersionSlow")) + + rs := tk.MustQuery("admin show ddl jobs 3;").Rows() + create1JobID := rs[0][0].(string) + dropJobID := rs[1][0].(string) + create0JobID := rs[2][0].(string) + jobRecordSet, err := tk.Exec("select job_meta from mysql.tidb_ddl_history where job_id in (?, ?, ?);", + create1JobID, dropJobID, create0JobID) + require.NoError(t, err) + + var finishTSs []uint64 + req := jobRecordSet.NewChunk(nil) + err = jobRecordSet.Next(context.Background(), req) + require.Greater(t, req.NumRows(), 0) + require.NoError(t, err) + iter := chunk.NewIterator4Chunk(req.CopyConstruct()) + for row := iter.Begin(); row != iter.End(); row = iter.Next() { + jobMeta := row.GetBytes(0) + job := model.Job{} + err = job.Decode(jobMeta) + 
require.NoError(t, err) + finishTSs = append(finishTSs, job.BinlogInfo.FinishedTS) + } + create1TS, dropTS, create0TS := finishTSs[0], finishTSs[1], finishTSs[2] + require.Less(t, create0TS, dropTS, "first create should finish before drop") + require.Less(t, dropTS, create1TS, "second create should finish after drop") +} diff --git a/ddl/ddl_running_jobs.go b/ddl/ddl_running_jobs.go new file mode 100644 index 0000000000000..95faa765bee6e --- /dev/null +++ b/ddl/ddl_running_jobs.go @@ -0,0 +1,113 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Copyright 2013 The ql Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSES/QL-LICENSE file. + +package ddl + +import ( + "strconv" + "strings" + "sync" + + "github.com/pingcap/tidb/pkg/parser/model" +) + +type runningJobs struct { + sync.RWMutex + ids map[int64]struct{} + runningSchema map[string]map[string]struct{} // database -> table -> struct{} + runningJobIDs string +} + +func newRunningJobs() *runningJobs { + return &runningJobs{ + ids: make(map[int64]struct{}), + runningSchema: make(map[string]map[string]struct{}), + } +} + +func (j *runningJobs) add(job *model.Job) { + j.Lock() + defer j.Unlock() + j.ids[job.ID] = struct{}{} + j.updateInternalRunningJobIDs() + for _, info := range job.GetInvolvingSchemaInfo() { + if _, ok := j.runningSchema[info.Database]; !ok { + j.runningSchema[info.Database] = make(map[string]struct{}) + } + j.runningSchema[info.Database][info.Table] = struct{}{} + } +} + +func (j *runningJobs) remove(job *model.Job) { + j.Lock() + defer j.Unlock() + delete(j.ids, job.ID) + j.updateInternalRunningJobIDs() + for _, info := range job.GetInvolvingSchemaInfo() { + if db, ok := j.runningSchema[info.Database]; ok { + delete(db, info.Table) + } + if len(j.runningSchema[info.Database]) == 0 { + delete(j.runningSchema, info.Database) + } + } +} + +func (j *runningJobs) checkRunnable(job *model.Job) bool { + j.RLock() + defer j.RUnlock() + for _, info := range job.GetInvolvingSchemaInfo() { + if _, ok := j.runningSchema[model.InvolvingAll]; ok { + return false + } + if info.Database == model.InvolvingNone { + continue + } + if tbls, ok := j.runningSchema[info.Database]; ok { + if _, ok := tbls[model.InvolvingAll]; ok { + return false + } + if info.Table == model.InvolvingNone { + continue + } + if _, ok := tbls[info.Table]; ok { + return false + } + } + } + return true +} + +func (j *runningJobs) allIDs() string { + j.RLock() + defer j.RUnlock() + return j.runningJobIDs +} + +func (j *runningJobs) updateInternalRunningJobIDs() { + var sb strings.Builder + i := 0 + for id := range j.ids { + sb.WriteString(strconv.Itoa(int(id))) + if i != len(j.ids)-1 { + sb.WriteString(",") + } + i++ + } + j.runningJobIDs = sb.String() +} diff --git a/ddl/ddl_running_jobs_test.go b/ddl/ddl_running_jobs_test.go new file mode 100644 index 0000000000000..7fe2f5c46a7b0 --- /dev/null +++ b/ddl/ddl_running_jobs_test.go @@ -0,0 
+1,112 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Copyright 2013 The ql Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSES/QL-LICENSE file. + +package ddl + +import ( + "sort" + "strconv" + "strings" + "testing" + + "github.com/pingcap/tidb/pkg/parser/model" + "github.com/stretchr/testify/require" +) + +func TestRunningJobs(t *testing.T) { + mkJob := func(id int64, schemaTableNames ...string) *model.Job { + var schemaInfos []model.InvolvingSchemaInfo + for _, schemaTableName := range schemaTableNames { + ss := strings.Split(schemaTableName, ".") + schemaInfos = append(schemaInfos, model.InvolvingSchemaInfo{ + Database: ss[0], + Table: ss[1], + }) + } + return &model.Job{ + ID: id, + InvolvingSchemaInfo: schemaInfos, + } + } + orderedAllIDs := func(ids string) string { + ss := strings.Split(ids, ",") + ssid := make([]int, len(ss)) + for i := range ss { + id, err := strconv.Atoi(ss[i]) + require.NoError(t, err) + ssid[i] = id + } + sort.Ints(ssid) + for i := range ssid { + ss[i] = strconv.Itoa(ssid[i]) + } + return strings.Join(ss, ",") + } + + j := newRunningJobs() + require.Equal(t, "", j.allIDs()) + + runnable := j.checkRunnable(mkJob(0, "db1.t1")) + require.True(t, runnable) + job1 := mkJob(1, "db1.t1", "db1.t2") + job2 := mkJob(2, "db2.t3") + j.add(job1) + j.add(job2) + require.Equal(t, "1,2", orderedAllIDs(j.allIDs())) + runnable = j.checkRunnable(mkJob(0, "db1.t1")) + require.False(t, runnable) + runnable = j.checkRunnable(mkJob(0, "db1.t2")) + require.False(t, runnable) + runnable = j.checkRunnable(mkJob(0, "db3.t4", "db1.t1")) + require.False(t, runnable) + runnable = j.checkRunnable(mkJob(0, "db3.t4", "db4.t5")) + require.True(t, runnable) + + job3 := mkJob(3, "db1.*") + j.add(job3) + require.Equal(t, "1,2,3", orderedAllIDs(j.allIDs())) + runnable = j.checkRunnable(mkJob(0, "db1.t100")) + require.False(t, runnable) + + job4 := mkJob(4, "db4.") + j.add(job4) + require.Equal(t, "1,2,3,4", orderedAllIDs(j.allIDs())) + runnable = j.checkRunnable(mkJob(0, "db4.t100")) + require.True(t, runnable) + + job5 := mkJob(5, "*.*") + j.add(job5) + require.Equal(t, "1,2,3,4,5", orderedAllIDs(j.allIDs())) + runnable = j.checkRunnable(mkJob(0, "db100.t100")) + require.False(t, runnable) + + j.remove(job5) + require.Equal(t, "1,2,3,4", orderedAllIDs(j.allIDs())) + runnable = j.checkRunnable(mkJob(0, "db100.t100")) + require.True(t, runnable) + + j.remove(job3) + require.Equal(t, "1,2,4", orderedAllIDs(j.allIDs())) + runnable = j.checkRunnable(mkJob(0, "db1.t100")) + require.True(t, runnable) + + j.remove(job1) + require.Equal(t, "2,4", orderedAllIDs(j.allIDs())) + runnable = j.checkRunnable(mkJob(0, "db1.t1")) + require.True(t, runnable) +} diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go index 81751abef47b2..0b695a3594ec6 100644 --- a/ddl/ddl_worker.go +++ b/ddl/ddl_worker.go @@ -463,6 +463,7 @@ func (w *worker) handleUpdateJobError(t *meta.Meta, job *model.Job, 
err error, t } // Reduce this txn entry size. job.BinlogInfo.Clean() + job.InvolvingSchemaInfo = nil job.Error = toTError(err) job.ErrorCount++ job.SchemaState = model.StateNone diff --git a/ddl/mock.go b/ddl/mock.go index 05d09b522bd99..aa17e37b015bd 100644 --- a/ddl/mock.go +++ b/ddl/mock.go @@ -109,6 +109,12 @@ func (s *MockSchemaSyncer) OwnerCheckAllVersions(ctx context.Context, jobID int6 ticker := time.NewTicker(mockCheckVersInterval) defer ticker.Stop() + failpoint.Inject("mockOwnerCheckAllVersionSlow", func(val failpoint.Value) { + if v, ok := val.(int); ok && v == int(jobID) { + time.Sleep(2 * time.Second) + } + }) + for { select { case <-ctx.Done(): diff --git a/parser/model/ddl.go b/parser/model/ddl.go index 387fa41b0cd9d..7fd5ec6439f5c 100644 --- a/parser/model/ddl.go +++ b/parser/model/ddl.go @@ -465,8 +465,37 @@ type Job struct { Charset string `json:"charset"` // Collate is the collation the DDL Job is created. Collate string `json:"collate"` +<<<<<<< HEAD:parser/model/ddl.go +======= + + // InvolvingSchemaInfo indicates the schema info involved in the job. + // nil means fallback to use job.SchemaName/TableName. + // Keep unchanged after initialization. + InvolvingSchemaInfo []InvolvingSchemaInfo `json:"involving_schema_info,omitempty"` + + // AdminOperator indicates where the Admin command comes, by the TiDB + // itself (AdminCommandBySystem) or by user (AdminCommandByEndUser). + AdminOperator AdminCommandOperator `json:"admin_operator"` + + // TraceInfo indicates the information for SQL tracing + TraceInfo *TraceInfo `json:"trace_info"` +>>>>>>> 2dfbaa8264f (ddl: set jobs dependency by schema and table name (#49699)):pkg/parser/model/ddl.go } +// InvolvingSchemaInfo returns the schema info involved in the job. +// The value should be stored in lower case. +type InvolvingSchemaInfo struct { + Database string `json:"database"` + Table string `json:"table"` +} + +const ( + // InvolvingAll means all schemas/tables are affected. + InvolvingAll = "*" + // InvolvingNone means no schema/table is affected. + InvolvingNone = "" +) + // FinishTableJob is called when a job is finished. // It updates the job's state information and adds tblInfo to the binlog. func (job *Job) FinishTableJob(jobState JobState, schemaState SchemaState, ver int64, tblInfo *TableInfo) { @@ -851,6 +880,16 @@ func (job *Job) IsRollbackable() bool { return true } +// GetInvolvingSchemaInfo returns the schema info involved in the job. +func (job *Job) GetInvolvingSchemaInfo() []InvolvingSchemaInfo { + if len(job.InvolvingSchemaInfo) > 0 { + return job.InvolvingSchemaInfo + } + return []InvolvingSchemaInfo{ + {Database: job.SchemaName, Table: job.TableName}, + } +} + // JobState is for job state. 
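The scheduling rule these fields feed is the one implemented by runningJobs.checkRunnable in ddl_running_jobs.go: a pending job may start only if none of its involving entries overlaps a running entry, where "*" (InvolvingAll) matches everything and "" (InvolvingNone) opts an entry out of the check. A minimal self-contained sketch of that rule, using local stand-ins rather than the real model package:

package main

import "fmt"

const (
	involvingAll  = "*" // matches every schema or table
	involvingNone = ""  // the entry does not participate in conflict checks
)

type involving struct{ db, tbl string }

// running maps database -> table -> occupied, mirroring runningJobs.runningSchema.
type running map[string]map[string]struct{}

func (r running) add(infos []involving) {
	for _, in := range infos {
		if r[in.db] == nil {
			r[in.db] = map[string]struct{}{}
		}
		r[in.db][in.tbl] = struct{}{}
	}
}

// runnable reports whether a job touching infos may start now.
func (r running) runnable(infos []involving) bool {
	for _, in := range infos {
		if _, ok := r[involvingAll]; ok {
			return false // a *.* job is running, e.g. FLASHBACK CLUSTER
		}
		if in.db == involvingNone {
			continue
		}
		if tbls, ok := r[in.db]; ok {
			if _, ok := tbls[involvingAll]; ok {
				return false // a whole-schema job is running, e.g. RECOVER SCHEMA
			}
			if in.tbl == involvingNone {
				continue
			}
			if _, ok := tbls[in.tbl]; ok {
				return false // the same table is occupied
			}
		}
	}
	return true
}

func main() {
	r := running{}
	r.add([]involving{{"db1", "t1"}, {"db1", "t2"}}) // e.g. a running RENAME TABLE db1.t1 TO db1.t2
	fmt.Println(r.runnable([]involving{{"db1", "t1"}}))                  // false: same table
	fmt.Println(r.runnable([]involving{{"db2", "t1"}}))                  // true: different schema
	fmt.Println(r.runnable([]involving{{involvingNone, involvingNone}})) // true: e.g. CREATE PLACEMENT POLICY
}
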
type JobState byte diff --git a/parser/model/ddl_test.go b/parser/model/ddl_test.go index c6f47833be5d8..408f4d822fb84 100644 --- a/parser/model/ddl_test.go +++ b/parser/model/ddl_test.go @@ -49,5 +49,61 @@ func TestJobSize(t *testing.T) { - SubJob.ToProxyJob() ` job := model.Job{} +<<<<<<< HEAD:parser/model/ddl_test.go require.Equal(t, 320, int(unsafe.Sizeof(job)), msg) +======= + require.Equal(t, 360, int(unsafe.Sizeof(job)), msg) +} + +func TestBackfillMetaCodec(t *testing.T) { + jm := &model.JobMeta{ + SchemaID: 1, + TableID: 2, + Query: "alter table t add index idx(a)", + Priority: 1, + } + bm := &model.BackfillMeta{ + EndInclude: true, + Error: terror.ErrResultUndetermined, + JobMeta: jm, + } + bmBytes, err := bm.Encode() + require.NoError(t, err) + bmRet := &model.BackfillMeta{} + bmRet.Decode(bmBytes) + require.Equal(t, bm, bmRet) +} + +func TestMayNeedReorg(t *testing.T) { + //TODO(bb7133): add more test cases for different ActionType. + reorgJobTypes := []model.ActionType{ + model.ActionReorganizePartition, + model.ActionRemovePartitioning, + model.ActionAlterTablePartitioning, + model.ActionAddIndex, + model.ActionAddPrimaryKey, + } + generalJobTypes := []model.ActionType{ + model.ActionCreateTable, + model.ActionDropTable, + } + job := &model.Job{ + ID: 100, + Type: model.ActionCreateTable, + SchemaID: 101, + TableID: 102, + SchemaName: "test", + TableName: "t", + State: model.JobStateDone, + MultiSchemaInfo: nil, + } + for _, jobType := range reorgJobTypes { + job.Type = jobType + require.True(t, job.MayNeedReorg()) + } + for _, jobType := range generalJobTypes { + job.Type = jobType + require.False(t, job.MayNeedReorg()) + } +>>>>>>> 2dfbaa8264f (ddl: set jobs dependency by schema and table name (#49699)):pkg/parser/model/ddl_test.go } diff --git a/pkg/ddl/BUILD.bazel b/pkg/ddl/BUILD.bazel new file mode 100644 index 0000000000000..0aa8b3638917f --- /dev/null +++ b/pkg/ddl/BUILD.bazel @@ -0,0 +1,327 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +package_group( + name = "ddl_friend", + packages = [ + "-//pkg/planner/...", + "//...", + ], +) + +go_library( + name = "ddl", + srcs = [ + "backfilling.go", + "backfilling_clean_s3.go", + "backfilling_dist_executor.go", + "backfilling_dist_scheduler.go", + "backfilling_import_cloud.go", + "backfilling_merge_sort.go", + "backfilling_operators.go", + "backfilling_proto.go", + "backfilling_read_index.go", + "backfilling_scheduler.go", + "callback.go", + "cluster.go", + "column.go", + "constant.go", + "constraint.go", + "ddl.go", + "ddl_algorithm.go", + "ddl_api.go", + "ddl_running_jobs.go", + "ddl_tiflash_api.go", + "ddl_worker.go", + "ddl_workerpool.go", + "delete_range.go", + "delete_range_util.go", + "dist_owner.go", + "foreign_key.go", + "generated_column.go", + "index.go", + "index_cop.go", + "index_merge_tmp.go", + "job_table.go", + "mock.go", + "multi_schema_change.go", + "options.go", + "partition.go", + "placement_policy.go", + "reorg.go", + "resource_group.go", + "rollingback.go", + "sanity_check.go", + "schema.go", + "sequence.go", + "split_region.go", + "stat.go", + "table.go", + "table_lock.go", + "ttl.go", + ], + importpath = "github.com/pingcap/tidb/pkg/ddl", + visibility = [ + ":ddl_friend", + ], + deps = [ + "//br/pkg/lightning/backend", + "//br/pkg/lightning/backend/external", + "//br/pkg/lightning/common", + "//br/pkg/lightning/config", + "//br/pkg/storage", + "//pkg/config", + "//pkg/ddl/copr", + "//pkg/ddl/ingest", + "//pkg/ddl/internal/session", + "//pkg/ddl/label", + "//pkg/ddl/placement", + 
"//pkg/ddl/resourcegroup", + "//pkg/ddl/syncer", + "//pkg/ddl/util", + "//pkg/distsql", + "//pkg/disttask/framework/handle", + "//pkg/disttask/framework/proto", + "//pkg/disttask/framework/scheduler", + "//pkg/disttask/framework/storage", + "//pkg/disttask/framework/taskexecutor", + "//pkg/disttask/framework/taskexecutor/execute", + "//pkg/disttask/operator", + "//pkg/domain/infosync", + "//pkg/domain/resourcegroup", + "//pkg/expression", + "//pkg/infoschema", + "//pkg/kv", + "//pkg/meta", + "//pkg/meta/autoid", + "//pkg/metrics", + "//pkg/owner", + "//pkg/parser", + "//pkg/parser/ast", + "//pkg/parser/charset", + "//pkg/parser/format", + "//pkg/parser/model", + "//pkg/parser/mysql", + "//pkg/parser/opcode", + "//pkg/parser/terror", + "//pkg/parser/types", + "//pkg/privilege", + "//pkg/resourcemanager/pool/workerpool", + "//pkg/resourcemanager/util", + "//pkg/sessionctx", + "//pkg/sessionctx/binloginfo", + "//pkg/sessionctx/stmtctx", + "//pkg/sessionctx/variable", + "//pkg/sessiontxn", + "//pkg/statistics", + "//pkg/statistics/handle", + "//pkg/statistics/handle/util", + "//pkg/store/copr", + "//pkg/store/driver/backoff", + "//pkg/store/helper", + "//pkg/table", + "//pkg/table/tables", + "//pkg/tablecodec", + "//pkg/tidb-binlog/pump_client", + "//pkg/types", + "//pkg/types/parser_driver", + "//pkg/util", + "//pkg/util/backoff", + "//pkg/util/chunk", + "//pkg/util/codec", + "//pkg/util/collate", + "//pkg/util/cpu", + "//pkg/util/dbterror", + "//pkg/util/dbterror/exeerrors", + "//pkg/util/domainutil", + "//pkg/util/filter", + "//pkg/util/gcutil", + "//pkg/util/hack", + "//pkg/util/intest", + "//pkg/util/logutil", + "//pkg/util/mathutil", + "//pkg/util/memory", + "//pkg/util/mock", + "//pkg/util/ranger", + "//pkg/util/resourcegrouptag", + "//pkg/util/rowDecoder", + "//pkg/util/rowcodec", + "//pkg/util/set", + "//pkg/util/size", + "//pkg/util/slice", + "//pkg/util/sqlexec", + "//pkg/util/sqlkiller", + "//pkg/util/stringutil", + "//pkg/util/syncutil", + "//pkg/util/timeutil", + "//pkg/util/topsql", + "//pkg/util/topsql/state", + "@com_github_google_uuid//:uuid", + "@com_github_ngaut_pools//:pools", + "@com_github_pingcap_errors//:errors", + "@com_github_pingcap_failpoint//:failpoint", + "@com_github_pingcap_kvproto//pkg/errorpb", + "@com_github_pingcap_kvproto//pkg/kvrpcpb", + "@com_github_pingcap_kvproto//pkg/metapb", + "@com_github_pingcap_log//:log", + "@com_github_pingcap_tipb//go-tipb", + "@com_github_prometheus_client_golang//prometheus", + "@com_github_tikv_client_go_v2//error", + "@com_github_tikv_client_go_v2//kv", + "@com_github_tikv_client_go_v2//oracle", + "@com_github_tikv_client_go_v2//tikv", + "@com_github_tikv_client_go_v2//tikvrpc", + "@com_github_tikv_client_go_v2//txnkv/rangetask", + "@com_github_tikv_client_go_v2//util", + "@com_github_tikv_pd_client//http", + "@io_etcd_go_etcd_client_v3//:client", + "@org_golang_x_sync//errgroup", + "@org_uber_go_atomic//:atomic", + "@org_uber_go_zap//:zap", + ], +) + +go_test( + name = "ddl_test", + timeout = "moderate", + srcs = [ + "attributes_sql_test.go", + "backfilling_dist_scheduler_test.go", + "backfilling_test.go", + "bench_test.go", + "cancel_test.go", + "cluster_test.go", + "column_change_test.go", + "column_modify_test.go", + "column_test.go", + "column_type_change_test.go", + "constraint_test.go", + "db_cache_test.go", + "db_change_failpoints_test.go", + "db_change_test.go", + "db_integration_test.go", + "db_rename_test.go", + "db_table_test.go", + "db_test.go", + "ddl_algorithm_test.go", + "ddl_api_test.go", + 
"ddl_error_test.go", + "ddl_running_jobs_test.go", + "ddl_test.go", + "ddl_worker_test.go", + "ddl_workerpool_test.go", + "export_test.go", + "fail_test.go", + "foreign_key_test.go", + "index_change_test.go", + "index_cop_test.go", + "index_modify_test.go", + "integration_test.go", + "job_table_test.go", + "main_test.go", + "modify_column_test.go", + "multi_schema_change_test.go", + "mv_index_test.go", + "options_test.go", + "partition_test.go", + "placement_policy_ddl_test.go", + "placement_policy_test.go", + "placement_sql_test.go", + "primary_key_handle_test.go", + "reorg_partition_test.go", + "repair_table_test.go", + "restart_test.go", + "rollingback_test.go", + "schema_test.go", + "sequence_test.go", + "stat_test.go", + "table_modify_test.go", + "table_split_test.go", + "table_test.go", + "tiflash_replica_test.go", + "ttl_test.go", + ], + embed = [":ddl"], + flaky = True, + shard_count = 50, + deps = [ + "//br/pkg/lightning/backend/external", + "//pkg/autoid_service", + "//pkg/config", + "//pkg/ddl/copr", + "//pkg/ddl/ingest", + "//pkg/ddl/internal/session", + "//pkg/ddl/placement", + "//pkg/ddl/schematracker", + "//pkg/ddl/syncer", + "//pkg/ddl/testutil", + "//pkg/ddl/util", + "//pkg/ddl/util/callback", + "//pkg/disttask/framework/proto", + "//pkg/disttask/framework/scheduler", + "//pkg/disttask/framework/storage", + "//pkg/domain", + "//pkg/domain/infosync", + "//pkg/errno", + "//pkg/executor", + "//pkg/infoschema", + "//pkg/keyspace", + "//pkg/kv", + "//pkg/meta", + "//pkg/meta/autoid", + "//pkg/parser", + "//pkg/parser/ast", + "//pkg/parser/auth", + "//pkg/parser/charset", + "//pkg/parser/model", + "//pkg/parser/mysql", + "//pkg/parser/terror", + "//pkg/parser/types", + "//pkg/planner/core", + "//pkg/server", + "//pkg/session", + "//pkg/session/types", + "//pkg/sessionctx", + "//pkg/sessionctx/stmtctx", + "//pkg/sessionctx/variable", + "//pkg/sessiontxn", + "//pkg/store/gcworker", + "//pkg/store/helper", + "//pkg/store/mockstore", + "//pkg/table", + "//pkg/table/tables", + "//pkg/tablecodec", + "//pkg/testkit", + "//pkg/testkit/external", + "//pkg/testkit/testsetup", + "//pkg/testkit/testutil", + "//pkg/types", + "//pkg/util", + "//pkg/util/chunk", + "//pkg/util/codec", + "//pkg/util/collate", + "//pkg/util/dbterror", + "//pkg/util/domainutil", + "//pkg/util/gcutil", + "//pkg/util/logutil", + "//pkg/util/mathutil", + "//pkg/util/mock", + "//pkg/util/sem", + "//pkg/util/sqlexec", + "//pkg/util/timeutil", + "@com_github_docker_go_units//:go-units", + "@com_github_ngaut_pools//:pools", + "@com_github_pingcap_errors//:errors", + "@com_github_pingcap_failpoint//:failpoint", + "@com_github_stretchr_testify//assert", + "@com_github_stretchr_testify//require", + "@com_github_tikv_client_go_v2//oracle", + "@com_github_tikv_client_go_v2//testutils", + "@com_github_tikv_client_go_v2//tikv", + "@com_github_tikv_client_go_v2//util", + "@io_etcd_go_etcd_client_v3//:client", + "@org_golang_google_grpc//:grpc", + "@org_uber_go_atomic//:atomic", + "@org_uber_go_goleak//:goleak", + "@org_uber_go_zap//:zap", + ], +) diff --git a/pkg/ddl/job_table.go b/pkg/ddl/job_table.go new file mode 100644 index 0000000000000..2a90b31b41a98 --- /dev/null +++ b/pkg/ddl/job_table.go @@ -0,0 +1,699 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ddl + +import ( + "bytes" + "context" + "encoding/hex" + "encoding/json" + "fmt" + "slices" + "strconv" + "strings" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/failpoint" + "github.com/pingcap/kvproto/pkg/kvrpcpb" + "github.com/pingcap/tidb/pkg/ddl/ingest" + sess "github.com/pingcap/tidb/pkg/ddl/internal/session" + "github.com/pingcap/tidb/pkg/ddl/syncer" + "github.com/pingcap/tidb/pkg/ddl/util" + "github.com/pingcap/tidb/pkg/kv" + "github.com/pingcap/tidb/pkg/meta" + "github.com/pingcap/tidb/pkg/meta/autoid" + "github.com/pingcap/tidb/pkg/metrics" + "github.com/pingcap/tidb/pkg/owner" + "github.com/pingcap/tidb/pkg/parser/model" + "github.com/pingcap/tidb/pkg/sessionctx/variable" + "github.com/pingcap/tidb/pkg/table" + tidb_util "github.com/pingcap/tidb/pkg/util" + "github.com/pingcap/tidb/pkg/util/dbterror" + "github.com/pingcap/tidb/pkg/util/intest" + "github.com/pingcap/tidb/pkg/util/logutil" + clientv3 "go.etcd.io/etcd/client/v3" + "go.uber.org/zap" +) + +var ( + addingDDLJobConcurrent = "/tidb/ddl/add_ddl_job_general" + dispatchLoopWaitingDuration = 1 * time.Second +) + +func init() { + // In test the wait duration can be reduced to make test case run faster + if intest.InTest { + dispatchLoopWaitingDuration = 50 * time.Millisecond + } +} + +type jobType int + +func (t jobType) String() string { + switch t { + case general: + return "general" + case reorg: + return "reorg" + } + return "unknown job type: " + strconv.Itoa(int(t)) +} + +const ( + general jobType = iota + reorg +) + +func (d *ddl) getJob(se *sess.Session, tp jobType, filter func(*model.Job) (bool, error)) (*model.Job, error) { + not := "not" + label := "get_job_general" + if tp == reorg { + not = "" + label = "get_job_reorg" + } + const getJobSQL = `select job_meta, processing from mysql.tidb_ddl_job where job_id in + (select min(job_id) from mysql.tidb_ddl_job group by schema_ids, table_ids, processing) + and %s reorg %s order by processing desc, job_id` + var excludedJobIDs string + if ids := d.runningJobs.allIDs(); len(ids) > 0 { + excludedJobIDs = fmt.Sprintf("and job_id not in (%s)", ids) + } + sql := fmt.Sprintf(getJobSQL, not, excludedJobIDs) + rows, err := se.Execute(context.Background(), sql, label) + if err != nil { + return nil, errors.Trace(err) + } + for _, row := range rows { + jobBinary := row.GetBytes(0) + isJobProcessing := row.GetInt64(1) == 1 + + job := model.Job{} + err = job.Decode(jobBinary) + if err != nil { + return nil, errors.Trace(err) + } + + isRunnable, err := d.processJobDuringUpgrade(se, &job) + if err != nil { + return nil, errors.Trace(err) + } + if !isRunnable { + continue + } + + // The job has already been picked up, just return to continue it. 
+ if isJobProcessing { + return &job, nil + } + + b, err := filter(&job) + if err != nil { + return nil, errors.Trace(err) + } + if b { + if err = d.markJobProcessing(se, &job); err != nil { + logutil.BgLogger().Warn( + "[ddl] handle ddl job failed: mark job is processing meet error", + zap.Error(err), + zap.String("job", job.String())) + return nil, errors.Trace(err) + } + return &job, nil + } + } + return nil, nil +} + +func hasSysDB(job *model.Job) bool { + for _, info := range job.GetInvolvingSchemaInfo() { + if tidb_util.IsSysDB(info.Database) { + return true + } + } + return false +} + +func (d *ddl) processJobDuringUpgrade(sess *sess.Session, job *model.Job) (isRunnable bool, err error) { + if d.stateSyncer.IsUpgradingState() { + if job.IsPaused() { + return false, nil + } + // We need to turn the 'pausing' job to be 'paused' in ddl worker, + // and stop the reorganization workers + if job.IsPausing() || hasSysDB(job) { + return true, nil + } + var errs []error + // During binary upgrade, pause all running DDL jobs + errs, err = PauseJobsBySystem(sess.Session(), []int64{job.ID}) + if len(errs) > 0 && errs[0] != nil { + err = errs[0] + } + + if err != nil { + isCannotPauseDDLJobErr := dbterror.ErrCannotPauseDDLJob.Equal(err) + logutil.BgLogger().Warn("pause the job failed", zap.String("category", "ddl-upgrading"), zap.Stringer("job", job), + zap.Bool("isRunnable", isCannotPauseDDLJobErr), zap.Error(err)) + if isCannotPauseDDLJobErr { + return true, nil + } + } else { + logutil.BgLogger().Warn("pause the job successfully", zap.String("category", "ddl-upgrading"), zap.Stringer("job", job)) + } + + return false, nil + } + + if job.IsPausedBySystem() { + var errs []error + errs, err = ResumeJobsBySystem(sess.Session(), []int64{job.ID}) + if len(errs) > 0 && errs[0] != nil { + logutil.BgLogger().Warn("normal cluster state, resume the job failed", zap.String("category", "ddl-upgrading"), zap.Stringer("job", job), zap.Error(errs[0])) + return false, errs[0] + } + if err != nil { + logutil.BgLogger().Warn("normal cluster state, resume the job failed", zap.String("category", "ddl-upgrading"), zap.Stringer("job", job), zap.Error(err)) + return false, err + } + logutil.BgLogger().Warn("normal cluster state, resume the job successfully", zap.String("category", "ddl-upgrading"), zap.Stringer("job", job)) + return false, errors.Errorf("system paused job:%d need to be resumed", job.ID) + } + + if job.IsPaused() { + return false, nil + } + + return true, nil +} + +func (d *ddl) getGeneralJob(sess *sess.Session) (*model.Job, error) { + return d.getJob(sess, general, func(job *model.Job) (bool, error) { + if !d.runningJobs.checkRunnable(job) { + return false, nil + } + if job.Type == model.ActionDropSchema { + // Check if there is any reorg job on this schema. + sql := fmt.Sprintf("select job_id from mysql.tidb_ddl_job where CONCAT(',', schema_ids, ',') REGEXP CONCAT(',', %s, ',') != 0 and processing limit 1", strconv.Quote(strconv.FormatInt(job.SchemaID, 10))) + rows, err := sess.Execute(d.ctx, sql, "check conflict jobs") + return len(rows) == 0, err + } + // Check if there is any running job works on the same table. 
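+		// table_ids is a comma-separated list, so the REGEXP below rewrites one
+		// job's list into an alternation and reports any overlap with a job that
+		// is already processing; a processing FLASHBACK CLUSTER job blocks
+		// everything as well.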
+ sql := fmt.Sprintf("select job_id from mysql.tidb_ddl_job t1, (select table_ids from mysql.tidb_ddl_job where job_id = %d) t2 where "+ + "(processing and CONCAT(',', t2.table_ids, ',') REGEXP CONCAT(',', REPLACE(t1.table_ids, ',', '|'), ',') != 0)"+ + "or (type = %d and processing)", job.ID, model.ActionFlashbackCluster) + rows, err := sess.Execute(d.ctx, sql, "check conflict jobs") + return len(rows) == 0, err + }) +} + +func (d *ddl) getReorgJob(sess *sess.Session) (*model.Job, error) { + return d.getJob(sess, reorg, func(job *model.Job) (bool, error) { + if !d.runningJobs.checkRunnable(job) { + return false, nil + } + if (job.Type == model.ActionAddIndex || job.Type == model.ActionAddPrimaryKey) && + job.ReorgMeta != nil && + job.ReorgMeta.IsFastReorg && + ingest.LitBackCtxMgr != nil { + succeed := ingest.LitBackCtxMgr.MarkJobProcessing(job.ID) + if !succeed { + // We only allow one task to use ingest at the same time in order to limit the CPU/memory usage. + return false, nil + } + } + // Check if there is any block ddl running, like drop schema and flashback cluster. + sql := fmt.Sprintf("select job_id from mysql.tidb_ddl_job where "+ + "(CONCAT(',', schema_ids, ',') REGEXP CONCAT(',', %s, ',') != 0 and type = %d and processing) "+ + "or (CONCAT(',', table_ids, ',') REGEXP CONCAT(',', %s, ',') != 0 and processing) "+ + "or (type = %d and processing) limit 1", + strconv.Quote(strconv.FormatInt(job.SchemaID, 10)), model.ActionDropSchema, strconv.Quote(strconv.FormatInt(job.TableID, 10)), model.ActionFlashbackCluster) + rows, err := sess.Execute(d.ctx, sql, "check conflict jobs") + return len(rows) == 0, err + }) +} + +func (d *ddl) startDispatchLoop() { + sessCtx, err := d.sessPool.Get() + if err != nil { + logutil.BgLogger().Fatal("dispatch loop get session failed, it should not happen, please try restart TiDB", zap.Error(err)) + } + defer d.sessPool.Put(sessCtx) + se := sess.NewSession(sessCtx) + var notifyDDLJobByEtcdCh clientv3.WatchChan + if d.etcdCli != nil { + notifyDDLJobByEtcdCh = d.etcdCli.Watch(d.ctx, addingDDLJobConcurrent) + } + if err := d.checkAndUpdateClusterState(true); err != nil { + logutil.BgLogger().Fatal("dispatch loop get cluster state failed, it should not happen, please try restart TiDB", zap.Error(err)) + } + ticker := time.NewTicker(dispatchLoopWaitingDuration) + defer ticker.Stop() + isOnce := false + for { + if isChanClosed(d.ctx.Done()) { + return + } + if !d.isOwner() { + isOnce = true + d.onceMap = make(map[int64]struct{}, jobOnceCapacity) + time.Sleep(dispatchLoopWaitingDuration) + continue + } + select { + case <-d.ddlJobCh: + case <-ticker.C: + case _, ok := <-notifyDDLJobByEtcdCh: + if !ok { + logutil.BgLogger().Warn("start worker watch channel closed", zap.String("category", "ddl"), zap.String("watch key", addingDDLJobConcurrent)) + notifyDDLJobByEtcdCh = d.etcdCli.Watch(d.ctx, addingDDLJobConcurrent) + time.Sleep(time.Second) + continue + } + case <-d.ctx.Done(): + return + } + if err := d.checkAndUpdateClusterState(isOnce); err != nil { + continue + } + isOnce = false + d.loadDDLJobAndRun(se, d.generalDDLWorkerPool, d.getGeneralJob) + d.loadDDLJobAndRun(se, d.reorgWorkerPool, d.getReorgJob) + } +} + +func (d *ddl) checkAndUpdateClusterState(needUpdate bool) error { + select { + case _, ok := <-d.stateSyncer.WatchChan(): + if !ok { + d.stateSyncer.Rewatch(d.ctx) + } + default: + if !needUpdate { + return nil + } + } + + oldState := d.stateSyncer.IsUpgradingState() + stateInfo, err := d.stateSyncer.GetGlobalState(d.ctx) + if err != nil { + 
logutil.BgLogger().Warn("get global state failed", zap.String("category", "ddl"), zap.Error(err)) + return errors.Trace(err) + } + logutil.BgLogger().Info("get global state and global state change", zap.String("category", "ddl"), + zap.Bool("oldState", oldState), zap.Bool("currState", d.stateSyncer.IsUpgradingState())) + if !d.isOwner() { + return nil + } + + ownerOp := owner.OpNone + if stateInfo.State == syncer.StateUpgrading { + ownerOp = owner.OpSyncUpgradingState + } + err = d.ownerManager.SetOwnerOpValue(d.ctx, ownerOp) + if err != nil { + logutil.BgLogger().Warn("the owner sets global state to owner operator value failed", zap.String("category", "ddl"), zap.Error(err)) + return errors.Trace(err) + } + logutil.BgLogger().Info("the owner sets owner operator value", zap.String("category", "ddl"), zap.Stringer("ownerOp", ownerOp)) + return nil +} + +func (d *ddl) loadDDLJobAndRun(se *sess.Session, pool *workerPool, getJob func(*sess.Session) (*model.Job, error)) { + wk, err := pool.get() + if err != nil || wk == nil { + logutil.BgLogger().Debug(fmt.Sprintf("[ddl] no %v worker available now", pool.tp()), zap.Error(err)) + return + } + + d.mu.RLock() + d.mu.hook.OnGetJobBefore(pool.tp().String()) + d.mu.RUnlock() + + job, err := getJob(se) + if job == nil || err != nil { + if err != nil { + logutil.BgLogger().Warn("get job met error", zap.String("category", "ddl"), zap.Error(err)) + } + pool.put(wk) + return + } + d.mu.RLock() + d.mu.hook.OnGetJobAfter(pool.tp().String(), job) + d.mu.RUnlock() + + d.delivery2worker(wk, pool, job) +} + +// delivery2worker owns the worker, need to put it back to the pool in this function. +func (d *ddl) delivery2worker(wk *worker, pool *workerPool, job *model.Job) { + injectFailPointForGetJob(job) + d.runningJobs.add(job) + d.wg.Run(func() { + metrics.DDLRunningJobCount.WithLabelValues(pool.tp().String()).Inc() + defer func() { + d.runningJobs.remove(job) + asyncNotify(d.ddlJobCh) + metrics.DDLRunningJobCount.WithLabelValues(pool.tp().String()).Dec() + }() + // check if this ddl job is synced to all servers. + if !job.NotStarted() && (!d.isSynced(job) || !d.maybeAlreadyRunOnce(job.ID)) { + if variable.EnableMDL.Load() { + exist, version, err := checkMDLInfo(job.ID, d.sessPool) + if err != nil { + logutil.BgLogger().Warn("check MDL info failed", zap.String("category", "ddl"), zap.Error(err), zap.String("job", job.String())) + // Release the worker resource. + pool.put(wk) + return + } else if exist { + // Release the worker resource. + pool.put(wk) + err = waitSchemaSyncedForMDL(d.ddlCtx, job, version) + if err != nil { + return + } + d.setAlreadyRunOnce(job.ID) + cleanMDLInfo(d.sessPool, job.ID, d.etcdCli) + // Don't have a worker now. + return + } + } else { + err := waitSchemaSynced(d.ddlCtx, job, 2*d.lease) + if err != nil { + logutil.BgLogger().Warn("wait ddl job sync failed", zap.String("category", "ddl"), zap.Error(err), zap.String("job", job.String())) + time.Sleep(time.Second) + // Release the worker resource. 
+					pool.put(wk)
+					return
+				}
+				d.setAlreadyRunOnce(job.ID)
+			}
+		}
+
+		schemaVer, err := wk.HandleDDLJobTable(d.ddlCtx, job)
+		pool.put(wk)
+		if err != nil {
+			logutil.BgLogger().Info("handle ddl job failed", zap.String("category", "ddl"), zap.Error(err), zap.String("job", job.String()))
+		} else {
+			failpoint.Inject("mockDownBeforeUpdateGlobalVersion", func(val failpoint.Value) {
+				if val.(bool) {
+					if mockDDLErrOnce == 0 {
+						mockDDLErrOnce = schemaVer
+						failpoint.Return()
+					}
+				}
+			})
+
+			// This means the job enters another state (delete only, write only, public, etc...) or is cancelled.
+			// If the job is done, still running, or rolling back, we will wait 2 * lease time or until MDL is synced
+			// to guarantee that other servers update to the newest schema.
+			err := waitSchemaChanged(d.ddlCtx, d.lease*2, schemaVer, job)
+			if err != nil {
+				// May be caused by server closing, shouldn't clean the MDL info.
+				logutil.BgLogger().Info("wait latest schema version error", zap.String("category", "ddl"), zap.Error(err))
+				return
+			}
+			cleanMDLInfo(d.sessPool, job.ID, d.etcdCli)
+			d.synced(job)
+
+			if RunInGoTest {
+				// d.mu.hook is initialized from the domain / test callback, which will force the owner host to update the schema diff synchronously.
+				d.mu.RLock()
+				d.mu.hook.OnSchemaStateChanged(schemaVer)
+				d.mu.RUnlock()
+			}
+
+			d.mu.RLock()
+			d.mu.hook.OnJobUpdated(job)
+			d.mu.RUnlock()
+		}
+	})
+}
+
+func (*ddl) markJobProcessing(se *sess.Session, job *model.Job) error {
+	se.SetDiskFullOpt(kvrpcpb.DiskFullOpt_AllowedOnAlmostFull)
+	_, err := se.Execute(context.Background(), fmt.Sprintf(
+		"update mysql.tidb_ddl_job set processing = 1 where job_id = %d", job.ID),
+		"mark_job_processing")
+	return errors.Trace(err)
+}
+
+func (d *ddl) getTableByTxn(r autoid.Requirement, schemaID, tableID int64) (*model.DBInfo, table.Table, error) {
+	var tbl table.Table
+	var dbInfo *model.DBInfo
+	err := kv.RunInNewTxn(d.ctx, r.Store(), false, func(ctx context.Context, txn kv.Transaction) error {
+		t := meta.NewMeta(txn)
+		var err1 error
+		dbInfo, err1 = t.GetDatabase(schemaID)
+		if err1 != nil {
+			return errors.Trace(err1)
+		}
+		tblInfo, err1 := getTableInfo(t, tableID, schemaID)
+		if err1 != nil {
+			return errors.Trace(err1)
+		}
+		tbl, err1 = getTable(r, schemaID, tblInfo)
+		return errors.Trace(err1)
+	})
+	return dbInfo, tbl, err
+}
+
+const (
+	addDDLJobSQL    = "insert into mysql.tidb_ddl_job(job_id, reorg, schema_ids, table_ids, job_meta, type, processing) values"
+	updateDDLJobSQL = "update mysql.tidb_ddl_job set job_meta = %s where job_id = %d"
+)
+
+func insertDDLJobs2Table(se *sess.Session, updateRawArgs bool, jobs ...*model.Job) error {
+	failpoint.Inject("mockAddBatchDDLJobsErr", func(val failpoint.Value) {
+		if val.(bool) {
+			failpoint.Return(errors.Errorf("mockAddBatchDDLJobsErr"))
+		}
+	})
+	if len(jobs) == 0 {
+		return nil
+	}
+	var sql bytes.Buffer
+	sql.WriteString(addDDLJobSQL)
+	for i, job := range jobs {
+		b, err := job.Encode(updateRawArgs)
+		if err != nil {
+			return err
+		}
+		if i != 0 {
+			sql.WriteString(",")
+		}
+		fmt.Fprintf(&sql, "(%d, %t, %s, %s, %s, %d, %t)", job.ID, job.MayNeedReorg(), strconv.Quote(job2SchemaIDs(job)), strconv.Quote(job2TableIDs(job)), util.WrapKey2String(b), job.Type, !job.NotStarted())
+	}
+	se.SetDiskFullOpt(kvrpcpb.DiskFullOpt_AllowedOnAlmostFull)
+	ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL)
+	_, err := se.Execute(ctx, sql.String(), "insert_job")
zap.String("sql", sql.String())) + return errors.Trace(err) +} + +func job2SchemaIDs(job *model.Job) string { + return job2UniqueIDs(job, true) +} + +func job2TableIDs(job *model.Job) string { + return job2UniqueIDs(job, false) +} + +func job2UniqueIDs(job *model.Job, schema bool) string { + switch job.Type { + case model.ActionExchangeTablePartition, model.ActionRenameTables, model.ActionRenameTable: + var ids []int64 + if schema { + ids = job.CtxVars[0].([]int64) + } else { + ids = job.CtxVars[1].([]int64) + } + set := make(map[int64]struct{}, len(ids)) + for _, id := range ids { + set[id] = struct{}{} + } + + s := make([]string, 0, len(set)) + for id := range set { + s = append(s, strconv.FormatInt(id, 10)) + } + slices.Sort(s) + return strings.Join(s, ",") + case model.ActionTruncateTable: + return strconv.FormatInt(job.TableID, 10) + "," + strconv.FormatInt(job.Args[0].(int64), 10) + } + if schema { + return strconv.FormatInt(job.SchemaID, 10) + } + return strconv.FormatInt(job.TableID, 10) +} + +func (w *worker) deleteDDLJob(job *model.Job) error { + sql := fmt.Sprintf("delete from mysql.tidb_ddl_job where job_id = %d", job.ID) + _, err := w.sess.Execute(context.Background(), sql, "delete_job") + return errors.Trace(err) +} + +func updateDDLJob2Table(se *sess.Session, job *model.Job, updateRawArgs bool) error { + b, err := job.Encode(updateRawArgs) + if err != nil { + return err + } + sql := fmt.Sprintf(updateDDLJobSQL, util.WrapKey2String(b), job.ID) + _, err = se.Execute(context.Background(), sql, "update_job") + return errors.Trace(err) +} + +// getDDLReorgHandle gets DDL reorg handle. +func getDDLReorgHandle(se *sess.Session, job *model.Job) (element *meta.Element, + startKey, endKey kv.Key, physicalTableID int64, err error) { + sql := fmt.Sprintf("select ele_id, ele_type, start_key, end_key, physical_id, reorg_meta from mysql.tidb_ddl_reorg where job_id = %d", job.ID) + ctx := kv.WithInternalSourceType(context.Background(), getDDLRequestSource(job.Type)) + rows, err := se.Execute(ctx, sql, "get_handle") + if err != nil { + return nil, nil, nil, 0, err + } + if len(rows) == 0 { + return nil, nil, nil, 0, meta.ErrDDLReorgElementNotExist + } + id := rows[0].GetInt64(0) + tp := rows[0].GetBytes(1) + element = &meta.Element{ + ID: id, + TypeKey: tp, + } + startKey = rows[0].GetBytes(2) + endKey = rows[0].GetBytes(3) + physicalTableID = rows[0].GetInt64(4) + return +} + +func getCheckpointReorgHandle(se *sess.Session, job *model.Job) (startKey, endKey kv.Key, physicalTableID int64, err error) { + startKey, endKey = kv.Key{}, kv.Key{} + sql := fmt.Sprintf("select reorg_meta from mysql.tidb_ddl_reorg where job_id = %d", job.ID) + ctx := kv.WithInternalSourceType(context.Background(), getDDLRequestSource(job.Type)) + rows, err := se.Execute(ctx, sql, "get_handle") + if err != nil { + return nil, nil, 0, err + } + if len(rows) == 0 { + return nil, nil, 0, meta.ErrDDLReorgElementNotExist + } + if !rows[0].IsNull(0) { + rawReorgMeta := rows[0].GetBytes(0) + var reorgMeta ingest.JobReorgMeta + err = json.Unmarshal(rawReorgMeta, &reorgMeta) + if err != nil { + return nil, nil, 0, errors.Trace(err) + } + if cp := reorgMeta.Checkpoint; cp != nil { + logutil.BgLogger().Info("resume physical table ID from checkpoint", zap.String("category", "ddl-ingest"), + zap.Int64("jobID", job.ID), + zap.String("start", hex.EncodeToString(cp.StartKey)), + zap.String("end", hex.EncodeToString(cp.EndKey)), + zap.Int64("checkpoint physical ID", cp.PhysicalID)) + physicalTableID = cp.PhysicalID + if 
+			if len(cp.StartKey) > 0 {
+				startKey = cp.StartKey
+			}
+			if len(cp.EndKey) > 0 {
+				endKey = cp.EndKey
+			}
+		}
+	}
+	return
+}
+
+// updateDDLReorgHandle updates the startKey, endKey, physicalTableID and element of the handle.
+// Caller should wrap this in a separate transaction, to avoid conflicts.
+func updateDDLReorgHandle(se *sess.Session, jobID int64, startKey kv.Key, endKey kv.Key, physicalTableID int64, element *meta.Element) error {
+	sql := fmt.Sprintf("update mysql.tidb_ddl_reorg set ele_id = %d, ele_type = %s, start_key = %s, end_key = %s, physical_id = %d where job_id = %d",
+		element.ID, util.WrapKey2String(element.TypeKey), util.WrapKey2String(startKey), util.WrapKey2String(endKey), physicalTableID, jobID)
+	_, err := se.Execute(context.Background(), sql, "update_handle")
+	return err
+}
+
+// initDDLReorgHandle initializes the handle for ddl reorg.
+func initDDLReorgHandle(s *sess.Session, jobID int64, startKey kv.Key, endKey kv.Key, physicalTableID int64, element *meta.Element) error {
+	rawReorgMeta, err := json.Marshal(ingest.JobReorgMeta{
+		Checkpoint: &ingest.ReorgCheckpoint{
+			PhysicalID: physicalTableID,
+			StartKey:   startKey,
+			EndKey:     endKey,
+			Version:    ingest.JobCheckpointVersionCurrent,
+		}})
+	if err != nil {
+		return errors.Trace(err)
+	}
+	del := fmt.Sprintf("delete from mysql.tidb_ddl_reorg where job_id = %d", jobID)
+	ins := fmt.Sprintf("insert into mysql.tidb_ddl_reorg(job_id, ele_id, ele_type, start_key, end_key, physical_id, reorg_meta) values (%d, %d, %s, %s, %s, %d, %s)",
+		jobID, element.ID, util.WrapKey2String(element.TypeKey), util.WrapKey2String(startKey), util.WrapKey2String(endKey), physicalTableID, util.WrapKey2String(rawReorgMeta))
+	return s.RunInTxn(func(se *sess.Session) error {
+		_, err := se.Execute(context.Background(), del, "init_handle")
+		if err != nil {
+			logutil.BgLogger().Info("initDDLReorgHandle failed to delete", zap.Int64("jobID", jobID), zap.Error(err))
+		}
+		_, err = se.Execute(context.Background(), ins, "init_handle")
+		return err
+	})
+}
+
+// removeDDLReorgHandle removes the handle for ddl reorg.
+func removeDDLReorgHandle(se *sess.Session, job *model.Job, elements []*meta.Element) error {
+	if len(elements) == 0 {
+		return nil
+	}
+	sql := fmt.Sprintf("delete from mysql.tidb_ddl_reorg where job_id = %d", job.ID)
+	return se.RunInTxn(func(se *sess.Session) error {
+		_, err := se.Execute(context.Background(), sql, "remove_handle")
+		return err
+	})
+}
+
+// removeReorgElement removes the element from ddl reorg; it is the same as removeDDLReorgHandle and is only used in failpoints.
+func removeReorgElement(se *sess.Session, job *model.Job) error {
+	sql := fmt.Sprintf("delete from mysql.tidb_ddl_reorg where job_id = %d", job.ID)
+	return se.RunInTxn(func(se *sess.Session) error {
+		_, err := se.Execute(context.Background(), sql, "remove_handle")
+		return err
+	})
+}
+
+// cleanDDLReorgHandles removes handles that are no longer needed.
+func cleanDDLReorgHandles(se *sess.Session, job *model.Job) error {
+	sql := "delete from mysql.tidb_ddl_reorg where job_id = " + strconv.FormatInt(job.ID, 10)
+	return se.RunInTxn(func(se *sess.Session) error {
+		_, err := se.Execute(context.Background(), sql, "clean_handle")
+		return err
+	})
+}
+
+func getJobsBySQL(se *sess.Session, tbl, condition string) ([]*model.Job, error) {
+	rows, err := se.Execute(context.Background(), fmt.Sprintf("select job_meta from mysql.%s where %s", tbl, condition), "get_job")
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+	jobs := make([]*model.Job, 0, 16)
+	for _, row := range rows {
+		jobBinary := row.GetBytes(0)
+		job := model.Job{}
+		err := job.Decode(jobBinary)
+		if err != nil {
+			return nil, errors.Trace(err)
+		}
+		jobs = append(jobs, &job)
+	}
+	return jobs, nil
+}
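
For reference, the comma-separated ID lists that insertDDLJobs2Table writes into the schema_ids and table_ids columns are what the CONCAT(',', ..., ',') REGEXP conflict checks in getGeneralJob and getReorgJob match against. Below is a minimal standalone Go sketch, not part of the patch, mirroring how job2UniqueIDs builds those lists; the helper name uniqueIDList is made up for illustration, and sort.Strings stands in for slices.Sort on a []string.

package main

import (
	"fmt"
	"sort"
	"strconv"
	"strings"
)

// uniqueIDList deduplicates the IDs, renders them in decimal, sorts them as
// strings, and joins them with commas, as job2UniqueIDs does for
// exchange-partition and rename jobs.
func uniqueIDList(ids []int64) string {
	set := make(map[int64]struct{}, len(ids))
	for _, id := range ids {
		set[id] = struct{}{}
	}
	s := make([]string, 0, len(set))
	for id := range set {
		s = append(s, strconv.FormatInt(id, 10))
	}
	sort.Strings(s) // lexicographic order, like slices.Sort on []string
	return strings.Join(s, ",")
}

func main() {
	// e.g. a RENAME TABLES job touching table IDs 101, 205 and 101 again.
	fmt.Println(uniqueIDList([]int64{101, 205, 101})) // "101,205"
	// A TRUNCATE TABLE job instead stores "<oldTableID>,<newTableID>".
	fmt.Println(strconv.FormatInt(88, 10) + "," + strconv.FormatInt(99, 10)) // "88,99"
}

Note that the IDs are sorted as strings, so ordering is lexicographic rather than numeric; the conflict-check SQL only tests membership of an ID in the comma-wrapped list, so the ordering does not affect it.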