Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: fix check constraintInfo state change #44455

Merged
merged 33 commits into from
Jun 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
302bb38
commit
fzzf678 May 23, 2023
ae21c86
commit
fzzf678 May 23, 2023
6e05774
commit
fzzf678 May 23, 2023
40c2d35
bazel&errdoc
fzzf678 May 23, 2023
d4d6a6e
add head
fzzf678 May 23, 2023
3d88bb6
Update tables.go
fzzf678 May 23, 2023
aa27cd8
head
fzzf678 May 23, 2023
ff45118
Update integration_test.go
fzzf678 May 23, 2023
a07bf50
refactor
fzzf678 May 24, 2023
9929450
rename
fzzf678 May 24, 2023
ebcc64c
Update ddl_api.go
fzzf678 Jun 1, 2023
915b7f3
Update ddl/constraint.go
fzzf678 Jun 1, 2023
befe763
update head
fzzf678 Jun 1, 2023
984b12c
Update constraint_test.go
fzzf678 Jun 1, 2023
8f0491a
Update constraint_test.go
fzzf678 Jun 1, 2023
3d39c83
Merge remote-tracking branch 'upstream/master' into commit_check_cons…
fzzf678 Jun 1, 2023
5af90d6
Update constraint_test.go
fzzf678 Jun 1, 2023
ce2d252
Update constraint.go
fzzf678 Jun 6, 2023
c2778d5
Update constraint.go
fzzf678 Jun 6, 2023
26a9fdf
Update constraint.go
fzzf678 Jun 6, 2023
ea01335
Update constraint.go
fzzf678 Jun 6, 2023
d5ec5bf
Update constraint.go
fzzf678 Jun 6, 2023
c470d02
Update constraint.go
fzzf678 Jun 7, 2023
5c32166
Update constraint.go
fzzf678 Jun 7, 2023
4038946
Merge remote-tracking branch 'upstream/master' into state_check_const…
fzzf678 Jun 7, 2023
0f02402
commit test case
fzzf678 Jun 7, 2023
e88d3b6
Update constraint.go
fzzf678 Jun 7, 2023
c6e4c6e
Update constraint.go
fzzf678 Jun 7, 2023
56276c2
Update constraint.go
fzzf678 Jun 7, 2023
921bfaa
commit test
fzzf678 Jun 7, 2023
3e3e2c7
refactor
fzzf678 Jun 12, 2023
61a3505
Update constraint.go
fzzf678 Jun 15, 2023
6531791
Update constraint.go
fzzf678 Jun 15, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 45 additions & 18 deletions ddl/constraint.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/util/dbterror"
"github.com/pingcap/tidb/util/sqlexec"
)
Expand Down Expand Up @@ -86,12 +87,14 @@ func (w *worker) onAddCheckConstraint(d *ddlCtx, t *meta.Meta, job *model.Job) (
originalState := constraintInfoInMeta.State
switch constraintInfoInMeta.State {
case model.StateNone:
// none -> write only
job.SchemaState = model.StateWriteOnly
constraintInfoInMeta.State = model.StateWriteOnly
ver, err = updateVersionAndTableInfoWithCheck(d, t, job, tblInfo, originalState != constraintInfoInMeta.State)
case model.StateWriteOnly:
// write only -> public
job.SchemaState = model.StateWriteReorganization
constraintInfoInMeta.State = model.StateWriteReorganization
ver, err = updateVersionAndTableInfoWithCheck(d, t, job, tblInfo, originalState != constraintInfoInMeta.State)
case model.StateWriteReorganization:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why we add a new state for this operation?

Copy link
Contributor Author

@fzzf678 fzzf678 Jun 12, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because I want to make sure that the state of all tidb nodes is changed to StateWriteOnly before doing remaining data verification, so that no data is written that violates the check constraint.
If doing remaining data verification in StateWriteOnly state, the state of some nodes may be still in StateNone ( am i right?), so even if the remaining data verification passes, there are still StateNone nodes that can write data that violates the check constraint. Hence I added a state to ensure that all nodes change at least to StateWriteOnly before verifying the data. Is this right or not, please give some advice, thks

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, there was no PR description before, I don't know why it suddenly added a status, so ask.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems here we missed delete-only state, that is before writeonly state. write reorg means will reorg table data.

err = w.verifyRemainRecordsForCheckConstraint(dbInfo, tblInfo, constraintInfoInMeta, job)
if err != nil {
return ver, errors.Trace(err)
Expand Down Expand Up @@ -151,12 +154,10 @@ func onDropCheckConstraint(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64,
originalState := constraintInfo.State
switch constraintInfo.State {
case model.StatePublic:
// public -> write only
job.SchemaState = model.StateWriteOnly
constraintInfo.State = model.StateWriteOnly
ver, err = updateVersionAndTableInfoWithCheck(d, t, job, tblInfo, originalState != constraintInfo.State)
case model.StateWriteOnly:
// write only -> None
// write only state constraint will still take effect to check the newly inserted data.
// So the dependent column shouldn't be dropped even in this intermediate state.
constraintInfo.State = model.StateNone
Expand Down Expand Up @@ -205,29 +206,50 @@ func checkDropCheckConstraint(t *meta.Meta, job *model.Job) (*model.TableInfo, *
return tblInfo, constraintInfo, nil
}

func (w *worker) onAlterCheckConstraint(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) {
func (w *worker) onAlterCheckConstraint(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) {
dbInfo, tblInfo, constraintInfo, enforced, err := checkAlterCheckConstraint(t, job)
if err != nil {
return ver, errors.Trace(err)
}

// enforced will fetch table data and check the constraint.
if constraintInfo.Enforced != enforced && enforced {
err = w.verifyRemainRecordsForCheckConstraint(dbInfo, tblInfo, constraintInfo, job)
Copy link
Contributor Author

@fzzf678 fzzf678 Jun 16, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the check constraint, when changing from not enforced to enforced, remaining data verification is required. All nodes in tidb may have two states, enforced and not enforced.If the remaining data verification is carried out directly, data that violates the check constraint may also be written after passing. Therefore, state changes are added to ensure that all nodes have reached the enforced state before remaining data verification.

if enforced {
originalState := constraintInfo.State
switch constraintInfo.State {
case model.StatePublic:
job.SchemaState = model.StateWriteReorganization
constraintInfo.State = model.StateWriteReorganization
constraintInfo.Enforced = enforced
ver, err = updateVersionAndTableInfoWithCheck(d, t, job, tblInfo, originalState != constraintInfo.State)
case model.StateWriteReorganization:
job.SchemaState = model.StateWriteOnly
constraintInfo.State = model.StateWriteOnly
ver, err = updateVersionAndTableInfoWithCheck(d, t, job, tblInfo, originalState != constraintInfo.State)
case model.StateWriteOnly:
err = w.verifyRemainRecordsForCheckConstraint(dbInfo, tblInfo, constraintInfo, job)
if err != nil {
if !table.ErrCheckConstraintViolated.Equal(err) {
return ver, errors.Trace(err)
}
constraintInfo.Enforced = !enforced
}
constraintInfo.State = model.StatePublic
ver, err = updateVersionAndTableInfoWithCheck(d, t, job, tblInfo, originalState != constraintInfo.State)
if err != nil {
return ver, errors.Trace(err)
}
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
}
} else {
constraintInfo.Enforced = enforced
ver, err = updateVersionAndTableInfoWithCheck(d, t, job, tblInfo, true)
if err != nil {
// check constraint error will cancel the job, job state has been changed
// to cancelled in addTableCheckConstraint.
// update version and tableInfo error will cause retry.
return ver, errors.Trace(err)
}
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
}
constraintInfo.Enforced = enforced
ver, err = updateVersionAndTableInfoWithCheck(d, t, job, tblInfo, true)
if err != nil {
// update version and tableInfo error will cause retry.
return ver, errors.Trace(err)
}
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
return ver, nil
return ver, err
}

func checkAlterCheckConstraint(t *meta.Meta, job *model.Job) (*model.DBInfo, *model.TableInfo, *model.ConstraintInfo, bool, error) {
Expand All @@ -250,7 +272,6 @@ func checkAlterCheckConstraint(t *meta.Meta, job *model.Job) (*model.DBInfo, *mo
job.State = model.JobStateCancelled
return nil, nil, nil, false, errors.Trace(err)
}

// do the double check with constraint existence.
constraintInfo := tblInfo.FindConstraintInfoByName(constrName.L)
if constraintInfo == nil {
Expand Down Expand Up @@ -316,6 +337,12 @@ func findDependentColsInExpr(expr ast.ExprNode) map[string]struct{} {
}

func (w *worker) verifyRemainRecordsForCheckConstraint(dbInfo *model.DBInfo, tableInfo *model.TableInfo, constr *model.ConstraintInfo, job *model.Job) error {
// Inject a fail-point to skip the remaining records check.
failpoint.Inject("mockVerifyRemainDataSuccess", func(val failpoint.Value) {
if val.(bool) {
failpoint.Return(nil)
}
})
// Get sessionctx from ddl context resource pool in ddl worker.
var sctx sessionctx.Context
sctx, err := w.sessPool.Get()
Expand Down
221 changes: 218 additions & 3 deletions ddl/constraint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@ import (
"sort"
"testing"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/ddl/util/callback"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/testkit/external"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -893,10 +896,7 @@ func TestAlterConstraintAddDrop(t *testing.T) {
}
originalCallback.OnChanged(nil)
if job.SchemaState == model.StateWriteOnly {
// StateNone -> StateWriteOnly -> StatePublic
// Node in StateWriteOnly and StatePublic should check the constraint check.
_, checkErr = tk1.Exec("insert into t (a, b) values(5,6) ")
// Don't do the assert in the callback function.
}
}
callback.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc)
Expand All @@ -908,3 +908,218 @@ func TestAlterConstraintAddDrop(t *testing.T) {
require.Errorf(t, err, "[table:3819]Check constraint 'cc' is violated.")
tk.MustExec("drop table if exists t")
}

func TestAlterAddConstraintStateChange(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (a int)")

tk1 := testkit.NewTestKit(t, store)
tk1.MustExec("use test")
tk1.MustExec("insert into t values(12)")

var checkErr error
d := dom.DDL()
originalCallback := d.GetHook()
callback := &callback.TestDDLCallback{}
onJobUpdatedExportedFunc := func(job *model.Job) {
if checkErr != nil {
return
}
originalCallback.OnChanged(nil)
if job.SchemaState == model.StateWriteReorganization {
tk1.MustQuery(fmt.Sprintf("select count(1) from `%s`.`%s` where not %s limit 1", "test", "t", "a > 10")).Check(testkit.Rows("0"))
// set constraint state
constraintTable := external.GetTableByName(t, tk1, "test", "t")
tableCommon, ok := constraintTable.(*tables.TableCommon)
require.True(t, ok)
originCons := tableCommon.Constraints
tableCommon.WritableConstraints = []*table.Constraint{}
tableCommon.Constraints = []*table.Constraint{}
// insert data
tk1.MustExec("insert into t values(1)")
// recover
tableCommon.Constraints = originCons
tableCommon.WritableConstraint()
}
}

//StatNone StateWriteReorganization
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockVerifyRemainDataSuccess", "return(true)"))
callback.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc)
d.SetHook(callback)
tk.MustExec("alter table t add constraint c0 check ( a > 10)")
tk.MustQuery("select * from t").Check(testkit.Rows("12", "1"))
tk.MustQuery("show create table t").Check(testkit.Rows("t CREATE TABLE `t` (\n `a` int(11) DEFAULT NULL,\nCONSTRAINT `c0` CHECK ((`a` > 10))\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin"))
tk.MustExec("alter table t drop constraint c0")
tk.MustExec("delete from t where a = 1")
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockVerifyRemainDataSuccess"))
}

func TestAlterAddConstraintStateChange1(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (a int)")
tk1 := testkit.NewTestKit(t, store)
tk1.MustExec("use test")
tk1.MustExec("insert into t values(12)")

var checkErr error
d := dom.DDL()
originalCallback := d.GetHook()
callback := &callback.TestDDLCallback{}
// StatNone -> StateWriteOnly
onJobUpdatedExportedFunc1 := func(job *model.Job) {
if checkErr != nil {
return
}
originalCallback.OnChanged(nil)
if job.SchemaState == model.StateWriteOnly {
// set constraint state
constraintTable := external.GetTableByName(t, tk1, "test", "t")
tableCommon, ok := constraintTable.(*tables.TableCommon)
require.True(t, ok)
originCons := tableCommon.Constraints
tableCommon.WritableConstraints = []*table.Constraint{}
tableCommon.Constraints = []*table.Constraint{}
// insert data
tk1.MustExec("insert into t values(1)")
// recover
tableCommon.Constraints = originCons
tableCommon.WritableConstraint()
}
}
callback.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc1)
d.SetHook(callback)
tk.MustGetErrMsg("alter table t add constraint c1 check ( a > 10)", "[ddl:3819]Check constraint 'c1' is violated.")
tk.MustQuery("select * from t").Check(testkit.Rows("12", "1"))
tk.MustQuery("show create table t").Check(testkit.Rows("t CREATE TABLE `t` (\n `a` int(11) DEFAULT NULL\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin"))
tk.MustExec("delete from t where a = 1")
}

func TestAlterAddConstraintStateChange2(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (a int)")
tk1 := testkit.NewTestKit(t, store)
tk1.MustExec("use test")
tk1.MustExec("insert into t values(12)")

var checkErr error
d := dom.DDL()
originalCallback := d.GetHook()
callback := &callback.TestDDLCallback{}
// StateWriteOnly -> StateWriteReorganization
onJobUpdatedExportedFunc2 := func(job *model.Job) {
if checkErr != nil {
return
}
originalCallback.OnChanged(nil)
if job.SchemaState == model.StateWriteReorganization {
// set constraint state
constraintTable := external.GetTableByName(t, tk1, "test", "t")
tableCommon, ok := constraintTable.(*tables.TableCommon)
require.True(t, ok)
tableCommon.Constraints[0].State = model.StateWriteOnly
tableCommon.WritableConstraints = []*table.Constraint{}
// insert data
tk1.MustGetErrMsg("insert into t values(1)", "[table:3819]Check constraint 'c2' is violated.")
// recover
tableCommon.Constraints[0].State = model.StateWriteReorganization
tableCommon.WritableConstraint()
}
}
callback.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc2)
d.SetHook(callback)
tk.MustExec("alter table t add constraint c2 check ( a > 10)")
tk.MustQuery("select * from t").Check(testkit.Rows("12"))
tk.MustQuery("show create table t").Check(testkit.Rows("t CREATE TABLE `t` (\n `a` int(11) DEFAULT NULL,\nCONSTRAINT `c2` CHECK ((`a` > 10))\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin"))
tk.MustExec("alter table t drop constraint c2")
}

func TestAlterAddConstraintStateChange3(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (a int)")
tk1 := testkit.NewTestKit(t, store)
tk1.MustExec("use test")
tk1.MustExec("insert into t values(12)")

var checkErr error
d := dom.DDL()
originalCallback := d.GetHook()
callback := &callback.TestDDLCallback{}
// StateWriteReorganization -> StatePublic
onJobUpdatedExportedFunc3 := func(job *model.Job) {
if checkErr != nil {
return
}
originalCallback.OnChanged(nil)
if job.SchemaState == model.StatePublic {
// set constraint state
constraintTable := external.GetTableByName(t, tk1, "test", "t")
tableCommon, ok := constraintTable.(*tables.TableCommon)
require.True(t, ok)
tableCommon.Constraints[0].State = model.StateWriteReorganization
tableCommon.WritableConstraints = []*table.Constraint{}
// insert data
tk1.MustGetErrMsg("insert into t values(1)", "[table:3819]Check constraint 'c3' is violated.")
// recover
tableCommon.Constraints[0].State = model.StatePublic
tableCommon.WritableConstraint()
}
}
callback.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc3)
d.SetHook(callback)
tk.MustExec("alter table t add constraint c3 check ( a > 10)")
tk.MustQuery("select * from t").Check(testkit.Rows("12"))
tk.MustQuery("show create table t").Check(testkit.Rows("t CREATE TABLE `t` (\n `a` int(11) DEFAULT NULL,\nCONSTRAINT `c3` CHECK ((`a` > 10))\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin"))
}

func TestAlterEnforcedConstraintStateChange(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (a int, constraint c1 check (a > 10) not enforced)")
tk1 := testkit.NewTestKit(t, store)
tk1.MustExec("use test")
tk1.MustExec("insert into t values(12)")

var checkErr error
d := dom.DDL()
originalCallback := d.GetHook()
callback := &callback.TestDDLCallback{}
// StateWriteReorganization -> StatePublic
onJobUpdatedExportedFunc3 := func(job *model.Job) {
if checkErr != nil {
return
}
originalCallback.OnChanged(nil)
if job.SchemaState == model.StateWriteReorganization {
// set constraint state
constraintTable := external.GetTableByName(t, tk1, "test", "t")
tableCommon, ok := constraintTable.(*tables.TableCommon)
require.True(t, ok)
tableCommon.Constraints[0].State = model.StateWriteOnly
tableCommon.WritableConstraints = []*table.Constraint{}
// insert data
tk1.MustGetErrMsg("insert into t values(1)", "[table:3819]Check constraint 'c1' is violated.")
// recover
tableCommon.Constraints[0].State = model.StateWriteReorganization
tableCommon.WritableConstraint()
}
}
callback.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc3)
d.SetHook(callback)
tk.MustExec("alter table t alter constraint c1 enforced")
tk.MustQuery("select * from t").Check(testkit.Rows("12"))
}