Commit

Merge branch 'master' of https://github.com/pingcap/tidb into m/pessimistic-lock-optimize

MyonKeminta committed Jan 12, 2023
2 parents 216f739 + eef8438, commit 6c8fd71
Showing 58 changed files with 1,213 additions and 320 deletions.
2 changes: 1 addition & 1 deletion WORKSPACE
@@ -35,7 +35,7 @@ go_download_sdk(
"https://mirrors.aliyun.com/golang/{}",
"https://dl.google.com/go/{}",
],
version = "1.19.3",
version = "1.19.5",
)

go_register_toolchains(
4 changes: 4 additions & 0 deletions br/pkg/gluetidb/glue.go
@@ -136,6 +136,10 @@ func (g Glue) UseOneShotSession(store kv.Storage, closeDomain bool, fn func(glue
if err != nil {
return errors.Trace(err)
}
if err = session.InitMDLVariable(store); err != nil {
return errors.Trace(err)
}

// because the domain is created once and lives for the whole program,
// and it registers br info to the info syncer,
// we'd better close it as soon as possible.
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/local/local.go
@@ -458,7 +458,7 @@ func NewLocalBackend(
return backend.MakeBackend(nil), common.ErrCreateKVClient.Wrap(err).GenWithStackByArgs()
}
rpcCli := tikvclient.NewRPCClient(tikvclient.WithSecurity(tls.ToTiKVSecurityConfig()))
pdCliForTiKV := &tikvclient.CodecPDClient{Client: pdCtl.GetPDClient()}
pdCliForTiKV := tikvclient.NewCodecPDClient(tikvclient.ModeTxn, pdCtl.GetPDClient())
tikvCli, err := tikvclient.NewKVStore("lightning-local-backend", pdCliForTiKV, spkv, rpcCli)
if err != nil {
return backend.MakeBackend(nil), common.ErrCreateKVClient.Wrap(err).GenWithStackByArgs()
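
Side note on the local.go change above: the PD client is now built with the NewCodecPDClient constructor instead of a bare struct literal, so the key codec is chosen explicitly (ModeTxn) rather than defaulting to the codec's zero value. Below is a toy sketch of why the constructor is the safer shape here; all types are invented for illustration, not tikv/client-go's real ones.

package main

import "fmt"

type Mode int

const (
	ModeRaw Mode = iota // zero value: what a bare struct literal silently gets
	ModeTxn
)

type CodecPDClient struct{ mode Mode }

// NewCodecPDClient forces callers to pick a codec mode up front, mirroring
// tikvclient.NewCodecPDClient(tikvclient.ModeTxn, ...) in the diff above.
func NewCodecPDClient(mode Mode) *CodecPDClient { return &CodecPDClient{mode: mode} }

func main() {
	literal := &CodecPDClient{}          // mode defaults to ModeRaw; easy to miss
	viaCtor := NewCodecPDClient(ModeTxn) // the intent is explicit at the call site
	fmt.Println(literal.mode, viaCtor.mode) // 0 1
}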
11 changes: 10 additions & 1 deletion br/pkg/lightning/mydump/csv_parser.go
@@ -447,9 +447,18 @@ outside:

func (parser *CSVParser) readQuotedField() error {
for {
prevPos := parser.pos
content, terminator, err := parser.readUntil(&parser.quoteByteSet)
err = parser.replaceEOF(err, errUnterminatedQuotedField)
if err != nil {
if errors.Cause(err) == io.EOF {
// return the position of quote to the caller.
// because we return an error here, the parser won't
// use the `pos` again, so it's safe to modify it here.
parser.pos = prevPos - 1
// keep the consumed content in parser.buf so the error log can print it
parser.buf = content
err = parser.replaceEOF(err, errUnterminatedQuotedField)
}
return err
}
parser.recordBuffer = append(parser.recordBuffer, content...)
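
The csv_parser.go change remembers the position before each read so that, when EOF interrupts a quoted field, the reported position points at the opening quote instead of the end of input. A standalone sketch of the same idea follows; the function and names are hypothetical, not Lightning's actual parser.

package main

import (
	"errors"
	"fmt"
	"strings"
)

var errUnterminatedQuotedField = errors.New("syntax error: unterminated quoted field")

// readQuotedField scans s just past an opening quote at openPos. On EOF it
// reports openPos back to the caller, mirroring the parser.pos = prevPos - 1
// rewind in the diff above, so the error log points at the offending quote.
func readQuotedField(s string, openPos int) (field string, pos int, err error) {
	rest := s[openPos+1:]
	if i := strings.IndexByte(rest, '"'); i >= 0 {
		return rest[:i], openPos + 1 + i, nil
	}
	// No closing quote before the end of input: rewind to the opening quote.
	return "", openPos, errUnterminatedQuotedField
}

func main() {
	_, pos, err := readQuotedField(`1111,",7,8`, 5)
	fmt.Printf("pos=%d err=%v\n", pos, err) // pos=5, not len(input)
}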
1 change: 1 addition & 0 deletions br/tests/lightning_csv/errData/db-schema-create.sql
@@ -0,0 +1 @@
create database if not exists db;
1 change: 1 addition & 0 deletions br/tests/lightning_csv/errData/db.test-schema.sql
@@ -0,0 +1 @@
create table test(a int primary key, b int, c int, d int);
3 changes: 3 additions & 0 deletions br/tests/lightning_csv/errData/db.test.1.csv
@@ -0,0 +1,3 @@
1,2,3,4
2,10,4,5
1111,",7,8
8 changes: 8 additions & 0 deletions br/tests/lightning_csv/run.sh
@@ -41,3 +41,11 @@ for BACKEND in tidb local; do
check_not_contains 'id:'

done

set +e
run_lightning --backend local -d "tests/$TEST_NAME/errData" --log-file "$TEST_DIR/lightning-err.log" 2>/dev/null
set -e
# the error content should be present in the log
grep ",7,8" "$TEST_DIR/lightning-err.log"
# pos should not be set to the end of the input
grep "[\"syntax error\"] [pos=22]" "$TEST_DIR/lightning-err.log"
25 changes: 25 additions & 0 deletions ci.md
@@ -0,0 +1,25 @@
# Commands to trigger ci pipeline

## Guide

A CI pipeline is triggered when a comment on your pull request matches one of the commands below. Some tasks can only be triggered manually.

## Commands

| ci pipeline | Commands |
| ---------------------------------------- |-----------------------------------------------------------------|
| tidb_ghpr_coverage | /run-coverage |
| tidb_ghpr_build_arm64 | /run-build-arm64 comment=true |
| tidb_ghpr_common_test | /run-common-test<br />/run-integration-tests |
| tidb_ghpr_integration_br_test | /run-integration-br-test<br />/run-integration-tests |
| tidb_ghpr_integration_campatibility_test | /run-integration-compatibility-test<br />/run-integration-tests |
| tidb_ghpr_integration_common_test | /run-integration-common-test<br />/run-integration-tests |
| tidb_ghpr_integration_copr_test | /run-integration-copr-test<br />/run-integration-tests |
| tidb_ghpr_integration_ddl_test | /run-integration-ddl-test<br />/run-integration-tests |
| tidb_ghpr_monitor_test | /run-monitor-test |
| tidb_ghpr_mybatis | /run-mybatis-test<br />/run-integration-tests |
| tidb_ghpr_sqllogic_test_1 | /run-sqllogic-test<br />/run-integration-tests |
| tidb_ghpr_sqllogic_test_2 | /run-sqllogic-test<br />/run-integration-tests |
| tidb_ghpr_tics_test | /run-tics-test<br />/run-integration-tests |
| tidb_ghpr_unit_test | /run-unit-test<br />/run-all-tests<br />/merge |

24 changes: 13 additions & 11 deletions ddl/backfilling.go
@@ -146,7 +146,7 @@ func GetLeaseGoTime(currTime time.Time, lease time.Duration) types.Time {
// Backfilling is time consuming, to accelerate this process, TiDB has built some sub
// workers to do this in the DDL owner node.
//
// DDL owner thread
// DDL owner thread (also see comments before runReorgJob func)
// ^
// | (reorgCtx.doneCh)
// |
@@ -583,9 +583,10 @@ func (dc *ddlCtx) sendTasksAndWait(scheduler *backfillScheduler, totalAddedCount
err = dc.isReorgRunnable(reorgInfo.Job.ID)
}

// Update the reorg handle that has been processed.
err1 := reorgInfo.UpdateReorgMeta(nextKey, scheduler.sessPool)

if err != nil {
// Update the reorg handle that has been processed.
err1 := reorgInfo.UpdateReorgMeta(nextKey, scheduler.sessPool)
metrics.BatchAddIdxHistogram.WithLabelValues(metrics.LblError).Observe(elapsedTime.Seconds())
logutil.BgLogger().Warn("[ddl] backfill worker handle batch tasks failed",

@@ -614,7 +615,8 @@
zap.String("start key", hex.EncodeToString(startKey)),
zap.String("next key", hex.EncodeToString(nextKey)),
zap.Int64("batch added count", taskAddedCount),
zap.String("take time", elapsedTime.String()))
zap.String("take time", elapsedTime.String()),
zap.NamedError("updateHandleError", err1))
return nil
}

@@ -1320,15 +1322,15 @@ func GetMaxBackfillJob(sess *session, jobID, currEleID int64, currEleKey []byte)
}

// MoveBackfillJobsToHistoryTable moves backfill table jobs to the backfill history table.
func MoveBackfillJobsToHistoryTable(sessCtx sessionctx.Context, bfJob *BackfillJob) error {
sess, ok := sessCtx.(*session)
func MoveBackfillJobsToHistoryTable(sctx sessionctx.Context, bfJob *BackfillJob) error {
s, ok := sctx.(*session)
if !ok {
return errors.Errorf("sess ctx:%#v convert session failed", sessCtx)
return errors.Errorf("sess ctx:%#v convert session failed", sctx)
}

return runInTxn(sess, func(se *session) error {
return s.runInTxn(func(se *session) error {
// TODO: Consider batch by batch update backfill jobs and insert backfill history jobs.
bJobs, err := GetBackfillJobs(sess, BackfillTable, fmt.Sprintf("ddl_job_id = %d and ele_id = %d and ele_key = '%s'",
bJobs, err := GetBackfillJobs(se, BackfillTable, fmt.Sprintf("ddl_job_id = %d and ele_id = %d and ele_key = '%s'",
bfJob.JobID, bfJob.EleID, bfJob.EleKey), "update_backfill_job")
if err != nil {
return errors.Trace(err)
@@ -1342,13 +1344,13 @@ func MoveBackfillJobsToHistoryTable(sessCtx sessionctx.Context, bfJob *BackfillJ
return errors.Trace(err)
}
startTS := txn.StartTS()
err = RemoveBackfillJob(sess, true, bJobs[0])
err = RemoveBackfillJob(se, true, bJobs[0])
if err == nil {
for _, bj := range bJobs {
bj.State = model.JobStateCancelled
bj.FinishTS = startTS
}
err = AddBackfillHistoryJob(sess, bJobs)
err = AddBackfillHistoryJob(se, bJobs)
}
logutil.BgLogger().Info("[ddl] move backfill jobs to history table", zap.Int("job count", len(bJobs)))
return errors.Trace(err)
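
Two things changed in sendTasksAndWait above: reorgInfo.UpdateReorgMeta now runs on the failure path as well as the success path, and its error is no longer dropped but attached to the log line as a named field. A minimal zap sketch of that reporting pattern follows; doBatch and updateMeta are placeholders, not TiDB functions.

package main

import "go.uber.org/zap"

func doBatch() error    { return nil } // placeholder for the backfill batch
func updateMeta() error { return nil } // placeholder for UpdateReorgMeta

func main() {
	logger, _ := zap.NewDevelopment()
	defer logger.Sync()

	err := doBatch()
	err1 := updateMeta() // always attempted, as in the diff

	// zap.NamedError is a no-op when err1 is nil, so the secondary error
	// can be attached unconditionally without cluttering the happy path.
	logger.Info("backfill workers successfully processed batch",
		zap.Error(err),
		zap.NamedError("updateHandleError", err1))
}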
17 changes: 12 additions & 5 deletions ddl/column.go
@@ -806,7 +806,13 @@ func doReorgWorkForModifyColumnMultiSchema(w *worker, d *ddlCtx, t *meta.Meta, j
func doReorgWorkForModifyColumn(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, tbl table.Table,
oldCol, changingCol *model.ColumnInfo, changingIdxs []*model.IndexInfo) (done bool, ver int64, err error) {
job.ReorgMeta.ReorgTp = model.ReorgTypeTxn
rh := newReorgHandler(t, w.sess)
sctx, err1 := w.sessPool.get()
if err1 != nil {
err = errors.Trace(err1)
return
}
defer w.sessPool.put(sctx)
rh := newReorgHandler(newSession(sctx))
dbInfo, err := t.GetDatabase(job.SchemaID)
if err != nil {
return false, ver, errors.Trace(err)
@@ -1291,8 +1297,8 @@ func (w *updateColumnWorker) getRowRecord(handle kv.Handle, recordKey []byte, ra
if err != nil {
return w.reformatErrors(err)
}
if w.sessCtx.GetSessionVars().StmtCtx.GetWarnings() != nil && len(w.sessCtx.GetSessionVars().StmtCtx.GetWarnings()) != 0 {
warn := w.sessCtx.GetSessionVars().StmtCtx.GetWarnings()
warn := w.sessCtx.GetSessionVars().StmtCtx.GetWarnings()
if len(warn) != 0 {
//nolint:forcetypeassert
recordWarning = errors.Cause(w.reformatErrors(warn[0].Err)).(*terror.Error)
}
@@ -1376,8 +1382,9 @@ func (w *updateColumnWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (t
taskCtx.nextKey = nextKey
taskCtx.done = taskDone

warningsMap := make(map[errors.ErrorID]*terror.Error, len(rowRecords))
warningsCountMap := make(map[errors.ErrorID]int64, len(rowRecords))
// Optimize for few warnings!
warningsMap := make(map[errors.ErrorID]*terror.Error, 2)
warningsCountMap := make(map[errors.ErrorID]int64, 2)
for _, rowRecord := range rowRecords {
taskCtx.scanCount++

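
doReorgWorkForModifyColumn above now borrows a session from the worker's pool and returns it with defer, so every exit path releases it; the warning maps are also pre-sized for the common case of few distinct warnings (capacity 2) instead of one slot per row. Here is a generic sketch of the borrow/return pattern, with sync.Pool standing in for TiDB's session pool.

package main

import (
	"fmt"
	"sync"
)

type session struct{} // stand-in for sessionctx.Context

// pool models w.sessPool: get a session, and always give it back.
var pool = sync.Pool{New: func() any { return new(session) }}

func doReorgWork() (err error) {
	sctx := pool.Get().(*session)
	// defer guarantees the session is returned on every path, including
	// early error returns, mirroring defer w.sessPool.put(sctx) above.
	defer pool.Put(sctx)

	// ... use sctx for reorg bookkeeping ...
	return nil
}

func main() {
	fmt.Println(doReorgWork()) // <nil>
}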
15 changes: 15 additions & 0 deletions ddl/column_type_change_test.go
@@ -2421,3 +2421,18 @@ func TestColumnTypeChangeTimestampToInt(t *testing.T) {
tk.MustExec("alter table t add index idx1(id, c1);")
tk.MustExec("admin check table t")
}

func TestFixDDLTxnWillConflictWithReorgTxn(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")

tk.MustExec("create table t (a int)")
tk.MustExec("set global tidb_ddl_enable_fast_reorg = OFF")
tk.MustExec("alter table t add index(a)")
tk.MustExec("set @@sql_mode=''")
tk.MustExec("insert into t values(128),(129)")
tk.MustExec("alter table t modify column a tinyint")

tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1690 2 warnings with this error code, first warning: constant 128 overflows tinyint"))
}
19 changes: 19 additions & 0 deletions ddl/db_partition_test.go
@@ -4528,6 +4528,25 @@ func TestPartitionTableWithAnsiQuotes(t *testing.T) {
` PARTITION "pMax" VALUES LESS THAN (MAXVALUE,MAXVALUE))`))
}

func TestAlterModifyPartitionColTruncateWarning(t *testing.T) {
t.Skip("waiting for supporting Modify Partition Column again")
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
schemaName := "truncWarn"
tk.MustExec("create database " + schemaName)
tk.MustExec("use " + schemaName)
tk.MustExec(`set sql_mode = default`)
tk.MustExec(`create table t (a varchar(255)) partition by range columns (a) (partition p1 values less than ("0"), partition p2 values less than ("zzzz"))`)
tk.MustExec(`insert into t values ("123456"),(" 654321")`)
tk.MustContainErrMsg(`alter table t modify a varchar(5)`, "[types:1265]Data truncated for column 'a', value is '")
tk.MustExec(`set sql_mode = ''`)
tk.MustExec(`alter table t modify a varchar(5)`)
// Fix the duplicate warning, see https://github.com/pingcap/tidb/issues/38699
tk.MustQuery(`show warnings`).Check(testkit.Rows(""+
"Warning 1265 Data truncated for column 'a', value is ' 654321'",
"Warning 1265 Data truncated for column 'a', value is ' 654321'"))
}

func TestAlterModifyColumnOnPartitionedTableRename(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
54 changes: 50 additions & 4 deletions ddl/db_test.go
@@ -618,10 +618,7 @@ func TestAddExpressionIndexRollback(t *testing.T) {
// Check whether the reorg information is cleaned up.
err := sessiontxn.NewTxn(context.Background(), ctx)
require.NoError(t, err)
txn, err := ctx.Txn(true)
require.NoError(t, err)
m := meta.NewMeta(txn)
element, start, end, physicalID, err := ddl.NewReorgHandlerForTest(m, testkit.NewTestKit(t, store).Session()).GetDDLReorgHandle(currJob)
element, start, end, physicalID, err := ddl.NewReorgHandlerForTest(testkit.NewTestKit(t, store).Session()).GetDDLReorgHandle(currJob)
require.True(t, meta.ErrDDLReorgElementNotExist.Equal(err))
require.Nil(t, element)
require.Nil(t, start)
@@ -1577,3 +1574,52 @@ func TestSetInvalidDefaultValueAfterModifyColumn(t *testing.T) {
wg.Wait()
require.EqualError(t, checkErr, "[ddl:1101]BLOB/TEXT/JSON column 'a' can't have a default value")
}

func TestMDLTruncateTable(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)

tk := testkit.NewTestKit(t, store)
tk2 := testkit.NewTestKit(t, store)
tk3 := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t(a int);")
tk.MustExec("begin")
tk.MustExec("select * from t for update")

var wg sync.WaitGroup

hook := &ddl.TestDDLCallback{Do: dom}
wg.Add(2)
var timetk2 time.Time
var timetk3 time.Time

one := false
f := func(job *model.Job) {
if !one {
one = true
} else {
return
}
go func() {
tk3.MustExec("truncate table test.t")
timetk3 = time.Now()
wg.Done()
}()
}

hook.OnJobUpdatedExported.Store(&f)
dom.DDL().SetHook(hook)

go func() {
tk2.MustExec("truncate table test.t")
timetk2 = time.Now()
wg.Done()
}()

time.Sleep(2 * time.Second)
timeMain := time.Now()
tk.MustExec("commit")
wg.Wait()
require.True(t, timetk2.After(timeMain))
require.True(t, timetk3.After(timeMain))
}
15 changes: 14 additions & 1 deletion ddl/ddl.go
@@ -1343,7 +1343,7 @@ func GetDDLInfo(s sessionctx.Context) (*Info, error) {
return info, nil
}

_, info.ReorgHandle, _, _, err = newReorgHandler(t, sess).GetDDLReorgHandle(reorgJob)
_, info.ReorgHandle, _, _, err = newReorgHandler(sess).GetDDLReorgHandle(reorgJob)
if err != nil {
if meta.ErrDDLReorgElementNotExist.Equal(err) {
return info, nil
@@ -1584,6 +1584,19 @@ func (s *session) session() sessionctx.Context {
return s.Context
}

func (s *session) runInTxn(f func(*session) error) (err error) {
err = s.begin()
if err != nil {
return err
}
err = f(s)
if err != nil {
s.rollback()
return
}
return errors.Trace(s.commit())
}

// GetAllHistoryDDLJobs get all the done DDL jobs.
func GetAllHistoryDDLJobs(m *meta.Meta) ([]*model.Job, error) {
iterator, err := GetLastHistoryDDLJobsIterator(m)
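
The new runInTxn helper above wraps the begin/rollback/commit dance so callers such as MoveBackfillJobsToHistoryTable cannot forget the rollback on error. A self-contained sketch of the same shape follows; txnSession is a placeholder, not the ddl package's session type.

package main

import (
	"errors"
	"fmt"
)

type txnSession struct{ open bool }

func (s *txnSession) begin() error  { s.open = true; return nil }
func (s *txnSession) commit() error { s.open = false; return nil }
func (s *txnSession) rollback()     { s.open = false }

// runInTxn mirrors the helper above: begin, run f, then either roll back
// (keeping f's error) or commit (returning the commit error).
func runInTxn(s *txnSession, f func(*txnSession) error) error {
	if err := s.begin(); err != nil {
		return err
	}
	if err := f(s); err != nil {
		s.rollback()
		return err
	}
	return s.commit()
}

func main() {
	s := &txnSession{}
	err := runInTxn(s, func(*txnSession) error { return errors.New("boom") })
	fmt.Println(err, s.open) // boom false: rolled back, not left open
}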
1 change: 1 addition & 0 deletions ddl/ddl_worker.go
@@ -699,6 +699,7 @@ func (w *worker) HandleJobDone(d *ddlCtx, job *model.Job, t *meta.Meta) error {
if err != nil {
return err
}
CleanupDDLReorgHandles(job, w.sess)
asyncNotify(d.ddlJobDoneCh)
return nil
}