
Support learner replica read #643

Merged 2 commits on Jan 11, 2023
12 changes: 8 additions & 4 deletions internal/locate/region_request.go
@@ -264,8 +264,9 @@ type replicaSelector struct {
// selectorState is the interface of states of the replicaSelector.
// Here is the main state transition diagram:
//
-//                    exceeding maxReplicaAttempt
-// +-------------------+ || RPC failure && unreachable && no forwarding
+//                    exceeding maxReplicaAttempt
+// +-------------------+ || RPC failure && unreachable && no forwarding
+//
// +-------->+ accessKnownLeader +----------------+
// | +------+------------+ |
// | | |
@@ -282,7 +283,8 @@ type replicaSelector struct {
// | leader becomes v +---+---+
// | reachable +-----+-----+ all proxies are tried ^
// +------------+tryNewProxy+-------------------------+
-// +-----------+
+//
+// +-----------+
type selectorState interface {
next(*retry.Backoffer, *replicaSelector) (*RPCContext, error)
onSendSuccess(*replicaSelector)
@@ -520,6 +522,7 @@ type accessFollower struct {
option storeSelectorOp
leaderIdx AccessIndex
lastIdx AccessIndex
+	learnerOnly bool
}

func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) {
@@ -589,7 +592,7 @@ func (state *accessFollower) isCandidate(idx AccessIndex, replica *replica) bool
// The request can only be sent to the leader.
((state.option.leaderOnly && idx == state.leaderIdx) ||
// Choose a replica with matched labels.
-	(!state.option.leaderOnly && (state.tryLeader || idx != state.leaderIdx) && replica.store.IsLabelsMatch(state.option.labels)))
+	(!state.option.leaderOnly && (state.tryLeader || idx != state.leaderIdx) && replica.store.IsLabelsMatch(state.option.labels) && (!state.learnerOnly || replica.peer.Role == metapb.PeerRole_Learner)))
}

type invalidStore struct {
@@ -647,6 +650,7 @@ func newReplicaSelector(regionCache *RegionCache, regionID RegionVerID, req *tik
option: option,
leaderIdx: regionStore.workTiKVIdx,
lastIdx: -1,
+		learnerOnly: req.ReplicaReadType == kv.ReplicaReadLearner,
}
}

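The learner support in this file boils down to one new clause in the candidate predicate: a replica stays eligible only if `learnerOnly` is unset or the peer's role is `Learner`. Below is a minimal, runnable sketch of that predicate in isolation. The `replica` struct and role constants are simplified stand-ins invented for this sketch, not the real client-go types.

```go
package main

import "fmt"

// PeerRole mirrors metapb.PeerRole for this sketch only.
type PeerRole int

const (
	RoleVoter PeerRole = iota
	RoleLearner
)

// replica is a pared-down stand-in for the selector's replica entry.
type replica struct {
	role        PeerRole
	labelsMatch bool // would be store.IsLabelsMatch(labels) in the real code
}

// isCandidate reproduces the shape of the predicate in the diff:
// leader-only requests may only go to the leader; otherwise a replica
// must match labels, and when learnerOnly is set it must be a learner.
func isCandidate(r replica, isLeader, leaderOnly, tryLeader, learnerOnly bool) bool {
	if leaderOnly {
		return isLeader
	}
	return (tryLeader || !isLeader) &&
		r.labelsMatch &&
		(!learnerOnly || r.role == RoleLearner)
}

func main() {
	learner := replica{role: RoleLearner, labelsMatch: true}
	voter := replica{role: RoleVoter, labelsMatch: true}
	fmt.Println(isCandidate(learner, false, false, false, true)) // true: learner passes
	fmt.Println(isCandidate(voter, false, false, false, true))   // false: filtered out
}
```

Gating the new behavior on a single boolean keeps every existing read type on its old path; only requests that explicitly ask for `ReplicaReadLearner` ever see the extra role check.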
104 changes: 78 additions & 26 deletions internal/locate/region_request3_test.go
@@ -36,6 +36,7 @@ package locate

import (
"context"
"fmt"
"sync/atomic"
"testing"
"time"
@@ -276,6 +277,69 @@ func refreshEpochs(regionStore *regionStore) {
}
}

+func AssertRPCCtxEqual(s *testRegionRequestToThreeStoresSuite, rpcCtx *RPCContext, target *replica, proxy *replica) {
+	s.Equal(rpcCtx.Store, target.store)
+	s.Equal(rpcCtx.Peer, target.peer)
+	s.Equal(rpcCtx.Addr, target.store.addr)
+	s.Equal(rpcCtx.AccessMode, tiKVOnly)
+	if proxy != nil {
+		s.Equal(rpcCtx.ProxyStore, proxy.store)
+		s.Equal(rpcCtx.ProxyAddr, proxy.store.addr)
+	}
+}
+
+func (s *testRegionRequestToThreeStoresSuite) TestLearnerReplicaSelector() {
+	regionLoc, err := s.cache.LocateRegionByID(s.bo, s.regionID)
+	s.Nil(err)
+	s.NotNil(regionLoc)
+	region := s.cache.GetCachedRegionWithRLock(regionLoc.Region)
+	regionStore := region.getStore()
+	req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{}, kvrpcpb.Context{})
+
+	// Create a fake region and change its leader to the last peer.
+	regionStore = regionStore.clone()
+	regionStore.workTiKVIdx = AccessIndex(len(regionStore.stores) - 1)
+	sidx, _ := regionStore.accessStore(tiKVOnly, regionStore.workTiKVIdx)
+	regionStore.stores[sidx].epoch++
+
+	// Add a TiKV learner peer to the region.
+	storeID := s.cluster.AllocID()
+	s.cluster.AddStore(storeID, fmt.Sprintf("store%d", storeID))
+	tikvLearner := &metapb.Peer{Id: s.cluster.AllocID(), StoreId: storeID, Role: metapb.PeerRole_Learner}
+	tikvLearnerAccessIdx := len(regionStore.stores)
+	regionStore.accessIndex[tiKVOnly] = append(regionStore.accessIndex[tiKVOnly], tikvLearnerAccessIdx)
+	regionStore.stores = append(regionStore.stores, &Store{storeID: tikvLearner.StoreId})
+	regionStore.storeEpochs = append(regionStore.storeEpochs, 0)
+
+	region = &Region{
+		meta: region.GetMeta(),
+	}
+	region.lastAccess = time.Now().Unix()
+	region.meta.Peers = append(region.meta.Peers, tikvLearner)
+	atomic.StorePointer(&region.store, unsafe.Pointer(regionStore))
+
+	cache := NewRegionCache(s.cache.pdClient)
+	defer cache.Close()
+	cache.insertRegionToCache(region)
+
+	// Test accessFollower state with kv.ReplicaReadLearner request type.
+	region.lastAccess = time.Now().Unix()
+	refreshEpochs(regionStore)
+	req.ReplicaReadType = kv.ReplicaReadLearner
+	replicaSelector, err := newReplicaSelector(cache, regionLoc.Region, req)
+	s.NotNil(replicaSelector)
+	s.Nil(err)
+
+	accessLearner, _ := replicaSelector.state.(*accessFollower)
+	// Keep the cached region valid during the test.
+	region.lastAccess = time.Now().Unix()
+	rpcCtx, err := replicaSelector.next(s.bo)
+	s.Nil(err)
+	// Should switch to the learner replica.
+	s.Equal(AccessIndex(tikvLearnerAccessIdx), accessLearner.lastIdx)
+	AssertRPCCtxEqual(s, rpcCtx, replicaSelector.replicas[replicaSelector.targetIdx], nil)
+}
+
func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
regionLoc, err := s.cache.LocateRegionByID(s.bo, s.regionID)
s.Nil(err)
@@ -291,16 +355,16 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
regionStore.stores[sidx].epoch++
regionStore.storeEpochs[sidx]++
// Add a TiFlash peer to the region.
-	peer := &metapb.Peer{Id: s.cluster.AllocID(), StoreId: s.cluster.AllocID()}
+	tiflash := &metapb.Peer{Id: s.cluster.AllocID(), StoreId: s.cluster.AllocID()}
regionStore.accessIndex[tiFlashOnly] = append(regionStore.accessIndex[tiFlashOnly], len(regionStore.stores))
-	regionStore.stores = append(regionStore.stores, &Store{storeID: peer.StoreId, storeType: tikvrpc.TiFlash})
+	regionStore.stores = append(regionStore.stores, &Store{storeID: tiflash.StoreId, storeType: tikvrpc.TiFlash})
regionStore.storeEpochs = append(regionStore.storeEpochs, 0)

region = &Region{
meta: region.GetMeta(),
}
region.lastAccess = time.Now().Unix()
-	region.meta.Peers = append(region.meta.Peers, peer)
+	region.meta.Peers = append(region.meta.Peers, tiflash)
atomic.StorePointer(&region.store, unsafe.Pointer(regionStore))

cache := NewRegionCache(s.cache.pdClient)
@@ -330,24 +394,13 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
}
}

-	assertRPCCtxEqual := func(rpcCtx *RPCContext, target *replica, proxy *replica) {
-		s.Equal(rpcCtx.Store, target.store)
-		s.Equal(rpcCtx.Peer, target.peer)
-		s.Equal(rpcCtx.Addr, target.store.addr)
-		s.Equal(rpcCtx.AccessMode, tiKVOnly)
-		if proxy != nil {
-			s.Equal(rpcCtx.ProxyStore, proxy.store)
-			s.Equal(rpcCtx.ProxyAddr, proxy.store.addr)
-		}
-	}
-
// Test accessKnownLeader state
s.IsType(&accessKnownLeader{}, replicaSelector.state)
// Try the leader for maxReplicaAttempt times
for i := 1; i <= maxReplicaAttempt; i++ {
rpcCtx, err := replicaSelector.next(s.bo)
s.Nil(err)
-		assertRPCCtxEqual(rpcCtx, replicaSelector.replicas[regionStore.workTiKVIdx], nil)
+		AssertRPCCtxEqual(s, rpcCtx, replicaSelector.replicas[regionStore.workTiKVIdx], nil)
s.IsType(&accessKnownLeader{}, replicaSelector.state)
s.Equal(replicaSelector.replicas[regionStore.workTiKVIdx].attempts, i)
}
@@ -361,7 +414,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
s.Equal(regionStore.workTiKVIdx, state.leaderIdx)
s.NotEqual(state.lastIdx, regionStore.workTiKVIdx)
s.Equal(replicaSelector.targetIdx, state.lastIdx)
-		assertRPCCtxEqual(rpcCtx, replicaSelector.replicas[replicaSelector.targetIdx], nil)
+		AssertRPCCtxEqual(s, rpcCtx, replicaSelector.replicas[replicaSelector.targetIdx], nil)
s.Equal(replicaSelector.targetReplica().attempts, 1)
}
// In tryFollower state, if all replicas are tried, nil RPCContext should be returned
@@ -388,7 +441,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
s.Nil(err)
s.IsType(&tryFollower{}, replicaSelector.state)
s.NotEqual(replicaSelector.targetIdx, regionStore.workTiKVIdx)
-	assertRPCCtxEqual(rpcCtx, replicaSelector.targetReplica(), nil)
+	AssertRPCCtxEqual(s, rpcCtx, replicaSelector.targetReplica(), nil)
s.Equal(replicaSelector.targetReplica().attempts, 1)
	// If the NotLeader error provides an unreachable leader, do not switch to it.
replicaSelector.onNotLeader(s.bo, rpcCtx, &errorpb.NotLeader{
@@ -454,7 +507,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
s.Equal(regionStore.workTiKVIdx, state.leaderIdx)
s.Equal(AccessIndex(2), replicaSelector.targetIdx)
s.NotEqual(AccessIndex(2), replicaSelector.proxyIdx)
-	assertRPCCtxEqual(rpcCtx, replicaSelector.targetReplica(), replicaSelector.proxyReplica())
+	AssertRPCCtxEqual(s, rpcCtx, replicaSelector.targetReplica(), replicaSelector.proxyReplica())
s.Equal(replicaSelector.targetReplica().attempts, 1)
s.Equal(replicaSelector.proxyReplica().attempts, 1)

@@ -488,14 +541,14 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
s.Equal(regionStore.workTiKVIdx, state2.leaderIdx)
_, err = replicaSelector.next(s.bo)
s.Nil(err)
-	assertRPCCtxEqual(rpcCtx, replicaSelector.targetReplica(), replicaSelector.proxyReplica())
+	AssertRPCCtxEqual(s, rpcCtx, replicaSelector.targetReplica(), replicaSelector.proxyReplica())

// Switch to tryNewProxy if the current proxy is not available
replicaSelector.onSendFailure(s.bo, nil)
s.IsType(&tryNewProxy{}, replicaSelector.state)
rpcCtx, err = replicaSelector.next(s.bo)
s.Nil(err)
-	assertRPCCtxEqual(rpcCtx, replicaSelector.targetReplica(), replicaSelector.proxyReplica())
+	AssertRPCCtxEqual(s, rpcCtx, replicaSelector.targetReplica(), replicaSelector.proxyReplica())
s.Equal(regionStore.workTiKVIdx, state2.leaderIdx)
s.Equal(AccessIndex(2), replicaSelector.targetIdx)
s.NotEqual(regionStore.proxyTiKVIdx, replicaSelector.proxyIdx)
@@ -510,7 +563,6 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
s.NotNil(replicaSelector)
state3, ok := replicaSelector.state.(*accessFollower)
s.True(ok)
-	s.False(state3.tryLeader)
s.Equal(regionStore.workTiKVIdx, state3.leaderIdx)
s.Equal(state3.lastIdx, AccessIndex(-1))

@@ -523,15 +575,15 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
// Shouldn't access the leader if followers aren't exhausted.
s.NotEqual(regionStore.workTiKVIdx, state3.lastIdx)
s.Equal(replicaSelector.targetIdx, state3.lastIdx)
-		assertRPCCtxEqual(rpcCtx, replicaSelector.replicas[replicaSelector.targetIdx], nil)
+		AssertRPCCtxEqual(s, rpcCtx, replicaSelector.replicas[replicaSelector.targetIdx], nil)
lastIdx = state3.lastIdx
}
// Fallback to the leader for 1 time
rpcCtx, err = replicaSelector.next(s.bo)
s.Nil(err)
s.Equal(regionStore.workTiKVIdx, state3.lastIdx)
s.Equal(replicaSelector.targetIdx, state3.lastIdx)
-	assertRPCCtxEqual(rpcCtx, replicaSelector.replicas[regionStore.workTiKVIdx], nil)
+	AssertRPCCtxEqual(s, rpcCtx, replicaSelector.replicas[regionStore.workTiKVIdx], nil)
// All replicas are exhausted.
rpcCtx, err = replicaSelector.next(s.bo)
s.Nil(rpcCtx)
@@ -554,7 +606,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
s.Nil(err)
s.Equal(regionStore.workTiKVIdx, state3.lastIdx)
s.Equal(replicaSelector.targetIdx, state3.lastIdx)
-	assertRPCCtxEqual(rpcCtx, replicaSelector.replicas[regionStore.workTiKVIdx], nil)
+	AssertRPCCtxEqual(s, rpcCtx, replicaSelector.replicas[regionStore.workTiKVIdx], nil)

// Test accessFollower state filtering label-not-match stores.
region.lastAccess = time.Now().Unix()
@@ -575,7 +627,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
s.Nil(err)
rpcCtx, err = replicaSelector.next(s.bo)
s.Nil(err)
-		assertRPCCtxEqual(rpcCtx, replicaSelector.replicas[accessIdx], nil)
+		AssertRPCCtxEqual(s, rpcCtx, replicaSelector.replicas[accessIdx], nil)
}

// Test accessFollower state with leaderOnly option
@@ -588,7 +640,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
rpcCtx, err = replicaSelector.next(s.bo)
s.Nil(err)
// Should always access the leader.
-		assertRPCCtxEqual(rpcCtx, replicaSelector.replicas[regionStore.workTiKVIdx], nil)
+		AssertRPCCtxEqual(s, rpcCtx, replicaSelector.replicas[regionStore.workTiKVIdx], nil)
}

// Test accessFollower state with kv.ReplicaReadMixed request type.
2 changes: 2 additions & 0 deletions kv/store_vars.go
@@ -62,6 +62,8 @@ const (
ReplicaReadFollower
// ReplicaReadMixed stands for 'read from leader and follower and learner'.
ReplicaReadMixed
+	// ReplicaReadLearner stands for 'read from learner'.
+	ReplicaReadLearner
)

// IsFollowerRead checks if follower is going to be used to read data.
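For callers, the new constant plugs into the existing replica-read options. The sketch below shows how a client might route reads to learner replicas. It assumes the usual client-go v2 transaction API (`txnkv.NewClient`, `Begin`, `GetSnapshot().SetReplicaRead`) and a PD endpoint at `127.0.0.1:2379`; verify these calls against the client-go version you actually use.

```go
package main

import (
	"context"
	"fmt"

	"github.com/tikv/client-go/v2/kv"
	"github.com/tikv/client-go/v2/txnkv"
)

func main() {
	// Connect through PD (address is a placeholder for this sketch).
	client, err := txnkv.NewClient([]string{"127.0.0.1:2379"})
	if err != nil {
		panic(err)
	}
	defer client.Close()

	txn, err := client.Begin()
	if err != nil {
		panic(err)
	}
	// Route this transaction's reads to learner replicas only.
	txn.GetSnapshot().SetReplicaRead(kv.ReplicaReadLearner)

	val, err := txn.Get(context.Background(), []byte("key"))
	if err != nil {
		panic(err)
	}
	fmt.Printf("value: %s\n", val)
}
```

Because `ReplicaReadLearner` flows into `newReplicaSelector` as the `learnerOnly` flag shown earlier, such a read never falls back to voters: if no matching learner peer is available, the selector simply finds no candidate rather than silently reading from a follower or the leader.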