Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

statistics: add Destroy method and handle session recycling (#59546) (#59634) #59670

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 105 additions & 2 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,18 @@ type Domain struct {
globalCfgSyncer *globalconfigsync.GlobalConfigSyncer
m sync.Mutex
SchemaValidator SchemaValidator
<<<<<<< HEAD:domain/domain.go
sysSessionPool *sessionPool
exit chan struct{}
etcdClient *clientv3.Client
=======
// Note: If you no longer need the session, you must call Destroy to release it.
// Otherwise, the session will be leaked. Because there is a strong reference from the domain to the session.
sysSessionPool *sessionPool
exit chan struct{}
// `etcdClient` must be used when keyspace is not set, or when the logic to each etcd path needs to be separated by keyspace.
etcdClient *clientv3.Client
>>>>>>> d8d4e74a5ed (statistics: add Destroy method and handle session recycling (#59546) (#59634)):pkg/domain/domain.go
// autoidClient is used when there are tables with AUTO_ID_CACHE=1, it is the client to the autoid service.
autoidClient *autoid.ClientDiscover
sysVarCache sysVarCache // replaces GlobalVariableCache
Expand Down Expand Up @@ -1339,6 +1348,17 @@ func (p *sessionPool) Put(resource pools.Resource) {
resource.Close()
}
}
<<<<<<< HEAD:domain/domain.go
=======

// Destroy destroys the session.
func (p *sessionPool) Destroy(resource pools.Resource) {
// Delete the internal session to the map of SessionManager
infosync.DeleteInternalSession(resource)
resource.Close()
}

>>>>>>> d8d4e74a5ed (statistics: add Destroy method and handle session recycling (#59546) (#59634)):pkg/domain/domain.go
func (p *sessionPool) Close() {
p.mu.Lock()
if p.mu.closed {
Expand Down Expand Up @@ -1957,7 +1977,11 @@ func (do *Domain) initStats() {
t := time.Now()
err := statsHandle.InitStats(do.InfoSchema())
if err != nil {
<<<<<<< HEAD:domain/domain.go
logutil.BgLogger().Error("init stats info failed", zap.Duration("take time", time.Since(t)), zap.Error(err))
=======
logutil.BgLogger().Error("init stats info failed", zap.Bool("lite", liteInitStats), zap.Duration("take time", time.Since(t)), zap.String("error", fmt.Sprintf("%+v", err)))
>>>>>>> d8d4e74a5ed (statistics: add Destroy method and handle session recycling (#59546) (#59634)):pkg/domain/domain.go
} else {
logutil.BgLogger().Info("init stats info time", zap.Duration("take time", time.Since(t)))
}
Expand Down Expand Up @@ -1986,11 +2010,11 @@ func (do *Domain) loadStatsWorker() {
}
err = statsHandle.Update(do.InfoSchema())
if err != nil {
logutil.BgLogger().Debug("update stats info failed", zap.Error(err))
logutil.BgLogger().Warn("update stats info failed", zap.Error(err))
}
err = statsHandle.LoadNeededHistograms()
if err != nil {
logutil.BgLogger().Debug("load histograms failed", zap.Error(err))
logutil.BgLogger().Warn("load histograms failed", zap.String("error", fmt.Sprintf("%+v", err)))
}
case <-do.exit:
return
Expand Down Expand Up @@ -2028,6 +2052,85 @@ func (do *Domain) syncIndexUsageWorker(owner owner.Manager) {
}
}

<<<<<<< HEAD:domain/domain.go
=======
func (do *Domain) updateStatsWorkerExitPreprocessing(statsHandle *handle.Handle, owner owner.Manager) {
ch := make(chan struct{}, 1)
timeout, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
go func() {
logutil.BgLogger().Info("updateStatsWorker is going to exit, start to flush stats")
statsHandle.FlushStats()
logutil.BgLogger().Info("updateStatsWorker ready to release owner")
owner.Cancel()
ch <- struct{}{}
}()
select {
case <-ch:
logutil.BgLogger().Info("updateStatsWorker exit preprocessing finished")
return
case <-timeout.Done():
logutil.BgLogger().Warn("updateStatsWorker exit preprocessing timeout, force exiting")
return
}
}

func (do *Domain) updateStatsWorker(ctx sessionctx.Context, owner owner.Manager) {
defer util.Recover(metrics.LabelDomain, "updateStatsWorker", nil, false)
logutil.BgLogger().Info("updateStatsWorker started.")
lease := do.statsLease
// We need to have different nodes trigger tasks at different times to avoid the herd effect.
randDuration := time.Duration(rand.Int63n(int64(time.Minute)))
deltaUpdateTicker := time.NewTicker(20*lease + randDuration)
gcStatsTicker := time.NewTicker(100 * lease)
dumpColStatsUsageTicker := time.NewTicker(100 * lease)
updateStatsHealthyTicker := time.NewTicker(20 * lease)
readMemTricker := time.NewTicker(memory.ReadMemInterval)
statsHandle := do.StatsHandle()
defer func() {
dumpColStatsUsageTicker.Stop()
gcStatsTicker.Stop()
deltaUpdateTicker.Stop()
readMemTricker.Stop()
updateStatsHealthyTicker.Stop()
do.SetStatsUpdating(false)
logutil.BgLogger().Info("updateStatsWorker exited.")
}()
defer util.Recover(metrics.LabelDomain, "updateStatsWorker", nil, false)

for {
select {
case <-do.exit:
do.updateStatsWorkerExitPreprocessing(statsHandle, owner)
return
case <-deltaUpdateTicker.C:
err := statsHandle.DumpStatsDeltaToKV(false)
if err != nil {
logutil.BgLogger().Warn("dump stats delta failed", zap.Error(err))
}
case <-gcStatsTicker.C:
if !owner.IsOwner() {
continue
}
err := statsHandle.GCStats(do.InfoSchema(), do.DDL().GetLease())
if err != nil {
logutil.BgLogger().Warn("GC stats failed", zap.Error(err))
}
case <-dumpColStatsUsageTicker.C:
err := statsHandle.DumpColStatsUsageToKV()
if err != nil {
logutil.BgLogger().Warn("dump column stats usage failed", zap.Error(err))
}

case <-readMemTricker.C:
memory.ForceReadMemStats()
case <-updateStatsHealthyTicker.C:
statsHandle.UpdateStatsHealthyMetrics()
}
}
}

>>>>>>> d8d4e74a5ed (statistics: add Destroy method and handle session recycling (#59546) (#59634)):pkg/domain/domain.go
func (do *Domain) handleDDLEvent() {
logutil.BgLogger().Info("handleDDLEvent started.")
defer util.Recover(metrics.LabelDomain, "handleDDLEvent", nil, false)
Expand Down
15 changes: 15 additions & 0 deletions domain/infosync/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -1256,6 +1256,21 @@ func DeleteInternalSession(se interface{}) {
sm.DeleteInternalSession(se)
}

// ContainsInternalSessionForTest is the entry function for check whether an internal session is in SessionManager.
// It is only used for test.
func ContainsInternalSessionForTest(se any) bool {
is, err := getGlobalInfoSyncer()
if err != nil {
return false
}
sm := is.GetSessionManager()
if sm == nil {
return false
}

return sm.ContainsInternalSession(se)
}

// SetEtcdClient is only used for test.
func SetEtcdClient(etcdCli *clientv3.Client) {
is, err := getGlobalInfoSyncer()
Expand Down
Loading