feat(mincore): Add page fault limiter
This commit adds `mincore.Limiter`, which throttles page faults caused by
accessing mmap()'d data. It works by periodically calling `mincore()` to
determine which pages are not resident in memory and using `rate.Limiter`
to throttle access to those pages with a token bucket algorithm.
benbjohnson committed Jul 17, 2020
1 parent 009e113 commit 5820678
Showing 12 changed files with 505 additions and 22 deletions.
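
As a minimal usage sketch of the new package (illustration only, not part of this commit; the anonymous mapping stands in for the mmap'd storage-engine data the launcher actually wires the limiter into):

package main

import (
	"context"

	"github.com/influxdata/influxdb/v2/pkg/mincore"
	"golang.org/x/sys/unix"
	"golang.org/x/time/rate"
)

func main() {
	// Map an anonymous region as a stand-in for an mmap'd data file.
	data, err := unix.Mmap(-1, 0, 1<<20, unix.PROT_READ|unix.PROT_WRITE,
		unix.MAP_ANON|unix.MAP_PRIVATE)
	if err != nil {
		panic(err)
	}
	defer unix.Munmap(data)

	// Allow at most 100 page faults per second against this mapping.
	l := mincore.NewLimiter(rate.NewLimiter(rate.Limit(100), 1), data)

	// Before reading a section of the mapping, wait on the limiter so any
	// non-resident pages in the range are faulted in at the allowed rate.
	if err := l.WaitRange(context.Background(), data[:4096]); err != nil {
		panic(err)
	}
	_ = data[0] // safe to read; any fault was rate limited
}
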
22 changes: 21 additions & 1 deletion cmd/influxd/launcher/launcher.go
@@ -73,6 +73,7 @@ import (
jaegerconfig "github.com/uber/jaeger-client-go/config"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"golang.org/x/time/rate"
)

const (
@@ -353,6 +354,12 @@ func launcherOpts(l *Launcher) []cli.Opt {
Default: 10,
Desc: "the number of queries that are allowed to be awaiting execution before new queries are rejected",
},
{
DestP: &l.pageFaultRate,
Flag: "page-fault-rate",
Default: 0,
Desc: "the number of page faults allowed per second in the storage engine",
},
{
DestP: &l.featureFlags,
Flag: "feature-flags",
@@ -423,6 +430,8 @@ type Launcher struct {
Stdout io.Writer
Stderr io.Writer
apibackend *http.APIBackend

pageFaultRate int
}

type stoppingScheduler interface {
@@ -692,13 +701,24 @@ func (m *Launcher) run(ctx context.Context) (err error) {
return err
}

// Enable storage layer page fault limiting if rate set above zero.
var pageFaultLimiter *rate.Limiter
if m.pageFaultRate > 0 {
pageFaultLimiter = rate.NewLimiter(rate.Limit(m.pageFaultRate), 1)
}

if m.testing {
// the testing engine will write/read into a temporary directory
engine := NewTemporaryEngine(m.StorageConfig, storage.WithRetentionEnforcer(ts.BucketSvc))
flushers = append(flushers, engine)
m.engine = engine
} else {
m.engine = storage.NewEngine(m.enginePath, m.StorageConfig, storage.WithRetentionEnforcer(ts.BucketSvc))
m.engine = storage.NewEngine(
m.enginePath,
m.StorageConfig,
storage.WithRetentionEnforcer(ts.BucketSvc),
storage.WithPageFaultLimiter(pageFaultLimiter),
)
}
m.engine.WithLogger(m.log)
if err := m.engine.Open(ctx); err != nil {
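A note on the construction above: `rate.NewLimiter(rate.Limit(m.pageFaultRate), 1)` uses a burst of 1, so the bucket holds at most one token and each throttled fault waits roughly 1/page-fault-rate seconds after the previous one. A standalone sketch of that behavior (illustration only, using a rate of 2 events per second):

package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	// Same shape as the launcher's limiter: N events per second, burst of 1.
	lim := rate.NewLimiter(rate.Limit(2), 1)

	start := time.Now()
	for i := 0; i < 3; i++ {
		// The first Wait uses the single burst token; the next two block
		// about 500ms each at 2 events per second.
		if err := lim.Wait(context.Background()); err != nil {
			panic(err)
		}
	}
	fmt.Println("elapsed:", time.Since(start)) // roughly 1s
}
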
175 changes: 175 additions & 0 deletions pkg/mincore/limiter.go
@@ -0,0 +1,175 @@
package mincore

import (
"context"
"os"
"sync"
"time"
"unsafe"

"golang.org/x/time/rate"
)

// Limiter defaults.
const (
DefaultUpdateInterval = 10 * time.Second
)

// Limiter represents a token bucket rate limiter based on page faults for an mmap'd byte slice.
type Limiter struct {
mu sync.Mutex
underlying *rate.Limiter
data []byte // mmap reference
incore []byte // in-core vector
updatedAt time.Time // last incore update

// Frequency of updates of the in-core vector.
// Updates are performed lazily so this is the maximum frequency.
UpdateInterval time.Duration

// OS mincore() function.
Mincore func(data []byte) ([]byte, error)
}

// NewLimiter returns a new instance of Limiter associated with an mmap.
// The underlying limiter can be shared to limit faults across the entire process.
func NewLimiter(underlying *rate.Limiter, data []byte) *Limiter {
return &Limiter{
underlying: underlying,
data: data,

UpdateInterval: DefaultUpdateInterval,
Mincore: Mincore,
}
}

// WaitPointer checks if ptr would cause a page fault and, if so, rate limits its access.
// Once a page access is limited, it's updated to be considered memory resident.
func (l *Limiter) WaitPointer(ctx context.Context, ptr unsafe.Pointer) error {
// Check if the page is in-memory under lock.
// However, we want to exclude the wait from the limiter lock.
if wait, err := func() (bool, error) {
l.mu.Lock()
defer l.mu.Unlock()

// Update incore mapping if data is too stale.
if err := l.checkUpdate(); err != nil {
return false, err
}

return l.wait(ptr), nil
}(); err != nil {
return err
} else if !wait {
return nil
}

return l.underlying.Wait(ctx)
}

// WaitRange checks whether any pages in b would cause page faults and, if so, rate limits their access.
// Once a page access is limited, it's updated to be considered memory resident.
func (l *Limiter) WaitRange(ctx context.Context, b []byte) error {
// Empty byte slices will never access memory so skip them.
if len(b) == 0 {
return nil
}

// Check every page for being in-memory under lock.
// However, we want to exclude the wait from the limiter lock.
var n int
if err := func() error {
l.mu.Lock()
defer l.mu.Unlock()

// Update incore mapping if data is too stale.
if err := l.checkUpdate(); err != nil {
return err
}

// Iterate over every page within the range.
pageSize := uintptr(os.Getpagesize())
start := (uintptr(unsafe.Pointer(&b[0])) / pageSize) * pageSize
end := (uintptr(unsafe.Pointer(&b[len(b)-1])) / pageSize) * pageSize

for i := start; i <= end; i += pageSize {
if l.wait(unsafe.Pointer(i)) {
n++
}
}

return nil
}(); err != nil {
return err
} else if n == 0 {
return nil
}

for i := 0; i < n; i++ {
if err := l.underlying.Wait(ctx); err != nil {
return err
}
}
return nil
}

func (l *Limiter) wait(ptr unsafe.Pointer) bool {
// Check if page access requires page fault. If not, exit immediately.
// If so, mark the page as memory resident afterward.
if l.isInCore(ptr) {
return false
}

// Otherwise mark page as resident in memory and rate limit.
if i := l.index(ptr); i >= 0 && i < len(l.incore) {
l.incore[i] |= 1
}
return true
}

// IsInCore returns true if the address is resident in memory or if the
// address is outside the range of the data the limiter is tracking.
func (l *Limiter) IsInCore(ptr unsafe.Pointer) bool {
l.mu.Lock()
defer l.mu.Unlock()
return l.isInCore(ptr)
}

func (l *Limiter) isInCore(ptr unsafe.Pointer) bool {
if i := l.index(ptr); i >= 0 && i < len(l.incore) {
return (l.incore[i] & 1) == 1
}
return true
}

// Update updates the vector of in-core pages. Automatically updated when calling Wait().
func (l *Limiter) Update() error {
l.mu.Lock()
defer l.mu.Unlock()
return l.update()
}

func (l *Limiter) update() error {
vec, err := l.Mincore(l.data)
if err != nil {
return err
}

l.incore = vec
l.updatedAt = time.Now()

return nil
}

// checkUpdate performs an update if one hasn't been done before or the interval has passed.
func (l *Limiter) checkUpdate() error {
if l.incore != nil && time.Since(l.updatedAt) < l.UpdateInterval {
return nil
}
return l.update()
}

// index returns the position in the in-core vector that represents ptr.
func (l *Limiter) index(ptr unsafe.Pointer) int {
return int(int64(uintptr(ptr)-uintptr(unsafe.Pointer(&l.data[0]))) / int64(os.Getpagesize()))
}
131 changes: 131 additions & 0 deletions pkg/mincore/limiter_test.go
@@ -0,0 +1,131 @@
package mincore_test

import (
"context"
"os"
"testing"
"time"
"unsafe"

"github.com/influxdata/influxdb/v2/pkg/mincore"
"golang.org/x/time/rate"
)

func TestLimiter(t *testing.T) {
pageSize := os.Getpagesize()

// Ensure limiter waits long enough between faults
t.Run("WaitPointer", func(t *testing.T) {
t.Parallel()

data := make([]byte, pageSize*2)
l := mincore.NewLimiter(rate.NewLimiter(1, 1), data) // 1 fault per sec
l.Mincore = func(data []byte) ([]byte, error) { return make([]byte, 2), nil }

start := time.Now()
if err := l.WaitPointer(context.Background(), unsafe.Pointer(&data[0])); err != nil {
t.Fatal(err)
} else if err := l.WaitPointer(context.Background(), unsafe.Pointer(&data[pageSize])); err != nil {
t.Fatal(err)
}

if d := time.Since(start); d < time.Second {
t.Fatalf("not enough time elapsed: %s", d)
}
})

// Ensure limiter waits long enough between faults for a byte slice.
t.Run("WaitRange", func(t *testing.T) {
t.Parallel()

data := make([]byte, 2*pageSize)
l := mincore.NewLimiter(rate.NewLimiter(1, 1), data) // 1 fault per sec
l.Mincore = func(data []byte) ([]byte, error) { return make([]byte, 2), nil }

start := time.Now()
if err := l.WaitRange(context.Background(), data); err != nil {
t.Fatal(err)
}

if d := time.Since(start); d < time.Second {
t.Fatalf("not enough time elapsed: %s", d)
}
})

// Ensure pages are marked as in-core after calling Wait() on them.
t.Run("MoveToInMemoryAfterUse", func(t *testing.T) {
t.Parallel()

data := make([]byte, pageSize*10)
l := mincore.NewLimiter(rate.NewLimiter(1, 1), data)
l.Mincore = func(data []byte) ([]byte, error) {
return make([]byte, 10), nil
}
if err := l.Update(); err != nil {
t.Fatal(err)
} else if l.IsInCore(unsafe.Pointer(&data[0])) {
t.Fatal("expected page to not be in-memory")
}

if err := l.WaitPointer(context.Background(), unsafe.Pointer(&data[0])); err != nil {
t.Fatal(err)
} else if !l.IsInCore(unsafe.Pointer(&data[0])) {
t.Fatal("expected page to be in-memory")
}
})

// Ensure fresh in-core data is pulled after the update interval.
t.Run("UpdateAfterInterval", func(t *testing.T) {
t.Parallel()

data := make([]byte, pageSize*10)
l := mincore.NewLimiter(rate.NewLimiter(1, 1), data)
l.UpdateInterval = 100 * time.Millisecond

var n int
l.Mincore = func(data []byte) ([]byte, error) {
n++
return make([]byte, 10), nil
}

// Wait for two pages to pull them in-memory.
if err := l.WaitPointer(context.Background(), unsafe.Pointer(&data[0])); err != nil {
t.Fatal(err)
} else if err := l.WaitPointer(context.Background(), unsafe.Pointer(&data[pageSize])); err != nil {
t.Fatal(err)
} else if !l.IsInCore(unsafe.Pointer(&data[0])) {
t.Fatal("expected page to be in-memory")
} else if !l.IsInCore(unsafe.Pointer(&data[pageSize])) {
t.Fatal("expected page to be in-memory")
}

// Wait for interval to pass.
time.Sleep(l.UpdateInterval)

// Fetch one of the previous pages and ensure the other one has been flushed from the update.
if err := l.WaitPointer(context.Background(), unsafe.Pointer(&data[0])); err != nil {
t.Fatal(err)
} else if !l.IsInCore(unsafe.Pointer(&data[0])) {
t.Fatal("expected page to be in-memory")
} else if l.IsInCore(unsafe.Pointer(&data[pageSize])) {
t.Fatal("expected page to not be in-memory")
}

if got, want := n, 2; got != want {
t.Fatalf("refreshed %d times, expected %d times", got, want)
}
})

// Ensure referencing data outside the limiter's data shows as in-memory.
t.Run("OutOfBounds", func(t *testing.T) {
l := mincore.NewLimiter(rate.NewLimiter(1, 1), make([]byte, pageSize))
l.Mincore = func(data []byte) ([]byte, error) {
return make([]byte, 1), nil
}

data := make([]byte, pageSize)
if !l.IsInCore(unsafe.Pointer(&data[0])) {
t.Fatal("expected out-of-bounds page to be resident")
}
})
}
24 changes: 24 additions & 0 deletions pkg/mincore/mincore_unix.go
@@ -0,0 +1,24 @@
// +build darwin dragonfly freebsd linux nacl netbsd openbsd

package mincore

import (
"os"
"unsafe"

"golang.org/x/sys/unix"
)

// Mincore is a wrapper function for mincore(2).
func Mincore(data []byte) ([]byte, error) {
vec := make([]byte, (int64(len(data))+int64(os.Getpagesize())-1)/int64(os.Getpagesize()))

if ret, _, err := unix.Syscall(
unix.SYS_MINCORE,
uintptr(unsafe.Pointer(&data[0])),
uintptr(len(data)),
uintptr(unsafe.Pointer(&vec[0]))); ret != 0 {
return nil, err
}
return vec, nil
}
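
Each byte of the returned vector corresponds to one page of data, and the least-significant bit is set when that page is resident in memory; this is the bit `Limiter.isInCore` tests. A small sketch of reading the vector (illustration only; the anonymous mapping stands in for real mmap'd file data):

package main

import (
	"fmt"
	"os"

	"github.com/influxdata/influxdb/v2/pkg/mincore"
	"golang.org/x/sys/unix"
)

func main() {
	pageSize := os.Getpagesize()
	data, err := unix.Mmap(-1, 0, 4*pageSize, unix.PROT_READ|unix.PROT_WRITE,
		unix.MAP_ANON|unix.MAP_PRIVATE)
	if err != nil {
		panic(err)
	}
	defer unix.Munmap(data)

	data[0] = 1 // touch the first page so it becomes resident

	vec, err := mincore.Mincore(data) // one byte per page
	if err != nil {
		panic(err)
	}
	resident := 0
	for _, b := range vec {
		if b&1 == 1 { // LSB set => page is resident
			resident++
		}
	}
	fmt.Printf("%d of %d pages resident\n", resident, len(vec))
}
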
8 changes: 8 additions & 0 deletions pkg/mincore/mincore_windows.go
@@ -0,0 +1,8 @@
// +build windows

package mincore

// Mincore returns a zero-length vector.
func Mincore(data []byte) ([]byte, error) {
return make([]byte, 0), nil
}
