Skip to content

Commit

Permalink
Merge pull request #12962 from ptabor/20210513-write-conf-state
Browse files Browse the repository at this point in the history
Save raftpb.ConfState in the backend.
  • Loading branch information
ptabor authored May 13, 2021
2 parents e2d67f2 + 6ce7f74 commit e44fb40
Show file tree
Hide file tree
Showing 11 changed files with 288 additions and 64 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG-3.5.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ See [code changes](https://github.com/etcd-io/etcd/compare/v3.4.0...v3.5.0) and
- [Embed Etcd does not override global/grpc logger](https://github.com/etcd-io/etcd/pull/12861) be default any longer. If desired, please call `embed.Config::SetupGlobalLoggers()` explicitly.
- Client errors of `context cancelled` or `context deadline exceeded` are exposed as `codes.Canceled` and `codes.DeadlineExceeded`, instead of `codes.Unknown`.

### Storage format changes
- [WAL log's snapshots persists raftpb.ConfState](https://github.com/etcd-io/etcd/pull/12735)
- [Backend persists raftpb.ConfState](https://github.com/etcd-io/etcd/pull/12962) in the `meta` bucket `confState` key.
- Backend persists downgrade in the `cluster` bucket

### Security

- Add [`TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305_SHA256` and `TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305_SHA256` to `etcd --cipher-suites`](https://github.com/etcd-io/etcd/pull/11864).
Expand Down
2 changes: 2 additions & 0 deletions server/etcdserver/api/membership/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -694,6 +694,7 @@ func clusterVersionFromStore(lg *zap.Logger, st v2store.Store) *semver.Version {
return semver.Must(semver.NewVersion(*e.Node.Value))
}

// The field is populated since etcd v3.5.
func clusterVersionFromBackend(lg *zap.Logger, be backend.Backend) *semver.Version {
ckey := backendClusterVersionKey()
tx := be.ReadTx()
Expand All @@ -712,6 +713,7 @@ func clusterVersionFromBackend(lg *zap.Logger, be backend.Backend) *semver.Versi
return semver.Must(semver.NewVersion(string(vals[0])))
}

// The field is populated since etcd v3.5.
func downgradeInfoFromBackend(lg *zap.Logger, be backend.Backend) *DowngradeInfo {
dkey := backendDowngradeKey()
tx := be.ReadTx()
Expand Down
63 changes: 63 additions & 0 deletions server/etcdserver/api/membership/confstate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright 2021 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package membership

import (
"encoding/json"
"log"

"go.etcd.io/etcd/raft/v3/raftpb"
"go.etcd.io/etcd/server/v3/mvcc"
"go.etcd.io/etcd/server/v3/mvcc/backend"
"go.uber.org/zap"
)

var (
confStateKey = []byte("confState")
)

// MustUnsafeSaveConfStateToBackend persists confState using given transaction (tx).
// confState in backend is persisted since etcd v3.5.
func MustUnsafeSaveConfStateToBackend(lg *zap.Logger, tx backend.BatchTx, confState *raftpb.ConfState) {
confStateBytes, err := json.Marshal(confState)
if err != nil {
lg.Panic("Cannot marshal raftpb.ConfState", zap.Stringer("conf-state", confState), zap.Error(err))
}

tx.UnsafePut(mvcc.MetaBucketName, confStateKey, confStateBytes)
}

// UnsafeConfStateFromBackend retrieves ConfState from the backend.
// Returns nil if confState in backend is not persisted (e.g. backend writen by <v3.5).
func UnsafeConfStateFromBackend(lg *zap.Logger, tx backend.ReadTx) *raftpb.ConfState {
keys, vals := tx.UnsafeRange(mvcc.MetaBucketName, confStateKey, nil, 0)
if len(keys) == 0 {
return nil
}

if len(keys) != 1 {
lg.Panic(
"unexpected number of key: "+string(confStateKey)+" when getting cluster version from backend",
zap.Int("number-of-key", len(keys)),
)
}
var confState raftpb.ConfState
if err := json.Unmarshal(vals[0], &confState); err != nil {
log.Panic("Cannot unmarshal confState json retrieved from the backend",
zap.ByteString("conf-state-json", vals[0]),
zap.Error(err))
}
return &confState
}
79 changes: 79 additions & 0 deletions server/etcdserver/api/membership/confstate_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Copyright 2021 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package membership_test

import (
"testing"

"github.com/stretchr/testify/assert"
"go.etcd.io/etcd/raft/v3/raftpb"
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
"go.etcd.io/etcd/server/v3/etcdserver/cindex"
betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing"
"go.uber.org/zap/zaptest"
)

func TestConfStateFromBackendInOneTx(t *testing.T) {
lg := zaptest.NewLogger(t)
be, _ := betesting.NewDefaultTmpBackend(t)
defer betesting.Close(t, be)

tx := be.BatchTx()
cindex.CreateMetaBucket(tx)
tx.Lock()
defer tx.Unlock()
assert.Nil(t, membership.UnsafeConfStateFromBackend(lg, tx))

confState := raftpb.ConfState{Learners: []uint64{1, 2}, Voters: []uint64{3}, AutoLeave: false}
membership.MustUnsafeSaveConfStateToBackend(lg, tx, &confState)

assert.Equal(t, confState, *membership.UnsafeConfStateFromBackend(lg, tx))
}

func TestMustUnsafeSaveConfStateToBackend(t *testing.T) {
lg := zaptest.NewLogger(t)
be, _ := betesting.NewDefaultTmpBackend(t)
defer betesting.Close(t, be)

{
tx := be.BatchTx()
cindex.CreateMetaBucket(tx)
tx.Commit()
}

t.Run("missing", func(t *testing.T) {
tx := be.ReadTx()
tx.Lock()
defer tx.Unlock()
assert.Nil(t, membership.UnsafeConfStateFromBackend(lg, tx))
})

confState := raftpb.ConfState{Learners: []uint64{1, 2}, Voters: []uint64{3}, AutoLeave: false}

t.Run("save", func(t *testing.T) {
tx := be.BatchTx()
tx.Lock()
membership.MustUnsafeSaveConfStateToBackend(lg, tx, &confState)
tx.Unlock()
tx.Commit()
})

t.Run("read", func(t *testing.T) {
tx := be.ReadTx()
tx.Lock()
defer tx.Unlock()
assert.Equal(t, confState, *membership.UnsafeConfStateFromBackend(lg, tx))
})
}
2 changes: 2 additions & 0 deletions server/etcdserver/api/membership/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ func TrimMembershipFromV2Store(lg *zap.Logger, s v2store.Store) error {
return nil
}

// The field is populated since etcd v3.5.
func mustSaveClusterVersionToBackend(be backend.Backend, ver *semver.Version) {
ckey := backendClusterVersionKey()

Expand All @@ -170,6 +171,7 @@ func mustSaveClusterVersionToBackend(be backend.Backend, ver *semver.Version) {
tx.UnsafePut(clusterBucketName, ckey, []byte(ver.String()))
}

// The field is populated since etcd v3.5.
func mustSaveDowngradeToBackend(lg *zap.Logger, be backend.Backend, downgrade *DowngradeInfo) {
dkey := backendDowngradeKey()
dvalue, err := json.Marshal(downgrade)
Expand Down
24 changes: 23 additions & 1 deletion server/etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ type EtcdServer struct {
lessor lease.Lessor
bemu sync.Mutex
be backend.Backend
beHooks backend.Hooks
beHooks *backendHooks
authStore auth.AuthStore
alarmStore *v3alarm.AlarmStore

Expand Down Expand Up @@ -298,10 +298,31 @@ type EtcdServer struct {
type backendHooks struct {
indexer cindex.ConsistentIndexer
lg *zap.Logger

// confState to be written in the next submitted backend transaction (if dirty)
confState raftpb.ConfState
// first write changes it to 'dirty'. false by default, so
// not initialized `confState` is meaningless.
confStateDirty bool
confStateLock sync.Mutex
}

func (bh *backendHooks) OnPreCommitUnsafe(tx backend.BatchTx) {
bh.indexer.UnsafeSave(tx)
bh.confStateLock.Lock()
defer bh.confStateLock.Unlock()
if bh.confStateDirty {
membership.MustUnsafeSaveConfStateToBackend(bh.lg, tx, &bh.confState)
// save bh.confState
bh.confStateDirty = false
}
}

func (bh *backendHooks) SetConfState(confState *raftpb.ConfState) {
bh.confStateLock.Lock()
defer bh.confStateLock.Unlock()
bh.confState = *confState
bh.confStateDirty = true
}

// NewServer creates a new EtcdServer from the supplied configuration. The
Expand Down Expand Up @@ -2238,6 +2259,7 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con

lg := s.Logger()
*confState = *s.r.ApplyConfChange(cc)
s.beHooks.SetConfState(confState)
switch cc.Type {
case raftpb.ConfChangeAddNode, raftpb.ConfChangeAddLearnerNode:
confChangeContext := new(membership.ConfigChangeContext)
Expand Down
Loading

0 comments on commit e44fb40

Please sign in to comment.