diff --git a/backend/test/integration/BUILD.bazel b/backend/test/integration/BUILD.bazel index fd6644e8f74f..e331cb08d5f5 100644 --- a/backend/test/integration/BUILD.bazel +++ b/backend/test/integration/BUILD.bazel @@ -28,6 +28,7 @@ go_test( "//backend/test:go_default_library", "@com_github_argoproj_argo//pkg/apis/workflow/v1alpha1:go_default_library", "@com_github_ghodss_yaml//:go_default_library", + "@com_github_go_openapi_strfmt//:go_default_library", "@com_github_golang_glog//:go_default_library", "@com_github_stretchr_testify//assert:go_default_library", "@com_github_stretchr_testify//suite:go_default_library", diff --git a/backend/test/integration/job_api_test.go b/backend/test/integration/job_api_test.go index 6aa796c82674..724afeb6413d 100644 --- a/backend/test/integration/job_api_test.go +++ b/backend/test/integration/job_api_test.go @@ -7,6 +7,7 @@ import ( "github.com/kubeflow/pipelines/backend/test" + "github.com/go-openapi/strfmt" "github.com/golang/glog" experimentparams "github.com/kubeflow/pipelines/backend/api/go_http_client/experiment_client/experiment_service" "github.com/kubeflow/pipelines/backend/api/go_http_client/experiment_model" @@ -23,6 +24,12 @@ import ( "k8s.io/apimachinery/pkg/util/yaml" ) +const ( + second = 1 + minute = 60 * second + hour = 60 * minute +) + type JobApiTestSuite struct { suite.Suite namespace string @@ -214,6 +221,109 @@ func (s *JobApiTestSuite) TestJobApis() { s.checkArgParamsRun(t, argParamsRun, argParamsExperiment.ID, argParamsExperiment.Name, argParamsJob.ID, argParamsJob.Name) } +func (s *JobApiTestSuite) TestJobApis_noCatchupOption() { + t := s.T() + + /* ---------- Upload pipelines YAML ---------- */ + pipeline, err := s.pipelineUploadClient.UploadFile("../resources/hello-world.yaml", uploadParams.NewUploadPipelineParams()) + assert.Nil(t, err) + + /* ---------- Create a periodic job with start and end date in the past and catchup = true ---------- */ + experiment := &experiment_model.APIExperiment{Name: "periodic catchup true"} + periodicCatchupTrueExperiment, err := s.experimentClient.Create(&experimentparams.CreateExperimentParams{Body: experiment}) + assert.Nil(t, err) + + job := jobInThePastForTwoMinutes(jobOptions{ + pipelineId: pipeline.ID, + experimentId: periodicCatchupTrueExperiment.ID, + periodic: true, + }) + job.Name = "periodic-catchup-true-" + job.Description = "A job with NoCatchup=false will backfill each past interval when behind schedule." + job.NoCatchup = false // This is the key difference. + createJobRequest := &jobparams.CreateJobParams{Body: job} + _, err = s.jobClient.Create(createJobRequest) + assert.Nil(t, err) + + /* -------- Create another periodic job with start and end date in the past but catchup = false ------ */ + experiment = &experiment_model.APIExperiment{Name: "periodic catchup false"} + periodicCatchupFalseExperiment, err := s.experimentClient.Create(&experimentparams.CreateExperimentParams{Body: experiment}) + assert.Nil(t, err) + + job = jobInThePastForTwoMinutes(jobOptions{ + pipelineId: pipeline.ID, + experimentId: periodicCatchupFalseExperiment.ID, + periodic: true, + }) + job.Name = "periodic-catchup-false-" + job.Description = "A job with NoCatchup=true only schedules the last interval when behind schedule." + job.NoCatchup = true // This is the key difference. + createJobRequest = &jobparams.CreateJobParams{Body: job} + _, err = s.jobClient.Create(createJobRequest) + assert.Nil(t, err) + + /* ---------- Create a cron job with start and end date in the past and catchup = true ---------- */ + experiment = &experiment_model.APIExperiment{Name: "cron catchup true"} + cronCatchupTrueExperiment, err := s.experimentClient.Create(&experimentparams.CreateExperimentParams{Body: experiment}) + assert.Nil(t, err) + + job = jobInThePastForTwoMinutes(jobOptions{ + pipelineId: pipeline.ID, + experimentId: cronCatchupTrueExperiment.ID, + periodic: false, + }) + job.Name = "cron-catchup-true-" + job.Description = "A job with NoCatchup=false will backfill each past interval when behind schedule." + job.NoCatchup = false // This is the key difference. + createJobRequest = &jobparams.CreateJobParams{Body: job} + _, err = s.jobClient.Create(createJobRequest) + assert.Nil(t, err) + + /* -------- Create another cron job with start and end date in the past but catchup = false ------ */ + experiment = &experiment_model.APIExperiment{Name: "cron catchup false"} + cronCatchupFalseExperiment, err := s.experimentClient.Create(&experimentparams.CreateExperimentParams{Body: experiment}) + assert.Nil(t, err) + + job = jobInThePastForTwoMinutes(jobOptions{ + pipelineId: pipeline.ID, + experimentId: cronCatchupFalseExperiment.ID, + periodic: false, + }) + job.Name = "cron-catchup-false-" + job.Description = "A job with NoCatchup=true only schedules the last interval when behind schedule." + job.NoCatchup = true // This is the key difference. + createJobRequest = &jobparams.CreateJobParams{Body: job} + _, err = s.jobClient.Create(createJobRequest) + assert.Nil(t, err) + + // The scheduledWorkflow CRD would create the run and it synced to the DB by persistent agent. + // This could take a few seconds to finish. + // TODO: Retry list run every 5 seconds instead of sleeping for 40 seconds. + time.Sleep(40 * time.Second) + + /* ---------- Assert number of runs when catchup = true ---------- */ + _, runsWhenCatchupTrue, _, err := s.runClient.List(&runParams.ListRunsParams{ + ResourceReferenceKeyType: util.StringPointer(string(run_model.APIResourceTypeEXPERIMENT)), + ResourceReferenceKeyID: util.StringPointer(periodicCatchupTrueExperiment.ID)}) + assert.Nil(t, err) + assert.Equal(t, 2, runsWhenCatchupTrue) + _, runsWhenCatchupTrue, _, err = s.runClient.List(&runParams.ListRunsParams{ + ResourceReferenceKeyType: util.StringPointer(string(run_model.APIResourceTypeEXPERIMENT)), + ResourceReferenceKeyID: util.StringPointer(cronCatchupTrueExperiment.ID)}) + + /* ---------- Assert number of runs when catchup = false ---------- */ + _, runsWhenCatchupFalse, _, err := s.runClient.List(&runParams.ListRunsParams{ + ResourceReferenceKeyType: util.StringPointer(string(run_model.APIResourceTypeEXPERIMENT)), + ResourceReferenceKeyID: util.StringPointer(periodicCatchupFalseExperiment.ID)}) + assert.Nil(t, err) + assert.Equal(t, 1, runsWhenCatchupFalse) + _, runsWhenCatchupFalse, _, err = s.runClient.List(&runParams.ListRunsParams{ + ResourceReferenceKeyType: util.StringPointer(string(run_model.APIResourceTypeEXPERIMENT)), + ResourceReferenceKeyID: util.StringPointer(cronCatchupFalseExperiment.ID)}) + assert.Nil(t, err) + assert.Equal(t, 1, runsWhenCatchupFalse) +} + func (s *JobApiTestSuite) checkHelloWorldJob(t *testing.T, job *job_model.APIJob, experimentID string, experimentName string, pipelineID string) { // Check workflow manifest is not empty assert.Contains(t, job.PipelineSpec.WorkflowManifest, "whalesay") @@ -320,9 +430,65 @@ func (s *JobApiTestSuite) TearDownSuite() { } } +/** ======== the following are util functions ========= **/ + func (s *JobApiTestSuite) cleanUp() { test.DeleteAllExperiments(s.experimentClient, s.T()) test.DeleteAllPipelines(s.pipelineClient, s.T()) test.DeleteAllJobs(s.jobClient, s.T()) test.DeleteAllRuns(s.runClient, s.T()) } + +func defaultApiJob(pipelineId, experimentId string) *job_model.APIJob { + return &job_model.APIJob{ + Name: "default-pipeline-name", + Description: "This is a default pipeline", + PipelineSpec: &job_model.APIPipelineSpec{ + PipelineID: pipelineId, + }, + ResourceReferences: []*job_model.APIResourceReference{ + {Key: &job_model.APIResourceKey{Type: job_model.APIResourceTypeEXPERIMENT, ID: experimentId}, + Relationship: job_model.APIRelationshipOWNER}, + }, + MaxConcurrency: 10, + NoCatchup: false, + Trigger: &job_model.APITrigger{ + PeriodicSchedule: &job_model.APIPeriodicSchedule{ + StartTime: strfmt.NewDateTime(), + EndTime: strfmt.NewDateTime(), + IntervalSecond: 60, + }, + }, + Enabled: true, + } +} + +type jobOptions struct { + pipelineId, experimentId string + periodic bool +} + +func jobInThePastForTwoMinutes(options jobOptions) *job_model.APIJob { + startTime := strfmt.DateTime(time.Unix(10*hour, 0)) + endTime := strfmt.DateTime(time.Unix(10*hour+2*minute, 0)) + + job := defaultApiJob(options.pipelineId, options.experimentId) + if options.periodic { + job.Trigger = &job_model.APITrigger{ + PeriodicSchedule: &job_model.APIPeriodicSchedule{ + StartTime: startTime, + EndTime: endTime, + IntervalSecond: 60, // Runs every 1 minute. + }, + } + } else { + job.Trigger = &job_model.APITrigger{ + CronSchedule: &job_model.APICronSchedule{ + StartTime: startTime, + EndTime: endTime, + Cron: "0 * * * * ?", // Runs every 1 minute. + }, + } + } + return job +}