From e87d3242ed6b60e1c7fe37022d0df11847fdf9a6 Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Tue, 16 Apr 2019 14:01:13 +0800 Subject: [PATCH 1/4] ddl: ddl-owner try to use memory infoSchema to check first --- ddl/ddl.go | 10 +++++----- ddl/ddl_worker.go | 2 +- ddl/table.go | 31 ++++++++++++++++++++++++------- 3 files changed, 30 insertions(+), 13 deletions(-) diff --git a/ddl/ddl.go b/ddl/ddl.go index 5729a75584860..643cec0a52e77 100644 --- a/ddl/ddl.go +++ b/ddl/ddl.go @@ -279,9 +279,8 @@ type DDL interface { // ddl is used to handle the statements that define the structure or schema of the database. type ddl struct { - m sync.RWMutex - infoHandle *infoschema.Handle - quitCh chan struct{} + m sync.RWMutex + quitCh chan struct{} *ddlCtx workers map[workerType]*worker @@ -299,6 +298,7 @@ type ddlCtx struct { ddlEventCh chan<- *util.Event lease time.Duration // lease is schema lease. binlogCli *pumpcli.PumpsClient // binlogCli is used for Binlog. + infoHandle *infoschema.Handle // hook may be modified. mu struct { @@ -379,12 +379,12 @@ func newDDL(ctx context.Context, etcdCli *clientv3.Client, store kv.Storage, ownerManager: manager, schemaSyncer: syncer, binlogCli: binloginfo.GetPumpsClient(), + infoHandle: infoHandle, } ddlCtx.mu.hook = hook ddlCtx.mu.interceptor = &BaseInterceptor{} d := &ddl{ - infoHandle: infoHandle, - ddlCtx: ddlCtx, + ddlCtx: ddlCtx, } d.start(ctx, ctxPool) diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go index 62ed299cc45b1..9b0a8658d18ff 100644 --- a/ddl/ddl_worker.go +++ b/ddl/ddl_worker.go @@ -523,7 +523,7 @@ func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, case model.ActionRebaseAutoID: ver, err = onRebaseAutoID(d.store, t, job) case model.ActionRenameTable: - ver, err = onRenameTable(t, job) + ver, err = onRenameTable(d, t, job) case model.ActionShardRowID: ver, err = w.onShardRowID(d, t, job) case model.ActionModifyTableComment: diff --git a/ddl/table.go b/ddl/table.go index 7cb63bf6e6915..8f1d3dfa35796 100644 --- a/ddl/table.go +++ b/ddl/table.go @@ -50,7 +50,7 @@ func onCreateTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) } tbInfo.State = model.StateNone - err := checkTableNotExists(t, job, schemaID, tbInfo.Name.L) + err := checkTableNotExists(d, t, job, schemaID, tbInfo.Name.L) if err != nil { job.State = model.JobStateCancelled return ver, errors.Trace(err) @@ -94,7 +94,7 @@ func onCreateView(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) return ver, errors.Trace(err) } tbInfo.State = model.StateNone - err := checkTableNotExists(t, job, schemaID, tbInfo.Name.L) + err := checkTableNotExists(d, t, job, schemaID, tbInfo.Name.L) if err != nil { if infoschema.ErrDatabaseNotExists.Equal(err) || !orReplace { job.State = model.JobStateCancelled @@ -191,7 +191,7 @@ func (w *worker) onRecoverTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver in return ver, errors.Trace(err) } - err = checkTableNotExists(t, job, schemaID, tblInfo.Name.L) + err = checkTableNotExists(d, t, job, schemaID, tblInfo.Name.L) if err != nil { return ver, errors.Trace(err) } @@ -525,7 +525,7 @@ func verifyNoOverflowShardBits(s *sessionPool, tbl table.Table, shardRowIDBits u return nil } -func onRenameTable(t *meta.Meta, job *model.Job) (ver int64, _ error) { +func onRenameTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) { var oldSchemaID int64 var tableName model.CIStr if err := job.DecodeArgs(&oldSchemaID, &tableName); err != nil { @@ -539,7 +539,7 @@ func onRenameTable(t *meta.Meta, job *model.Job) (ver int64, _ error) { return ver, errors.Trace(err) } newSchemaID := job.SchemaID - err = checkTableNotExists(t, job, newSchemaID, tableName.L) + err = checkTableNotExists(d, t, job, newSchemaID, tableName.L) if err != nil { job.State = model.JobStateCancelled return ver, errors.Trace(err) @@ -658,8 +658,25 @@ func onModifyTableCharsetAndCollate(t *meta.Meta, job *model.Job) (ver int64, _ return ver, nil } -func checkTableNotExists(t *meta.Meta, job *model.Job, schemaID int64, tableName string) error { - // Check this table's database. +func checkTableNotExists(d *ddlCtx, t *meta.Meta, job *model.Job, schemaID int64, tableName string) error { + // Try to use memory schema info to check first. + currVer, err := t.GetSchemaVersion() + if err != nil { + return err + } + is := d.infoHandle.Get() + if is.SchemaMetaVersion() == currVer { + // Check this table's database. + schema, ok := is.SchemaByID(schemaID) + if !ok { + return infoschema.ErrDatabaseNotExists.GenWithStackByArgs("") + } + if is.TableExists(schema.Name, model.NewCIStr(tableName)) { + return infoschema.ErrTableExists.GenWithStackByArgs(tableName) + } + return nil + } + // Load schema info from store. tables, err := t.ListTables(schemaID) if err != nil { if meta.ErrDBNotExists.Equal(err) { From c19cb00449d9e7e5aaf21566cfdd66d16e7b25c4 Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Tue, 16 Apr 2019 15:59:34 +0800 Subject: [PATCH 2/4] fix test --- ddl/table.go | 38 +++++++++++++++++++++++--------------- 1 file changed, 23 insertions(+), 15 deletions(-) diff --git a/ddl/table.go b/ddl/table.go index 8f1d3dfa35796..1567260df8fd0 100644 --- a/ddl/table.go +++ b/ddl/table.go @@ -659,24 +659,32 @@ func onModifyTableCharsetAndCollate(t *meta.Meta, job *model.Job) (ver int64, _ } func checkTableNotExists(d *ddlCtx, t *meta.Meta, job *model.Job, schemaID int64, tableName string) error { - // Try to use memory schema info to check first. - currVer, err := t.GetSchemaVersion() - if err != nil { - return err - } - is := d.infoHandle.Get() - if is.SchemaMetaVersion() == currVer { - // Check this table's database. - schema, ok := is.SchemaByID(schemaID) - if !ok { - return infoschema.ErrDatabaseNotExists.GenWithStackByArgs("") + // d.infoHandle maybe nil in some test. + if d.infoHandle != nil { + // Try to use memory schema info to check first. + currVer, err := t.GetSchemaVersion() + if err != nil { + return err } - if is.TableExists(schema.Name, model.NewCIStr(tableName)) { - return infoschema.ErrTableExists.GenWithStackByArgs(tableName) + is := d.infoHandle.Get() + if is.SchemaMetaVersion() == currVer { + // Check this table's database. + schema, ok := is.SchemaByID(schemaID) + if !ok { + return infoschema.ErrDatabaseNotExists.GenWithStackByArgs("") + } + if is.TableExists(schema.Name, model.NewCIStr(tableName)) { + return infoschema.ErrTableExists.GenWithStackByArgs(tableName) + } + return nil } - return nil } - // Load schema info from store. + + return checkTableNotExistsFromStore(t, job, schemaID, tableName) +} + +func checkTableNotExistsFromStore(t *meta.Meta, job *model.Job, schemaID int64, tableName string) error { + // Check this table's database. tables, err := t.ListTables(schemaID) if err != nil { if meta.ErrDBNotExists.Equal(err) { From 23bea36ba6aa4e61100485d3ab78bc684f50c4cd Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Wed, 17 Apr 2019 10:53:40 +0800 Subject: [PATCH 3/4] address comment --- ddl/table.go | 33 +++++++++++++++++---------------- 1 file changed, 17 insertions(+), 16 deletions(-) diff --git a/ddl/table.go b/ddl/table.go index 1567260df8fd0..ece7bf9052bdb 100644 --- a/ddl/table.go +++ b/ddl/table.go @@ -660,24 +660,25 @@ func onModifyTableCharsetAndCollate(t *meta.Meta, job *model.Job) (ver int64, _ func checkTableNotExists(d *ddlCtx, t *meta.Meta, job *model.Job, schemaID int64, tableName string) error { // d.infoHandle maybe nil in some test. - if d.infoHandle != nil { - // Try to use memory schema info to check first. - currVer, err := t.GetSchemaVersion() - if err != nil { - return err + if d.infoHandle == nil { + return checkTableNotExistsFromStore(t, job, schemaID, tableName) + } + // Try to use memory schema info to check first. + currVer, err := t.GetSchemaVersion() + if err != nil { + return err + } + is := d.infoHandle.Get() + if is.SchemaMetaVersion() == currVer { + // Check this table's database. + schema, ok := is.SchemaByID(schemaID) + if !ok { + return infoschema.ErrDatabaseNotExists.GenWithStackByArgs("") } - is := d.infoHandle.Get() - if is.SchemaMetaVersion() == currVer { - // Check this table's database. - schema, ok := is.SchemaByID(schemaID) - if !ok { - return infoschema.ErrDatabaseNotExists.GenWithStackByArgs("") - } - if is.TableExists(schema.Name, model.NewCIStr(tableName)) { - return infoschema.ErrTableExists.GenWithStackByArgs(tableName) - } - return nil + if is.TableExists(schema.Name, model.NewCIStr(tableName)) { + return infoschema.ErrTableExists.GenWithStackByArgs(tableName) } + return nil } return checkTableNotExistsFromStore(t, job, schemaID, tableName) From 3a7b531deff7d9af7a075edbec283eca20a67746 Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Wed, 17 Apr 2019 13:45:44 +0800 Subject: [PATCH 4/4] refine code and fix cancel job with no check err type --- ddl/ddl_worker.go | 2 +- ddl/schema.go | 65 +++++++++++++++++++++++++++++++++++++---------- ddl/table.go | 58 +++++++++++++++++++++++++++--------------- 3 files changed, 91 insertions(+), 34 deletions(-) diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go index 9b0a8658d18ff..6a9ed39f5a01e 100644 --- a/ddl/ddl_worker.go +++ b/ddl/ddl_worker.go @@ -487,7 +487,7 @@ func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, switch job.Type { case model.ActionCreateSchema: - ver, err = onCreateSchema(t, job) + ver, err = onCreateSchema(d, t, job) case model.ActionDropSchema: ver, err = onDropSchema(t, job) case model.ActionCreateTable: diff --git a/ddl/schema.go b/ddl/schema.go index ac875e158059b..51df4ef1f4b7b 100644 --- a/ddl/schema.go +++ b/ddl/schema.go @@ -20,7 +20,7 @@ import ( "github.com/pingcap/tidb/meta" ) -func onCreateSchema(t *meta.Meta, job *model.Job) (ver int64, _ error) { +func onCreateSchema(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) { schemaID := job.SchemaID dbInfo := &model.DBInfo{} if err := job.DecodeArgs(dbInfo); err != nil { @@ -32,20 +32,13 @@ func onCreateSchema(t *meta.Meta, job *model.Job) (ver int64, _ error) { dbInfo.ID = schemaID dbInfo.State = model.StateNone - dbs, err := t.ListDatabases() + err := checkSchemaNotExists(d, t, schemaID, dbInfo) if err != nil { - return ver, errors.Trace(err) - } - - for _, db := range dbs { - if db.Name.L == dbInfo.Name.L { - if db.ID != schemaID { - // The database already exists, can't create it, we should cancel this job now. - job.State = model.JobStateCancelled - return ver, infoschema.ErrDatabaseExists.GenWithStackByArgs(db.Name) - } - dbInfo = db + if infoschema.ErrDatabaseExists.Equal(err) { + // The database already exists, can't create it, we should cancel this job now. + job.State = model.JobStateCancelled } + return ver, errors.Trace(err) } ver, err = updateSchemaVersion(t, job) @@ -70,6 +63,52 @@ func onCreateSchema(t *meta.Meta, job *model.Job) (ver int64, _ error) { } } +func checkSchemaNotExists(d *ddlCtx, t *meta.Meta, schemaID int64, dbInfo *model.DBInfo) error { + // d.infoHandle maybe nil in some test. + if d.infoHandle == nil { + return checkSchemaNotExistsFromStore(t, schemaID, dbInfo) + } + // Try to use memory schema info to check first. + currVer, err := t.GetSchemaVersion() + if err != nil { + return err + } + is := d.infoHandle.Get() + if is.SchemaMetaVersion() == currVer { + return checkSchemaNotExistsFromInfoSchema(is, schemaID, dbInfo) + } + return checkSchemaNotExistsFromStore(t, schemaID, dbInfo) +} + +func checkSchemaNotExistsFromInfoSchema(is infoschema.InfoSchema, schemaID int64, dbInfo *model.DBInfo) error { + // Check database exists by name. + if is.SchemaExists(dbInfo.Name) { + return infoschema.ErrDatabaseExists.GenWithStackByArgs(dbInfo.Name) + } + // Check database exists by ID. + if _, ok := is.SchemaByID(schemaID); ok { + return infoschema.ErrDatabaseExists.GenWithStackByArgs(dbInfo.Name) + } + return nil +} + +func checkSchemaNotExistsFromStore(t *meta.Meta, schemaID int64, dbInfo *model.DBInfo) error { + dbs, err := t.ListDatabases() + if err != nil { + return errors.Trace(err) + } + + for _, db := range dbs { + if db.Name.L == dbInfo.Name.L { + if db.ID != schemaID { + return infoschema.ErrDatabaseExists.GenWithStackByArgs(db.Name) + } + dbInfo = db + } + } + return nil +} + func onDropSchema(t *meta.Meta, job *model.Job) (ver int64, _ error) { dbInfo, err := checkSchemaExistAndCancelNotExistJob(t, job) if err != nil { diff --git a/ddl/table.go b/ddl/table.go index ece7bf9052bdb..2d992d968c176 100644 --- a/ddl/table.go +++ b/ddl/table.go @@ -50,9 +50,11 @@ func onCreateTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) } tbInfo.State = model.StateNone - err := checkTableNotExists(d, t, job, schemaID, tbInfo.Name.L) + err := checkTableNotExists(d, t, schemaID, tbInfo.Name.L) if err != nil { - job.State = model.JobStateCancelled + if infoschema.ErrDatabaseNotExists.Equal(err) || infoschema.ErrTableExists.Equal(err) { + job.State = model.JobStateCancelled + } return ver, errors.Trace(err) } @@ -94,11 +96,18 @@ func onCreateView(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) return ver, errors.Trace(err) } tbInfo.State = model.StateNone - err := checkTableNotExists(d, t, job, schemaID, tbInfo.Name.L) + err := checkTableNotExists(d, t, schemaID, tbInfo.Name.L) if err != nil { - if infoschema.ErrDatabaseNotExists.Equal(err) || !orReplace { + if infoschema.ErrDatabaseNotExists.Equal(err) { job.State = model.JobStateCancelled return ver, errors.Trace(err) + } else if infoschema.ErrTableExists.Equal(err) { + if !orReplace { + job.State = model.JobStateCancelled + return ver, errors.Trace(err) + } + } else { + return ver, errors.Trace(err) } } ver, err = updateSchemaVersion(t, job) @@ -191,8 +200,11 @@ func (w *worker) onRecoverTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver in return ver, errors.Trace(err) } - err = checkTableNotExists(d, t, job, schemaID, tblInfo.Name.L) + err = checkTableNotExists(d, t, schemaID, tblInfo.Name.L) if err != nil { + if infoschema.ErrDatabaseNotExists.Equal(err) || infoschema.ErrTableExists.Equal(err) { + job.State = model.JobStateCancelled + } return ver, errors.Trace(err) } @@ -539,9 +551,11 @@ func onRenameTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) return ver, errors.Trace(err) } newSchemaID := job.SchemaID - err = checkTableNotExists(d, t, job, newSchemaID, tableName.L) + err = checkTableNotExists(d, t, newSchemaID, tableName.L) if err != nil { - job.State = model.JobStateCancelled + if infoschema.ErrDatabaseNotExists.Equal(err) || infoschema.ErrTableExists.Equal(err) { + job.State = model.JobStateCancelled + } return ver, errors.Trace(err) } @@ -658,10 +672,10 @@ func onModifyTableCharsetAndCollate(t *meta.Meta, job *model.Job) (ver int64, _ return ver, nil } -func checkTableNotExists(d *ddlCtx, t *meta.Meta, job *model.Job, schemaID int64, tableName string) error { +func checkTableNotExists(d *ddlCtx, t *meta.Meta, schemaID int64, tableName string) error { // d.infoHandle maybe nil in some test. if d.infoHandle == nil { - return checkTableNotExistsFromStore(t, job, schemaID, tableName) + return checkTableNotExistsFromStore(t, schemaID, tableName) } // Try to use memory schema info to check first. currVer, err := t.GetSchemaVersion() @@ -670,21 +684,25 @@ func checkTableNotExists(d *ddlCtx, t *meta.Meta, job *model.Job, schemaID int64 } is := d.infoHandle.Get() if is.SchemaMetaVersion() == currVer { - // Check this table's database. - schema, ok := is.SchemaByID(schemaID) - if !ok { - return infoschema.ErrDatabaseNotExists.GenWithStackByArgs("") - } - if is.TableExists(schema.Name, model.NewCIStr(tableName)) { - return infoschema.ErrTableExists.GenWithStackByArgs(tableName) - } - return nil + return checkTableNotExistsFromInfoSchema(is, schemaID, tableName) } - return checkTableNotExistsFromStore(t, job, schemaID, tableName) + return checkTableNotExistsFromStore(t, schemaID, tableName) +} + +func checkTableNotExistsFromInfoSchema(is infoschema.InfoSchema, schemaID int64, tableName string) error { + // Check this table's database. + schema, ok := is.SchemaByID(schemaID) + if !ok { + return infoschema.ErrDatabaseNotExists.GenWithStackByArgs("") + } + if is.TableExists(schema.Name, model.NewCIStr(tableName)) { + return infoschema.ErrTableExists.GenWithStackByArgs(tableName) + } + return nil } -func checkTableNotExistsFromStore(t *meta.Meta, job *model.Job, schemaID int64, tableName string) error { +func checkTableNotExistsFromStore(t *meta.Meta, schemaID int64, tableName string) error { // Check this table's database. tables, err := t.ListTables(schemaID) if err != nil {