From 9125b83f264950d3843a0d0f788cab81ca2ddbbc Mon Sep 17 00:00:00 2001 From: nolouch Date: Mon, 17 Sep 2018 12:02:24 +0800 Subject: [PATCH] server: broadcast leader changed --- etcdserver/server.go | 20 ++++++++++++++------ etcdserver/v3_server.go | 4 ++-- 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/etcdserver/server.go b/etcdserver/server.go index fc070ce51a76..90473e666ce9 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -213,7 +213,8 @@ type EtcdServer struct { // done is closed when all goroutines from start() complete. done chan struct{} // leaderChanged is used to notify the linearizable read loop to drop the old read requests. - leaderChanged chan struct{} + leaderChanged chan struct{} + leaderChangedMu sync.RWMutex errorc chan error id types.ID @@ -754,7 +755,7 @@ func (s *EtcdServer) start() { s.ctx, s.cancel = context.WithCancel(context.Background()) s.readwaitc = make(chan struct{}, 1) s.readNotifier = newNotifier() - s.leaderChanged = make(chan struct{}, 1) + s.leaderChanged = make(chan struct{}) if s.ClusterVersion() != nil { if lg != nil { lg.Info( @@ -942,10 +943,11 @@ func (s *EtcdServer) run() { } } if newLeader { - select { - case s.leaderChanged <- struct{}{}: - default: - } + s.leaderChangedMu.Lock() + lc := s.leaderChanged + s.leaderChanged = make(chan struct{}) + s.leaderChangedMu.Unlock() + close(lc) } // TODO: remove the nil checking // current test utility does not provide the stats @@ -1696,6 +1698,12 @@ func (s *EtcdServer) getLead() uint64 { return atomic.LoadUint64(&s.lead) } +func (s *EtcdServer) leaderChangedNotify() <-chan struct{} { + s.leaderChangedMu.RLock() + defer s.leaderChangedMu.RUnlock() + return s.leaderChanged +} + // RaftStatusGetter represents etcd server and Raft progress. type RaftStatusGetter interface { ID() types.ID diff --git a/etcdserver/v3_server.go b/etcdserver/v3_server.go index 5bcb7fc32d07..7d3366c13011 100644 --- a/etcdserver/v3_server.go +++ b/etcdserver/v3_server.go @@ -636,7 +636,7 @@ func (s *EtcdServer) linearizableReadLoop() { binary.BigEndian.PutUint64(ctxToSend, id1) select { - case <-s.leaderChanged: + case <-s.leaderChangedNotify(): continue case <-s.readwaitc: case <-s.stopping: @@ -694,7 +694,7 @@ func (s *EtcdServer) linearizableReadLoop() { } slowReadIndex.Inc() } - case <-s.leaderChanged: + case <-s.leaderChangedNotify(): timeout = true readIndexFailed.Inc() // return a retryable error.