Skip to content

Commit

Permalink
[Backend][Multi-user] support multi-user mode for job APIs (kubeflow#…
Browse files Browse the repository at this point in the history
…3384)

* Backend multi-user support for job

* Fix UT

* Clean up unused code.

* cleanup, merge duplicate code
  • Loading branch information
chensun committed Apr 3, 2020
1 parent 1faf721 commit 25ff7be
Show file tree
Hide file tree
Showing 14 changed files with 656 additions and 190 deletions.
1 change: 0 additions & 1 deletion backend/src/apiserver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ go_library(
"//backend/src/apiserver/server:go_default_library",
"//backend/src/apiserver/storage:go_default_library",
"//backend/src/common/util:go_default_library",
"//backend/src/crd/pkg/client/clientset/versioned/typed/scheduledworkflow/v1beta1:go_default_library",
"@com_github_cenkalti_backoff//:go_default_library",
"@com_github_fsnotify_fsnotify//:go_default_library",
"@com_github_golang_glog//:go_default_library",
Expand Down
6 changes: 5 additions & 1 deletion backend/src/apiserver/client/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,18 @@ go_library(
"kubernetes_core_fake.go",
"minio.go",
"pod_fake.go",
"scheduled_workflow.go",
"scheduled_workflow_fake.go",
"sql.go",
"swf.go",
"swf_fake.go",
"workflow_fake.go",
],
importpath = "github.com/kubeflow/pipelines/backend/src/apiserver/client",
visibility = ["//visibility:public"],
deps = [
"//backend/src/apiserver/common:go_default_library",
"//backend/src/common/util:go_default_library",
"//backend/src/crd/pkg/apis/scheduledworkflow/v1beta1:go_default_library",
"//backend/src/crd/pkg/client/clientset/versioned:go_default_library",
"//backend/src/crd/pkg/client/clientset/versioned/typed/scheduledworkflow/v1beta1:go_default_library",
"@com_github_argoproj_argo//pkg/apis/workflow/v1alpha1:go_default_library",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,38 +12,38 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package resource
package client

import (
"errors"

"github.com/golang/glog"
"github.com/kubeflow/pipelines/backend/src/crd/pkg/apis/scheduledworkflow/v1beta1"
"k8s.io/apimachinery/pkg/apis/meta/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch"
)

type FakeScheduledWorkflowClient struct {
workflows map[string]*v1beta1.ScheduledWorkflow
scheduledWorkflows map[string]*v1beta1.ScheduledWorkflow
}

func NewScheduledWorkflowClientFake() *FakeScheduledWorkflowClient {
return &FakeScheduledWorkflowClient{
workflows: make(map[string]*v1beta1.ScheduledWorkflow),
scheduledWorkflows: make(map[string]*v1beta1.ScheduledWorkflow),
}
}

func (c *FakeScheduledWorkflowClient) Create(workflow *v1beta1.ScheduledWorkflow) (*v1beta1.ScheduledWorkflow, error) {
workflow.UID = "123"
workflow.Namespace = "default"
workflow.Name = workflow.GenerateName
c.workflows[workflow.Name] = workflow
return workflow, nil
func (c *FakeScheduledWorkflowClient) Create(scheduledWorkflow *v1beta1.ScheduledWorkflow) (*v1beta1.ScheduledWorkflow, error) {
scheduledWorkflow.UID = "123e4567-e89b-12d3-a456-426655440000"
scheduledWorkflow.Namespace = "ns1"
scheduledWorkflow.Name = scheduledWorkflow.GenerateName
c.scheduledWorkflows[scheduledWorkflow.Name] = scheduledWorkflow
return scheduledWorkflow, nil
}

func (c *FakeScheduledWorkflowClient) Delete(name string, options *v1.DeleteOptions) error {
delete(c.workflows, name)
delete(c.scheduledWorkflows, name)
return nil
}

Expand All @@ -52,9 +52,9 @@ func (c *FakeScheduledWorkflowClient) Patch(name string, pt types.PatchType, dat
}

func (c *FakeScheduledWorkflowClient) Get(name string, options v1.GetOptions) (*v1beta1.ScheduledWorkflow, error) {
workflow, ok := c.workflows[name]
scheduledWorkflow, ok := c.scheduledWorkflows[name]
if ok {
return workflow, nil
return scheduledWorkflow, nil
}
return nil, errors.New("not found")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,28 @@ import (
"k8s.io/client-go/rest"
)

type SwfClientInterface interface {
ScheduledWorkflow(namespace string) v1beta1.ScheduledWorkflowInterface
}

type SwfClient struct {
swfV1beta1Client v1beta1.ScheduledworkflowV1beta1Interface
}

func (swfClient *SwfClient) ScheduledWorkflow(namespace string) v1beta1.ScheduledWorkflowInterface {
return swfClient.swfV1beta1Client.ScheduledWorkflows(namespace)
}

// creates a new client for the Kubernetes ScheduledWorkflow CRD.
func CreateScheduledWorkflowClientOrFatal(namespace string, initConnectionTimeout time.Duration) v1beta1.ScheduledWorkflowInterface {
var swfClient v1beta1.ScheduledWorkflowInterface
func NewScheduledWorkflowClientOrFatal(initConnectionTimeout time.Duration) *SwfClient {
var swfClient v1beta1.ScheduledworkflowV1beta1Interface
var operation = func() error {
restConfig, err := rest.InClusterConfig()
if err != nil {
return err
}
swfClientSet := swfclient.NewForConfigOrDie(restConfig)
swfClient = swfClientSet.ScheduledworkflowV1beta1().ScheduledWorkflows(namespace)
swfClient = swfClientSet.ScheduledworkflowV1beta1()
return nil
}

Expand All @@ -43,5 +55,5 @@ func CreateScheduledWorkflowClientOrFatal(namespace string, initConnectionTimeou
glog.Fatalf("Failed to create scheduled workflow client. Error: %v", err)
}

return swfClient
return &SwfClient{swfClient}
}
47 changes: 47 additions & 0 deletions backend/src/apiserver/client/swf_fake.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package client

import (
"github.com/kubeflow/pipelines/backend/src/common/util"
"github.com/kubeflow/pipelines/backend/src/crd/pkg/client/clientset/versioned/typed/scheduledworkflow/v1beta1"
)

type FakeSwfClient struct {
scheduledWorkflowClientFake *FakeScheduledWorkflowClient
}

func NewFakeSwfClient() *FakeSwfClient {
return &FakeSwfClient{NewScheduledWorkflowClientFake()}
}

func (c *FakeSwfClient) ScheduledWorkflow(namespace string) v1beta1.ScheduledWorkflowInterface {
if len(namespace) == 0 {
panic(util.NewResourceNotFoundError("Namespace", namespace))
}
return c.scheduledWorkflowClientFake
}

type FakeSwfClientWithBadWorkflow struct {
scheduledWorkflowClientFake *FakeBadScheduledWorkflowClient
}

func NewFakeSwfClientWithBadWorkflow() *FakeSwfClientWithBadWorkflow {
return &FakeSwfClientWithBadWorkflow{&FakeBadScheduledWorkflowClient{}}
}

func (c *FakeSwfClientWithBadWorkflow) ScheduledWorkflow(namespace string) v1beta1.ScheduledWorkflowInterface {
return c.scheduledWorkflowClientFake
}
8 changes: 3 additions & 5 deletions backend/src/apiserver/client_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/kubeflow/pipelines/backend/src/apiserver/model"
"github.com/kubeflow/pipelines/backend/src/apiserver/storage"
"github.com/kubeflow/pipelines/backend/src/common/util"
scheduledworkflowclient "github.com/kubeflow/pipelines/backend/src/crd/pkg/client/clientset/versioned/typed/scheduledworkflow/v1beta1"
"github.com/minio/minio-go"
)

Expand Down Expand Up @@ -68,7 +67,7 @@ type ClientManager struct {
defaultExperimentStore storage.DefaultExperimentStoreInterface
objectStore storage.ObjectStoreInterface
argoClient client.ArgoClientInterface
swfClient scheduledworkflowclient.ScheduledWorkflowInterface
swfClient client.SwfClientInterface
k8sCoreClient client.KubernetesCoreInterface
kfamClient client.KFAMClientInterface
time util.TimeInterface
Expand Down Expand Up @@ -111,7 +110,7 @@ func (c *ClientManager) ArgoClient() client.ArgoClientInterface {
return c.argoClient
}

func (c *ClientManager) ScheduledWorkflow() scheduledworkflowclient.ScheduledWorkflowInterface {
func (c *ClientManager) SwfClient() client.SwfClientInterface {
return c.swfClient
}

Expand Down Expand Up @@ -152,8 +151,7 @@ func (c *ClientManager) init() {

c.argoClient = client.NewArgoClientOrFatal(common.GetDurationConfig(initConnectionTimeout))

c.swfClient = client.CreateScheduledWorkflowClientOrFatal(
common.GetPodNamespace(), common.GetDurationConfig(initConnectionTimeout))
c.swfClient = client.NewScheduledWorkflowClientOrFatal(common.GetDurationConfig(initConnectionTimeout))

c.k8sCoreClient = client.CreateKubernetesCoreOrFatal(common.GetDurationConfig(initConnectionTimeout))

Expand Down
2 changes: 0 additions & 2 deletions backend/src/apiserver/resource/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ go_library(
"model_converter.go",
"resource_manager.go",
"resource_manager_util.go",
"scheduled_workflow_fake.go",
],
importpath = "github.com/kubeflow/pipelines/backend/src/apiserver/resource",
visibility = ["//visibility:public"],
Expand All @@ -30,7 +29,6 @@ go_library(
"@io_k8s_apimachinery//pkg/api/errors:go_default_library",
"@io_k8s_apimachinery//pkg/apis/meta/v1:go_default_library",
"@io_k8s_apimachinery//pkg/types:go_default_library",
"@io_k8s_apimachinery//pkg/watch:go_default_library",
],
)

Expand Down
65 changes: 32 additions & 33 deletions backend/src/apiserver/resource/client_manager_fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/kubeflow/pipelines/backend/src/apiserver/client"
"github.com/kubeflow/pipelines/backend/src/apiserver/storage"
"github.com/kubeflow/pipelines/backend/src/common/util"
scheduledworkflowclient "github.com/kubeflow/pipelines/backend/src/crd/pkg/client/clientset/versioned/typed/scheduledworkflow/v1beta1"
)

const (
Expand All @@ -28,21 +27,21 @@ const (
)

type FakeClientManager struct {
db *storage.DB
experimentStore storage.ExperimentStoreInterface
pipelineStore storage.PipelineStoreInterface
jobStore storage.JobStoreInterface
runStore storage.RunStoreInterface
resourceReferenceStore storage.ResourceReferenceStoreInterface
dBStatusStore storage.DBStatusStoreInterface
defaultExperimentStore storage.DefaultExperimentStoreInterface
objectStore storage.ObjectStoreInterface
ArgoClientFake *client.FakeArgoClient
scheduledWorkflowClientFake *FakeScheduledWorkflowClient
k8sCoreClientFake *client.FakeKuberneteCoreClient
KfamClientFake client.KFAMClientInterface
time util.TimeInterface
uuid util.UUIDGeneratorInterface
db *storage.DB
experimentStore storage.ExperimentStoreInterface
pipelineStore storage.PipelineStoreInterface
jobStore storage.JobStoreInterface
runStore storage.RunStoreInterface
resourceReferenceStore storage.ResourceReferenceStoreInterface
dBStatusStore storage.DBStatusStoreInterface
defaultExperimentStore storage.DefaultExperimentStoreInterface
objectStore storage.ObjectStoreInterface
ArgoClientFake *client.FakeArgoClient
swfClientFake *client.FakeSwfClient
k8sCoreClientFake *client.FakeKuberneteCoreClient
KfamClientFake client.KFAMClientInterface
time util.TimeInterface
uuid util.UUIDGeneratorInterface
}

func NewFakeClientManager(time util.TimeInterface, uuid util.UUIDGeneratorInterface) (
Expand All @@ -64,21 +63,21 @@ func NewFakeClientManager(time util.TimeInterface, uuid util.UUIDGeneratorInterf

// TODO(neuromage): Pass in metadata.Store instance for tests as well.
return &FakeClientManager{
db: db,
experimentStore: storage.NewExperimentStore(db, time, uuid),
pipelineStore: storage.NewPipelineStore(db, time, uuid),
jobStore: storage.NewJobStore(db, time),
runStore: storage.NewRunStore(db, time),
ArgoClientFake: client.NewFakeArgoClient(),
resourceReferenceStore: storage.NewResourceReferenceStore(db),
dBStatusStore: storage.NewDBStatusStore(db),
defaultExperimentStore: storage.NewDefaultExperimentStore(db),
objectStore: storage.NewFakeObjectStore(),
scheduledWorkflowClientFake: NewScheduledWorkflowClientFake(),
k8sCoreClientFake: client.NewFakeKuberneteCoresClient(),
KfamClientFake: client.NewFakeKFAMClientAuthorized(),
time: time,
uuid: uuid,
db: db,
experimentStore: storage.NewExperimentStore(db, time, uuid),
pipelineStore: storage.NewPipelineStore(db, time, uuid),
jobStore: storage.NewJobStore(db, time),
runStore: storage.NewRunStore(db, time),
ArgoClientFake: client.NewFakeArgoClient(),
resourceReferenceStore: storage.NewResourceReferenceStore(db),
dBStatusStore: storage.NewDBStatusStore(db),
defaultExperimentStore: storage.NewDefaultExperimentStore(db),
objectStore: storage.NewFakeObjectStore(),
swfClientFake: client.NewFakeSwfClient(),
k8sCoreClientFake: client.NewFakeKuberneteCoresClient(),
KfamClientFake: client.NewFakeKFAMClientAuthorized(),
time: time,
uuid: uuid,
}, nil
}

Expand Down Expand Up @@ -139,8 +138,8 @@ func (f *FakeClientManager) DefaultExperimentStore() storage.DefaultExperimentSt
return f.defaultExperimentStore
}

func (f *FakeClientManager) ScheduledWorkflow() scheduledworkflowclient.ScheduledWorkflowInterface {
return f.scheduledWorkflowClientFake
func (f *FakeClientManager) SwfClient() client.SwfClientInterface {
return f.swfClientFake
}

func (f *FakeClientManager) KubernetesCoreClient() client.KubernetesCoreInterface {
Expand Down
Loading

0 comments on commit 25ff7be

Please sign in to comment.