Skip to content

Commit

Permalink
Support adaptive update interval for low resolution ts (tikv#1484)
Browse files Browse the repository at this point in the history
Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>
  • Loading branch information
MyonKeminta committed Dec 11, 2024
1 parent 08d0b3b commit ca20e3a
Show file tree
Hide file tree
Showing 7 changed files with 836 additions and 109 deletions.
158 changes: 89 additions & 69 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,75 +41,77 @@ import (

// Client metrics.
var (
TiKVTxnCmdHistogram *prometheus.HistogramVec
TiKVBackoffHistogram *prometheus.HistogramVec
TiKVSendReqHistogram *prometheus.HistogramVec
TiKVSendReqCounter *prometheus.CounterVec
TiKVSendReqTimeCounter *prometheus.CounterVec
TiKVRPCNetLatencyHistogram *prometheus.HistogramVec
TiKVCoprocessorHistogram *prometheus.HistogramVec
TiKVLockResolverCounter *prometheus.CounterVec
TiKVRegionErrorCounter *prometheus.CounterVec
TiKVRPCErrorCounter *prometheus.CounterVec
TiKVTxnWriteKVCountHistogram *prometheus.HistogramVec
TiKVTxnWriteSizeHistogram *prometheus.HistogramVec
TiKVRawkvCmdHistogram *prometheus.HistogramVec
TiKVRawkvSizeHistogram *prometheus.HistogramVec
TiKVTxnRegionsNumHistogram *prometheus.HistogramVec
TiKVLoadSafepointCounter *prometheus.CounterVec
TiKVSecondaryLockCleanupFailureCounter *prometheus.CounterVec
TiKVRegionCacheCounter *prometheus.CounterVec
TiKVLoadRegionCounter *prometheus.CounterVec
TiKVLoadRegionCacheHistogram *prometheus.HistogramVec
TiKVLocalLatchWaitTimeHistogram prometheus.Histogram
TiKVStatusDuration *prometheus.HistogramVec
TiKVStatusCounter *prometheus.CounterVec
TiKVBatchWaitDuration prometheus.Histogram
TiKVBatchSendLatency prometheus.Histogram
TiKVBatchWaitOverLoad prometheus.Counter
TiKVBatchPendingRequests *prometheus.HistogramVec
TiKVBatchRequests *prometheus.HistogramVec
TiKVBatchClientUnavailable prometheus.Histogram
TiKVBatchClientWaitEstablish prometheus.Histogram
TiKVBatchClientRecycle prometheus.Histogram
TiKVBatchRecvLatency *prometheus.HistogramVec
TiKVRangeTaskStats *prometheus.GaugeVec
TiKVRangeTaskPushDuration *prometheus.HistogramVec
TiKVTokenWaitDuration prometheus.Histogram
TiKVTxnHeartBeatHistogram *prometheus.HistogramVec
TiKVTTLManagerHistogram prometheus.Histogram
TiKVPessimisticLockKeysDuration prometheus.Histogram
TiKVTTLLifeTimeReachCounter prometheus.Counter
TiKVNoAvailableConnectionCounter prometheus.Counter
TiKVTwoPCTxnCounter *prometheus.CounterVec
TiKVAsyncCommitTxnCounter *prometheus.CounterVec
TiKVOnePCTxnCounter *prometheus.CounterVec
TiKVStoreLimitErrorCounter *prometheus.CounterVec
TiKVGRPCConnTransientFailureCounter *prometheus.CounterVec
TiKVPanicCounter *prometheus.CounterVec
TiKVForwardRequestCounter *prometheus.CounterVec
TiKVTSFutureWaitDuration prometheus.Histogram
TiKVSafeTSUpdateCounter *prometheus.CounterVec
TiKVMinSafeTSGapSeconds *prometheus.GaugeVec
TiKVReplicaSelectorFailureCounter *prometheus.CounterVec
TiKVRequestRetryTimesHistogram prometheus.Histogram
TiKVTxnCommitBackoffSeconds prometheus.Histogram
TiKVTxnCommitBackoffCount prometheus.Histogram
TiKVSmallReadDuration prometheus.Histogram
TiKVReadThroughput prometheus.Histogram
TiKVUnsafeDestroyRangeFailuresCounterVec *prometheus.CounterVec
TiKVPrewriteAssertionUsageCounter *prometheus.CounterVec
TiKVGrpcConnectionState *prometheus.GaugeVec
TiKVAggressiveLockedKeysCounter *prometheus.CounterVec
TiKVStoreSlowScoreGauge *prometheus.GaugeVec
TiKVFeedbackSlowScoreGauge *prometheus.GaugeVec
TiKVPreferLeaderFlowsGauge *prometheus.GaugeVec
TiKVStaleReadCounter *prometheus.CounterVec
TiKVStaleReadReqCounter *prometheus.CounterVec
TiKVStaleReadBytes *prometheus.CounterVec
TiKVPipelinedFlushLenHistogram prometheus.Histogram
TiKVPipelinedFlushSizeHistogram prometheus.Histogram
TiKVPipelinedFlushDuration prometheus.Histogram
TiKVTxnCmdHistogram *prometheus.HistogramVec
TiKVBackoffHistogram *prometheus.HistogramVec
TiKVSendReqHistogram *prometheus.HistogramVec
TiKVSendReqCounter *prometheus.CounterVec
TiKVSendReqTimeCounter *prometheus.CounterVec
TiKVRPCNetLatencyHistogram *prometheus.HistogramVec
TiKVCoprocessorHistogram *prometheus.HistogramVec
TiKVLockResolverCounter *prometheus.CounterVec
TiKVRegionErrorCounter *prometheus.CounterVec
TiKVRPCErrorCounter *prometheus.CounterVec
TiKVTxnWriteKVCountHistogram *prometheus.HistogramVec
TiKVTxnWriteSizeHistogram *prometheus.HistogramVec
TiKVRawkvCmdHistogram *prometheus.HistogramVec
TiKVRawkvSizeHistogram *prometheus.HistogramVec
TiKVTxnRegionsNumHistogram *prometheus.HistogramVec
TiKVLoadSafepointCounter *prometheus.CounterVec
TiKVSecondaryLockCleanupFailureCounter *prometheus.CounterVec
TiKVRegionCacheCounter *prometheus.CounterVec
TiKVLoadRegionCounter *prometheus.CounterVec
TiKVLoadRegionCacheHistogram *prometheus.HistogramVec
TiKVLocalLatchWaitTimeHistogram prometheus.Histogram
TiKVStatusDuration *prometheus.HistogramVec
TiKVStatusCounter *prometheus.CounterVec
TiKVBatchWaitDuration prometheus.Histogram
TiKVBatchSendLatency prometheus.Histogram
TiKVBatchWaitOverLoad prometheus.Counter
TiKVBatchPendingRequests *prometheus.HistogramVec
TiKVBatchRequests *prometheus.HistogramVec
TiKVBatchClientUnavailable prometheus.Histogram
TiKVBatchClientWaitEstablish prometheus.Histogram
TiKVBatchClientRecycle prometheus.Histogram
TiKVBatchRecvLatency *prometheus.HistogramVec
TiKVRangeTaskStats *prometheus.GaugeVec
TiKVRangeTaskPushDuration *prometheus.HistogramVec
TiKVTokenWaitDuration prometheus.Histogram
TiKVTxnHeartBeatHistogram *prometheus.HistogramVec
TiKVTTLManagerHistogram prometheus.Histogram
TiKVPessimisticLockKeysDuration prometheus.Histogram
TiKVTTLLifeTimeReachCounter prometheus.Counter
TiKVNoAvailableConnectionCounter prometheus.Counter
TiKVTwoPCTxnCounter *prometheus.CounterVec
TiKVAsyncCommitTxnCounter *prometheus.CounterVec
TiKVOnePCTxnCounter *prometheus.CounterVec
TiKVStoreLimitErrorCounter *prometheus.CounterVec
TiKVGRPCConnTransientFailureCounter *prometheus.CounterVec
TiKVPanicCounter *prometheus.CounterVec
TiKVForwardRequestCounter *prometheus.CounterVec
TiKVTSFutureWaitDuration prometheus.Histogram
TiKVSafeTSUpdateCounter *prometheus.CounterVec
TiKVMinSafeTSGapSeconds *prometheus.GaugeVec
TiKVReplicaSelectorFailureCounter *prometheus.CounterVec
TiKVRequestRetryTimesHistogram prometheus.Histogram
TiKVTxnCommitBackoffSeconds prometheus.Histogram
TiKVTxnCommitBackoffCount prometheus.Histogram
TiKVSmallReadDuration prometheus.Histogram
TiKVReadThroughput prometheus.Histogram
TiKVUnsafeDestroyRangeFailuresCounterVec *prometheus.CounterVec
TiKVPrewriteAssertionUsageCounter *prometheus.CounterVec
TiKVGrpcConnectionState *prometheus.GaugeVec
TiKVAggressiveLockedKeysCounter *prometheus.CounterVec
TiKVStoreSlowScoreGauge *prometheus.GaugeVec
TiKVFeedbackSlowScoreGauge *prometheus.GaugeVec
TiKVPreferLeaderFlowsGauge *prometheus.GaugeVec
TiKVStaleReadCounter *prometheus.CounterVec
TiKVStaleReadReqCounter *prometheus.CounterVec
TiKVStaleReadBytes *prometheus.CounterVec
TiKVPipelinedFlushLenHistogram prometheus.Histogram
TiKVPipelinedFlushSizeHistogram prometheus.Histogram
TiKVPipelinedFlushDuration prometheus.Histogram
TiKVValidateReadTSFromPDCount prometheus.Counter
TiKVLowResolutionTSOUpdateIntervalSecondsGauge prometheus.Gauge
)

// Label constants.
Expand Down Expand Up @@ -786,6 +788,22 @@ func initMetrics(namespace, subsystem string, constLabels prometheus.Labels) {
Buckets: prometheus.ExponentialBuckets(0.0005, 2, 28), // 0.5ms ~ 18h
})

TiKVValidateReadTSFromPDCount = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "validate_read_ts_from_pd_count",
Help: "Counter of validating read ts by getting a timestamp from PD",
})

