Revert "Handle more detailed statistics from TiKV (tikv#536)"
This reverts commit 57c12f7.
lcwangchao committed Aug 8, 2022
1 parent e10841f commit 3873bc3
Showing 10 changed files with 42 additions and 433 deletions.
2 changes: 1 addition & 1 deletion go.mod
@@ -13,7 +13,7 @@ require (
github.com/opentracing/opentracing-go v1.2.0
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989
github.com/pingcap/kvproto v0.0.0-20220705053936-aa9c2d20cd2a
github.com/pingcap/kvproto v0.0.0-20220525022339-6aaebf466305
github.com/pingcap/log v0.0.0-20211215031037-e024ba4eb0ee
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.11.0
4 changes: 2 additions & 2 deletions go.sum
@@ -155,8 +155,8 @@ github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E=
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20220510035547-0e2f26c0a46a/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/kvproto v0.0.0-20220705053936-aa9c2d20cd2a h1:nP2wmyw9JTRsk5rm+tZtfAso6c/1FvuaFNbXTaYz3FE=
github.com/pingcap/kvproto v0.0.0-20220705053936-aa9c2d20cd2a/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/kvproto v0.0.0-20220525022339-6aaebf466305 h1:TZ0teMZoKHnZDlJxNkWrp5Sgv3w+ruNbrqtBYKsfaNw=
github.com/pingcap/kvproto v0.0.0-20220525022339-6aaebf466305/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/log v0.0.0-20211215031037-e024ba4eb0ee h1:VO2t6IBpfvW34TdtD/G10VvnGqjLic1jzOuHjUb5VqM=
github.com/pingcap/log v0.0.0-20211215031037-e024ba4eb0ee/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
3 changes: 0 additions & 3 deletions integration_tests/snapshot_test.go
@@ -309,7 +309,6 @@ func (s *testSnapshotSuite) TestSnapshotRuntimeStats() {
ProcessedVersions: 10,
ProcessedVersionsSize: 10,
TotalVersions: 15,
GetSnapshotNanos: 500,
RocksdbBlockReadCount: 20,
RocksdbBlockReadByte: 15,
RocksdbDeleteSkippedCount: 5,
@@ -323,7 +322,6 @@
"scan_detail: {total_process_keys: 10, " +
"total_process_keys_size: 10, " +
"total_keys: 15, " +
"get_snapshot_time: 500ns, " +
"rocksdb: {delete_skipped_count: 5, " +
"key_skipped_count: 1, " +
"block: {cache_hit_count: 10, read_count: 20, read_byte: 15 Bytes}}}"
@@ -334,7 +332,6 @@
"scan_detail: {total_process_keys: 20, " +
"total_process_keys_size: 20, " +
"total_keys: 30, " +
"get_snapshot_time: 1µs, " +
"rocksdb: {delete_skipped_count: 10, " +
"key_skipped_count: 2, " +
"block: {cache_hit_count: 20, read_count: 40, read_byte: 30 Bytes}}}"
109 changes: 13 additions & 96 deletions internal/client/client.go
@@ -375,9 +375,8 @@ func (c *RPCClient) closeConns() {
}

var (
sendReqHistCache sync.Map
sendReqCounterCache sync.Map
rpcNetLatencyHistCache sync.Map
sendReqHistCache sync.Map
sendReqCounterCache sync.Map
)

type sendReqHistCacheKey struct {
@@ -396,61 +395,39 @@ type sendReqCounterCacheValue struct {
timeCounter prometheus.Counter
}

func (c *RPCClient) updateTiKVSendReqHistogram(req *tikvrpc.Request, resp *tikvrpc.Response, start time.Time, staleRead bool) {
elapsed := time.Since(start)
secs := elapsed.Seconds()
storeID := req.Context.GetPeer().GetStoreId()

func (c *RPCClient) updateTiKVSendReqHistogram(req *tikvrpc.Request, start time.Time, staleRead bool) {
histKey := sendReqHistCacheKey{
req.Type,
storeID,
req.Context.GetPeer().GetStoreId(),
staleRead,
}
counterKey := sendReqCounterCacheKey{
histKey,
req.GetRequestSource(),
}

reqType := req.Type.String()
var storeIDStr string

hist, ok := sendReqHistCache.Load(histKey)
if !ok {
if len(storeIDStr) == 0 {
storeIDStr = strconv.FormatUint(storeID, 10)
}
hist = metrics.TiKVSendReqHistogram.WithLabelValues(reqType, storeIDStr, strconv.FormatBool(staleRead))
reqType := req.Type.String()
storeID := strconv.FormatUint(req.Context.GetPeer().GetStoreId(), 10)
hist = metrics.TiKVSendReqHistogram.WithLabelValues(reqType, storeID, strconv.FormatBool(staleRead))
sendReqHistCache.Store(histKey, hist)
}
counter, ok := sendReqCounterCache.Load(counterKey)
if !ok {
if len(storeIDStr) == 0 {
storeIDStr = strconv.FormatUint(storeID, 10)
}
reqType := req.Type.String()
storeID := strconv.FormatUint(req.Context.GetPeer().GetStoreId(), 10)
counter = sendReqCounterCacheValue{
metrics.TiKVSendReqCounter.WithLabelValues(reqType, storeIDStr, strconv.FormatBool(staleRead), counterKey.requestSource),
metrics.TiKVSendReqTimeCounter.WithLabelValues(reqType, storeIDStr, strconv.FormatBool(staleRead), counterKey.requestSource),
metrics.TiKVSendReqCounter.WithLabelValues(reqType, storeID, strconv.FormatBool(staleRead), counterKey.requestSource),
metrics.TiKVSendReqTimeCounter.WithLabelValues(reqType, storeID, strconv.FormatBool(staleRead), counterKey.requestSource),
}
sendReqCounterCache.Store(counterKey, counter)
}

secs := time.Since(start).Seconds()
hist.(prometheus.Observer).Observe(secs)
counter.(sendReqCounterCacheValue).counter.Inc()
counter.(sendReqCounterCacheValue).timeCounter.Add(secs)

if execDetail := resp.GetExecDetailsV2(); execDetail != nil &&
execDetail.TimeDetail != nil && execDetail.TimeDetail.TotalRpcWallTimeNs > 0 {
latHist, ok := rpcNetLatencyHistCache.Load(storeID)
if !ok {
if len(storeIDStr) == 0 {
storeIDStr = strconv.FormatUint(storeID, 10)
}
latHist = metrics.TiKVRPCNetLatencyHistogram.WithLabelValues(storeIDStr)
sendReqHistCache.Store(storeID, latHist)
}
latency := elapsed - time.Duration(execDetail.TimeDetail.TotalRpcWallTimeNs)*time.Nanosecond
latHist.(prometheus.Observer).Observe(latency.Seconds())
}
}

func (c *RPCClient) sendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (resp *tikvrpc.Response, err error) {
@@ -481,13 +458,7 @@ func (c *RPCClient) sendRequest(ctx context.Context, addr string, req *tikvrpc.R
detail := stmtExec.(*util.ExecDetails)
atomic.AddInt64(&detail.WaitKVRespDuration, int64(time.Since(start)))
}
c.updateTiKVSendReqHistogram(req, resp, start, staleRead)

if spanRPC != nil && util.TraceExecDetailsEnabled(ctx) {
if si := buildSpanInfoFromResp(resp); si != nil {
si.addTo(spanRPC, start)
}
}
c.updateTiKVSendReqHistogram(req, start, staleRead)
}()

// TiDB RPC server supports batch RPC, but batch connection will send heart beat, It's not necessary since
@@ -741,57 +712,3 @@ func (si *spanInfo) String() string {
si.printTo(buf)
return buf.String()
}

func buildSpanInfoFromResp(resp *tikvrpc.Response) *spanInfo {
details := resp.GetExecDetailsV2()
if details == nil {
return nil
}

td := details.TimeDetail
sd := details.ScanDetailV2
wd := details.WriteDetail

if td == nil {
return nil
}

spanRPC := spanInfo{name: "tikv.RPC", dur: td.TotalRpcWallTimeNs}
spanWait := spanInfo{name: "tikv.Wait", dur: td.WaitWallTimeMs * uint64(time.Millisecond)}
spanProcess := spanInfo{name: "tikv.Process", dur: td.ProcessWallTimeMs * uint64(time.Millisecond)}

if sd != nil {
spanWait.children = append(spanWait.children, spanInfo{name: "tikv.GetSnapshot", dur: sd.GetSnapshotNanos})
if wd == nil {
spanProcess.children = append(spanProcess.children, spanInfo{name: "tikv.RocksDBBlockRead", dur: sd.RocksdbBlockReadNanos})
}
}

spanRPC.children = append(spanRPC.children, spanWait, spanProcess)

if wd != nil {
spanAsyncWrite := spanInfo{
name: "tikv.AsyncWrite",
children: []spanInfo{
{name: "tikv.StoreBatchWait", dur: wd.StoreBatchWaitNanos},
{name: "tikv.ProposeSendWait", dur: wd.ProposeSendWaitNanos},
{name: "tikv.PersistLog", dur: wd.PersistLogNanos, async: true, children: []spanInfo{
{name: "tikv.RaftDBWriteWait", dur: wd.RaftDbWriteLeaderWaitNanos}, // MutexLock + WriteLeader
{name: "tikv.RaftDBWriteWAL", dur: wd.RaftDbSyncLogNanos},
{name: "tikv.RaftDBWriteMemtable", dur: wd.RaftDbWriteMemtableNanos},
}},
{name: "tikv.CommitLog", dur: wd.CommitLogNanos},
{name: "tikv.ApplyBatchWait", dur: wd.ApplyBatchWaitNanos},
{name: "tikv.ApplyLog", dur: wd.ApplyLogNanos, children: []spanInfo{
{name: "tikv.ApplyMutexLock", dur: wd.ApplyMutexLockNanos},
{name: "tikv.ApplyWriteLeaderWait", dur: wd.ApplyWriteLeaderWaitNanos},
{name: "tikv.ApplyWriteWAL", dur: wd.ApplyWriteWalNanos},
{name: "tikv.ApplyWriteMemtable", dur: wd.ApplyWriteMemtableNanos},
}},
},
}
spanRPC.children = append(spanRPC.children, spanAsyncWrite)
}

return &spanRPC
}
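
Note on the client.go hunks above: before this revert, updateTiKVSendReqHistogram derived an RPC network-latency observation by subtracting the TiKV-reported TotalRpcWallTimeNs from the client-side elapsed time. A minimal standalone sketch of that calculation follows; the helper and the sample values are hypothetical, not part of client-go:

package main

import (
	"fmt"
	"time"
)

// approxNetLatency mirrors the reverted calculation: client-observed elapsed time
// minus the server-reported RPC wall time leaves roughly the network round trip
// plus queueing on both sides.
func approxNetLatency(clientElapsed time.Duration, totalRPCWallTimeNs uint64) time.Duration {
	return clientElapsed - time.Duration(totalRPCWallTimeNs)*time.Nanosecond
}

func main() {
	// 3ms observed on the client, 2.5ms reported by TiKV => ~500µs attributed to the network.
	fmt.Println(approxNetLatency(3*time.Millisecond, 2_500_000))
}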
11 changes: 0 additions & 11 deletions metrics/metrics.go
@@ -46,7 +46,6 @@ var (
TiKVSendReqHistogram *prometheus.HistogramVec
TiKVSendReqCounter *prometheus.CounterVec
TiKVSendReqTimeCounter *prometheus.CounterVec
TiKVRPCNetLatencyHistogram *prometheus.HistogramVec
TiKVCoprocessorHistogram *prometheus.HistogramVec
TiKVLockResolverCounter *prometheus.CounterVec
TiKVRegionErrorCounter *prometheus.CounterVec
@@ -162,15 +161,6 @@ func initMetrics(namespace, subsystem string) {
Help: "Counter of request time with multi dimensions.",
}, []string{LblType, LblStore, LblStaleRead, LblSource})

TiKVRPCNetLatencyHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "rpc_net_latency_seconds",
Help: "Bucketed histogram of time difference between TiDB and TiKV.",
Buckets: prometheus.ExponentialBuckets(5e-5, 2, 18), // 50us ~ 6.5s
}, []string{LblStore})

TiKVCoprocessorHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: namespace,
@@ -609,7 +599,6 @@ func RegisterMetrics() {
prometheus.MustRegister(TiKVSendReqHistogram)
prometheus.MustRegister(TiKVSendReqCounter)
prometheus.MustRegister(TiKVSendReqTimeCounter)
prometheus.MustRegister(TiKVRPCNetLatencyHistogram)
prometheus.MustRegister(TiKVCoprocessorHistogram)
prometheus.MustRegister(TiKVLockResolverCounter)
prometheus.MustRegister(TiKVRegionErrorCounter)
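
Side note on the reverted histogram above: prometheus.ExponentialBuckets(5e-5, 2, 18) produces 18 upper bounds, 5e-5·2^0 through 5e-5·2^17, i.e. 50µs up to roughly 6.55s, which is what the "50us ~ 6.5s" comment refers to. A quick standalone check (illustration only, not part of the change):

package main

import "fmt"

func main() {
	// Reproduce the bounds of prometheus.ExponentialBuckets(5e-5, 2, 18):
	// 18 buckets starting at 50µs, each double the previous, ending near 6.55s.
	bound := 5e-5
	for i := 0; i < 18; i++ {
		fmt.Printf("bucket %2d upper bound: %gs\n", i+1, bound)
		bound *= 2
	}
}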
16 changes: 0 additions & 16 deletions tikvrpc/tikvrpc.go
@@ -926,22 +926,6 @@ func (resp *Response) GetRegionError() (*errorpb.Error, error) {
return err.GetRegionError(), nil
}

type getExecDetailsV2 interface {
GetExecDetailsV2() *kvrpcpb.ExecDetailsV2
}

// GetExecDetailsV2 returns the ExecDetailsV2 of the underlying concrete response.
func (resp *Response) GetExecDetailsV2() *kvrpcpb.ExecDetailsV2 {
if resp == nil || resp.Resp == nil {
return nil
}
details, ok := resp.Resp.(getExecDetailsV2)
if !ok {
return nil
}
return details.GetExecDetailsV2()
}

// CallRPC launches a rpc call.
// ch is needed to implement timeout for coprocessor streaming, the stream object's
// cancel function will be sent to the channel, together with a lease checked by a background goroutine.
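
The retained doc comment on CallRPC above says the stream's cancel function is sent over a channel together with a lease that a background goroutine checks. A rough illustration of that pattern follows; the names and types here are assumptions for the sketch, not the actual client-go implementation:

package main

import (
	"sync/atomic"
	"time"
)

type streamLease struct {
	cancel   func()       // cancels the coprocessor stream when the lease expires
	deadline atomic.Int64 // unix nanoseconds; callers push it forward while the stream is healthy
}

// leaseChecker receives cancel functions with their leases and periodically
// cancels any stream whose lease deadline has passed.
func leaseChecker(ch <-chan *streamLease, interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	var active []*streamLease
	for {
		select {
		case l := <-ch:
			active = append(active, l)
		case now := <-ticker.C:
			kept := active[:0]
			for _, l := range active {
				if now.UnixNano() > l.deadline.Load() {
					l.cancel()
				} else {
					kept = append(kept, l)
				}
			}
			active = kept
		}
	}
}

func main() {
	ch := make(chan *streamLease, 1)
	go leaseChecker(ch, 10*time.Millisecond)

	l := &streamLease{cancel: func() { println("stream cancelled: lease expired") }}
	l.deadline.Store(time.Now().Add(20 * time.Millisecond).UnixNano())
	ch <- l

	time.Sleep(50 * time.Millisecond) // let the checker observe the expired lease
}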
5 changes: 1 addition & 4 deletions txnkv/transaction/commit.go
@@ -88,8 +88,7 @@ func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer,
sender := locate.NewRegionRequestSender(c.store.GetRegionCache(), c.store.GetTiKVClient())
for {
attempts++
reqBegin := time.Now()
if reqBegin.Sub(tBegin) > slowRequestThreshold {
if time.Since(tBegin) > slowRequestThreshold {
logutil.BgLogger().Warn("slow commit request", zap.Uint64("startTS", c.startTS), zap.Stringer("region", &batch.region), zap.Int("attempts", attempts))
tBegin = time.Now()
}
@@ -141,8 +140,6 @@ func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer,
// we can clean undetermined error.
if batch.isPrimary && !c.isAsyncCommit() {
c.setUndeterminedErr(nil)
reqDuration := time.Since(reqBegin)
c.getDetail().MergeReqDetails(reqDuration, batch.region.GetID(), sender.GetStoreAddr(), commitResp.ExecDetailsV2)
}
if keyErr := commitResp.GetError(); keyErr != nil {
if rejected := keyErr.GetCommitTsExpired(); rejected != nil {
8 changes: 2 additions & 6 deletions txnkv/transaction/pessimistic.go
@@ -144,12 +144,10 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *
time.Sleep(300 * time.Millisecond)
return errors.WithStack(&tikverr.ErrWriteConflict{WriteConflict: nil})
}
sender := locate.NewRegionRequestSender(c.store.GetRegionCache(), c.store.GetTiKVClient())
startTime := time.Now()
resp, err := sender.SendReq(bo, req, batch.region, client.ReadTimeoutShort)
reqDuration := time.Since(startTime)
resp, err := c.store.SendReq(bo, req, batch.region, client.ReadTimeoutShort)
if action.LockCtx.Stats != nil {
atomic.AddInt64(&action.LockCtx.Stats.LockRPCTime, int64(reqDuration))
atomic.AddInt64(&action.LockCtx.Stats.LockRPCTime, int64(time.Since(startTime)))
atomic.AddInt64(&action.LockCtx.Stats.LockRPCCount, 1)
}
if err != nil {
@@ -185,8 +183,6 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *
lockResp := resp.Resp.(*kvrpcpb.PessimisticLockResponse)
keyErrs := lockResp.GetErrors()
if len(keyErrs) == 0 {
action.LockCtx.Stats.MergeReqDetails(reqDuration, batch.region.GetID(), sender.GetStoreAddr(), lockResp.ExecDetailsV2)

if batch.isPrimary {
// After locking the primary key, we should protect the primary lock from expiring
// now in case locking the remaining keys take a long time.
8 changes: 1 addition & 7 deletions txnkv/transaction/prewrite.go
@@ -247,8 +247,7 @@ func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *retry.B
if attempts > 1 || action.retry {
req.IsRetryRequest = true
}
reqBegin := time.Now()
if reqBegin.Sub(tBegin) > slowRequestThreshold {
if time.Since(tBegin) > slowRequestThreshold {
logutil.BgLogger().Warn("slow prewrite request", zap.Uint64("startTS", c.startTS), zap.Stringer("region", &batch.region), zap.Int("attempts", attempts))
tBegin = time.Now()
}
@@ -306,10 +305,6 @@ func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *retry.B
// Clear the RPC Error since the request is evaluated successfully.
sender.SetRPCError(nil)

// Update CommitDetails
reqDuration := time.Since(reqBegin)
c.getDetail().MergeReqDetails(reqDuration, batch.region.GetID(), sender.GetStoreAddr(), prewriteResp.ExecDetailsV2)

if batch.isPrimary {
// After writing the primary key, if the size of the transaction is larger than 32M,
// start the ttlManager. The ttlManager will be closed in tikvTxn.Commit().
@@ -363,7 +358,6 @@ func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *retry.B
c.mu.Unlock()
}
}

return nil
}
var locks []*txnlock.Lock
(1 of the 10 changed files is not shown above; its diff was not loaded on this page.)
