Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner: fix wrong prepare plan after isolation read changed #16293

Merged
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions executor/prepared_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,14 @@
package executor_test

import (
"fmt"

. "github.com/pingcap/check"
"github.com/pingcap/parser/auth"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/domain"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/testkit"
)

Expand Down Expand Up @@ -53,3 +60,45 @@ func (s *testSuite1) TestIgnorePlanCache(c *C) {
tk.MustExec("execute stmt using @ignore_plan_doma")
c.Assert(tk.Se.GetSessionVars().StmtCtx.UseCache, IsFalse)
}

func (s *testSuite1) TestPrepareStmtAfterIsolationReadChange(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk.Se.Auth(&auth.UserIdentity{Username: "root", Hostname: "localhost", CurrentUser: true, AuthUsername: "root", AuthHostname: "%"}, nil, []byte("012345678901234567890"))

tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int)")
// create virtual tiflash replica.
dom := domain.GetDomain(tk.Se)
is := dom.InfoSchema()
db, exists := is.SchemaByName(model.NewCIStr("test"))
c.Assert(exists, IsTrue)
for _, tblInfo := range db.Tables {
if tblInfo.Name.L == "t" {
tblInfo.TiFlashReplica = &model.TiFlashReplicaInfo{
Count: 1,
Available: true,
}
}
}

tk.MustExec("set @@session.tidb_isolation_read_engines='tikv'")
tk.MustExec("prepare stmt from \"select * from t\"")
tk.MustQuery("execute stmt")
tkProcess := tk.Se.ShowProcess()
ps := []*util.ProcessInfo{tkProcess}
tk.Se.SetSessionManager(&mockSessionManager1{PS: ps})
rows := tk.MustQuery(fmt.Sprintf("explain for connection %d", tkProcess.ID)).Rows()
c.Assert(rows[len(rows)-1][2], Equals, "cop[tikv]")

tk.MustExec("set @@session.tidb_isolation_read_engines='tiflash'")
tk.MustExec("execute stmt")
tkProcess = tk.Se.ShowProcess()
ps = []*util.ProcessInfo{tkProcess}
tk.Se.SetSessionManager(&mockSessionManager1{PS: ps})
rows = tk.MustQuery(fmt.Sprintf("explain for connection %d", tkProcess.ID)).Rows()
c.Assert(rows[len(rows)-1][2], Equals, "cop[tiflash]")

c.Assert(len(tk.Se.GetSessionVars().PreparedStmts), Equals, 1)
c.Assert(tk.Se.GetSessionVars().PreparedStmts[1].(*plannercore.CachedPrepareStmt).NormalizedSQL, Equals, "select * from t")
c.Assert(tk.Se.GetSessionVars().PreparedStmts[1].(*plannercore.CachedPrepareStmt).NormalizedPlan, Equals, "")
}
54 changes: 37 additions & 17 deletions planner/core/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/codec"
Expand Down Expand Up @@ -63,13 +64,14 @@ func PreparedPlanCacheEnabled() bool {
}

