diff --git a/backend/src/apiserver/client/BUILD.bazel b/backend/src/apiserver/client/BUILD.bazel index 2c77ce87295..34f43f794cf 100644 --- a/backend/src/apiserver/client/BUILD.bazel +++ b/backend/src/apiserver/client/BUILD.bazel @@ -18,7 +18,6 @@ go_library( importpath = "github.com/kubeflow/pipelines/backend/src/apiserver/client", visibility = ["//visibility:public"], deps = [ - "//backend/src/apiserver/common:go_default_library", "//backend/src/common/util:go_default_library", "//backend/src/crd/pkg/client/clientset/versioned:go_default_library", "//backend/src/crd/pkg/client/clientset/versioned/typed/scheduledworkflow/v1beta1:go_default_library", diff --git a/backend/src/apiserver/client/argo.go b/backend/src/apiserver/client/argo.go index 6fe15530fb2..0fb78360437 100644 --- a/backend/src/apiserver/client/argo.go +++ b/backend/src/apiserver/client/argo.go @@ -21,15 +21,10 @@ import ( argoprojv1alpha1 "github.com/argoproj/argo/pkg/client/clientset/versioned/typed/workflow/v1alpha1" "github.com/cenkalti/backoff" "github.com/golang/glog" - "github.com/kubeflow/pipelines/backend/src/apiserver/common" "github.com/pkg/errors" "k8s.io/client-go/rest" ) -const ( - PodNamespace = "POD_NAMESPACE" -) - type ArgoClientInterface interface { Workflow(namespace string) argoprojv1alpha1.WorkflowInterface } @@ -39,9 +34,6 @@ type ArgoClient struct { } func (argoClient *ArgoClient) Workflow(namespace string) argoprojv1alpha1.WorkflowInterface { - if len(namespace) == 0 { - namespace = common.GetStringConfig(PodNamespace) - } return argoClient.argoProjClient.Workflows(namespace) } diff --git a/backend/src/apiserver/client/argo_fake.go b/backend/src/apiserver/client/argo_fake.go index 597d9715f87..5a2b212984b 100644 --- a/backend/src/apiserver/client/argo_fake.go +++ b/backend/src/apiserver/client/argo_fake.go @@ -16,6 +16,7 @@ package client import ( argoprojv1alpha1 "github.com/argoproj/argo/pkg/client/clientset/versioned/typed/workflow/v1alpha1" + "github.com/kubeflow/pipelines/backend/src/common/util" "github.com/pkg/errors" ) @@ -28,6 +29,9 @@ func NewFakeArgoClient() *FakeArgoClient { } func (c *FakeArgoClient) Workflow(namespace string) argoprojv1alpha1.WorkflowInterface { + if len(namespace) == 0 { + panic(util.NewResourceNotFoundError("Namespace", namespace)) + } return c.workflowClientFake } diff --git a/backend/src/apiserver/client/kubernetes_core_fake.go b/backend/src/apiserver/client/kubernetes_core_fake.go index 2c40849ebda..1ea06ac5c08 100644 --- a/backend/src/apiserver/client/kubernetes_core_fake.go +++ b/backend/src/apiserver/client/kubernetes_core_fake.go @@ -15,6 +15,7 @@ package client import ( + "github.com/kubeflow/pipelines/backend/src/common/util" v1 "k8s.io/client-go/kubernetes/typed/core/v1" ) @@ -23,6 +24,9 @@ type FakeKuberneteCoreClient struct { } func (c *FakeKuberneteCoreClient) PodClient(namespace string) v1.PodInterface { + if len(namespace) == 0 { + panic(util.NewResourceNotFoundError("Namespace", namespace)) + } return c.podClientFake } diff --git a/backend/src/apiserver/client_manager.go b/backend/src/apiserver/client_manager.go index 81402594870..8528947da9a 100644 --- a/backend/src/apiserver/client_manager.go +++ b/backend/src/apiserver/client_manager.go @@ -149,7 +149,7 @@ func (c *ClientManager) init() { c.argoClient = client.NewArgoClientOrFatal(common.GetDurationConfig(initConnectionTimeout)) c.swfClient = client.CreateScheduledWorkflowClientOrFatal( - common.GetStringConfig(client.PodNamespace), common.GetDurationConfig(initConnectionTimeout)) + common.GetPodNamespace(), common.GetDurationConfig(initConnectionTimeout)) c.k8sCoreClient = client.CreateKubernetesCoreOrFatal(common.GetDurationConfig(initConnectionTimeout)) @@ -397,7 +397,7 @@ func backfillExperimentIDToRunTable(db *gorm.DB) (retError error) { } _, err = db.CommonDB().Exec(` - UPDATE + UPDATE run_details, resource_references SET run_details.ExperimentUUID = resource_references.ReferenceUUID diff --git a/backend/src/apiserver/common/config.go b/backend/src/apiserver/common/config.go index 5e7b79d259b..b3078254c8f 100644 --- a/backend/src/apiserver/common/config.go +++ b/backend/src/apiserver/common/config.go @@ -23,7 +23,8 @@ import ( ) const ( - MultiUserMode string = "MULTIUSER" + MultiUserMode string = "MULTIUSER" + PodNamespace string = "POD_NAMESPACE" ) func GetStringConfig(configName string) string { @@ -69,3 +70,7 @@ func GetDurationConfig(configName string) time.Duration { func IsMultiUserMode() bool { return GetBoolConfigWithDefault(MultiUserMode, false) } + +func GetPodNamespace() string { + return GetStringConfig(PodNamespace) +} diff --git a/backend/src/apiserver/resource/BUILD.bazel b/backend/src/apiserver/resource/BUILD.bazel index 40e6ce78148..7bcce3a9a48 100644 --- a/backend/src/apiserver/resource/BUILD.bazel +++ b/backend/src/apiserver/resource/BUILD.bazel @@ -53,6 +53,7 @@ go_test( "@com_github_argoproj_argo//pkg/apis/workflow/v1alpha1:go_default_library", "@com_github_ghodss_yaml//:go_default_library", "@com_github_pkg_errors//:go_default_library", + "@com_github_spf13_viper//:go_default_library", "@com_github_stretchr_testify//assert:go_default_library", "@io_bazel_rules_go//proto/wkt:timestamp_go_proto", "@io_k8s_apimachinery//pkg/apis/meta/v1:go_default_library", diff --git a/backend/src/apiserver/resource/resource_manager.go b/backend/src/apiserver/resource/resource_manager.go index 4457adeb8c6..fa0a963ae78 100644 --- a/backend/src/apiserver/resource/resource_manager.go +++ b/backend/src/apiserver/resource/resource_manager.go @@ -304,8 +304,16 @@ func (r *ResourceManager) CreateRun(apiRun *api.Run) (*model.RunDetail, error) { } } + namespace := common.GetNamespaceFromAPIResourceReferences(apiRun.ResourceReferences) + if len(namespace) == 0 { + if multiuserMode { + return nil, util.NewInvalidInputError("Run should specify namespace") + } else { + namespace = common.GetPodNamespace() + } + } // Create argo workflow CRD resource - newWorkflow, err := r.getWorkflowClient(common.GetNamespaceFromAPIResourceReferences(apiRun.ResourceReferences)).Create(workflow.Get()) + newWorkflow, err := r.getWorkflowClient(namespace).Create(workflow.Get()) if err != nil { return nil, util.NewInternalServerError(err, "Failed to create a workflow for (%s)", workflow.Name) } @@ -599,6 +607,9 @@ func (r *ResourceManager) ReportWorkflowResource(workflow *util.Workflow) error } runId := workflow.ObjectMeta.Labels[util.LabelKeyWorkflowRunId] jobId := workflow.ScheduledWorkflowUUIDAsStringOrEmpty() + if len(workflow.Namespace) == 0 { + return util.NewInvalidInputError("Workflow missing namespace") + } if workflow.PersistedFinalState() { // If workflow's final state has being persisted, the workflow should be garbage collected. @@ -992,5 +1003,15 @@ func (r *ResourceManager) GetNamespaceFromRunID(runId string) (string, error) { return "", util.Wrap(err, "Failed to get namespace from run id.") } namespace := model.GetNamespaceFromModelResourceReferences(runDetail.ResourceReferences) + if len(namespace) == 0 { + if common.IsMultiUserMode() { + // All runs should have namespace in multi user mode. + return "", errors.New("Invalid db data: run doesn't have a namespace resource reference") + } else { + // When db model doesn't have namespace stored (e.g. legacy runs), use + // pod namespace as default. + return common.GetPodNamespace(), nil + } + } return namespace, nil } diff --git a/backend/src/apiserver/resource/resource_manager_test.go b/backend/src/apiserver/resource/resource_manager_test.go index ae3cefed628..743aa3530ce 100644 --- a/backend/src/apiserver/resource/resource_manager_test.go +++ b/backend/src/apiserver/resource/resource_manager_test.go @@ -30,12 +30,17 @@ import ( "github.com/kubeflow/pipelines/backend/src/common/util" swfapi "github.com/kubeflow/pipelines/backend/src/crd/pkg/apis/scheduledworkflow/v1beta1" "github.com/pkg/errors" + "github.com/spf13/viper" "github.com/stretchr/testify/assert" "google.golang.org/grpc/codes" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" ) +func initEnvVars() { + viper.Set(common.PodNamespace, "test-ns") +} + type FakeBadObjectStore struct{} func (m *FakeBadObjectStore) AddFile(template []byte, filePath string) error { @@ -60,13 +65,14 @@ func (m *FakeBadObjectStore) GetFromYamlFile(o interface{}, filePath string) err var testWorkflow = util.NewWorkflow(&v1alpha1.Workflow{ TypeMeta: v1.TypeMeta{APIVersion: "argoproj.io/v1alpha1", Kind: "Workflow"}, - ObjectMeta: v1.ObjectMeta{Name: "workflow-name", UID: "workflow1"}, + ObjectMeta: v1.ObjectMeta{Name: "workflow-name", UID: "workflow1", Namespace: "test-ns"}, Spec: v1alpha1.WorkflowSpec{Arguments: v1alpha1.Arguments{Parameters: []v1alpha1.Parameter{{Name: "param1"}}}}, Status: v1alpha1.WorkflowStatus{Phase: v1alpha1.NodeRunning}, }) // Util function to create an initial state with pipeline uploaded func initWithPipeline(t *testing.T) (*FakeClientManager, *ResourceManager, *model.Pipeline) { + initEnvVars() store := NewFakeClientManagerOrFatal(util.NewFakeTimeForEpoch()) manager := NewResourceManager(store) p, err := manager.CreatePipeline("p1", "", []byte(testWorkflow.ToStringForStore())) @@ -75,6 +81,7 @@ func initWithPipeline(t *testing.T) (*FakeClientManager, *ResourceManager, *mode } func initWithExperiment(t *testing.T) (*FakeClientManager, *ResourceManager, *model.Experiment) { + initEnvVars() store := NewFakeClientManagerOrFatal(util.NewFakeTimeForEpoch()) manager := NewResourceManager(store) experiment := &model.Experiment{Name: "e1"} @@ -84,6 +91,7 @@ func initWithExperiment(t *testing.T) (*FakeClientManager, *ResourceManager, *mo } func initWithExperimentAndPipeline(t *testing.T) (*FakeClientManager, *ResourceManager, *model.Experiment, *model.Pipeline) { + initEnvVars() store := NewFakeClientManagerOrFatal(util.NewFakeTimeForEpoch()) manager := NewResourceManager(store) experiment := &model.Experiment{Name: "e1"} @@ -159,7 +167,8 @@ func initWithOneTimeFailedRun(t *testing.T) (*FakeClientManager, *ResourceManage updatedWorkflow.SetLabels(util.LabelKeyWorkflowRunId, runDetail.UUID) updatedWorkflow.Status.Phase = v1alpha1.NodeFailed updatedWorkflow.Status.Nodes = map[string]v1alpha1.NodeStatus{"node1": {Name: "pod1", Type: v1alpha1.NodeTypePod, Phase: v1alpha1.NodeFailed}} - manager.ReportWorkflowResource(updatedWorkflow) + err = manager.ReportWorkflowResource(updatedWorkflow) + assert.Nil(t, err) return store, manager, runDetail } @@ -306,6 +315,7 @@ func TestCreateRun_ThroughPipelineID(t *testing.T) { ExperimentUUID: experiment.UUID, DisplayName: "run1", Name: "workflow-name", + Namespace: "test-ns", StorageState: api.Run_STORAGESTATE_AVAILABLE.String(), CreatedAtInSec: 3, Conditions: "Running", @@ -352,6 +362,7 @@ func TestCreateRun_ThroughWorkflowSpec(t *testing.T) { ExperimentUUID: expectedExperimentUUID, DisplayName: "run1", Name: "workflow-name", + Namespace: "test-ns", StorageState: api.Run_STORAGESTATE_AVAILABLE.String(), CreatedAtInSec: 2, Conditions: "Running", @@ -436,6 +447,7 @@ func TestCreateRun_ThroughPipelineVersion(t *testing.T) { ExperimentUUID: experiment.UUID, DisplayName: "run1", Name: "workflow-name", + Namespace: "test-ns", StorageState: api.Run_STORAGESTATE_AVAILABLE.String(), CreatedAtInSec: 4, Conditions: "Running", @@ -1113,8 +1125,9 @@ func TestReportWorkflowResource_ScheduledWorkflowIDEmpty_Success(t *testing.T) { // report workflow workflow := util.NewWorkflow(&v1alpha1.Workflow{ ObjectMeta: v1.ObjectMeta{ - UID: types.UID(run.UUID), - Labels: map[string]string{util.LabelKeyWorkflowRunId: run.UUID}, + UID: types.UID(run.UUID), + Labels: map[string]string{util.LabelKeyWorkflowRunId: run.UUID}, + Namespace: "test-ns", }, Status: v1alpha1.WorkflowStatus{Phase: v1alpha1.NodeRunning}, }) @@ -1127,6 +1140,7 @@ func TestReportWorkflowResource_ScheduledWorkflowIDEmpty_Success(t *testing.T) { ExperimentUUID: expectedExperimentUUID, DisplayName: "run1", Name: "workflow-name", + Namespace: "test-ns", StorageState: api.Run_STORAGESTATE_AVAILABLE.String(), CreatedAtInSec: 2, Conditions: "Running", @@ -1287,20 +1301,22 @@ func TestReportWorkflowResource_ScheduledWorkflowIDNotEmpty_NoExperiment_Success func TestReportWorkflowResource_WorkflowCompleted(t *testing.T) { store, manager, run := initWithOneTimeRun(t) + namespace := "kubeflow" defer store.Close() // report workflow workflow := util.NewWorkflow(&v1alpha1.Workflow{ ObjectMeta: v1.ObjectMeta{ - Name: run.Name, - UID: types.UID(run.UUID), - Labels: map[string]string{util.LabelKeyWorkflowRunId: run.UUID}, + Name: run.Name, + Namespace: namespace, + UID: types.UID(run.UUID), + Labels: map[string]string{util.LabelKeyWorkflowRunId: run.UUID}, }, Status: v1alpha1.WorkflowStatus{Phase: v1alpha1.NodeFailed}, }) err := manager.ReportWorkflowResource(workflow) assert.Nil(t, err) - wf, err := store.ArgoClientFake.Workflow("").Get(run.Run.Name, v1.GetOptions{}) + wf, err := store.ArgoClientFake.Workflow(namespace).Get(run.Run.Name, v1.GetOptions{}) assert.Nil(t, err) assert.Equal(t, wf.Labels[util.LabelKeyWorkflowPersistedFinalState], "true") } @@ -1311,9 +1327,10 @@ func TestReportWorkflowResource_WorkflowCompleted_FinalStatePersisted(t *testing // report workflow workflow := util.NewWorkflow(&v1alpha1.Workflow{ ObjectMeta: v1.ObjectMeta{ - Name: run.Name, - UID: types.UID(run.UUID), - Labels: map[string]string{util.LabelKeyWorkflowRunId: run.UUID, util.LabelKeyWorkflowPersistedFinalState: "true"}, + Name: run.Name, + Namespace: "test-ns", + UID: types.UID(run.UUID), + Labels: map[string]string{util.LabelKeyWorkflowRunId: run.UUID, util.LabelKeyWorkflowPersistedFinalState: "true"}, }, Status: v1alpha1.WorkflowStatus{Phase: v1alpha1.NodeFailed}, }) @@ -1328,9 +1345,10 @@ func TestReportWorkflowResource_WorkflowCompleted_FinalStatePersisted_DeleteFail // report workflow workflow := util.NewWorkflow(&v1alpha1.Workflow{ ObjectMeta: v1.ObjectMeta{ - Name: run.Name, - UID: types.UID(run.UUID), - Labels: map[string]string{util.LabelKeyWorkflowRunId: run.UUID, util.LabelKeyWorkflowPersistedFinalState: "true"}, + Name: run.Name, + Namespace: "test-ns", + UID: types.UID(run.UUID), + Labels: map[string]string{util.LabelKeyWorkflowRunId: run.UUID, util.LabelKeyWorkflowPersistedFinalState: "true"}, }, Status: v1alpha1.WorkflowStatus{Phase: v1alpha1.NodeFailed}, }) diff --git a/backend/src/apiserver/server/BUILD.bazel b/backend/src/apiserver/server/BUILD.bazel index 2ea70466472..b053f6cd331 100644 --- a/backend/src/apiserver/server/BUILD.bazel +++ b/backend/src/apiserver/server/BUILD.bazel @@ -32,6 +32,7 @@ go_library( "@com_github_golang_protobuf//jsonpb:go_default_library_gen", "@com_github_pkg_errors//:go_default_library", "@com_github_robfig_cron//:go_default_library", + "@com_github_spf13_viper//:go_default_library", "@com_github_stretchr_testify//assert:go_default_library", "@io_bazel_rules_go//proto/wkt:empty_go_proto", "@io_bazel_rules_go//proto/wkt:timestamp_go_proto", diff --git a/backend/src/apiserver/server/test_util.go b/backend/src/apiserver/server/test_util.go index a130f1e80c8..420baaabc9d 100644 --- a/backend/src/apiserver/server/test_util.go +++ b/backend/src/apiserver/server/test_util.go @@ -20,9 +20,11 @@ import ( "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" api "github.com/kubeflow/pipelines/backend/api/go_client" "github.com/kubeflow/pipelines/backend/src/apiserver/client" + "github.com/kubeflow/pipelines/backend/src/apiserver/common" "github.com/kubeflow/pipelines/backend/src/apiserver/model" "github.com/kubeflow/pipelines/backend/src/apiserver/resource" "github.com/kubeflow/pipelines/backend/src/common/util" + "github.com/spf13/viper" "github.com/stretchr/testify/assert" "google.golang.org/grpc/codes" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -65,7 +67,13 @@ var validReferencesOfExperimentAndPipelineVersion = []*api.ResourceReference{ }, } +// This automatically runs before all the tests. +func initEnvVars() { + viper.Set(common.PodNamespace, "test-ns") +} + func initWithExperiment(t *testing.T) (*resource.FakeClientManager, *resource.ResourceManager, *model.Experiment) { + initEnvVars() clientManager := resource.NewFakeClientManagerOrFatal(util.NewFakeTimeForEpoch()) resourceManager := resource.NewResourceManager(clientManager) experiment := &model.Experiment{Name: "123"} @@ -75,6 +83,7 @@ func initWithExperiment(t *testing.T) (*resource.FakeClientManager, *resource.Re } func initWithExperiment_KFAM_Unauthorized(t *testing.T) (*resource.FakeClientManager, *resource.ResourceManager, *model.Experiment) { + initEnvVars() clientManager := resource.NewFakeClientManagerOrFatal(util.NewFakeTimeForEpoch()) clientManager.KfamClientFake = client.NewFakeKFAMClientUnauthorized() resourceManager := resource.NewResourceManager(clientManager) @@ -85,6 +94,7 @@ func initWithExperiment_KFAM_Unauthorized(t *testing.T) (*resource.FakeClientMan } func initWithExperimentAndPipelineVersion(t *testing.T) (*resource.FakeClientManager, *resource.ResourceManager, *model.Experiment) { + initEnvVars() clientManager := resource.NewFakeClientManagerOrFatal(util.NewFakeTimeForEpoch()) resourceManager := resource.NewResourceManager(clientManager) @@ -137,6 +147,7 @@ func initWithOneTimeRun(t *testing.T) (*resource.FakeClientManager, *resource.Re // Util function to create an initial state with pipeline uploaded func initWithPipeline(t *testing.T) (*resource.FakeClientManager, *resource.ResourceManager, *model.Pipeline) { + initEnvVars() store := resource.NewFakeClientManagerOrFatal(util.NewFakeTimeForEpoch()) manager := resource.NewResourceManager(store) p, err := manager.CreatePipeline("p1", "", []byte(testWorkflow.ToStringForStore()))