Skip to content

Commit

Permalink
[k8s] Fix logical race conditions in kubernetes_secrets provider (#6623)
Browse files Browse the repository at this point in the history
* fix: refactor kubernetes_secrets provider to eliminate race conditions

* fix: add changelog fragment and unit-tests for kubernetes_secrets provider

* fix: replace RWMutex with Mutex

* fix: rename newExpirationStore to newExpirationCache

* fix: introduce kubernetes_secrets provider name as a const

* fix: extend AddConditionally doc string to describe the case of condition is nil

* fix: gosec lint
  • Loading branch information
pkoutsovasilis authored Feb 10, 2025
1 parent 346a2fe commit 6d4b91c
Show file tree
Hide file tree
Showing 6 changed files with 1,480 additions and 756 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: bug-fix

# Change summary; a 80ish characters long description of the change.
summary: Fix logical race conditions in kubernetes_secrets provider

# Long description; in case the summary is not enough to describe the change
# this field accommodate a description without length limits.
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
#description:

# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
component: elastic-agent

# PR URL; optional; the PR number that added the changeset.
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
pr: https://github.com/elastic/elastic-agent/pull/6623

# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present is automatically filled by the tooling with the issue linked to the PR number.
issue: https://github.com/elastic/elastic-agent/issues/6340
19 changes: 11 additions & 8 deletions internal/pkg/composable/providers/kubernetessecrets/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,23 @@ import (
"github.com/elastic/elastic-agent-autodiscover/kubernetes"
)

// Config for kubernetes provider
// Config for kubernetes_secrets provider
type Config struct {
KubeConfig string `config:"kube_config"`
KubeClientOptions kubernetes.KubeClientOptions `config:"kube_client_options"`

RefreshInterval time.Duration `config:"cache_refresh_interval"`
RefreshInterval time.Duration `config:"cache_refresh_interval" validate:"positive,nonzero"`
TTLDelete time.Duration `config:"cache_ttl"`
RequestTimeout time.Duration `config:"cache_request_timeout"`
RequestTimeout time.Duration `config:"cache_request_timeout" validate:"positive,nonzero"`
DisableCache bool `config:"cache_disable"`
}

func (c *Config) InitDefaults() {
c.RefreshInterval = 60 * time.Second
c.TTLDelete = 1 * time.Hour
c.RequestTimeout = 5 * time.Second
c.DisableCache = false
// defaultConfig returns default configuration for kubernetes_secrets provider
func defaultConfig() *Config {
return &Config{
RefreshInterval: 60 * time.Second,
TTLDelete: 1 * time.Hour,
RequestTimeout: 5 * time.Second,
DisableCache: false,
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.

package kubernetessecrets

import (
"sync"
"time"
)

// expirationCache is a store that expires items after time.Now - secret.lastAccess > ttl (if ttl > 0) at Get or List.
// expirationCache works with *cacheEntry, a pointer struct that wraps secret, instead of secret directly because map
// structure in standard go library never removes the buckets from memory even after removing all the elements from it.
// However, since *cacheEntry is a pointer it can be garbage collected when no longer referenced by the GC, such as
// when deleted from the map. More importantly working with a pointer makes the entry in the map bucket, that doesn't
// get deallocated, to utilise only 8 bytes on a 64-bit system.
type expirationCache struct {
sync.Mutex
// ttl is the time-to-live for items in the cache
ttl time.Duration
// items is the underlying cache store.
items map[string]*cacheEntry
}

type cacheEntry struct {
s secret
lastAccess time.Time
}

// Get returns the secret associated with the given key from the store if it exists and is not expired. If updateAccess is true
// and the secret exists, essentially the expiration check is skipped and the lastAccess timestamp is updated to time.Now().
func (c *expirationCache) Get(key string, updateAccess bool) (secret, bool) {
c.Lock()
defer c.Unlock()

entry, exists := c.items[key]
if !exists {
return secret{}, false
}
if updateAccess {
entry.lastAccess = time.Now()
} else if c.isExpired(entry.lastAccess) {
delete(c.items, key)
return secret{}, false
}

return entry.s, true
}

// AddConditionally adds the given secret to the store if the given condition returns true. If there is no existing
// secret, the condition will be called with an empty secret and false. If updateAccess is true and the secret already exists,
// then the lastAccess timestamp is updated to time.Now() independently of the condition result.
// Note: if the given condition is nil, then it is considered as a condition that always returns false.
func (c *expirationCache) AddConditionally(key string, in secret, updateAccess bool, condition conditionFn) {
c.Lock()
defer c.Unlock()
entry, exists := c.items[key]
if !exists {
if condition != nil && condition(secret{}, false) {
c.items[key] = &cacheEntry{in, time.Now()}
}
return
}

if condition != nil && condition(entry.s, true) {
entry.s = in
entry.lastAccess = time.Now()
} else if updateAccess {
entry.lastAccess = time.Now()
}
}

// isExpired returns true if the item has expired based on the ttl
func (c *expirationCache) isExpired(lastAccess time.Time) bool {
if c.ttl <= 0 {
// no expiration
return false
}
// we expire if the last access is older than the ttl
return time.Since(lastAccess) > c.ttl
}

// ListKeys returns a list of all the keys of the secrets in the store without checking for expiration
func (c *expirationCache) ListKeys() []string {
c.Lock()
defer c.Unlock()

length := len(c.items)
if length == 0 {
return nil
}
list := make([]string, 0, length)
for key := range c.items {
list = append(list, key)
}
return list
}

// List returns a list of all the secrets in the store that are not expired
func (c *expirationCache) List() []secret {
c.Lock()
defer c.Unlock()

length := len(c.items)
if length == 0 {
return nil
}
list := make([]secret, 0, length)
for _, entry := range c.items {
if c.isExpired(entry.lastAccess) {
continue
}
list = append(list, entry.s)
}
return list
}

// newExpirationCache creates and returns an expirationCache
func newExpirationCache(ttl time.Duration) *expirationCache {
return &expirationCache{
items: make(map[string]*cacheEntry),
ttl: ttl,
}
}
Loading

0 comments on commit 6d4b91c

Please sign in to comment.