diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go index b39eb5a43a..c242da7ce3 100644 --- a/internal/locate/region_request.go +++ b/internal/locate/region_request.go @@ -264,8 +264,9 @@ type replicaSelector struct { // selectorState is the interface of states of the replicaSelector. // Here is the main state transition diagram: // -// exceeding maxReplicaAttempt -// +-------------------+ || RPC failure && unreachable && no forwarding +// exceeding maxReplicaAttempt +// +-------------------+ || RPC failure && unreachable && no forwarding +// // +-------->+ accessKnownLeader +----------------+ // | +------+------------+ | // | | | @@ -282,7 +283,8 @@ type replicaSelector struct { // | leader becomes v +---+---+ // | reachable +-----+-----+ all proxies are tried ^ // +------------+tryNewProxy+-------------------------+ -// +-----------+ +// +// +-----------+ type selectorState interface { next(*retry.Backoffer, *replicaSelector) (*RPCContext, error) onSendSuccess(*replicaSelector) @@ -520,6 +522,7 @@ type accessFollower struct { option storeSelectorOp leaderIdx AccessIndex lastIdx AccessIndex + learnerOnly bool } func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) { @@ -583,7 +586,7 @@ func (state *accessFollower) isCandidate(idx AccessIndex, replica *replica) bool // The request can only be sent to the leader. ((state.option.leaderOnly && idx == state.leaderIdx) || // Choose a replica with matched labels. - (!state.option.leaderOnly && (state.tryLeader || idx != state.leaderIdx) && replica.store.IsLabelsMatch(state.option.labels))) + (!state.option.leaderOnly && (state.tryLeader || idx != state.leaderIdx) && replica.store.IsLabelsMatch(state.option.labels) && (!state.learnerOnly || replica.peer.Role == metapb.PeerRole_Learner))) } type invalidStore struct { @@ -641,6 +644,7 @@ func newReplicaSelector(regionCache *RegionCache, regionID RegionVerID, req *tik option: option, leaderIdx: regionStore.workTiKVIdx, lastIdx: -1, + learnerOnly: req.ReplicaReadType == kv.ReplicaReadLearner, } } diff --git a/internal/locate/region_request3_test.go b/internal/locate/region_request3_test.go index a9f495bf5f..ab4f7a2962 100644 --- a/internal/locate/region_request3_test.go +++ b/internal/locate/region_request3_test.go @@ -36,6 +36,7 @@ package locate import ( "context" + "fmt" "sync/atomic" "testing" "time" @@ -275,6 +276,69 @@ func refreshEpochs(regionStore *regionStore) { } } +func AssertRPCCtxEqual(s *testRegionRequestToThreeStoresSuite, rpcCtx *RPCContext, target *replica, proxy *replica) { + s.Equal(rpcCtx.Store, target.store) + s.Equal(rpcCtx.Peer, target.peer) + s.Equal(rpcCtx.Addr, target.store.addr) + s.Equal(rpcCtx.AccessMode, tiKVOnly) + if proxy != nil { + s.Equal(rpcCtx.ProxyStore, proxy.store) + s.Equal(rpcCtx.ProxyAddr, proxy.store.addr) + } +} + +func (s *testRegionRequestToThreeStoresSuite) TestLearnerReplicaSelector() { + regionLoc, err := s.cache.LocateRegionByID(s.bo, s.regionID) + s.Nil(err) + s.NotNil(regionLoc) + region := s.cache.GetCachedRegionWithRLock(regionLoc.Region) + regionStore := region.getStore() + req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{}, kvrpcpb.Context{}) + + // Create a fake region and change its leader to the last peer. 
+ regionStore = regionStore.clone()
+ regionStore.workTiKVIdx = AccessIndex(len(regionStore.stores) - 1)
+ sidx, _ := regionStore.accessStore(tiKVOnly, regionStore.workTiKVIdx)
+ regionStore.stores[sidx].epoch++
+
+ // Add a TiKV learner peer to the region.
+ storeID := s.cluster.AllocID()
+ s.cluster.AddStore(storeID, fmt.Sprintf("store%d", storeID))
+ tikvLearner := &metapb.Peer{Id: s.cluster.AllocID(), StoreId: storeID, Role: metapb.PeerRole_Learner}
+ tikvLearnerAccessIdx := len(regionStore.stores)
+ regionStore.accessIndex[tiKVOnly] = append(regionStore.accessIndex[tiKVOnly], tikvLearnerAccessIdx)
+ regionStore.stores = append(regionStore.stores, &Store{storeID: tikvLearner.StoreId})
+ regionStore.storeEpochs = append(regionStore.storeEpochs, 0)
+
+ region = &Region{
+ meta: region.GetMeta(),
+ }
+ region.lastAccess = time.Now().Unix()
+ region.meta.Peers = append(region.meta.Peers, tikvLearner)
+ atomic.StorePointer(&region.store, unsafe.Pointer(regionStore))
+
+ cache := NewRegionCache(s.cache.pdClient)
+ defer cache.Close()
+ cache.insertRegionToCache(region)
+
+ // Test accessFollower state with kv.ReplicaReadLearner request type.
+ region.lastAccess = time.Now().Unix()
+ refreshEpochs(regionStore)
+ req.ReplicaReadType = kv.ReplicaReadLearner
+ replicaSelector, err := newReplicaSelector(cache, regionLoc.Region, req)
+ s.NotNil(replicaSelector)
+ s.Nil(err)
+
+ accessLearner, _ := replicaSelector.state.(*accessFollower)
+ // Refresh lastAccess so the cached region stays valid before selecting a replica.
+ region.lastAccess = time.Now().Unix()
+ rpcCtx, err := replicaSelector.next(s.bo)
+ s.Nil(err)
+ // Should switch to the learner replica.
+ s.Equal(AccessIndex(tikvLearnerAccessIdx), accessLearner.lastIdx)
+ AssertRPCCtxEqual(s, rpcCtx, replicaSelector.replicas[replicaSelector.targetIdx], nil)
+}
+
 func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
 regionLoc, err := s.cache.LocateRegionByID(s.bo, s.regionID)
 s.Nil(err)
@@ -290,16 +354,16 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
 regionStore.stores[sidx].epoch++
 regionStore.storeEpochs[sidx]++
 // Add a TiFlash peer to the region.
- peer := &metapb.Peer{Id: s.cluster.AllocID(), StoreId: s.cluster.AllocID()}
+ tiflash := &metapb.Peer{Id: s.cluster.AllocID(), StoreId: s.cluster.AllocID()}
 regionStore.accessIndex[tiFlashOnly] = append(regionStore.accessIndex[tiFlashOnly], len(regionStore.stores))
- regionStore.stores = append(regionStore.stores, &Store{storeID: peer.StoreId, storeType: tikvrpc.TiFlash})
+ regionStore.stores = append(regionStore.stores, &Store{storeID: tiflash.StoreId, storeType: tikvrpc.TiFlash})
 regionStore.storeEpochs = append(regionStore.storeEpochs, 0)

 region = &Region{
 meta: region.GetMeta(),
 }
 region.lastAccess = time.Now().Unix()
- region.meta.Peers = append(region.meta.Peers, peer)
+ region.meta.Peers = append(region.meta.Peers, tiflash)
 atomic.StorePointer(&region.store, unsafe.Pointer(regionStore))

 cache := NewRegionCache(s.cache.pdClient)
@@ -329,24 +393,13 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
 }
 }

- assertRPCCtxEqual := func(rpcCtx *RPCContext, target *replica, proxy *replica) {
- s.Equal(rpcCtx.Store, target.store)
- s.Equal(rpcCtx.Peer, target.peer)
- s.Equal(rpcCtx.Addr, target.store.addr)
- s.Equal(rpcCtx.AccessMode, tiKVOnly)
- if proxy != nil {
- s.Equal(rpcCtx.ProxyStore, proxy.store)
- s.Equal(rpcCtx.ProxyAddr, proxy.store.addr)
- }
- }
-
 // Test accessKnownLeader state
 s.IsType(&accessKnownLeader{}, replicaSelector.state)
 // Try the leader for maxReplicaAttempt times
 for i := 1; i <= maxReplicaAttempt; i++ {
 rpcCtx, err := replicaSelector.next(s.bo)
 s.Nil(err)
- assertRPCCtxEqual(rpcCtx, replicaSelector.replicas[regionStore.workTiKVIdx], nil)
+ AssertRPCCtxEqual(s, rpcCtx, replicaSelector.replicas[regionStore.workTiKVIdx], nil)
 s.IsType(&accessKnownLeader{}, replicaSelector.state)
 s.Equal(replicaSelector.replicas[regionStore.workTiKVIdx].attempts, i)
 }
@@ -360,7 +413,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
 s.Equal(regionStore.workTiKVIdx, state.leaderIdx)
 s.NotEqual(state.lastIdx, regionStore.workTiKVIdx)
 s.Equal(replicaSelector.targetIdx, state.lastIdx)
- assertRPCCtxEqual(rpcCtx, replicaSelector.replicas[replicaSelector.targetIdx], nil)
+ AssertRPCCtxEqual(s, rpcCtx, replicaSelector.replicas[replicaSelector.targetIdx], nil)
 s.Equal(replicaSelector.targetReplica().attempts, 1)
 }
 // In tryFollower state, if all replicas are tried, nil RPCContext should be returned
@@ -387,7 +440,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
 s.Nil(err)
 s.IsType(&tryFollower{}, replicaSelector.state)
 s.NotEqual(replicaSelector.targetIdx, regionStore.workTiKVIdx)
- assertRPCCtxEqual(rpcCtx, replicaSelector.targetReplica(), nil)
+ AssertRPCCtxEqual(s, rpcCtx, replicaSelector.targetReplica(), nil)
 s.Equal(replicaSelector.targetReplica().attempts, 1)
 // If the NotLeader errors provides an unreachable leader, do not switch to it.
replicaSelector.onNotLeader(s.bo, rpcCtx, &errorpb.NotLeader{ @@ -453,7 +506,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() { s.Equal(regionStore.workTiKVIdx, state.leaderIdx) s.Equal(AccessIndex(2), replicaSelector.targetIdx) s.NotEqual(AccessIndex(2), replicaSelector.proxyIdx) - assertRPCCtxEqual(rpcCtx, replicaSelector.targetReplica(), replicaSelector.proxyReplica()) + AssertRPCCtxEqual(s, rpcCtx, replicaSelector.targetReplica(), replicaSelector.proxyReplica()) s.Equal(replicaSelector.targetReplica().attempts, 1) s.Equal(replicaSelector.proxyReplica().attempts, 1) @@ -487,14 +540,14 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() { s.Equal(regionStore.workTiKVIdx, state2.leaderIdx) _, err = replicaSelector.next(s.bo) s.Nil(err) - assertRPCCtxEqual(rpcCtx, replicaSelector.targetReplica(), replicaSelector.proxyReplica()) + AssertRPCCtxEqual(s, rpcCtx, replicaSelector.targetReplica(), replicaSelector.proxyReplica()) // Switch to tryNewProxy if the current proxy is not available replicaSelector.onSendFailure(s.bo, nil) s.IsType(&tryNewProxy{}, replicaSelector.state) rpcCtx, err = replicaSelector.next(s.bo) s.Nil(err) - assertRPCCtxEqual(rpcCtx, replicaSelector.targetReplica(), replicaSelector.proxyReplica()) + AssertRPCCtxEqual(s, rpcCtx, replicaSelector.targetReplica(), replicaSelector.proxyReplica()) s.Equal(regionStore.workTiKVIdx, state2.leaderIdx) s.Equal(AccessIndex(2), replicaSelector.targetIdx) s.NotEqual(regionStore.proxyTiKVIdx, replicaSelector.proxyIdx) @@ -509,7 +562,6 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() { s.NotNil(replicaSelector) state3, ok := replicaSelector.state.(*accessFollower) s.True(ok) - s.False(state3.tryLeader) s.Equal(regionStore.workTiKVIdx, state3.leaderIdx) s.Equal(state3.lastIdx, AccessIndex(-1)) @@ -522,7 +574,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() { // Shouldn't access the leader if followers aren't exhausted. s.NotEqual(regionStore.workTiKVIdx, state3.lastIdx) s.Equal(replicaSelector.targetIdx, state3.lastIdx) - assertRPCCtxEqual(rpcCtx, replicaSelector.replicas[replicaSelector.targetIdx], nil) + AssertRPCCtxEqual(s, rpcCtx, replicaSelector.replicas[replicaSelector.targetIdx], nil) lastIdx = state3.lastIdx } // Fallback to the leader for 1 time @@ -530,7 +582,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() { s.Nil(err) s.Equal(regionStore.workTiKVIdx, state3.lastIdx) s.Equal(replicaSelector.targetIdx, state3.lastIdx) - assertRPCCtxEqual(rpcCtx, replicaSelector.replicas[regionStore.workTiKVIdx], nil) + AssertRPCCtxEqual(s, rpcCtx, replicaSelector.replicas[regionStore.workTiKVIdx], nil) // All replicas are exhausted. rpcCtx, err = replicaSelector.next(s.bo) s.Nil(rpcCtx) @@ -553,7 +605,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() { s.Nil(err) s.Equal(regionStore.workTiKVIdx, state3.lastIdx) s.Equal(replicaSelector.targetIdx, state3.lastIdx) - assertRPCCtxEqual(rpcCtx, replicaSelector.replicas[regionStore.workTiKVIdx], nil) + AssertRPCCtxEqual(s, rpcCtx, replicaSelector.replicas[regionStore.workTiKVIdx], nil) // Test accessFollower state filtering label-not-match stores. 
region.lastAccess = time.Now().Unix() @@ -574,7 +626,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() { s.Nil(err) rpcCtx, err = replicaSelector.next(s.bo) s.Nil(err) - assertRPCCtxEqual(rpcCtx, replicaSelector.replicas[accessIdx], nil) + AssertRPCCtxEqual(s, rpcCtx, replicaSelector.replicas[accessIdx], nil) } // Test accessFollower state with leaderOnly option @@ -587,7 +639,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() { rpcCtx, err = replicaSelector.next(s.bo) s.Nil(err) // Should always access the leader. - assertRPCCtxEqual(rpcCtx, replicaSelector.replicas[regionStore.workTiKVIdx], nil) + AssertRPCCtxEqual(s, rpcCtx, replicaSelector.replicas[regionStore.workTiKVIdx], nil) } // Test accessFollower state with kv.ReplicaReadMixed request type. diff --git a/kv/store_vars.go b/kv/store_vars.go index d0abff21db..1f10d3b45f 100644 --- a/kv/store_vars.go +++ b/kv/store_vars.go @@ -62,6 +62,8 @@ const ( ReplicaReadFollower // ReplicaReadMixed stands for 'read from leader and follower and learner'. ReplicaReadMixed + // ReplicaReadLearner stands for 'read from learner'. + ReplicaReadLearner ) // IsFollowerRead checks if follower is going to be used to read data.
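
For context, the sketch below shows how the new read type is expected to be exercised end to end. It is assembled only from calls that appear in this diff (tikvrpc.NewRequest, kv.ReplicaReadLearner, newReplicaSelector, replicaSelector.next); the helper name learnerRPCContext, its key parameter, and its placement in package locate are illustrative assumptions, not part of the change.

package locate

import (
	"github.com/pingcap/kvproto/pkg/kvrpcpb"

	"github.com/tikv/client-go/v2/internal/retry"
	"github.com/tikv/client-go/v2/kv"
	"github.com/tikv/client-go/v2/tikvrpc"
)

// learnerRPCContext builds a learner-read request and asks the replica selector
// for a matching RPC context. Setting ReplicaReadType to kv.ReplicaReadLearner
// makes newReplicaSelector create an accessFollower state with learnerOnly = true,
// so isCandidate rejects every replica whose peer role is not PeerRole_Learner.
func learnerRPCContext(bo *retry.Backoffer, cache *RegionCache, region RegionVerID, key []byte) (*RPCContext, error) {
	req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: key}, kvrpcpb.Context{})
	req.ReplicaReadType = kv.ReplicaReadLearner // new constant from kv/store_vars.go

	selector, err := newReplicaSelector(cache, region, req)
	if err != nil {
		return nil, err
	}
	// next picks among the candidates accepted by isCandidate; see
	// TestLearnerReplicaSelector above for the RPCContext it is expected to return.
	return selector.next(bo)
}

Keeping the filter inside accessFollower.isCandidate (via the new learnerOnly flag) rather than adding a separate selector state lets learner reads reuse the existing follower-read retry and fallback logic.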