From 7aaecb150114d358c886ab85f01f327428b2a253 Mon Sep 17 00:00:00 2001 From: jingzhang36 Date: Thu, 26 Sep 2019 14:59:07 +0800 Subject: [PATCH] Add necessary data types to api and database to support pipeline version. (#1873) * Add necessary data types/tables for pipeline version. Mostly based on Yang's branch at https://github.com/IronPan/pipelines/tree/kfpci/. Backward compatible. * Modified comment * Modify api converter according with new pipeline (version) definition * Change pipeline_store for DefaultVersionId field * Add pipeline spec to pipeline version * fix model converter * fix a comment * Add foreign key, pagination of list request, refactor code source * Refactor code source * Foreign key * Change code source and package source type * Fix ; separator * Add versions table and modify existing pipeline apis * Remove api pipeline defintiion change and leave it for later PR * Add comment * Make schema changing and data backfilling a single transaction * Tolerate null default version id in code * fix status * Revise delete pipeline func * Use raw query to migrate data * No need to update versions status * rename and minor changes * accidentally removed a where clause * Fix a model name prefix * Refine comments * Revise if condition * Address comments * address more comments * Rearrange pipeline and version related parts inside CreatePipeline, to make them more separate. * Add package url to pipeline version. Required when calling CreatePipelineVersionRequest * Single code source url; remove pipeline id as sorting field; reformat * resolve remote branch and local branch diff * remove unused func * Remove an empty line --- backend/api/pipeline.proto | 69 +++- backend/api/resource_reference.proto | 1 + backend/src/apiserver/BUILD.bazel | 1 + backend/src/apiserver/client_manager.go | 56 ++++ backend/src/apiserver/list/list.go | 29 +- backend/src/apiserver/list/list_test.go | 4 + backend/src/apiserver/model/BUILD.bazel | 22 +- backend/src/apiserver/model/experiment.go | 5 + backend/src/apiserver/model/job.go | 5 + backend/src/apiserver/model/pipeline.go | 12 + .../src/apiserver/model/pipeline_version.go | 75 +++++ .../apiserver/model/pipeline_version_test.go | 32 ++ backend/src/apiserver/model/run.go | 7 + .../src/apiserver/resource/model_converter.go | 25 +- .../resource/model_converter_test.go | 33 ++ .../apiserver/resource/resource_manager.go | 54 +++- .../resource/resource_manager_test.go | 30 +- backend/src/apiserver/server/api_converter.go | 47 +++ .../apiserver/server/api_converter_test.go | 26 ++ .../server/list_request_util_test.go | 4 + .../server/pipeline_upload_server_test.go | 57 +++- backend/src/apiserver/server/util_test.go | 11 +- backend/src/apiserver/storage/db_fake.go | 1 + .../src/apiserver/storage/pipeline_store.go | 235 ++++++++++++-- .../apiserver/storage/pipeline_store_test.go | 299 +++++++++++++----- backend/src/apiserver/storage/run_store.go | 80 ++--- .../src/apiserver/storage/run_store_test.go | 14 +- 27 files changed, 1041 insertions(+), 193 deletions(-) create mode 100644 backend/src/apiserver/model/pipeline_version.go create mode 100644 backend/src/apiserver/model/pipeline_version_test.go diff --git a/backend/api/pipeline.proto b/backend/api/pipeline.proto index 72485083db5..fa775bed4f8 100644 --- a/backend/api/pipeline.proto +++ b/backend/api/pipeline.proto @@ -20,8 +20,10 @@ package api; import "google/api/annotations.proto"; import "google/protobuf/timestamp.proto"; import "google/protobuf/empty.proto"; -import "backend/api/parameter.proto"; import "backend/api/error.proto"; +import "backend/api/parameter.proto"; +import "backend/api/pipeline_spec.proto"; +import "backend/api/resource_reference.proto"; import "protoc-gen-swagger/options/annotations.proto"; option (grpc.gateway.protoc_gen_swagger.options.openapiv2_swagger) = { @@ -136,6 +138,32 @@ message GetTemplateResponse { string template = 1; } +message GetPipelineVersionTemplateRequest{ + string version_id = 1; +} + +message CreatePipelineVersionRequest { + // ResourceReference inside PipelineVersion specifies the pipeline that this + // version belongs to. + PipelineVersion version = 1; +} + +message GetPipelineVersionRequest { + string version_id = 1; +} + +message ListPipelineVersionsRequest { + // ResourceKey specifies the pipeline whose versions are to be listed. + ResourceKey resource_key = 1; + int32 page_size = 2; + string page_token = 3; +} + +message ListPipelineVersionsResponse { + repeated PipelineVersion versions = 1; + string next_page_token = 2; +} + message Pipeline { // Output. Unique pipeline ID. Generated by API server. string id = 1; @@ -151,14 +179,53 @@ message Pipeline { string description = 4; // Output. The input parameters for this pipeline. + // TODO(jingzhang36): replace this parameters field with the parameters field + // inside PipelineVersion when all usage of the former has been changed to use + // the latter. repeated Parameter parameters = 5; // The URL to the source of the pipeline. This is required when creating the // pipeine through CreatePipeline API. + // TODO(jingzhang36): replace this url field with the code_source_urls field + // inside PipelineVersion when all usage of the former has been changed to use + // the latter. Url url = 7; // In case any error happens retrieving a pipeline field, only pipeline ID // and the error message is returned. Client has the flexibility of choosing // how to handle error. This is especially useful during listing call. string error = 6; + + // Output only. The default version of the pipeline. As of now, the latest + // version is used as default. (In the future, if desired by customers, we + // can allow them to set default version.) + // TODO(jingzhang36): expose this in API pipeline definition with FE changes. + // PipelineVersion default_version = 8; } + +message PipelineVersion { + // Output. Unique version ID. Generated by API server. + string id = 1; + + // Optional input field. Version name provided by user. + string name = 2; + + // Output. The time this pipeline version is created. + google.protobuf.Timestamp created_at = 3; + + // Output. The input parameters for this pipeline. + repeated Parameter parameters = 4; + + // Input. Optional. Pipeline version code source. + string code_source_url = 5; + + // Input. Required. Pipeline version package url. + // Whe calling CreatePipelineVersion API method, need to provide one package + // file location. + Url package_url = 6; + + // Input. Required. E.g., specify which pipeline this pipeline version belongs + // to. + repeated ResourceReference resource_references = 7; +} + diff --git a/backend/api/resource_reference.proto b/backend/api/resource_reference.proto index a5e0969b8a2..d866843d063 100644 --- a/backend/api/resource_reference.proto +++ b/backend/api/resource_reference.proto @@ -21,6 +21,7 @@ enum ResourceType { UNKNOWN_RESOURCE_TYPE = 0; EXPERIMENT = 1; JOB = 2; + PIPELINE = 3; } enum Relationship { diff --git a/backend/src/apiserver/BUILD.bazel b/backend/src/apiserver/BUILD.bazel index eaa254f7a64..8489d944f59 100644 --- a/backend/src/apiserver/BUILD.bazel +++ b/backend/src/apiserver/BUILD.bazel @@ -26,6 +26,7 @@ go_library( "@com_github_grpc_ecosystem_grpc_gateway//runtime:go_default_library", "@com_github_jinzhu_gorm//:go_default_library", "@com_github_jinzhu_gorm//dialects/sqlite:go_default_library", + "@com_github_masterminds_squirrel//:go_default_library", "@com_github_minio_minio_go//:go_default_library", "@com_github_spf13_viper//:go_default_library", "@io_k8s_client_go//kubernetes/typed/core/v1:go_default_library", diff --git a/backend/src/apiserver/client_manager.go b/backend/src/apiserver/client_manager.go index d9579973cec..4a81fbf6e78 100644 --- a/backend/src/apiserver/client_manager.go +++ b/backend/src/apiserver/client_manager.go @@ -176,11 +176,24 @@ func initDBClient(initConnectionTimeout time.Duration) *storage.DB { db, err := gorm.Open(driverName, arg) util.TerminateIfError(err) + // If pipeline_versions table is introduced into DB for the first time, + // it needs initialization or data backfill. + var tableNames []string + var initializePipelineVersions = true + db.Raw(`show tables`).Pluck("Tables_in_mlpipeline", &tableNames) + for _, tableName := range tableNames { + if tableName == "pipeline_versions" { + initializePipelineVersions = false + break + } + } + // Create table response := db.AutoMigrate( &model.Experiment{}, &model.Job{}, &model.Pipeline{}, + &model.PipelineVersion{}, &model.ResourceReference{}, &model.RunDetail{}, &model.RunMetric{}, @@ -201,6 +214,18 @@ func initDBClient(initConnectionTimeout time.Duration) *storage.DB { if response.Error != nil { glog.Fatalf("Failed to create a foreign key for RunID in run_metrics table. Error: %s", response.Error) } + response = db.Model(&model.PipelineVersion{}). + AddForeignKey("PipelineId", "pipelines(UUID)", "CASCADE" /* onDelete */, "CASCADE" /* update */) + if response.Error != nil { + glog.Fatalf("Failed to create a foreign key for PipelineId in pipeline_versions table. Error: %s", response.Error) + } + + // Data backfill for pipeline_versions if this is the first time for + // pipeline_versions to enter mlpipeline DB. + if initializePipelineVersions { + initPipelineVersionsFromPipelines(db) + } + return storage.NewDB(db.DB(), storage.NewMySQLDialect()) } @@ -288,3 +313,34 @@ func newClientManager() ClientManager { return clientManager } + +// Data migration in 2 steps to introduce pipeline_versions table. This +// migration shall be called only once when pipeline_versions table is created +// for the first time in DB. +func initPipelineVersionsFromPipelines(db *gorm.DB) { + tx := db.Begin() + + // Step 1: duplicate pipelines to pipeline versions. + // The pipeline versions created here are not through KFP pipeine version + // API, and are only for the legacy pipelines that are created + // before pipeline version API is introduced. + // For those legacy pipelines, who don't have versions before, we create one + // implicit version for each of them. Given a legacy pipeline, the implicit + // version created here is assigned an ID the same as the pipeline ID. This + // way we don't need to move the minio file of pipeline package around, + // since the minio file's path is based on the pipeline ID (and now on the + // implicit version ID too). Meanwhile, IDs are required to be unique inside + // the same resource type, so pipeline and pipeline version as two different + // resources useing the same ID is OK. + // On the other hand, pipeline and its pipeline versions created after + // pipeline version API is introduced will have different Ids; and the minio + // file will be put directly into the directories for pipeline versions. + tx.Exec(`INSERT INTO + pipeline_versions (UUID, Name, CreatedAtInSec, Parameters, Status, PipelineId) + SELECT UUID, Name, CreatedAtInSec, Parameters, Status, UUID FROM pipelines;`) + + // Step 2: modifiy pipelines table after pipeline_versions are populated. + tx.Exec("update pipelines set DefaultVersionId=UUID;") + + tx.Commit() +} diff --git a/backend/src/apiserver/list/list.go b/backend/src/apiserver/list/list.go index a33de86792b..cf5b226d6e0 100644 --- a/backend/src/apiserver/list/list.go +++ b/backend/src/apiserver/list/list.go @@ -51,6 +51,8 @@ type token struct { KeyFieldValue interface{} // IsDesc is true if the sorting order should be descending. IsDesc bool + // ModelName is the table where ***FieldName belongs to. + ModelName string // Filter represents the filtering that should be applied in the query. Filter *filter.Filter } @@ -125,7 +127,9 @@ func NewOptions(listable Listable, pageSize int, sortBy string, filterProto *api return nil, err } - token := &token{KeyFieldName: listable.PrimaryKeyColumnName()} + token := &token{ + KeyFieldName: listable.PrimaryKeyColumnName(), + ModelName: listable.GetModelName()} // Ignore the case of the letter. Split query string by space. queryList := strings.Fields(strings.ToLower(sortBy)) @@ -177,15 +181,23 @@ func (o *Options) AddPaginationToSelect(sqlBuilder sq.SelectBuilder) sq.SelectBu // containing these. func (o *Options) AddSortingToSelect(sqlBuilder sq.SelectBuilder) sq.SelectBuilder { // If next row's value is specified, set those values in the clause. + var modelNamePrefix string + if len(o.ModelName) == 0 { + modelNamePrefix = "" + } else { + modelNamePrefix = o.ModelName + "." + } if o.SortByFieldValue != nil && o.KeyFieldValue != nil { if o.IsDesc { sqlBuilder = sqlBuilder. - Where(sq.Or{sq.Lt{o.SortByFieldName: o.SortByFieldValue}, - sq.And{sq.Eq{o.SortByFieldName: o.SortByFieldValue}, sq.LtOrEq{o.KeyFieldName: o.KeyFieldValue}}}) + Where(sq.Or{sq.Lt{modelNamePrefix + o.SortByFieldName: o.SortByFieldValue}, + sq.And{sq.Eq{modelNamePrefix + o.SortByFieldName: o.SortByFieldValue}, + sq.LtOrEq{modelNamePrefix + o.KeyFieldName: o.KeyFieldValue}}}) } else { sqlBuilder = sqlBuilder. - Where(sq.Or{sq.Gt{o.SortByFieldName: o.SortByFieldValue}, - sq.And{sq.Eq{o.SortByFieldName: o.SortByFieldValue}, sq.GtOrEq{o.KeyFieldName: o.KeyFieldValue}}}) + Where(sq.Or{sq.Gt{modelNamePrefix + o.SortByFieldName: o.SortByFieldValue}, + sq.And{sq.Eq{modelNamePrefix + o.SortByFieldName: o.SortByFieldValue}, + sq.GtOrEq{modelNamePrefix + o.KeyFieldName: o.KeyFieldValue}}}) } } @@ -194,8 +206,8 @@ func (o *Options) AddSortingToSelect(sqlBuilder sq.SelectBuilder) sq.SelectBuild order = "DESC" } sqlBuilder = sqlBuilder. - OrderBy(fmt.Sprintf("%v %v", o.SortByFieldName, order)). - OrderBy(fmt.Sprintf("%v %v", o.KeyFieldName, order)) + OrderBy(fmt.Sprintf("%v %v", modelNamePrefix+o.SortByFieldName, order)). + OrderBy(fmt.Sprintf("%v %v", modelNamePrefix+o.KeyFieldName, order)) return sqlBuilder } @@ -258,6 +270,8 @@ type Listable interface { // APIToModelFieldMap returns a map from field names in the API representation // of the model to its corresponding field name in the model itself. APIToModelFieldMap() map[string]string + // GetModelName returns table name used as sort field prefix. + GetModelName() string } // NextPageToken returns a string that can be used to fetch the subsequent set @@ -292,6 +306,7 @@ func (o *Options) nextPageToken(listable Listable) (*token, error) { KeyFieldValue: keyField.Interface(), IsDesc: o.IsDesc, Filter: o.Filter, + ModelName: o.ModelName, }, nil } diff --git a/backend/src/apiserver/list/list_test.go b/backend/src/apiserver/list/list_test.go index bf8055168a3..e379af8719a 100644 --- a/backend/src/apiserver/list/list_test.go +++ b/backend/src/apiserver/list/list_test.go @@ -39,6 +39,10 @@ func (f *fakeListable) APIToModelFieldMap() map[string]string { return fakeAPIToModelMap } +func (f *fakeListable) GetModelName() string { + return "" +} + func TestNextPageToken_ValidTokens(t *testing.T) { l := &fakeListable{PrimaryKey: "uuid123", FakeName: "Fake", CreatedTimestamp: 1234} diff --git a/backend/src/apiserver/model/BUILD.bazel b/backend/src/apiserver/model/BUILD.bazel index 18561112a27..e6a15d7878f 100644 --- a/backend/src/apiserver/model/BUILD.bazel +++ b/backend/src/apiserver/model/BUILD.bazel @@ -1,4 +1,4 @@ -load("@io_bazel_rules_go//go:def.bzl", "go_library") +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "go_default_library", @@ -10,6 +10,7 @@ go_library( "listable_model.go", "pipeline.go", "pipeline_spec.go", + "pipeline_version.go", "resource_reference.go", "run.go", ], @@ -17,3 +18,22 @@ go_library( visibility = ["//visibility:public"], deps = ["//backend/src/apiserver/common:go_default_library"], ) + +go_test( + name = "go_default_test", + srcs = [ + "pipeline_version_test.go", + ], + importpath = "github.com/kubeflow/pipelines/backend/src/apiserver/model", + visibility = ["//visibility:public"], + embed = [":go_default_library"], + deps = [ + "//backend/api:go_default_library", + "//backend/src/apiserver/filter:go_default_library", + "//backend/src/apiserver/list:go_default_library", + "@com_github_google_go_cmp//cmp:go_default_library", + "@com_github_google_go_cmp//cmp/cmpopts:go_default_library", + "@com_github_masterminds_squirrel//:go_default_library", + "@com_github_stretchr_testify//assert:go_default_library", + ], +) diff --git a/backend/src/apiserver/model/experiment.go b/backend/src/apiserver/model/experiment.go index 64c90ce6bbc..cdf725d5d97 100644 --- a/backend/src/apiserver/model/experiment.go +++ b/backend/src/apiserver/model/experiment.go @@ -37,3 +37,8 @@ var experimentAPIToModelFieldMap = map[string]string{ func (e *Experiment) APIToModelFieldMap() map[string]string { return experimentAPIToModelFieldMap } + +// GetModelName returns table name used as sort field prefix +func (e *Experiment) GetModelName() string { + return "experiments" +} diff --git a/backend/src/apiserver/model/job.go b/backend/src/apiserver/model/job.go index 138693af427..43d10c51f8b 100644 --- a/backend/src/apiserver/model/job.go +++ b/backend/src/apiserver/model/job.go @@ -97,3 +97,8 @@ var jobAPIToModelFieldMap = map[string]string{ func (k *Job) APIToModelFieldMap() map[string]string { return jobAPIToModelFieldMap } + +// GetModelName returns table name used as sort field prefix +func (j *Job) GetModelName() string { + return "jobs" +} diff --git a/backend/src/apiserver/model/pipeline.go b/backend/src/apiserver/model/pipeline.go index e4232f316ac..5baec4caefc 100644 --- a/backend/src/apiserver/model/pipeline.go +++ b/backend/src/apiserver/model/pipeline.go @@ -31,9 +31,14 @@ type Pipeline struct { CreatedAtInSec int64 `gorm:"column:CreatedAtInSec; not null"` Name string `gorm:"column:Name; not null; unique"` Description string `gorm:"column:Description; not null"` + // TODO(jingzhang36): remove Parameters when no code is accessing this + // field. Should use PipelineVersion.Parameters instead. /* Set size to 65535 so it will be stored as longtext. https://dev.mysql.com/doc/refman/8.0/en/column-count-limit.html */ Parameters string `gorm:"column:Parameters; not null; size:65535"` Status PipelineStatus `gorm:"column:Status; not null"` + // Default version of this pipeline. It could be null. + DefaultVersionId string `gorm:"column:DefaultVersionId;"` + DefaultVersion *PipelineVersion `gorm:"-"` } func (p Pipeline) GetValueOfPrimaryKey() string { @@ -59,6 +64,8 @@ var pipelineAPIToModelFieldMap = map[string]string{ "name": "Name", "created_at": "CreatedAtInSec", "description": "Description", + // TODO(jingzhang36): uncomment this field when we expose it to API + // "default_version_id": "DefaultVersionId", } // APIToModelFieldMap returns a map from API names to field names for model @@ -66,3 +73,8 @@ var pipelineAPIToModelFieldMap = map[string]string{ func (p *Pipeline) APIToModelFieldMap() map[string]string { return pipelineAPIToModelFieldMap } + +// GetModelName returns table name used as sort field prefix +func (p *Pipeline) GetModelName() string { + return "pipelines" +} diff --git a/backend/src/apiserver/model/pipeline_version.go b/backend/src/apiserver/model/pipeline_version.go new file mode 100644 index 00000000000..8ff5137badf --- /dev/null +++ b/backend/src/apiserver/model/pipeline_version.go @@ -0,0 +1,75 @@ +// Copyright 2019 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package model + +import ( + "fmt" +) + +// PipelineVersionStatus a label for the status of the Pipeline. +// This is intend to make pipeline creation and deletion atomic. +type PipelineVersionStatus string + +const ( + PipelineVersionCreating PipelineVersionStatus = "CREATING" + PipelineVersionReady PipelineVersionStatus = "READY" + PipelineVersionDeleting PipelineVersionStatus = "DELETING" +) + +type PipelineVersion struct { + UUID string `gorm:"column:UUID; not null; primary_key"` + CreatedAtInSec int64 `gorm:"column:CreatedAtInSec; not null; index"` + Name string `gorm:"column:Name; not null; unique_index:idx_pipeline_version_uuid_name"` + // Set size to 65535 so it will be stored as longtext. + // https://dev.mysql.com/doc/refman/8.0/en/column-count-limit.html + Parameters string `gorm:"column:Parameters; not null; size:65535"` + // PipelineVersion belongs to Pipeline. If a pipeline with a specific UUID + // is deleted from Pipeline table, all this pipeline's versions will be + // deleted from PipelineVersion table. + PipelineId string `gorm:"column:PipelineId; not null;index;"` + Status PipelineVersionStatus `gorm:"column:Status; not null"` + // Code source url links to the pipeline version's definition in repo. + CodeSourceUrl string `gorm:"column:CodeSourceUrl;"` +} + +func (p PipelineVersion) GetValueOfPrimaryKey() string { + return fmt.Sprint(p.UUID) +} + +// PrimaryKeyColumnName returns the primary key for model PipelineVersion. +func (p *PipelineVersion) PrimaryKeyColumnName() string { + return "UUID" +} + +// DefaultSortField returns the default sorting field for model Pipeline. +func (p *PipelineVersion) DefaultSortField() string { + return "CreatedAtInSec" +} + +// APIToModelFieldMap returns a map from API names to field names for model +// PipelineVersion. +func (p *PipelineVersion) APIToModelFieldMap() map[string]string { + return map[string]string{ + "id": "UUID", + "name": "Name", + "created_at": "CreatedAtInSec", + "status": "Status", + } +} + +// GetModelName returns table name used as sort field prefix +func (p *PipelineVersion) GetModelName() string { + return "pipeline_versions" +} diff --git a/backend/src/apiserver/model/pipeline_version_test.go b/backend/src/apiserver/model/pipeline_version_test.go new file mode 100644 index 00000000000..1746f333952 --- /dev/null +++ b/backend/src/apiserver/model/pipeline_version_test.go @@ -0,0 +1,32 @@ +package model + +import ( + "testing" + + sq "github.com/Masterminds/squirrel" + api "github.com/kubeflow/pipelines/backend/api/go_client" + "github.com/kubeflow/pipelines/backend/src/apiserver/list" + "github.com/stretchr/testify/assert" +) + +// Test model name usage in sorting clause +func TestAddSortingToSelect(t *testing.T) { + listable := &PipelineVersion{ + UUID: "version_id_1", + CreatedAtInSec: 1, + Name: "version_name_1", + Parameters: "", + PipelineId: "pipeline_id_1", + Status: PipelineVersionReady, + CodeSourceUrl: "", + } + protoFilter := &api.Filter{} + listableOptions, err := list.NewOptions(listable, 10, "name", protoFilter) + assert.Nil(t, err) + sqlBuilder := sq.Select("*").From("pipeline_versions") + sql, _, err := listableOptions.AddSortingToSelect(sqlBuilder).ToSql() + assert.Nil(t, err) + + assert.Contains(t, sql, "pipeline_versions.Name") // sorting field + assert.Contains(t, sql, "pipeline_versions.UUID") // primary key field +} diff --git a/backend/src/apiserver/model/run.go b/backend/src/apiserver/model/run.go index 3e0aa4e780c..8536878879d 100644 --- a/backend/src/apiserver/model/run.go +++ b/backend/src/apiserver/model/run.go @@ -81,3 +81,10 @@ var runAPIToModelFieldMap = map[string]string{ func (r *Run) APIToModelFieldMap() map[string]string { return runAPIToModelFieldMap } + +// GetModelName returns table name used as sort field prefix +func (r *Run) GetModelName() string { + // TODO(jingzhang36): return run_details here, and use model name as alias + // and thus as prefix in sorting fields. + return "" +} diff --git a/backend/src/apiserver/resource/model_converter.go b/backend/src/apiserver/resource/model_converter.go index 1dd0b047fee..82f6d0b9e4e 100644 --- a/backend/src/apiserver/resource/model_converter.go +++ b/backend/src/apiserver/resource/model_converter.go @@ -111,6 +111,29 @@ func (r *ResourceManager) ToModelJob(job *api.Job, swf *util.ScheduledWorkflow, }, nil } +func (r *ResourceManager) ToModelPipelineVersion(version *api.PipelineVersion) (*model.PipelineVersion, error) { + paramStr, err := toModelParameters(version.Parameters) + if err != nil { + return nil, err + } + + var pipelineId string + for _, resourceReference := range version.ResourceReferences { + if resourceReference.Key.Type == api.ResourceType_PIPELINE { + pipelineId = resourceReference.Key.Id + } + } + + return &model.PipelineVersion{ + UUID: string(version.Id), + Name: version.Name, + CreatedAtInSec: version.CreatedAt.Seconds, + Parameters: paramStr, + PipelineId: pipelineId, + CodeSourceUrl: version.CodeSourceUrl, + }, nil +} + func toModelTrigger(trigger *api.Trigger) model.Trigger { modelTrigger := model.Trigger{} if trigger == nil { @@ -161,7 +184,7 @@ func toModelParameters(apiParams []*api.Parameter) (string, error) { } func (r *ResourceManager) toModelResourceReferences( - resourceId string, resourceType common.ResourceType, apiRefs []*api.ResourceReference) ([]*model.ResourceReference, error) { + resourceId string, resourceType common.ResourceType, apiRefs []*api.ResourceReference) ([]*model.ResourceReference, error) { var modelRefs []*model.ResourceReference for _, apiRef := range apiRefs { modelReferenceType, err := common.ToModelResourceType(apiRef.Key.Type) diff --git a/backend/src/apiserver/resource/model_converter_test.go b/backend/src/apiserver/resource/model_converter_test.go index 9afd77a6a78..a607ced0636 100644 --- a/backend/src/apiserver/resource/model_converter_test.go +++ b/backend/src/apiserver/resource/model_converter_test.go @@ -230,3 +230,36 @@ func TestToModelResourceReferences_ReferredExperimentNotFound(t *testing.T) { assert.NotNil(t, err) assert.Contains(t, err.Error(), "Failed to find the referred resource") } + +func TestToModelPipelineVersion(t *testing.T) { + store, manager := initResourceManager() + defer store.Close() + apiPipelineVersion := &api.PipelineVersion{ + Id: "pipelineversion1", + CreatedAt: ×tamp.Timestamp{Seconds: 1}, + Parameters: []*api.Parameter{}, + CodeSourceUrl: "http://repo/11111", + ResourceReferences: []*api.ResourceReference{ + &api.ResourceReference{ + Key: &api.ResourceKey{ + Id: "pipeline1", + Type: api.ResourceType_PIPELINE, + }, + Relationship: api.Relationship_OWNER, + }, + }, + } + + convertedModelPipelineVersion, _ := manager.ToModelPipelineVersion( + apiPipelineVersion) + + expectedModelPipelineVersion := &model.PipelineVersion{ + UUID: "pipelineversion1", + CreatedAtInSec: 1, + Parameters: "", + PipelineId: "pipeline1", + CodeSourceUrl: "http://repo/11111", + } + + assert.Equal(t, convertedModelPipelineVersion, expectedModelPipelineVersion) +} diff --git a/backend/src/apiserver/resource/resource_manager.go b/backend/src/apiserver/resource/resource_manager.go index e959617cf59..57e617d1b8c 100644 --- a/backend/src/apiserver/resource/resource_manager.go +++ b/backend/src/apiserver/resource/resource_manager.go @@ -40,7 +40,7 @@ import ( const ( defaultPipelineRunnerServiceAccountEnvVar = "DefaultPipelineRunnerServiceAccount" - defaultPipelineRunnerServiceAccount = "pipeline-runner" + defaultPipelineRunnerServiceAccount = "pipeline-runner" ) type ClientManagerInterface interface { @@ -106,7 +106,7 @@ func (r *ResourceManager) GetExperiment(experimentId string) (*model.Experiment, } func (r *ResourceManager) ListExperiments(opts *list.Options) ( - experiments []*model.Experiment, total_size int, nextPageToken string, err error) { + experiments []*model.Experiment, total_size int, nextPageToken string, err error) { return r.experimentStore.ListExperiments(opts) } @@ -119,7 +119,7 @@ func (r *ResourceManager) DeleteExperiment(experimentID string) error { } func (r *ResourceManager) ListPipelines(opts *list.Options) ( - pipelines []*model.Pipeline, total_size int, nextPageToken string, err error) { + pipelines []*model.Pipeline, total_size int, nextPageToken string, err error) { return r.pipelineStore.ListPipelines(opts) } @@ -142,6 +142,12 @@ func (r *ResourceManager) DeletePipeline(pipelineId string) error { // Delete pipeline file and DB entry. // Not fail the request if this step failed. A background run will do the cleanup. // https://github.com/kubeflow/pipelines/issues/388 + // TODO(jingzhang36): For now (before exposing version API), we have only 1 + // file with both pipeline and version pointing to it; so it is ok to do + // the deletion as follows. After exposing version API, we can have multiple + // versions and hence multiple files, and we shall improve performance by + // either using async deletion in order for this method to be non-blocking + // or or exploring other performance optimization tools provided by gcs. err = r.objectStore.DeleteFile(storage.CreatePipelinePath(fmt.Sprint(pipelineId))) if err != nil { glog.Errorf("%v", errors.Wrapf(err, "Failed to delete pipeline file for pipeline %v", pipelineId)) @@ -162,20 +168,34 @@ func (r *ResourceManager) CreatePipeline(name string, description string, pipeli } // Create an entry with status of creating the pipeline - pipeline := &model.Pipeline{Name: name, Description: description, Parameters: params, Status: model.PipelineCreating} + pipeline := &model.Pipeline{ + Name: name, + Description: description, + Parameters: params, + Status: model.PipelineCreating, + DefaultVersion: &model.PipelineVersion{ + Name: name, + Parameters: params, + Status: model.PipelineVersionCreating}} newPipeline, err := r.pipelineStore.CreatePipeline(pipeline) if err != nil { return nil, util.Wrap(err, "Create pipeline failed") } - // Store the pipeline file - err = r.objectStore.AddFile(pipelineFile, storage.CreatePipelinePath(fmt.Sprint(newPipeline.UUID))) + // Store the pipeline file to a path dependent on pipeline version + err = r.objectStore.AddFile(pipelineFile, + storage.CreatePipelinePath(fmt.Sprint(newPipeline.DefaultVersion.UUID))) if err != nil { return nil, util.Wrap(err, "Create pipeline failed") } newPipeline.Status = model.PipelineReady - err = r.pipelineStore.UpdatePipelineStatus(newPipeline.UUID, newPipeline.Status) + newPipeline.DefaultVersion.Status = model.PipelineVersionReady + err = r.pipelineStore.UpdatePipelineAndVersionsStatus( + newPipeline.UUID, + newPipeline.Status, + newPipeline.DefaultVersionId, + newPipeline.DefaultVersion.Status) if err != nil { return nil, util.Wrap(err, "Create pipeline failed") } @@ -186,14 +206,22 @@ func (r *ResourceManager) UpdatePipelineStatus(pipelineId string, status model.P return r.pipelineStore.UpdatePipelineStatus(pipelineId, status) } +func (r *ResourceManager) UpdatePipelineVersionStatus(pipelineId string, status model.PipelineVersionStatus) error { + return r.pipelineStore.UpdatePipelineVersionStatus(pipelineId, status) +} + func (r *ResourceManager) GetPipelineTemplate(pipelineId string) ([]byte, error) { // Verify pipeline exist - _, err := r.pipelineStore.GetPipeline(pipelineId) + pipeline, err := r.pipelineStore.GetPipeline(pipelineId) if err != nil { return nil, util.Wrap(err, "Get pipeline template failed") } - template, err := r.objectStore.GetFile(storage.CreatePipelinePath(fmt.Sprint(pipelineId))) + if pipeline.DefaultVersion == nil { + return nil, util.Wrap(err, + "Get pipeline template failed since no default version is defined") + } + template, err := r.objectStore.GetFile(storage.CreatePipelinePath(fmt.Sprint(pipeline.DefaultVersion.UUID))) if err != nil { return nil, util.Wrap(err, "Get pipeline template failed") } @@ -278,7 +306,7 @@ func (r *ResourceManager) GetRun(runId string) (*model.RunDetail, error) { } func (r *ResourceManager) ListRuns(filterContext *common.FilterContext, - opts *list.Options) (runs []*model.Run, total_size int, nextPageToken string, err error) { + opts *list.Options) (runs []*model.Run, total_size int, nextPageToken string, err error) { return r.runStore.ListRuns(filterContext, opts) } @@ -309,7 +337,7 @@ func (r *ResourceManager) DeleteRun(runID string) error { } func (r *ResourceManager) ListJobs(filterContext *common.FilterContext, - opts *list.Options) (jobs []*model.Job, total_size int, nextPageToken string, err error) { + opts *list.Options) (jobs []*model.Job, total_size int, nextPageToken string, err error) { return r.jobStore.ListJobs(filterContext, opts) } @@ -783,6 +811,6 @@ func (r *ResourceManager) MarkSampleLoaded() error { return r.dBStatusStore.MarkSampleLoaded() } -func (r *ResourceManager) getDefaultSA() string{ +func (r *ResourceManager) getDefaultSA() string { return common.GetStringConfigWithDefault(defaultPipelineRunnerServiceAccountEnvVar, defaultPipelineRunnerServiceAccount) -} \ No newline at end of file +} diff --git a/backend/src/apiserver/resource/resource_manager_test.go b/backend/src/apiserver/resource/resource_manager_test.go index 572a8e4fb26..5416770adc7 100644 --- a/backend/src/apiserver/resource/resource_manager_test.go +++ b/backend/src/apiserver/resource/resource_manager_test.go @@ -164,19 +164,33 @@ func initWithOneTimeFailedRun(t *testing.T) (*FakeClientManager, *ResourceManage } func createPipeline(name string) *model.Pipeline { - return &model.Pipeline{Name: name, Status: model.PipelineReady} + return &model.Pipeline{ + Name: name, + Status: model.PipelineReady, + DefaultVersion: &model.PipelineVersion{ + Name: name + "_version", + Status: model.PipelineVersionReady, + }} } func TestCreatePipeline(t *testing.T) { store, _, pipeline := initWithPipeline(t) defer store.Close() pipelineExpected := &model.Pipeline{ - UUID: DefaultFakeUUID, - CreatedAtInSec: 1, - Name: "p1", - Parameters: "[{\"name\":\"param1\"}]", - Status: model.PipelineReady, - } + UUID: DefaultFakeUUID, + CreatedAtInSec: 1, + Name: "p1", + Parameters: "[{\"name\":\"param1\"}]", + Status: model.PipelineReady, + DefaultVersionId: DefaultFakeUUID, + DefaultVersion: &model.PipelineVersion{ + UUID: DefaultFakeUUID, + CreatedAtInSec: 1, + Name: "p1", + Parameters: "[{\"name\":\"param1\"}]", + Status: model.PipelineVersionReady, + PipelineId: DefaultFakeUUID, + }} assert.Equal(t, pipelineExpected, pipeline) } @@ -208,7 +222,7 @@ func TestCreatePipeline_StorePipelineMetadataError(t *testing.T) { manager := NewResourceManager(store) _, err := manager.CreatePipeline("pipeline1", "", []byte("apiVersion: argoproj.io/v1alpha1\nkind: Workflow")) assert.Equal(t, codes.Internal, err.(*util.UserError).ExternalStatusCode()) - assert.Contains(t, err.Error(), "Failed to add pipeline to pipeline table") + assert.Contains(t, err.Error(), "Failed to start a transaction to create a new pipeline") } func TestCreatePipeline_CreatePipelineFileError(t *testing.T) { diff --git a/backend/src/apiserver/server/api_converter.go b/backend/src/apiserver/server/api_converter.go index ed6b3faadc8..20da9ee97d3 100644 --- a/backend/src/apiserver/server/api_converter.go +++ b/backend/src/apiserver/server/api_converter.go @@ -57,13 +57,60 @@ func ToApiPipeline(pipeline *model.Pipeline) *api.Pipeline { Error: err.Error(), } } + + // TODO(jingzhang36): uncomment when exposing versions to API. + // defaultVersion, err := ToApiPipelineVersion(pipeline.DefaultVersion) + // if err != nil { + // return &api.Pipeline{ + // Id: pipeline.UUID, + // Error: err.Error(), + // } + // } + return &api.Pipeline{ Id: pipeline.UUID, CreatedAt: ×tamp.Timestamp{Seconds: pipeline.CreatedAtInSec}, Name: pipeline.Name, Description: pipeline.Description, Parameters: params, + // DefaultVersion: defaultVersion, + } +} + +func ToApiPipelineVersion(version *model.PipelineVersion) (*api.PipelineVersion, error) { + if version == nil { + return nil, nil + } + params, err := toApiParameters(version.Parameters) + if err != nil { + return nil, err + } + + return &api.PipelineVersion{ + Id: version.UUID, + Name: version.Name, + CreatedAt: ×tamp.Timestamp{Seconds: version.CreatedAtInSec}, + Parameters: params, + CodeSourceUrl: version.CodeSourceUrl, + ResourceReferences: []*api.ResourceReference{ + &api.ResourceReference{ + Key: &api.ResourceKey{ + Id: version.PipelineId, + Type: api.ResourceType_PIPELINE, + }, + Relationship: api.Relationship_OWNER, + }, + }, + }, nil +} + +func ToApiPipelineVersions(versions []*model.PipelineVersion) ([]*api.PipelineVersion, error) { + apiVersions := make([]*api.PipelineVersion, 0) + for _, version := range versions { + v, _ := ToApiPipelineVersion(version) + apiVersions = append(apiVersions, v) } + return apiVersions, nil } func ToApiPipelines(pipelines []*model.Pipeline) []*api.Pipeline { diff --git a/backend/src/apiserver/server/api_converter_test.go b/backend/src/apiserver/server/api_converter_test.go index 7759ad951a5..e123a2ccf9e 100644 --- a/backend/src/apiserver/server/api_converter_test.go +++ b/backend/src/apiserver/server/api_converter_test.go @@ -30,12 +30,36 @@ func TestToApiPipeline(t *testing.T) { UUID: "pipeline1", CreatedAtInSec: 1, Parameters: "[]", + // TODO(jingzhang36): uncomment when exposing versions to API. + // DefaultVersion: &model.PipelineVersion{ + // UUID: "pipelineversion1", + // CreatedAtInSec: 1, + // Parameters: "[]", + // PipelineId: "pipeline1", + // CodeSourceUrl: "http://repo/22222", + // }, } apiPipeline := ToApiPipeline(modelPipeline) expectedApiPipeline := &api.Pipeline{ Id: "pipeline1", CreatedAt: ×tamp.Timestamp{Seconds: 1}, Parameters: []*api.Parameter{}, + // TODO(jingzhang36): uncomment when exposing versions to API. + // DefaultVersion: &api.PipelineVersion{ + // Id: "pipelineversion1", + // CreatedAt: ×tamp.Timestamp{Seconds: 1}, + // Parameters: []*api.Parameter{}, + // CodeSourceUrl: "http://repo/22222", + // ResourceReferences: []*api.ResourceReference{ + // &api.ResourceReference{ + // Key: &api.ResourceKey{ + // Id: "pipeline1", + // Type: api.ResourceType_PIPELINE, + // }, + // Relationship: api.Relationship_OWNER, + // }, + // }, + // }, } assert.Equal(t, expectedApiPipeline, apiPipeline) } @@ -45,6 +69,8 @@ func TestToApiPipeline_ErrorParsingField(t *testing.T) { UUID: "pipeline1", CreatedAtInSec: 1, Parameters: "[invalid parameter", + // TODO(jingzhang36): uncomment when exposing versions to API. + // DefaultVersion: &model.PipelineVersion{}, } apiPipeline := ToApiPipeline(modelPipeline) expectedApiPipeline := &api.Pipeline{ diff --git a/backend/src/apiserver/server/list_request_util_test.go b/backend/src/apiserver/server/list_request_util_test.go index 074c22b4299..76607317cf7 100644 --- a/backend/src/apiserver/server/list_request_util_test.go +++ b/backend/src/apiserver/server/list_request_util_test.go @@ -241,6 +241,10 @@ func (f *fakeListable) APIToModelFieldMap() map[string]string { return fakeAPIToModelMap } +func (f *fakeListable) GetModelName() string { + return "" +} + func TestValidatedListOptions_Errors(t *testing.T) { opts, err := list.NewOptions(&fakeListable{}, 10, "name asc", nil) if err != nil { diff --git a/backend/src/apiserver/server/pipeline_upload_server_test.go b/backend/src/apiserver/server/pipeline_upload_server_test.go index dfd9a531e56..2cf127d5371 100644 --- a/backend/src/apiserver/server/pipeline_upload_server_test.go +++ b/backend/src/apiserver/server/pipeline_upload_server_test.go @@ -65,11 +65,20 @@ func TestUploadPipeline_YAML(t *testing.T) { // Verify metadata in db pkgsExpect := []*model.Pipeline{ { - UUID: resource.DefaultFakeUUID, - CreatedAtInSec: 1, - Name: "hello-world.yaml", - Parameters: "[]", - Status: model.PipelineReady}} + UUID: resource.DefaultFakeUUID, + CreatedAtInSec: 1, + Name: "hello-world.yaml", + Parameters: "[]", + Status: model.PipelineReady, + DefaultVersionId: resource.DefaultFakeUUID, + DefaultVersion: &model.PipelineVersion{ + UUID: resource.DefaultFakeUUID, + CreatedAtInSec: 1, + Name: "hello-world.yaml", + Parameters: "[]", + Status: model.PipelineVersionReady, + PipelineId: resource.DefaultFakeUUID, + }}} pkg, total_size, str, err := clientManager.PipelineStore().ListPipelines(opts) assert.Nil(t, err) assert.Equal(t, str, "") @@ -107,11 +116,20 @@ func TestUploadPipeline_Tarball(t *testing.T) { // Verify metadata in db pkgsExpect := []*model.Pipeline{ { - UUID: resource.DefaultFakeUUID, - CreatedAtInSec: 1, - Name: "arguments.tar.gz", - Parameters: "[{\"name\":\"param1\",\"value\":\"hello\"},{\"name\":\"param2\"}]", - Status: model.PipelineReady}} + UUID: resource.DefaultFakeUUID, + CreatedAtInSec: 1, + Name: "arguments.tar.gz", + Parameters: "[{\"name\":\"param1\",\"value\":\"hello\"},{\"name\":\"param2\"}]", + Status: model.PipelineReady, + DefaultVersionId: resource.DefaultFakeUUID, + DefaultVersion: &model.PipelineVersion{ + UUID: resource.DefaultFakeUUID, + CreatedAtInSec: 1, + Name: "arguments.tar.gz", + Parameters: "[{\"name\":\"param1\",\"value\":\"hello\"},{\"name\":\"param2\"}]", + Status: model.PipelineVersionReady, + PipelineId: resource.DefaultFakeUUID, + }}} pkg, total_size, str, err := clientManager.PipelineStore().ListPipelines(opts) assert.Nil(t, err) assert.Equal(t, str, "") @@ -166,11 +184,20 @@ func TestUploadPipeline_SpecifyFileName(t *testing.T) { // Verify metadata in db pkgsExpect := []*model.Pipeline{ { - UUID: resource.DefaultFakeUUID, - CreatedAtInSec: 1, - Name: "foo bar", - Parameters: "[]", - Status: model.PipelineReady}} + UUID: resource.DefaultFakeUUID, + CreatedAtInSec: 1, + Name: "foo bar", + Parameters: "[]", + Status: model.PipelineReady, + DefaultVersionId: resource.DefaultFakeUUID, + DefaultVersion: &model.PipelineVersion{ + UUID: resource.DefaultFakeUUID, + CreatedAtInSec: 1, + Name: "foo bar", + Parameters: "[]", + Status: model.PipelineVersionReady, + PipelineId: resource.DefaultFakeUUID, + }}} pkg, total_size, str, err := clientManager.PipelineStore().ListPipelines(opts) assert.Nil(t, err) assert.Equal(t, 1, total_size) diff --git a/backend/src/apiserver/server/util_test.go b/backend/src/apiserver/server/util_test.go index 91912b1e3d4..e76473202d1 100644 --- a/backend/src/apiserver/server/util_test.go +++ b/backend/src/apiserver/server/util_test.go @@ -1,14 +1,15 @@ package server import ( - api "github.com/kubeflow/pipelines/backend/api/go_client" - "github.com/kubeflow/pipelines/backend/src/apiserver/resource" - "github.com/kubeflow/pipelines/backend/src/common/util" - "github.com/stretchr/testify/assert" "io/ioutil" "os" "strings" "testing" + + api "github.com/kubeflow/pipelines/backend/api/go_client" + "github.com/kubeflow/pipelines/backend/src/apiserver/resource" + "github.com/kubeflow/pipelines/backend/src/common/util" + "github.com/stretchr/testify/assert" ) func TestGetPipelineName_QueryStringNotEmpty(t *testing.T) { @@ -330,4 +331,4 @@ func TestValidatePipelineSpec_ParameterTooLong(t *testing.T) { err := ValidatePipelineSpec(manager, spec) assert.NotNil(t, err) assert.Contains(t, err.Error(), "The input parameter length exceed maximum size") -} \ No newline at end of file +} diff --git a/backend/src/apiserver/storage/db_fake.go b/backend/src/apiserver/storage/db_fake.go index 21d0043d779..5f24bc772e5 100644 --- a/backend/src/apiserver/storage/db_fake.go +++ b/backend/src/apiserver/storage/db_fake.go @@ -34,6 +34,7 @@ func NewFakeDb() (*DB, error) { &model.Experiment{}, &model.Job{}, &model.Pipeline{}, + &model.PipelineVersion{}, &model.ResourceReference{}, &model.RunDetail{}, &model.RunMetric{}, diff --git a/backend/src/apiserver/storage/pipeline_store.go b/backend/src/apiserver/storage/pipeline_store.go index 7367abdaa89..02dce6aaf74 100644 --- a/backend/src/apiserver/storage/pipeline_store.go +++ b/backend/src/apiserver/storage/pipeline_store.go @@ -25,6 +25,24 @@ import ( "github.com/kubeflow/pipelines/backend/src/common/util" ) +// The order of the selected columns must match the order used in scan rows. +var pipelineColumns = []string{ + "pipelines.UUID", + "pipelines.CreatedAtInSec", + "pipelines.Name", + "pipelines.Description", + "pipelines.Parameters", + "pipelines.Status", + "pipelines.DefaultVersionId", + "pipeline_versions.UUID", + "pipeline_versions.CreatedAtInSec", + "pipeline_versions.Name", + "pipeline_versions.Parameters", + "pipeline_versions.PipelineId", + "pipeline_versions.Status", + "pipeline_versions.CodeSourceUrl", +} + type PipelineStoreInterface interface { ListPipelines(opts *list.Options) ([]*model.Pipeline, int, string, error) GetPipeline(pipelineId string) (*model.Pipeline, error) @@ -32,6 +50,12 @@ type PipelineStoreInterface interface { DeletePipeline(pipelineId string) error CreatePipeline(*model.Pipeline) (*model.Pipeline, error) UpdatePipelineStatus(string, model.PipelineStatus) error + + // Change status of a particular version. + UpdatePipelineVersionStatus(pipelineVersionId string, status model.PipelineVersionStatus) error + // TODO(jingzhang36): remove this temporary method after resource manager's + // CreatePipeline stops using it. + UpdatePipelineAndVersionsStatus(id string, status model.PipelineStatus, pipelineVersionId string, pipelineVersionStatus model.PipelineVersionStatus) error } type PipelineStore struct { @@ -48,10 +72,13 @@ func (s *PipelineStore) ListPipelines(opts *list.Options) ([]*model.Pipeline, in } buildQuery := func(sqlBuilder sq.SelectBuilder) sq.SelectBuilder { - return sqlBuilder.From("pipelines").Where(sq.Eq{"Status": model.PipelineReady}) + return sqlBuilder. + From("pipelines"). + LeftJoin("pipeline_versions ON pipelines.DefaultVersionId = pipeline_versions.UUID"). + Where(sq.Eq{"pipelines.Status": model.PipelineReady}) } - sqlBuilder := buildQuery(sq.Select("*")) + sqlBuilder := buildQuery(sq.Select(pipelineColumns...)) // SQL for row list rowsSql, rowsArgs, err := opts.AddPaginationToSelect(sqlBuilder).ToSql() @@ -115,18 +142,57 @@ func (s *PipelineStore) scanRows(rows *sql.Rows) ([]*model.Pipeline, error) { var pipelines []*model.Pipeline for rows.Next() { var uuid, name, parameters, description string + var defaultVersionId sql.NullString var createdAtInSec int64 var status model.PipelineStatus - if err := rows.Scan(&uuid, &createdAtInSec, &name, &description, ¶meters, &status); err != nil { + var versionUUID, versionName, versionParameters, versionPipelineId, versionCodeSourceUrl, versionStatus sql.NullString + var versionCreatedAtInSec sql.NullInt64 + if err := rows.Scan( + &uuid, + &createdAtInSec, + &name, + &description, + ¶meters, + &status, + &defaultVersionId, + &versionUUID, + &versionCreatedAtInSec, + &versionName, + &versionParameters, + &versionPipelineId, + &versionStatus, + &versionCodeSourceUrl); err != nil { return nil, err } - pipelines = append(pipelines, &model.Pipeline{ - UUID: uuid, - CreatedAtInSec: createdAtInSec, - Name: name, - Description: description, - Parameters: parameters, - Status: status}) + if defaultVersionId.Valid { + pipelines = append(pipelines, &model.Pipeline{ + UUID: uuid, + CreatedAtInSec: createdAtInSec, + Name: name, + Description: description, + Parameters: parameters, + Status: status, + DefaultVersionId: defaultVersionId.String, + DefaultVersion: &model.PipelineVersion{ + UUID: versionUUID.String, + CreatedAtInSec: versionCreatedAtInSec.Int64, + Name: versionName.String, + Parameters: versionParameters.String, + PipelineId: versionPipelineId.String, + Status: model.PipelineVersionStatus(versionStatus.String), + CodeSourceUrl: versionCodeSourceUrl.String, + }}) + } else { + pipelines = append(pipelines, &model.Pipeline{ + UUID: uuid, + CreatedAtInSec: createdAtInSec, + Name: name, + Description: description, + Parameters: parameters, + Status: status, + DefaultVersionId: "", + DefaultVersion: nil}) + } } return pipelines, nil } @@ -137,10 +203,11 @@ func (s *PipelineStore) GetPipeline(id string) (*model.Pipeline, error) { func (s *PipelineStore) GetPipelineWithStatus(id string, status model.PipelineStatus) (*model.Pipeline, error) { sql, args, err := sq. - Select("*"). + Select(pipelineColumns...). From("pipelines"). - Where(sq.Eq{"uuid": id}). - Where(sq.Eq{"status": status}). + LeftJoin("pipeline_versions on pipelines.DefaultVersionId = pipeline_versions.UUID"). + Where(sq.Eq{"pipelines.uuid": id}). + Where(sq.Eq{"pipelines.Status": status}). Limit(1).ToSql() if err != nil { return nil, util.NewInternalServerError(err, "Failed to create query to get pipeline: %v", err.Error()) @@ -175,6 +242,7 @@ func (s *PipelineStore) DeletePipeline(id string) error { } func (s *PipelineStore) CreatePipeline(p *model.Pipeline) (*model.Pipeline, error) { + // Set up creation time, UUID and sql query for pipeline. newPipeline := *p now := s.time.Now().Unix() newPipeline.CreatedAtInSec = now @@ -183,30 +251,96 @@ func (s *PipelineStore) CreatePipeline(p *model.Pipeline) (*model.Pipeline, erro return nil, util.NewInternalServerError(err, "Failed to create a pipeline id.") } newPipeline.UUID = id.String() + // TODO(jingzhang36): remove default version id assignment after version API + // is ready. + newPipeline.DefaultVersionId = id.String() sql, args, err := sq. Insert("pipelines"). SetMap( sq.Eq{ - "UUID": newPipeline.UUID, - "CreatedAtInSec": newPipeline.CreatedAtInSec, - "Name": newPipeline.Name, - "Description": newPipeline.Description, - "Parameters": newPipeline.Parameters, - "Status": string(newPipeline.Status)}). + "UUID": newPipeline.UUID, + "CreatedAtInSec": newPipeline.CreatedAtInSec, + "Name": newPipeline.Name, + "Description": newPipeline.Description, + "Parameters": newPipeline.Parameters, + "Status": string(newPipeline.Status), + "DefaultVersionId": newPipeline.DefaultVersionId}). ToSql() if err != nil { return nil, util.NewInternalServerError(err, "Failed to create query to insert pipeline to pipeline table: %v", err.Error()) } - _, err = s.db.Exec(sql, args...) + + // Set up creation time, UUID and sql query for pipeline. + // TODO(jingzhang36): remove version related operations from CreatePipeline + // when version API is ready. Before that we create an implicit version + // inside CreatePipeline method. And this implicit version has the same UUID + // as pipeline; and thus FE can use either pipeline UUID or version UUID to + // retrieve pipeline package. + if newPipeline.DefaultVersion == nil { + newPipeline.DefaultVersion = &model.PipelineVersion{ + Name: newPipeline.Name, + Parameters: newPipeline.Parameters, + Status: model.PipelineVersionCreating, + CodeSourceUrl: ""} + } + newPipeline.DefaultVersion.CreatedAtInSec = now + newPipeline.DefaultVersion.PipelineId = id.String() + newPipeline.DefaultVersion.UUID = id.String() + sqlPipelineVersions, argsPipelineVersions, err := sq. + Insert("pipeline_versions"). + SetMap( + sq.Eq{ + "UUID": newPipeline.DefaultVersion.UUID, + "CreatedAtInSec": newPipeline.DefaultVersion.CreatedAtInSec, + "Name": newPipeline.DefaultVersion.Name, + "Parameters": newPipeline.DefaultVersion.Parameters, + "Status": string(newPipeline.DefaultVersion.Status), + "PipelineId": newPipeline.UUID, + "CodeSourceUrl": newPipeline.DefaultVersion.CodeSourceUrl}). + ToSql() + if err != nil { + return nil, util.NewInternalServerError(err, + `Failed to create query to insert pipeline version to + pipeline_versions table: %v`, err.Error()) + } + + // In a transaction, we insert into both pipelines and pipeline_versions. + tx, err := s.db.Begin() + if err != nil { + return nil, util.NewInternalServerError(err, + `Failed to start a transaction to create a new pipeline: %v`, + err.Error()) + } + _, err = tx.Exec(sql, args...) if err != nil { if s.db.IsDuplicateError(err) { + tx.Rollback() return nil, util.NewInvalidInputError( "Failed to create a new pipeline. The name %v already exist. Please specify a new name.", p.Name) } + tx.Rollback() return nil, util.NewInternalServerError(err, "Failed to add pipeline to pipeline table: %v", err.Error()) } + _, err = tx.Exec(sqlPipelineVersions, argsPipelineVersions...) + if err != nil { + if s.db.IsDuplicateError(err) { + tx.Rollback() + return nil, util.NewInvalidInputError( + `Failed to create a new pipeline version. The name %v already + exist. Please specify a new name.`, p.DefaultVersion.Name) + } + tx.Rollback() + return nil, util.NewInternalServerError(err, + "Failed to add pipeline version to pipeline_versions table: %v", + err.Error()) + } + if err := tx.Commit(); err != nil { + return nil, util.NewInternalServerError(err, + `Failed to update pipelines and pipeline_versions in a + transaction: %v`, err.Error()) + } return &newPipeline, nil } @@ -226,6 +360,67 @@ func (s *PipelineStore) UpdatePipelineStatus(id string, status model.PipelineSta return nil } +func (s *PipelineStore) UpdatePipelineVersionStatus(id string, status model.PipelineVersionStatus) error { + sql, args, err := sq. + Update("pipeline_versions"). + SetMap(sq.Eq{"Status": status}). + Where(sq.Eq{"UUID": id}). + ToSql() + if err != nil { + return util.NewInternalServerError(err, + `Failed to create query to update the pipeline version + metadata: %s`, err.Error()) + } + _, err = s.db.Exec(sql, args...) + if err != nil { + return util.NewInternalServerError(err, + "Failed to update the pipeline version metadata: %s", err.Error()) + } + return nil +} + +func (s *PipelineStore) UpdatePipelineAndVersionsStatus(id string, status model.PipelineStatus, pipelineVersionId string, pipelineVersionStatus model.PipelineVersionStatus) error { + tx, err := s.db.Begin() + + sql, args, err := sq. + Update("pipelines"). + SetMap(sq.Eq{"Status": status}). + Where(sq.Eq{"UUID": id}). + ToSql() + if err != nil { + tx.Rollback() + return util.NewInternalServerError(err, "Failed to create query to update the pipeline status: %s", err.Error()) + } + _, err = tx.Exec(sql, args...) + if err != nil { + tx.Rollback() + return util.NewInternalServerError(err, "Failed to update the pipeline status: %s", err.Error()) + } + sql, args, err = sq. + Update("pipeline_versions"). + SetMap(sq.Eq{"Status": pipelineVersionStatus}). + Where(sq.Eq{"UUID": pipelineVersionId}). + ToSql() + if err != nil { + tx.Rollback() + return util.NewInternalServerError(err, + `Failed to create query to update the pipeline version + status: %s`, err.Error()) + } + _, err = tx.Exec(sql, args...) + if err != nil { + tx.Rollback() + return util.NewInternalServerError(err, + "Failed to update the pipeline version status: %s", err.Error()) + } + + if err := tx.Commit(); err != nil { + return util.NewInternalServerError(err, + "Failed to update pipeline status and its version status: %v", err) + } + return nil +} + // factory function for pipeline store func NewPipelineStore(db *DB, time util.TimeInterface, uuid util.UUIDGeneratorInterface) *PipelineStore { return &PipelineStore{db: db, time: time, uuid: uuid} diff --git a/backend/src/apiserver/storage/pipeline_store_test.go b/backend/src/apiserver/storage/pipeline_store_test.go index 4ee9d0f6a31..980acff52c3 100644 --- a/backend/src/apiserver/storage/pipeline_store_test.go +++ b/backend/src/apiserver/storage/pipeline_store_test.go @@ -32,7 +32,15 @@ const ( ) func createPipeline(name string) *model.Pipeline { - return &model.Pipeline{Name: name, Parameters: `[{"Name": "param1"}]`, Status: model.PipelineReady} + return &model.Pipeline{ + Name: name, + Parameters: `[{"Name": "param1"}]`, + Status: model.PipelineReady, + DefaultVersion: &model.PipelineVersion{ + Name: name, + Parameters: `[{"Name": "param1"}]`, + Status: model.PipelineVersionReady, + }} } func TestListPipelines_FilterOutNotReady(t *testing.T) { @@ -43,20 +51,43 @@ func TestListPipelines_FilterOutNotReady(t *testing.T) { pipelineStore.uuid = util.NewFakeUUIDGeneratorOrFatal(fakeUUIDTwo, nil) pipelineStore.CreatePipeline(createPipeline("pipeline2")) pipelineStore.uuid = util.NewFakeUUIDGeneratorOrFatal(fakeUUIDThree, nil) - pipelineStore.CreatePipeline(&model.Pipeline{Name: "pipeline3", Status: model.PipelineCreating}) + pipelineStore.CreatePipeline(&model.Pipeline{ + Name: "pipeline3", + Status: model.PipelineCreating, + DefaultVersion: &model.PipelineVersion{ + Name: "pipeline3", + Status: model.PipelineVersionCreating}}) expectedPipeline1 := &model.Pipeline{ - UUID: fakeUUID, - CreatedAtInSec: 1, - Name: "pipeline1", - Parameters: `[{"Name": "param1"}]`, - Status: model.PipelineReady} + UUID: fakeUUID, + CreatedAtInSec: 1, + Name: "pipeline1", + Parameters: `[{"Name": "param1"}]`, + Status: model.PipelineReady, + DefaultVersionId: fakeUUID, + DefaultVersion: &model.PipelineVersion{ + UUID: fakeUUID, + CreatedAtInSec: 1, + Name: "pipeline1", + Parameters: `[{"Name": "param1"}]`, + PipelineId: fakeUUID, + Status: model.PipelineVersionReady, + }} expectedPipeline2 := &model.Pipeline{ - UUID: fakeUUIDTwo, - CreatedAtInSec: 2, - Name: "pipeline2", - Parameters: `[{"Name": "param1"}]`, - Status: model.PipelineReady} + UUID: fakeUUIDTwo, + CreatedAtInSec: 2, + Name: "pipeline2", + Parameters: `[{"Name": "param1"}]`, + Status: model.PipelineReady, + DefaultVersionId: fakeUUIDTwo, + DefaultVersion: &model.PipelineVersion{ + UUID: fakeUUIDTwo, + CreatedAtInSec: 2, + Name: "pipeline2", + Parameters: `[{"Name": "param1"}]`, + PipelineId: fakeUUIDTwo, + Status: model.PipelineVersionReady, + }} pipelinesExpected := []*model.Pipeline{expectedPipeline1, expectedPipeline2} opts, err := list.NewOptions(&model.Pipeline{}, 10, "id", nil) @@ -82,17 +113,35 @@ func TestListPipelines_Pagination(t *testing.T) { pipelineStore.uuid = util.NewFakeUUIDGeneratorOrFatal(fakeUUIDFour, nil) pipelineStore.CreatePipeline(createPipeline("pipeline2")) expectedPipeline1 := &model.Pipeline{ - UUID: fakeUUID, - CreatedAtInSec: 1, - Name: "pipeline1", - Parameters: `[{"Name": "param1"}]`, - Status: model.PipelineReady} + UUID: fakeUUID, + CreatedAtInSec: 1, + Name: "pipeline1", + Parameters: `[{"Name": "param1"}]`, + Status: model.PipelineReady, + DefaultVersionId: fakeUUID, + DefaultVersion: &model.PipelineVersion{ + UUID: fakeUUID, + CreatedAtInSec: 1, + Name: "pipeline1", + Parameters: `[{"Name": "param1"}]`, + PipelineId: fakeUUID, + Status: model.PipelineVersionReady, + }} expectedPipeline4 := &model.Pipeline{ - UUID: fakeUUIDFour, - CreatedAtInSec: 4, - Name: "pipeline2", - Parameters: `[{"Name": "param1"}]`, - Status: model.PipelineReady} + UUID: fakeUUIDFour, + CreatedAtInSec: 4, + Name: "pipeline2", + Parameters: `[{"Name": "param1"}]`, + Status: model.PipelineReady, + DefaultVersionId: fakeUUIDFour, + DefaultVersion: &model.PipelineVersion{ + UUID: fakeUUIDFour, + CreatedAtInSec: 4, + Name: "pipeline2", + Parameters: `[{"Name": "param1"}]`, + PipelineId: fakeUUIDFour, + Status: model.PipelineVersionReady, + }} pipelinesExpected := []*model.Pipeline{expectedPipeline1, expectedPipeline4} opts, err := list.NewOptions(&model.Pipeline{}, 2, "name", nil) @@ -104,17 +153,35 @@ func TestListPipelines_Pagination(t *testing.T) { assert.Equal(t, pipelinesExpected, pipelines) expectedPipeline2 := &model.Pipeline{ - UUID: fakeUUIDTwo, - CreatedAtInSec: 2, - Name: "pipeline3", - Parameters: `[{"Name": "param1"}]`, - Status: model.PipelineReady} + UUID: fakeUUIDTwo, + CreatedAtInSec: 2, + Name: "pipeline3", + Parameters: `[{"Name": "param1"}]`, + Status: model.PipelineReady, + DefaultVersionId: fakeUUIDTwo, + DefaultVersion: &model.PipelineVersion{ + UUID: fakeUUIDTwo, + CreatedAtInSec: 2, + Name: "pipeline3", + Parameters: `[{"Name": "param1"}]`, + PipelineId: fakeUUIDTwo, + Status: model.PipelineVersionReady, + }} expectedPipeline3 := &model.Pipeline{ - UUID: fakeUUIDThree, - CreatedAtInSec: 3, - Name: "pipeline4", - Parameters: `[{"Name": "param1"}]`, - Status: model.PipelineReady} + UUID: fakeUUIDThree, + CreatedAtInSec: 3, + Name: "pipeline4", + Parameters: `[{"Name": "param1"}]`, + Status: model.PipelineReady, + DefaultVersionId: fakeUUIDThree, + DefaultVersion: &model.PipelineVersion{ + UUID: fakeUUIDThree, + CreatedAtInSec: 3, + Name: "pipeline4", + Parameters: `[{"Name": "param1"}]`, + PipelineId: fakeUUIDThree, + Status: model.PipelineVersionReady, + }} pipelinesExpected2 := []*model.Pipeline{expectedPipeline2, expectedPipeline3} opts, err = list.NewOptionsFromToken(nextPageToken, 2) @@ -140,17 +207,35 @@ func TestListPipelines_Pagination_Descend(t *testing.T) { pipelineStore.CreatePipeline(createPipeline("pipeline2")) expectedPipeline2 := &model.Pipeline{ - UUID: fakeUUIDTwo, - CreatedAtInSec: 2, - Name: "pipeline3", - Parameters: `[{"Name": "param1"}]`, - Status: model.PipelineReady} + UUID: fakeUUIDTwo, + CreatedAtInSec: 2, + Name: "pipeline3", + Parameters: `[{"Name": "param1"}]`, + Status: model.PipelineReady, + DefaultVersionId: fakeUUIDTwo, + DefaultVersion: &model.PipelineVersion{ + UUID: fakeUUIDTwo, + CreatedAtInSec: 2, + Name: "pipeline3", + Parameters: `[{"Name": "param1"}]`, + PipelineId: fakeUUIDTwo, + Status: model.PipelineVersionReady, + }} expectedPipeline3 := &model.Pipeline{ - UUID: fakeUUIDThree, - CreatedAtInSec: 3, - Name: "pipeline4", - Parameters: `[{"Name": "param1"}]`, - Status: model.PipelineReady} + UUID: fakeUUIDThree, + CreatedAtInSec: 3, + Name: "pipeline4", + Parameters: `[{"Name": "param1"}]`, + Status: model.PipelineReady, + DefaultVersionId: fakeUUIDThree, + DefaultVersion: &model.PipelineVersion{ + UUID: fakeUUIDThree, + CreatedAtInSec: 3, + Name: "pipeline4", + Parameters: `[{"Name": "param1"}]`, + PipelineId: fakeUUIDThree, + Status: model.PipelineVersionReady, + }} pipelinesExpected := []*model.Pipeline{expectedPipeline3, expectedPipeline2} opts, err := list.NewOptions(&model.Pipeline{}, 2, "name desc", nil) @@ -162,17 +247,35 @@ func TestListPipelines_Pagination_Descend(t *testing.T) { assert.Equal(t, pipelinesExpected, pipelines) expectedPipeline1 := &model.Pipeline{ - UUID: fakeUUID, - CreatedAtInSec: 1, - Name: "pipeline1", - Parameters: `[{"Name": "param1"}]`, - Status: model.PipelineReady} + UUID: fakeUUID, + CreatedAtInSec: 1, + Name: "pipeline1", + Parameters: `[{"Name": "param1"}]`, + Status: model.PipelineReady, + DefaultVersionId: fakeUUID, + DefaultVersion: &model.PipelineVersion{ + UUID: fakeUUID, + CreatedAtInSec: 1, + Name: "pipeline1", + Parameters: `[{"Name": "param1"}]`, + PipelineId: fakeUUID, + Status: model.PipelineVersionReady, + }} expectedPipeline4 := &model.Pipeline{ - UUID: fakeUUIDFour, - CreatedAtInSec: 4, - Name: "pipeline2", - Parameters: `[{"Name": "param1"}]`, - Status: model.PipelineReady} + UUID: fakeUUIDFour, + CreatedAtInSec: 4, + Name: "pipeline2", + Parameters: `[{"Name": "param1"}]`, + Status: model.PipelineReady, + DefaultVersionId: fakeUUIDFour, + DefaultVersion: &model.PipelineVersion{ + UUID: fakeUUIDFour, + CreatedAtInSec: 4, + Name: "pipeline2", + Parameters: `[{"Name": "param1"}]`, + PipelineId: fakeUUIDFour, + Status: model.PipelineVersionReady, + }} pipelinesExpected2 := []*model.Pipeline{expectedPipeline4, expectedPipeline1} opts, err = list.NewOptionsFromToken(nextPageToken, 2) @@ -190,11 +293,20 @@ func TestListPipelines_Pagination_LessThanPageSize(t *testing.T) { pipelineStore := NewPipelineStore(db, util.NewFakeTimeForEpoch(), util.NewFakeUUIDGeneratorOrFatal(fakeUUID, nil)) pipelineStore.CreatePipeline(createPipeline("pipeline1")) expectedPipeline1 := &model.Pipeline{ - UUID: fakeUUID, - CreatedAtInSec: 1, - Name: "pipeline1", - Parameters: `[{"Name": "param1"}]`, - Status: model.PipelineReady} + UUID: fakeUUID, + CreatedAtInSec: 1, + Name: "pipeline1", + Parameters: `[{"Name": "param1"}]`, + Status: model.PipelineReady, + DefaultVersionId: fakeUUID, + DefaultVersion: &model.PipelineVersion{ + UUID: fakeUUID, + CreatedAtInSec: 1, + Name: "pipeline1", + Parameters: `[{"Name": "param1"}]`, + PipelineId: fakeUUID, + Status: model.PipelineVersionReady, + }} pipelinesExpected := []*model.Pipeline{expectedPipeline1} opts, err := list.NewOptions(&model.Pipeline{}, 2, "", nil) @@ -223,12 +335,20 @@ func TestGetPipeline(t *testing.T) { pipelineStore := NewPipelineStore(db, util.NewFakeTimeForEpoch(), util.NewFakeUUIDGeneratorOrFatal(fakeUUID, nil)) pipelineStore.CreatePipeline(createPipeline("pipeline1")) pipelineExpected := model.Pipeline{ - UUID: fakeUUID, - CreatedAtInSec: 1, - Name: "pipeline1", - Parameters: `[{"Name": "param1"}]`, - Status: model.PipelineReady, - } + UUID: fakeUUID, + CreatedAtInSec: 1, + Name: "pipeline1", + Parameters: `[{"Name": "param1"}]`, + Status: model.PipelineReady, + DefaultVersionId: fakeUUID, + DefaultVersion: &model.PipelineVersion{ + UUID: fakeUUID, + CreatedAtInSec: 1, + Name: "pipeline1", + Parameters: `[{"Name": "param1"}]`, + PipelineId: fakeUUID, + Status: model.PipelineVersionReady, + }} pipeline, err := pipelineStore.GetPipeline(fakeUUID) assert.Nil(t, err) @@ -239,7 +359,14 @@ func TestGetPipeline_NotFound_Creating(t *testing.T) { db := NewFakeDbOrFatal() defer db.Close() pipelineStore := NewPipelineStore(db, util.NewFakeTimeForEpoch(), util.NewFakeUUIDGeneratorOrFatal(fakeUUID, nil)) - pipelineStore.CreatePipeline(&model.Pipeline{Name: "pipeline3", Status: model.PipelineCreating}) + pipelineStore.CreatePipeline( + &model.Pipeline{ + Name: "pipeline3", + Status: model.PipelineCreating, + DefaultVersion: &model.PipelineVersion{ + Name: "pipeline3", + Status: model.PipelineVersionCreating, + }}) _, err := pipelineStore.GetPipeline(fakeUUID) assert.Equal(t, codes.NotFound, err.(*util.UserError).ExternalStatusCode(), @@ -271,11 +398,20 @@ func TestCreatePipeline(t *testing.T) { defer db.Close() pipelineStore := NewPipelineStore(db, util.NewFakeTimeForEpoch(), util.NewFakeUUIDGeneratorOrFatal(fakeUUID, nil)) pipelineExpected := model.Pipeline{ - UUID: fakeUUID, - CreatedAtInSec: 1, - Name: "pipeline1", - Parameters: `[{"Name": "param1"}]`, - Status: model.PipelineReady} + UUID: fakeUUID, + CreatedAtInSec: 1, + Name: "pipeline1", + Parameters: `[{"Name": "param1"}]`, + Status: model.PipelineReady, + DefaultVersionId: fakeUUID, + DefaultVersion: &model.PipelineVersion{ + UUID: fakeUUID, + CreatedAtInSec: 1, + Name: "pipeline1", + Parameters: `[{"Name": "param1"}]`, + Status: model.PipelineVersionReady, + PipelineId: fakeUUID, + }} pipeline := createPipeline("pipeline1") pipeline, err := pipelineStore.CreatePipeline(pipeline) @@ -297,7 +433,9 @@ func TestCreatePipeline_DuplicateKey(t *testing.T) { } func TestCreatePipeline_InternalServerError(t *testing.T) { - pipeline := &model.Pipeline{Name: "Pipeline123"} + pipeline := &model.Pipeline{ + Name: "Pipeline123", + DefaultVersion: &model.PipelineVersion{}} db := NewFakeDbOrFatal() defer db.Close() pipelineStore := NewPipelineStore(db, util.NewFakeTimeForEpoch(), util.NewFakeUUIDGeneratorOrFatal(fakeUUID, nil)) @@ -335,14 +473,25 @@ func TestUpdatePipelineStatus(t *testing.T) { pipeline, err := pipelineStore.CreatePipeline(createPipeline("pipeline1")) assert.Nil(t, err) pipelineExpected := model.Pipeline{ - UUID: fakeUUID, - CreatedAtInSec: 1, - Name: "pipeline1", - Parameters: `[{"Name": "param1"}]`, - Status: model.PipelineDeleting, + UUID: fakeUUID, + CreatedAtInSec: 1, + Name: "pipeline1", + Parameters: `[{"Name": "param1"}]`, + Status: model.PipelineDeleting, + DefaultVersionId: fakeUUID, + DefaultVersion: &model.PipelineVersion{ + UUID: fakeUUID, + CreatedAtInSec: 1, + Name: "pipeline1", + Parameters: `[{"Name": "param1"}]`, + Status: model.PipelineVersionDeleting, + PipelineId: fakeUUID, + }, } err = pipelineStore.UpdatePipelineStatus(pipeline.UUID, model.PipelineDeleting) assert.Nil(t, err) + err = pipelineStore.UpdatePipelineVersionStatus(pipeline.UUID, model.PipelineVersionDeleting) + assert.Nil(t, err) pipeline, err = pipelineStore.GetPipelineWithStatus(fakeUUID, model.PipelineDeleting) assert.Nil(t, err) assert.Equal(t, pipelineExpected, *pipeline) diff --git a/backend/src/apiserver/storage/run_store.go b/backend/src/apiserver/storage/run_store.go index 36dd3385241..12c138f60f6 100644 --- a/backend/src/apiserver/storage/run_store.go +++ b/backend/src/apiserver/storage/run_store.go @@ -76,7 +76,7 @@ type RunStore struct { // total_size. The total_size does not reflect the page size, but it does reflect the number of runs // matching the supplied filters and resource references. func (s *RunStore) ListRuns( - filterContext *common.FilterContext, opts *list.Options) ([]*model.Run, int, string, error) { + filterContext *common.FilterContext, opts *list.Options) ([]*model.Run, int, string, error) { errorF := func(err error) ([]*model.Run, int, string, error) { return nil, 0, "", util.NewInternalServerError(err, "Failed to list runs: %v", err) } @@ -142,7 +142,7 @@ func (s *RunStore) ListRuns( } func (s *RunStore) buildSelectRunsQuery(selectCount bool, opts *list.Options, - filterContext *common.FilterContext) (string, []interface{}, error) { + filterContext *common.FilterContext) (string, []interface{}, error) { filteredSelectBuilder, err := list.FilterOnResourceReference("run_details", runColumns, common.Run, selectCount, filterContext) if err != nil { @@ -218,7 +218,7 @@ func (s *RunStore) scanRowsToRunDetails(rows *sql.Rows) ([]*model.RunDetail, err var runs []*model.RunDetail for rows.Next() { var uuid, displayName, name, storageState, namespace, description, pipelineId, pipelineName, pipelineSpecManifest, - workflowSpecManifest, parameters, conditions, pipelineRuntimeManifest, workflowRuntimeManifest string + workflowSpecManifest, parameters, conditions, pipelineRuntimeManifest, workflowRuntimeManifest string var createdAtInSec, scheduledAtInSec, finishedAtInSec int64 var metricsInString, resourceReferencesInString sql.NullString err := rows.Scan( @@ -312,31 +312,31 @@ func (s *RunStore) CreateRun(r *model.RunDetail) (*model.RunDetail, error) { if r.StorageState == "" { r.StorageState = api.Run_STORAGESTATE_AVAILABLE.String() } else if r.StorageState != api.Run_STORAGESTATE_AVAILABLE.String() && - r.StorageState != api.Run_STORAGESTATE_ARCHIVED.String() { + r.StorageState != api.Run_STORAGESTATE_ARCHIVED.String() { return nil, util.NewInvalidInputError("Invalid value for StorageState field: %q.", r.StorageState) } runSql, runArgs, err := sq. Insert("run_details"). - SetMap(sq.Eq{ - "UUID": r.UUID, - "DisplayName": r.DisplayName, - "Name": r.Name, - "StorageState": r.StorageState, - "Namespace": r.Namespace, - "Description": r.Description, - "CreatedAtInSec": r.CreatedAtInSec, - "ScheduledAtInSec": r.ScheduledAtInSec, - "FinishedAtInSec": r.FinishedAtInSec, - "Conditions": r.Conditions, - "WorkflowRuntimeManifest": r.WorkflowRuntimeManifest, - "PipelineRuntimeManifest": r.PipelineRuntimeManifest, - "PipelineId": r.PipelineId, - "PipelineName": r.PipelineName, - "PipelineSpecManifest": r.PipelineSpecManifest, - "WorkflowSpecManifest": r.WorkflowSpecManifest, - "Parameters": r.Parameters, - }).ToSql() + SetMap(sq.Eq{ + "UUID": r.UUID, + "DisplayName": r.DisplayName, + "Name": r.Name, + "StorageState": r.StorageState, + "Namespace": r.Namespace, + "Description": r.Description, + "CreatedAtInSec": r.CreatedAtInSec, + "ScheduledAtInSec": r.ScheduledAtInSec, + "FinishedAtInSec": r.FinishedAtInSec, + "Conditions": r.Conditions, + "WorkflowRuntimeManifest": r.WorkflowRuntimeManifest, + "PipelineRuntimeManifest": r.PipelineRuntimeManifest, + "PipelineId": r.PipelineId, + "PipelineName": r.PipelineName, + "PipelineSpecManifest": r.PipelineSpecManifest, + "WorkflowSpecManifest": r.WorkflowSpecManifest, + "Parameters": r.Parameters, + }).ToSql() if err != nil { return nil, util.NewInternalServerError(err, "Failed to create query to store run to run table: '%v/%v", r.Namespace, r.Name) @@ -374,10 +374,10 @@ func (s *RunStore) UpdateRun(runID string, condition string, finishedAtInSec int sql, args, err := sq. Update("run_details"). - SetMap(sq.Eq{ - "Conditions": condition, - "FinishedAtInSec": finishedAtInSec, - "WorkflowRuntimeManifest": workflowRuntimeManifest}). + SetMap(sq.Eq{ + "Conditions": condition, + "FinishedAtInSec": finishedAtInSec, + "WorkflowRuntimeManifest": workflowRuntimeManifest}). Where(sq.Eq{"UUID": runID}). ToSql() if err != nil { @@ -429,9 +429,9 @@ func (s *RunStore) CreateOrUpdateRun(runDetail *model.RunDetail) error { func (s *RunStore) ArchiveRun(runId string) error { sql, args, err := sq. Update("run_details"). - SetMap(sq.Eq{ - "StorageState": api.Run_STORAGESTATE_ARCHIVED.String(), - }). + SetMap(sq.Eq{ + "StorageState": api.Run_STORAGESTATE_ARCHIVED.String(), + }). Where(sq.Eq{"UUID": runId}). ToSql() @@ -452,9 +452,9 @@ func (s *RunStore) ArchiveRun(runId string) error { func (s *RunStore) UnarchiveRun(runId string) error { sql, args, err := sq. Update("run_details"). - SetMap(sq.Eq{ - "StorageState": api.Run_STORAGESTATE_AVAILABLE.String(), - }). + SetMap(sq.Eq{ + "StorageState": api.Run_STORAGESTATE_AVAILABLE.String(), + }). Where(sq.Eq{"UUID": runId}). ToSql() @@ -511,13 +511,13 @@ func (s *RunStore) ReportMetric(metric *model.RunMetric) (err error) { } sql, args, err := sq. Insert("run_metrics"). - SetMap(sq.Eq{ - "RunUUID": metric.RunUUID, - "NodeID": metric.NodeID, - "Name": metric.Name, - "NumberValue": metric.NumberValue, - "Format": metric.Format, - "Payload": string(payloadBytes)}).ToSql() + SetMap(sq.Eq{ + "RunUUID": metric.RunUUID, + "NodeID": metric.NodeID, + "Name": metric.Name, + "NumberValue": metric.NumberValue, + "Format": metric.Format, + "Payload": string(payloadBytes)}).ToSql() if err != nil { return util.NewInternalServerError(err, "failed to create query for inserting metric: %+v", metric) diff --git a/backend/src/apiserver/storage/run_store_test.go b/backend/src/apiserver/storage/run_store_test.go index 8ea50479dac..e9e49b578ea 100644 --- a/backend/src/apiserver/storage/run_store_test.go +++ b/backend/src/apiserver/storage/run_store_test.go @@ -685,13 +685,13 @@ func TestGetRun_InvalidMetricPayload_Ignore(t *testing.T) { defer db.Close() sql, args, _ := sq. Insert("run_metrics"). - SetMap(sq.Eq{ - "RunUUID": "1", - "NodeID": "node1", - "Name": "accuracy", - "NumberValue": 0.88, - "Format": "RAW", - "Payload": "{ invalid; json,"}).ToSql() + SetMap(sq.Eq{ + "RunUUID": "1", + "NodeID": "node1", + "Name": "accuracy", + "NumberValue": 0.88, + "Format": "RAW", + "Payload": "{ invalid; json,"}).ToSql() db.Exec(sql, args...) runDetail, err := runStore.GetRun("1")