Skip to content

Commit

Permalink
lease: do lease pile-up reduction in the background
Browse files Browse the repository at this point in the history
This moves lease pile-up reduction into a goroutine which mostly operates on a copy of
the lease list, to avoid locking. This prevents timeouts when the lessor is locked
for a long time (when there are a lot of leases, mostly). This should solve etcd-io#9496.

Before:
```
BenchmarkLessorPromote1-16                        500000 4036 ns/op
BenchmarkLessorPromote10-16                       500000 3932 ns/op
BenchmarkLessorPromote100-16                      500000 3954 ns/op
BenchmarkLessorPromote1000-16                     300000 3906 ns/op
BenchmarkLessorPromote10000-16                    300000 4639 ns/op
BenchmarkLessorPromote100000-16                      100 27216481 ns/op
BenchmarkLessorPromote1000000-16                     100 325164684 ns/op
```

After:
```
BenchmarkLessorPromote1-16                        500000 3769 ns/op
BenchmarkLessorPromote10-16                       500000 3835 ns/op
BenchmarkLessorPromote100-16                      500000 3829 ns/op
BenchmarkLessorPromote1000-16                     500000 3665 ns/op
BenchmarkLessorPromote10000-16                    500000 3800 ns/op
BenchmarkLessorPromote100000-16                   300000 4114 ns/op
BenchmarkLessorPromote1000000-16                  300000 5143 ns/op
```
  • Loading branch information
braintreeps committed May 11, 2018
1 parent 67b1ff6 commit 81f80d9
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 46 deletions.
109 changes: 64 additions & 45 deletions lease/lessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,9 @@ type lessor struct {
stopC chan struct{}
// doneC is a channel whose closure indicates that the lessor is stopped.
doneC chan struct{}

// when the lease pile-up reduction is done this is true
Ready bool
}

