Skip to content

Commit

Permalink
tikv: invalidate store's regions when store be removed in kv (#11567) (
Browse files Browse the repository at this point in the history
  • Loading branch information
lysu authored and ngaut committed Aug 2, 2019
1 parent 540dd06 commit 234a610
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 0 deletions.
7 changes: 7 additions & 0 deletions store/tikv/region_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ var (
tikvRegionCacheCounterWithGetRegionError = metrics.TiKVRegionCacheCounter.WithLabelValues("get_region", "err")
tikvRegionCacheCounterWithGetStoreOK = metrics.TiKVRegionCacheCounter.WithLabelValues("get_store", "ok")
tikvRegionCacheCounterWithGetStoreError = metrics.TiKVRegionCacheCounter.WithLabelValues("get_store", "err")
tikvRegionCacheCounterWithInvalidateStoreRegionsOK = metrics.TiKVRegionCacheCounter.WithLabelValues("invalidate_store_regions", "ok")
)

const (
Expand Down Expand Up @@ -885,6 +886,7 @@ func (c *RegionCache) switchNextPeer(r *Region, currentPeerIdx int, err error) {
epoch := rs.storeFails[rs.workStoreIdx]
if atomic.CompareAndSwapUint32(&s.fail, epoch, epoch+1) {
logutil.Logger(context.Background()).Info("mark store's regions need be refill", zap.String("store", s.addr))
tikvRegionCacheCounterWithInvalidateStoreRegionsOK.Inc()
}
}

Expand Down Expand Up @@ -1017,6 +1019,11 @@ func (s *Store) reResolve(c *RegionCache) {
return
}
if store == nil {
// store has be removed in PD, we should invalidate all regions using those store.
logutil.Logger(context.Background()).Info("invalidate regions in removed store",
zap.Uint64("store", s.storeID), zap.String("add", s.addr))
atomic.AddUint32(&s.fail, 1)
tikvRegionCacheCounterWithInvalidateStoreRegionsOK.Inc()
return
}

Expand Down
29 changes: 29 additions & 0 deletions store/tikv/region_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,35 @@ func (s *testRegionCacheSuite) TestUpdateStoreAddr(c *C) {
c.Assert(getVal, BytesEquals, testValue)
}

func (s *testRegionCacheSuite) TestReplaceAddrWithNewStore(c *C) {
mvccStore := mocktikv.MustNewMVCCStore()
defer mvccStore.Close()

client := &RawKVClient{
clusterID: 0,
regionCache: NewRegionCache(mocktikv.NewPDClient(s.cluster)),
rpcClient: mocktikv.NewRPCClient(s.cluster, mvccStore),
}
defer client.Close()
testKey := []byte("test_key")
testValue := []byte("test_value")
err := client.Put(testKey, testValue)
c.Assert(err, IsNil)

// make store2 using store1's addr and store1 offline
store1Addr := s.storeAddr(s.store1)
s.cluster.UpdateStoreAddr(s.store1, s.storeAddr(s.store2))
s.cluster.UpdateStoreAddr(s.store2, store1Addr)
s.cluster.RemoveStore(s.store1)
s.cluster.ChangeLeader(s.region1, s.peer2)
s.cluster.RemovePeer(s.region1, s.store1)

getVal, err := client.Get(testKey)

c.Assert(err, IsNil)
c.Assert(getVal, BytesEquals, testValue)
}

func (s *testRegionCacheSuite) TestListRegionIDsInCache(c *C) {
// ['' - 'm' - 'z']
region2 := s.cluster.AllocID()
Expand Down

0 comments on commit 234a610

Please sign in to comment.