Skip to content

Commit

Permalink
*: fix many leaks of the test case (#26909)
Browse files Browse the repository at this point in the history
  • Loading branch information
tiancaiamao authored Aug 6, 2021
1 parent 300f159 commit a8adc4c
Show file tree
Hide file tree
Showing 11 changed files with 99 additions and 42 deletions.
25 changes: 20 additions & 5 deletions ddl/db_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1052,8 +1052,9 @@ func (s *testStateChangeSuite) TestParallelAlterModifyColumn(c *C) {
f := func(c *C, err1, err2 error) {
c.Assert(err1, IsNil)
c.Assert(err2, IsNil)
_, err := s.se.Execute(context.Background(), "select * from t")
rs, err := s.se.Execute(context.Background(), "select * from t")
c.Assert(err, IsNil)
c.Assert(rs[0].Close(), IsNil)
}
s.testControlParallelExecSQL(c, sql, sql, f)
}
Expand All @@ -1064,8 +1065,9 @@ func (s *testStateChangeSuite) TestParallelAddGeneratedColumnAndAlterModifyColum
f := func(c *C, err1, err2 error) {
c.Assert(err1, IsNil)
c.Assert(err2.Error(), Equals, "[ddl:8200]Unsupported modify column: oldCol is a dependent column 'a' for generated column")
_, err := s.se.Execute(context.Background(), "select * from t")
rs, err := s.se.Execute(context.Background(), "select * from t")
c.Assert(err, IsNil)
c.Assert(rs[0].Close(), IsNil)
}
s.testControlParallelExecSQL(c, sql1, sql2, f)
}
Expand All @@ -1076,8 +1078,9 @@ func (s *testStateChangeSuite) TestParallelAlterModifyColumnAndAddPK(c *C) {
f := func(c *C, err1, err2 error) {
c.Assert(err1, IsNil)
c.Assert(err2.Error(), Equals, "[ddl:8200]Unsupported modify column: this column has primary key flag")
_, err := s.se.Execute(context.Background(), "select * from t")
rs, err := s.se.Execute(context.Background(), "select * from t")
c.Assert(err, IsNil)
c.Assert(rs[0].Close(), IsNil)
}
s.testControlParallelExecSQL(c, sql1, sql2, f)
}
Expand Down Expand Up @@ -1361,12 +1364,24 @@ func (s *testStateChangeSuiteBase) testControlParallelExecSQL(c *C, sql1, sql2 s
wg.Add(2)
go func() {
defer wg.Done()
_, err1 = se.Execute(context.Background(), sql1)
var rss []sqlexec.RecordSet
rss, err1 = se.Execute(context.Background(), sql1)
if err1 == nil && len(rss) > 0 {
for _, rs := range rss {
c.Assert(rs.Close(), IsNil)
}
}
}()
go func() {
defer wg.Done()
<-ch
_, err2 = se1.Execute(context.Background(), sql2)
var rss []sqlexec.RecordSet
rss, err2 = se1.Execute(context.Background(), sql2)
if err2 == nil && len(rss) > 0 {
for _, rs := range rss {
c.Assert(rs.Close(), IsNil)
}
}
}()

wg.Wait()
Expand Down
12 changes: 6 additions & 6 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,12 @@ func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, e Executor) (sqlex
ctx = opentracing.ContextWithSpan(ctx, span1)
}

var err error
defer func() {
terror.Log(e.Close())
a.logAudit()
}()

// Check if "tidb_snapshot" is set for the write executors.
// In history read mode, we can not do write operations.
switch e.(type) {
Expand All @@ -581,12 +587,6 @@ func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, e Executor) (sqlex
}
}

var err error
defer func() {
terror.Log(e.Close())
a.logAudit()
}()