func NewLessor(b backend.Backend, minLeaseTTL int64) Lessor {
Expand Down Expand Up @@ -329,66 +332,82 @@ func (le *lessor) unsafeLeases() []*Lease {
for _, l := range le.leaseMap {
leases = append(leases, l)
}
sort.Sort(leasesByExpiry(leases))
return leases
}

func (le *lessor) Leases() []*Lease {
le.mu.RLock()
ls := le.unsafeLeases()
le.mu.RUnlock()

sort.Sort(leasesByExpiry(ls))
return ls
}

func (le *lessor) Promote(extend time.Duration) {
le.mu.Lock()
defer le.mu.Unlock()
le.Ready = false
go func() {
le.mu.RLock()
le.demotec = make(chan struct{})
leaseCopy := le.unsafeLeases()
le.mu.RUnlock()
var updateList []*LeaseWithTime
defer func() {
le.mu.Lock()
defer le.mu.Unlock()
for _, item := range updateList {
heap.Push(&le.leaseHeap, item)
}
le.Ready = true
}()

// refresh the expiries of all leases.
for _, l := range leaseCopy {
l.refresh(extend)
// check that the lease hasn't been revoked
item := &LeaseWithTime{id: l.ID, expiration: l.expiry.UnixNano()}
updateList = append(updateList, item)
}

if len(le.leaseMap) < leaseRevokeRate {
// no possibility of lease pile-up
return
}

le.demotec = make(chan struct{})
// adjust expiries in case of overlap

// refresh the expiries of all leases.
for _, l := range le.leaseMap {
l.refresh(extend)
item := &LeaseWithTime{id: l.ID, expiration: l.expiry.UnixNano()}
heap.Push(&le.leaseHeap, item)
}
sort.Sort(leasesByExpiry(leaseCopy))

if len(le.leaseMap) < leaseRevokeRate {
// no possibility of lease pile-up
return
}
baseWindow := leaseCopy[0].Remaining()
nextWindow := baseWindow + time.Second
expires := 0
// have fewer expires than the total revoke rate so piled up leases
// don't consume the entire revoke limit
targetExpiresPerSecond := (3 * leaseRevokeRate) / 4

// adjust expiries in case of overlap
leases := le.unsafeLeases()

baseWindow := leases[0].Remaining()
nextWindow := baseWindow + time.Second
expires := 0
// have fewer expires than the total revoke rate so piled up leases
// don't consume the entire revoke limit
targetExpiresPerSecond := (3 * leaseRevokeRate) / 4
for _, l := range leases {
remaining := l.Remaining()
if remaining > nextWindow {
baseWindow = remaining
nextWindow = baseWindow + time.Second
expires = 1
continue
}
expires++
if expires <= targetExpiresPerSecond {
continue
for _, l := range leaseCopy {
remaining := l.Remaining()
if remaining > nextWindow {
baseWindow = remaining
nextWindow = baseWindow + time.Second
expires = 1
continue
}
expires++
if expires <= targetExpiresPerSecond {
continue
}
rateDelay := float64(time.Second) * (float64(expires) / float64(targetExpiresPerSecond))
// If leases are extended by n seconds, leases n seconds ahead of the
// base window should be extended by only one second.
rateDelay -= float64(remaining - baseWindow)
delay := time.Duration(rateDelay)
nextWindow = baseWindow + delay
l.refresh(delay + extend)
item := &LeaseWithTime{id: l.ID, expiration: l.expiry.UnixNano()}
updateList = append(updateList, item)
}
rateDelay := float64(time.Second) * (float64(expires) / float64(targetExpiresPerSecond))
// If leases are extended by n seconds, leases n seconds ahead of the
// base window should be extended by only one second.
rateDelay -= float64(remaining - baseWindow)
delay := time.Duration(rateDelay)
nextWindow = baseWindow + delay
l.refresh(delay + extend)
item := &LeaseWithTime{id: l.ID, expiration: l.expiry.UnixNano()}
heap.Push(&le.leaseHeap, item)
}
}()
}

type leasesByExpiry []*Lease
Expand Down Expand Up @@ -490,7 +509,7 @@ func (le *lessor) runLoop() {
revokeLimit := leaseRevokeRate / 2

le.mu.RLock()
if le.isPrimary() {
if le.isPrimary() && le.Ready {
ls = le.findExpiredLeases(revokeLimit)
}
le.mu.RUnlock()
Expand Down
26 changes: 26 additions & 0 deletions lease/lessor_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package lease
import (
"os"
"testing"
"time"

"github.com/coreos/etcd/mvcc/backend"
)
Expand Down Expand Up @@ -53,6 +54,14 @@ func BenchmarkLessorRevoke10000(b *testing.B) { benchmarkLessorRevoke(10000, b
func BenchmarkLessorRevoke100000(b *testing.B) { benchmarkLessorRevoke(100000, b) }
func BenchmarkLessorRevoke1000000(b *testing.B) { benchmarkLessorRevoke(1000000, b) }

func BenchmarkLessorPromote1(b *testing.B) { benchmarkLessorPromote(1, b) }
func BenchmarkLessorPromote10(b *testing.B) { benchmarkLessorPromote(10, b) }
func BenchmarkLessorPromote100(b *testing.B) { benchmarkLessorPromote(100, b) }
func BenchmarkLessorPromote1000(b *testing.B) { benchmarkLessorPromote(1000, b) }
func BenchmarkLessorPromote10000(b *testing.B) { benchmarkLessorPromote(10000, b) }
func BenchmarkLessorPromote100000(b *testing.B) { benchmarkLessorPromote(100000, b) }
func BenchmarkLessorPromote1000000(b *testing.B) { benchmarkLessorPromote(1000000, b) }

func benchmarkLessorFindExpired(size int, b *testing.B) {
be, tmpPath := backend.NewDefaultTmpBackend()
le := newLessor(be, minLeaseTTL)
Expand Down Expand Up @@ -115,6 +124,23 @@ func benchmarkLessorRenew(size int, b *testing.B) {
}
}

func benchmarkLessorPromote(size int, b *testing.B) {
be, tmpPath := backend.NewDefaultTmpBackend()
le := newLessor(be, minLeaseTTL)
defer le.Stop()
defer cleanup(be, tmpPath)
for i := 0; i < size; i++ {
le.Grant(LeaseID(i), int64(100+i))
}

b.ResetTimer()

go func() { le.Promote(100 * time.Second) }()
for i := 0; i < b.N; i++ {
le.Grant(LeaseID(i+size), int64(100+i+size))
}
}

func cleanup(b backend.Backend, path string) {
b.Close()
os.Remove(path)
Expand Down
19 changes: 18 additions & 1 deletion lease/lessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"os"
"path/filepath"
"reflect"
"runtime"
"sort"
"sync"
"testing"
Expand All @@ -44,6 +45,7 @@ func TestLessorGrant(t *testing.T) {
le := newLessor(be, minLeaseTTL)
defer le.Stop()
le.Promote(0)
waitForPromotion(le)

l, err := le.Grant(1, 1)
if err != nil {
Expand Down Expand Up @@ -205,7 +207,7 @@ func TestLessorRenew(t *testing.T) {
le := newLessor(be, minLeaseTTL)
defer le.Stop()
le.Promote(0)

waitForPromotion(le)
l, err := le.Grant(1, minLeaseTTL)
if err != nil {
t.Fatalf("failed to grant lease (%v)", err)
Expand Down Expand Up @@ -263,6 +265,7 @@ func TestLessorRenewExtendPileup(t *testing.T) {

// extend after recovery should extend expiration on lease pile-up
le.Promote(0)
waitForPromotion(le)

windowCounts := make(map[int64]int)
for _, l := range le.leaseMap {
Expand Down Expand Up @@ -360,6 +363,7 @@ func TestLessorExpire(t *testing.T) {
defer le.Stop()

le.Promote(1 * time.Second)
waitForPromotion(le)
l, err := le.Grant(1, testMinTTL)
if err != nil {
t.Fatalf("failed to create lease: %v", err)
Expand Down Expand Up @@ -412,6 +416,7 @@ func TestLessorExpireAndDemote(t *testing.T) {
defer le.Stop()

le.Promote(1 * time.Second)
waitForPromotion(le)
l, err := le.Grant(1, testMinTTL)
if err != nil {
t.Fatalf("failed to create lease: %v", err)
Expand Down Expand Up @@ -492,3 +497,15 @@ func NewTestBackend(t *testing.T) (string, backend.Backend) {
bcfg.Path = filepath.Join(tmpPath, "be")
return tmpPath, backend.New(bcfg)
}
func waitForPromotion(le *lessor) {
for {
le.mu.RLock()
ready := le.Ready
le.mu.RUnlock()
if ready {
return
} else {
runtime.Gosched()
}
}
}

0 comments on commit 81f80d9

Please sign in to comment.