Skip to content

Commit

Permalink
add ut for GetMinSafeTS
Browse files Browse the repository at this point in the history
Signed-off-by: hehechen <awd123456sss@gmail.com>
  • Loading branch information
hehechen committed Dec 16, 2022
1 parent 6683ffd commit 24c1cd9
Show file tree
Hide file tree
Showing 4 changed files with 158 additions and 16 deletions.
4 changes: 3 additions & 1 deletion internal/locate/region_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,14 +491,16 @@ func (c *RegionCache) checkAndResolve(needCheckStores []*Store, needCheck func(*
}

// SetRegionCacheStore is used to set a store in region cache, for testing only
func (c *RegionCache) SetRegionCacheStore(id uint64, storeType tikvrpc.EndpointType, state uint64, labels []*metapb.StoreLabel) {
func (c *RegionCache) SetRegionCacheStore(id uint64, addr string, peerAddr string, storeType tikvrpc.EndpointType, state uint64, labels []*metapb.StoreLabel) {
c.storeMu.Lock()
defer c.storeMu.Unlock()
c.storeMu.stores[id] = &Store{
storeID: id,
storeType: storeType,
state: state,
labels: labels,
addr: addr,
peerAddr: peerAddr,
}
}

Expand Down
37 changes: 23 additions & 14 deletions internal/mockstore/mocktikv/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,14 @@ var _ cluster.Cluster = &Cluster{}

// Cluster simulates a TiKV cluster. It focuses on management and the change of
// meta data. A Cluster mainly includes following 3 kinds of meta data:
// 1) Region: A Region is a fragment of TiKV's data whose range is [start, end).
// The data of a Region is duplicated to multiple Peers and distributed in
// multiple Stores.
// 2) Peer: A Peer is a replica of a Region's data. All peers of a Region form
// a group, each group elects a Leader to provide services.
// 3) Store: A Store is a storage/service node. Try to think it as a TiKV server
// process. Only the store with request's Region's leader Peer could respond
// to client's request.
// 1. Region: A Region is a fragment of TiKV's data whose range is [start, end).
// The data of a Region is duplicated to multiple Peers and distributed in
// multiple Stores.
// 2. Peer: A Peer is a replica of a Region's data. All peers of a Region form
// a group, each group elects a Leader to provide services.
// 3. Store: A Store is a storage/service node. Try to think it as a TiKV server
// process. Only the store with request's Region's leader Peer could respond
// to client's request.
type Cluster struct {
sync.RWMutex
id uint64
Expand Down Expand Up @@ -224,7 +224,7 @@ func (c *Cluster) AddStore(storeID uint64, addr string, labels ...*metapb.StoreL
c.Lock()
defer c.Unlock()

c.stores[storeID] = newStore(storeID, addr, labels...)
c.stores[storeID] = newStore(storeID, addr, addr, labels...)
}

// RemoveStore removes a Store from the cluster.
Expand All @@ -248,7 +248,15 @@ func (c *Cluster) MarkTombstone(storeID uint64) {
func (c *Cluster) UpdateStoreAddr(storeID uint64, addr string, labels ...*metapb.StoreLabel) {
c.Lock()
defer c.Unlock()
c.stores[storeID] = newStore(storeID, addr, labels...)
c.stores[storeID] = newStore(storeID, addr, addr, labels...)
}

// UpdateStorePeerAddr updates store peer address for cluster.
func (c *Cluster) UpdateStorePeerAddr(storeID uint64, peerAddr string, labels ...*metapb.StoreLabel) {
c.Lock()
defer c.Unlock()
addr := c.stores[storeID].meta.Address
c.stores[storeID] = newStore(storeID, addr, peerAddr, labels...)
}

// GetRegion returns a Region's meta and leader ID.
Expand Down Expand Up @@ -691,12 +699,13 @@ type Store struct {
cancel bool // return context.Cancelled error when cancel is true.
}

func newStore(storeID uint64, addr string, labels ...*metapb.StoreLabel) *Store {
func newStore(storeID uint64, addr string, peerAddr string, labels ...*metapb.StoreLabel) *Store {
return &Store{
meta: &metapb.Store{
Id: storeID,
Address: addr,
Labels: labels,
Id: storeID,
Address: addr,
PeerAddress: peerAddr,
Labels: labels,
},
}
}
Expand Down
131 changes: 131 additions & 0 deletions tikv/kv_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
// Copyright 2021 TiKV Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// NOTE: The code in this file is based on code from the
// TiDB project, licensed under the Apache License v 2.0
//
// https://github.com/pingcap/tidb/tree/cc5e161ac06827589c4966674597c137cc9e809c/store/tikv/locate/region_cache_test.go
//

// Copyright 2016 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tikv

import (
"context"
"fmt"
"testing"
"time"

"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/stretchr/testify/suite"
"github.com/tikv/client-go/v2/internal/mockstore/mocktikv"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/testutils"
"github.com/tikv/client-go/v2/tikvrpc"
)

func TestKV(t *testing.T) {
suite.Run(t, new(testKVSuite))
}

type testKVSuite struct {
suite.Suite
store *KVStore
cluster *mocktikv.Cluster
tikvStoreId uint64 // store1 is TiKV
tiflashStoreId uint64 // store2 is TiFlash
}

func (s *testKVSuite) SetupTest() {
client, cluster, pdClient, err := testutils.NewMockTiKV("", nil)
s.Require().Nil(err)
testutils.BootstrapWithSingleStore(cluster)
store, err := NewTestTiKVStore(client, pdClient, nil, nil, 0)
s.Require().Nil(err)

s.store = store
s.cluster = cluster

storeIDs, _, _, _ := mocktikv.BootstrapWithMultiStores(s.cluster, 2)
s.tikvStoreId = storeIDs[0]
s.tiflashStoreId = storeIDs[1]
s.cluster.UpdateStorePeerAddr(s.tiflashStoreId, s.storeAddr(s.tiflashStoreId), &metapb.StoreLabel{Key: "engine", Value: "tiflash"})
s.store.regionCache.SetRegionCacheStore(s.tikvStoreId, s.storeAddr(s.tikvStoreId), s.storeAddr(s.tikvStoreId), tikvrpc.TiKV, 1, nil)
var labels []*metapb.StoreLabel
labels = append(labels, &metapb.StoreLabel{Key: "engine", Value: "tiflash"})
s.store.regionCache.SetRegionCacheStore(s.tiflashStoreId, s.storeAddr(s.tiflashStoreId), s.storeAddr(s.tiflashStoreId), tikvrpc.TiFlash, 1, labels)

}

func (s *testKVSuite) TearDownTest() {
s.Require().Nil(s.store.Close())
}

func (s *testKVSuite) storeAddr(id uint64) string {
return fmt.Sprintf("store%d", id)
}

type storeSafeTsMockClient struct {
Client
requestCount int
testSuite *testKVSuite
}

func (c *storeSafeTsMockClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) {
if req.Type != tikvrpc.CmdStoreSafeTS {
return c.Client.SendRequest(ctx, addr, req, timeout)
}
c.requestCount++
resp := &tikvrpc.Response{}
if addr == c.testSuite.storeAddr(c.testSuite.tiflashStoreId) {
resp.Resp = &kvrpcpb.StoreSafeTSResponse{SafeTs: 80}
} else {
resp.Resp = &kvrpcpb.StoreSafeTSResponse{SafeTs: 100}
}
return resp, nil
}

func (c *storeSafeTsMockClient) Close() error {
return c.Client.Close()
}

func (c *storeSafeTsMockClient) CloseAddr(addr string) error {
return c.Client.CloseAddr(addr)
}

func (s *testKVSuite) TestMinSafeTs() {
mockClient := storeSafeTsMockClient{
Client: s.store.GetTiKVClient(),
testSuite: s,
}
s.store.SetTiKVClient(&mockClient)
// wait for updateSafeTS
time.Sleep(4 * time.Second)
s.Require().Equal(2, mockClient.requestCount)
s.Require().Equal(uint64(80), s.store.GetMinSafeTS(oracle.GlobalTxnScope))
}
2 changes: 1 addition & 1 deletion tikv/test_probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func (s StoreProbe) SaveSafePoint(v uint64) error {

// SetRegionCacheStore is used to set a store in region cache, for testing only
func (s StoreProbe) SetRegionCacheStore(id uint64, storeType tikvrpc.EndpointType, state uint64, labels []*metapb.StoreLabel) {
s.regionCache.SetRegionCacheStore(id, storeType, state, labels)
s.regionCache.SetRegionCacheStore(id, "", "", storeType, state, labels)
}

// SetSafeTS is used to set safeTS for the store with `storeID`
Expand Down

0 comments on commit 24c1cd9

Please sign in to comment.