Skip to content

Commit

Permalink
Merge branch 'master' into enhance-restore-kvfiles
Browse files Browse the repository at this point in the history
  • Loading branch information
joccau authored Nov 22, 2022
2 parents d90e199 + f894d9b commit d43b33f
Show file tree
Hide file tree
Showing 11 changed files with 92 additions and 20 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,4 @@ bazel-out
bazel-testlogs
bazel-tidb
.ijwb/
/oom_record/
25 changes: 25 additions & 0 deletions ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1736,3 +1736,28 @@ func TestTiDBDownBeforeUpdateGlobalVersion(t *testing.T) {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockDownBeforeUpdateGlobalVersion"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/checkDownBeforeUpdateGlobalVersion"))
}

func TestDDLBlockedCreateView(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t(a int)")

hook := &ddl.TestDDLCallback{Do: dom}
first := true
hook.OnJobRunBeforeExported = func(job *model.Job) {
if job.SchemaState != model.StateWriteOnly {
return
}
if !first {
return
}
first = false
tk2 := testkit.NewTestKit(t, store)
tk2.MustExec("use test")
tk2.MustExec("create view v as select * from t")
}
dom.DDL().SetHook(hook)
tk.MustExec("alter table t modify column a char(10)")
}
2 changes: 1 addition & 1 deletion ddl/job_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (dc *ddlCtx) excludeJobIDs() string {
}

const (
getJobSQL = "select job_meta, processing from mysql.tidb_ddl_job where job_id in (select min(job_id) from mysql.tidb_ddl_job group by schema_ids, table_ids, processing order by processing desc limit 1) and %s reorg %s order by processing desc, job_id"
getJobSQL = "select job_meta, processing from mysql.tidb_ddl_job where job_id in (select min(job_id) from mysql.tidb_ddl_job group by schema_ids, table_ids, processing) and %s reorg %s order by processing desc, job_id"
)

