diff --git a/ddl/db_change_test.go b/ddl/db_change_test.go index 3c9171f6ad91c..ccbd5bd2e6be7 100644 --- a/ddl/db_change_test.go +++ b/ddl/db_change_test.go @@ -1052,8 +1052,9 @@ func (s *testStateChangeSuite) TestParallelAlterModifyColumn(c *C) { f := func(c *C, err1, err2 error) { c.Assert(err1, IsNil) c.Assert(err2, IsNil) - _, err := s.se.Execute(context.Background(), "select * from t") + rs, err := s.se.Execute(context.Background(), "select * from t") c.Assert(err, IsNil) + c.Assert(rs[0].Close(), IsNil) } s.testControlParallelExecSQL(c, sql, sql, f) } @@ -1064,8 +1065,9 @@ func (s *testStateChangeSuite) TestParallelAddGeneratedColumnAndAlterModifyColum f := func(c *C, err1, err2 error) { c.Assert(err1, IsNil) c.Assert(err2.Error(), Equals, "[ddl:8200]Unsupported modify column: oldCol is a dependent column 'a' for generated column") - _, err := s.se.Execute(context.Background(), "select * from t") + rs, err := s.se.Execute(context.Background(), "select * from t") c.Assert(err, IsNil) + c.Assert(rs[0].Close(), IsNil) } s.testControlParallelExecSQL(c, sql1, sql2, f) } @@ -1076,8 +1078,9 @@ func (s *testStateChangeSuite) TestParallelAlterModifyColumnAndAddPK(c *C) { f := func(c *C, err1, err2 error) { c.Assert(err1, IsNil) c.Assert(err2.Error(), Equals, "[ddl:8200]Unsupported modify column: this column has primary key flag") - _, err := s.se.Execute(context.Background(), "select * from t") + rs, err := s.se.Execute(context.Background(), "select * from t") c.Assert(err, IsNil) + c.Assert(rs[0].Close(), IsNil) } s.testControlParallelExecSQL(c, sql1, sql2, f) } @@ -1361,12 +1364,24 @@ func (s *testStateChangeSuiteBase) testControlParallelExecSQL(c *C, sql1, sql2 s wg.Add(2) go func() { defer wg.Done() - _, err1 = se.Execute(context.Background(), sql1) + var rss []sqlexec.RecordSet + rss, err1 = se.Execute(context.Background(), sql1) + if err1 == nil && len(rss) > 0 { + for _, rs := range rss { + c.Assert(rs.Close(), IsNil) + } + } }() go func() { defer wg.Done() <-ch - _, err2 = se1.Execute(context.Background(), sql2) + var rss []sqlexec.RecordSet + rss, err2 = se1.Execute(context.Background(), sql2) + if err2 == nil && len(rss) > 0 { + for _, rs := range rss { + c.Assert(rs.Close(), IsNil) + } + } }() wg.Wait() diff --git a/executor/adapter.go b/executor/adapter.go index b6bd6fe2f2033..7dd83d2de71b1 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -567,6 +567,12 @@ func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, e Executor) (sqlex ctx = opentracing.ContextWithSpan(ctx, span1) } + var err error + defer func() { + terror.Log(e.Close()) + a.logAudit() + }() + // Check if "tidb_snapshot" is set for the write executors. // In history read mode, we can not do write operations. switch e.(type) { @@ -581,12 +587,6 @@ func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, e Executor) (sqlex } } - var err error - defer func() { - terror.Log(e.Close()) - a.logAudit() - }() - err = Next(ctx, e, newFirstChunk(e)) if err != nil { return nil, err diff --git a/executor/aggregate.go b/executor/aggregate.go index c5a305694c7a1..cf5bf5687e07f 100644 --- a/executor/aggregate.go +++ b/executor/aggregate.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/parser/mysql" + "github.com/pingcap/parser/terror" "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/executor/aggfuncs" "github.com/pingcap/tidb/expression" @@ -298,14 +299,17 @@ func (e *HashAggExec) Close() error { // Open implements the Executor Open interface. func (e *HashAggExec) Open(ctx context.Context) error { - if err := e.baseExecutor.Open(ctx); err != nil { - return err - } failpoint.Inject("mockHashAggExecBaseExecutorOpenReturnedError", func(val failpoint.Value) { if val.(bool) { failpoint.Return(errors.New("mock HashAggExec.baseExecutor.Open returned error")) } }) + + if err := e.baseExecutor.Open(ctx); err != nil { + return err + } + // If panic here, the children executor should be closed because they are open. + defer closeBaseExecutor(&e.baseExecutor) e.prepared = false e.memTracker = memory.NewTracker(e.id, -1) @@ -344,6 +348,15 @@ func (e *HashAggExec) initForUnparallelExec() { } } +func closeBaseExecutor(b *baseExecutor) { + if r := recover(); r != nil { + // Release the resource, but throw the panic again and let the top level handle it. + terror.Log(b.Close()) + logutil.BgLogger().Warn("panic in Open(), close base executor and throw exception again") + panic(r) + } +} + func (e *HashAggExec) initForParallelExec(ctx sessionctx.Context) { sessionVars := e.ctx.GetSessionVars() finalConcurrency := sessionVars.HashAggFinalConcurrency() @@ -1218,14 +1231,18 @@ type StreamAggExec struct { // Open implements the Executor Open interface. func (e *StreamAggExec) Open(ctx context.Context) error { - if err := e.baseExecutor.Open(ctx); err != nil { - return err - } failpoint.Inject("mockStreamAggExecBaseExecutorOpenReturnedError", func(val failpoint.Value) { if val.(bool) { failpoint.Return(errors.New("mock StreamAggExec.baseExecutor.Open returned error")) } }) + + if err := e.baseExecutor.Open(ctx); err != nil { + return err + } + // If panic in Open, the children executor should be closed because they are open. + defer closeBaseExecutor(&e.baseExecutor) + e.childResult = newFirstChunk(e.children[0]) e.executed = false e.isChildReturnEmpty = true diff --git a/executor/executor_pkg_test.go b/executor/executor_pkg_test.go index b9b01d9678762..c427156ba073b 100644 --- a/executor/executor_pkg_test.go +++ b/executor/executor_pkg_test.go @@ -43,7 +43,7 @@ import ( "github.com/pingcap/tidb/util/tableutil" ) -var _ = Suite(&testExecSuite{}) +var _ = SerialSuites(&testExecSuite{}) var _ = SerialSuites(&testExecSerialSuite{}) // Note: it's a tricky way to export the `inspectionSummaryRules` and `inspectionRules` for unit test but invisible for normal code diff --git a/executor/explain.go b/executor/explain.go index a584028b686ee..4e6116975b978 100644 --- a/executor/explain.go +++ b/executor/explain.go @@ -44,6 +44,10 @@ func (e *ExplainExec) Open(ctx context.Context) error { // Close implements the Executor Close interface. func (e *ExplainExec) Close() error { e.rows = nil + if e.analyzeExec != nil && !e.executed { + // Open(), but Next() is not called. + return e.analyzeExec.Close() + } return nil } diff --git a/executor/seqtest/prepared_test.go b/executor/seqtest/prepared_test.go index 93ba9cb30cace..1b54221a224b5 100644 --- a/executor/seqtest/prepared_test.go +++ b/executor/seqtest/prepared_test.go @@ -57,16 +57,22 @@ func (s *seqTestSuite) TestPrepared(c *C) { tk.MustExec("create table prepare_test (id int PRIMARY KEY AUTO_INCREMENT, c1 int, c2 int, c3 int default 1)") tk.MustExec("insert prepare_test (c1) values (1),(2),(NULL)") - tk.MustExec(`prepare stmt_test_1 from 'select id from prepare_test where id > ?'; set @a = 1; execute stmt_test_1 using @a;`) + tk.MustExec(`prepare stmt_test_1 from 'select id from prepare_test where id > ?';`) + tk.MustExec(`set @a = 1;`) + tk.MustExec(`execute stmt_test_1 using @a;`) tk.MustExec(`prepare stmt_test_2 from 'select 1'`) // Prepare multiple statement is not allowed. _, err = tk.Exec(`prepare stmt_test_3 from 'select id from prepare_test where id > ?;select id from prepare_test where id > ?;'`) c.Assert(executor.ErrPrepareMulti.Equal(err), IsTrue) + // The variable count does not match. - _, err = tk.Exec(`prepare stmt_test_4 from 'select id from prepare_test where id > ? and id < ?'; set @a = 1; execute stmt_test_4 using @a;`) + tk.MustExec(`prepare stmt_test_4 from 'select id from prepare_test where id > ? and id < ?';`) + tk.MustExec(`set @a = 1;`) + _, err = tk.Exec(`execute stmt_test_4 using @a;`) c.Assert(plannercore.ErrWrongParamCount.Equal(err), IsTrue) // Prepare and deallocate prepared statement immediately. - tk.MustExec(`prepare stmt_test_5 from 'select id from prepare_test where id > ?'; deallocate prepare stmt_test_5;`) + tk.MustExec(`prepare stmt_test_5 from 'select id from prepare_test where id > ?';`) + tk.MustExec(`deallocate prepare stmt_test_5;`) // Statement not found. _, err = tk.Exec("deallocate prepare stmt_test_5") @@ -166,8 +172,11 @@ func (s *seqTestSuite) TestPrepared(c *C) { c.Assert(err, IsNil) // Should success as the changed schema do not affect the prepared statement. - _, err = tk.Se.ExecutePreparedStmt(ctx, stmtID, []types.Datum{types.NewDatum(1)}) + rs, err = tk.Se.ExecutePreparedStmt(ctx, stmtID, []types.Datum{types.NewDatum(1)}) c.Assert(err, IsNil) + if rs != nil { + rs.Close() + } // Drop a column so the prepared statement become invalid. query = "select c1, c2 from prepare_test where c1 = ?" diff --git a/executor/stale_txn_test.go b/executor/stale_txn_test.go index 8c4dbcd40350f..0b021c39ab1ef 100644 --- a/executor/stale_txn_test.go +++ b/executor/stale_txn_test.go @@ -211,12 +211,15 @@ func (s *testStaleTxnSerialSuite) TestSelectAsOf(c *C) { } else if testcase.preSec > 0 { c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/assertStaleTSOWithTolerance", fmt.Sprintf(`return(%d)`, time.Now().Unix()-testcase.preSec)), IsNil) } - _, err := tk.Exec(testcase.sql) + rs, err := tk.Exec(testcase.sql) if len(testcase.errorStr) != 0 { c.Assert(err, ErrorMatches, testcase.errorStr) continue } c.Assert(err, IsNil, Commentf("sql:%s, error stack %v", testcase.sql, errors.ErrorStack(err))) + if rs != nil { + rs.Close() + } if testcase.expectPhysicalTS > 0 { c.Assert(failpoint.Disable("github.com/pingcap/tidb/executor/assertStaleTSO"), IsNil) } else if testcase.preSec > 0 { @@ -696,8 +699,7 @@ func (s *testStaleTxnSerialSuite) TestValidateReadOnlyInStalenessTransaction(c * c.Log(testcase.name) tk.MustExec(`START TRANSACTION READ ONLY AS OF TIMESTAMP NOW(3);`) if testcase.isValidate { - _, err := tk.Exec(testcase.sql) - c.Assert(err, IsNil) + tk.MustExec(testcase.sql) } else { err := tk.ExecToErr(testcase.sql) c.Assert(err, NotNil) @@ -706,8 +708,7 @@ func (s *testStaleTxnSerialSuite) TestValidateReadOnlyInStalenessTransaction(c * tk.MustExec("commit") tk.MustExec("set transaction read only as of timestamp NOW(3);") if testcase.isValidate { - _, err := tk.Exec(testcase.sql) - c.Assert(err, IsNil) + tk.MustExec(testcase.sql) } else { err := tk.ExecToErr(testcase.sql) c.Assert(err, NotNil) diff --git a/planner/core/integration_test.go b/planner/core/integration_test.go index e769cf53d8c09..c63ac9b6b6a53 100644 --- a/planner/core/integration_test.go +++ b/planner/core/integration_test.go @@ -350,8 +350,7 @@ func (s *testIntegrationSerialSuite) TestNoneAccessPathsFoundByIsolationRead(c * tk.MustExec("drop table if exists t") tk.MustExec("create table t(a int primary key)") - _, err := tk.Exec("select * from t") - c.Assert(err, IsNil) + tk.MustExec("select * from t") tk.MustExec("set @@session.tidb_isolation_read_engines = 'tiflash'") @@ -360,7 +359,7 @@ func (s *testIntegrationSerialSuite) TestNoneAccessPathsFoundByIsolationRead(c * "TableReader 10000.00 root data:TableFullScan", "└─TableFullScan 10000.00 cop[tikv] table:stats_meta keep order:false, stats:pseudo")) - _, err = tk.Exec("select * from t") + _, err := tk.Exec("select * from t") c.Assert(err, NotNil) c.Assert(err.Error(), Equals, "[planner:1815]Internal : Can not find access path matching 'tidb_isolation_read_engines'(value: 'tiflash'). Available values are 'tikv'.") diff --git a/session/bootstrap.go b/session/bootstrap.go index 2572189fae957..372badc39932b 100644 --- a/session/bootstrap.go +++ b/session/bootstrap.go @@ -1378,9 +1378,6 @@ func upgradeToVer67(s Session, ver int64) { if err != nil { logutil.BgLogger().Fatal("upgradeToVer67 error", zap.Error(err)) } - if rs != nil { - defer terror.Call(rs.Close) - } req := rs.NewChunk() iter := chunk.NewIterator4Chunk(req) p := parser.New() @@ -1395,6 +1392,7 @@ func upgradeToVer67(s Session, ver int64) { } updateBindInfo(iter, p, bindMap) } + terror.Call(rs.Close) mustExecute(s, "DELETE FROM mysql.bind_info where source != 'builtin'") for original, bind := range bindMap { diff --git a/session/bootstrap_test.go b/session/bootstrap_test.go index 2349a312fe042..26208f6e91aaa 100644 --- a/session/bootstrap_test.go +++ b/session/bootstrap_test.go @@ -60,10 +60,14 @@ func (s *testBootstrapSuite) TestBootstrap(c *C) { c.Assert(se.Auth(&auth.UserIdentity{Username: "root", Hostname: "anyhost"}, []byte(""), []byte("")), IsTrue) mustExecSQL(c, se, "USE test;") // Check privilege tables. - mustExecSQL(c, se, "SELECT * from mysql.global_priv;") - mustExecSQL(c, se, "SELECT * from mysql.db;") - mustExecSQL(c, se, "SELECT * from mysql.tables_priv;") - mustExecSQL(c, se, "SELECT * from mysql.columns_priv;") + rs := mustExecSQL(c, se, "SELECT * from mysql.global_priv;") + c.Assert(rs.Close(), IsNil) + rs = mustExecSQL(c, se, "SELECT * from mysql.db;") + c.Assert(rs.Close(), IsNil) + rs = mustExecSQL(c, se, "SELECT * from mysql.tables_priv;") + c.Assert(rs.Close(), IsNil) + rs = mustExecSQL(c, se, "SELECT * from mysql.columns_priv;") + c.Assert(rs.Close(), IsNil) // Check privilege tables. r = mustExecSQL(c, se, "SELECT COUNT(*) from mysql.global_variables;") c.Assert(r, NotNil) diff --git a/session/session_test.go b/session/session_test.go index 4357832d450b1..ec44c00c0ae42 100644 --- a/session/session_test.go +++ b/session/session_test.go @@ -1328,8 +1328,9 @@ func (s *testSessionSuite) TestPrepare(c *C) { c.Assert(id, Equals, uint32(1)) c.Assert(ps, Equals, 1) tk.MustExec(`set @a=1`) - _, err = tk.Se.ExecutePreparedStmt(ctx, id, []types.Datum{types.NewDatum("1")}) + rs, err := tk.Se.ExecutePreparedStmt(ctx, id, []types.Datum{types.NewDatum("1")}) c.Assert(err, IsNil) + rs.Close() err = tk.Se.DropPreparedStmt(id) c.Assert(err, IsNil) @@ -1349,7 +1350,7 @@ func (s *testSessionSuite) TestPrepare(c *C) { tk.MustExec("insert multiexec values (1, 1), (2, 2)") id, _, _, err = tk.Se.PrepareStmt("select a from multiexec where b = ? order by b") c.Assert(err, IsNil) - rs, err := tk.Se.ExecutePreparedStmt(ctx, id, []types.Datum{types.NewDatum(1)}) + rs, err = tk.Se.ExecutePreparedStmt(ctx, id, []types.Datum{types.NewDatum(1)}) c.Assert(err, IsNil) rs.Close() rs, err = tk.Se.ExecutePreparedStmt(ctx, id, []types.Datum{types.NewDatum(2)}) @@ -1963,17 +1964,26 @@ func (s *testSessionSuite3) TestCaseInsensitive(c *C) { tk.MustExec("create table T (a text, B int)") tk.MustExec("insert t (A, b) values ('aaa', 1)") - rs, _ := tk.Exec("select * from t") + rs, err := tk.Exec("select * from t") + c.Assert(err, IsNil) fields := rs.Fields() c.Assert(fields[0].ColumnAsName.O, Equals, "a") c.Assert(fields[1].ColumnAsName.O, Equals, "B") - rs, _ = tk.Exec("select A, b from t") + rs.Close() + + rs, err = tk.Exec("select A, b from t") + c.Assert(err, IsNil) fields = rs.Fields() c.Assert(fields[0].ColumnAsName.O, Equals, "A") c.Assert(fields[1].ColumnAsName.O, Equals, "b") - rs, _ = tk.Exec("select a as A from t where A > 0") + rs.Close() + + rs, err = tk.Exec("select a as A from t where A > 0") + c.Assert(err, IsNil) fields = rs.Fields() c.Assert(fields[0].ColumnAsName.O, Equals, "A") + rs.Close() + tk.MustExec("update T set b = B + 1") tk.MustExec("update T set B = b + 1") tk.MustQuery("select b from T").Check(testkit.Rows("3"))