Merge remote-tracking branch 'upstream/release-7.5' into cherry-pick-57919-to-release-7.5

Signed-off-by: Yang Keao <yangkeao@chunibyo.icu>
YangKeao committed Feb 12, 2025
2 parents d8845a8 + 4d157f6 commit ebcd25e
Showing 22 changed files with 543 additions and 77 deletions.
24 changes: 12 additions & 12 deletions pkg/executor/builder.go
@@ -3349,6 +3349,10 @@ func buildNoRangeTableReader(b *executorBuilder, v *plannercore.PhysicalTableRea
dagReq.OutputOffsets = append(dagReq.OutputOffsets, uint32(i))
}

if e.table.Meta().TempTableType != model.TempTableNone {
e.dummy = true
}

return e, nil
}

@@ -3464,10 +3468,6 @@ func (b *executorBuilder) buildTableReader(v *plannercore.PhysicalTableReader) e
return nil
}

if ret.table.Meta().TempTableType != model.TempTableNone {
ret.dummy = true
}

ret.ranges = ts.Ranges
sctx := b.ctx.GetSessionVars().StmtCtx
sctx.TableIDs = append(sctx.TableIDs, ts.Table.ID)
@@ -3678,6 +3678,10 @@ func buildNoRangeIndexReader(b *executorBuilder, v *plannercore.PhysicalIndexRea
dagReq.OutputOffsets = append(dagReq.OutputOffsets, uint32(col.Index))
}

if e.table.Meta().TempTableType != model.TempTableNone {
e.dummy = true
}

return e, nil
}

@@ -3694,10 +3698,6 @@ func (b *executorBuilder) buildIndexReader(v *plannercore.PhysicalIndexReader) e
return nil
}

if ret.table.Meta().TempTableType != model.TempTableNone {
ret.dummy = true
}

ret.ranges = is.Ranges
sctx := b.ctx.GetSessionVars().StmtCtx
sctx.IndexNames = append(sctx.IndexNames, is.Table.Name.O+":"+is.Index.Name.O)
@@ -3872,6 +3872,10 @@ func buildNoRangeIndexLookUpReader(b *executorBuilder, v *plannercore.PhysicalIn
e.handleCols = v.CommonHandleCols
e.primaryKeyIndex = tables.FindPrimaryIndex(tbl.Meta())
}

if e.table.Meta().TempTableType != model.TempTableNone {
e.dummy = true
}
return e, nil
}

@@ -3891,10 +3895,6 @@ func (b *executorBuilder) buildIndexLookUpReader(v *plannercore.PhysicalIndexLoo
return nil
}

if ret.table.Meta().TempTableType != model.TempTableNone {
ret.dummy = true
}

ts := v.TablePlans[0].(*plannercore.PhysicalTableScan)

ret.ranges = is.Ranges
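Note on the builder.go hunks above: the temporary-table check moves out of the `buildTableReader`/`buildIndexReader`/`buildIndexLookUpReader` wrappers and into the shared `buildNoRange*` constructors, so every caller of those constructors (including paths that bypass the wrappers, such as readers built for index joins, which the new `TestIssue58875` below exercises) inherits `dummy = true` and never issues storage requests for a temporary table. A minimal, hypothetical Go sketch of that pattern; the type and constant names are simplified stand-ins, not the real TiDB structs:

```go
package main

import "fmt"

// Hypothetical, simplified stand-ins for the executor types.
type tableMeta struct{ tempTableType int }

const tempTableNone = 0

type tableReader struct {
	meta  tableMeta
	dummy bool // when true, the executor returns no rows and sends no requests
}

// buildNoRangeReader plays the role of the shared constructor: setting dummy
// here means every call path, not just the top-level builder, short-circuits
// temporary tables.
func buildNoRangeReader(meta tableMeta) *tableReader {
	e := &tableReader{meta: meta}
	if meta.tempTableType != tempTableNone {
		e.dummy = true
	}
	return e
}

func main() {
	// A reader built for a temporary table skips the storage layer entirely.
	r := buildNoRangeReader(tableMeta{tempTableType: 1})
	fmt.Println("dummy:", r.dummy) // dummy: true
}
```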
20 changes: 20 additions & 0 deletions pkg/executor/temporary_table_test.go
@@ -16,6 +16,7 @@ package executor_test

import (
"context"
"strings"
"sync"
"testing"
"time"
@@ -156,3 +157,22 @@ func assertTemporaryTableNoNetwork(t *testing.T, createTable func(*testkit.TestK
tk.MustExec("select * from tmp_t where id > 1 for update")
tk.MustExec("rollback")
}

func TestIssue58875(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)

tk.MustExec("use test")
tk.MustExec("drop table if exists users, users1;")
tk.MustExec("CREATE GLOBAL TEMPORARY TABLE users ( id BIGINT, v1 int, v2 int, v3 int, v4 int, PRIMARY KEY(id), index v1_index(v1,v2,v3) ) ON COMMIT DELETE ROWS;")
tk.MustExec("create table users1(id int, value int, index index_value(value));")
tk.MustExec("insert into users1 values(1,2);")
tk.MustExec("begin;")
res := tk.MustQuery("explain analyze select /*+ inl_join(users) */ * from users use index(v1_index) where v1 in (select value from users1);").Rows()
for _, row := range res {
// if access object contains 'table:users', the execution info should be empty.
if strings.Contains(row[4].(string), "table:users") && !strings.Contains(row[4].(string), "table:users1") {
require.Len(t, row[5].(string), 0)
}
}
}
2 changes: 1 addition & 1 deletion pkg/planner/core/casetest/partition/BUILD.bazel
@@ -10,7 +10,7 @@ go_test(
],
data = glob(["testdata/**"]),
flaky = True,
shard_count = 6,
shard_count = 7,
deps = [
"//pkg/config",
"//pkg/planner/core/internal",
20 changes: 20 additions & 0 deletions pkg/planner/core/casetest/partition/integration_partition_test.go
@@ -246,3 +246,23 @@ func TestBatchPointGetPartitionForAccessObject(t *testing.T) {
tk.MustQuery(tt).Check(testkit.Rows(output[i].Plan...))
}
}

// Issue 58475
func TestGeneratedColumnWithPartition(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")

tk.MustExec(`
CREATE TABLE tp (
id int,
c1 int,
c2 int GENERATED ALWAYS AS (c1) VIRTUAL,
KEY idx (id)
) PARTITION BY RANGE (id)
(PARTITION p0 VALUES LESS THAN (0),
PARTITION p1 VALUES LESS THAN (10000))
`)
tk.MustExec(`INSERT INTO tp (id, c1) VALUES (0, 1)`)
tk.MustExec(`select /*+ FORCE_INDEX(tp, idx) */id from tp where c2 = 2 group by id having id in (0)`)
}
@@ -48,6 +48,7 @@ func TestHashPartitionPruner(t *testing.T) {
tk.MustExec("create table t8(a int, b int) partition by hash(a) partitions 6;")
tk.MustExec("create table t9(a bit(1) default null, b int(11) default null) partition by hash(a) partitions 3;") //issue #22619
tk.MustExec("create table t10(a bigint unsigned) partition BY hash (a);")
tk.MustExec("create table t11(a int, b int) partition by hash(a + a + a + b) partitions 5")

var input []string
var output []struct {
@@ -30,7 +30,10 @@
"explain format = 'brief' select * from t8 where (a <= 10 and a >= 8) or (a <= 13 and a >= 11) or (a <= 16 and a >= 14)",
"explain format = 'brief' select * from t8 where a < 12 and a > 9",
"explain format = 'brief' select * from t9",
"explain format = 'brief' select * from t10 where a between 0 AND 15218001646226433652"
"explain format = 'brief' select * from t10 where a between 0 AND 15218001646226433652",
"explain format = 'brief' select * from t11 where a is null",
"explain format = 'brief' select * from t11 where a is null and b = 2",
"explain format = 'brief' select * from t11 where a = 1 and b = 2"
]
},
{
@@ -215,6 +215,30 @@
"└─Selection 250.00 cop[tikv] ge(test_partition.t10.a, 0), le(test_partition.t10.a, 15218001646226433652)",
" └─TableFullScan 10000.00 cop[tikv] table:t10 keep order:false, stats:pseudo"
]
},
{
"SQL": "explain format = 'brief' select * from t11 where a is null",
"Result": [
"TableReader 10.00 root partition:all data:Selection",
"└─Selection 10.00 cop[tikv] isnull(test_partition.t11.a)",
" └─TableFullScan 10000.00 cop[tikv] table:t11 keep order:false, stats:pseudo"
]
},
{
"SQL": "explain format = 'brief' select * from t11 where a is null and b = 2",
"Result": [
"TableReader 0.01 root partition:p0 data:Selection",
"└─Selection 0.01 cop[tikv] eq(test_partition.t11.b, 2), isnull(test_partition.t11.a)",
" └─TableFullScan 10000.00 cop[tikv] table:t11 keep order:false, stats:pseudo"
]
},
{
"SQL": "explain format = 'brief' select * from t11 where a = 1 and b = 2",
"Result": [
"TableReader 0.01 root partition:p0 data:Selection",
"└─Selection 0.01 cop[tikv] eq(test_partition.t11.a, 1), eq(test_partition.t11.b, 2)",
" └─TableFullScan 10000.00 cop[tikv] table:t11 keep order:false, stats:pseudo"
]
}
]
},
39 changes: 26 additions & 13 deletions pkg/planner/core/physical_plans.go
@@ -975,16 +975,30 @@ func (ts *PhysicalTableScan) ResolveCorrelatedColumns() ([]*ranger.Range, error)
// ExpandVirtualColumn expands the virtual column's dependent columns to ts's schema and column.
func ExpandVirtualColumn(columns []*model.ColumnInfo, schema *expression.Schema,
colsInfo []*model.ColumnInfo) []*model.ColumnInfo {
copyColumn := make([]*model.ColumnInfo, len(columns))
copy(copyColumn, columns)
var extraColumn *expression.Column
var extraColumnModel *model.ColumnInfo
if schema.Columns[len(schema.Columns)-1].ID == model.ExtraHandleID {
extraColumn = schema.Columns[len(schema.Columns)-1]
extraColumnModel = copyColumn[len(copyColumn)-1]
schema.Columns = schema.Columns[:len(schema.Columns)-1]
copyColumn = copyColumn[:len(copyColumn)-1]
copyColumn := make([]*model.ColumnInfo, 0, len(columns))
copyColumn = append(copyColumn, columns...)

oldNumColumns := len(schema.Columns)
numExtraColumns := 0
for i := oldNumColumns - 1; i >= 0; i-- {
cid := schema.Columns[i].ID
// Move extra columns to the end.
// ExtraRowChecksumID is ignored here since it's treated as an ordinary column.
// https://github.com/pingcap/tidb/blob/3c407312a986327bc4876920e70fdd6841b8365f/pkg/util/rowcodec/decoder.go#L206-L222
if cid != model.ExtraHandleID && cid != model.ExtraPhysTblID {
break
}
numExtraColumns++
}

extraColumns := make([]*expression.Column, numExtraColumns)
copy(extraColumns, schema.Columns[oldNumColumns-numExtraColumns:])
schema.Columns = schema.Columns[:oldNumColumns-numExtraColumns]

extraColumnModels := make([]*model.ColumnInfo, numExtraColumns)
copy(extraColumnModels, copyColumn[len(copyColumn)-numExtraColumns:])
copyColumn = copyColumn[:len(copyColumn)-numExtraColumns]

schemaColumns := schema.Columns
for _, col := range schemaColumns {
if col.VirtualExpr == nil {
@@ -999,10 +1013,9 @@ func ExpandVirtualColumn(columns []*model.ColumnInfo, schema *expression.Schema,
}
}
}
if extraColumn != nil {
schema.Columns = append(schema.Columns, extraColumn)
copyColumn = append(copyColumn, extraColumnModel) // nozero
}

schema.Columns = append(schema.Columns, extraColumns...)
copyColumn = append(copyColumn, extraColumnModels...)
return copyColumn
}

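The rewritten `ExpandVirtualColumn` above strips all trailing "extra" columns (`ExtraHandleID` and `ExtraPhysTblID`; per the in-diff comment, `ExtraRowChecksumID` is left in place) before expanding virtual-column dependencies, then appends them back, whereas the old code only handled a single trailing `ExtraHandleID`. A rough sketch of that strip/re-append step, using int64 IDs in place of the real column structs; the constant values below are illustrative, not TiDB's actual IDs:

```go
package main

import "fmt"

// Hypothetical column IDs standing in for model.ExtraHandleID / ExtraPhysTblID.
const (
	extraHandleID  int64 = -1
	extraPhysTblID int64 = -3
)

// stripTrailingExtras mirrors the backwards loop added to ExpandVirtualColumn:
// walk from the end of the schema until a non-extra column is found, split the
// extras off, and return both parts.
func stripTrailingExtras(schema []int64) (rest, extras []int64) {
	n := len(schema)
	numExtra := 0
	for i := n - 1; i >= 0; i-- {
		if schema[i] != extraHandleID && schema[i] != extraPhysTblID {
			break
		}
		numExtra++
	}
	return schema[:n-numExtra], schema[n-numExtra:]
}

func main() {
	schema := []int64{1, 2, 3, extraPhysTblID, extraHandleID}
	rest, extras := stripTrailingExtras(schema)
	// Virtual-column dependencies would be appended to rest here, after which
	// the extras are put back so they stay at the end of the schema.
	fmt.Println(rest, extras) // [1 2 3] [-3 -1]
}
```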
23 changes: 15 additions & 8 deletions pkg/planner/core/rule_partition_processor.go
@@ -130,11 +130,18 @@ func generateHashPartitionExpr(ctx sessionctx.Context, pi *model.PartitionInfo,
func getPartColumnsForHashPartition(hashExpr expression.Expression) ([]*expression.Column, []int) {
partCols := expression.ExtractColumns(hashExpr)
colLen := make([]int, 0, len(partCols))
retCols := make([]*expression.Column, 0, len(partCols))
filled := make(map[int64]struct{})
for i := 0; i < len(partCols); i++ {
partCols[i].Index = i
colLen = append(colLen, types.UnspecifiedLength)
// Deal with same columns.
if _, done := filled[partCols[i].UniqueID]; !done {
partCols[i].Index = len(filled)
filled[partCols[i].UniqueID] = struct{}{}
colLen = append(colLen, types.UnspecifiedLength)
retCols = append(retCols, partCols[i])
}
}
return partCols, colLen
return retCols, colLen
}

func (s *partitionProcessor) getUsedHashPartitions(ctx sessionctx.Context,
@@ -217,11 +224,11 @@ func (s *partitionProcessor) getUsedHashPartitions(ctx sessionctx.Context,
used = []int{FullRange}
break
}
if !r.HighVal[0].IsNull() {
if len(r.HighVal) != len(partCols) {
used = []int{-1}
break
}

// The code below is for the range `r` is a point.
if len(r.HighVal) != len(partCols) {
used = []int{FullRange}
break
}
highLowVals := make([]types.Datum, 0, len(r.HighVal)+len(r.LowVal))
highLowVals = append(highLowVals, r.HighVal...)
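Two things change in rule_partition_processor.go: `getPartColumnsForHashPartition` now de-duplicates the columns extracted from the hash expression (e.g. `hash(a + a + a + b)`, as added to the pruner test above, yields `a` three times), and `getUsedHashPartitions` falls back to `FullRange` instead of the `-1` sentinel when a point range does not cover every partition column. A rough sketch of the de-duplication, with a simplified column type standing in for `expression.Column`:

```go
package main

import "fmt"

// Simplified stand-in for expression.Column: each distinct column gets exactly
// one slot/index even if it appears several times in the partition expression.
type column struct {
	UniqueID int64
	Index    int
}

func dedupPartCols(partCols []*column) []*column {
	filled := make(map[int64]struct{}, len(partCols))
	ret := make([]*column, 0, len(partCols))
	for _, c := range partCols {
		if _, ok := filled[c.UniqueID]; ok {
			continue // same column seen before: keep the earlier slot
		}
		c.Index = len(filled)
		filled[c.UniqueID] = struct{}{}
		ret = append(ret, c)
	}
	return ret
}

func main() {
	a := &column{UniqueID: 1}
	b := &column{UniqueID: 2}
	// hash(a + a + a + b) extracts a three times and b once.
	for _, c := range dedupPartCols([]*column{a, a, a, b}) {
		fmt.Println(c.UniqueID, c.Index) // 1 0, then 2 1
	}
}
```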
2 changes: 1 addition & 1 deletion pkg/timer/tablestore/store.go
@@ -122,7 +122,7 @@ func (s *tableTimerStoreCore) List(ctx context.Context, cond api.Cond) ([]*api.T
}
defer back()

if sessVars := sctx.GetSessionVars(); sessVars.GetEnableIndexMerge() {
if sessVars := sctx.GetSessionVars(); !sessVars.GetEnableIndexMerge() {
// Enable index merge is used to make sure filtering timers with tags quickly.
// Currently, we are using multi-value index to index tags for timers which requires index merge enabled.
// see: https://docs.pingcap.com/tidb/dev/choose-index#use-a-multi-valued-index
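The one-line store.go fix inverts the condition: the guarded block is needed precisely when index merge is *not* enabled, because listing timers filters on a multi-valued tag index. A minimal, hypothetical sketch of an "enable only if currently disabled" guard; the real code goes through `sctx.GetSessionVars()`, and whether it restores the previous value afterwards is not visible in this hunk:

```go
package main

import "fmt"

// Hypothetical session-variable holder.
type sessVars struct{ enableIndexMerge bool }

// withIndexMerge turns index merge on only when it is currently off, and (in
// this sketch) restores the previous value when the callback returns.
func withIndexMerge(v *sessVars, fn func()) {
	if !v.enableIndexMerge {
		v.enableIndexMerge = true
		defer func() { v.enableIndexMerge = false }()
	}
	fn()
}

func main() {
	v := &sessVars{enableIndexMerge: false}
	withIndexMerge(v, func() {
		fmt.Println("query runs with index merge:", v.enableIndexMerge) // true
	})
	fmt.Println("restored:", v.enableIndexMerge) // false
}
```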
2 changes: 1 addition & 1 deletion pkg/ttl/client/notification.go
@@ -75,7 +75,7 @@ loop:
return ctx.Err()
case ch <- clientv3.WatchResponse{}:
default:
unsent = make([]chan clientv3.WatchResponse, len(watchers), 0)
unsent = make([]chan clientv3.WatchResponse, len(watchers))
copy(unsent, watchers[i:])
break loop
}
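The notification.go fix drops an invalid capacity argument: `make([]T, n, 0)` with non-constant `n > 0` panics at runtime because the requested length exceeds the capacity. A tiny standalone reproduction/sketch, with `clientv3.WatchResponse` replaced by `struct{}` to keep it dependency-free:

```go
package main

import "fmt"

func main() {
	watchers := make([]chan struct{}, 3)

	// The removed line passed capacity 0 with a non-zero length, which panics
	// at runtime (length larger than capacity):
	// unsent := make([]chan struct{}, len(watchers), 0)

	// The fix: pass the length only; capacity defaults to the same value.
	unsent := make([]chan struct{}, len(watchers))
	copy(unsent, watchers)
	fmt.Println(len(unsent), cap(unsent)) // 3 3
}
```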
22 changes: 14 additions & 8 deletions pkg/ttl/session/session.go
@@ -107,6 +107,20 @@ func (s *session) ExecuteSQL(ctx context.Context, sql string, args ...interface{

// RunInTxn executes the specified function in a txn
func (s *session) RunInTxn(ctx context.Context, fn func() error, txnMode TxnMode) (err error) {
success := false
defer func() {
// Always try to `ROLLBACK` the transaction even if only the `BEGIN` fails. If the `BEGIN` is killed
// after it runs the first `Next`, the transaction is already active and needs to be `ROLLBACK`ed.
if !success {
// For now, the "ROLLBACK" can execute successfully even when the context has already been cancelled.
// Using another timeout context to avoid that this behavior will be changed in the future.
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
_, rollbackErr := s.ExecuteSQL(ctx, "ROLLBACK")
terror.Log(rollbackErr)
cancel()
}
}()

tracer := metrics.PhaseTracerFromCtx(ctx)
defer tracer.EnterPhase(tracer.Phase())

@@ -125,14 +139,6 @@ func (s *session) RunInTxn(ctx context.Context, fn func() error, txnMode TxnMode
}
tracer.EnterPhase(metrics.PhaseOther)

success := false
defer func() {
if !success {
_, rollbackErr := s.ExecuteSQL(ctx, "ROLLBACK")
terror.Log(rollbackErr)
}
}()

if err = fn(); err != nil {
return err
}
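In session.go, the rollback defer is now registered before `BEGIN` rather than after it, and it runs `ROLLBACK` on a fresh one-second timeout context so a cancelled caller context cannot skip the cleanup (per the comment in the diff, a killed `BEGIN` may already have started a transaction). A simplified sketch of that ordering, with a stub `exec` standing in for `s.ExecuteSQL`:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Hypothetical session; the real code runs SQL through s.ExecuteSQL.
type session struct{}

func (s *session) exec(ctx context.Context, sql string) error {
	fmt.Println("exec:", sql)
	return nil
}

// runInTxn mirrors the reordered logic: register the rollback defer first,
// then BEGIN, run fn, and COMMIT; on any failure the defer issues a
// best-effort ROLLBACK on its own short timeout context.
func (s *session) runInTxn(ctx context.Context, fn func() error) (err error) {
	success := false
	defer func() {
		if !success {
			rbCtx, cancel := context.WithTimeout(context.Background(), time.Second)
			defer cancel()
			_ = s.exec(rbCtx, "ROLLBACK") // error would only be logged
		}
	}()

	if err = s.exec(ctx, "BEGIN"); err != nil {
		return err
	}
	if err = fn(); err != nil {
		return err
	}
	if err = s.exec(ctx, "COMMIT"); err != nil {
		return err
	}
	success = true
	return nil
}

func main() {
	s := &session{}
	_ = s.runInTxn(context.Background(), func() error { return fmt.Errorf("boom") })
	// Prints "exec: BEGIN" then "exec: ROLLBACK" because fn failed.
}
```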
1 change: 1 addition & 0 deletions pkg/ttl/ttlworker/BUILD.bazel
@@ -60,6 +60,7 @@ go_test(
"del_test.go",
"job_manager_integration_test.go",
"job_manager_test.go",
"scan_integration_test.go",
"scan_test.go",
"session_test.go",
"task_manager_integration_test.go",