
Commit

Merge branch 'release-3.0' into automated-cherry-pick-of-pingcap#11237-…release-3.0
wshwsh12 authored Jul 15, 2019
2 parents 7229ef7 + 9e4e8da commit 2c8ae00
Showing 23 changed files with 326 additions and 107 deletions.
94 changes: 51 additions & 43 deletions ddl/split_region.go
@@ -30,58 +30,66 @@ func splitPartitionTableRegion(store kv.SplitableStore, pi *model.PartitionInfo,
regionIDs = append(regionIDs, splitRecordRegion(store, def.ID, scatter))
}
if scatter {
waitScatterRegionFinish(store, regionIDs)
waitScatterRegionFinish(store, regionIDs...)
}
}

func splitTableRegion(store kv.SplitableStore, tbInfo *model.TableInfo, scatter bool) {
regionIDs := make([]uint64, 0, len(tbInfo.Indices)+1)
if tbInfo.ShardRowIDBits > 0 && tbInfo.PreSplitRegions > 0 {
// Example:
// ShardRowIDBits = 5
// PreSplitRegions = 3
//
// then will pre-split 2^(3-1) = 4 regions.
//
// in this code:
// max = 1 << (tblInfo.ShardRowIDBits - 1) = 1 << (5-1) = 16
// step := int64(1 << (tblInfo.ShardRowIDBits - tblInfo.PreSplitRegions)) = 1 << (5-3) = 4;
//
// then split regionID is below:
// 4 << 59 = 2305843009213693952
// 8 << 59 = 4611686018427387904
// 12 << 59 = 6917529027641081856
//
// The 4 pre-split regions range is below:
// 0 ~ 2305843009213693952
// 2305843009213693952 ~ 4611686018427387904
// 4611686018427387904 ~ 6917529027641081856
// 6917529027641081856 ~ 9223372036854775807 ( (1 << 63) - 1 )
//
// And the max _tidb_rowid is 9223372036854775807, it won't be negative number.
splitPreSplitedTable(store, tbInfo, scatter)
} else {
regionID := splitRecordRegion(store, tbInfo.ID, scatter)
if scatter {
waitScatterRegionFinish(store, regionID)
}
}
}

// Split table region.
step := int64(1 << (tbInfo.ShardRowIDBits - tbInfo.PreSplitRegions))
// The highest bit is the sign bit, and the allocated _tidb_rowid will always be a positive number.
// So we only need to split regions for the positive numbers.
max := int64(1 << (tbInfo.ShardRowIDBits - 1))
for p := int64(step); p < max; p += step {
recordID := p << (64 - tbInfo.ShardRowIDBits)
recordPrefix := tablecodec.GenTableRecordPrefix(tbInfo.ID)
key := tablecodec.EncodeRecordKey(recordPrefix, recordID)
regionID, err := store.SplitRegion(key, scatter)
if err != nil {
logutil.Logger(context.Background()).Warn("[ddl] pre split table region failed", zap.Int64("recordID", recordID), zap.Error(err))
} else {
regionIDs = append(regionIDs, regionID)
}
func splitPreSplitedTable(store kv.SplitableStore, tbInfo *model.TableInfo, scatter bool) {
// Example:
// ShardRowIDBits = 5
// PreSplitRegions = 3
//
// then will pre-split 2^(3-1) = 4 regions.
//
// in this code:
// max = 1 << (tblInfo.ShardRowIDBits - 1) = 1 << (5-1) = 16
// step := int64(1 << (tblInfo.ShardRowIDBits - tblInfo.PreSplitRegions)) = 1 << (5-3) = 4;
//
// then split regionID is below:
// 4 << 59 = 2305843009213693952
// 8 << 59 = 4611686018427387904
// 12 << 59 = 6917529027641081856
//
// The 4 pre-split regions range is below:
// 0 ~ 2305843009213693952
// 2305843009213693952 ~ 4611686018427387904
// 4611686018427387904 ~ 6917529027641081856
// 6917529027641081856 ~ 9223372036854775807 ( (1 << 63) - 1 )
//
// And the max _tidb_rowid is 9223372036854775807, it won't be negative number.

// Split table region.
regionIDs := make([]uint64, 0, 1<<(tbInfo.PreSplitRegions-1)+len(tbInfo.Indices))
step := int64(1 << (tbInfo.ShardRowIDBits - tbInfo.PreSplitRegions))
// The highest bit is the sign bit, and the allocated _tidb_rowid will always be a positive number.
// So we only need to split regions for the positive numbers.
max := int64(1 << (tbInfo.ShardRowIDBits - 1))
for p := int64(step); p < max; p += step {
recordID := p << (64 - tbInfo.ShardRowIDBits)
recordPrefix := tablecodec.GenTableRecordPrefix(tbInfo.ID)
key := tablecodec.EncodeRecordKey(recordPrefix, recordID)
regionID, err := store.SplitRegion(key, scatter)
if err != nil {
logutil.Logger(context.Background()).Warn("[ddl] pre split table region failed", zap.Int64("recordID", recordID),
zap.Error(err))
} else {
regionIDs = append(regionIDs, regionID)
}
} else {
regionIDs = append(regionIDs, splitRecordRegion(store, tbInfo.ID, scatter))
}
regionIDs = append(regionIDs, splitIndexRegion(store, tbInfo, scatter)...)
if scatter {
waitScatterRegionFinish(store, regionIDs)
waitScatterRegionFinish(store, regionIDs...)
}
}

@@ -111,7 +119,7 @@ func splitIndexRegion(store kv.SplitableStore, tblInfo *model.TableInfo, scatter
return regionIDs
}

func waitScatterRegionFinish(store kv.SplitableStore, regionIDs []uint64) {
func waitScatterRegionFinish(store kv.SplitableStore, regionIDs ...uint64) {
for _, regionID := range regionIDs {
err := store.WaitScatterRegionFinish(regionID)
if err != nil {
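The long comment in splitPreSplitedTable above walks through the split-key arithmetic for ShardRowIDBits = 5 and PreSplitRegions = 3. The standalone sketch below (not part of this patch) reproduces that arithmetic so the three boundary values in the comment can be verified directly:

```go
package main

import "fmt"

// Reproduces the example from splitPreSplitedTable's comment:
// ShardRowIDBits = 5, PreSplitRegions = 3 pre-splits the positive
// _tidb_rowid space into 4 regions using 3 split keys.
func main() {
	const shardRowIDBits, preSplitRegions = 5, 3

	step := int64(1) << (shardRowIDBits - preSplitRegions) // 1 << (5-3) = 4
	max := int64(1) << (shardRowIDBits - 1)                // 1 << (5-1) = 16

	for p := step; p < max; p += step {
		recordID := p << (64 - shardRowIDBits) // move the shard prefix into the high bits
		fmt.Printf("%2d << %d = %d\n", p, 64-shardRowIDBits, recordID)
	}
	// Output:
	//  4 << 59 = 2305843009213693952
	//  8 << 59 = 4611686018427387904
	// 12 << 59 = 6917529027641081856
}
```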
19 changes: 17 additions & 2 deletions executor/adapter.go
@@ -50,7 +50,7 @@ import (

// processinfoSetter is the interface used to set the current running process info.
type processinfoSetter interface {
SetProcessInfo(string, time.Time, byte)
SetProcessInfo(string, time.Time, byte, uint64)
}

// recordSet wraps an executor, implements sqlexec.RecordSet interface
@@ -245,8 +245,9 @@ func (a *ExecStmt) Exec(ctx context.Context) (_ sqlexec.RecordSet, err error) {
sql = ss.SecureText()
}
}
maxExecutionTime := getMaxExecutionTime(sctx, a.StmtNode)
// Update processinfo, ShowProcess() will use it.
pi.SetProcessInfo(sql, time.Now(), cmd)
pi.SetProcessInfo(sql, time.Now(), cmd, maxExecutionTime)
a.Ctx.GetSessionVars().StmtCtx.StmtType = GetStmtLabel(a.StmtNode)
}

@@ -285,6 +286,20 @@ func (a *ExecStmt) Exec(ctx context.Context) (_ sqlexec.RecordSet, err error) {
}, nil
}

// getMaxExecutionTime gets the max execution timeout value.
func getMaxExecutionTime(sctx sessionctx.Context, stmtNode ast.StmtNode) uint64 {
ret := sctx.GetSessionVars().MaxExecutionTime
if sel, ok := stmtNode.(*ast.SelectStmt); ok {
for _, hint := range sel.TableHints {
if hint.HintName.L == variable.MaxExecutionTime {
ret = hint.MaxExecutionTime
break
}
}
}
return ret
}

type chunkRowRecordSet struct {
rows []chunk.Row
idx int
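The new getMaxExecutionTime helper lets a statement-level MAX_EXECUTION_TIME hint on a SELECT (for example, SELECT /*+ MAX_EXECUTION_TIME(500) */ ...) take priority over the session variable max_execution_time. The sketch below mirrors that precedence with stand-in types; the real ast.SelectStmt and session variable plumbing in TiDB are richer than this:

```go
package main

import "fmt"

// tableHint and selectStmt are simplified stand-ins for the parser's ast types.
type tableHint struct {
	name             string
	maxExecutionTime uint64
}

type selectStmt struct{ hints []tableHint }

// maxExecutionTime mirrors getMaxExecutionTime: start from the session-level value
// and let a MAX_EXECUTION_TIME hint on a SELECT statement override it.
func maxExecutionTime(sessionValue uint64, stmt interface{}) uint64 {
	ret := sessionValue
	if sel, ok := stmt.(*selectStmt); ok {
		for _, h := range sel.hints {
			if h.name == "max_execution_time" {
				ret = h.maxExecutionTime
				break
			}
		}
	}
	return ret
}

func main() {
	withHint := &selectStmt{hints: []tableHint{{name: "max_execution_time", maxExecutionTime: 500}}}
	fmt.Println(maxExecutionTime(2000, withHint)) // 500: the statement hint wins
	fmt.Println(maxExecutionTime(2000, nil))      // 2000: no SELECT hint, keep the session value
}
```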
2 changes: 1 addition & 1 deletion executor/insert.go
@@ -169,7 +169,7 @@ func (e *InsertExec) Open(ctx context.Context) error {

// updateDupRow updates a duplicate row to a new row.
func (e *InsertExec) updateDupRow(row toBeCheckedRow, handle int64, onDuplicate []*expression.Assignment) error {
oldRow, err := e.getOldRow(e.ctx, e.Table, handle, e.GenExprs)
oldRow, err := e.getOldRow(e.ctx, row.t, handle, e.GenExprs)
if err != nil {
logutil.Logger(context.Background()).Error("get old row failed when insert on dup", zap.Int64("handle", handle), zap.String("toBeInsertedRow", types.DatumsToStrNoErr(row.row)))
return err
14 changes: 14 additions & 0 deletions executor/insert_test.go
@@ -286,3 +286,17 @@ func (s *testSuite3) TestAllowInvalidDates(c *C) {
runWithMode("STRICT_TRANS_TABLES,ALLOW_INVALID_DATES")
runWithMode("ALLOW_INVALID_DATES")
}

func (s *testSuite3) TestPartitionInsertOnDuplicate(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec(`use test`)
tk.MustExec(`create table t1 (a int,b int,primary key(a,b)) partition by range(a) (partition p0 values less than (100),partition p1 values less than (1000))`)
tk.MustExec(`insert into t1 set a=1, b=1`)
tk.MustExec(`insert into t1 set a=1,b=1 on duplicate key update a=1,b=1`)
tk.MustQuery(`select * from t1`).Check(testkit.Rows("1 1"))

tk.MustExec(`create table t2 (a int,b int,primary key(a,b)) partition by hash(a) partitions 4`)
tk.MustExec(`insert into t2 set a=1,b=1;`)
tk.MustExec(`insert into t2 set a=1,b=1 on duplicate key update a=1,b=1`)
tk.MustQuery(`select * from t2`).Check(testkit.Rows("1 1"))
}
9 changes: 7 additions & 2 deletions executor/simple_test.go
@@ -405,14 +405,19 @@ func (s *testFlushSuite) TestFlushPrivilegesPanic(c *C) {
c.Assert(err, IsNil)
defer store.Close()

config.GetGlobalConfig().Security.SkipGrantTable = true
saveConf := config.GetGlobalConfig()
conf := config.NewConfig()
conf.Security.SkipGrantTable = true
config.StoreGlobalConfig(conf)

dom, err := session.BootstrapSession(store)
c.Assert(err, IsNil)
defer dom.Close()

tk := testkit.NewTestKit(c, store)
tk.MustExec("FLUSH PRIVILEGES")
config.GetGlobalConfig().Security.SkipGrantTable = false

config.StoreGlobalConfig(saveConf)
}

func (s *testSuite3) TestDropStats(c *C) {
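The TestFlushPrivilegesPanic change above stops mutating the shared global config in place and instead installs a fresh config, then restores the saved one. The sketch below extracts that save/replace/restore idiom into a helper; the helper name and the main wiring are assumptions, but the config calls are the ones the patch itself uses:

```go
package main

import "github.com/pingcap/tidb/config"

// withSkipGrantTable runs fn with skip-grant-table enabled, then restores the
// previously installed global config, mirroring the pattern used in the test.
func withSkipGrantTable(fn func()) {
	saved := config.GetGlobalConfig()     // remember whatever config is currently installed
	conf := config.NewConfig()            // build a fresh default config instead of mutating the shared one
	conf.Security.SkipGrantTable = true   // the only change this scenario needs
	config.StoreGlobalConfig(conf)
	defer config.StoreGlobalConfig(saved) // put the original back even if fn panics
	fn()
}

func main() {
	withSkipGrantTable(func() {
		// bootstrap a session and run FLUSH PRIVILEGES here, as the test does
	})
}
```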
1 change: 1 addition & 0 deletions go.sum
@@ -287,6 +287,7 @@ golang.org/x/tools v0.0.0-20190130214255-bb1329dc71a0/go.mod h1:n7NCudcB/nEzxVGm
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/genproto v0.0.0-20180608181217-32ee49c4dd80/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/genproto v0.0.0-20181004005441-af9cb2a35e7f h1:FU37niK8AQ59mHcskRyQL7H0ErSeNh650vdcj8HqdSI=
google.golang.org/genproto v0.0.0-20181004005441-af9cb2a35e7f/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/genproto v0.0.0-20190108161440-ae2f86662275 h1:9oFlwfEGIvmxXTcY53ygNyxIQtWciRHjrnUvZJCYXYU=
google.golang.org/genproto v0.0.0-20190108161440-ae2f86662275/go.mod h1:7Ep/1NZk928CDR8SjdVbjWNpdIf6nzjE3BTgJDr2Atg=
15 changes: 15 additions & 0 deletions planner/core/cbo_test.go
@@ -1091,3 +1091,18 @@ func (s *testAnalyzeSuite) TestLimitCrossEstimation(c *C) {
" └─TableScan_19 6.00 cop table:t, keep order:false",
))
}

func (s *testAnalyzeSuite) TestUpdateProjEliminate(c *C) {
store, dom, err := newStoreWithBootstrap()
c.Assert(err, IsNil)
tk := testkit.NewTestKit(c, store)
defer func() {
dom.Close()
store.Close()
}()

tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int, b int)")
tk.MustExec("explain update t t1, (select distinct b from t) t2 set t1.b = t2.b")
}
4 changes: 3 additions & 1 deletion planner/core/logical_plan_builder.go
@@ -2587,7 +2587,9 @@ func (b *PlanBuilder) buildUpdate(update *ast.UpdateStmt) (Plan, error) {

updt := Update{OrderedList: orderedList}.Init(b.ctx)
updt.SetSchema(p.Schema())
updt.SelectPlan, err = DoOptimize(b.optFlag, p)
// We cannot apply projection elimination when building the subplan, because
// columns in orderedList cannot be resolved.
updt.SelectPlan, err = DoOptimize(b.optFlag&^flagEliminateProjection, p)
if err != nil {
return nil, err
}
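buildUpdate now masks out projection elimination with Go's bit-clear operator, b.optFlag &^ flagEliminateProjection, so the columns referenced by orderedList remain resolvable. A tiny illustration of what the operator does follows; the flag names and values below are made up for the demo, not TiDB's real optimizer flags:

```go
package main

import "fmt"

// Hypothetical optimizer flags, one bit each.
const (
	flagPruneColumns uint64 = 1 << iota
	flagEliminateProjection
	flagPredicatePushDown
)

func main() {
	optFlag := flagPruneColumns | flagEliminateProjection | flagPredicatePushDown
	fmt.Printf("%03b\n", optFlag)                          // 111: all three passes enabled
	fmt.Printf("%03b\n", optFlag&^flagEliminateProjection) // 101: only projection elimination is cleared
}
```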
22 changes: 19 additions & 3 deletions server/conn.go
@@ -50,6 +50,7 @@ import (

"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/parser/auth"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/parser/terror"
@@ -262,6 +263,11 @@ func (cc *clientConn) readPacket() ([]byte, error) {
}

func (cc *clientConn) writePacket(data []byte) error {
failpoint.Inject("FakeClientConn", func() {
if cc.pkt == nil {
failpoint.Return(nil)
}
})
return cc.pkt.writePacket(data)
}

@@ -845,7 +851,11 @@ func (cc *clientConn) dispatch(ctx context.Context, data []byte) error {
cc.lastCmd = string(hack.String(data))
token := cc.server.getToken()
defer func() {
cc.ctx.SetProcessInfo("", t, mysql.ComSleep)
// if handleChangeUser failed, cc.ctx may be nil
if cc.ctx != nil {
cc.ctx.SetProcessInfo("", t, mysql.ComSleep, 0)
}

cc.server.releaseToken(token)
span.Finish()
}()
@@ -860,9 +870,9 @@ func (cc *clientConn) dispatch(ctx context.Context, data []byte) error {
switch cmd {
case mysql.ComPing, mysql.ComStmtClose, mysql.ComStmtSendLongData, mysql.ComStmtReset,
mysql.ComSetOption, mysql.ComChangeUser:
cc.ctx.SetProcessInfo("", t, cmd)
cc.ctx.SetProcessInfo("", t, cmd, 0)
case mysql.ComInitDB:
cc.ctx.SetProcessInfo("use "+dataStr, t, cmd)
cc.ctx.SetProcessInfo("use "+dataStr, t, cmd, 0)
}

switch cmd {
@@ -925,6 +935,11 @@ func (cc *clientConn) useDB(ctx context.Context, db string) (err error) {
}

func (cc *clientConn) flush() error {
failpoint.Inject("FakeClientConn", func() {
if cc.pkt == nil {
failpoint.Return(nil)
}
})
return cc.pkt.flush()
}

@@ -1255,6 +1270,7 @@ func (cc *clientConn) writeResultset(ctx context.Context, rs ResultSet, binary b
if err != nil {
return err
}

return cc.flush()
}

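The FakeClientConn failpoints added to writePacket and flush let connection-level code run even when cc.pkt is nil, which is useful for tests that drive dispatch without a real MySQL client. The sketch below shows how such a failpoint could be toggled; the Enable and Disable calls come from the pingcap/failpoint API, while the exact failpoint path and the surrounding test wiring are assumptions:

```go
package main

import (
	"log"

	"github.com/pingcap/failpoint"
)

func main() {
	// Failpoint names conventionally use <package import path>/<failpoint name>.
	fp := "github.com/pingcap/tidb/server/FakeClientConn"
	if err := failpoint.Enable(fp, "return(true)"); err != nil {
		log.Fatal(err)
	}
	defer func() {
		if err := failpoint.Disable(fp); err != nil {
			log.Println(err)
		}
	}()
	// While the failpoint is active, clientConn.writePacket and clientConn.flush
	// return nil when cc.pkt is nil, so dispatch can be exercised in isolation.
}
```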
2 changes: 1 addition & 1 deletion server/conn_stmt.go
@@ -223,7 +223,7 @@ func (cc *clientConn) handleStmtFetch(ctx context.Context, data []byte) (err err
if prepared, ok := cc.ctx.GetStatement(int(stmtID)).(*TiDBStatement); ok {
sql = prepared.sql
}
cc.ctx.SetProcessInfo(sql, time.Now(), mysql.ComStmtExecute)
cc.ctx.SetProcessInfo(sql, time.Now(), mysql.ComStmtExecute, 0)
rs := stmt.GetResultSet()
if rs == nil {
return mysql.NewErr(mysql.ErrUnknownStmtHandler,