Skip to content

Commit

Permalink
tidb: always rebuild plan for retry. (#5219)
Browse files Browse the repository at this point in the history
Fixs record and index inconsistency bug.
  • Loading branch information
coocood authored and shenli committed Nov 25, 2017
1 parent 039cf7a commit b8bcf9c
Show file tree
Hide file tree
Showing 9 changed files with 74 additions and 40 deletions.
3 changes: 3 additions & 0 deletions ast/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,9 @@ type Statement interface {

// IsReadOnly returns if the statement is read only. For example: SelectStmt without lock.
IsReadOnly() bool

// RebuildPlan rebuilds the plan of the statement.
RebuildPlan() error
}

// Visitor visits a Node.
Expand Down
1 change: 1 addition & 0 deletions ast/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ type ExecuteStmt struct {

Name string
UsingVars []ExprNode
ExecID uint32
}

// Accept implements Node Accept interface.
Expand Down
31 changes: 24 additions & 7 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (a *recordSet) Fields() ([]*ast.ResultField, error) {
for _, col := range a.executor.Schema().Columns {
dbName := col.DBName.O
if dbName == "" && col.TblName.L != "" {
dbName = a.stmt.ctx.GetSessionVars().CurrentDB
dbName = a.stmt.Ctx.GetSessionVars().CurrentDB
}
rf := &ast.ResultField{
ColumnAsName: col.ColName,
Expand All @@ -77,13 +77,13 @@ func (a *recordSet) Next() (*ast.Row, error) {
}
if row == nil {
if a.stmt != nil {
a.stmt.ctx.GetSessionVars().LastFoundRows = a.stmt.ctx.GetSessionVars().StmtCtx.FoundRows()
a.stmt.Ctx.GetSessionVars().LastFoundRows = a.stmt.Ctx.GetSessionVars().StmtCtx.FoundRows()
}
return nil, nil
}

if a.stmt != nil {
a.stmt.ctx.GetSessionVars().StmtCtx.AddFoundRows(1)
a.stmt.Ctx.GetSessionVars().StmtCtx.AddFoundRows(1)
}
return &ast.Row{Data: row}, nil
}
Expand All @@ -110,7 +110,9 @@ type ExecStmt struct {
// Text represents the origin query text.
Text string

ctx context.Context
StmtNode ast.StmtNode

Ctx context.Context
startTime time.Time
isPreparedStmt bool

Expand All @@ -133,13 +135,28 @@ func (a *ExecStmt) IsReadOnly() bool {
return a.ReadOnly
}

// RebuildPlan implements ast.Statement interface.
func (a *ExecStmt) RebuildPlan() error {
is := GetInfoSchema(a.Ctx)
a.InfoSchema = is
if err := plan.Preprocess(a.StmtNode, is, a.Ctx); err != nil {
return errors.Trace(err)
}
p, err := plan.Optimize(a.Ctx, a.StmtNode, is)
if err != nil {
return errors.Trace(err)
}
a.Plan = p
return nil
}

// Exec implements the ast.Statement Exec interface.
// This function builds an Executor from a plan. If the Executor doesn't return result,
// like the INSERT, UPDATE statements, it executes in this function, if the Executor returns
// result, execution is done after this function returns, in the returned ast.RecordSet Next method.
func (a *ExecStmt) Exec(ctx context.Context) (ast.RecordSet, error) {
a.startTime = time.Now()
a.ctx = ctx
a.Ctx = ctx

if _, ok := a.Plan.(*plan.Analyze); ok && ctx.GetSessionVars().InRestrictedSQL {
oriStats := ctx.GetSessionVars().Systems[variable.TiDBBuildStatsConcurrency]
Expand Down Expand Up @@ -288,8 +305,8 @@ func (a *ExecStmt) logSlowQuery(succ bool) {
if len(sql) > cfg.Log.QueryLogMaxLen {
sql = sql[:cfg.Log.QueryLogMaxLen] + fmt.Sprintf("(len:%d)", len(sql))
}
connID := a.ctx.GetSessionVars().ConnectionID
currentDB := a.ctx.GetSessionVars().CurrentDB
connID := a.Ctx.GetSessionVars().ConnectionID
currentDB := a.Ctx.GetSessionVars().CurrentDB
logEntry := log.NewEntry(logutil.SlowQueryLogger)
logEntry.Data = log.Fields{
"connectionId": connID,
Expand Down
2 changes: 2 additions & 0 deletions executor/compiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (*ExecStm
Cacheable: plan.Cacheable(stmtNode),
Text: stmtNode.Text(),
ReadOnly: readOnly,
Ctx: ctx,
StmtNode: stmtNode,
}, nil
}

Expand Down
8 changes: 8 additions & 0 deletions executor/prepared.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,12 @@ func (e *DeallocateExec) Open() error {

// CompileExecutePreparedStmt compiles a session Execute command to a stmt.Statement.
func CompileExecutePreparedStmt(ctx context.Context, ID uint32, args ...interface{}) ast.Statement {
execStmtNode := &ast.ExecuteStmt{ExecID: ID}
execStmtNode.UsingVars = make([]ast.ExprNode, len(args))
for i, val := range args {
execStmtNode.UsingVars[i] = ast.NewValueExpr(val)
}

execPlan := &plan.Execute{ExecID: ID}
execPlan.UsingVars = make([]expression.Expression, len(args))
for i, val := range args {
Expand All @@ -323,6 +329,8 @@ func CompileExecutePreparedStmt(ctx context.Context, ID uint32, args ...interfac
InfoSchema: GetInfoSchema(ctx),
Plan: execPlan,
ReadOnly: false,
Ctx: ctx,
StmtNode: execStmtNode,
}

if prepared, ok := ctx.GetSessionVars().PreparedStmts[ID].(*Prepared); ok {
Expand Down
10 changes: 10 additions & 0 deletions executor/prepared_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,16 @@ func (s *testSuite) TestPrepared(c *C) {
stmt := executor.CompileExecutePreparedStmt(tk.Se, stmtId, 1)
c.Assert(stmt.OriginText(), Equals, query)

// Check that rebuild plan works.
tk.Se.PrepareTxnCtx()
err = stmt.RebuildPlan()
c.Assert(err, IsNil)
rs, err := stmt.Exec(tk.Se)
c.Assert(err, IsNil)
_, err = rs.Next()
c.Assert(err, IsNil)
c.Assert(rs.Close(), IsNil)

// Make schema change.
tk.Exec("create table prepare2 (a int)")

Expand Down
17 changes: 17 additions & 0 deletions new_session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1506,3 +1506,20 @@ func (s *testSchemaSuite) TestRetrySchemaChange(c *C) {
c.Assert(err, IsNil)
tk.MustQuery("select * from t where t.b = 5").Check(testkit.Rows("1 5"))
}

func (s *testSchemaSuite) TestRetryMissingUnionScan(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk1 := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("create table t (a int primary key, b int unique, c int)")
tk.MustExec("insert into t values (1, 1, 1)")

tk1.MustExec("begin")
tk1.MustExec("update t set b = 1, c = 2 where b = 2")
tk1.MustExec("update t set b = 1, c = 2 where a = 1")

// Create a conflict to reproduces the bug that the second update statement in retry
// has a dirty table but doesn't use UnionScan.
tk.MustExec("update t set b = 2 where a = 1")

tk1.MustExec("commit")
}
2 changes: 1 addition & 1 deletion plan/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func (b *planBuilder) buildExecute(v *ast.ExecuteStmt) Plan {
}
vars = append(vars, newExpr)
}
exe := &Execute{Name: v.Name, UsingVars: vars}
exe := &Execute{Name: v.Name, ExecID: v.ExecID, UsingVars: vars}
exe.SetSchema(expression.NewSchema())
return exe
}
Expand Down
40 changes: 8 additions & 32 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ func (s *session) doCommitWithRetry() error {
// We make larger transactions retry less times to prevent cluster resource outage.
txnSizeRate := float64(txnSize) / float64(kv.TxnTotalSizeLimit)
maxRetryCount := commitRetryLimit - int(float64(commitRetryLimit-1)*txnSizeRate)
err = s.retry(maxRetryCount, domain.ErrInfoSchemaChanged.Equal(err))
err = s.retry(maxRetryCount)
}
}
s.cleanRetryInfo()
Expand Down Expand Up @@ -387,7 +387,7 @@ func (s *session) isRetryableError(err error) bool {
return kv.IsRetryableError(err) || domain.ErrInfoSchemaChanged.Equal(err)
}

func (s *session) retry(maxCnt int, infoSchemaChanged bool) error {
func (s *session) retry(maxCnt int) error {
connID := s.sessionVars.ConnectionID
if s.sessionVars.TxnCtx.ForUpdate {
return errors.Errorf("[%d] can not retry select for update statement", connID)
Expand All @@ -411,19 +411,15 @@ func (s *session) retry(maxCnt int, infoSchemaChanged bool) error {
if st.IsReadOnly() {
continue
}
txt := st.OriginText()
if infoSchemaChanged {
st, err = updateStatement(st, s, txt)
if err != nil {
return errors.Trace(err)
}
nh.history[i].st = st
err = st.RebuildPlan()
if err != nil {
return errors.Trace(err)
}

if retryCnt == 0 {
// We do not have to log the query every time.
// We print the queries at the first try only.
log.Warnf("[%d] Retry [%d] query [%d] %s", connID, retryCnt, i, sqlForLog(txt))
log.Warnf("[%d] Retry [%d] query [%d] %s", connID, retryCnt, i, sqlForLog(st.OriginText()))
} else {
log.Warnf("[%d] Retry [%d] query [%d]", connID, retryCnt, i)
}
Expand All @@ -449,7 +445,6 @@ func (s *session) retry(maxCnt int, infoSchemaChanged bool) error {
return errors.Trace(err)
}
retryCnt++
infoSchemaChanged = domain.ErrInfoSchemaChanged.Equal(err)
if retryCnt >= maxCnt {
log.Warnf("[%d] Retry reached max count %d", connID, retryCnt)
return errors.Trace(err)
Expand All @@ -462,27 +457,6 @@ func (s *session) retry(maxCnt int, infoSchemaChanged bool) error {
return err
}

func updateStatement(st ast.Statement, s *session, txt string) (ast.Statement, error) {
// statement maybe stale because of infoschema changed, this function will return the updated one.
if st.IsPrepared() {
// TODO: Rebuild plan if infoschema changed, reuse the statement otherwise.
} else {
// Rebuild plan if infoschema changed, reuse the statement otherwise.
charset, collation := s.sessionVars.GetCharsetInfo()
stmt, err := s.parser.ParseOneStmt(txt, charset, collation)
if err != nil {
return st, errors.Trace(err)
}
st, err = Compile(s, stmt)
if err != nil {
// If a txn is inserting data when DDL is dropping column,
// it would fail to commit and retry, and run here then.
return st, errors.Trace(err)
}
}
return st, nil
}

func sqlForLog(sql string) string {
if len(sql) > sqlLogMaxLen {
return sql[:sqlLogMaxLen] + fmt.Sprintf("(len:%d)", len(sql))
Expand Down Expand Up @@ -709,6 +683,8 @@ func (s *session) Execute(sql string) (recordSets []ast.RecordSet, err error) {
Expensive: cacheValue.(*cache.SQLCacheValue).Expensive,
Text: stmtNode.Text(),
ReadOnly: ast.IsReadOnly(stmtNode),
Ctx: s,
StmtNode: stmtNode,
}

s.PrepareTxnCtx()
Expand Down

0 comments on commit b8bcf9c

Please sign in to comment.