Skip to content

Commit

Permalink
*: only create snapshot interceptor for temporary table when needed (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
lcwangchao authored Aug 22, 2022
1 parent 8b5b724 commit 9af0f03
Show file tree
Hide file tree
Showing 7 changed files with 192 additions and 3 deletions.
18 changes: 17 additions & 1 deletion executor/temporary_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,29 @@ func TestTemporaryTableNoNetwork(t *testing.T) {
t.Run("global", func(t *testing.T) {
assertTemporaryTableNoNetwork(t, func(tk *testkit.TestKit) {
tk.MustExec("create global temporary table tmp_t (id int primary key, a int, b int, index(a)) on commit delete rows")
tk.MustExec("begin")
})
})

t.Run("global create and then truncate", func(t *testing.T) {
assertTemporaryTableNoNetwork(t, func(tk *testkit.TestKit) {
tk.MustExec("create global temporary table tmp_t (id int primary key, a int, b int, index(a)) on commit delete rows")
tk.MustExec("truncate table tmp_t")
tk.MustExec("begin")
})
})

t.Run("local", func(t *testing.T) {
assertTemporaryTableNoNetwork(t, func(tk *testkit.TestKit) {
tk.MustExec("create temporary table tmp_t (id int primary key, a int, b int, index(a))")
tk.MustExec("begin")
})
})

t.Run("local and create table inside txn", func(t *testing.T) {
assertTemporaryTableNoNetwork(t, func(tk *testkit.TestKit) {
tk.MustExec("begin")
tk.MustExec("create temporary table tmp_t (id int primary key, a int, b int, index(a))")
})
})
}
Expand All @@ -61,7 +78,6 @@ func assertTemporaryTableNoNetwork(t *testing.T, createTable func(*testkit.TestK
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/store/mockstore/unistore/rpcServerBusy"))
}()

tk.MustExec("begin")
tk.MustExec("insert into tmp_t values (1, 1, 1)")
tk.MustExec("insert into tmp_t values (2, 2, 2)")

Expand Down
19 changes: 19 additions & 0 deletions infoschema/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -697,6 +697,10 @@ func (b *Builder) applyCreateTable(m *meta.Meta, dbInfo *model.DBInfo, tableID i
})
b.is.sortedTablesBuckets[bucketIdx] = sortedTbls

if tblInfo.TempTableType != model.TempTableNone {
b.addTemporaryTable(tableID)
}

newTbl, ok := b.is.TableByID(tableID)
if ok {
dbInfo.Tables = append(dbInfo.Tables, newTbl.Meta())
Expand Down Expand Up @@ -750,6 +754,11 @@ func (b *Builder) applyDropTable(dbInfo *model.DBInfo, tableID int64, affected [
// Remove the table in sorted table slice.
b.is.sortedTablesBuckets[bucketIdx] = append(sortedTbls[0:idx], sortedTbls[idx+1:]...)

// Remove the table in temporaryTables
if b.is.temporaryTableIDs != nil {
delete(b.is.temporaryTableIDs, tableID)
}

// The old DBInfo still holds a reference to old table info, we need to remove it.
for i, tblInfo := range dbInfo.Tables {
if tblInfo.ID == tableID {
Expand Down Expand Up @@ -895,10 +904,20 @@ func (b *Builder) createSchemaTablesForDB(di *model.DBInfo, tableFromMeta tableF
schTbls.tables[t.Name.L] = tbl
sortedTbls := b.is.sortedTablesBuckets[tableBucketIdx(t.ID)]
b.is.sortedTablesBuckets[tableBucketIdx(t.ID)] = append(sortedTbls, tbl)
if tblInfo := tbl.Meta(); tblInfo.TempTableType != model.TempTableNone {
b.addTemporaryTable(tblInfo.ID)
}
}
return nil
}

func (b *Builder) addTemporaryTable(tblID int64) {
if b.is.temporaryTableIDs == nil {
b.is.temporaryTableIDs = make(map[int64]struct{})
}
b.is.temporaryTableIDs[tblID] = struct{}{}
}

type virtualTableDriver struct {
*model.DBInfo
TableFromMeta tableFromMetaFunc
Expand Down
15 changes: 15 additions & 0 deletions infoschema/infoschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ type InfoSchema interface {
AllPlacementBundles() []*placement.Bundle
// AllPlacementPolicies returns all placement policies
AllPlacementPolicies() []*model.PolicyInfo
// HasTemporaryTable returns whether information schema has temporary table
HasTemporaryTable() bool
}

type sortedTables []table.Table
Expand Down Expand Up @@ -94,6 +96,9 @@ type infoSchema struct {
// sortedTablesBuckets is a slice of sortedTables, a table's bucket index is (tableID % bucketCount).
sortedTablesBuckets []sortedTables

// temporaryTables stores the temporary table ids
temporaryTableIDs map[int64]struct{}

// schemaMetaVersion is the version of schema, and we should check version when change schema.
schemaMetaVersion int64
}
Expand Down Expand Up @@ -302,6 +307,11 @@ func (is *infoSchema) FindTableByPartitionID(partitionID int64) (table.Table, *m
return nil, nil, nil
}

// HasTemporaryTable returns whether information schema has temporary table
func (is *infoSchema) HasTemporaryTable() bool {
return len(is.temporaryTableIDs) != 0
}

func (is *infoSchema) Clone() (result []*model.DBInfo) {
for _, v := range is.schemaMap {
result = append(result, v.dbInfo.Clone())
Expand Down Expand Up @@ -567,3 +577,8 @@ func (ts *TemporaryTableAttachedInfoSchema) SchemaByTable(tableInfo *model.Table

return ts.InfoSchema.SchemaByTable(tableInfo)
}

// HasTemporaryTable returns whether information schema has temporary table
func (ts *TemporaryTableAttachedInfoSchema) HasTemporaryTable() bool {
return ts.LocalTemporaryTables.Count() > 0 || ts.InfoSchema.HasTemporaryTable()
}
124 changes: 124 additions & 0 deletions infoschema/infoschema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,130 @@ func genGlobalID(store kv.Storage) (int64, error) {
return globalID, errors.Trace(err)
}

func TestBuildSchemaWithGlobalTemporaryTable(t *testing.T) {
store := testkit.CreateMockStore(t)

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")

is := tk.Session().GetDomainInfoSchema().(infoschema.InfoSchema)
require.False(t, is.HasTemporaryTable())
db, ok := is.SchemaByName(model.NewCIStr("test"))
require.True(t, ok)

doChange := func(changes ...func(m *meta.Meta, builder *infoschema.Builder)) infoschema.InfoSchema {
builder, err := infoschema.NewBuilder(store, nil).InitWithDBInfos([]*model.DBInfo{db}, nil, is.SchemaMetaVersion())
require.NoError(t, err)
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL)
err = kv.RunInNewTxn(ctx, store, true, func(ctx context.Context, txn kv.Transaction) error {
m := meta.NewMeta(txn)
for _, change := range changes {
change(m, builder)
}
return nil
})
require.NoError(t, err)
return builder.Build()
}

createGlobalTemporaryTableChange := func(tblID int64) func(m *meta.Meta, builder *infoschema.Builder) {
return func(m *meta.Meta, builder *infoschema.Builder) {
err := m.CreateTableOrView(db.ID, &model.TableInfo{
ID: tblID,
TempTableType: model.TempTableGlobal,
State: model.StatePublic,
})
require.NoError(t, err)
_, err = builder.ApplyDiff(m, &model.SchemaDiff{Type: model.ActionCreateTable, SchemaID: db.ID, TableID: tblID})
require.NoError(t, err)
}
}

dropTableChange := func(tblID int64) func(m *meta.Meta, builder *infoschema.Builder) {
return func(m *meta.Meta, builder *infoschema.Builder) {
err := m.DropTableOrView(db.ID, tblID)
require.NoError(t, err)
_, err = builder.ApplyDiff(m, &model.SchemaDiff{Type: model.ActionDropTable, SchemaID: db.ID, TableID: tblID})
require.NoError(t, err)
}
}

truncateGlobalTemporaryTableChange := func(tblID, newTblID int64) func(m *meta.Meta, builder *infoschema.Builder) {
return func(m *meta.Meta, builder *infoschema.Builder) {
err := m.DropTableOrView(db.ID, tblID)
require.NoError(t, err)

err = m.CreateTableOrView(db.ID, &model.TableInfo{
ID: newTblID,
TempTableType: model.TempTableGlobal,
State: model.StatePublic,
})
require.NoError(t, err)
_, err = builder.ApplyDiff(m, &model.SchemaDiff{Type: model.ActionTruncateTable, SchemaID: db.ID, OldTableID: tblID, TableID: newTblID})
require.NoError(t, err)
}
}

alterTableChange := func(tblID int64) func(m *meta.Meta, builder *infoschema.Builder) {
return func(m *meta.Meta, builder *infoschema.Builder) {
_, err := builder.ApplyDiff(m, &model.SchemaDiff{Type: model.ActionAddColumn, SchemaID: db.ID, TableID: tblID})
require.NoError(t, err)
}
}

// create table
tbID, err := genGlobalID(store)
require.NoError(t, err)
newIS := doChange(
createGlobalTemporaryTableChange(tbID),
)
require.True(t, newIS.HasTemporaryTable())

// full load
newDB, ok := newIS.SchemaByName(model.NewCIStr("test"))
require.True(t, ok)
builder, err := infoschema.NewBuilder(store, nil).InitWithDBInfos([]*model.DBInfo{newDB}, newIS.AllPlacementPolicies(), newIS.SchemaMetaVersion())
require.NoError(t, err)
require.True(t, builder.Build().HasTemporaryTable())

// create and then drop
tbID, err = genGlobalID(store)
require.NoError(t, err)
require.False(t, doChange(
createGlobalTemporaryTableChange(tbID),
dropTableChange(tbID),
).HasTemporaryTable())

// create and then alter
tbID, err = genGlobalID(store)
require.NoError(t, err)
require.True(t, doChange(
createGlobalTemporaryTableChange(tbID),
alterTableChange(tbID),
).HasTemporaryTable())

// create and truncate
tbID, err = genGlobalID(store)
require.NoError(t, err)
newTbID, err := genGlobalID(store)
require.NoError(t, err)
require.True(t, doChange(
createGlobalTemporaryTableChange(tbID),
truncateGlobalTemporaryTableChange(tbID, newTbID),
).HasTemporaryTable())

// create two and drop one
tbID, err = genGlobalID(store)
require.NoError(t, err)
tbID2, err := genGlobalID(store)
require.NoError(t, err)
require.True(t, doChange(
createGlobalTemporaryTableChange(tbID),
createGlobalTemporaryTableChange(tbID2),
dropTableChange(tbID),
).HasTemporaryTable())
}

func TestBuildBundle(t *testing.T) {
store := testkit.CreateMockStore(t)

Expand Down
10 changes: 9 additions & 1 deletion sessiontxn/isolation/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,11 @@ func (p *baseTxnContextProvider) OnStmtRetry(ctx context.Context) error {
func (p *baseTxnContextProvider) OnLocalTemporaryTableCreated() {
p.infoSchema = temptable.AttachLocalTemporaryTableInfoSchema(p.sctx, p.infoSchema)
p.sctx.GetSessionVars().TxnCtx.InfoSchema = p.infoSchema
if p.txn != nil && p.txn.Valid() {
if interceptor := temptable.SessionSnapshotInterceptor(p.sctx, p.infoSchema); interceptor != nil {
p.txn.SetOption(kv.SnapInterceptor, interceptor)
}
}
}

// OnStmtErrorForNextAction is the hook that should be called when a new statement get an error
Expand Down Expand Up @@ -258,7 +263,10 @@ func (p *baseTxnContextProvider) ActivateTxn() (kv.Transaction, error) {
if readReplicaType.IsFollowerRead() {
txn.SetOption(kv.ReplicaRead, readReplicaType)
}
txn.SetOption(kv.SnapInterceptor, temptable.SessionSnapshotInterceptor(p.sctx, p.infoSchema))

if interceptor := temptable.SessionSnapshotInterceptor(p.sctx, p.infoSchema); interceptor != nil {
txn.SetOption(kv.SnapInterceptor, interceptor)
}

if sessVars.StmtCtx.WeakConsistency {
txn.SetOption(kv.IsolationLevel, kv.RC)
Expand Down
5 changes: 4 additions & 1 deletion sessiontxn/staleread/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,10 @@ func (p *StalenessTxnContextProvider) activateStaleTxn() error {
TxnScope: txnScope,
},
}
txn.SetOption(kv.SnapInterceptor, temptable.SessionSnapshotInterceptor(p.sctx, is))

if interceptor := temptable.SessionSnapshotInterceptor(p.sctx, is); interceptor != nil {
txn.SetOption(kv.SnapInterceptor, interceptor)
}

p.is = is
err = p.sctx.GetSessionVars().SetSystemVar(variable.TiDBSnapshot, "")
Expand Down
4 changes: 4 additions & 0 deletions table/temptable/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ type TemporaryTableSnapshotInterceptor struct {

// SessionSnapshotInterceptor creates a new snapshot interceptor for temporary table data fetch
func SessionSnapshotInterceptor(sctx sessionctx.Context, is infoschema.InfoSchema) kv.SnapshotInterceptor {
if !is.HasTemporaryTable() {
return nil
}

return NewTemporaryTableSnapshotInterceptor(
is,
getSessionData(sctx),
Expand Down

0 comments on commit 9af0f03

Please sign in to comment.