Skip to content

Commit

Permalink
capacity: use v1 API exclusively
Browse files Browse the repository at this point in the history
  • Loading branch information
pohly committed Mar 3, 2022
1 parent 8b957ca commit 830dcda
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 40 deletions.
4 changes: 2 additions & 2 deletions cmd/csi-provisioner/csi-provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,15 +466,15 @@ func main() {
capacityController = capacity.NewCentralCapacityController(
csi.NewControllerClient(grpcClient),
provisionerName,
clientset.StorageV1beta1().CSIStorageCapacities,
clientset.StorageV1().CSIStorageCapacities,
// Metrics for the queue is available in the default registry.
workqueue.NewNamedRateLimitingQueue(rateLimiter, "csistoragecapacity"),
controller,
managedByID,
namespace,
topologyInformer,
factory.Storage().V1().StorageClasses(),
factoryForNamespace.Storage().V1beta1().CSIStorageCapacities(),
factoryForNamespace.Storage().V1().CSIStorageCapacities(),
*capacityPollInterval,
*capacityImmediateBinding,
*operationTimeout,
Expand Down
40 changes: 19 additions & 21 deletions pkg/capacity/capacity.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,14 @@ import (
"google.golang.org/grpc"
v1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
storagev1beta1 "k8s.io/api/storage/v1beta1"
apierrs "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
storageinformersv1 "k8s.io/client-go/informers/storage/v1"
storageinformersv1beta1 "k8s.io/client-go/informers/storage/v1beta1"
storagev1beta1typed "k8s.io/client-go/kubernetes/typed/storage/v1beta1"
storagev1typed "k8s.io/client-go/kubernetes/typed/storage/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/component-base/metrics"
Expand Down Expand Up @@ -81,14 +79,14 @@ type Controller struct {

csiController CSICapacityClient
driverName string
clientFactory func(namespace string) storagev1beta1typed.CSIStorageCapacityInterface
clientFactory func(namespace string) storagev1typed.CSIStorageCapacityInterface
queue workqueue.RateLimitingInterface
owner *metav1.OwnerReference
managedByID string
ownerNamespace string
topologyInformer topology.Informer
scInformer storageinformersv1.StorageClassInformer
cInformer storageinformersv1beta1.CSIStorageCapacityInformer
cInformer storageinformersv1.CSIStorageCapacityInformer
pollPeriod time.Duration
immediateBinding bool
timeout time.Duration
Expand All @@ -98,7 +96,7 @@ type Controller struct {
// have a non-nil pointer. Those get added and updated
// exclusively through the informer event handler to avoid
// races.
capacities map[workItem]*storagev1beta1.CSIStorageCapacity
capacities map[workItem]*storagev1.CSIStorageCapacity
capacitiesLock sync.Mutex
}

Expand All @@ -107,7 +105,7 @@ type workItem struct {
storageClassName string
}

func (w workItem) equals(capacity *storagev1beta1.CSIStorageCapacity) bool {
func (w workItem) equals(capacity *storagev1.CSIStorageCapacity) bool {
return w.storageClassName == capacity.StorageClassName &&
reflect.DeepEqual(w.segment.GetLabelSelector(), capacity.NodeTopology)
}
Expand Down Expand Up @@ -155,14 +153,14 @@ type CSICapacityClient interface {
func NewCentralCapacityController(
csiController CSICapacityClient,
driverName string,
clientFactory func(namespace string) storagev1beta1typed.CSIStorageCapacityInterface,
clientFactory func(namespace string) storagev1typed.CSIStorageCapacityInterface,
queue workqueue.RateLimitingInterface,
owner *metav1.OwnerReference,
managedByID string,
ownerNamespace string,
topologyInformer topology.Informer,
scInformer storageinformersv1.StorageClassInformer,
cInformer storageinformersv1beta1.CSIStorageCapacityInformer,
cInformer storageinformersv1.CSIStorageCapacityInformer,
pollPeriod time.Duration,
immediateBinding bool,
timeout time.Duration,
Expand All @@ -181,7 +179,7 @@ func NewCentralCapacityController(
pollPeriod: pollPeriod,
immediateBinding: immediateBinding,
timeout: timeout,
capacities: map[workItem]*storagev1beta1.CSIStorageCapacity{},
capacities: map[workItem]*storagev1.CSIStorageCapacity{},
}

// Now register for changes. Depending on the implementation of the informers,
Expand Down Expand Up @@ -281,15 +279,15 @@ func (c *Controller) prepare(ctx context.Context) {
klog.V(3).Info("Checking for existing CSIStorageCapacity objects")
handler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
csc, ok := obj.(*storagev1beta1.CSIStorageCapacity)
csc, ok := obj.(*storagev1.CSIStorageCapacity)
if !ok {
klog.Errorf("added object: expected CSIStorageCapacity, got %T -> ignoring it", obj)
return
}
c.onCAddOrUpdate(ctx, csc)
},
UpdateFunc: func(_ interface{}, newObj interface{}) {
csc, ok := newObj.(*storagev1beta1.CSIStorageCapacity)
csc, ok := newObj.(*storagev1.CSIStorageCapacity)
if !ok {
klog.Errorf("updated object: expected CSIStorageCapacity, got %T -> ignoring it", newObj)
return
Expand All @@ -301,7 +299,7 @@ func (c *Controller) prepare(ctx context.Context) {
if unknown, ok := obj.(cache.DeletedFinalStateUnknown); ok && unknown.Obj != nil {
obj = unknown.Obj
}
csc, ok := obj.(*storagev1beta1.CSIStorageCapacity)
csc, ok := obj.(*storagev1.CSIStorageCapacity)
if !ok {
klog.Errorf("deleted object: expected CSIStorageCapacity, got %T -> ignoring it", obj)
return
Expand Down Expand Up @@ -529,7 +527,7 @@ func (c *Controller) processNextWorkItem(ctx context.Context) bool {
switch obj := obj.(type) {
case workItem:
return c.syncCapacity(ctx, obj)
case *storagev1beta1.CSIStorageCapacity:
case *storagev1.CSIStorageCapacity:
return c.deleteCapacity(ctx, obj)
default:
klog.Warningf("unexpected work item %#v", obj)
Expand Down Expand Up @@ -614,7 +612,7 @@ func (c *Controller) syncCapacity(ctx context.Context, item workItem) error {

if capacity == nil {
// Create new object.
capacity = &storagev1beta1.CSIStorageCapacity{
capacity = &storagev1.CSIStorageCapacity{
ObjectMeta: metav1.ObjectMeta{
GenerateName: "csisc-",
Labels: map[string]string{
Expand Down Expand Up @@ -669,7 +667,7 @@ func (c *Controller) syncCapacity(ctx context.Context, item workItem) error {
}

// deleteCapacity ensures that the object is gone when done.
func (c *Controller) deleteCapacity(ctx context.Context, capacity *storagev1beta1.CSIStorageCapacity) error {
func (c *Controller) deleteCapacity(ctx context.Context, capacity *storagev1.CSIStorageCapacity) error {
klog.V(5).Infof("Capacity Controller: removing CSIStorageCapacity %s", capacity.Name)
err := c.clientFactory(capacity.Namespace).Delete(ctx, capacity.Name, metav1.DeleteOptions{})
if err != nil && apierrs.IsNotFound(err) {
Expand All @@ -682,7 +680,7 @@ func (c *Controller) deleteCapacity(ctx context.Context, capacity *storagev1beta
// and either remembers the pointer to it for future updates or
// ensures that it gets deleted if no longer needed. Foreign objects
// are ignored.
func (c *Controller) onCAddOrUpdate(ctx context.Context, capacity *storagev1beta1.CSIStorageCapacity) {
func (c *Controller) onCAddOrUpdate(ctx context.Context, capacity *storagev1.CSIStorageCapacity) {
if !c.isManaged(capacity) {
// Not ours (anymore?). For the unlikely case that someone removed our owner reference,
// we also must remove our reference to the object.
Expand Down Expand Up @@ -723,7 +721,7 @@ func (c *Controller) onCAddOrUpdate(ctx context.Context, capacity *storagev1beta
c.queue.Add(capacity)
}

func (c *Controller) onCDelete(ctx context.Context, capacity *storagev1beta1.CSIStorageCapacity) {
func (c *Controller) onCDelete(ctx context.Context, capacity *storagev1.CSIStorageCapacity) {
c.capacitiesLock.Lock()
defer c.capacitiesLock.Unlock()
for item, capacity2 := range c.capacities {
Expand Down Expand Up @@ -805,7 +803,7 @@ func (c *Controller) getObjectsObsolete() int64 {
return obsolete
}

func (c *Controller) isObsolete(capacity *storagev1beta1.CSIStorageCapacity) bool {
func (c *Controller) isObsolete(capacity *storagev1.CSIStorageCapacity) bool {
for item, _ := range c.capacities {
if item.equals(capacity) {
return false
Expand All @@ -816,7 +814,7 @@ func (c *Controller) isObsolete(capacity *storagev1beta1.CSIStorageCapacity) boo

// isOwnedByUs implements the same logic as https://pkg.go.dev/k8s.io/apimachinery/pkg/apis/meta/v1?tab=doc#IsControlledBy,
// just with the expected owner identified directly with the UID.
func (c *Controller) isOwnedByUs(capacity *storagev1beta1.CSIStorageCapacity) bool {
func (c *Controller) isOwnedByUs(capacity *storagev1.CSIStorageCapacity) bool {
for _, owner := range capacity.OwnerReferences {
if owner.Controller != nil && *owner.Controller && owner.UID == c.owner.UID {
return true
Expand All @@ -828,7 +826,7 @@ func (c *Controller) isOwnedByUs(capacity *storagev1beta1.CSIStorageCapacity) bo
// isManaged checks the labels to determine whether this capacity object is managed by
// the controller instance. With server-side filtering via the informer, this
// function becomes a simple safe-guard and should always return true.
func (c *Controller) isManaged(capacity *storagev1beta1.CSIStorageCapacity) bool {
func (c *Controller) isManaged(capacity *storagev1.CSIStorageCapacity) bool {
return capacity.Labels[DriverNameLabel] == c.driverName &&
capacity.Labels[ManagedByLabel] == c.managedByID
}
Expand Down
33 changes: 16 additions & 17 deletions pkg/capacity/capacity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import (
"google.golang.org/protobuf/types/known/wrapperspb"
v1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
storagev1beta1 "k8s.io/api/storage/v1beta1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
Expand Down Expand Up @@ -770,12 +769,12 @@ func TestCapacityController(t *testing.T) {
},
},
modify: func(ctx context.Context, clientSet *fakeclientset.Clientset, expected []testCapacity) ([]testCapacity, error) {
capacities, err := clientSet.StorageV1beta1().CSIStorageCapacities(ownerNamespace).List(ctx, metav1.ListOptions{})
capacities, err := clientSet.StorageV1().CSIStorageCapacities(ownerNamespace).List(ctx, metav1.ListOptions{})
if err != nil {
return nil, err
}
capacity := capacities.Items[0]
if err := clientSet.StorageV1beta1().CSIStorageCapacities(ownerNamespace).Delete(ctx, capacity.Name, metav1.DeleteOptions{}); err != nil {
if err := clientSet.StorageV1().CSIStorageCapacities(ownerNamespace).Delete(ctx, capacity.Name, metav1.DeleteOptions{}); err != nil {
return nil, err
}
expected[0].uid = "CSISC-UID-2"
Expand All @@ -789,7 +788,7 @@ func TestCapacityController(t *testing.T) {
"delete redundant capacity": {
modify: func(ctx context.Context, clientSet *fakeclientset.Clientset, expected []testCapacity) ([]testCapacity, error) {
capacity := makeCapacity(testCapacity{quantity: "1Gi"})
if _, err := clientSet.StorageV1beta1().CSIStorageCapacities(ownerNamespace).Create(ctx, capacity, metav1.CreateOptions{}); err != nil {
if _, err := clientSet.StorageV1().CSIStorageCapacities(ownerNamespace).Create(ctx, capacity, metav1.CreateOptions{}); err != nil {
return nil, err
}
return expected, nil
Expand Down Expand Up @@ -819,7 +818,7 @@ func TestCapacityController(t *testing.T) {
},
},
modify: func(ctx context.Context, clientSet *fakeclientset.Clientset, expected []testCapacity) ([]testCapacity, error) {
capacities, err := clientSet.StorageV1beta1().CSIStorageCapacities(ownerNamespace).List(ctx, metav1.ListOptions{})
capacities, err := clientSet.StorageV1().CSIStorageCapacities(ownerNamespace).List(ctx, metav1.ListOptions{})
if err != nil {
return nil, err
}
Expand All @@ -828,7 +827,7 @@ func TestCapacityController(t *testing.T) {
// - the now "foreign" object must be left alone
// - an entry must be created anew
capacity.Labels = nil
if _, err := clientSet.StorageV1beta1().CSIStorageCapacities(ownerNamespace).Update(ctx, &capacity, metav1.UpdateOptions{}); err != nil {
if _, err := clientSet.StorageV1().CSIStorageCapacities(ownerNamespace).Update(ctx, &capacity, metav1.UpdateOptions{}); err != nil {
return nil, err
}
expected[0].managedByID = noManager
Expand Down Expand Up @@ -871,19 +870,19 @@ func TestCapacityController(t *testing.T) {
},
},
modify: func(ctx context.Context, clientSet *fakeclientset.Clientset, expected []testCapacity) ([]testCapacity, error) {
capacities, err := clientSet.StorageV1beta1().CSIStorageCapacities(ownerNamespace).List(ctx, metav1.ListOptions{})
capacities, err := clientSet.StorageV1().CSIStorageCapacities(ownerNamespace).List(ctx, metav1.ListOptions{})
if err != nil {
return nil, err
}
capacity := capacities.Items[0]
// Delete and recreate with wrong capacity. This changes the UID while keeping the name
// the same. The capacity then must get corrected by the controller.
if err := clientSet.StorageV1beta1().CSIStorageCapacities(ownerNamespace).Delete(ctx, capacity.Name, metav1.DeleteOptions{}); err != nil {
if err := clientSet.StorageV1().CSIStorageCapacities(ownerNamespace).Delete(ctx, capacity.Name, metav1.DeleteOptions{}); err != nil {
return nil, err
}
capacity.UID = "CSISC-UID-2"
capacity.Capacity = &mb
if _, err := clientSet.StorageV1beta1().CSIStorageCapacities(ownerNamespace).Create(ctx, &capacity, metav1.CreateOptions{}); err != nil {
if _, err := clientSet.StorageV1().CSIStorageCapacities(ownerNamespace).Create(ctx, &capacity, metav1.CreateOptions{}); err != nil {
return nil, err
}
expected[0].uid = capacity.UID
Expand Down Expand Up @@ -1188,7 +1187,7 @@ func TestCapacityController(t *testing.T) {
}

func validateCapacities(ctx context.Context, clientSet *fakeclientset.Clientset, expectedCapacities []testCapacity) error {
actualCapacities, err := clientSet.StorageV1beta1().CSIStorageCapacities(ownerNamespace).List(ctx, metav1.ListOptions{})
actualCapacities, err := clientSet.StorageV1().CSIStorageCapacities(ownerNamespace).List(ctx, metav1.ListOptions{})
if err != nil {
return fmt.Errorf("unexpected error: %v", err)
}
Expand Down Expand Up @@ -1307,7 +1306,7 @@ func createCSIStorageCapacityReactor() func(action ktesting.Action) (handled boo
var uidCounter int
var mutex sync.Mutex
return func(action ktesting.Action) (handled bool, ret runtime.Object, err error) {
s := action.(ktesting.CreateAction).GetObject().(*storagev1beta1.CSIStorageCapacity)
s := action.(ktesting.CreateAction).GetObject().(*storagev1.CSIStorageCapacity)
if s.Name == "" && s.GenerateName != "" {
s.Name = fmt.Sprintf("%s-%s", s.GenerateName, krand.String(16))
}
Expand All @@ -1326,7 +1325,7 @@ func createCSIStorageCapacityReactor() func(action ktesting.Action) (handled boo
// the fake client. Add it with client.PrependReactor to your fake client.
func updateCSIStorageCapacityReactor() func(action ktesting.Action) (handled bool, ret runtime.Object, err error) {
return func(action ktesting.Action) (handled bool, ret runtime.Object, err error) {
s := action.(ktesting.UpdateAction).GetObject().(*storagev1beta1.CSIStorageCapacity)
s := action.(ktesting.UpdateAction).GetObject().(*storagev1.CSIStorageCapacity)
if !strings.HasPrefix(s.ResourceVersion, csiscRev) {
return false, nil, fmt.Errorf("resource version %q should have prefix %s", s.ResourceVersion, csiscRev)
}
Expand All @@ -1347,13 +1346,13 @@ func fakeController(ctx context.Context, client *fakeclientset.Clientset, owner
resyncPeriod := time.Hour
informerFactory := informers.NewSharedInformerFactory(client, resyncPeriod)
scInformer := informerFactory.Storage().V1().StorageClasses()
cInformer := informerFactory.Storage().V1beta1().CSIStorageCapacities()
cInformer := informerFactory.Storage().V1().CSIStorageCapacities()
queue := &rateLimitingQueue{}

c := NewCentralCapacityController(
storage,
driverName,
client.StorageV1beta1().CSIStorageCapacities,
client.StorageV1().CSIStorageCapacities,
queue,
owner,
managedByID,
Expand Down Expand Up @@ -1611,7 +1610,7 @@ func str2quantity(str string) *resource.Quantity {

var capacityCounter int

func makeCapacity(in testCapacity) *storagev1beta1.CSIStorageCapacity {
func makeCapacity(in testCapacity) *storagev1.CSIStorageCapacity {
capacityCounter++
var owners []metav1.OwnerReference
switch in.owner {
Expand All @@ -1635,7 +1634,7 @@ func makeCapacity(in testCapacity) *storagev1beta1.CSIStorageCapacity {
ManagedByLabel: in.managedByID,
}
}
return &storagev1beta1.CSIStorageCapacity{
return &storagev1.CSIStorageCapacity{
ObjectMeta: metav1.ObjectMeta{
UID: in.uid,
ResourceVersion: in.resourceVersion,
Expand Down Expand Up @@ -1994,7 +1993,7 @@ func itemsAsSortedStringSlice(queue *rateLimitingQueue) []string {
switch item := item.(type) {
case workItem:
content = append(content, fmt.Sprintf("%s, %v", item.storageClassName, *item.segment))
case *storagev1beta1.CSIStorageCapacity:
case *storagev1.CSIStorageCapacity:
content = append(content, fmt.Sprintf("csc for %s, %v", item.StorageClassName, item.NodeTopology))
default:
content = append(content, fmt.Sprintf("%v", item))
Expand Down

0 comments on commit 830dcda

Please sign in to comment.