diff --git a/server/config/config.go b/server/config/config.go index 4c82cfae2e6..575dd96d4ce 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -177,6 +177,10 @@ type ServerConfig struct { // Currently all etcd memory gets mlocked, but in future the flag can // be refined to mlock in-use area of bbolt only. ExperimentalMemoryMlock bool `json:"experimental-memory-mlock"` + + // ExperimentalTxnModeWriteWithSharedBuffer enables write transaction to use + // a shared buffer in its readonly check operations. + ExperimentalTxnModeWriteWithSharedBuffer bool `json:"experimental-txn-mode-write-with-shared-buffer"` } // VerifyBootstrap sanity-checks the initial config for bootstrap case diff --git a/server/embed/config.go b/server/embed/config.go index 0dbaaa6b2e2..83ff1921055 100644 --- a/server/embed/config.go +++ b/server/embed/config.go @@ -355,6 +355,9 @@ type Config struct { // Currently all etcd memory gets mlocked, but in future the flag can // be refined to mlock in-use area of bbolt only. ExperimentalMemoryMlock bool `json:"experimental-memory-mlock"` + + // ExperimentalTxnModeWriteWithSharedBuffer enables write transaction to use a shared buffer in its readonly check operations. 
+ ExperimentalTxnModeWriteWithSharedBuffer bool `json:"experimental-txn-mode-write-with-shared-buffer"` } // configYAML holds the config suitable for yaml parsing @@ -444,8 +447,9 @@ func NewConfig() *Config { LogLevel: logutil.DefaultLogLevel, EnableGRPCGateway: true, - ExperimentalDowngradeCheckTime: DefaultDowngradeCheckTime, - ExperimentalMemoryMlock: false, + ExperimentalDowngradeCheckTime: DefaultDowngradeCheckTime, + ExperimentalMemoryMlock: false, + ExperimentalTxnModeWriteWithSharedBuffer: true, } cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name) return cfg diff --git a/server/embed/etcd.go b/server/embed/etcd.go index c53450cd8b1..464a253a4c2 100644 --- a/server/embed/etcd.go +++ b/server/embed/etcd.go @@ -163,56 +163,57 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) { backendFreelistType := parseBackendFreelistType(cfg.BackendFreelistType) srvcfg := config.ServerConfig{ - Name: cfg.Name, - ClientURLs: cfg.ACUrls, - PeerURLs: cfg.APUrls, - DataDir: cfg.Dir, - DedicatedWALDir: cfg.WalDir, - SnapshotCount: cfg.SnapshotCount, - SnapshotCatchUpEntries: cfg.SnapshotCatchUpEntries, - MaxSnapFiles: cfg.MaxSnapFiles, - MaxWALFiles: cfg.MaxWalFiles, - InitialPeerURLsMap: urlsmap, - InitialClusterToken: token, - DiscoveryURL: cfg.Durl, - DiscoveryProxy: cfg.Dproxy, - NewCluster: cfg.IsNewCluster(), - PeerTLSInfo: cfg.PeerTLSInfo, - TickMs: cfg.TickMs, - ElectionTicks: cfg.ElectionTicks(), - InitialElectionTickAdvance: cfg.InitialElectionTickAdvance, - AutoCompactionRetention: autoCompactionRetention, - AutoCompactionMode: cfg.AutoCompactionMode, - QuotaBackendBytes: cfg.QuotaBackendBytes, - BackendBatchLimit: cfg.BackendBatchLimit, - BackendFreelistType: backendFreelistType, - BackendBatchInterval: cfg.BackendBatchInterval, - MaxTxnOps: cfg.MaxTxnOps, - MaxRequestBytes: cfg.MaxRequestBytes, - SocketOpts: cfg.SocketOpts, - StrictReconfigCheck: cfg.StrictReconfigCheck, - ClientCertAuthEnabled: cfg.ClientTLSInfo.ClientCertAuth, - AuthToken: 
cfg.AuthToken, - BcryptCost: cfg.BcryptCost, - TokenTTL: cfg.AuthTokenTTL, - CORS: cfg.CORS, - HostWhitelist: cfg.HostWhitelist, - InitialCorruptCheck: cfg.ExperimentalInitialCorruptCheck, - CorruptCheckTime: cfg.ExperimentalCorruptCheckTime, - PreVote: cfg.PreVote, - Logger: cfg.logger, - LoggerConfig: cfg.loggerConfig, - LoggerCore: cfg.loggerCore, - LoggerWriteSyncer: cfg.loggerWriteSyncer, - ForceNewCluster: cfg.ForceNewCluster, - EnableGRPCGateway: cfg.EnableGRPCGateway, - UnsafeNoFsync: cfg.UnsafeNoFsync, - EnableLeaseCheckpoint: cfg.ExperimentalEnableLeaseCheckpoint, - CompactionBatchLimit: cfg.ExperimentalCompactionBatchLimit, - WatchProgressNotifyInterval: cfg.ExperimentalWatchProgressNotifyInterval, - DowngradeCheckTime: cfg.ExperimentalDowngradeCheckTime, - WarningApplyDuration: cfg.ExperimentalWarningApplyDuration, - ExperimentalMemoryMlock: cfg.ExperimentalMemoryMlock, + Name: cfg.Name, + ClientURLs: cfg.ACUrls, + PeerURLs: cfg.APUrls, + DataDir: cfg.Dir, + DedicatedWALDir: cfg.WalDir, + SnapshotCount: cfg.SnapshotCount, + SnapshotCatchUpEntries: cfg.SnapshotCatchUpEntries, + MaxSnapFiles: cfg.MaxSnapFiles, + MaxWALFiles: cfg.MaxWalFiles, + InitialPeerURLsMap: urlsmap, + InitialClusterToken: token, + DiscoveryURL: cfg.Durl, + DiscoveryProxy: cfg.Dproxy, + NewCluster: cfg.IsNewCluster(), + PeerTLSInfo: cfg.PeerTLSInfo, + TickMs: cfg.TickMs, + ElectionTicks: cfg.ElectionTicks(), + InitialElectionTickAdvance: cfg.InitialElectionTickAdvance, + AutoCompactionRetention: autoCompactionRetention, + AutoCompactionMode: cfg.AutoCompactionMode, + QuotaBackendBytes: cfg.QuotaBackendBytes, + BackendBatchLimit: cfg.BackendBatchLimit, + BackendFreelistType: backendFreelistType, + BackendBatchInterval: cfg.BackendBatchInterval, + MaxTxnOps: cfg.MaxTxnOps, + MaxRequestBytes: cfg.MaxRequestBytes, + SocketOpts: cfg.SocketOpts, + StrictReconfigCheck: cfg.StrictReconfigCheck, + ClientCertAuthEnabled: cfg.ClientTLSInfo.ClientCertAuth, + AuthToken: cfg.AuthToken, + 
BcryptCost: cfg.BcryptCost, + TokenTTL: cfg.AuthTokenTTL, + CORS: cfg.CORS, + HostWhitelist: cfg.HostWhitelist, + InitialCorruptCheck: cfg.ExperimentalInitialCorruptCheck, + CorruptCheckTime: cfg.ExperimentalCorruptCheckTime, + PreVote: cfg.PreVote, + Logger: cfg.logger, + LoggerConfig: cfg.loggerConfig, + LoggerCore: cfg.loggerCore, + LoggerWriteSyncer: cfg.loggerWriteSyncer, + ForceNewCluster: cfg.ForceNewCluster, + EnableGRPCGateway: cfg.EnableGRPCGateway, + UnsafeNoFsync: cfg.UnsafeNoFsync, + EnableLeaseCheckpoint: cfg.ExperimentalEnableLeaseCheckpoint, + CompactionBatchLimit: cfg.ExperimentalCompactionBatchLimit, + WatchProgressNotifyInterval: cfg.ExperimentalWatchProgressNotifyInterval, + DowngradeCheckTime: cfg.ExperimentalDowngradeCheckTime, + WarningApplyDuration: cfg.ExperimentalWarningApplyDuration, + ExperimentalMemoryMlock: cfg.ExperimentalMemoryMlock, + ExperimentalTxnModeWriteWithSharedBuffer: cfg.ExperimentalTxnModeWriteWithSharedBuffer, } print(e.cfg.logger, *cfg, srvcfg, memberInitialized) if e.Server, err = etcdserver.NewServer(srvcfg); err != nil { diff --git a/server/etcdmain/config.go b/server/etcdmain/config.go index 3a6cee0362a..64ed7f1cbe7 100644 --- a/server/etcdmain/config.go +++ b/server/etcdmain/config.go @@ -267,6 +267,7 @@ func newConfig() *config { fs.DurationVar(&cfg.ec.ExperimentalDowngradeCheckTime, "experimental-downgrade-check-time", cfg.ec.ExperimentalDowngradeCheckTime, "Duration of time between two downgrade status check.") fs.DurationVar(&cfg.ec.ExperimentalWarningApplyDuration, "experimental-warning-apply-duration", cfg.ec.ExperimentalWarningApplyDuration, "Time duration after which a warning is generated if request takes more time.") fs.BoolVar(&cfg.ec.ExperimentalMemoryMlock, "experimental-memory-mlock", cfg.ec.ExperimentalMemoryMlock, "Enable to enforce etcd pages (in particular bbolt) to stay in RAM.") + fs.BoolVar(&cfg.ec.ExperimentalTxnModeWriteWithSharedBuffer, "experimental-txn-mode-write-with-shared-buffer", true, 
"Enable the write transaction to use a shared buffer in its readonly check operations.") // unsafe fs.BoolVar(&cfg.ec.UnsafeNoFsync, "unsafe-no-fsync", false, "Disables fsync, unsafe, will cause data loss.") diff --git a/server/etcdmain/help.go b/server/etcdmain/help.go index aaad8aea838..726c18cab68 100644 --- a/server/etcdmain/help.go +++ b/server/etcdmain/help.go @@ -220,6 +220,8 @@ Experimental feature: Duration of periodical watch progress notification. --experimental-warning-apply-duration '100ms' Warning is generated if requests take more than this duration. + --experimental-txn-mode-write-with-shared-buffer 'true' + Enable the write transaction to use a shared buffer in its readonly check operations. Unsafe feature: --force-new-cluster 'false' diff --git a/server/etcdserver/apply.go b/server/etcdserver/apply.go index 74a2575074c..cf281d1a9c7 100644 --- a/server/etcdserver/apply.go +++ b/server/etcdserver/apply.go @@ -336,7 +336,7 @@ func (a *applierV3backend) Range(ctx context.Context, txn mvcc.TxnRead, r *pb.Ra resp.Header = &pb.ResponseHeader{} if txn == nil { - txn = a.s.kv.Read(trace) + txn = a.s.kv.Read(mvcc.ConcurrentReadTxMode, trace) defer txn.End() } @@ -434,7 +434,15 @@ func (a *applierV3backend) Txn(ctx context.Context, rt *pb.TxnRequest) (*pb.TxnR ctx = context.WithValue(ctx, traceutil.TraceKey, trace) } isWrite := !isTxnReadonly(rt) - txn := mvcc.NewReadOnlyTxnWrite(a.s.KV().Read(trace)) + + // When the transaction contains write operations, we use ReadTx instead of + // ConcurrentReadTx to avoid extra overhead of copying buffer. 
+ var txn mvcc.TxnWrite + if isWrite && a.s.Cfg.ExperimentalTxnModeWriteWithSharedBuffer { + txn = mvcc.NewReadOnlyTxnWrite(a.s.KV().Read(mvcc.SharedBufReadTxMode, trace)) + } else { + txn = mvcc.NewReadOnlyTxnWrite(a.s.KV().Read(mvcc.ConcurrentReadTxMode, trace)) + } var txnPath []bool trace.StepWithFunction( diff --git a/server/mvcc/kv.go b/server/mvcc/kv.go index b8cd982da6e..35f108bfc29 100644 --- a/server/mvcc/kv.go +++ b/server/mvcc/kv.go @@ -100,12 +100,21 @@ func (trw *txnReadWrite) Changes() []mvccpb.KeyValue { return nil } func NewReadOnlyTxnWrite(txn TxnRead) TxnWrite { return &txnReadWrite{txn} } +type ReadTxMode uint32 + +const ( + // Use ConcurrentReadTx and the txReadBuffer is copied + ConcurrentReadTxMode = ReadTxMode(1) + // Use backend ReadTx and txReadBuffer is not copied + SharedBufReadTxMode = ReadTxMode(2) +) + type KV interface { ReadView WriteView // Read creates a read transaction. - Read(trace *traceutil.Trace) TxnRead + Read(mode ReadTxMode, trace *traceutil.Trace) TxnRead // Write creates a write transaction. 
Write(trace *traceutil.Trace) TxnWrite diff --git a/server/mvcc/kv_test.go b/server/mvcc/kv_test.go index 19d6539d833..51f688db7d5 100644 --- a/server/mvcc/kv_test.go +++ b/server/mvcc/kv_test.go @@ -50,7 +50,7 @@ var ( return kv.Range(context.TODO(), key, end, ro) } txnRangeFunc = func(kv KV, key, end []byte, ro RangeOptions) (*RangeResult, error) { - txn := kv.Read(traceutil.TODO()) + txn := kv.Read(ConcurrentReadTxMode, traceutil.TODO()) defer txn.End() return txn.Range(context.TODO(), key, end, ro) } diff --git a/server/mvcc/kv_view.go b/server/mvcc/kv_view.go index 29464c50eab..56260e7599a 100644 --- a/server/mvcc/kv_view.go +++ b/server/mvcc/kv_view.go @@ -24,19 +24,19 @@ import ( type readView struct{ kv KV } func (rv *readView) FirstRev() int64 { - tr := rv.kv.Read(traceutil.TODO()) + tr := rv.kv.Read(ConcurrentReadTxMode, traceutil.TODO()) defer tr.End() return tr.FirstRev() } func (rv *readView) Rev() int64 { - tr := rv.kv.Read(traceutil.TODO()) + tr := rv.kv.Read(ConcurrentReadTxMode, traceutil.TODO()) defer tr.End() return tr.Rev() } func (rv *readView) Range(ctx context.Context, key, end []byte, ro RangeOptions) (r *RangeResult, err error) { - tr := rv.kv.Read(traceutil.TODO()) + tr := rv.kv.Read(ConcurrentReadTxMode, traceutil.TODO()) defer tr.End() return tr.Range(ctx, key, end, ro) } diff --git a/server/mvcc/kvstore_test.go b/server/mvcc/kvstore_test.go index 1bb3fae24e3..f6f6313f868 100644 --- a/server/mvcc/kvstore_test.go +++ b/server/mvcc/kvstore_test.go @@ -658,7 +658,7 @@ func TestConcurrentReadNotBlockingWrite(t *testing.T) { s.Put([]byte("foo"), []byte("bar"), lease.NoLease) // readTx simulates a long read request - readTx1 := s.Read(traceutil.TODO()) + readTx1 := s.Read(ConcurrentReadTxMode, traceutil.TODO()) // write should not be blocked by reads done := make(chan struct{}, 1) @@ -673,7 +673,7 @@ func TestConcurrentReadNotBlockingWrite(t *testing.T) { } // readTx2 simulates a short read request - readTx2 := s.Read(traceutil.TODO()) + 
readTx2 := s.Read(ConcurrentReadTxMode, traceutil.TODO()) ro := RangeOptions{Limit: 1, Rev: 0, Count: false} ret, err := readTx2.Range(context.TODO(), []byte("foo"), nil, ro) if err != nil { @@ -756,7 +756,7 @@ func TestConcurrentReadTxAndWrite(t *testing.T) { mu.Lock() wKVs := make(kvs, len(committedKVs)) copy(wKVs, committedKVs) - tx := s.Read(traceutil.TODO()) + tx := s.Read(ConcurrentReadTxMode, traceutil.TODO()) mu.Unlock() // get all keys in backend store, and compare with wKVs ret, err := tx.Range(context.TODO(), []byte("\x00000000"), []byte("\xffffffff"), RangeOptions{}) diff --git a/server/mvcc/kvstore_txn.go b/server/mvcc/kvstore_txn.go index aaa93d9ab94..287d2a944b7 100644 --- a/server/mvcc/kvstore_txn.go +++ b/server/mvcc/kvstore_txn.go @@ -34,12 +34,20 @@ type storeTxnRead struct { trace *traceutil.Trace } -func (s *store) Read(trace *traceutil.Trace) TxnRead { +func (s *store) Read(mode ReadTxMode, trace *traceutil.Trace) TxnRead { s.mu.RLock() s.revMu.RLock() - // backend holds b.readTx.RLock() only when creating the concurrentReadTx. After - // ConcurrentReadTx is created, it will not block write transaction. - tx := s.b.ConcurrentReadTx() + // For read-only workloads, we use ConcurrentReadTx, which copies the transaction read buffer + // for higher concurrency with ongoing blocking writes. + // For write/write-read transactions, we use the backend ReadTx and its shared buffer + // rather than duplicating the transaction read buffer, to avoid the copy overhead. + var tx backend.ReadTx + if mode == ConcurrentReadTxMode { + tx = s.b.ConcurrentReadTx() + } else { + tx = s.b.ReadTx() + } + tx.RLock() // RLock is a no-op for concurrentReadTx, which does not need to be locked after it is created. firstRev, rev := s.compactMainRev, s.currentRev s.revMu.RUnlock()