Skip to content

Commit

Permalink
admission,goschedstats: reduce scheduler sampling frequency when unde…
Browse files Browse the repository at this point in the history
…rloaded

The goschestats makes the determination of the tick interval
every 1s, and either ticks at 1ms or 250ms. 250ms is used when
the cpu is very underloaded.
The admission control code disables slot and token enforcement
if the tick interval is greater than 1ms. This is done since
the reduced frequency of CPULoad could cause us to not adjust
slots fast enough.

Fixes cockroachdb#66881

Release justification: Fix for high-priority issue in new
functionality.

Release note: None
  • Loading branch information
sumeerbhola committed Aug 25, 2021
1 parent 7a00767 commit 34e8973
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 36 deletions.
69 changes: 43 additions & 26 deletions pkg/util/admission/granter.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,7 @@ type tokenGranter struct {
requester requester
availableBurstTokens int
maxBurstTokens int
skipTokenEnforcement bool
// Optional. Practically, both uses of tokenGranter, for SQLKVResponseWork
// and SQLSQLResponseWork have a non-nil value. We don't expect to use
// memory overload indicators here since memory accounting and disk spilling
Expand All @@ -396,7 +397,7 @@ func (tg *tokenGranter) getPairedRequester() requester {
return tg.requester
}

func (tg *tokenGranter) refillBurstTokens() {
func (tg *tokenGranter) refillBurstTokens(skipTokenEnforcement bool) {
tg.availableBurstTokens = tg.maxBurstTokens
}

Expand All @@ -412,7 +413,7 @@ func (tg *tokenGranter) tryGetLocked() grantResult {
if tg.cpuOverload != nil && tg.cpuOverload.isOverloaded() {
return grantFailDueToSharedResource
}
if tg.availableBurstTokens > 0 {
if tg.availableBurstTokens > 0 || tg.skipTokenEnforcement {
tg.availableBurstTokens--
return grantSuccess
}
Expand Down Expand Up @@ -446,10 +447,12 @@ func (tg *tokenGranter) continueGrantChain(grantChainID grantChainID) {
// KVWork, that are limited by slots (CPU bound work) and/or tokens (IO
// bound work).
type kvGranter struct {
coord *GrantCoordinator
requester requester
usedSlots int
totalSlots int
coord *GrantCoordinator
requester requester
usedSlots int
totalSlots int
skipSlotEnforcement bool

ioTokensEnabled bool
// There is no rate limiting in granting these tokens. That is, they are all
// burst tokens.
Expand Down Expand Up @@ -478,7 +481,7 @@ func (sg *kvGranter) tryGet() bool {
}

func (sg *kvGranter) tryGetLocked() grantResult {
if sg.usedSlots < sg.totalSlots {
if sg.usedSlots < sg.totalSlots || sg.skipSlotEnforcement {
if !sg.ioTokensEnabled || sg.availableIOTokens > 0 {
sg.usedSlots++
if sg.usedSlotsMetric != nil {
Expand Down Expand Up @@ -842,20 +845,32 @@ func (coord *GrantCoordinator) GetWorkQueue(workKind WorkKind) *WorkQueue {
return coord.queues[workKind].(*WorkQueue)
}

// CPULoad implements CPULoadListener and is called every 1ms. The same
// frequency is used for refilling the burst tokens since synchronizing the
// two means that the refilled burst can take into account the latest
// schedulers stats (indirectly, via the implementation of
// cpuOverloadIndicator).
// TODO(sumeer): after experimentation, possibly generalize the 1ms ticks used
// for CPULoad.
func (coord *GrantCoordinator) CPULoad(runnable int, procs int) {
// CPULoad implements CPULoadListener and is called periodically (see
// CPULoadListener for details). The same frequency is used for refilling the
// burst tokens since synchronizing the two means that the refilled burst can
// take into account the latest schedulers stats (indirectly, via the
// implementation of cpuOverloadIndicator).
func (coord *GrantCoordinator) CPULoad(runnable int, procs int, samplePeriod time.Duration) {
coord.mu.Lock()
defer coord.mu.Unlock()
coord.numProcs = procs
coord.cpuLoadListener.CPULoad(runnable, procs)
coord.granters[SQLKVResponseWork].(*tokenGranter).refillBurstTokens()
coord.granters[SQLSQLResponseWork].(*tokenGranter).refillBurstTokens()
coord.cpuLoadListener.CPULoad(runnable, procs, samplePeriod)

// Slot adjustment and token refilling requires 1ms periods to work well. If
// the CPULoad ticks are less frequent, there is no guarantee that the
// tokens or slots will be sufficient to service requests. This is
// particularly the case for slots where we dynamically adjust them, and
// high contention can suddenly result in high slot utilization even while
// cpu utilization stays low. We don't want to artificially bottleneck
// request processing when we are in this slow CPULoad ticks regime since we
// can't adjust slots or refill tokens fast enough. So we explicitly tell
// the granters to not do token or slot enforcement.
skipEnforcement := samplePeriod > time.Millisecond
coord.granters[SQLKVResponseWork].(*tokenGranter).refillBurstTokens(skipEnforcement)
coord.granters[SQLSQLResponseWork].(*tokenGranter).refillBurstTokens(skipEnforcement)
if coord.granters[KVWork] != nil {
coord.granters[KVWork].(*kvGranter).skipSlotEnforcement = skipEnforcement
}
if coord.grantChainActive && !coord.tryTerminateGrantChain() {
return
}
Expand Down Expand Up @@ -1264,12 +1279,11 @@ type cpuOverloadIndicator interface {
}

// CPULoadListener listens to the latest CPU load information. Currently we
// expect this to be called every 1ms.
// TODO(sumeer): experiment with more smoothing. It is possible that rapid
// slot fluctuation may be resulting in under-utilization at a time scale that
// is not observable at our metrics frequency.
// expect this to be called every 1ms, unless the cpu is extremely
// underloaded. If the samplePeriod is > 1ms, admission control enforcement
// for CPU is disabled.
type CPULoadListener interface {
CPULoad(runnable int, procs int)
CPULoad(runnable int, procs int, samplePeriod time.Duration)
}

// kvSlotAdjuster is an implementer of CPULoadListener and
Expand All @@ -1293,8 +1307,9 @@ type kvSlotAdjuster struct {
var _ cpuOverloadIndicator = &kvSlotAdjuster{}
var _ CPULoadListener = &kvSlotAdjuster{}

func (kvsa *kvSlotAdjuster) CPULoad(runnable int, procs int) {
func (kvsa *kvSlotAdjuster) CPULoad(runnable int, procs int, _ time.Duration) {
threshold := int(KVSlotAdjusterOverloadThreshold.Get(&kvsa.settings.SV))

// Simple heuristic, which worked ok in experiments. More sophisticated ones
// could be devised.
if runnable >= threshold*procs {
Expand Down Expand Up @@ -1332,7 +1347,7 @@ func (kvsa *kvSlotAdjuster) CPULoad(runnable int, procs int) {
}

func (kvsa *kvSlotAdjuster) isOverloaded() bool {
return kvsa.granter.usedSlots >= kvsa.granter.totalSlots
return kvsa.granter.usedSlots >= kvsa.granter.totalSlots && !kvsa.granter.skipSlotEnforcement
}

// sqlNodeCPUOverloadIndicator is the implementation of cpuOverloadIndicator
Expand Down Expand Up @@ -1543,7 +1558,9 @@ func (io *ioLoadListener) adjustTokens(m pebble.Metrics) {
var _ cpuOverloadIndicator = &sqlNodeCPUOverloadIndicator{}
var _ CPULoadListener = &sqlNodeCPUOverloadIndicator{}

func (sn *sqlNodeCPUOverloadIndicator) CPULoad(runnable int, procs int) {
func (sn *sqlNodeCPUOverloadIndicator) CPULoad(
runnable int, procs int, samplePeriod time.Duration,
) {
}

func (sn *sqlNodeCPUOverloadIndicator) isOverloaded() bool {
Expand Down
3 changes: 2 additions & 1 deletion pkg/util/admission/granter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"sort"
"strings"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
Expand Down Expand Up @@ -159,7 +160,7 @@ func TestGranterBasic(t *testing.T) {
var runnable, procs int
d.ScanArgs(t, "runnable", &runnable)
d.ScanArgs(t, "procs", &procs)
coord.CPULoad(runnable, procs)
coord.CPULoad(runnable, procs, time.Millisecond)
return flushAndReset()

case "set-io-tokens":
Expand Down
45 changes: 36 additions & 9 deletions pkg/util/goschedstats/runnable.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@
package goschedstats

import (
"context"
"sync/atomic"
"time"

"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -54,8 +56,15 @@ func RecentNormalizedRunnableGoroutines() float64 {
// will have to add a new version of that file.
var _ = numRunnableGoroutines

// We sample the number of runnable goroutines once per samplePeriod.
const samplePeriod = time.Millisecond
// We sample the number of runnable goroutines once per samplePeriodShort or
// samplePeriodLong (if the system is underloaded). Using samplePeriodLong can
// cause sluggish response to a load spike, from the perspective of
// RunnableCountCallback implementers (admission control), so it is not ideal.
// We support this behavior only because of deficiencies in the Go runtime
// that cause 5-10% of cpu utilization for what is otherwise cheap 1ms
// polling. See #66881.
const samplePeriodShort = time.Millisecond
const samplePeriodLong = 250 * time.Millisecond

// We "report" the average value every reportingPeriod.
// Note: if this is changed from 1s, CumulativeNormalizedRunnableGoroutines()
Expand All @@ -78,8 +87,8 @@ var total uint64
var ewma uint64

// RunnableCountCallback is provided the current value of runnable goroutines,
// and GOMAXPROCS.
type RunnableCountCallback func(numRunnable int, numProcs int)
// GOMAXPROCS, and the current sampling period.
type RunnableCountCallback func(numRunnable int, numProcs int, samplePeriod time.Duration)

type callbackWithID struct {
RunnableCountCallback
Expand Down Expand Up @@ -146,25 +155,43 @@ func init() {
var sum uint64
var numSamples int

ticker := time.NewTicker(samplePeriod)
curPeriod := samplePeriodLong
ticker := time.NewTicker(curPeriod)
// We keep local versions of "total" and "ewma" and we just Store the
// updated values to the globals.
var localTotal, localEWMA uint64
for {
t := <-ticker.C
if t.Sub(lastTime) > reportingPeriod {
var avgValue uint64
if numSamples > 0 {
// We want the average value over the reporting period, so we divide
// by numSamples.
newValue := sum / uint64(numSamples)
localTotal += newValue
avgValue = sum / uint64(numSamples)
localTotal += avgValue
atomic.StoreUint64(&total, localTotal)

// ewma(t) = c * value(t) + (1 - c) * ewma(t-1)
// We use c = 0.5.
localEWMA = (newValue + localEWMA) / 2
localEWMA = (avgValue + localEWMA) / 2
atomic.StoreUint64(&ewma, localEWMA)
}
nextPeriod := samplePeriodLong
avgNumRunnablePerProc := float64(avgValue) * fromFixedPoint
if avgNumRunnablePerProc > 1 {
// Not extremely underloaded
nextPeriod = samplePeriodShort
}
// We switch the sample period only at reportingPeriod boundaries
// since it ensures that all samples contributing to a reporting
// period were at equal intervals (this is desirable since we average
// them). It also naturally reduces the frequency at which we reset a
// ticker.
if nextPeriod != curPeriod {
ticker.Reset(nextPeriod)
curPeriod = nextPeriod
log.Infof(context.Background(), "switching to period %s", curPeriod.String())
}
lastTime = t
sum = 0
numSamples = 0
Expand All @@ -174,7 +201,7 @@ func init() {
cbs := callbackInfo.cbs
callbackInfo.mu.Unlock()
for i := range cbs {
cbs[i].RunnableCountCallback(runnable, numProcs)
cbs[i].RunnableCountCallback(runnable, numProcs, curPeriod)
}
// The value of the sample is the ratio of runnable to numProcs (scaled
// for fixed-point arithmetic).
Expand Down

0 comments on commit 34e8973

Please sign in to comment.