From dc0e388acacc729cd5c7901e346c89db57a3803e Mon Sep 17 00:00:00 2001 From: Matheus Pimenta Date: Sat, 22 Feb 2025 19:14:01 +0000 Subject: [PATCH] Add cache specialized for access tokens Signed-off-by: Matheus Pimenta --- cache/token.go | 122 ++++++++++++++++++++++++++++++++++++++++++++ cache/token_test.go | 98 +++++++++++++++++++++++++++++++++++ 2 files changed, 220 insertions(+) create mode 100644 cache/token.go create mode 100644 cache/token_test.go diff --git a/cache/token.go b/cache/token.go new file mode 100644 index 00000000..bdfe185f --- /dev/null +++ b/cache/token.go @@ -0,0 +1,122 @@ +/* +Copyright 2025 The Flux authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "sync" + "time" +) + +// Token is an interface that represents an access token that can be used +// to authenticate with a cloud provider. The only common method is to get the +// duration of the token, because different providers may have different ways to +// represent the token. For example, Azure and GCP use an opaque string token, +// while AWS uses the pair of access key id and secret access key. Consumers of +// this token should know what type to cast this interface to. +type Token interface { + // GetDuration returns the duration for which the token is valid relative to + // approximately time.Now(). This is used to determine when the token should + // be refreshed. + GetDuration() time.Duration +} + +// TokenCache is a thread-safe cache specialized in storing and retrieving +// access tokens. It uses an LRU cache as the underlying storage and takes +// care of expiring tokens in a pessimistic way by storing both a timestamp +// with a monotonic clock (the Go default) and an absolute timestamp created +// from the Unix timestamp of when the token was created. The token is +// considered expired when either timestamps are older than the current time. +// This strategy ensures that expired tokens aren't kept in the cache for +// longer than their expiration time. Also, tokens expire on 80% of their +// lifetime, which is the same strategy used by kubelet for rotating +// ServiceAccount tokens. +type TokenCache struct { + cache *LRU[*tokenItem] + mu sync.Mutex +} + +type tokenItem struct { + token Token + mono time.Time + unix time.Time +} + +func newTokenItem(token Token) *tokenItem { + // Kubelet rotates ServiceAccount tokens when 80% of their lifetime has + // passed, so we'll use the same threshold to consider tokens expired. + // + // Ref: https://github.com/kubernetes/kubernetes/blob/4032177faf21ae2f99a2012634167def2376b370/pkg/kubelet/token/token_manager.go#L172-L174 + d := (token.GetDuration() * 8) / 10 + + mono := time.Now().Add(d) + unix := time.Unix(mono.Unix(), 0) + + return &tokenItem{ + token: token, + mono: mono, + unix: unix, + } +} + +func (ti *tokenItem) expired() bool { + now := time.Now() + return ti.mono.Before(now) || ti.unix.Before(now) +} + +// NewTokenCache returns a new TokenCache with the given capacity. +func NewTokenCache(capacity int, opts ...Options) *TokenCache { + cache, _ := NewLRU[*tokenItem](capacity, opts...) + return &TokenCache{cache: cache} +} + +// Get returns the token for the given key, or nil if the key is not in the cache. +func (c *TokenCache) Get(key string) Token { + c.mu.Lock() + defer c.mu.Unlock() + + item, err := c.cache.Get(key) + if err != nil { + return nil + } + + if item.expired() { + c.cache.Delete(key) + return nil + } + + return item.token +} + +// Set adds a token to the cache with the given key. +func (c *TokenCache) Set(key string, token Token) { + item := newTokenItem(token) + c.mu.Lock() + c.cache.Set(key, item) + c.mu.Unlock() +} + +// RecordCacheEvent records a cache event (cache_miss or cache_hit) with kind, +// name and namespace of the associated object being reconciled. +func (c *TokenCache) RecordCacheEvent(event, kind, name, namespace string) { + c.cache.RecordCacheEvent(event, kind, name, namespace) +} + +// DeleteCacheEvent deletes the cache event (cache_miss or cache_hit) metric for +// the associated object being reconciled, given their kind, name and namespace. +func (c *TokenCache) DeleteCacheEvent(event, kind, name, namespace string) { + c.cache.DeleteCacheEvent(event, kind, name, namespace) +} diff --git a/cache/token_test.go b/cache/token_test.go new file mode 100644 index 00000000..08fb773a --- /dev/null +++ b/cache/token_test.go @@ -0,0 +1,98 @@ +/* +Copyright 2025 The Flux authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache_test + +import ( + "testing" + "time" + + . "github.com/onsi/gomega" + + "github.com/fluxcd/pkg/cache" +) + +type testToken struct { + duration time.Duration +} + +func (t *testToken) GetDuration() time.Duration { + return t.duration +} + +func TestTokenCache_Lifecycle(t *testing.T) { + g := NewWithT(t) + + tc := cache.NewTokenCache(1) + + retrieved := tc.Get("test") + g.Expect(retrieved).To(BeNil()) + + token := &testToken{duration: 100 * time.Second} + tc.Set("test", token) + retrieved = tc.Get("test") + g.Expect(retrieved).To(Equal(token)) + + token2 := &testToken{duration: 3 * time.Second} + tc.Set("test", token2) + retrieved = tc.Get("test") + g.Expect(retrieved).To(Equal(token2)) + g.Expect(retrieved).NotTo(Equal(token)) + + time.Sleep(3 * time.Second) + retrieved = tc.Get("test") + g.Expect(retrieved).To(BeNil()) +} + +func TestTokenCache_Expiration(t *testing.T) { + for _, tt := range []struct { + name string + tokenDuration time.Duration + sleepDuration time.Duration + expected bool + }{ + { + name: "token does not expire before 80 percent of its duration", + tokenDuration: 5 * time.Second, + sleepDuration: 3 * time.Second, + expected: true, + }, + { + name: "token expires after 80 percent of its duration", + tokenDuration: 1 * time.Second, + sleepDuration: 810 * time.Millisecond, + expected: false, + }, + } { + t.Run(tt.name, func(t *testing.T) { + g := NewWithT(t) + + tc := cache.NewTokenCache(1) + + token := &testToken{duration: tt.tokenDuration} + tc.Set("test", token) + + time.Sleep(tt.sleepDuration) + + retrieved := tc.Get("test") + if tt.expected { + g.Expect(retrieved).NotTo(BeNil()) + } else { + g.Expect(retrieved).To(BeNil()) + } + }) + } +}