diff --git a/backend/src/apiserver/resource/resource_manager.go b/backend/src/apiserver/resource/resource_manager.go index 9d4b6a10b05..0d793b29a24 100644 --- a/backend/src/apiserver/resource/resource_manager.go +++ b/backend/src/apiserver/resource/resource_manager.go @@ -229,10 +229,13 @@ func (r *ResourceManager) CreateRun(apiRun *api.Run) (*model.RunDetail, error) { } // Add a reference to the default experiment if run does not already have a containing experiment - err = r.setExperimentIfNotPresent(apiRun) + ref, err := r.getDefaultExperimentIfNoExperiment(apiRun.ResourceReferences) if err != nil { return nil, err } + if ref != nil { + apiRun.ResourceReferences = append(apiRun.ResourceReferences, ref) + } // Store run metadata into database runDetail, err := ToModelRunDetail(apiRun, util.NewWorkflow(newWorkflow), string(workflowSpecManifestBytes)) @@ -365,6 +368,16 @@ func (r *ResourceManager) CreateJob(apiJob *api.Job) (*model.Job, error) { if err != nil { return nil, util.NewInternalServerError(err, "Failed to create a scheduled workflow for (%s)", scheduledWorkflow.Name) } + + // Add a reference to the default experiment if run does not already have a containing experiment + ref, err := r.getDefaultExperimentIfNoExperiment(apiJob.ResourceReferences) + if err != nil { + return nil, err + } + if ref != nil { + apiJob.ResourceReferences = append(apiJob.ResourceReferences, ref) + } + job, err := ToModelJob(apiJob, util.NewScheduledWorkflow(newScheduledWorkflow), string(workflowSpecManifestBytes)) if err != nil { return nil, util.Wrap(err, "Create job failed") @@ -550,26 +563,26 @@ func (r *ResourceManager) CreateDefaultExperiment() (string, error) { return experiment.UUID, nil } -// setExperimentIfNotPresent If the provided run does not include a reference to a containing +// getDefaultExperimentIfNoExperiment If the provided run does not include a reference to a containing // experiment, then we fetch the default experiment's ID and create a reference to that. -func (r *ResourceManager) setExperimentIfNotPresent(apiRun *api.Run) error { +func (r *ResourceManager) getDefaultExperimentIfNoExperiment(references []*api.ResourceReference) (*api.ResourceReference, error) { // First check if there is already a referenced experiment - for _, ref := range apiRun.ResourceReferences { + for _, ref := range references { if ref.Key.Type == api.ResourceType_EXPERIMENT && ref.Relationship == api.Relationship_OWNER { - return nil + return nil, nil } } // Create reference to the default experiment defaultExperimentId, err := r.GetDefaultExperimentId() if err != nil { - return util.NewInternalServerError(err, "Failed to retrieve default experiment") + return nil, util.NewInternalServerError(err, "Failed to retrieve default experiment") } if defaultExperimentId == "" { glog.Info("No default experiment was found. Creating a new default experiment") defaultExperimentId, err = r.CreateDefaultExperiment() if defaultExperimentId == "" || err != nil { - return util.NewInternalServerError(err, "Failed to create new default experiment") + return nil, util.NewInternalServerError(err, "Failed to create new default experiment") } } defaultExperimentRef := &api.ResourceReference{ @@ -579,9 +592,8 @@ func (r *ResourceManager) setExperimentIfNotPresent(apiRun *api.Run) error { }, Relationship: api.Relationship_OWNER, } - apiRun.ResourceReferences = append(apiRun.ResourceReferences, defaultExperimentRef) - return nil + return defaultExperimentRef, nil } func (r *ResourceManager) ReportMetric(metric *api.RunMetric, runUUID string) error { diff --git a/backend/src/apiserver/resource/resource_manager_test.go b/backend/src/apiserver/resource/resource_manager_test.go index 7ea44e72ea0..954419484ce 100644 --- a/backend/src/apiserver/resource/resource_manager_test.go +++ b/backend/src/apiserver/resource/resource_manager_test.go @@ -32,7 +32,7 @@ import ( "github.com/pkg/errors" "github.com/stretchr/testify/assert" "google.golang.org/grpc/codes" - "k8s.io/apimachinery/pkg/apis/meta/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" ) @@ -605,9 +605,10 @@ func TestCreateJob_ThroughWorkflowSpec(t *testing.T) { } func TestCreateJob_ThroughPipelineID(t *testing.T) { - store, _, pipeline := initWithPipeline(t) + store, manager, pipeline := initWithPipeline(t) defer store.Close() - manager := NewResourceManager(store) + experiment := &model.Experiment{Name: "e1"} + experiment, err := manager.CreateExperiment(experiment) job := &api.Job{ Name: "j1", Enabled: true, @@ -617,6 +618,12 @@ func TestCreateJob_ThroughPipelineID(t *testing.T) { {Name: "param1", Value: "world"}, }, }, + ResourceReferences: []*api.ResourceReference{ + { + Key: &api.ResourceKey{Type: api.ResourceType_EXPERIMENT, Id: experiment.UUID}, + Relationship: api.Relationship_OWNER, + }, + }, } newJob, err := manager.CreateJob(job) expectedJob := &model.Job{ @@ -625,14 +632,23 @@ func TestCreateJob_ThroughPipelineID(t *testing.T) { Name: "j1", Namespace: "default", Enabled: true, - CreatedAtInSec: 2, - UpdatedAtInSec: 2, + CreatedAtInSec: 3, + UpdatedAtInSec: 3, Conditions: "NO_STATUS", PipelineSpec: model.PipelineSpec{ PipelineId: pipeline.UUID, WorkflowSpecManifest: testWorkflow.ToStringForStore(), Parameters: "[{\"name\":\"param1\",\"value\":\"world\"}]", }, + ResourceReferences: []*model.ResourceReference{ + { + ResourceUUID: "123", + ResourceType: common.Job, + ReferenceUUID: experiment.UUID, + ReferenceType: common.Experiment, + Relationship: common.Owner, + }, + }, } assert.Nil(t, err) assert.Equal(t, expectedJob, newJob) @@ -904,7 +920,7 @@ func TestReportWorkflowResource_ScheduledWorkflowIDNotEmpty_Success(t *testing.T assert.Equal(t, expectedRunDetail, runDetail) } -func TestReportWorkflowResource_ScheduledWorkflowIDNotEmpty_ExperimentNotFound(t *testing.T) { +func TestReportWorkflowResource_ScheduledWorkflowIDNotEmpty_NoExperiment_Success(t *testing.T) { store := NewFakeClientManagerOrFatal(util.NewFakeTimeForEpoch()) defer store.Close() manager := NewResourceManager(store) @@ -912,6 +928,7 @@ func TestReportWorkflowResource_ScheduledWorkflowIDNotEmpty_ExperimentNotFound(t Name: "j1", Enabled: true, PipelineSpec: &api.PipelineSpec{WorkflowManifest: testWorkflow.ToStringForStore()}, + // no experiment reference } newJob, err := manager.CreateJob(job) @@ -930,11 +947,47 @@ func TestReportWorkflowResource_ScheduledWorkflowIDNotEmpty_ExperimentNotFound(t CreationTimestamp: v1.NewTime(time.Unix(11, 0).UTC()), }, }) + err = manager.ReportWorkflowResource(workflow) - println(err.Error()) - assert.NotNil(t, err) - assert.Equal(t, codes.NotFound, err.(*util.UserError).ExternalStatusCode()) - assert.Contains(t, err.Error(), "Experiment not found") + assert.Nil(t, err) + + runDetail, err := manager.GetRun("WORKFLOW_1") + assert.Nil(t, err) + + expectedRunDetail := &model.RunDetail{ + Run: model.Run{ + UUID: "WORKFLOW_1", + DisplayName: "MY_NAME", + StorageState: api.Run_STORAGESTATE_AVAILABLE.String(), + Name: "MY_NAME", + Namespace: "MY_NAMESPACE", + CreatedAtInSec: 11, + ScheduledAtInSec: 0, + FinishedAtInSec: 0, + PipelineSpec: model.PipelineSpec{ + WorkflowSpecManifest: workflow.GetWorkflowSpec().ToStringForStore(), + }, + ResourceReferences: []*model.ResourceReference{ + { + ResourceUUID: "WORKFLOW_1", + ResourceType: common.Run, + ReferenceUUID: newJob.UUID, + ReferenceType: common.Job, + Relationship: common.Creator, + }, + { + ResourceUUID: "WORKFLOW_1", + ResourceType: common.Run, + ReferenceUUID: DefaultFakeUUID, + ReferenceType: common.Experiment, + Relationship: common.Owner, + }, + }, + }, + PipelineRuntime: model.PipelineRuntime{WorkflowRuntimeManifest: workflow.ToStringForStore()}, + } + + assert.Equal(t, expectedRunDetail, runDetail) } func TestReportScheduledWorkflowResource_Success(t *testing.T) { diff --git a/backend/src/apiserver/server/job_server.go b/backend/src/apiserver/server/job_server.go index 9cd5331c143..40858ede950 100644 --- a/backend/src/apiserver/server/job_server.go +++ b/backend/src/apiserver/server/job_server.go @@ -85,10 +85,6 @@ func (s *JobServer) DeleteJob(ctx context.Context, request *api.DeleteJobRequest func (s *JobServer) validateCreateJobRequest(request *api.CreateJobRequest) error { job := request.Job - // Job must be created under an experiment. - if err := ValidateExperimentResourceReference(s.resourceManager, job.ResourceReferences); err != nil { - return util.Wrap(err, "The job must have a valid experiment resource reference.") - } if err := ValidatePipelineSpec(s.resourceManager, job.PipelineSpec); err != nil { return util.Wrap(err, "The pipeline spec is invalid.")