Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into fix_38304
Browse files Browse the repository at this point in the history
  • Loading branch information
Yisaer committed Oct 13, 2022
2 parents 53b883e + 4394a2a commit 712cc5a
Show file tree
Hide file tree
Showing 42 changed files with 218 additions and 81 deletions.
16 changes: 8 additions & 8 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -2818,8 +2818,8 @@ def go_deps():
name = "com_github_pingcap_kvproto",
build_file_proto_mode = "disable_global",
importpath = "github.com/pingcap/kvproto",
sum = "h1:/92S0s/TCoCmK2vv6WbkXNeqtLn90sHRJ5Vlx1Sigas=",
version = "v0.0.0-20220913025519-586cff113d10",
sum = "h1:ceg4xjEEXNgPsScTQ5dtidiltLF4h17Y/jUqfyLAy9E=",
version = "v0.0.0-20220929075948-06e08d5ed64c",
)
go_repository(
name = "com_github_pingcap_log",
Expand Down Expand Up @@ -3422,15 +3422,15 @@ def go_deps():
name = "com_github_tikv_client_go_v2",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/client-go/v2",
sum = "h1:TxDSQAmtGdE34BvOaYF35mRrAXePeZEq8quvuAwrKsI=",
version = "v2.0.1-0.20220923061703-33efe476e022",
sum = "h1:/13jzD/AR7v3dCLweFQ2JG8bihh3HLVIci2tbOHHGW0=",
version = "v2.0.1-0.20221012074856-6def8d7b90c4",
)
go_repository(
name = "com_github_tikv_pd_client",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/pd/client",
sum = "h1:r1eMh9Rny3hfWuBuxOnbsCRrR4FhthiNxLQ5rAUtaww=",
version = "v0.0.0-20220725055910-7187a7ab72db",
sum = "h1:REQOR1XraH1fT9BCoNBPZs1CAe+w7VPLU+d+si7DLYo=",
version = "v0.0.0-20221010134149-d50e5fe43f14",
)
go_repository(
name = "com_github_timakin_bodyclose",
Expand Down Expand Up @@ -4250,8 +4250,8 @@ def go_deps():
name = "org_golang_x_crypto",
build_file_proto_mode = "disable_global",
importpath = "golang.org/x/crypto",
sum = "h1:7I4JAnoQBe7ZtJcBaYHi5UtiO8tQHbUSXxL+pnGRANg=",
version = "v0.0.0-20210921155107-089bfa567519",
sum = "h1:kUhD7nTDoI3fVd9G4ORWrbV5NY0liEs/Jg2pv5f+bBA=",
version = "v0.0.0-20220411220226-7b82a4e95df4",
)
go_repository(
name = "org_golang_x_exp",
Expand Down
2 changes: 1 addition & 1 deletion Makefile.common
Original file line number Diff line number Diff line change
Expand Up @@ -116,5 +116,5 @@ TEST_COVERAGE_DIR := "test_coverage"

ifneq ("$(CI)", "0")
BAZEL_GLOBAL_CONFIG := --output_user_root=/home/jenkins/.tidb/tmp
BAZEL_CMD_CONFIG := --config=ci
BAZEL_CMD_CONFIG := --config=ci --repository_cache=/home/jenkins/.tidb/tmp
endif
1 change: 1 addition & 0 deletions bindinfo/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ go_test(
],
embed = [":bindinfo"],
flaky = True,
race = "on",
shard_count = 50,
deps = [
"//config",
Expand Down
17 changes: 16 additions & 1 deletion br/pkg/lightning/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ func InterpolateMySQLString(s string) string {
}

// TableExists return whether table with specified name exists in target db
func TableExists(ctx context.Context, db *sql.DB, schema, table string) (bool, error) {
func TableExists(ctx context.Context, db utils.QueryExecutor, schema, table string) (bool, error) {
query := "SELECT 1 from INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ?"
var exist string
err := db.QueryRowContext(ctx, query, schema, table).Scan(&exist)
Expand All @@ -309,6 +309,21 @@ func TableExists(ctx context.Context, db *sql.DB, schema, table string) (bool, e
}
}

// SchemaExists return whether schema with specified name exists.
func SchemaExists(ctx context.Context, db utils.QueryExecutor, schema string) (bool, error) {
query := "SELECT 1 from INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME = ?"
var exist string
err := db.QueryRowContext(ctx, query, schema).Scan(&exist)
switch {
case err == nil:
return true, nil
case err == sql.ErrNoRows:
return false, nil
default:
return false, errors.Annotatef(err, "check schema exists failed")
}
}

// GetJSON fetches a page and parses it as JSON. The parsed result will be
// stored into the `v`. The variable `v` must be a pointer to a type that can be
// unmarshalled from JSON.
Expand Down
28 changes: 26 additions & 2 deletions br/pkg/lightning/restore/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -517,9 +517,15 @@ type restoreSchemaWorker struct {
func (worker *restoreSchemaWorker) addJob(sqlStr string, job *schemaJob) error {
stmts, err := createIfNotExistsStmt(worker.glue.GetParser(), sqlStr, job.dbName, job.tblName)
if err != nil {
return err
worker.logger.Warn("failed to rewrite statement, will use raw input instead",
zap.String("db", job.dbName),
zap.String("table", job.tblName),
zap.String("statement", sqlStr),
zap.Error(err))
job.stmts = []string{sqlStr}
} else {
job.stmts = stmts
}
job.stmts = stmts
return worker.appendJob(job)
}

Expand Down Expand Up @@ -656,7 +662,25 @@ loop:
for _, stmt := range job.stmts {
task := logger.Begin(zap.DebugLevel, fmt.Sprintf("execute SQL: %s", stmt))
err = sqlWithRetry.Exec(worker.ctx, "run create schema job", stmt)
if err != nil {
// try to imitate IF NOT EXISTS behavior for parsing errors
exists := false
switch job.stmtType {
case schemaCreateDatabase:
var err2 error
exists, err2 = common.SchemaExists(worker.ctx, session, job.dbName)
if err2 != nil {
task.Error("failed to check database existence", zap.Error(err2))
}
case schemaCreateTable:
exists, _ = common.TableExists(worker.ctx, session, job.dbName, job.tblName)
}
if exists {
err = nil
}
}
task.End(zap.ErrorLevel, err)

if err != nil {
err = common.ErrCreateSchema.Wrap(err).GenWithStackByArgs(common.UniqueTable(job.dbName, job.tblName), job.stmtType.String())
worker.wg.Done()
Expand Down
1 change: 1 addition & 0 deletions br/pkg/restore/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ go_test(
"//types",
"//util/codec",
"//util/mathutil",
"@com_github_fsouza_fake_gcs_server//fakestorage",
"@com_github_golang_protobuf//proto",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
Expand Down
2 changes: 2 additions & 0 deletions br/tests/lightning_character_sets/greek.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[tikv-importer]
backend = "local"
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
/*!40101 SET NAMES binary*/;
CREATE DATABASE `charsets` /*!40100 DEFAULT CHARACTER SET greek */;
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
/*!40101 SET NAMES binary*/;
CREATE TABLE `greek` (
`c` varchar(20) DEFAULT NULL,
PRIMARY KEY (`c`)
) ENGINE=InnoDB DEFAULT CHARSET=greek;
3 changes: 3 additions & 0 deletions br/tests/lightning_character_sets/greek/charsets.greek.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
/*!40101 SET NAMES binary*/;
INSERT INTO `greek` VALUES
('α');
14 changes: 14 additions & 0 deletions br/tests/lightning_character_sets/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,17 @@ run_lightning --config "tests/$TEST_NAME/binary.toml" -d "tests/$TEST_NAME/mixed
run_sql 'SELECT sum(`唯一键`) AS s FROM charsets.mixed'
check_contains 's: 5291'

# test about unsupported charset in UTF-8 encoding dump files
# test local backend
run_lightning --config "tests/$TEST_NAME/greek.toml" -d "tests/$TEST_NAME/greek" 2>&1 | grep -q "Unknown character set: 'greek'"
run_sql 'DROP DATABASE IF EXISTS charsets;'
run_sql 'CREATE DATABASE charsets;'
run_sql 'CREATE TABLE charsets.greek (c VARCHAR(20) PRIMARY KEY);'
run_lightning --config "tests/$TEST_NAME/greek.toml" -d "tests/$TEST_NAME/greek"
run_sql "SELECT count(*) FROM charsets.greek WHERE c = 'α';"
check_contains 'count(*): 1'
# test tidb backend
run_sql 'TRUNCATE TABLE charsets.greek;'
run_lightning --config "tests/$TEST_NAME/greek.toml" -d "tests/$TEST_NAME/greek" --backend tidb
run_sql "SELECT count(*) FROM charsets.greek WHERE c = 'α';"
check_contains 'count(*): 1'
8 changes: 4 additions & 4 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -485,9 +485,9 @@ type Instance struct {
ForcePriority string `toml:"tidb_force_priority" json:"tidb_force_priority"`
MemoryUsageAlarmRatio float64 `toml:"tidb_memory_usage_alarm_ratio" json:"tidb_memory_usage_alarm_ratio"`
// EnableCollectExecutionInfo enables the TiDB to collect execution info.
EnableCollectExecutionInfo bool `toml:"tidb_enable_collect_execution_info" json:"tidb_enable_collect_execution_info"`
PluginDir string `toml:"plugin_dir" json:"plugin_dir"`
PluginLoad string `toml:"plugin_load" json:"plugin_load"`
EnableCollectExecutionInfo AtomicBool `toml:"tidb_enable_collect_execution_info" json:"tidb_enable_collect_execution_info"`
PluginDir string `toml:"plugin_dir" json:"plugin_dir"`
PluginLoad string `toml:"plugin_load" json:"plugin_load"`
// MaxConnections is the maximum permitted number of simultaneous client connections.
MaxConnections uint32 `toml:"max_connections" json:"max_connections"`
TiDBEnableDDL AtomicBool `toml:"tidb_enable_ddl" json:"tidb_enable_ddl"`
Expand Down Expand Up @@ -864,7 +864,7 @@ var defaultConf = Config{
CheckMb4ValueInUTF8: *NewAtomicBool(true),
ForcePriority: "NO_PRIORITY",
MemoryUsageAlarmRatio: DefMemoryUsageAlarmRatio,
EnableCollectExecutionInfo: true,
EnableCollectExecutionInfo: *NewAtomicBool(true),
PluginDir: "/data/deploy/plugin",
PluginLoad: "",
MaxConnections: 0,
Expand Down
4 changes: 4 additions & 0 deletions ddl/ddl_tiflash_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,10 @@ func getTiFlashTableSyncProgress(pollTiFlashContext *TiFlashManagementContext, t
zap.Int64("tableID", tableID), zap.Int("tiflashPeerCount", tiflashPeerCount), zap.Int("regionCount", regionCount), zap.Uint64("replicaCount", replicaCount))
progress = 1
}
if progress < 1 {
logutil.BgLogger().Debug("TiFlash replica progress < 1.",
zap.Int64("tableID", tableID), zap.Int("tiflashPeerCount", tiflashPeerCount), zap.Int("regionCount", regionCount), zap.Uint64("replicaCount", replicaCount))
}
return types.TruncateFloatToString(progress, 2), nil
}

Expand Down
1 change: 1 addition & 0 deletions distsql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ go_test(
],
embed = [":distsql"],
flaky = True,
race = "on",
deps = [
"//kv",
"//parser/charset",
Expand Down
2 changes: 1 addition & 1 deletion distsql/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func Select(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, fie
SessionMemTracker: sctx.GetSessionVars().StmtCtx.MemTracker,
EnabledRateLimitAction: enabledRateLimitAction,
EventCb: eventCb,
EnableCollectExecutionInfo: config.GetGlobalConfig().Instance.EnableCollectExecutionInfo,
EnableCollectExecutionInfo: config.GetGlobalConfig().Instance.EnableCollectExecutionInfo.Load(),
}

if kvReq.StoreType == kv.TiFlash {
Expand Down
1 change: 1 addition & 0 deletions domain/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ go_library(
"//util/execdetails",
"//util/expensivequery",
"//util/logutil",
"//util/memoryusagealarm",
"//util/servermemorylimit",
"//util/sqlexec",
"@com_github_ngaut_pools//:pools",
Expand Down
2 changes: 1 addition & 1 deletion executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2109,7 +2109,7 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
} else if vars.StmtCtx.InSelectStmt {
sc.PrevAffectedRows = -1
}
if globalConfig.Instance.EnableCollectExecutionInfo {
if globalConfig.Instance.EnableCollectExecutionInfo.Load() {
// In ExplainFor case, RuntimeStatsColl should not be reset for reuse,
// because ExplainFor need to display the last statement information.
reuseObj := vars.StmtCtx.RuntimeStatsColl
Expand Down
2 changes: 1 addition & 1 deletion executor/executor_issue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,7 @@ func TestIndexJoin31494(t *testing.T) {
func TestFix31038(t *testing.T) {
defer config.RestoreFunc()()
config.UpdateGlobal(func(conf *config.Config) {
conf.Instance.EnableCollectExecutionInfo = false
conf.Instance.EnableCollectExecutionInfo.Store(false)
})
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
Expand Down
9 changes: 3 additions & 6 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5198,12 +5198,9 @@ func TestHistoryRead(t *testing.T) {
require.Greater(t, snapshotTS, curVer1.Ver)
require.Less(t, snapshotTS, curVer2.Ver)
tk.MustQuery("select * from history_read").Check(testkit.Rows("1"))
_, err = tk.Exec("insert history_read values (2)")
require.Error(t, err)
_, err = tk.Exec("update history_read set a = 3 where a = 1")
require.Error(t, err)
_, err = tk.Exec("delete from history_read where a = 1")
require.Error(t, err)
tk.MustExecToErr("insert history_read values (2)")
tk.MustExecToErr("update history_read set a = 3 where a = 1")
tk.MustExecToErr("delete from history_read where a = 1")
tk.MustExec("set @@tidb_snapshot = ''")
tk.MustQuery("select * from history_read").Check(testkit.Rows("1", "2"))
tk.MustExec("insert history_read values (3)")
Expand Down
19 changes: 17 additions & 2 deletions executor/explain.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,17 @@ func (h *memoryDebugModeHandler) genInfo(status string, needProfile bool, heapIn
return h.infoField, err
}

func updateTriggerIntervalByHeapInUse(heapInUse uint64) (time.Duration, int) {
const GB uint64 = 1 << 30
if heapInUse < 30*GB {
return 5 * time.Second, 6
} else if heapInUse < 40*GB {
return 15 * time.Second, 2
} else {
return 30 * time.Second, 1
}
}

func (h *memoryDebugModeHandler) run() {
var err error
var fields []zap.Field
Expand All @@ -213,21 +224,25 @@ func (h *memoryDebugModeHandler) run() {
zap.String("minHeapInUse", memory.FormatBytes(h.minHeapInUse)),
zap.Int64("alarmRatio", h.alarmRatio),
)
ticker, loop := time.NewTicker(5*time.Second), 0
triggerInterval := 5 * time.Second
printMod := 6
ticker, loop := time.NewTicker(triggerInterval), 0
for {
select {
case <-h.ctx.Done():
return
case <-ticker.C:
heapInUse, trackedMem := h.fetchCurrentMemoryUsage(h.autoGC)
loop++
if loop%6 == 0 {
if loop%printMod == 0 {
fields, err = h.genInfo("running", false, int64(heapInUse), int64(trackedMem))
logutil.BgLogger().Info("Memory Debug Mode", fields...)
if err != nil {
return
}
}
triggerInterval, printMod = updateTriggerIntervalByHeapInUse(heapInUse)
ticker.Reset(triggerInterval)

if !h.autoGC {
if heapInUse > uint64(h.minHeapInUse) && trackedMem/100*uint64(100+h.alarmRatio) < heapInUse {
Expand Down
9 changes: 3 additions & 6 deletions executor/index_advise_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,9 @@ func TestIndexAdvise(t *testing.T) {

tk := testkit.NewTestKit(t, store)

_, err := tk.Exec("index advise infile '/tmp/nonexistence.sql'")
require.EqualError(t, err, "Index Advise: don't support load file without local field")
_, err = tk.Exec("index advise local infile ''")
require.EqualError(t, err, "Index Advise: infile path is empty")
_, err = tk.Exec("index advise local infile '/tmp/nonexistence.sql' lines terminated by ''")
require.EqualError(t, err, "Index Advise: don't support advise index for SQL terminated by nil")
tk.MustGetErrMsg("index advise infile '/tmp/nonexistence.sql'", "Index Advise: don't support load file without local field")
tk.MustGetErrMsg("index advise local infile ''", "Index Advise: infile path is empty")
tk.MustGetErrMsg("index advise local infile '/tmp/nonexistence.sql' lines terminated by ''", "Index Advise: don't support advise index for SQL terminated by nil")

path := "/tmp/index_advise.sql"
fp, err := os.Create(path)
Expand Down
2 changes: 2 additions & 0 deletions executor/infoschema_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -2223,6 +2223,8 @@ func (e *memtableRetriever) dataForTableTiFlashReplica(ctx sessionctx.Context, s
progress += progressMap[p.ID]
}
progress = progress / float64(len(pi.Definitions))
progressString := types.TruncateFloatToString(progress, 2)
progress, _ = strconv.ParseFloat(progressString, 64)
} else {
progress = progressMap[tbl.ID]
}
Expand Down
8 changes: 4 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ require (
github.com/pingcap/errors v0.11.5-0.20220729040631-518f63d66278
github.com/pingcap/failpoint v0.0.0-20220423142525-ae43b7f4e5c3
github.com/pingcap/fn v0.0.0-20200306044125-d5540d389059
github.com/pingcap/kvproto v0.0.0-20220913025519-586cff113d10
github.com/pingcap/kvproto v0.0.0-20220929075948-06e08d5ed64c
github.com/pingcap/log v1.1.0
github.com/pingcap/sysutil v0.0.0-20220114020952-ea68d2dbf5b4
github.com/pingcap/tidb/parser v0.0.0-20211011031125-9b13dc409c5e
Expand All @@ -85,8 +85,8 @@ require (
github.com/stretchr/testify v1.8.0
github.com/tdakkota/asciicheck v0.1.1
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2
github.com/tikv/client-go/v2 v2.0.1-0.20220923061703-33efe476e022
github.com/tikv/pd/client v0.0.0-20220725055910-7187a7ab72db
github.com/tikv/client-go/v2 v2.0.1-0.20221012074856-6def8d7b90c4
github.com/tikv/pd/client v0.0.0-20221010134149-d50e5fe43f14
github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144
github.com/twmb/murmur3 v1.1.3
github.com/uber/jaeger-client-go v2.22.1+incompatible
Expand Down Expand Up @@ -226,7 +226,7 @@ require (
go.opentelemetry.io/otel/sdk/metric v0.20.0 // indirect
go.opentelemetry.io/otel/trace v0.20.0 // indirect
go.opentelemetry.io/proto/otlp v0.7.0 // indirect
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 // indirect
golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4 // indirect
golang.org/x/exp/typeparams v0.0.0-20220613132600-b0d781184e0d // indirect
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect
golang.org/x/xerrors v0.0.0-20220411194840-2f41105eb62f // indirect
Expand Down
Loading

0 comments on commit 712cc5a

Please sign in to comment.