Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backend] Use pod namespace as default when not in multi user mode #3009

Merged
merged 5 commits into from
Feb 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion backend/src/apiserver/client/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ go_library(
importpath = "github.com/kubeflow/pipelines/backend/src/apiserver/client",
visibility = ["//visibility:public"],
deps = [
"//backend/src/apiserver/common:go_default_library",
"//backend/src/common/util:go_default_library",
"//backend/src/crd/pkg/client/clientset/versioned:go_default_library",
"//backend/src/crd/pkg/client/clientset/versioned/typed/scheduledworkflow/v1beta1:go_default_library",
Expand Down
8 changes: 0 additions & 8 deletions backend/src/apiserver/client/argo.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,10 @@ import (
argoprojv1alpha1 "github.com/argoproj/argo/pkg/client/clientset/versioned/typed/workflow/v1alpha1"
"github.com/cenkalti/backoff"
"github.com/golang/glog"
"github.com/kubeflow/pipelines/backend/src/apiserver/common"
"github.com/pkg/errors"
"k8s.io/client-go/rest"
)

const (
PodNamespace = "POD_NAMESPACE"
)

type ArgoClientInterface interface {
Workflow(namespace string) argoprojv1alpha1.WorkflowInterface
}
Expand All @@ -39,9 +34,6 @@ type ArgoClient struct {
}

func (argoClient *ArgoClient) Workflow(namespace string) argoprojv1alpha1.WorkflowInterface {
if len(namespace) == 0 {
namespace = common.GetStringConfig(PodNamespace)
}
return argoClient.argoProjClient.Workflows(namespace)
}

Expand Down
4 changes: 4 additions & 0 deletions backend/src/apiserver/client/argo_fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package client

import (
argoprojv1alpha1 "github.com/argoproj/argo/pkg/client/clientset/versioned/typed/workflow/v1alpha1"
"github.com/kubeflow/pipelines/backend/src/common/util"
"github.com/pkg/errors"
)

Expand All @@ -28,6 +29,9 @@ func NewFakeArgoClient() *FakeArgoClient {
}

func (c *FakeArgoClient) Workflow(namespace string) argoprojv1alpha1.WorkflowInterface {
if len(namespace) == 0 {
panic(util.NewResourceNotFoundError("Namespace", namespace))
}
return c.workflowClientFake
}

Expand Down
4 changes: 4 additions & 0 deletions backend/src/apiserver/client/kubernetes_core_fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package client

import (
"github.com/kubeflow/pipelines/backend/src/common/util"
v1 "k8s.io/client-go/kubernetes/typed/core/v1"
)

Expand All @@ -23,6 +24,9 @@ type FakeKuberneteCoreClient struct {
}

func (c *FakeKuberneteCoreClient) PodClient(namespace string) v1.PodInterface {
if len(namespace) == 0 {
panic(util.NewResourceNotFoundError("Namespace", namespace))
}
return c.podClientFake
}

Expand Down
4 changes: 2 additions & 2 deletions backend/src/apiserver/client_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func (c *ClientManager) init() {
c.argoClient = client.NewArgoClientOrFatal(common.GetDurationConfig(initConnectionTimeout))

c.swfClient = client.CreateScheduledWorkflowClientOrFatal(
common.GetStringConfig(client.PodNamespace), common.GetDurationConfig(initConnectionTimeout))
common.GetPodNamespace(), common.GetDurationConfig(initConnectionTimeout))

c.k8sCoreClient = client.CreateKubernetesCoreOrFatal(common.GetDurationConfig(initConnectionTimeout))

Expand Down Expand Up @@ -397,7 +397,7 @@ func backfillExperimentIDToRunTable(db *gorm.DB) (retError error) {
}

_, err = db.CommonDB().Exec(`
UPDATE
UPDATE
run_details, resource_references
SET
run_details.ExperimentUUID = resource_references.ReferenceUUID
Expand Down
7 changes: 6 additions & 1 deletion backend/src/apiserver/common/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ import (
)

const (
MultiUserMode string = "MULTIUSER"
MultiUserMode string = "MULTIUSER"
PodNamespace string = "POD_NAMESPACE"
)

func GetStringConfig(configName string) string {
Expand Down Expand Up @@ -69,3 +70,7 @@ func GetDurationConfig(configName string) time.Duration {
func IsMultiUserMode() bool {
return GetBoolConfigWithDefault(MultiUserMode, false)
}

func GetPodNamespace() string {
return GetStringConfig(PodNamespace)
}
1 change: 1 addition & 0 deletions backend/src/apiserver/resource/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ go_test(
"@com_github_argoproj_argo//pkg/apis/workflow/v1alpha1:go_default_library",
"@com_github_ghodss_yaml//:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_spf13_viper//:go_default_library",
"@com_github_stretchr_testify//assert:go_default_library",
"@io_bazel_rules_go//proto/wkt:timestamp_go_proto",
"@io_k8s_apimachinery//pkg/apis/meta/v1:go_default_library",
Expand Down
23 changes: 22 additions & 1 deletion backend/src/apiserver/resource/resource_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,8 +304,16 @@ func (r *ResourceManager) CreateRun(apiRun *api.Run) (*model.RunDetail, error) {
}
}

namespace := common.GetNamespaceFromAPIResourceReferences(apiRun.ResourceReferences)
if len(namespace) == 0 {
if multiuserMode {
return nil, util.NewInvalidInputError("Run should specify namespace")
} else {
namespace = common.GetPodNamespace()
}
}
// Create argo workflow CRD resource
newWorkflow, err := r.getWorkflowClient(common.GetNamespaceFromAPIResourceReferences(apiRun.ResourceReferences)).Create(workflow.Get())
newWorkflow, err := r.getWorkflowClient(namespace).Create(workflow.Get())
if err != nil {
return nil, util.NewInternalServerError(err, "Failed to create a workflow for (%s)", workflow.Name)
}
Expand Down Expand Up @@ -599,6 +607,9 @@ func (r *ResourceManager) ReportWorkflowResource(workflow *util.Workflow) error
}
runId := workflow.ObjectMeta.Labels[util.LabelKeyWorkflowRunId]
jobId := workflow.ScheduledWorkflowUUIDAsStringOrEmpty()
if len(workflow.Namespace) == 0 {
return util.NewInvalidInputError("Workflow missing namespace")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this needed?
Are we sure this would work for "default" Kubernetes namespace?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not needed. But I think it is clearer, because I think persistence agent is the only instance calling this. And it gets data using workflow client, so it will always carry namespace.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I verified retry works. That includes coverage for persistence agent. Also, e2e tests pass.

}

if workflow.PersistedFinalState() {
// If workflow's final state has being persisted, the workflow should be garbage collected.
Expand Down Expand Up @@ -992,5 +1003,15 @@ func (r *ResourceManager) GetNamespaceFromRunID(runId string) (string, error) {
return "", util.Wrap(err, "Failed to get namespace from run id.")
}
namespace := model.GetNamespaceFromModelResourceReferences(runDetail.ResourceReferences)
if len(namespace) == 0 {
if common.IsMultiUserMode() {
// All runs should have namespace in multi user mode.
return "", errors.New("Invalid db data: run doesn't have a namespace resource reference")
} else {
// When db model doesn't have namespace stored (e.g. legacy runs), use
// pod namespace as default.
return common.GetPodNamespace(), nil
}
}
return namespace, nil
}
46 changes: 32 additions & 14 deletions backend/src/apiserver/resource/resource_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,17 @@ import (
"github.com/kubeflow/pipelines/backend/src/common/util"
swfapi "github.com/kubeflow/pipelines/backend/src/crd/pkg/apis/scheduledworkflow/v1beta1"
"github.com/pkg/errors"
"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc/codes"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
)

func initEnvVars() {
viper.Set(common.PodNamespace, "test-ns")
}

type FakeBadObjectStore struct{}

func (m *FakeBadObjectStore) AddFile(template []byte, filePath string) error {
Expand All @@ -60,13 +65,14 @@ func (m *FakeBadObjectStore) GetFromYamlFile(o interface{}, filePath string) err

var testWorkflow = util.NewWorkflow(&v1alpha1.Workflow{
TypeMeta: v1.TypeMeta{APIVersion: "argoproj.io/v1alpha1", Kind: "Workflow"},
ObjectMeta: v1.ObjectMeta{Name: "workflow-name", UID: "workflow1"},
ObjectMeta: v1.ObjectMeta{Name: "workflow-name", UID: "workflow1", Namespace: "test-ns"},
Spec: v1alpha1.WorkflowSpec{Arguments: v1alpha1.Arguments{Parameters: []v1alpha1.Parameter{{Name: "param1"}}}},
Status: v1alpha1.WorkflowStatus{Phase: v1alpha1.NodeRunning},
})

// Util function to create an initial state with pipeline uploaded
func initWithPipeline(t *testing.T) (*FakeClientManager, *ResourceManager, *model.Pipeline) {
initEnvVars()
store := NewFakeClientManagerOrFatal(util.NewFakeTimeForEpoch())
manager := NewResourceManager(store)
p, err := manager.CreatePipeline("p1", "", []byte(testWorkflow.ToStringForStore()))
Expand All @@ -75,6 +81,7 @@ func initWithPipeline(t *testing.T) (*FakeClientManager, *ResourceManager, *mode
}

func initWithExperiment(t *testing.T) (*FakeClientManager, *ResourceManager, *model.Experiment) {
initEnvVars()
store := NewFakeClientManagerOrFatal(util.NewFakeTimeForEpoch())
manager := NewResourceManager(store)
experiment := &model.Experiment{Name: "e1"}
Expand All @@ -84,6 +91,7 @@ func initWithExperiment(t *testing.T) (*FakeClientManager, *ResourceManager, *mo
}

func initWithExperimentAndPipeline(t *testing.T) (*FakeClientManager, *ResourceManager, *model.Experiment, *model.Pipeline) {
initEnvVars()
store := NewFakeClientManagerOrFatal(util.NewFakeTimeForEpoch())
manager := NewResourceManager(store)
experiment := &model.Experiment{Name: "e1"}
Expand Down Expand Up @@ -159,7 +167,8 @@ func initWithOneTimeFailedRun(t *testing.T) (*FakeClientManager, *ResourceManage
updatedWorkflow.SetLabels(util.LabelKeyWorkflowRunId, runDetail.UUID)
updatedWorkflow.Status.Phase = v1alpha1.NodeFailed
updatedWorkflow.Status.Nodes = map[string]v1alpha1.NodeStatus{"node1": {Name: "pod1", Type: v1alpha1.NodeTypePod, Phase: v1alpha1.NodeFailed}}
manager.ReportWorkflowResource(updatedWorkflow)
err = manager.ReportWorkflowResource(updatedWorkflow)
assert.Nil(t, err)
return store, manager, runDetail
}

Expand Down Expand Up @@ -306,6 +315,7 @@ func TestCreateRun_ThroughPipelineID(t *testing.T) {
ExperimentUUID: experiment.UUID,
DisplayName: "run1",
Name: "workflow-name",
Namespace: "test-ns",
StorageState: api.Run_STORAGESTATE_AVAILABLE.String(),
CreatedAtInSec: 3,
Conditions: "Running",
Expand Down Expand Up @@ -352,6 +362,7 @@ func TestCreateRun_ThroughWorkflowSpec(t *testing.T) {
ExperimentUUID: expectedExperimentUUID,
DisplayName: "run1",
Name: "workflow-name",
Namespace: "test-ns",
StorageState: api.Run_STORAGESTATE_AVAILABLE.String(),
CreatedAtInSec: 2,
Conditions: "Running",
Expand Down Expand Up @@ -436,6 +447,7 @@ func TestCreateRun_ThroughPipelineVersion(t *testing.T) {
ExperimentUUID: experiment.UUID,
DisplayName: "run1",
Name: "workflow-name",
Namespace: "test-ns",
StorageState: api.Run_STORAGESTATE_AVAILABLE.String(),
CreatedAtInSec: 4,
Conditions: "Running",
Expand Down Expand Up @@ -1113,8 +1125,9 @@ func TestReportWorkflowResource_ScheduledWorkflowIDEmpty_Success(t *testing.T) {
// report workflow
workflow := util.NewWorkflow(&v1alpha1.Workflow{
ObjectMeta: v1.ObjectMeta{
UID: types.UID(run.UUID),
Labels: map[string]string{util.LabelKeyWorkflowRunId: run.UUID},
UID: types.UID(run.UUID),
Labels: map[string]string{util.LabelKeyWorkflowRunId: run.UUID},
Namespace: "test-ns",
},
Status: v1alpha1.WorkflowStatus{Phase: v1alpha1.NodeRunning},
})
Expand All @@ -1127,6 +1140,7 @@ func TestReportWorkflowResource_ScheduledWorkflowIDEmpty_Success(t *testing.T) {
ExperimentUUID: expectedExperimentUUID,
DisplayName: "run1",
Name: "workflow-name",
Namespace: "test-ns",
StorageState: api.Run_STORAGESTATE_AVAILABLE.String(),
CreatedAtInSec: 2,
Conditions: "Running",
Expand Down Expand Up @@ -1287,20 +1301,22 @@ func TestReportWorkflowResource_ScheduledWorkflowIDNotEmpty_NoExperiment_Success

func TestReportWorkflowResource_WorkflowCompleted(t *testing.T) {
store, manager, run := initWithOneTimeRun(t)
namespace := "kubeflow"
defer store.Close()
// report workflow
workflow := util.NewWorkflow(&v1alpha1.Workflow{
ObjectMeta: v1.ObjectMeta{
Name: run.Name,
UID: types.UID(run.UUID),
Labels: map[string]string{util.LabelKeyWorkflowRunId: run.UUID},
Name: run.Name,
Namespace: namespace,
UID: types.UID(run.UUID),
Labels: map[string]string{util.LabelKeyWorkflowRunId: run.UUID},
},
Status: v1alpha1.WorkflowStatus{Phase: v1alpha1.NodeFailed},
})
err := manager.ReportWorkflowResource(workflow)
assert.Nil(t, err)

wf, err := store.ArgoClientFake.Workflow("").Get(run.Run.Name, v1.GetOptions{})
wf, err := store.ArgoClientFake.Workflow(namespace).Get(run.Run.Name, v1.GetOptions{})
assert.Nil(t, err)
assert.Equal(t, wf.Labels[util.LabelKeyWorkflowPersistedFinalState], "true")
}
Expand All @@ -1311,9 +1327,10 @@ func TestReportWorkflowResource_WorkflowCompleted_FinalStatePersisted(t *testing
// report workflow
workflow := util.NewWorkflow(&v1alpha1.Workflow{
ObjectMeta: v1.ObjectMeta{
Name: run.Name,
UID: types.UID(run.UUID),
Labels: map[string]string{util.LabelKeyWorkflowRunId: run.UUID, util.LabelKeyWorkflowPersistedFinalState: "true"},
Name: run.Name,
Namespace: "test-ns",
UID: types.UID(run.UUID),
Labels: map[string]string{util.LabelKeyWorkflowRunId: run.UUID, util.LabelKeyWorkflowPersistedFinalState: "true"},
},
Status: v1alpha1.WorkflowStatus{Phase: v1alpha1.NodeFailed},
})
Expand All @@ -1328,9 +1345,10 @@ func TestReportWorkflowResource_WorkflowCompleted_FinalStatePersisted_DeleteFail
// report workflow
workflow := util.NewWorkflow(&v1alpha1.Workflow{
ObjectMeta: v1.ObjectMeta{
Name: run.Name,
UID: types.UID(run.UUID),
Labels: map[string]string{util.LabelKeyWorkflowRunId: run.UUID, util.LabelKeyWorkflowPersistedFinalState: "true"},
Name: run.Name,
Namespace: "test-ns",
UID: types.UID(run.UUID),
Labels: map[string]string{util.LabelKeyWorkflowRunId: run.UUID, util.LabelKeyWorkflowPersistedFinalState: "true"},
},
Status: v1alpha1.WorkflowStatus{Phase: v1alpha1.NodeFailed},
})
Expand Down
1 change: 1 addition & 0 deletions backend/src/apiserver/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ go_library(
"@com_github_golang_protobuf//jsonpb:go_default_library_gen",
"@com_github_pkg_errors//:go_default_library",
"@com_github_robfig_cron//:go_default_library",
"@com_github_spf13_viper//:go_default_library",
"@com_github_stretchr_testify//assert:go_default_library",
"@io_bazel_rules_go//proto/wkt:empty_go_proto",
"@io_bazel_rules_go//proto/wkt:timestamp_go_proto",
Expand Down
11 changes: 11 additions & 0 deletions backend/src/apiserver/server/test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@ import (
"github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
api "github.com/kubeflow/pipelines/backend/api/go_client"
"github.com/kubeflow/pipelines/backend/src/apiserver/client"
"github.com/kubeflow/pipelines/backend/src/apiserver/common"
"github.com/kubeflow/pipelines/backend/src/apiserver/model"
"github.com/kubeflow/pipelines/backend/src/apiserver/resource"
"github.com/kubeflow/pipelines/backend/src/common/util"
"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc/codes"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -65,7 +67,13 @@ var validReferencesOfExperimentAndPipelineVersion = []*api.ResourceReference{
},
}

// This automatically runs before all the tests.
func initEnvVars() {
viper.Set(common.PodNamespace, "test-ns")
}

func initWithExperiment(t *testing.T) (*resource.FakeClientManager, *resource.ResourceManager, *model.Experiment) {
initEnvVars()
clientManager := resource.NewFakeClientManagerOrFatal(util.NewFakeTimeForEpoch())
resourceManager := resource.NewResourceManager(clientManager)
experiment := &model.Experiment{Name: "123"}
Expand All @@ -75,6 +83,7 @@ func initWithExperiment(t *testing.T) (*resource.FakeClientManager, *resource.Re
}

func initWithExperiment_KFAM_Unauthorized(t *testing.T) (*resource.FakeClientManager, *resource.ResourceManager, *model.Experiment) {
initEnvVars()
clientManager := resource.NewFakeClientManagerOrFatal(util.NewFakeTimeForEpoch())
clientManager.KfamClientFake = client.NewFakeKFAMClientUnauthorized()
resourceManager := resource.NewResourceManager(clientManager)
Expand All @@ -85,6 +94,7 @@ func initWithExperiment_KFAM_Unauthorized(t *testing.T) (*resource.FakeClientMan
}

func initWithExperimentAndPipelineVersion(t *testing.T) (*resource.FakeClientManager, *resource.ResourceManager, *model.Experiment) {
initEnvVars()
clientManager := resource.NewFakeClientManagerOrFatal(util.NewFakeTimeForEpoch())
resourceManager := resource.NewResourceManager(clientManager)

Expand Down Expand Up @@ -137,6 +147,7 @@ func initWithOneTimeRun(t *testing.T) (*resource.FakeClientManager, *resource.Re

// Util function to create an initial state with pipeline uploaded
func initWithPipeline(t *testing.T) (*resource.FakeClientManager, *resource.ResourceManager, *model.Pipeline) {
initEnvVars()
store := resource.NewFakeClientManagerOrFatal(util.NewFakeTimeForEpoch())
manager := resource.NewResourceManager(store)
p, err := manager.CreatePipeline("p1", "", []byte(testWorkflow.ToStringForStore()))
Expand Down