Skip to content

Commit

Permalink
bugfix: fix cron DataMigrate for Kubernetes 1.20 (fluid-cloudnative#3285
Browse files Browse the repository at this point in the history
)

* Refactor cron DataMigrate for backward compatibility

Signed-off-by: trafalgarzzz <trafalgarz@outlook.com>

* Install cronjob resource according to kube version

Signed-off-by: trafalgarzzz <trafalgarz@outlook.com>

* Fix cronjob indentation error

Signed-off-by: trafalgarzzz <trafalgarz@outlook.com>

* Move discovery of batch api group out of init func

Signed-off-by: trafalgarzzz <trafalgarz@outlook.com>

* Move discovery of batch api group out of init func

Signed-off-by: trafalgarzzz <trafalgarz@outlook.com>

* Add unit test for func GetCronJobStatus

Signed-off-by: trafalgarzzz <trafalgarz@outlook.com>

* Skip discover batch api group when running unit tests

Signed-off-by: trafalgarzzz <trafalgarz@outlook.com>

* Add apache licenses

Signed-off-by: trafalgarzzz <trafalgarz@outlook.com>

* Use helm function .Capabilities.APIVersions.Has to determine API group compatibility

Signed-off-by: trafalgarzzz <trafalgarz@outlook.com>

---------

Signed-off-by: trafalgarzzz <trafalgarz@outlook.com>
  • Loading branch information
TrafalgarZZZ authored Jun 13, 2023
1 parent 307f8e2 commit 62c8293
Show file tree
Hide file tree
Showing 9 changed files with 209 additions and 23 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ all: build

# Run tests
test: generate fmt vet
CGO_ENABLED=0 GOOS=linux GOARCH=${ARCH} GO111MODULE=${GO_MODULE} go list ./... | grep -v controller | grep -v e2etest | xargs go test ${CI_TEST_FLAGS} ${LOCAL_FLAGS}
CGO_ENABLED=0 GOOS=linux GOARCH=${ARCH} GO111MODULE=${GO_MODULE} go list ./... | grep -v controller | grep -v e2etest | FLUID_UNIT_TEST=true xargs go test ${CI_TEST_FLAGS} ${LOCAL_FLAGS}