err = Next(ctx, e, newFirstChunk(e))
if err != nil {
return nil, err
Expand Down
29 changes: 23 additions & 6 deletions executor/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/executor/aggfuncs"
"github.com/pingcap/tidb/expression"
Expand Down Expand Up @@ -298,14 +299,17 @@ func (e *HashAggExec) Close() error {

// Open implements the Executor Open interface.
func (e *HashAggExec) Open(ctx context.Context) error {
if err := e.baseExecutor.Open(ctx); err != nil {
return err
}
failpoint.Inject("mockHashAggExecBaseExecutorOpenReturnedError", func(val failpoint.Value) {
if val.(bool) {
failpoint.Return(errors.New("mock HashAggExec.baseExecutor.Open returned error"))
}
})

if err := e.baseExecutor.Open(ctx); err != nil {
return err
}
// If panic here, the children executor should be closed because they are open.
defer closeBaseExecutor(&e.baseExecutor)
e.prepared = false

e.memTracker = memory.NewTracker(e.id, -1)
Expand Down Expand Up @@ -344,6 +348,15 @@ func (e *HashAggExec) initForUnparallelExec() {
}
}

func closeBaseExecutor(b *baseExecutor) {
if r := recover(); r != nil {
// Release the resource, but throw the panic again and let the top level handle it.
terror.Log(b.Close())
logutil.BgLogger().Warn("panic in Open(), close base executor and throw exception again")
panic(r)
}
}

func (e *HashAggExec) initForParallelExec(ctx sessionctx.Context) {
sessionVars := e.ctx.GetSessionVars()
finalConcurrency := sessionVars.HashAggFinalConcurrency()
Expand Down Expand Up @@ -1218,14 +1231,18 @@ type StreamAggExec struct {

// Open implements the Executor Open interface.
func (e *StreamAggExec) Open(ctx context.Context) error {
if err := e.baseExecutor.Open(ctx); err != nil {
return err
}
failpoint.Inject("mockStreamAggExecBaseExecutorOpenReturnedError", func(val failpoint.Value) {
if val.(bool) {
failpoint.Return(errors.New("mock StreamAggExec.baseExecutor.Open returned error"))
}
})

if err := e.baseExecutor.Open(ctx); err != nil {
return err
}
// If panic in Open, the children executor should be closed because they are open.
defer closeBaseExecutor(&e.baseExecutor)

e.childResult = newFirstChunk(e.children[0])
e.executed = false
e.isChildReturnEmpty = true
Expand Down
2 changes: 1 addition & 1 deletion executor/executor_pkg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ import (
"github.com/pingcap/tidb/util/tableutil"
)

var _ = Suite(&testExecSuite{})
var _ = SerialSuites(&testExecSuite{})
var _ = SerialSuites(&testExecSerialSuite{})

// Note: it's a tricky way to export the `inspectionSummaryRules` and `inspectionRules` for unit test but invisible for normal code
Expand Down
4 changes: 4 additions & 0 deletions executor/explain.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ func (e *ExplainExec) Open(ctx context.Context) error {
// Close implements the Executor Close interface.
func (e *ExplainExec) Close() error {
e.rows = nil
if e.analyzeExec != nil && !e.executed {
// Open(), but Next() is not called.
return e.analyzeExec.Close()
}
return nil
}

Expand Down
17 changes: 13 additions & 4 deletions executor/seqtest/prepared_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,22 @@ func (s *seqTestSuite) TestPrepared(c *C) {
tk.MustExec("create table prepare_test (id int PRIMARY KEY AUTO_INCREMENT, c1 int, c2 int, c3 int default 1)")
tk.MustExec("insert prepare_test (c1) values (1),(2),(NULL)")

tk.MustExec(`prepare stmt_test_1 from 'select id from prepare_test where id > ?'; set @a = 1; execute stmt_test_1 using @a;`)
tk.MustExec(`prepare stmt_test_1 from 'select id from prepare_test where id > ?';`)
tk.MustExec(`set @a = 1;`)
tk.MustExec(`execute stmt_test_1 using @a;`)
tk.MustExec(`prepare stmt_test_2 from 'select 1'`)
// Prepare multiple statement is not allowed.
_, err = tk.Exec(`prepare stmt_test_3 from 'select id from prepare_test where id > ?;select id from prepare_test where id > ?;'`)
c.Assert(executor.ErrPrepareMulti.Equal(err), IsTrue)

// The variable count does not match.
_, err = tk.Exec(`prepare stmt_test_4 from 'select id from prepare_test where id > ? and id < ?'; set @a = 1; execute stmt_test_4 using @a;`)
tk.MustExec(`prepare stmt_test_4 from 'select id from prepare_test where id > ? and id < ?';`)
tk.MustExec(`set @a = 1;`)
_, err = tk.Exec(`execute stmt_test_4 using @a;`)
c.Assert(plannercore.ErrWrongParamCount.Equal(err), IsTrue)
// Prepare and deallocate prepared statement immediately.
tk.MustExec(`prepare stmt_test_5 from 'select id from prepare_test where id > ?'; deallocate prepare stmt_test_5;`)
tk.MustExec(`prepare stmt_test_5 from 'select id from prepare_test where id > ?';`)
tk.MustExec(`deallocate prepare stmt_test_5;`)

// Statement not found.
_, err = tk.Exec("deallocate prepare stmt_test_5")
Expand Down Expand Up @@ -166,8 +172,11 @@ func (s *seqTestSuite) TestPrepared(c *C) {
c.Assert(err, IsNil)

// Should success as the changed schema do not affect the prepared statement.
_, err = tk.Se.ExecutePreparedStmt(ctx, stmtID, []types.Datum{types.NewDatum(1)})
rs, err = tk.Se.ExecutePreparedStmt(ctx, stmtID, []types.Datum{types.NewDatum(1)})
c.Assert(err, IsNil)
if rs != nil {
rs.Close()
}

// Drop a column so the prepared statement become invalid.
query = "select c1, c2 from prepare_test where c1 = ?"
Expand Down
11 changes: 6 additions & 5 deletions executor/stale_txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,12 +211,15 @@ func (s *testStaleTxnSerialSuite) TestSelectAsOf(c *C) {
} else if testcase.preSec > 0 {
c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/assertStaleTSOWithTolerance", fmt.Sprintf(`return(%d)`, time.Now().Unix()-testcase.preSec)), IsNil)
}
_, err := tk.Exec(testcase.sql)
rs, err := tk.Exec(testcase.sql)
if len(testcase.errorStr) != 0 {
c.Assert(err, ErrorMatches, testcase.errorStr)
continue
}
c.Assert(err, IsNil, Commentf("sql:%s, error stack %v", testcase.sql, errors.ErrorStack(err)))
if rs != nil {
rs.Close()
}
if testcase.expectPhysicalTS > 0 {
c.Assert(failpoint.Disable("github.com/pingcap/tidb/executor/assertStaleTSO"), IsNil)
} else if testcase.preSec > 0 {
Expand Down Expand Up @@ -696,8 +699,7 @@ func (s *testStaleTxnSerialSuite) TestValidateReadOnlyInStalenessTransaction(c *
c.Log(testcase.name)
tk.MustExec(`START TRANSACTION READ ONLY AS OF TIMESTAMP NOW(3);`)
if testcase.isValidate {
_, err := tk.Exec(testcase.sql)
c.Assert(err, IsNil)
tk.MustExec(testcase.sql)
} else {
err := tk.ExecToErr(testcase.sql)
c.Assert(err, NotNil)
Expand All @@ -706,8 +708,7 @@ func (s *testStaleTxnSerialSuite) TestValidateReadOnlyInStalenessTransaction(c *
tk.MustExec("commit")
tk.MustExec("set transaction read only as of timestamp NOW(3);")
if testcase.isValidate {
_, err := tk.Exec(testcase.sql)
c.Assert(err, IsNil)
tk.MustExec(testcase.sql)
} else {
err := tk.ExecToErr(testcase.sql)
c.Assert(err, NotNil)
Expand Down
5 changes: 2 additions & 3 deletions planner/core/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,8 +350,7 @@ func (s *testIntegrationSerialSuite) TestNoneAccessPathsFoundByIsolationRead(c *
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int primary key)")

_, err := tk.Exec("select * from t")
c.Assert(err, IsNil)
tk.MustExec("select * from t")

tk.MustExec("set @@session.tidb_isolation_read_engines = 'tiflash'")

Expand All @@ -360,7 +359,7 @@ func (s *testIntegrationSerialSuite) TestNoneAccessPathsFoundByIsolationRead(c *
"TableReader 10000.00 root data:TableFullScan",
"└─TableFullScan 10000.00 cop[tikv] table:stats_meta keep order:false, stats:pseudo"))

_, err = tk.Exec("select * from t")
_, err := tk.Exec("select * from t")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[planner:1815]Internal : Can not find access path matching 'tidb_isolation_read_engines'(value: 'tiflash'). Available values are 'tikv'.")

Expand Down
4 changes: 1 addition & 3 deletions session/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -1378,9 +1378,6 @@ func upgradeToVer67(s Session, ver int64) {
if err != nil {
logutil.BgLogger().Fatal("upgradeToVer67 error", zap.Error(err))
}
if rs != nil {
defer terror.Call(rs.Close)
}
req := rs.NewChunk()
iter := chunk.NewIterator4Chunk(req)
p := parser.New()
Expand All @@ -1395,6 +1392,7 @@ func upgradeToVer67(s Session, ver int64) {
}
updateBindInfo(iter, p, bindMap)
}
terror.Call(rs.Close)

mustExecute(s, "DELETE FROM mysql.bind_info where source != 'builtin'")
for original, bind := range bindMap {
Expand Down
12 changes: 8 additions & 4 deletions session/bootstrap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,14 @@ func (s *testBootstrapSuite) TestBootstrap(c *C) {
c.Assert(se.Auth(&auth.UserIdentity{Username: "root", Hostname: "anyhost"}, []byte(""), []byte("")), IsTrue)
mustExecSQL(c, se, "USE test;")
// Check privilege tables.
mustExecSQL(c, se, "SELECT * from mysql.global_priv;")
mustExecSQL(c, se, "SELECT * from mysql.db;")
mustExecSQL(c, se, "SELECT * from mysql.tables_priv;")
mustExecSQL(c, se, "SELECT * from mysql.columns_priv;")
rs := mustExecSQL(c, se, "SELECT * from mysql.global_priv;")
c.Assert(rs.Close(), IsNil)
rs = mustExecSQL(c, se, "SELECT * from mysql.db;")
c.Assert(rs.Close(), IsNil)
rs = mustExecSQL(c, se, "SELECT * from mysql.tables_priv;")
c.Assert(rs.Close(), IsNil)
rs = mustExecSQL(c, se, "SELECT * from mysql.columns_priv;")
c.Assert(rs.Close(), IsNil)
// Check privilege tables.
r = mustExecSQL(c, se, "SELECT COUNT(*) from mysql.global_variables;")
c.Assert(r, NotNil)
Expand Down
20 changes: 15 additions & 5 deletions session/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1328,8 +1328,9 @@ func (s *testSessionSuite) TestPrepare(c *C) {
c.Assert(id, Equals, uint32(1))
c.Assert(ps, Equals, 1)
tk.MustExec(`set @a=1`)
_, err = tk.Se.ExecutePreparedStmt(ctx, id, []types.Datum{types.NewDatum("1")})
rs, err := tk.Se.ExecutePreparedStmt(ctx, id, []types.Datum{types.NewDatum("1")})
c.Assert(err, IsNil)
rs.Close()
err = tk.Se.DropPreparedStmt(id)
c.Assert(err, IsNil)

Expand All @@ -1349,7 +1350,7 @@ func (s *testSessionSuite) TestPrepare(c *C) {
tk.MustExec("insert multiexec values (1, 1), (2, 2)")
id, _, _, err = tk.Se.PrepareStmt("select a from multiexec where b = ? order by b")
c.Assert(err, IsNil)
rs, err := tk.Se.ExecutePreparedStmt(ctx, id, []types.Datum{types.NewDatum(1)})
rs, err = tk.Se.ExecutePreparedStmt(ctx, id, []types.Datum{types.NewDatum(1)})
c.Assert(err, IsNil)
rs.Close()
rs, err = tk.Se.ExecutePreparedStmt(ctx, id, []types.Datum{types.NewDatum(2)})
Expand Down Expand Up @@ -1963,17 +1964,26 @@ func (s *testSessionSuite3) TestCaseInsensitive(c *C) {

tk.MustExec("create table T (a text, B int)")
tk.MustExec("insert t (A, b) values ('aaa', 1)")
rs, _ := tk.Exec("select * from t")
rs, err := tk.Exec("select * from t")
c.Assert(err, IsNil)
fields := rs.Fields()
c.Assert(fields[0].ColumnAsName.O, Equals, "a")
c.Assert(fields[1].ColumnAsName.O, Equals, "B")
rs, _ = tk.Exec("select A, b from t")
rs.Close()

rs, err = tk.Exec("select A, b from t")
c.Assert(err, IsNil)
fields = rs.Fields()
c.Assert(fields[0].ColumnAsName.O, Equals, "A")
c.Assert(fields[1].ColumnAsName.O, Equals, "b")
rs, _ = tk.Exec("select a as A from t where A > 0")
rs.Close()

rs, err = tk.Exec("select a as A from t where A > 0")
c.Assert(err, IsNil)
fields = rs.Fields()
c.Assert(fields[0].ColumnAsName.O, Equals, "A")
rs.Close()

tk.MustExec("update T set b = B + 1")
tk.MustExec("update T set B = b + 1")
tk.MustQuery("select b from T").Check(testkit.Rows("3"))
Expand Down

0 comments on commit a8adc4c

Please sign in to comment.