Skip to content

Commit

Permalink
Changes to ignore entries older than ci during apply to RaftCluster
Browse files Browse the repository at this point in the history
Signed-off-by: Geeta Gharpure <geetagh@amazon.com>
  • Loading branch information
Geeta Gharpure committed Sep 13, 2023
1 parent 7caef10 commit 0bcec9d
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 131 deletions.
116 changes: 48 additions & 68 deletions server/etcdserver/api/membership/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ type RaftCluster struct {

v2store v2store.Store
be MembershipBackend
rs *ReplayStore

sync.Mutex // guards the fields below
version *semver.Version
Expand Down Expand Up @@ -117,7 +116,6 @@ func NewCluster(lg *zap.Logger, opts ...ClusterOption) *RaftCluster {
removed: make(map[types.ID]bool),
downgradeInfo: &serverversion.DowngradeInfo{Enabled: false},
maxLearners: clOpts.maxLearners,
rs: NewReplayStore(lg),
}
}

Expand Down Expand Up @@ -321,13 +319,7 @@ func (c *RaftCluster) ValidateConfigurationChange(cc raftpb.ConfChange, shouldAp
if err := c.validateConfigurationChange(cc, membersMap, removedMap); err != nil {
return err
}
} else {
membersMap, removedMap = c.rs.Members()
if err := c.validateConfigurationChange(cc, membersMap, removedMap); err != nil {
return err
}
}

return nil
}

Expand Down Expand Up @@ -421,21 +413,19 @@ func (c *RaftCluster) AddMember(m *Member, shouldApplyV3 ShouldApplyV3) {
}
if c.be != nil && shouldApplyV3 {
c.be.MustSaveMemberToBackend(m)
} else {
c.rs.AddMember(m)
}

c.members[m.ID] = m
c.updateMembershipMetric(m.ID, true)
c.members[m.ID] = m
c.updateMembershipMetric(m.ID, true)

c.lg.Info(
"added member",
zap.String("cluster-id", c.cid.String()),
zap.String("local-member-id", c.localID.String()),
zap.String("added-peer-id", m.ID.String()),
zap.Strings("added-peer-peer-urls", m.PeerURLs),
zap.Bool("added-peer-is-learner", m.IsLearner),
)
c.lg.Info(
"added member",
zap.String("cluster-id", c.cid.String()),
zap.String("local-member-id", c.localID.String()),
zap.String("added-peer-id", m.ID.String()),
zap.Strings("added-peer-peer-urls", m.PeerURLs),
zap.Bool("added-peer-is-learner", m.IsLearner),
)
}
}

// RemoveMember removes a member from the store.
Expand All @@ -448,31 +438,29 @@ func (c *RaftCluster) RemoveMember(id types.ID, shouldApplyV3 ShouldApplyV3) {
}
if c.be != nil && shouldApplyV3 {
c.be.MustDeleteMemberFromBackend(id)
} else {
c.rs.RemoveMember(id)
}

m, ok := c.members[id]
delete(c.members, id)
c.removed[id] = true
c.updateMembershipMetric(id, false)

if ok {
c.lg.Info(
"removed member",
zap.String("cluster-id", c.cid.String()),
zap.String("local-member-id", c.localID.String()),
zap.String("removed-remote-peer-id", id.String()),
zap.Strings("removed-remote-peer-urls", m.PeerURLs),
zap.Bool("removed-remote-peer-is-learner", m.IsLearner),
)
} else {
c.lg.Warn(
"skipped removing already removed member",
zap.String("cluster-id", c.cid.String()),
zap.String("local-member-id", c.localID.String()),
zap.String("removed-remote-peer-id", id.String()),
)
m, ok := c.members[id]
delete(c.members, id)
c.removed[id] = true
c.updateMembershipMetric(id, false)

if ok {
c.lg.Info(
"removed member",
zap.String("cluster-id", c.cid.String()),
zap.String("local-member-id", c.localID.String()),
zap.String("removed-remote-peer-id", id.String()),
zap.Strings("removed-remote-peer-urls", m.PeerURLs),
zap.Bool("removed-remote-peer-is-learner", m.IsLearner),
)
} else {
c.lg.Warn(
"skipped removing already removed member",
zap.String("cluster-id", c.cid.String()),
zap.String("local-member-id", c.localID.String()),
zap.String("removed-remote-peer-id", id.String()),
)
}
}
}

Expand All @@ -487,8 +475,6 @@ func (c *RaftCluster) UpdateAttributes(id types.ID, attr Attributes, shouldApply
}
if c.be != nil && shouldApplyV3 {
c.be.MustSaveMemberToBackend(m)
} else {
c.rs.AddMember(m)
}
return
}
Expand Down Expand Up @@ -523,15 +509,13 @@ func (c *RaftCluster) PromoteMember(id types.ID, shouldApplyV3 ShouldApplyV3) {
}
if c.be != nil && shouldApplyV3 {
c.be.MustSaveMemberToBackend(c.members[id])
} else {
c.rs.AddMember(c.members[id])
}

