sysvar: enable statsCache lru by config (#34278)
* add statsquota variable

Signed-off-by: yisaer <disxiaofei@163.com>

* add test

Signed-off-by: yisaer <disxiaofei@163.com>

* address the comment

Signed-off-by: yisaer <disxiaofei@163.com>

* address the comment

Signed-off-by: yisaer <disxiaofei@163.com>

* address the comment

Signed-off-by: yisaer <disxiaofei@163.com>

* fix conflict

Signed-off-by: yisaer <disxiaofei@163.com>

* address the comment

Signed-off-by: yisaer <disxiaofei@163.com>

address the comment

Signed-off-by: yisaer <disxiaofei@163.com>

fix duplicated plan

Signed-off-by: yisaer <disxiaofei@163.com>

fix duplicated plan

Signed-off-by: yisaer <disxiaofei@163.com>

fix duplicated plan

Signed-off-by: yisaer <disxiaofei@163.com>

* fix duplicated plan

Signed-off-by: yisaer <disxiaofei@163.com>

* fix duplicated plan

Signed-off-by: yisaer <disxiaofei@163.com>

Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
Yisaer and ti-chi-bot authored May 10, 2022
1 parent 199eb8e commit 53f228a
Showing 12 changed files with 138 additions and 69 deletions.
28 changes: 15 additions & 13 deletions config/config.go
@@ -599,13 +599,14 @@ type Performance struct {
 	CommitterConcurrency int    `toml:"committer-concurrency" json:"committer-concurrency"`
 	MaxTxnTTL            uint64 `toml:"max-txn-ttl" json:"max-txn-ttl"`
 	// Deprecated
-	MemProfileInterval   string `toml:"-" json:"-"`
-	IndexUsageSyncLease  string `toml:"index-usage-sync-lease" json:"index-usage-sync-lease"`
-	PlanReplayerGCLease  string `toml:"plan-replayer-gc-lease" json:"plan-replayer-gc-lease"`
-	GOGC                 int    `toml:"gogc" json:"gogc"`
-	EnforceMPP           bool   `toml:"enforce-mpp" json:"enforce-mpp"`
-	StatsLoadConcurrency uint   `toml:"stats-load-concurrency" json:"stats-load-concurrency"`
-	StatsLoadQueueSize   uint   `toml:"stats-load-queue-size" json:"stats-load-queue-size"`
+	MemProfileInterval       string `toml:"-" json:"-"`
+	IndexUsageSyncLease      string `toml:"index-usage-sync-lease" json:"index-usage-sync-lease"`
+	PlanReplayerGCLease      string `toml:"plan-replayer-gc-lease" json:"plan-replayer-gc-lease"`
+	GOGC                     int    `toml:"gogc" json:"gogc"`
+	EnforceMPP               bool   `toml:"enforce-mpp" json:"enforce-mpp"`
+	StatsLoadConcurrency     uint   `toml:"stats-load-concurrency" json:"stats-load-concurrency"`
+	StatsLoadQueueSize       uint   `toml:"stats-load-queue-size" json:"stats-load-queue-size"`
+	EnableStatsCacheMemQuota bool   `toml:"enable-stats-cache-mem-quota" json:"enable-stats-cache-mem-quota"`
 }
 
 // PlanCache is the PlanCache section of the config.
@@ -822,12 +823,13 @@ var defaultConf = Config{
 		CommitterConcurrency: defTiKVCfg.CommitterConcurrency,
 		MaxTxnTTL:            defTiKVCfg.MaxTxnTTL, // 1hour
 		// TODO: set indexUsageSyncLease to 60s.
-		IndexUsageSyncLease:  "0s",
-		GOGC:                 100,
-		EnforceMPP:           false,
-		PlanReplayerGCLease:  "10m",
-		StatsLoadConcurrency: 5,
-		StatsLoadQueueSize:   1000,
+		IndexUsageSyncLease:      "0s",
+		GOGC:                     100,
+		EnforceMPP:               false,
+		PlanReplayerGCLease:      "10m",
+		StatsLoadConcurrency:     5,
+		StatsLoadQueueSize:       1000,
+		EnableStatsCacheMemQuota: false,
 	},
 	ProxyProtocol: ProxyProtocol{
 		Networks: "",
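For operators, the new switch lands in the [performance] section of tidb.toml; the quota itself is then tuned at runtime through the tidb_stats_cache_mem_quota global variable introduced below. A minimal config sketch (illustrative, not part of the diff):

[performance]
# Off by default. When enabled, the statistics cache becomes an LRU bounded
# by the tidb_stats_cache_mem_quota global variable (0 means no limit).
enable-stats-cache-mem-quota = true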
9 changes: 9 additions & 0 deletions domain/sysvar_cache.go
@@ -149,6 +149,9 @@ func (do *Domain) rebuildSysVarCache(ctx sessionctx.Context) error {
 			if err != nil {
 				logutil.BgLogger().Error(fmt.Sprintf("load global variable %s error", sv.Name), zap.Error(err))
 			}
+			if sv.Name == variable.TiDBStatsCacheMemQuota {
+				do.SetStatsCacheCapacity(variable.StatsCacheMemQuota.Load())
+			}
 		}
 	}
 
@@ -201,3 +204,9 @@ func (do *Domain) SetPDClientDynamicOption(option pd.DynamicOption, val interface{}) error {
 	}
 	return pdClient.UpdateOption(option, val)
 }
+
+// SetStatsCacheCapacity sets statsCache cap
+func (do *Domain) SetStatsCacheCapacity(c int64) {
+	do.StatsHandle().SetStatsCacheCapacity(c)
+	logutil.BgLogger().Info("update stats cache capacity successfully", zap.Int64("capacity", c))
+}
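The full round trip — SET GLOBAL, sysvar-cache rebuild, the Domain hook above, stats handle — can be exercised end to end with the repo's usual testkit helpers. A minimal sketch, assuming the testkit API as it existed around this commit (CreateMockStore returning a store plus a cleanup func); this test is not part of the diff:

package domain_test

import (
	"testing"

	"github.com/pingcap/tidb/testkit"
)

func TestStatsCacheCapacityFollowsSysVar(t *testing.T) {
	store, clean := testkit.CreateMockStore(t)
	defer clean()
	tk := testkit.NewTestKit(t, store)
	// The default quota is 0, which the LRU treats as "no limit".
	tk.MustQuery("select @@global.tidb_stats_cache_mem_quota").Check(testkit.Rows("0"))
	// A positive quota reaches Domain.SetStatsCacheCapacity via rebuildSysVarCache.
	tk.MustExec("set global tidb_stats_cache_mem_quota = 1048576")
	tk.MustQuery("select @@global.tidb_stats_cache_mem_quota").Check(testkit.Rows("1048576"))
}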
4 changes: 4 additions & 0 deletions executor/builder.go
@@ -2317,6 +2317,10 @@ func (b *executorBuilder) buildAnalyzeIndexIncremental
 	if !ok || idx.Len() == 0 || idx.LastAnalyzePos.IsNull() {
 		return analyzeTask
 	}
+	// If idx was evicted previously, use the index-pushdown task directly, as an incremental analyze task would produce inaccurate statistics.
+	if idx.IsEvicted() {
+		return analyzeTask
+	}
 	var oldHist *statistics.Histogram
 	if statistics.IsAnalyzed(idx.Flag) {
 		exec := analyzeTask.idxExec
4 changes: 4 additions & 0 deletions executor/set_test.go
@@ -623,6 +623,10 @@ func TestSetVar(t *testing.T) {
 	result = tk.MustQuery("select @@global.max_allowed_packet;")
 	result.Check(testkit.Rows("1024"))
 
+	// test value of tidb_stats_cache_mem_quota
+	tk.MustQuery("select @@global.tidb_stats_cache_mem_quota").Check(testkit.Rows("0"))
+	tk.MustExec("set global tidb_stats_cache_mem_quota = 200")
+	tk.MustQuery("select @@global.tidb_stats_cache_mem_quota").Check(testkit.Rows("200"))
 	// for read-only instance scoped system variables.
 	tk.MustGetErrCode("set @@global.plugin_load = ''", errno.ErrIncorrectGlobalLocalVar)
 	tk.MustGetErrCode("set @@global.plugin_dir = ''", errno.ErrIncorrectGlobalLocalVar)
10 changes: 10 additions & 0 deletions sessionctx/variable/sysvar.go
@@ -677,6 +677,16 @@ var defaultSysVars = []*SysVar{
 			return nil
 		},
 	},
+	{Scope: ScopeGlobal, Name: TiDBStatsCacheMemQuota, Value: strconv.Itoa(DefTiDBStatsCacheMemQuota),
+		MinValue: 0, MaxValue: MaxTiDBStatsCacheMemQuota, Type: TypeInt,
+		GetGlobal: func(vars *SessionVars) (string, error) {
+			return strconv.FormatInt(StatsCacheMemQuota.Load(), 10), nil
+		}, SetGlobal: func(vars *SessionVars, s string) error {
+			v := TidbOptInt64(s, DefTiDBStatsCacheMemQuota)
+			StatsCacheMemQuota.Store(v)
+			return nil
+		},
+	},
 	{Scope: ScopeGlobal, Name: TiDBQueryLogMaxLen, Value: strconv.Itoa(DefTiDBQueryLogMaxLen), Type: TypeInt, MinValue: 0, MaxValue: 1073741824, SetGlobal: func(s *SessionVars, val string) error {
 		QueryLogMaxLen.Store(int32(TidbOptInt64(val, DefTiDBQueryLogMaxLen)))
 		return nil
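This follows the file's usual shape for instance-wide globals (compare TiDBQueryLogMaxLen just below): the value lives in one process-wide atomic and the GetGlobal/SetGlobal hooks only load and store it, so no session ever holds a stale copy. A stripped-down sketch of that pattern outside TiDB — type and field names here are illustrative, not the real SysVar struct:

package main

import (
	"fmt"
	"strconv"
	"sync/atomic"
)

// statsCacheMemQuota plays the role of variable.StatsCacheMemQuota:
// a single process-wide value shared by every session.
var statsCacheMemQuota atomic.Int64

// sysVar mimics the hook shape of the SysVar entries above (simplified).
type sysVar struct {
	name      string
	getGlobal func() (string, error)
	setGlobal func(val string) error
}

var quotaVar = sysVar{
	name: "tidb_stats_cache_mem_quota",
	getGlobal: func() (string, error) {
		// Reads always come from the atomic, never from a per-session cache.
		return strconv.FormatInt(statsCacheMemQuota.Load(), 10), nil
	},
	setGlobal: func(val string) error {
		v, err := strconv.ParseInt(val, 10, 64)
		if err != nil {
			return err
		}
		statsCacheMemQuota.Store(v)
		return nil
	},
}

func main() {
	_ = quotaVar.setGlobal("1048576")
	v, _ := quotaVar.getGlobal()
	fmt.Println(quotaVar.name, "=", v) // tidb_stats_cache_mem_quota = 1048576
}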
5 changes: 5 additions & 0 deletions sessionctx/variable/tidb_vars.go
@@ -673,6 +673,8 @@ const (
 	TiDBMemQuotaBindingCache = "tidb_mem_quota_binding_cache"
 	// TiDBRCReadCheckTS indicates the tso optimization for read-consistency read is enabled.
 	TiDBRCReadCheckTS = "tidb_rc_read_check_ts"
+	// TiDBStatsCacheMemQuota records stats cache quota
+	TiDBStatsCacheMemQuota = "tidb_stats_cache_mem_quota"
 	// TiDBMemQuotaAnalyze indicates the memory quota for all analyze jobs.
 	TiDBMemQuotaAnalyze = "tidb_mem_quota_analyze"
 )
@@ -847,6 +849,8 @@ const (
 	DefTiDBGCMaxWaitTime              = 24 * 60 * 60
 	DefMaxAllowedPacket        uint64 = 67108864
 	DefTiDBMemQuotaQuery              = 1073741824 // 1GB
+	DefTiDBStatsCacheMemQuota         = 0
+	MaxTiDBStatsCacheMemQuota         = 1024 * 1024 * 1024 * 1024 // 1TB
 	DefTiDBQueryLogMaxLen             = 4096
 	DefTiDBBatchDMLIgnoreError        = false
 	DefTiDBMemQuotaAnalyze            = -1
@@ -886,6 +890,7 @@ var (
 	StatsLoadPseudoTimeout = atomic.NewBool(DefTiDBStatsLoadPseudoTimeout)
 	MemQuotaBindingCache   = atomic.NewInt64(DefTiDBMemQuotaBindingCache)
 	GCMaxWaitTime          = atomic.NewInt64(DefTiDBGCMaxWaitTime)
+	StatsCacheMemQuota     = atomic.NewInt64(DefTiDBStatsCacheMemQuota)
 )
 
 var (
1 change: 1 addition & 0 deletions statistics/handle/bootstrap.go
@@ -153,6 +153,7 @@ func (h *Handle) initStatsHistograms4Chunk
 			lastAnalyzePos.Copy(&col.LastAnalyzePos)
 			table.Columns[hist.ID] = col
 		}
+		cache.Put(tblID, table)
 	}
 }
 
24 changes: 17 additions & 7 deletions statistics/handle/handle.go
@@ -261,9 +261,6 @@ func (h *Handle) Update(is infoschema.InfoSchema, opts ...TableStatsOpt) error {
 	if err != nil {
 		return errors.Trace(err)
 	}
-
-	tables := make([]*statistics.Table, 0, len(rows))
-	deletedTableIDs := make([]int64, 0, len(rows))
 	for _, row := range rows {
 		version := row.GetUint64(0)
 		physicalID := row.GetInt64(1)
@@ -275,7 +272,7 @@ func (h *Handle) Update(is infoschema.InfoSchema, opts ...TableStatsOpt) error {
 		h.mu.Unlock()
 		if !ok {
 			logutil.BgLogger().Debug("unknown physical ID in stats meta table, maybe it has been dropped", zap.Int64("ID", physicalID))
-			deletedTableIDs = append(deletedTableIDs, physicalID)
+			oldCache.Del(physicalID)
 			continue
 		}
 		tableInfo := table.Meta()
@@ -289,17 +286,17 @@ func (h *Handle) Update(is infoschema.InfoSchema, opts ...TableStatsOpt) error {
 			continue
 		}
 		if tbl == nil {
-			deletedTableIDs = append(deletedTableIDs, physicalID)
+			oldCache.Del(physicalID)
 			continue
 		}
 		tbl.Version = version
 		tbl.Count = count
 		tbl.ModifyCount = modifyCount
 		tbl.Name = getFullTableName(is, tableInfo)
 		tbl.TblInfoUpdateTS = tableInfo.UpdateTS
-		tables = append(tables, tbl)
+		oldCache.Put(physicalID, tbl)
 	}
-	h.updateStatsCache(oldCache.update(tables, deletedTableIDs, lastVersion, opts...))
+	h.updateStatsCache(oldCache.update(nil, nil, lastVersion, opts...))
 	return nil
 }
 
@@ -2066,3 +2063,16 @@ func WithTableStatsByQuery() TableStatsOpt {
 		option.byQuery = true
 	}
 }
+
+// SetStatsCacheCapacity sets capacity
+func (h *Handle) SetStatsCacheCapacity(c int64) {
+	if h == nil {
+		return
+	}
+	v := h.statsCache.Load()
+	if v == nil {
+		return
+	}
+	sc := v.(statsCache)
+	sc.SetCapacity(c)
+}
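Two related changes land here: Update now applies Put/Del directly to a copy of the cache and publishes it with update(nil, nil, ...) instead of batching tables/deletedTableIDs, and SetStatsCacheCapacity defensively handles a nil handle or a not-yet-loaded cache during bootstrap. A hedged sketch of the copy-then-publish idea with simplified types (stdlib atomic.Pointer, Go 1.19+; not the real statsCache):

package main

import (
	"fmt"
	"sync/atomic"
)

// snapshot is an immutable-by-convention view of the stats cache:
// writers edit a private copy and publish it atomically.
type snapshot struct {
	tables  map[int64]string
	version uint64
}

func (s *snapshot) copy() *snapshot {
	ns := &snapshot{tables: make(map[int64]string, len(s.tables)), version: s.version}
	for id, tbl := range s.tables {
		ns.tables[id] = tbl
	}
	return ns
}

var current atomic.Pointer[snapshot]

// update mirrors the reworked Handle.Update: apply Put/Del to a copy, bump
// the version, then swap the copy in so readers never observe a half-applied batch.
func update(put map[int64]string, del []int64, version uint64) {
	next := current.Load().copy()
	for id, tbl := range put {
		next.tables[id] = tbl
	}
	for _, id := range del {
		delete(next.tables, id)
	}
	next.version = version
	current.Store(next)
}

func main() {
	current.Store(&snapshot{tables: map[int64]string{}})
	update(map[int64]string{1: "t1", 2: "t2"}, nil, 1)
	update(nil, []int64{2}, 2)
	fmt.Println(current.Load().tables, current.Load().version) // map[1:t1] 2
}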
39 changes: 27 additions & 12 deletions statistics/handle/lru_cache.go
@@ -16,6 +16,7 @@ package handle
 
 import (
 	"container/list"
+	"math"
 	"sync"
 
 	"github.com/pingcap/tidb/statistics"
@@ -46,6 +47,9 @@ type innerItemLruCache struct {
 }
 
 func newInnerLruCache(c int64) *innerItemLruCache {
+	if c < 1 {
+		c = math.MaxInt64
+	}
 	return &innerItemLruCache{
 		capacity: c,
 		cache:    list.New(),
@@ -65,6 +69,13 @@ type lruMapElement struct {
 	tblMemUsage *statistics.TableMemoryUsage
 }
 
+func (l *lruMapElement) copy() *lruMapElement {
+	return &lruMapElement{
+		tbl:         l.tbl,
+		tblMemUsage: l.tblMemUsage,
+	}
+}
+
 // GetByQuery implements statsCacheInner
 func (s *statsInnerCache) GetByQuery(tblID int64) (*statistics.Table, bool) {
 	s.Lock()
@@ -230,30 +241,26 @@ func (s *statsInnerCache) FreshMemUsage() {
 	}
 }
 
-// FreshTableCost implements statsCacheInner
-func (s *statsInnerCache) FreshTableCost(tblID int64) {
-	s.Lock()
-	defer s.Unlock()
-	element, exist := s.elements[tblID]
-	if !exist {
-		return
-	}
-	s.freshTableCost(tblID, element)
-}
-
 // Copy implements statsCacheInner
 func (s *statsInnerCache) Copy() statsCacheInner {
 	s.RLock()
 	defer s.RUnlock()
 	newCache := newStatsLruCache(s.lru.capacity)
 	newCache.lru = s.lru.copy()
 	for tblID, element := range s.elements {
-		newCache.elements[tblID] = element
+		newCache.elements[tblID] = element.copy()
 	}
 	newCache.lru.onEvict = newCache.onEvict
 	return newCache
 }
 
+// SetCapacity implements statsCacheInner
+func (s *statsInnerCache) SetCapacity(c int64) {
+	s.Lock()
+	defer s.Unlock()
+	s.lru.setCapacity(c)
+}
+
 func (s *statsInnerCache) onEvict(tblID int64) {
 	element, exist := s.elements[tblID]
 	if !exist {
@@ -368,3 +375,11 @@ func (c *innerItemLruCache) copy() *innerItemLruCache {
 	}
 	return newLRU
 }
+
+func (c *innerItemLruCache) setCapacity(capacity int64) {
+	if capacity < 1 {
+		capacity = math.MaxInt64
+	}
+	c.capacity = capacity
+	c.evictIfNeeded()
+}
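Taken together, newInnerLruCache and setCapacity give the quota its semantics: any value below 1 means "effectively unbounded" (math.MaxInt64), and shrinking the capacity evicts immediately via evictIfNeeded. A self-contained sketch of a cost-bounded LRU with the same resize-then-evict behavior (illustrative only, not the TiDB implementation):

package main

import (
	"container/list"
	"fmt"
	"math"
)

type entry struct {
	key  int64
	cost int64
}

// costLRU evicts least-recently-used entries once total cost exceeds capacity.
type costLRU struct {
	capacity int64
	cost     int64
	order    *list.List // front = most recently used
	items    map[int64]*list.Element
}

func newCostLRU(capacity int64) *costLRU {
	if capacity < 1 {
		capacity = math.MaxInt64 // non-positive quota disables the limit
	}
	return &costLRU{capacity: capacity, order: list.New(), items: map[int64]*list.Element{}}
}

func (c *costLRU) put(key, cost int64) {
	if e, ok := c.items[key]; ok {
		c.cost -= e.Value.(*entry).cost
		c.order.Remove(e)
		delete(c.items, key)
	}
	c.items[key] = c.order.PushFront(&entry{key: key, cost: cost})
	c.cost += cost
	c.evictIfNeeded()
}

func (c *costLRU) evictIfNeeded() {
	for c.cost > c.capacity {
		back := c.order.Back()
		if back == nil {
			return
		}
		ev := back.Value.(*entry)
		c.order.Remove(back)
		delete(c.items, ev.key)
		c.cost -= ev.cost
	}
}

// setCapacity mirrors the diff above: resize, then evict down to the new bound.
func (c *costLRU) setCapacity(capacity int64) {
	if capacity < 1 {
		capacity = math.MaxInt64
	}
	c.capacity = capacity
	c.evictIfNeeded()
}

func main() {
	lru := newCostLRU(100)
	lru.put(1, 40)
	lru.put(2, 40)
	lru.setCapacity(50) // shrinking evicts key 1 (least recently used)
	fmt.Println(len(lru.items), lru.cost) // 1 40
}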
21 changes: 0 additions & 21 deletions statistics/handle/lru_cache_test.go
@@ -23,7 +23,6 @@ import (
 )
 
 var (
-	columnMemoryUsage      = int64(4)
 	indexMemoryUsage       = int64(4)
 	columnTotalMemoryUsage = statistics.EmptyHistogramSize + 4
 	indexTotalMemoryUsage  = statistics.EmptyHistogramSize + 4
@@ -190,26 +189,6 @@ func TestLRUFreshMemUsage(t *testing.T) {
 	require.Equal(t, lru.Cost(), 7*indexMemoryUsage)
 }
 
-func TestLRUFreshTableMemUsage(t *testing.T) {
-	lru := newStatsLruCache(1000)
-	t1 := newMockStatisticsTable(1, 1)
-	t2 := newMockStatisticsTable(2, 2)
-	t3 := newMockStatisticsTable(3, 3)
-	lru.Put(int64(1), t1)
-	lru.Put(int64(2), t2)
-	lru.Put(int64(3), t3)
-	require.Equal(t, lru.TotalCost(), 6*columnTotalMemoryUsage+6*indexTotalMemoryUsage)
-	require.Equal(t, lru.Cost(), 6*columnMemoryUsage)
-	mockTableAppendColumn(t1)
-	lru.FreshTableCost(int64(1))
-	require.Equal(t, lru.TotalCost(), 7*columnTotalMemoryUsage+6*indexTotalMemoryUsage)
-	require.Equal(t, lru.Cost(), 6*indexMemoryUsage)
-	mockTableAppendIndex(t1)
-	lru.FreshTableCost(int64(1))
-	require.Equal(t, lru.TotalCost(), 7*columnTotalMemoryUsage+7*indexTotalMemoryUsage)
-	require.Equal(t, lru.Cost(), 7*indexMemoryUsage)
-}
-
 func TestLRUPutTooBig(t *testing.T) {
 	lru := newStatsLruCache(1)
 	mockTable := newMockStatisticsTable(1, 1)