Skip to content

Commit

Permalink
server: set multiple concurrentReadTx instances share one txReadBuffer.
Browse files Browse the repository at this point in the history
Address issues mentioned by Gyuho Lee
  • Loading branch information
wilsonwang371 committed May 13, 2021
1 parent bcc8aae commit acaf5ae
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 37 deletions.
72 changes: 41 additions & 31 deletions server/mvcc/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ type backend struct {
// concurrentReadTx is not supposed to write to its txReadBuffer
// so multiple concurrentReadTx instances can share one txReadBuffer
// as long as no new write operations have been performed.
cachedReadTxBuf txReadBufferCache
txReadBufferCache txReadBufferCache

stopc chan struct{}
donec chan struct{}
Expand Down Expand Up @@ -201,7 +201,7 @@ func newBackend(bcfg BackendConfig) *backend {
txMu: new(sync.RWMutex),
},
},
cachedReadTxBuf: txReadBufferCache{
txReadBufferCache: txReadBufferCache{
mu: sync.Mutex{},
bufVersion: 0,
buf: nil,
Expand Down Expand Up @@ -240,38 +240,48 @@ func (b *backend) ConcurrentReadTx() ReadTx {
b.readTx.txWg.Add(1)

// TODO: might want to copy the read buffer lazily - create copy when A) end of a write transaction B) end of a batch interval.
var modifiedBufVersion uint64
var buf *txReadBuffer

// we are using a mutex to protect the critical region where only one concurrent read tx can set the cache
b.cachedReadTxBuf.mu.Lock()
modifiedBufVersion = b.readTx.buf.Version()
if b.cachedReadTxBuf.buf == nil {
// initialize empty cache
bufCopy := b.readTx.buf.unsafeCopy()
b.cachedReadTxBuf.buf = &bufCopy
b.cachedReadTxBuf.bufVersion = modifiedBufVersion
buf = &bufCopy
} else {
// check if new data has been committed since last txReadBuffer copying.
if modifiedBufVersion != b.cachedReadTxBuf.bufVersion {
// we need to release the lock to maximize parallelism.
b.cachedReadTxBuf.mu.Unlock()
bufCopy := b.readTx.buf.unsafeCopy()
b.cachedReadTxBuf.mu.Lock()
buf = &bufCopy
} else {
buf = b.cachedReadTxBuf.buf
}
// it is possible that the buffer version was updated during an unsafeCopy();
// if someone else has modified it, we give up on updating the cache.
if modifiedBufVersion == b.readTx.buf.Version() {
b.cachedReadTxBuf.buf = buf
b.cachedReadTxBuf.bufVersion = modifiedBufVersion
}
// inspect/update cache recency iff there's no ongoing update to the cache
// this falls through if there's no cache update
b.txReadBufferCache.mu.Lock()

curCache := b.txReadBufferCache.buf
curCacheVer := b.txReadBufferCache.bufVersion
curBufVer := b.readTx.buf.bufVersion

isEmptyCache := curCache == nil
isStaleCache := curCacheVer != curBufVer

var buf *txReadBuffer
switch {
case isEmptyCache:
// perform safe copy of buffer while holding "b.txReadBufferCache.mu.Lock"
// this is only supposed to run once so there won't be much overhead
curBuf := b.readTx.buf.unsafeCopy()
buf = &curBuf
case isStaleCache:
// to maximize the concurrency, try unsafe copy of buffer
// release the lock while copying buffer -- cache may become stale again and
// get overwritten by someone else.
// therefore, we need to check the readTx buffer version again
b.txReadBufferCache.mu.Unlock()
curBuf := b.readTx.buf.unsafeCopy()
b.txReadBufferCache.mu.Lock()
buf = &curBuf
default:
// neither empty nor stale cache, just use the current buffer
buf = curCache
}
// txReadBufferCache.bufVersion can be modified while we are doing an unsafeCopy();
// as a result, curCacheVer may no longer be the same as
// txReadBufferCache.bufVersion
if isEmptyCache || curCacheVer == b.txReadBufferCache.bufVersion {
// continue if the cache is never set or no one has modified the cache
b.txReadBufferCache.buf = buf
b.txReadBufferCache.bufVersion = curBufVer
}

b.cachedReadTxBuf.mu.Unlock()
b.txReadBufferCache.mu.Unlock()

// concurrentReadTx is not supposed to write to its txReadBuffer
return &concurrentReadTx{
Expand Down
6 changes: 0 additions & 6 deletions server/mvcc/backend/tx_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,6 @@ type txReadBuffer struct {
bufVersion uint64
}

// Version returns the current value of the buffer's bufVersion field.
// The version is described as being updated each time after a write
// transaction; the code performing that update is not shown here.
func (txr *txReadBuffer) Version() uint64 {
return txr.bufVersion
}

func (txr *txReadBuffer) Range(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
if b := txr.buckets[string(bucketName)]; b != nil {
return b.Range(key, endKey, limit)
Expand Down

0 comments on commit acaf5ae

Please sign in to comment.