Skip to content

Commit

Permalink
Add RURuntimeStats clean-up
Browse files Browse the repository at this point in the history
Signed-off-by: JmPotato <ghzpotato@gmail.com>
  • Loading branch information
JmPotato committed Mar 14, 2023
1 parent ab813e3 commit a189b02
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 5 deletions.
49 changes: 44 additions & 5 deletions tikv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,15 @@ import (
"google.golang.org/grpc/keepalive"
)

// DCLabelKey indicates the key of label which represents the dc for Store.
const DCLabelKey = "zone"
const (
// DCLabelKey indicates the key of label which represents the dc for Store.
DCLabelKey = "zone"
safeTSUpdateInterval = time.Second * 2
// Since the default max transaction TTL is 1 hour, we can use this to
// clean up the RU runtime stats as well.
ruRuntimeStatsCleanThreshold = time.Hour
ruRuntimeStatsCleanInterval = ruRuntimeStatsCleanThreshold / 2
)

func createEtcdKV(addrs []string, tlsConfig *tls.Config) (*clientv3.Client, error) {
cfg := config.GetGlobalConfig()
Expand Down Expand Up @@ -216,9 +223,10 @@ func NewKVStore(uuid string, pdClient pd.Client, spkv SafePointKV, tikvclient Cl
store.lockResolver = txnlock.NewLockResolver(store)
loadOption(store, opt...)

store.wg.Add(2)
store.wg.Add(3)
go store.runSafePointChecker()
go store.safeTSUpdater()
go store.ruRuntimeStatsMapCleaner()

return store, nil
}
Expand Down Expand Up @@ -534,14 +542,14 @@ func (s *KVStore) updateMinSafeTS(txnScope string, storeIDs []uint64) {

func (s *KVStore) safeTSUpdater() {
defer s.wg.Done()
t := time.NewTicker(time.Second * 2)
t := time.NewTicker(safeTSUpdateInterval)
defer t.Stop()
ctx, cancel := context.WithCancel(s.ctx)
ctx = util.WithInternalSourceType(ctx, util.InternalTxnGC)
defer cancel()
for {
select {
case <-s.ctx.Done():
case <-ctx.Done():
return
case <-t.C:
s.updateSafeTS(ctx)
Expand Down Expand Up @@ -603,6 +611,37 @@ func (s *KVStore) updateSafeTS(ctx context.Context) {
wg.Wait()
}

func (s *KVStore) ruRuntimeStatsMapCleaner() {
defer s.wg.Done()
t := time.NewTicker(ruRuntimeStatsCleanInterval)
defer t.Stop()
ctx, cancel := context.WithCancel(s.ctx)
ctx = util.WithInternalSourceType(ctx, util.InternalTxnGC)
defer cancel()

cleanThreshold := ruRuntimeStatsCleanThreshold
if _, e := util.EvalFailpoint("mockFastRURuntimeStatsMapClean"); e == nil {
t.Reset(time.Millisecond * 100)
cleanThreshold = time.Millisecond
}

for {
select {
case <-ctx.Done():
return
case <-t.C:
now := time.Now()
s.ruRuntimeStatsMap.Range(func(key, _ interface{}) bool {
startTSTime := oracle.GetTimeFromTS(key.(uint64))
if now.Sub(startTSTime) >= cleanThreshold {
s.ruRuntimeStatsMap.Delete(key)
}
return true
})
}
}
}

// CreateRURuntimeStats creates a RURuntimeStats for the startTS and returns it.
func (s *KVStore) CreateRURuntimeStats(startTS uint64) *util.RURuntimeStats {
rrs, _ := s.ruRuntimeStatsMap.LoadOrStore(startTS, util.NewRURuntimeStats())
Expand Down
26 changes: 26 additions & 0 deletions tikv/kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/stretchr/testify/suite"
Expand Down Expand Up @@ -122,3 +123,28 @@ func (s *testKVSuite) TestMinSafeTs() {
s.Require().GreaterOrEqual(atomic.LoadInt32(&mockClient.requestCount), int32(2))
s.Require().Equal(uint64(80), s.store.GetMinSafeTS(oracle.GlobalTxnScope))
}

func (s *testKVSuite) TestRURuntimeStatsCleanUp() {
s.Nil(failpoint.Enable("tikvclient/mockFastRURuntimeStatsMapClean", `return()`))
defer func() {
s.Nil(failpoint.Disable("tikvclient/mockFastRURuntimeStatsMapClean"))
}()

mockClient := storeSafeTsMockClient{
Client: s.store.GetTiKVClient(),
testSuite: s,
}
s.store.SetTiKVClient(&mockClient)

// Create a ruRuntimeStats first.
startTS := oracle.ComposeTS(oracle.GetPhysical(time.Now()), 0)
ruRuntimeStats := s.store.CreateRURuntimeStats(startTS)
s.NotNil(ruRuntimeStats)
// Wait for the cleanup goroutine to clean up the ruRuntimeStatsMap.
time.Sleep(time.Millisecond * 150)
// The ruRuntimeStatsMap should be cleaned up.
s.store.ruRuntimeStatsMap.Range(func(key, value interface{}) bool {
s.Fail("ruRuntimeStatsMap should be cleaned up")
return true
})
}

0 comments on commit a189b02

Please sign in to comment.