Skip to content

Commit

Permalink
Add necessary data types to api and database to support pipeline vers…
Browse files Browse the repository at this point in the history
…ion. (#1873)

* Add necessary data types/tables for pipeline version. Mostly based
on Yang's branch at https://github.com/IronPan/pipelines/tree/kfpci/.
Backward compatible.

* Modified comment

* Modify api converter according with new pipeline (version) definition

* Change pipeline_store for DefaultVersionId field

* Add pipeline spec to pipeline version

* fix model converter

* fix a comment

* Add foreign key, pagination of list request, refactor code source

* Refactor code source

* Foreign key

* Change code source and package source type

* Fix ; separator

* Add versions table and modify existing pipeline apis

* Remove api pipeline definition change and leave it for later PR

* Add comment

* Make schema changing and data backfilling a single transaction

* Tolerate null default version id in code

* fix status

* Revise delete pipeline func

* Use raw query to migrate data

* No need to update versions status

* rename and minor changes

* accidentally removed a where clause

* Fix a model name prefix

* Refine comments

* Revise if condition

* Address comments

* address more comments

* Rearrange pipeline and version related parts inside CreatePipeline, to make them more separate.

* Add package url to pipeline version. Required when calling CreatePipelineVersionRequest

* Single code source url; remove pipeline id as sorting field; reformat

* resolve remote branch and local branch diff

* remove unused func

* Remove an empty line
  • Loading branch information
jingzhang36 authored and k8s-ci-robot committed Sep 26, 2019
1 parent 7735a14 commit 7aaecb1
Show file tree
Hide file tree
Showing 27 changed files with 1,041 additions and 193 deletions.
69 changes: 68 additions & 1 deletion backend/api/pipeline.proto
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ package api;
import "google/api/annotations.proto";
import "google/protobuf/timestamp.proto";
import "google/protobuf/empty.proto";
import "backend/api/parameter.proto";
import "backend/api/error.proto";
import "backend/api/parameter.proto";
import "backend/api/pipeline_spec.proto";
import "backend/api/resource_reference.proto";
import "protoc-gen-swagger/options/annotations.proto";

option (grpc.gateway.protoc_gen_swagger.options.openapiv2_swagger) = {
Expand Down Expand Up @@ -136,6 +138,32 @@ message GetTemplateResponse {
string template = 1;
}

// Request message for retrieving the template of one pipeline version.
message GetPipelineVersionTemplateRequest {
  // ID of the pipeline version whose template is requested.
  string version_id = 1;
}

// Request message for creating a pipeline version.
message CreatePipelineVersionRequest {
  // The version to create. The ResourceReference carried inside this
  // PipelineVersion identifies the pipeline that the version belongs to.
  PipelineVersion version = 1;
}

// Request message for retrieving one pipeline version.
message GetPipelineVersionRequest {
  // ID of the pipeline version to retrieve.
  string version_id = 1;
}

// Request message for listing the versions of one pipeline, page by page.
message ListPipelineVersionsRequest {
  // Identifies the pipeline whose versions are to be listed.
  ResourceKey resource_key = 1;

  // Number of versions requested per page.
  // NOTE(review): behavior for zero or negative values is not visible here.
  int32 page_size = 2;

  // Token selecting the page to return; presumably the next_page_token of a
  // previous ListPipelineVersionsResponse (confirm against the server).
  string page_token = 3;
}

// Response message carrying one page of pipeline versions.
message ListPipelineVersionsResponse {
  // The pipeline versions on this page.
  repeated PipelineVersion versions = 1;

  // Token for requesting the following page of results.
  string next_page_token = 2;
}

message Pipeline {
// Output. Unique pipeline ID. Generated by API server.
string id = 1;
Expand All @@ -151,14 +179,53 @@ message Pipeline {
string description = 4;

// Output. The input parameters for this pipeline.
// TODO(jingzhang36): replace this parameters field with the parameters field
// inside PipelineVersion when all usage of the former has been changed to use
// the latter.
repeated Parameter parameters = 5;

// The URL to the source of the pipeline. This is required when creating the
// pipeline through CreatePipeline API.
// TODO(jingzhang36): replace this url field with the package_url field
// inside PipelineVersion when all usage of the former has been changed to use
// the latter.
Url url = 7;

// In case any error happens retrieving a pipeline field, only pipeline ID
// and the error message is returned. Client has the flexibility of choosing
// how to handle error. This is especially useful during listing call.
string error = 6;

// Output only. The default version of the pipeline. As of now, the latest
// version is used as default. (In the future, if desired by customers, we
// can allow them to set default version.)
// TODO(jingzhang36): expose this in API pipeline definition with FE changes.
// PipelineVersion default_version = 8;
}

// One version of a pipeline. The owning pipeline is identified through
// resource_references.
message PipelineVersion {
  // Output. Unique version ID. Generated by API server.
  string id = 1;

  // Input. Optional. Version name provided by user.
  string name = 2;

  // Output. The time this pipeline version is created.
  google.protobuf.Timestamp created_at = 3;

  // Output. The input parameters of this pipeline version.
  repeated Parameter parameters = 4;

  // Input. Optional. Pipeline version code source.
  string code_source_url = 5;

  // Input. Required. Pipeline version package url. When calling the
  // CreatePipelineVersion API method, one package file location must be
  // provided.
  Url package_url = 6;

  // Input. Required. E.g., specify which pipeline this pipeline version
  // belongs to.
  repeated ResourceReference resource_references = 7;
}

1 change: 1 addition & 0 deletions backend/api/resource_reference.proto
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ enum ResourceType {
UNKNOWN_RESOURCE_TYPE = 0;
EXPERIMENT = 1;
JOB = 2;
PIPELINE = 3;
}

enum Relationship {
Expand Down
1 change: 1 addition & 0 deletions backend/src/apiserver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ go_library(
"@com_github_grpc_ecosystem_grpc_gateway//runtime:go_default_library",
"@com_github_jinzhu_gorm//:go_default_library",
"@com_github_jinzhu_gorm//dialects/sqlite:go_default_library",
"@com_github_masterminds_squirrel//:go_default_library",
"@com_github_minio_minio_go//:go_default_library",
"@com_github_spf13_viper//:go_default_library",
"@io_k8s_client_go//kubernetes/typed/core/v1:go_default_library",
Expand Down
56 changes: 56 additions & 0 deletions backend/src/apiserver/client_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,11 +176,24 @@ func initDBClient(initConnectionTimeout time.Duration) *storage.DB {
db, err := gorm.Open(driverName, arg)
util.TerminateIfError(err)

// If pipeline_versions table is introduced into DB for the first time,
// it needs initialization or data backfill.
var tableNames []string
var initializePipelineVersions = true
db.Raw(`show tables`).Pluck("Tables_in_mlpipeline", &tableNames)
for _, tableName := range tableNames {
if tableName == "pipeline_versions" {
initializePipelineVersions = false
break
}
}

// Create table
response := db.AutoMigrate(
&model.Experiment{},
&model.Job{},
&model.Pipeline{},
&model.PipelineVersion{},
&model.ResourceReference{},
&model.RunDetail{},
&model.RunMetric{},
Expand All @@ -201,6 +214,18 @@ func initDBClient(initConnectionTimeout time.Duration) *storage.DB {
if response.Error != nil {
glog.Fatalf("Failed to create a foreign key for RunID in run_metrics table. Error: %s", response.Error)
}
response = db.Model(&model.PipelineVersion{}).
AddForeignKey("PipelineId", "pipelines(UUID)", "CASCADE" /* onDelete */, "CASCADE" /* update */)
if response.Error != nil {
glog.Fatalf("Failed to create a foreign key for PipelineId in pipeline_versions table. Error: %s", response.Error)
}

// Data backfill for pipeline_versions if this is the first time for
// pipeline_versions to enter mlpipeline DB.
if initializePipelineVersions {
initPipelineVersionsFromPipelines(db)
}

return storage.NewDB(db.DB(), storage.NewMySQLDialect())
}

Expand Down Expand Up @@ -288,3 +313,34 @@ func newClientManager() ClientManager {

return clientManager
}

// initPipelineVersionsFromPipelines performs the two-step data migration that
// introduces the pipeline_versions table. It shall be called only once, when
// the pipeline_versions table is created for the first time in DB.
//
// Both steps run inside a single transaction. If starting the transaction,
// either statement, or the commit fails, the transaction is rolled back and
// the process is terminated through glog.Fatalf, the same way the caller
// handles other schema migration failures. Failing loudly matters because the
// caller only triggers this backfill when the table did not exist beforehand,
// so a silently failed backfill would not be attempted again.
func initPipelineVersionsFromPipelines(db *gorm.DB) {
	tx := db.Begin()
	if tx.Error != nil {
		glog.Fatalf("Failed to start the transaction that initializes pipeline_versions table. Error: %v", tx.Error)
	}

	// Step 1: duplicate pipelines to pipeline versions.
	// The pipeline versions created here are not through the KFP pipeline
	// version API, and are only for the legacy pipelines that were created
	// before the pipeline version API was introduced.
	// For those legacy pipelines, which had no versions before, we create one
	// implicit version for each of them. Given a legacy pipeline, the implicit
	// version created here is assigned an ID the same as the pipeline ID. This
	// way we don't need to move the minio file of the pipeline package around,
	// since the minio file's path is based on the pipeline ID (and now on the
	// implicit version ID too). Meanwhile, IDs are only required to be unique
	// inside the same resource type, so pipeline and pipeline version, as two
	// different resources, using the same ID is OK.
	// On the other hand, a pipeline and its pipeline versions created after
	// the pipeline version API is introduced will have different IDs, and the
	// minio file will be put directly into the directories for pipeline
	// versions.
	if err := tx.Exec(`INSERT INTO
pipeline_versions (UUID, Name, CreatedAtInSec, Parameters, Status, PipelineId)
SELECT UUID, Name, CreatedAtInSec, Parameters, Status, UUID FROM pipelines;`).Error; err != nil {
		tx.Rollback()
		glog.Fatalf("Failed to backfill pipeline_versions table from pipelines table. Error: %v", err)
	}

	// Step 2: modify pipelines table after pipeline_versions are populated, so
	// that every legacy pipeline points at its implicit version.
	if err := tx.Exec("update pipelines set DefaultVersionId=UUID;").Error; err != nil {
		tx.Rollback()
		glog.Fatalf("Failed to set DefaultVersionId in pipelines table. Error: %v", err)
	}

	if err := tx.Commit().Error; err != nil {
		glog.Fatalf("Failed to commit the initialization of pipeline_versions table. Error: %v", err)
	}
}
29 changes: 22 additions & 7 deletions backend/src/apiserver/list/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ type token struct {
KeyFieldValue interface{}
// IsDesc is true if the sorting order should be descending.
IsDesc bool
// ModelName is the table where ***FieldName belongs to.
ModelName string
// Filter represents the filtering that should be applied in the query.
Filter *filter.Filter
}
Expand Down Expand Up @@ -125,7 +127,9 @@ func NewOptions(listable Listable, pageSize int, sortBy string, filterProto *api
return nil, err
}

token := &token{KeyFieldName: listable.PrimaryKeyColumnName()}
token := &token{
KeyFieldName: listable.PrimaryKeyColumnName(),
ModelName: listable.GetModelName()}

// Ignore the case of the letter. Split query string by space.
queryList := strings.Fields(strings.ToLower(sortBy))
Expand Down Expand Up @@ -177,15 +181,23 @@ func (o *Options) AddPaginationToSelect(sqlBuilder sq.SelectBuilder) sq.SelectBu
// containing these.
func (o *Options) AddSortingToSelect(sqlBuilder sq.SelectBuilder) sq.SelectBuilder {
// If next row's value is specified, set those values in the clause.
var modelNamePrefix string
if len(o.ModelName) == 0 {
modelNamePrefix = ""
} else {
modelNamePrefix = o.ModelName + "."
}
if o.SortByFieldValue != nil && o.KeyFieldValue != nil {
if o.IsDesc {
sqlBuilder = sqlBuilder.
Where(sq.Or{sq.Lt{o.SortByFieldName: o.SortByFieldValue},
sq.And{sq.Eq{o.SortByFieldName: o.SortByFieldValue}, sq.LtOrEq{o.KeyFieldName: o.KeyFieldValue}}})
Where(sq.Or{sq.Lt{modelNamePrefix + o.SortByFieldName: o.SortByFieldValue},
sq.And{sq.Eq{modelNamePrefix + o.SortByFieldName: o.SortByFieldValue},
sq.LtOrEq{modelNamePrefix + o.KeyFieldName: o.KeyFieldValue}}})
} else {
sqlBuilder = sqlBuilder.
Where(sq.Or{sq.Gt{o.SortByFieldName: o.SortByFieldValue},
sq.And{sq.Eq{o.SortByFieldName: o.SortByFieldValue}, sq.GtOrEq{o.KeyFieldName: o.KeyFieldValue}}})
Where(sq.Or{sq.Gt{modelNamePrefix + o.SortByFieldName: o.SortByFieldValue},
sq.And{sq.Eq{modelNamePrefix + o.SortByFieldName: o.SortByFieldValue},
sq.GtOrEq{modelNamePrefix + o.KeyFieldName: o.KeyFieldValue}}})
}
}

Expand All @@ -194,8 +206,8 @@ func (o *Options) AddSortingToSelect(sqlBuilder sq.SelectBuilder) sq.SelectBuild
order = "DESC"
}
sqlBuilder = sqlBuilder.
OrderBy(fmt.Sprintf("%v %v", o.SortByFieldName, order)).
OrderBy(fmt.Sprintf("%v %v", o.KeyFieldName, order))
OrderBy(fmt.Sprintf("%v %v", modelNamePrefix+o.SortByFieldName, order)).
OrderBy(fmt.Sprintf("%v %v", modelNamePrefix+o.KeyFieldName, order))

return sqlBuilder
}
Expand Down Expand Up @@ -258,6 +270,8 @@ type Listable interface {
// APIToModelFieldMap returns a map from field names in the API representation
// of the model to its corresponding field name in the model itself.
APIToModelFieldMap() map[string]string
// GetModelName returns table name used as sort field prefix.
GetModelName() string
}

// NextPageToken returns a string that can be used to fetch the subsequent set
Expand Down Expand Up @@ -292,6 +306,7 @@ func (o *Options) nextPageToken(listable Listable) (*token, error) {
KeyFieldValue: keyField.Interface(),
IsDesc: o.IsDesc,
Filter: o.Filter,
ModelName: o.ModelName,
}, nil
}

Expand Down
4 changes: 4 additions & 0 deletions backend/src/apiserver/list/list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ func (f *fakeListable) APIToModelFieldMap() map[string]string {
return fakeAPIToModelMap
}

// GetModelName reports an empty model name for the fake, which makes list
// queries built from it use unprefixed column names.
func (f *fakeListable) GetModelName() string {
	var modelName string
	return modelName
}

func TestNextPageToken_ValidTokens(t *testing.T) {
l := &fakeListable{PrimaryKey: "uuid123", FakeName: "Fake", CreatedTimestamp: 1234}

Expand Down
22 changes: 21 additions & 1 deletion backend/src/apiserver/model/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "go_default_library",
Expand All @@ -10,10 +10,30 @@ go_library(
"listable_model.go",
"pipeline.go",
"pipeline_spec.go",
"pipeline_version.go",
"resource_reference.go",
"run.go",
],
importpath = "github.com/kubeflow/pipelines/backend/src/apiserver/model",
visibility = ["//visibility:public"],
deps = ["//backend/src/apiserver/common:go_default_library"],
)

# Test target for the model package. Only pipeline_version_test.go is listed
# in srcs; the target embeds :go_default_library.
go_test(
name = "go_default_test",
srcs = [
"pipeline_version_test.go",
],
importpath = "github.com/kubeflow/pipelines/backend/src/apiserver/model",
visibility = ["//visibility:public"],
embed = [":go_default_library"],
deps = [
"//backend/api:go_default_library",
"//backend/src/apiserver/filter:go_default_library",
"//backend/src/apiserver/list:go_default_library",
"@com_github_google_go_cmp//cmp:go_default_library",
"@com_github_google_go_cmp//cmp/cmpopts:go_default_library",
"@com_github_masterminds_squirrel//:go_default_library",
"@com_github_stretchr_testify//assert:go_default_library",
],
)
5 changes: 5 additions & 0 deletions backend/src/apiserver/model/experiment.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,8 @@ var experimentAPIToModelFieldMap = map[string]string{
func (e *Experiment) APIToModelFieldMap() map[string]string {
return experimentAPIToModelFieldMap
}

// GetModelName gives the name of the table that stores experiments. List
// queries use it as the prefix that qualifies sort and key columns.
func (e *Experiment) GetModelName() string {
	const tableName = "experiments"
	return tableName
}
5 changes: 5 additions & 0 deletions backend/src/apiserver/model/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,3 +97,8 @@ var jobAPIToModelFieldMap = map[string]string{
func (k *Job) APIToModelFieldMap() map[string]string {
return jobAPIToModelFieldMap
}

// GetModelName returns the name of the table that stores jobs. List queries
// use it as the prefix that qualifies sort and key columns.
//
// The receiver is named k to match the other Job methods in this file.
func (k *Job) GetModelName() string {
	return "jobs"
}
12 changes: 12 additions & 0 deletions backend/src/apiserver/model/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,14 @@ type Pipeline struct {
CreatedAtInSec int64 `gorm:"column:CreatedAtInSec; not null"`
Name string `gorm:"column:Name; not null; unique"`
Description string `gorm:"column:Description; not null"`
// TODO(jingzhang36): remove Parameters when no code is accessing this
// field. Should use PipelineVersion.Parameters instead.
/* Set size to 65535 so it will be stored as longtext. https://dev.mysql.com/doc/refman/8.0/en/column-count-limit.html */
Parameters string `gorm:"column:Parameters; not null; size:65535"`
Status PipelineStatus `gorm:"column:Status; not null"`
// Default version of this pipeline. It could be null.
DefaultVersionId string `gorm:"column:DefaultVersionId;"`
DefaultVersion *PipelineVersion `gorm:"-"`
}

func (p Pipeline) GetValueOfPrimaryKey() string {
Expand All @@ -59,10 +64,17 @@ var pipelineAPIToModelFieldMap = map[string]string{
"name": "Name",
"created_at": "CreatedAtInSec",
"description": "Description",
// TODO(jingzhang36): uncomment this field when we expose it to API
// "default_version_id": "DefaultVersionId",
}

// APIToModelFieldMap exposes the lookup table that translates API field names
// into the matching field names of the Pipeline model.
func (p *Pipeline) APIToModelFieldMap() map[string]string {
	fieldMap := pipelineAPIToModelFieldMap
	return fieldMap
}

// GetModelName gives the name of the table that stores pipelines. List
// queries use it as the prefix that qualifies sort and key columns.
func (p *Pipeline) GetModelName() string {
	const tableName = "pipelines"
	return tableName
}
Loading

0 comments on commit 7aaecb1

Please sign in to comment.