From 7f3b1ce68b33d0d9ebfe57f62f8bacea71f1690f Mon Sep 17 00:00:00 2001 From: nolouch Date: Fri, 3 May 2019 00:43:13 +0800 Subject: [PATCH] address comments --- etcdserver/server.go | 2 +- lease/lease_queue.go | 7 +++--- lease/lease_queue_test.go | 50 +++++++++++++++++++++++++++---------- lease/lessor.go | 52 +++++++++++++++++++++++---------------- 4 files changed, 73 insertions(+), 38 deletions(-) diff --git a/etcdserver/server.go b/etcdserver/server.go index 5a97b8341ef6..cd3e5055d39c 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -524,7 +524,7 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) { // always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases. // If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers. - srv.lessor = lease.NewLessor(srv.getLogger(), srv.be, lease.LessorConfig{MinLeaseTTL: int64(math.Ceil(minTTL.Seconds())), CheckpointInterval: cfg.LeaseCheckpointInterval}) + srv.lessor = lease.NewLessor(srv.getLogger(), srv.be, lease.LessorConfig{MinLeaseTTL: int64(math.Ceil(minTTL.Seconds())), CheckpointInterval: cfg.LeaseCheckpointInterval, ExpiredLeasesRetryInterval: srv.Cfg.ReqTimeout()}) srv.kv = mvcc.New(srv.getLogger(), srv.be, srv.lessor, &srv.consistIndex) if beExist { kvindex := srv.kv.ConsistentIndex() diff --git a/lease/lease_queue.go b/lease/lease_queue.go index 5ecb38b591dc..19023cd54660 100644 --- a/lease/lease_queue.go +++ b/lease/lease_queue.go @@ -20,8 +20,9 @@ package lease type LeaseWithTime struct { id LeaseID // Unix nanos timestamp. - time int64 - index int + time int64 + delayCheckTime int64 + index int } type LeaseQueue []*LeaseWithTime @@ -29,7 +30,7 @@ type LeaseQueue []*LeaseWithTime func (pq LeaseQueue) Len() int { return len(pq) } func (pq LeaseQueue) Less(i, j int) bool { - return pq[i].time < pq[j].time + return pq[i].time+pq[i].delayCheckTime < pq[j].time+pq[j].delayCheckTime } func (pq LeaseQueue) Swap(i, j int) { diff --git a/lease/lease_queue_test.go b/lease/lease_queue_test.go index 8d228d16af0b..b4b2f11d74f8 100644 --- a/lease/lease_queue_test.go +++ b/lease/lease_queue_test.go @@ -21,9 +21,11 @@ import ( ) func TestLeaseQueue(t *testing.T) { + expiredRetryInterval := 100 * time.Millisecond le := &lessor{ - leaseHeap: make(LeaseQueue, 0), - leaseMap: make(map[LeaseID]*Lease), + leaseHeap: make(LeaseQueue, 0), + leaseMap: make(map[LeaseID]*Lease), + expiredLeaseRetryInterval: expiredRetryInterval, } heap.Init(&le.leaseHeap) @@ -42,18 +44,40 @@ func TestLeaseQueue(t *testing.T) { t.Fatalf("first item expected lease ID %d, got %d", LeaseID(1), le.leaseHeap[0].id) } - l, ok, more := le.expireExists() - if l.ID != 1 { - t.Fatalf("first item expected lease ID %d, got %d", 1, l.ID) - } - if !ok { - t.Fatal("expect expiry lease exists") - } - if more { - t.Fatal("expect no more expiry lease") + existExpiredEvent := func() { + l, ok, more := le.expireExists() + if l.ID != 1 { + t.Fatalf("first item expected lease ID %d, got %d", 1, l.ID) + } + if !ok { + t.Fatal("expect expiry lease exists") + } + if more { + t.Fatal("expect no more expiry lease") + } + + if le.leaseHeap.Len() != 50 { + t.Fatalf("expected lease heap pop, got %d", le.leaseHeap.Len()) + } + + if le.leaseHeap[0].id != LeaseID(1) && le.leaseHeap[0].delayCheckTime > 0 { + t.Fatalf("first item expected lease ID %d, got %d", LeaseID(1), le.leaseHeap[0].id) + } } - if le.leaseHeap.Len() != 50 { - t.Fatalf("expected lease heap pop, got %d", le.leaseHeap.Len()) + noExpiredEvent := func() { + // re-acquire the expired item, nothing exists + _, ok, more := le.expireExists() + if ok { + t.Fatal("expect no expiry lease exists") + } + if more { + t.Fatal("expect no more expiry lease") + } } + + existExpiredEvent() // first acquire + noExpiredEvent() // second acquire + time.Sleep(expiredRetryInterval) + existExpiredEvent() // acquire after retry interval } diff --git a/lease/lessor.go b/lease/lessor.go index 5ca3a359af1a..630a83372753 100644 --- a/lease/lessor.go +++ b/lease/lessor.go @@ -47,11 +47,14 @@ var ( // maximum number of lease checkpoints recorded to the consensus log per second; configurable for tests leaseCheckpointRate = 1000 + // the default intervall of lease checkpoint + defaultLeaseCheckpointInterval = 5 * time.Minute + // maximum number of lease checkpoints to batch into a single consensus log entry maxLeaseCheckpointBatchSize = 1000 - // interval to check if the expired lease is revoked - expiredleaseRetryInterval = 3 * time.Second + // the default interval to check if the expired lease is revoked + defaultExpiredleaseRetryInterval = 3 * time.Second ErrNotPrimary = errors.New("not a primary lessor") ErrLeaseNotFound = errors.New("lease not found") @@ -176,11 +179,23 @@ type lessor struct { // Wait duration between lease checkpoints. checkpointInterval time.Duration + // the interval of expired lease check + expiredLeaseRetryInterval time.Duration } type LessorConfig struct { - MinLeaseTTL int64 - CheckpointInterval time.Duration + MinLeaseTTL int64 + CheckpointInterval time.Duration + ExpiredLeasesRetryInterval time.Duration +} + +func (c *LessorConfig) adjust() { + if c.CheckpointInterval == 0 { + c.CheckpointInterval = defaultLeaseCheckpointInterval + } + if c.ExpiredLeasesRetryInterval == 0 { + c.ExpiredLeasesRetryInterval = defaultExpiredleaseRetryInterval + } } func NewLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) Lessor { @@ -188,18 +203,16 @@ func NewLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) Lessor { } func newLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) *lessor { - checkpointInterval := cfg.CheckpointInterval - if checkpointInterval == 0 { - checkpointInterval = 5 * time.Minute - } + cfg.adjust() l := &lessor{ - leaseMap: make(map[LeaseID]*Lease), - itemMap: make(map[LeaseItem]LeaseID), - leaseHeap: make(LeaseQueue, 0), - leaseCheckpointHeap: make(LeaseQueue, 0), - b: b, - minLeaseTTL: cfg.MinLeaseTTL, - checkpointInterval: checkpointInterval, + leaseMap: make(map[LeaseID]*Lease), + itemMap: make(map[LeaseItem]LeaseID), + leaseHeap: make(LeaseQueue, 0), + leaseCheckpointHeap: make(LeaseQueue, 0), + b: b, + minLeaseTTL: cfg.MinLeaseTTL, + checkpointInterval: cfg.CheckpointInterval, + expiredLeaseRetryInterval: cfg.ExpiredLeasesRetryInterval, // expiredC is a small buffered chan to avoid unnecessary blocking. expiredC: make(chan []*Lease, 16), stopC: make(chan struct{}), @@ -654,18 +667,15 @@ func (le *lessor) expireExists() (l *Lease, ok bool, next bool) { return nil, false, true } now := time.Now() - if now.UnixNano() < item.time /* expiration time */ { + if now.UnixNano() < item.time+item.delayCheckTime /* expiration time */ { // Candidate expirations are caught up, reinsert this item // and no need to revoke (nothing is expiry) return l, false, false } - // if the lease is actually expired, add to the removal list. If it is not expired, we can ignore it because another entry will have been inserted into the heap - - heap.Pop(&le.leaseHeap) // O(log N) // recheck if revoke is complete after retry interval - item.time = now.Add(expiredleaseRetryInterval).UnixNano() - heap.Push(&le.leaseHeap, item) + item.delayCheckTime = now.Add(le.expiredLeaseRetryInterval).UnixNano() - item.time + heap.Fix(&le.leaseHeap, item.index) return l, true, false }