From aa86429bc2ffd9665aa34fb8fb83d4e4c5ae499b Mon Sep 17 00:00:00 2001 From: Jiaxin Shan Date: Wed, 11 Sep 2024 11:31:09 -0700 Subject: [PATCH] Add expectation lib to allows us to set and wait on expectations --- .../util/expectation/expectation.go | 233 ++++++++++++++++++ .../util/expectation/expectation_test.go | 178 +++++++++++++ 2 files changed, 411 insertions(+) create mode 100644 pkg/controller/util/expectation/expectation.go create mode 100644 pkg/controller/util/expectation/expectation_test.go diff --git a/pkg/controller/util/expectation/expectation.go b/pkg/controller/util/expectation/expectation.go new file mode 100644 index 00000000..50dde485 --- /dev/null +++ b/pkg/controller/util/expectation/expectation.go @@ -0,0 +1,233 @@ +/* +Copyright 2024 The Aibrix Team. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Note: The code is synced from https://github.com/kubernetes/kubernetes/blob/master/pkg/controller/controller_utils.go + +package expectation + +import ( + "fmt" + "sync/atomic" + "time" + + "k8s.io/client-go/tools/cache" + "k8s.io/klog/v2" + "k8s.io/utils/clock" +) + +const ( + // If a watch drops a delete event for a pod, it'll take this long + // before a dormant controller waiting for those packets is woken up anyway. It is + // specifically targeted at the case where some problem prevents an update + // of expectations, without it the controller could stay asleep forever. This should + // be set based on the expected latency of watch events. + // + // Currently a controller can service (create *and* observe the watch events for said + // creation) about 10 pods a second, so it takes about 1 min to service + // 500 pods. Just creation is limited to 20qps, and watching happens with ~10-30s + // latency/pod at the scale of 3000 pods over 100 nodes. + ExpectationsTimeout = 5 * time.Minute +) + +// Expectations are a way for controllers to tell the controller manager what they expect. eg: +// ControllerExpectations: { +// controller1: expects 2 adds in 2 minutes +// controller2: expects 2 dels in 2 minutes +// controller3: expects -1 adds in 2 minutes => controller3's expectations have already been met +// } +// +// Implementation: +// ControlleeExpectation = pair of atomic counters to track controllee's creation/deletion +// ControllerExpectationsStore = TTLStore + a ControlleeExpectation per controller +// +// * Once set expectations can only be lowered +// * A controller isn't synced till its expectations are either fulfilled, or expire +// * Controllers that don't set expectations will get woken up for every matching controllee + +// ExpKeyFunc to parse out the key from a ControlleeExpectation +var ExpKeyFunc = func(obj interface{}) (string, error) { + if e, ok := obj.(*ControlleeExpectations); ok { + return e.key, nil + } + return "", fmt.Errorf("could not find key for obj %#v", obj) +} + +// ControllerExpectationsInterface is an interface that allows users to set and wait on expectations. +// Only abstracted out for testing. +// Warning: if using KeyFunc it is not safe to use a single ControllerExpectationsInterface with different +// types of controllers, because the keys might conflict across types. +type ControllerExpectationsInterface interface { + GetExpectations(controllerKey string) (*ControlleeExpectations, bool, error) + SatisfiedExpectations(controllerKey string) bool + DeleteExpectations(controllerKey string) + SetExpectations(controllerKey string, add, del int) error + ExpectCreations(controllerKey string, adds int) error + ExpectDeletions(controllerKey string, dels int) error + CreationObserved(controllerKey string) + DeletionObserved(controllerKey string) + RaiseExpectations(controllerKey string, add, del int) + LowerExpectations(controllerKey string, add, del int) +} + +// ControllerExpectations is a cache mapping controllers to what they expect to see before being woken up for a sync. +type ControllerExpectations struct { + cache.Store +} + +// GetExpectations returns the ControlleeExpectations of the given controller. +func (r *ControllerExpectations) GetExpectations(controllerKey string) (*ControlleeExpectations, bool, error) { + exp, exists, err := r.GetByKey(controllerKey) + if err == nil && exists { + return exp.(*ControlleeExpectations), true, nil + } + return nil, false, err +} + +// DeleteExpectations deletes the expectations of the given controller from the TTLStore. +func (r *ControllerExpectations) DeleteExpectations(controllerKey string) { + if exp, exists, err := r.GetByKey(controllerKey); err == nil && exists { + if err := r.Delete(exp); err != nil { + + klog.V(2).Info("Error deleting expectations", "controller", controllerKey, "err", err) + } + } +} + +// SatisfiedExpectations returns true if the required adds/dels for the given controller have been observed. +// Add/del counts are established by the controller at sync time, and updated as controllees are observed by the controller +// manager. +func (r *ControllerExpectations) SatisfiedExpectations(controllerKey string) bool { + if exp, exists, err := r.GetExpectations(controllerKey); exists { + if exp.Fulfilled() { + klog.V(4).Info("Controller expectations fulfilled", "expectations", exp) + return true + } else if exp.isExpired() { + klog.V(4).Info("Controller expectations expired", "expectations", exp) + return true + } else { + klog.V(4).Info("Controller still waiting on expectations", "expectations", exp) + return false + } + } else if err != nil { + klog.V(2).Info("Error encountered while checking expectations, forcing sync", "err", err) + } else { + // When a new controller is created, it doesn't have expectations. + // When it doesn't see expected watch events for > TTL, the expectations expire. + // - In this case it wakes up, creates/deletes controllees, and sets expectations again. + // When it has satisfied expectations and no controllees need to be created/destroyed > TTL, the expectations expire. + // - In this case it continues without setting expectations till it needs to create/delete controllees. + klog.V(4).Info("Controller either never recorded expectations, or the ttl expired", "controller", controllerKey) + } + // Trigger a sync if we either encountered and error (which shouldn't happen since we're + // getting from local store) or this controller hasn't established expectations. + return true +} + +// TODO: Extend ExpirationCache to support explicit expiration. +// TODO: Make this possible to disable in tests. +// TODO: Support injection of clock. +func (exp *ControlleeExpectations) isExpired() bool { + return clock.RealClock{}.Since(exp.timestamp) > ExpectationsTimeout +} + +// SetExpectations registers new expectations for the given controller. Forgets existing expectations. +func (r *ControllerExpectations) SetExpectations(controllerKey string, add, del int) error { + exp := &ControlleeExpectations{add: int64(add), del: int64(del), key: controllerKey, timestamp: clock.RealClock{}.Now()} + klog.V(4).Info("Setting expectations", "expectations", exp) + return r.Add(exp) +} + +func (r *ControllerExpectations) ExpectCreations(controllerKey string, adds int) error { + return r.SetExpectations(controllerKey, adds, 0) +} + +func (r *ControllerExpectations) ExpectDeletions(controllerKey string, dels int) error { + return r.SetExpectations(controllerKey, 0, dels) +} + +// Decrements the expectation counts of the given controller. +func (r *ControllerExpectations) LowerExpectations(controllerKey string, add, del int) { + if exp, exists, err := r.GetExpectations(controllerKey); err == nil && exists { + exp.Add(int64(-add), int64(-del)) + // The expectations might've been modified since the update on the previous line. + klog.V(4).Info("Lowered expectations", "expectations", exp) + } +} + +// Increments the expectation counts of the given controller. +func (r *ControllerExpectations) RaiseExpectations(controllerKey string, add, del int) { + if exp, exists, err := r.GetExpectations(controllerKey); err == nil && exists { + exp.Add(int64(add), int64(del)) + // The expectations might've been modified since the update on the previous line. + klog.V(4).Info("Raised expectations", "expectations", exp) + } +} + +// CreationObserved atomically decrements the `add` expectation count of the given controller. +func (r *ControllerExpectations) CreationObserved(controllerKey string) { + r.LowerExpectations(controllerKey, 1, 0) +} + +// DeletionObserved atomically decrements the `del` expectation count of the given controller. +func (r *ControllerExpectations) DeletionObserved(controllerKey string) { + r.LowerExpectations(controllerKey, 0, 1) +} + +// ControlleeExpectations track controllee creates/deletes. +type ControlleeExpectations struct { + // Important: Since these two int64 fields are using sync/atomic, they have to be at the top of the struct due to a bug on 32-bit platforms + // See: https://golang.org/pkg/sync/atomic/ for more information + add int64 + del int64 + key string + timestamp time.Time +} + +// Add increments the add and del counters. +func (e *ControlleeExpectations) Add(add, del int64) { + atomic.AddInt64(&e.add, add) + atomic.AddInt64(&e.del, del) +} + +// Fulfilled returns true if this expectation has been fulfilled. +func (e *ControlleeExpectations) Fulfilled() bool { + // TODO: think about why this line being atomic doesn't matter + return atomic.LoadInt64(&e.add) <= 0 && atomic.LoadInt64(&e.del) <= 0 +} + +// GetExpectations returns the add and del expectations of the controllee. +func (e *ControlleeExpectations) GetExpectations() (int64, int64) { + return atomic.LoadInt64(&e.add), atomic.LoadInt64(&e.del) +} + +// MarshalLog makes a thread-safe copy of the values of the expectations that +// can be used for logging. +func (e *ControlleeExpectations) MarshalLog() interface{} { + return struct { + add int64 + del int64 + key string + }{ + add: atomic.LoadInt64(&e.add), + del: atomic.LoadInt64(&e.del), + key: e.key, + } +} + +// NewControllerExpectations returns a store for ControllerExpectations. +func NewControllerExpectations() *ControllerExpectations { + return &ControllerExpectations{cache.NewStore(ExpKeyFunc)} +} diff --git a/pkg/controller/util/expectation/expectation_test.go b/pkg/controller/util/expectation/expectation_test.go new file mode 100644 index 00000000..8953e00c --- /dev/null +++ b/pkg/controller/util/expectation/expectation_test.go @@ -0,0 +1,178 @@ +/* +Copyright 2024 The Aibrix Team. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Note: The code is synced from https://github.com/kubernetes/kubernetes/blob/master/pkg/controller/controller_utils_test.go +// with minor changes to fix linter issues. + +package expectation + +import ( + "sync" + "testing" + "time" + + "k8s.io/utils/ptr" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/uuid" + "k8s.io/client-go/tools/cache" + testingclock "k8s.io/utils/clock/testing" +) + +var ( + // KeyFunc is the short name to DeletionHandlingMetaNamespaceKeyFunc. + // IndexerInformer uses a delta queue, therefore for deletes we have to use this + // key function but it should be just fine for non delete events. + KeyFunc = cache.DeletionHandlingMetaNamespaceKeyFunc +) + +// NewFakeControllerExpectationsLookup creates a fake store for PodExpectations. +func NewFakeControllerExpectationsLookup(ttl time.Duration) (*ControllerExpectations, *testingclock.FakeClock) { + fakeTime := time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC) + fakeClock := testingclock.NewFakeClock(fakeTime) + ttlPolicy := &cache.TTLPolicy{TTL: ttl, Clock: fakeClock} + ttlStore := cache.NewFakeExpirationStore( + ExpKeyFunc, nil, ttlPolicy, fakeClock) + return &ControllerExpectations{ttlStore}, fakeClock +} + +func newReplicationController(replicas int) *v1.ReplicationController { + rc := &v1.ReplicationController{ + TypeMeta: metav1.TypeMeta{APIVersion: "v1"}, + ObjectMeta: metav1.ObjectMeta{ + UID: uuid.NewUUID(), + Name: "foobar", + Namespace: metav1.NamespaceDefault, + ResourceVersion: "18", + }, + Spec: v1.ReplicationControllerSpec{ + Replicas: ptr.To[int32](int32(replicas)), + Selector: map[string]string{"foo": "bar"}, + Template: &v1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "name": "foo", + "type": "production", + }, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Image: "foo/bar", + TerminationMessagePath: v1.TerminationMessagePathDefault, + ImagePullPolicy: v1.PullIfNotPresent, + // skip the security context setting to avoid importing k8s.io/kubernetes/pkg/securitycontext + //SecurityContext: securitycontext.ValidSecurityContextWithContainerDefaults(), + }, + }, + RestartPolicy: v1.RestartPolicyAlways, + DNSPolicy: v1.DNSDefault, + NodeSelector: map[string]string{ + "baz": "blah", + }, + }, + }, + }, + } + return rc +} + +func TestControllerExpectations(t *testing.T) { + ttl := 30 * time.Second + e, fakeClock := NewFakeControllerExpectationsLookup(ttl) + // In practice we can't really have add and delete expectations since we only either create or + // delete replicas in one rc pass, and the rc goes to sleep soon after until the expectations are + // either fulfilled or timeout. + adds, dels := 10, 30 + rc := newReplicationController(1) + + // RC fires off adds and deletes at apiserver, then sets expectations + rcKey, err := KeyFunc(rc) + require.NoError(t, err, "Couldn't get key for object %#v: %v", rc, err) + + err = e.SetExpectations(rcKey, adds, dels) + assert.Nil(t, err) + var wg sync.WaitGroup + for i := 0; i < adds+1; i++ { + wg.Add(1) + go func() { + // In prod this can happen either because of a failed create by the rc + // or after having observed a create via informer + e.CreationObserved(rcKey) + wg.Done() + }() + } + wg.Wait() + + // There are still delete expectations + assert.False(t, e.SatisfiedExpectations(rcKey), "Rc will sync before expectations are met") + + for i := 0; i < dels+1; i++ { + wg.Add(1) + go func() { + e.DeletionObserved(rcKey) + wg.Done() + }() + } + wg.Wait() + + tests := []struct { + name string + expectationsToSet []int + expireExpectations bool + wantPodExpectations []int64 + wantExpectationsSatisfied bool + }{ + { + name: "Expectations have been surpassed", + expireExpectations: false, + wantPodExpectations: []int64{int64(-1), int64(-1)}, + wantExpectationsSatisfied: true, + }, + { + name: "Old expectations are cleared because of ttl", + expectationsToSet: []int{1, 2}, + expireExpectations: true, + wantPodExpectations: []int64{int64(1), int64(2)}, + wantExpectationsSatisfied: false, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + if len(test.expectationsToSet) > 0 { + err = e.SetExpectations(rcKey, test.expectationsToSet[0], test.expectationsToSet[1]) + assert.Nil(t, err) + } + podExp, exists, err := e.GetExpectations(rcKey) + require.NoError(t, err, "Could not get expectations for rc, exists %v and err %v", exists, err) + assert.True(t, exists, "Could not get expectations for rc, exists %v and err %v", exists, err) + + add, del := podExp.GetExpectations() + assert.Equal(t, test.wantPodExpectations[0], add, "Unexpected pod expectations %#v", podExp) + assert.Equal(t, test.wantPodExpectations[1], del, "Unexpected pod expectations %#v", podExp) + assert.Equal(t, test.wantExpectationsSatisfied, e.SatisfiedExpectations(rcKey), "Expectations are met but the rc will not sync") + + if test.expireExpectations { + fakeClock.Step(ttl + 1) + assert.True(t, e.SatisfiedExpectations(rcKey), "Expectations should have expired but didn't") + } + }) + } +}