From a0eede965723985a442aa26b99b315ad160a9a38 Mon Sep 17 00:00:00 2001 From: Matheus Pimenta Date: Sat, 22 Feb 2025 19:14:01 +0000 Subject: [PATCH] Add cache specialized for access tokens Signed-off-by: Matheus Pimenta --- cache/go.mod | 1 + cache/involved_object.go | 23 +++++++ cache/lru.go | 80 ++++++++++++++++++++++++ cache/lru_test.go | 94 ++++++++++++++++++++++++++++ cache/store.go | 30 ++++++++- cache/token.go | 128 +++++++++++++++++++++++++++++++++++++++ cache/token_test.go | 66 ++++++++++++++++++++ 7 files changed, 419 insertions(+), 3 deletions(-) create mode 100644 cache/involved_object.go create mode 100644 cache/token.go create mode 100644 cache/token_test.go diff --git a/cache/go.mod b/cache/go.mod index aeca8fbb..3b0411e7 100644 --- a/cache/go.mod +++ b/cache/go.mod @@ -3,6 +3,7 @@ module github.com/fluxcd/pkg/cache go 1.23.0 require ( + github.com/go-logr/logr v1.4.2 github.com/onsi/gomega v1.36.2 github.com/prometheus/client_golang v1.20.5 ) diff --git a/cache/involved_object.go b/cache/involved_object.go new file mode 100644 index 00000000..4befc56e --- /dev/null +++ b/cache/involved_object.go @@ -0,0 +1,23 @@ +/* +Copyright 2025 The Flux authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +type InvolvedObject struct { + Kind string + Name string + Namespace string +} diff --git a/cache/lru.go b/cache/lru.go index 61fc5801..1e7e48d8 100644 --- a/cache/lru.go +++ b/cache/lru.go @@ -17,8 +17,11 @@ limitations under the License. package cache import ( + "context" "fmt" "sync" + + "github.com/go-logr/logr" ) // node is a node in a doubly linked list @@ -124,6 +127,83 @@ func (c *LRU[T]) Set(key string, value T) error { return nil } +// GetIfOrSet returns an item in the cache for the given key if present and +// if the condition is satisfied, or calls the fetch function to get a new +// item and stores it in the cache. The operation is thread-safe and atomic. +// The boolean return value indicates whether the item was retrieved from +// the cache. +func (c *LRU[T]) GetIfOrSet(ctx context.Context, + key string, + condition func(T) bool, + fetch func(context.Context) (T, error), + opts ...Options, +) (value T, ok bool, err error) { + + var evicted bool + + c.mu.Lock() + defer func() { + c.mu.Unlock() + + var o storeOptions + o.apply(opts...) + + // Record metrics. + status := StatusSuccess + event := CacheEventTypeMiss + switch { + case ok: + event = CacheEventTypeHit + case evicted: + recordEviction(c.metrics) + case err == nil: + recordItemIncrement(c.metrics) + default: + status = StatusFailure + } + recordRequest(c.metrics, status) + if obj := o.involvedObject; obj != nil { + c.RecordCacheEvent(event, obj.Kind, obj.Name, obj.Namespace) + } + + // Print debug logs. The involved object should already be set in the context logger. + switch l := logr.FromContextOrDiscard(ctx).V(1).WithValues("key", key); { + case err != nil: + l.Info("item refresh failed", "error", err) + case !ok: + l := l + if o.debugKey != "" { + l = l.WithValues(o.debugKey, o.debugValueFunc(value)) + } + l.Info("item refreshed") + } + }() + + var curNode *node[T] + curNode, ok = c.cache[key] + + if ok { + c.delete(curNode) + if condition(curNode.value) { + _ = c.add(curNode) + value = curNode.value + return + } + ok = false + } + + value, err = fetch(ctx) + if err != nil { + var zero T + value = zero + return + } + + evicted = c.add(&node[T]{key: key, value: value}) + + return +} + func (c *LRU[T]) add(node *node[T]) (evicted bool) { prev := c.tail.prev prev.addNext(node) diff --git a/cache/lru_test.go b/cache/lru_test.go index 645f0fac..c7242601 100644 --- a/cache/lru_test.go +++ b/cache/lru_test.go @@ -17,6 +17,7 @@ limitations under the License. package cache import ( + "context" "fmt" "math/rand/v2" "sync" @@ -326,3 +327,96 @@ func TestLRU_int(t *testing.T) { g.Expect(err).To(Succeed()) g.Expect(got).To(Equal(4)) } + +func TestLRU_GetIfOrSet(t *testing.T) { + for _, tt := range []struct { + name string + cap int + seed map[string]int + key string + condition func(int) bool + fetch func(context.Context) (int, error) + expectedValue int + expectedOk bool + expectedErr string + }{ + { + name: "cache hit", + cap: 1, + seed: map[string]int{"key": 42}, + key: "key", + condition: func(int) bool { return true }, + expectedValue: 42, + expectedOk: true, + }, + { + name: "cache hit but condition not satisfied, refresh fails", + cap: 1, + seed: map[string]int{"key": 42}, + key: "key", + condition: func(int) bool { return false }, + fetch: func(context.Context) (int, error) { return 0, fmt.Errorf("failed") }, + expectedErr: "failed", + }, + { + name: "cache hit but condition not satisfied, refresh succeeds", + cap: 1, + seed: map[string]int{"key": 42}, + key: "key", + condition: func(int) bool { return false }, + fetch: func(context.Context) (int, error) { return 53, nil }, + expectedValue: 53, + expectedOk: false, + }, + { + name: "cache miss, refresh fails", + cap: 1, + seed: map[string]int{}, + key: "key", + fetch: func(context.Context) (int, error) { return 0, fmt.Errorf("failed") }, + expectedErr: "failed", + }, + { + name: "cache miss, refresh succeeds", + cap: 1, + seed: map[string]int{}, + key: "key", + fetch: func(context.Context) (int, error) { return 42, nil }, + expectedValue: 42, + expectedOk: false, + }, + { + name: "cache miss, refresh succeeds, key evicted", + cap: 1, + seed: map[string]int{"key": 42}, + key: "key2", + fetch: func(context.Context) (int, error) { return 53, nil }, + expectedValue: 53, + expectedOk: false, + }, + } { + t.Run(tt.name, func(t *testing.T) { + g := NewWithT(t) + + cache, err := NewLRU[int](tt.cap) + g.Expect(err).NotTo(HaveOccurred()) + + for k, v := range tt.seed { + g.Expect(cache.Set(k, v)).To(Succeed()) + } + + ctx := context.Background() + value, ok, err := cache.GetIfOrSet(ctx, tt.key, tt.condition, tt.fetch) + + if tt.expectedErr == "" { + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(value).To(Equal(tt.expectedValue)) + g.Expect(ok).To(Equal(tt.expectedOk)) + } else { + g.Expect(err).To(MatchError(tt.expectedErr)) + g.Expect(value).To(BeZero()) + g.Expect(ok).To(BeFalse()) + } + }) + } +} diff --git a/cache/store.go b/cache/store.go index a5207bf9..699402d2 100644 --- a/cache/store.go +++ b/cache/store.go @@ -44,9 +44,21 @@ type Expirable[T any] interface { } type storeOptions struct { - interval time.Duration - registerer prometheus.Registerer - metricsPrefix string + interval time.Duration + registerer prometheus.Registerer + metricsPrefix string + involvedObject *InvolvedObject + debugKey string + debugValueFunc func(any) any +} + +func (o *storeOptions) apply(opts ...Options) error { + for _, opt := range opts { + if err := opt(o); err != nil { + return err + } + } + return nil } // Options is a function that sets the store options. @@ -75,3 +87,15 @@ func WithMetricsPrefix(prefix string) Options { return nil } } + +// WithInvolvedObject sets the involved object for the cache metrics. +func WithInvolvedObject(kind, name, namespace string) Options { + return func(o *storeOptions) error { + o.involvedObject = &InvolvedObject{ + Kind: kind, + Name: name, + Namespace: namespace, + } + return nil + } +} diff --git a/cache/token.go b/cache/token.go new file mode 100644 index 00000000..aeadd19e --- /dev/null +++ b/cache/token.go @@ -0,0 +1,128 @@ +/* +Copyright 2025 The Flux authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "context" + "time" +) + +// Token is an interface that represents an access token that can be used +// to authenticate with a cloud provider. The only common method is to get the +// duration of the token, because different providers may have different ways to +// represent the token. For example, Azure and GCP use an opaque string token, +// while AWS uses the pair of access key id and secret access key. Consumers of +// this token should know what type to cast this interface to. +type Token interface { + // GetDuration returns the duration for which the token is valid relative to + // approximately time.Now(). This is used to determine when the token should + // be refreshed. + GetDuration() time.Duration +} + +// TokenCache is a thread-safe cache specialized in storing and retrieving +// access tokens. It uses an LRU cache as the underlying storage and takes +// care of expiring tokens in a pessimistic way by storing both a timestamp +// with a monotonic clock (the Go default) and an absolute timestamp created +// from the Unix timestamp of when the token was created. The token is +// considered expired when either timestamps are older than the current time. +// This strategy ensures that expired tokens aren't kept in the cache for +// longer than their expiration time. Also, tokens expire on 80% of their +// lifetime, which is the same strategy used by kubelet for rotating +// ServiceAccount tokens. +type TokenCache struct { + cache *LRU[*tokenItem] +} + +type tokenItem struct { + token Token + mono time.Time + unix time.Time +} + +// NewTokenCache returns a new TokenCache with the given capacity. +func NewTokenCache(capacity int, opts ...Options) *TokenCache { + cache, _ := NewLRU[*tokenItem](capacity, opts...) + return &TokenCache{cache: cache} +} + +// GetOrSet returns the token for the given key if present and not expired, or +// calls the newToken function to get a new token and stores it in the cache. +// The operation is thread-safe and atomic. The boolean return value indicates +// whether the token was retrieved from the cache. +func (c *TokenCache) GetOrSet(ctx context.Context, + key string, + newToken func(context.Context) (Token, error), + opts ...Options, +) (Token, bool, error) { + + condition := func(token *tokenItem) bool { + return !token.expired() + } + + fetch := func(ctx context.Context) (*tokenItem, error) { + token, err := newToken(ctx) + if err != nil { + return nil, err + } + return c.newItem(token), nil + } + + opts = append(opts, func(so *storeOptions) error { + so.debugKey = "token" + so.debugValueFunc = func(v any) any { + return map[string]any{ + "duration": v.(*tokenItem).token.GetDuration(), + } + } + return nil + }) + + item, ok, err := c.cache.GetIfOrSet(ctx, key, condition, fetch, opts...) + if err != nil { + return nil, false, err + } + return item.token, ok, nil +} + +// DeleteCacheEvent deletes the cache event (cache_miss or cache_hit) metric for +// the associated object being reconciled, given their kind, name and namespace. +func (c *TokenCache) DeleteCacheEvent(event, kind, name, namespace string) { + c.cache.DeleteCacheEvent(event, kind, name, namespace) +} + +func (c *TokenCache) newItem(token Token) *tokenItem { + // Kubelet rotates ServiceAccount tokens when 80% of their lifetime has + // passed, so we'll use the same threshold to consider tokens expired. + // + // Ref: https://github.com/kubernetes/kubernetes/blob/4032177faf21ae2f99a2012634167def2376b370/pkg/kubelet/token/token_manager.go#L172-L174 + d := (token.GetDuration() * 8) / 10 + + mono := time.Now().Add(d) + unix := time.Unix(mono.Unix(), 0) + + return &tokenItem{ + token: token, + mono: mono, + unix: unix, + } +} + +func (ti *tokenItem) expired() bool { + now := time.Now() + return !ti.mono.After(now) || !ti.unix.After(now) +} diff --git a/cache/token_test.go b/cache/token_test.go new file mode 100644 index 00000000..e2243756 --- /dev/null +++ b/cache/token_test.go @@ -0,0 +1,66 @@ +/* +Copyright 2025 The Flux authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache_test + +import ( + "context" + "testing" + "time" + + . "github.com/onsi/gomega" + + "github.com/fluxcd/pkg/cache" +) + +type testToken struct { + duration time.Duration +} + +func (t *testToken) GetDuration() time.Duration { + return t.duration +} + +func TestTokenCache_Lifecycle(t *testing.T) { + g := NewWithT(t) + + ctx := context.Background() + + tc := cache.NewTokenCache(1) + + token, retrieved, err := tc.GetOrSet(ctx, "test", func(context.Context) (cache.Token, error) { + return &testToken{duration: 2 * time.Second}, nil + }) + g.Expect(token).To(Equal(&testToken{duration: 2 * time.Second})) + g.Expect(retrieved).To(BeFalse()) + g.Expect(err).To(BeNil()) + + time.Sleep(4 * time.Second) + + token, retrieved, err = tc.GetOrSet(ctx, "test", func(context.Context) (cache.Token, error) { + return &testToken{duration: 100 * time.Second}, nil + }) + g.Expect(token).To(Equal(&testToken{duration: 100 * time.Second})) + g.Expect(retrieved).To(BeFalse()) + g.Expect(err).To(BeNil()) + + time.Sleep(2 * time.Second) + + token, retrieved, err = tc.GetOrSet(ctx, "test", func(context.Context) (cache.Token, error) { return nil, nil }) + g.Expect(token).To(Equal(&testToken{duration: 100 * time.Second})) + g.Expect(retrieved).To(BeTrue()) + g.Expect(err).To(BeNil()) +}