ETCD-706: Skip Backup Pods for Etcd Learning Members
Elbehery committed Dec 15, 2024
1 parent 011991c commit 0e4093a
Showing 3 changed files with 227 additions and 5 deletions.
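This commit labels control-plane nodes whose internal IP matches a voting etcd member with node-role.kubernetes.io/voting, and removes the label again when the default Backup CR is gone, so that the etcd-backup-server DaemonSet can be restricted to voting members and skip learners. The DaemonSet template itself is not part of this diff; the sketch below only illustrates, as an assumption, how such a pod spec could consume the label.

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
)

// votingBackupPodSpec sketches how the etcd-backup-server DaemonSet could pin
// its pods to voting control-plane nodes via the label managed by this commit.
// Illustrative only; the real manifest is not shown in this diff.
func votingBackupPodSpec() corev1.PodSpec {
	return corev1.PodSpec{
		NodeSelector: map[string]string{
			"node-role.kubernetes.io/master": "",
			"node-role.kubernetes.io/voting": "",
		},
		Tolerations: []corev1.Toleration{{
			Key:      "node-role.kubernetes.io/master",
			Operator: corev1.TolerationOpExists,
			Effect:   corev1.TaintEffectNoSchedule,
		}},
	}
}

func main() {
	fmt.Printf("backup pods schedule only on nodes matching: %v\n", votingBackupPodSpec().NodeSelector)
}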
76 changes: 76 additions & 0 deletions pkg/operator/periodicbackupcontroller/periodicbackupcontroller.go
@@ -8,6 +8,10 @@ import (
"strings"
"time"

"github.com/openshift/cluster-etcd-operator/pkg/etcdcli"
"github.com/openshift/cluster-etcd-operator/pkg/operator/ceohelpers"
"k8s.io/apimachinery/pkg/util/sets"

"github.com/openshift/library-go/pkg/operator/resource/resourceapply"
"github.com/openshift/library-go/pkg/operator/resource/resourcemerge"

@@ -61,6 +65,7 @@ const (
nodeNameEnvVar = "NODE_NAME"
nodeNameFieldPath = "spec.nodeName"
masterNodeSelector = "node-role.kubernetes.io/master"
votingNodeSelector = "node-role.kubernetes.io/voting"
backupDSLabelKey = "app"
backupDSLabelValue = "etcd-auto-backup"
)
@@ -74,6 +79,7 @@ type PeriodicBackupController struct {
backupVarGetter backuphelpers.BackupVar
featureGateAccessor featuregates.FeatureGateAccess
kubeInformers v1helpers.KubeInformersForNamespaces
etcdClient etcdcli.EtcdClient
}

func NewPeriodicBackupController(
@@ -86,6 +92,7 @@
accessor featuregates.FeatureGateAccess,
backupVarGetter backuphelpers.BackupVar,
backupsInformer factory.Informer,
etcdClient etcdcli.EtcdClient,
kubeInformers v1helpers.KubeInformersForNamespaces) factory.Controller {

c := &PeriodicBackupController{
@@ -96,6 +103,7 @@
operatorImagePullSpec: operatorImagePullSpec,
backupVarGetter: backupVarGetter,
featureGateAccessor: accessor,
etcdClient: etcdClient,
kubeInformers: kubeInformers,
}

@@ -131,6 +139,10 @@ func (c *PeriodicBackupController) sync(ctx context.Context, syncContext factory
if item.Name == defaultBackupCRName {
defaultFound = true

err = ensureVotingNodesLabeled(ctx, c.kubeClient, c.etcdClient)
if err != nil {
return fmt.Errorf("PeriodicBackupController could not label voting master nodes: %w", err)
}
currentEtcdBackupDS, err := c.kubeClient.AppsV1().DaemonSets(operatorclient.TargetNamespace).Get(ctx, backupServerDaemonSet, v1.GetOptions{})
if err != nil && !apierrors.IsNotFound(err) {
return fmt.Errorf("PeriodicBackupController could not retrieve [defaultBackupDeployment]: %w", err)
@@ -179,6 +191,10 @@ }
}
klog.V(4).Infof("PeriodicBackupController deleted DaemonSet [%v] successfully", backupServerDaemonSet)
}
err = ensureVotingNodesUnLabeled(ctx, c.kubeClient)
if err != nil {
return fmt.Errorf("PeriodicBackupController could not unlabel voting master nodes: %w", err)
}
} else {
terminationReasons, err := checkBackupServerPodsStatus(c.podLister)
if err != nil {
@@ -511,3 +527,63 @@ func etcdBackupServerDSDiffers(l, r appv1.DaemonSetSpec) bool {

return false
}

func ensureVotingNodesLabeled(ctx context.Context, client kubernetes.Interface, etcdClient etcdcli.EtcdClient) error {
members, err := etcdClient.VotingMemberList(ctx)
if err != nil {
return fmt.Errorf("failed to list voting members: %w", err)
}

votingMemberIPs := sets.NewString()
for _, m := range members {
memberIP, mErr := ceohelpers.MemberToNodeInternalIP(m)
if mErr != nil {
return mErr
}
votingMemberIPs.Insert(memberIP)
}

masterNodes, err := client.CoreV1().Nodes().List(ctx, v1.ListOptions{
LabelSelector: labels.Set{masterNodeSelector: ""}.String(),
})
if err != nil {
return fmt.Errorf("failed to list master nodes: %w", err)
}

for _, node := range masterNodes.Items {
for _, addr := range node.Status.Addresses {
if addr.Type == corev1.NodeInternalIP {
if votingMemberIPs.Has(addr.Address) {
// update node's labels
node.Labels[votingNodeSelector] = ""
_, err = client.CoreV1().Nodes().Update(ctx, &node, v1.UpdateOptions{})
if err != nil {
return fmt.Errorf("failed to update master node [%v] with label [%v]", node, votingNodeSelector)
}
}
}
}
}

return nil
}

func ensureVotingNodesUnLabeled(ctx context.Context, client kubernetes.Interface) error {
masterNodes, err := client.CoreV1().Nodes().List(ctx, v1.ListOptions{
LabelSelector: labels.Set{masterNodeSelector: ""}.String(),
})
if err != nil {
return fmt.Errorf("failed to list master nodes: %w", err)
}

for _, node := range masterNodes.Items {
delete(node.Labels, votingNodeSelector)

_, err = client.CoreV1().Nodes().Update(ctx, &node, v1.UpdateOptions{})
if err != nil {
return fmt.Errorf("failed to update master node [%v] with deleting label [%v]", node, votingNodeSelector)
}
}

return nil
}
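The labeling above relies on VotingMemberList returning only non-learner members. The sketch below shows that assumed distinction using the IsLearner flag on etcdserverpb.Member; it is not the etcdcli implementation, only an illustration of why learner nodes never receive the voting label and therefore never run backup pods.

package main

import (
	"fmt"

	"go.etcd.io/etcd/api/v3/etcdserverpb"
)

// filterVotingMembers illustrates the behaviour ensureVotingNodesLabeled depends
// on: learners carry IsLearner=true and are excluded, so their nodes are never
// labeled and the backup DaemonSet skips them.
func filterVotingMembers(members []*etcdserverpb.Member) []*etcdserverpb.Member {
	var voting []*etcdserverpb.Member
	for _, m := range members {
		if m.IsLearner {
			continue // learner member: no voting label, no backup pod
		}
		voting = append(voting, m)
	}
	return voting
}

func main() {
	members := []*etcdserverpb.Member{
		{Name: "etcd-0"},
		{Name: "etcd-1"},
		{Name: "etcd-learner", IsLearner: true},
	}
	for _, m := range filterVotingMembers(members) {
		fmt.Println("voting member:", m.Name)
	}
}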
155 changes: 150 additions & 5 deletions pkg/operator/periodicbackupcontroller/periodicbackupcontroller_test.go
@@ -8,6 +8,9 @@ import (
"testing"
"time"

"github.com/openshift/cluster-etcd-operator/pkg/etcdcli"
"go.etcd.io/etcd/api/v3/etcdserverpb"

testing2 "k8s.io/utils/clock/testing"

"github.com/openshift/library-go/pkg/controller/factory"
@@ -93,6 +96,14 @@ func TestSyncLoopWithDefaultBackupCR(t *testing.T) {
&operatorv1.StaticPodOperatorSpec{OperatorSpec: operatorv1.OperatorSpec{ManagementState: operatorv1.Managed}},
&operatorv1.StaticPodOperatorStatus{}, nil, nil)

defaultEtcdMembers := []*etcdserverpb.Member{
u.FakeEtcdMemberWithoutServer(0),
u.FakeEtcdMemberWithoutServer(1),
u.FakeEtcdMemberWithoutServer(2),
}
fakeEtcdClient, err := etcdcli.NewFakeEtcdClient(defaultEtcdMembers, etcdcli.WithFakeClusterHealth(&etcdcli.FakeMemberHealth{Healthy: 3, Unhealthy: 0}))
require.NoError(t, err)

controller := PeriodicBackupController{
operatorClient: fakeOperatorClient,
podLister: fakeKubeInformerForNamespace.InformersFor(operatorclient.TargetNamespace).Core().V1().Pods().Lister(),
@@ -101,6 +112,7 @@
operatorImagePullSpec: "pullspec-image",
backupVarGetter: backuphelpers.NewDisabledBackupConfig(),
featureGateAccessor: backupFeatureGateAccessor,
etcdClient: fakeEtcdClient,
kubeInformers: fakeKubeInformerForNamespace,
}

@@ -117,7 +129,7 @@
fakeKubeInformerForNamespace.Start(stopChan)

expDisabledBackupVar := " args:\n - --enabled=false"
err := controller.sync(context.TODO(), syncCtx)
err = controller.sync(context.TODO(), syncCtx)
require.NoError(t, err)
require.Equal(t, expDisabledBackupVar, controller.backupVarGetter.ArgString())

@@ -181,6 +193,14 @@ func TestSyncLoopWithDefaultBackupCRWithoutRetentionSpec(t *testing.T) {
&operatorv1.StaticPodOperatorSpec{OperatorSpec: operatorv1.OperatorSpec{ManagementState: operatorv1.Managed}},
&operatorv1.StaticPodOperatorStatus{}, nil, nil)

defaultEtcdMembers := []*etcdserverpb.Member{
u.FakeEtcdMemberWithoutServer(0),
u.FakeEtcdMemberWithoutServer(1),
u.FakeEtcdMemberWithoutServer(2),
}
fakeEtcdClient, err := etcdcli.NewFakeEtcdClient(defaultEtcdMembers, etcdcli.WithFakeClusterHealth(&etcdcli.FakeMemberHealth{Healthy: 3, Unhealthy: 0}))
require.NoError(t, err)

controller := PeriodicBackupController{
operatorClient: fakeOperatorClient,
podLister: fakeKubeInformerForNamespace.InformersFor(operatorclient.TargetNamespace).Core().V1().Pods().Lister(),
@@ -189,6 +209,7 @@
operatorImagePullSpec: "pullspec-image",
backupVarGetter: backuphelpers.NewDisabledBackupConfig(),
featureGateAccessor: backupFeatureGateAccessor,
etcdClient: fakeEtcdClient,
kubeInformers: fakeKubeInformerForNamespace,
}

@@ -205,7 +226,7 @@
fakeKubeInformerForNamespace.Start(stopChan)

expDisabledBackupVar := " args:\n - --enabled=false"
err := controller.sync(context.TODO(), syncCtx)
err = controller.sync(context.TODO(), syncCtx)
require.NoError(t, err)
require.Equal(t, expDisabledBackupVar, controller.backupVarGetter.ArgString())

@@ -269,6 +290,14 @@ func TestSyncLoopWithDefaultBackupCRWithoutScheduleSpec(t *testing.T) {
&operatorv1.StaticPodOperatorSpec{OperatorSpec: operatorv1.OperatorSpec{ManagementState: operatorv1.Managed}},
&operatorv1.StaticPodOperatorStatus{}, nil, nil)

defaultEtcdMembers := []*etcdserverpb.Member{
u.FakeEtcdMemberWithoutServer(0),
u.FakeEtcdMemberWithoutServer(1),
u.FakeEtcdMemberWithoutServer(2),
}
fakeEtcdClient, err := etcdcli.NewFakeEtcdClient(defaultEtcdMembers, etcdcli.WithFakeClusterHealth(&etcdcli.FakeMemberHealth{Healthy: 3, Unhealthy: 0}))
require.NoError(t, err)

controller := PeriodicBackupController{
operatorClient: fakeOperatorClient,
podLister: fakeKubeInformerForNamespace.InformersFor(operatorclient.TargetNamespace).Core().V1().Pods().Lister(),
@@ -277,6 +306,7 @@
operatorImagePullSpec: "pullspec-image",
backupVarGetter: backuphelpers.NewDisabledBackupConfig(),
featureGateAccessor: backupFeatureGateAccessor,
etcdClient: fakeEtcdClient,
kubeInformers: fakeKubeInformerForNamespace,
}

@@ -293,7 +323,7 @@
fakeKubeInformerForNamespace.Start(stopChan)

expDisabledBackupVar := " args:\n - --enabled=false"
err := controller.sync(context.TODO(), syncCtx)
err = controller.sync(context.TODO(), syncCtx)
require.NoError(t, err)
require.Equal(t, expDisabledBackupVar, controller.backupVarGetter.ArgString())

@@ -358,6 +388,14 @@ func TestSyncLoopWithDefaultBackupCREditSpec(t *testing.T) {
&operatorv1.StaticPodOperatorSpec{OperatorSpec: operatorv1.OperatorSpec{ManagementState: operatorv1.Managed}},
&operatorv1.StaticPodOperatorStatus{}, nil, nil)

defaultEtcdMembers := []*etcdserverpb.Member{
u.FakeEtcdMemberWithoutServer(0),
u.FakeEtcdMemberWithoutServer(1),
u.FakeEtcdMemberWithoutServer(2),
}
fakeEtcdClient, err := etcdcli.NewFakeEtcdClient(defaultEtcdMembers, etcdcli.WithFakeClusterHealth(&etcdcli.FakeMemberHealth{Healthy: 3, Unhealthy: 0}))
require.NoError(t, err)

controller := PeriodicBackupController{
operatorClient: fakeOperatorClient,
podLister: fakeKubeInformerForNamespace.InformersFor(operatorclient.TargetNamespace).Core().V1().Pods().Lister(),
@@ -366,6 +404,7 @@
operatorImagePullSpec: "pullspec-image",
backupVarGetter: backuphelpers.NewDisabledBackupConfig(),
featureGateAccessor: backupFeatureGateAccessor,
etcdClient: fakeEtcdClient,
kubeInformers: fakeKubeInformerForNamespace,
}

@@ -382,7 +421,7 @@
fakeKubeInformerForNamespace.Start(stopChan)

expDisabledBackupVar := " args:\n - --enabled=false"
err := controller.sync(context.TODO(), syncCtx)
err = controller.sync(context.TODO(), syncCtx)
require.NoError(t, err)
require.Equal(t, expDisabledBackupVar, controller.backupVarGetter.ArgString())

@@ -473,6 +512,14 @@ func TestSyncLoopFailsDegradesOperatorWithDefaultBackupCR(t *testing.T) {
backups.Items = append(backups.Items, backup)
operatorFake := fake.NewClientset([]runtime.Object{&backups}...)

defaultEtcdMembers := []*etcdserverpb.Member{
u.FakeEtcdMemberWithoutServer(0),
u.FakeEtcdMemberWithoutServer(1),
u.FakeEtcdMemberWithoutServer(2),
}
fakeEtcdClient, err := etcdcli.NewFakeEtcdClient(defaultEtcdMembers, etcdcli.WithFakeClusterHealth(&etcdcli.FakeMemberHealth{Healthy: 3, Unhealthy: 0}))
require.NoError(t, err)

controller := PeriodicBackupController{
operatorClient: fakeOperatorClient,
podLister: fakeKubeInformerForNamespace.InformersFor(operatorclient.TargetNamespace).Core().V1().Pods().Lister(),
@@ -481,6 +528,7 @@
operatorImagePullSpec: "pullspec-image",
backupVarGetter: backuphelpers.NewDisabledBackupConfig(),
featureGateAccessor: backupFeatureGateAccessor,
etcdClient: fakeEtcdClient,
kubeInformers: fakeKubeInformerForNamespace,
}

@@ -497,7 +545,7 @@
fakeKubeInformerForNamespace.Start(stopChan)

expDisabledBackupVar := " args:\n - --enabled=false"
err := controller.sync(context.TODO(), syncCtx)
err = controller.sync(context.TODO(), syncCtx)
require.NoError(t, err)
require.Equal(t, expDisabledBackupVar, controller.backupVarGetter.ArgString())
requireOperatorStatus(t, fakeOperatorClient, false)
@@ -801,3 +849,100 @@ func extractEtcdBackupServerArgVal(t testing.TB, argName string, args []string)
t.Errorf("expected [etcd-backup-server] arg [%v], but found none", argName)
return ""
}

func TestEnsureVotingNodesLabeled(t *testing.T) {
// arrange
defaultEtcdMembers := []*etcdserverpb.Member{
u.FakeEtcdMemberWithoutServer(0),
u.FakeEtcdMemberWithoutServer(1),
u.FakeEtcdMemberWithoutServer(2),
}
fakeEtcdClient, err := etcdcli.NewFakeEtcdClient(defaultEtcdMembers, etcdcli.WithFakeClusterHealth(&etcdcli.FakeMemberHealth{Healthy: 3, Unhealthy: 0}))
require.NoError(t, err)

allClusterNodes := defaultClusterNodes()
var objects []runtime.Object
for _, n := range allClusterNodes {
objects = append(objects, n)
}
client := k8sfakeclient.NewClientset(objects...)

// act
err = ensureVotingNodesLabeled(context.TODO(), client, fakeEtcdClient)
require.NoError(t, err)

// assert
masterNodes, err := client.CoreV1().Nodes().List(context.TODO(), v1.ListOptions{
LabelSelector: labels.Set{masterNodeSelector: ""}.String(),
})
require.NoError(t, err)
for _, m := range masterNodes.Items {
require.Contains(t, m.Labels, votingNodeSelector)
}
}

func TestEnsureVotingNodesUnLabeled(t *testing.T) {
// arrange
allClusterNodes := defaultClusterNodes()
for _, n := range allClusterNodes {
if _, ok := n.Labels[masterNodeSelector]; ok {
n.Labels[votingNodeSelector] = ""
}
}

var objects []runtime.Object
for _, n := range allClusterNodes {
objects = append(objects, n)
}
client := k8sfakeclient.NewClientset(objects...)

// act
err := ensureVotingNodesUnLabeled(context.TODO(), client)
require.NoError(t, err)

// assert
masterNodes, err := client.CoreV1().Nodes().List(context.TODO(), v1.ListOptions{
LabelSelector: labels.Set{masterNodeSelector: ""}.String(),
})
require.NoError(t, err)
for _, m := range masterNodes.Items {
require.NotContains(t, m.Labels, votingNodeSelector)
}
}

func defaultClusterNodes() []*corev1.Node {
var nodes []*corev1.Node

for i := 1; i <= 6; i++ {
isMaster := false
if i <= 3 {
isMaster = true
}
nodes = append(nodes, createNode(i, isMaster))
}
return nodes
}

func createNode(idx int, isMaster bool) *corev1.Node {
node := &corev1.Node{
ObjectMeta: v1.ObjectMeta{
Name: fmt.Sprintf("n-%d", idx),
},
}

if isMaster {
node.ObjectMeta.Labels = map[string]string{
masterNodeSelector: "",
}

if node.Status.Addresses == nil {
node.Status.Addresses = []corev1.NodeAddress{}
}
node.Status.Addresses = append(node.Status.Addresses, corev1.NodeAddress{
Type: corev1.NodeInternalIP,
Address: fmt.Sprintf("10.0.0.%d", idx),
})
}

return node
}