diff --git a/service/k8s/statefulset.go b/service/k8s/statefulset.go index 679993970..38cc95ff2 100644 --- a/service/k8s/statefulset.go +++ b/service/k8s/statefulset.go @@ -3,8 +3,11 @@ package k8s import ( "context" "fmt" + "strconv" "strings" + "k8s.io/apimachinery/pkg/labels" + "github.com/spotahome/redis-operator/operator/redisfailover/util" appsv1 "k8s.io/api/apps/v1" @@ -107,6 +110,66 @@ func (s *StatefulSetService) CreateOrUpdateStatefulSet(namespace string, statefu // namespace is our spec(https://github.com/kubernetes/community/blob/master/contributors/devel/api-conventions.md#concurrency-control-and-consistency), // we will replace the current namespace state. statefulSet.ResourceVersion = storedStatefulSet.ResourceVersion + // resize pvc + // 1.Get the data already stored internally + // 2.Get the desired data + // 3.Start querying the pvc list when you find data inconsistencies + // 3.1 Comparison using real pvc capacity and desired data + // 3.1.1 Update if you find inconsistencies + // 3.2 Writing successful updates to internal + // 4. Set to old VolumeClaimTemplates to update.Prevent update error reporting + // 5. Set to old annotations to update + annotations := storedStatefulSet.Annotations + if annotations == nil { + annotations = map[string]string{ + "storageCapacity": "0", + } + } + storedCapacity, _ := strconv.ParseInt(annotations["storageCapacity"], 0, 64) + if len(statefulSet.Spec.VolumeClaimTemplates) != 0 { + stateCapacity := statefulSet.Spec.VolumeClaimTemplates[0].Spec.Resources.Requests.Storage().Value() + if storedCapacity != stateCapacity { + rfName := strings.TrimPrefix(storedStatefulSet.Name, "rfr-") + listOpt := metav1.ListOptions{ + LabelSelector: labels.FormatLabels( + map[string]string{ + "app.kubernetes.io/component": "redis", + "app.kubernetes.io/name": strings.TrimPrefix(storedStatefulSet.Name, "rfr-"), + "app.kubernetes.io/part-of": "redis-failover", + }, + ), + } + pvcs, err := s.kubeClient.CoreV1().PersistentVolumeClaims(storedStatefulSet.Namespace).List(context.Background(), listOpt) + if err != nil { + return err + } + updateFailed := false + realUpdate := false + for _, pvc := range pvcs.Items { + realCapacity := pvc.Spec.Resources.Requests.Storage().Value() + if realCapacity != stateCapacity { + realUpdate = true + pvc.Spec.Resources.Requests = statefulSet.Spec.VolumeClaimTemplates[0].Spec.Resources.Requests + _, err = s.kubeClient.CoreV1().PersistentVolumeClaims(storedStatefulSet.Namespace).Update(context.Background(), &pvc, metav1.UpdateOptions{}) + if err != nil { + updateFailed = true + s.logger.WithField("namespace", namespace).WithField("pvc", pvc.Name).Warningf("resize pvc failed:%s", err.Error()) + } + } + } + if !updateFailed && len(pvcs.Items) != 0 { + annotations["storageCapacity"] = fmt.Sprintf("%d", stateCapacity) + storedStatefulSet.Annotations = annotations + if realUpdate { + s.logger.WithField("namespace", namespace).WithField("statefulSet", statefulSet.Name).Infof("resize statefulset pvcs from %d to %d Success", storedCapacity, stateCapacity) + } else { + s.logger.WithField("namespace", namespace).WithField("pvc", rfName).Warningf("set annotations,resize nothing") + } + } + } + } + // set stored.volumeClaimTemplates + statefulSet.Spec.VolumeClaimTemplates = storedStatefulSet.Spec.VolumeClaimTemplates statefulSet.Annotations = util.MergeAnnotations(storedStatefulSet.Annotations, statefulSet.Annotations) return s.UpdateStatefulSet(namespace, statefulSet) } diff --git a/service/k8s/statefulset_test.go b/service/k8s/statefulset_test.go index 57b653710..1d6781767 100644 --- a/service/k8s/statefulset_test.go +++ b/service/k8s/statefulset_test.go @@ -4,6 +4,10 @@ import ( "errors" "testing" + "k8s.io/apimachinery/pkg/api/resource" + + v1 "k8s.io/api/core/v1" + "github.com/stretchr/testify/assert" appsv1 "k8s.io/api/apps/v1" kubeerrors "k8s.io/apimachinery/pkg/api/errors" @@ -116,4 +120,104 @@ func TestStatefulSetServiceGetCreateOrUpdate(t *testing.T) { } }) } + // test resize pvc + { + t.Run("test_Resize_Pvc", func(t *testing.T) { + assert := assert.New(t) + beforeSts := &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "teststatefulSet1", + ResourceVersion: "10", + }, + Spec: appsv1.StatefulSetSpec{ + VolumeClaimTemplates: []v1.PersistentVolumeClaim{ + { + Spec: v1.PersistentVolumeClaimSpec{ + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceStorage: resource.MustParse("0.5Gi"), + }, + }, + }, + }, + }, + }, + } + afterSts := &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "teststatefulSet1", + ResourceVersion: "10", + }, + Spec: appsv1.StatefulSetSpec{ + VolumeClaimTemplates: []v1.PersistentVolumeClaim{ + { + Spec: v1.PersistentVolumeClaimSpec{ + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceStorage: resource.MustParse("1Gi"), + }, + }, + }, + }, + }, + }, + } + pvcList := &v1.PersistentVolumeClaimList{ + Items: []v1.PersistentVolumeClaim{ + { + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "app.kubernetes.io/component": "redis", + "app.kubernetes.io/name": "teststatefulSet1", + "app.kubernetes.io/part-of": "redis-failover", + }, + }, + Spec: v1.PersistentVolumeClaimSpec{ + VolumeName: "vol-1", + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceStorage: resource.MustParse("0.5Gi"), + }, + }, + }, + }, + // resized already + { + Spec: v1.PersistentVolumeClaimSpec{ + VolumeName: "vol-2", + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceStorage: resource.MustParse("1Gi"), + }, + }, + }, + }, + }, + } + // Mock. + mcli := &kubernetes.Clientset{} + mcli.AddReactor("get", "statefulsets", func(action kubetesting.Action) (bool, runtime.Object, error) { + return true, beforeSts, nil + }) + mcli.AddReactor("list", "persistentvolumeclaims", func(action kubetesting.Action) (handled bool, ret runtime.Object, err error) { + return true, pvcList, nil + }) + mcli.AddReactor("update", "persistentvolumeclaims", func(action kubetesting.Action) (handled bool, ret runtime.Object, err error) { + // update pvc[0] + pvcList.Items[0] = *action.(kubetesting.UpdateActionImpl).Object.(*v1.PersistentVolumeClaim) + return true, action.(kubetesting.UpdateActionImpl).Object, nil + }) + service := k8s.NewStatefulSetService(mcli, log.Dummy, metrics.Dummy) + err := service.CreateOrUpdateStatefulSet(testns, afterSts) + assert.NoError(err) + assert.Equal(pvcList.Items[0].Spec.Resources, pvcList.Items[1].Spec.Resources) + // should not call update + mcli.AddReactor("update", "persistentvolumeclaims", func(action kubetesting.Action) (handled bool, ret runtime.Object, err error) { + panic("shouldn't call update") + }) + service = k8s.NewStatefulSetService(mcli, log.Dummy, metrics.Dummy) + err = service.CreateOrUpdateStatefulSet(testns, afterSts) + assert.NoError(err) + }) + } }