Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Process should be scheduled if previous, failed job is still accessible via Flink API #1913

Merged
merged 1 commit into from
Jul 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,38 @@ class PeriodicProcessManager(val delegate: ProcessManager,
delegate.test(name, json, testData, variableEncoder)

override def findJobStatus(name: ProcessName): Future[Option[ProcessState]] = {
def createScheduledProcessState(processDeployment: PeriodicProcessDeployment): ProcessState = {
ProcessState(
Some(ExternalDeploymentId("future")),
status = ScheduledStatus(processDeployment.runAt),
version = Option(processDeployment.periodicProcess.processVersion),
definitionManager = processStateDefinitionManager,
//TODO: this date should be passed/handled through attributes
startTime = Option(processDeployment.runAt.toEpochSecond(ZoneOffset.UTC)),
attributes = Option.empty,
errors = List.empty
)
}

def createFailedProcessState(processDeployment: PeriodicProcessDeployment): ProcessState = {
ProcessState(
Some(ExternalDeploymentId("future")),
status = SimpleStateStatus.Failed,
version = Option(processDeployment.periodicProcess.processVersion),
definitionManager = processStateDefinitionManager,
startTime = Option.empty,
attributes = Option.empty,
errors = List.empty
)
}

def handleFailed(original: Option[ProcessState]): Future[Option[ProcessState]] = {
service.getLatestDeployment(name).map {
// this method returns only active schedules, so 'None' means this process has been already canceled
case None => original.map(_.copy(status = SimpleStateStatus.Canceled))
// Previous, failed job is still accessible via Flink API but process has been scheduled to run again in future.
case Some(processDeployment) if processDeployment.state.status == PeriodicProcessDeploymentStatus.Scheduled =>
Some(createScheduledProcessState(processDeployment))
case _ => original
}
}
Expand All @@ -128,25 +156,8 @@ class PeriodicProcessManager(val delegate: ProcessManager,
service.getLatestDeployment(name).map { maybeProcessDeployment =>
maybeProcessDeployment.map { processDeployment =>
processDeployment.state.status match {
case PeriodicProcessDeploymentStatus.Scheduled => Some(ProcessState(
Some(ExternalDeploymentId("future")),
status = ScheduledStatus(processDeployment.runAt),
version = Option(processDeployment.periodicProcess.processVersion),
definitionManager = processStateDefinitionManager,
//TODO: this date should be passed/handled through attributes
startTime = Option(processDeployment.runAt.toEpochSecond(ZoneOffset.UTC)),
attributes = Option.empty,
errors = List.empty
))
case PeriodicProcessDeploymentStatus.Failed => Some(ProcessState(
Some(ExternalDeploymentId("future")),
status = SimpleStateStatus.Failed,
version = Option(processDeployment.periodicProcess.processVersion),
definitionManager = processStateDefinitionManager,
startTime = Option.empty,
attributes = Option.empty,
errors = List.empty
))
case PeriodicProcessDeploymentStatus.Scheduled => Some(createScheduledProcessState(processDeployment))
case PeriodicProcessDeploymentStatus.Failed => Some(createFailedProcessState(processDeployment))
case PeriodicProcessDeploymentStatus.Deployed | PeriodicProcessDeploymentStatus.Finished =>
original.map(o => o.copy(status = WaitingForScheduleStatus))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,12 +170,18 @@ class PeriodicProcessManagerTest extends FunSuite
val f = new Fixture
f.repository.addActiveProcess(processName, PeriodicProcessDeploymentStatus.Deployed)
f.delegateProcessManagerStub.setStateStatus(FlinkStateStatus.Failed)
f.getAllowedActions shouldBe List(ProcessActionType.Cancel) // redeploy is blocked in GUI but API allows it
val failedProcessState = f.periodicProcessManager.findJobStatus(processName).futureValue.value
failedProcessState.status shouldBe FlinkStateStatus.Failed
failedProcessState.allowedActions shouldBe List(ProcessActionType.Cancel) // redeploy is blocked in GUI but API allows it

f.periodicProcessManager.deploy(processVersion, DeploymentData.empty, PeriodicProcessGen(), None).futureValue

f.repository.processEntities.map(_.active) shouldBe List(false, true)
f.repository.deploymentEntities.map(_.status) shouldBe List(PeriodicProcessDeploymentStatus.Failed, PeriodicProcessDeploymentStatus.Scheduled)
val scheduledProcessState = f.periodicProcessManager.findJobStatus(processName).futureValue.value
// Previous job is still visible as Failed.
scheduledProcessState.status shouldBe a[ScheduledStatus]
scheduledProcessState.allowedActions shouldBe List(ProcessActionType.Cancel, ProcessActionType.Deploy)
}

test("should redeploy scheduled process") {
Expand Down