Skip to content

Commit

Permalink
Merge branch 'master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
AntiTopQuark authored Aug 15, 2022
2 parents 4a69acf + 1b3c09b commit 72f0665
Show file tree
Hide file tree
Showing 32 changed files with 9,514 additions and 9,117 deletions.
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -402,11 +402,11 @@ bazel_coverage_test: failpoint-enable bazel_ci_prepare
bazel $(BAZEL_GLOBAL_CONFIG) coverage $(BAZEL_CMD_CONFIG) \
--build_event_json_file=bazel_1.json --@io_bazel_rules_go//go/config:cover_format=go_cover \
-- //... -//cmd/... -//tests/graceshutdown/... \
-//tests/globalkilltest/... -//tests/readonlytest/... -//br/pkg/task:task_test
-//tests/globalkilltest/... -//tests/readonlytest/... -//br/pkg/task:task_test -//tests/realtikvtest/...
bazel $(BAZEL_GLOBAL_CONFIG) coverage $(BAZEL_CMD_CONFIG) \
--build_event_json_file=bazel_2.json --@io_bazel_rules_go//go/config:cover_format=go_cover --define gotags=featuretag \
-- //... -//cmd/... -//tests/graceshutdown/... \
-//tests/globalkilltest/... -//tests/readonlytest/... -//br/pkg/task:task_test
-//tests/globalkilltest/... -//tests/readonlytest/... -//br/pkg/task:task_test -//tests/realtikvtest/...

bazel_all_build: bazel_ci_prepare
mkdir -p bin
Expand Down
3 changes: 3 additions & 0 deletions br/pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,9 @@ func BuildBackupRangeAndSchema(
tableInfo.ClearPlacement()
}

// Treat cached table as normal table.
tableInfo.TableCacheStatusType = model.TableCacheStatusDisable

