Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
nolouch committed May 2, 2019
1 parent 1bde799 commit 7f3b1ce
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 38 deletions.
2 changes: 1 addition & 1 deletion etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,7 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {

// always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
// If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers.
srv.lessor = lease.NewLessor(srv.getLogger(), srv.be, lease.LessorConfig{MinLeaseTTL: int64(math.Ceil(minTTL.Seconds())), CheckpointInterval: cfg.LeaseCheckpointInterval})
srv.lessor = lease.NewLessor(srv.getLogger(), srv.be, lease.LessorConfig{MinLeaseTTL: int64(math.Ceil(minTTL.Seconds())), CheckpointInterval: cfg.LeaseCheckpointInterval, ExpiredLeasesRetryInterval: srv.Cfg.ReqTimeout()})
srv.kv = mvcc.New(srv.getLogger(), srv.be, srv.lessor, &srv.consistIndex)
if beExist {
kvindex := srv.kv.ConsistentIndex()
Expand Down
7 changes: 4 additions & 3 deletions lease/lease_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,17 @@ package lease
type LeaseWithTime struct {
id LeaseID
// Unix nanos timestamp.
time int64
index int
time int64
delayCheckTime int64
index int
}

type LeaseQueue []*LeaseWithTime

func (pq LeaseQueue) Len() int { return len(pq) }

func (pq LeaseQueue) Less(i, j int) bool {
return pq[i].time < pq[j].time
return pq[i].time+pq[i].delayCheckTime < pq[j].time+pq[j].delayCheckTime
}

func (pq LeaseQueue) Swap(i, j int) {
Expand Down
50 changes: 37 additions & 13 deletions lease/lease_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ import (
)

func TestLeaseQueue(t *testing.T) {
expiredRetryInterval := 100 * time.Millisecond
le := &lessor{
leaseHeap: make(LeaseQueue, 0),
leaseMap: make(map[LeaseID]*Lease),
leaseHeap: make(LeaseQueue, 0),
leaseMap: make(map[LeaseID]*Lease),
expiredLeaseRetryInterval: expiredRetryInterval,
}
heap.Init(&le.leaseHeap)

Expand All @@ -42,18 +44,40 @@ func TestLeaseQueue(t *testing.T) {
t.Fatalf("first item expected lease ID %d, got %d", LeaseID(1), le.leaseHeap[0].id)
}

l, ok, more := le.expireExists()
if l.ID != 1 {
t.Fatalf("first item expected lease ID %d, got %d", 1, l.ID)
}
if !ok {
t.Fatal("expect expiry lease exists")
}
if more {
t.Fatal("expect no more expiry lease")
existExpiredEvent := func() {
l, ok, more := le.expireExists()
if l.ID != 1 {
t.Fatalf("first item expected lease ID %d, got %d", 1, l.ID)
}
if !ok {
t.Fatal("expect expiry lease exists")
}
if more {
t.Fatal("expect no more expiry lease")
}

if le.leaseHeap.Len() != 50 {
t.Fatalf("expected lease heap pop, got %d", le.leaseHeap.Len())
}

if le.leaseHeap[0].id != LeaseID(1) && le.leaseHeap[0].delayCheckTime > 0 {
t.Fatalf("first item expected lease ID %d, got %d", LeaseID(1), le.leaseHeap[0].id)
}
}

if le.leaseHeap.Len() != 50 {
t.Fatalf("expected lease heap pop, got %d", le.leaseHeap.Len())
noExpiredEvent := func() {
// re-acquire the expired item, nothing exists
_, ok, more := le.expireExists()
if ok {
t.Fatal("expect no expiry lease exists")
}
if more {
t.Fatal("expect no more expiry lease")
}
}

existExpiredEvent() // first acquire
noExpiredEvent() // second acquire
time.Sleep(expiredRetryInterval)
existExpiredEvent() // acquire after retry interval
}
52 changes: 31 additions & 21 deletions lease/lessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,14 @@ var (
// maximum number of lease checkpoints recorded to the consensus log per second; configurable for tests
leaseCheckpointRate = 1000

// the default intervall of lease checkpoint
defaultLeaseCheckpointInterval = 5 * time.Minute

// maximum number of lease checkpoints to batch into a single consensus log entry
maxLeaseCheckpointBatchSize = 1000

// interval to check if the expired lease is revoked
expiredleaseRetryInterval = 3 * time.Second
// the default interval to check if the expired lease is revoked
defaultExpiredleaseRetryInterval = 3 * time.Second

ErrNotPrimary = errors.New("not a primary lessor")
ErrLeaseNotFound = errors.New("lease not found")
Expand Down Expand Up @@ -176,30 +179,40 @@ type lessor struct {

// Wait duration between lease checkpoints.
checkpointInterval time.Duration
// the interval of expired lease check
expiredLeaseRetryInterval time.Duration
}

type LessorConfig struct {
MinLeaseTTL int64
CheckpointInterval time.Duration
MinLeaseTTL int64
CheckpointInterval time.Duration
ExpiredLeasesRetryInterval time.Duration
}

func (c *LessorConfig) adjust() {
if c.CheckpointInterval == 0 {
c.CheckpointInterval = defaultLeaseCheckpointInterval
}
if c.ExpiredLeasesRetryInterval == 0 {
c.ExpiredLeasesRetryInterval = defaultExpiredleaseRetryInterval
}
}

func NewLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) Lessor {
return newLessor(lg, b, cfg)
}

func newLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) *lessor {
checkpointInterval := cfg.CheckpointInterval
if checkpointInterval == 0 {
checkpointInterval = 5 * time.Minute
}
cfg.adjust()
l := &lessor{
leaseMap: make(map[LeaseID]*Lease),
itemMap: make(map[LeaseItem]LeaseID),
leaseHeap: make(LeaseQueue, 0),
leaseCheckpointHeap: make(LeaseQueue, 0),
b: b,
minLeaseTTL: cfg.MinLeaseTTL,
checkpointInterval: checkpointInterval,
leaseMap: make(map[LeaseID]*Lease),
itemMap: make(map[LeaseItem]LeaseID),
leaseHeap: make(LeaseQueue, 0),
leaseCheckpointHeap: make(LeaseQueue, 0),
b: b,
minLeaseTTL: cfg.MinLeaseTTL,
checkpointInterval: cfg.CheckpointInterval,
expiredLeaseRetryInterval: cfg.ExpiredLeasesRetryInterval,
// expiredC is a small buffered chan to avoid unnecessary blocking.
expiredC: make(chan []*Lease, 16),
stopC: make(chan struct{}),
Expand Down Expand Up @@ -654,18 +667,15 @@ func (le *lessor) expireExists() (l *Lease, ok bool, next bool) {
return nil, false, true
}
now := time.Now()
if now.UnixNano() < item.time /* expiration time */ {
if now.UnixNano() < item.time+item.delayCheckTime /* expiration time */ {
// Candidate expirations are caught up, reinsert this item
// and no need to revoke (nothing is expiry)
return l, false, false
}
// if the lease is actually expired, add to the removal list. If it is not expired, we can ignore it because another entry will have been inserted into the heap

heap.Pop(&le.leaseHeap) // O(log N)

// recheck if revoke is complete after retry interval
item.time = now.Add(expiredleaseRetryInterval).UnixNano()
heap.Push(&le.leaseHeap, item)
item.delayCheckTime = now.Add(le.expiredLeaseRetryInterval).UnixNano() - item.time
heap.Fix(&le.leaseHeap, item.index)
return l, true, false
}

Expand Down

0 comments on commit 7f3b1ce

Please sign in to comment.