Skip to content

Commit

Permalink
Remove duplicated imports for pkg/apis/kubeflow.org/v1 (#1847)
Browse files Browse the repository at this point in the history
Signed-off-by: Yuki Iwai <yuki.iwai.tz@gmail.com>
  • Loading branch information
tenzen-y authored Jul 4, 2023
1 parent bb628c4 commit 59cc98c
Show file tree
Hide file tree
Showing 46 changed files with 481 additions and 501 deletions.
1 change: 0 additions & 1 deletion pkg/apis/kubeflow.org/v1/common_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
)

const (

// ReplicaIndexLabel represents the label key for the replica-index, e.g. 0, 1, 2.. etc
ReplicaIndexLabel = "training.kubeflow.org/replica-index"

Expand Down
15 changes: 8 additions & 7 deletions pkg/common/util/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,20 @@ import (
"fmt"
"reflect"

commonv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
"github.com/kubeflow/training-operator/pkg/controller.v1/common"
"github.com/kubeflow/training-operator/pkg/controller.v1/expectation"
commonutil "github.com/kubeflow/training-operator/pkg/util"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/event"

kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
"github.com/kubeflow/training-operator/pkg/controller.v1/common"
"github.com/kubeflow/training-operator/pkg/controller.v1/expectation"
commonutil "github.com/kubeflow/training-operator/pkg/util"
)

// SatisfiedExpectations returns true if the required adds/dels for the given mxjob have been observed.
// Add/del counts are established by the controller at sync time, and updated as controllees are observed by the controller
// manager.
func SatisfiedExpectations(exp expectation.ControllerExpectationsInterface, jobKey string, replicaTypes []commonv1.ReplicaType) bool {
func SatisfiedExpectations(exp expectation.ControllerExpectationsInterface, jobKey string, replicaTypes []kubeflowv1.ReplicaType) bool {
satisfied := false
for _, rtype := range replicaTypes {
// Check the expectations of the pods.
Expand All @@ -47,7 +48,7 @@ func SatisfiedExpectations(exp expectation.ControllerExpectationsInterface, jobK
// OnDependentCreateFunc modify expectations when dependent (pod/service) creation observed.
func OnDependentCreateFunc(exp expectation.ControllerExpectationsInterface) func(event.CreateEvent) bool {
return func(e event.CreateEvent) bool {
rtype := e.Object.GetLabels()[commonv1.ReplicaTypeLabel]
rtype := e.Object.GetLabels()[kubeflowv1.ReplicaTypeLabel]
if len(rtype) == 0 {
return false
}
Expand Down Expand Up @@ -145,7 +146,7 @@ func resolveControllerRef(jc *common.JobController, namespace string, controller
func OnDependentDeleteFunc(exp expectation.ControllerExpectationsInterface) func(event.DeleteEvent) bool {
return func(e event.DeleteEvent) bool {

rtype := e.Object.GetLabels()[commonv1.ReplicaTypeLabel]
rtype := e.Object.GetLabels()[kubeflowv1.ReplicaTypeLabel]
if len(rtype) == 0 {
return false
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/common/util/reconciler_generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ import (
"reflect"
"strings"

"github.com/kubeflow/training-operator/pkg/controller.v1/common"
log "github.com/sirupsen/logrus"

commonv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
"github.com/kubeflow/training-operator/pkg/controller.v1/common"
"github.com/kubeflow/training-operator/pkg/controller.v1/expectation"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/event"
Expand Down Expand Up @@ -53,7 +53,7 @@ func LoggerForGenericKind(obj metav1.Object, kind string) *log.Entry {
// OnDependentCreateFuncGeneric modify expectations when dependent (pod/service) creation observed.
func OnDependentCreateFuncGeneric(exp expectation.ControllerExpectationsInterface) func(event.CreateEvent) bool {
return func(e event.CreateEvent) bool {
rtype := e.Object.GetLabels()[commonv1.ReplicaTypeLabel]
rtype := e.Object.GetLabels()[kubeflowv1.ReplicaTypeLabel]
if len(rtype) == 0 {
return false
}
Expand Down Expand Up @@ -114,7 +114,7 @@ func OnDependentUpdateFuncGeneric(jc *common.JobController) func(updateEvent eve
func OnDependentDeleteFuncGeneric(exp expectation.ControllerExpectationsInterface) func(event.DeleteEvent) bool {
return func(e event.DeleteEvent) bool {

rtype := e.Object.GetLabels()[commonv1.ReplicaTypeLabel]
rtype := e.Object.GetLabels()[kubeflowv1.ReplicaTypeLabel]
if len(rtype) == 0 {
return false
}
Expand Down
7 changes: 4 additions & 3 deletions pkg/common/util/reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@ package util
import (
"testing"

commonv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
"github.com/kubeflow/training-operator/pkg/controller.v1/expectation"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"

kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
"github.com/kubeflow/training-operator/pkg/controller.v1/expectation"
)

func TestOnDependentXXXFunc(t *testing.T) {
Expand All @@ -24,7 +25,7 @@ func TestOnDependentXXXFunc(t *testing.T) {
object: &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
commonv1.ReplicaTypeLabel: "Worker",
kubeflowv1.ReplicaTypeLabel: "Worker",
},
},
},
Expand Down
6 changes: 3 additions & 3 deletions pkg/common/util/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@

package util

import commonv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
import kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"

func IsGangSchedulerSet(replicas map[commonv1.ReplicaType]*commonv1.ReplicaSpec, schedulerName string) bool {
func IsGangSchedulerSet(replicas map[kubeflowv1.ReplicaType]*kubeflowv1.ReplicaSpec, schedulerName string) bool {
for _, spec := range replicas {
if spec.Template.Spec.SchedulerName != "" && spec.Template.Spec.SchedulerName == schedulerName {
return true
Expand All @@ -25,7 +25,7 @@ func IsGangSchedulerSet(replicas map[commonv1.ReplicaType]*commonv1.ReplicaSpec,
return false
}

func GetSchedulerName(replicas map[commonv1.ReplicaType]*commonv1.ReplicaSpec) string {
func GetSchedulerName(replicas map[kubeflowv1.ReplicaType]*kubeflowv1.ReplicaSpec) string {
for _, spec := range replicas {
if len(spec.Template.Spec.SchedulerName) > 0 {
return spec.Template.Spec.SchedulerName
Expand Down
11 changes: 6 additions & 5 deletions pkg/common/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@ import (
"fmt"
"time"

commonv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
commonutil "github.com/kubeflow/training-operator/pkg/util"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
commonutil "github.com/kubeflow/training-operator/pkg/util"
)

type ObjectFilterFunction func(obj metav1.Object) bool
Expand Down Expand Up @@ -53,16 +54,16 @@ func JobControlledPodList(list []corev1.Pod, job metav1.Object) []*corev1.Pod {
return ret
}

func GetReplicaTypes(specs map[commonv1.ReplicaType]*commonv1.ReplicaSpec) []commonv1.ReplicaType {
keys := make([]commonv1.ReplicaType, 0, len(specs))
func GetReplicaTypes(specs map[kubeflowv1.ReplicaType]*kubeflowv1.ReplicaSpec) []kubeflowv1.ReplicaType {
keys := make([]kubeflowv1.ReplicaType, 0, len(specs))
for k := range specs {
keys = append(keys, k)
}
return keys
}

// DurationUntilExpireTime returns the duration until job needs to be cleaned up, or -1 if it's infinite.
func DurationUntilExpireTime(runPolicy *commonv1.RunPolicy, jobStatus commonv1.JobStatus) (time.Duration, error) {
func DurationUntilExpireTime(runPolicy *kubeflowv1.RunPolicy, jobStatus kubeflowv1.JobStatus) (time.Duration, error) {
if !commonutil.IsSucceeded(jobStatus) && !commonutil.IsFailed(jobStatus) {
return -1, nil
}
Expand Down
46 changes: 23 additions & 23 deletions pkg/common/util/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,79 +22,79 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/pointer"

commonv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
)

func TestDurationUntilExpireTime(t *testing.T) {
tests := []struct {
name string
runPolicy *commonv1.RunPolicy
jobStatus commonv1.JobStatus
runPolicy *kubeflowv1.RunPolicy
jobStatus kubeflowv1.JobStatus
want time.Duration
wantErr bool
}{
{
name: "running job",
runPolicy: &commonv1.RunPolicy{},
jobStatus: commonv1.JobStatus{
Conditions: []commonv1.JobCondition{newJobCondition(commonv1.JobRunning)},
runPolicy: &kubeflowv1.RunPolicy{},
jobStatus: kubeflowv1.JobStatus{
Conditions: []kubeflowv1.JobCondition{newJobCondition(kubeflowv1.JobRunning)},
},
want: -1,
wantErr: false,
},
{
name: "succeeded job with remaining time 1s",
runPolicy: &commonv1.RunPolicy{
runPolicy: &kubeflowv1.RunPolicy{
TTLSecondsAfterFinished: pointer.Int32(5),
},
jobStatus: commonv1.JobStatus{
Conditions: []commonv1.JobCondition{newJobCondition(commonv1.JobSucceeded)},
jobStatus: kubeflowv1.JobStatus{
Conditions: []kubeflowv1.JobCondition{newJobCondition(kubeflowv1.JobSucceeded)},
CompletionTime: &metav1.Time{Time: time.Now().Add(4 * time.Second)},
},
want: 1,
wantErr: false,
},
{
name: "failed job with remaining time 1s",
runPolicy: &commonv1.RunPolicy{
runPolicy: &kubeflowv1.RunPolicy{
TTLSecondsAfterFinished: pointer.Int32(5),
},
jobStatus: commonv1.JobStatus{
Conditions: []commonv1.JobCondition{newJobCondition(commonv1.JobFailed)},
jobStatus: kubeflowv1.JobStatus{
Conditions: []kubeflowv1.JobCondition{newJobCondition(kubeflowv1.JobFailed)},
CompletionTime: &metav1.Time{Time: time.Now().Add(4 * time.Second)},
},
want: 1,
wantErr: false,
},
{
name: "succeeded job with infinite TTL",
runPolicy: &commonv1.RunPolicy{},
jobStatus: commonv1.JobStatus{
Conditions: []commonv1.JobCondition{newJobCondition(commonv1.JobSucceeded)},
runPolicy: &kubeflowv1.RunPolicy{},
jobStatus: kubeflowv1.JobStatus{
Conditions: []kubeflowv1.JobCondition{newJobCondition(kubeflowv1.JobSucceeded)},
CompletionTime: &metav1.Time{Time: time.Now().Add(4 * time.Second)},
},
want: -1,
wantErr: false,
},
{
name: "succeeded job without remaining time",
runPolicy: &commonv1.RunPolicy{
runPolicy: &kubeflowv1.RunPolicy{
TTLSecondsAfterFinished: pointer.Int32(5),
},
jobStatus: commonv1.JobStatus{
Conditions: []commonv1.JobCondition{newJobCondition(commonv1.JobSucceeded)},
jobStatus: kubeflowv1.JobStatus{
Conditions: []kubeflowv1.JobCondition{newJobCondition(kubeflowv1.JobSucceeded)},
CompletionTime: &metav1.Time{Time: time.Now().Add(6 * time.Second)},
},
want: 0,
wantErr: false,
},
{
name: "succeeded job with nil completion time error",
runPolicy: &commonv1.RunPolicy{
runPolicy: &kubeflowv1.RunPolicy{
TTLSecondsAfterFinished: pointer.Int32(5),
},
jobStatus: commonv1.JobStatus{
Conditions: []commonv1.JobCondition{newJobCondition(commonv1.JobSucceeded)},
jobStatus: kubeflowv1.JobStatus{
Conditions: []kubeflowv1.JobCondition{newJobCondition(kubeflowv1.JobSucceeded)},
},
want: -1,
wantErr: true,
Expand All @@ -116,8 +116,8 @@ func TestDurationUntilExpireTime(t *testing.T) {
}
}

func newJobCondition(t commonv1.JobConditionType) commonv1.JobCondition {
return commonv1.JobCondition{
func newJobCondition(t kubeflowv1.JobConditionType) kubeflowv1.JobCondition {
return kubeflowv1.JobCondition{
Type: t,
Status: corev1.ConditionTrue,
}
Expand Down
40 changes: 19 additions & 21 deletions pkg/controller.v1/mpi/mpijob.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package mpi
import (
"strings"

commonv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"

corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -89,25 +88,24 @@ const (
)

// initializeMPIJobStatuses initializes the ReplicaStatuses for MPIJob.
func initializeMPIJobStatuses(mpiJob *kubeflowv1.MPIJob, mtype commonv1.ReplicaType) {
replicaType := commonv1.ReplicaType(mtype)
func initializeMPIJobStatuses(mpiJob *kubeflowv1.MPIJob, rType kubeflowv1.ReplicaType) {
if mpiJob.Status.ReplicaStatuses == nil {
mpiJob.Status.ReplicaStatuses = make(map[commonv1.ReplicaType]*commonv1.ReplicaStatus)
mpiJob.Status.ReplicaStatuses = make(map[kubeflowv1.ReplicaType]*kubeflowv1.ReplicaStatus)
}

mpiJob.Status.ReplicaStatuses[replicaType] = &commonv1.ReplicaStatus{}
mpiJob.Status.ReplicaStatuses[rType] = &kubeflowv1.ReplicaStatus{}
}

// updateMPIJobConditions updates the conditions of the given mpiJob.
func updateMPIJobConditions(mpiJob *kubeflowv1.MPIJob, conditionType commonv1.JobConditionType, reason, message string) error {
func updateMPIJobConditions(mpiJob *kubeflowv1.MPIJob, conditionType kubeflowv1.JobConditionType, reason, message string) error {
condition := newCondition(conditionType, reason, message)
setCondition(&mpiJob.Status, condition)
return nil
}

// newCondition creates a new mpiJob condition.
func newCondition(conditionType commonv1.JobConditionType, reason, message string) commonv1.JobCondition {
return commonv1.JobCondition{
func newCondition(conditionType kubeflowv1.JobConditionType, reason, message string) kubeflowv1.JobCondition {
return kubeflowv1.JobCondition{
Type: conditionType,
Status: corev1.ConditionTrue,
LastUpdateTime: metav1.Now(),
Expand All @@ -118,7 +116,7 @@ func newCondition(conditionType commonv1.JobConditionType, reason, message strin
}

// getCondition returns the condition with the provided type.
func getCondition(status commonv1.JobStatus, condType commonv1.JobConditionType) *commonv1.JobCondition {
func getCondition(status kubeflowv1.JobStatus, condType kubeflowv1.JobConditionType) *kubeflowv1.JobCondition {
for _, condition := range status.Conditions {
if condition.Type == condType {
return &condition
Expand All @@ -127,9 +125,9 @@ func getCondition(status commonv1.JobStatus, condType commonv1.JobConditionType)
return nil
}

func isEvicted(status commonv1.JobStatus) bool {
func isEvicted(status kubeflowv1.JobStatus) bool {
for _, condition := range status.Conditions {
if condition.Type == commonv1.JobFailed &&
if condition.Type == kubeflowv1.JobFailed &&
condition.Status == corev1.ConditionTrue &&
condition.Reason == mpiJobEvict {
return true
Expand All @@ -141,7 +139,7 @@ func isEvicted(status commonv1.JobStatus) bool {
// setCondition updates the mpiJob to include the provided condition.
// If the condition that we are about to add already exists
// and has the same status and reason then we are not going to update.
func setCondition(status *commonv1.JobStatus, condition commonv1.JobCondition) {
func setCondition(status *kubeflowv1.JobStatus, condition kubeflowv1.JobCondition) {

currentCond := getCondition(*status, condition.Type)

Expand All @@ -161,13 +159,13 @@ func setCondition(status *commonv1.JobStatus, condition commonv1.JobCondition) {
}

// filterOutCondition returns a new slice of mpiJob conditions without conditions with the provided type.
func filterOutCondition(conditions []commonv1.JobCondition, condType commonv1.JobConditionType) []commonv1.JobCondition {
var newConditions []commonv1.JobCondition
func filterOutCondition(conditions []kubeflowv1.JobCondition, condType kubeflowv1.JobConditionType) []kubeflowv1.JobCondition {
var newConditions []kubeflowv1.JobCondition
for _, c := range conditions {
if condType == commonv1.JobRestarting && c.Type == commonv1.JobRunning {
if condType == kubeflowv1.JobRestarting && c.Type == kubeflowv1.JobRunning {
continue
}
if condType == commonv1.JobRunning && c.Type == commonv1.JobRestarting {
if condType == kubeflowv1.JobRunning && c.Type == kubeflowv1.JobRestarting {
continue
}

Expand All @@ -176,7 +174,7 @@ func filterOutCondition(conditions []commonv1.JobCondition, condType commonv1.Jo
}

// Set the running condition status to be false when current condition failed or succeeded
if (condType == commonv1.JobFailed || condType == commonv1.JobSucceeded) && (c.Type == commonv1.JobRunning || c.Type == commonv1.JobFailed) {
if (condType == kubeflowv1.JobFailed || condType == kubeflowv1.JobSucceeded) && (c.Type == kubeflowv1.JobRunning || c.Type == kubeflowv1.JobFailed) {
c.Status = corev1.ConditionFalse
}

Expand Down Expand Up @@ -242,7 +240,7 @@ func defaultReplicaLabels(genericLabels map[string]string, roleLabelVal string)
replicaLabels[k] = v
}

replicaLabels[commonv1.ReplicaTypeLabel] = roleLabelVal
replicaLabels[kubeflowv1.ReplicaTypeLabel] = roleLabelVal
return replicaLabels
}

Expand Down Expand Up @@ -271,10 +269,10 @@ func workerSelector(genericLabels map[string]string) (labels.Selector, error) {

// initializeReplicaStatuses initializes the ReplicaStatuses for replica.
// originally from pkg/controller.v1/tensorflow/status.go (deleted)
func initializeReplicaStatuses(jobStatus *commonv1.JobStatus, rtype commonv1.ReplicaType) {
func initializeReplicaStatuses(jobStatus *kubeflowv1.JobStatus, rtype kubeflowv1.ReplicaType) {
if jobStatus.ReplicaStatuses == nil {
jobStatus.ReplicaStatuses = make(map[commonv1.ReplicaType]*commonv1.ReplicaStatus)
jobStatus.ReplicaStatuses = make(map[kubeflowv1.ReplicaType]*kubeflowv1.ReplicaStatus)
}

jobStatus.ReplicaStatuses[rtype] = &commonv1.ReplicaStatus{}
jobStatus.ReplicaStatuses[rtype] = &kubeflowv1.ReplicaStatus{}
}
Loading

0 comments on commit 59cc98c

Please sign in to comment.