diff --git a/cache/involved_object.go b/cache/involved_object.go
new file mode 100644
index 00000000..4befc56e
--- /dev/null
+++ b/cache/involved_object.go
@@ -0,0 +1,28 @@
+/*
+Copyright 2025 The Flux authors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package cache
+
+// InvolvedObject identifies, by kind, name and namespace, the object that
+// cache events are recorded for.
+type InvolvedObject struct {
+	// Kind is the kind of the involved object.
+	Kind string
+	// Name is the name of the involved object.
+	Name string
+	// Namespace is the namespace of the involved object.
+	Namespace string
+}
diff --git a/cache/store.go b/cache/store.go
index a5207bf9..fa7cfd99 100644
--- a/cache/store.go
+++ b/cache/store.go
@@ -44,9 +44,21 @@ type Expirable[T any] interface {
 }
 
 type storeOptions struct {
-	interval      time.Duration
-	registerer    prometheus.Registerer
-	metricsPrefix string
+	interval       time.Duration
+	registerer     prometheus.Registerer
+	metricsPrefix  string
+	involvedObject *InvolvedObject
+}
+
+// apply runs each option in opts against o, in order, and returns the first
+// error reported by an option; later options are not run after a failure.
+func (o *storeOptions) apply(opts ...Options) error {
+	for _, opt := range opts {
+		if err := opt(o); err != nil {
+			return err
+		}
+	}
+	return nil
 }
 
 // Options is a function that sets the store options.
@@ -75,3 +87,15 @@ func WithMetricsPrefix(prefix string) Options {
 		return nil
 	}
 }
+
+// WithInvolvedObject sets the involved object for the cache metrics.
+func WithInvolvedObject(kind, name, namespace string) Options {
+	return func(o *storeOptions) error {
+		o.involvedObject = &InvolvedObject{
+			Kind:      kind,
+			Name:      name,
+			Namespace: namespace,
+		}
+		return nil
+	}
+}
diff --git a/cache/token.go b/cache/token.go
new file mode 100644
index 00000000..0781e2a4
--- /dev/null
+++ b/cache/token.go
@@ -0,0 +1,159 @@
+/*
+Copyright 2025 The Flux authors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package cache
+
+import (
+	"sync"
+	"time"
+)
+
+// Token is an interface that represents an access token that can be used
+// to authenticate with a cloud provider. The only common method is to get the
+// duration of the token, because different providers may have different ways to
+// represent the token. For example, Azure and GCP use an opaque string token,
+// while AWS uses the pair of access key id and secret access key. Consumers of
+// this token should know what type to cast this interface to.
+type Token interface {
+	// GetDuration returns the duration for which the token is valid relative to
+	// approximately time.Now(). This is used to determine when the token should
+	// be refreshed.
+	GetDuration() time.Duration
+}
+
+// TokenCache is a thread-safe cache specialized in storing and retrieving
+// access tokens. It uses an LRU cache as the underlying storage and takes
+// care of expiring tokens in a pessimistic way by storing the expiration
+// deadline twice: once as a timestamp carrying a monotonic clock reading
+// (the Go default for time.Now) and once as a wall-clock-only timestamp
+// rebuilt from the Unix seconds of that deadline. The token is considered
+// expired as soon as either timestamp is no longer after the current time.
+// This strategy ensures that expired tokens aren't kept in the cache for
+// longer than their expiration time. Also, tokens expire at 80% of their
+// lifetime, which is the same strategy used by kubelet for rotating
+// ServiceAccount tokens.
+type TokenCache struct {
+	cache *LRU[*tokenItem]
+	// mu serializes GetOrSet so that the lookup, the call to newToken and
+	// the store happen as one atomic step.
+	mu sync.Mutex
+}
+
+// tokenItem is the value stored in the LRU: the token together with the two
+// representations of its expiration deadline.
+type tokenItem struct {
+	token Token
+	mono  time.Time // deadline with a monotonic clock reading
+	unix  time.Time // same deadline truncated to seconds, wall clock only
+}
+
+// expired reports whether the deadline has been reached according to either
+// the monotonic or the wall-clock timestamp.
+func (ti *tokenItem) expired() bool {
+	now := time.Now()
+	return !ti.mono.After(now) || !ti.unix.After(now)
+}
+
+// NewTokenCache returns a new TokenCache with the given capacity. The options
+// are forwarded to the underlying LRU cache.
+//
+// NOTE(review): the error returned by NewLRU is discarded because this
+// constructor has no error result. Presumably a failure leaves the returned
+// TokenCache without a usable LRU; confirm against NewLRU and validate the
+// capacity and options at the call site.
+func NewTokenCache(capacity int, opts ...Options) *TokenCache {
+	cache, _ := NewLRU[*tokenItem](capacity, opts...)
+	return &TokenCache{cache: cache}
+}
+
+// GetOrSet returns the token for the given key if present and not expired, or
+// calls the newToken function to get a new token and stores it in the cache.
+// The operation is thread-safe and atomic. The boolean return value indicates
+// whether the token was retrieved from the cache (false) or set by the newToken
+// function (true). When the WithInvolvedObject option is passed, a cache hit
+// or cache miss event is recorded for that object, which is meant for
+// reconciliation loops. An error from any option or from newToken is returned
+// as is; in both cases no token is stored and no cache event is recorded.
+func (c *TokenCache) GetOrSet(key string, newToken func() (Token, error),
+	opts ...Options) (Token, bool, error) {
+
+	var o storeOptions
+	if err := o.apply(opts...); err != nil {
+		return nil, false, err
+	}
+
+	token, isNew, err := c.getOrSet(key, newToken)
+	if err != nil {
+		return nil, false, err
+	}
+
+	// The event is recorded after the mutex has been released so that metrics
+	// bookkeeping does not extend the critical section.
+	if obj := o.involvedObject; obj != nil {
+		event := CacheEventTypeHit
+		if isNew {
+			event = CacheEventTypeMiss
+		}
+		c.cache.RecordCacheEvent(event, obj.Kind, obj.Name, obj.Namespace)
+	}
+
+	return token, isNew, nil
+}
+
+// getOrSet is the critical section of GetOrSet. The mutex is released with
+// defer so that it is unlocked even if newToken panics.
+func (c *TokenCache) getOrSet(key string, newToken func() (Token, error)) (Token, bool, error) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	if item, err := c.cache.Get(key); err == nil && !item.expired() {
+		return item.token, false, nil
+	}
+
+	token, err := newToken()
+	if err != nil {
+		return nil, false, err
+	}
+	c.cache.Set(key, c.newTokenItem(token))
+
+	return token, true, nil
+}
+
+// DeleteCacheEvent deletes the cache event (cache_miss or cache_hit) metric for
+// the associated object being reconciled, given their kind, name and namespace.
+func (c *TokenCache) DeleteCacheEvent(event, kind, name, namespace string) {
+	c.cache.DeleteCacheEvent(event, kind, name, namespace)
+}
+
+// newTokenItem wraps token with its expiration deadline, computed from now.
+func (c *TokenCache) newTokenItem(token Token) *tokenItem {
+	// Kubelet rotates ServiceAccount tokens when 80% of their lifetime has
+	// passed, so we'll use the same threshold to consider tokens expired.
+	//
+	// Ref: https://github.com/kubernetes/kubernetes/blob/4032177faf21ae2f99a2012634167def2376b370/pkg/kubelet/token/token_manager.go#L172-L174
+	d := (token.GetDuration() * 8) / 10
+
+	// time.Now carries a monotonic clock reading; time.Unix does not, so the
+	// second timestamp is compared using the wall clock only.
+	mono := time.Now().Add(d)
+	unix := time.Unix(mono.Unix(), 0)
+
+	return &tokenItem{
+		token: token,
+		mono:  mono,
+		unix:  unix,
+	}
+}
diff --git a/cache/token_test.go b/cache/token_test.go
new file mode 100644
index 00000000..efe3b8c9
--- /dev/null
+++ b/cache/token_test.go
@@ -0,0 +1,63 @@
+/*
+Copyright 2025 The Flux authors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package cache_test
+
+import (
+	"testing"
+	"time"
+
+	. "github.com/onsi/gomega"
+
+	"github.com/fluxcd/pkg/cache"
+)
+
+type testToken struct {
+	duration time.Duration
+}
+
+func (t *testToken) GetDuration() time.Duration {
+	return t.duration
+}
+
+func TestTokenCache_Lifecycle(t *testing.T) {
+	g := NewWithT(t)
+
+	tc := cache.NewTokenCache(1)
+	// Empty cache: newToken runs and its token is stored.
+	short := &testToken{duration: 2 * time.Second}
+	got, isNew, err := tc.GetOrSet("test", func() (cache.Token, error) { return short, nil })
+	g.Expect(err).To(BeNil())
+	g.Expect(isNew).To(BeTrue())
+	g.Expect(got).To(Equal(&testToken{duration: 2 * time.Second}))
+
+	// Wait past the first token's lifetime so the cached entry is expired.
+	time.Sleep(4 * time.Second)
+
+	long := &testToken{duration: 100 * time.Second}
+	got, isNew, err = tc.GetOrSet("test", func() (cache.Token, error) { return long, nil })
+	g.Expect(err).To(BeNil())
+	g.Expect(isNew).To(BeTrue())
+	g.Expect(got).To(Equal(&testToken{duration: 100 * time.Second}))
+
+	// Well within the second token's lifetime: expect a cache hit.
+	time.Sleep(2 * time.Second)
+
+	got, isNew, err = tc.GetOrSet("test", func() (cache.Token, error) { return nil, nil })
+	g.Expect(err).To(BeNil())
+	g.Expect(isNew).To(BeFalse())
+	g.Expect(got).To(Equal(&testToken{duration: 100 * time.Second}))
+}