Skip to content

Commit

Permalink
Support adaptive update interval for low resolution ts (tikv#1484) (t…
Browse files Browse the repository at this point in the history
…ikv#1491)

Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>
  • Loading branch information
MyonKeminta authored and ekexium committed Jan 9, 2025
1 parent cde1d96 commit 6ec6f79
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 13 deletions.
24 changes: 12 additions & 12 deletions oracle/oracles/pd.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,10 @@ type pdOracle struct {
lastTSMap sync.Map
quit chan struct{}
// The configured interval to update the low resolution ts. Set by SetLowResolutionTimestampUpdateInterval.
// For TiDB, this is directly controlled by the system variable `tidb_low_resolution_tso_update_interval`.
// For TiDB >=v8.0.0, this is directly controlled by the system variable `tidb_low_resolution_tso_update_interval`.
// For versions before v8.0.0, this value is fixed to 2s.
lastTSUpdateInterval atomic.Int64

// The actual interval to update the low resolution ts. If the configured one is too large to satisfy the
// requirement of the stale read or snapshot read, the actual interval can be automatically set to a shorter
// value than lastTSUpdateInterval.
Expand Down Expand Up @@ -351,13 +353,6 @@ func (o *pdOracle) getLastTSWithArrivalTS(txnScope string) (*lastTSO, bool) {
return last, true
}

func max(x, y time.Duration) time.Duration {
if x > y {
return x
}
return y
}

func (o *pdOracle) nextUpdateInterval(now time.Time, requiredStaleness time.Duration) time.Duration {
o.adaptiveUpdateIntervalState.mu.Lock()
defer o.adaptiveUpdateIntervalState.mu.Unlock()
Expand Down Expand Up @@ -402,7 +397,11 @@ func (o *pdOracle) nextUpdateInterval(now time.Time, requiredStaleness time.Dura
// update interval immediately to adapt to it.
// We shrink the update interval to a value slightly lower than the requested staleness to avoid potential
// frequent shrinking operations. But there's a lower bound to prevent loading ts too frequently.
newInterval := max(requiredStaleness-adaptiveUpdateTSIntervalShrinkingPreserve, minAllowedAdaptiveUpdateTSInterval)
// newInterval := max(requiredStaleness-adaptiveUpdateTSIntervalShrinkingPreserve, minAllowedAdaptiveUpdateTSInterval)
newInterval := requiredStaleness - adaptiveUpdateTSIntervalShrinkingPreserve
if newInterval < minAllowedAdaptiveUpdateTSInterval {
newInterval = minAllowedAdaptiveUpdateTSInterval
}
return adaptiveUpdateTSIntervalStateAdapting, newInterval
}

Expand Down Expand Up @@ -486,8 +485,6 @@ func (o *pdOracle) updateTS(ctx context.Context) {
ticker := time.NewTicker(currentInterval)
defer ticker.Stop()

// Note that as `doUpdate` updates last tick time while `nextUpdateInterval` may perform calculation depending on the
// last tick time, `doUpdate` should be called after finishing calculating the next interval.
doUpdate := func(now time.Time) {
// Update the timestamp for each txnScope
o.lastTSMap.Range(func(key, _ interface{}) bool {
Expand Down Expand Up @@ -694,7 +691,10 @@ func (o *pdOracle) adjustUpdateLowResolutionTSIntervalWithRequestedStaleness(rea
// calculation. Make sure it's always positive before passing it to the updateTS goroutine.
// Note that `nextUpdateInterval` method expects the requiredStaleness is always non-zero when triggerred
// by this path.
requiredStaleness = max(requiredStaleness, time.Millisecond)
// requiredStaleness = max(requiredStaleness, time.Millisecond)
if requiredStaleness < time.Millisecond {
requiredStaleness = time.Millisecond
}
// Try to non-blocking send a signal to notify it to change the interval immediately. But if the channel is
// busy, it means that there's another concurrent call trying to update it. Just skip it in this case.
select {
Expand Down
3 changes: 2 additions & 1 deletion oracle/oracles/pd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,8 +335,9 @@ func TestValidateSnapshotReadTSReusingGetTSResult(t *testing.T) {
getCtx := func(index int) context.Context {
if cancelIndex == index {
return ctx
} else {
return context.Background()
}
return context.Background()
}

results = append(results, asyncValidate(getCtx(0), ts-2))
Expand Down

0 comments on commit 6ec6f79

Please sign in to comment.