From d2ddb2ed1c2afa64fd6014c95190416ff5cdd621 Mon Sep 17 00:00:00 2001 From: Humair Khan Date: Sat, 25 Jan 2025 07:42:47 -0500 Subject: [PATCH] feat(backend): add support for uploading new sample pipeline vers (#11553) This change adds additional sampleconfig options that provide support for uploading new Pipelines and PipelineVersion samples. To accommodate this and backwards compatibility the structure of the samples config has been changed. Configs following the old format will continue to be supported. Sample config code is also moved to its own file so as not to bloat main.go In order to handle conflicts, and detecting pipeline/pipelineVersion existence, additional db queries are made per pipeline and pipeline version at apiserver startup. Signed-off-by: Humair Khan --- backend/src/apiserver/config/config.go | 212 ++++++++++++++++ backend/src/apiserver/config/config_test.go | 230 ++++++++++++++++++ .../src/apiserver/config/sample_config.json | 27 +- .../config/testdata/sample_pipeline.yaml | 49 ++++ backend/src/apiserver/main.go | 114 +-------- .../apiserver/resource/resource_manager.go | 13 +- .../src/apiserver/storage/pipeline_store.go | 24 +- 7 files changed, 544 insertions(+), 125 deletions(-) create mode 100644 backend/src/apiserver/config/config.go create mode 100644 backend/src/apiserver/config/config_test.go create mode 100644 backend/src/apiserver/config/testdata/sample_pipeline.yaml diff --git a/backend/src/apiserver/config/config.go b/backend/src/apiserver/config/config.go new file mode 100644 index 00000000000..808ce1f993c --- /dev/null +++ b/backend/src/apiserver/config/config.go @@ -0,0 +1,212 @@ +// Copyright 2025 The Kubeflow Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package config + +import ( + "encoding/json" + "fmt" + "github.com/golang/glog" + "github.com/kubeflow/pipelines/backend/src/apiserver/client" + "github.com/kubeflow/pipelines/backend/src/apiserver/common" + "github.com/kubeflow/pipelines/backend/src/apiserver/model" + "github.com/kubeflow/pipelines/backend/src/apiserver/resource" + "github.com/kubeflow/pipelines/backend/src/apiserver/server" + "github.com/kubeflow/pipelines/backend/src/common/util" + "google.golang.org/grpc/codes" + "os" + "time" +) + +// deprecated +type deprecatedConfig struct { + Name string + Description string + File string +} + +type configPipelines struct { + Name string + Description string + File string + // optional, Name is used for PipelineVersion if not provided + VersionName string + // optional, Description is used for PipelineVersion if not provided + VersionDescription string +} + +type config struct { + // If pipeline version already exists and + // LoadSamplesOnRestart is enabled, then the pipeline + // version is uploaded again on server restart + // if it does not already exist + LoadSamplesOnRestart bool + Pipelines []configPipelines +} + +// LoadSamples preloads a collection of pipeline samples +// +// If LoadSamplesOnRestart is false then Samples are only +// loaded once when the pipeline system is initially installed. +// They won't be loaded on upgrade or pod restart, to +// prevent them from reappearing if user explicitly deletes the +// samples. If LoadSamplesOnRestart is true then PipelineVersions +// are uploaded if they do not already exist upon upgrade or pod +// restart. +func LoadSamples(resourceManager *resource.ResourceManager, sampleConfigPath string) error { + pathExists, err := client.PathExists(sampleConfigPath) + if err != nil { + return err + } + + if !pathExists { + glog.Infof("No samples path provided, skipping loading samples..") + return nil + } + + configBytes, err := os.ReadFile(sampleConfigPath) + if err != nil { + return fmt.Errorf("failed to read sample configurations file. Err: %v", err) + } + + var pipelineConfig config + if configErr := json.Unmarshal(configBytes, &pipelineConfig); configErr != nil { + // Attempt to parse to deprecated config version: + var deprecatedCfg []deprecatedConfig + if depConfigErr := json.Unmarshal(configBytes, &deprecatedCfg); depConfigErr != nil { + return fmt.Errorf("failed to read sample configurations. Err: %v", configErr) + } + glog.Warningf("encountered deprecated version of samples config, please update to the newer version to " + + "ensure future compatibility") + for _, cfg := range deprecatedCfg { + pipelineConfig.Pipelines = append(pipelineConfig.Pipelines, configPipelines{ + Name: cfg.Name, + File: cfg.File, + Description: cfg.Description, + }) + } + pipelineConfig.LoadSamplesOnRestart = false + } + + // Check if sample has been loaded already and skip loading if true. + haveSamplesLoaded, err := resourceManager.HaveSamplesLoaded() + if err != nil { + return err + } + + if !pipelineConfig.LoadSamplesOnRestart && haveSamplesLoaded { + glog.Infof("Samples already loaded in the past. Skip loading.") + return nil + } + + processedPipelines := map[string]bool{} + + for _, cfg := range pipelineConfig.Pipelines { + // Track if this is the first upload of this pipeline + reader, configErr := os.Open(cfg.File) + if configErr != nil { + return fmt.Errorf("failed to load sample %s. Error: %v", cfg.Name, configErr) + } + pipelineFile, configErr := server.ReadPipelineFile(cfg.File, reader, common.MaxFileLength) + if configErr != nil { + return fmt.Errorf("failed to load sample %s. Error: %v", cfg.Name, configErr) + } + + // Create pipeline if it does not already exist + p, fetchErr := resourceManager.GetPipelineByNameAndNamespace(cfg.Name, "") + if fetchErr != nil { + if util.IsUserErrorCodeMatch(fetchErr, codes.NotFound) { + p, configErr = resourceManager.CreatePipeline(&model.Pipeline{ + Name: cfg.Name, + Description: cfg.Description, + }) + if configErr != nil { + // Log the error but not fail. The API Server pod can restart and it could potentially cause + // name collision. In the future, we might consider loading samples during deployment, instead + // of when API server starts. + glog.Warningf(fmt.Sprintf( + "Failed to create pipeline for %s. Error: %v", cfg.Name, configErr)) + continue + } else { + glog.Info(fmt.Sprintf("Successfully uploaded Pipeline %s.", cfg.Name)) + } + } else { + return fmt.Errorf( + "Failed to handle load sample for Pipeline: %s. Error: %v", cfg.Name, fetchErr) + } + } + + // Use Pipeline Version Name/Description if provided + // Otherwise fallback to owning Pipeline's Name/Description + pvDescription := cfg.Description + if cfg.VersionDescription != "" { + pvDescription = cfg.VersionDescription + } + pvName := cfg.Name + if cfg.VersionName != "" { + pvName = cfg.VersionName + } + + // If the Pipeline Version exists, do nothing + // Otherwise upload new Pipeline Version for + // this pipeline. + _, fetchErr = resourceManager.GetPipelineVersionByName(pvName) + if fetchErr != nil { + if util.IsUserErrorCodeMatch(fetchErr, codes.NotFound) { + _, configErr = resourceManager.CreatePipelineVersion( + &model.PipelineVersion{ + Name: pvName, + Description: pvDescription, + PipelineId: p.UUID, + PipelineSpec: string(pipelineFile), + }, + ) + if configErr != nil { + // Log the error but not fail. The API Server pod can restart and it could potentially cause name collision. + // In the future, we might consider loading samples during deployment, instead of when API server starts. + glog.Warningf(fmt.Sprintf("Failed to create pipeline for %s. Error: %v", pvName, configErr)) + + continue + } else { + glog.Info(fmt.Sprintf("Successfully uploaded PipelineVersion %s.", pvName)) + } + + if processedPipelines[pvName] { + // Since the default sorting is by create time, + // Sleep one second makes sure the samples are + // showing up in the same order as they are added. + time.Sleep(1 * time.Second) + } + } else { + return fmt.Errorf( + "Failed to handle load sample for PipelineVersion: %s. Error: %v", pvName, fetchErr) + } + } else { + // pipeline version already exists, do nothing + continue + } + + processedPipelines[pvName] = true + } + + if !haveSamplesLoaded { + err = resourceManager.MarkSampleLoaded() + if err != nil { + return err + } + } + + glog.Info("All samples are loaded.") + return nil +} diff --git a/backend/src/apiserver/config/config_test.go b/backend/src/apiserver/config/config_test.go new file mode 100644 index 00000000000..fb5af088940 --- /dev/null +++ b/backend/src/apiserver/config/config_test.go @@ -0,0 +1,230 @@ +// Copyright 2025 The Kubeflow Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package config + +import ( + "encoding/json" + "github.com/kubeflow/pipelines/backend/src/apiserver/list" + "github.com/kubeflow/pipelines/backend/src/apiserver/model" + "github.com/kubeflow/pipelines/backend/src/apiserver/resource" + "github.com/kubeflow/pipelines/backend/src/common/util" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/grpc/codes" + "os" + "path/filepath" + "testing" +) + +func fakeResourceManager() *resource.ResourceManager { + clientManager := resource.NewFakeClientManagerOrFatalV2() + resourceManager := resource.NewResourceManager(clientManager, &resource.ResourceManagerOptions{CollectMetrics: false}) + return resourceManager +} + +func TestLoadSamplesConfigBackwardsCompatibility(t *testing.T) { + rm := fakeResourceManager() + pc := []deprecatedConfig{ + { + Name: "Pipeline 1", + Description: "test description", + File: "testdata/sample_pipeline.yaml", + }, + { + Name: "Pipeline 2", + Description: "test description", + File: "testdata/sample_pipeline.yaml", + }, + } + + path, err := writeSampleConfigDeprecated(t, pc, "sample.json") + require.NoError(t, err) + err = LoadSamples(rm, path) + require.NoError(t, err) + + _, err = rm.GetPipelineByNameAndNamespace(pc[0].Name, "") + require.NoError(t, err) + _, err = rm.GetPipelineByNameAndNamespace(pc[1].Name, "") + require.NoError(t, err) + + _, err = rm.GetPipelineVersionByName(pc[0].Name) + require.NoError(t, err) + _, err = rm.GetPipelineVersionByName(pc[1].Name) + require.NoError(t, err) + + // Update pipeline version for Pipeline 1 + pc[0].Name = "Pipeline 3" + path, err = writeSampleConfigDeprecated(t, pc, "sample.json") + require.NoError(t, err) + + // Loading samples should result in no pipeline uploaded + err = LoadSamples(rm, path) + require.NoError(t, err) + _, err = rm.GetPipelineByNameAndNamespace(pc[0].Name, "") + var userErr *util.UserError + if assert.ErrorAs(t, err, &userErr) { + require.Equal(t, codes.NotFound, userErr.ExternalStatusCode()) + } +} + +func TestLoadSamples(t *testing.T) { + rm := fakeResourceManager() + pc := config{ + LoadSamplesOnRestart: true, + Pipelines: []configPipelines{ + { + Name: "Pipeline 1", + Description: "test description", + File: "testdata/sample_pipeline.yaml", + VersionName: "Pipeline 1 - Ver 1", + VersionDescription: "Pipeline 1 - Ver 1 Description", + }, + { + Name: "Pipeline 2", + Description: "test description", + File: "testdata/sample_pipeline.yaml", + VersionName: "Pipeline 2 - Ver 1", + VersionDescription: "Pipeline 2 - Ver 1 Description", + }, + }, + } + + path, err := writeSampleConfig(t, pc, "sample.json") + require.NoError(t, err) + err = LoadSamples(rm, path) + require.NoError(t, err) + + var pipeline1 *model.Pipeline + pipeline1, err = rm.GetPipelineByNameAndNamespace(pc.Pipelines[0].Name, "") + require.NoError(t, err) + var pipeline2 *model.Pipeline + pipeline2, err = rm.GetPipelineByNameAndNamespace(pc.Pipelines[1].Name, "") + require.NoError(t, err) + + _, err = rm.GetPipelineVersionByName(pc.Pipelines[0].VersionName) + require.NoError(t, err) + + // Update pipeline version for Pipeline 1 + pc.Pipelines[0].VersionName = "Pipeline 1 - Ver 2" + path, err = writeSampleConfig(t, pc, "sample.json") + require.NoError(t, err) + err = LoadSamples(rm, path) + require.NoError(t, err) + + // Expect another Pipeline version added for Pipeline 1 + opts, err := list.NewOptions(&model.PipelineVersion{}, 10, "id", nil) + require.NoError(t, err) + _, totalSize, _, err := rm.ListPipelineVersions(pipeline1.UUID, opts) + require.NoError(t, err) + require.Equal(t, totalSize, 2) + + // Update pipeline version for Pipeline 2 + pc.Pipelines[1].VersionName = "Pipeline 2 - Ver 2" + path, err = writeSampleConfig(t, pc, "sample.json") + require.NoError(t, err) + err = LoadSamples(rm, path) + require.NoError(t, err) + + // Expect another Pipeline version added for Pipeline 2 + _, err = rm.GetPipelineVersionByName(pc.Pipelines[1].VersionName) + require.NoError(t, err) + _, totalSize, _, err = rm.ListPipelineVersions(pipeline2.UUID, opts) + require.Equal(t, totalSize, 2) + + // Confirm previous pipeline version count has not been affected + _, totalSize, _, err = rm.ListPipelineVersions(pipeline1.UUID, opts) + require.Equal(t, totalSize, 2) + + // When LoadSamplesOnRestart is false, changes to config should + // result in no new pipelines update upon restart + pc.LoadSamplesOnRestart = false + + // Update pipeline version for Pipeline 2 + pc.Pipelines[1].VersionName = "Pipeline 2 - Ver 3" + path, err = writeSampleConfig(t, pc, "sample.json") + require.NoError(t, err) + err = LoadSamples(rm, path) + require.NoError(t, err) + + // Expect no change + _, totalSize, _, err = rm.ListPipelineVersions(pipeline2.UUID, opts) + require.NoError(t, err) + require.Equal(t, totalSize, 2) +} + +func TestLoadSamplesMultiplePipelineVersionsInConfig(t *testing.T) { + rm := fakeResourceManager() + pc := config{ + LoadSamplesOnRestart: true, + Pipelines: []configPipelines{ + { + Name: "Pipeline 1", + Description: "test description", + File: "testdata/sample_pipeline.yaml", + VersionName: "Pipeline 1 - Ver 1", + VersionDescription: "Pipeline 1 - Ver 1 Description", + }, + { + Name: "Pipeline 1", + Description: "test description", + File: "testdata/sample_pipeline.yaml", + VersionName: "Pipeline 1 - Ver 2", + VersionDescription: "Pipeline 1 - Ver 2 Description", + }, + }, + } + + path, err := writeSampleConfig(t, pc, "sample.json") + require.NoError(t, err) + err = LoadSamples(rm, path) + require.NoError(t, err) + + // Expect both versions to be added + var pipeline *model.Pipeline + pipeline, err = rm.GetPipelineByNameAndNamespace(pc.Pipelines[0].Name, "") + require.NoError(t, err) + + _, err = rm.GetPipelineVersionByName(pc.Pipelines[0].VersionName) + require.NoError(t, err) + _, err = rm.GetPipelineVersionByName(pc.Pipelines[1].VersionName) + require.NoError(t, err) + + opts, err := list.NewOptions(&model.PipelineVersion{}, 10, "id", nil) + require.NoError(t, err) + + _, totalSize, _, err := rm.ListPipelineVersions(pipeline.UUID, opts) + require.Equal(t, totalSize, 2) +} + +func writeSampleConfig(t *testing.T, config config, path string) (string, error) { + return writeContents(t, config, path) +} + +func writeSampleConfigDeprecated(t *testing.T, config []deprecatedConfig, path string) (string, error) { + return writeContents(t, config, path) +} + +func writeContents(t *testing.T, content interface{}, path string) (string, error) { + tempDir := t.TempDir() + sampleFilePath := filepath.Join(tempDir, path) + marshal, err := json.Marshal(content) + if err != nil { + return "", err + } + if err := os.WriteFile(sampleFilePath, marshal, 0644); err != nil { + t.Fatalf("Failed to create %v file: %v", path, err) + } + return sampleFilePath, nil +} diff --git a/backend/src/apiserver/config/sample_config.json b/backend/src/apiserver/config/sample_config.json index a4f4145c54e..01064c9584e 100644 --- a/backend/src/apiserver/config/sample_config.json +++ b/backend/src/apiserver/config/sample_config.json @@ -1,12 +1,15 @@ -[ - { - "name": "[Tutorial] Data passing in python components", - "description": "[source code](https://github.com/kubeflow/pipelines/tree/master/samples/tutorials/Data%20passing%20in%20python%20components) Shows how to pass data between python components.", - "file": "/samples/tutorials/Data passing in python components/Data passing in python components - Files.py.yaml" - }, - { - "name": "[Tutorial] DSL - Control structures", - "description": "[source code](https://github.com/kubeflow/pipelines/tree/master/samples/tutorials/DSL%20-%20Control%20structures) Shows how to use conditional execution and exit handlers. This pipeline will randomly fail to demonstrate that the exit handler gets executed even in case of failure.", - "file": "/samples/tutorials/DSL - Control structures/DSL - Control structures.py.yaml" - } -] +{ + "loadSamplesOnRestart": true, + "pipelines": [ + { + "name": "[Tutorial] Data passing in python components", + "description": "[source code](https://github.com/kubeflow/pipelines/tree/master/samples/tutorials/Data%20passing%20in%20python%20components) Shows how to pass data between python components.", + "file": "/samples/tutorials/Data passing in python components/Data passing in python components - Files.py.yaml" + }, + { + "name": "[Tutorial] DSL - Control structures", + "description": "[source code](https://github.com/kubeflow/pipelines/tree/master/samples/tutorials/DSL%20-%20Control%20structures) Shows how to use conditional execution and exit handlers. This pipeline will randomly fail to demonstrate that the exit handler gets executed even in case of failure.", + "file": "/samples/tutorials/DSL - Control structures/DSL - Control structures.py.yaml" + } + ] +} diff --git a/backend/src/apiserver/config/testdata/sample_pipeline.yaml b/backend/src/apiserver/config/testdata/sample_pipeline.yaml new file mode 100644 index 00000000000..4a0edac76e4 --- /dev/null +++ b/backend/src/apiserver/config/testdata/sample_pipeline.yaml @@ -0,0 +1,49 @@ +# PIPELINE DEFINITION +# Name: pipeline-hello-world +components: + comp-hello-world: + executorLabel: exec-hello-world +deploymentSpec: + executors: + exec-hello-world: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - hello_world + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.10.1'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ + $0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef hello_world():\n print('hello')\n\n" + image: python:3.9 +pipelineInfo: + name: pipeline-hello-world +root: + dag: + tasks: + hello-world: + cachingOptions: + enableCache: true + componentRef: + name: comp-hello-world + taskInfo: + name: hello-world +schemaVersion: 2.1.0 +sdkVersion: kfp-2.10.1 diff --git a/backend/src/apiserver/main.go b/backend/src/apiserver/main.go index 9841503a32f..740e735e4c2 100644 --- a/backend/src/apiserver/main.go +++ b/backend/src/apiserver/main.go @@ -16,21 +16,7 @@ package main import ( "context" - "encoding/json" "flag" - "fmt" - "github.com/kubeflow/pipelines/backend/src/apiserver/client" - "io" - "io/ioutil" - "math" - "net" - "net/http" - "os" - "strconv" - "strings" - "sync" - "time" - "github.com/fsnotify/fsnotify" "github.com/golang/glog" "github.com/gorilla/mux" @@ -39,7 +25,7 @@ import ( apiv2beta1 "github.com/kubeflow/pipelines/backend/api/v2beta1/go_client" cm "github.com/kubeflow/pipelines/backend/src/apiserver/client_manager" "github.com/kubeflow/pipelines/backend/src/apiserver/common" - "github.com/kubeflow/pipelines/backend/src/apiserver/model" + "github.com/kubeflow/pipelines/backend/src/apiserver/config" "github.com/kubeflow/pipelines/backend/src/apiserver/resource" "github.com/kubeflow/pipelines/backend/src/apiserver/server" "github.com/kubeflow/pipelines/backend/src/apiserver/template" @@ -49,6 +35,13 @@ import ( "github.com/spf13/viper" "google.golang.org/grpc" "google.golang.org/grpc/reflection" + "io" + "math" + "net" + "net/http" + "strconv" + "strings" + "sync" ) const ( @@ -84,7 +77,7 @@ func main() { &clientManager, &resource.ResourceManagerOptions{CollectMetrics: *collectMetricsFlag}, ) - err := loadSamples(resourceManager) + err := config.LoadSamples(resourceManager, *sampleConfigPath) if err != nil { glog.Fatalf("Failed to load samples. Err: %v", err) } @@ -255,95 +248,6 @@ func registerHttpHandlerFromEndpoint(handler RegisterHttpHandlerFromEndpoint, se } } -// Preload a bunch of pipeline samples -// Samples are only loaded once when the pipeline system is initially installed. -// They won't be loaded when upgrade or pod restart, to prevent them reappear if user explicitly -// delete the samples. -func loadSamples(resourceManager *resource.ResourceManager) error { - // Check if sample has being loaded already and skip loading if true. - haveSamplesLoaded, err := resourceManager.HaveSamplesLoaded() - if err != nil { - return err - } - if haveSamplesLoaded { - glog.Infof("Samples already loaded in the past. Skip loading.") - return nil - } - - pathExists, err := client.PathExists(*sampleConfigPath) - if err != nil { - return err - } - - if !pathExists { - glog.Infof("No samples path provided, skipping loading samples..") - return nil - } - - configBytes, err := ioutil.ReadFile(*sampleConfigPath) - if err != nil { - return fmt.Errorf("failed to read sample configurations file. Err: %v", err) - } - type config struct { - Name string - Description string - File string - } - var configs []config - if err = json.Unmarshal(configBytes, &configs); err != nil { - return fmt.Errorf("failed to read sample configurations. Err: %v", err) - } - for _, config := range configs { - reader, configErr := os.Open(config.File) - if configErr != nil { - return fmt.Errorf("failed to load sample %s. Error: %v", config.Name, configErr) - } - pipelineFile, configErr := server.ReadPipelineFile(config.File, reader, common.MaxFileLength) - if configErr != nil { - return fmt.Errorf("failed to decompress the file %s. Error: %v", config.Name, configErr) - } - p, configErr := resourceManager.CreatePipeline( - &model.Pipeline{ - Name: config.Name, - Description: config.Description, - }, - ) - if configErr != nil { - // Log the error but not fail. The API Server pod can restart and it could potentially cause name collision. - // In the future, we might consider loading samples during deployment, instead of when API server starts. - glog.Warningf(fmt.Sprintf("Failed to create pipeline for %s. Error: %v", config.Name, configErr)) - - continue - } - - _, configErr = resourceManager.CreatePipelineVersion( - &model.PipelineVersion{ - Name: config.Name, - Description: config.Description, - PipelineId: p.UUID, - PipelineSpec: string(pipelineFile), - }, - ) - if configErr != nil { - // Log the error but not fail. The API Server pod can restart and it could potentially cause name collision. - // In the future, we might consider loading samples during deployment, instead of when API server starts. - glog.Warningf(fmt.Sprintf("Failed to create pipeline for %s. Error: %v", config.Name, configErr)) - - continue - } - // Since the default sorting is by create time, - // Sleep one second makes sure the samples are showing up in the same order as they are added. - time.Sleep(1 * time.Second) - } - // Mark sample as loaded - err = resourceManager.MarkSampleLoaded() - if err != nil { - return err - } - glog.Info("All samples are loaded.") - return nil -} - func initConfig() { // Import environment variable, support nested vars e.g. OBJECTSTORECONFIG_ACCESSKEY replacer := strings.NewReplacer(".", "_") diff --git a/backend/src/apiserver/resource/resource_manager.go b/backend/src/apiserver/resource/resource_manager.go index 94fdc578c62..c183aa7799f 100644 --- a/backend/src/apiserver/resource/resource_manager.go +++ b/backend/src/apiserver/resource/resource_manager.go @@ -1616,7 +1616,7 @@ func (r *ResourceManager) CreatePipelineVersion(pv *model.PipelineVersion) (*mod } // After pipeline version being created in DB and pipeline file being - // saved in minio server, set this pieline version to status ready. + // saved in minio server, set this pipeline version to status ready. version.Status = model.PipelineVersionReady err = r.pipelineStore.UpdatePipelineVersionStatus(version.UUID, version.Status) if err != nil { @@ -1625,7 +1625,7 @@ func (r *ResourceManager) CreatePipelineVersion(pv *model.PipelineVersion) (*mod return version, nil } -// Returns a pipeline version. +// Returns a pipeline version by Id. func (r *ResourceManager) GetPipelineVersion(pipelineVersionId string) (*model.PipelineVersion, error) { if pipelineVersion, err := r.pipelineStore.GetPipelineVersion(pipelineVersionId); err != nil { return nil, util.Wrapf(err, "Failed to get a pipeline version with id %v", pipelineVersionId) @@ -1634,6 +1634,15 @@ func (r *ResourceManager) GetPipelineVersion(pipelineVersionId string) (*model.P } } +// Returns a pipeline version by Name. +func (r *ResourceManager) GetPipelineVersionByName(name string) (*model.PipelineVersion, error) { + if pipelineVersion, err := r.pipelineStore.GetPipelineVersionByName(name); err != nil { + return nil, util.Wrapf(err, "Failed to get a pipeline version with name %v", name) + } else { + return pipelineVersion, nil + } +} + // Returns the latest pipeline version for a specified pipeline id. func (r *ResourceManager) GetLatestPipelineVersion(pipelineId string) (*model.PipelineVersion, error) { // Verify pipeline exists diff --git a/backend/src/apiserver/storage/pipeline_store.go b/backend/src/apiserver/storage/pipeline_store.go index e085e0f3665..0d4e016007c 100644 --- a/backend/src/apiserver/storage/pipeline_store.go +++ b/backend/src/apiserver/storage/pipeline_store.go @@ -17,7 +17,6 @@ package storage import ( "database/sql" "fmt" - sq "github.com/Masterminds/squirrel" "github.com/golang/glog" "github.com/kubeflow/pipelines/backend/src/apiserver/list" @@ -100,6 +99,7 @@ type PipelineStoreInterface interface { CreatePipelineVersion(pipelineVersion *model.PipelineVersion) (*model.PipelineVersion, error) GetPipelineVersionWithStatus(pipelineVersionId string, status model.PipelineVersionStatus) (*model.PipelineVersion, error) GetPipelineVersion(pipelineVersionId string) (*model.PipelineVersion, error) + GetPipelineVersionByName(name string) (*model.PipelineVersion, error) GetLatestPipelineVersion(pipelineId string) (*model.PipelineVersion, error) ListPipelineVersions(pipelineId string, opts *list.Options) ([]*model.PipelineVersion, int, string, error) UpdatePipelineVersionStatus(pipelineVersionId string, status model.PipelineVersionStatus) error @@ -822,33 +822,45 @@ func (s *PipelineStore) GetPipelineVersion(versionId string) (*model.PipelineVer return s.GetPipelineVersionWithStatus(versionId, model.PipelineVersionReady) } +func (s *PipelineStore) GetPipelineVersionByName(name string) (*model.PipelineVersion, error) { + return s.getPipelineVersionByCol("Name", name, model.PipelineVersionReady) +} + // Returns a pipeline version with specified status. func (s *PipelineStore) GetPipelineVersionWithStatus(versionId string, status model.PipelineVersionStatus) (*model.PipelineVersion, error) { + return s.getPipelineVersionByCol("UUID", versionId, status) +} + +// getPipelineVersionByCol retrieves a PipelineVersion filtered on +// colName with colVal and the given status. This is particularly +// useful for fetching pipeline Version by either UUID or Name columns. +func (s *PipelineStore) getPipelineVersionByCol(colName, colVal string, status model.PipelineVersionStatus) (*model.PipelineVersion, error) { // Prepare a SQL query sql, args, err := sq. Select(pipelineVersionColumns...). From("pipeline_versions"). - Where(sq.And{sq.Eq{"pipeline_versions.UUID": versionId}, sq.Eq{"pipeline_versions.Status": status}}). + Where(sq.And{ + sq.Eq{fmt.Sprintf("pipeline_versions.%s", colName): colVal}, sq.Eq{"pipeline_versions.Status": status}}). Limit(1). ToSql() if err != nil { - return nil, util.NewInternalServerError(err, "Failed to create query to fetch a pipeline version %v with status %v", versionId, string(status)) + return nil, util.NewInternalServerError(err, "Failed to create query to fetch a pipeline version with %v=%v and status=%v", colName, colVal, string(status)) } // Execute the query r, err := s.db.Query(sql, args...) if err != nil { - return nil, util.NewInternalServerError(err, "Failed fetching pipeline version %v with status %v", versionId, string(status)) + return nil, util.NewInternalServerError(err, "Failed fetching pipeline version with %v=%v and status=%v", colName, colVal, string(status)) } defer r.Close() // Parse results versions, err := s.scanPipelineVersionsRows(r) if err != nil || len(versions) > 1 { - return nil, util.NewInternalServerError(err, "Failed to parse a pipeline version %v from SQL response with status %v", versionId, string(status)) + return nil, util.NewInternalServerError(err, "Failed to parse a pipeline version from SQL response with %v=%v and status=%v", colName, colVal, string(status)) } if len(versions) == 0 { - return nil, util.NewResourceNotFoundError("PipelineVersion", versionId) + return nil, util.NewResourceNotFoundError("PipelineVersion", colVal) } return versions[0], nil }