diff --git a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkRestManager.scala b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkRestManager.scala index 63e6b1a66ee..fb88b72e745 100644 --- a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkRestManager.scala +++ b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkRestManager.scala @@ -117,10 +117,8 @@ class FlinkRestManager(config: FlinkConfig, modelData: ModelData, mainClassName: logger.warn(s"Trying to cancel ${processName.value} which is not present in Flink") Future.successful(()) }, - whenDuplicates = { _ => - //TODO cancel all these jobs - logger.warn(s"Trying to cancel ${processName.value} which maps to multiple jobs running in Flink") - Future.successful(()) + whenDuplicates = { overviews => + Future.sequence(overviews.filter(isNotFinished).map(o => ExternalDeploymentId(o.jid)).map(cancel)).map(_ => ()) }, whenSingle = overview => { val status = mapJobStatus(overview) diff --git a/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/FlinkRestManagerSpec.scala b/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/FlinkRestManagerSpec.scala index 82c767c7144..8943b2a8919 100644 --- a/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/FlinkRestManagerSpec.scala +++ b/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/FlinkRestManagerSpec.scala @@ -198,6 +198,23 @@ class FlinkRestManagerSpec extends FunSuite with Matchers with PatientScalaFutur statuses.map(_.jid).foreach(id => history should contain(HistoryEntry("cancel", Some(id)))) } + test("cancel duplicate processes which are in non terminal state") { + val jobStatuses = List( + JobStatus.RUNNING.name(), + JobStatus.RUNNING.name(), + JobStatus.FAILED.name() + ) + statuses = jobStatuses.map(status => JobOverview(UUID.randomUUID().toString, "test", 10L, 10L, status)) + + val (manager, history) = createManagerWithHistory(statuses) + + manager.cancel(ProcessName("test"), User("test_id", "Jack")).futureValue shouldBe (()) + + history.filter(_.operation == "cancel").map(_.jobId.get) should contain theSameElementsAs + statuses.filter(_.state == JobStatus.RUNNING.name()).map(_.jid) + } + + test("allow cancel but do not sent cancel request if process is failed") { statuses = List(JobOverview("2343", "p1", 10L, 10L, JobStatus.FAILED.name())) val (manager, history) = createManagerWithHistory(statuses, acceptCancel = false)