From dea343ab99213c79a888232c406adb12a5c8ba7e Mon Sep 17 00:00:00 2001 From: Arek Burdach Date: Fri, 14 Feb 2025 20:31:39 +0100 Subject: [PATCH] test fix --- .../periodic/PeriodicDeploymentManager.scala | 2 +- .../periodic/PeriodicProcessService.scala | 143 ++++++++++-------- ...eriodicProcessStateDefinitionManager.scala | 52 ++++++- .../api/AppApiHttpServiceBusinessSpec.scala | 25 ++- .../api/AppApiHttpServiceSecuritySpec.scala | 72 ++++----- .../ui/api/ProcessesResourcesSpec.scala | 17 ++- ...eriodicProcessServiceIntegrationTest.scala | 23 +-- .../PeriodicProcessesFetchingTest.scala | 2 +- .../flink/DeploymentManagerStub.scala | 5 +- .../flink/PeriodicDeploymentManagerTest.scala | 58 +++---- .../flink/PeriodicProcessServiceTest.scala | 22 +-- ...dicProcessStateDefinitionManagerTest.scala | 8 +- .../MockableDeploymentManagerProvider.scala | 25 ++- 13 files changed, 258 insertions(+), 196 deletions(-) diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/periodic/PeriodicDeploymentManager.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/periodic/PeriodicDeploymentManager.scala index c3bb2a25cc3..d72ea7b049c 100644 --- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/periodic/PeriodicDeploymentManager.scala +++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/periodic/PeriodicDeploymentManager.scala @@ -196,7 +196,7 @@ class PeriodicDeploymentManager private[periodic] ( override def getProcessStates( name: ProcessName )(implicit freshnessPolicy: DataFreshnessPolicy): Future[WithDataFreshnessStatus[List[StatusDetails]]] = { - service.getStatusDetails(name).map(_.map(List(_))) + service.getMergedStatusDetails(name).map(_.map(List(_))) } override def processStateDefinitionManager: ProcessStateDefinitionManager = diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/periodic/PeriodicProcessService.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/periodic/PeriodicProcessService.scala index 585bbeb86c0..50b4bb22610 100644 --- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/periodic/PeriodicProcessService.scala +++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/periodic/PeriodicProcessService.scala @@ -554,7 +554,7 @@ class PeriodicProcessService( private def now(): LocalDateTime = LocalDateTime.now(clock) - def getStatusDetails( + def getMergedStatusDetails( name: ProcessName )(implicit freshnessPolicy: DataFreshnessPolicy): Future[WithDataFreshnessStatus[StatusDetails]] = { delegateDeploymentManager.getProcessStates(name).flatMap { statusesWithFreshness => @@ -746,10 +746,8 @@ class PeriodicProcessService( object PeriodicProcessService { - private implicit val localDateOrdering: Ordering[LocalDateTime] = Ordering.by(identity[ChronoLocalDateTime[_]]) - // TODO: some configuration? - private val MaxDeploymentsStatus = 5 + private[periodic] val MaxDeploymentsStatus = 5 private val DeploymentStatusesToReschedule = Set(PeriodicProcessDeploymentStatus.Deployed, PeriodicProcessDeploymentStatus.Failed) @@ -761,45 +759,52 @@ object PeriodicProcessService { // for each historical and active deployments. mergedStatusDetails and methods below are for purpose of presentation // of single, merged status similar to this available for streaming job. This merged status should be a straightforward derivative // of these deployments statuses so it will be easy to figure out it by user. - case class PeriodicProcessStatus( + private case class PeriodicProcessStatus( activeDeploymentsStatuses: List[PeriodicDeploymentStatus], inactiveDeploymentsStatuses: List[PeriodicDeploymentStatus] ) extends LazyLogging { - def limitedAndSortedDeployments: List[PeriodicDeploymentStatus] = - (activeDeploymentsStatuses ++ inactiveDeploymentsStatuses.take( - MaxDeploymentsStatus - activeDeploymentsStatuses.size - )).sorted(PeriodicDeploymentStatus.ordering.reverse) - // Currently we don't present deployments - theirs statuses are available only in tooltip - because of that we have to pick // one "merged" status that will be presented to users def mergedStatusDetails: StatusDetails = { - pickMostImportantActiveDeployment + def toPeriodicProcessStatusWithMergedStatus(mergedStatus: StateStatus) = PeriodicProcessStatusWithMergedStatus( + activeDeploymentsStatuses, + inactiveDeploymentsStatuses, + mergedStatus + ) + + def createStatusDetails(mergedStatus: StateStatus, periodicDeploymentIdOpt: Option[PeriodicProcessDeploymentId]) = + StatusDetails( + status = toPeriodicProcessStatusWithMergedStatus(mergedStatus), + deploymentId = periodicDeploymentIdOpt.map(_.toString).map(DeploymentId(_)), + ) + + pickMostImportantActiveDeployment(activeDeploymentsStatuses) .map { deploymentStatus => - def createStatusDetails(status: StateStatus) = StatusDetails( - status = status, - deploymentId = Some(DeploymentId(deploymentStatus.deploymentId.toString)), - ) if (deploymentStatus.isWaitingForReschedule) { deploymentStatus.runtimeStatusOpt - .map(_.copy(status = WaitingForScheduleStatus)) - .getOrElse(createStatusDetails(WaitingForScheduleStatus)) + .map(_.copy(status = toPeriodicProcessStatusWithMergedStatus(WaitingForScheduleStatus))) + .getOrElse(createStatusDetails(WaitingForScheduleStatus, Some(deploymentStatus.deploymentId))) } else if (deploymentStatus.status == PeriodicProcessDeploymentStatus.Scheduled) { - createStatusDetails(ScheduledStatus(deploymentStatus.runAt)) + createStatusDetails(ScheduledStatus(deploymentStatus.runAt), Some(deploymentStatus.deploymentId)) } else if (Set(PeriodicProcessDeploymentStatus.Failed, PeriodicProcessDeploymentStatus.FailedOnDeploy) .contains(deploymentStatus.status)) { - createStatusDetails(ProblemStateStatus.Failed) + createStatusDetails(ProblemStateStatus.Failed, Some(deploymentStatus.deploymentId)) } else if (deploymentStatus.status == PeriodicProcessDeploymentStatus.RetryingDeploy) { - createStatusDetails(SimpleStateStatus.DuringDeploy) + createStatusDetails(SimpleStateStatus.DuringDeploy, Some(deploymentStatus.deploymentId)) } else { - deploymentStatus.runtimeStatusOpt.getOrElse { - createStatusDetails(WaitingForScheduleStatus) - } + deploymentStatus.runtimeStatusOpt + .map(runtimeDetails => + runtimeDetails.copy(status = toPeriodicProcessStatusWithMergedStatus(runtimeDetails.status)) + ) + .getOrElse { + createStatusDetails(WaitingForScheduleStatus, Some(deploymentStatus.deploymentId)) + } } } .getOrElse { if (inactiveDeploymentsStatuses.isEmpty) { - StatusDetails(SimpleStateStatus.NotDeployed, None) + createStatusDetails(SimpleStateStatus.NotDeployed, None) } else { val latestInactiveProcessId = inactiveDeploymentsStatuses.maxBy(_.scheduleId.processId.value).scheduleId.processId @@ -810,52 +815,64 @@ object PeriodicProcessService { if (latestDeploymentsForEachScheduleOfLatestProcessId.forall( _.status == PeriodicProcessDeploymentStatus.Finished )) { - StatusDetails(SimpleStateStatus.Finished, None) + createStatusDetails(SimpleStateStatus.Finished, None) } else { - StatusDetails(SimpleStateStatus.Canceled, None) + createStatusDetails(SimpleStateStatus.Canceled, None) } } } } - /** - * Returns latest deployment. It can be in any status (consult [[PeriodicProcessDeploymentStatus]]). - * For multiple schedules only single schedule is returned in the following order: - *
    - *
  1. If there are any deployed scenarios, then the first one is returned. Please be aware that deployment of previous - * schedule could fail.
  2. - *
  3. If there are any failed scenarios, then the last one is returned. We want to inform user, that some deployments - * failed and the scenario should be rescheduled/retried manually. - *
  4. If there are any scheduled scenarios, then the first one to be run is returned. - *
  5. If there are any finished scenarios, then the last one is returned. It should not happen because the scenario - * should be deactivated earlier. - *
- */ - def pickMostImportantActiveDeployment: Option[PeriodicDeploymentStatus] = { - val lastActiveDeploymentStatusForEachSchedule = - latestDeploymentForEachSchedule(activeDeploymentsStatuses).sorted - - def first(status: PeriodicProcessDeploymentStatus) = - lastActiveDeploymentStatusForEachSchedule.find(_.status == status) - - def last(status: PeriodicProcessDeploymentStatus) = - lastActiveDeploymentStatusForEachSchedule.reverse.find(_.status == status) - - first(PeriodicProcessDeploymentStatus.Deployed) - .orElse(last(PeriodicProcessDeploymentStatus.Failed)) - .orElse(last(PeriodicProcessDeploymentStatus.RetryingDeploy)) - .orElse(last(PeriodicProcessDeploymentStatus.FailedOnDeploy)) - .orElse(first(PeriodicProcessDeploymentStatus.Scheduled)) - .orElse(last(PeriodicProcessDeploymentStatus.Finished)) - } + } - private def latestDeploymentForEachSchedule(deploymentsStatuses: List[PeriodicDeploymentStatus]) = { - deploymentsStatuses - .groupBy(_.scheduleId) - .values - .toList - .map(_.min(PeriodicDeploymentStatus.ordering.reverse)) - } + /** + * Returns latest deployment. It can be in any status (consult [[PeriodicProcessDeploymentStatus]]). + * For multiple schedules only single schedule is returned in the following order: + *
    + *
  1. If there are any deployed scenarios, then the first one is returned. Please be aware that deployment of previous + * schedule could fail.
  2. + *
  3. If there are any failed scenarios, then the last one is returned. We want to inform user, that some deployments + * failed and the scenario should be rescheduled/retried manually. + *
  4. If there are any scheduled scenarios, then the first one to be run is returned. + *
  5. If there are any finished scenarios, then the last one is returned. It should not happen because the scenario + * should be deactivated earlier. + *
+ */ + private[periodic] def pickMostImportantActiveDeployment( + activeDeploymentsStatuses: List[PeriodicDeploymentStatus] + ): Option[PeriodicDeploymentStatus] = { + val lastActiveDeploymentStatusForEachSchedule = + latestDeploymentForEachSchedule(activeDeploymentsStatuses).sorted + + def first(status: PeriodicProcessDeploymentStatus) = + lastActiveDeploymentStatusForEachSchedule.find(_.status == status) + + def last(status: PeriodicProcessDeploymentStatus) = + lastActiveDeploymentStatusForEachSchedule.reverse.find(_.status == status) + + first(PeriodicProcessDeploymentStatus.Deployed) + .orElse(last(PeriodicProcessDeploymentStatus.Failed)) + .orElse(last(PeriodicProcessDeploymentStatus.RetryingDeploy)) + .orElse(last(PeriodicProcessDeploymentStatus.FailedOnDeploy)) + .orElse(first(PeriodicProcessDeploymentStatus.Scheduled)) + .orElse(last(PeriodicProcessDeploymentStatus.Finished)) + } + + private def latestDeploymentForEachSchedule(deploymentsStatuses: List[PeriodicDeploymentStatus]) = { + deploymentsStatuses + .groupBy(_.scheduleId) + .values + .toList + .map(_.min(PeriodicDeploymentStatus.ordering.reverse)) + } + + case class PeriodicProcessStatusWithMergedStatus( + activeDeploymentsStatuses: List[PeriodicDeploymentStatus], + inactiveDeploymentsStatuses: List[PeriodicDeploymentStatus], + mergedStatus: StateStatus + ) extends StateStatus { + + override def name: StatusName = mergedStatus.name } diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/periodic/PeriodicProcessStateDefinitionManager.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/periodic/PeriodicProcessStateDefinitionManager.scala index f999a485af0..3784a80adc4 100644 --- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/periodic/PeriodicProcessStateDefinitionManager.scala +++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/periodic/PeriodicProcessStateDefinitionManager.scala @@ -7,7 +7,13 @@ import pl.touk.nussknacker.engine.api.deployment.{ ScenarioActionName, StateStatus } -import pl.touk.nussknacker.ui.process.periodic.PeriodicProcessService.{PeriodicDeploymentStatus, PeriodicProcessStatus} +import pl.touk.nussknacker.ui.process.periodic.PeriodicProcessService.{ + MaxDeploymentsStatus, + PeriodicDeploymentStatus, + PeriodicProcessStatusWithMergedStatus +} + +import java.net.URI class PeriodicProcessStateDefinitionManager(delegate: ProcessStateDefinitionManager) extends OverridingProcessStateDefinitionManager( @@ -20,10 +26,39 @@ class PeriodicProcessStateDefinitionManager(delegate: ProcessStateDefinitionMana delegate = delegate ) { + override def statusActions(processStatus: ProcessStateDefinitionManager.ProcessStatus): List[ScenarioActionName] = { + super.statusActions(processStatus.copy(stateStatus = extractPeriodicStatus(processStatus.stateStatus).mergedStatus)) + } + + override def actionTooltips( + processStatus: ProcessStateDefinitionManager.ProcessStatus + ): Map[ScenarioActionName, String] = { + super.actionTooltips( + processStatus.copy(stateStatus = extractPeriodicStatus(processStatus.stateStatus).mergedStatus) + ) + } + + override def statusIcon(stateStatus: StateStatus): URI = { + super.statusIcon(extractPeriodicStatus(stateStatus).mergedStatus) + } + + override def statusDescription(stateStatus: StateStatus): String = { + super.statusDescription(extractPeriodicStatus(stateStatus).mergedStatus) + } + override def statusTooltip(stateStatus: StateStatus): String = { + val periodicStatus = extractPeriodicStatus(stateStatus) + PeriodicProcessStateDefinitionManager.statusTooltip( + activeDeploymentsStatuses = periodicStatus.activeDeploymentsStatuses, + inactiveDeploymentsStatuses = periodicStatus.inactiveDeploymentsStatuses + ) + } + + private def extractPeriodicStatus(stateStatus: StateStatus) = { stateStatus match { - case periodic: PeriodicProcessStatus => PeriodicProcessStateDefinitionManager.statusTooltip(periodic) - case other => throw new IllegalStateException(s"Unexpected status: $other") + case periodic: PeriodicProcessStatusWithMergedStatus => + periodic + case other => throw new IllegalStateException(s"Unexpected status: $other") } } @@ -31,8 +66,15 @@ class PeriodicProcessStateDefinitionManager(delegate: ProcessStateDefinitionMana object PeriodicProcessStateDefinitionManager { - def statusTooltip(processStatus: PeriodicProcessStatus): String = { - processStatus.limitedAndSortedDeployments + def statusTooltip( + activeDeploymentsStatuses: List[PeriodicDeploymentStatus], + inactiveDeploymentsStatuses: List[PeriodicDeploymentStatus] + ): String = { + val limitedAndSortedDeployments: List[PeriodicDeploymentStatus] = + (activeDeploymentsStatuses ++ inactiveDeploymentsStatuses.take( + MaxDeploymentsStatus - activeDeploymentsStatuses.size + )).sorted(PeriodicDeploymentStatus.ordering.reverse) + limitedAndSortedDeployments .map { case d @ PeriodicDeploymentStatus(_, scheduleId, _, runAt, status, _, _) => val refinedStatus = { if (d.isCanceled) { diff --git a/designer/server/src/test/scala/pl/touk/nussknacker/ui/api/AppApiHttpServiceBusinessSpec.scala b/designer/server/src/test/scala/pl/touk/nussknacker/ui/api/AppApiHttpServiceBusinessSpec.scala index d9f0245c76e..49c2e396516 100644 --- a/designer/server/src/test/scala/pl/touk/nussknacker/ui/api/AppApiHttpServiceBusinessSpec.scala +++ b/designer/server/src/test/scala/pl/touk/nussknacker/ui/api/AppApiHttpServiceBusinessSpec.scala @@ -8,6 +8,7 @@ import io.restassured.module.scala.RestAssuredSupport.AddThenToResponse import org.hamcrest.Matchers._ import org.scalatest.freespec.AnyFreeSpecLike import org.scalatest.matchers.must.Matchers.be +import pl.touk.nussknacker.development.manager.BasicStatusDetails import pl.touk.nussknacker.development.manager.MockableDeploymentManagerProvider.MockableDeploymentManager import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus.ProblemStateStatus @@ -42,7 +43,7 @@ class AppApiHttpServiceBusinessSpec createDeployedExampleScenario(ProcessName("id1")) MockableDeploymentManager.configureScenarioStatuses( - Map("id1" -> SimpleStateStatus.Running) + Map("id1" -> BasicStatusDetails(SimpleStateStatus.Running, Some(VersionId(1)))) ) } .when() @@ -70,9 +71,8 @@ class AppApiHttpServiceBusinessSpec MockableDeploymentManager.configureScenarioStatuses( Map( - "id1" -> ProblemStateStatus.FailedToGet, - "id2" -> SimpleStateStatus.Running, - "id3" -> ProblemStateStatus.shouldBeRunning(VersionId(1L), "admin"), + "id1" -> BasicStatusDetails(ProblemStateStatus.Failed, None), + "id2" -> BasicStatusDetails(SimpleStateStatus.Running, Some(VersionId(1))), ) ) } @@ -94,10 +94,6 @@ class AppApiHttpServiceBusinessSpec .applicationState { createDeployedCanceledExampleScenario(ProcessName("id1")) createDeployedExampleScenario(ProcessName("id2")) - - MockableDeploymentManager.configureScenarioStatuses( - Map("id2" -> ProblemStateStatus.shouldBeRunning(VersionId(1L), "admin")) - ) } .when() .basicAuthAllPermUser() @@ -120,8 +116,8 @@ class AppApiHttpServiceBusinessSpec MockableDeploymentManager.configureScenarioStatuses( Map( - "id1" -> SimpleStateStatus.Running, - "id2" -> SimpleStateStatus.Running, + "id1" -> BasicStatusDetails(SimpleStateStatus.Running, Some(VersionId(1))), + "id2" -> BasicStatusDetails(SimpleStateStatus.Running, Some(VersionId(1))), ) ) } @@ -144,7 +140,7 @@ class AppApiHttpServiceBusinessSpec createDeployedExampleScenario(ProcessName("id1")) MockableDeploymentManager.configureScenarioStatuses( - Map("id1" -> SimpleStateStatus.Running) + Map("id1" -> BasicStatusDetails(SimpleStateStatus.Running, Some(VersionId(1)))) ) } .when() @@ -169,7 +165,7 @@ class AppApiHttpServiceBusinessSpec createDeployedExampleScenario(ProcessName("id1")) MockableDeploymentManager.configureScenarioStatuses( - Map("id1" -> SimpleStateStatus.Running) + Map("id1" -> BasicStatusDetails(SimpleStateStatus.Running, Some(VersionId(1)))) ) } .basicAuthAllPermUser() @@ -259,9 +255,8 @@ class AppApiHttpServiceBusinessSpec MockableDeploymentManager.configureScenarioStatuses( Map( - "id1" -> ProblemStateStatus.FailedToGet, - "id2" -> SimpleStateStatus.Running, - "id3" -> ProblemStateStatus.shouldBeRunning(VersionId(1L), "admin"), + "id1" -> BasicStatusDetails(ProblemStateStatus.Failed, None), + "id2" -> BasicStatusDetails(SimpleStateStatus.Running, Some(VersionId(1))), ) ) } diff --git a/designer/server/src/test/scala/pl/touk/nussknacker/ui/api/AppApiHttpServiceSecuritySpec.scala b/designer/server/src/test/scala/pl/touk/nussknacker/ui/api/AppApiHttpServiceSecuritySpec.scala index d59d0388878..c2a22681d0f 100644 --- a/designer/server/src/test/scala/pl/touk/nussknacker/ui/api/AppApiHttpServiceSecuritySpec.scala +++ b/designer/server/src/test/scala/pl/touk/nussknacker/ui/api/AppApiHttpServiceSecuritySpec.scala @@ -6,6 +6,7 @@ import io.restassured.RestAssured._ import io.restassured.module.scala.RestAssuredSupport.AddThenToResponse import org.hamcrest.Matchers._ import org.scalatest.freespec.AnyFreeSpecLike +import pl.touk.nussknacker.development.manager.BasicStatusDetails import pl.touk.nussknacker.development.manager.MockableDeploymentManagerProvider.MockableDeploymentManager import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus.ProblemStateStatus @@ -39,8 +40,8 @@ class AppApiHttpServiceSecuritySpec MockableDeploymentManager.configureScenarioStatuses( Map( - "id1" -> SimpleStateStatus.Running, - "id2" -> SimpleStateStatus.Running + "id1" -> BasicStatusDetails(SimpleStateStatus.Running, Some(VersionId(1))), + "id2" -> BasicStatusDetails(SimpleStateStatus.Running, Some(VersionId(1))) ) ) } @@ -70,9 +71,8 @@ class AppApiHttpServiceSecuritySpec MockableDeploymentManager.configureScenarioStatuses( Map( - "id1" -> ProblemStateStatus.FailedToGet, - "id2" -> SimpleStateStatus.Running, - "id3" -> ProblemStateStatus.shouldBeRunning(VersionId(1L), "admin"), + "id1" -> BasicStatusDetails(ProblemStateStatus.Failed, None), + "id2" -> BasicStatusDetails(SimpleStateStatus.Running, Some(VersionId(1))), ) ) } @@ -100,9 +100,8 @@ class AppApiHttpServiceSecuritySpec MockableDeploymentManager.configureScenarioStatuses( Map( - "id1" -> ProblemStateStatus.FailedToGet, - "id2" -> SimpleStateStatus.Running, - "id3" -> ProblemStateStatus.shouldBeRunning(VersionId(1L), "admin"), + "id1" -> BasicStatusDetails(ProblemStateStatus.Failed, None), + "id2" -> BasicStatusDetails(SimpleStateStatus.Running, Some(VersionId(1))), ) ) } @@ -124,9 +123,9 @@ class AppApiHttpServiceSecuritySpec MockableDeploymentManager.configureScenarioStatuses( Map( - "id1" -> SimpleStateStatus.ProblemStateStatus.Failed, - "id2" -> SimpleStateStatus.ProblemStateStatus.Failed, - "id3" -> SimpleStateStatus.ProblemStateStatus.Failed + "id1" -> BasicStatusDetails(SimpleStateStatus.ProblemStateStatus.Failed, None), + "id2" -> BasicStatusDetails(SimpleStateStatus.ProblemStateStatus.Failed, None), + "id3" -> BasicStatusDetails(SimpleStateStatus.ProblemStateStatus.Failed, None) ) ) } @@ -156,8 +155,8 @@ class AppApiHttpServiceSecuritySpec MockableDeploymentManager.configureScenarioStatuses( Map( - "id1" -> SimpleStateStatus.NotDeployed, - "id2" -> SimpleStateStatus.NotDeployed + "id1" -> BasicStatusDetails(SimpleStateStatus.NotDeployed, None), + "id2" -> BasicStatusDetails(SimpleStateStatus.NotDeployed, None) ) ) } @@ -185,9 +184,8 @@ class AppApiHttpServiceSecuritySpec MockableDeploymentManager.configureScenarioStatuses( Map( - "id1" -> ProblemStateStatus.FailedToGet, - "id2" -> SimpleStateStatus.Running, - "id3" -> ProblemStateStatus.shouldBeRunning(VersionId(1L), "admin"), + "id1" -> BasicStatusDetails(ProblemStateStatus.Failed, None), + "id2" -> BasicStatusDetails(SimpleStateStatus.Running, Some(VersionId(1))), ) ) } @@ -205,13 +203,6 @@ class AppApiHttpServiceSecuritySpec .applicationState { createDeployedExampleScenario(ProcessName("id1"), category = Category1) createDeployedExampleScenario(ProcessName("id2"), category = Category2) - - MockableDeploymentManager.configureScenarioStatuses( - Map( - "id1" -> SimpleStateStatus.NotDeployed, - "id2" -> SimpleStateStatus.NotDeployed - ) - ) } .when() .noAuth() @@ -319,9 +310,8 @@ class AppApiHttpServiceSecuritySpec MockableDeploymentManager.configureScenarioStatuses( Map( - "id1" -> ProblemStateStatus.FailedToGet, - "id2" -> SimpleStateStatus.Running, - "id3" -> ProblemStateStatus.shouldBeRunning(VersionId(1L), "admin"), + "id1" -> BasicStatusDetails(ProblemStateStatus.Failed, None), + "id2" -> BasicStatusDetails(SimpleStateStatus.Running, Some(VersionId(1))), ) ) } @@ -347,9 +337,8 @@ class AppApiHttpServiceSecuritySpec MockableDeploymentManager.configureScenarioStatuses( Map( - "id1" -> ProblemStateStatus.FailedToGet, - "id2" -> SimpleStateStatus.Running, - "id3" -> ProblemStateStatus.shouldBeRunning(VersionId(1L), "admin"), + "id1" -> BasicStatusDetails(ProblemStateStatus.Failed, None), + "id2" -> BasicStatusDetails(SimpleStateStatus.Running, Some(VersionId(1))), ) ) } @@ -371,9 +360,8 @@ class AppApiHttpServiceSecuritySpec MockableDeploymentManager.configureScenarioStatuses( Map( - "id1" -> ProblemStateStatus.FailedToGet, - "id2" -> SimpleStateStatus.Running, - "id3" -> ProblemStateStatus.shouldBeRunning(VersionId(1L), "admin"), + "id1" -> BasicStatusDetails(ProblemStateStatus.Failed, None), + "id2" -> BasicStatusDetails(SimpleStateStatus.Running, Some(VersionId(1))), ) ) } @@ -402,9 +390,8 @@ class AppApiHttpServiceSecuritySpec MockableDeploymentManager.configureScenarioStatuses( Map( - "id1" -> ProblemStateStatus.FailedToGet, - "id2" -> SimpleStateStatus.Running, - "id3" -> ProblemStateStatus.shouldBeRunning(VersionId(1L), "admin"), + "id1" -> BasicStatusDetails(ProblemStateStatus.FailedToGet, None), + "id2" -> BasicStatusDetails(SimpleStateStatus.Running, Some(VersionId(1))), ) ) } @@ -423,9 +410,8 @@ class AppApiHttpServiceSecuritySpec MockableDeploymentManager.configureScenarioStatuses( Map( - "id1" -> ProblemStateStatus.FailedToGet, - "id2" -> SimpleStateStatus.Running, - "id3" -> ProblemStateStatus.shouldBeRunning(VersionId(1L), "admin"), + "id1" -> BasicStatusDetails(ProblemStateStatus.FailedToGet, None), + "id2" -> BasicStatusDetails(SimpleStateStatus.Running, Some(VersionId(1))), ) ) } @@ -447,9 +433,8 @@ class AppApiHttpServiceSecuritySpec MockableDeploymentManager.configureScenarioStatuses( Map( - "id1" -> ProblemStateStatus.FailedToGet, - "id2" -> SimpleStateStatus.Running, - "id3" -> ProblemStateStatus.shouldBeRunning(VersionId(1L), "admin"), + "id1" -> BasicStatusDetails(ProblemStateStatus.FailedToGet, None), + "id2" -> BasicStatusDetails(SimpleStateStatus.Running, Some(VersionId(1))), ) ) } @@ -471,9 +456,8 @@ class AppApiHttpServiceSecuritySpec MockableDeploymentManager.configureScenarioStatuses( Map( - "id1" -> ProblemStateStatus.FailedToGet, - "id2" -> SimpleStateStatus.Running, - "id3" -> ProblemStateStatus.shouldBeRunning(VersionId(1L), "admin"), + "id1" -> BasicStatusDetails(ProblemStateStatus.FailedToGet, None), + "id2" -> BasicStatusDetails(SimpleStateStatus.Running, Some(VersionId(1))), ) ) } diff --git a/designer/server/src/test/scala/pl/touk/nussknacker/ui/api/ProcessesResourcesSpec.scala b/designer/server/src/test/scala/pl/touk/nussknacker/ui/api/ProcessesResourcesSpec.scala index 02bd8ce4d3b..94ee7344b50 100644 --- a/designer/server/src/test/scala/pl/touk/nussknacker/ui/api/ProcessesResourcesSpec.scala +++ b/designer/server/src/test/scala/pl/touk/nussknacker/ui/api/ProcessesResourcesSpec.scala @@ -12,6 +12,7 @@ import org.scalatest.LoneElement._ import org.scalatest._ import org.scalatest.funsuite.AnyFunSuite import org.scalatest.matchers.should.Matchers +import pl.touk.nussknacker.development.manager.BasicStatusDetails import pl.touk.nussknacker.development.manager.MockableDeploymentManagerProvider.MockableDeploymentManager import pl.touk.nussknacker.engine.api.ProcessAdditionalFields import pl.touk.nussknacker.engine.api.component.ProcessingMode @@ -179,7 +180,7 @@ class ProcessesResourcesSpec test("return single process") { createDeployedExampleScenario(processName, category = Category1) MockableDeploymentManager.configureScenarioStatuses( - Map(processName.value -> SimpleStateStatus.Running) + Map(processName.value -> BasicStatusDetails(SimpleStateStatus.Running, Some(VersionId(1)))) ) forScenarioReturned(processName) { process => @@ -282,7 +283,7 @@ class ProcessesResourcesSpec test("not allow to archive still running process") { createDeployedExampleScenario(processName, category = Category1) MockableDeploymentManager.configureScenarioStatuses( - Map(processName.value -> SimpleStateStatus.Running) + Map(processName.value -> BasicStatusDetails(SimpleStateStatus.Running, Some(VersionId(1)))) ) archiveProcess(processName) { status => @@ -343,7 +344,7 @@ class ProcessesResourcesSpec test("should not allow to rename deployed process") { createDeployedExampleScenario(processName, category = Category1) MockableDeploymentManager.configureScenarioStatuses( - Map(processName.value -> SimpleStateStatus.Running) + Map(processName.value -> BasicStatusDetails(SimpleStateStatus.Running, Some(VersionId(1)))) ) val newName = ProcessName("ProcessChangedName") @@ -369,7 +370,7 @@ class ProcessesResourcesSpec ignore("should not allow to rename process with running state") { createEmptyScenario(processName, category = Category1) MockableDeploymentManager.configureScenarioStatuses( - Map(processName.value -> SimpleStateStatus.Running) + Map(processName.value -> BasicStatusDetails(SimpleStateStatus.Running, Some(VersionId(1)))) ) val newName = ProcessName("ProcessChangedName") @@ -602,8 +603,8 @@ class ProcessesResourcesSpec MockableDeploymentManager.configureScenarioStatuses( Map( - secondProcessor.value -> SimpleStateStatus.Canceled, - thirdProcessor.value -> SimpleStateStatus.Running + secondProcessor.value -> BasicStatusDetails(SimpleStateStatus.Canceled, Some(VersionId(1))), + thirdProcessor.value -> BasicStatusDetails(SimpleStateStatus.Running, Some(VersionId(1))) ) ) @@ -1265,7 +1266,7 @@ class ProcessesResourcesSpec test("should return status for single deployed process") { createDeployedExampleScenario(processName, category = Category1) MockableDeploymentManager.configureScenarioStatuses( - Map(processName.value -> SimpleStateStatus.Running) + Map(processName.value -> BasicStatusDetails(SimpleStateStatus.Running, Some(VersionId(1)))) ) forScenarioStatus(processName) { (code, state) => @@ -1346,7 +1347,7 @@ class ProcessesResourcesSpec private def verifyProcessWithStateOnList(expectedName: ProcessName, expectedStatus: Option[StateStatus]): Unit = { MockableDeploymentManager.configureScenarioStatuses( - Map(processName.value -> SimpleStateStatus.Running) + Map(processName.value -> BasicStatusDetails(SimpleStateStatus.Running, Some(VersionId(1)))) ) forScenariosReturned(ScenarioQuery.empty) { processes => diff --git a/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/periodic/PeriodicProcessServiceIntegrationTest.scala b/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/periodic/PeriodicProcessServiceIntegrationTest.scala index 3fb99e944f4..881360a3aba 100644 --- a/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/periodic/PeriodicProcessServiceIntegrationTest.scala +++ b/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/periodic/PeriodicProcessServiceIntegrationTest.scala @@ -30,7 +30,7 @@ import pl.touk.nussknacker.test.base.it.WithClock import pl.touk.nussknacker.test.utils.domain.TestFactory import pl.touk.nussknacker.test.utils.domain.TestFactory.newWriteProcessRepository import pl.touk.nussknacker.test.utils.scalas.DBIOActionValues -import pl.touk.nussknacker.ui.process.periodic.PeriodicProcessService.PeriodicProcessStatus +import pl.touk.nussknacker.ui.process.periodic.PeriodicProcessService.PeriodicProcessStatusWithMergedStatus import pl.touk.nussknacker.ui.process.periodic.flink.{DeploymentManagerStub, ScheduledExecutionPerformerStub} import pl.touk.nussknacker.ui.process.periodic.legacy.db.{LegacyDbInitializer, SlickLegacyPeriodicProcessesRepository} import pl.touk.nussknacker.ui.process.periodic.model._ @@ -669,14 +669,19 @@ class PeriodicProcessServiceIntegrationTest val timeToTriggerSchedule1 = startTime.plus(1, ChronoUnit.HOURS) val timeToTriggerSchedule2 = startTime.plus(2, ChronoUnit.HOURS) - def mostImportantActiveDeployment = service - .getStatusDetails(processName) - .futureValue - .value - .status - .asInstanceOf[PeriodicProcessStatus] - .pickMostImportantActiveDeployment - .value + def mostImportantActiveDeployment = { + PeriodicProcessService + .pickMostImportantActiveDeployment( + service + .getMergedStatusDetails(processName) + .futureValue + .value + .status + .asInstanceOf[PeriodicProcessStatusWithMergedStatus] + .activeDeploymentsStatuses + ) + .value + } val schedule1 = "schedule1" val schedule2 = "schedule2" diff --git a/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/periodic/PeriodicProcessesFetchingTest.scala b/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/periodic/PeriodicProcessesFetchingTest.scala index b296f1271ef..2ca848596e6 100644 --- a/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/periodic/PeriodicProcessesFetchingTest.scala +++ b/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/periodic/PeriodicProcessesFetchingTest.scala @@ -88,7 +88,7 @@ class PeriodicProcessesFetchingTest implicit val freshnessPolicy: DataFreshnessPolicy = DataFreshnessPolicy.CanBeCached for (i <- 1 to n) { - f.periodicProcessService.getStatusDetails(processName(i)).futureValue + f.periodicProcessService.getMergedStatusDetails(processName(i)).futureValue } getLatestDeploymentQueryCount.get() shouldEqual 2 * n diff --git a/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/periodic/flink/DeploymentManagerStub.scala b/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/periodic/flink/DeploymentManagerStub.scala index c494da1c3ec..455c5118433 100644 --- a/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/periodic/flink/DeploymentManagerStub.scala +++ b/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/periodic/flink/DeploymentManagerStub.scala @@ -86,8 +86,9 @@ class DeploymentManagerStub extends BaseDeploymentManager { case _: DMValidateScenarioCommand => Future.successful(()) case _: DMRunDeploymentCommand => Future.successful(None) case _: DMCancelScenarioCommand => Future.successful(()) - case _: DMStopScenarioCommand | _: DMStopDeploymentCommand | _: DMCancelDeploymentCommand | - _: DMMakeScenarioSavepointCommand | _: DMRunOffScheduleCommand | _: DMTestScenarioCommand => + case _: DMCancelDeploymentCommand => Future.successful(()) + case _: DMStopScenarioCommand | _: DMStopDeploymentCommand | _: DMMakeScenarioSavepointCommand | + _: DMRunOffScheduleCommand | _: DMTestScenarioCommand => notImplemented } diff --git a/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/periodic/flink/PeriodicDeploymentManagerTest.scala b/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/periodic/flink/PeriodicDeploymentManagerTest.scala index 1b825bbcec2..1ddd4b360b9 100644 --- a/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/periodic/flink/PeriodicDeploymentManagerTest.scala +++ b/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/periodic/flink/PeriodicDeploymentManagerTest.scala @@ -7,20 +7,20 @@ import org.scalatest.prop.TableDrivenPropertyChecks import org.scalatest.{Inside, OptionValues} import pl.touk.nussknacker.engine.api.deployment.DeploymentUpdateStrategy.StateRestoringStrategy import pl.touk.nussknacker.engine.api.deployment._ -import pl.touk.nussknacker.engine.api.deployment.simple.{SimpleProcessStateDefinitionManager, SimpleStateStatus} +import pl.touk.nussknacker.engine.api.deployment.scheduler.services.{EmptyListener, ProcessConfigEnricher} +import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus.ProblemStateStatus import pl.touk.nussknacker.engine.api.process.{ProcessId, ProcessIdWithName, ProcessName, VersionId} import pl.touk.nussknacker.engine.api.{MetaData, ProcessVersion, StreamMetaData} import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess import pl.touk.nussknacker.engine.deployment.{DeploymentData, User} import pl.touk.nussknacker.test.PatientScalaFutures -import pl.touk.nussknacker.ui.process.periodic.PeriodicProcessService.PeriodicProcessStatus +import pl.touk.nussknacker.ui.process.periodic.PeriodicProcessService.PeriodicProcessStatusWithMergedStatus import pl.touk.nussknacker.ui.process.periodic.PeriodicStateStatus.{ScheduledStatus, WaitingForScheduleStatus} import pl.touk.nussknacker.ui.process.periodic._ +import pl.touk.nussknacker.ui.process.periodic.cron.CronSchedulePropertyExtractor import pl.touk.nussknacker.ui.process.periodic.flink.db.InMemPeriodicProcessesRepository import pl.touk.nussknacker.ui.process.periodic.model.PeriodicProcessDeploymentStatus -import pl.touk.nussknacker.engine.api.deployment.scheduler.services.{EmptyListener, ProcessConfigEnricher} -import pl.touk.nussknacker.ui.process.periodic.cron.CronSchedulePropertyExtractor import java.time.{Clock, LocalDateTime, ZoneOffset} import java.util.UUID @@ -99,19 +99,21 @@ class PeriodicDeploymentManagerTest def getMergedStatusDetails: StatusDetails = periodicProcessService - .getStatusDetails(processName) + .getMergedStatusDetails(processName) .futureValue .value - .status - .asInstanceOf[PeriodicProcessStatus] - .mergedStatusDetails } + implicit class MergedStatusDetailsOps(statusDetails: StatusDetails) { + def mergedStatus: StateStatus = + statusDetails.status.asInstanceOf[PeriodicProcessStatusWithMergedStatus].mergedStatus + } + test("getProcessState - should return not deployed for no job") { val f = new Fixture - val state = f.getMergedStatusDetails.status + val state = f.getMergedStatusDetails.mergedStatus state shouldEqual SimpleStateStatus.NotDeployed } @@ -120,7 +122,7 @@ class PeriodicDeploymentManagerTest val f = new Fixture f.repository.addActiveProcess(processName, PeriodicProcessDeploymentStatus.Scheduled, processingType = "other") - val state = f.getMergedStatusDetails.status + val state = f.getMergedStatusDetails.mergedStatus state shouldEqual SimpleStateStatus.NotDeployed } @@ -130,7 +132,7 @@ class PeriodicDeploymentManagerTest f.repository.addActiveProcess(processName, PeriodicProcessDeploymentStatus.Scheduled) val statusDetails = f.getMergedStatusDetails - statusDetails.status shouldBe a[ScheduledStatus] + statusDetails.mergedStatus shouldBe a[ScheduledStatus] f.getAllowedActions(statusDetails, processVersion.versionId, None, Some(processVersion.versionId)) shouldBe List( ScenarioActionName.Cancel, ScenarioActionName.Deploy @@ -140,7 +142,7 @@ class PeriodicDeploymentManagerTest .futureValue .value .loneElement - .status shouldBe a[ScheduledStatus] + .mergedStatus shouldBe a[ScheduledStatus] } test("getProcessState - should be scheduled when scenario scheduled and job finished on Flink") { @@ -149,7 +151,7 @@ class PeriodicDeploymentManagerTest f.delegateDeploymentManagerStub.setStateStatus(processName, SimpleStateStatus.Finished, Some(deploymentId)) val statusDetails = f.getMergedStatusDetails - statusDetails.status shouldBe a[ScheduledStatus] + statusDetails.mergedStatus shouldBe a[ScheduledStatus] f.getAllowedActions(statusDetails, processVersion.versionId, None, Some(processVersion.versionId)) shouldBe List( ScenarioActionName.Cancel, ScenarioActionName.Deploy @@ -174,7 +176,7 @@ class PeriodicDeploymentManagerTest .value .loneElement - statusDetails.status shouldBe SimpleStateStatus.Finished + statusDetails.mergedStatus shouldBe SimpleStateStatus.Finished val state = f.periodicDeploymentManager.processStateDefinitionManager.processState( statusDetails, processVersion.versionId, @@ -190,7 +192,7 @@ class PeriodicDeploymentManagerTest f.delegateDeploymentManagerStub.setStateStatus(processName, SimpleStateStatus.Running, Some(deploymentId)) val statusDetails = f.getMergedStatusDetails - statusDetails.status shouldBe SimpleStateStatus.Running + statusDetails.mergedStatus shouldBe SimpleStateStatus.Running f.getAllowedActions(statusDetails, processVersion.versionId, None, Some(processVersion.versionId)) shouldBe List( ScenarioActionName.Cancel ) @@ -202,7 +204,7 @@ class PeriodicDeploymentManagerTest f.delegateDeploymentManagerStub.setStateStatus(processName, SimpleStateStatus.Finished, Some(deploymentId)) val statusDetails = f.getMergedStatusDetails - statusDetails.status shouldBe WaitingForScheduleStatus + statusDetails.mergedStatus shouldBe WaitingForScheduleStatus f.getAllowedActions(statusDetails, processVersion.versionId, None, Some(processVersion.versionId)) shouldBe List( ScenarioActionName.Cancel ) @@ -213,7 +215,7 @@ class PeriodicDeploymentManagerTest f.repository.addActiveProcess(processName, PeriodicProcessDeploymentStatus.Failed) val statusDetails = f.getMergedStatusDetails - statusDetails.status shouldBe ProblemStateStatus.Failed + statusDetails.mergedStatus shouldBe ProblemStateStatus.Failed f.getAllowedActions(statusDetails, processVersion.versionId, None, Some(processVersion.versionId)) shouldBe List( ScenarioActionName.Cancel ) @@ -300,7 +302,7 @@ class PeriodicDeploymentManagerTest f.delegateDeploymentManagerStub.setStateStatus(processName, ProblemStateStatus.Failed, Some(deploymentId)) val statusDetails = f.getMergedStatusDetails - statusDetails.status shouldBe ProblemStateStatus.Failed + statusDetails.mergedStatus shouldBe ProblemStateStatus.Failed f.getAllowedActions(statusDetails, processVersion.versionId, None, Some(processVersion.versionId)) shouldBe List( ScenarioActionName.Cancel ) @@ -311,7 +313,7 @@ class PeriodicDeploymentManagerTest val deploymentId = f.repository.addActiveProcess(processName, PeriodicProcessDeploymentStatus.Deployed) f.delegateDeploymentManagerStub.setStateStatus(processName, ProblemStateStatus.Failed, Some(deploymentId)) val statusDetailsBeforeRedeploy = f.getMergedStatusDetails - statusDetailsBeforeRedeploy.status shouldBe ProblemStateStatus.Failed + statusDetailsBeforeRedeploy.mergedStatus shouldBe ProblemStateStatus.Failed f.getAllowedActions( statusDetailsBeforeRedeploy, processVersion.versionId, @@ -339,7 +341,7 @@ class PeriodicDeploymentManagerTest ) val statusDetailsAfterRedeploy = f.getMergedStatusDetails // Previous job is still visible as Failed. - statusDetailsAfterRedeploy.status shouldBe a[ScheduledStatus] + statusDetailsAfterRedeploy.mergedStatus shouldBe a[ScheduledStatus] f.getAllowedActions( statusDetailsAfterRedeploy, processVersion.versionId, @@ -435,7 +437,7 @@ class PeriodicDeploymentManagerTest // this one is cyclically called by RescheduleActor f.periodicProcessService.handleFinished.futureValue - f.getMergedStatusDetails.status shouldEqual ProblemStateStatus.Failed + f.getMergedStatusDetails.mergedStatus shouldEqual ProblemStateStatus.Failed f.repository.deploymentEntities.loneElement.status shouldBe PeriodicProcessDeploymentStatus.Failed f.repository.processEntities.loneElement.active shouldBe true @@ -444,7 +446,7 @@ class PeriodicDeploymentManagerTest f.repository.processEntities.loneElement.active shouldBe false f.repository.deploymentEntities.loneElement.status shouldBe PeriodicProcessDeploymentStatus.Failed - f.getMergedStatusDetails.status shouldEqual SimpleStateStatus.Canceled + f.getMergedStatusDetails.mergedStatus shouldEqual SimpleStateStatus.Canceled } test("should reschedule failed job after RescheduleActor handles finished when configured") { @@ -455,7 +457,7 @@ class PeriodicDeploymentManagerTest // this one is cyclically called by RescheduleActor f.periodicProcessService.handleFinished.futureValue - f.getMergedStatusDetails.status shouldBe a[ScheduledStatus] + f.getMergedStatusDetails.mergedStatus shouldBe a[ScheduledStatus] f.repository.deploymentEntities.map(_.status) shouldBe List( PeriodicProcessDeploymentStatus.Failed, PeriodicProcessDeploymentStatus.Scheduled @@ -472,7 +474,7 @@ class PeriodicDeploymentManagerTest f.repository.processEntities.loneElement.active shouldBe false f.repository.deploymentEntities.loneElement.status shouldBe PeriodicProcessDeploymentStatus.Failed - f.getMergedStatusDetails.status shouldEqual SimpleStateStatus.Canceled + f.getMergedStatusDetails.mergedStatus shouldEqual SimpleStateStatus.Canceled } test("should cancel failed scenario after disappeared from Flink console") { @@ -486,7 +488,7 @@ class PeriodicDeploymentManagerTest // after some time Flink stops returning job status f.delegateDeploymentManagerStub.jobStatus.clear() - f.getMergedStatusDetails.status shouldEqual ProblemStateStatus.Failed + f.getMergedStatusDetails.mergedStatus shouldEqual ProblemStateStatus.Failed f.repository.deploymentEntities.loneElement.status shouldBe PeriodicProcessDeploymentStatus.Failed f.repository.processEntities.loneElement.active shouldBe true @@ -494,7 +496,7 @@ class PeriodicDeploymentManagerTest f.repository.processEntities.loneElement.active shouldBe false f.repository.deploymentEntities.loneElement.status shouldBe PeriodicProcessDeploymentStatus.Failed - f.getMergedStatusDetails.status shouldBe SimpleStateStatus.Canceled + f.getMergedStatusDetails.mergedStatus shouldBe SimpleStateStatus.Canceled } test("should take into account only latest deployments of active schedules during merged status computation") { @@ -508,7 +510,7 @@ class PeriodicDeploymentManagerTest firstDeploymentRunAt.plusHours(1) ) - f.getMergedStatusDetails.status shouldBe WaitingForScheduleStatus + f.getMergedStatusDetails.mergedStatus shouldBe WaitingForScheduleStatus } test( @@ -523,7 +525,7 @@ class PeriodicDeploymentManagerTest f.repository.addOnlyDeployment(secProcessId, PeriodicProcessDeploymentStatus.Finished) f.repository.markInactive(secProcessId) - f.getMergedStatusDetails.status shouldBe SimpleStateStatus.Finished + f.getMergedStatusDetails.mergedStatus shouldBe SimpleStateStatus.Finished } } diff --git a/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/periodic/flink/PeriodicProcessServiceTest.scala b/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/periodic/flink/PeriodicProcessServiceTest.scala index 9a8e39abdf8..1e8b348405e 100644 --- a/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/periodic/flink/PeriodicProcessServiceTest.scala +++ b/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/periodic/flink/PeriodicProcessServiceTest.scala @@ -9,16 +9,15 @@ import org.scalatest.matchers.should.Matchers import org.scalatest.prop.TableDrivenPropertyChecks import pl.touk.nussknacker.engine.api.ProcessVersion import pl.touk.nussknacker.engine.api.deployment.scheduler.model.ScheduledDeploymentDetails -import pl.touk.nussknacker.engine.api.deployment.scheduler.services._ -import pl.touk.nussknacker.engine.api.deployment.scheduler.services.AdditionalDeploymentDataProvider import pl.touk.nussknacker.engine.api.deployment.scheduler.services.ProcessConfigEnricher.EnrichedProcessConfig +import pl.touk.nussknacker.engine.api.deployment.scheduler.services._ import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus.ProblemStateStatus import pl.touk.nussknacker.engine.api.deployment.{DataFreshnessPolicy, ProcessActionId, ProcessingTypeActionServiceStub} import pl.touk.nussknacker.engine.api.process.{ProcessId, ProcessName, VersionId} import pl.touk.nussknacker.engine.build.ScenarioBuilder import pl.touk.nussknacker.test.PatientScalaFutures -import pl.touk.nussknacker.ui.process.periodic.PeriodicProcessService.PeriodicProcessStatus +import pl.touk.nussknacker.ui.process.periodic.PeriodicProcessService.PeriodicProcessStatusWithMergedStatus import pl.touk.nussknacker.ui.process.periodic._ import pl.touk.nussknacker.ui.process.periodic.flink.db.InMemPeriodicProcessesRepository import pl.touk.nussknacker.ui.process.periodic.flink.db.InMemPeriodicProcessesRepository.createPeriodicProcessDeployment @@ -514,13 +513,16 @@ class PeriodicProcessServiceTest val activeSchedules = f.periodicProcessService.getLatestDeploymentsForActiveSchedules(processName).futureValue activeSchedules should have size (schedules.size) - val deployment = f.periodicProcessService - .getStatusDetails(processName) - .futureValue - .value - .status - .asInstanceOf[PeriodicProcessStatus] - .pickMostImportantActiveDeployment + val deployment = PeriodicProcessService + .pickMostImportantActiveDeployment( + f.periodicProcessService + .getMergedStatusDetails(processName) + .futureValue + .value + .status + .asInstanceOf[PeriodicProcessStatusWithMergedStatus] + .activeDeploymentsStatuses + ) .value deployment.status shouldBe expectedStatus diff --git a/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/periodic/flink/PeriodicProcessStateDefinitionManagerTest.scala b/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/periodic/flink/PeriodicProcessStateDefinitionManagerTest.scala index 49f51d166d1..600b9c5c284 100644 --- a/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/periodic/flink/PeriodicProcessStateDefinitionManagerTest.scala +++ b/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/periodic/flink/PeriodicProcessStateDefinitionManagerTest.scala @@ -6,7 +6,7 @@ import pl.touk.nussknacker.engine.api.deployment.ProcessStateDefinitionManager.P import pl.touk.nussknacker.engine.api.deployment.ScenarioActionName import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus import pl.touk.nussknacker.engine.api.process.VersionId -import pl.touk.nussknacker.ui.process.periodic.PeriodicProcessService.{PeriodicDeploymentStatus, PeriodicProcessStatus} +import pl.touk.nussknacker.ui.process.periodic.PeriodicProcessService.PeriodicDeploymentStatus import pl.touk.nussknacker.ui.process.periodic.PeriodicProcessStateDefinitionManager.statusTooltip import pl.touk.nussknacker.ui.process.periodic.PeriodicStateStatus import pl.touk.nussknacker.ui.process.periodic.PeriodicStateStatus.ScheduledStatus @@ -39,8 +39,7 @@ class PeriodicProcessStateDefinitionManagerTest extends AnyFunSuite with Matcher processActive = true, None ) - val status = PeriodicProcessStatus(List(deploymentStatus), List.empty) - statusTooltip(status) shouldEqual "Scheduled at: 2023-01-01 10:00 status: Scheduled" + statusTooltip(List(deploymentStatus), List.empty) shouldEqual "Scheduled at: 2023-01-01 10:00 status: Scheduled" } test("display sorted periodic deployment status for named schedules") { @@ -64,8 +63,7 @@ class PeriodicProcessStateDefinitionManagerTest extends AnyFunSuite with Matcher processActive = true, None ) - val status = PeriodicProcessStatus(List(firstDeploymentStatus, secDeploymentStatus), List.empty) - statusTooltip(status) shouldEqual + statusTooltip(List(firstDeploymentStatus, secDeploymentStatus), List.empty) shouldEqual s"""Schedule ${secScheduleId.scheduleName.display} scheduled at: 2023-01-01 10:00 status: Scheduled, |Schedule ${firstScheduleId.scheduleName.display} scheduled at: 2023-01-01 10:00 status: Deployed""".stripMargin } diff --git a/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/MockableDeploymentManagerProvider.scala b/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/MockableDeploymentManagerProvider.scala index 53b44e1dce1..a686795e216 100644 --- a/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/MockableDeploymentManagerProvider.scala +++ b/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/MockableDeploymentManagerProvider.scala @@ -8,10 +8,11 @@ import io.circe.Json import org.apache.flink.configuration.Configuration import pl.touk.nussknacker.development.manager.MockableDeploymentManagerProvider.MockableDeploymentManager import pl.touk.nussknacker.engine._ +import pl.touk.nussknacker.engine.api.ProcessVersion import pl.touk.nussknacker.engine.api.component.ScenarioPropertyConfig import pl.touk.nussknacker.engine.api.deployment._ import pl.touk.nussknacker.engine.api.deployment.simple.{SimpleProcessStateDefinitionManager, SimpleStateStatus} -import pl.touk.nussknacker.engine.api.process.{ProcessIdWithName, ProcessName} +import pl.touk.nussknacker.engine.api.process.{ProcessIdWithName, ProcessName, VersionId} import pl.touk.nussknacker.engine.deployment.ExternalDeploymentId import pl.touk.nussknacker.engine.flink.minicluster.FlinkMiniClusterFactory import pl.touk.nussknacker.engine.flink.minicluster.scenariotesting.FlinkMiniClusterScenarioTestRunner @@ -86,8 +87,20 @@ object MockableDeploymentManagerProvider { override def getProcessStates(name: ProcessName)( implicit freshnessPolicy: DataFreshnessPolicy ): Future[WithDataFreshnessStatus[List[StatusDetails]]] = { - val status = MockableDeploymentManager.scenarioStatuses.get().getOrElse(name.value, SimpleStateStatus.NotDeployed) - Future.successful(WithDataFreshnessStatus.fresh(List(StatusDetails(status, None)))) + val statusDetails = MockableDeploymentManager.scenarioStatuses + .get() + .getOrElse(name.value, BasicStatusDetails(SimpleStateStatus.NotDeployed, version = None)) + Future.successful( + WithDataFreshnessStatus.fresh( + List( + StatusDetails( + statusDetails.status, + None, + version = statusDetails.version.map(vId => ProcessVersion.empty.copy(versionId = vId)) + ) + ) + ) + ) } override def processCommand[Result](command: DMScenarioCommand[Result]): Future[Result] = { @@ -138,12 +151,12 @@ object MockableDeploymentManagerProvider { // improved, but there is no need to do it ATM. object MockableDeploymentManager { - private val scenarioStatuses = new AtomicReference[Map[ScenarioName, StateStatus]](Map.empty) + private val scenarioStatuses = new AtomicReference[Map[ScenarioName, BasicStatusDetails]](Map.empty) private val testResults = new AtomicReference[Map[ScenarioName, TestResults[Json]]](Map.empty) private val deploymentResults = new AtomicReference[Map[DeploymentId, Try[Option[ExternalDeploymentId]]]](Map.empty) private val managerSpecificScenarioActivities = new AtomicReference[List[ScenarioActivity]](List.empty) - def configureScenarioStatuses(scenarioStates: Map[ScenarioName, StateStatus]): Unit = { + def configureScenarioStatuses(scenarioStates: Map[ScenarioName, BasicStatusDetails]): Unit = { MockableDeploymentManager.scenarioStatuses.set(scenarioStates) } @@ -169,3 +182,5 @@ object MockableDeploymentManagerProvider { } } + +case class BasicStatusDetails(status: StateStatus, version: Option[VersionId])