# used in CI and simply ignore controller tests which need k8s now.
# maybe incompatible if more end to end tests are added.
Expand Down
6 changes: 3 additions & 3 deletions charts/fluid-datamigrate/juicefs/templates/cronjob.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{{- if eq (lower .Values.datamigrate.policy) "cron" }}
apiVersion: batch/v1
apiVersion: {{ ternary "batch/v1" "batch/v1beta1" (.Capabilities.APIVersions.Has "batch/v1/CronJob") }}
kind: CronJob
metadata:
name: {{ printf "%s-migrate" .Release.Name }}
Expand Down Expand Up @@ -100,7 +100,7 @@ spec:
- mountPath: /scripts
name: data-migrate-script
{{- with .Values.datamigrate.nativeVolumeMounts }}
{{ toYaml . | nindent 12 }}
{{ toYaml . | nindent 16 }}
{{- end }}
volumes:
- name: data-migrate-script
Expand All @@ -111,6 +111,6 @@ spec:
path: juicefs_datamigrate.sh
mode: 365
{{- with .Values.datamigrate.nativeVolumes }}
{{ toYaml . | nindent 8 }}
{{ toYaml . | nindent 12 }}
{{- end }}
{{- end }}
8 changes: 5 additions & 3 deletions pkg/controllers/v1alpha1/datamigrate/status_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package datamigrate
import (
"github.com/go-logr/logr"
batchv1 "k8s.io/api/batch/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"

datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1"
Expand All @@ -27,6 +28,7 @@ import (
cruntime "github.com/fluid-cloudnative/fluid/pkg/runtime"
"github.com/fluid-cloudnative/fluid/pkg/utils"
"github.com/fluid-cloudnative/fluid/pkg/utils/helm"
"github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient"
)

type OnceStatusHandler struct {
Expand Down Expand Up @@ -106,7 +108,7 @@ func (c *CronStatusHandler) GetOperationStatus(ctx cruntime.ReconcileRequestCont
// 1. Check running status of the DataMigrate job
releaseName := utils.GetDataMigrateReleaseName(object.GetName())
cronjobName := utils.GetDataMigrateJobName(releaseName)
cronjob, err := utils.GetDataMigrateCronjob(c.Client, cronjobName, object.GetNamespace())
cronjobStatus, err := kubeclient.GetCronJobStatus(c.Client, types.NamespacedName{Namespace: object.GetNamespace(), Name: cronjobName})

if err != nil {
// helm release found but cronjob missing, delete the helm release and requeue
Expand All @@ -123,7 +125,7 @@ func (c *CronStatusHandler) GetOperationStatus(ctx cruntime.ReconcileRequestCont
return
}

jobs, err := utils.ListDataMigrateJobByCronjob(c.Client, cronjob)
jobs, err := utils.ListDataMigrateJobByCronjob(c.Client, types.NamespacedName{Namespace: object.GetNamespace(), Name: cronjobName})
if err != nil {
ctx.Log.Error(err, "can't list DataMigrate job by cronjob", "namespace", ctx.Namespace, "cronjobName", cronjobName)
return
Expand All @@ -132,7 +134,7 @@ func (c *CronStatusHandler) GetOperationStatus(ctx cruntime.ReconcileRequestCont
// get the newest job
var currentJob *batchv1.Job
for _, job := range jobs {
if job.CreationTimestamp == *cronjob.Status.LastScheduleTime || job.CreationTimestamp.After(cronjob.Status.LastScheduleTime.Time) {
if job.CreationTimestamp == *cronjobStatus.LastScheduleTime || job.CreationTimestamp.After(cronjobStatus.LastScheduleTime.Time) {
currentJob = &job
break
}
Expand Down
9 changes: 9 additions & 0 deletions pkg/utils/compatibility/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package compatibility
import (
"log"

"github.com/fluid-cloudnative/fluid/pkg/utils/testutil"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/client-go/discovery"
ctrl "sigs.k8s.io/controller-runtime"
Expand All @@ -27,6 +28,14 @@ import (
var batchV1CronJobCompatible = false

func init() {
if testutil.IsUnitTest() {
return
}
discoverBatchAPICompatibility()
}

// DiscoverBatchAPICompatibility discovers compatibility of the batch API group in the cluster and set in batchV1CronJobCompatible variable.
func discoverBatchAPICompatibility() {
restConfig := ctrl.GetConfigOrDie()

discoveryClient := discovery.NewDiscoveryClientForConfigOrDie(restConfig)
Expand Down
19 changes: 3 additions & 16 deletions pkg/utils/datamigrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,29 +58,16 @@ func GetDataMigrateJob(client client.Client, name, namespace string) (*batchv1.J
return &job, nil
}

// GetDataMigrateCronjob gets the DataMigrate cronjob given its name and namespace
func GetDataMigrateCronjob(client client.Client, name, namespace string) (*batchv1.CronJob, error) {
key := types.NamespacedName{
Namespace: namespace,
Name: name,
}
var cronjob batchv1.CronJob
if err := client.Get(context.TODO(), key, &cronjob); err != nil {
return nil, err
}
return &cronjob, nil
}

// ListDataMigrateJobByCronjob gets the DataMigrate job by cronjob given its name and namespace
func ListDataMigrateJobByCronjob(c client.Client, cronjob *batchv1.CronJob) ([]batchv1.Job, error) {
jobLabelSelector, err := labels.Parse(fmt.Sprintf("cronjob=%s", cronjob.Name))
func ListDataMigrateJobByCronjob(c client.Client, cronjobNamespacedName types.NamespacedName) ([]batchv1.Job, error) {
jobLabelSelector, err := labels.Parse(fmt.Sprintf("cronjob=%s", cronjobNamespacedName.Name))
if err != nil {
return nil, err
}
var jobList batchv1.JobList
if err := c.List(context.TODO(), &jobList, &client.ListOptions{
LabelSelector: jobLabelSelector,
Namespace: cronjob.Namespace,
Namespace: cronjobNamespacedName.Namespace,
}); err != nil {
return nil, err
}
Expand Down
50 changes: 50 additions & 0 deletions pkg/utils/kubeclient/cronjob.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
Copyright 2023 The Fluid Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package kubeclient

import (
"context"

"github.com/fluid-cloudnative/fluid/pkg/utils/compatibility"
batchv1 "k8s.io/api/batch/v1"
batchv1beta1 "k8s.io/api/batch/v1beta1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
)

// GetCronJobStatus gets CronJob's status given its namespace and name. It converts batchv1beta1.CronJobStatus
// to batchv1.CronJobStatus when batchv1.CronJob is not supported by the cluster.
func GetCronJobStatus(client client.Client, key types.NamespacedName) (*batchv1.CronJobStatus, error) {
if compatibility.IsBatchV1CronJobSupported() {
var cronjob batchv1.CronJob
if err := client.Get(context.TODO(), key, &cronjob); err != nil {
return nil, err
}
return &cronjob.Status, nil
}

var cronjob batchv1beta1.CronJob
if err := client.Get(context.TODO(), key, &cronjob); err != nil {
return nil, err
}
// Convert batchv1beta1.CronJobStatus to batchv1.CronJobStatus and return
return &batchv1.CronJobStatus{
Active: cronjob.Status.Active,
LastScheduleTime: cronjob.Status.LastScheduleTime,
LastSuccessfulTime: cronjob.Status.LastSuccessfulTime,
}, nil
}
110 changes: 110 additions & 0 deletions pkg/utils/kubeclient/cronjob_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
Copyright 2023 The Fluid Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package kubeclient

import (
"reflect"
"testing"
"time"

"github.com/agiledragon/gomonkey/v2"
"github.com/fluid-cloudnative/fluid/pkg/utils/compatibility"
"github.com/fluid-cloudnative/fluid/pkg/utils/fake"
batchv1 "k8s.io/api/batch/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
)

func TestGetCronJobStatus(t *testing.T) {
nowTime := time.Now()
testDate := metav1.NewTime(time.Date(nowTime.Year(), nowTime.Month(), nowTime.Day(), nowTime.Hour(), 0, 0, 0, nowTime.Location()))

namespace := "default"
testCronJobInputs := []*batchv1.CronJob{
{
ObjectMeta: metav1.ObjectMeta{
Name: "test1",
Namespace: namespace,
},
Status: batchv1.CronJobStatus{
LastScheduleTime: &testDate,
},
},
}

testCronJobs := []runtime.Object{}

for _, cj := range testCronJobInputs {
testCronJobs = append(testCronJobs, cj.DeepCopy())
}

client := fake.NewFakeClientWithScheme(testScheme, testCronJobs...)

type args struct {
key types.NamespacedName
}
tests := []struct {
name string
args args
want *batchv1.CronJobStatus
wantErr bool
}{
{
name: "CronJob exists",
args: args{
key: types.NamespacedName{
Namespace: namespace,
Name: "test1",
},
},
want: &batchv1.CronJobStatus{
LastScheduleTime: &testDate,
},
wantErr: false,
},
{
name: "CronJob exists",
args: args{
key: types.NamespacedName{
Namespace: namespace,
Name: "test-notexist",
},
},
want: nil,
wantErr: true,
},
}

patch := gomonkey.ApplyFunc(compatibility.IsBatchV1CronJobSupported, func() bool {
return true
})
defer patch.Reset()

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := GetCronJobStatus(client, tt.args.key)
if (err != nil) != tt.wantErr {
t.Errorf("GetCronJobStatus() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("GetCronJobStatus() = %v, want %v", got, tt.want)
}
})
}
}
2 changes: 2 additions & 0 deletions pkg/utils/kubeclient/pod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/fluid-cloudnative/fluid/pkg/utils/fake"
appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -25,6 +26,7 @@ func init() {
testScheme = runtime.NewScheme()
_ = corev1.AddToScheme(testScheme)
_ = appsv1.AddToScheme(testScheme)
_ = batchv1.AddToScheme(testScheme)
}

func TestGetPVCNamesFromPod(t *testing.T) {
Expand Down
26 changes: 26 additions & 0 deletions pkg/utils/testutil/unit_test_env.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
Copyright 2023 The Fluid Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package testutil

import "os"

const FluidUnitTestEnv = "FLUID_UNIT_TEST"

func IsUnitTest() bool {
_, exists := os.LookupEnv(FluidUnitTestEnv)
return exists
}

0 comments on commit 62c8293

Please sign in to comment.