Skip to content

Commit

Permalink
Process should be scheduled if previous, failed job is still accessib…
Browse files Browse the repository at this point in the history
…le via Flink API (#1913)
  • Loading branch information
jedrz authored Jul 16, 2021
1 parent eb26044 commit ede6db9
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,38 @@ class PeriodicProcessManager(val delegate: ProcessManager,
delegate.test(name, json, testData, variableEncoder)

override def findJobStatus(name: ProcessName): Future[Option[ProcessState]] = {
def createScheduledProcessState(processDeployment: PeriodicProcessDeployment): ProcessState = {
ProcessState(
Some(ExternalDeploymentId("future")),
status = ScheduledStatus(processDeployment.runAt),
version = Option(processDeployment.periodicProcess.processVersion),
definitionManager = processStateDefinitionManager,
//TODO: this date should be passed/handled through attributes
startTime = Option(processDeployment.runAt.toEpochSecond(ZoneOffset.UTC)),
attributes = Option.empty,
errors = List.empty
)
}

def createFailedProcessState(processDeployment: PeriodicProcessDeployment): ProcessState = {
ProcessState(
Some(ExternalDeploymentId("future")),
status = SimpleStateStatus.Failed,
version = Option(processDeployment.periodicProcess.processVersion),
definitionManager = processStateDefinitionManager,
startTime = Option.empty,
attributes = Option.empty,
errors = List.empty
)
}

def handleFailed(original: Option[ProcessState]): Future[Option[ProcessState]] = {
service.getLatestDeployment(name).map {
// this method returns only active schedules, so 'None' means this process has been already canceled
case None => original.map(_.copy(status = SimpleStateStatus.Canceled))
// Previous, failed job is still accessible via Flink API but process has been scheduled to run again in future.
case Some(processDeployment) if processDeployment.state.status == PeriodicProcessDeploymentStatus.Scheduled =>
Some(createScheduledProcessState(processDeployment))
case _ => original
}
}
Expand All @@ -128,25 +156,8 @@ class PeriodicProcessManager(val delegate: ProcessManager,
service.getLatestDeployment(name).map { maybeProcessDeployment =>
maybeProcessDeployment.map { processDeployment =>
processDeployment.state.status match {
case PeriodicProcessDeploymentStatus.Scheduled => Some(ProcessState(
Some(ExternalDeploymentId("future")),
status = ScheduledStatus(processDeployment.runAt),
version = Option(processDeployment.periodicProcess.processVersion),
definitionManager = processStateDefinitionManager,
//TODO: this date should be passed/handled through attributes
startTime = Option(processDeployment.runAt.toEpochSecond(ZoneOffset.UTC)),
attributes = Option.empty,
errors = List.empty
))
case PeriodicProcessDeploymentStatus.Failed => Some(ProcessState(
Some(ExternalDeploymentId("future")),
status = SimpleStateStatus.Failed,
version = Option(processDeployment.periodicProcess.processVersion),
definitionManager = processStateDefinitionManager,
startTime = Option.empty,
attributes = Option.empty,
errors = List.empty
))
case PeriodicProcessDeploymentStatus.Scheduled => Some(createScheduledProcessState(processDeployment))
case PeriodicProcessDeploymentStatus.Failed => Some(createFailedProcessState(processDeployment))
case PeriodicProcessDeploymentStatus.Deployed | PeriodicProcessDeploymentStatus.Finished =>
original.map(o => o.copy(status = WaitingForScheduleStatus))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,12 +170,18 @@ class PeriodicProcessManagerTest extends FunSuite
val f = new Fixture
f.repository.addActiveProcess(processName, PeriodicProcessDeploymentStatus.Deployed)
f.delegateProcessManagerStub.setStateStatus(FlinkStateStatus.Failed)
f.getAllowedActions shouldBe List(ProcessActionType.Cancel) // redeploy is blocked in GUI but API allows it
val failedProcessState = f.periodicProcessManager.findJobStatus(processName).futureValue.value
failedProcessState.status shouldBe FlinkStateStatus.Failed
failedProcessState.allowedActions shouldBe List(ProcessActionType.Cancel) // redeploy is blocked in GUI but API allows it

f.periodicProcessManager.deploy(processVersion, DeploymentData.empty, PeriodicProcessGen(), None).futureValue

f.repository.processEntities.map(_.active) shouldBe List(false, true)
f.repository.deploymentEntities.map(_.status) shouldBe List(PeriodicProcessDeploymentStatus.Failed, PeriodicProcessDeploymentStatus.Scheduled)
val scheduledProcessState = f.periodicProcessManager.findJobStatus(processName).futureValue.value
// Previous job is still visible as Failed.
scheduledProcessState.status shouldBe a[ScheduledStatus]
scheduledProcessState.allowedActions shouldBe List(ProcessActionType.Cancel, ProcessActionType.Deploy)
}

test("should redeploy scheduled process") {
Expand Down

0 comments on commit ede6db9

Please sign in to comment.