From 6d4b91ccb0f04f34b33cb8181edc21a0fd194a44 Mon Sep 17 00:00:00 2001 From: Panos Koutsovasilis Date: Mon, 10 Feb 2025 22:32:22 +0200 Subject: [PATCH] [k8s] Fix logical race conditions in kubernetes_secrets provider (#6623) * fix: refactor kubernetes_secrets provider to eliminate race conditions * fix: add changelog fragment and unit-tests for kubernetes_secrets provider * fix: replace RWMutex with Mutex * fix: rename newExpirationStore to newExpirationCache * fix: introduce kubernetes_secrets provider name as a const * fix: extend AddConditionally doc string to describe the case of condition is nil * fix: gosec lint --- ...itions-in-kubernetes_secrets-provider.yaml | 32 + .../providers/kubernetessecrets/config.go | 19 +- .../kubernetessecrets/expiration_cache.go | 125 ++ .../kubernetessecrets/kubernetes_secrets.go | 418 +++-- .../kubernetes_secrets_test.go | 1419 +++++++++++------ .../providers/kubernetessecrets/store_test.go | 223 +++ 6 files changed, 1480 insertions(+), 756 deletions(-) create mode 100644 changelog/fragments/1738139927-Fix-logical-race-conditions-in-kubernetes_secrets-provider.yaml create mode 100644 internal/pkg/composable/providers/kubernetessecrets/expiration_cache.go create mode 100644 internal/pkg/composable/providers/kubernetessecrets/store_test.go diff --git a/changelog/fragments/1738139927-Fix-logical-race-conditions-in-kubernetes_secrets-provider.yaml b/changelog/fragments/1738139927-Fix-logical-race-conditions-in-kubernetes_secrets-provider.yaml new file mode 100644 index 00000000000..a73644bf309 --- /dev/null +++ b/changelog/fragments/1738139927-Fix-logical-race-conditions-in-kubernetes_secrets-provider.yaml @@ -0,0 +1,32 @@ +# Kind can be one of: +# - breaking-change: a change to previously-documented behavior +# - deprecation: functionality that is being removed in a later release +# - bug-fix: fixes a problem in a previous version +# - enhancement: extends functionality but does not break or fix existing behavior +# - feature: new functionality +# - known-issue: problems that we are aware of in a given version +# - security: impacts on the security of a product or a user’s deployment. +# - upgrade: important information for someone upgrading from a prior version +# - other: does not fit into any of the other categories +kind: bug-fix + +# Change summary; a 80ish characters long description of the change. +summary: Fix logical race conditions in kubernetes_secrets provider + +# Long description; in case the summary is not enough to describe the change +# this field accommodate a description without length limits. +# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment. +#description: + +# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc. +component: elastic-agent + +# PR URL; optional; the PR number that added the changeset. +# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added. +# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number. +# Please provide it if you are adding a fragment for a different PR. +pr: https://github.com/elastic/elastic-agent/pull/6623 + +# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of). +# If not present is automatically filled by the tooling with the issue linked to the PR number. 
+issue: https://github.com/elastic/elastic-agent/issues/6340 diff --git a/internal/pkg/composable/providers/kubernetessecrets/config.go b/internal/pkg/composable/providers/kubernetessecrets/config.go index 4df48fa5b49..5a92085f360 100644 --- a/internal/pkg/composable/providers/kubernetessecrets/config.go +++ b/internal/pkg/composable/providers/kubernetessecrets/config.go @@ -10,20 +10,23 @@ import ( "github.com/elastic/elastic-agent-autodiscover/kubernetes" ) -// Config for kubernetes provider +// Config for kubernetes_secrets provider type Config struct { KubeConfig string `config:"kube_config"` KubeClientOptions kubernetes.KubeClientOptions `config:"kube_client_options"` - RefreshInterval time.Duration `config:"cache_refresh_interval"` + RefreshInterval time.Duration `config:"cache_refresh_interval" validate:"positive,nonzero"` TTLDelete time.Duration `config:"cache_ttl"` - RequestTimeout time.Duration `config:"cache_request_timeout"` + RequestTimeout time.Duration `config:"cache_request_timeout" validate:"positive,nonzero"` DisableCache bool `config:"cache_disable"` } -func (c *Config) InitDefaults() { - c.RefreshInterval = 60 * time.Second - c.TTLDelete = 1 * time.Hour - c.RequestTimeout = 5 * time.Second - c.DisableCache = false +// defaultConfig returns default configuration for kubernetes_secrets provider +func defaultConfig() *Config { + return &Config{ + RefreshInterval: 60 * time.Second, + TTLDelete: 1 * time.Hour, + RequestTimeout: 5 * time.Second, + DisableCache: false, + } } diff --git a/internal/pkg/composable/providers/kubernetessecrets/expiration_cache.go b/internal/pkg/composable/providers/kubernetessecrets/expiration_cache.go new file mode 100644 index 00000000000..f78298f04ca --- /dev/null +++ b/internal/pkg/composable/providers/kubernetessecrets/expiration_cache.go @@ -0,0 +1,125 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. + +package kubernetessecrets + +import ( + "sync" + "time" +) + +// expirationCache is a store that expires items after time.Now - secret.lastAccess > ttl (if ttl > 0) at Get or List. +// expirationCache works with *cacheEntry, a pointer struct that wraps secret, instead of secret directly because map +// structure in standard go library never removes the buckets from memory even after removing all the elements from it. +// However, since *cacheEntry is a pointer it can be garbage collected when no longer referenced by the GC, such as +// when deleted from the map. More importantly working with a pointer makes the entry in the map bucket, that doesn't +// get deallocated, to utilise only 8 bytes on a 64-bit system. +type expirationCache struct { + sync.Mutex + // ttl is the time-to-live for items in the cache + ttl time.Duration + // items is the underlying cache store. + items map[string]*cacheEntry +} + +type cacheEntry struct { + s secret + lastAccess time.Time +} + +// Get returns the secret associated with the given key from the store if it exists and is not expired. If updateAccess is true +// and the secret exists, essentially the expiration check is skipped and the lastAccess timestamp is updated to time.Now(). 
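+// A minimal usage sketch (illustrative only; the key and ttl below are placeholders, and the provider's
+// real call sites are Fetch and updateSecrets in kubernetes_secrets.go):
+//
+//	cache := newExpirationCache(time.Hour)
+//	if sd, found := cache.Get("kubernetes_secrets.default.my-secret.api_key", true); found {
+//		_ = sd.value // cached value; lastAccess was refreshed because updateAccess is true
+//	}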
+func (c *expirationCache) Get(key string, updateAccess bool) (secret, bool) { + c.Lock() + defer c.Unlock() + + entry, exists := c.items[key] + if !exists { + return secret{}, false + } + if updateAccess { + entry.lastAccess = time.Now() + } else if c.isExpired(entry.lastAccess) { + delete(c.items, key) + return secret{}, false + } + + return entry.s, true +} + +// AddConditionally adds the given secret to the store if the given condition returns true. If there is no existing +// secret, the condition will be called with an empty secret and false. If updateAccess is true and the secret already exists, +// then the lastAccess timestamp is updated to time.Now() independently of the condition result. +// Note: if the given condition is nil, then it is considered as a condition that always returns false. +func (c *expirationCache) AddConditionally(key string, in secret, updateAccess bool, condition conditionFn) { + c.Lock() + defer c.Unlock() + entry, exists := c.items[key] + if !exists { + if condition != nil && condition(secret{}, false) { + c.items[key] = &cacheEntry{in, time.Now()} + } + return + } + + if condition != nil && condition(entry.s, true) { + entry.s = in + entry.lastAccess = time.Now() + } else if updateAccess { + entry.lastAccess = time.Now() + } +} + +// isExpired returns true if the item has expired based on the ttl +func (c *expirationCache) isExpired(lastAccess time.Time) bool { + if c.ttl <= 0 { + // no expiration + return false + } + // we expire if the last access is older than the ttl + return time.Since(lastAccess) > c.ttl +} + +// ListKeys returns a list of all the keys of the secrets in the store without checking for expiration +func (c *expirationCache) ListKeys() []string { + c.Lock() + defer c.Unlock() + + length := len(c.items) + if length == 0 { + return nil + } + list := make([]string, 0, length) + for key := range c.items { + list = append(list, key) + } + return list +} + +// List returns a list of all the secrets in the store that are not expired +func (c *expirationCache) List() []secret { + c.Lock() + defer c.Unlock() + + length := len(c.items) + if length == 0 { + return nil + } + list := make([]secret, 0, length) + for _, entry := range c.items { + if c.isExpired(entry.lastAccess) { + continue + } + list = append(list, entry.s) + } + return list +} + +// newExpirationCache creates and returns an expirationCache +func newExpirationCache(ttl time.Duration) *expirationCache { + return &expirationCache{ + items: make(map[string]*cacheEntry), + ttl: ttl, + } +} diff --git a/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go b/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go index 6921003cb30..2db9d65813e 100644 --- a/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go +++ b/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go @@ -21,96 +21,186 @@ import ( "github.com/elastic/elastic-agent/pkg/core/logger" ) -var _ corecomp.FetchContextProvider = (*contextProviderK8sSecrets)(nil) -var getK8sClientFunc = getK8sClient +var ( + _ corecomp.FetchContextProvider = (*contextProviderK8SSecrets)(nil) + getK8sClientFunc = kubernetes.GetKubernetesClient +) + +const k8sSecretsProviderName = "kubernetes_secrets" //nolint:gosec // G101: False positive on Potential hardcoded credentials func init() { - composable.Providers.MustAddContextProvider("kubernetes_secrets", ContextProviderBuilder) + composable.Providers.MustAddContextProvider(k8sSecretsProviderName, ContextProviderBuilder) 
} -type contextProviderK8sSecrets struct { - logger *logger.Logger - config *Config - - clientMx sync.Mutex - client k8sclient.Interface +type store interface { + // AddConditionally adds the given secret to the store if the given condition returns true. If there is no existing + // secret, the condition will be called with an empty secret and false. If updateAccess is true and the secret already exists, + // then the lastAccess timestamp is updated to time.Now() independently of the condition result. + // Note: if the given condition is nil, then it is considered as a condition that always returns false. + AddConditionally(key string, sd secret, updateAccess bool, cond conditionFn) + // ListKeys returns a list of all the keys of the secrets in the store without checking for expiration + ListKeys() []string + // List returns a list of all the secrets in the store that are not expired + List() []secret + // Get returns the secret associated with the given key from the store if it exists and is not expired. If updateAccess is true + // and the secret exists, essentially the expiration check is skipped and the lastAccess timestamp is updated to time.Now(). + Get(key string, updateAccess bool) (secret, bool) +} - secretsCacheMx sync.RWMutex - secretsCache map[string]*secretsData +// secret represents the data of a kubernetes secret that is stored in the cache +type secret struct { + // name is the name of the secret, and it derives from key + name string + // namespace is the name of the namespace, and it derives from key + namespace string + // key is the key inside the secret, and it derives from key + key string + // value is the value of key inside the secret + value string + // apiExists is true if the secret was fetched from the API with no errors + apiExists bool + // apiFetchTime is the time the secret was fetched from the API + apiFetchTime time.Time } -type secretsData struct { - value string - lastAccess time.Time +type conditionFn func(existing secret, exists bool) bool + +type contextProviderK8SSecrets struct { + logger *logger.Logger + config *Config + client k8sclient.Interface + clientMtx sync.RWMutex + running chan struct{} + store store } -// ContextProviderBuilder builds the context provider. +// ContextProviderBuilder builds the kubernetes_secrets context provider. By default, this provider employs a cache +// to reduce the number of requests to the API server. The cache refreshes the secrets referenced during each Fetch call +// every Config.RefreshInterval. To maintain only secrets that are actually needed by the agent, each secret reference +// expires based on the Config.TTLDelete. During expiration of secret references or actual changes of secret values, +// the kubernetes_secrets provider calls the ContextProviderComm.Signal() to notify the agent. The cache mechanism +// can be disabled by setting Config.DisableCache to true. 
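+// An illustrative policy snippet enabling this provider (a sketch, not an exhaustive reference; the values
+// shown are the defaults from defaultConfig and the key names come from the Config struct tags):
+//
+//	providers:
+//	  kubernetes_secrets:
+//	    cache_refresh_interval: 60s
+//	    cache_ttl: 1h
+//	    cache_request_timeout: 5s
+//	    cache_disable: false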
func ContextProviderBuilder(logger *logger.Logger, c *config.Config, _ bool) (corecomp.ContextProvider, error) { - var cfg Config + cfg := defaultConfig() + if c == nil { c = config.New() } - err := c.UnpackTo(&cfg) + + err := c.UnpackTo(cfg) if err != nil { return nil, errors.New(err, "failed to unpack configuration") } - return &contextProviderK8sSecrets{ - logger: logger, - config: &cfg, - secretsCache: make(map[string]*secretsData), + return &contextProviderK8SSecrets{ + logger: logger, + config: cfg, + client: nil, + running: make(chan struct{}), + store: newExpirationCache(cfg.TTLDelete), }, nil } -func (p *contextProviderK8sSecrets) Fetch(key string) (string, bool) { - if p.config.DisableCache { - valid := p.validateKey(key) - if valid { - return p.fetchSecretWithTimeout(key) - } else { - return "", false - } - } else { - return p.getFromCache(key) - } -} - // Run initializes the k8s secrets context provider. -func (p *contextProviderK8sSecrets) Run(ctx context.Context, comm corecomp.ContextProviderComm) error { +func (p *contextProviderK8SSecrets) Run(ctx context.Context, comm corecomp.ContextProviderComm) error { client, err := getK8sClientFunc(p.config.KubeConfig, p.config.KubeClientOptions) if err != nil { - p.logger.Debugf("kubernetes_secrets provider skipped, unable to connect: %s", err) - return nil + // signal that the provider has initialized + close(p.running) + p.logger.Debug(k8sSecretsProviderName, " provider skipped, unable to connect: ", err.Error()) + return err } - p.clientMx.Lock() + p.clientMtx.Lock() p.client = client - p.clientMx.Unlock() + p.clientMtx.Unlock() if !p.config.DisableCache { - go p.updateSecrets(ctx, comm) + go p.refreshCache(ctx, comm) } + // signal that the provider has initialized + close(p.running) <-comm.Done() - p.clientMx.Lock() + p.clientMtx.Lock() p.client = nil - p.clientMx.Unlock() + p.clientMtx.Unlock() + return comm.Err() } -func getK8sClient(kubeconfig string, opt kubernetes.KubeClientOptions) (k8sclient.Interface, error) { - return kubernetes.GetKubernetesClient(kubeconfig, opt) +// Fetch returns the secret value for the given key +func (p *contextProviderK8SSecrets) Fetch(key string) (string, bool) { + // Make sure the key has the expected format "kubernetes_secrets.somenamespace.somesecret.value" + tokens := strings.Split(key, ".") + if len(tokens) > 0 && tokens[0] != k8sSecretsProviderName { + return "", false + } + if len(tokens) != 4 { + p.logger.Warn("Invalid secret key format: ", key, ". 
Secrets should be of the format kubernetes_secrets.namespace.secret_name.value") + return "", false + } + + ctx := context.Background() + + secretNamespace := tokens[1] + secretName := tokens[2] + secretKey := tokens[3] + + // Wait for the provider to be initialized + <-p.running + + if p.config.DisableCache { + // cache disabled - fetch secret from the API + return p.fetchFromAPI(ctx, secretName, secretNamespace, secretKey) + } + + // cache enabled + sd, exists := p.store.Get(key, true) + if exists { + // cache hit + return sd.value, sd.apiExists + } + + // cache miss - fetch secret from the API + apiSecretValue, apiExists := p.fetchFromAPI(ctx, secretName, secretNamespace, secretKey) + now := time.Now() + sd = secret{ + name: secretName, + namespace: secretNamespace, + key: secretKey, + value: apiSecretValue, + apiExists: apiExists, + apiFetchTime: now, + } + p.store.AddConditionally(key, sd, true, func(existing secret, exists bool) bool { + if !exists { + // no existing secret in the cache thus add it + return true + } + if existing.value != apiSecretValue && !existing.apiFetchTime.After(now) { + // there is an existing secret in the cache but its value has changed since the last time + // it was fetched from the API thus update it + return true + } + // there is an existing secret in the cache, and it points already to the latest value + // thus do not update it and derive the value and apiExists from the existing secret + apiSecretValue = existing.value + apiExists = existing.apiExists + return false + }) + return apiSecretValue, apiExists } -// Update the secrets in the cache every RefreshInterval -func (p *contextProviderK8sSecrets) updateSecrets(ctx context.Context, comm corecomp.ContextProviderComm) { +// refreshCache refreshes the secrets in the cache every p.config.RefreshInterval +func (p *contextProviderK8SSecrets) refreshCache(ctx context.Context, comm corecomp.ContextProviderComm) { timer := time.NewTimer(p.config.RefreshInterval) for { select { case <-ctx.Done(): return case <-timer.C: - updatedCache := p.updateCache() - if updatedCache { + hasUpdate := p.updateSecrets(ctx) + if hasUpdate { p.logger.Info("Secrets cache was updated, the agent will be notified.") comm.Signal() } @@ -119,197 +209,79 @@ func (p *contextProviderK8sSecrets) updateSecrets(ctx context.Context, comm core } } -// mergeWithCurrent merges the updated map with the cache map. -// This function needs to be called between the mutex lock for the map. -func (p *contextProviderK8sSecrets) mergeWithCurrent(updatedMap map[string]*secretsData) (map[string]*secretsData, bool) { - merged := make(map[string]*secretsData) - updatedCache := false - - for name, data := range p.secretsCache { - diff := time.Since(data.lastAccess) - if diff < p.config.TTLDelete { - merged[name] = data - // Check if this key is part of the updatedMap. If it is not, we know the secrets cache was updated, - // and we need to signal that. 
- _, ok := updatedMap[name] - if !ok { - updatedCache = true - } +// updateSecrets causes all the non-expired secrets to be re-fetched from the API server and returns true if +// any of the secrets an updated value or has expired +func (p *contextProviderK8SSecrets) updateSecrets(ctx context.Context) bool { + // Keep track whether the cache had updates + hasUpdates := false + + secretKeys := p.store.ListKeys() + for _, key := range secretKeys { + sd, exists := p.store.Get(key, false) + if !exists { + // this item has expired thus mark that the cache has updates and continue + hasUpdates = true + continue } - } - for name, data := range updatedMap { - // We need to check if the key is already in the new map. If it is, lastAccess cannot be overwritten since - // it could have been updated when trying to fetch the secret at the same time we are running update cache. - // In that case, we only update the value. - if _, ok := merged[name]; ok { - if merged[name].value != data.value { - merged[name].value = data.value - updatedCache = true - } + apiSecretValue, apiExists := p.fetchFromAPI(ctx, sd.name, sd.namespace, sd.key) + now := time.Now() + sd = secret{ + name: sd.name, + namespace: sd.namespace, + key: sd.key, + value: apiSecretValue, + apiExists: apiExists, + apiFetchTime: now, } - } - - return merged, updatedCache -} -func (p *contextProviderK8sSecrets) updateCache() bool { - // Keep track whether the cache had values changing, so we can notify the agent - updatedCache := false - - // deleting entries does not free the memory, so we need to create a new map - // to place the secrets we want to keep - cacheTmp := make(map[string]*secretsData) - - // to not hold the lock for long, we copy the current state of the cache map - copyMap := make(map[string]secretsData) - p.secretsCacheMx.RLock() - for name, data := range p.secretsCache { - copyMap[name] = *data - } - p.secretsCacheMx.RUnlock() - - // The only way to update an entry in the cache is through the last access time (to delete the key) - // or if the value gets updated. - for name, data := range copyMap { - diff := time.Since(data.lastAccess) - if diff < p.config.TTLDelete { - value, ok := p.fetchSecretWithTimeout(name) - if ok { - newData := &secretsData{ - value: value, - lastAccess: data.lastAccess, - } - cacheTmp[name] = newData - if value != data.value { - updatedCache = true - } + p.store.AddConditionally(key, sd, false, func(existing secret, exists bool) bool { + if !exists { + // no existing secret which means it has been removed until we fetched it + // from the API. In this case we do not want to update the cache, but we + // mark that the cache has updates + hasUpdates = true + return false } - } else { - updatedCache = true - } - } - - // While the cache was updated, it is possible that some secret was added through another go routine. - // We need to merge the updated map with the current cache map to catch the new entries and avoid - // loss of data. 
- var updated bool - p.secretsCacheMx.Lock() - p.secretsCache, updated = p.mergeWithCurrent(cacheTmp) - p.secretsCacheMx.Unlock() - - return updatedCache || updated -} - -func (p *contextProviderK8sSecrets) getFromCache(key string) (string, bool) { - p.secretsCacheMx.RLock() - _, ok := p.secretsCache[key] - p.secretsCacheMx.RUnlock() - - // if value is still not present in cache, it is possible we haven't tried to fetch it yet - if !ok { - value, ok := p.addToCache(key) - // if it was not possible to fetch the secret, return - if !ok { - return value, ok - } - } - - p.secretsCacheMx.Lock() - data, ok := p.secretsCache[key] - data.lastAccess = time.Now() - pass := data.value - p.secretsCacheMx.Unlock() - - return pass, ok -} - -func (p *contextProviderK8sSecrets) validateKey(key string) bool { - // Make sure the key has the expected format "kubernetes_secrets.somenamespace.somesecret.value" - tokens := strings.Split(key, ".") - if len(tokens) > 0 && tokens[0] != "kubernetes_secrets" { - return false - } - if len(tokens) != 4 { - p.logger.Debugf( - "not valid secret key: %v. Secrets should be of the following format %v", - key, - "kubernetes_secrets.somenamespace.somesecret.value", - ) - return false - } - return true -} - -func (p *contextProviderK8sSecrets) addToCache(key string) (string, bool) { - valid := p.validateKey(key) - if !valid { - return "", false - } - - value, ok := p.fetchSecretWithTimeout(key) - if ok { - p.secretsCacheMx.Lock() - p.secretsCache[key] = &secretsData{value: value} - p.secretsCacheMx.Unlock() + if existing.value != apiSecretValue && !existing.apiFetchTime.After(now) { + // the secret value has changed and the above fetchFromAPI is more recent thus + // add it and mark that the cache has updates + hasUpdates = true + return true + } + // the secret value has not changed + return false + }) } - return value, ok -} -type Result struct { - value string - ok bool + return hasUpdates } -func (p *contextProviderK8sSecrets) fetchSecretWithTimeout(key string) (string, bool) { - ctxTimeout, cancel := context.WithTimeout(context.Background(), p.config.RequestTimeout) +// fetchFromAPI fetches the secret value from the API +func (p *contextProviderK8SSecrets) fetchFromAPI(ctx context.Context, secretName string, secretNamespace string, secretKey string) (string, bool) { + ctx, cancel := context.WithTimeout(ctx, p.config.RequestTimeout) defer cancel() - resultCh := make(chan Result, 1) - p.fetchSecret(ctxTimeout, key, resultCh) - - select { - case <-ctxTimeout.Done(): - p.logger.Errorf("Could not retrieve value for key %v: %v", key, ctxTimeout.Err()) + p.clientMtx.RLock() + if p.client == nil { + // k8s client is nil most probably due to an error at p.Run + p.clientMtx.RUnlock() return "", false - case result := <-resultCh: - return result.value, result.ok } -} - -func (p *contextProviderK8sSecrets) fetchSecret(context context.Context, key string, resultCh chan Result) { - p.clientMx.Lock() - client := p.client - p.clientMx.Unlock() - if client == nil { - resultCh <- Result{value: "", ok: false} - return - } - - tokens := strings.Split(key, ".") - // key has the format "kubernetes_secrets.somenamespace.somesecret.value" - // This function is only called from: - // - addToCache, where we already validated that the key has the right format. 
- // - updateCache, where the results are only added to the cache through addToCache - // Because of this we no longer need to validate the key - ns := tokens[1] - secretName := tokens[2] - secretVar := tokens[3] - - secretInterface := client.CoreV1().Secrets(ns) - secret, err := secretInterface.Get(context, secretName, metav1.GetOptions{}) + c := p.client + p.clientMtx.RUnlock() + si := c.CoreV1().Secrets(secretNamespace) + secret, err := si.Get(ctx, secretName, metav1.GetOptions{}) if err != nil { - p.logger.Errorf("Could not retrieve secret from k8s API: %v", err) - resultCh <- Result{value: "", ok: false} - return + p.logger.Warn("Could not retrieve secret ", secretName, " at namespace ", secretNamespace, ": ", err.Error()) + return "", false } - if _, ok := secret.Data[secretVar]; !ok { - p.logger.Errorf("Could not retrieve value %v for secret %v", secretVar, secretName) - resultCh <- Result{value: "", ok: false} - return + + if _, ok := secret.Data[secretKey]; !ok { + p.logger.Warn("Could not retrieve value of key ", secretKey, " for secret ", secretName, " at namespace ", secretNamespace) + return "", false } - secretString := secret.Data[secretVar] - resultCh <- Result{value: string(secretString), ok: true} + return string(secret.Data[secretKey]), true } diff --git a/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets_test.go b/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets_test.go index c440147f7b2..4ff4c00a37d 100644 --- a/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets_test.go +++ b/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets_test.go @@ -6,17 +6,14 @@ package kubernetessecrets import ( "context" + "errors" "fmt" - "strings" "sync" "testing" "time" - ctesting "github.com/elastic/elastic-agent/internal/pkg/composable/testing" - - "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" - v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" k8sclient "k8s.io/client-go/kubernetes" @@ -24,605 +21,977 @@ import ( "github.com/elastic/elastic-agent-autodiscover/kubernetes" "github.com/elastic/elastic-agent-libs/logp" + ctesting "github.com/elastic/elastic-agent/internal/pkg/composable/testing" "github.com/elastic/elastic-agent/internal/pkg/config" ) -const ( - ns = "test_namespace" - pass = "testing_passpass" -) +func Test_Fetch(t *testing.T) { + testDataBuilder := secretTestDataBuilder{ + namespace: "default", + name: "secret_name", + key: "secret_key", + } -func Test_K8sSecretsProvider_FetchWrongSecret(t *testing.T) { - client := k8sfake.NewSimpleClientset() - secret := &v1.Secret{ - TypeMeta: metav1.TypeMeta{ - Kind: "Secret", - APIVersion: "apps/v1beta1", + for _, tc := range []struct { + name string + providerCfg Config + k8sClient k8sclient.Interface + storeInit func(t *testing.T) store + keyToFetch string + expectedValue string + expectedFound bool + expectedCache map[string]*cacheEntry + }{ + { + name: "invalid key format", + providerCfg: Config{ + RequestTimeout: time.Second, + }, + k8sClient: k8sfake.NewClientset( + testDataBuilder.buildK8SSecret("secret_value"), + ), + storeInit: func(t *testing.T) store { return newExpirationCache(time.Minute) }, + keyToFetch: "secret_name", + expectedValue: "", + expectedFound: false, + expectedCache: nil, }, - ObjectMeta: metav1.ObjectMeta{ - Name: "testing_secret", - Namespace: ns, + { + name: "invalid key format missing tokens", + providerCfg: Config{ + RequestTimeout: 
time.Second, + }, + k8sClient: k8sfake.NewClientset( + testDataBuilder.buildK8SSecret("secret_value"), + ), + storeInit: func(t *testing.T) store { return newExpirationCache(time.Minute) }, + keyToFetch: fmt.Sprintf("%s.default.secret_name", k8sSecretsProviderName), + expectedValue: "", + expectedFound: false, + expectedCache: nil, }, - Data: map[string][]byte{ - "secret_value": []byte(pass), + { + name: "invalid key inside secret", + providerCfg: Config{ + RequestTimeout: time.Second, + }, + k8sClient: k8sfake.NewClientset( + testDataBuilder.buildK8SSecret("secret_value"), + ), + storeInit: func(t *testing.T) store { return newExpirationCache(time.Minute) }, + keyToFetch: fmt.Sprintf("%s.default.secret_name.wrong", k8sSecretsProviderName), + expectedValue: "", + expectedFound: false, + expectedCache: buildCacheMap( + buildCacheEntry("default", "secret_name", "wrong", "", false, time.Now(), time.Now()), + ), }, - } - _, err := client.CoreV1().Secrets(ns).Create(context.Background(), secret, metav1.CreateOptions{}) - require.NoError(t, err) - - logger := logp.NewLogger("test_k8s_secrets") - cfg, err := config.NewConfigFrom(map[string]string{"a": "b"}) - require.NoError(t, err) - - p, err := ContextProviderBuilder(logger, cfg, true) - require.NoError(t, err) - - fp, _ := p.(*contextProviderK8sSecrets) - - getK8sClientFunc = func(kubeconfig string, opt kubernetes.KubeClientOptions) (k8sclient.Interface, error) { - return client, nil - } - require.NoError(t, err) - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - comm := ctesting.NewContextComm(ctx) - - go func() { - _ = fp.Run(ctx, comm) - }() - - for { - fp.clientMx.Lock() - client := fp.client - fp.clientMx.Unlock() - if client != nil { - break - } - <-time.After(10 * time.Millisecond) - } - - val, found := fp.Fetch("kubernetes_secrets.test_namespace.testing_secretHACK.secret_value") - assert.False(t, found) - assert.EqualValues(t, val, "") -} - -func Test_K8sSecretsProvider_Fetch_Cache_Enabled(t *testing.T) { - client := k8sfake.NewSimpleClientset() - - ttlDelete, err := time.ParseDuration("1s") - require.NoError(t, err) - - refreshInterval, err := time.ParseDuration("100ms") - require.NoError(t, err) - - secret := &v1.Secret{ - TypeMeta: metav1.TypeMeta{ - Kind: "Secret", - APIVersion: "apps/v1beta1", + { + name: "k8s client nil", + providerCfg: Config{ + RequestTimeout: time.Second, + }, + k8sClient: nil, + storeInit: func(t *testing.T) store { return newExpirationCache(time.Minute) }, + keyToFetch: testDataBuilder.getFetchKey(), + expectedValue: "", + expectedFound: false, + expectedCache: buildCacheMap( + testDataBuilder.buildCacheEntry("", false, time.Now(), time.Now()), + ), }, - ObjectMeta: metav1.ObjectMeta{ - Name: "testing_secret", - Namespace: ns, + { + name: "cache-disabled API-hit", + providerCfg: Config{ + RequestTimeout: time.Second, + DisableCache: true, + }, + k8sClient: k8sfake.NewClientset( + testDataBuilder.buildK8SSecret("secret_value"), + ), + keyToFetch: testDataBuilder.getFetchKey(), + storeInit: func(t *testing.T) store { return newExpirationCache(time.Minute) }, + expectedValue: "secret_value", + expectedFound: true, }, - Data: map[string][]byte{ - "secret_value": []byte(pass), + { + name: "cache-disabled API-miss", + providerCfg: Config{ + RequestTimeout: time.Second, + DisableCache: true, + }, + k8sClient: k8sfake.NewClientset(), + keyToFetch: testDataBuilder.getFetchKey(), + storeInit: func(t *testing.T) store { return newExpirationCache(time.Minute) }, + expectedValue: "", + 
expectedFound: false, }, - } - _, err = client.CoreV1().Secrets(ns).Create(context.Background(), secret, metav1.CreateOptions{}) - require.NoError(t, err) - - logger := logp.NewLogger("test_k8s_secrets") - - c := map[string]interface{}{ - "cache_refresh_interval": refreshInterval, - "cache_ttl": ttlDelete, - "cache_disable": false, - } - cfg, err := config.NewConfigFrom(c) - require.NoError(t, err) - - p, err := ContextProviderBuilder(logger, cfg, true) - require.NoError(t, err) - - fp, _ := p.(*contextProviderK8sSecrets) - - getK8sClientFunc = func(kubeconfig string, opt kubernetes.KubeClientOptions) (k8sclient.Interface, error) { - return client, nil - } - require.NoError(t, err) - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - comm := ctesting.NewContextComm(ctx) - - go func() { - _ = fp.Run(ctx, comm) - }() - - for { - fp.clientMx.Lock() - client := fp.client - fp.clientMx.Unlock() - if client != nil { - break - } - <-time.After(10 * time.Millisecond) - } - - // Secret cache should be empty at start - fp.secretsCacheMx.Lock() - assert.Equal(t, 0, len(fp.secretsCache)) - fp.secretsCacheMx.Unlock() - - key := "kubernetes_secrets.test_namespace.testing_secret.secret_value" - - // Secret should be in the cache after this call - val, found := fp.Fetch(key) - assert.True(t, found) - assert.Equal(t, val, pass) - fp.secretsCacheMx.RLock() - assert.Equal(t, len(fp.secretsCache), 1) - assert.NotNil(t, fp.secretsCache[key]) - assert.NotZero(t, fp.secretsCache[key].lastAccess) - fp.secretsCacheMx.RUnlock() - - // Update the secret and check after TTL time, the secret value is correct - newPass := "new-pass" - secret = &v1.Secret{ - TypeMeta: metav1.TypeMeta{ - Kind: "Secret", - APIVersion: "apps/v1beta1", + { + name: "cache-hit", + providerCfg: Config{ + RequestTimeout: time.Second, + }, + k8sClient: k8sfake.NewClientset( + testDataBuilder.buildK8SSecret("secret_value"), + ), + keyToFetch: testDataBuilder.getFetchKey(), + storeInit: func(t *testing.T) store { + s := newExpirationCache(time.Minute) + s.Lock() + s.items = buildCacheMap( + testDataBuilder.buildCacheEntry("secret_value", true, time.Now(), time.Now()), + ) + s.Unlock() + return s + }, + expectedValue: "secret_value", + expectedFound: true, + expectedCache: buildCacheMap( + testDataBuilder.buildCacheEntry("secret_value", true, time.Now(), time.Now()), + ), }, - ObjectMeta: metav1.ObjectMeta{ - Name: "testing_secret", - Namespace: ns, + { + name: "cache-miss API-hit", + providerCfg: Config{ + RequestTimeout: time.Second, + }, + k8sClient: k8sfake.NewClientset( + testDataBuilder.buildK8SSecret("secret_value"), + ), + keyToFetch: testDataBuilder.getFetchKey(), + storeInit: func(t *testing.T) store { return newExpirationCache(time.Minute) }, + expectedValue: "secret_value", + expectedFound: true, + expectedCache: buildCacheMap( + testDataBuilder.buildCacheEntry("secret_value", true, time.Now(), time.Now()), + ), }, - Data: map[string][]byte{ - "secret_value": []byte(newPass), + { + name: "cache-miss API-miss", + providerCfg: Config{ + RequestTimeout: time.Second, + }, + k8sClient: k8sfake.NewClientset(), + keyToFetch: testDataBuilder.getFetchKey(), + storeInit: func(t *testing.T) store { return newExpirationCache(time.Minute) }, + expectedValue: "", + expectedFound: false, + expectedCache: buildCacheMap( + testDataBuilder.buildCacheEntry("", false, time.Now(), time.Now()), + ), }, - } - _, err = client.CoreV1().Secrets(ns).Update(context.Background(), secret, metav1.UpdateOptions{}) - require.NoError(t, err) - 
- // wait for ttl update - <-time.After(refreshInterval) - status := &strings.Builder{} - duration := refreshInterval * 3 - assert.Eventuallyf(t, func() bool { - val, found = fp.Fetch(key) - isNewPass := val == newPass - if found && isNewPass { - return true - } - - fmt.Fprintf(status, "found: %t, isNewPass: %t", found, isNewPass) - return false - }, duration, refreshInterval, - "Failed to update the secret value after TTL update has passed. Tried fetching for %d. Last status: %s", - duration, status) - - // After TTL delete, secret should no longer be found in cache since it was never - // fetched during that time - <-time.After(ttlDelete) - assert.Eventuallyf(t, func() bool { - fp.secretsCacheMx.RLock() - size := len(fp.secretsCache) - fp.secretsCacheMx.RUnlock() - return size == 0 - }, ttlDelete*3, ttlDelete, "Failed to delete the secret after TTL delete has passed.") - -} - -func Test_K8sSecretsProvider_Fetch_Cache_Disabled(t *testing.T) { - client := k8sfake.NewSimpleClientset() - - secret := &v1.Secret{ - TypeMeta: metav1.TypeMeta{ - Kind: "Secret", - APIVersion: "apps/v1beta1", + { + name: "cache-miss contention with newer fetch", + providerCfg: Config{ + RequestTimeout: time.Second, + }, + k8sClient: k8sfake.NewClientset(), + keyToFetch: testDataBuilder.getFetchKey(), + storeInit: func(t *testing.T) store { + exps := newExpirationCache(time.Minute) + ms := newMockStore(t) + ms.On("Get", mock.Anything, mock.Anything).Run(func(args mock.Arguments) { + // when Fetch calls Get, we silently insert another secret with the same key + // to simulate another fetch/cache update happening in parallel and thus test + // that the AddConditionally in Fetch works as expected. We also mark it's last + // access to one hour ago to check that Fetch always updates the lastAccess of + // an existing item. + key := args.Get(0).(string) + exps.Lock() + exps.items[key] = testDataBuilder.buildCacheEntry("value_from_cache", true, time.Now().Add(1*time.Hour), time.Now().Add(-time.Hour)) + exps.Unlock() + }).Return(secret{}, false).Once() + ms.On("AddConditionally", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Run(func(args mock.Arguments) { + key := args.Get(0).(string) + obj := args.Get(1).(secret) + updateAccess := args.Get(2).(bool) + condition := args.Get(3).(conditionFn) + exps.AddConditionally(key, obj, updateAccess, condition) + }).Once() + listMock := ms.On("List") + listMock.Run(func(args mock.Arguments) { + // need to evaluate this at runtime + listMock.Return(exps.List()) + }).Once() + return ms + }, + expectedValue: "value_from_cache", + expectedFound: true, + expectedCache: buildCacheMap( + testDataBuilder.buildCacheEntry("value_from_cache", true, time.Now(), time.Now()), + ), }, - ObjectMeta: metav1.ObjectMeta{ - Name: "testing_secret", - Namespace: ns, + { + name: "cache-miss contention same value", + providerCfg: Config{ + RequestTimeout: time.Second, + }, + k8sClient: k8sfake.NewClientset(), + keyToFetch: testDataBuilder.getFetchKey(), + storeInit: func(t *testing.T) store { + exps := newExpirationCache(time.Minute) + ms := newMockStore(t) + ms.On("Get", mock.Anything, mock.Anything).Run(func(args mock.Arguments) { + // when Fetch calls Get, we silently insert another secret with the same key + // to simulate another fetch/cache update happening in parallel and thus test + // that the AddConditionally in Fetch works as expected. We also mark it's last + // access to one hour ago to check that Fetch always updates the lastAccess of + // an existing item. 
+ key := args.Get(0).(string) + exps.Lock() + exps.items[key] = testDataBuilder.buildCacheEntry("secret_value", true, time.Now().Add(1*time.Hour), time.Now().Add(-time.Hour)) + exps.Unlock() + }).Return(secret{}, false).Once() + ms.On("AddConditionally", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Run(func(args mock.Arguments) { + key := args.Get(0).(string) + obj := args.Get(1).(secret) + updateAccess := args.Get(2).(bool) + condition := args.Get(3).(conditionFn) + exps.AddConditionally(key, obj, updateAccess, condition) + }).Once() + + listMock := ms.On("List") + listMock.Run(func(args mock.Arguments) { + // need to evaluate this at runtime + listMock.Return(exps.List()) + }).Once() + return ms + }, + expectedValue: "secret_value", + expectedFound: true, + expectedCache: buildCacheMap( + testDataBuilder.buildCacheEntry("secret_value", true, time.Now(), time.Now()), + ), }, - Data: map[string][]byte{ - "secret_value": []byte(pass), + { + name: "cache-miss contention with older fetch", + providerCfg: Config{ + RequestTimeout: time.Second, + }, + k8sClient: k8sfake.NewClientset( + testDataBuilder.buildK8SSecret("secret_value"), + ), + keyToFetch: testDataBuilder.getFetchKey(), + storeInit: func(t *testing.T) store { + exps := newExpirationCache(time.Minute) + ms := newMockStore(t) + ms.On("Get", mock.Anything, mock.Anything).Run(func(args mock.Arguments) { + // when Fetch calls Get, we silently insert another secret with the same key + // to simulate another fetch/cache update happening in parallel and thus test + // that the AddConditionally in Fetch works as expected. We also mark it's last + // access to one hour ago to check that Fetch always updates the lastAccess of + // an existing item. + key := args.Get(0).(string) + exps.Lock() + exps.items[key] = testDataBuilder.buildCacheEntry("value_from_cache", true, time.Now(), time.Now().Add(-time.Hour)) + exps.Unlock() + }).Return(secret{}, false).Once() + ms.On("AddConditionally", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Run(func(args mock.Arguments) { + key := args.Get(0).(string) + obj := args.Get(1).(secret) + updateAccess := args.Get(2).(bool) + condition := args.Get(3).(conditionFn) + exps.AddConditionally(key, obj, updateAccess, condition) + }).Once() + + listMock := ms.On("List") + listMock.Run(func(args mock.Arguments) { + // need to evaluate this at runtime + listMock.Return(exps.List()) + }).Once() + return ms + }, + expectedValue: "secret_value", + expectedFound: true, + expectedCache: buildCacheMap( + testDataBuilder.buildCacheEntry("secret_value", true, time.Now(), time.Now()), + ), }, - } - _, err := client.CoreV1().Secrets(ns).Create(context.Background(), secret, metav1.CreateOptions{}) - require.NoError(t, err) - - logger := logp.NewLogger("test_k8s_secrets") - - c := map[string]interface{}{ - "cache_disable": true, - } - cfg, err := config.NewConfigFrom(c) - require.NoError(t, err) + } { + t.Run(tc.name, func(t *testing.T) { + running := make(chan struct{}) + close(running) + p := contextProviderK8SSecrets{ + logger: logp.NewLogger("test_k8s_secrets"), + config: &tc.providerCfg, + client: tc.k8sClient, + running: running, + store: tc.storeInit(t), + } - p, err := ContextProviderBuilder(logger, cfg, true) - require.NoError(t, err) + val, found := p.Fetch(tc.keyToFetch) + require.Equal(t, tc.expectedFound, found) + require.Equal(t, tc.expectedValue, val) - fp, _ := p.(*contextProviderK8sSecrets) + list := p.store.List() + require.Equal(t, len(tc.expectedCache), len(list)) - getK8sClientFunc 
= func(kubeconfig string, opt kubernetes.KubeClientOptions) (k8sclient.Interface, error) { - return client, nil - } - require.NoError(t, err) - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - comm := ctesting.NewContextComm(ctx) - - go func() { - _ = fp.Run(ctx, comm) - }() - - for { - fp.clientMx.Lock() - client := fp.client - fp.clientMx.Unlock() - if client != nil { - break - } - <-time.After(10 * time.Millisecond) + cacheMap := make(map[string]secret) + for _, s := range list { + cacheMap[fmt.Sprintf("%s.%s.%s.%s", k8sSecretsProviderName, s.namespace, s.name, s.key)] = s + } + for k, v := range tc.expectedCache { + inCache, exists := cacheMap[k] + require.True(t, exists) + require.Equal(t, v.s.key, inCache.key) + require.Equal(t, v.s.name, inCache.name) + require.Equal(t, v.s.namespace, inCache.namespace) + require.Equal(t, v.s.key, inCache.key) + require.Equal(t, v.s.value, inCache.value) + require.Equal(t, v.s.apiExists, inCache.apiExists) + } + }) } +} - key := "kubernetes_secrets.test_namespace.testing_secret.secret_value" - - // Secret should be in the cache after this call - val, found := fp.Fetch(key) - assert.True(t, found) - assert.Equal(t, val, pass) +func Test_UpdateCache(t *testing.T) { + testDataBuilder := secretTestDataBuilder{ + namespace: "default", + name: "secret_name", + key: "secret_key", + } - // Update the secret and check the result - newPass := "new-pass" - secret = &v1.Secret{ - TypeMeta: metav1.TypeMeta{ - Kind: "Secret", - APIVersion: "apps/v1beta1", + for _, tc := range []struct { + name string + providerCfg Config + k8sClient k8sclient.Interface + storeInit func(t *testing.T) store + expectedUpdate bool + expectedCache map[string]*cacheEntry + }{ + { + // check that cache returns true if a secret is expired + name: "secret-expired", + k8sClient: k8sfake.NewClientset( + testDataBuilder.buildK8SSecret("secret_value"), + ), + storeInit: func(t *testing.T) store { + exps := newExpirationCache(time.Minute) + exps.Lock() + exps.items = buildCacheMap( + testDataBuilder.buildCacheEntry("secret_value", true, time.Now(), time.Now().Add(-time.Hour)), + ) + exps.Unlock() + return exps + }, + expectedUpdate: true, }, - ObjectMeta: metav1.ObjectMeta{ - Name: "testing_secret", - Namespace: ns, + { + // check that cache returns false if there is no change in the secret + name: "secret-no-change API-hit", + k8sClient: k8sfake.NewClientset( + testDataBuilder.buildK8SSecret("secret_value"), + ), + storeInit: func(t *testing.T) store { + exps := newExpirationCache(time.Minute) + exps.Lock() + exps.items = buildCacheMap( + testDataBuilder.buildCacheEntry("secret_value", true, time.Now(), time.Now()), + ) + exps.Unlock() + return exps + }, + expectedUpdate: false, + expectedCache: buildCacheMap( + testDataBuilder.buildCacheEntry("secret_value", true, time.Now(), time.Now()), + ), }, - Data: map[string][]byte{ - "secret_value": []byte(newPass), + { + // check that cache returns true if there is a change in the secret + name: "secret-change API-hit", + k8sClient: k8sfake.NewClientset( + testDataBuilder.buildK8SSecret("secret_value_new"), + ), + storeInit: func(t *testing.T) store { + exps := newExpirationCache(time.Minute) + exps.Lock() + exps.items = buildCacheMap( + testDataBuilder.buildCacheEntry("secret_value", true, time.Now(), time.Now()), + ) + exps.Unlock() + return exps + }, + expectedUpdate: true, + expectedCache: buildCacheMap( + testDataBuilder.buildCacheEntry("secret_value_new", true, time.Now(), time.Now()), + ), }, - } - _, err = 
client.CoreV1().Secrets(ns).Update(context.Background(), secret, metav1.UpdateOptions{}) - require.NoError(t, err) - - val, found = fp.Fetch(key) - assert.True(t, found) - assert.Equal(t, val, newPass) - - // Check key that does not exist - val, found = fp.Fetch(key + "doesnotexist") - assert.False(t, found) - assert.Equal(t, "", val) -} + { + // check that cache returns true if the API returns an error at refreshing of an existing secret + name: "secret-change API-miss", + k8sClient: k8sfake.NewClientset(), + storeInit: func(t *testing.T) store { + exps := newExpirationCache(time.Minute) + exps.Lock() + exps.items = buildCacheMap( + testDataBuilder.buildCacheEntry("secret_value", true, time.Now(), time.Now()), + ) + exps.Unlock() + return exps + }, + expectedUpdate: true, + expectedCache: buildCacheMap( + testDataBuilder.buildCacheEntry("", false, time.Now(), time.Now()), + ), + }, + { + // check that lastAccess is updated for expired secrets if they are updated + name: "secret-expired with update", + k8sClient: k8sfake.NewClientset( + testDataBuilder.buildK8SSecret("secret_value"), + ), + storeInit: func(t *testing.T) store { + exps := newExpirationCache(time.Minute) + exps.Lock() + exps.items = buildCacheMap( + testDataBuilder.buildCacheEntry("secret_value_old", true, time.Now(), time.Now()), + ) + exps.Unlock() + ms := newMockStore(t) + getMockCall := ms.On("Get", mock.Anything, mock.Anything) + getMockCall.Run(func(args mock.Arguments) { + key := args.Get(0).(string) + ret, exists := exps.Get(key, args.Get(1).(bool)) + // when updateSecrets calls Get, we silently shift one hour back the lastAccess of an existing secret to test + // that the AddConditionally in updateCache works as expected and that the lastAccess is updated + exps.Lock() + exps.items[key].lastAccess = time.Now().Add(-time.Hour) + exps.Unlock() + getMockCall.Return(ret, exists) + }).Once() + ms.On("AddConditionally", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Run(func(args mock.Arguments) { + key := args.Get(0).(string) + obj := args.Get(1).(secret) + updateAccess := args.Get(2).(bool) + condition := args.Get(3).(conditionFn) + exps.AddConditionally(key, obj, updateAccess, condition) + }).Once() + listKeys := ms.On("ListKeys") + listKeys.Run(func(args mock.Arguments) { + // need to evaluate this at runtime + listKeys.Return(exps.ListKeys()) + }).Once() + listMock := ms.On("List") + listMock.Run(func(args mock.Arguments) { + // need to evaluate this at runtime + listMock.Return(exps.List()) + }).Once() + return ms + }, + expectedUpdate: true, + expectedCache: buildCacheMap( + testDataBuilder.buildCacheEntry("secret_value", true, time.Now(), time.Now()), + ), + }, + { + // check that lastAccess is not updated for expired secrets if they are not updated + name: "secret-expired with no update", + k8sClient: k8sfake.NewClientset( + testDataBuilder.buildK8SSecret("secret_value"), + ), + storeInit: func(t *testing.T) store { + exps := newExpirationCache(time.Minute) + exps.Lock() + exps.items = buildCacheMap( + testDataBuilder.buildCacheEntry("secret_value", true, time.Now(), time.Now()), + ) + exps.Unlock() + ms := newMockStore(t) + getMockCall := ms.On("Get", mock.Anything, mock.Anything) + getMockCall.Run(func(args mock.Arguments) { + key := args.Get(0).(string) + ret, exists := exps.Get(key, args.Get(1).(bool)) + // when updateSecrets calls Get, we silently shift one hour back the lastAccess of an existing secret to test + // that the AddConditionally in updateCache works as expected and that the 
lastAccess is not updated + // if there is no update + exps.Lock() + exps.items[key].lastAccess = time.Now().Add(-time.Hour) + exps.Unlock() + getMockCall.Return(ret, exists) + }).Once() + ms.On("AddConditionally", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Run(func(args mock.Arguments) { + key := args.Get(0).(string) + obj := args.Get(1).(secret) + updateAccess := args.Get(2).(bool) + condition := args.Get(3).(conditionFn) + exps.AddConditionally(key, obj, updateAccess, condition) + }).Once() + listKeys := ms.On("ListKeys") + listKeys.Run(func(args mock.Arguments) { + // need to evaluate this at runtime + listKeys.Return(exps.ListKeys()) + }).Once() + listMock := ms.On("List") + listMock.Run(func(args mock.Arguments) { + // need to evaluate this at runtime + listMock.Return(exps.List()) + }).Once() + return ms + }, + expectedUpdate: false, + expectedCache: map[string]*cacheEntry{}, + }, + { + // check that cache returns true if a secret is removed (aka expired) while it was being fetched from the API + name: "secret-change contention secret removed", + k8sClient: k8sfake.NewClientset(), + storeInit: func(t *testing.T) store { + exps := newExpirationCache(time.Minute) + exps.Lock() + exps.items = buildCacheMap( + testDataBuilder.buildCacheEntry("secret_value", true, time.Now(), time.Now()), + ) + exps.Unlock() + ms := newMockStore(t) + getMockCall := ms.On("Get", mock.Anything, mock.Anything) + getMockCall.Run(func(args mock.Arguments) { + key := args.Get(0).(string) + ret, exists := exps.Get(key, args.Get(1).(bool)) + // when updateSecrets calls Get, we silently remove an existing secret to test + // that the AddConditionally in updateCache works as expected when the secret is removed + exps.Lock() + delete(exps.items, key) + exps.Unlock() + getMockCall.Return(ret, exists) + }).Once() + ms.On("AddConditionally", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Run(func(args mock.Arguments) { + key := args.Get(0).(string) + obj := args.Get(1).(secret) + updateAccess := args.Get(2).(bool) + condition := args.Get(3).(conditionFn) + exps.AddConditionally(key, obj, updateAccess, condition) + }).Once() + listKeys := ms.On("ListKeys") + listKeys.Run(func(args mock.Arguments) { + // need to evaluate this at runtime + listKeys.Return(exps.ListKeys()) + }).Once() + listMock := ms.On("List") + listMock.Run(func(args mock.Arguments) { + // need to evaluate this at runtime + listMock.Return(exps.List()) + }).Once() + return ms + }, + expectedUpdate: true, + expectedCache: nil, + }, + { + // check that cache returns false if a secret is more recently fetched from the API + name: "secret-change contention with newer fetched item", + k8sClient: k8sfake.NewClientset( + testDataBuilder.buildK8SSecret("secret_value"), + ), + storeInit: func(t *testing.T) store { + exps := newExpirationCache(time.Minute) + exps.Lock() + exps.items = buildCacheMap( + testDataBuilder.buildCacheEntry("secret_value_cached", true, time.Now(), time.Now()), + ) + exps.Unlock() + ms := newMockStore(t) + getMock := ms.On("Get", mock.Anything, mock.Anything) + getMock.Run(func(args mock.Arguments) { + key := args.Get(0).(string) + ret, exists := exps.Get(key, args.Get(1).(bool)) + // when updateSecrets calls Get, we silently mark the existing secret to a newer fetch from API time + // to test that the AddConditionally in updateCache works as expected + exps.Lock() + exps.items[key].s.apiFetchTime = time.Now().Add(time.Hour) + exps.Unlock() + getMock.Return(ret, exists) + }).Once() + 
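+ // AddConditionally and the listing calls below are delegated to the real expirationCache, so the
+ // condition logic exercised by updateSecrets runs against an actual cache rather than canned returns.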
ms.On("AddConditionally", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Run(func(args mock.Arguments) { + key := args.Get(0).(string) + obj := args.Get(1).(secret) + updateAccess := args.Get(2).(bool) + condition := args.Get(3).(conditionFn) + exps.AddConditionally(key, obj, updateAccess, condition) + }).Once() + listKeys := ms.On("ListKeys") + listKeys.Run(func(args mock.Arguments) { + // need to evaluate this at runtime + listKeys.Return(exps.ListKeys()) + }).Once() + listMock := ms.On("List") + listMock.Run(func(args mock.Arguments) { + // need to evaluate this at runtime + listMock.Return(exps.List()) + }).Once() + return ms + }, + expectedUpdate: false, + expectedCache: buildCacheMap( + testDataBuilder.buildCacheEntry("secret_value_cached", true, time.Now(), time.Now()), + ), + }, + } { + t.Run(tc.name, func(t *testing.T) { + running := make(chan struct{}) + close(running) + p := contextProviderK8SSecrets{ + logger: logp.NewLogger("test_k8s_secrets"), + config: &tc.providerCfg, + client: tc.k8sClient, + running: running, + store: tc.storeInit(t), + } -func Test_MergeWitchCurrent(t *testing.T) { - logger := logp.NewLogger("test_k8s_secrets") + hasUpdates := p.updateSecrets(context.Background()) + require.Equal(t, tc.expectedUpdate, hasUpdates) - c := map[string]interface{}{} - cfg, err := config.NewConfigFrom(c) - require.NoError(t, err) + list := p.store.List() + require.Equal(t, len(tc.expectedCache), len(list)) - p, err := ContextProviderBuilder(logger, cfg, true) - require.NoError(t, err) + cacheMap := make(map[string]secret) + for _, s := range list { + cacheMap[fmt.Sprintf("%s.%s.%s.%s", k8sSecretsProviderName, s.namespace, s.name, s.key)] = s + } + for k, v := range tc.expectedCache { + inCache, exists := cacheMap[k] + require.True(t, exists) + require.Equal(t, v.s.key, inCache.key) + require.Equal(t, v.s.name, inCache.name) + require.Equal(t, v.s.namespace, inCache.namespace) + require.Equal(t, v.s.key, inCache.key) + require.Equal(t, v.s.value, inCache.value) + require.Equal(t, v.s.apiExists, inCache.apiExists) + } + }) + } +} - fp, _ := p.(*contextProviderK8sSecrets) +func Test_Run(t *testing.T) { + testDataBuilder := secretTestDataBuilder{ + namespace: "default", + name: "secret_name", + key: "secret_key", + } - ts := time.Now() - var tests = []struct { - secretsCache map[string]*secretsData - updatedMap map[string]*secretsData - mergedMap map[string]*secretsData - updatedCache bool - message string + tests := []struct { + name string + providerCfg Config + expectedSignal bool + waitForSignal time.Duration + k8sClient k8sclient.Interface + k8sClientErr error + preCacheState map[string]*cacheEntry + postCacheState map[string]*cacheEntry + secretToFetch string }{ { - secretsCache: map[string]*secretsData{ - "kubernetes_secrets.default.secret_one.secret_value": { - value: "value-one", - lastAccess: ts, - }, - }, - updatedMap: map[string]*secretsData{ - "kubernetes_secrets.default.secret_one.secret_value": { - value: "value-one-updated", - lastAccess: ts, - }, - }, - mergedMap: map[string]*secretsData{ - "kubernetes_secrets.default.secret_one.secret_value": { - value: "value-one-updated", - lastAccess: ts, - }, - }, - updatedCache: true, - message: "When the value of one of the keys in the map gets updated, updatedCache should be true.", + // check that the cache signals ContextComm when the secret is updated + name: "secret-update-value and signal", + providerCfg: Config{ + RefreshInterval: 100 * time.Millisecond, + RequestTimeout: 100 * time.Millisecond, + 
TTLDelete: 2 * time.Second, + DisableCache: false, + }, + expectedSignal: true, + waitForSignal: time.Second, + k8sClient: k8sfake.NewClientset( + testDataBuilder.buildK8SSecret("secret_value"), + ), + k8sClientErr: nil, + preCacheState: buildCacheMap( + testDataBuilder.buildCacheEntry("secret_value_old", true, time.Now(), time.Now()), + ), + postCacheState: buildCacheMap( + testDataBuilder.buildCacheEntry("secret_value", true, time.Now(), time.Now()), + ), }, { - secretsCache: map[string]*secretsData{ - "kubernetes_secrets.default.secret_one.secret_value": { - value: "value-one", - lastAccess: ts, - }, - }, - updatedMap: map[string]*secretsData{ - "kubernetes_secrets.default.secret_one.secret_value": { - value: "value-one", - lastAccess: ts, - }, - }, - mergedMap: map[string]*secretsData{ - "kubernetes_secrets.default.secret_one.secret_value": { - value: "value-one", - lastAccess: ts, - }, - }, - updatedCache: false, - message: "When nothing changes in the cache, updatedCache should be false.", + // check that the cache does not signal ContextComm when the secret is not updated + name: "secret no-update and no-signal", + providerCfg: Config{ + RefreshInterval: 100 * time.Millisecond, + RequestTimeout: 100 * time.Millisecond, + TTLDelete: 2 * time.Second, + DisableCache: false, + }, + expectedSignal: false, + waitForSignal: time.Second, + k8sClient: k8sfake.NewClientset( + testDataBuilder.buildK8SSecret("secret_value"), + ), + k8sClientErr: nil, + preCacheState: buildCacheMap( + testDataBuilder.buildCacheEntry("secret_value", true, time.Now(), time.Now()), + ), + postCacheState: buildCacheMap( + testDataBuilder.buildCacheEntry("secret_value", true, time.Now(), time.Now()), + ), }, { - secretsCache: map[string]*secretsData{ - "kubernetes_secrets.default.secret_one.secret_value": { - value: "value-one", - lastAccess: ts, - }, - "kubernetes_secrets.default.secret_two.secret_value": { - value: "value-two", - lastAccess: ts, - }, - }, - updatedMap: map[string]*secretsData{ - "kubernetes_secrets.default.secret_one.secret_value": { - value: "value-one", - lastAccess: ts, - }, - }, - mergedMap: map[string]*secretsData{ - "kubernetes_secrets.default.secret_one.secret_value": { - value: "value-one", - lastAccess: ts, - }, - "kubernetes_secrets.default.secret_two.secret_value": { - value: "value-two", - lastAccess: ts, - }, - }, - updatedCache: true, - message: "When secretsCache gets updated at the same time we create updatedMap, then merging the two should" + - "detect the change on updatedCache.", + // check that the cache signals ContextComm when a secret expires + name: "secret-expired and signal", + providerCfg: Config{ + RefreshInterval: 100 * time.Millisecond, + RequestTimeout: 100 * time.Millisecond, + TTLDelete: 500 * time.Millisecond, + DisableCache: false, + }, + expectedSignal: true, + waitForSignal: time.Second, + k8sClient: k8sfake.NewClientset( + testDataBuilder.buildK8SSecret("secret_value"), + ), + k8sClientErr: nil, + preCacheState: buildCacheMap( + testDataBuilder.buildCacheEntry("secret_value", true, time.Now(), time.Now()), + ), + postCacheState: nil, + }, + { + // check that the cache is populated when a fetch of a non-existing secret runs + name: "fetch populates cache", + providerCfg: Config{ + RefreshInterval: 100 * time.Millisecond, + RequestTimeout: 100 * time.Millisecond, + TTLDelete: 2 * time.Second, + DisableCache: false, + }, + expectedSignal: false, + waitForSignal: time.Second, + k8sClient: k8sfake.NewClientset( + testDataBuilder.buildK8SSecret("secret_value"), + ), 
+ k8sClientErr: nil, + secretToFetch: fmt.Sprintf("%s.default.secret_name.secret_key", k8sSecretsProviderName), + preCacheState: map[string]*cacheEntry{}, + postCacheState: buildCacheMap( + testDataBuilder.buildCacheEntry("secret_value", true, time.Now(), time.Now()), + ), + }, + { + // check that Run returns an error when the k8s client fails to initialize + name: "k8s client error", + providerCfg: Config{ + RefreshInterval: 100 * time.Millisecond, + RequestTimeout: 100 * time.Millisecond, + TTLDelete: 2 * time.Second, + DisableCache: false, + }, + k8sClient: nil, + k8sClientErr: errors.New("k8s client error"), }, } - - for _, test := range tests { - fp.secretsCache = test.secretsCache - merged, updated := fp.mergeWithCurrent(test.updatedMap) - - require.Equalf(t, len(test.mergedMap), len(merged), "Resulting merged map does not have the expected length.") - for key, data1 := range test.mergedMap { - data2, ok := merged[key] - if ok { - require.Equalf(t, data1.value, data2.value, "Resulting merged map values do not equal the expected ones.") - require.Equalf(t, data1.lastAccess, data2.lastAccess, "Resulting merged map values do not equal the expected ones.") - } else { - t.Fatalf("Resulting merged map does not have expecting keys.") + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + getK8sClientFunc = func(kubeconfig string, opt kubernetes.KubeClientOptions) (k8sclient.Interface, error) { + return tc.k8sClient, tc.k8sClientErr } - } + t.Cleanup(func() { + getK8sClientFunc = kubernetes.GetKubernetesClient + }) - require.Equalf(t, test.updatedCache, updated, test.message) - } -} + cfg, err := config.NewConfigFrom(tc.providerCfg) + require.NoError(t, err) -func Test_UpdateCache(t *testing.T) { - logger := logp.NewLogger("test_k8s_secrets") + log := logp.NewLogger("test_k8s_secrets") + provider, err := ContextProviderBuilder(log, cfg, true) + require.NoError(t, err) - c := map[string]interface{}{} - cfg, err := config.NewConfigFrom(c) - require.NoError(t, err) + p, is := provider.(*contextProviderK8SSecrets) + require.True(t, is) - p, err := ContextProviderBuilder(logger, cfg, true) - require.NoError(t, err) + ec, is := p.store.(*expirationCache) + require.True(t, is) - fp, _ := p.(*contextProviderK8sSecrets) + if tc.k8sClientErr != nil { + require.Error(t, p.Run(context.Background(), ctesting.NewContextComm(context.Background()))) + return + } - ts := time.Now() + ec.Lock() + ec.items = tc.preCacheState + ec.Unlock() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + comm := ctesting.NewContextComm(ctx) + signal := make(chan struct{}, 10) + comm.CallOnSignal(func() { + select { + case <-comm.Done(): + case signal <- struct{}{}: + } + }) + + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + _ = p.Run(ctx, comm) + }() + + <-p.running + + if tc.secretToFetch != "" { + p.Fetch(tc.secretToFetch) + } - client := k8sfake.NewSimpleClientset() - secret := &v1.Secret{ - TypeMeta: metav1.TypeMeta{ - Kind: "Secret", - APIVersion: "apps/v1beta1", - }, - ObjectMeta: metav1.ObjectMeta{ - Name: "secret_one", - Namespace: "default", - }, - Data: map[string][]byte{ - "secret_value": []byte(pass), - }, - } - _, err = client.CoreV1().Secrets("default").Create(context.Background(), secret, metav1.CreateOptions{}) - require.NoError(t, err) + receivedSignal := false + select { + case <-signal: + receivedSignal = true + case <-time.After(tc.waitForSignal): + } + cancel() + + wg.Wait() + + require.Equal(t, tc.expectedSignal, receivedSignal) - 
fp.client = client + list := p.store.List() + require.Equal(t, len(tc.postCacheState), len(list)) - var tests = []struct { - secretsCache map[string]*secretsData - expectedCache map[string]*secretsData - updatedCache bool - message string + cacheMap := make(map[string]secret) + for _, s := range list { + cacheMap[fmt.Sprintf("%s.%s.%s.%s", k8sSecretsProviderName, s.namespace, s.name, s.key)] = s + } + for k, v := range tc.postCacheState { + inCache, exists := cacheMap[k] + require.True(t, exists) + require.Equal(t, v.s.key, inCache.key) + require.Equal(t, v.s.name, inCache.name) + require.Equal(t, v.s.namespace, inCache.namespace) + require.Equal(t, v.s.key, inCache.key) + require.Equal(t, v.s.value, inCache.value) + require.Equal(t, v.s.apiExists, inCache.apiExists) + } + }) + } +} + +func Test_Config(t *testing.T) { + for _, tc := range []struct { + name string + inConfig map[string]interface{} + expectedConfig *Config + expectErr bool }{ { - secretsCache: map[string]*secretsData{ - "kubernetes_secrets.default.secret_one.secret_value": { - value: "value-one", - lastAccess: ts, - }, - }, - expectedCache: map[string]*secretsData{ - "kubernetes_secrets.default.secret_one.secret_value": { - value: pass, - lastAccess: ts, - }, + name: "default config", + inConfig: nil, + expectedConfig: defaultConfig(), + }, + { + name: "invalid config negative refresh interval", + inConfig: map[string]interface{}{ + "cache_refresh_interval": -1, }, - updatedCache: true, - message: "When last access is still within the limits, values should be updated.", + expectErr: true, }, { - secretsCache: map[string]*secretsData{ - "kubernetes_secrets.default.secret_one.secret_value": { - value: "value-one", - lastAccess: ts.Add(-fp.config.TTLDelete - time.Minute), - }, + name: "invalid config zero refresh interval", + inConfig: map[string]interface{}{ + "cache_refresh_interval": 0, }, - expectedCache: map[string]*secretsData{}, - updatedCache: true, - message: "When last access is no longer within the limits, the data should be deleted.", + expectErr: true, }, { - secretsCache: map[string]*secretsData{ - "kubernetes_secrets.default.secret_one.secret_value": { - value: pass, - lastAccess: ts, - }, + name: "invalid config negative cache_request_timeout", + inConfig: map[string]interface{}{ + "cache_request_timeout": -1, }, - expectedCache: map[string]*secretsData{ - "kubernetes_secrets.default.secret_one.secret_value": { - value: pass, - lastAccess: ts, - }, + expectErr: true, + }, + { + name: "invalid config zero cache_request_timeout", + inConfig: map[string]interface{}{ + "cache_request_timeout": 0, }, - updatedCache: false, - message: "When the values did not change and last access is still within limits, no update happens.", + expectErr: true, }, - } + } { + t.Run(tc.name, func(t *testing.T) { + var cfg *config.Config + var err error + + if tc.inConfig != nil { + cfg, err = config.NewConfigFrom(tc.inConfig) + require.NoError(t, err) + } - for _, test := range tests { - fp.secretsCache = test.secretsCache - updated := fp.updateCache() - - require.Equalf(t, len(test.expectedCache), len(fp.secretsCache), "Resulting updated map does not have the expected length.") - for key, data1 := range test.expectedCache { - data2, ok := fp.secretsCache[key] - if ok { - require.Equalf(t, data1.value, data2.value, "Resulting updating map values do not equal the expected ones.") - require.Equalf(t, data1.lastAccess, data2.lastAccess, "Resulting updated map values do not equal the expected ones.") - } else { - t.Fatalf("Resulting 
updated map does not have expecting keys.") + log := logp.NewLogger("test_k8s_secrets") + provider, err := ContextProviderBuilder(log, cfg, true) + if tc.expectErr { + require.Error(t, err) + return } - } - require.Equalf(t, test.updatedCache, updated, test.message) + p, is := provider.(*contextProviderK8SSecrets) + require.True(t, is) + require.Equal(t, tc.expectedConfig, p.config) + }) } +} +type secretTestDataBuilder struct { + namespace string + name string + key string } -func Test_Signal(t *testing.T) { - // The signal should get triggered every time there is an update on the cache - logger := logp.NewLogger("test_k8s_secrets") +func (b secretTestDataBuilder) buildCacheEntry(value string, exists bool, apiFetchTime time.Time, lastAccess time.Time) *cacheEntry { + return buildCacheEntry(b.namespace, b.name, b.key, value, exists, apiFetchTime, lastAccess) +} - client := k8sfake.NewSimpleClientset() +func (b secretTestDataBuilder) buildK8SSecret(value string) *v1.Secret { + return buildK8SSecret(b.namespace, b.name, b.key, value) +} - secret := &v1.Secret{ +func (b secretTestDataBuilder) getFetchKey() string { + return fmt.Sprintf("%s.%s.%s.%s", k8sSecretsProviderName, b.namespace, b.name, b.key) +} + +func buildSecret(namespace string, name string, key string, value string, exists bool, apiFetchTime time.Time) secret { + return secret{ + name: name, + namespace: namespace, + key: key, + value: value, + apiExists: exists, + apiFetchTime: apiFetchTime, + } +} + +func buildCacheEntry(namespace string, name string, key string, value string, exists bool, apiFetchTime time.Time, lastAccess time.Time) *cacheEntry { + return &cacheEntry{ + s: buildSecret(namespace, name, key, value, exists, apiFetchTime), + lastAccess: lastAccess, + } +} + +func buildCacheEntryKey(e *cacheEntry) string { + return fmt.Sprintf("%s.%s.%s.%s", k8sSecretsProviderName, e.s.namespace, e.s.name, e.s.key) +} + +func buildK8SSecret(namespace string, name string, key string, value string) *v1.Secret { + return &v1.Secret{ TypeMeta: metav1.TypeMeta{ Kind: "Secret", APIVersion: "apps/v1beta1", }, ObjectMeta: metav1.ObjectMeta{ - Name: "secret_one", - Namespace: "default", + Name: name, + Namespace: namespace, }, Data: map[string][]byte{ - "secret_value": []byte(pass), + key: []byte(value), }, } - _, err := client.CoreV1().Secrets("default").Create(context.Background(), secret, metav1.CreateOptions{}) - require.NoError(t, err) - - refreshInterval, err := time.ParseDuration("100ms") - require.NoError(t, err) - - c := map[string]interface{}{ - "cache_refresh_interval": refreshInterval, - } - cfg, err := config.NewConfigFrom(c) - require.NoError(t, err) - - p, err := ContextProviderBuilder(logger, cfg, true) - require.NoError(t, err) - - fp, _ := p.(*contextProviderK8sSecrets) - fp.client = client - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - comm := ctesting.NewContextComm(ctx) - - signalTriggered := new(bool) - *signalTriggered = false - var lock sync.RWMutex - - comm.CallOnSignal(func() { - lock.Lock() - *signalTriggered = true - lock.Unlock() - }) - - go fp.updateSecrets(ctx, comm) - - ts := time.Now() - - var tests = []struct { - secretsCache map[string]*secretsData - updated bool - message string - }{ - { - secretsCache: map[string]*secretsData{ - "kubernetes_secrets.default.secret_one.secret_value": { - value: "value-one", - lastAccess: ts, - }, - }, - updated: true, - message: "Value of secret should be updated and signal should be triggered.", - }, - { - secretsCache: 
map[string]*secretsData{ - "kubernetes_secrets.default.secret_one.secret_value": { - value: pass, - lastAccess: ts, - }, - }, - updated: false, - message: "Value of secret should not be updated and signal should not be triggered.", - }, - } - - for _, test := range tests { - fp.secretsCacheMx.Lock() - fp.secretsCache = test.secretsCache - fp.secretsCacheMx.Unlock() - - // wait for cache to be updated - <-time.After(fp.config.RefreshInterval) +} - assert.Eventuallyf(t, func() bool { - lock.RLock() - defer lock.RUnlock() - return *signalTriggered == test.updated - }, fp.config.RefreshInterval*3, fp.config.RefreshInterval, test.message) +func buildCacheMap(entry ...*cacheEntry) map[string]*cacheEntry { + cache := make(map[string]*cacheEntry) - lock.Lock() - *signalTriggered = false - lock.Unlock() + for _, e := range entry { + cache[buildCacheEntryKey(e)] = e } + return cache } diff --git a/internal/pkg/composable/providers/kubernetessecrets/store_test.go b/internal/pkg/composable/providers/kubernetessecrets/store_test.go new file mode 100644 index 00000000000..7cfd4deb8eb --- /dev/null +++ b/internal/pkg/composable/providers/kubernetessecrets/store_test.go @@ -0,0 +1,223 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. + +// Code generated by mockery v2.51.1. DO NOT EDIT. + +package kubernetessecrets + +import mock "github.com/stretchr/testify/mock" + +// mockStore is an autogenerated mock type for the store type +type mockStore struct { + mock.Mock +} + +type mockStore_Expecter struct { + mock *mock.Mock +} + +func (_m *mockStore) EXPECT() *mockStore_Expecter { + return &mockStore_Expecter{mock: &_m.Mock} +} + +// AddConditionally provides a mock function with given fields: key, sd, updateAccess, cond +func (_m *mockStore) AddConditionally(key string, sd secret, updateAccess bool, cond conditionFn) { + _m.Called(key, sd, updateAccess, cond) +} + +// mockStore_AddConditionally_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AddConditionally' +type mockStore_AddConditionally_Call struct { + *mock.Call +} + +// AddConditionally is a helper method to define mock.On call +// - key string +// - sd secret +// - updateAccess bool +// - cond conditionFn +func (_e *mockStore_Expecter) AddConditionally(key interface{}, sd interface{}, updateAccess interface{}, cond interface{}) *mockStore_AddConditionally_Call { + return &mockStore_AddConditionally_Call{Call: _e.mock.On("AddConditionally", key, sd, updateAccess, cond)} +} + +func (_c *mockStore_AddConditionally_Call) Run(run func(key string, sd secret, updateAccess bool, cond conditionFn)) *mockStore_AddConditionally_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(string), args[1].(secret), args[2].(bool), args[3].(conditionFn)) + }) + return _c +} + +func (_c *mockStore_AddConditionally_Call) Return() *mockStore_AddConditionally_Call { + _c.Call.Return() + return _c +} + +func (_c *mockStore_AddConditionally_Call) RunAndReturn(run func(string, secret, bool, conditionFn)) *mockStore_AddConditionally_Call { + _c.Run(run) + return _c +} + +// Get provides a mock function with given fields: key, updateAccess +func (_m *mockStore) Get(key string, updateAccess bool) (secret, bool) { + ret := _m.Called(key, updateAccess) + + if len(ret) == 0 { + panic("no return value specified for Get") + } + + var r0 
secret + var r1 bool + if rf, ok := ret.Get(0).(func(string, bool) (secret, bool)); ok { + return rf(key, updateAccess) + } + if rf, ok := ret.Get(0).(func(string, bool) secret); ok { + r0 = rf(key, updateAccess) + } else { + r0 = ret.Get(0).(secret) + } + + if rf, ok := ret.Get(1).(func(string, bool) bool); ok { + r1 = rf(key, updateAccess) + } else { + r1 = ret.Get(1).(bool) + } + + return r0, r1 +} + +// mockStore_Get_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Get' +type mockStore_Get_Call struct { + *mock.Call +} + +// Get is a helper method to define mock.On call +// - key string +// - updateAccess bool +func (_e *mockStore_Expecter) Get(key interface{}, updateAccess interface{}) *mockStore_Get_Call { + return &mockStore_Get_Call{Call: _e.mock.On("Get", key, updateAccess)} +} + +func (_c *mockStore_Get_Call) Run(run func(key string, updateAccess bool)) *mockStore_Get_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(string), args[1].(bool)) + }) + return _c +} + +func (_c *mockStore_Get_Call) Return(_a0 secret, _a1 bool) *mockStore_Get_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *mockStore_Get_Call) RunAndReturn(run func(string, bool) (secret, bool)) *mockStore_Get_Call { + _c.Call.Return(run) + return _c +} + +// List provides a mock function with no fields +func (_m *mockStore) List() []secret { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for List") + } + + var r0 []secret + if rf, ok := ret.Get(0).(func() []secret); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]secret) + } + } + + return r0 +} + +// mockStore_List_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'List' +type mockStore_List_Call struct { + *mock.Call +} + +// List is a helper method to define mock.On call +func (_e *mockStore_Expecter) List() *mockStore_List_Call { + return &mockStore_List_Call{Call: _e.mock.On("List")} +} + +func (_c *mockStore_List_Call) Run(run func()) *mockStore_List_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *mockStore_List_Call) Return(_a0 []secret) *mockStore_List_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *mockStore_List_Call) RunAndReturn(run func() []secret) *mockStore_List_Call { + _c.Call.Return(run) + return _c +} + +// ListKeys provides a mock function with no fields +func (_m *mockStore) ListKeys() []string { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for ListKeys") + } + + var r0 []string + if rf, ok := ret.Get(0).(func() []string); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]string) + } + } + + return r0 +} + +// mockStore_ListKeys_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ListKeys' +type mockStore_ListKeys_Call struct { + *mock.Call +} + +// ListKeys is a helper method to define mock.On call +func (_e *mockStore_Expecter) ListKeys() *mockStore_ListKeys_Call { + return &mockStore_ListKeys_Call{Call: _e.mock.On("ListKeys")} +} + +func (_c *mockStore_ListKeys_Call) Run(run func()) *mockStore_ListKeys_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *mockStore_ListKeys_Call) Return(_a0 []string) *mockStore_ListKeys_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *mockStore_ListKeys_Call) RunAndReturn(run func() []string) *mockStore_ListKeys_Call { + _c.Call.Return(run) + return _c +} + 
+// newMockStore creates a new instance of mockStore. It also registers a testing interface on the mock and a cleanup function to assert the mock's expectations.
+// The first argument is typically a *testing.T value.
+func newMockStore(t interface {
+	mock.TestingT
+	Cleanup(func())
+}) *mockStore {
+	mock := &mockStore{}
+	mock.Mock.Test(t)
+
+	t.Cleanup(func() { mock.AssertExpectations(t) })
+
+	return mock
+}
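
For readers less familiar with mockery's expecter API, below is a minimal usage sketch (not part of the patch) of how the generated mockStore could be driven from a test in the same package. The test name is hypothetical, and the cache-key format simply mirrors buildCacheEntryKey above; the secret field values are made up for illustration.

package kubernetessecrets

import (
	"fmt"
	"testing"
	"time"

	"github.com/stretchr/testify/require"
)

// Hypothetical test exercising the expecter API of the generated mockStore.
func Test_mockStore_usageSketch(t *testing.T) {
	// newMockStore registers AssertExpectations as a cleanup, so the Once()
	// expectation below must be satisfied before the test ends.
	ms := newMockStore(t)

	key := fmt.Sprintf("%s.default.secret_name.secret_key", k8sSecretsProviderName)
	want := secret{
		namespace:    "default",
		name:         "secret_name",
		key:          "secret_key",
		value:        "secret_value",
		apiExists:    true,
		apiFetchTime: time.Now(),
	}

	// Expect exactly one Get for the fully qualified key and serve the fabricated secret.
	ms.EXPECT().
		Get(key, true).
		Return(want, true).
		Once()

	got, found := ms.Get(key, true)
	require.True(t, found)
	require.Equal(t, want.value, got.value)
}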