Skip to content
This repository has been archived by the owner on May 3, 2022. It is now read-only.

Commit

Permalink
Removed SecretChecksumAnnotation from cluster secret annotations
Browse files Browse the repository at this point in the history
This commit removes SecretChecksumAnnotation annotation. In the current
implementation this annotation is a must-have item that ensures that the
cluster was provisioned correctly and notifies clusterclientstore on
secret changes.

In this commit we remove this annotation and re-compute the cluster secret
checksum every time syncSecret/1 is invoked. This approach ensures
no preliminary config is needed to get a cluster object to an operating
state. Every time a cluster secret changes, the cache invalidates the
stored cluster object and re-creates it.

Signed-off-by: Oleg Sidorov <oleg.sidorov@booking.com>
  • Loading branch information
Oleg Sidorov authored and juliogreff committed Mar 12, 2020
1 parent 0b1a2f5 commit 8f75e3f
Show file tree
Hide file tree
Showing 6 changed files with 111 additions and 72 deletions.
9 changes: 0 additions & 9 deletions cmd/shipperctl/configurator/cluster.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package configurator

import (
"encoding/hex"
"fmt"
"hash/crc32"
"time"

homedir "github.com/mitchellh/go-homedir"
Expand Down Expand Up @@ -185,17 +183,10 @@ func (c *Cluster) FetchCluster(clusterName string) (*shipper.Cluster, error) {
}

func (c *Cluster) CopySecret(cluster *shipper.Cluster, newNamespace string, secret *corev1.Secret) error {
hash := crc32.NewIEEE()
hash.Write(secret.Data["ca.crt"])
hash.Write(secret.Data["token"])

newSecret := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: cluster.Name,
Namespace: newNamespace,
Annotations: map[string]string{
shipper.SecretChecksumAnnotation: hex.EncodeToString(hash.Sum(nil)),
},
OwnerReferences: []metav1.OwnerReference{
metav1.OwnerReference{
APIVersion: "shipper.booking.com/v1",
Expand Down
1 change: 0 additions & 1 deletion pkg/apis/shipper/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ const (
ReleaseTemplateIterationAnnotation = "shipper.booking.com/release.template.iteration"
ReleaseClustersAnnotation = "shipper.booking.com/release.clusters"

SecretChecksumAnnotation = "shipper.booking.com/cluster-secret.checksum"
SecretClusterSkipTlsVerifyAnnotation = "shipper.booking.com/cluster-secret.insecure-tls-skip-verify"

RolloutBlocksOverrideAnnotation = "shipper.booking.com/rollout-block.override"
Expand Down
2 changes: 2 additions & 0 deletions pkg/clusterclientstore/cache/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ type server struct {
ch ch
}

var _ CacheServer = (*server)(nil)

type ch struct {
stop chan struct{}

Expand Down
44 changes: 16 additions & 28 deletions pkg/clusterclientstore/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,38 +25,26 @@ func (s *Store) secretWorker() {

func (s *Store) bindEventHandlers() {
enqueueSecret := func(obj interface{}) { enqueueWorkItem(s.secretWorkqueue, obj) }
s.secretInformer.Informer().AddEventHandler(kubecache.FilteringResourceEventHandler{
FilterFunc: func(obj interface{}) bool {
s.secretInformer.Informer().AddEventHandler(kubecache.ResourceEventHandlerFuncs{
AddFunc: enqueueSecret,
UpdateFunc: func(_, newObj interface{}) {
enqueueSecret(newObj)
},
DeleteFunc: func(obj interface{}) {
secret, ok := obj.(*corev1.Secret)
if !ok {
return false
}
// This is a bit aggressive, but I think it makes sense; otherwise we get
// logs about the service account token.
_, ok = secret.GetAnnotations()[shipper.SecretChecksumAnnotation]
return ok
},
Handler: kubecache.ResourceEventHandlerFuncs{
AddFunc: enqueueSecret,
UpdateFunc: func(_, newObj interface{}) {
enqueueSecret(newObj)
},
DeleteFunc: func(obj interface{}) {
secret, ok := obj.(*corev1.Secret)
tombstone, ok := obj.(kubecache.DeletedFinalStateUnknown)
if !ok {
tombstone, ok := obj.(kubecache.DeletedFinalStateUnknown)
if !ok {
runtime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
return
}
secret, ok = tombstone.Obj.(*corev1.Secret)
if !ok {
runtime.HandleError(fmt.Errorf("tombstone contained object that is not a Secret %#v", obj))
return
}
runtime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
return
}
enqueueSecret(secret)
},
secret, ok = tombstone.Obj.(*corev1.Secret)
if !ok {
runtime.HandleError(fmt.Errorf("tombstone contained object that is not a Secret %#v", obj))
return
}
}
enqueueSecret(secret)
},
})

Expand Down
46 changes: 23 additions & 23 deletions pkg/clusterclientstore/store.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package clusterclientstore

import (
	"encoding/hex"
	"fmt"
	"hash/crc32"
	"sort"
	"strconv"
	"time"
Expand Down Expand Up @@ -231,23 +233,17 @@ func (s *Store) syncSecret(key string) error {
WithShipperKind("Cluster")
}

checksum, ok := secret.GetAnnotations()[shipper.SecretChecksumAnnotation]
if !ok {
err := fmt.Errorf("secret %q looks like a cluster secret but doesn't have a checksum", key)
return shippererrors.NewUnrecoverableError(err)
}

cachedCluster, ok := s.cache.Fetch(secret.Name)
if ok {
existingChecksum, err := cachedCluster.GetChecksum()
// We don't want to regenerate the client if we already have one with the
// right properties (host or secret checksum) that's either ready (err == nil)
// or in the process of getting ready. Otherwise we'll refill the cache
// needlessly, or could even end up in a livelock where waiting for informer
// cache to fill takes longer than the resync period, and resync resets the
// informer.
if cachedCluster, ok := s.cache.Fetch(secret.Name); ok {
secretChecksum := computeSecretChecksum(secret)
clusterChecksum, err := cachedCluster.GetChecksum()
if err == nil || shippererrors.IsClusterNotReadyError(err) {
if existingChecksum == checksum {
// We don't want to regenerate the client if we already have one with the
// right properties (host or secret checksum) that's either ready (err == nil)
// or in the process of getting ready. Otherwise we'll refill the cache
// needlessly, or could even end up in a livelock where waiting for informer
// cache to fill takes longer than the resync period, and resync resets the
// informer.
if secretChecksum == clusterChecksum {
klog.Infof("Secret %q syncing but we already have a client based on the same checksum in the cache", key)
return nil
}
Expand All @@ -258,13 +254,6 @@ func (s *Store) syncSecret(key string) error {
}

func (s *Store) create(cluster *shipper.Cluster, secret *corev1.Secret) error {
checksum, ok := secret.GetAnnotations()[shipper.SecretChecksumAnnotation]
// Programmer error: this is filtered for at the informer level.
if !ok {
return shippererrors.NewUnrecoverableError(fmt.Errorf(
"secret %q looks like a cluster secret but doesn't have a checksum", secret.Name))
}

config, err := buildConfig(cluster.Spec.APIMaster, secret, s.restTimeout)
if err != nil {
return shippererrors.NewClusterClientBuild(cluster.Name, err)
Expand All @@ -291,6 +280,7 @@ func (s *Store) create(cluster *shipper.Cluster, secret *corev1.Secret) error {
}

clusterName := cluster.Name
checksum := computeSecretChecksum(secret)
newCachedCluster := cache.NewCluster(
clusterName,
checksum,
Expand Down Expand Up @@ -363,3 +353,13 @@ func buildConfig(host string, secret *corev1.Secret, restTimeout *time.Duration)

return config, nil
}

// computeSecretChecksum returns a hex-encoded CRC32 (IEEE) checksum over
// all key/value pairs in the secret's Data map.
//
// Keys are hashed in sorted order: Go randomizes map iteration order, so
// hashing pairs in range order would produce a different checksum for the
// same secret on different calls. That would break the checksum equality
// check in syncSecret (and the checksum stored by create), causing the
// cached cluster client to be rebuilt needlessly on every resync.
func computeSecretChecksum(secret *corev1.Secret) string {
	keys := make([]string, 0, len(secret.Data))
	for k := range secret.Data {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	hash := crc32.NewIEEE()
	for _, k := range keys {
		// Hash the key as well as the value so that moving bytes
		// between keys changes the checksum.
		hash.Write([]byte(k))
		hash.Write(secret.Data[k])
	}
	return hex.EncodeToString(hash.Sum(nil))
}
81 changes: 70 additions & 11 deletions pkg/clusterclientstore/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,6 @@ func TestClientCreation(t *testing.T) {
t.Errorf("expected exactly %d clusters, found %d", len(clusterList), s.cache.Count())
}
})

}

func TestNoClientGeneration(t *testing.T) {
Expand Down Expand Up @@ -193,7 +192,34 @@ func TestInvalidClientCredentials(t *testing.T) {
f := newFixture(t)

f.addCluster(testClusterName)
f.addSecret(newSecret(testClusterName, []byte("crt"), []byte("key"), []byte("checksum")))
// This secret key is invalid and therefore the cache won't preserve the
// cluster
f.addSecret(newSecret(testClusterName, []byte("crt"), []byte("key")))

store := f.run()

wait.PollUntil(
10*time.Millisecond,
func() (bool, error) { return true, nil },
stopAfter(3*time.Second),
)

expErr := fmt.Errorf(
"cannot create client for cluster %q: tls: failed to find any PEM data in certificate input",
testClusterName)
err := store.syncCluster(testClusterName)
if !errEqual(err, expErr) {
t.Fatalf("unexpected error returned by `store.SyncCluster/0`: got: %s, want: %s",
err, expErr)
}
}

func TestReCacheClusterOnSecretUpdate(t *testing.T) {
f := newFixture(t)

f.addCluster(testClusterName)

f.addSecret(newSecret(testClusterName, []byte("crt"), []byte("key")))

store := f.run()

Expand All @@ -203,9 +229,38 @@ func TestInvalidClientCredentials(t *testing.T) {
stopAfter(3*time.Second),
)

_, err := store.GetConfig(testClusterName)
if !shippererrors.IsClusterNotInStoreError(err) {
t.Errorf("expected NoSuchCluster for cluster called %q for invalid client credentials; instead got %v", testClusterName, err)
_, ok := store.cache.Fetch(testClusterName)
if ok {
t.Fatalf("did not expect to fetch a cluster from the cache")
}

secret, err := store.secretInformer.Lister().Secrets(store.ns).Get(testClusterName)
if err != nil {
t.Fatalf("failed to fetch secret from lister: %s", err)
}
validSecret := newValidSecret(testClusterName)
secret.Data = validSecret.Data

client := f.kubeClient
_, err = client.CoreV1().Secrets(store.ns).Update(secret)
if err != nil {
t.Fatalf("failed to update secret: %s", err)
}

key := fmt.Sprintf("%s/%s", store.ns, testClusterName)
if err := store.syncSecret(key); err != nil {
t.Fatalf("unexpected error returned by `store.syncSecret/1`: %s", err)
}

cluster, ok := store.cache.Fetch(testClusterName)
if !ok {
t.Fatalf("expected to fetch a cluster from the cache")
}

secretChecksum := computeSecretChecksum(secret)
if clusterChecksum, _ := cluster.GetChecksum(); clusterChecksum != secretChecksum {
t.Fatalf("inconsistent cluster checksum: got: %s, want: %s",
clusterChecksum, secretChecksum)
}
}

Expand Down Expand Up @@ -309,21 +364,18 @@ func (f *fixture) addCluster(name string) {
}

func newValidSecret(name string) *corev1.Secret {
crt, key, checksum, err := tlsPair.GetAll()
crt, key, _, err := tlsPair.GetAll()
if err != nil {
panic(fmt.Sprintf("could not read test TLS data from paths: %v: %v", tlsPair, err))
}
return newSecret(name, crt, key, checksum)
return newSecret(name, crt, key)
}

func newSecret(name string, crt, key, checksum []byte) *corev1.Secret {
func newSecret(name string, crt, key []byte) *corev1.Secret {
return &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: shipper.ShipperNamespace,
Annotations: map[string]string{
shipper.SecretChecksumAnnotation: string(checksum),
},
},
Data: map[string][]byte{
corev1.TLSCertKey: crt,
Expand All @@ -341,3 +393,10 @@ func stopAfter(t time.Duration) <-chan struct{} {
}()
return stopCh
}

func errEqual(e1, e2 error) bool {
if e1 == nil || e2 == nil {
return e1 == e2
}
return e1.Error() == e2.Error()
}

0 comments on commit 8f75e3f

Please sign in to comment.