Skip to content

Commit

Permalink
Allows creation of jobs without experiments
Browse files Browse the repository at this point in the history
Such jobs will be placed within the Default experiment
  • Loading branch information
rileyjbauer committed Jul 30, 2019
1 parent 609c7a6 commit 7c0352a
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 23 deletions.
30 changes: 21 additions & 9 deletions backend/src/apiserver/resource/resource_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,10 +229,13 @@ func (r *ResourceManager) CreateRun(apiRun *api.Run) (*model.RunDetail, error) {
}

// Add a reference to the default experiment if run does not already have a containing experiment
err = r.setExperimentIfNotPresent(apiRun)
ref, err := r.getDefaultExperimentIfNoExperiment(apiRun.ResourceReferences)
if err != nil {
return nil, err
}
if ref != nil {
apiRun.ResourceReferences = append(apiRun.ResourceReferences, ref)
}

// Store run metadata into database
runDetail, err := ToModelRunDetail(apiRun, util.NewWorkflow(newWorkflow), string(workflowSpecManifestBytes))
Expand Down Expand Up @@ -365,6 +368,16 @@ func (r *ResourceManager) CreateJob(apiJob *api.Job) (*model.Job, error) {
if err != nil {
return nil, util.NewInternalServerError(err, "Failed to create a scheduled workflow for (%s)", scheduledWorkflow.Name)
}

// Add a reference to the default experiment if run does not already have a containing experiment
ref, err := r.getDefaultExperimentIfNoExperiment(apiJob.ResourceReferences)
if err != nil {
return nil, err
}
if ref != nil {
apiJob.ResourceReferences = append(apiJob.ResourceReferences, ref)
}

job, err := ToModelJob(apiJob, util.NewScheduledWorkflow(newScheduledWorkflow), string(workflowSpecManifestBytes))
if err != nil {
return nil, util.Wrap(err, "Create job failed")
Expand Down Expand Up @@ -550,26 +563,26 @@ func (r *ResourceManager) CreateDefaultExperiment() (string, error) {
return experiment.UUID, nil
}

// setExperimentIfNotPresent If the provided run does not include a reference to a containing
// getDefaultExperimentIfNoExperiment If the provided run does not include a reference to a containing
// experiment, then we fetch the default experiment's ID and create a reference to that.
func (r *ResourceManager) setExperimentIfNotPresent(apiRun *api.Run) error {
func (r *ResourceManager) getDefaultExperimentIfNoExperiment(references []*api.ResourceReference) (*api.ResourceReference, error) {
// First check if there is already a referenced experiment
for _, ref := range apiRun.ResourceReferences {
for _, ref := range references {
if ref.Key.Type == api.ResourceType_EXPERIMENT && ref.Relationship == api.Relationship_OWNER {
return nil
return nil, nil
}
}

// Create reference to the default experiment
defaultExperimentId, err := r.GetDefaultExperimentId()
if err != nil {
return util.NewInternalServerError(err, "Failed to retrieve default experiment")
return nil, util.NewInternalServerError(err, "Failed to retrieve default experiment")
}
if defaultExperimentId == "" {
glog.Info("No default experiment was found. Creating a new default experiment")
defaultExperimentId, err = r.CreateDefaultExperiment()
if defaultExperimentId == "" || err != nil {
return util.NewInternalServerError(err, "Failed to create new default experiment")
return nil, util.NewInternalServerError(err, "Failed to create new default experiment")
}
}
defaultExperimentRef := &api.ResourceReference{
Expand All @@ -579,9 +592,8 @@ func (r *ResourceManager) setExperimentIfNotPresent(apiRun *api.Run) error {
},
Relationship: api.Relationship_OWNER,
}
apiRun.ResourceReferences = append(apiRun.ResourceReferences, defaultExperimentRef)

return nil
return defaultExperimentRef, nil
}

func (r *ResourceManager) ReportMetric(metric *api.RunMetric, runUUID string) error {
Expand Down
73 changes: 63 additions & 10 deletions backend/src/apiserver/resource/resource_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc/codes"
"k8s.io/apimachinery/pkg/apis/meta/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
)

Expand Down Expand Up @@ -605,9 +605,10 @@ func TestCreateJob_ThroughWorkflowSpec(t *testing.T) {
}

func TestCreateJob_ThroughPipelineID(t *testing.T) {
store, _, pipeline := initWithPipeline(t)
store, manager, pipeline := initWithPipeline(t)
defer store.Close()
manager := NewResourceManager(store)
experiment := &model.Experiment{Name: "e1"}
experiment, err := manager.CreateExperiment(experiment)
job := &api.Job{
Name: "j1",
Enabled: true,
Expand All @@ -617,6 +618,12 @@ func TestCreateJob_ThroughPipelineID(t *testing.T) {
{Name: "param1", Value: "world"},
},
},
ResourceReferences: []*api.ResourceReference{
{
Key: &api.ResourceKey{Type: api.ResourceType_EXPERIMENT, Id: experiment.UUID},
Relationship: api.Relationship_OWNER,
},
},
}
newJob, err := manager.CreateJob(job)
expectedJob := &model.Job{
Expand All @@ -625,14 +632,23 @@ func TestCreateJob_ThroughPipelineID(t *testing.T) {
Name: "j1",
Namespace: "default",
Enabled: true,
CreatedAtInSec: 2,
UpdatedAtInSec: 2,
CreatedAtInSec: 3,
UpdatedAtInSec: 3,
Conditions: "NO_STATUS",
PipelineSpec: model.PipelineSpec{
PipelineId: pipeline.UUID,
WorkflowSpecManifest: testWorkflow.ToStringForStore(),
Parameters: "[{\"name\":\"param1\",\"value\":\"world\"}]",
},
ResourceReferences: []*model.ResourceReference{
{
ResourceUUID: "123",
ResourceType: common.Job,
ReferenceUUID: experiment.UUID,
ReferenceType: common.Experiment,
Relationship: common.Owner,
},
},
}
assert.Nil(t, err)
assert.Equal(t, expectedJob, newJob)
Expand Down Expand Up @@ -904,14 +920,15 @@ func TestReportWorkflowResource_ScheduledWorkflowIDNotEmpty_Success(t *testing.T
assert.Equal(t, expectedRunDetail, runDetail)
}

func TestReportWorkflowResource_ScheduledWorkflowIDNotEmpty_ExperimentNotFound(t *testing.T) {
func TestReportWorkflowResource_ScheduledWorkflowIDNotEmpty_NoExperiment_Success(t *testing.T) {
store := NewFakeClientManagerOrFatal(util.NewFakeTimeForEpoch())
defer store.Close()
manager := NewResourceManager(store)
job := &api.Job{
Name: "j1",
Enabled: true,
PipelineSpec: &api.PipelineSpec{WorkflowManifest: testWorkflow.ToStringForStore()},
// no experiment reference
}
newJob, err := manager.CreateJob(job)

Expand All @@ -930,11 +947,47 @@ func TestReportWorkflowResource_ScheduledWorkflowIDNotEmpty_ExperimentNotFound(t
CreationTimestamp: v1.NewTime(time.Unix(11, 0).UTC()),
},
})

err = manager.ReportWorkflowResource(workflow)
println(err.Error())
assert.NotNil(t, err)
assert.Equal(t, codes.NotFound, err.(*util.UserError).ExternalStatusCode())
assert.Contains(t, err.Error(), "Experiment not found")
assert.Nil(t, err)

runDetail, err := manager.GetRun("WORKFLOW_1")
assert.Nil(t, err)

expectedRunDetail := &model.RunDetail{
Run: model.Run{
UUID: "WORKFLOW_1",
DisplayName: "MY_NAME",
StorageState: api.Run_STORAGESTATE_AVAILABLE.String(),
Name: "MY_NAME",
Namespace: "MY_NAMESPACE",
CreatedAtInSec: 11,
ScheduledAtInSec: 0,
FinishedAtInSec: 0,
PipelineSpec: model.PipelineSpec{
WorkflowSpecManifest: workflow.GetWorkflowSpec().ToStringForStore(),
},
ResourceReferences: []*model.ResourceReference{
{
ResourceUUID: "WORKFLOW_1",
ResourceType: common.Run,
ReferenceUUID: newJob.UUID,
ReferenceType: common.Job,
Relationship: common.Creator,
},
{
ResourceUUID: "WORKFLOW_1",
ResourceType: common.Run,
ReferenceUUID: DefaultFakeUUID,
ReferenceType: common.Experiment,
Relationship: common.Owner,
},
},
},
PipelineRuntime: model.PipelineRuntime{WorkflowRuntimeManifest: workflow.ToStringForStore()},
}

assert.Equal(t, expectedRunDetail, runDetail)
}

func TestReportScheduledWorkflowResource_Success(t *testing.T) {
Expand Down
4 changes: 0 additions & 4 deletions backend/src/apiserver/server/job_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,6 @@ func (s *JobServer) DeleteJob(ctx context.Context, request *api.DeleteJobRequest

func (s *JobServer) validateCreateJobRequest(request *api.CreateJobRequest) error {
job := request.Job
// Job must be created under an experiment.
if err := ValidateExperimentResourceReference(s.resourceManager, job.ResourceReferences); err != nil {
return util.Wrap(err, "The job must have a valid experiment resource reference.")
}

if err := ValidatePipelineSpec(s.resourceManager, job.PipelineSpec); err != nil {
return util.Wrap(err, "The pipeline spec is invalid.")
Expand Down

0 comments on commit 7c0352a

Please sign in to comment.