From b7ef2d5e15fdeb74a69b62dbe9989dbb06b43c8f Mon Sep 17 00:00:00 2001 From: Preetha Appan Date: Thu, 1 Nov 2018 00:02:26 -0500 Subject: [PATCH 1/2] batch jobs GC removes terminal allocs if job modifyindex is older than running job --- nomad/core_sched.go | 32 ++++++--- nomad/core_sched_test.go | 152 +++++++++++++++++++++++++++++++++++---- 2 files changed, 163 insertions(+), 21 deletions(-) diff --git a/nomad/core_sched.go b/nomad/core_sched.go index 5fecb56f23c..c5614b721dd 100644 --- a/nomad/core_sched.go +++ b/nomad/core_sched.go @@ -286,6 +286,14 @@ func (c *CoreScheduler) gcEval(eval *structs.Evaluation, thresholdIndex uint64, return false, nil, err } + // Get the allocations by eval + allocs, err := c.snap.AllocsByEval(ws, eval.ID) + if err != nil { + c.logger.Error("failed to get allocs for eval", + "eval_id", eval.ID, "error", err) + return false, nil, err + } + // If the eval is from a running "batch" job we don't want to garbage // collect its allocations. If there is a long running batch job and its // terminal allocations get GC'd the scheduler would re-run the @@ -311,18 +319,12 @@ func (c *CoreScheduler) gcEval(eval *structs.Evaluation, thresholdIndex uint64, // We don't want to gc anything related to a job which is not dead // If the batch job doesn't exist we can GC it regardless of allowBatch if !collect { - return false, nil, nil + // Find allocs associated with older (based on modifyindex) and GC them if terminal + oldAllocs := olderVersionTerminalAllocs(allocs, job) + return false, oldAllocs, nil } } - // Get the allocations by eval - allocs, err := c.snap.AllocsByEval(ws, eval.ID) - if err != nil { - c.logger.Error("failed to get allocs for eval", - "eval_id", eval.ID, "error", err) - return false, nil, err - } - // Scan the allocations to ensure they are terminal and old gcEval := true var gcAllocIDs []string @@ -340,6 +342,18 @@ func (c *CoreScheduler) gcEval(eval *structs.Evaluation, thresholdIndex uint64, return gcEval, gcAllocIDs, nil } +// olderVersionTerminalAllocs returns terminal allocations whose job modify index +// is older than the job's modify index +func olderVersionTerminalAllocs(allocs []*structs.Allocation, job *structs.Job) []string { + var ret []string + for _, alloc := range allocs { + if alloc.Job != nil && alloc.Job.JobModifyIndex < job.JobModifyIndex && alloc.TerminalStatus() { + ret = append(ret, alloc.ID) + } + } + return ret +} + // evalReap contacts the leader and issues a reap on the passed evals and // allocs. func (c *CoreScheduler) evalReap(evals, allocs []string) error { diff --git a/nomad/core_sched_test.go b/nomad/core_sched_test.go index cb65dc0b8eb..81c2784e597 100644 --- a/nomad/core_sched_test.go +++ b/nomad/core_sched_test.go @@ -141,6 +141,7 @@ func TestCoreScheduler_EvalGC_ReschedulingAllocs(t *testing.T) { // Insert failed alloc with an old reschedule attempt, can be GCed alloc := mock.Alloc() + alloc.Job = job alloc.EvalID = eval.ID alloc.DesiredStatus = structs.AllocDesiredStatusRun alloc.ClientStatus = structs.AllocClientStatusFailed @@ -158,6 +159,7 @@ func TestCoreScheduler_EvalGC_ReschedulingAllocs(t *testing.T) { } alloc2 := mock.Alloc() + alloc2.Job = job alloc2.EvalID = eval.ID alloc2.DesiredStatus = structs.AllocDesiredStatusRun alloc2.ClientStatus = structs.AllocClientStatusFailed @@ -315,12 +317,14 @@ func TestCoreScheduler_EvalGC_Batch(t *testing.T) { // Insert "failed" alloc alloc := mock.Alloc() + alloc.Job = job alloc.JobID = job.ID alloc.EvalID = eval.ID alloc.DesiredStatus = structs.AllocDesiredStatusStop // Insert "lost" alloc alloc2 := mock.Alloc() + alloc2.Job = job alloc2.JobID = job.ID alloc2.EvalID = eval.ID alloc2.DesiredStatus = structs.AllocDesiredStatusRun @@ -384,6 +388,128 @@ func TestCoreScheduler_EvalGC_Batch(t *testing.T) { } } +// An EvalGC should reap allocations from jobs with an older modify index +func TestCoreScheduler_EvalGC_Batch_OldVersion(t *testing.T) { + t.Parallel() + s1 := TestServer(t, nil) + defer s1.Shutdown() + testutil.WaitForLeader(t, s1.RPC) + + // COMPAT Remove in 0.6: Reset the FSM time table since we reconcile which sets index 0 + s1.fsm.timetable.table = make([]TimeTableEntry, 1, 10) + + // Insert a "dead" job + state := s1.fsm.State() + job := mock.Job() + job.Type = structs.JobTypeBatch + job.Status = structs.JobStatusDead + err := state.UpsertJob(1000, job) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Insert "complete" eval + eval := mock.Eval() + eval.Status = structs.EvalStatusComplete + eval.Type = structs.JobTypeBatch + eval.JobID = job.ID + err = state.UpsertEvals(1001, []*structs.Evaluation{eval}) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Insert "failed" alloc + alloc := mock.Alloc() + alloc.Job = job + alloc.JobID = job.ID + alloc.EvalID = eval.ID + alloc.DesiredStatus = structs.AllocDesiredStatusStop + + // Insert "lost" alloc + alloc2 := mock.Alloc() + alloc2.Job = job + alloc2.JobID = job.ID + alloc2.EvalID = eval.ID + alloc2.DesiredStatus = structs.AllocDesiredStatusRun + alloc2.ClientStatus = structs.AllocClientStatusLost + + // Insert alloc with older job modifyindex + alloc3 := mock.Alloc() + job2 := job.Copy() + + alloc3.Job = job2 + alloc3.JobID = job2.ID + alloc3.EvalID = eval.ID + job2.JobModifyIndex = 500 + alloc3.DesiredStatus = structs.AllocDesiredStatusRun + alloc3.ClientStatus = structs.AllocClientStatusLost + + err = state.UpsertAllocs(1002, []*structs.Allocation{alloc, alloc2, alloc3}) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Update the time tables to make this work + tt := s1.fsm.TimeTable() + tt.Witness(2000, time.Now().UTC().Add(-1*s1.config.EvalGCThreshold)) + + // Create a core scheduler + snap, err := state.Snapshot() + if err != nil { + t.Fatalf("err: %v", err) + } + core := NewCoreScheduler(s1, snap) + + // Attempt the GC + gc := s1.coreJobEval(structs.CoreJobEvalGC, 2000) + err = core.Process(gc) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Alloc1 and 2 should be there, and alloc3 should be gone + ws := memdb.NewWatchSet() + out, err := state.EvalByID(ws, eval.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + if out == nil { + t.Fatalf("bad: %v", out) + } + + outA, err := state.AllocByID(ws, alloc.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + if outA == nil { + t.Fatalf("bad: %v", outA) + } + + outA2, err := state.AllocByID(ws, alloc2.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + if outA2 == nil { + t.Fatalf("bad: %v", outA2) + } + + outA3, err := state.AllocByID(ws, alloc3.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + if outA3 != nil { + t.Fatalf("expected alloc to be nil:%v", outA2) + } + + outB, err := state.JobByID(ws, job.Namespace, job.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + if outB == nil { + t.Fatalf("bad: %v", outB) + } +} + // An EvalGC should reap a batch job that has been stopped func TestCoreScheduler_EvalGC_BatchStopped(t *testing.T) { t.Parallel() @@ -1798,18 +1924,20 @@ func TestCoreScheduler_PartitionJobReap(t *testing.T) { // Tests various scenarios when allocations are eligible to be GCed func TestAllocation_GCEligible(t *testing.T) { type testCase struct { - Desc string - GCTime time.Time - ClientStatus string - DesiredStatus string - JobStatus string - JobStop bool - ModifyIndex uint64 - NextAllocID string - ReschedulePolicy *structs.ReschedulePolicy - RescheduleTrackers []*structs.RescheduleEvent - ThresholdIndex uint64 - ShouldGC bool + Desc string + GCTime time.Time + ClientStatus string + DesiredStatus string + JobStatus string + JobStop bool + AllocJobModifyIndex uint64 + JobModifyIndex uint64 + ModifyIndex uint64 + NextAllocID string + ReschedulePolicy *structs.ReschedulePolicy + RescheduleTrackers []*structs.RescheduleEvent + ThresholdIndex uint64 + ShouldGC bool } fail := time.Now() From 2409f72d0d42a3225d59347d2604f005bc132636 Mon Sep 17 00:00:00 2001 From: Preetha Appan Date: Fri, 9 Nov 2018 11:44:21 -0600 Subject: [PATCH 2/2] Use create index as trigger condition to gc old terminal allocs --- nomad/core_sched.go | 8 ++++---- nomad/core_sched_test.go | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/nomad/core_sched.go b/nomad/core_sched.go index c5614b721dd..a3fbe4010da 100644 --- a/nomad/core_sched.go +++ b/nomad/core_sched.go @@ -319,7 +319,7 @@ func (c *CoreScheduler) gcEval(eval *structs.Evaluation, thresholdIndex uint64, // We don't want to gc anything related to a job which is not dead // If the batch job doesn't exist we can GC it regardless of allowBatch if !collect { - // Find allocs associated with older (based on modifyindex) and GC them if terminal + // Find allocs associated with older (based on createindex) and GC them if terminal oldAllocs := olderVersionTerminalAllocs(allocs, job) return false, oldAllocs, nil } @@ -342,12 +342,12 @@ func (c *CoreScheduler) gcEval(eval *structs.Evaluation, thresholdIndex uint64, return gcEval, gcAllocIDs, nil } -// olderVersionTerminalAllocs returns terminal allocations whose job modify index -// is older than the job's modify index +// olderVersionTerminalAllocs returns terminal allocations whose job create index +// is older than the job's create index func olderVersionTerminalAllocs(allocs []*structs.Allocation, job *structs.Job) []string { var ret []string for _, alloc := range allocs { - if alloc.Job != nil && alloc.Job.JobModifyIndex < job.JobModifyIndex && alloc.TerminalStatus() { + if alloc.Job != nil && alloc.Job.CreateIndex < job.CreateIndex && alloc.TerminalStatus() { ret = append(ret, alloc.ID) } } diff --git a/nomad/core_sched_test.go b/nomad/core_sched_test.go index 81c2784e597..02ea29ea367 100644 --- a/nomad/core_sched_test.go +++ b/nomad/core_sched_test.go @@ -440,7 +440,7 @@ func TestCoreScheduler_EvalGC_Batch_OldVersion(t *testing.T) { alloc3.Job = job2 alloc3.JobID = job2.ID alloc3.EvalID = eval.ID - job2.JobModifyIndex = 500 + job2.CreateIndex = 500 alloc3.DesiredStatus = structs.AllocDesiredStatusRun alloc3.ClientStatus = structs.AllocClientStatusLost