-
Notifications
You must be signed in to change notification settings - Fork 1.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add pipeline version to job/run integration test so that job/run is c… #3270
Changes from 4 commits
373e792
0f828a2
9585484
14bafd8
ec1c820
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -88,6 +88,15 @@ func (s *JobApiTestSuite) TestJobApis() { | |
helloWorldPipeline, err := s.pipelineUploadClient.UploadFile("../resources/hello-world.yaml", uploadParams.NewUploadPipelineParams()) | ||
assert.Nil(t, err) | ||
|
||
/* ---------- Upload pipeline version YAML ---------- */ | ||
time.Sleep(1 * time.Second) | ||
helloWorldPipelineVersion, err := s.pipelineUploadClient.UploadPipelineVersion( | ||
"../resources/hello-world.yaml", &uploadParams.UploadPipelineVersionParams{ | ||
Name: util.StringPointer("hello-world-version"), | ||
Pipelineid: util.StringPointer(helloWorldPipeline.ID), | ||
}) | ||
assert.Nil(t, err) | ||
|
||
/* ---------- Create a new hello world experiment ---------- */ | ||
experiment := &experiment_model.APIExperiment{Name: "hello world experiment"} | ||
helloWorldExperiment, err := s.experimentClient.Create(&experimentparams.CreateExperimentParams{Body: experiment}) | ||
|
@@ -97,24 +106,23 @@ func (s *JobApiTestSuite) TestJobApis() { | |
createJobRequest := &jobparams.CreateJobParams{Body: &job_model.APIJob{ | ||
Name: "hello world", | ||
Description: "this is hello world", | ||
PipelineSpec: &job_model.APIPipelineSpec{ | ||
PipelineID: helloWorldPipeline.ID, | ||
}, | ||
ResourceReferences: []*job_model.APIResourceReference{ | ||
{Key: &job_model.APIResourceKey{Type: job_model.APIResourceTypeEXPERIMENT, ID: helloWorldExperiment.ID}, | ||
Relationship: job_model.APIRelationshipOWNER}, | ||
{Key: &job_model.APIResourceKey{Type: job_model.APIResourceTypePIPELINEVERSION, ID: helloWorldPipelineVersion.ID}, | ||
Relationship: job_model.APIRelationshipCREATOR}, | ||
}, | ||
MaxConcurrency: 10, | ||
Enabled: true, | ||
}} | ||
helloWorldJob, err := s.jobClient.Create(createJobRequest) | ||
assert.Nil(t, err) | ||
s.checkHelloWorldJob(t, helloWorldJob, helloWorldExperiment.ID, helloWorldExperiment.Name, helloWorldPipeline.ID) | ||
s.checkHelloWorldJob(t, helloWorldJob, helloWorldExperiment.ID, helloWorldExperiment.Name, helloWorldPipelineVersion.ID, helloWorldPipelineVersion.Name) | ||
|
||
/* ---------- Get hello world job ---------- */ | ||
helloWorldJob, err = s.jobClient.Get(&jobparams.GetJobParams{ID: helloWorldJob.ID}) | ||
assert.Nil(t, err) | ||
s.checkHelloWorldJob(t, helloWorldJob, helloWorldExperiment.ID, helloWorldExperiment.Name, helloWorldPipeline.ID) | ||
s.checkHelloWorldJob(t, helloWorldJob, helloWorldExperiment.ID, helloWorldExperiment.Name, helloWorldPipelineVersion.ID, helloWorldPipelineVersion.Name) | ||
|
||
/* ---------- Create a new argument parameter experiment ---------- */ | ||
experiment = &experiment_model.APIExperiment{Name: "argument parameter experiment"} | ||
|
@@ -228,15 +236,24 @@ func (s *JobApiTestSuite) TestJobApis_noCatchupOption() { | |
pipeline, err := s.pipelineUploadClient.UploadFile("../resources/hello-world.yaml", uploadParams.NewUploadPipelineParams()) | ||
assert.Nil(t, err) | ||
|
||
/* ---------- Upload pipeline version YAML ---------- */ | ||
time.Sleep(1 * time.Second) | ||
helloWorldPipelineVersion, err := s.pipelineUploadClient.UploadPipelineVersion( | ||
"../resources/hello-world.yaml", &uploadParams.UploadPipelineVersionParams{ | ||
Name: util.StringPointer("hello-world-version"), | ||
Pipelineid: util.StringPointer(pipeline.ID), | ||
}) | ||
assert.Nil(t, err) | ||
|
||
/* ---------- Create a periodic job with start and end date in the past and catchup = true ---------- */ | ||
experiment := &experiment_model.APIExperiment{Name: "periodic catchup true"} | ||
periodicCatchupTrueExperiment, err := s.experimentClient.Create(&experimentparams.CreateExperimentParams{Body: experiment}) | ||
assert.Nil(t, err) | ||
|
||
job := jobInThePastForTwoMinutes(jobOptions{ | ||
pipelineId: pipeline.ID, | ||
experimentId: periodicCatchupTrueExperiment.ID, | ||
periodic: true, | ||
pipelineVersionId: helloWorldPipelineVersion.ID, | ||
experimentId: periodicCatchupTrueExperiment.ID, | ||
periodic: true, | ||
}) | ||
job.Name = "periodic-catchup-true-" | ||
job.Description = "A job with NoCatchup=false will backfill each past interval when behind schedule." | ||
|
@@ -251,9 +268,9 @@ func (s *JobApiTestSuite) TestJobApis_noCatchupOption() { | |
assert.Nil(t, err) | ||
|
||
job = jobInThePastForTwoMinutes(jobOptions{ | ||
pipelineId: pipeline.ID, | ||
experimentId: periodicCatchupFalseExperiment.ID, | ||
periodic: true, | ||
pipelineVersionId: helloWorldPipelineVersion.ID, | ||
experimentId: periodicCatchupFalseExperiment.ID, | ||
periodic: true, | ||
}) | ||
job.Name = "periodic-catchup-false-" | ||
job.Description = "A job with NoCatchup=true only schedules the last interval when behind schedule." | ||
|
@@ -268,9 +285,9 @@ func (s *JobApiTestSuite) TestJobApis_noCatchupOption() { | |
assert.Nil(t, err) | ||
|
||
job = jobInThePastForTwoMinutes(jobOptions{ | ||
pipelineId: pipeline.ID, | ||
experimentId: cronCatchupTrueExperiment.ID, | ||
periodic: false, | ||
pipelineVersionId: helloWorldPipelineVersion.ID, | ||
experimentId: cronCatchupTrueExperiment.ID, | ||
periodic: false, | ||
}) | ||
job.Name = "cron-catchup-true-" | ||
job.Description = "A job with NoCatchup=false will backfill each past interval when behind schedule." | ||
|
@@ -285,9 +302,9 @@ func (s *JobApiTestSuite) TestJobApis_noCatchupOption() { | |
assert.Nil(t, err) | ||
|
||
job = jobInThePastForTwoMinutes(jobOptions{ | ||
pipelineId: pipeline.ID, | ||
experimentId: cronCatchupFalseExperiment.ID, | ||
periodic: false, | ||
pipelineVersionId: helloWorldPipelineVersion.ID, | ||
experimentId: cronCatchupFalseExperiment.ID, | ||
periodic: false, | ||
}) | ||
job.Name = "cron-catchup-false-" | ||
job.Description = "A job with NoCatchup=true only schedules the last interval when behind schedule." | ||
|
@@ -324,31 +341,37 @@ func (s *JobApiTestSuite) TestJobApis_noCatchupOption() { | |
assert.Equal(t, 1, runsWhenCatchupFalse) | ||
} | ||
|
||
func (s *JobApiTestSuite) checkHelloWorldJob(t *testing.T, job *job_model.APIJob, experimentID string, experimentName string, pipelineID string) { | ||
func (s *JobApiTestSuite) checkHelloWorldJob(t *testing.T, job *job_model.APIJob, experimentID string, experimentName string, pipelineVersionId string, pipelineVersionName string) { | ||
// Check workflow manifest is not empty | ||
assert.Contains(t, job.PipelineSpec.WorkflowManifest, "whalesay") | ||
|
||
// Check resource references contain experiment and pipeline version. | ||
resourceReferences := []*job_model.APIResourceReference{ | ||
{Key: &job_model.APIResourceKey{Type: job_model.APIResourceTypeEXPERIMENT, ID: experimentID}, | ||
Name: experimentName, Relationship: job_model.APIRelationshipOWNER, | ||
}, | ||
{Key: &job_model.APIResourceKey{Type: job_model.APIResourceTypePIPELINEVERSION, ID: pipelineVersionId}, | ||
Name: pipelineVersionName, Relationship: job_model.APIRelationshipCREATOR}, | ||
} | ||
assert.Len(t, job.ResourceReferences, 2) | ||
assert.Subset(t, job.ResourceReferences, resourceReferences) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Did a little searching, is this a better choice: https://godoc.org/github.com/stretchr/testify/assert#Assertions.ElementsMatch? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
|
||
// Check other fields in job object (other than resource references) | ||
job.ResourceReferences = nil | ||
expectedJob := &job_model.APIJob{ | ||
ID: job.ID, | ||
Name: "hello world", | ||
Description: "this is hello world", | ||
PipelineSpec: &job_model.APIPipelineSpec{ | ||
PipelineID: pipelineID, | ||
PipelineName: "hello-world.yaml", | ||
WorkflowManifest: job.PipelineSpec.WorkflowManifest, | ||
}, | ||
ResourceReferences: []*job_model.APIResourceReference{ | ||
{Key: &job_model.APIResourceKey{Type: job_model.APIResourceTypeEXPERIMENT, ID: experimentID}, | ||
Name: experimentName, Relationship: job_model.APIRelationshipOWNER, | ||
}, | ||
}, | ||
MaxConcurrency: 10, | ||
Enabled: true, | ||
CreatedAt: job.CreatedAt, | ||
UpdatedAt: job.UpdatedAt, | ||
Status: job.Status, | ||
Trigger: &job_model.APITrigger{}, | ||
} | ||
|
||
assert.Equal(t, expectedJob, job) | ||
} | ||
|
||
|
@@ -420,8 +443,6 @@ func TestJobApi(t *testing.T) { | |
suite.Run(t, new(JobApiTestSuite)) | ||
} | ||
|
||
// TODO(jingzhang36): include UploadPipelineVersion in integration test | ||
|
||
func (s *JobApiTestSuite) TearDownSuite() { | ||
if *runIntegrationTests { | ||
if !*isDevMode { | ||
|
@@ -439,16 +460,15 @@ func (s *JobApiTestSuite) cleanUp() { | |
test.DeleteAllRuns(s.runClient, s.T()) | ||
} | ||
|
||
func defaultApiJob(pipelineId, experimentId string) *job_model.APIJob { | ||
func defaultApiJob(pipelineVersionId, experimentId string) *job_model.APIJob { | ||
return &job_model.APIJob{ | ||
Name: "default-pipeline-name", | ||
Description: "This is a default pipeline", | ||
PipelineSpec: &job_model.APIPipelineSpec{ | ||
PipelineID: pipelineId, | ||
}, | ||
ResourceReferences: []*job_model.APIResourceReference{ | ||
{Key: &job_model.APIResourceKey{Type: job_model.APIResourceTypeEXPERIMENT, ID: experimentId}, | ||
Relationship: job_model.APIRelationshipOWNER}, | ||
{Key: &job_model.APIResourceKey{Type: job_model.APIResourceTypePIPELINEVERSION, ID: pipelineVersionId}, | ||
Relationship: job_model.APIRelationshipCREATOR}, | ||
}, | ||
MaxConcurrency: 10, | ||
NoCatchup: false, | ||
|
@@ -464,15 +484,15 @@ func defaultApiJob(pipelineId, experimentId string) *job_model.APIJob { | |
} | ||
|
||
type jobOptions struct { | ||
pipelineId, experimentId string | ||
periodic bool | ||
pipelineVersionId, experimentId string | ||
periodic bool | ||
} | ||
|
||
func jobInThePastForTwoMinutes(options jobOptions) *job_model.APIJob { | ||
startTime := strfmt.DateTime(time.Unix(10*hour, 0)) | ||
endTime := strfmt.DateTime(time.Unix(10*hour+2*minute, 0)) | ||
|
||
job := defaultApiJob(options.pipelineId, options.experimentId) | ||
job := defaultApiJob(options.pipelineVersionId, options.experimentId) | ||
if options.periodic { | ||
job.Trigger = &job_model.APITrigger{ | ||
PeriodicSchedule: &job_model.APIPeriodicSchedule{ | ||
|
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Discussion:
Shall we keep existing tests and add a new one testing using pipeline version?
So that we can keep test coverage for using pipelines in Job.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In fact, we are going to deprecate the usage of pipeline id to create run/job. And all runs and jobs must be from some version. The to-be-deprecated approach to creating from pipeline is actually using its default version as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see.
I'm curious why we are deprecating that usage, I thought we are going to be backward-compatible.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is a little bit of history. Once upon a time, we are trying to follow the practice of CMLE api. In that approach, a pipeline is allowed to contain no pipeline versions at all. Then we figure it might be more flexible and more extensible if we always go from versions. No matter we allow pipeline to have empty versions or not, creating from versions works. And moreover, we always have a default version, so from the UI, user open a pipeline, its details page actually shows its default version and when run is created from there, we naturally use default version to create run. And blahblah... and also, if we really really really just want pipeline, it should be specified in resource reference field instead of the pipeline spec field, since that resource reference field is meant and added to have a uniform way to specify dependencies between our objects/resources...
So in short, specifying pipeline in the pipeline spec is going to be deprecated for actually two reasons: (1) we want creation from pipeline versions (2) we want to use resource reference field.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see, so it will be supported to specify pipeline id from resource references? (is it already supported?)
So it's still a breaking change, but we have a migration path, is that right?
Do you have a tracking list for TODO items like updating SDK to not use the spec field?
e.g. here
pipelines/sdk/python/kfp/_client.py
Lines 333 to 338 in 119e329
Also, the pipeline spec field also accepts json string of the entire pipeline manifest to run. I think that usage is required for cloning runs. Will we deprecate that?
Looks like you have migrated all UI usages of pipeline_spec with pipeline_id, SGTM.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As for the SDK, two cases:
Our own script can be fixed by us
User script will get warning that pipeline id in pipeline spec.
Another important reason why we prefer to refer to version instead of pipeline is that default version can change. When user specify a pipeline, we use the default version since all real manifests are conceptually linked to versions. Later the default version changes, and if GetRun only give back the pipeline id used to create the run, user wouldn't know which version was used at the creation time and they'll have to rely the manifest associated with the run at creation time. It is ok but messy.
So even when we allow the user still uses pipeline id in pipeline spec, internally we'll still record the run with a reference to the default version at the creation time. Hence implement the TODO at
pipelines/backend/src/apiserver/resource/resource_manager.go
Line 771 in 1d42e3f
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, an opportunity for us to totally remove all legacies is to introduce breaking changes (that is, surface the internal change to the sdk client interface) at KFP 1.0......
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree on these, just want to double check if you have put the following in your plan like:
For this part, it's totally reasonable, but it doesn't convince me we need to stop people from using pipeline id to create runs. Your mentioned recording pipeline version id at run creation time sounds like the right fix (and is not hacky at all).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Two things:
(1) the breaking change is optional, we can choose not to. That's my original plan (not to make the sdk client interface change). When we keep the SDK client interface the same, we only have a different implementation underhood (use resource reference instead of pipeline id in spec).
(2) the reason I talked about breaking change is IF we want it, we can leverage KF 1.0 as a proper time to do. Alternatively, as I suggested before, we put warning message without committing any change for a period of time. And the actual commit of the breaking change can wait until 2.0 or never....
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
discussed offline: we will fix sdk to workaround the breaking change