Skip to content

Commit

Permalink
Add cache specialized for access tokens
Browse files Browse the repository at this point in the history
Signed-off-by: Matheus Pimenta <matheuscscp@gmail.com>
  • Loading branch information
matheuscscp committed Feb 27, 2025
1 parent e0f6da0 commit 49e912c
Show file tree
Hide file tree
Showing 4 changed files with 239 additions and 3 deletions.
23 changes: 23 additions & 0 deletions cache/involved_object.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
Copyright 2025 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cache

type InvolvedObject struct {
Kind string
Name string
Namespace string
}
28 changes: 25 additions & 3 deletions cache/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,19 @@ type Expirable[T any] interface {
}

type storeOptions struct {
interval time.Duration
registerer prometheus.Registerer
metricsPrefix string
interval time.Duration
registerer prometheus.Registerer
metricsPrefix string
involvedObject *InvolvedObject
}

func (o *storeOptions) apply(opts ...Options) error {
for _, opt := range opts {
if err := opt(o); err != nil {
return err
}
}
return nil
}

// Options is a function that sets the store options.
Expand Down Expand Up @@ -75,3 +85,15 @@ func WithMetricsPrefix(prefix string) Options {
return nil
}
}

// WithInvolvedObject sets the involved object for the cache metrics.
func WithInvolvedObject(kind, name, namespace string) Options {
return func(o *storeOptions) error {
o.involvedObject = &InvolvedObject{
Kind: kind,
Name: name,
Namespace: namespace,
}
return nil
}
}
128 changes: 128 additions & 0 deletions cache/token.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
Copyright 2025 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cache

import (
"sync"
"time"
)

// Token is an interface that represents an access token that can be used
// to authenticate with a cloud provider. The only common method is to get the
// duration of the token, because different providers may have different ways to
// represent the token. For example, Azure and GCP use an opaque string token,
// while AWS uses the pair of access key id and secret access key. Consumers of
// this token should know what type to cast this interface to.
type Token interface {
// GetDuration returns the duration for which the token is valid relative to
// approximately time.Now(). This is used to determine when the token should
// be refreshed.
GetDuration() time.Duration
}

// TokenCache is a thread-safe cache specialized in storing and retrieving
// access tokens. It uses an LRU cache as the underlying storage and takes
// care of expiring tokens in a pessimistic way by storing both a timestamp
// with a monotonic clock (the Go default) and an absolute timestamp created
// from the Unix timestamp of when the token was created. The token is
// considered expired when either timestamps are older than the current time.
// This strategy ensures that expired tokens aren't kept in the cache for
// longer than their expiration time. Also, tokens expire on 80% of their
// lifetime, which is the same strategy used by kubelet for rotating
// ServiceAccount tokens.
type TokenCache struct {
cache *LRU[*tokenItem]
mu sync.Mutex
}

type tokenItem struct {
token Token
mono time.Time
unix time.Time
}

func (ti *tokenItem) expired() bool {
now := time.Now()
return !ti.mono.After(now) || !ti.unix.After(now)
}

// NewTokenCache returns a new TokenCache with the given capacity.
func NewTokenCache(capacity int, opts ...Options) *TokenCache {
cache, _ := NewLRU[*tokenItem](capacity, opts...)
return &TokenCache{cache: cache}
}

// GetOrSet returns the token for the given key if present and not expired, or
// calls the newToken function to get a new token and stores it in the cache.
// The operation is thread-safe and atomic. The boolean return value indicates
// whether the token was retrieved from the cache (false) or set by the newToken
// function (true). The involvedObject is used to record cache events for
// reconciliation loops.
func (c *TokenCache) GetOrSet(key string, newToken func() (Token, error),
opts ...Options) (Token, bool, error) {

var o storeOptions
o.apply(opts...)
obj := o.involvedObject

c.mu.Lock()

if item, err := c.cache.Get(key); err == nil && !item.expired() {
c.mu.Unlock()
if obj != nil {
c.cache.RecordCacheEvent(CacheEventTypeHit, obj.Kind, obj.Name, obj.Namespace)
}
return item.token, false, nil
}

token, err := newToken()
if err != nil {
c.mu.Unlock()
return nil, false, err
}
c.cache.Set(key, c.newTokenItem(token))

c.mu.Unlock()
if obj != nil {
c.cache.RecordCacheEvent(CacheEventTypeMiss, obj.Kind, obj.Name, obj.Namespace)
}

return token, true, nil
}

// DeleteCacheEvent deletes the cache event (cache_miss or cache_hit) metric for
// the associated object being reconciled, given their kind, name and namespace.
func (c *TokenCache) DeleteCacheEvent(event, kind, name, namespace string) {
c.cache.DeleteCacheEvent(event, kind, name, namespace)
}

func (c *TokenCache) newTokenItem(token Token) *tokenItem {
// Kubelet rotates ServiceAccount tokens when 80% of their lifetime has
// passed, so we'll use the same threshold to consider tokens expired.
//
// Ref: https://github.com/kubernetes/kubernetes/blob/4032177faf21ae2f99a2012634167def2376b370/pkg/kubelet/token/token_manager.go#L172-L174
d := (token.GetDuration() * 8) / 10

mono := time.Now().Add(d)
unix := time.Unix(mono.Unix(), 0)

return &tokenItem{
token: token,
mono: mono,
unix: unix,
}
}
63 changes: 63 additions & 0 deletions cache/token_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
Copyright 2025 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cache_test

import (
"testing"
"time"

. "github.com/onsi/gomega"

"github.com/fluxcd/pkg/cache"
)

type testToken struct {
duration time.Duration
}

func (t *testToken) GetDuration() time.Duration {
return t.duration
}

func TestTokenCache_Lifecycle(t *testing.T) {
g := NewWithT(t)

tc := cache.NewTokenCache(1)

token, retrieved, err := tc.GetOrSet("test", func() (cache.Token, error) {
return &testToken{duration: 2 * time.Second}, nil
})
g.Expect(token).To(Equal(&testToken{duration: 2 * time.Second}))
g.Expect(retrieved).To(BeTrue())
g.Expect(err).To(BeNil())

time.Sleep(4 * time.Second)

token, retrieved, err = tc.GetOrSet("test", func() (cache.Token, error) {
return &testToken{duration: 100 * time.Second}, nil
})
g.Expect(token).To(Equal(&testToken{duration: 100 * time.Second}))
g.Expect(retrieved).To(BeTrue())
g.Expect(err).To(BeNil())

time.Sleep(2 * time.Second)

token, retrieved, err = tc.GetOrSet("test", func() (cache.Token, error) { return nil, nil })
g.Expect(token).To(Equal(&testToken{duration: 100 * time.Second}))
g.Expect(retrieved).To(BeFalse())
g.Expect(err).To(BeNil())
}

0 comments on commit 49e912c

Please sign in to comment.