Skip to content

Commit

Permalink
Fix #149 - Fixing state machine to avoid STO zombies
Browse files Browse the repository at this point in the history
  • Loading branch information
tilmankamp committed Apr 10, 2019
1 parent ddfb643 commit 4ef7f5f
Showing 1 changed file with 23 additions and 18 deletions.
41 changes: 23 additions & 18 deletions src/scheduler.js
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,13 @@ async function stopJob (job, reason) {
if (job.state >= jobStates.STARTING && job.state <= jobStates.STOPPING) {
await job.setState(jobStates.STOPPING, reason)
await pitRunner.stopPit(job.id)
} else {
} else if (job.state == jobStates.PREPARING) {
await job.setState(jobStates.STOPPING, reason)
stopPreparation(job.id)
} else if (job.state == jobStates.WAITING) {
await cleanJob(job, reason)
} else {
await job.setState(jobStates.DONE, reason)
}
}
exports.stopJob = stopJob
Expand Down Expand Up @@ -171,23 +176,7 @@ async function cleanJob (job, reason) {

async function tick () {
log.debug('Tick...')
for(let jobId of Object.keys(preparations)) {
let job = await Job.findByPk(jobId)
if (job) {
if (job.state == jobStates.PREPARING) {
if (job.since.getTime() + config.maxPrepDuration < Date.now()) {
await job.setState(jobStates.STOPPING, 'Exceeded max preparation time')
stopPreparation(jobId)
log.error('Preparation timeout for job', jobId)
}
} else if (job.state == jobStates.STOPPING) {
stopPreparation(job.id)
}
} else {
stopPreparation(jobId)
log.error('Stopped orphan preparation process for job', jobId)
}
}

for(let job of (await Job.findAll({ where: { state: jobStates.NEW } }))) {
if (Object.keys(preparations).length < config.maxParallelPrep) {
log.debug('Preparing job', job.id)
Expand All @@ -196,6 +185,22 @@ async function tick () {
break
}
}

let isPreparing = {}
for(let job of (await Job.findAll({ where: { state: jobStates.PREPARING } }))) {
if (job.since.getTime() + config.maxPrepDuration < Date.now()) {
await stopJob(job, 'Exceeded max preparation time')
} else {
isPreparing[job.id] = true
}
}
for(let jobId of Object.keys(preparations)) {
if (!isPreparing[jobId]) {
stopPreparation(jobId)
log.error('Stopped orphan preparation process for job', jobId)
}
}

let job = await Job.findOne({ where: { state: jobStates.WAITING }, order: ['rank'] })
if (job) {
log.debug('Trying to allocate job', job.id)
Expand Down

0 comments on commit 4ef7f5f

Please sign in to comment.