TiKVLowResolutionTSOUpdateIntervalSecondsGauge = prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "low_resolution_tso_update_interval_seconds",
Help: "The actual working update interval for the low resolution TSO. As there are adaptive mechanism internally, this value may differ from the config.",
})

initShortcuts()
}

Expand Down Expand Up @@ -875,6 +893,8 @@ func RegisterMetrics() {
prometheus.MustRegister(TiKVPipelinedFlushLenHistogram)
prometheus.MustRegister(TiKVPipelinedFlushSizeHistogram)
prometheus.MustRegister(TiKVPipelinedFlushDuration)
prometheus.MustRegister(TiKVValidateReadTSFromPDCount)
prometheus.MustRegister(TiKVLowResolutionTSOUpdateIntervalSecondsGauge)
}

// readCounter reads the value of a prometheus.Counter.
Expand Down
13 changes: 13 additions & 0 deletions oracle/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ type Oracle interface {
GetLowResolutionTimestamp(ctx context.Context, opt *Option) (uint64, error)
GetLowResolutionTimestampAsync(ctx context.Context, opt *Option) Future
SetLowResolutionTimestampUpdateInterval(time.Duration) error
// GetStaleTimestamp generates a timestamp based on the recently fetched timestamp and the elapsed time since
// when that timestamp was fetched. The result is expected to be about `prevSecond` seconds before the current
// time.
// WARNING: This method does not guarantee whether the generated timestamp is legal for accessing the data.
// Neither is it safe to use it for verifying the legality of another calculated timestamp.
// Be sure to validate the timestamp before using it to access the data.
GetStaleTimestamp(ctx context.Context, txnScope string, prevSecond uint64) (uint64, error)
IsExpired(lockTimestamp, TTL uint64, opt *Option) bool
UntilExpired(lockTimeStamp, TTL uint64, opt *Option) int64
Expand All @@ -61,6 +67,13 @@ type Oracle interface {

// GetAllTSOKeyspaceGroupMinTS gets a minimum timestamp from all TSO keyspace groups.
GetAllTSOKeyspaceGroupMinTS(ctx context.Context) (uint64, error)

// ValidateSnapshotReadTS verifies whether it can be guaranteed that the given readTS doesn't exceed the maximum ts
// that has been allocated by the oracle, so that it's safe to use this ts to perform snapshot read, stale read,
// etc.
// Note that this method only checks the ts from the oracle's perspective. It doesn't check whether the snapshot
// has been GCed.
ValidateSnapshotReadTS(ctx context.Context, readTS uint64, opt *Option) error
}

// Future is a future which promises to return a timestamp.
Expand Down
12 changes: 12 additions & 0 deletions oracle/oracles/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"sync"
"time"

"github.com/pingcap/errors"
"github.com/tikv/client-go/v2/oracle"
)

