Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[release-3.5] server: set multiple concurrentReadTx instances share one txReadBuffer. #13036

Merged
merged 1 commit into from
May 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 78 additions & 2 deletions server/mvcc/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,12 @@ type Snapshot interface {
Close() error
}

type txReadBufferCache struct {
mu sync.Mutex
buf *txReadBuffer
bufVersion uint64
}

type backend struct {
// size and commits are used with atomic operations so they must be
// 64-bit aligned, otherwise 32-bit tests will crash
Expand All @@ -102,6 +108,11 @@ type backend struct {
batchTx *batchTxBuffered

readTx *readTx
// txReadBufferCache mirrors "txReadBuffer" within "readTx" -- readTx.baseReadTx.buf.
// When creating "concurrentReadTx":
// - if the cache is up-to-date, "readTx.baseReadTx.buf" copy can be skipped
// - if the cache is empty or outdated, "readTx.baseReadTx.buf" copy is required
txReadBufferCache txReadBufferCache

stopc chan struct{}
donec chan struct{}
Expand Down Expand Up @@ -183,19 +194,26 @@ func newBackend(bcfg BackendConfig) *backend {
readTx: &readTx{
baseReadTx: baseReadTx{
buf: txReadBuffer{
txBuffer: txBuffer{make(map[string]*bucketBuffer)},
txBuffer: txBuffer{make(map[string]*bucketBuffer)},
bufVersion: 0,
},
buckets: make(map[string]*bolt.Bucket),
txWg: new(sync.WaitGroup),
txMu: new(sync.RWMutex),
},
},
txReadBufferCache: txReadBufferCache{
mu: sync.Mutex{},
bufVersion: 0,
buf: nil,
},

stopc: make(chan struct{}),
donec: make(chan struct{}),

lg: bcfg.Logger,
}

b.batchTx = newBatchTxBuffered(b)
// We set it after newBatchTxBuffered to skip the 'empty' commit.
b.hooks = bcfg.Hooks
Expand All @@ -221,10 +239,68 @@ func (b *backend) ConcurrentReadTx() ReadTx {
defer b.readTx.RUnlock()
// prevent boltdb read Tx from been rolled back until store read Tx is done. Needs to be called when holding readTx.RLock().
b.readTx.txWg.Add(1)

// TODO: might want to copy the read buffer lazily - create copy when A) end of a write transaction B) end of a batch interval.

// inspect/update cache recency iff there's no ongoing update to the cache
// this falls through if there's no cache update

// by this line, "ConcurrentReadTx" code path is already protected against concurrent "writeback" operations
// which requires write lock to update "readTx.baseReadTx.buf".
// Which means setting "buf *txReadBuffer" with "readTx.buf.unsafeCopy()" is guaranteed to be up-to-date,
// whereas "txReadBufferCache.buf" may be stale from concurrent "writeback" operations.
// We only update "txReadBufferCache.buf" if we know "buf *txReadBuffer" is up-to-date.
// The update to "txReadBufferCache.buf" will benefit the following "ConcurrentReadTx" creation
// by avoiding copying "readTx.baseReadTx.buf".
b.txReadBufferCache.mu.Lock()

curCache := b.txReadBufferCache.buf
curCacheVer := b.txReadBufferCache.bufVersion
curBufVer := b.readTx.buf.bufVersion

isEmptyCache := curCache == nil
isStaleCache := curCacheVer != curBufVer

var buf *txReadBuffer
switch {
case isEmptyCache:
// perform safe copy of buffer while holding "b.txReadBufferCache.mu.Lock"
// this is only supposed to run once so there won't be much overhead
curBuf := b.readTx.buf.unsafeCopy()
buf = &curBuf
case isStaleCache:
// to maximize the concurrency, try unsafe copy of buffer
// release the lock while copying buffer -- cache may become stale again and
// get overwritten by someone else.
// therefore, we need to check the readTx buffer version again
b.txReadBufferCache.mu.Unlock()
curBuf := b.readTx.buf.unsafeCopy()
b.txReadBufferCache.mu.Lock()
buf = &curBuf
default:
// neither empty nor stale cache, just use the current buffer
buf = curCache
}
// txReadBufferCache.bufVersion can be modified when we doing an unsafeCopy()
// as a result, curCacheVer could be no longer the same as
// txReadBufferCache.bufVersion
// if !isEmptyCache && curCacheVer != b.txReadBufferCache.bufVersion
// then the cache became stale while copying "readTx.baseReadTx.buf".
// It is safe to not update "txReadBufferCache.buf", because the next following
// "ConcurrentReadTx" creation will trigger a new "readTx.baseReadTx.buf" copy
// and "buf" is still used for the current "concurrentReadTx.baseReadTx.buf".
if isEmptyCache || curCacheVer == b.txReadBufferCache.bufVersion {
// continue if the cache is never set or no one has modified the cache
b.txReadBufferCache.buf = buf
b.txReadBufferCache.bufVersion = curBufVer
}

b.txReadBufferCache.mu.Unlock()

// concurrentReadTx is not supposed to write to its txReadBuffer
return &concurrentReadTx{
baseReadTx: baseReadTx{
buf: b.readTx.buf.unsafeCopy(),
buf: *buf,
txMu: b.readTx.txMu,
tx: b.readTx.tx,
buckets: b.readTx.buckets,
Expand Down
13 changes: 11 additions & 2 deletions server/mvcc/backend/tx_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"sort"
)

const bucketBufferInitialSize = 512

// txBuffer handles functionality shared between txWriteBuffer and txReadBuffer.
type txBuffer struct {
buckets map[string]*bucketBuffer
Expand Down Expand Up @@ -69,10 +71,16 @@ func (txw *txWriteBuffer) writeback(txr *txReadBuffer) {
rb.merge(wb)
}
txw.reset()
// increase the buffer version
txr.bufVersion++
}

// txReadBuffer accesses buffered updates.
type txReadBuffer struct{ txBuffer }
type txReadBuffer struct {
txBuffer
// bufVersion is used to check if the buffer is modified recently
bufVersion uint64
}

func (txr *txReadBuffer) Range(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
if b := txr.buckets[string(bucketName)]; b != nil {
Expand All @@ -94,6 +102,7 @@ func (txr *txReadBuffer) unsafeCopy() txReadBuffer {
txBuffer: txBuffer{
buckets: make(map[string]*bucketBuffer, len(txr.txBuffer.buckets)),
},
bufVersion: 0,
}
for bucketName, bucket := range txr.txBuffer.buckets {
txrCopy.txBuffer.buckets[bucketName] = bucket.Copy()
Expand All @@ -114,7 +123,7 @@ type bucketBuffer struct {
}

func newBucketBuffer() *bucketBuffer {
return &bucketBuffer{buf: make([]kv, 512), used: 0}
return &bucketBuffer{buf: make([]kv, bucketBufferInitialSize), used: 0}
}

func (bb *bucketBuffer) Range(key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte) {
Expand Down