Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove duplicated imports for pkg/apis/kubeflow.org/v1 #1847

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion pkg/apis/kubeflow.org/v1/common_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
)

const (

// ReplicaIndexLabel represents the label key for the replica-index, e.g. 0, 1, 2.. etc
ReplicaIndexLabel = "training.kubeflow.org/replica-index"

Expand Down
15 changes: 8 additions & 7 deletions pkg/common/util/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,20 @@ import (
"fmt"
"reflect"

commonv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
"github.com/kubeflow/training-operator/pkg/controller.v1/common"
"github.com/kubeflow/training-operator/pkg/controller.v1/expectation"
commonutil "github.com/kubeflow/training-operator/pkg/util"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/event"

kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
"github.com/kubeflow/training-operator/pkg/controller.v1/common"
"github.com/kubeflow/training-operator/pkg/controller.v1/expectation"
commonutil "github.com/kubeflow/training-operator/pkg/util"
)

// SatisfiedExpectations returns true if the required adds/dels for the given mxjob have been observed.
// Add/del counts are established by the controller at sync time, and updated as controllees are observed by the controller
// manager.
func SatisfiedExpectations(exp expectation.ControllerExpectationsInterface, jobKey string, replicaTypes []commonv1.ReplicaType) bool {
func SatisfiedExpectations(exp expectation.ControllerExpectationsInterface, jobKey string, replicaTypes []kubeflowv1.ReplicaType) bool {
satisfied := false
for _, rtype := range replicaTypes {
// Check the expectations of the pods.
Expand All @@ -47,7 +48,7 @@ func SatisfiedExpectations(exp expectation.ControllerExpectationsInterface, jobK
// OnDependentCreateFunc modify expectations when dependent (pod/service) creation observed.
func OnDependentCreateFunc(exp expectation.ControllerExpectationsInterface) func(event.CreateEvent) bool {
return func(e event.CreateEvent) bool {
rtype := e.Object.GetLabels()[commonv1.ReplicaTypeLabel]
rtype := e.Object.GetLabels()[kubeflowv1.ReplicaTypeLabel]
if len(rtype) == 0 {
return false
}
Expand Down Expand Up @@ -145,7 +146,7 @@ func resolveControllerRef(jc *common.JobController, namespace string, controller
func OnDependentDeleteFunc(exp expectation.ControllerExpectationsInterface) func(event.DeleteEvent) bool {
return func(e event.DeleteEvent) bool {

rtype := e.Object.GetLabels()[commonv1.ReplicaTypeLabel]
rtype := e.Object.GetLabels()[kubeflowv1.ReplicaTypeLabel]
if len(rtype) == 0 {
return false
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/common/util/reconciler_generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ import (
"reflect"
"strings"

"github.com/kubeflow/training-operator/pkg/controller.v1/common"
log "github.com/sirupsen/logrus"

commonv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
"github.com/kubeflow/training-operator/pkg/controller.v1/common"
"github.com/kubeflow/training-operator/pkg/controller.v1/expectation"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/event"
Expand Down Expand Up @@ -53,7 +53,7 @@ func LoggerForGenericKind(obj metav1.Object, kind string) *log.Entry {
// OnDependentCreateFuncGeneric modify expectations when dependent (pod/service) creation observed.
func OnDependentCreateFuncGeneric(exp expectation.ControllerExpectationsInterface) func(event.CreateEvent) bool {
return func(e event.CreateEvent) bool {
rtype := e.Object.GetLabels()[commonv1.ReplicaTypeLabel]
rtype := e.Object.GetLabels()[kubeflowv1.ReplicaTypeLabel]
if len(rtype) == 0 {
return false
}
Expand Down Expand Up @@ -114,7 +114,7 @@ func OnDependentUpdateFuncGeneric(jc *common.JobController) func(updateEvent eve
func OnDependentDeleteFuncGeneric(exp expectation.ControllerExpectationsInterface) func(event.DeleteEvent) bool {
return func(e event.DeleteEvent) bool {

rtype := e.Object.GetLabels()[commonv1.ReplicaTypeLabel]
rtype := e.Object.GetLabels()[kubeflowv1.ReplicaTypeLabel]
if len(rtype) == 0 {
return false
}
Expand Down
7 changes: 4 additions & 3 deletions pkg/common/util/reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@ package util
import (
"testing"

commonv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
"github.com/kubeflow/training-operator/pkg/controller.v1/expectation"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"

kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
"github.com/kubeflow/training-operator/pkg/controller.v1/expectation"
)

func TestOnDependentXXXFunc(t *testing.T) {
Expand All @@ -24,7 +25,7 @@ func TestOnDependentXXXFunc(t *testing.T) {
object: &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
commonv1.ReplicaTypeLabel: "Worker",
kubeflowv1.ReplicaTypeLabel: "Worker",
},
},
},
Expand Down
6 changes: 3 additions & 3 deletions pkg/common/util/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@

package util

import commonv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
import kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"

func IsGangSchedulerSet(replicas map[commonv1.ReplicaType]*commonv1.ReplicaSpec, schedulerName string) bool {
func IsGangSchedulerSet(replicas map[kubeflowv1.ReplicaType]*kubeflowv1.ReplicaSpec, schedulerName string) bool {
for _, spec := range replicas {
if spec.Template.Spec.SchedulerName != "" && spec.Template.Spec.SchedulerName == schedulerName {
return true
Expand All @@ -25,7 +25,7 @@ func IsGangSchedulerSet(replicas map[commonv1.ReplicaType]*commonv1.ReplicaSpec,
return false
}

func GetSchedulerName(replicas map[commonv1.ReplicaType]*commonv1.ReplicaSpec) string {
func GetSchedulerName(replicas map[kubeflowv1.ReplicaType]*kubeflowv1.ReplicaSpec) string {
for _, spec := range replicas {
if len(spec.Template.Spec.SchedulerName) > 0 {
return spec.Template.Spec.SchedulerName
Expand Down
11 changes: 6 additions & 5 deletions pkg/common/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@ import (
"fmt"
"time"

commonv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
commonutil "github.com/kubeflow/training-operator/pkg/util"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
commonutil "github.com/kubeflow/training-operator/pkg/util"
)

type ObjectFilterFunction func(obj metav1.Object) bool
Expand Down Expand Up @@ -53,16 +54,16 @@ func JobControlledPodList(list []corev1.Pod, job metav1.Object) []*corev1.Pod {
return ret
}

func GetReplicaTypes(specs map[commonv1.ReplicaType]*commonv1.ReplicaSpec) []commonv1.ReplicaType {
keys := make([]commonv1.ReplicaType, 0, len(specs))
func GetReplicaTypes(specs map[kubeflowv1.ReplicaType]*kubeflowv1.ReplicaSpec) []kubeflowv1.ReplicaType {
keys := make([]kubeflowv1.ReplicaType, 0, len(specs))
for k := range specs {
keys = append(keys, k)
}
return keys
}

// DurationUntilExpireTime returns the duration until job needs to be cleaned up, or -1 if it's infinite.
func DurationUntilExpireTime(runPolicy *commonv1.RunPolicy, jobStatus commonv1.JobStatus) (time.Duration, error) {
func DurationUntilExpireTime(runPolicy *kubeflowv1.RunPolicy, jobStatus kubeflowv1.JobStatus) (time.Duration, error) {
if !commonutil.IsSucceeded(jobStatus) && !commonutil.IsFailed(jobStatus) {
return -1, nil
}
Expand Down
46 changes: 23 additions & 23 deletions pkg/common/util/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,79 +22,79 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/pointer"

commonv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
)

func TestDurationUntilExpireTime(t *testing.T) {
tests := []struct {
name string
runPolicy *commonv1.RunPolicy
jobStatus commonv1.JobStatus
runPolicy *kubeflowv1.RunPolicy
jobStatus kubeflowv1.JobStatus
want time.Duration
wantErr bool
}{
{
name: "running job",
runPolicy: &commonv1.RunPolicy{},
jobStatus: commonv1.JobStatus{
Conditions: []commonv1.JobCondition{newJobCondition(commonv1.JobRunning)},
runPolicy: &kubeflowv1.RunPolicy{},
jobStatus: kubeflowv1.JobStatus{
Conditions: []kubeflowv1.JobCondition{newJobCondition(kubeflowv1.JobRunning)},
},
want: -1,
wantErr: false,
},
{
name: "succeeded job with remaining time 1s",
runPolicy: &commonv1.RunPolicy{
runPolicy: &kubeflowv1.RunPolicy{
TTLSecondsAfterFinished: pointer.Int32(5),
},
jobStatus: commonv1.JobStatus{
Conditions: []commonv1.JobCondition{newJobCondition(commonv1.JobSucceeded)},
jobStatus: kubeflowv1.JobStatus{
Conditions: []kubeflowv1.JobCondition{newJobCondition(kubeflowv1.JobSucceeded)},
CompletionTime: &metav1.Time{Time: time.Now().Add(4 * time.Second)},
},
want: 1,
wantErr: false,
},
{
name: "failed job with remaining time 1s",
runPolicy: &commonv1.RunPolicy{
runPolicy: &kubeflowv1.RunPolicy{
TTLSecondsAfterFinished: pointer.Int32(5),
},
jobStatus: commonv1.JobStatus{
Conditions: []commonv1.JobCondition{newJobCondition(commonv1.JobFailed)},
jobStatus: kubeflowv1.JobStatus{
Conditions: []kubeflowv1.JobCondition{newJobCondition(kubeflowv1.JobFailed)},
CompletionTime: &metav1.Time{Time: time.Now().Add(4 * time.Second)},
},
want: 1,
wantErr: false,
},
{
name: "succeeded job with infinite TTL",
runPolicy: &commonv1.RunPolicy{},
jobStatus: commonv1.JobStatus{
Conditions: []commonv1.JobCondition{newJobCondition(commonv1.JobSucceeded)},
runPolicy: &kubeflowv1.RunPolicy{},
jobStatus: kubeflowv1.JobStatus{
Conditions: []kubeflowv1.JobCondition{newJobCondition(kubeflowv1.JobSucceeded)},
CompletionTime: &metav1.Time{Time: time.Now().Add(4 * time.Second)},
},
want: -1,
wantErr: false,
},
{
name: "succeeded job without remaining time",
runPolicy: &commonv1.RunPolicy{
runPolicy: &kubeflowv1.RunPolicy{
TTLSecondsAfterFinished: pointer.Int32(5),
},
jobStatus: commonv1.JobStatus{
Conditions: []commonv1.JobCondition{newJobCondition(commonv1.JobSucceeded)},
jobStatus: kubeflowv1.JobStatus{
Conditions: []kubeflowv1.JobCondition{newJobCondition(kubeflowv1.JobSucceeded)},
CompletionTime: &metav1.Time{Time: time.Now().Add(6 * time.Second)},
},
want: 0,
wantErr: false,
},
{
name: "succeeded job with nil completion time error",
runPolicy: &commonv1.RunPolicy{
runPolicy: &kubeflowv1.RunPolicy{
TTLSecondsAfterFinished: pointer.Int32(5),
},
jobStatus: commonv1.JobStatus{
Conditions: []commonv1.JobCondition{newJobCondition(commonv1.JobSucceeded)},
jobStatus: kubeflowv1.JobStatus{
Conditions: []kubeflowv1.JobCondition{newJobCondition(kubeflowv1.JobSucceeded)},
},
want: -1,
wantErr: true,
Expand All @@ -116,8 +116,8 @@ func TestDurationUntilExpireTime(t *testing.T) {
}
}

func newJobCondition(t commonv1.JobConditionType) commonv1.JobCondition {
return commonv1.JobCondition{
func newJobCondition(t kubeflowv1.JobConditionType) kubeflowv1.JobCondition {
return kubeflowv1.JobCondition{
Type: t,
Status: corev1.ConditionTrue,
}
Expand Down
40 changes: 19 additions & 21 deletions pkg/controller.v1/mpi/mpijob.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package mpi
import (
"strings"

commonv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"

corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -89,25 +88,24 @@ const (
)

// initializeMPIJobStatuses initializes the ReplicaStatuses for MPIJob.
func initializeMPIJobStatuses(mpiJob *kubeflowv1.MPIJob, mtype commonv1.ReplicaType) {
replicaType := commonv1.ReplicaType(mtype)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is redundant type casting.

func initializeMPIJobStatuses(mpiJob *kubeflowv1.MPIJob, rType kubeflowv1.ReplicaType) {
if mpiJob.Status.ReplicaStatuses == nil {
mpiJob.Status.ReplicaStatuses = make(map[commonv1.ReplicaType]*commonv1.ReplicaStatus)
mpiJob.Status.ReplicaStatuses = make(map[kubeflowv1.ReplicaType]*kubeflowv1.ReplicaStatus)
}

mpiJob.Status.ReplicaStatuses[replicaType] = &commonv1.ReplicaStatus{}
mpiJob.Status.ReplicaStatuses[rType] = &kubeflowv1.ReplicaStatus{}
}

// updateMPIJobConditions updates the conditions of the given mpiJob.
func updateMPIJobConditions(mpiJob *kubeflowv1.MPIJob, conditionType commonv1.JobConditionType, reason, message string) error {
func updateMPIJobConditions(mpiJob *kubeflowv1.MPIJob, conditionType kubeflowv1.JobConditionType, reason, message string) error {
condition := newCondition(conditionType, reason, message)
setCondition(&mpiJob.Status, condition)
return nil
}

// newCondition creates a new mpiJob condition.
func newCondition(conditionType commonv1.JobConditionType, reason, message string) commonv1.JobCondition {
return commonv1.JobCondition{
func newCondition(conditionType kubeflowv1.JobConditionType, reason, message string) kubeflowv1.JobCondition {
return kubeflowv1.JobCondition{
Type: conditionType,
Status: corev1.ConditionTrue,
LastUpdateTime: metav1.Now(),
Expand All @@ -118,7 +116,7 @@ func newCondition(conditionType commonv1.JobConditionType, reason, message strin
}

// getCondition returns the condition with the provided type.
func getCondition(status commonv1.JobStatus, condType commonv1.JobConditionType) *commonv1.JobCondition {
func getCondition(status kubeflowv1.JobStatus, condType kubeflowv1.JobConditionType) *kubeflowv1.JobCondition {
for _, condition := range status.Conditions {
if condition.Type == condType {
return &condition
Expand All @@ -127,9 +125,9 @@ func getCondition(status commonv1.JobStatus, condType commonv1.JobConditionType)
return nil
}

func isEvicted(status commonv1.JobStatus) bool {
func isEvicted(status kubeflowv1.JobStatus) bool {
for _, condition := range status.Conditions {
if condition.Type == commonv1.JobFailed &&
if condition.Type == kubeflowv1.JobFailed &&
condition.Status == corev1.ConditionTrue &&
condition.Reason == mpiJobEvict {
return true
Expand All @@ -141,7 +139,7 @@ func isEvicted(status commonv1.JobStatus) bool {
// setCondition updates the mpiJob to include the provided condition.
// If the condition that we are about to add already exists
// and has the same status and reason then we are not going to update.
func setCondition(status *commonv1.JobStatus, condition commonv1.JobCondition) {
func setCondition(status *kubeflowv1.JobStatus, condition kubeflowv1.JobCondition) {

currentCond := getCondition(*status, condition.Type)

Expand All @@ -161,13 +159,13 @@ func setCondition(status *commonv1.JobStatus, condition commonv1.JobCondition) {
}

// filterOutCondition returns a new slice of mpiJob conditions without conditions with the provided type.
func filterOutCondition(conditions []commonv1.JobCondition, condType commonv1.JobConditionType) []commonv1.JobCondition {
var newConditions []commonv1.JobCondition
func filterOutCondition(conditions []kubeflowv1.JobCondition, condType kubeflowv1.JobConditionType) []kubeflowv1.JobCondition {
var newConditions []kubeflowv1.JobCondition
for _, c := range conditions {
if condType == commonv1.JobRestarting && c.Type == commonv1.JobRunning {
if condType == kubeflowv1.JobRestarting && c.Type == kubeflowv1.JobRunning {
continue
}
if condType == commonv1.JobRunning && c.Type == commonv1.JobRestarting {
if condType == kubeflowv1.JobRunning && c.Type == kubeflowv1.JobRestarting {
continue
}

Expand All @@ -176,7 +174,7 @@ func filterOutCondition(conditions []commonv1.JobCondition, condType commonv1.Jo
}

// Set the running condition status to be false when current condition failed or succeeded
if (condType == commonv1.JobFailed || condType == commonv1.JobSucceeded) && (c.Type == commonv1.JobRunning || c.Type == commonv1.JobFailed) {
if (condType == kubeflowv1.JobFailed || condType == kubeflowv1.JobSucceeded) && (c.Type == kubeflowv1.JobRunning || c.Type == kubeflowv1.JobFailed) {
c.Status = corev1.ConditionFalse
}

Expand Down Expand Up @@ -242,7 +240,7 @@ func defaultReplicaLabels(genericLabels map[string]string, roleLabelVal string)
replicaLabels[k] = v
}

replicaLabels[commonv1.ReplicaTypeLabel] = roleLabelVal
replicaLabels[kubeflowv1.ReplicaTypeLabel] = roleLabelVal
return replicaLabels
}

Expand Down Expand Up @@ -271,10 +269,10 @@ func workerSelector(genericLabels map[string]string) (labels.Selector, error) {

// initializeReplicaStatuses initializes the ReplicaStatuses for replica.
// originally from pkg/controller.v1/tensorflow/status.go (deleted)
func initializeReplicaStatuses(jobStatus *commonv1.JobStatus, rtype commonv1.ReplicaType) {
func initializeReplicaStatuses(jobStatus *kubeflowv1.JobStatus, rtype kubeflowv1.ReplicaType) {
if jobStatus.ReplicaStatuses == nil {
jobStatus.ReplicaStatuses = make(map[commonv1.ReplicaType]*commonv1.ReplicaStatus)
jobStatus.ReplicaStatuses = make(map[kubeflowv1.ReplicaType]*kubeflowv1.ReplicaStatus)
}

jobStatus.ReplicaStatuses[rtype] = &commonv1.ReplicaStatus{}
jobStatus.ReplicaStatuses[rtype] = &kubeflowv1.ReplicaStatus{}
}
Loading