Merge pull request #10094 from nolouch/drop-read
server: drop read request if the leader is changed
gyuho authored Sep 24, 2018
2 parents 12cfc5f + 6ea5419 commit fb67483
Showing 11 changed files with 101 additions and 12 deletions.
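
In short: before this change, a linearizable read submitted to a server whose leader moved away could block until the full request timeout. The server now fails such pending reads with a retryable "etcdserver: leader changed" error so callers can re-issue them promptly. A minimal client-side sketch of handling the new error (the helper, package name, and retry bound are assumptions for illustration, not code from this PR):

package clientutil

import (
	"context"
	"time"

	"go.etcd.io/etcd/clientv3"
	"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
)

// getWithRetry re-issues a linearizable Get when the server reports that
// the leader changed while the read was pending.
func getWithRetry(cli *clientv3.Client, key string) (*clientv3.GetResponse, error) {
	var lastErr error
	for i := 0; i < 3; i++ { // retry bound is arbitrary
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		resp, err := cli.Get(ctx, key)
		cancel()
		if err != rpctypes.ErrLeaderChanged {
			return resp, err // success, or a non-retryable error
		}
		lastErr = err // leader moved mid-read; safe to retry
	}
	return nil, lastErr
}
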
10 changes: 5 additions & 5 deletions clientv3/integration/kv_test.go
@@ -439,6 +439,11 @@ func TestKVGetErrConnClosed(t *testing.T) {
cli := clus.Client(0)

donec := make(chan struct{})
if err := cli.Close(); err != nil {
t.Fatal(err)
}
clus.TakeClient(0)

go func() {
defer close(donec)
_, err := cli.Get(context.TODO(), "foo")
@@ -447,11 +452,6 @@ func TestKVGetErrConnClosed(t *testing.T) {
}
}()

if err := cli.Close(); err != nil {
t.Fatal(err)
}
clus.TakeClient(0)

select {
case <-time.After(integration.RequestWaitTimeout):
t.Fatal("kv.Get took too long")
7 changes: 3 additions & 4 deletions clientv3/integration/lease_test.go
@@ -291,6 +291,9 @@ func TestLeaseGrantErrConnClosed(t *testing.T) {

cli := clus.Client(0)
clus.TakeClient(0)
if err := cli.Close(); err != nil {
t.Fatal(err)
}

donec := make(chan struct{})
go func() {
@@ -303,10 +306,6 @@ func TestLeaseGrantErrConnClosed(t *testing.T) {
}
}()

if err := cli.Close(); err != nil {
t.Fatal(err)
}

select {
case <-time.After(integration.RequestWaitTimeout):
t.Fatal("le.Grant took too long")
1 change: 1 addition & 0 deletions clientv3/integration/leasing_test.go
@@ -1644,6 +1644,7 @@ func TestLeasingReconnectTxn(t *testing.T) {
clus.Members[0].DropConnections()
time.Sleep(time.Millisecond)
}
time.Sleep(10 * time.Millisecond)
}()

_, lerr := lkv.Txn(context.TODO()).
52 changes: 52 additions & 0 deletions clientv3/integration/network_partition_test.go
@@ -24,6 +24,7 @@ import (

"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
"go.etcd.io/etcd/integration"
"go.etcd.io/etcd/pkg/testutil"
"google.golang.org/grpc"
@@ -265,3 +266,54 @@ func testBalancerUnderNetworkPartitionWatch(t *testing.T, isolateLeader bool) {
t.Fatal("took too long to detect leader lost")
}
}

func TestDropReadUnderNetworkPartition(t *testing.T) {
defer testutil.AfterTest(t)

clus := integration.NewClusterV3(t, &integration.ClusterConfig{
Size: 3,
SkipCreatingClient: true,
})
defer clus.Terminate(t)
leaderIndex := clus.WaitLeader(t)
// get a follower endpoint
eps := []string{clus.Members[(leaderIndex+1)%3].GRPCAddr()}
ccfg := clientv3.Config{
Endpoints: eps,
DialTimeout: 10 * time.Second,
DialOptions: []grpc.DialOption{grpc.WithBlock()},
}
cli, err := clientv3.New(ccfg)
if err != nil {
t.Fatal(err)
}
defer cli.Close()

// wait for eps[0] to be pinned
mustWaitPinReady(t, cli)

// add other endpoints for later endpoint switch
cli.SetEndpoints(eps...)
time.Sleep(time.Second * 2)
conn, err := cli.Dial(clus.Members[(leaderIndex+1)%3].GRPCAddr())
if err != nil {
t.Fatal(err)
}
defer conn.Close()

clus.Members[leaderIndex].InjectPartition(t, clus.Members[(leaderIndex+1)%3], clus.Members[(leaderIndex+2)%3])
kvc := clientv3.NewKVFromKVClient(pb.NewKVClient(conn), nil)
ctx, cancel := context.WithTimeout(context.TODO(), 10*time.Second)
_, err = kvc.Get(ctx, "a")
cancel()
if err != rpctypes.ErrLeaderChanged {
t.Fatalf("expected %v, got %v", rpctypes.ErrLeaderChanged, err)
}

ctx, cancel = context.WithTimeout(context.TODO(), 10*time.Second)
_, err = kvc.Get(ctx, "a")
cancel()
if err != nil {
t.Fatalf("expected nil, got %v", err)
}
}
3 changes: 3 additions & 0 deletions etcdserver/api/v3rpc/rpctypes/error.go
@@ -61,6 +61,7 @@ var (

ErrGRPCNoLeader = status.New(codes.Unavailable, "etcdserver: no leader").Err()
ErrGRPCNotLeader = status.New(codes.FailedPrecondition, "etcdserver: not leader").Err()
ErrGRPCLeaderChanged = status.New(codes.Unavailable, "etcdserver: leader changed").Err()
ErrGRPCNotCapable = status.New(codes.Unavailable, "etcdserver: not capable").Err()
ErrGRPCStopped = status.New(codes.Unavailable, "etcdserver: server stopped").Err()
ErrGRPCTimeout = status.New(codes.Unavailable, "etcdserver: request timed out").Err()
@@ -111,6 +112,7 @@ var (

ErrorDesc(ErrGRPCNoLeader): ErrGRPCNoLeader,
ErrorDesc(ErrGRPCNotLeader): ErrGRPCNotLeader,
ErrorDesc(ErrGRPCLeaderChanged): ErrGRPCLeaderChanged,
ErrorDesc(ErrGRPCNotCapable): ErrGRPCNotCapable,
ErrorDesc(ErrGRPCStopped): ErrGRPCStopped,
ErrorDesc(ErrGRPCTimeout): ErrGRPCTimeout,
@@ -163,6 +165,7 @@ var (

ErrNoLeader = Error(ErrGRPCNoLeader)
ErrNotLeader = Error(ErrGRPCNotLeader)
ErrLeaderChanged = Error(ErrGRPCLeaderChanged)
ErrNotCapable = Error(ErrGRPCNotCapable)
ErrStopped = Error(ErrGRPCStopped)
ErrTimeout = Error(ErrGRPCTimeout)
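
The new error is defined at three layers: the plain server-side error (etcdserver.ErrLeaderChanged, added below in etcdserver/errors.go), its gRPC form (ErrGRPCLeaderChanged, registered as codes.Unavailable), and the client-facing form (ErrLeaderChanged = Error(ErrGRPCLeaderChanged)). A sketch of classifying it from a caller's perspective; the helper name is an assumption, while the code and message strings come from the definitions above:

package clientutil

import (
	"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// isLeaderChanged reports whether err is the new retryable error, either
// as the typed client error or as a raw gRPC status.
func isLeaderChanged(err error) bool {
	if err == rpctypes.ErrLeaderChanged {
		return true
	}
	s, ok := status.FromError(err)
	return ok && s.Code() == codes.Unavailable &&
		s.Message() == "etcdserver: leader changed"
}
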
1 change: 1 addition & 0 deletions etcdserver/api/v3rpc/util.go
@@ -44,6 +44,7 @@ var toGRPCErrorMap = map[error]error{

etcdserver.ErrNoLeader: rpctypes.ErrGRPCNoLeader,
etcdserver.ErrNotLeader: rpctypes.ErrGRPCNotLeader,
etcdserver.ErrLeaderChanged: rpctypes.ErrGRPCLeaderChanged,
etcdserver.ErrStopped: rpctypes.ErrGRPCStopped,
etcdserver.ErrTimeout: rpctypes.ErrGRPCTimeout,
etcdserver.ErrTimeoutDueToLeaderFail: rpctypes.ErrGRPCTimeoutDueToLeaderFail,
1 change: 1 addition & 0 deletions etcdserver/errors.go
@@ -27,6 +27,7 @@ var (
ErrTimeoutDueToLeaderFail = errors.New("etcdserver: request timed out, possibly due to previous leader failure")
ErrTimeoutDueToConnectionLost = errors.New("etcdserver: request timed out, possibly due to connection lost")
ErrTimeoutLeaderTransfer = errors.New("etcdserver: request timed out, leader transfer took too long")
ErrLeaderChanged = errors.New("etcdserver: leader changed")
ErrNotEnoughStartedMembers = errors.New("etcdserver: re-configuration failed due to not enough started members")
ErrNoLeader = errors.New("etcdserver: no leader")
ErrNotLeader = errors.New("etcdserver: not leader")
7 changes: 7 additions & 0 deletions etcdserver/metrics.go
@@ -86,6 +86,12 @@ var (
Name: "slow_read_indexes_total",
Help: "The total number of pending read indexes not in sync with leader's or timed out read index requests.",
})
readIndexFailed = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "etcd",
Subsystem: "server",
Name: "read_indexes_failed_total",
Help: "The total number of failed read indexes seen.",
})
leaseExpired = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "etcd_debugging",
Subsystem: "server",
@@ -132,6 +138,7 @@ func init() {
prometheus.MustRegister(proposalsPending)
prometheus.MustRegister(proposalsFailed)
prometheus.MustRegister(slowReadIndex)
prometheus.MustRegister(readIndexFailed)
prometheus.MustRegister(leaseExpired)
prometheus.MustRegister(quotaBackendBytes)
prometheus.MustRegister(currentVersion)
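
With the etcd namespace and server subsystem, the counter is exported as etcd_server_read_indexes_failed_total. A sketch of exercising such a counter in isolation, assuming a client_golang version that ships the prometheus/testutil package; the test and the local counter copy are illustrative, not part of this PR:

package metricsdemo

import (
	"testing"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

// A local stand-in mirroring the readIndexFailed counter above.
var readIndexFailed = prometheus.NewCounter(prometheus.CounterOpts{
	Namespace: "etcd",
	Subsystem: "server",
	Name:      "read_indexes_failed_total",
	Help:      "The total number of failed read indexes seen.",
})

func TestReadIndexFailedCounter(t *testing.T) {
	readIndexFailed.Inc()
	if got := testutil.ToFloat64(readIndexFailed); got != 1 {
		t.Fatalf("expected counter to read 1, got %v", got)
	}
}
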
18 changes: 17 additions & 1 deletion etcdserver/server.go
@@ -212,6 +212,9 @@ type EtcdServer struct {
stopping chan struct{}
// done is closed when all goroutines from start() complete.
done chan struct{}
// leaderChanged is used to notify the linearizable read loop to drop the old read requests.
leaderChanged chan struct{}
leaderChangedMu sync.RWMutex

errorc chan error
id types.ID
@@ -752,6 +755,7 @@ func (s *EtcdServer) start() {
s.ctx, s.cancel = context.WithCancel(context.Background())
s.readwaitc = make(chan struct{}, 1)
s.readNotifier = newNotifier()
s.leaderChanged = make(chan struct{})
if s.ClusterVersion() != nil {
if lg != nil {
lg.Info(
@@ -938,7 +942,13 @@ func (s *EtcdServer) run() {
s.compactor.Resume()
}
}

if newLeader {
s.leaderChangedMu.Lock()
lc := s.leaderChanged
s.leaderChanged = make(chan struct{})
close(lc)
s.leaderChangedMu.Unlock()
}
// TODO: remove the nil checking
// current test utility does not provide the stats
if s.stats != nil {
@@ -1688,6 +1698,12 @@ func (s *EtcdServer) getLead() uint64 {
return atomic.LoadUint64(&s.lead)
}

func (s *EtcdServer) leaderChangedNotify() <-chan struct{} {
s.leaderChangedMu.RLock()
defer s.leaderChangedMu.RUnlock()
return s.leaderChanged
}

// RaftStatusGetter represents etcd server and Raft progress.
type RaftStatusGetter interface {
ID() types.ID
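
The leaderChanged field uses a close-and-replace broadcast: readers snapshot the current channel via leaderChangedNotify(), and run() closes that snapshot (waking every waiter at once) while installing a fresh channel for the next leadership term. A condensed, standalone sketch of the pattern; names are shortened and this models the mechanism, it is not the etcd code itself:

package main

import (
	"fmt"
	"sync"
)

type broadcast struct {
	mu sync.RWMutex
	ch chan struct{}
}

// wait returns the channel for the current generation; it is closed
// exactly once, when fire() is next called.
func (b *broadcast) wait() <-chan struct{} {
	b.mu.RLock()
	defer b.mu.RUnlock()
	return b.ch
}

// fire wakes all current waiters and resets for the next generation.
func (b *broadcast) fire() {
	b.mu.Lock()
	old := b.ch
	b.ch = make(chan struct{})
	close(old) // closing broadcasts to every goroutine blocked on old
	b.mu.Unlock()
}

func main() {
	b := &broadcast{ch: make(chan struct{})}
	ready, done := make(chan struct{}), make(chan struct{})
	go func() {
		ch := b.wait() // snapshot the current generation first
		close(ready)
		<-ch // blocks until the next "leader change"
		fmt.Println("waiter observed leader change")
		close(done)
	}()
	<-ready
	b.fire()
	<-done
}
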
10 changes: 9 additions & 1 deletion etcdserver/v3_server.go
@@ -634,8 +634,10 @@ func (s *EtcdServer) linearizableReadLoop() {
ctxToSend := make([]byte, 8)
id1 := s.reqIDGen.Next()
binary.BigEndian.PutUint64(ctxToSend, id1)

leaderChangedNotifier := s.leaderChangedNotify()
select {
case <-leaderChangedNotifier:
continue
case <-s.readwaitc:
case <-s.stopping:
return
@@ -660,6 +662,7 @@
} else {
plog.Errorf("failed to get read index from raft: %v", err)
}
readIndexFailed.Inc()
nr.notify(err)
continue
}
Expand Down Expand Up @@ -691,6 +694,11 @@ func (s *EtcdServer) linearizableReadLoop() {
}
slowReadIndex.Inc()
}
case <-leaderChangedNotifier:
timeout = true
readIndexFailed.Inc()
// return a retryable error.
nr.notify(ErrLeaderChanged)
case <-time.After(s.Cfg.ReqTimeout()):
if lg != nil {
lg.Warn("timed out waiting for read index response (local node might have slow network)", zap.Duration("timeout", s.Cfg.ReqTimeout()))
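
The effect of the new leaderChangedNotifier case: a reader blocked waiting for a ReadIndex response no longer rides out the full request timeout when leadership moves. A toy, self-contained model of that select; the names are hypothetical and the timeout is an arbitrary stand-in for s.Cfg.ReqTimeout():

package main

import (
	"errors"
	"fmt"
	"time"
)

var errLeaderChanged = errors.New("etcdserver: leader changed")

func main() {
	readStateC := make(chan uint64)      // never delivers: the old leader is gone
	leaderChanged := make(chan struct{}) // closed when a new leader is elected
	result := make(chan error, 1)

	go func() {
		select {
		case <-readStateC:
			result <- nil // normal path: read index arrived
		case <-leaderChanged:
			result <- errLeaderChanged // fail fast with a retryable error
		case <-time.After(7 * time.Second): // stand-in for s.Cfg.ReqTimeout()
			result <- errors.New("etcdserver: request timed out")
		}
	}()

	time.Sleep(10 * time.Millisecond)
	close(leaderChanged)  // broadcast: leadership moved
	fmt.Println(<-result) // prints the leader-changed error well before the timeout
}
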
3 changes: 2 additions & 1 deletion integration/v3_lease_test.go
@@ -539,6 +539,8 @@ func TestV3LeaseSwitch(t *testing.T) {
// election timeout after it loses its quorum. And the new leader extends the TTL of
// the lease to at least TTL + election timeout.
func TestV3LeaseFailover(t *testing.T) {
defer testutil.AfterTest(t)

clus := NewClusterV3(t, &ClusterConfig{Size: 3})
defer clus.Terminate(t)

@@ -568,7 +570,6 @@ func TestV3LeaseFailover(t *testing.T) {
if err != nil {
t.Fatal(err)
}
defer lac.CloseSend()

// send keep alive to old leader until the old leader starts
// to drop lease request.
