diff --git a/src/scheduler.js b/src/scheduler.js index 49eab6b..8ed44b4 100644 --- a/src/scheduler.js +++ b/src/scheduler.js @@ -135,8 +135,13 @@ async function stopJob (job, reason) { if (job.state >= jobStates.STARTING && job.state <= jobStates.STOPPING) { await job.setState(jobStates.STOPPING, reason) await pitRunner.stopPit(job.id) - } else { + } else if (job.state == jobStates.PREPARING) { await job.setState(jobStates.STOPPING, reason) + stopPreparation(job.id) + } else if (job.state == jobStates.WAITING) { + await cleanJob(job, reason) + } else { + await job.setState(jobStates.DONE, reason) } } exports.stopJob = stopJob @@ -171,23 +176,7 @@ async function cleanJob (job, reason) { async function tick () { log.debug('Tick...') - for(let jobId of Object.keys(preparations)) { - let job = await Job.findByPk(jobId) - if (job) { - if (job.state == jobStates.PREPARING) { - if (job.since.getTime() + config.maxPrepDuration < Date.now()) { - await job.setState(jobStates.STOPPING, 'Exceeded max preparation time') - stopPreparation(jobId) - log.error('Preparation timeout for job', jobId) - } - } else if (job.state == jobStates.STOPPING) { - stopPreparation(job.id) - } - } else { - stopPreparation(jobId) - log.error('Stopped orphan preparation process for job', jobId) - } - } + for(let job of (await Job.findAll({ where: { state: jobStates.NEW } }))) { if (Object.keys(preparations).length < config.maxParallelPrep) { log.debug('Preparing job', job.id) @@ -196,6 +185,22 @@ async function tick () { break } } + + let isPreparing = {} + for(let job of (await Job.findAll({ where: { state: jobStates.PREPARING } }))) { + if (job.since.getTime() + config.maxPrepDuration < Date.now()) { + await stopJob(job, 'Exceeded max preparation time') + } else { + isPreparing[job.id] = true + } + } + for(let jobId of Object.keys(preparations)) { + if (!isPreparing[jobId]) { + stopPreparation(jobId) + log.error('Stopped orphan preparation process for job', jobId) + } + } + let job = await Job.findOne({ where: { state: jobStates.WAITING }, order: ['rank'] }) if (job) { log.debug('Trying to allocate job', job.id)