From 2fdfe368732ef6dbfa5aa09775621f8f7873932a Mon Sep 17 00:00:00 2001 From: Ewan Chou Date: Sat, 25 Nov 2017 09:50:32 +0800 Subject: [PATCH] tidb: always rebuild plan for retry. Fixs record and index inconsistency bug. --- ast/ast.go | 3 +++ ast/misc.go | 1 + executor/adapter.go | 31 +++++++++++++++++++++++------- executor/compiler.go | 2 ++ executor/prepared.go | 8 ++++++++ executor/prepared_test.go | 10 ++++++++++ new_session_test.go | 17 +++++++++++++++++ plan/planbuilder.go | 2 +- session.go | 40 ++++++++------------------------------- 9 files changed, 74 insertions(+), 40 deletions(-) diff --git a/ast/ast.go b/ast/ast.go index 8032ef37ed34f..e55aceb95bdc2 100644 --- a/ast/ast.go +++ b/ast/ast.go @@ -179,6 +179,9 @@ type Statement interface { // IsReadOnly returns if the statement is read only. For example: SelectStmt without lock. IsReadOnly() bool + + // RebuildPlan rebuilds the plan of the statement. + RebuildPlan() error } // Visitor visits a Node. diff --git a/ast/misc.go b/ast/misc.go index afb2afbd12e66..ada1b90d47b27 100644 --- a/ast/misc.go +++ b/ast/misc.go @@ -168,6 +168,7 @@ type ExecuteStmt struct { Name string UsingVars []ExprNode + ExecID uint32 } // Accept implements Node Accept interface. diff --git a/executor/adapter.go b/executor/adapter.go index 18f2b188c9418..0ae871b4739ae 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -51,7 +51,7 @@ func (a *recordSet) Fields() ([]*ast.ResultField, error) { for _, col := range a.executor.Schema().Columns { dbName := col.DBName.O if dbName == "" && col.TblName.L != "" { - dbName = a.stmt.ctx.GetSessionVars().CurrentDB + dbName = a.stmt.Ctx.GetSessionVars().CurrentDB } rf := &ast.ResultField{ ColumnAsName: col.ColName, @@ -77,13 +77,13 @@ func (a *recordSet) Next() (*ast.Row, error) { } if row == nil { if a.stmt != nil { - a.stmt.ctx.GetSessionVars().LastFoundRows = a.stmt.ctx.GetSessionVars().StmtCtx.FoundRows() + a.stmt.Ctx.GetSessionVars().LastFoundRows = a.stmt.Ctx.GetSessionVars().StmtCtx.FoundRows() } return nil, nil } if a.stmt != nil { - a.stmt.ctx.GetSessionVars().StmtCtx.AddFoundRows(1) + a.stmt.Ctx.GetSessionVars().StmtCtx.AddFoundRows(1) } return &ast.Row{Data: row}, nil } @@ -110,7 +110,9 @@ type ExecStmt struct { // Text represents the origin query text. Text string - ctx context.Context + StmtNode ast.StmtNode + + Ctx context.Context startTime time.Time isPreparedStmt bool @@ -133,13 +135,28 @@ func (a *ExecStmt) IsReadOnly() bool { return a.ReadOnly } +// RebuildPlan implements ast.Statement interface. +func (a *ExecStmt) RebuildPlan() error { + is := GetInfoSchema(a.Ctx) + a.InfoSchema = is + if err := plan.Preprocess(a.StmtNode, is, a.Ctx); err != nil { + return errors.Trace(err) + } + p, err := plan.Optimize(a.Ctx, a.StmtNode, is) + if err != nil { + return errors.Trace(err) + } + a.Plan = p + return nil +} + // Exec implements the ast.Statement Exec interface. // This function builds an Executor from a plan. If the Executor doesn't return result, // like the INSERT, UPDATE statements, it executes in this function, if the Executor returns // result, execution is done after this function returns, in the returned ast.RecordSet Next method. func (a *ExecStmt) Exec(ctx context.Context) (ast.RecordSet, error) { a.startTime = time.Now() - a.ctx = ctx + a.Ctx = ctx if _, ok := a.Plan.(*plan.Analyze); ok && ctx.GetSessionVars().InRestrictedSQL { oriStats := ctx.GetSessionVars().Systems[variable.TiDBBuildStatsConcurrency] @@ -288,8 +305,8 @@ func (a *ExecStmt) logSlowQuery(succ bool) { if len(sql) > cfg.Log.QueryLogMaxLen { sql = sql[:cfg.Log.QueryLogMaxLen] + fmt.Sprintf("(len:%d)", len(sql)) } - connID := a.ctx.GetSessionVars().ConnectionID - currentDB := a.ctx.GetSessionVars().CurrentDB + connID := a.Ctx.GetSessionVars().ConnectionID + currentDB := a.Ctx.GetSessionVars().CurrentDB logEntry := log.NewEntry(logutil.SlowQueryLogger) logEntry.Data = log.Fields{ "connectionId": connID, diff --git a/executor/compiler.go b/executor/compiler.go index bcea287a39740..a049757e2f95b 100644 --- a/executor/compiler.go +++ b/executor/compiler.go @@ -67,6 +67,8 @@ func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (*ExecStm Cacheable: plan.Cacheable(stmtNode), Text: stmtNode.Text(), ReadOnly: readOnly, + Ctx: ctx, + StmtNode: stmtNode, }, nil } diff --git a/executor/prepared.go b/executor/prepared.go index a06f2039a026b..bafbdb9a40cfa 100644 --- a/executor/prepared.go +++ b/executor/prepared.go @@ -312,6 +312,12 @@ func (e *DeallocateExec) Open() error { // CompileExecutePreparedStmt compiles a session Execute command to a stmt.Statement. func CompileExecutePreparedStmt(ctx context.Context, ID uint32, args ...interface{}) ast.Statement { + execStmtNode := &ast.ExecuteStmt{ExecID: ID} + execStmtNode.UsingVars = make([]ast.ExprNode, len(args)) + for i, val := range args { + execStmtNode.UsingVars[i] = ast.NewValueExpr(val) + } + execPlan := &plan.Execute{ExecID: ID} execPlan.UsingVars = make([]expression.Expression, len(args)) for i, val := range args { @@ -323,6 +329,8 @@ func CompileExecutePreparedStmt(ctx context.Context, ID uint32, args ...interfac InfoSchema: GetInfoSchema(ctx), Plan: execPlan, ReadOnly: false, + Ctx: ctx, + StmtNode: execStmtNode, } if prepared, ok := ctx.GetSessionVars().PreparedStmts[ID].(*Prepared); ok { diff --git a/executor/prepared_test.go b/executor/prepared_test.go index dd739aa275d1c..78d69f06d7d76 100644 --- a/executor/prepared_test.go +++ b/executor/prepared_test.go @@ -67,6 +67,16 @@ func (s *testSuite) TestPrepared(c *C) { stmt := executor.CompileExecutePreparedStmt(tk.Se, stmtId, 1) c.Assert(stmt.OriginText(), Equals, query) + // Check that rebuild plan works. + tk.Se.PrepareTxnCtx() + err = stmt.RebuildPlan() + c.Assert(err, IsNil) + rs, err := stmt.Exec(tk.Se) + c.Assert(err, IsNil) + _, err = rs.Next() + c.Assert(err, IsNil) + c.Assert(rs.Close(), IsNil) + // Make schema change. tk.Exec("create table prepare2 (a int)") diff --git a/new_session_test.go b/new_session_test.go index 1711432902b23..1d1a19a3666d8 100644 --- a/new_session_test.go +++ b/new_session_test.go @@ -1506,3 +1506,20 @@ func (s *testSchemaSuite) TestRetrySchemaChange(c *C) { c.Assert(err, IsNil) tk.MustQuery("select * from t where t.b = 5").Check(testkit.Rows("1 5")) } + +func (s *testSchemaSuite) TestRetryMissingUnionScan(c *C) { + tk := testkit.NewTestKitWithInit(c, s.store) + tk1 := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("create table t (a int primary key, b int unique, c int)") + tk.MustExec("insert into t values (1, 1, 1)") + + tk1.MustExec("begin") + tk1.MustExec("update t set b = 1, c = 2 where b = 2") + tk1.MustExec("update t set b = 1, c = 2 where a = 1") + + // Create a conflict to reproduces the bug that the second update statement in retry + // has a dirty table but doesn't use UnionScan. + tk.MustExec("update t set b = 2 where a = 1") + + tk1.MustExec("commit") +} diff --git a/plan/planbuilder.go b/plan/planbuilder.go index 7ad8331a72061..8afe9066e7f9b 100644 --- a/plan/planbuilder.go +++ b/plan/planbuilder.go @@ -186,7 +186,7 @@ func (b *planBuilder) buildExecute(v *ast.ExecuteStmt) Plan { } vars = append(vars, newExpr) } - exe := &Execute{Name: v.Name, UsingVars: vars} + exe := &Execute{Name: v.Name, ExecID: v.ExecID, UsingVars: vars} exe.SetSchema(expression.NewSchema()) return exe } diff --git a/session.go b/session.go index 16bff8483e126..888b9806ea714 100644 --- a/session.go +++ b/session.go @@ -304,7 +304,7 @@ func (s *session) doCommitWithRetry() error { // We make larger transactions retry less times to prevent cluster resource outage. txnSizeRate := float64(txnSize) / float64(kv.TxnTotalSizeLimit) maxRetryCount := commitRetryLimit - int(float64(commitRetryLimit-1)*txnSizeRate) - err = s.retry(maxRetryCount, domain.ErrInfoSchemaChanged.Equal(err)) + err = s.retry(maxRetryCount) } } s.cleanRetryInfo() @@ -387,7 +387,7 @@ func (s *session) isRetryableError(err error) bool { return kv.IsRetryableError(err) || domain.ErrInfoSchemaChanged.Equal(err) } -func (s *session) retry(maxCnt int, infoSchemaChanged bool) error { +func (s *session) retry(maxCnt int) error { connID := s.sessionVars.ConnectionID if s.sessionVars.TxnCtx.ForUpdate { return errors.Errorf("[%d] can not retry select for update statement", connID) @@ -411,19 +411,15 @@ func (s *session) retry(maxCnt int, infoSchemaChanged bool) error { if st.IsReadOnly() { continue } - txt := st.OriginText() - if infoSchemaChanged { - st, err = updateStatement(st, s, txt) - if err != nil { - return errors.Trace(err) - } - nh.history[i].st = st + err = st.RebuildPlan() + if err != nil { + return errors.Trace(err) } if retryCnt == 0 { // We do not have to log the query every time. // We print the queries at the first try only. - log.Warnf("[%d] Retry [%d] query [%d] %s", connID, retryCnt, i, sqlForLog(txt)) + log.Warnf("[%d] Retry [%d] query [%d] %s", connID, retryCnt, i, sqlForLog(st.OriginText())) } else { log.Warnf("[%d] Retry [%d] query [%d]", connID, retryCnt, i) } @@ -449,7 +445,6 @@ func (s *session) retry(maxCnt int, infoSchemaChanged bool) error { return errors.Trace(err) } retryCnt++ - infoSchemaChanged = domain.ErrInfoSchemaChanged.Equal(err) if retryCnt >= maxCnt { log.Warnf("[%d] Retry reached max count %d", connID, retryCnt) return errors.Trace(err) @@ -462,27 +457,6 @@ func (s *session) retry(maxCnt int, infoSchemaChanged bool) error { return err } -func updateStatement(st ast.Statement, s *session, txt string) (ast.Statement, error) { - // statement maybe stale because of infoschema changed, this function will return the updated one. - if st.IsPrepared() { - // TODO: Rebuild plan if infoschema changed, reuse the statement otherwise. - } else { - // Rebuild plan if infoschema changed, reuse the statement otherwise. - charset, collation := s.sessionVars.GetCharsetInfo() - stmt, err := s.parser.ParseOneStmt(txt, charset, collation) - if err != nil { - return st, errors.Trace(err) - } - st, err = Compile(s, stmt) - if err != nil { - // If a txn is inserting data when DDL is dropping column, - // it would fail to commit and retry, and run here then. - return st, errors.Trace(err) - } - } - return st, nil -} - func sqlForLog(sql string) string { if len(sql) > sqlLogMaxLen { return sql[:sqlLogMaxLen] + fmt.Sprintf("(len:%d)", len(sql)) @@ -709,6 +683,8 @@ func (s *session) Execute(sql string) (recordSets []ast.RecordSet, err error) { Expensive: cacheValue.(*cache.SQLCacheValue).Expensive, Text: stmtNode.Text(), ReadOnly: ast.IsReadOnly(stmtNode), + Ctx: s, + StmtNode: stmtNode, } s.PrepareTxnCtx()