type jobType int
Expand Down
4 changes: 2 additions & 2 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1777,7 +1777,7 @@ func (do *Domain) loadStatsWorker() {
t := time.Now()
err := statsHandle.InitStats(do.InfoSchema())
if err != nil {
logutil.BgLogger().Debug("init stats info failed", zap.Error(err))
logutil.BgLogger().Error("init stats info failed", zap.Duration("take time", time.Since(t)), zap.Error(err))
} else {
logutil.BgLogger().Info("init stats info time", zap.Duration("take time", time.Since(t)))
}
Expand Down Expand Up @@ -1863,7 +1863,7 @@ func (do *Domain) updateStatsWorker(ctx sessionctx.Context, owner owner.Manager)
case t := <-statsHandle.DDLEventCh():
err := statsHandle.HandleDDLEvent(t)
if err != nil {
logutil.BgLogger().Debug("handle ddl event failed", zap.Error(err))
logutil.BgLogger().Error("handle ddl event failed", zap.String("event", t.String()), zap.Error(err))
}
case <-deltaUpdateTicker.C:
err := statsHandle.DumpStatsDeltaToKV(handle.DumpDelta)
Expand Down
8 changes: 8 additions & 0 deletions executor/aggregate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1088,6 +1088,14 @@ func TestIssue10608(t *testing.T) {
tk.MustExec("insert into t values(508931), (508932)")
tk.MustQuery("select (select /*+ stream_agg() */ group_concat(concat(123,'-')) from t where t.a = s.b group by t.a) as t from s;").Check(testkit.Rows("123-", "123-"))
tk.MustQuery("select (select /*+ hash_agg() */ group_concat(concat(123,'-')) from t where t.a = s.b group by t.a) as t from s;").Check(testkit.Rows("123-", "123-"))

tk.MustExec("CREATE TABLE `t49`(`c0` char(1) DEFAULT '1', `c2` char(1) DEFAULT NULL, UNIQUE KEY `c2` (`c2`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;")
tk.MustExec("INSERT INTO `t49` VALUES ('0','0'),('0','1');")
tk.MustExec("CREATE TABLE `t0` (`c0` blob DEFAULT NULL) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;")
tk.MustExec("INSERT INTO `t0` VALUES (_binary ']'),(_binary '777926278'),(_binary '0.2136404982804636'),(_binary '1901362489'),(_binary '1558203848'),(''),(_binary '1830406335'),(''),(_binary '0'),(NULL),(_binary '601930250'),(_binary '1558203848'),(_binary '-122008948'),(_binary '-2053608489'),(_binary 'hb/vt <7'),(_binary 'RC&2*'),(_binary '1'),(_binary '-1722334316'),(_binary '1830406335'),(_binary '1372126029'),(_binary '882291196'),(NULL),(_binary '-399693596');")
tk.MustExec("CREATE ALGORITHM=TEMPTABLE DEFINER=`root`@`%` SQL SECURITY DEFINER VIEW `v0` (`c0`, `c1`, `c2`) AS SELECT NULL AS `NULL`,`t49`.`c2` AS `c2`,(((CASE _UTF8MB4'I되EkfIO퀶' WHEN NULL THEN `t49`.`c0` WHEN `t49`.`c2` THEN `t0`.`c0` ELSE (CASE `t49`.`c0` WHEN _UTF8MB4'%' THEN 1035293362 ELSE _UTF8MB4',' END) END))<<(`t49`.`c0`)) AS `(((CASE 'I되EkfIO퀶' WHEN NULL THEN t49.c0 WHEN t49.c2 THEN t0.c0 ELSE (CASE t49.c0 WHEN '%' THEN 1035293362 ELSE ',' END ) END ))<<(t49.c0))` FROM (`t0`) JOIN `t49` WHERE TRUE;")
tk.MustQuery("SELECT /*+ STREAM_AGG()*/v0.c0 FROM t49, v0 LEFT OUTER JOIN t0 ON ('Iw') GROUP BY true;").
Check(testkit.Rows("<nil>"))
}

func TestIssue12759HashAggCalledByApply(t *testing.T) {
Expand Down
10 changes: 5 additions & 5 deletions executor/analyze_global_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,10 @@ func (e *AnalyzeExec) handleGlobalStats(ctx context.Context, needGlobalStats boo
globalStatsID.tableID, info.isIndex, info.histIDs,
tableAllPartitionStats)
if err != nil {
logutil.BgLogger().Error("merge global stats failed", zap.String("info", job.JobInfo), zap.Error(err))
if types.ErrPartitionStatsMissing.Equal(err) || types.ErrPartitionColumnStatsMissing.Equal(err) {
// When we find some partition-level stats are missing, we need to report warning.
e.ctx.GetSessionVars().StmtCtx.AppendWarning(err)
return nil
}
return err
}
Expand All @@ -95,14 +95,14 @@ func (e *AnalyzeExec) handleGlobalStats(ctx context.Context, needGlobalStats boo
true,
)
if err != nil {
logutil.Logger(ctx).Error("save global-level stats to storage failed", zap.Error(err))
logutil.Logger(ctx).Error("save global-level stats to storage failed", zap.String("info", job.JobInfo), zap.Int64("histID", hg.ID), zap.Error(err))
}
// Dump stats to historical storage.
if err := recordHistoricalStats(e.ctx, globalStatsID.tableID); err != nil {
logutil.BgLogger().Error("record historical stats failed", zap.Error(err))
if err1 := recordHistoricalStats(e.ctx, globalStatsID.tableID); err1 != nil {
logutil.BgLogger().Error("record historical stats failed", zap.String("info", job.JobInfo), zap.Int64("histID", hg.ID), zap.Error(err1))
}
}
return nil
return err
}()
FinishAnalyzeMergeJob(e.ctx, job, mergeStatsErr)
}
Expand Down
5 changes: 5 additions & 0 deletions server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1391,6 +1391,11 @@ func (cc *clientConn) dispatch(ctx context.Context, data []byte) error {
return cc.handleChangeUser(ctx, data)
// ComBinlogDump, ComTableDump, ComConnectOut, ComRegisterSlave
case mysql.ComStmtPrepare:
// For issue 39132, same as ComQuery
if len(data) > 0 && data[len(data)-1] == 0 {
data = data[:len(data)-1]
dataStr = string(hack.String(data))
}
return cc.handleStmtPrepare(ctx, dataStr)
case mysql.ComStmtExecute:
return cc.handleStmtExecute(ctx, data)
Expand Down
27 changes: 27 additions & 0 deletions server/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -584,6 +584,33 @@ func TestDispatchClientProtocol41(t *testing.T) {
testDispatch(t, inputs, mysql.ClientProtocol41)
}

func TestQueryEndWithZero(t *testing.T) {
inputs := []dispatchInput{
{
com: mysql.ComStmtPrepare,
in: append([]byte("select 1"), 0x0),
err: nil,
out: []byte{
0xc, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x18,
0x0, 0x0, 0x1, 0x3, 0x64, 0x65, 0x66, 0x0, 0x0, 0x0, 0x1, 0x31, 0x1, 0x31, 0xc, 0x3f,
0x0, 0x1, 0x0, 0x0, 0x0, 0x8, 0x81, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x2, 0xfe,
},
},
{
com: mysql.ComQuery,
in: append([]byte("select 1"), 0x0),
err: nil,
out: []byte{
0x1, 0x0, 0x0, 0x3, 0x1, 0x18, 0x0, 0x0, 0x4, 0x3, 0x64, 0x65, 0x66, 0x0, 0x0, 0x0,
0x1, 0x31, 0x1, 0x31, 0xc, 0x3f, 0x0, 0x1, 0x0, 0x0, 0x0, 0x8, 0x81, 0x0, 0x0, 0x0,
0x0, 0x1, 0x0, 0x0, 0x5, 0xfe, 0x2, 0x0, 0x0, 0x6, 0x1, 0x31, 0x1, 0x0, 0x0, 0x7, 0xfe,
},
},
}

testDispatch(t, inputs, 0)
}

func testDispatch(t *testing.T, inputs []dispatchInput, capability uint32) {
store := testkit.CreateMockStore(t)

Expand Down
10 changes: 7 additions & 3 deletions server/tidb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2781,11 +2781,11 @@ func (l *connEventLogs) check(fn func()) {
fn()
}

func (l *connEventLogs) waitConnDisconnected() error {
func (l *connEventLogs) waitEvent(tp extension.ConnEventTp) error {
totalSleep := 0
for {
l.Lock()
if l.types[len(l.types)-1] == extension.ConnDisconnected {
if l.types[len(l.types)-1] == tp {
l.Unlock()
return nil
}
Expand All @@ -2812,6 +2812,8 @@ func TestExtensionConnEvent(t *testing.T) {
require.NoError(t, extension.Setup())

ts := createTidbTestSuite(t)
// createTidbTestSuite create an inner connection, so wait the previous connection closed
require.NoError(t, logs.waitEvent(extension.ConnDisconnected))

// test for login success
logs.reset()
Expand All @@ -2828,6 +2830,7 @@ func TestExtensionConnEvent(t *testing.T) {
}()

var expectedConn2 variable.ConnectionInfo
require.NoError(t, logs.waitEvent(extension.ConnHandshakeAccepted))
logs.check(func() {
require.Equal(t, []extension.ConnEventTp{
extension.ConnConnected,
Expand Down Expand Up @@ -2861,7 +2864,7 @@ func TestExtensionConnEvent(t *testing.T) {

require.NoError(t, conn.Close())
require.NoError(t, db.Close())
require.NoError(t, logs.waitConnDisconnected())
require.NoError(t, logs.waitEvent(extension.ConnDisconnected))
logs.check(func() {
require.Equal(t, 3, len(logs.infos))
require.Equal(t, 1, len(logs.infos[2].ActiveRoles))
Expand Down Expand Up @@ -2889,6 +2892,7 @@ func TestExtensionConnEvent(t *testing.T) {

_, err = db.Conn(context.Background())
require.Error(t, err)
require.NoError(t, logs.waitEvent(extension.ConnDisconnected))
logs.check(func() {
require.Equal(t, []extension.ConnEventTp{
extension.ConnConnected,
Expand Down
2 changes: 1 addition & 1 deletion sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -1079,7 +1079,7 @@ const (
DefTiDBForeignKeyChecks = false
DefTiDBAnalyzePartitionConcurrency = 1
DefTiDBOptRangeMaxSize = 64 * int64(size.MB) // 64 MB
DefTiDBCostModelVer = 2
DefTiDBCostModelVer = 1
DefTiDBServerMemoryLimitSessMinSize = 128 << 20
DefTiDBMergePartitionStatsConcurrency = 1
DefTiDBServerMemoryLimitGCTrigger = 0.7
Expand Down
18 changes: 10 additions & 8 deletions util/chunk/row_container.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,14 +472,16 @@ func (c *SortedRowContainer) Close() error {
func (c *SortedRowContainer) lessRow(rowI, rowJ Row) bool {
for i, colIdx := range c.keyColumns {
cmpFunc := c.keyCmpFuncs[i]
cmp := cmpFunc(rowI, colIdx, rowJ, colIdx)
if c.ByItemsDesc[i] {
cmp = -cmp
}
if cmp < 0 {
return true
} else if cmp > 0 {
return false
if cmpFunc != nil {
cmp := cmpFunc(rowI, colIdx, rowJ, colIdx)
if c.ByItemsDesc[i] {
cmp = -cmp
}
if cmp < 0 {
return true
} else if cmp > 0 {
return false
}
}
}
return false
Expand Down

0 comments on commit d43b33f

Please sign in to comment.