c.lg.Info(
"promote member",
zap.String("cluster-id", c.cid.String()),
zap.String("local-member-id", c.localID.String()),
)
c.lg.Info(
"promote member",
zap.String("cluster-id", c.cid.String()),
zap.String("local-member-id", c.localID.String()),
)
}
}

func (c *RaftCluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes, shouldApplyV3 ShouldApplyV3) {
Expand All @@ -544,18 +528,16 @@ func (c *RaftCluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes,
}
if c.be != nil && shouldApplyV3 {
c.be.MustSaveMemberToBackend(c.members[id])
} else {
c.rs.AddMember(c.members[id])
}

c.lg.Info(
"updated member",
zap.String("cluster-id", c.cid.String()),
zap.String("local-member-id", c.localID.String()),
zap.String("updated-remote-peer-id", id.String()),
zap.Strings("updated-remote-peer-urls", raftAttr.PeerURLs),
zap.Bool("updated-remote-peer-is-learner", raftAttr.IsLearner),
)
c.lg.Info(
"updated member",
zap.String("cluster-id", c.cid.String()),
zap.String("local-member-id", c.localID.String()),
zap.String("updated-remote-peer-id", id.String()),
zap.Strings("updated-remote-peer-urls", raftAttr.PeerURLs),
zap.Bool("updated-remote-peer-is-learner", raftAttr.IsLearner),
)
}
}

func (c *RaftCluster) Version() *semver.Version {
Expand Down Expand Up @@ -595,8 +577,6 @@ func (c *RaftCluster) SetVersion(ver *semver.Version, onSet func(*zap.Logger, *s
}
if c.be != nil && shouldApplyV3 {
c.be.MustSaveClusterVersionToBackend(ver)
} else {
c.rs.SetVersion(ver)
}
if oldVer != nil {
ClusterVersionMetrics.With(prometheus.Labels{"cluster_version": version.Cluster(oldVer.String())}).Set(0)
Expand Down
12 changes: 11 additions & 1 deletion server/etcdserver/api/membership/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,8 @@ func TestClusterGenID(t *testing.T) {
newTestMember(1, nil, "", nil),
newTestMember(2, nil, "", nil),
})
be := newMembershipBackend()
cs.SetBackend(be)

cs.genID()
if cs.ID() == 0 {
Expand Down Expand Up @@ -519,6 +521,8 @@ func TestNodeToMemberBad(t *testing.T) {
func TestClusterAddMember(t *testing.T) {
st := mockstore.NewRecorder()
c := newTestCluster(t, nil)
be := newMembershipBackend()
c.SetBackend(be)
c.SetStore(st)
c.AddMember(newTestMember(1, nil, "node1", nil), true)

Expand All @@ -542,6 +546,8 @@ func TestClusterAddMember(t *testing.T) {
func TestClusterAddMemberAsLearner(t *testing.T) {
st := mockstore.NewRecorder()
c := newTestCluster(t, nil)
be := newMembershipBackend()
c.SetBackend(be)
c.SetStore(st)
c.AddMember(newTestMemberAsLearner(1, nil, "node1", nil), true)

Expand Down Expand Up @@ -585,6 +591,8 @@ func TestClusterMembers(t *testing.T) {
func TestClusterRemoveMember(t *testing.T) {
st := mockstore.NewRecorder()
c := newTestCluster(t, nil)
be := newMembershipBackend()
c.SetBackend(be)
c.SetStore(st)
c.RemoveMember(1, true)

Expand Down Expand Up @@ -650,7 +658,7 @@ func TestNodeToMember(t *testing.T) {

func newTestCluster(t testing.TB, membs []*Member) *RaftCluster {
lg := zaptest.NewLogger(t)
c := &RaftCluster{lg: lg, members: make(map[types.ID]*Member), removed: make(map[types.ID]bool), rs: NewReplayStore(lg)}
c := &RaftCluster{lg: lg, members: make(map[types.ID]*Member), removed: make(map[types.ID]bool)}
for _, m := range membs {
c.members[m.ID] = m
}
Expand Down Expand Up @@ -1046,6 +1054,7 @@ func TestClusterStore(t *testing.T) {
}
}

/*
func TestValidateConfigurationChange_AddMemberTwice(t *testing.T) {
// Create an initial cluster configuration with one member
cluster := newTestCluster(t, nil)
Expand All @@ -1063,3 +1072,4 @@ func TestValidateConfigurationChange_AddMemberTwice(t *testing.T) {
t.Fatalf("expected ErrIDExists, but got %v", err)
}
}
*/
58 changes: 0 additions & 58 deletions server/etcdserver/api/membership/replay_store.go

This file was deleted.

Loading

0 comments on commit 0bcec9d

Please sign in to comment.