Expand Down Expand Up @@ -148,3 +149,14 @@ func (l *localOracle) SetExternalTimestamp(ctx context.Context, newTimestamp uin
func (l *localOracle) GetExternalTimestamp(ctx context.Context) (uint64, error) {
return l.getExternalTimestamp(ctx)
}

func (l *localOracle) ValidateSnapshotReadTS(ctx context.Context, readTS uint64, opt *oracle.Option) error {
currentTS, err := l.GetTimestamp(ctx, opt)
if err != nil {
return errors.Errorf("fail to validate read timestamp: %v", err)
}
if currentTS < readTS {
return errors.Errorf("cannot set read timestamp to a future time")
}
return nil
}
11 changes: 11 additions & 0 deletions oracle/oracles/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,17 @@ func (o *MockOracle) SetLowResolutionTimestampUpdateInterval(time.Duration) erro
return nil
}

func (o *MockOracle) ValidateSnapshotReadTS(ctx context.Context, readTS uint64, opt *oracle.Option) error {
currentTS, err := o.GetTimestamp(ctx, opt)
if err != nil {
return errors.Errorf("fail to validate read timestamp: %v", err)
}
if currentTS < readTS {
return errors.Errorf("cannot set read timestamp to a future time")
}
return nil
}

// IsExpired implements oracle.Oracle interface.
func (o *MockOracle) IsExpired(lockTimestamp, TTL uint64, _ *oracle.Option) bool {
o.RLock()
Expand Down
Loading

0 comments on commit ca20e3a

Please sign in to comment.