diff --git a/CHANGELOG-3.4.md b/CHANGELOG-3.4.md index b5899d85538..13067254863 100644 --- a/CHANGELOG-3.4.md +++ b/CHANGELOG-3.4.md @@ -27,6 +27,7 @@ See [code changes](https://github.com/coreos/etcd/compare/v3.3.0...v3.4.0) and [ - The retention window of compaction period moves for every given compaction period or hour. - For instance, when hourly writes are 100 and `--auto-compaction-mode=periodic --auto-compaction-retention=24h`, `v3.2.x`, `v3.3.0`, `v3.3.1`, and `v3.3.2` compact revision 2400, 2640, and 2880 for every 2.4-hour, while `v3.3.3` *or later* compacts revision 2400, 2500, 2600 for every 1-hour. - Futhermore, when `--auto-compaction-mode=periodic --auto-compaction-retention=30m` and writes per minute are about 1000, `v3.3.0`, `v3.3.1`, and `v3.3.2` compact revision 30000, 33000, and 36000, for every 3-minute, while `v3.3.3` *or later* compacts revision 30000, 60000, and 90000, for every 30-minute. +- Improve [lease expire/revoke operation performance](https://github.com/coreos/etcd/pull/9418), address [lease scalability issue](https://github.com/coreos/etcd/issues/9496). - Make [Lease `Lookup` non-blocking with concurrent `Grant`/`Revoke`](https://github.com/coreos/etcd/pull/9229). ### Breaking Changes diff --git a/lease/lease_queue.go b/lease/lease_queue.go new file mode 100644 index 00000000000..a7c3cf553c4 --- /dev/null +++ b/lease/lease_queue.go @@ -0,0 +1,51 @@ +// Copyright 2018 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package lease + +type LeaseWithTime struct { + leaseId LeaseID + expiration int64 + index int +} + +type LeaseQueue []*LeaseWithTime + +func (pq LeaseQueue) Len() int { return len(pq) } + +func (pq LeaseQueue) Less(i, j int) bool { + return pq[i].expiration < pq[j].expiration +} + +func (pq LeaseQueue) Swap(i, j int) { + pq[i], pq[j] = pq[j], pq[i] + pq[i].index = i + pq[j].index = j +} + +func (pq *LeaseQueue) Push(x interface{}) { + n := len(*pq) + item := x.(*LeaseWithTime) + item.index = n + *pq = append(*pq, item) +} + +func (pq *LeaseQueue) Pop() interface{} { + old := *pq + n := len(old) + item := old[n-1] + item.index = -1 // for safety + *pq = old[0 : n-1] + return item +} diff --git a/lease/lessor.go b/lease/lessor.go index 31f645fa337..8609e47b63c 100644 --- a/lease/lessor.go +++ b/lease/lessor.go @@ -15,6 +15,7 @@ package lease import ( + "container/heap" "encoding/binary" "errors" "math" @@ -128,9 +129,9 @@ type lessor struct { // We want to make Grant, Revoke, and findExpiredLeases all O(logN) and // Renew O(1). // findExpiredLeases and Renew should be the most frequent operations. - leaseMap map[LeaseID]*Lease - - itemMap map[LeaseItem]LeaseID + leaseMap map[LeaseID]*Lease + leaseHeap LeaseQueue + itemMap map[LeaseItem]LeaseID // When a lease expires, the lessor will delete the // leased range (or key) by the RangeDeleter. @@ -159,6 +160,7 @@ func newLessor(b backend.Backend, minLeaseTTL int64) *lessor { l := &lessor{ leaseMap: make(map[LeaseID]*Lease), itemMap: make(map[LeaseItem]LeaseID), + leaseHeap: make(LeaseQueue, 0), b: b, minLeaseTTL: minLeaseTTL, // expiredC is a small buffered chan to avoid unnecessary blocking. @@ -233,6 +235,8 @@ func (le *lessor) Grant(id LeaseID, ttl int64) (*Lease, error) { } le.leaseMap[id] = l + item := &LeaseWithTime{leaseId: l.ID, expiration: l.expiry.UnixNano()} + heap.Push(&le.leaseHeap, item) l.persistTo(le.b) return l, nil @@ -315,6 +319,8 @@ func (le *lessor) Renew(id LeaseID) (int64, error) { } l.refresh(0) + item := &LeaseWithTime{leaseId: l.ID, expiration: l.expiry.UnixNano()} + heap.Push(&le.leaseHeap, item) return l.ttl, nil } @@ -349,6 +355,8 @@ func (le *lessor) Promote(extend time.Duration) { // refresh the expiries of all leases. for _, l := range le.leaseMap { l.refresh(extend) + item := &LeaseWithTime{leaseId: l.ID, expiration: l.expiry.UnixNano()} + heap.Push(&le.leaseHeap, item) } if len(le.leaseMap) < leaseRevokeRate { @@ -384,6 +392,8 @@ func (le *lessor) Promote(extend time.Duration) { delay := time.Duration(rateDelay) nextWindow = baseWindow + delay l.refresh(delay + extend) + item := &LeaseWithTime{leaseId: l.ID, expiration: l.expiry.UnixNano()} + heap.Push(&le.leaseHeap, item) } } @@ -516,9 +526,24 @@ func (le *lessor) runLoop() { func (le *lessor) findExpiredLeases(limit int) []*Lease { leases := make([]*Lease, 0, 16) - for _, l := range le.leaseMap { - // TODO: probably should change to <= 100-500 millisecond to - // make up committing latency. + for { + if le.leaseHeap.Len() == 0 { + break + } + + item := heap.Pop(&le.leaseHeap).(*LeaseWithTime) + l := le.leaseMap[item.leaseId] + if l == nil { + // lease has expired or been revoked, continue + continue + } + if time.Now().UnixNano() < item.expiration { + // Candidate expirations are caught up, reinsert this item + heap.Push(&le.leaseHeap, item) + break + } + // if the lease is actually expired, add to the removal list. If it is not expired, we can ignore it because another entry will have been inserted into the heap + if l.expired() { leases = append(leases, l) @@ -560,6 +585,7 @@ func (le *lessor) initAndRecover() { revokec: make(chan struct{}), } } + heap.Init(&le.leaseHeap) tx.Unlock() le.b.ForceCommit() diff --git a/lease/lessor_bench_test.go b/lease/lessor_bench_test.go new file mode 100644 index 00000000000..a3be6aa95b2 --- /dev/null +++ b/lease/lessor_bench_test.go @@ -0,0 +1,121 @@ +// Copyright 2018 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package lease + +import ( + "os" + "testing" + + "github.com/coreos/etcd/mvcc/backend" +) + +func BenchmarkLessorFindExpired1(b *testing.B) { benchmarkLessorFindExpired(1, b) } +func BenchmarkLessorFindExpired10(b *testing.B) { benchmarkLessorFindExpired(10, b) } +func BenchmarkLessorFindExpired100(b *testing.B) { benchmarkLessorFindExpired(100, b) } +func BenchmarkLessorFindExpired1000(b *testing.B) { benchmarkLessorFindExpired(1000, b) } +func BenchmarkLessorFindExpired10000(b *testing.B) { benchmarkLessorFindExpired(10000, b) } +func BenchmarkLessorFindExpired100000(b *testing.B) { benchmarkLessorFindExpired(100000, b) } +func BenchmarkLessorFindExpired1000000(b *testing.B) { benchmarkLessorFindExpired(1000000, b) } + +func BenchmarkLessorGrant1(b *testing.B) { benchmarkLessorGrant(1, b) } +func BenchmarkLessorGrant10(b *testing.B) { benchmarkLessorGrant(10, b) } +func BenchmarkLessorGrant100(b *testing.B) { benchmarkLessorGrant(100, b) } +func BenchmarkLessorGrant1000(b *testing.B) { benchmarkLessorGrant(1000, b) } +func BenchmarkLessorGrant10000(b *testing.B) { benchmarkLessorGrant(10000, b) } +func BenchmarkLessorGrant100000(b *testing.B) { benchmarkLessorGrant(100000, b) } +func BenchmarkLessorGrant1000000(b *testing.B) { benchmarkLessorGrant(1000000, b) } + +func BenchmarkLessorRenew1(b *testing.B) { benchmarkLessorRenew(1, b) } +func BenchmarkLessorRenew10(b *testing.B) { benchmarkLessorRenew(10, b) } +func BenchmarkLessorRenew100(b *testing.B) { benchmarkLessorRenew(100, b) } +func BenchmarkLessorRenew1000(b *testing.B) { benchmarkLessorRenew(1000, b) } +func BenchmarkLessorRenew10000(b *testing.B) { benchmarkLessorRenew(10000, b) } +func BenchmarkLessorRenew100000(b *testing.B) { benchmarkLessorRenew(100000, b) } +func BenchmarkLessorRenew1000000(b *testing.B) { benchmarkLessorRenew(1000000, b) } + +func BenchmarkLessorRevoke1(b *testing.B) { benchmarkLessorRevoke(1, b) } +func BenchmarkLessorRevoke10(b *testing.B) { benchmarkLessorRevoke(10, b) } +func BenchmarkLessorRevoke100(b *testing.B) { benchmarkLessorRevoke(100, b) } +func BenchmarkLessorRevoke1000(b *testing.B) { benchmarkLessorRevoke(1000, b) } +func BenchmarkLessorRevoke10000(b *testing.B) { benchmarkLessorRevoke(10000, b) } +func BenchmarkLessorRevoke100000(b *testing.B) { benchmarkLessorRevoke(100000, b) } +func BenchmarkLessorRevoke1000000(b *testing.B) { benchmarkLessorRevoke(1000000, b) } + +func benchmarkLessorFindExpired(size int, b *testing.B) { + be, tmpPath := backend.NewDefaultTmpBackend() + le := newLessor(be, minLeaseTTL) + defer le.Stop() + defer cleanup(be, tmpPath) + le.Promote(0) + for i := 0; i < size; i++ { + le.Grant(LeaseID(i), int64(100+i)) + } + le.mu.Lock() //Stop the findExpiredLeases call in the runloop + defer le.mu.Unlock() + b.ResetTimer() + for i := 0; i < b.N; i++ { + le.findExpiredLeases(1000) + } +} + +func benchmarkLessorGrant(size int, b *testing.B) { + be, tmpPath := backend.NewDefaultTmpBackend() + le := newLessor(be, minLeaseTTL) + defer le.Stop() + defer cleanup(be, tmpPath) + for i := 0; i < size; i++ { + le.Grant(LeaseID(i), int64(100+i)) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + le.Grant(LeaseID(i+size), int64(100+i+size)) + } +} + +func benchmarkLessorRevoke(size int, b *testing.B) { + be, tmpPath := backend.NewDefaultTmpBackend() + le := newLessor(be, minLeaseTTL) + defer le.Stop() + defer cleanup(be, tmpPath) + for i := 0; i < size; i++ { + le.Grant(LeaseID(i), int64(100+i)) + } + for i := 0; i < b.N; i++ { + le.Grant(LeaseID(i+size), int64(100+i+size)) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + le.Revoke(LeaseID(i + size)) + } +} + +func benchmarkLessorRenew(size int, b *testing.B) { + be, tmpPath := backend.NewDefaultTmpBackend() + le := newLessor(be, minLeaseTTL) + defer le.Stop() + defer cleanup(be, tmpPath) + for i := 0; i < size; i++ { + le.Grant(LeaseID(i), int64(100+i)) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + le.Renew(LeaseID(i)) + } +} + +func cleanup(b backend.Backend, path string) { + b.Close() + os.Remove(path) +}