diff --git a/server/mvcc/backend/backend.go b/server/mvcc/backend/backend.go index ecf941f1fe73..c1c1ab155f6b 100644 --- a/server/mvcc/backend/backend.go +++ b/server/mvcc/backend/backend.go @@ -102,6 +102,10 @@ type backend struct { batchTx *batchTxBuffered readTx *readTx + // concurrentReadTx is not supposed to write to its txReadBuffer + // so multiple concurrentReadTx instances can share one txReadBuffer + // as long as there is no new write operations done. + cachedPrevReadTxBuf *txReadBuffer stopc chan struct{} donec chan struct{} @@ -183,7 +187,8 @@ func newBackend(bcfg BackendConfig) *backend { readTx: &readTx{ baseReadTx: baseReadTx{ buf: txReadBuffer{ - txBuffer: txBuffer{make(map[string]*bucketBuffer)}, + txBuffer: txBuffer{make(map[string]*bucketBuffer)}, + isModified: false, }, buckets: make(map[string]*bolt.Bucket), txWg: new(sync.WaitGroup), @@ -222,9 +227,18 @@ func (b *backend) ConcurrentReadTx() ReadTx { // prevent boltdb read Tx from been rolled back until store read Tx is done. Needs to be called when holding readTx.RLock(). b.readTx.txWg.Add(1) // TODO: might want to copy the read buffer lazily - create copy when A) end of a write transaction B) end of a batch interval. + var modified bool + // check if new data has been committed since last txReadBuffer copying. + modified = b.readTx.buf.IsModified() + if modified || (!modified && b.cachedPrevReadTxBuf == nil) { + tmp := b.readTx.buf.unsafeCopy() + b.cachedPrevReadTxBuf = &tmp + } + b.readTx.buf.ResetModified() + // concurrentReadTx is not supposed to write to its txReadBuffer return &concurrentReadTx{ baseReadTx: baseReadTx{ - buf: b.readTx.buf.unsafeCopy(), + buf: *b.cachedPrevReadTxBuf, txMu: b.readTx.txMu, tx: b.readTx.tx, buckets: b.readTx.buckets, diff --git a/server/mvcc/backend/tx_buffer.go b/server/mvcc/backend/tx_buffer.go index 4df6d0c5951d..4b4988d266d7 100644 --- a/server/mvcc/backend/tx_buffer.go +++ b/server/mvcc/backend/tx_buffer.go @@ -19,6 +19,8 @@ import ( "sort" ) +const bucketBufferInitialSize = 512 + // txBuffer handles functionality shared between txWriteBuffer and txReadBuffer. type txBuffer struct { buckets map[string]*bucketBuffer @@ -69,10 +71,23 @@ func (txw *txWriteBuffer) writeback(txr *txReadBuffer) { rb.merge(wb) } txw.reset() + // mark read buffer modified + txr.isModified = true } // txReadBuffer accesses buffered updates. -type txReadBuffer struct{ txBuffer } +type txReadBuffer struct{ + txBuffer + isModified bool +} + +func (txr *txReadBuffer) IsModified() bool { + return txr.isModified +} + +func (txr *txReadBuffer) ResetModified() { + txr.isModified = false +} func (txr *txReadBuffer) Range(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) { if b := txr.buckets[string(bucketName)]; b != nil { @@ -94,6 +109,7 @@ func (txr *txReadBuffer) unsafeCopy() txReadBuffer { txBuffer: txBuffer{ buckets: make(map[string]*bucketBuffer, len(txr.txBuffer.buckets)), }, + isModified: false, } for bucketName, bucket := range txr.txBuffer.buckets { txrCopy.txBuffer.buckets[bucketName] = bucket.Copy() @@ -114,7 +130,7 @@ type bucketBuffer struct { } func newBucketBuffer() *bucketBuffer { - return &bucketBuffer{buf: make([]kv, 512), used: 0} + return &bucketBuffer{buf: make([]kv, bucketBufferInitialSize), used: 0} } func (bb *bucketBuffer) Range(key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte) {