From 590500a72196dd216c725759a61dc8ef202deb7d Mon Sep 17 00:00:00 2001 From: MyonKeminta Date: Thu, 31 Oct 2024 21:06:10 +0800 Subject: [PATCH 01/12] Draft implementation Signed-off-by: MyonKeminta --- oracle/oracle.go | 13 +++ oracle/oracles/local.go | 12 +++ oracle/oracles/mock.go | 11 +++ oracle/oracles/pd.go | 196 ++++++++++++++++++++++++++++++++++++---- 4 files changed, 214 insertions(+), 18 deletions(-) diff --git a/oracle/oracle.go b/oracle/oracle.go index 5579d3568a..a4ffa1b93b 100644 --- a/oracle/oracle.go +++ b/oracle/oracle.go @@ -51,6 +51,12 @@ type Oracle interface { GetLowResolutionTimestamp(ctx context.Context, opt *Option) (uint64, error) GetLowResolutionTimestampAsync(ctx context.Context, opt *Option) Future SetLowResolutionTimestampUpdateInterval(time.Duration) error + // GetStaleTimestamp generates a timestamp based on the recently fetched timestamp and the elapsed time since + // when that timestamp was fetched. The result is expected to be about `prevSecond` seconds before the current + // time. + // WARNING: This method does not guarantee whether the generated timestamp is legal for accessing the data. + // Neither is it safe to use it for verifying the legality of another calculated timestamp. + // Be sure to validate the timestamp before using it to access the data. GetStaleTimestamp(ctx context.Context, txnScope string, prevSecond uint64) (uint64, error) IsExpired(lockTimestamp, TTL uint64, opt *Option) bool UntilExpired(lockTimeStamp, TTL uint64, opt *Option) int64 @@ -61,6 +67,13 @@ type Oracle interface { // GetAllTSOKeyspaceGroupMinTS gets a minimum timestamp from all TSO keyspace groups. GetAllTSOKeyspaceGroupMinTS(ctx context.Context) (uint64, error) + + // ValidateSnapshotReadTS verifies whether it can be guaranteed that the given readTS doesn't exceed the maximum ts + // that has been allocated by the oracle, so that it's safe to use this ts to perform snapshot read, stale read, + // etc. 
+ // Note that this method only checks the ts from the oracle's perspective. It doesn't check whether the snapshot + // has been GCed. + ValidateSnapshotReadTS(ctx context.Context, readTS uint64, opt *Option) error } // Future is a future which promises to return a timestamp. diff --git a/oracle/oracles/local.go b/oracle/oracles/local.go index f8016f468c..851a38f46a 100644 --- a/oracle/oracles/local.go +++ b/oracle/oracles/local.go @@ -36,6 +36,7 @@ package oracles import ( "context" + "github.com/pingcap/errors" "sync" "time" @@ -148,3 +149,14 @@ func (l *localOracle) SetExternalTimestamp(ctx context.Context, newTimestamp uin func (l *localOracle) GetExternalTimestamp(ctx context.Context) (uint64, error) { return l.getExternalTimestamp(ctx) } + +func (l *localOracle) ValidateSnapshotReadTS(ctx context.Context, readTS uint64, opt *oracle.Option) error { + currentTS, err := l.GetTimestamp(ctx, opt) + if err != nil { + return errors.Errorf("fail to validate read timestamp: %v", err) + } + if currentTS < readTS { + return errors.Errorf("cannot set read timestamp to a future time") + } + return nil +} diff --git a/oracle/oracles/mock.go b/oracle/oracles/mock.go index 633d975371..cab3335aba 100644 --- a/oracle/oracles/mock.go +++ b/oracle/oracles/mock.go @@ -137,6 +137,17 @@ func (o *MockOracle) SetLowResolutionTimestampUpdateInterval(time.Duration) erro return nil } +func (o *MockOracle) ValidateSnapshotReadTS(ctx context.Context, readTS uint64, opt *oracle.Option) error { + currentTS, err := o.GetTimestamp(ctx, opt) + if err != nil { + return errors.Errorf("fail to validate read timestamp: %v", err) + } + if currentTS < readTS { + return errors.Errorf("cannot set read timestamp to a future time") + } + return nil +} + // IsExpired implements oracle.Oracle interface. 
func (o *MockOracle) IsExpired(lockTimestamp, TTL uint64, _ *oracle.Option) bool { o.RLock() diff --git a/oracle/oracles/pd.go b/oracle/oracles/pd.go index 83dd41f3c3..4450182d4f 100644 --- a/oracle/oracles/pd.go +++ b/oracle/oracles/pd.go @@ -37,6 +37,7 @@ package oracles import ( "context" "fmt" + "golang.org/x/sync/singleflight" "strings" "sync" "sync/atomic" @@ -54,13 +55,44 @@ var _ oracle.Oracle = &pdOracle{} const slowDist = 30 * time.Millisecond +const minAllowedAdaptiveUpdateTSInterval = 500 * time.Millisecond +const adaptiveUpdateTSIntervalShrinkingPreserve = 100 * time.Millisecond +const adaptiveUpdateTSIntervalResetThreshold = 200 * time.Millisecond +const adaptiveUpdateTSIntervalResetPerSecond = 20 * time.Millisecond +const adaptiveUpdateTSIntervalResetTimeout = 5 * time.Minute + // pdOracle is an Oracle that uses a placement driver client as source. type pdOracle struct { c pd.Client // txn_scope (string) -> lastTSPointer (*atomic.Pointer[lastTSO]) - lastTSMap sync.Map - quit chan struct{} + lastTSMap sync.Map + quit chan struct{} + // The configured interval to update the low resolution ts. lastTSUpdateInterval atomic.Int64 + // The actual interval to update the low resolution ts. If the configured one is too large to satisfy the + // requirement of the stale read or snapshot read, the actual interval can be automatically set to a shorter + // value than lastTSUpdateInterval. + adaptiveLastTSUpdateInterval atomic.Int64 + + adaptiveUpdateIntervalState struct { + // The most recent time that a stale read / snapshot read requests a timestamp that is close enough to + // the current adaptive update interval. If there is such a request recently, the adaptive interval + // should avoid falling back to the original (configured) value. + // Stored in unix milliseconds to make it able to be accessed atomically. 
+ lastReachDropThresholdTime atomic.Int64 + // When a caller needs the update interval to shrink immediately, it sends the duration it expects to + // this channel. + shrinkIntervalCh chan time.Duration + + lastTick time.Time + } + + // When the low resolution ts is not new enough and there are many concurrent stale read / snapshot read + // operations that need to validate the read ts, we can use this to avoid too many concurrent GetTS calls by + // reusing a result for different `ValidateSnapshotReadTS` calls. This can be done because + // we don't require the ts for validation to be strictly the latest one. + // Note that the result can't be reused for different txnScopes. The txnScope is used as the key. + tsForValidation singleflight.Group } // lastTSO stores the last timestamp oracle gets from PD server and the local time when the TSO is fetched. @@ -84,6 +116,7 @@ func NewPdOracle(pdClient pd.Client, updateInterval time.Duration) (oracle.Oracl if err != nil { return nil, err } + o.adaptiveLastTSUpdateInterval.Store(o.lastTSUpdateInterval.Load()) ctx := context.TODO() go o.updateTS(ctx) // Initialize the timestamp of the global txnScope by Get. @@ -241,28 +274,77 @@ func (o *pdOracle) getLastTSWithArrivalTS(txnScope string) (*lastTSO, bool) { return last, true } +func (o *pdOracle) nextUpdateInterval(now time.Time, requireStaleness time.Duration) time.Duration { + configuredInterval := time.Duration(o.lastTSUpdateInterval.Load()) + currentAdaptiveInterval := time.Duration(o.adaptiveLastTSUpdateInterval.Load()) + if configuredInterval <= minAllowedAdaptiveUpdateTSInterval { + // If the user has configured a very short interval, we don't have any space to adjust it. Just use + // the user's configured value directly. 
+ if currentAdaptiveInterval != configuredInterval { + o.adaptiveLastTSUpdateInterval.Store(int64(configuredInterval)) + } + return configuredInterval + } + + lastReachDropThresholdTime := time.UnixMilli(o.adaptiveUpdateIntervalState.lastReachDropThresholdTime.Load()) + if now.Sub(lastReachDropThresholdTime) < adaptiveUpdateTSIntervalResetTimeout { + return currentAdaptiveInterval + } + + timeSinceLastTick := now.Sub(o.adaptiveUpdateIntervalState.lastTick) + newInterval := currentAdaptiveInterval + time.Duration(timeSinceLastTick.Seconds()*float64(adaptiveUpdateTSIntervalResetPerSecond)) + if newInterval > configuredInterval { + newInterval = configuredInterval + } + + if newInterval != currentAdaptiveInterval { + o.adaptiveLastTSUpdateInterval.Store(int64(newInterval)) + } + return newInterval +} + func (o *pdOracle) updateTS(ctx context.Context) { - currentInterval := o.lastTSUpdateInterval.Load() - ticker := time.NewTicker(time.Duration(currentInterval)) + currentInterval := time.Duration(o.lastTSUpdateInterval.Load()) + ticker := time.NewTicker(currentInterval) defer ticker.Stop() + + doUpdate := func() { + // Update the timestamp for each txnScope + o.lastTSMap.Range(func(key, _ interface{}) bool { + txnScope := key.(string) + ts, err := o.getTimestamp(ctx, txnScope) + if err != nil { + logutil.Logger(ctx).Error("updateTS error", zap.String("txnScope", txnScope), zap.Error(err)) + return true + } + o.setLastTS(ts, txnScope) + return true + }) + } + for { select { - case <-ticker.C: - // Update the timestamp for each txnScope - o.lastTSMap.Range(func(key, _ interface{}) bool { - txnScope := key.(string) - ts, err := o.getTimestamp(ctx, txnScope) - if err != nil { - logutil.Logger(ctx).Error("updateTS error", zap.String("txnScope", txnScope), zap.Error(err)) - return true - } - o.setLastTS(ts, txnScope) - return true - }) - newInterval := o.lastTSUpdateInterval.Load() + case now := <-ticker.C: + doUpdate() + + newInterval := o.nextUpdateInterval(now, 0) if 
newInterval != currentInterval { currentInterval = newInterval - ticker.Reset(time.Duration(currentInterval)) + ticker.Reset(currentInterval) + } + + o.adaptiveUpdateIntervalState.lastTick = now + + case interval := <-o.adaptiveUpdateIntervalState.shrinkIntervalCh: + if interval < currentInterval && currentInterval > minAllowedAdaptiveUpdateTSInterval { + currentInterval = max(interval-adaptiveUpdateTSIntervalShrinkingPreserve, minAllowedAdaptiveUpdateTSInterval) + + if time.Since(o.adaptiveUpdateIntervalState.lastTick) > currentInterval { + doUpdate() + o.adaptiveUpdateIntervalState.lastTick = time.Now() + } + + ticker.Reset(currentInterval) } case <-o.quit: return @@ -296,6 +378,8 @@ func (f lowResolutionTsFuture) Wait() (uint64, error) { // SetLowResolutionTimestampUpdateInterval sets the refresh interval for low resolution timestamps. Note this will take // effect up to the previous update interval amount of time after being called. +// This setting may not be strictly followed. If Stale Read requests too new data to be available, the low resolution +// ts may be actually updated in a shorter interval than the configured one. func (o *pdOracle) SetLowResolutionTimestampUpdateInterval(updateInterval time.Duration) error { if updateInterval <= 0 { return fmt.Errorf("updateInterval must be > 0") @@ -366,3 +450,79 @@ func (o *pdOracle) SetExternalTimestamp(ctx context.Context, ts uint64) error { func (o *pdOracle) GetExternalTimestamp(ctx context.Context) (uint64, error) { return o.c.GetExternalTimestamp(ctx) } + +func (o *pdOracle) getCurrentTSForValidation(ctx context.Context, opt *oracle.Option) (uint64, error) { + ch := o.tsForValidation.DoChan(opt.TxnScope, func() (interface{}, error) { + //metrics.ValidateReadTSFromPDCount.Inc() + + // If the call that triggers the execution of this function is canceled by the context, other calls that are + // waiting for reusing the same result should not be canceled. 
So pass context.Background() instead of the + // current ctx. + res, err := o.GetTimestamp(context.Background(), opt) + // After finishing the current call, allow the next call to trigger fetching a new TS. + o.tsForValidation.Forget(opt.TxnScope) + return res, err + }) + select { + case <-ctx.Done(): + return 0, errors.WithStack(ctx.Err()) + case res := <-ch: + if res.Err != nil { + return 0, errors.WithStack(res.Err) + } + return res.Val.(uint64), nil + } +} + +func (o *pdOracle) ValidateSnapshotReadTS(ctx context.Context, readTS uint64, opt *oracle.Option) error { + latestTS, err := o.GetLowResolutionTimestamp(ctx, opt) + // If we fail to get latestTS or the readTS exceeds it, get a timestamp from PD to double check. + // But we don't need to strictly fetch the latest TS. So if there are already concurrent calls to this function + // loading the latest TS, we can just reuse the same result to avoid too many concurrent GetTS calls. + if err != nil || readTS > latestTS { + currentTS, err := o.getCurrentTSForValidation(ctx, opt) + if err != nil { + return errors.Errorf("fail to validate read timestamp: %v", err) + } + o.adjustUpdateLowResolutionTSIntervalWithRequestedStaleness(readTS, currentTS) + if readTS > currentTS { + return errors.Errorf("cannot set read timestamp to a future time") + } + } else { + estimatedCurrentTS, err := o.getStaleTimestamp(opt.TxnScope, 0) + if err != nil { + logutil.Logger(ctx).Warn("failed to estimate current ts by getSlateTimestamp for auto-adjusting update low resolution ts interval", + zap.Error(err), zap.Uint64("readTS", readTS), zap.String("txnScope", opt.TxnScope)) + } else { + o.adjustUpdateLowResolutionTSIntervalWithRequestedStaleness(readTS, estimatedCurrentTS) + } + } + return nil +} + +func (o *pdOracle) adjustUpdateLowResolutionTSIntervalWithRequestedStaleness(readTS uint64, currentTS uint64) { + requiredStaleness := oracle.GetTimeFromTS(currentTS).Sub(oracle.GetTimeFromTS(readTS)) + currentUpdateInterval := 
time.Duration(o.adaptiveLastTSUpdateInterval.Load()) + + if requiredStaleness <= currentUpdateInterval+adaptiveUpdateTSIntervalResetThreshold { + now := time.Now().UnixMilli() + for { + last := o.adaptiveUpdateIntervalState.lastReachDropThresholdTime.Load() + if last >= now { + break + } + if o.adaptiveUpdateIntervalState.lastReachDropThresholdTime.CompareAndSwap(last, now) { + break + } + } + } + + if requiredStaleness <= currentUpdateInterval && currentUpdateInterval > minAllowedAdaptiveUpdateTSInterval { + // Try to non-blocking send a signal to notify it to change the interval immediately. But if the channel is + // busy, it means that there's another concurrent call trying to update it. Just skip it in this case. + select { + case o.adaptiveUpdateIntervalState.shrinkIntervalCh <- requiredStaleness: + default: + } + } +} From 6979a6d469402d539379fe55511bc381b6df6cc6 Mon Sep 17 00:00:00 2001 From: MyonKeminta Date: Mon, 4 Nov 2024 18:07:51 +0800 Subject: [PATCH 02/12] Refine interactions between goroutines; add logs Signed-off-by: MyonKeminta --- oracle/oracles/pd.go | 153 +++++++++++++++++++++++++++++++++++++------ 1 file changed, 134 insertions(+), 19 deletions(-) diff --git a/oracle/oracles/pd.go b/oracle/oracles/pd.go index 4450182d4f..eb0927306a 100644 --- a/oracle/oracles/pd.go +++ b/oracle/oracles/pd.go @@ -55,11 +55,37 @@ var _ oracle.Oracle = &pdOracle{} const slowDist = 30 * time.Millisecond -const minAllowedAdaptiveUpdateTSInterval = 500 * time.Millisecond -const adaptiveUpdateTSIntervalShrinkingPreserve = 100 * time.Millisecond -const adaptiveUpdateTSIntervalResetThreshold = 200 * time.Millisecond -const adaptiveUpdateTSIntervalResetPerSecond = 20 * time.Millisecond -const adaptiveUpdateTSIntervalResetTimeout = 5 * time.Minute +type adaptiveUpdateTSIntervalState int + +const ( + adaptiveUpdateTSIntervalStateNormal adaptiveUpdateTSIntervalState = iota + adaptiveUpdateTSIntervalStateAdapting + adaptiveUpdateTSIntervalStateRecovering + 
adaptiveUpdateTSIntervalStateUnadjustable +) + +func (s adaptiveUpdateTSIntervalState) String() string { + switch s { + case adaptiveUpdateTSIntervalStateNormal: + return "normal" + case adaptiveUpdateTSIntervalStateAdapting: + return "adapting" + case adaptiveUpdateTSIntervalStateRecovering: + return "recovering" + case adaptiveUpdateTSIntervalStateUnadjustable: + return "unadjustable" + default: + return "unknown" + } +} + +const ( + minAllowedAdaptiveUpdateTSInterval = 500 * time.Millisecond + adaptiveUpdateTSIntervalShrinkingPreserve = 100 * time.Millisecond + adaptiveUpdateTSIntervalResetThreshold = 200 * time.Millisecond + adaptiveUpdateTSIntervalResetPerSecond = 20 * time.Millisecond + adaptiveUpdateTSIntervalResetTimeout = 5 * time.Minute +) // pdOracle is an Oracle that uses a placement driver client as source. type pdOracle struct { @@ -72,9 +98,13 @@ type pdOracle struct { // The actual interval to update the low resolution ts. If the configured one is too large to satisfy the // requirement of the stale read or snapshot read, the actual interval can be automatically set to a shorter // value than lastTSUpdateInterval. + // This value is also possible to be updated by SetLowResolutionTimestampUpdateInterval, which may happen when + // user adjusting the update interval manually. adaptiveLastTSUpdateInterval atomic.Int64 adaptiveUpdateIntervalState struct { + // The mutex to avoid racing between updateTS goroutine and SetLowResolutionTimestampUpdateInterval. + mu sync.Mutex // The most recent time that a stale read / snapshot read requests a timestamp that is close enough to // the current adaptive update interval. If there is such a request recently, the adaptive interval // should avoid falling back to the original (configured) value. @@ -84,7 +114,10 @@ type pdOracle struct { // this channel. shrinkIntervalCh chan time.Duration + // Only accessed in updateTS goroutine. No need to use atomic value. lastTick time.Time + // For logging. 
+ state adaptiveUpdateTSIntervalState } // When the low resolution ts is not new enough and there are many concurrent stane read / snapshot read @@ -107,20 +140,24 @@ type lastTSO struct { // `GetTimestamp()` is not called after `lastTSUpdateInterval`, it will be called by // itself to keep up with the timestamp on PD server. func NewPdOracle(pdClient pd.Client, updateInterval time.Duration) (oracle.Oracle, error) { + if updateInterval <= 0 { + return nil, fmt.Errorf("updateInterval must be > 0") + } + o := &pdOracle{ c: pdClient, quit: make(chan struct{}), lastTSUpdateInterval: atomic.Int64{}, } - err := o.SetLowResolutionTimestampUpdateInterval(updateInterval) - if err != nil { - return nil, err - } - o.adaptiveLastTSUpdateInterval.Store(o.lastTSUpdateInterval.Load()) + o.adaptiveUpdateIntervalState.shrinkIntervalCh = make(chan time.Duration, 1) + o.lastTSUpdateInterval.Store(int64(updateInterval)) + o.adaptiveLastTSUpdateInterval.Store(int64(updateInterval)) + o.adaptiveUpdateIntervalState.lastTick = time.Now() + ctx := context.TODO() go o.updateTS(ctx) // Initialize the timestamp of the global txnScope by Get. 
- _, err = o.GetTimestamp(ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + _, err := o.GetTimestamp(ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) if err != nil { o.Close() return nil, err @@ -274,7 +311,10 @@ func (o *pdOracle) getLastTSWithArrivalTS(txnScope string) (*lastTSO, bool) { return last, true } -func (o *pdOracle) nextUpdateInterval(now time.Time, requireStaleness time.Duration) time.Duration { +func (o *pdOracle) nextUpdateInterval(now time.Time, requiredStaleness time.Duration) time.Duration { + o.adaptiveUpdateIntervalState.mu.Lock() + defer o.adaptiveUpdateIntervalState.mu.Unlock() + configuredInterval := time.Duration(o.lastTSUpdateInterval.Load()) currentAdaptiveInterval := time.Duration(o.adaptiveLastTSUpdateInterval.Load()) if configuredInterval <= minAllowedAdaptiveUpdateTSInterval { @@ -283,11 +323,39 @@ func (o *pdOracle) nextUpdateInterval(now time.Time, requireStaleness time.Durat if currentAdaptiveInterval != configuredInterval { o.adaptiveLastTSUpdateInterval.Store(int64(configuredInterval)) } + if o.adaptiveUpdateIntervalState.state != adaptiveUpdateTSIntervalStateUnadjustable { + logutil.Logger(context.Background()).Info("update low resolution ts interval is not being adaptive because the configured interval is too short", + zap.Duration("configuredInterval", configuredInterval), + zap.Stringer("state", o.adaptiveUpdateIntervalState.state), + zap.Stringer("newState", adaptiveUpdateTSIntervalStateUnadjustable)) + o.adaptiveUpdateIntervalState.state = adaptiveUpdateTSIntervalStateUnadjustable + } return configuredInterval } + if requiredStaleness != 0 && requiredStaleness < currentAdaptiveInterval { + if currentAdaptiveInterval > minAllowedAdaptiveUpdateTSInterval { + // If we are calculating the interval because of a request that requires a shorter staleness, we shrink the + // update interval immediately to adapt to it. 
+ // We shrink the update interval to a value slightly lower than the requested staleness to avoid potential + // frequent shrinking operations. But there's a lower bound to prevent loading ts too frequently. + prevAdaptiveInterval := currentAdaptiveInterval + currentAdaptiveInterval = max(requiredStaleness-adaptiveUpdateTSIntervalShrinkingPreserve, minAllowedAdaptiveUpdateTSInterval) + o.adaptiveLastTSUpdateInterval.Store(int64(currentAdaptiveInterval)) + logutil.Logger(context.Background()).Info("shrink low resolution ts update interval immediately", + zap.Duration("requestedStaleness", requiredStaleness), + zap.Duration("prevAdaptiveUpdateInterval", prevAdaptiveInterval), + zap.Duration("newAdaptiveUpdateInterval", currentAdaptiveInterval), + zap.Stringer("prevState", o.adaptiveUpdateIntervalState.state), + zap.Stringer("newState", adaptiveUpdateTSIntervalStateAdapting)) + o.adaptiveUpdateIntervalState.state = adaptiveUpdateTSIntervalStateAdapting + } + return currentAdaptiveInterval + } + lastReachDropThresholdTime := time.UnixMilli(o.adaptiveUpdateIntervalState.lastReachDropThresholdTime.Load()) if now.Sub(lastReachDropThresholdTime) < adaptiveUpdateTSIntervalResetTimeout { + // There is a recent request that requires a short staleness. Keep the current adaptive interval. 
return currentAdaptiveInterval } @@ -299,7 +367,26 @@ func (o *pdOracle) nextUpdateInterval(now time.Time, requireStaleness time.Durat if newInterval != currentAdaptiveInterval { o.adaptiveLastTSUpdateInterval.Store(int64(newInterval)) + if o.adaptiveUpdateIntervalState.state != adaptiveUpdateTSIntervalStateRecovering { + logutil.Logger(context.Background()).Info("update low resolution ts interval is recovering", + zap.Duration("currentAdaptiveUpdateInterval", newInterval), + zap.Duration("configuredInterval", configuredInterval), + zap.Stringer("prevState", o.adaptiveUpdateIntervalState.state), + zap.Stringer("newState", adaptiveUpdateTSIntervalStateRecovering)) + o.adaptiveUpdateIntervalState.state = adaptiveUpdateTSIntervalStateRecovering + } } + + if newInterval == configuredInterval { + if o.adaptiveUpdateIntervalState.state != adaptiveUpdateTSIntervalStateNormal { + logutil.Logger(context.Background()).Info("update low resolution ts interval is now synced with the configuration", + zap.Duration("updateInterval", currentAdaptiveInterval), + zap.Stringer("prevState", o.adaptiveUpdateIntervalState.state), + zap.Stringer("newState", adaptiveUpdateTSIntervalStateNormal)) + o.adaptiveUpdateIntervalState.state = adaptiveUpdateTSIntervalStateNormal + } + } + return newInterval } @@ -335,9 +422,9 @@ func (o *pdOracle) updateTS(ctx context.Context) { o.adaptiveUpdateIntervalState.lastTick = now - case interval := <-o.adaptiveUpdateIntervalState.shrinkIntervalCh: - if interval < currentInterval && currentInterval > minAllowedAdaptiveUpdateTSInterval { - currentInterval = max(interval-adaptiveUpdateTSIntervalShrinkingPreserve, minAllowedAdaptiveUpdateTSInterval) + case requiredStaleness := <-o.adaptiveUpdateIntervalState.shrinkIntervalCh: + if requiredStaleness < currentInterval && currentInterval > minAllowedAdaptiveUpdateTSInterval { + currentInterval = max(requiredStaleness-adaptiveUpdateTSIntervalShrinkingPreserve, minAllowedAdaptiveUpdateTSInterval) if 
time.Since(o.adaptiveUpdateIntervalState.lastTick) > currentInterval { doUpdate() @@ -380,11 +467,33 @@ func (f lowResolutionTsFuture) Wait() (uint64, error) { // effect up to the previous update interval amount of time after being called. // This setting may not be strictly followed. If Stale Read requests too new data to be available, the low resolution // ts may be actually updated in a shorter interval than the configured one. -func (o *pdOracle) SetLowResolutionTimestampUpdateInterval(updateInterval time.Duration) error { - if updateInterval <= 0 { +func (o *pdOracle) SetLowResolutionTimestampUpdateInterval(newUpdateInterval time.Duration) error { + if newUpdateInterval <= 0 { return fmt.Errorf("updateInterval must be > 0") } - o.lastTSUpdateInterval.Store(updateInterval.Nanoseconds()) + + o.adaptiveUpdateIntervalState.mu.Lock() + defer o.adaptiveUpdateIntervalState.mu.Unlock() + + prevConfigured := o.lastTSUpdateInterval.Swap(int64(newUpdateInterval)) + adaptiveUpdateInterval := o.adaptiveLastTSUpdateInterval.Load() + + var adaptiveUpdateIntervalUpdated bool + + if adaptiveUpdateInterval == prevConfigured || newUpdateInterval < time.Duration(adaptiveUpdateInterval) { + // If the adaptive update interval is the same as the configured one, treat it as the adaptive adjusting + // mechanism not taking effect. So update it immediately. + // If the new configured interval is short so that it's smaller than the current adaptive interval, also shrink + // the adaptive interval immediately. 
+ o.adaptiveLastTSUpdateInterval.Store(int64(newUpdateInterval)) + adaptiveUpdateIntervalUpdated = true + } + logutil.Logger(context.Background()).Info("updated low resolution ts update interval", + zap.Duration("previous", time.Duration(prevConfigured)), + zap.Duration("new", newUpdateInterval), + zap.Duration("prevAdaptiveUpdateInterval", time.Duration(adaptiveUpdateInterval)), + zap.Bool("adaptiveUpdateIntervalUpdated", adaptiveUpdateIntervalUpdated)) + return nil } @@ -502,9 +611,15 @@ func (o *pdOracle) ValidateSnapshotReadTS(ctx context.Context, readTS uint64, op func (o *pdOracle) adjustUpdateLowResolutionTSIntervalWithRequestedStaleness(readTS uint64, currentTS uint64) { requiredStaleness := oracle.GetTimeFromTS(currentTS).Sub(oracle.GetTimeFromTS(readTS)) + + // Do not acquire the mutex, as here we only needs a rough check. + // So it's possible that we get inconsistent values from these two atomic fields, but it won't cause any problem. currentUpdateInterval := time.Duration(o.adaptiveLastTSUpdateInterval.Load()) + configuredUpdateInterval := time.Duration(o.lastTSUpdateInterval.Load()) - if requiredStaleness <= currentUpdateInterval+adaptiveUpdateTSIntervalResetThreshold { + if requiredStaleness <= currentUpdateInterval+adaptiveUpdateTSIntervalResetThreshold && currentUpdateInterval != configuredUpdateInterval { + // Record the most recent time when there's a read operation requesting the staleness close enough to the + // current update interval. 
now := time.Now().UnixMilli() for { last := o.adaptiveUpdateIntervalState.lastReachDropThresholdTime.Load() From 5a322d8b702ada6f1d54c3e835299b519c212520 Mon Sep 17 00:00:00 2001 From: MyonKeminta Date: Tue, 5 Nov 2024 18:10:02 +0800 Subject: [PATCH 03/12] Add tests Signed-off-by: MyonKeminta --- oracle/oracles/pd.go | 87 ++++++++++++++--------- oracle/oracles/pd_test.go | 142 +++++++++++++++++++++++++++++++++++--- tikv/kv.go | 4 +- 3 files changed, 189 insertions(+), 44 deletions(-) diff --git a/oracle/oracles/pd.go b/oracle/oracles/pd.go index eb0927306a..6d4f95296b 100644 --- a/oracle/oracles/pd.go +++ b/oracle/oracles/pd.go @@ -80,11 +80,11 @@ func (s adaptiveUpdateTSIntervalState) String() string { } const ( - minAllowedAdaptiveUpdateTSInterval = 500 * time.Millisecond - adaptiveUpdateTSIntervalShrinkingPreserve = 100 * time.Millisecond - adaptiveUpdateTSIntervalResetThreshold = 200 * time.Millisecond - adaptiveUpdateTSIntervalResetPerSecond = 20 * time.Millisecond - adaptiveUpdateTSIntervalResetTimeout = 5 * time.Minute + minAllowedAdaptiveUpdateTSInterval = 500 * time.Millisecond + adaptiveUpdateTSIntervalShrinkingPreserve = 100 * time.Millisecond + adaptiveUpdateTSIntervalBlockRecoverThreshold = 200 * time.Millisecond + adaptiveUpdateTSIntervalRecoverPerSecond = 20 * time.Millisecond + adaptiveUpdateTSIntervalDelayBeforeRecovering = 5 * time.Minute ) // pdOracle is an Oracle that uses a placement driver client as source. @@ -116,7 +116,7 @@ type pdOracle struct { // Only accessed in updateTS goroutine. No need to use atomic value. lastTick time.Time - // For logging. + // Represents a description about the current state. For logging and diagnosing purposes. state adaptiveUpdateTSIntervalState } @@ -134,13 +134,20 @@ type lastTSO struct { arrival uint64 } +type PDOracleOptions struct { + // The duration to update the last ts, i.e., the low resolution ts. + UpdateInterval time.Duration + // Disable the background periodic update of the last ts. 
This is for test purposes only. + NoUpdateTS bool +} + // NewPdOracle create an Oracle that uses a pd client source. // Refer https://github.com/tikv/pd/blob/master/client/client.go for more details. // PdOracle maintains `lastTS` to store the last timestamp got from PD server. If // `GetTimestamp()` is not called after `lastTSUpdateInterval`, it will be called by // itself to keep up with the timestamp on PD server. -func NewPdOracle(pdClient pd.Client, updateInterval time.Duration) (oracle.Oracle, error) { - if updateInterval <= 0 { +func NewPdOracle(pdClient pd.Client, options *PDOracleOptions) (oracle.Oracle, error) { + if options.UpdateInterval <= 0 { return nil, fmt.Errorf("updateInterval must be > 0") } @@ -150,12 +157,14 @@ func NewPdOracle(pdClient pd.Client, updateInterval time.Duration) (oracle.Oracl lastTSUpdateInterval: atomic.Int64{}, } o.adaptiveUpdateIntervalState.shrinkIntervalCh = make(chan time.Duration, 1) - o.lastTSUpdateInterval.Store(int64(updateInterval)) - o.adaptiveLastTSUpdateInterval.Store(int64(updateInterval)) + o.lastTSUpdateInterval.Store(int64(options.UpdateInterval)) + o.adaptiveLastTSUpdateInterval.Store(int64(options.UpdateInterval)) o.adaptiveUpdateIntervalState.lastTick = time.Now() ctx := context.TODO() - go o.updateTS(ctx) + if !options.NoUpdateTS { + go o.updateTS(ctx) + } // Initialize the timestamp of the global txnScope by Get. _, err := o.GetTimestamp(ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) if err != nil { @@ -311,6 +320,11 @@ func (o *pdOracle) getLastTSWithArrivalTS(txnScope string) (*lastTSO, bool) { return last, true } +// nextUpdateInterval calculates the next interval to update the low resolution ts. When this is triggered by timer +// tick, requiredStaleness should be 0. 
This function may also be called in case there is a stale read / snapshot read +// requesting a short staleness (i.e., the read operation specifies the version it wants to read instead of allocating +// from PD, and the required version is very close to the current time). In this case, the requested staleness should be +// passed to this function as the requiredStaleness parameter. func (o *pdOracle) nextUpdateInterval(now time.Time, requiredStaleness time.Duration) time.Duration { o.adaptiveUpdateIntervalState.mu.Lock() defer o.adaptiveUpdateIntervalState.mu.Unlock() @@ -333,8 +347,8 @@ func (o *pdOracle) nextUpdateInterval(now time.Time, requiredStaleness time.Dura return configuredInterval } - if requiredStaleness != 0 && requiredStaleness < currentAdaptiveInterval { - if currentAdaptiveInterval > minAllowedAdaptiveUpdateTSInterval { + if requiredStaleness != 0 { + if requiredStaleness < currentAdaptiveInterval && currentAdaptiveInterval > minAllowedAdaptiveUpdateTSInterval { // If we are calculating the interval because of a request that requires a shorter staleness, we shrink the // update interval immediately to adapt to it. // We shrink the update interval to a value slightly lower than the requested staleness to avoid potential @@ -354,13 +368,24 @@ func (o *pdOracle) nextUpdateInterval(now time.Time, requiredStaleness time.Dura } lastReachDropThresholdTime := time.UnixMilli(o.adaptiveUpdateIntervalState.lastReachDropThresholdTime.Load()) - if now.Sub(lastReachDropThresholdTime) < adaptiveUpdateTSIntervalResetTimeout { + if now.Sub(lastReachDropThresholdTime) < adaptiveUpdateTSIntervalDelayBeforeRecovering { // There is a recent request that requires a short staleness. Keep the current adaptive interval. 
+ + if currentAdaptiveInterval != configuredInterval && o.adaptiveUpdateIntervalState.state != adaptiveUpdateTSIntervalStateAdapting { + logutil.Logger(context.Background()).Info("update low resolution ts interval is not recovering as there is a recent read requesting a short staleness", + zap.Duration("currentAdaptiveUpdateInterval", currentAdaptiveInterval), + zap.Duration("configuredInterval", configuredInterval), + zap.Stringer("prevState", o.adaptiveUpdateIntervalState.state), + zap.Stringer("newState", adaptiveUpdateTSIntervalStateAdapting), + zap.Time("recentRequestExceedingThreshold", lastReachDropThresholdTime)) + o.adaptiveUpdateIntervalState.state = adaptiveUpdateTSIntervalStateAdapting + } + return currentAdaptiveInterval } timeSinceLastTick := now.Sub(o.adaptiveUpdateIntervalState.lastTick) - newInterval := currentAdaptiveInterval + time.Duration(timeSinceLastTick.Seconds()*float64(adaptiveUpdateTSIntervalResetPerSecond)) + newInterval := currentAdaptiveInterval + time.Duration(timeSinceLastTick.Seconds()*float64(adaptiveUpdateTSIntervalRecoverPerSecond)) if newInterval > configuredInterval { newInterval = configuredInterval } @@ -423,10 +448,12 @@ func (o *pdOracle) updateTS(ctx context.Context) { o.adaptiveUpdateIntervalState.lastTick = now case requiredStaleness := <-o.adaptiveUpdateIntervalState.shrinkIntervalCh: - if requiredStaleness < currentInterval && currentInterval > minAllowedAdaptiveUpdateTSInterval { - currentInterval = max(requiredStaleness-adaptiveUpdateTSIntervalShrinkingPreserve, minAllowedAdaptiveUpdateTSInterval) + now := time.Now() + newInterval := o.nextUpdateInterval(now, requiredStaleness) + if newInterval != currentInterval { + currentInterval = newInterval - if time.Since(o.adaptiveUpdateIntervalState.lastTick) > currentInterval { + if time.Since(o.adaptiveUpdateIntervalState.lastTick) >= currentInterval { doUpdate() o.adaptiveUpdateIntervalState.lastTick = time.Now() } @@ -593,7 +620,7 @@ func (o *pdOracle) 
ValidateSnapshotReadTS(ctx context.Context, readTS uint64, op if err != nil { return errors.Errorf("fail to validate read timestamp: %v", err) } - o.adjustUpdateLowResolutionTSIntervalWithRequestedStaleness(readTS, currentTS) + o.adjustUpdateLowResolutionTSIntervalWithRequestedStaleness(readTS, currentTS, time.Now()) if readTS > currentTS { return errors.Errorf("cannot set read timestamp to a future time") } @@ -603,32 +630,28 @@ func (o *pdOracle) ValidateSnapshotReadTS(ctx context.Context, readTS uint64, op logutil.Logger(ctx).Warn("failed to estimate current ts by getSlateTimestamp for auto-adjusting update low resolution ts interval", zap.Error(err), zap.Uint64("readTS", readTS), zap.String("txnScope", opt.TxnScope)) } else { - o.adjustUpdateLowResolutionTSIntervalWithRequestedStaleness(readTS, estimatedCurrentTS) + o.adjustUpdateLowResolutionTSIntervalWithRequestedStaleness(readTS, estimatedCurrentTS, time.Now()) } } return nil } -func (o *pdOracle) adjustUpdateLowResolutionTSIntervalWithRequestedStaleness(readTS uint64, currentTS uint64) { +func (o *pdOracle) adjustUpdateLowResolutionTSIntervalWithRequestedStaleness(readTS uint64, currentTS uint64, now time.Time) { requiredStaleness := oracle.GetTimeFromTS(currentTS).Sub(oracle.GetTimeFromTS(readTS)) // Do not acquire the mutex, as here we only needs a rough check. // So it's possible that we get inconsistent values from these two atomic fields, but it won't cause any problem. currentUpdateInterval := time.Duration(o.adaptiveLastTSUpdateInterval.Load()) - configuredUpdateInterval := time.Duration(o.lastTSUpdateInterval.Load()) - if requiredStaleness <= currentUpdateInterval+adaptiveUpdateTSIntervalResetThreshold && currentUpdateInterval != configuredUpdateInterval { + if requiredStaleness <= currentUpdateInterval+adaptiveUpdateTSIntervalBlockRecoverThreshold { // Record the most recent time when there's a read operation requesting the staleness close enough to the // current update interval. 
- now := time.Now().UnixMilli() - for { - last := o.adaptiveUpdateIntervalState.lastReachDropThresholdTime.Load() - if last >= now { - break - } - if o.adaptiveUpdateIntervalState.lastReachDropThresholdTime.CompareAndSwap(last, now) { - break - } + nowMillis := now.UnixMilli() + last := o.adaptiveUpdateIntervalState.lastReachDropThresholdTime.Load() + if last < nowMillis { + // Do not retry if the CAS fails (which may happen when there are other goroutines updating it + // concurrently), as we don't actually need to set it strictly. + o.adaptiveUpdateIntervalState.lastReachDropThresholdTime.CompareAndSwap(last, nowMillis) } } diff --git a/oracle/oracles/pd_test.go b/oracle/oracles/pd_test.go index c9fc24449a..fb77dbe7b2 100644 --- a/oracle/oracles/pd_test.go +++ b/oracle/oracles/pd_test.go @@ -32,7 +32,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package oracles_test +package oracles import ( "context" @@ -44,25 +44,24 @@ import ( "github.com/stretchr/testify/assert" "github.com/tikv/client-go/v2/oracle" - "github.com/tikv/client-go/v2/oracle/oracles" pd "github.com/tikv/pd/client" ) func TestPDOracle_UntilExpired(t *testing.T) { lockAfter, lockExp := 10, 15 - o := oracles.NewEmptyPDOracle() + o := NewEmptyPDOracle() start := time.Now() - oracles.SetEmptyPDOracleLastTs(o, oracle.GoTimeToTS(start)) + SetEmptyPDOracleLastTs(o, oracle.GoTimeToTS(start)) lockTs := oracle.GoTimeToTS(start.Add(time.Duration(lockAfter)*time.Millisecond)) + 1 waitTs := o.UntilExpired(lockTs, uint64(lockExp), &oracle.Option{TxnScope: oracle.GlobalTxnScope}) assert.Equal(t, int64(lockAfter+lockExp), waitTs) } func TestPdOracle_GetStaleTimestamp(t *testing.T) { - o := oracles.NewEmptyPDOracle() + o := NewEmptyPDOracle() start := time.Now() - oracles.SetEmptyPDOracleLastTs(o, oracle.GoTimeToTS(start)) + SetEmptyPDOracleLastTs(o, oracle.GoTimeToTS(start)) ts, err := o.GetStaleTimestamp(context.Background(), oracle.GlobalTxnScope, 
10) assert.Nil(t, err) assert.WithinDuration(t, start.Add(-10*time.Second), oracle.GetTimeFromTS(ts), 2*time.Second) @@ -90,7 +89,7 @@ func (c *MockPdClient) GetTS(ctx context.Context) (int64, int64, error) { func TestPdOracle_SetLowResolutionTimestampUpdateInterval(t *testing.T) { pdClient := MockPdClient{} - o := oracles.NewPdOracleWithClient(&pdClient) + o := NewPdOracleWithClient(&pdClient) ctx := context.TODO() wg := sync.WaitGroup{} @@ -131,7 +130,7 @@ func TestPdOracle_SetLowResolutionTimestampUpdateInterval(t *testing.T) { assert.LessOrEqual(t, elapsed, 3*updateInterval) } - oracles.StartTsUpdateLoop(o, ctx, &wg) + StartTsUpdateLoop(o, ctx, &wg) // Check each update interval. Note that since these are in increasing // order the time for the new interval to take effect is always less // than the new interval. If we iterated in opposite order, then we'd have @@ -150,8 +149,8 @@ func TestPdOracle_SetLowResolutionTimestampUpdateInterval(t *testing.T) { } func TestNonFutureStaleTSO(t *testing.T) { - o := oracles.NewEmptyPDOracle() - oracles.SetEmptyPDOracleLastTs(o, oracle.GoTimeToTS(time.Now())) + o := NewEmptyPDOracle() + SetEmptyPDOracleLastTs(o, oracle.GoTimeToTS(time.Now())) for i := 0; i < 100; i++ { time.Sleep(10 * time.Millisecond) now := time.Now() @@ -160,7 +159,7 @@ func TestNonFutureStaleTSO(t *testing.T) { closeCh := make(chan struct{}) go func() { time.Sleep(100 * time.Microsecond) - oracles.SetEmptyPDOracleLastTs(o, oracle.GoTimeToTS(now)) + SetEmptyPDOracleLastTs(o, oracle.GoTimeToTS(now)) close(closeCh) }() CHECK: @@ -180,3 +179,124 @@ func TestNonFutureStaleTSO(t *testing.T) { } } } + +func TestNextUpdateTSInterval(t *testing.T) { + oracleInterface, err := NewPdOracle(&MockPdClient{}, &PDOracleOptions{ + UpdateInterval: time.Second * 2, + NoUpdateTS: true, + }) + assert.NoError(t, err) + o := oracleInterface.(*pdOracle) + now := time.Now() + + mockTS := func(beforeNow time.Duration) uint64 { + return 
oracle.ComposeTS(oracle.GetPhysical(now.Add(-beforeNow)), 1) + } + mustNotifyShrinking := func(expectedRequiredStaleness time.Duration) { + // Normally this channel should be checked in pdOracle.updateTS method. Here we are testing the layer below the + // updateTS method, so we just do this assert to ensure the message is sent to this channel. + select { + case requiredStaleness := <-o.adaptiveUpdateIntervalState.shrinkIntervalCh: + assert.Equal(t, expectedRequiredStaleness, requiredStaleness) + default: + assert.Fail(t, "expects notifying shrinking update interval immediately, but no message received") + } + } + mustNoNotify := func() { + select { + case <-o.adaptiveUpdateIntervalState.shrinkIntervalCh: + assert.Fail(t, "expects not notifying shrinking update interval immediately, but message was received") + default: + } + } + + now = now.Add(time.Second * 2) + assert.Equal(t, time.Second*2, o.nextUpdateInterval(now, 0)) + now = now.Add(time.Second * 2) + assert.Equal(t, time.Second*2, o.nextUpdateInterval(now, 0)) + assert.Equal(t, adaptiveUpdateTSIntervalStateNormal, o.adaptiveUpdateIntervalState.state) + + now = now.Add(time.Second) + // Simulate a read requesting a staleness larger than 2s, in which case nothing special will happen. + o.adjustUpdateLowResolutionTSIntervalWithRequestedStaleness(mockTS(time.Second*3), mockTS(0), now) + mustNoNotify() + assert.Equal(t, time.Second*2, o.nextUpdateInterval(now, 0)) + + now = now.Add(time.Second) + // Simulate a read requesting a staleness less than 2s, in which case it should trigger immediate shrinking on the + // update interval. 
+ o.adjustUpdateLowResolutionTSIntervalWithRequestedStaleness(mockTS(time.Second), mockTS(0), now) + mustNotifyShrinking(time.Second) + expectedInterval := time.Second - adaptiveUpdateTSIntervalShrinkingPreserve + assert.Equal(t, expectedInterval, o.nextUpdateInterval(now, time.Second)) + assert.Equal(t, adaptiveUpdateTSIntervalStateAdapting, o.adaptiveUpdateIntervalState.state) + assert.Equal(t, now.UnixMilli(), o.adaptiveUpdateIntervalState.lastReachDropThresholdTime.Load()) + + // Let read with short staleness continue happening. + now = now.Add(adaptiveUpdateTSIntervalDelayBeforeRecovering / 2) + o.adjustUpdateLowResolutionTSIntervalWithRequestedStaleness(mockTS(time.Second), mockTS(0), now) + mustNoNotify() + assert.Equal(t, now.UnixMilli(), o.adaptiveUpdateIntervalState.lastReachDropThresholdTime.Load()) + + // The adaptiveUpdateTSIntervalDelayBeforeRecovering has not been elapsed since the last time there is a read with short + // staleness. The update interval won't start being reset at this time. + now = now.Add(adaptiveUpdateTSIntervalDelayBeforeRecovering/2 + time.Second) + o.adaptiveUpdateIntervalState.lastTick = now.Add(-time.Second) + assert.Equal(t, expectedInterval, o.nextUpdateInterval(now, 0)) + assert.Equal(t, adaptiveUpdateTSIntervalStateAdapting, o.adaptiveUpdateIntervalState.state) + + // The adaptiveUpdateTSIntervalDelayBeforeRecovering has been elapsed. + now = now.Add(adaptiveUpdateTSIntervalDelayBeforeRecovering / 2) + o.adaptiveUpdateIntervalState.lastTick = now.Add(-time.Second) + expectedInterval += adaptiveUpdateTSIntervalRecoverPerSecond + assert.InEpsilon(t, expectedInterval.Seconds(), o.nextUpdateInterval(now, 0).Seconds(), 1e-3) + assert.Equal(t, adaptiveUpdateTSIntervalStateRecovering, o.adaptiveUpdateIntervalState.state) + o.adaptiveUpdateIntervalState.lastTick = now + now = now.Add(time.Second * 2) + // No effect if the required staleness didn't trigger the threshold. 
+ o.adjustUpdateLowResolutionTSIntervalWithRequestedStaleness(mockTS(expectedInterval+adaptiveUpdateTSIntervalBlockRecoverThreshold*2), mockTS(0), now) + mustNoNotify() + expectedInterval += adaptiveUpdateTSIntervalRecoverPerSecond * 2 + assert.InEpsilon(t, expectedInterval.Seconds(), o.nextUpdateInterval(now, 0).Seconds(), 1e-3) + assert.Equal(t, adaptiveUpdateTSIntervalStateRecovering, o.adaptiveUpdateIntervalState.state) + + // If there's a read operation requires a staleness that is close enough to the current adaptive update interval, + // then block the update interval from recovering. + o.adaptiveUpdateIntervalState.lastTick = now + now = now.Add(time.Second) + o.adjustUpdateLowResolutionTSIntervalWithRequestedStaleness(mockTS(expectedInterval+adaptiveUpdateTSIntervalBlockRecoverThreshold/2), mockTS(0), now) + mustNoNotify() + assert.InEpsilon(t, expectedInterval.Seconds(), o.nextUpdateInterval(now, 0).Seconds(), 1e-3) + assert.Equal(t, adaptiveUpdateTSIntervalStateAdapting, o.adaptiveUpdateIntervalState.state) + o.adaptiveUpdateIntervalState.lastTick = now + now = now.Add(time.Second) + assert.InEpsilon(t, expectedInterval.Seconds(), o.nextUpdateInterval(now, 0).Seconds(), 1e-3) + assert.Equal(t, adaptiveUpdateTSIntervalStateAdapting, o.adaptiveUpdateIntervalState.state) + + // Now adaptiveUpdateTSIntervalDelayBeforeRecovering + 1s has been elapsed. Continue recovering. + now = now.Add(adaptiveUpdateTSIntervalDelayBeforeRecovering) + o.adaptiveUpdateIntervalState.lastTick = now.Add(-time.Second) + expectedInterval += adaptiveUpdateTSIntervalRecoverPerSecond + assert.InEpsilon(t, expectedInterval.Seconds(), o.nextUpdateInterval(now, 0).Seconds(), 1e-3) + assert.Equal(t, adaptiveUpdateTSIntervalStateRecovering, o.adaptiveUpdateIntervalState.state) + + // Without any other interruption, the update interval will gradually recover to the same value as configured. 
+ for { + o.adaptiveUpdateIntervalState.lastTick = now + now = now.Add(time.Second) + expectedInterval += adaptiveUpdateTSIntervalRecoverPerSecond + if expectedInterval >= time.Second*2 { + break + } + assert.InEpsilon(t, expectedInterval.Seconds(), o.nextUpdateInterval(now, 0).Seconds(), 1e-3) + assert.Equal(t, adaptiveUpdateTSIntervalStateRecovering, o.adaptiveUpdateIntervalState.state) + } + expectedInterval = time.Second * 2 + assert.Equal(t, expectedInterval, o.nextUpdateInterval(now, 0)) + assert.Equal(t, adaptiveUpdateTSIntervalStateNormal, o.adaptiveUpdateIntervalState.state) + + // Test adjusting configurations manually. + // When the adaptive update interval is not taking effect, the actual used update interval follows the change of + // the configuration immediately. + +} diff --git a/tikv/kv.go b/tikv/kv.go index 7c45137b21..db375ae57f 100644 --- a/tikv/kv.go +++ b/tikv/kv.go @@ -259,7 +259,9 @@ func requestHealthFeedbackFromKVClient(ctx context.Context, addr string, tikvCli // NewKVStore creates a new TiKV store instance. 
func NewKVStore(uuid string, pdClient pd.Client, spkv SafePointKV, tikvclient Client, opt ...Option) (*KVStore, error) { - o, err := oracles.NewPdOracle(pdClient, defaultOracleUpdateInterval) + o, err := oracles.NewPdOracle(pdClient, &oracles.PDOracleOptions{ + UpdateInterval: defaultOracleUpdateInterval, + }) if err != nil { return nil, err } From 59d5568e4a785c51db0ff7f1387fbbc5b652afc2 Mon Sep 17 00:00:00 2001 From: MyonKeminta Date: Wed, 6 Nov 2024 14:51:48 +0800 Subject: [PATCH 04/12] Cover configuration changes Signed-off-by: MyonKeminta --- oracle/oracles/pd.go | 4 ++-- oracle/oracles/pd_test.go | 39 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 41 insertions(+), 2 deletions(-) diff --git a/oracle/oracles/pd.go b/oracle/oracles/pd.go index 6d4f95296b..08b411ddcc 100644 --- a/oracle/oracles/pd.go +++ b/oracle/oracles/pd.go @@ -368,10 +368,10 @@ func (o *pdOracle) nextUpdateInterval(now time.Time, requiredStaleness time.Dura } lastReachDropThresholdTime := time.UnixMilli(o.adaptiveUpdateIntervalState.lastReachDropThresholdTime.Load()) - if now.Sub(lastReachDropThresholdTime) < adaptiveUpdateTSIntervalDelayBeforeRecovering { + if now.Sub(lastReachDropThresholdTime) < adaptiveUpdateTSIntervalDelayBeforeRecovering && currentAdaptiveInterval != configuredInterval { // There is a recent request that requires a short staleness. Keep the current adaptive interval. 
- if currentAdaptiveInterval != configuredInterval && o.adaptiveUpdateIntervalState.state != adaptiveUpdateTSIntervalStateAdapting { + if o.adaptiveUpdateIntervalState.state != adaptiveUpdateTSIntervalStateAdapting { logutil.Logger(context.Background()).Info("update low resolution ts interval is not recovering as there is a recent read requesting a short staleness", zap.Duration("currentAdaptiveUpdateInterval", currentAdaptiveInterval), zap.Duration("configuredInterval", configuredInterval), diff --git a/oracle/oracles/pd_test.go b/oracle/oracles/pd_test.go index fb77dbe7b2..46e3ba4de4 100644 --- a/oracle/oracles/pd_test.go +++ b/oracle/oracles/pd_test.go @@ -298,5 +298,44 @@ func TestNextUpdateTSInterval(t *testing.T) { // Test adjusting configurations manually. // When the adaptive update interval is not taking effect, the actual used update interval follows the change of // the configuration immediately. + err = o.SetLowResolutionTimestampUpdateInterval(time.Second * 1) + assert.NoError(t, err) + assert.Equal(t, time.Second, time.Duration(o.adaptiveLastTSUpdateInterval.Load())) + assert.Equal(t, time.Second, o.nextUpdateInterval(now, 0)) + err = o.SetLowResolutionTimestampUpdateInterval(time.Second * 2) + assert.NoError(t, err) + assert.Equal(t, time.Second*2, time.Duration(o.adaptiveLastTSUpdateInterval.Load())) + assert.Equal(t, time.Second*2, o.nextUpdateInterval(now, 0)) + + // If the adaptive update interval is taking effect, the configuration change doesn't immediately affect the actual + // update interval. 
+ now = now.Add(time.Second) + o.adjustUpdateLowResolutionTSIntervalWithRequestedStaleness(mockTS(time.Second), mockTS(0), now) + mustNotifyShrinking(time.Second) + expectedInterval = time.Second - adaptiveUpdateTSIntervalShrinkingPreserve + assert.Equal(t, expectedInterval, o.nextUpdateInterval(now, time.Second)) + assert.Equal(t, adaptiveUpdateTSIntervalStateAdapting, o.adaptiveUpdateIntervalState.state) + err = o.SetLowResolutionTimestampUpdateInterval(time.Second * 3) + assert.NoError(t, err) + assert.Equal(t, expectedInterval, time.Duration(o.adaptiveLastTSUpdateInterval.Load())) + assert.Equal(t, expectedInterval, o.nextUpdateInterval(now, 0)) + err = o.SetLowResolutionTimestampUpdateInterval(time.Second) + assert.NoError(t, err) + assert.Equal(t, expectedInterval, time.Duration(o.adaptiveLastTSUpdateInterval.Load())) + assert.Equal(t, expectedInterval, o.nextUpdateInterval(now, 0)) + + // ...unless it's set to a value shorter than the current actual update interval. + err = o.SetLowResolutionTimestampUpdateInterval(time.Millisecond * 800) + assert.NoError(t, err) + assert.Equal(t, time.Millisecond*800, time.Duration(o.adaptiveLastTSUpdateInterval.Load())) + assert.Equal(t, time.Millisecond*800, o.nextUpdateInterval(now, 0)) + assert.Equal(t, adaptiveUpdateTSIntervalStateNormal, o.adaptiveUpdateIntervalState.state) + + // If the configured value is too short, the actual update interval won't be adaptive + err = o.SetLowResolutionTimestampUpdateInterval(minAllowedAdaptiveUpdateTSInterval / 2) + assert.NoError(t, err) + assert.Equal(t, minAllowedAdaptiveUpdateTSInterval/2, time.Duration(o.adaptiveLastTSUpdateInterval.Load())) + assert.Equal(t, minAllowedAdaptiveUpdateTSInterval/2, o.nextUpdateInterval(now, 0)) + assert.Equal(t, adaptiveUpdateTSIntervalStateUnadjustable, o.adaptiveUpdateIntervalState.state) } From 19c65465b2943fcfe9419315377076ebd5673a3d Mon Sep 17 00:00:00 2001 From: MyonKeminta Date: Wed, 6 Nov 2024 17:35:44 +0800 Subject: [PATCH 05/12] Add 
more tests Signed-off-by: MyonKeminta --- oracle/oracles/pd.go | 2 +- oracle/oracles/pd_test.go | 159 +++++++++++++++++++++++++++++++++++++- 2 files changed, 159 insertions(+), 2 deletions(-) diff --git a/oracle/oracles/pd.go b/oracle/oracles/pd.go index 08b411ddcc..e57b73d3fb 100644 --- a/oracle/oracles/pd.go +++ b/oracle/oracles/pd.go @@ -612,7 +612,7 @@ func (o *pdOracle) getCurrentTSForValidation(ctx context.Context, opt *oracle.Op func (o *pdOracle) ValidateSnapshotReadTS(ctx context.Context, readTS uint64, opt *oracle.Option) error { latestTS, err := o.GetLowResolutionTimestamp(ctx, opt) - // If we fail to get latestTS or the readTS exceeds it, get a timestamp from PD to double check. + // If we fail to get latestTS or the readTS exceeds it, get a timestamp from PD to double-check. // But we don't need to strictly fetch the latest TS. So if there are already concurrent calls to this function // loading the latest TS, we can just reuse the same result to avoid too many concurrent GetTS calls. 
if err != nil || readTS > latestTS { diff --git a/oracle/oracles/pd_test.go b/oracle/oracles/pd_test.go index 46e3ba4de4..23fff989a0 100644 --- a/oracle/oracles/pd_test.go +++ b/oracle/oracles/pd_test.go @@ -180,13 +180,15 @@ func TestNonFutureStaleTSO(t *testing.T) { } } -func TestNextUpdateTSInterval(t *testing.T) { +func TestAdaptiveUpdateTSInterval(t *testing.T) { oracleInterface, err := NewPdOracle(&MockPdClient{}, &PDOracleOptions{ UpdateInterval: time.Second * 2, NoUpdateTS: true, }) assert.NoError(t, err) o := oracleInterface.(*pdOracle) + defer o.Close() + now := time.Now() mockTS := func(beforeNow time.Duration) uint64 { @@ -339,3 +341,158 @@ func TestNextUpdateTSInterval(t *testing.T) { assert.Equal(t, minAllowedAdaptiveUpdateTSInterval/2, o.nextUpdateInterval(now, 0)) assert.Equal(t, adaptiveUpdateTSIntervalStateUnadjustable, o.adaptiveUpdateIntervalState.state) } + +func TestValidateSnapshotReadTS(t *testing.T) { + pdClient := MockPdClient{} + o, err := NewPdOracle(&pdClient, &PDOracleOptions{ + UpdateInterval: time.Second * 2, + }) + assert.NoError(t, err) + defer o.Close() + + ctx := context.Background() + opt := &oracle.Option{TxnScope: oracle.GlobalTxnScope} + ts, err := o.GetTimestamp(ctx, opt) + assert.NoError(t, err) + assert.GreaterOrEqual(t, ts, uint64(1)) + + err = o.ValidateSnapshotReadTS(ctx, 1, opt) + assert.NoError(t, err) + ts, err = o.GetTimestamp(ctx, opt) + assert.NoError(t, err) + // The readTS exceeds the latest ts, so it first fails the check with the low resolution ts. Then it fallbacks to + // the fetching-from-PD path, and it can get the previous ts + 1, which can allow this validation to pass. + err = o.ValidateSnapshotReadTS(ctx, ts+1, opt) + assert.NoError(t, err) + // It can't pass if the readTS is newer than previous ts + 2. + ts, err = o.GetTimestamp(ctx, opt) + assert.NoError(t, err) + err = o.ValidateSnapshotReadTS(ctx, ts+2, opt) + assert.Error(t, err) + + // Simulate other PD clients requests a timestamp. 
+ ts, err = o.GetTimestamp(ctx, opt) + assert.NoError(t, err) + pdClient.logicalTimestamp.Add(2) + err = o.ValidateSnapshotReadTS(ctx, ts+3, opt) + assert.NoError(t, err) +} + +type MockPDClientWithPause struct { + MockPdClient + mu sync.Mutex +} + +func (c *MockPDClientWithPause) GetTS(ctx context.Context) (int64, int64, error) { + c.mu.Lock() + defer c.mu.Unlock() + return c.MockPdClient.GetTS(ctx) +} + +func (c *MockPDClientWithPause) Pause() { + c.mu.Lock() +} + +func (c *MockPDClientWithPause) Resume() { + c.mu.Unlock() +} + +func TestValidateSnapshotReadTSReusingGetTSResult(t *testing.T) { + pdClient := &MockPDClientWithPause{} + o, err := NewPdOracle(pdClient, &PDOracleOptions{ + UpdateInterval: time.Second * 2, + NoUpdateTS: true, + }) + assert.NoError(t, err) + defer o.Close() + + asyncValidate := func(ctx context.Context, readTS uint64) chan error { + ch := make(chan error, 1) + go func() { + err := o.ValidateSnapshotReadTS(ctx, readTS, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + ch <- err + }() + return ch + } + + noResult := func(ch chan error) { + select { + case <-ch: + assert.FailNow(t, "a ValidateSnapshotReadTS operation is not blocked while it's expected to be blocked") + default: + } + } + + cancelIndices := []int{-1, -1, 0, 1} + for i, ts := range []uint64{100, 200, 300, 400} { + // Note: the ts is the result that the next GetTS will return. Any validation with readTS <= ts should pass, otherwise fail. + + // We will cancel the cancelIndex-th validation call. 
This is for testing that canceling some of the calls + // doesn't affect other calls that are waiting + cancelIndex := cancelIndices[i] + + pdClient.Pause() + + results := make([]chan error, 0, 5) + + ctx, cancel := context.WithCancel(context.Background()) + + getCtx := func(index int) context.Context { + if cancelIndex == index { + return ctx + } else { + return context.Background() + } + } + + results = append(results, asyncValidate(getCtx(0), ts-2)) + results = append(results, asyncValidate(getCtx(1), ts+2)) + results = append(results, asyncValidate(getCtx(2), ts-1)) + results = append(results, asyncValidate(getCtx(3), ts+1)) + results = append(results, asyncValidate(getCtx(4), ts)) + + expectedSucceeds := []bool{true, false, true, false, true} + + time.Sleep(time.Millisecond * 50) + for _, ch := range results { + noResult(ch) + } + + cancel() + + for i, ch := range results { + if i == cancelIndex { + select { + case err := <-ch: + assert.Errorf(t, err, "index: %v", i) + assert.Containsf(t, err.Error(), "context canceled", "index: %v", i) + case <-time.After(time.Second): + assert.FailNowf(t, "expected result to be ready but still blocked", "index: %v", i) + } + } else { + noResult(ch) + } + } + + // ts will be the next ts returned to these validation calls. 
+ pdClient.logicalTimestamp.Store(int64(ts - 1)) + pdClient.Resume() + for i, ch := range results { + if i == cancelIndex { + continue + } + + select { + case err = <-ch: + case <-time.After(time.Second): + assert.FailNowf(t, "expected result to be ready but still blocked", "index: %v", i) + } + if expectedSucceeds[i] { + assert.NoErrorf(t, err, "index: %v", i) + } else { + assert.Errorf(t, err, "index: %v", i) + assert.NotContainsf(t, err.Error(), "context canceled", "index: %v", i) + } + } + } +} From 5a945e9e4e6c8c083ba67a3786ddd048f8157790 Mon Sep 17 00:00:00 2001 From: MyonKeminta Date: Wed, 6 Nov 2024 17:43:37 +0800 Subject: [PATCH 06/12] fix lint Signed-off-by: MyonKeminta --- oracle/oracles/pd.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/oracle/oracles/pd.go b/oracle/oracles/pd.go index e57b73d3fb..1ceb6229d4 100644 --- a/oracle/oracles/pd.go +++ b/oracle/oracles/pd.go @@ -37,7 +37,6 @@ package oracles import ( "context" "fmt" - "golang.org/x/sync/singleflight" "strings" "sync" "sync/atomic" @@ -49,6 +48,7 @@ import ( "github.com/tikv/client-go/v2/oracle" pd "github.com/tikv/pd/client" "go.uber.org/zap" + "golang.org/x/sync/singleflight" ) var _ oracle.Oracle = &pdOracle{} From 7e9bae9c3f781c7ad22fe1be9c5b034556a694c1 Mon Sep 17 00:00:00 2001 From: MyonKeminta Date: Wed, 6 Nov 2024 17:50:07 +0800 Subject: [PATCH 07/12] fix lint Signed-off-by: MyonKeminta --- oracle/oracles/local.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/oracle/oracles/local.go b/oracle/oracles/local.go index 851a38f46a..bf9b43c892 100644 --- a/oracle/oracles/local.go +++ b/oracle/oracles/local.go @@ -36,10 +36,10 @@ package oracles import ( "context" - "github.com/pingcap/errors" "sync" "time" + "github.com/pingcap/errors" "github.com/tikv/client-go/v2/oracle" ) From aea7b19308159d023ee66f1b27f63d55bce32c8f Mon Sep 17 00:00:00 2001 From: MyonKeminta Date: Thu, 7 Nov 2024 13:43:13 +0800 Subject: [PATCH 08/12] Adderss comments Signed-off-by: 
MyonKeminta --- oracle/oracles/pd.go | 48 +++++++++++++++++++++++++++++++++----------- 1 file changed, 36 insertions(+), 12 deletions(-) diff --git a/oracle/oracles/pd.go b/oracle/oracles/pd.go index 1ceb6229d4..d899075146 100644 --- a/oracle/oracles/pd.go +++ b/oracle/oracles/pd.go @@ -58,9 +58,21 @@ const slowDist = 30 * time.Millisecond type adaptiveUpdateTSIntervalState int const ( + // adaptiveUpdateTSIntervalStateNormal represents the state that the adaptive update ts interval is synced with the + // configuration without performing any automatic adjustment. adaptiveUpdateTSIntervalStateNormal adaptiveUpdateTSIntervalState = iota + // adaptiveUpdateTSIntervalStateAdapting represents the state that as there are recently some stale read / snapshot + // read operations requesting a short staleness (now - readTS is nearly or exceeds the current update interval), + // so that we automatically shrink the update interval. Otherwise, read operations may don't have low resolution ts + // that is new enough for checking the legality of the read ts, causing them have to fetch the latest ts from PD, + // which is time-consuming. adaptiveUpdateTSIntervalStateAdapting + // adaptiveUpdateTSIntervalStateRecovering represents the state that the update ts interval have once been shrunk, + // to adapt to reads with short staleness, but there isn't any such read operations for a while, so that we + // gradually recover the update interval to the configured value. adaptiveUpdateTSIntervalStateRecovering + // adaptiveUpdateTSIntervalStateUnadjustable represents the state that the user has configured a very short update + // interval, so that we don't have any space to automatically adjust it. 
adaptiveUpdateTSIntervalStateUnadjustable ) @@ -80,10 +92,21 @@ func (s adaptiveUpdateTSIntervalState) String() string { } const ( - minAllowedAdaptiveUpdateTSInterval = 500 * time.Millisecond - adaptiveUpdateTSIntervalShrinkingPreserve = 100 * time.Millisecond + // minAllowedAdaptiveUpdateTSInterval is the lower bound of the adaptive update ts interval for avoiding an abnormal + // read operation causing the update interval to be too short. + minAllowedAdaptiveUpdateTSInterval = 500 * time.Millisecond + // adaptiveUpdateTSIntervalShrinkingPreserve is the duration that we additionally shrinks when adapting to a read + // operation that requires a short staleness. + adaptiveUpdateTSIntervalShrinkingPreserve = 100 * time.Millisecond + // adaptiveUpdateTSIntervalBlockRecoverThreshold is the threshold of the difference between the current update + // interval and the staleness the read operation request to prevent the update interval from recovering back to + // normal. adaptiveUpdateTSIntervalBlockRecoverThreshold = 200 * time.Millisecond - adaptiveUpdateTSIntervalRecoverPerSecond = 20 * time.Millisecond + // adaptiveUpdateTSIntervalRecoverPerSecond is the duration that the update interval should grow per second when + // recovering to normal state from adapting state. + adaptiveUpdateTSIntervalRecoverPerSecond = 20 * time.Millisecond + // adaptiveUpdateTSIntervalDelayBeforeRecovering is the duration that we should hold the current adaptive update + // interval before turning back to normal state. adaptiveUpdateTSIntervalDelayBeforeRecovering = 5 * time.Minute ) @@ -93,7 +116,8 @@ type pdOracle struct { // txn_scope (string) -> lastTSPointer (*atomic.Pointer[lastTSO]) lastTSMap sync.Map quit chan struct{} - // The configured interval to update the low resolution ts. + // The configured interval to update the low resolution ts. Set by SetLowResolutionTimestampUpdateInterval. 
+ // For TiDB, this is directly controlled by the system variable `tidb_low_resolution_tso_update_interval`. lastTSUpdateInterval atomic.Int64 // The actual interval to update the low resolution ts. If the configured one is too large to satisfy the // requirement of the stale read or snapshot read, the actual interval can be automatically set to a shorter @@ -368,9 +392,10 @@ func (o *pdOracle) nextUpdateInterval(now time.Time, requiredStaleness time.Dura } lastReachDropThresholdTime := time.UnixMilli(o.adaptiveUpdateIntervalState.lastReachDropThresholdTime.Load()) - if now.Sub(lastReachDropThresholdTime) < adaptiveUpdateTSIntervalDelayBeforeRecovering && currentAdaptiveInterval != configuredInterval { + if currentAdaptiveInterval != configuredInterval && now.Sub(lastReachDropThresholdTime) < adaptiveUpdateTSIntervalDelayBeforeRecovering { // There is a recent request that requires a short staleness. Keep the current adaptive interval. - + // If it's not adapting state, it's possible that it's previously in recovering state, and it stops recovering + // as there is a new read operation requesting a short staleness. 
if o.adaptiveUpdateIntervalState.state != adaptiveUpdateTSIntervalStateAdapting { logutil.Logger(context.Background()).Info("update low resolution ts interval is not recovering as there is a recent read requesting a short staleness", zap.Duration("currentAdaptiveUpdateInterval", currentAdaptiveInterval), @@ -420,7 +445,7 @@ func (o *pdOracle) updateTS(ctx context.Context) { ticker := time.NewTicker(currentInterval) defer ticker.Stop() - doUpdate := func() { + doUpdate := func(now time.Time) { // Update the timestamp for each txnScope o.lastTSMap.Range(func(key, _ interface{}) bool { txnScope := key.(string) @@ -432,12 +457,14 @@ func (o *pdOracle) updateTS(ctx context.Context) { o.setLastTS(ts, txnScope) return true }) + + o.adaptiveUpdateIntervalState.lastTick = now } for { select { case now := <-ticker.C: - doUpdate() + doUpdate(now) newInterval := o.nextUpdateInterval(now, 0) if newInterval != currentInterval { @@ -445,8 +472,6 @@ func (o *pdOracle) updateTS(ctx context.Context) { ticker.Reset(currentInterval) } - o.adaptiveUpdateIntervalState.lastTick = now - case requiredStaleness := <-o.adaptiveUpdateIntervalState.shrinkIntervalCh: now := time.Now() newInterval := o.nextUpdateInterval(now, requiredStaleness) @@ -454,8 +479,7 @@ func (o *pdOracle) updateTS(ctx context.Context) { currentInterval = newInterval if time.Since(o.adaptiveUpdateIntervalState.lastTick) >= currentInterval { - doUpdate() - o.adaptiveUpdateIntervalState.lastTick = time.Now() + doUpdate(time.Now()) } ticker.Reset(currentInterval) From 96c60076dc17b882255356e78d48d22b45a74b44 Mon Sep 17 00:00:00 2001 From: MyonKeminta Date: Fri, 8 Nov 2024 15:11:37 +0800 Subject: [PATCH 09/12] clear the code of nextInterval Signed-off-by: MyonKeminta --- oracle/oracles/pd.go | 164 +++++++++++++++++++++++++------------------ 1 file changed, 97 insertions(+), 67 deletions(-) diff --git a/oracle/oracles/pd.go b/oracle/oracles/pd.go index d899075146..324b10a1bc 100644 --- a/oracle/oracles/pd.go +++ 
b/oracle/oracles/pd.go @@ -58,9 +58,10 @@ const slowDist = 30 * time.Millisecond type adaptiveUpdateTSIntervalState int const ( + adaptiveUpdateTSIntervalStateNone adaptiveUpdateTSIntervalState = iota // adaptiveUpdateTSIntervalStateNormal represents the state that the adaptive update ts interval is synced with the // configuration without performing any automatic adjustment. - adaptiveUpdateTSIntervalStateNormal adaptiveUpdateTSIntervalState = iota + adaptiveUpdateTSIntervalStateNormal // adaptiveUpdateTSIntervalStateAdapting represents the state that as there are recently some stale read / snapshot // read operations requesting a short staleness (now - readTS is nearly or exceeds the current update interval), // so that we automatically shrink the update interval. Otherwise, read operations may don't have low resolution ts @@ -87,7 +88,7 @@ func (s adaptiveUpdateTSIntervalState) String() string { case adaptiveUpdateTSIntervalStateUnadjustable: return "unadjustable" default: - return "unknown" + return fmt.Sprintf("unknown(%v)", int(s)) } } @@ -344,99 +345,123 @@ func (o *pdOracle) getLastTSWithArrivalTS(txnScope string) (*lastTSO, bool) { return last, true } -// nextUpdateInterval calculates the next interval to update the low resolution ts. When this is triggered by timer -// tick, requiredStaleness should be 0. This function may also be called in case there is a stale read / snapshot read -// requesting a short staleness (i.e., the read operation specifies the version it wants to read instead of allocating -// from PD, and the required version is very close to the current time). In this case, the requested staleness should be -// passed to this function as the requiredStaleness parameter. 
func (o *pdOracle) nextUpdateInterval(now time.Time, requiredStaleness time.Duration) time.Duration { o.adaptiveUpdateIntervalState.mu.Lock() defer o.adaptiveUpdateIntervalState.mu.Unlock() configuredInterval := time.Duration(o.lastTSUpdateInterval.Load()) - currentAdaptiveInterval := time.Duration(o.adaptiveLastTSUpdateInterval.Load()) - if configuredInterval <= minAllowedAdaptiveUpdateTSInterval { + prevAdaptiveInterval := time.Duration(o.adaptiveLastTSUpdateInterval.Load()) + lastReachDropThresholdTime := time.UnixMilli(o.adaptiveUpdateIntervalState.lastReachDropThresholdTime.Load()) + + currentAdaptiveInterval := prevAdaptiveInterval + + // Shortcut + const none = adaptiveUpdateTSIntervalStateNone + + // The following `checkX` functions check whether it should transit to the X state. Each returns + // a tuple representing (state, newInterval). + // When `checkX` returns a valid state, it means that the current situation matches the state. In this case, it + // also returns the new interval that should be used next. + // When it returns `none`, we need to check if it should transit to other states. For each call to + // nextUpdateInterval, if every `checkX` function returns `none`, it keeps the previous state unchanged. + + checkUnadjustable := func() (adaptiveUpdateTSIntervalState, time.Duration) { // If the user has configured a very short interval, we don't have any space to adjust it. Just use // the user's configured value directly. 
- if currentAdaptiveInterval != configuredInterval { - o.adaptiveLastTSUpdateInterval.Store(int64(configuredInterval)) + if configuredInterval <= minAllowedAdaptiveUpdateTSInterval { + return adaptiveUpdateTSIntervalStateUnadjustable, configuredInterval } - if o.adaptiveUpdateIntervalState.state != adaptiveUpdateTSIntervalStateUnadjustable { - logutil.Logger(context.Background()).Info("update low resolution ts interval is not being adaptive because the configured interval is too short", - zap.Duration("configuredInterval", configuredInterval), - zap.Stringer("state", o.adaptiveUpdateIntervalState.state), - zap.Stringer("newState", adaptiveUpdateTSIntervalStateUnadjustable)) - o.adaptiveUpdateIntervalState.state = adaptiveUpdateTSIntervalStateUnadjustable + return none, 0 + } + + checkNormal := func() (adaptiveUpdateTSIntervalState, time.Duration) { + // If the current actual update interval is synced with the configured value, and it's not unadjustable state, + // then it's the normal state. + if configuredInterval > minAllowedAdaptiveUpdateTSInterval && currentAdaptiveInterval == configuredInterval { + return adaptiveUpdateTSIntervalStateNormal, currentAdaptiveInterval } - return configuredInterval + return none, 0 } - if requiredStaleness != 0 { - if requiredStaleness < currentAdaptiveInterval && currentAdaptiveInterval > minAllowedAdaptiveUpdateTSInterval { + checkAdapting := func() (adaptiveUpdateTSIntervalState, time.Duration) { + if requiredStaleness != 0 && requiredStaleness < currentAdaptiveInterval && currentAdaptiveInterval > minAllowedAdaptiveUpdateTSInterval { // If we are calculating the interval because of a request that requires a shorter staleness, we shrink the // update interval immediately to adapt to it. // We shrink the update interval to a value slightly lower than the requested staleness to avoid potential // frequent shrinking operations. But there's a lower bound to prevent loading ts too frequently. 
- prevAdaptiveInterval := currentAdaptiveInterval - currentAdaptiveInterval = max(requiredStaleness-adaptiveUpdateTSIntervalShrinkingPreserve, minAllowedAdaptiveUpdateTSInterval) - o.adaptiveLastTSUpdateInterval.Store(int64(currentAdaptiveInterval)) - logutil.Logger(context.Background()).Info("shrink low resolution ts update interval immediately", - zap.Duration("requestedStaleness", requiredStaleness), - zap.Duration("prevAdaptiveUpdateInterval", prevAdaptiveInterval), - zap.Duration("newAdaptiveUpdateInterval", currentAdaptiveInterval), - zap.Stringer("prevState", o.adaptiveUpdateIntervalState.state), - zap.Stringer("newState", adaptiveUpdateTSIntervalStateAdapting)) - o.adaptiveUpdateIntervalState.state = adaptiveUpdateTSIntervalStateAdapting + newInterval := max(requiredStaleness-adaptiveUpdateTSIntervalShrinkingPreserve, minAllowedAdaptiveUpdateTSInterval) + return adaptiveUpdateTSIntervalStateAdapting, newInterval } - return currentAdaptiveInterval - } - lastReachDropThresholdTime := time.UnixMilli(o.adaptiveUpdateIntervalState.lastReachDropThresholdTime.Load()) - if currentAdaptiveInterval != configuredInterval && now.Sub(lastReachDropThresholdTime) < adaptiveUpdateTSIntervalDelayBeforeRecovering { - // There is a recent request that requires a short staleness. Keep the current adaptive interval. - // If it's not adapting state, it's possible that it's previously in recovering state, and it stops recovering - // as there is a new read operation requesting a short staleness. 
- if o.adaptiveUpdateIntervalState.state != adaptiveUpdateTSIntervalStateAdapting { - logutil.Logger(context.Background()).Info("update low resolution ts interval is not recovering as there is a recent read requesting a short staleness", - zap.Duration("currentAdaptiveUpdateInterval", currentAdaptiveInterval), - zap.Duration("configuredInterval", configuredInterval), - zap.Stringer("prevState", o.adaptiveUpdateIntervalState.state), - zap.Stringer("newState", adaptiveUpdateTSIntervalStateAdapting), - zap.Time("recentRequestExceedingThreshold", lastReachDropThresholdTime)) - o.adaptiveUpdateIntervalState.state = adaptiveUpdateTSIntervalStateAdapting + if currentAdaptiveInterval != configuredInterval && now.Sub(lastReachDropThresholdTime) < adaptiveUpdateTSIntervalDelayBeforeRecovering { + // There is a recent request that requires a short staleness. Keep the current adaptive interval. + // If it's not adapting state, it's possible that it's previously in recovering state, and it stops recovering + // as there is a new read operation requesting a short staleness. 
+ return adaptiveUpdateTSIntervalStateAdapting, currentAdaptiveInterval } - return currentAdaptiveInterval + return none, 0 } - timeSinceLastTick := now.Sub(o.adaptiveUpdateIntervalState.lastTick) - newInterval := currentAdaptiveInterval + time.Duration(timeSinceLastTick.Seconds()*float64(adaptiveUpdateTSIntervalRecoverPerSecond)) - if newInterval > configuredInterval { - newInterval = configuredInterval - } + checkRecovering := func() (adaptiveUpdateTSIntervalState, time.Duration) { + if currentAdaptiveInterval == configuredInterval || now.Sub(lastReachDropThresholdTime) < adaptiveUpdateTSIntervalDelayBeforeRecovering { + return none, 0 + } - if newInterval != currentAdaptiveInterval { - o.adaptiveLastTSUpdateInterval.Store(int64(newInterval)) - if o.adaptiveUpdateIntervalState.state != adaptiveUpdateTSIntervalStateRecovering { - logutil.Logger(context.Background()).Info("update low resolution ts interval is recovering", - zap.Duration("currentAdaptiveUpdateInterval", newInterval), - zap.Duration("configuredInterval", configuredInterval), - zap.Stringer("prevState", o.adaptiveUpdateIntervalState.state), - zap.Stringer("newState", adaptiveUpdateTSIntervalStateRecovering)) - o.adaptiveUpdateIntervalState.state = adaptiveUpdateTSIntervalStateRecovering + timeSinceLastTick := now.Sub(o.adaptiveUpdateIntervalState.lastTick) + newInterval := currentAdaptiveInterval + time.Duration(timeSinceLastTick.Seconds()*float64(adaptiveUpdateTSIntervalRecoverPerSecond)) + if newInterval > configuredInterval { + newInterval = configuredInterval } + + return adaptiveUpdateTSIntervalStateRecovering, newInterval } - if newInterval == configuredInterval { - if o.adaptiveUpdateIntervalState.state != adaptiveUpdateTSIntervalStateNormal { - logutil.Logger(context.Background()).Info("update low resolution ts interval is now synced with the configuration", - zap.Duration("updateInterval", currentAdaptiveInterval), - zap.Stringer("prevState", o.adaptiveUpdateIntervalState.state), - 
zap.Stringer("newState", adaptiveUpdateTSIntervalStateNormal)) - o.adaptiveUpdateIntervalState.state = adaptiveUpdateTSIntervalStateNormal + // Check the specified states in order, until the state becomes determined. + // If it's still undetermined after all checks, keep the previous state. + nextState := func(checkFuncs ...func() (adaptiveUpdateTSIntervalState, time.Duration)) time.Duration { + for _, f := range checkFuncs { + state, newInterval := f() + if state == none { + continue + } + + currentAdaptiveInterval = newInterval + + // If the final state is the recovering state, do an additional step to check whether it can go back to + // normal state immediately. + if state == adaptiveUpdateTSIntervalStateRecovering { + var nextState adaptiveUpdateTSIntervalState + nextState, newInterval = checkNormal() + if nextState != none { + state = nextState + currentAdaptiveInterval = newInterval + } + } + + o.adaptiveLastTSUpdateInterval.Store(int64(currentAdaptiveInterval)) + if o.adaptiveUpdateIntervalState.state != state { + logutil.Logger(context.Background()).Info("adaptive update ts interval state transition", + zap.Duration("configuredInterval", configuredInterval), + zap.Duration("prevAdaptiveUpdateInterval", prevAdaptiveInterval), + zap.Duration("newAdaptiveUpdateInterval", currentAdaptiveInterval), + zap.Duration("requiredStaleness", requiredStaleness), + zap.Stringer("prevState", o.adaptiveUpdateIntervalState.state), + zap.Stringer("newState", state)) + o.adaptiveUpdateIntervalState.state = state + } + + return currentAdaptiveInterval } + return currentAdaptiveInterval } + var newInterval time.Duration + if requiredStaleness != 0 { + newInterval = nextState(checkUnadjustable, checkAdapting) + } else { + newInterval = nextState(checkUnadjustable, checkAdapting, checkNormal, checkRecovering) + } return newInterval } @@ -680,6 +705,11 @@ func (o *pdOracle) adjustUpdateLowResolutionTSIntervalWithRequestedStaleness(rea } if requiredStaleness <= 
currentUpdateInterval && currentUpdateInterval > minAllowedAdaptiveUpdateTSInterval { + // Considering system time / PD time drifts, it's possible that we get a non-positive value from the + // calculation. Make sure it's always positive before passing it to the updateTS goroutine. + // Note that `nextUpdateInterval` method expects the requiredStaleness is always non-zero when triggered + // by this path. + requiredStaleness = max(requiredStaleness, time.Millisecond) // Try to non-blocking send a signal to notify it to change the interval immediately. But if the channel is // busy, it means that there's another concurrent call trying to update it. Just skip it in this case. select { From 0e9c8b8ef99f791de8910a1dbca3ef543eb771b8 Mon Sep 17 00:00:00 2001 From: MyonKeminta Date: Fri, 8 Nov 2024 15:38:43 +0800 Subject: [PATCH 10/12] Address comments Signed-off-by: MyonKeminta --- oracle/oracles/pd.go | 44 +++++++++++++++++++-------------------- oracle/oracles/pd_test.go | 4 ++-- 2 files changed, 23 insertions(+), 25 deletions(-) diff --git a/oracle/oracles/pd.go b/oracle/oracles/pd.go index 324b10a1bc..aa07c90568 100644 --- a/oracle/oracles/pd.go +++ b/oracle/oracles/pd.go @@ -134,7 +134,7 @@ type pdOracle struct { // the current adaptive update interval. If there is such a request recently, the adaptive interval // should avoid falling back to the original (configured) value. // Stored in unix microseconds to make it able to be accessed atomically. - lastReachDropThresholdTime atomic.Int64 + lastShortStalenessReadTime atomic.Int64 // When someone requests need shrinking the update interval immediately, it sends the duration it expects to // this channel. 
shrinkIntervalCh chan time.Duration @@ -350,10 +350,10 @@ func (o *pdOracle) nextUpdateInterval(now time.Time, requiredStaleness time.Dura defer o.adaptiveUpdateIntervalState.mu.Unlock() configuredInterval := time.Duration(o.lastTSUpdateInterval.Load()) - prevAdaptiveInterval := time.Duration(o.adaptiveLastTSUpdateInterval.Load()) - lastReachDropThresholdTime := time.UnixMilli(o.adaptiveUpdateIntervalState.lastReachDropThresholdTime.Load()) + prevAdaptiveUpdateInterval := time.Duration(o.adaptiveLastTSUpdateInterval.Load()) + lastReachDropThresholdTime := time.UnixMilli(o.adaptiveUpdateIntervalState.lastShortStalenessReadTime.Load()) - currentAdaptiveInterval := prevAdaptiveInterval + currentAdaptiveUpdateInterval := prevAdaptiveUpdateInterval // Shortcut const none = adaptiveUpdateTSIntervalStateNone @@ -377,14 +377,14 @@ func (o *pdOracle) nextUpdateInterval(now time.Time, requiredStaleness time.Dura checkNormal := func() (adaptiveUpdateTSIntervalState, time.Duration) { // If the current actual update interval is synced with the configured value, and it's not unadjustable state, // then it's the normal state. 
- if configuredInterval > minAllowedAdaptiveUpdateTSInterval && currentAdaptiveInterval == configuredInterval { - return adaptiveUpdateTSIntervalStateNormal, currentAdaptiveInterval + if configuredInterval > minAllowedAdaptiveUpdateTSInterval && currentAdaptiveUpdateInterval == configuredInterval { + return adaptiveUpdateTSIntervalStateNormal, currentAdaptiveUpdateInterval } return none, 0 } checkAdapting := func() (adaptiveUpdateTSIntervalState, time.Duration) { - if requiredStaleness != 0 && requiredStaleness < currentAdaptiveInterval && currentAdaptiveInterval > minAllowedAdaptiveUpdateTSInterval { + if requiredStaleness != 0 && requiredStaleness < currentAdaptiveUpdateInterval && currentAdaptiveUpdateInterval > minAllowedAdaptiveUpdateTSInterval { // If we are calculating the interval because of a request that requires a shorter staleness, we shrink the // update interval immediately to adapt to it. // We shrink the update interval to a value slightly lower than the requested staleness to avoid potential @@ -393,23 +393,23 @@ func (o *pdOracle) nextUpdateInterval(now time.Time, requiredStaleness time.Dura return adaptiveUpdateTSIntervalStateAdapting, newInterval } - if currentAdaptiveInterval != configuredInterval && now.Sub(lastReachDropThresholdTime) < adaptiveUpdateTSIntervalDelayBeforeRecovering { + if currentAdaptiveUpdateInterval != configuredInterval && now.Sub(lastReachDropThresholdTime) < adaptiveUpdateTSIntervalDelayBeforeRecovering { // There is a recent request that requires a short staleness. Keep the current adaptive interval. // If it's not adapting state, it's possible that it's previously in recovering state, and it stops recovering // as there is a new read operation requesting a short staleness. 
- return adaptiveUpdateTSIntervalStateAdapting, currentAdaptiveInterval + return adaptiveUpdateTSIntervalStateAdapting, currentAdaptiveUpdateInterval } return none, 0 } checkRecovering := func() (adaptiveUpdateTSIntervalState, time.Duration) { - if currentAdaptiveInterval == configuredInterval || now.Sub(lastReachDropThresholdTime) < adaptiveUpdateTSIntervalDelayBeforeRecovering { + if currentAdaptiveUpdateInterval == configuredInterval || now.Sub(lastReachDropThresholdTime) < adaptiveUpdateTSIntervalDelayBeforeRecovering { return none, 0 } timeSinceLastTick := now.Sub(o.adaptiveUpdateIntervalState.lastTick) - newInterval := currentAdaptiveInterval + time.Duration(timeSinceLastTick.Seconds()*float64(adaptiveUpdateTSIntervalRecoverPerSecond)) + newInterval := currentAdaptiveUpdateInterval + time.Duration(timeSinceLastTick.Seconds()*float64(adaptiveUpdateTSIntervalRecoverPerSecond)) if newInterval > configuredInterval { newInterval = configuredInterval } @@ -426,7 +426,7 @@ func (o *pdOracle) nextUpdateInterval(now time.Time, requiredStaleness time.Dura continue } - currentAdaptiveInterval = newInterval + currentAdaptiveUpdateInterval = newInterval // If the final state is the recovering state, do an additional step to check whether it can go back to // normal state immediately. 
@@ -435,25 +435,25 @@ func (o *pdOracle) nextUpdateInterval(now time.Time, requiredStaleness time.Dura nextState, newInterval = checkNormal() if nextState != none { state = nextState - currentAdaptiveInterval = newInterval + currentAdaptiveUpdateInterval = newInterval } } - o.adaptiveLastTSUpdateInterval.Store(int64(currentAdaptiveInterval)) + o.adaptiveLastTSUpdateInterval.Store(int64(currentAdaptiveUpdateInterval)) if o.adaptiveUpdateIntervalState.state != state { - logutil.Logger(context.Background()).Info("adaptive update ts interval state transition", + logutil.BgLogger().Info("adaptive update ts interval state transition", zap.Duration("configuredInterval", configuredInterval), - zap.Duration("prevAdaptiveUpdateInterval", prevAdaptiveInterval), - zap.Duration("newAdaptiveUpdateInterval", currentAdaptiveInterval), + zap.Duration("prevAdaptiveUpdateInterval", prevAdaptiveUpdateInterval), + zap.Duration("newAdaptiveUpdateInterval", currentAdaptiveUpdateInterval), zap.Duration("requiredStaleness", requiredStaleness), zap.Stringer("prevState", o.adaptiveUpdateIntervalState.state), zap.Stringer("newState", state)) o.adaptiveUpdateIntervalState.state = state } - return currentAdaptiveInterval + return currentAdaptiveUpdateInterval } - return currentAdaptiveInterval + return currentAdaptiveUpdateInterval } var newInterval time.Duration @@ -644,8 +644,6 @@ func (o *pdOracle) getCurrentTSForValidation(ctx context.Context, opt *oracle.Op // waiting for reusing the same result should not be canceled. So pass context.Background() instead of the // current ctx. res, err := o.GetTimestamp(context.Background(), opt) - // After finishing the current call, allow the next call to trigger fetching a new TS. 
- o.tsForValidation.Forget(opt.TxnScope) return res, err }) select { @@ -696,11 +694,11 @@ func (o *pdOracle) adjustUpdateLowResolutionTSIntervalWithRequestedStaleness(rea // Record the most recent time when there's a read operation requesting the staleness close enough to the // current update interval. nowMillis := now.UnixMilli() - last := o.adaptiveUpdateIntervalState.lastReachDropThresholdTime.Load() + last := o.adaptiveUpdateIntervalState.lastShortStalenessReadTime.Load() if last < nowMillis { // Do not retry if the CAS fails (which may happen when there are other goroutines updating it // concurrently), as we don't actually need to set it strictly. - o.adaptiveUpdateIntervalState.lastReachDropThresholdTime.CompareAndSwap(last, nowMillis) + o.adaptiveUpdateIntervalState.lastShortStalenessReadTime.CompareAndSwap(last, nowMillis) } } diff --git a/oracle/oracles/pd_test.go b/oracle/oracles/pd_test.go index 23fff989a0..b1fa6d3e33 100644 --- a/oracle/oracles/pd_test.go +++ b/oracle/oracles/pd_test.go @@ -232,13 +232,13 @@ func TestAdaptiveUpdateTSInterval(t *testing.T) { expectedInterval := time.Second - adaptiveUpdateTSIntervalShrinkingPreserve assert.Equal(t, expectedInterval, o.nextUpdateInterval(now, time.Second)) assert.Equal(t, adaptiveUpdateTSIntervalStateAdapting, o.adaptiveUpdateIntervalState.state) - assert.Equal(t, now.UnixMilli(), o.adaptiveUpdateIntervalState.lastReachDropThresholdTime.Load()) + assert.Equal(t, now.UnixMilli(), o.adaptiveUpdateIntervalState.lastShortStalenessReadTime.Load()) // Let read with short staleness continue happening. 
now = now.Add(adaptiveUpdateTSIntervalDelayBeforeRecovering / 2) o.adjustUpdateLowResolutionTSIntervalWithRequestedStaleness(mockTS(time.Second), mockTS(0), now) mustNoNotify() - assert.Equal(t, now.UnixMilli(), o.adaptiveUpdateIntervalState.lastReachDropThresholdTime.Load()) + assert.Equal(t, now.UnixMilli(), o.adaptiveUpdateIntervalState.lastShortStalenessReadTime.Load()) // The adaptiveUpdateTSIntervalDelayBeforeRecovering has not been elapsed since the last time there is a read with short // staleness. The update interval won't start being reset at this time. From 2fef49a224142c84f6f0635d06b8dc0b74634888 Mon Sep 17 00:00:00 2001 From: MyonKeminta Date: Fri, 8 Nov 2024 16:02:47 +0800 Subject: [PATCH 11/12] Add metrics Signed-off-by: MyonKeminta --- metrics/metrics.go | 168 ++++++++++++++++++++++++------------------- oracle/oracles/pd.go | 5 +- 2 files changed, 98 insertions(+), 75 deletions(-) diff --git a/metrics/metrics.go b/metrics/metrics.go index e608d800e7..ce247600ce 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -41,80 +41,82 @@ import ( // Client metrics. 
var ( - TiKVTxnCmdHistogram *prometheus.HistogramVec - TiKVBackoffHistogram *prometheus.HistogramVec - TiKVSendReqHistogram *prometheus.HistogramVec - TiKVSendReqCounter *prometheus.CounterVec - TiKVSendReqTimeCounter *prometheus.CounterVec - TiKVRPCNetLatencyHistogram *prometheus.HistogramVec - TiKVCoprocessorHistogram *prometheus.HistogramVec - TiKVLockResolverCounter *prometheus.CounterVec - TiKVRegionErrorCounter *prometheus.CounterVec - TiKVRPCErrorCounter *prometheus.CounterVec - TiKVTxnWriteKVCountHistogram *prometheus.HistogramVec - TiKVTxnWriteSizeHistogram *prometheus.HistogramVec - TiKVRawkvCmdHistogram *prometheus.HistogramVec - TiKVRawkvSizeHistogram *prometheus.HistogramVec - TiKVTxnRegionsNumHistogram *prometheus.HistogramVec - TiKVLoadSafepointCounter *prometheus.CounterVec - TiKVSecondaryLockCleanupFailureCounter *prometheus.CounterVec - TiKVRegionCacheCounter *prometheus.CounterVec - TiKVLoadRegionCounter *prometheus.CounterVec - TiKVLoadRegionCacheHistogram *prometheus.HistogramVec - TiKVLocalLatchWaitTimeHistogram prometheus.Histogram - TiKVStatusDuration *prometheus.HistogramVec - TiKVStatusCounter *prometheus.CounterVec - TiKVBatchSendTailLatency prometheus.Histogram - TiKVBatchSendLoopDuration *prometheus.SummaryVec - TiKVBatchRecvLoopDuration *prometheus.SummaryVec - TiKVBatchHeadArrivalInterval *prometheus.SummaryVec - TiKVBatchBestSize *prometheus.SummaryVec - TiKVBatchMoreRequests *prometheus.SummaryVec - TiKVBatchWaitOverLoad prometheus.Counter - TiKVBatchPendingRequests *prometheus.HistogramVec - TiKVBatchRequests *prometheus.HistogramVec - TiKVBatchRequestDuration *prometheus.SummaryVec - TiKVBatchClientUnavailable prometheus.Histogram - TiKVBatchClientWaitEstablish prometheus.Histogram - TiKVBatchClientRecycle prometheus.Histogram - TiKVRangeTaskStats *prometheus.GaugeVec - TiKVRangeTaskPushDuration *prometheus.HistogramVec - TiKVTokenWaitDuration prometheus.Histogram - TiKVTxnHeartBeatHistogram *prometheus.HistogramVec - 
TiKVTTLManagerHistogram prometheus.Histogram - TiKVPessimisticLockKeysDuration prometheus.Histogram - TiKVTTLLifeTimeReachCounter prometheus.Counter - TiKVNoAvailableConnectionCounter prometheus.Counter - TiKVTwoPCTxnCounter *prometheus.CounterVec - TiKVAsyncCommitTxnCounter *prometheus.CounterVec - TiKVOnePCTxnCounter *prometheus.CounterVec - TiKVStoreLimitErrorCounter *prometheus.CounterVec - TiKVGRPCConnTransientFailureCounter *prometheus.CounterVec - TiKVPanicCounter *prometheus.CounterVec - TiKVForwardRequestCounter *prometheus.CounterVec - TiKVTSFutureWaitDuration prometheus.Histogram - TiKVSafeTSUpdateCounter *prometheus.CounterVec - TiKVMinSafeTSGapSeconds *prometheus.GaugeVec - TiKVReplicaSelectorFailureCounter *prometheus.CounterVec - TiKVRequestRetryTimesHistogram prometheus.Histogram - TiKVTxnCommitBackoffSeconds prometheus.Histogram - TiKVTxnCommitBackoffCount prometheus.Histogram - TiKVSmallReadDuration prometheus.Histogram - TiKVReadThroughput prometheus.Histogram - TiKVUnsafeDestroyRangeFailuresCounterVec *prometheus.CounterVec - TiKVPrewriteAssertionUsageCounter *prometheus.CounterVec - TiKVGrpcConnectionState *prometheus.GaugeVec - TiKVAggressiveLockedKeysCounter *prometheus.CounterVec - TiKVStoreSlowScoreGauge *prometheus.GaugeVec - TiKVFeedbackSlowScoreGauge *prometheus.GaugeVec - TiKVHealthFeedbackOpsCounter *prometheus.CounterVec - TiKVPreferLeaderFlowsGauge *prometheus.GaugeVec - TiKVStaleReadCounter *prometheus.CounterVec - TiKVStaleReadReqCounter *prometheus.CounterVec - TiKVStaleReadBytes *prometheus.CounterVec - TiKVPipelinedFlushLenHistogram prometheus.Histogram - TiKVPipelinedFlushSizeHistogram prometheus.Histogram - TiKVPipelinedFlushDuration prometheus.Histogram + TiKVTxnCmdHistogram *prometheus.HistogramVec + TiKVBackoffHistogram *prometheus.HistogramVec + TiKVSendReqHistogram *prometheus.HistogramVec + TiKVSendReqCounter *prometheus.CounterVec + TiKVSendReqTimeCounter *prometheus.CounterVec + TiKVRPCNetLatencyHistogram 
*prometheus.HistogramVec + TiKVCoprocessorHistogram *prometheus.HistogramVec + TiKVLockResolverCounter *prometheus.CounterVec + TiKVRegionErrorCounter *prometheus.CounterVec + TiKVRPCErrorCounter *prometheus.CounterVec + TiKVTxnWriteKVCountHistogram *prometheus.HistogramVec + TiKVTxnWriteSizeHistogram *prometheus.HistogramVec + TiKVRawkvCmdHistogram *prometheus.HistogramVec + TiKVRawkvSizeHistogram *prometheus.HistogramVec + TiKVTxnRegionsNumHistogram *prometheus.HistogramVec + TiKVLoadSafepointCounter *prometheus.CounterVec + TiKVSecondaryLockCleanupFailureCounter *prometheus.CounterVec + TiKVRegionCacheCounter *prometheus.CounterVec + TiKVLoadRegionCounter *prometheus.CounterVec + TiKVLoadRegionCacheHistogram *prometheus.HistogramVec + TiKVLocalLatchWaitTimeHistogram prometheus.Histogram + TiKVStatusDuration *prometheus.HistogramVec + TiKVStatusCounter *prometheus.CounterVec + TiKVBatchSendTailLatency prometheus.Histogram + TiKVBatchSendLoopDuration *prometheus.SummaryVec + TiKVBatchRecvLoopDuration *prometheus.SummaryVec + TiKVBatchHeadArrivalInterval *prometheus.SummaryVec + TiKVBatchBestSize *prometheus.SummaryVec + TiKVBatchMoreRequests *prometheus.SummaryVec + TiKVBatchWaitOverLoad prometheus.Counter + TiKVBatchPendingRequests *prometheus.HistogramVec + TiKVBatchRequests *prometheus.HistogramVec + TiKVBatchRequestDuration *prometheus.SummaryVec + TiKVBatchClientUnavailable prometheus.Histogram + TiKVBatchClientWaitEstablish prometheus.Histogram + TiKVBatchClientRecycle prometheus.Histogram + TiKVRangeTaskStats *prometheus.GaugeVec + TiKVRangeTaskPushDuration *prometheus.HistogramVec + TiKVTokenWaitDuration prometheus.Histogram + TiKVTxnHeartBeatHistogram *prometheus.HistogramVec + TiKVTTLManagerHistogram prometheus.Histogram + TiKVPessimisticLockKeysDuration prometheus.Histogram + TiKVTTLLifeTimeReachCounter prometheus.Counter + TiKVNoAvailableConnectionCounter prometheus.Counter + TiKVTwoPCTxnCounter *prometheus.CounterVec + TiKVAsyncCommitTxnCounter 
*prometheus.CounterVec + TiKVOnePCTxnCounter *prometheus.CounterVec + TiKVStoreLimitErrorCounter *prometheus.CounterVec + TiKVGRPCConnTransientFailureCounter *prometheus.CounterVec + TiKVPanicCounter *prometheus.CounterVec + TiKVForwardRequestCounter *prometheus.CounterVec + TiKVTSFutureWaitDuration prometheus.Histogram + TiKVSafeTSUpdateCounter *prometheus.CounterVec + TiKVMinSafeTSGapSeconds *prometheus.GaugeVec + TiKVReplicaSelectorFailureCounter *prometheus.CounterVec + TiKVRequestRetryTimesHistogram prometheus.Histogram + TiKVTxnCommitBackoffSeconds prometheus.Histogram + TiKVTxnCommitBackoffCount prometheus.Histogram + TiKVSmallReadDuration prometheus.Histogram + TiKVReadThroughput prometheus.Histogram + TiKVUnsafeDestroyRangeFailuresCounterVec *prometheus.CounterVec + TiKVPrewriteAssertionUsageCounter *prometheus.CounterVec + TiKVGrpcConnectionState *prometheus.GaugeVec + TiKVAggressiveLockedKeysCounter *prometheus.CounterVec + TiKVStoreSlowScoreGauge *prometheus.GaugeVec + TiKVFeedbackSlowScoreGauge *prometheus.GaugeVec + TiKVHealthFeedbackOpsCounter *prometheus.CounterVec + TiKVPreferLeaderFlowsGauge *prometheus.GaugeVec + TiKVStaleReadCounter *prometheus.CounterVec + TiKVStaleReadReqCounter *prometheus.CounterVec + TiKVStaleReadBytes *prometheus.CounterVec + TiKVPipelinedFlushLenHistogram prometheus.Histogram + TiKVPipelinedFlushSizeHistogram prometheus.Histogram + TiKVPipelinedFlushDuration prometheus.Histogram + TiKVValidateReadTSFromPDCount prometheus.Counter + TiKVLowResolutionTSOUpdateIntervalSecondsGauge prometheus.Gauge ) // Label constants. 
@@ -834,6 +836,22 @@ func initMetrics(namespace, subsystem string, constLabels prometheus.Labels) { Buckets: prometheus.ExponentialBuckets(0.0005, 2, 28), // 0.5ms ~ 18h }) + TiKVValidateReadTSFromPDCount = prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "validate_read_ts_from_pd_count", + Help: "Counter of validating read ts by getting a timestamp from PD", + }) + + TiKVLowResolutionTSOUpdateIntervalSecondsGauge = prometheus.NewGauge( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "low_resolution_tso_update_interval_seconds", + Help: "The actual working update interval for the low resolution TSO. As there are adaptive mechanism internally, this value may differ from the config.", + }) + initShortcuts() } @@ -928,6 +946,8 @@ func RegisterMetrics() { prometheus.MustRegister(TiKVPipelinedFlushLenHistogram) prometheus.MustRegister(TiKVPipelinedFlushSizeHistogram) prometheus.MustRegister(TiKVPipelinedFlushDuration) + prometheus.MustRegister(TiKVValidateReadTSFromPDCount) + prometheus.MustRegister(TiKVLowResolutionTSOUpdateIntervalSecondsGauge) } // readCounter reads the value of a prometheus.Counter. 
diff --git a/oracle/oracles/pd.go b/oracle/oracles/pd.go index aa07c90568..d879bda1c7 100644 --- a/oracle/oracles/pd.go +++ b/oracle/oracles/pd.go @@ -462,6 +462,9 @@ func (o *pdOracle) nextUpdateInterval(now time.Time, requiredStaleness time.Dura } else { newInterval = nextState(checkUnadjustable, checkAdapting, checkNormal, checkRecovering) } + + metrics.TiKVLowResolutionTSOUpdateIntervalSecondsGauge.Set(newInterval.Seconds()) + return newInterval } @@ -638,7 +641,7 @@ func (o *pdOracle) GetExternalTimestamp(ctx context.Context) (uint64, error) { func (o *pdOracle) getCurrentTSForValidation(ctx context.Context, opt *oracle.Option) (uint64, error) { ch := o.tsForValidation.DoChan(opt.TxnScope, func() (interface{}, error) { - //metrics.ValidateReadTSFromPDCount.Inc() + metrics.TiKVValidateReadTSFromPDCount.Inc() // If the call that triggers the execution of this function is canceled by the context, other calls that are // waiting for reusing the same result should not be canceled. So pass context.Background() instead of the From 385419dd50779e6be7f14f19aa2980b36676e28d Mon Sep 17 00:00:00 2001 From: MyonKeminta Date: Fri, 8 Nov 2024 16:55:33 +0800 Subject: [PATCH 12/12] Adjust comments Signed-off-by: MyonKeminta --- oracle/oracles/pd.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/oracle/oracles/pd.go b/oracle/oracles/pd.go index d879bda1c7..3c4cb02fab 100644 --- a/oracle/oracles/pd.go +++ b/oracle/oracles/pd.go @@ -141,7 +141,7 @@ type pdOracle struct { // Only accessed in updateTS goroutine. No need to use atomic value. lastTick time.Time - // Represents a description about the current state. For logging and diagnosing purposes. + // Represents a description about the current state. state adaptiveUpdateTSIntervalState }