From 6ec6f79f9f09aeb9105b74b454a572c1102ee40d Mon Sep 17 00:00:00 2001 From: MyonKeminta <9948422+MyonKeminta@users.noreply.github.com> Date: Mon, 11 Nov 2024 22:20:04 +0800 Subject: [PATCH] Support adaptive update interval for low resolution ts (#1484) (#1491) Signed-off-by: MyonKeminta --- oracle/oracles/pd.go | 24 ++++++++++++------------ oracle/oracles/pd_test.go | 3 ++- 2 files changed, 14 insertions(+), 13 deletions(-) diff --git a/oracle/oracles/pd.go b/oracle/oracles/pd.go index b26a09858f..2e20fa17d5 100644 --- a/oracle/oracles/pd.go +++ b/oracle/oracles/pd.go @@ -120,8 +120,10 @@ type pdOracle struct { lastTSMap sync.Map quit chan struct{} // The configured interval to update the low resolution ts. Set by SetLowResolutionTimestampUpdateInterval. - // For TiDB, this is directly controlled by the system variable `tidb_low_resolution_tso_update_interval`. + // For TiDB >=v8.0.0, this is directly controlled by the system variable `tidb_low_resolution_tso_update_interval`. + // For versions before v8.0.0, this value is fixed to 2s. lastTSUpdateInterval atomic.Int64 + // The actual interval to update the low resolution ts. If the configured one is too large to satisfy the // requirement of the stale read or snapshot read, the actual interval can be automatically set to a shorter // value than lastTSUpdateInterval. @@ -351,13 +353,6 @@ func (o *pdOracle) getLastTSWithArrivalTS(txnScope string) (*lastTSO, bool) { return last, true } -func max(x, y time.Duration) time.Duration { - if x > y { - return x - } - return y -} - func (o *pdOracle) nextUpdateInterval(now time.Time, requiredStaleness time.Duration) time.Duration { o.adaptiveUpdateIntervalState.mu.Lock() defer o.adaptiveUpdateIntervalState.mu.Unlock() @@ -402,7 +397,11 @@ func (o *pdOracle) nextUpdateInterval(now time.Time, requiredStaleness time.Dura // update interval immediately to adapt to it. // We shrink the update interval to a value slightly lower than the requested staleness to avoid potential // frequent shrinking operations. But there's a lower bound to prevent loading ts too frequently. - newInterval := max(requiredStaleness-adaptiveUpdateTSIntervalShrinkingPreserve, minAllowedAdaptiveUpdateTSInterval) + // newInterval := max(requiredStaleness-adaptiveUpdateTSIntervalShrinkingPreserve, minAllowedAdaptiveUpdateTSInterval) + newInterval := requiredStaleness - adaptiveUpdateTSIntervalShrinkingPreserve + if newInterval < minAllowedAdaptiveUpdateTSInterval { + newInterval = minAllowedAdaptiveUpdateTSInterval + } return adaptiveUpdateTSIntervalStateAdapting, newInterval } @@ -486,8 +485,6 @@ func (o *pdOracle) updateTS(ctx context.Context) { ticker := time.NewTicker(currentInterval) defer ticker.Stop() - // Note that as `doUpdate` updates last tick time while `nextUpdateInterval` may perform calculation depending on the - // last tick time, `doUpdate` should be called after finishing calculating the next interval. doUpdate := func(now time.Time) { // Update the timestamp for each txnScope o.lastTSMap.Range(func(key, _ interface{}) bool { @@ -694,7 +691,10 @@ func (o *pdOracle) adjustUpdateLowResolutionTSIntervalWithRequestedStaleness(rea // calculation. Make sure it's always positive before passing it to the updateTS goroutine. // Note that `nextUpdateInterval` method expects the requiredStaleness is always non-zero when triggerred // by this path. - requiredStaleness = max(requiredStaleness, time.Millisecond) + // requiredStaleness = max(requiredStaleness, time.Millisecond) + if requiredStaleness < time.Millisecond { + requiredStaleness = time.Millisecond + } // Try to non-blocking send a signal to notify it to change the interval immediately. But if the channel is // busy, it means that there's another concurrent call trying to update it. Just skip it in this case. select { diff --git a/oracle/oracles/pd_test.go b/oracle/oracles/pd_test.go index 72b6dfab23..c409247158 100644 --- a/oracle/oracles/pd_test.go +++ b/oracle/oracles/pd_test.go @@ -335,8 +335,9 @@ func TestValidateSnapshotReadTSReusingGetTSResult(t *testing.T) { getCtx := func(index int) context.Context { if cancelIndex == index { return ctx + } else { + return context.Background() } - return context.Background() } results = append(results, asyncValidate(getCtx(0), ts-2))