Skip to content

Commit

Permalink
k8s store: patch pod annotations
Browse files Browse the repository at this point in the history
Use k8s client mpatch method to update pod annotations (in some versions
replacing the whole pod object will return an error)
  • Loading branch information
sandhose authored and sgotti committed Dec 11, 2019
1 parent 59e4880 commit de8eddd
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 51 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ require (
github.com/docker/libkv v0.2.1
github.com/emicklei/go-restful v2.5.0+incompatible // indirect
github.com/emicklei/go-restful-swagger12 v0.0.0-20170208215640-dcef7f557305 // indirect
github.com/evanphx/json-patch v4.5.0+incompatible
github.com/go-openapi/jsonpointer v0.0.0-20170102174223-779f45308c19 // indirect
github.com/go-openapi/jsonreference v0.0.0-20161105162150-36d33bfe519e // indirect
github.com/go-openapi/spec v0.0.0-20180131233152-f3499b5df538 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ github.com/docker/leadership v0.1.0/go.mod h1:6yL2hg00l43fYEJagcF7eIS4PootU7TAO1
github.com/docker/libkv v0.2.1/go.mod h1:r5hEwHwW8dr0TFBYGCarMNbrQOiwL1xoqDYZ/JqoTK0=
github.com/emicklei/go-restful v2.5.0+incompatible/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=
github.com/emicklei/go-restful-swagger12 v0.0.0-20170208215640-dcef7f557305/go.mod h1:qr0VowGBT4CS4Q8vFF8BSeKz34PuqKGxs/L0IAQA9DQ=
github.com/evanphx/json-patch v4.5.0+incompatible h1:ouOWdg56aJriqS0huScTkVXPC5IcNrDCXZ6OoTAWu7M=
github.com/evanphx/json-patch v4.5.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
Expand Down
93 changes: 42 additions & 51 deletions internal/store/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@ import (
"github.com/sorintlab/stolon/internal/cluster"
"github.com/sorintlab/stolon/internal/util"

jsonpatch "github.com/evanphx/json-patch"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
Expand Down Expand Up @@ -72,6 +74,43 @@ func (s *KubeStore) labelSelector(componentLabel ComponentLabelValue) labels.Sel
return labels.SelectorFromSet(selector)
}

func (s *KubeStore) patchKubeStatusAnnotation(annotationData []byte) error {
podsClient := s.client.CoreV1().Pods(s.namespace)
retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
pod, err := podsClient.Get(s.podName, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to get latest version of pod: %v", err)
}

oldPodJSON, err := json.Marshal(pod)
if err != nil {
return fmt.Errorf("failed to marshal pod: %v", err)
}

if pod.Annotations == nil {
pod.Annotations = map[string]string{}
}
pod.Annotations[util.KubeStatusAnnnotation] = string(annotationData)

newPodJSON, err := json.Marshal(pod)
if err != nil {
return fmt.Errorf("failed to marshal pod: %v", err)
}

patchBytes, err := jsonpatch.CreateMergePatch(oldPodJSON, newPodJSON)
if err != nil {
return fmt.Errorf("failed to create pod merge patch: %v", err)
}

_, err = podsClient.Patch(s.podName, types.MergePatchType, patchBytes)
return err
})
if retryErr != nil {
return fmt.Errorf("update failed: %v", retryErr)
}
return nil
}

func (s *KubeStore) AtomicPutClusterData(ctx context.Context, cd *cluster.ClusterData, previous *KVPair) (*KVPair, error) {
cdj, err := json.Marshal(cd)
if err != nil {
Expand Down Expand Up @@ -210,23 +249,7 @@ func (s *KubeStore) SetKeeperInfo(ctx context.Context, id string, ms *cluster.Ke
if err != nil {
return err
}
podsClient := s.client.CoreV1().Pods(s.namespace)
retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
result, err := podsClient.Get(s.podName, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to get latest version of pod: %v", err)
}
if result.Annotations == nil {
result.Annotations = map[string]string{}
}
result.Annotations[util.KubeStatusAnnnotation] = string(msj)
_, err = podsClient.Update(result)
return err
})
if retryErr != nil {
return fmt.Errorf("update failed: %v", retryErr)
}
return nil
return s.patchKubeStatusAnnotation(msj)
}

func (s *KubeStore) GetKeepersInfo(ctx context.Context) (cluster.KeepersInfo, error) {
Expand Down Expand Up @@ -261,23 +284,7 @@ func (s *KubeStore) SetSentinelInfo(ctx context.Context, si *cluster.SentinelInf
if err != nil {
return err
}
podsClient := s.client.CoreV1().Pods(s.namespace)
retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
result, err := podsClient.Get(s.podName, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to get latest version of pod: %v", err)
}
if result.Annotations == nil {
result.Annotations = map[string]string{}
}
result.Annotations[util.KubeStatusAnnnotation] = string(sij)
_, err = podsClient.Update(result)
return err
})
if retryErr != nil {
return fmt.Errorf("update failed: %v", retryErr)
}
return nil
return s.patchKubeStatusAnnotation(sij)
}

func (s *KubeStore) GetSentinelsInfo(ctx context.Context) (cluster.SentinelsInfo, error) {
Expand Down Expand Up @@ -312,23 +319,7 @@ func (s *KubeStore) SetProxyInfo(ctx context.Context, pi *cluster.ProxyInfo, ttl
if err != nil {
return err
}
podsClient := s.client.CoreV1().Pods(s.namespace)
retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
result, err := podsClient.Get(s.podName, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to get latest version of pod: %v", err)
}
if result.Annotations == nil {
result.Annotations = map[string]string{}
}
result.Annotations[util.KubeStatusAnnnotation] = string(pij)
_, err = podsClient.Update(result)
return err
})
if retryErr != nil {
return fmt.Errorf("update failed: %v", retryErr)
}
return nil
return s.patchKubeStatusAnnotation(pij)
}

func (s *KubeStore) GetProxiesInfo(ctx context.Context) (cluster.ProxiesInfo, error) {
Expand Down

0 comments on commit de8eddd

Please sign in to comment.