Skip to content

Commit

Permalink
server: applier uses ReadTx instead of ConcurrentTx
Browse files Browse the repository at this point in the history
  • Loading branch information
wilsonwang371 committed Apr 28, 2021
1 parent 9a3aff6 commit 8d8d037
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 12 deletions.
12 changes: 10 additions & 2 deletions server/etcdserver/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ func (a *applierV3backend) Range(ctx context.Context, txn mvcc.TxnRead, r *pb.Ra
resp.Header = &pb.ResponseHeader{}

if txn == nil {
txn = a.s.kv.Read(trace)
txn = a.s.kv.Read(mvcc.ConcurrentReadTxMode, trace)
defer txn.End()
}

Expand Down Expand Up @@ -434,7 +434,15 @@ func (a *applierV3backend) Txn(ctx context.Context, rt *pb.TxnRequest) (*pb.TxnR
ctx = context.WithValue(ctx, traceutil.TraceKey, trace)
}
isWrite := !isTxnReadonly(rt)
txn := mvcc.NewReadOnlyTxnWrite(a.s.KV().Read(trace))

// When the transaction contains write operations, we use ReadTx instead of
// ConcurrentReadTx to avoid extra overhead of copying buffer.
var txn mvcc.TxnWrite
if isWrite {
txn = mvcc.NewReadOnlyTxnWrite(a.s.KV().Read(mvcc.SharedBufReadTxMode, trace))
} else {
txn = mvcc.NewReadOnlyTxnWrite(a.s.KV().Read(mvcc.ConcurrentReadTxMode, trace))
}

var txnPath []bool
trace.StepWithFunction(
Expand Down
11 changes: 10 additions & 1 deletion server/mvcc/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,21 @@ func (trw *txnReadWrite) Changes() []mvccpb.KeyValue { return nil }

func NewReadOnlyTxnWrite(txn TxnRead) TxnWrite { return &txnReadWrite{txn} }

type ReadTxMode uint32

const (
// Use ConcurrentReadTx and the txReadBuffer is copied
ConcurrentReadTxMode = ReadTxMode(1)
// Use backend ReadTx and txReadBuffer is not copied
SharedBufReadTxMode = ReadTxMode(2)
)

type KV interface {
ReadView
WriteView

// Read creates a read transaction.
Read(trace *traceutil.Trace) TxnRead
Read(mode ReadTxMode, trace *traceutil.Trace) TxnRead

// Write creates a write transaction.
Write(trace *traceutil.Trace) TxnWrite
Expand Down
2 changes: 1 addition & 1 deletion server/mvcc/kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ var (
return kv.Range(context.TODO(), key, end, ro)
}
txnRangeFunc = func(kv KV, key, end []byte, ro RangeOptions) (*RangeResult, error) {
txn := kv.Read(traceutil.TODO())
txn := kv.Read(ConcurrentReadTxMode, traceutil.TODO())
defer txn.End()
return txn.Range(context.TODO(), key, end, ro)
}
Expand Down
6 changes: 3 additions & 3 deletions server/mvcc/kv_view.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,19 @@ import (
type readView struct{ kv KV }

func (rv *readView) FirstRev() int64 {
tr := rv.kv.Read(traceutil.TODO())
tr := rv.kv.Read(ConcurrentReadTxMode, traceutil.TODO())
defer tr.End()
return tr.FirstRev()
}

func (rv *readView) Rev() int64 {
tr := rv.kv.Read(traceutil.TODO())
tr := rv.kv.Read(ConcurrentReadTxMode, traceutil.TODO())
defer tr.End()
return tr.Rev()
}

func (rv *readView) Range(ctx context.Context, key, end []byte, ro RangeOptions) (r *RangeResult, err error) {
tr := rv.kv.Read(traceutil.TODO())
tr := rv.kv.Read(ConcurrentReadTxMode, traceutil.TODO())
defer tr.End()
return tr.Range(ctx, key, end, ro)
}
Expand Down
6 changes: 3 additions & 3 deletions server/mvcc/kvstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -658,7 +658,7 @@ func TestConcurrentReadNotBlockingWrite(t *testing.T) {
s.Put([]byte("foo"), []byte("bar"), lease.NoLease)

// readTx simulates a long read request
readTx1 := s.Read(traceutil.TODO())
readTx1 := s.Read(ConcurrentReadTxMode, traceutil.TODO())

// write should not be blocked by reads
done := make(chan struct{}, 1)
Expand All @@ -673,7 +673,7 @@ func TestConcurrentReadNotBlockingWrite(t *testing.T) {
}

// readTx2 simulates a short read request
readTx2 := s.Read(traceutil.TODO())
readTx2 := s.Read(ConcurrentReadTxMode, traceutil.TODO())
ro := RangeOptions{Limit: 1, Rev: 0, Count: false}
ret, err := readTx2.Range(context.TODO(), []byte("foo"), nil, ro)
if err != nil {
Expand Down Expand Up @@ -756,7 +756,7 @@ func TestConcurrentReadTxAndWrite(t *testing.T) {
mu.Lock()
wKVs := make(kvs, len(committedKVs))
copy(wKVs, committedKVs)
tx := s.Read(traceutil.TODO())
tx := s.Read(ConcurrentReadTxMode, traceutil.TODO())
mu.Unlock()
// get all keys in backend store, and compare with wKVs
ret, err := tx.Range(context.TODO(), []byte("\x00000000"), []byte("\xffffffff"), RangeOptions{})
Expand Down
10 changes: 8 additions & 2 deletions server/mvcc/kvstore_txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,18 @@ type storeTxnRead struct {
trace *traceutil.Trace
}

func (s *store) Read(trace *traceutil.Trace) TxnRead {
func (s *store) Read(mode ReadTxMode, trace *traceutil.Trace) TxnRead {
var tx backend.ReadTx
s.mu.RLock()
s.revMu.RLock()
// backend holds b.readTx.RLock() only when creating the concurrentReadTx. After
// ConcurrentReadTx is created, it will not block write transaction.
tx := s.b.ConcurrentReadTx()
if mode == ConcurrentReadTxMode {
tx = s.b.ConcurrentReadTx()
} else {
tx = s.b.ReadTx()
}

tx.RLock() // RLock is no-op. concurrentReadTx does not need to be locked after it is created.
firstRev, rev := s.compactMainRev, s.currentRev
s.revMu.RUnlock()
Expand Down

0 comments on commit 8d8d037

Please sign in to comment.