Skip to content

Commit

Permalink
cancel duplicate non terminal processes on Flink
Browse files Browse the repository at this point in the history
  • Loading branch information
gadomsky committed Mar 24, 2021
1 parent b51dc52 commit 64dc8a4
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,8 @@ class FlinkRestManager(config: FlinkConfig, modelData: ModelData, mainClassName:
logger.warn(s"Trying to cancel ${processName.value} which is not present in Flink")
Future.successful(())
},
whenDuplicates = { _ =>
//TODO cancel all these jobs
logger.warn(s"Trying to cancel ${processName.value} which maps to multiple jobs running in Flink")
Future.successful(())
whenDuplicates = { overviews =>
Future.sequence(overviews.filter(isNotFinished).map(o => ExternalDeploymentId(o.jid)).map(cancel)).map(_ => ())
},
whenSingle = overview => {
val status = mapJobStatus(overview)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,23 @@ class FlinkRestManagerSpec extends FunSuite with Matchers with PatientScalaFutur
statuses.map(_.jid).foreach(id => history should contain(HistoryEntry("cancel", Some(id))))
}

test("cancel duplicate processes which are in non terminal state") {
val jobStatuses = List(
JobStatus.RUNNING.name(),
JobStatus.RUNNING.name(),
JobStatus.FAILED.name()
)
statuses = jobStatuses.map(status => JobOverview(UUID.randomUUID().toString, "test", 10L, 10L, status))

val (manager, history) = createManagerWithHistory(statuses)

manager.cancel(ProcessName("test"), User("test_id", "Jack")).futureValue shouldBe (())

history.filter(_.operation == "cancel").map(_.jobId.get) should contain theSameElementsAs
statuses.filter(_.state == JobStatus.RUNNING.name()).map(_.jid)
}


test("allow cancel but do not sent cancel request if process is failed") {
statuses = List(JobOverview("2343", "p1", 10L, 10L, JobStatus.FAILED.name()))
val (manager, history) = createManagerWithHistory(statuses, acceptCancel = false)
Expand Down

0 comments on commit 64dc8a4

Please sign in to comment.