Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lease/lessor: recheck if expired lease is revoked #10693

Merged
merged 1 commit into from
May 31, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,14 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {

// always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
// If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers.
srv.lessor = lease.NewLessor(srv.getLogger(), srv.be, lease.LessorConfig{MinLeaseTTL: int64(math.Ceil(minTTL.Seconds())), CheckpointInterval: cfg.LeaseCheckpointInterval})
srv.lessor = lease.NewLessor(
srv.getLogger(),
srv.be,
lease.LessorConfig{
MinLeaseTTL: int64(math.Ceil(minTTL.Seconds())),
CheckpointInterval: cfg.LeaseCheckpointInterval,
ExpiredLeasesRetryInterval: srv.Cfg.ReqTimeout(),
})
srv.kv = mvcc.New(srv.getLogger(), srv.be, srv.lessor, &srv.consistIndex)
if beExist {
kvindex := srv.kv.ConsistentIndex()
Expand Down
51 changes: 51 additions & 0 deletions lease/lease_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

package lease

import "container/heap"

// LeaseWithTime contains lease object with a time.
// For the lessor's lease heap, time identifies the lease expiration time.
// For the lessor's lease checkpoint heap, the time identifies the next lease checkpoint time.
Expand Down Expand Up @@ -53,3 +55,52 @@ func (pq *LeaseQueue) Pop() interface{} {
*pq = old[0 : n-1]
return item
}

// LeaseExpiredNotifier is a queue used to notify lessor to revoke expired lease.
// It keeps at most one item per lease: `RegisterOrUpdate` updates the time of an
// already-registered lease in place instead of adding a second entry.
type LeaseExpiredNotifier struct {
// m indexes the queued item of each registered lease by its lease ID.
m map[LeaseID]*LeaseWithTime
// queue holds the same items as m and is maintained through container/heap.
queue LeaseQueue
}

// newLeaseExpiredNotifier returns an empty notifier whose lookup map and
// queue are both allocated and ready for use.
func newLeaseExpiredNotifier() *LeaseExpiredNotifier {
	notifier := new(LeaseExpiredNotifier)
	notifier.m = map[LeaseID]*LeaseWithTime{}
	notifier.queue = LeaseQueue{}
	return notifier
}

// Init establishes the heap invariants over the items already present in the
// queue and rebuilds the ID lookup map from scratch so that it mirrors the
// queue contents.
func (mq *LeaseExpiredNotifier) Init() {
	heap.Init(&mq.queue)

	index := make(map[LeaseID]*LeaseWithTime)
	for i := range mq.queue {
		entry := mq.queue[i]
		index[entry.id] = entry
	}
	mq.m = index
}

func (mq *LeaseExpiredNotifier) RegisterOrUpdate(item *LeaseWithTime) {
if old, ok := mq.m[item.id]; ok {
old.time = item.time
heap.Fix(&mq.queue, old.index)
nolouch marked this conversation as resolved.
Show resolved Hide resolved
} else {
heap.Push(&mq.queue, item)
mq.m[item.id] = item
}
}

// Unregister removes the item at the front of the queue, drops its entry from
// the lookup map, and returns it.
//
// It returns nil when the queue is empty, matching Poll, rather than letting
// heap.Pop run against an empty queue.
func (mq *LeaseExpiredNotifier) Unregister() *LeaseWithTime {
	if len(mq.queue) == 0 {
		return nil
	}

	item := heap.Pop(&mq.queue).(*LeaseWithTime)
	delete(mq.m, item.id)
	return item
}

// Poll returns the item at the front of the queue without removing it, or nil
// when no lease is registered.
func (mq *LeaseExpiredNotifier) Poll() *LeaseWithTime {
	if mq.Len() > 0 {
		return mq.queue[0]
	}
	return nil
}

// Len reports how many leases are currently registered, counted from the
// lookup map.
func (mq *LeaseExpiredNotifier) Len() int {
	registered := len(mq.m)
	return registered
}
61 changes: 42 additions & 19 deletions lease/lease_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,18 @@
package lease

import (
"container/heap"
"testing"
"time"
)

func TestLeaseQueue(t *testing.T) {
expiredRetryInterval := 100 * time.Millisecond
le := &lessor{
leaseHeap: make(LeaseQueue, 0),
leaseMap: make(map[LeaseID]*Lease),
leaseExpiredNotifier: newLeaseExpiredNotifier(),
leaseMap: make(map[LeaseID]*Lease),
expiredLeaseRetryInterval: expiredRetryInterval,
}
heap.Init(&le.leaseHeap)
le.leaseExpiredNotifier.Init()

// insert in reverse order of expiration time
for i := 50; i >= 1; i-- {
Expand All @@ -34,26 +35,48 @@ func TestLeaseQueue(t *testing.T) {
exp = time.Now().UnixNano()
}
le.leaseMap[LeaseID(i)] = &Lease{ID: LeaseID(i)}
heap.Push(&le.leaseHeap, &LeaseWithTime{id: LeaseID(i), time: exp})
le.leaseExpiredNotifier.RegisterOrUpdate(&LeaseWithTime{id: LeaseID(i), time: exp})
}

// first element must be front
if le.leaseHeap[0].id != LeaseID(1) {
t.Fatalf("first item expected lease ID %d, got %d", LeaseID(1), le.leaseHeap[0].id)
// first element is expired.
if le.leaseExpiredNotifier.Poll().id != LeaseID(1) {
t.Fatalf("first item expected lease ID %d, got %d", LeaseID(1), le.leaseExpiredNotifier.Poll().id)
}

l, ok, more := le.expireExists()
if l.ID != 1 {
t.Fatalf("first item expected lease ID %d, got %d", 1, l.ID)
}
if !ok {
t.Fatal("expect expiry lease exists")
}
if more {
t.Fatal("expect no more expiry lease")
existExpiredEvent := func() {
l, ok, more := le.expireExists()
if l.ID != 1 {
t.Fatalf("first item expected lease ID %d, got %d", 1, l.ID)
}
if !ok {
t.Fatal("expect expiry lease exists")
}
if more {
t.Fatal("expect no more expiry lease")
}

if le.leaseExpiredNotifier.Len() != 50 {
t.Fatalf("expected the expired lease to be pushed back to the heap, heap size got %d", le.leaseExpiredNotifier.Len())
}

if le.leaseExpiredNotifier.Poll().id != LeaseID(1) {
t.Fatalf("first item expected lease ID %d, got %d", LeaseID(1), le.leaseExpiredNotifier.Poll().id)
}
}

if le.leaseHeap.Len() != 49 {
t.Fatalf("expected lease heap pop, got %d", le.leaseHeap.Len())
noExpiredEvent := func() {
// re-acquire the expired item, nothing exists
_, ok, more := le.expireExists()
if ok {
t.Fatal("expect no expiry lease exists")
}
if more {
t.Fatal("expect no more expiry lease")
}
}

existExpiredEvent() // first acquire
noExpiredEvent() // second acquire
time.Sleep(expiredRetryInterval)
existExpiredEvent() // acquire after retry interval
}
67 changes: 41 additions & 26 deletions lease/lessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,15 @@ var (
// maximum number of lease checkpoints recorded to the consensus log per second; configurable for tests
leaseCheckpointRate = 1000

// the default interval of lease checkpoint
defaultLeaseCheckpointInterval = 5 * time.Minute

// maximum number of lease checkpoints to batch into a single consensus log entry
maxLeaseCheckpointBatchSize = 1000

// the default interval to check if the expired lease is revoked
defaultExpiredleaseRetryInterval = 3 * time.Second

ErrNotPrimary = errors.New("not a primary lessor")
ErrLeaseNotFound = errors.New("lease not found")
ErrLeaseExists = errors.New("lease already exists")
Expand Down Expand Up @@ -142,10 +148,10 @@ type lessor struct {
// demotec will be closed if the lessor is demoted.
demotec chan struct{}

leaseMap map[LeaseID]*Lease
leaseHeap LeaseQueue
leaseCheckpointHeap LeaseQueue
itemMap map[LeaseItem]LeaseID
leaseMap map[LeaseID]*Lease
leaseExpiredNotifier *LeaseExpiredNotifier
leaseCheckpointHeap LeaseQueue
itemMap map[LeaseItem]LeaseID

// When a lease expires, the lessor will delete the
// leased range (or key) by the RangeDeleter.
Expand Down Expand Up @@ -173,11 +179,14 @@ type lessor struct {

// Wait duration between lease checkpoints.
checkpointInterval time.Duration
// the interval to check if the expired lease is revoked
expiredLeaseRetryInterval time.Duration
}

// LessorConfig carries the tunable settings passed to NewLessor.
type LessorConfig struct {
	// MinLeaseTTL is copied into the lessor as its minimum lease TTL.
	MinLeaseTTL int64
	// CheckpointInterval is the wait duration between lease checkpoints.
	// A zero value selects the default checkpoint interval.
	CheckpointInterval time.Duration
	// ExpiredLeasesRetryInterval is the interval after which an expired lease
	// is rechecked to see whether it has been revoked. A zero value selects
	// the default retry interval.
	ExpiredLeasesRetryInterval time.Duration
}

func NewLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) Lessor {
Expand All @@ -186,17 +195,22 @@ func NewLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) Lessor {

func newLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) *lessor {
checkpointInterval := cfg.CheckpointInterval
expiredLeaseRetryInterval := cfg.ExpiredLeasesRetryInterval
if checkpointInterval == 0 {
checkpointInterval = 5 * time.Minute
checkpointInterval = defaultLeaseCheckpointInterval
}
if expiredLeaseRetryInterval == 0 {
expiredLeaseRetryInterval = defaultExpiredleaseRetryInterval
}
l := &lessor{
leaseMap: make(map[LeaseID]*Lease),
itemMap: make(map[LeaseItem]LeaseID),
leaseHeap: make(LeaseQueue, 0),
leaseCheckpointHeap: make(LeaseQueue, 0),
b: b,
minLeaseTTL: cfg.MinLeaseTTL,
checkpointInterval: checkpointInterval,
leaseMap: make(map[LeaseID]*Lease),
itemMap: make(map[LeaseItem]LeaseID),
leaseExpiredNotifier: newLeaseExpiredNotifier(),
leaseCheckpointHeap: make(LeaseQueue, 0),
b: b,
minLeaseTTL: cfg.MinLeaseTTL,
checkpointInterval: checkpointInterval,
expiredLeaseRetryInterval: expiredLeaseRetryInterval,
// expiredC is a small buffered chan to avoid unnecessary blocking.
expiredC: make(chan []*Lease, 16),
stopC: make(chan struct{}),
Expand Down Expand Up @@ -278,7 +292,7 @@ func (le *lessor) Grant(id LeaseID, ttl int64) (*Lease, error) {

le.leaseMap[id] = l
item := &LeaseWithTime{id: l.ID, time: l.expiry.UnixNano()}
heap.Push(&le.leaseHeap, item)
le.leaseExpiredNotifier.RegisterOrUpdate(item)
l.persistTo(le.b)

leaseTotalTTLs.Observe(float64(l.ttl))
Expand Down Expand Up @@ -393,7 +407,7 @@ func (le *lessor) Renew(id LeaseID) (int64, error) {
le.mu.Lock()
l.refresh(0)
item := &LeaseWithTime{id: l.ID, time: l.expiry.UnixNano()}
heap.Push(&le.leaseHeap, item)
le.leaseExpiredNotifier.RegisterOrUpdate(item)
le.mu.Unlock()

leaseRenewed.Inc()
Expand Down Expand Up @@ -432,7 +446,7 @@ func (le *lessor) Promote(extend time.Duration) {
for _, l := range le.leaseMap {
l.refresh(extend)
item := &LeaseWithTime{id: l.ID, time: l.expiry.UnixNano()}
heap.Push(&le.leaseHeap, item)
le.leaseExpiredNotifier.RegisterOrUpdate(item)
}

if len(le.leaseMap) < leaseRevokeRate {
Expand Down Expand Up @@ -470,7 +484,7 @@ func (le *lessor) Promote(extend time.Duration) {
nextWindow = baseWindow + delay
l.refresh(delay + extend)
item := &LeaseWithTime{id: l.ID, time: l.expiry.UnixNano()}
heap.Push(&le.leaseHeap, item)
le.leaseExpiredNotifier.RegisterOrUpdate(item)
le.scheduleCheckpointIfNeeded(l)
}
}
Expand Down Expand Up @@ -638,27 +652,28 @@ func (le *lessor) clearScheduledLeasesCheckpoints() {
// It pops only when expiry item exists.
// "next" is true, to indicate that it may exist in next attempt.
func (le *lessor) expireExists() (l *Lease, ok bool, next bool) {
if le.leaseHeap.Len() == 0 {
if le.leaseExpiredNotifier.Len() == 0 {
return nil, false, false
}

item := le.leaseHeap[0]
item := le.leaseExpiredNotifier.Poll()
l = le.leaseMap[item.id]
if l == nil {
// lease has expired or been revoked
// no need to revoke (nothing is expiry)
heap.Pop(&le.leaseHeap) // O(log N)
le.leaseExpiredNotifier.Unregister() // O(log N)
return nil, false, true
}

if time.Now().UnixNano() < item.time /* expiration time */ {
now := time.Now()
if now.UnixNano() < item.time /* expiration time */ {
// Candidate expirations are caught up, reinsert this item
// and no need to revoke (nothing is expiry)
return l, false, false
}
// if the lease is actually expired, add to the removal list. If it is not expired, we can ignore it because another entry will have been inserted into the heap

heap.Pop(&le.leaseHeap) // O(log N)
jingyih marked this conversation as resolved.
Show resolved Hide resolved
// recheck if revoke is complete after retry interval
item.time = now.Add(le.expiredLeaseRetryInterval).UnixNano()
le.leaseExpiredNotifier.RegisterOrUpdate(item)
return l, true, false
}

Expand Down Expand Up @@ -775,7 +790,7 @@ func (le *lessor) initAndRecover() {
revokec: make(chan struct{}),
}
}
heap.Init(&le.leaseHeap)
le.leaseExpiredNotifier.Init()
jingyih marked this conversation as resolved.
Show resolved Hide resolved
heap.Init(&le.leaseCheckpointHeap)
tx.Unlock()

Expand Down