Skip to content

Commit

Permalink
Separate run resources in namespaces (kubeflow#2694)
Browse files Browse the repository at this point in the history
* add namespace to some run APIs

* update only the create run api

* add resourcereference for namespace runs

* pass user identity header from the gRPC server to KFP service

* add variables in const

* declare a flag and fill in the authorizations

* add types to toModel func

* bug fix

* strip the namespace resource reference when mapping to the db model

* add unit tests

* add authorization

* interpret json response

* use gofmt

* add more meaningful error message; format

* refactoring codes

* separate workflow client

* replace belonging relationshipreference to owner

* put a todo for further investigation of using namespace or uuid

* apply gofmt

* revert minor change

* refactor codes

* minor change

* use internal server error in kfam client

* minor change

* use timeout in kfam client

* make kfam service host/port configurable

* minor changes

* update name

* rename

* update the util function to accept a list of resourcereferences

* better error message

* reformat

* remove IsRequestAuthorized func

* add multi-user mode flag

* apply different service accounts based on the multi-user mode flag

* apply service account only when it is not set

* add kfam host and port in config.json

* generalize the auth code

* rename KFAMInterface to KFAMClientInterface

* add kfam fake for tests

* add build bazel

* add unit tests for util func

* remove the config

* add unit test for authorization with httptest

* only intialize the kfam client when kubeflow deployment

* minor change

* fix typo

* wrap the whole auth func

* update authz logic to be enabled when it is kubeflow deployment

* change flag from kubeflow deployment to multiuser mode

* gofmt

* minor change

* combine getnamespace func

* insert annotation to disable istio injection

* move unit tests

* move fake kfam to the original kfam; create multiple fake kfam clients

* combine authorize func, add unit tests for util_test

* wrap errors

* fix unit test

* service unauthorized info to user

* better user errors

* inject default sa when it is empty or injected by the SDK in multi-user mode

* revert some accidental change

* revert some accidental change

* Update util.go

* make functions local

* deduplicate return values from isauthorized

* update kfam service host env variable

* disable istio injection

* set annotations to template instead of the workflow

* fix reference/value bug

* addressing comments

* Create an argoclient class

* move podnamespace to argo client

* addressing comments

* addressing comments
  • Loading branch information
gaoning777 authored and Jeffwan committed Dec 9, 2020
1 parent f9c17d7 commit dae90c3
Show file tree
Hide file tree
Showing 17 changed files with 313 additions and 165 deletions.
9 changes: 8 additions & 1 deletion backend/src/apiserver/client/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,22 @@ go_library(
"pod.go",
"scheduled_workflow.go",
"sql.go",
"workflow.go",
"argo.go",
"argo_fake.go",
"workflow_fake.go",
"kfam.go",
"kfam_fake.go",
],
importpath = "github.com/kubeflow/pipelines/backend/src/apiserver/client",
visibility = ["//visibility:public"],
deps = [
"//backend/src/common/util:go_default_library",
"//backend/src/apiserver/common:go_default_library",
"//backend/src/crd/pkg/client/clientset/versioned:go_default_library",
"//backend/src/crd/pkg/client/clientset/versioned/typed/scheduledworkflow/v1beta1:go_default_library",
"@com_github_argoproj_argo//pkg/client/clientset/versioned:go_default_library",
"@com_github_argoproj_argo//pkg/client/clientset/versioned/typed/workflow/v1alpha1:go_default_library",
"@com_github_argoproj_argo//pkg/apis/workflow/v1alpha1:go_default_library",
"@com_github_cenkalti_backoff//:go_default_library",
"@com_github_go_sql_driver_mysql//:go_default_library",
"@com_github_golang_glog//:go_default_library",
Expand All @@ -27,6 +31,9 @@ go_library(
"@io_k8s_client_go//kubernetes:go_default_library",
"@io_k8s_client_go//kubernetes/typed/core/v1:go_default_library",
"@io_k8s_client_go//rest:go_default_library",
"@io_k8s_apimachinery//pkg/apis/meta/v1:go_default_library",
"@io_k8s_apimachinery//pkg/types:go_default_library",
"@io_k8s_apimachinery//pkg/watch:go_default_library",
],
)

Expand Down
67 changes: 67 additions & 0 deletions backend/src/apiserver/client/argo.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright 2018 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package client

import (
"time"

argoclient "github.com/argoproj/argo/pkg/client/clientset/versioned"
argoprojv1alpha1 "github.com/argoproj/argo/pkg/client/clientset/versioned/typed/workflow/v1alpha1"
"github.com/cenkalti/backoff"
"github.com/golang/glog"
"github.com/kubeflow/pipelines/backend/src/apiserver/common"
"github.com/pkg/errors"
"k8s.io/client-go/rest"
)

const (
PodNamespace = "POD_NAMESPACE"
)

type ArgoClientInterface interface {
Workflow(namespace string) argoprojv1alpha1.WorkflowInterface
}

type ArgoClient struct {
argoProjClient argoprojv1alpha1.ArgoprojV1alpha1Interface
}

func (argoClient *ArgoClient) Workflow(namespace string) argoprojv1alpha1.WorkflowInterface {
if len(namespace) == 0 {
namespace = common.GetStringConfig(PodNamespace)
}
return argoClient.argoProjClient.Workflows(namespace)
}

func NewArgoClientOrFatal(initConnectionTimeout time.Duration) *ArgoClient {
var argoProjClient argoprojv1alpha1.ArgoprojV1alpha1Interface
var operation = func() error {
restConfig, err := rest.InClusterConfig()
if err != nil {
return errors.Wrap(err, "Failed to initialize the RestConfig")
}
argoProjClient = argoclient.NewForConfigOrDie(restConfig).ArgoprojV1alpha1()
return nil
}

b := backoff.NewExponentialBackOff()
b.MaxElapsedTime = initConnectionTimeout
err := backoff.Retry(operation, b)

if err != nil {
glog.Fatalf("Failed to create ArgoClient. Error: %v", err)
}
return &ArgoClient{argoProjClient}
}
70 changes: 70 additions & 0 deletions backend/src/apiserver/client/argo_fake.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright 2019 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package client

import (
argoprojv1alpha1 "github.com/argoproj/argo/pkg/client/clientset/versioned/typed/workflow/v1alpha1"
"github.com/pkg/errors"
)

type FakeArgoClient struct {
workflowClientFake *FakeWorkflowClient
}

func NewFakeArgoClient() *FakeArgoClient {
return &FakeArgoClient{NewWorkflowClientFake()}
}

func (c *FakeArgoClient) Workflow(namespace string) argoprojv1alpha1.WorkflowInterface {
return c.workflowClientFake
}

func (c *FakeArgoClient) GetWorkflowCount() int {
return len(c.workflowClientFake.workflows)
}

func (c *FakeArgoClient) GetWorkflowKeys() map[string]bool {
result := map[string]bool{}
for key := range c.workflowClientFake.workflows {
result[key] = true
}
return result
}

func (c *FakeArgoClient) IsTerminated(name string) (bool, error) {
workflow, ok := c.workflowClientFake.workflows[name]
if !ok {
return false, errors.New("No workflow found with name: " + name)
}

activeDeadlineSeconds := workflow.Spec.ActiveDeadlineSeconds
if activeDeadlineSeconds == nil {
return false, errors.New("No ActiveDeadlineSeconds found in workflow with name: " + name)
}

return *activeDeadlineSeconds == 0, nil
}

type FakeArgoClientWithBadWorkflow struct {
workflowClientFake *FakeBadWorkflowClient
}

func NewFakeArgoClientWithBadWorkflow() *FakeArgoClientWithBadWorkflow {
return &FakeArgoClientWithBadWorkflow{&FakeBadWorkflowClient{}}
}

func (c *FakeArgoClientWithBadWorkflow) Workflow(namespace string) argoprojv1alpha1.WorkflowInterface {
return c.workflowClientFake
}
57 changes: 0 additions & 57 deletions backend/src/apiserver/client/workflow.go

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package resource
package client

import (
"encoding/json"
"github.com/kubeflow/pipelines/backend/src/common/util"
"strconv"

"github.com/kubeflow/pipelines/backend/src/common/util"

"github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
"github.com/golang/glog"
"github.com/pkg/errors"
Expand Down Expand Up @@ -49,18 +50,6 @@ func (c *FakeWorkflowClient) Create(workflow *v1alpha1.Workflow) (*v1alpha1.Work
return workflow, nil
}

func (c *FakeWorkflowClient) GetWorkflowCount() int {
return len(c.workflows)
}

func (c *FakeWorkflowClient) GetWorkflowKeys() map[string]bool {
result := map[string]bool{}
for key := range c.workflows {
result[key] = true
}
return result
}

func (c *FakeWorkflowClient) Get(name string, options v1.GetOptions) (*v1alpha1.Workflow, error) {
workflow, ok := c.workflows[name]
if ok {
Expand Down Expand Up @@ -129,20 +118,6 @@ func (c *FakeWorkflowClient) Patch(name string, pt types.PatchType, data []byte,
return nil, errors.New("Failed to patch workflow")
}

func (c *FakeWorkflowClient) isTerminated(name string) (bool, error) {
workflow, ok := c.workflows[name]
if !ok {
return false, errors.New("No workflow found with name: " + name)
}

activeDeadlineSeconds := workflow.Spec.ActiveDeadlineSeconds
if activeDeadlineSeconds == nil {
return false, errors.New("No ActiveDeadlineSeconds found in workflow with name: " + name)
}

return *activeDeadlineSeconds == 0, nil
}

type FakeBadWorkflowClient struct {
FakeWorkflowClient
}
Expand Down
20 changes: 9 additions & 11 deletions backend/src/apiserver/client_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ package main
import (
"database/sql"
"fmt"
"github.com/kubeflow/pipelines/backend/src/apiserver/common"
v1 "k8s.io/client-go/kubernetes/typed/core/v1"
"os"
"time"

workflowclient "github.com/argoproj/argo/pkg/client/clientset/versioned/typed/workflow/v1alpha1"
"github.com/kubeflow/pipelines/backend/src/apiserver/common"
v1 "k8s.io/client-go/kubernetes/typed/core/v1"

"github.com/cenkalti/backoff"
"github.com/golang/glog"

Expand Down Expand Up @@ -51,7 +51,6 @@ const (
visualizationServiceHost = "ML_PIPELINE_VISUALIZATIONSERVER_SERVICE_HOST"
visualizationServicePort = "ML_PIPELINE_VISUALIZATIONSERVER_SERVICE_PORT"

podNamespace = "POD_NAMESPACE"
initConnectionTimeout = "InitConnectionTimeout"
)

Expand All @@ -66,7 +65,7 @@ type ClientManager struct {
dBStatusStore storage.DBStatusStoreInterface
defaultExperimentStore storage.DefaultExperimentStoreInterface
objectStore storage.ObjectStoreInterface
wfClient workflowclient.WorkflowInterface
argoClient client.ArgoClientInterface
swfClient scheduledworkflowclient.ScheduledWorkflowInterface
podClient v1.PodInterface
kfamClient client.KFAMClientInterface
Expand Down Expand Up @@ -106,8 +105,8 @@ func (c *ClientManager) ObjectStore() storage.ObjectStoreInterface {
return c.objectStore
}

func (c *ClientManager) Workflow() workflowclient.WorkflowInterface {
return c.wfClient
func (c *ClientManager) ArgoClient() client.ArgoClientInterface {
return c.argoClient
}

func (c *ClientManager) ScheduledWorkflow() scheduledworkflowclient.ScheduledWorkflowInterface {
Expand Down Expand Up @@ -149,14 +148,13 @@ func (c *ClientManager) init() {
c.defaultExperimentStore = storage.NewDefaultExperimentStore(db)
c.objectStore = initMinioClient(common.GetDurationConfig(initConnectionTimeout))

c.wfClient = client.CreateWorkflowClientOrFatal(
common.GetStringConfig(podNamespace), common.GetDurationConfig(initConnectionTimeout))
c.argoClient = client.NewArgoClientOrFatal(common.GetDurationConfig(initConnectionTimeout))

c.swfClient = client.CreateScheduledWorkflowClientOrFatal(
common.GetStringConfig(podNamespace), common.GetDurationConfig(initConnectionTimeout))
common.GetStringConfig(client.PodNamespace), common.GetDurationConfig(initConnectionTimeout))

c.podClient = client.CreatePodClientOrFatal(
common.GetStringConfig(podNamespace), common.GetDurationConfig(initConnectionTimeout))
common.GetStringConfig(client.PodNamespace), common.GetDurationConfig(initConnectionTimeout))

runStore := storage.NewRunStore(db, c.time)
c.runStore = runStore
Expand Down
15 changes: 14 additions & 1 deletion backend/src/apiserver/common/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "go_default_library",
Expand All @@ -8,6 +8,7 @@ go_library(
"filter_context.go",
"pagination_context.go",
"paths.go",
"util.go",
],
importpath = "github.com/kubeflow/pipelines/backend/src/apiserver/common",
visibility = ["//visibility:public"],
Expand All @@ -18,3 +19,15 @@ go_library(
"@com_github_spf13_viper//:go_default_library",
],
)

go_test(
name = "go_default_test",
srcs = [
"util_test.go",
],
embed = [":go_default_library"],
deps = [
"//backend/api:go_default_library",
"@com_github_stretchr_testify//assert:go_default_library",
],
)
Loading

0 comments on commit dae90c3

Please sign in to comment.