if tableInfo.PKIsHandle && tableInfo.ContainsAutoRandomBits() {
// this table has auto_random id, we need backup and rebase in restoration
var globalAutoRandID int64
Expand Down
4 changes: 2 additions & 2 deletions br/pkg/restore/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ func (importer *FileImporter) getKeyRangeForFiles(
if importer.isRawKvMode {
start, end = f.GetStartKey(), f.GetEndKey()
} else {
start, end, err = RewriteFileKeys(f, rewriteRules)
start, end, err = GetRewriteRawKeys(f, rewriteRules)
if err != nil {
return nil, nil, errors.Trace(err)
}
Expand Down Expand Up @@ -390,7 +390,7 @@ func (importer *FileImporter) ImportKVFiles(
) error {
startTime := time.Now()
log.Debug("import kv files", zap.String("file", file.Path))
startKey, endKey, err := RewriteFileKeys(file, rule)
startKey, endKey, err := GetRewriteEncodedKeys(file, rule)
if err != nil {
return errors.Trace(err)
}
Expand Down
51 changes: 33 additions & 18 deletions br/pkg/restore/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,33 +505,48 @@ func findMatchedRewriteRule(file AppliedFile, rules *RewriteRules) *import_sstpb
return rule
}

// RewriteFileKeys tries to choose and apply the rewrite rules to the file.
// This method would try to detach whether the file key is encoded.
// Note: Maybe add something like `GetEncodedStartKey` Or `GetRawStartkey` for `AppliedFile` for making it more determinable?
func RewriteFileKeys(file AppliedFile, rewriteRules *RewriteRules) (startKey, endKey []byte, err error) {
// GetRewriteRawKeys rewrites rules to the raw key.
func GetRewriteRawKeys(file AppliedFile, rewriteRules *RewriteRules) (startKey, endKey []byte, err error) {
startID := tablecodec.DecodeTableID(file.GetStartKey())
endID := tablecodec.DecodeTableID(file.GetEndKey())
var rule *import_sstpb.RewriteRule
if startID == endID {
startKey, rule = rewriteRawKey(file.GetStartKey(), rewriteRules)
if rewriteRules != nil && rule == nil {
// fall back to encoded key
log.Debug("cannot find rewrite rule with raw key format",
logutil.Key("startKey", file.GetStartKey()),
zap.Reflect("rewrite data", rewriteRules.Data))
startKey, rule = rewriteEncodedKey(file.GetStartKey(), rewriteRules)
if rule == nil {
err = errors.Annotate(berrors.ErrRestoreInvalidRewrite, "cannot find rewrite rule for start key")
return
}
err = errors.Annotatef(berrors.ErrRestoreInvalidRewrite, "cannot find raw rewrite rule for start key, startKey: %s", redact.Key(file.GetStartKey()))
return
}
endKey, rule = rewriteRawKey(file.GetEndKey(), rewriteRules)
if rewriteRules != nil && rule == nil {
endKey, rule = rewriteEncodedKey(file.GetEndKey(), rewriteRules)
if rewriteRules != nil && rule == nil {
err = errors.Annotate(berrors.ErrRestoreInvalidRewrite, "cannot find rewrite rule for end key")
return
}
err = errors.Annotatef(berrors.ErrRestoreInvalidRewrite, "cannot find raw rewrite rule for end key, endKey: %s", redact.Key(file.GetEndKey()))
return
}
} else {
log.Error("table ids dont matched",
zap.Int64("startID", startID),
zap.Int64("endID", endID),
logutil.Key("startKey", startKey),
logutil.Key("endKey", endKey))
err = errors.Annotate(berrors.ErrRestoreInvalidRewrite, "invalid table id")
}
return
}

// GetRewriteRawKeys rewrites rules to the encoded key
func GetRewriteEncodedKeys(file AppliedFile, rewriteRules *RewriteRules) (startKey, endKey []byte, err error) {
startID := tablecodec.DecodeTableID(file.GetStartKey())
endID := tablecodec.DecodeTableID(file.GetEndKey())
var rule *import_sstpb.RewriteRule
if startID == endID {
startKey, rule = rewriteEncodedKey(file.GetStartKey(), rewriteRules)
if rewriteRules != nil && rule == nil {
err = errors.Annotatef(berrors.ErrRestoreInvalidRewrite, "cannot find encode rewrite rule for start key, startKey: %s", redact.Key(file.GetStartKey()))
return
}
endKey, rule = rewriteEncodedKey(file.GetEndKey(), rewriteRules)
if rewriteRules != nil && rule == nil {
err = errors.Annotatef(berrors.ErrRestoreInvalidRewrite, "cannot find encode rewrite rule for end key, endKey: %s", redact.Key(file.GetEndKey()))
return
}
} else {
log.Error("table ids dont matched",
Expand Down
25 changes: 23 additions & 2 deletions br/pkg/restore/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,14 +303,18 @@ func TestRewriteFileKeys(t *testing.T) {
NewKeyPrefix: tablecodec.GenTablePrefix(2),
OldKeyPrefix: tablecodec.GenTablePrefix(1),
},
{
NewKeyPrefix: tablecodec.GenTablePrefix(511),
OldKeyPrefix: tablecodec.GenTablePrefix(767),
},
},
}
rawKeyFile := backuppb.File{
Name: "backup.sst",
StartKey: tablecodec.GenTableRecordPrefix(1),
EndKey: tablecodec.GenTableRecordPrefix(1).PrefixNext(),
}
start, end, err := restore.RewriteFileKeys(&rawKeyFile, &rewriteRules)
start, end, err := restore.GetRewriteRawKeys(&rawKeyFile, &rewriteRules)
require.NoError(t, err)
_, end, err = codec.DecodeBytes(end, nil)
require.NoError(t, err)
Expand All @@ -324,8 +328,25 @@ func TestRewriteFileKeys(t *testing.T) {
StartKey: codec.EncodeBytes(nil, tablecodec.GenTableRecordPrefix(1)),
EndKey: codec.EncodeBytes(nil, tablecodec.GenTableRecordPrefix(1).PrefixNext()),
}
start, end, err = restore.RewriteFileKeys(&encodeKeyFile, &rewriteRules)
start, end, err = restore.GetRewriteEncodedKeys(&encodeKeyFile, &rewriteRules)
require.NoError(t, err)
require.Equal(t, codec.EncodeBytes(nil, tablecodec.GenTableRecordPrefix(2)), start)
require.Equal(t, codec.EncodeBytes(nil, tablecodec.GenTableRecordPrefix(2).PrefixNext()), end)

// test for table id 767
encodeKeyFile767 := backuppb.DataFileInfo{
Path: "bakcup.log",
StartKey: codec.EncodeBytes(nil, tablecodec.GenTableRecordPrefix(767)),
EndKey: codec.EncodeBytes(nil, tablecodec.GenTableRecordPrefix(767).PrefixNext()),
}
// use raw rewrite should no error but not equal
start, end, err = restore.GetRewriteRawKeys(&encodeKeyFile767, &rewriteRules)
require.NoError(t, err)
require.NotEqual(t, codec.EncodeBytes(nil, tablecodec.GenTableRecordPrefix(511)), start)
require.NotEqual(t, codec.EncodeBytes(nil, tablecodec.GenTableRecordPrefix(511).PrefixNext()), end)
// use encode rewrite should no error and equal
start, end, err = restore.GetRewriteEncodedKeys(&encodeKeyFile767, &rewriteRules)
require.NoError(t, err)
require.Equal(t, codec.EncodeBytes(nil, tablecodec.GenTableRecordPrefix(511)), start)
require.Equal(t, codec.EncodeBytes(nil, tablecodec.GenTableRecordPrefix(511).PrefixNext()), end)
}
42 changes: 42 additions & 0 deletions br/tests/br_cache_table/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#!/bin/sh
#
# Copyright 2020 PingCAP, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

set -eu
DB="$TEST_NAME"

run_sql "create schema $DB;"
run_sql "create table $DB.cache_1 (id int);"
run_sql "insert into $DB.cache_1 values (1);"
run_sql "alter table $DB.cache_1 cache;"
run_sql "insert into $DB.cache_1 values (2),(3);"

echo "backup start..."
run_br backup db --db "$DB" -s "local://$TEST_DIR/$DB" --pd $PD_ADDR

run_sql "drop schema $DB;"

echo "restore start..."
run_br restore db --db $DB -s "local://$TEST_DIR/$DB" --pd $PD_ADDR

set -x

run_sql "select count(*) from $DB.cache_1;"
check_contains 'count(*): 3'

run_sql "select create_options from information_schema.tables where table_schema = '$DB' and table_name = 'cache_1';"
check_not_contains 'create_options: cached=on'

run_sql "drop schema $DB"
3 changes: 3 additions & 0 deletions build/nogo_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@
"server/conn.go": "server/conn.go",
"server/conn_stmt.go": "server/conn_stmt.go",
"server/conn_test.go": "server/conn_test.go",
"planner/core/plan.go": "planner/core/plan.go",
"errno/": "only table code"
}
},
Expand Down Expand Up @@ -300,6 +301,7 @@
"planner/implementation": "planner code",
"planner/cascades": "planner code",
"planner/core/plan_cache.go": "planner code",
"planner/core/plan.go": "planner/core/plan.go",
"util/": "util code",
"parser/": "parser code",
"meta/": "parser code"
Expand Down Expand Up @@ -637,6 +639,7 @@
},
"only_files": {
"expression/builtin_cast.go": "enable expression/builtin_cast.go",
"planner/core/plan.go": "planner/core/plan.go",
"server/conn.go": "server/conn.go",
"server/conn_stmt.go": "server/conn_stmt.go",
"server/conn_test.go": "server/conn_test.go",
Expand Down
4 changes: 4 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ const (
DefExpensiveQueryTimeThreshold = 60
// DefMemoryUsageAlarmRatio is the threshold triggering an alarm which the memory usage of tidb-server instance exceeds.
DefMemoryUsageAlarmRatio = 0.8
// DefTempDir is the default temporary directory path for TiDB.
DefTempDir = "/tmp/tidb"
)

// Valid config maps
Expand Down Expand Up @@ -170,6 +172,7 @@ type Config struct {
RunDDL bool `toml:"run-ddl" json:"run-ddl"`
SplitTable bool `toml:"split-table" json:"split-table"`
TokenLimit uint `toml:"token-limit" json:"token-limit"`
TempDir string `toml:"temp-dir" json:"temp-dir"`
TempStoragePath string `toml:"tmp-storage-path" json:"tmp-storage-path"`
// TempStorageQuota describe the temporary storage Quota during query exector when TiDBEnableTmpStorageOnOOM is enabled
// If the quota exceed the capacity of the TempStoragePath, the tidb-server would exit with fatal error
Expand Down Expand Up @@ -810,6 +813,7 @@ var defaultConf = Config{
Lease: "45s",
TokenLimit: 1000,
OOMUseTmpStorage: true,
TempDir: DefTempDir,
TempStorageQuota: -1,
TempStoragePath: tempStorageDirName,
MemQuotaQuery: 1 << 30,
Expand Down
3 changes: 3 additions & 0 deletions config/config.toml.example
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ split-table = true
# The limit of concurrent executed sessions.
token-limit = 1000

# The temporary directory to store the intermediate compute results.
temp-dir = "/tmp/tidb"

# Specifies the temporary storage path for some operators when a single SQL statement exceeds the memory quota specified by the memory quota.
# It defaults to a generated directory in `<TMPDIR>/<os/user.Current().Uid>_tidb/` if it is unset.
# It only takes effect when `tidb_enable_tmp_storage_on_oom` is `true`.
Expand Down
41 changes: 41 additions & 0 deletions ddl/db_partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2103,6 +2103,18 @@ func TestAlterTableExchangePartition(t *testing.T) {
require.NoError(t, err)

tk.MustExec("alter table e15 exchange partition p0 with table e16")

tk.MustExec("create table e17 (a int)")
tk.MustExec("alter table e17 set tiflash replica 1")
tk.MustExec("insert into e17 values (1)")

tk.MustExec("create table e18 (a int) partition by range (a) (partition p0 values less than (4), partition p1 values less than (10))")
tk.MustExec("alter table e18 set tiflash replica 1")
tk.MustExec("insert into e18 values (2)")

tk.MustExec("alter table e18 exchange partition p0 with table e17")
tk.MustQuery("select * /*+ read_from_storage(tiflash[e18]) */ from e18").Check(testkit.Rows("1"))
tk.MustQuery("select * /*+ read_from_storage(tiflash[e17]) */ from e17").Check(testkit.Rows("2"))
}

func TestExchangePartitionTableCompatiable(t *testing.T) {
Expand Down Expand Up @@ -2377,6 +2389,35 @@ func TestExchangePartitionHook(t *testing.T) {
tk.MustQuery("select * from pt partition(p0)").Check(testkit.Rows("1"))
}

func TestExchangePartitionAutoID(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)

tk.MustExec("set @@tidb_enable_exchange_partition=1")
defer tk.MustExec("set @@tidb_enable_exchange_partition=0")

tk.MustExec("use test")
tk.MustExec(`create table pt (a int primary key auto_increment) partition by range(a) (
partition p0 values less than (3),
partition p1 values less than (6),
PARTITION p2 values less than (9),
PARTITION p3 values less than (50000000)
);`)
tk.MustExec(`create table nt(a int primary key auto_increment);`)
tk.MustExec(`insert into pt values (0), (4)`)
tk.MustExec("insert into nt values (1)")

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/exchangePartitionAutoID", `return(true)`))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/exchangePartitionAutoID"))
}()

tk.MustExec("alter table pt exchange partition p0 with table nt")
tk.MustExec("insert into nt values (NULL)")
tk.MustQuery("select count(*) from nt where a >= 4000000").Check(testkit.Rows("1"))
tk.MustQuery("select count(*) from pt where a >= 4000000").Check(testkit.Rows("1"))
}

func TestExchangePartitionExpressIndex(t *testing.T) {
restore := config.RestoreFunc()
defer restore()
Expand Down
15 changes: 15 additions & 0 deletions ddl/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -2040,6 +2040,21 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo
return ver, errors.Trace(err)
}

failpoint.Inject("exchangePartitionAutoID", func(val failpoint.Value) {
if val.(bool) {
se, err := w.sessPool.get()
defer w.sessPool.put(se)
if err != nil {
failpoint.Return(ver, err)
}
sess := newSession(se)
_, err = sess.execute(context.Background(), "insert into test.pt values (40000000)", "exchange_partition_test")
if err != nil {
failpoint.Return(ver, err)
}
}
})

err = checkExchangePartitionPlacementPolicy(t, partDef.PlacementPolicyRef, nt.PlacementPolicyRef)
if err != nil {
job.State = model.JobStateCancelled
Expand Down
2 changes: 1 addition & 1 deletion distsql/request_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func (builder *RequestBuilder) SetAnalyzeRequest(ana *tipb.AnalyzeReq) *RequestB
builder.Request.Tp = kv.ReqTypeAnalyze
builder.Request.Data, builder.err = ana.Marshal()
builder.Request.NotFillCache = true
builder.Request.IsolationLevel = kv.RC
builder.Request.IsolationLevel = kv.SI
builder.Request.Priority = kv.PriorityLow
}

Expand Down
2 changes: 1 addition & 1 deletion distsql/request_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,7 @@ func TestRequestBuilder5(t *testing.T) {
KeepOrder: true,
Desc: false,
Concurrency: 15,
IsolationLevel: kv.RC,
IsolationLevel: kv.SI,
Priority: 1,
NotFillCache: true,
ReadReplicaScope: kv.GlobalReplicaScope,
Expand Down
5 changes: 5 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -4726,6 +4726,11 @@ func (b *executorBuilder) buildBatchPointGet(plan *plannercore.BatchPointGetPlan
b.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}

if plan.IndexInfo != nil {
sctx := b.ctx.GetSessionVars().StmtCtx
sctx.IndexNames = append(sctx.IndexNames, plan.TblInfo.Name.O+":"+plan.IndexInfo.Name.O)
}

failpoint.Inject("assertBatchPointReplicaOption", func(val failpoint.Value) {
assertScope := val.(string)
if e.ctx.GetSessionVars().GetReplicaRead().IsClosestRead() && assertScope != b.readReplicaScope {
Expand Down
5 changes: 5 additions & 0 deletions executor/point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@ func (b *executorBuilder) buildPointGet(p *plannercore.PointGetPlan) Executor {
b.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}

if p.IndexInfo != nil {
sctx := b.ctx.GetSessionVars().StmtCtx
sctx.IndexNames = append(sctx.IndexNames, p.TblInfo.Name.O+":"+p.IndexInfo.Name.O)
}

failpoint.Inject("assertPointReplicaOption", func(val failpoint.Value) {
assertScope := val.(string)
if e.ctx.GetSessionVars().GetReplicaRead().IsClosestRead() && assertScope != e.readReplicaScope {
Expand Down
Loading

0 comments on commit 72f0665

Please sign in to comment.