Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: only create TemporaryTableAttachedInfoSchema if needed #37196

Merged
merged 11 commits into from
Aug 18, 2022
9 changes: 8 additions & 1 deletion ddl/db_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3801,8 +3801,15 @@ func TestCreateTempTableInTxn(t *testing.T) {
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("begin")
// new created temporary table should be visible
tk.MustExec("create temporary table t1(id int)")
tk.MustQuery("select * from t1")
tk.MustQuery("select * from t1").Check(testkit.Rows())
// new inserted data should be visible
tk.MustExec("insert into t1 values(123)")
tk.MustQuery("select * from t1").Check(testkit.Rows("123"))
// truncate table will clear data but table still visible
tk.MustExec("truncate table t1")
tk.MustQuery("select * from t1").Check(testkit.Rows())
tk.MustExec("commit")

tk1 := testkit.NewTestKit(t, store)
Expand Down
8 changes: 8 additions & 0 deletions session/txnmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,14 @@ func (m *txnManager) OnStmtRetry(ctx context.Context) error {
return m.ctxProvider.OnStmtRetry(ctx)
}

// OnLocalTemporaryTableCreated is the hook that should be called when a temporary table created.
// The provider will update its state then
func (m *txnManager) OnLocalTemporaryTableCreated() {
if m.ctxProvider != nil {
m.ctxProvider.OnLocalTemporaryTableCreated()
}
}

func (m *txnManager) AdviseWarmup() error {
if m.ctxProvider != nil {
return m.ctxProvider.AdviseWarmup()
Expand Down
4 changes: 4 additions & 0 deletions sessiontxn/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ type TxnContextProvider interface {
OnStmtErrorForNextAction(point StmtErrorHandlePoint, err error) (StmtErrorAction, error)
// OnStmtRetry is the hook that should be called when a statement is retried internally.
OnStmtRetry(ctx context.Context) error
// OnLocalTemporaryTableCreated is the hook that should be called when a local temporary table created.
OnLocalTemporaryTableCreated()
// ActivateTxn activates the transaction.
ActivateTxn() (kv.Transaction, error)
}
Expand Down Expand Up @@ -177,6 +179,8 @@ type TxnManager interface {
OnStmtErrorForNextAction(point StmtErrorHandlePoint, err error) (StmtErrorAction, error)
// OnStmtRetry is the hook that should be called when a statement retry
OnStmtRetry(ctx context.Context) error
// OnLocalTemporaryTableCreated is the hook that should be called when a local temporary table created.
OnLocalTemporaryTableCreated()
// ActivateTxn activates the transaction.
ActivateTxn() (kv.Transaction, error)
// GetCurrentStmt returns the current statement node
Expand Down
16 changes: 15 additions & 1 deletion sessiontxn/isolation/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,17 @@ func (p *baseTxnContextProvider) OnStmtRetry(ctx context.Context) error {
return nil
}

// OnLocalTemporaryTableCreated is the hook that should be called when a local temporary table created.
func (p *baseTxnContextProvider) OnLocalTemporaryTableCreated() {
p.infoSchema = temptable.AttachLocalTemporaryTableInfoSchema(p.sctx, p.infoSchema)
p.sctx.GetSessionVars().TxnCtx.InfoSchema = p.infoSchema
if p.txn != nil && p.txn.Valid() {
if interceptor := temptable.SessionSnapshotInterceptor(p.sctx, p.infoSchema); interceptor != nil {
p.txn.GetSnapshot().SetOption(kv.SnapInterceptor, interceptor)
}
}
}

// OnStmtErrorForNextAction is the hook that should be called when a new statement get an error
func (p *baseTxnContextProvider) OnStmtErrorForNextAction(point sessiontxn.StmtErrorHandlePoint, err error) (sessiontxn.StmtErrorAction, error) {
switch point {
Expand Down Expand Up @@ -252,7 +263,10 @@ func (p *baseTxnContextProvider) ActivateTxn() (kv.Transaction, error) {
if readReplicaType.IsFollowerRead() {
txn.SetOption(kv.ReplicaRead, readReplicaType)
}
txn.SetOption(kv.SnapInterceptor, temptable.SessionSnapshotInterceptor(p.sctx, p.infoSchema))

if interceptor := temptable.SessionSnapshotInterceptor(p.sctx, p.infoSchema); interceptor != nil {
txn.SetOption(kv.SnapInterceptor, interceptor)
}

if sessVars.StmtCtx.WeakConsistency {
txn.SetOption(kv.IsolationLevel, kv.RC)
Expand Down
8 changes: 7 additions & 1 deletion sessiontxn/staleread/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,10 @@ func (p *StalenessTxnContextProvider) activateStaleTxn() error {
TxnScope: txnScope,
},
}
txn.SetOption(kv.SnapInterceptor, temptable.SessionSnapshotInterceptor(p.sctx, is))

if interceptor := temptable.SessionSnapshotInterceptor(p.sctx, is); interceptor != nil {
txn.SetOption(kv.SnapInterceptor, interceptor)
}

p.is = is
err = p.sctx.GetSessionVars().SetSystemVar(variable.TiDBSnapshot, "")
Expand Down Expand Up @@ -228,3 +231,6 @@ func (p *StalenessTxnContextProvider) GetSnapshotWithStmtReadTS() (kv.Snapshot,
func (p *StalenessTxnContextProvider) GetSnapshotWithStmtForUpdateTS() (kv.Snapshot, error) {
return nil, errors.New("GetSnapshotWithStmtForUpdateTS not supported for stalenessTxnProvider")
}

// OnLocalTemporaryTableCreated will not be called for StalenessTxnContextProvider
func (p *StalenessTxnContextProvider) OnLocalTemporaryTableCreated() {}
8 changes: 7 additions & 1 deletion table/temptable/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/sessiontxn"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/tablecodec"
Expand Down Expand Up @@ -55,7 +56,12 @@ func (d *temporaryTableDDL) CreateLocalTemporaryTable(db *model.DBInfo, info *mo
return err
}

return ensureLocalTemporaryTables(d.sctx).AddTable(db, tbl)
if err = ensureLocalTemporaryTables(d.sctx).AddTable(db, tbl); err != nil {
return err
}

sessiontxn.GetTxnManager(d.sctx).OnLocalTemporaryTableCreated()
return nil
}

func (d *temporaryTableDDL) DropLocalTemporaryTable(schema model.CIStr, tblName model.CIStr) error {
Expand Down
14 changes: 11 additions & 3 deletions table/temptable/infoschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,15 @@ import (

// AttachLocalTemporaryTableInfoSchema attach local temporary table information schema to is
func AttachLocalTemporaryTableInfoSchema(sctx sessionctx.Context, is infoschema.InfoSchema) infoschema.InfoSchema {
localTemporaryTables := getLocalTemporaryTables(sctx)
if localTemporaryTables == nil {
return is
}

if _, ok := is.(*infoschema.TemporaryTableAttachedInfoSchema); ok {
return is
}

localTemporaryTables := getLocalTemporaryTables(sctx)
return &infoschema.TemporaryTableAttachedInfoSchema{
InfoSchema: is,
LocalTemporaryTables: localTemporaryTables,
Expand All @@ -42,8 +46,12 @@ func DetachLocalTemporaryTableInfoSchema(is infoschema.InfoSchema) infoschema.In
}

func getLocalTemporaryTables(sctx sessionctx.Context) *infoschema.LocalTemporaryTables {
// Do not return nil so that new created tables can always be visited through the returned object.
return ensureLocalTemporaryTables(sctx)
localTemporaryTables := sctx.GetSessionVars().LocalTemporaryTables
if localTemporaryTables == nil {
return nil
}

return localTemporaryTables.(*infoschema.LocalTemporaryTables)
}

func ensureLocalTemporaryTables(sctx sessionctx.Context) *infoschema.LocalTemporaryTables {
Expand Down
7 changes: 6 additions & 1 deletion table/temptable/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,14 @@ type TemporaryTableSnapshotInterceptor struct {

// SessionSnapshotInterceptor creates a new snapshot interceptor for temporary table data fetch
func SessionSnapshotInterceptor(sctx sessionctx.Context, is infoschema.InfoSchema) kv.SnapshotInterceptor {
sessionData := getSessionData(sctx)
if sessionData == nil {
return nil
}

return NewTemporaryTableSnapshotInterceptor(
is,
getSessionData(sctx),
sessionData,
)
}

Expand Down