type pstmtPlanCacheKey struct {
database string
connID uint64
pstmtID uint32
snapshot uint64
schemaVersion int64
sqlMode mysql.SQLMode
timezoneOffset int
database string
connID uint64
pstmtID uint32
snapshot uint64
schemaVersion int64
sqlMode mysql.SQLMode
timezoneOffset int
isolationReadEngines map[kv.StoreType]struct{}

hash []byte
}
Expand All @@ -79,7 +81,7 @@ func (key *pstmtPlanCacheKey) Hash() []byte {
if len(key.hash) == 0 {
var (
dbBytes = hack.Slice(key.database)
bufferSize = len(dbBytes) + 8*6
bufferSize = len(dbBytes) + 8*6 + 3*8
)
if key.hash == nil {
key.hash = make([]byte, 0, bufferSize)
Expand All @@ -91,19 +93,32 @@ func (key *pstmtPlanCacheKey) Hash() []byte {
key.hash = codec.EncodeInt(key.hash, key.schemaVersion)
key.hash = codec.EncodeInt(key.hash, int64(key.sqlMode))
key.hash = codec.EncodeInt(key.hash, int64(key.timezoneOffset))
if _, ok := key.isolationReadEngines[kv.TiDB]; ok {
key.hash = append(key.hash, kv.TiDB.Name()...)
}
if _, ok := key.isolationReadEngines[kv.TiKV]; ok {
key.hash = append(key.hash, kv.TiKV.Name()...)
}
if _, ok := key.isolationReadEngines[kv.TiFlash]; ok {
key.hash = append(key.hash, kv.TiFlash.Name()...)
}
}
return key.hash
}

// SetPstmtIDSchemaVersion implements PstmtCacheKeyMutator interface to change pstmtID and schemaVersion of cacheKey.
// so we can reuse Key instead of new every time.
func SetPstmtIDSchemaVersion(key kvcache.Key, pstmtID uint32, schemaVersion int64) {
func SetPstmtIDSchemaVersion(key kvcache.Key, pstmtID uint32, schemaVersion int64, isolationReadEngines map[kv.StoreType]struct{}) {
psStmtKey, isPsStmtKey := key.(*pstmtPlanCacheKey)
if !isPsStmtKey {
return
}
psStmtKey.pstmtID = pstmtID
psStmtKey.schemaVersion = schemaVersion
psStmtKey.isolationReadEngines = make(map[kv.StoreType]struct{})
for k, v := range isolationReadEngines {
psStmtKey.isolationReadEngines[k] = v
}
psStmtKey.hash = psStmtKey.hash[:0]
}

Expand All @@ -113,15 +128,20 @@ func NewPSTMTPlanCacheKey(sessionVars *variable.SessionVars, pstmtID uint32, sch
if sessionVars.TimeZone != nil {
_, timezoneOffset = time.Now().In(sessionVars.TimeZone).Zone()
}
return &pstmtPlanCacheKey{
database: sessionVars.CurrentDB,
connID: sessionVars.ConnectionID,
pstmtID: pstmtID,
snapshot: sessionVars.SnapshotTS,
schemaVersion: schemaVersion,
sqlMode: sessionVars.SQLMode,
timezoneOffset: timezoneOffset,
key := &pstmtPlanCacheKey{
database: sessionVars.CurrentDB,
connID: sessionVars.ConnectionID,
pstmtID: pstmtID,
snapshot: sessionVars.SnapshotTS,
schemaVersion: schemaVersion,
sqlMode: sessionVars.SQLMode,
timezoneOffset: timezoneOffset,
isolationReadEngines: make(map[kv.StoreType]struct{}),
}
for k, v := range sessionVars.IsolationReadEngines {
key.isolationReadEngines[k] = v
}
return key
}

// PSTMTPlanCacheValue stores the cached Statement and StmtNode.
Expand Down
2 changes: 1 addition & 1 deletion planner/core/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,5 @@ func (s *testCacheSuite) SetUpSuite(c *C) {
func (s *testCacheSuite) TestCacheKey(c *C) {
defer testleak.AfterTest(c)()
key := NewPSTMTPlanCacheKey(s.ctx.GetSessionVars(), 1, 1)
c.Assert(key.Hash(), DeepEquals, []byte{0x74, 0x65, 0x73, 0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0})
c.Assert(key.Hash(), DeepEquals, []byte{0x74, 0x65, 0x73, 0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x74, 0x69, 0x64, 0x62, 0x74, 0x69, 0x6b, 0x76, 0x74, 0x69, 0x66, 0x6c, 0x61, 0x73, 0x68})
}
2 changes: 1 addition & 1 deletion session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ func (s *session) cleanRetryInfo() {
for i, stmtID := range retryInfo.DroppedPreparedStmtIDs {
if planCacheEnabled {
if i > 0 && preparedAst != nil {
plannercore.SetPstmtIDSchemaVersion(cacheKey, stmtID, preparedAst.SchemaVersion)
plannercore.SetPstmtIDSchemaVersion(cacheKey, stmtID, preparedAst.SchemaVersion, s.sessionVars.IsolationReadEngines)
}
s.PreparedPlanCache().Delete(cacheKey)
}
Expand Down