From 064fa831c4c4af9879555ee082050890837259f5 Mon Sep 17 00:00:00 2001 From: Arek Burdach Date: Mon, 17 Feb 2025 15:40:32 +0100 Subject: [PATCH] separate presentation from business logic --- .../api/deployment/DeploymentManager.scala | 2 +- ...rridingProcessStateDefinitionManager.scala | 40 +-- .../ProcessStateDefinitionManager.scala | 91 ++++--- .../engine/api/deployment/StatusDetails.scala | 18 ++ ...CachingProcessStateDeploymentManager.scala | 12 +- .../SimpleProcessStateDefinitionManager.scala | 27 +- .../deployment/simple/SimpleStateStatus.scala | 4 +- .../testing/DeploymentManagerStub.scala | 6 +- ...ingProcessStateDefinitionManagerTest.scala | 14 +- ...cala => SimpleScenarioStatusDtoSpec.scala} | 25 +- ...ingProcessStateDeploymentManagerSpec.scala | 23 +- .../scenariodetails/ScenarioStatusDto.scala | 48 +--- .../scenariodetails/ScenarioWithDetails.scala | 4 +- .../StateStatusCodingSpec.scala | 8 +- .../ui/api/AppApiHttpService.scala | 6 +- .../ui/process/deployment/ActionService.scala | 9 +- .../deployment/ScenarioStateProvider.scala | 234 ++++++++---------- .../exception/ProcessIllegalAction.scala | 5 +- .../periodic/PeriodicDeploymentManager.scala | 6 +- .../periodic/PeriodicProcessService.scala | 18 +- ...eriodicProcessStateDefinitionManager.scala | 58 +++-- .../periodic/PeriodicStateStatus.scala | 61 +++-- .../InvalidDeploymentManagerStub.scala | 2 +- .../test/mock/MockDeploymentManager.scala | 6 +- .../test/mock/StubScenarioStateProvider.scala | 10 +- .../NotificationServiceTest.scala | 14 +- .../ProcessStateDefinitionServiceSpec.scala | 6 +- .../deployment/DeploymentServiceSpec.scala | 3 +- .../flink/DeploymentManagerStub.scala | 6 +- .../flink/PeriodicDeploymentManagerTest.scala | 26 +- ...dicProcessStateDefinitionManagerTest.scala | 16 +- ...DevelopmentDeploymentManagerProvider.scala | 31 +-- ...lopmentProcessStateDefinitionManager.scala | 10 +- .../MockableDeploymentManagerProvider.scala | 4 +- .../BaseFlinkDeploymentManagerSpec.scala | 10 +- .../JavaConfigDeploymentManagerSpec.scala | 2 +- .../streaming/StreamingDockerTest.scala | 4 +- .../management/FlinkDeploymentManager.scala | 16 +- .../engine/management/FlinkStateStatus.scala | 8 +- .../FlinkDeploymentManagerSpec.scala | 16 +- ...scala => FlinkScenarioStatusDtoSpec.scala} | 22 +- .../embedded/EmbeddedDeploymentManager.scala | 6 +- ...mbeddedProcessStateDefinitionManager.scala | 4 +- ...esponseEmbeddedDeploymentManagerTest.scala | 6 +- ...EmbeddedDeploymentManagerRestartTest.scala | 6 +- ...reamingEmbeddedDeploymentManagerTest.scala | 20 +- .../k8s/manager/K8sDeploymentManager.scala | 8 +- .../manager/K8sDeploymentStatusMapper.scala | 18 +- .../BaseK8sDeploymentManagerTest.scala | 4 +- .../K8sDeploymentManagerKafkaTest.scala | 4 +- .../K8sDeploymentManagerOnMocksTest.scala | 4 +- .../K8sDeploymentManagerReqRespTest.scala | 2 +- .../engine/api/deployment/StateStatus.scala | 20 ++ 53 files changed, 534 insertions(+), 499 deletions(-) create mode 100644 designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/StatusDetails.scala rename designer/deployment-manager-api/src/test/scala/pl/touk/nussknacker/engine/api/deployment/{SimpleProcessStateSpec.scala => SimpleScenarioStatusDtoSpec.scala} (55%) rename extensions-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/ProcessState.scala => designer/restmodel/src/main/scala/pl/touk/nussknacker/restmodel/scenariodetails/ScenarioStatusDto.scala (67%) rename designer/{deployment-manager-api/src/test/scala/pl/touk/nussknacker/engine/api/deployment => restmodel/src/test/scala/pl/touk/nussknacker/restmodel/scenariodetails}/StateStatusCodingSpec.scala (86%) rename engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/{FlinkProcessStateSpec.scala => FlinkScenarioStatusDtoSpec.scala} (64%) create mode 100644 extensions-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/StateStatus.scala diff --git a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/DeploymentManager.scala b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/DeploymentManager.scala index 2274ae5aa88..ac96afec600 100644 --- a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/DeploymentManager.scala +++ b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/DeploymentManager.scala @@ -23,7 +23,7 @@ trait DeploymentManager extends AutoCloseable { * from the cache or not. If you use any kind of cache in your DM implementation please wrap result data * with WithDataFreshnessStatus.cached(data) in opposite situation use WithDataFreshnessStatus.fresh(data) */ - def getProcessStates(name: ProcessName)( + def getScenarioDeploymentsStatuses(scenarioName: ProcessName)( implicit freshnessPolicy: DataFreshnessPolicy ): Future[WithDataFreshnessStatus[List[StatusDetails]]] diff --git a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/OverridingProcessStateDefinitionManager.scala b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/OverridingProcessStateDefinitionManager.scala index c07966c9d30..9142eb5779e 100644 --- a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/OverridingProcessStateDefinitionManager.scala +++ b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/OverridingProcessStateDefinitionManager.scala @@ -1,6 +1,6 @@ package pl.touk.nussknacker.engine.api.deployment -import pl.touk.nussknacker.engine.api.deployment.ProcessStateDefinitionManager.ProcessStatus +import pl.touk.nussknacker.engine.api.deployment.ProcessStateDefinitionManager.ScenarioStatusWithScenarioContext import pl.touk.nussknacker.engine.api.deployment.StateStatus.StatusName import java.net.URI @@ -21,32 +21,42 @@ import java.net.URI */ class OverridingProcessStateDefinitionManager( delegate: ProcessStateDefinitionManager, - statusActionsPF: PartialFunction[ProcessStatus, List[ScenarioActionName]] = PartialFunction.empty, + statusActionsPF: PartialFunction[ScenarioStatusWithScenarioContext, List[ScenarioActionName]] = + PartialFunction.empty, statusIconsPF: PartialFunction[StateStatus, URI] = PartialFunction.empty, statusTooltipsPF: PartialFunction[StateStatus, String] = PartialFunction.empty, statusDescriptionsPF: PartialFunction[StateStatus, String] = PartialFunction.empty, customStateDefinitions: Map[StatusName, StateDefinitionDetails] = Map.empty, customVisibleActions: Option[List[ScenarioActionName]] = None, - customActionTooltips: Option[ProcessStatus => Map[ScenarioActionName, String]] = None, + customActionTooltips: Option[ScenarioStatusWithScenarioContext => Map[ScenarioActionName, String]] = None, ) extends ProcessStateDefinitionManager { - override def visibleActions: List[ScenarioActionName] = - customVisibleActions.getOrElse(delegate.visibleActions) + override def visibleActions(input: ScenarioStatusWithScenarioContext): List[ScenarioActionName] = + customVisibleActions.getOrElse(delegate.visibleActions(input)) - override def statusActions(processStatus: ProcessStatus): List[ScenarioActionName] = - statusActionsPF.applyOrElse(processStatus, delegate.statusActions) + override def statusActions(input: ScenarioStatusWithScenarioContext): List[ScenarioActionName] = + statusActionsPF.applyOrElse(input, delegate.statusActions) - override def actionTooltips(processStatus: ProcessStatus): Map[ScenarioActionName, String] = - customActionTooltips.map(_(processStatus)).getOrElse(delegate.actionTooltips(processStatus)) + override def actionTooltips(input: ScenarioStatusWithScenarioContext): Map[ScenarioActionName, String] = + customActionTooltips.map(_(input)).getOrElse(delegate.actionTooltips(input)) - override def statusIcon(stateStatus: StateStatus): URI = - statusIconsPF.orElse(stateDefinitionsPF(_.icon)).applyOrElse(stateStatus, delegate.statusIcon) + override def statusIcon(input: ScenarioStatusWithScenarioContext): URI = + statusIconsPF + .orElse(stateDefinitionsPF(_.icon)) + .lift(input.status) + .getOrElse(delegate.statusIcon(input)) - override def statusTooltip(stateStatus: StateStatus): String = - statusTooltipsPF.orElse(stateDefinitionsPF(_.tooltip)).applyOrElse(stateStatus, delegate.statusTooltip) + override def statusTooltip(input: ScenarioStatusWithScenarioContext): String = + statusTooltipsPF + .orElse(stateDefinitionsPF(_.tooltip)) + .lift(input.status) + .getOrElse(delegate.statusTooltip(input)) - override def statusDescription(stateStatus: StateStatus): String = - statusDescriptionsPF.orElse(stateDefinitionsPF(_.description)).applyOrElse(stateStatus, delegate.statusDescription) + override def statusDescription(input: ScenarioStatusWithScenarioContext): String = + statusDescriptionsPF + .orElse(stateDefinitionsPF(_.description)) + .lift(input.status) + .getOrElse(delegate.statusDescription(input)) override def stateDefinitions: Map[StatusName, StateDefinitionDetails] = delegate.stateDefinitions ++ customStateDefinitions diff --git a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/ProcessStateDefinitionManager.scala b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/ProcessStateDefinitionManager.scala index 709a34615f6..e09924bcc9d 100644 --- a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/ProcessStateDefinitionManager.scala +++ b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/ProcessStateDefinitionManager.scala @@ -1,12 +1,19 @@ package pl.touk.nussknacker.engine.api.deployment -import pl.touk.nussknacker.engine.api.deployment.ProcessStateDefinitionManager.{ProcessStatus, defaultVisibleActions} +import io.circe.Json +import pl.touk.nussknacker.engine.api.ProcessVersion +import pl.touk.nussknacker.engine.api.deployment.ProcessStateDefinitionManager.{ + ScenarioStatusPresentationDetails, + ScenarioStatusWithScenarioContext, + defaultVisibleActions +} import pl.touk.nussknacker.engine.api.deployment.StateStatus.StatusName import pl.touk.nussknacker.engine.api.process.VersionId +import pl.touk.nussknacker.engine.deployment.{DeploymentId, ExternalDeploymentId} import java.net.URI -//@TODO: In future clean up it. +// TODO: Some cleanups such as rename to sth close to presentation /** * Used to specify status definitions (for filtering and scenario status visualization) and status transitions (actions). */ @@ -29,54 +36,44 @@ trait ProcessStateDefinitionManager { * Override those methods to customize varying state properties or custom visualizations, * e.g. handle schedule date in [[PeriodicProcessStateDefinitionManager]] */ - def statusTooltip(stateStatus: StateStatus): String = - stateDefinitions(stateStatus.name).tooltip + def statusTooltip(input: ScenarioStatusWithScenarioContext): String = + stateDefinitions(input.status.name).tooltip + + def statusDescription(input: ScenarioStatusWithScenarioContext): String = + stateDefinitions(input.status.name).description - def statusDescription(stateStatus: StateStatus): String = - stateDefinitions(stateStatus.name).description + def statusIcon(input: ScenarioStatusWithScenarioContext): URI = + statusIcon(input.status) - def statusIcon(stateStatus: StateStatus): URI = - stateDefinitions(stateStatus.name).icon + private[nussknacker] def statusIcon(status: StateStatus): URI = + stateDefinitions(status.name).icon /** * Actions that are applicable to scenario in general. They may be available only in particular states, as defined by `def statusActions` */ - def visibleActions: List[ScenarioActionName] = defaultVisibleActions + def visibleActions(input: ScenarioStatusWithScenarioContext): List[ScenarioActionName] = defaultVisibleActions /** * Custom tooltips for actions */ - def actionTooltips(processStatus: ProcessStatus): Map[ScenarioActionName, String] = Map.empty + def actionTooltips(input: ScenarioStatusWithScenarioContext): Map[ScenarioActionName, String] = Map.empty /** * Allowed transitions between states. */ - def statusActions(processStatus: ProcessStatus): List[ScenarioActionName] + def statusActions(input: ScenarioStatusWithScenarioContext): List[ScenarioActionName] /** - * Enhances raw [[StateStatus]] with scenario properties, including deployment info. + * Returns presentations details of status */ - // FIXME abr: extract other class without most of fields from ProcessState - def processState( - statusDetails: StatusDetails, - latestVersionId: VersionId, - deployedVersionId: Option[VersionId], - currentlyPresentedVersionId: Option[VersionId], - ): ProcessState = { - val status = ProcessStatus(statusDetails.status, latestVersionId, deployedVersionId, currentlyPresentedVersionId) - ProcessState( - statusDetails.externalDeploymentId, - statusDetails.status, - statusDetails.version, - visibleActions, - statusActions(status), - actionTooltips(status), - statusIcon(statusDetails.status), - statusTooltip(statusDetails.status), - statusDescription(statusDetails.status), - statusDetails.startTime, - statusDetails.attributes, - statusDetails.errors + def statusPresentation(input: ScenarioStatusWithScenarioContext): ScenarioStatusPresentationDetails = { + ScenarioStatusPresentationDetails( + visibleActions(input), + statusActions(input), + actionTooltips(input), + statusIcon(input), + statusTooltip(input), + statusDescription(input), ) } @@ -87,15 +84,37 @@ object ProcessStateDefinitionManager { /** * ProcessStatus contains status of the scenario, it is used as argument of ProcessStateDefinitionManager methods * - * @param stateStatus current scenario state + * @param statusDetails current scenario state * @param latestVersionId latest saved versionId for the scenario * @param deployedVersionId currently deployed versionId of the scenario */ - final case class ProcessStatus( - stateStatus: StateStatus, + final case class ScenarioStatusWithScenarioContext( + private val statusDetails: StatusDetails, latestVersionId: VersionId, deployedVersionId: Option[VersionId], currentlyPresentedVersionId: Option[VersionId], + ) { + def status: StateStatus = statusDetails.status + def deploymentId: Option[DeploymentId] = statusDetails.deploymentId + def externalDeploymentId: Option[ExternalDeploymentId] = statusDetails.externalDeploymentId + def version: Option[ProcessVersion] = statusDetails.version + def startTime: Option[Long] = statusDetails.startTime + def attributes: Option[Json] = statusDetails.attributes + def errors: List[String] = statusDetails.errors + + def withStatus(newStatus: StateStatus): ScenarioStatusWithScenarioContext = + copy(statusDetails = statusDetails.copy(status = newStatus)) + + } + + final case class ScenarioStatusPresentationDetails( + visibleActions: List[ScenarioActionName], + // This one is not exactly a part of presentation but for now we keep in this class + allowedActions: List[ScenarioActionName], + actionTooltips: Map[ScenarioActionName, String], + icon: URI, + tooltip: String, + description: String ) /** diff --git a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/StatusDetails.scala b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/StatusDetails.scala new file mode 100644 index 00000000000..98e58eecb2f --- /dev/null +++ b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/StatusDetails.scala @@ -0,0 +1,18 @@ +package pl.touk.nussknacker.engine.api.deployment + +import io.circe.Json +import pl.touk.nussknacker.engine.api.ProcessVersion +import pl.touk.nussknacker.engine.deployment.{DeploymentId, ExternalDeploymentId} + +case class StatusDetails( + status: StateStatus, + deploymentId: Option[DeploymentId], + externalDeploymentId: Option[ExternalDeploymentId] = None, + version: Option[ProcessVersion] = None, + startTime: Option[Long] = None, + attributes: Option[Json] = None, + errors: List[String] = List.empty +) { + def externalDeploymentIdUnsafe: ExternalDeploymentId = + externalDeploymentId.getOrElse(throw new IllegalStateException(s"externalDeploymentId is missing")) +} diff --git a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/cache/CachingProcessStateDeploymentManager.scala b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/cache/CachingProcessStateDeploymentManager.scala index 9b520ee6f3d..bd656476e57 100644 --- a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/cache/CachingProcessStateDeploymentManager.scala +++ b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/cache/CachingProcessStateDeploymentManager.scala @@ -24,12 +24,12 @@ class CachingProcessStateDeploymentManager( .expireAfterWrite(java.time.Duration.ofMillis(cacheTTL.toMillis)) .buildAsync[ProcessName, List[StatusDetails]] - override def getProcessStates( - name: ProcessName + override def getScenarioDeploymentsStatuses( + scenarioName: ProcessName )(implicit freshnessPolicy: DataFreshnessPolicy): Future[WithDataFreshnessStatus[List[StatusDetails]]] = { def fetchAndUpdateCache(): Future[WithDataFreshnessStatus[List[StatusDetails]]] = { - val resultFuture = delegate.getProcessStates(name) - cache.put(name, resultFuture.map(_.value).toJava.toCompletableFuture) + val resultFuture = delegate.getScenarioDeploymentsStatuses(scenarioName) + cache.put(scenarioName, resultFuture.map(_.value).toJava.toCompletableFuture) resultFuture } @@ -37,7 +37,7 @@ class CachingProcessStateDeploymentManager( case DataFreshnessPolicy.Fresh => fetchAndUpdateCache() case DataFreshnessPolicy.CanBeCached => - Option(cache.getIfPresent(name)) + Option(cache.getIfPresent(scenarioName)) .map(_.toScala.map(WithDataFreshnessStatus.cached)) .getOrElse( fetchAndUpdateCache() @@ -72,7 +72,7 @@ object CachingProcessStateDeploymentManager extends LazyLogging { ) } .getOrElse { - logger.debug(s"Skipping ProcessState caching for DeploymentManager: $delegate") + logger.debug(s"Skipping state caching for DeploymentManager: $delegate") delegate } } diff --git a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/simple/SimpleProcessStateDefinitionManager.scala b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/simple/SimpleProcessStateDefinitionManager.scala index 9baefe26122..d3e9edae46a 100644 --- a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/simple/SimpleProcessStateDefinitionManager.scala +++ b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/simple/SimpleProcessStateDefinitionManager.scala @@ -1,19 +1,15 @@ package pl.touk.nussknacker.engine.api.deployment.simple -import pl.touk.nussknacker.engine.api.deployment.ProcessStateDefinitionManager.ProcessStatus +import pl.touk.nussknacker.engine.api.deployment.ProcessStateDefinitionManager.ScenarioStatusWithScenarioContext import pl.touk.nussknacker.engine.api.deployment.ScenarioActionName.DefaultActions import pl.touk.nussknacker.engine.api.deployment.StateStatus.StatusName -import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus.ProblemStateStatus._ import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus.{ProblemStateStatus, statusActionsPF} import pl.touk.nussknacker.engine.api.deployment.{ - ProcessState, ProcessStateDefinitionManager, ScenarioActionName, StateDefinitionDetails, - StateStatus, - StatusDetails + StateStatus } -import pl.touk.nussknacker.engine.api.process.VersionId /** * Base [[ProcessStateDefinitionManager]] with basic state definitions and state transitions. @@ -22,23 +18,24 @@ import pl.touk.nussknacker.engine.api.process.VersionId */ object SimpleProcessStateDefinitionManager extends ProcessStateDefinitionManager { - override def statusActions(processStatus: ProcessStatus): List[ScenarioActionName] = - statusActionsPF.applyOrElse(processStatus, (_: ProcessStatus) => DefaultActions) + override def statusActions(input: ScenarioStatusWithScenarioContext): List[ScenarioActionName] = + statusActionsPF.lift(input.status).getOrElse(DefaultActions) - override def statusDescription(stateStatus: StateStatus): String = stateStatus match { + override def statusDescription(input: ScenarioStatusWithScenarioContext): String = statusDescription(input.status) + + private[nussknacker] def statusDescription(status: StateStatus): String = status match { case _ @ProblemStateStatus(message, _) => message - case _ => SimpleStateStatus.definitions(stateStatus.name).description + case _ => SimpleStateStatus.definitions(status.name).description } - override def statusTooltip(stateStatus: StateStatus): String = stateStatus match { + override def statusTooltip(input: ScenarioStatusWithScenarioContext): String = statusTooltip(input.status) + + private[nussknacker] def statusTooltip(status: StateStatus): String = status match { case _ @ProblemStateStatus(message, _) => message - case _ => SimpleStateStatus.definitions(stateStatus.name).tooltip + case _ => SimpleStateStatus.definitions(status.name).tooltip } override def stateDefinitions: Map[StatusName, StateDefinitionDetails] = SimpleStateStatus.definitions - def errorFailedToGet(versionId: VersionId): ProcessState = - processState(StatusDetails(FailedToGet, None), versionId, None, None) - } diff --git a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/simple/SimpleStateStatus.scala b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/simple/SimpleStateStatus.scala index 2e2c59f67dd..3e98490cb7d 100644 --- a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/simple/SimpleStateStatus.scala +++ b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/simple/SimpleStateStatus.scala @@ -1,6 +1,5 @@ package pl.touk.nussknacker.engine.api.deployment.simple -import pl.touk.nussknacker.engine.api.deployment.ProcessStateDefinitionManager.ProcessStatus import pl.touk.nussknacker.engine.api.deployment.StateStatus.StatusName import pl.touk.nussknacker.engine.api.deployment._ import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus.ProblemStateStatus.defaultActions @@ -8,6 +7,7 @@ import pl.touk.nussknacker.engine.api.process.VersionId import java.net.URI +// FIXME abr separate core statuses and DM statuses - the same for presentation object SimpleStateStatus { def fromDeploymentStatus(deploymentStatus: DeploymentStatus): StateStatus = { @@ -92,7 +92,7 @@ object SimpleStateStatus { status ) - val statusActionsPF: PartialFunction[ProcessStatus, List[ScenarioActionName]] = _.stateStatus match { + val statusActionsPF: PartialFunction[StateStatus, List[ScenarioActionName]] = { case SimpleStateStatus.NotDeployed => List(ScenarioActionName.Deploy, ScenarioActionName.Archive, ScenarioActionName.Rename) case SimpleStateStatus.DuringDeploy => List(ScenarioActionName.Deploy, ScenarioActionName.Cancel) diff --git a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/testing/DeploymentManagerStub.scala b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/testing/DeploymentManagerStub.scala index 8a463ad5bf6..26f943830a8 100644 --- a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/testing/DeploymentManagerStub.scala +++ b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/testing/DeploymentManagerStub.scala @@ -40,11 +40,11 @@ class DeploymentManagerStub(implicit ec: ExecutionContext) extends BaseDeploymen notImplemented } - override def getProcessStates( - name: ProcessName + override def getScenarioDeploymentsStatuses( + scenarioName: ProcessName )(implicit freshnessPolicy: DataFreshnessPolicy): Future[WithDataFreshnessStatus[List[StatusDetails]]] = { Future.successful( - WithDataFreshnessStatus.fresh(scenarioStatusMap.get(name).map(StatusDetails(_, None)).toList) + WithDataFreshnessStatus.fresh(scenarioStatusMap.get(scenarioName).map(StatusDetails(_, None)).toList) ) } diff --git a/designer/deployment-manager-api/src/test/scala/pl/touk/nussknacker/engine/api/deployment/OverridingProcessStateDefinitionManagerTest.scala b/designer/deployment-manager-api/src/test/scala/pl/touk/nussknacker/engine/api/deployment/OverridingProcessStateDefinitionManagerTest.scala index 79f411db975..b6a0eb2fc2b 100644 --- a/designer/deployment-manager-api/src/test/scala/pl/touk/nussknacker/engine/api/deployment/OverridingProcessStateDefinitionManagerTest.scala +++ b/designer/deployment-manager-api/src/test/scala/pl/touk/nussknacker/engine/api/deployment/OverridingProcessStateDefinitionManagerTest.scala @@ -2,9 +2,10 @@ package pl.touk.nussknacker.engine.api.deployment import org.scalatest.funsuite.AnyFunSuite import org.scalatest.matchers.should.Matchers -import pl.touk.nussknacker.engine.api.deployment.ProcessStateDefinitionManager.ProcessStatus +import pl.touk.nussknacker.engine.api.deployment.ProcessStateDefinitionManager.ScenarioStatusWithScenarioContext import pl.touk.nussknacker.engine.api.deployment.StateDefinitionDetails.UnknownIcon import pl.touk.nussknacker.engine.api.deployment.StateStatus.StatusName +import pl.touk.nussknacker.engine.api.process.VersionId class OverridingProcessStateDefinitionManagerTest extends AnyFunSuite with Matchers { @@ -28,7 +29,7 @@ class OverridingProcessStateDefinitionManagerTest extends AnyFunSuite with Match ) ) - override def statusActions(processStatus: ProcessStatus): List[ScenarioActionName] = Nil + override def statusActions(input: ScenarioStatusWithScenarioContext): List[ScenarioActionName] = Nil } test("should combine delegate state definitions with custom overrides") { @@ -58,10 +59,13 @@ class OverridingProcessStateDefinitionManagerTest extends AnyFunSuite with Match definitionsMap(CustomState.name).description shouldBe "Custom description" definitionsMap(CustomStateThatOverrides.name).description shouldBe "Custom description that overrides" + def toInput(status: StateStatus) = + ScenarioStatusWithScenarioContext(StatusDetails(status, None), VersionId(1), None, None) + // Description assigned to a scenario, with custom calculations - manager.statusDescription(DefaultState) shouldBe "Calculated description for default, e.g. schedule date" - manager.statusDescription(CustomState) shouldBe "Calculated description for custom, e.g. schedule date" - manager.statusDescription(CustomStateThatOverrides) shouldBe "Custom description that overrides" + manager.statusDescription(toInput(DefaultState)) shouldBe "Calculated description for default, e.g. schedule date" + manager.statusDescription(toInput(CustomState)) shouldBe "Calculated description for custom, e.g. schedule date" + manager.statusDescription(toInput(CustomStateThatOverrides)) shouldBe "Custom description that overrides" } } diff --git a/designer/deployment-manager-api/src/test/scala/pl/touk/nussknacker/engine/api/deployment/SimpleProcessStateSpec.scala b/designer/deployment-manager-api/src/test/scala/pl/touk/nussknacker/engine/api/deployment/SimpleScenarioStatusDtoSpec.scala similarity index 55% rename from designer/deployment-manager-api/src/test/scala/pl/touk/nussknacker/engine/api/deployment/SimpleProcessStateSpec.scala rename to designer/deployment-manager-api/src/test/scala/pl/touk/nussknacker/engine/api/deployment/SimpleScenarioStatusDtoSpec.scala index d7f0767090b..5fc501f6969 100644 --- a/designer/deployment-manager-api/src/test/scala/pl/touk/nussknacker/engine/api/deployment/SimpleProcessStateSpec.scala +++ b/designer/deployment-manager-api/src/test/scala/pl/touk/nussknacker/engine/api/deployment/SimpleScenarioStatusDtoSpec.scala @@ -3,34 +3,35 @@ package pl.touk.nussknacker.engine.api.deployment import org.scalatest.Inside import org.scalatest.funspec.AnyFunSpec import org.scalatest.matchers.should.Matchers +import pl.touk.nussknacker.engine.api.deployment.ProcessStateDefinitionManager.{ScenarioStatusPresentationDetails, ScenarioStatusWithScenarioContext} import pl.touk.nussknacker.engine.api.deployment.simple.{SimpleProcessStateDefinitionManager, SimpleStateStatus} import pl.touk.nussknacker.engine.api.process.VersionId import pl.touk.nussknacker.engine.deployment.ExternalDeploymentId -import scala.collection.immutable.List +class SimpleScenarioStatusDtoSpec extends AnyFunSpec with Matchers with Inside { -class SimpleProcessStateSpec extends AnyFunSpec with Matchers with Inside { - - def createProcessState(stateStatus: StateStatus): ProcessState = - SimpleProcessStateDefinitionManager.processState( - StatusDetails(stateStatus, None, Some(ExternalDeploymentId("12"))), - VersionId(1), - None, - None, + def createInput(status: StateStatus): ScenarioStatusPresentationDetails = + SimpleProcessStateDefinitionManager.statusPresentation( + ScenarioStatusWithScenarioContext( + StatusDetails(status, None, Some(ExternalDeploymentId("12"))), + VersionId(1), + None, + None, + ) ) it("scenario state should be during deploy") { - val state = createProcessState(SimpleStateStatus.DuringDeploy) + val state = createInput(SimpleStateStatus.DuringDeploy) state.allowedActions shouldBe List(ScenarioActionName.Deploy, ScenarioActionName.Cancel) } it("scenario state should be running") { - val state = createProcessState(SimpleStateStatus.Running) + val state = createInput(SimpleStateStatus.Running) state.allowedActions shouldBe List(ScenarioActionName.Cancel, ScenarioActionName.Pause, ScenarioActionName.Deploy) } it("scenario state should be finished") { - val state = createProcessState(SimpleStateStatus.Finished) + val state = createInput(SimpleStateStatus.Finished) state.allowedActions shouldBe List(ScenarioActionName.Deploy, ScenarioActionName.Archive, ScenarioActionName.Rename) } diff --git a/designer/deployment-manager-api/src/test/scala/pl/touk/nussknacker/engine/api/deployment/cache/CachingProcessStateDeploymentManagerSpec.scala b/designer/deployment-manager-api/src/test/scala/pl/touk/nussknacker/engine/api/deployment/cache/CachingProcessStateDeploymentManagerSpec.scala index c24c358ff9e..f2467b1c280 100644 --- a/designer/deployment-manager-api/src/test/scala/pl/touk/nussknacker/engine/api/deployment/cache/CachingProcessStateDeploymentManagerSpec.scala +++ b/designer/deployment-manager-api/src/test/scala/pl/touk/nussknacker/engine/api/deployment/cache/CachingProcessStateDeploymentManagerSpec.scala @@ -41,7 +41,7 @@ class CachingProcessStateDeploymentManagerSpec results.map(_.cached) should contain only false results.map(_.value).distinct should have size 2 - verify(delegate, times(2)).getProcessStates(any[ProcessName])(any[DataFreshnessPolicy]) + verify(delegate, times(2)).getScenarioDeploymentsStatuses(any[ProcessName])(any[DataFreshnessPolicy]) } test("should cache state for DataFreshnessPolicy.CanBeCached") { @@ -60,7 +60,7 @@ class CachingProcessStateDeploymentManagerSpec secondInvocation.cached shouldBe true List(firstInvocation, secondInvocation).map(_.value).distinct should have size 1 - verify(delegate, times(1)).getProcessStates(any[ProcessName])(any[DataFreshnessPolicy]) + verify(delegate, times(1)).getScenarioDeploymentsStatuses(any[ProcessName])(any[DataFreshnessPolicy]) } test("should reuse state updated by DataFreshnessPolicy.Fresh during reading with DataFreshnessPolicy.CanBeCached") { @@ -79,13 +79,13 @@ class CachingProcessStateDeploymentManagerSpec resultForCanBeCached.cached shouldBe true List(resultForFresh, resultForCanBeCached).map(_.value).distinct should have size 1 - verify(delegate, times(1)).getProcessStates(any[ProcessName])(any[DataFreshnessPolicy]) + verify(delegate, times(1)).getScenarioDeploymentsStatuses(any[ProcessName])(any[DataFreshnessPolicy]) } implicit class DeploymentManagerOps(dm: DeploymentManager) { def getProcessStatesDeploymentIdNow(freshnessPolicy: DataFreshnessPolicy): WithDataFreshnessStatus[List[String]] = - dm.getProcessStates(ProcessName("foo"))(freshnessPolicy) + dm.getScenarioDeploymentsStatuses(ProcessName("foo"))(freshnessPolicy) .futureValue .map(_.map(_.externalDeploymentId.value.value)) @@ -93,13 +93,14 @@ class CachingProcessStateDeploymentManagerSpec private def prepareDMReturningRandomStates: DeploymentManager = { val delegate = mock[DeploymentManager] - when(delegate.getProcessStates(any[ProcessName])(any[DataFreshnessPolicy])).thenAnswer { _: InvocationOnMock => - val randomState = StatusDetails( - SimpleStateStatus.Running, - deploymentId = None, - externalDeploymentId = Some(ExternalDeploymentId(UUID.randomUUID().toString)) - ) - Future.successful(WithDataFreshnessStatus.fresh(List(randomState))) + when(delegate.getScenarioDeploymentsStatuses(any[ProcessName])(any[DataFreshnessPolicy])).thenAnswer { + _: InvocationOnMock => + val randomState = StatusDetails( + SimpleStateStatus.Running, + deploymentId = None, + externalDeploymentId = Some(ExternalDeploymentId(UUID.randomUUID().toString)) + ) + Future.successful(WithDataFreshnessStatus.fresh(List(randomState))) } delegate } diff --git a/extensions-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/ProcessState.scala b/designer/restmodel/src/main/scala/pl/touk/nussknacker/restmodel/scenariodetails/ScenarioStatusDto.scala similarity index 67% rename from extensions-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/ProcessState.scala rename to designer/restmodel/src/main/scala/pl/touk/nussknacker/restmodel/scenariodetails/ScenarioStatusDto.scala index d9bbec4b1b8..45f06d4b3f0 100644 --- a/extensions-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/ProcessState.scala +++ b/designer/restmodel/src/main/scala/pl/touk/nussknacker/restmodel/scenariodetails/ScenarioStatusDto.scala @@ -1,11 +1,15 @@ -package pl.touk.nussknacker.engine.api.deployment +package pl.touk.nussknacker.restmodel.scenariodetails import io.circe._ import io.circe.generic.JsonCodec import pl.touk.nussknacker.engine.api.ProcessVersion -import pl.touk.nussknacker.engine.api.deployment.StateStatus.StatusName -import pl.touk.nussknacker.engine.api.process.VersionId -import pl.touk.nussknacker.engine.deployment.{DeploymentId, ExternalDeploymentId} +import pl.touk.nussknacker.engine.api.deployment.{ + NoAttributesStateStatus, + ScenarioActionName, + ScenarioVersionId, + StateStatus +} +import pl.touk.nussknacker.engine.deployment.ExternalDeploymentId import java.net.URI @@ -18,12 +22,12 @@ import java.net.URI * - which actions are allowed: allowedActions * - additional properties: attributes, errors * - * Statuses definition, allowed actions and current scenario ProcessState is defined by [[ProcessStateDefinitionManager]]. + * Statuses definition, allowed actions and current scenario presentation is defined by [[ProcessStateDefinitionManager]]. * @param description Short message displayed in top right panel of scenario diagram panel. * @param tooltip Message displayed when mouse is hoovering over an icon (both scenarios and diagram panel). * May contain longer, detailed status description. */ -@JsonCodec case class ProcessState( +@JsonCodec case class ScenarioStatusDto( externalDeploymentId: Option[ExternalDeploymentId], status: StateStatus, version: Option[ProcessVersion], @@ -38,7 +42,7 @@ import java.net.URI errors: List[String] ) -object ProcessState { +object ScenarioStatusDto { implicit val uriEncoder: Encoder[URI] = Encoder.encodeString.contramap(_.toString) implicit val uriDecoder: Decoder[URI] = Decoder.decodeString.map(URI.create) implicit val scenarioVersionIdEncoder: Encoder[ScenarioVersionId] = Encoder.encodeLong.contramap(_.value) @@ -54,11 +58,6 @@ object ProcessState { implicit val scenarioActionNameKeyEncoder: KeyEncoder[ScenarioActionName] = (name: ScenarioActionName) => ScenarioActionName.serialize(name) -} - -object StateStatus { - type StatusName = String - // StateStatus has to have Decoder defined because it is decoded along with ProcessState in the migration process // (see StandardRemoteEnvironment class). // In all cases (this one and for FE purposes) only info about the status name is essential. We could encode status @@ -69,29 +68,4 @@ object StateStatus { implicit val statusDecoder: Decoder[StateStatus] = Decoder.decodeString.at("name").map(NoAttributesStateStatus) - // Temporary methods to simplify status creation - def apply(statusName: StatusName): StateStatus = NoAttributesStateStatus(statusName) - -} - -trait StateStatus { - // Status identifier, should be unique among all states registered within all processing types. - def name: StatusName -} - -case class NoAttributesStateStatus(name: StatusName) extends StateStatus { - override def toString: String = name -} - -case class StatusDetails( - status: StateStatus, - deploymentId: Option[DeploymentId], - externalDeploymentId: Option[ExternalDeploymentId] = None, - version: Option[ProcessVersion] = None, - startTime: Option[Long] = None, - attributes: Option[Json] = None, - errors: List[String] = List.empty -) { - def externalDeploymentIdUnsafe: ExternalDeploymentId = - externalDeploymentId.getOrElse(throw new IllegalStateException(s"externalDeploymentId is missing")) } diff --git a/designer/restmodel/src/main/scala/pl/touk/nussknacker/restmodel/scenariodetails/ScenarioWithDetails.scala b/designer/restmodel/src/main/scala/pl/touk/nussknacker/restmodel/scenariodetails/ScenarioWithDetails.scala index d884e8feac9..45ab3ebbc5e 100644 --- a/designer/restmodel/src/main/scala/pl/touk/nussknacker/restmodel/scenariodetails/ScenarioWithDetails.scala +++ b/designer/restmodel/src/main/scala/pl/touk/nussknacker/restmodel/scenariodetails/ScenarioWithDetails.scala @@ -3,7 +3,7 @@ package pl.touk.nussknacker.restmodel.scenariodetails import io.circe.{Decoder, Encoder} import pl.touk.nussknacker.engine.api.ProcessVersion import pl.touk.nussknacker.engine.api.component.ProcessingMode -import pl.touk.nussknacker.engine.api.deployment.{ProcessAction, ProcessState} +import pl.touk.nussknacker.engine.api.deployment.ProcessAction import pl.touk.nussknacker.engine.api.graph.ScenarioGraph import pl.touk.nussknacker.engine.api.process._ import pl.touk.nussknacker.engine.deployment.EngineSetupName @@ -40,7 +40,7 @@ final case class ScenarioWithDetails( override val validationResult: Option[ValidationResult], override val history: Option[List[ScenarioVersion]], override val modelVersion: Option[Int], - state: Option[ProcessState] + state: Option[ScenarioStatusDto] ) extends BaseScenarioWithDetailsForMigrations { def parameters: ScenarioParameters = ScenarioParameters(processingMode, processCategory, engineSetupName) diff --git a/designer/deployment-manager-api/src/test/scala/pl/touk/nussknacker/engine/api/deployment/StateStatusCodingSpec.scala b/designer/restmodel/src/test/scala/pl/touk/nussknacker/restmodel/scenariodetails/StateStatusCodingSpec.scala similarity index 86% rename from designer/deployment-manager-api/src/test/scala/pl/touk/nussknacker/engine/api/deployment/StateStatusCodingSpec.scala rename to designer/restmodel/src/test/scala/pl/touk/nussknacker/restmodel/scenariodetails/StateStatusCodingSpec.scala index 61707541eaf..83699f78022 100644 --- a/designer/deployment-manager-api/src/test/scala/pl/touk/nussknacker/engine/api/deployment/StateStatusCodingSpec.scala +++ b/designer/restmodel/src/test/scala/pl/touk/nussknacker/restmodel/scenariodetails/StateStatusCodingSpec.scala @@ -1,18 +1,20 @@ -package pl.touk.nussknacker.engine.api.deployment +package pl.touk.nussknacker.restmodel.scenariodetails import io.circe.Json import io.circe.syntax._ import org.scalatest.OptionValues import org.scalatest.funsuite.AnyFunSuite import org.scalatest.matchers.should.Matchers +import pl.touk.nussknacker.engine.api.deployment.{NoAttributesStateStatus, StateStatus} import pl.touk.nussknacker.engine.api.deployment.StateStatus.StatusName -import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus import pl.touk.nussknacker.test.EitherValuesDetailedMessage class StateStatusCodingSpec extends AnyFunSuite with Matchers with EitherValuesDetailedMessage with OptionValues { + import ScenarioStatusDto._ + test("simple status coding") { - val givenStatus: StateStatus = SimpleStateStatus.Running + val givenStatus: StateStatus = NoAttributesStateStatus("RUNNING") val statusJson = givenStatus.asJson statusJson.hcursor.get[String]("name").rightValue shouldEqual "RUNNING" diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/api/AppApiHttpService.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/api/AppApiHttpService.scala index 1892984ab2f..7f9692b944a 100644 --- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/api/AppApiHttpService.scala +++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/api/AppApiHttpService.scala @@ -3,13 +3,13 @@ package pl.touk.nussknacker.ui.api import com.typesafe.config.{Config, ConfigRenderOptions} import com.typesafe.scalalogging.LazyLogging import io.circe.parser -import pl.touk.nussknacker.engine.api.deployment.ProcessState import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus.ProblemStateStatus import pl.touk.nussknacker.engine.api.modelinfo.ModelInfo import pl.touk.nussknacker.engine.api.process.{ProcessName, ProcessingType} import pl.touk.nussknacker.engine.util.ExecutionContextWithIORuntime import pl.touk.nussknacker.engine.util.Implicits.RichTupleList import pl.touk.nussknacker.engine.version.BuildInfo +import pl.touk.nussknacker.restmodel.scenariodetails.ScenarioStatusDto import pl.touk.nussknacker.ui.api.description.AppApiEndpoints import pl.touk.nussknacker.ui.api.description.AppApiEndpoints.Dtos._ import pl.touk.nussknacker.ui.process.ProcessService.GetScenarioWithDetailsOptions @@ -165,7 +165,7 @@ class AppApiHttpService( } } - private def problemStateByProcessName(implicit user: LoggedUser): Future[Map[ProcessName, ProcessState]] = { + private def problemStateByProcessName(implicit user: LoggedUser): Future[Map[ProcessName, ScenarioStatusDto]] = { for { processes <- processService.getLatestProcessesWithDetails( ScenarioQuery.deployed, @@ -173,7 +173,7 @@ class AppApiHttpService( ) statusMap = processes.flatMap(process => process.state.map(process.name -> _)).toMap withProblem = statusMap.collect { - case (name, processStatus @ ProcessState(_, _ @ProblemStateStatus(_, _), _, _, _, _, _, _, _, _, _, _)) => + case (name, processStatus @ ScenarioStatusDto(_, _ @ProblemStateStatus(_, _), _, _, _, _, _, _, _, _, _, _)) => (name, processStatus) } } yield withProblem diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/deployment/ActionService.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/deployment/ActionService.scala index 7839d166a53..370bb1ef781 100644 --- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/deployment/ActionService.scala +++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/deployment/ActionService.scala @@ -5,7 +5,8 @@ import db.util.DBIOActionInstances.DB import pl.touk.nussknacker.engine.api.Comment import pl.touk.nussknacker.engine.api.deployment._ import pl.touk.nussknacker.engine.api.modelinfo.ModelInfo -import pl.touk.nussknacker.engine.api.process.{ProcessId, ProcessIdWithName, ProcessName, ProcessingType, VersionId} +import pl.touk.nussknacker.engine.api.process._ +import pl.touk.nussknacker.restmodel.scenariodetails.ScenarioStatusDto import pl.touk.nussknacker.ui.api.{DeploymentCommentSettings, ListenerApiUser} import pl.touk.nussknacker.ui.listener.ProcessChangeEvent.{OnActionExecutionFinished, OnActionFailed, OnActionSuccess} import pl.touk.nussknacker.ui.listener.{ProcessChangeListener, User => ListenerUser} @@ -195,8 +196,8 @@ class ActionService( _ = checkIfCanPerformActionOnScenario(actionName, processDetails) // 1.4. check if action is allowed for current state processState <- scenarioStateProvider.getProcessStateDBIO( - processDetails, - None + processDetails = processDetails, + currentlyPresentedVersionId = None ) _ = checkIfCanPerformActionInState(actionName, processDetails, processState) // 1.5. calculate which scenario version is affected by the action: latest for deploy, deployed for cancel @@ -231,7 +232,7 @@ class ActionService( private def checkIfCanPerformActionInState( actionName: ScenarioActionName, processDetails: ScenarioWithDetailsEntity[LatestScenarioDetailsShape], - ps: ProcessState + ps: ScenarioStatusDto ): Unit = { val allowedActions = ps.allowedActions.toSet if (!allowedActions.contains(actionName)) { diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/deployment/ScenarioStateProvider.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/deployment/ScenarioStateProvider.scala index bc80022bd0a..1e501443e8f 100644 --- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/deployment/ScenarioStateProvider.scala +++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/deployment/ScenarioStateProvider.scala @@ -6,17 +6,17 @@ import cats.implicits.{toFoldableOps, toTraverseOps} import cats.syntax.functor._ import com.typesafe.scalalogging.LazyLogging import db.util.DBIOActionInstances._ +import pl.touk.nussknacker.engine.api.deployment.ProcessStateDefinitionManager.ScenarioStatusWithScenarioContext import pl.touk.nussknacker.engine.api.deployment.ScenarioActionName.{Cancel, Deploy} import pl.touk.nussknacker.engine.api.deployment._ import pl.touk.nussknacker.engine.api.deployment.inconsistency.InconsistentStateDetector +import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus.ProblemStateStatus -import pl.touk.nussknacker.engine.api.deployment.simple.{SimpleProcessStateDefinitionManager, SimpleStateStatus} +import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus.ProblemStateStatus.FailedToGet import pl.touk.nussknacker.engine.api.process._ -import pl.touk.nussknacker.engine.util.WithDataFreshnessStatusUtils.{ - WithDataFreshnessStatusMapOps, - WithDataFreshnessStatusOps -} -import pl.touk.nussknacker.restmodel.scenariodetails.ScenarioWithDetails +import pl.touk.nussknacker.engine.deployment.DeploymentId +import pl.touk.nussknacker.engine.util.WithDataFreshnessStatusUtils.WithDataFreshnessStatusMapOps +import pl.touk.nussknacker.restmodel.scenariodetails.{ScenarioStatusDto, ScenarioWithDetails} import pl.touk.nussknacker.ui.BadRequestError import pl.touk.nussknacker.ui.process.ScenarioWithDetailsConversions.Ops import pl.touk.nussknacker.ui.process.deployment.ScenarioStateProvider.FragmentStateException @@ -40,17 +40,17 @@ trait ScenarioStateProvider { def getProcessState( processDetails: ScenarioWithDetailsEntity[_] - )(implicit user: LoggedUser, freshnessPolicy: DataFreshnessPolicy): Future[ProcessState] + )(implicit user: LoggedUser, freshnessPolicy: DataFreshnessPolicy): Future[ScenarioStatusDto] def getProcessState( processIdWithName: ProcessIdWithName, currentlyPresentedVersionId: Option[VersionId], - )(implicit user: LoggedUser, freshnessPolicy: DataFreshnessPolicy): Future[ProcessState] + )(implicit user: LoggedUser, freshnessPolicy: DataFreshnessPolicy): Future[ScenarioStatusDto] def getProcessStateDBIO(processDetails: ScenarioWithDetailsEntity[_], currentlyPresentedVersionId: Option[VersionId])( implicit user: LoggedUser, freshnessPolicy: DataFreshnessPolicy - ): DB[ProcessState] + ): DB[ScenarioStatusDto] } @@ -85,38 +85,39 @@ private class ScenarioStateProviderImpl( def getProcessState( processIdWithName: ProcessIdWithName, currentlyPresentedVersionId: Option[VersionId], - )(implicit user: LoggedUser, freshnessPolicy: DataFreshnessPolicy): Future[ProcessState] = { + )(implicit user: LoggedUser, freshnessPolicy: DataFreshnessPolicy): Future[ScenarioStatusDto] = { dbioRunner.run(for { processDetailsOpt <- processRepository.fetchLatestProcessDetailsForProcessId[Unit](processIdWithName.id) processDetails <- existsOrFail(processDetailsOpt, ProcessNotFoundError(processIdWithName.name)) - result <- getProcessStateDBIO( + dto <- getProcessStateDBIO( processDetails, currentlyPresentedVersionId ) - } yield result) + } yield dto) } def getProcessStateDBIO( processDetails: ScenarioWithDetailsEntity[_], currentlyPresentedVersionId: Option[VersionId] - )(implicit user: LoggedUser, freshnessPolicy: DataFreshnessPolicy): DB[ProcessState] = { + )(implicit user: LoggedUser, freshnessPolicy: DataFreshnessPolicy): DB[ScenarioStatusDto] = { for { inProgressActionNames <- actionRepository.getInProgressActionNames(processDetails.processId) - result <- getProcessStateFetchingStatusFromManager( + statusDetails <- getProcessStateFetchingStatusFromManager( processDetails, - inProgressActionNames, - currentlyPresentedVersionId + inProgressActionNames ) - } yield result + dto = toDto(statusDetails, processDetails, currentlyPresentedVersionId) + } yield dto } def getProcessState( processDetails: ScenarioWithDetailsEntity[_] - )(implicit user: LoggedUser, freshnessPolicy: DataFreshnessPolicy): Future[ProcessState] = { + )(implicit user: LoggedUser, freshnessPolicy: DataFreshnessPolicy): Future[ScenarioStatusDto] = { dbioRunner.run(for { inProgressActionNames <- actionRepository.getInProgressActionNames(processDetails.processId) - result <- getProcessStateFetchingStatusFromManager(processDetails, inProgressActionNames, None) - } yield result) + statusDetails <- getProcessStateFetchingStatusFromManager(processDetails, inProgressActionNames) + dto = toDto(statusDetails, processDetails, None) + } yield dto) } def enrichDetailsWithProcessState[F[_]: Traverse](processTraverse: F[ScenarioWithDetails])( @@ -139,20 +140,21 @@ private class ScenarioStateProviderImpl( // then it means that DM is not aware of this scenario, and we should default to List.empty[StatusDetails]. prefetchedState = prefetchedStatesForProcessingType.getOrElse(process.name, List.empty) } yield prefetchedState - prefetchedState match { + (prefetchedState match { case Some(prefetchedStatusDetails) => getProcessStateUsingPrefetchedStatus( process.toEntity, actionsInProgress.getOrElse(process.processIdUnsafe, Set.empty), - None, prefetchedStatusDetails, - ).map(state => process.copy(state = Some(state))) + ) case None => getProcessStateFetchingStatusFromManager( process.toEntity, actionsInProgress.getOrElse(process.processIdUnsafe, Set.empty), - None, - ).map(state => process.copy(state = Some(state))) + ) + }).map { statusDetails => + val dto = toDto(statusDetails, process.toEntity, None) + process.copy(state = Some(dto)) } } .sequence[DB, ScenarioWithDetails] @@ -160,23 +162,50 @@ private class ScenarioStateProviderImpl( ) } + private def toDto( + statusDetails: StatusDetails, + processDetails: ScenarioWithDetailsEntity[_], + currentlyPresentedVersionId: Option[VersionId] + )(implicit user: LoggedUser) = { + val presentation = dispatcher + .deploymentManagerUnsafe(processDetails.processingType) + .processStateDefinitionManager + .statusPresentation( + ScenarioStatusWithScenarioContext( + statusDetails = statusDetails, + latestVersionId = processDetails.processVersionId, + deployedVersionId = processDetails.lastDeployedAction.map(_.processVersionId), + currentlyPresentedVersionId = currentlyPresentedVersionId + ) + ) + ScenarioStatusDto( + externalDeploymentId = statusDetails.externalDeploymentId, + status = statusDetails.status, + version = statusDetails.version, + visibleActions = presentation.visibleActions, + allowedActions = presentation.allowedActions, + actionTooltips = presentation.actionTooltips, + icon = presentation.icon, + tooltip = presentation.tooltip, + description = presentation.description, + startTime = statusDetails.startTime, + attributes = statusDetails.attributes, + errors = statusDetails.errors, + ) + } + private def getProcessStateFetchingStatusFromManager( processDetails: ScenarioWithDetailsEntity[_], inProgressActionNames: Set[ScenarioActionName], - currentlyPresentedVersionId: Option[VersionId], - )(implicit freshnessPolicy: DataFreshnessPolicy, user: LoggedUser): DB[ProcessState] = { - getProcessState( + )(implicit freshnessPolicy: DataFreshnessPolicy, user: LoggedUser): DB[StatusDetails] = { + getScenarioStatusDetails( processDetails, inProgressActionNames, - currentlyPresentedVersionId, manager => getStateFromDeploymentManager( manager, processDetails.idWithName, processDetails.lastStateAction, - processDetails.processVersionId, - processDetails.lastDeployedAction.map(_.processVersionId), - currentlyPresentedVersionId, ) ) } @@ -241,7 +270,7 @@ private class ScenarioStateProviderImpl( .getInProgressActionNames(head.processIdUnsafe) .map(actionNames => Map(head.processIdUnsafe -> actionNames)) case _ => - // We are getting only Deploy and Cancel InProgress actions as only these two impact ProcessState + // We are getting only Deploy and Cancel InProgress actions as only these two impact scenario status actionRepository.getInProgressActionNames(Set(Deploy, Cancel)) } } @@ -249,66 +278,43 @@ private class ScenarioStateProviderImpl( private def getProcessStateUsingPrefetchedStatus( processDetails: ScenarioWithDetailsEntity[_], inProgressActionNames: Set[ScenarioActionName], - currentlyPresentedVersionId: Option[VersionId], prefetchedStatusDetails: WithDataFreshnessStatus[List[StatusDetails]], - )(implicit user: LoggedUser): DB[ProcessState] = { - getProcessState( + )(implicit user: LoggedUser): DB[StatusDetails] = { + getScenarioStatusDetails( processDetails, inProgressActionNames, - currentlyPresentedVersionId, - { manager => - // FIXME abr: handle finished + { _ => + // FIXME abr: handle finished, it has no sense for periodic but it shouldn't hurt us Future { prefetchedStatusDetails.map { prefetchedStatusDetailsValue => - val resolved = - InconsistentStateDetector.resolve(prefetchedStatusDetailsValue, processDetails.lastStateAction) - manager.processStateDefinitionManager.processState( - resolved, - processDetails.processVersionId, - processDetails.lastDeployedAction.map(_.processVersionId), - currentlyPresentedVersionId - ) + // FIXME abr: resolved states shouldn't be handled here + InconsistentStateDetector.resolve(prefetchedStatusDetailsValue, processDetails.lastStateAction) } } } ) } - private def getProcessState( + private def getScenarioStatusDetails( processDetails: ScenarioWithDetailsEntity[_], inProgressActionNames: Set[ScenarioActionName], - currentlyPresentedVersionId: Option[VersionId], - fetchState: DeploymentManager => Future[WithDataFreshnessStatus[ProcessState]], - )(implicit user: LoggedUser): DB[ProcessState] = { - val processVersionId = processDetails.processVersionId - val deployedVersionId = processDetails.lastDeployedAction.map(_.processVersionId) + fetchState: DeploymentManager => Future[WithDataFreshnessStatus[StatusDetails]], + )(implicit user: LoggedUser): DB[StatusDetails] = { dispatcher .deploymentManager(processDetails.processingType) .map { manager => if (processDetails.isFragment) { throw FragmentStateException } else if (processDetails.isArchived) { - getArchivedProcessState(processDetails, currentlyPresentedVersionId)(manager) + DBIOAction.successful(getArchivedProcessState(processDetails)) } else if (inProgressActionNames.contains(ScenarioActionName.Deploy)) { logger.debug(s"Status for: '${processDetails.name}' is: ${SimpleStateStatus.DuringDeploy}") DBIOAction.successful( - manager.processStateDefinitionManager.processState( - StatusDetails(SimpleStateStatus.DuringDeploy, None), - processVersionId, - deployedVersionId, - currentlyPresentedVersionId, - ) - ) + StatusDetails(SimpleStateStatus.DuringDeploy, None) + ) // FIXME abr: deploymentId from inProgressActionNames } else if (inProgressActionNames.contains(ScenarioActionName.Cancel)) { logger.debug(s"Status for: '${processDetails.name}' is: ${SimpleStateStatus.DuringCancel}") - DBIOAction.successful( - manager.processStateDefinitionManager.processState( - StatusDetails(SimpleStateStatus.DuringCancel, None), - processVersionId, - deployedVersionId, - currentlyPresentedVersionId, - ) - ) + DBIOAction.successful(StatusDetails(SimpleStateStatus.DuringCancel, None)) } else { processDetails.lastStateAction match { case Some(_) => @@ -322,101 +328,61 @@ private class ScenarioStateProviderImpl( statusWithFreshness.value } case _ => // We assume that the process never deployed should have no state at the engine + // FIXME abr: it is a part of deployment => scenario status resolution logger.debug(s"Status for never deployed: '${processDetails.name}' is: ${SimpleStateStatus.NotDeployed}") - DBIOAction.successful( - manager.processStateDefinitionManager.processState( - StatusDetails(SimpleStateStatus.NotDeployed, None), - processVersionId, - deployedVersionId, - currentlyPresentedVersionId, - ) - ) + DBIOAction.successful(StatusDetails(SimpleStateStatus.NotDeployed, None)) } } } + // FIXME abr: it is a part of deployment => scenario status resolution .getOrElse( - DBIOAction.successful(SimpleProcessStateDefinitionManager.errorFailedToGet(processVersionId)) + DBIOAction.successful(StatusDetails(FailedToGet, None)) ) } // We assume that checking the state for archived doesn't make sense, and we compute the state based on the last state action - private def getArchivedProcessState( - processDetails: ScenarioWithDetailsEntity[_], - currentlyPresentedVersionId: Option[VersionId], - )(implicit manager: DeploymentManager) = { - val processVersionId = processDetails.processVersionId - val deployedVersionId = processDetails.lastDeployedAction.map(_.processVersionId) - processDetails.lastStateAction.map(a => (a.actionName, a.state)) match { - case Some((Cancel, _)) => + private def getArchivedProcessState(processDetails: ScenarioWithDetailsEntity[_]): StatusDetails = { + processDetails.lastStateAction.map(a => (a.actionName, a.state, a.id)) match { + case Some((Cancel, _, _)) => logger.debug(s"Status for: '${processDetails.name}' is: ${SimpleStateStatus.Canceled}") - DBIOAction.successful( - manager.processStateDefinitionManager.processState( - StatusDetails(SimpleStateStatus.Canceled, None), - processVersionId, - deployedVersionId, - currentlyPresentedVersionId, - ) - ) - case Some((Deploy, ProcessActionState.ExecutionFinished)) => + StatusDetails(SimpleStateStatus.Canceled, None) + case Some((Deploy, ProcessActionState.ExecutionFinished, deploymentActionId)) => logger.debug(s"Status for: '${processDetails.name}' is: ${SimpleStateStatus.Finished} ") - DBIOAction.successful( - manager.processStateDefinitionManager.processState( - StatusDetails(SimpleStateStatus.Finished, None), - processVersionId, - deployedVersionId, - currentlyPresentedVersionId, - ) - ) + StatusDetails(SimpleStateStatus.Finished, Some(DeploymentId.fromActionId(deploymentActionId))) case Some(_) => logger.warn(s"Status for: '${processDetails.name}' is: ${ProblemStateStatus.ArchivedShouldBeCanceled}") - DBIOAction.successful( - manager.processStateDefinitionManager.processState( - StatusDetails(ProblemStateStatus.ArchivedShouldBeCanceled, None), - processVersionId, - deployedVersionId, - currentlyPresentedVersionId, - ) - ) - case _ => + // FIXME abr: it is a part of deployment => scenario status resolution + StatusDetails(ProblemStateStatus.ArchivedShouldBeCanceled, None) + case None => logger.debug(s"Status for: '${processDetails.name}' is: ${SimpleStateStatus.NotDeployed}") - DBIOAction.successful( - manager.processStateDefinitionManager.processState( - StatusDetails(SimpleStateStatus.NotDeployed, None), - processVersionId, - deployedVersionId, - currentlyPresentedVersionId, - ) - ) + // FIXME abr: it is a part of deployment => scenario status resolution + StatusDetails(SimpleStateStatus.NotDeployed, None) } } private def getStateFromDeploymentManager( deploymentManager: DeploymentManager, processIdWithName: ProcessIdWithName, - lastStateAction: Option[ProcessAction], - latestVersionId: VersionId, - deployedVersionId: Option[VersionId], - currentlyPresentedVersionId: Option[VersionId], + lastStateAction: Option[ProcessAction] )( implicit freshnessPolicy: DataFreshnessPolicy - ): Future[WithDataFreshnessStatus[ProcessState]] = { + ): Future[WithDataFreshnessStatus[StatusDetails]] = { - // FIXME abr: handle finished + // FIXME abr: handle finished, it has no sense for periodic but it shouldn't hurt us val state = deploymentManager - .getProcessStates(processIdWithName.name) + .getScenarioDeploymentsStatuses(processIdWithName.name) .map(_.map { statusDetails => - val resolved = InconsistentStateDetector.resolve(statusDetails, lastStateAction) - deploymentManager.processStateDefinitionManager - .processState(resolved, latestVersionId, deployedVersionId, currentlyPresentedVersionId) + // FIXME abr: resolved states shouldn't be handled here + InconsistentStateDetector.resolve(statusDetails, lastStateAction) }) .recover { case NonFatal(e) => logger.warn(s"Failed to get status of ${processIdWithName.name}: ${e.getMessage}", e) - failedToGetProcessState(latestVersionId) + failedToGetProcessState } scenarioStateTimeout .map { timeout => - state.withTimeout(timeout, timeoutResult = failedToGetProcessState(latestVersionId)).map { + state.withTimeout(timeout, timeoutResult = failedToGetProcessState).map { case CompletedNormally(value) => value case CompletedByTimeout(value) => @@ -428,8 +394,8 @@ private class ScenarioStateProviderImpl( .getOrElse(state) } - private def failedToGetProcessState(versionId: VersionId) = - WithDataFreshnessStatus.fresh(SimpleProcessStateDefinitionManager.errorFailedToGet(versionId)) + private val failedToGetProcessState = + WithDataFreshnessStatus.fresh(StatusDetails(FailedToGet, None)) private def existsOrFail[T](checkThisOpt: Option[T], failWith: => Exception): DB[T] = { checkThisOpt match { diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/exception/ProcessIllegalAction.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/exception/ProcessIllegalAction.scala index ffad195cc2f..d054508fbbd 100644 --- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/exception/ProcessIllegalAction.scala +++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/exception/ProcessIllegalAction.scala @@ -1,7 +1,8 @@ package pl.touk.nussknacker.ui.process.exception -import pl.touk.nussknacker.engine.api.deployment.{ProcessState, ScenarioActionName, StateStatus} +import pl.touk.nussknacker.engine.api.deployment.{ScenarioActionName, StateStatus} import pl.touk.nussknacker.engine.api.process.ProcessName +import pl.touk.nussknacker.restmodel.scenariodetails.ScenarioStatusDto import pl.touk.nussknacker.ui.IllegalOperationError final case class ProcessIllegalAction(message: String) extends IllegalOperationError(message, details = "") @@ -11,7 +12,7 @@ object ProcessIllegalAction { def apply( actionName: ScenarioActionName, processName: ProcessName, - state: ProcessState + state: ScenarioStatusDto ): ProcessIllegalAction = apply(actionName, processName, state.status.name, state.allowedActions.toSet) diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/periodic/PeriodicDeploymentManager.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/periodic/PeriodicDeploymentManager.scala index d72ea7b049c..6052c27139b 100644 --- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/periodic/PeriodicDeploymentManager.scala +++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/periodic/PeriodicDeploymentManager.scala @@ -193,10 +193,10 @@ class PeriodicDeploymentManager private[periodic] ( override def stateQueryForAllScenariosSupport: StateQueryForAllScenariosSupport = service.stateQueryForAllScenariosSupport - override def getProcessStates( - name: ProcessName + override def getScenarioDeploymentsStatuses( + scenarioName: ProcessName )(implicit freshnessPolicy: DataFreshnessPolicy): Future[WithDataFreshnessStatus[List[StatusDetails]]] = { - service.getMergedStatusDetails(name).map(_.map(List(_))) + service.getMergedStatusDetails(scenarioName).map(_.map(List(_))) } override def processStateDefinitionManager: ProcessStateDefinitionManager = diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/periodic/PeriodicProcessService.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/periodic/PeriodicProcessService.scala index 50b4bb22610..ef18474d558 100644 --- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/periodic/PeriodicProcessService.scala +++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/periodic/PeriodicProcessService.scala @@ -219,7 +219,7 @@ class PeriodicProcessService( toDeploy: PeriodicProcessDeployment ): Future[Option[PeriodicProcessDeployment]] = { delegateDeploymentManager - .getProcessStates(toDeploy.periodicProcess.deploymentData.processName)(DataFreshnessPolicy.Fresh) + .getScenarioDeploymentsStatuses(toDeploy.periodicProcess.deploymentData.processName)(DataFreshnessPolicy.Fresh) .map( _.value .map(_.status) @@ -265,7 +265,9 @@ class PeriodicProcessService( schedules: SchedulesState ): Future[(Set[PeriodicProcessDeploymentId], Set[PeriodicProcessDeploymentId])] = for { - runtimeStatuses <- delegateDeploymentManager.getProcessStates(processName)(DataFreshnessPolicy.Fresh).map(_.value) + runtimeStatuses <- delegateDeploymentManager + .getScenarioDeploymentsStatuses(processName)(DataFreshnessPolicy.Fresh) + .map(_.value) _ = logger.debug(s"Process '$processName' runtime statuses: ${runtimeStatuses.map(_.toString)}") scheduleDeploymentsWithStatus = schedules.schedules.values.toList.flatMap { scheduleData => logger.debug( @@ -302,22 +304,22 @@ class PeriodicProcessService( processName: ProcessName, versionId: VersionId, deployment: ScheduleDeploymentData, - processState: Option[StatusDetails], + statusDetails: Option[StatusDetails], ): Future[NeedsReschedule] = { implicit class RichFuture[Unit](a: Future[Unit]) { def needsReschedule(value: Boolean): Future[NeedsReschedule] = a.map(_ => value) } - processState.map(_.status) match { + statusDetails.map(_.status) match { case Some(status) if ProblemStateStatus.isProblemStatus( status ) && deployment.state.status != PeriodicProcessDeploymentStatus.Failed => - markFailedAction(deployment, processState).needsReschedule(executionConfig.rescheduleOnFailure) + markFailedAction(deployment, statusDetails).needsReschedule(executionConfig.rescheduleOnFailure) case Some(status) if EngineStatusesToReschedule.contains( status ) && deployment.state.status != PeriodicProcessDeploymentStatus.Finished => - markFinished(processName, versionId, deployment, processState).needsReschedule(value = true) + markFinished(processName, versionId, deployment, statusDetails).needsReschedule(value = true) case None if deployment.state.status == PeriodicProcessDeploymentStatus.Deployed && deployment.deployedAt.exists(_.isBefore(LocalDateTime.now().minusMinutes(5))) => @@ -325,7 +327,7 @@ class PeriodicProcessService( // this can be caused by a race in e.g. FlinkRestManager // (because /jobs/overview used in getProcessStates isn't instantly aware of submitted jobs) // so freshly deployed deployments aren't considered - markFinished(processName, versionId, deployment, processState).needsReschedule(value = true) + markFinished(processName, versionId, deployment, statusDetails).needsReschedule(value = true) case _ => Future.successful(()).needsReschedule(value = false) } @@ -557,7 +559,7 @@ class PeriodicProcessService( def getMergedStatusDetails( name: ProcessName )(implicit freshnessPolicy: DataFreshnessPolicy): Future[WithDataFreshnessStatus[StatusDetails]] = { - delegateDeploymentManager.getProcessStates(name).flatMap { statusesWithFreshness => + delegateDeploymentManager.getScenarioDeploymentsStatuses(name).flatMap { statusesWithFreshness => logger.debug(s"Statuses for $name: $statusesWithFreshness") mergeStatusWithDeployments(name, statusesWithFreshness.value).map { statusDetails => statusesWithFreshness.copy(value = statusDetails) diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/periodic/PeriodicProcessStateDefinitionManager.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/periodic/PeriodicProcessStateDefinitionManager.scala index 3784a80adc4..5f1372c65bf 100644 --- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/periodic/PeriodicProcessStateDefinitionManager.scala +++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/periodic/PeriodicProcessStateDefinitionManager.scala @@ -1,6 +1,9 @@ package pl.touk.nussknacker.ui.process.periodic -import pl.touk.nussknacker.engine.api.deployment.ProcessStateDefinitionManager.defaultVisibleActions +import pl.touk.nussknacker.engine.api.deployment.ProcessStateDefinitionManager.{ + ScenarioStatusWithScenarioContext, + defaultVisibleActions +} import pl.touk.nussknacker.engine.api.deployment.{ OverridingProcessStateDefinitionManager, ProcessStateDefinitionManager, @@ -26,39 +29,52 @@ class PeriodicProcessStateDefinitionManager(delegate: ProcessStateDefinitionMana delegate = delegate ) { - override def statusActions(processStatus: ProcessStateDefinitionManager.ProcessStatus): List[ScenarioActionName] = { - super.statusActions(processStatus.copy(stateStatus = extractPeriodicStatus(processStatus.stateStatus).mergedStatus)) + override def statusActions(input: ScenarioStatusWithScenarioContext): List[ScenarioActionName] = { + super.statusActions( + extractPeriodicStatus(input.status) + .map(periodic => input.withStatus(periodic.mergedStatus)) + .getOrElse(input) // We have to handle also statuses resolved by core (for example NotDeployed) + ) } - override def actionTooltips( - processStatus: ProcessStateDefinitionManager.ProcessStatus - ): Map[ScenarioActionName, String] = { + override def actionTooltips(input: ScenarioStatusWithScenarioContext): Map[ScenarioActionName, String] = { super.actionTooltips( - processStatus.copy(stateStatus = extractPeriodicStatus(processStatus.stateStatus).mergedStatus) + extractPeriodicStatus(input.status) + .map(periodic => input.withStatus(periodic.mergedStatus)) + .getOrElse(input) // We have to handle also statuses resolved by core (for example NotDeployed) ) } - override def statusIcon(stateStatus: StateStatus): URI = { - super.statusIcon(extractPeriodicStatus(stateStatus).mergedStatus) + override def statusIcon(input: ScenarioStatusWithScenarioContext): URI = { + super.statusIcon( + extractPeriodicStatus(input.status) + .map(periodic => input.withStatus(periodic.mergedStatus)) + .getOrElse(input) // We have to handle also statuses resolved by core (for example NotDeployed) + ) } - override def statusDescription(stateStatus: StateStatus): String = { - super.statusDescription(extractPeriodicStatus(stateStatus).mergedStatus) + override def statusDescription(input: ScenarioStatusWithScenarioContext): String = { + super.statusDescription( + extractPeriodicStatus(input.status) + .map(periodic => input.withStatus(periodic.mergedStatus)) + .getOrElse(input) // We have to handle also statuses resolved by core (for example NotDeployed) + ) } - override def statusTooltip(stateStatus: StateStatus): String = { - val periodicStatus = extractPeriodicStatus(stateStatus) - PeriodicProcessStateDefinitionManager.statusTooltip( - activeDeploymentsStatuses = periodicStatus.activeDeploymentsStatuses, - inactiveDeploymentsStatuses = periodicStatus.inactiveDeploymentsStatuses - ) + override def statusTooltip(input: ScenarioStatusWithScenarioContext): String = { + extractPeriodicStatus(input.status) + .map { periodicStatus => + PeriodicProcessStateDefinitionManager.statusTooltip( + activeDeploymentsStatuses = periodicStatus.activeDeploymentsStatuses, + inactiveDeploymentsStatuses = periodicStatus.inactiveDeploymentsStatuses + ) + } + .getOrElse(super.statusTooltip(input)) } private def extractPeriodicStatus(stateStatus: StateStatus) = { - stateStatus match { - case periodic: PeriodicProcessStatusWithMergedStatus => - periodic - case other => throw new IllegalStateException(s"Unexpected status: $other") + Option(stateStatus) collect { case periodic: PeriodicProcessStatusWithMergedStatus => + periodic } } diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/periodic/PeriodicStateStatus.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/periodic/PeriodicStateStatus.scala index ce47c08485d..1991bc674d6 100644 --- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/periodic/PeriodicStateStatus.scala +++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/periodic/PeriodicStateStatus.scala @@ -1,6 +1,6 @@ package pl.touk.nussknacker.ui.process.periodic -import pl.touk.nussknacker.engine.api.deployment.ProcessStateDefinitionManager.ProcessStatus +import pl.touk.nussknacker.engine.api.deployment.ProcessStateDefinitionManager.ScenarioStatusWithScenarioContext import pl.touk.nussknacker.engine.api.deployment.StateStatus.StatusName import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus.ProblemStateStatus @@ -26,33 +26,33 @@ object PeriodicStateStatus { case object ScheduledStatus { val name = "SCHEDULED" - - def isScheduledStatus(status: StateStatus): Boolean = { - status.name == name - } - } val WaitingForScheduleStatus: StateStatus = StateStatus("WAITING_FOR_SCHEDULE") - val statusActionsPF: PartialFunction[ProcessStatus, List[ScenarioActionName]] = { - case ProcessStatus(SimpleStateStatus.Running, _, _, _) => - // periodic processes cannot be redeployed from GUI - List(ScenarioActionName.Cancel) - case ProcessStatus(_: ScheduledStatus, _, deployedVersionId, Some(currentlyPresentedVersionId)) - if deployedVersionId.contains(currentlyPresentedVersionId) => - List(ScenarioActionName.Cancel, ScenarioActionName.Deploy, ScenarioActionName.RunOffSchedule) - case ProcessStatus(_: ScheduledStatus, _, _, None) => - // At the moment of deployment or validation, we may not have the information about the currently displayed version - // In that case we assume, that it was validated before the deployment was initiated. - List(ScenarioActionName.Cancel, ScenarioActionName.Deploy, ScenarioActionName.RunOffSchedule) - case ProcessStatus(_: ScheduledStatus, _, _, _) => - List(ScenarioActionName.Cancel, ScenarioActionName.Deploy) - case ProcessStatus(WaitingForScheduleStatus, _, _, _) => - List(ScenarioActionName.Cancel) // or maybe should it be empty?? - case ProcessStatus(_: ProblemStateStatus, _, _, _) => - List(ScenarioActionName.Cancel) // redeploy is not allowed - } + val statusActionsPF: PartialFunction[ScenarioStatusWithScenarioContext, List[ScenarioActionName]] = + Function.unlift((input: ScenarioStatusWithScenarioContext) => + (input.status, input.deployedVersionId, input.currentlyPresentedVersionId) match { + case (SimpleStateStatus.Running, _, _) => + // periodic processes cannot be redeployed from GUI + Some(List(ScenarioActionName.Cancel)) + case (_: ScheduledStatus, deployedVersionId, Some(currentlyPresentedVersionId)) + if deployedVersionId.contains(currentlyPresentedVersionId) => + Some(List(ScenarioActionName.Cancel, ScenarioActionName.Deploy, ScenarioActionName.RunOffSchedule)) + case (_: ScheduledStatus, _, None) => + // At the moment of deployment or validation, we may not have the information about the currently displayed version + // In that case we assume, that it was validated before the deployment was initiated. + Some(List(ScenarioActionName.Cancel, ScenarioActionName.Deploy, ScenarioActionName.RunOffSchedule)) + case (_: ScheduledStatus, _, _) => + Some(List(ScenarioActionName.Cancel, ScenarioActionName.Deploy)) + case (WaitingForScheduleStatus, _, _) => + Some(List(ScenarioActionName.Cancel)) // or maybe should it be empty?? + case (_: ProblemStateStatus, _, _) => + Some(List(ScenarioActionName.Cancel)) // redeploy is not allowed + case _ => + None + } + ) val statusTooltipsPF: PartialFunction[StateStatus, String] = { case ScheduledStatus(nextRunAt) => s"Scheduled at ${nextRunAt.pretty}" @@ -77,20 +77,19 @@ object PeriodicStateStatus { ), ) - def customActionTooltips(processStatus: ProcessStatus): Map[ScenarioActionName, String] = { - processStatus match { - case ProcessStatus(_: ScheduledStatus, _, deployedVersionId, currentlyPresentedVersionId) - if currentlyPresentedVersionId == deployedVersionId => + def customActionTooltips(input: ScenarioStatusWithScenarioContext): Map[ScenarioActionName, String] = { + input.status match { + case _: ScheduledStatus if input.currentlyPresentedVersionId == input.deployedVersionId => Map.empty - case ProcessStatus(_: ScheduledStatus, _, deployedVersionIdOpt, currentlyPresentedVersionId) => + case _: ScheduledStatus => def print(versionIdOpt: Option[VersionId]) = versionIdOpt match { case Some(versionId) => s"${versionId.value}" case None => "[unknown]" } Map( - ScenarioActionName.RunOffSchedule -> s"Version ${print(deployedVersionIdOpt)} is deployed, but different version ${print(currentlyPresentedVersionId)} is displayed" + ScenarioActionName.RunOffSchedule -> s"Version ${print(input.deployedVersionId)} is deployed, but different version ${print(input.currentlyPresentedVersionId)} is displayed" ) - case ProcessStatus(other, _, _, _) => + case other => Map(ScenarioActionName.RunOffSchedule -> s"Disabled for ${other.name} status.") } } diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/processingtype/InvalidDeploymentManagerStub.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/processingtype/InvalidDeploymentManagerStub.scala index 526b12b548a..8c22a94cc42 100644 --- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/processingtype/InvalidDeploymentManagerStub.scala +++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/processingtype/InvalidDeploymentManagerStub.scala @@ -18,7 +18,7 @@ object InvalidDeploymentManagerStub extends DeploymentManager { deploymentId = None ) - override def getProcessStates(name: ProcessName)( + override def getScenarioDeploymentsStatuses(scenarioName: ProcessName)( implicit freshnessPolicy: DataFreshnessPolicy ): Future[WithDataFreshnessStatus[List[StatusDetails]]] = { Future.successful(WithDataFreshnessStatus.fresh(List(stubbedStatus))) diff --git a/designer/server/src/test/scala/pl/touk/nussknacker/test/mock/MockDeploymentManager.scala b/designer/server/src/test/scala/pl/touk/nussknacker/test/mock/MockDeploymentManager.scala index 610ef66f15c..61eb0f44279 100644 --- a/designer/server/src/test/scala/pl/touk/nussknacker/test/mock/MockDeploymentManager.scala +++ b/designer/server/src/test/scala/pl/touk/nussknacker/test/mock/MockDeploymentManager.scala @@ -75,14 +75,14 @@ class MockDeploymentManager private ( case None => super.processStateDefinitionManager } - override def getProcessStates( - name: ProcessName + override def getScenarioDeploymentsStatuses( + scenarioName: ProcessName )(implicit freshnessPolicy: DataFreshnessPolicy): Future[WithDataFreshnessStatus[List[StatusDetails]]] = { Future { Thread.sleep(delayBeforeStateReturn.toMillis) WithDataFreshnessStatus.fresh( managerProcessStates.getOrDefault( - name, + scenarioName, List(sampleStatusDetails(defaultProcessStateStatus, sampleDeploymentId)) ) ) diff --git a/designer/server/src/test/scala/pl/touk/nussknacker/test/mock/StubScenarioStateProvider.scala b/designer/server/src/test/scala/pl/touk/nussknacker/test/mock/StubScenarioStateProvider.scala index ab6264838d3..05deec01ad5 100644 --- a/designer/server/src/test/scala/pl/touk/nussknacker/test/mock/StubScenarioStateProvider.scala +++ b/designer/server/src/test/scala/pl/touk/nussknacker/test/mock/StubScenarioStateProvider.scala @@ -4,7 +4,7 @@ import cats.Traverse import db.util.DBIOActionInstances.DB import pl.touk.nussknacker.engine.api.deployment._ import pl.touk.nussknacker.engine.api.process.{ProcessIdWithName, ProcessName, VersionId} -import pl.touk.nussknacker.restmodel.scenariodetails.ScenarioWithDetails +import pl.touk.nussknacker.restmodel.scenariodetails.{ScenarioStatusDto, ScenarioWithDetails} import pl.touk.nussknacker.ui.process.deployment.ScenarioStateProvider import pl.touk.nussknacker.ui.process.repository.ScenarioWithDetailsEntity import pl.touk.nussknacker.ui.security.api.LoggedUser @@ -13,17 +13,17 @@ import slick.dbio.DBIO import scala.concurrent.Future import scala.language.higherKinds -class StubScenarioStateProvider(states: Map[ProcessName, ProcessState]) extends ScenarioStateProvider { +class StubScenarioStateProvider(states: Map[ProcessName, ScenarioStatusDto]) extends ScenarioStateProvider { override def getProcessState( processDetails: ScenarioWithDetailsEntity[_] - )(implicit user: LoggedUser, freshnessPolicy: DataFreshnessPolicy): Future[ProcessState] = + )(implicit user: LoggedUser, freshnessPolicy: DataFreshnessPolicy): Future[ScenarioStatusDto] = Future.successful(states(processDetails.name)) override def getProcessState( processIdWithName: ProcessIdWithName, currentlyPresentedVersionId: Option[VersionId], - )(implicit user: LoggedUser, freshnessPolicy: DataFreshnessPolicy): Future[ProcessState] = + )(implicit user: LoggedUser, freshnessPolicy: DataFreshnessPolicy): Future[ScenarioStatusDto] = Future.successful(states(processIdWithName.name)) override def enrichDetailsWithProcessState[F[_]: Traverse](processTraverse: F[ScenarioWithDetails])( @@ -34,7 +34,7 @@ class StubScenarioStateProvider(states: Map[ProcessName, ProcessState]) extends override def getProcessStateDBIO( processDetails: ScenarioWithDetailsEntity[_], currentlyPresentedVersionId: Option[VersionId] - )(implicit user: LoggedUser, freshnessPolicy: DataFreshnessPolicy): DB[ProcessState] = + )(implicit user: LoggedUser, freshnessPolicy: DataFreshnessPolicy): DB[ScenarioStatusDto] = DBIO.successful(states(processDetails.name)) } diff --git a/designer/server/src/test/scala/pl/touk/nussknacker/ui/notifications/NotificationServiceTest.scala b/designer/server/src/test/scala/pl/touk/nussknacker/ui/notifications/NotificationServiceTest.scala index a2ef9face5f..2a6bd79e20c 100644 --- a/designer/server/src/test/scala/pl/touk/nussknacker/ui/notifications/NotificationServiceTest.scala +++ b/designer/server/src/test/scala/pl/touk/nussknacker/ui/notifications/NotificationServiceTest.scala @@ -11,8 +11,8 @@ import org.scalatestplus.mockito.MockitoSugar import pl.touk.nussknacker.engine.api.component.NodesDeploymentData import pl.touk.nussknacker.engine.api.deployment.DeploymentUpdateStrategy.StateRestoringStrategy import pl.touk.nussknacker.engine.api.deployment._ -import pl.touk.nussknacker.engine.api.deployment.simple.{SimpleProcessStateDefinitionManager, SimpleStateStatus} -import pl.touk.nussknacker.engine.api.process.{ProcessIdWithName, ProcessName, VersionId} +import pl.touk.nussknacker.engine.api.deployment.simple.SimpleProcessStateDefinitionManager +import pl.touk.nussknacker.engine.api.process.{ProcessIdWithName, ProcessName} import pl.touk.nussknacker.engine.build.ScenarioBuilder import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess import pl.touk.nussknacker.engine.deployment.{ @@ -278,16 +278,8 @@ class NotificationServiceTest notificationAfterExecutionFinished.head.id should not equal deployNotificationId } - private val notDeployed = - SimpleProcessStateDefinitionManager.processState( - StatusDetails(SimpleStateStatus.NotDeployed, None), - VersionId(1), - None, - Some(VersionId(1)), - ) - private def createServices(deploymentManager: DeploymentManager) = { - when(deploymentManager.getProcessStates(any[ProcessName])(any[DataFreshnessPolicy])) + when(deploymentManager.getScenarioDeploymentsStatuses(any[ProcessName])(any[DataFreshnessPolicy])) .thenReturn(Future.successful(WithDataFreshnessStatus.fresh(List.empty[StatusDetails]))) val managerDispatcher = mock[DeploymentManagerDispatcher] when(managerDispatcher.deploymentManager(any[String])(any[LoggedUser])).thenReturn(Some(deploymentManager)) diff --git a/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/ProcessStateDefinitionServiceSpec.scala b/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/ProcessStateDefinitionServiceSpec.scala index 93bd8c5353a..10b2a8a9ba5 100644 --- a/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/ProcessStateDefinitionServiceSpec.scala +++ b/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/ProcessStateDefinitionServiceSpec.scala @@ -4,7 +4,7 @@ import com.typesafe.config.ConfigFactory import org.scalatest.funsuite.AnyFunSuite import org.scalatest.matchers.should.Matchers import pl.touk.nussknacker.engine.api.component.ComponentDefinition -import pl.touk.nussknacker.engine.api.deployment.ProcessStateDefinitionManager.ProcessStatus +import pl.touk.nussknacker.engine.api.deployment.ProcessStateDefinitionManager.ScenarioStatusWithScenarioContext import pl.touk.nussknacker.engine.api.deployment.StateDefinitionDetails.UnknownIcon import pl.touk.nussknacker.engine.api.deployment.StateStatus.StatusName import pl.touk.nussknacker.engine.api.deployment._ @@ -158,8 +158,8 @@ class ProcessStateDefinitionServiceSpec extends AnyFunSuite with Matchers { } private val emptyStateDefinitionManager = new ProcessStateDefinitionManager { - override def stateDefinitions: Map[StatusName, StateDefinitionDetails] = Map.empty - override def statusActions(processStatus: ProcessStatus): List[ScenarioActionName] = Nil + override def stateDefinitions: Map[StatusName, StateDefinitionDetails] = Map.empty + override def statusActions(input: ScenarioStatusWithScenarioContext): List[ScenarioActionName] = Nil } private def createProcessingTypeDataMap( diff --git a/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/deployment/DeploymentServiceSpec.scala b/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/deployment/DeploymentServiceSpec.scala index 92780f56441..3107d218215 100644 --- a/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/deployment/DeploymentServiceSpec.scala +++ b/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/deployment/DeploymentServiceSpec.scala @@ -18,6 +18,7 @@ import pl.touk.nussknacker.engine.api.process._ import pl.touk.nussknacker.engine.api.{Comment, ProcessVersion} import pl.touk.nussknacker.engine.build.ScenarioBuilder import pl.touk.nussknacker.engine.deployment.{DeploymentId, ExternalDeploymentId} +import pl.touk.nussknacker.restmodel.scenariodetails.ScenarioStatusDto import pl.touk.nussknacker.test.base.db.WithHsqlDbTesting import pl.touk.nussknacker.test.base.it.WithClock import pl.touk.nussknacker.test.mock.MockDeploymentManagerSyntaxSugar.Ops @@ -958,7 +959,7 @@ class DeploymentServiceSpec deploymentManager.deploys.clear() } - private def checkIsFollowingDeploy(state: ProcessState, expected: Boolean) = { + private def checkIsFollowingDeploy(state: ScenarioStatusDto, expected: Boolean) = { withClue(state) { SimpleStateStatus.DefaultFollowingDeployStatuses.contains(state.status) shouldBe expected } diff --git a/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/periodic/flink/DeploymentManagerStub.scala b/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/periodic/flink/DeploymentManagerStub.scala index 455c5118433..0fa7712cb2d 100644 --- a/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/periodic/flink/DeploymentManagerStub.scala +++ b/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/periodic/flink/DeploymentManagerStub.scala @@ -62,10 +62,10 @@ class DeploymentManagerStub extends BaseDeploymentManager { ) } - override def getProcessStates( - name: ProcessName + override def getScenarioDeploymentsStatuses( + scenarioName: ProcessName )(implicit freshnessPolicy: DataFreshnessPolicy): Future[WithDataFreshnessStatus[List[StatusDetails]]] = { - Future.successful(WithDataFreshnessStatus.fresh(getJobStatus(name).toList.flatten)) + Future.successful(WithDataFreshnessStatus.fresh(getJobStatus(scenarioName).toList.flatten)) } override def deploymentSynchronisationSupport: DeploymentSynchronisationSupport = NoDeploymentSynchronisationSupport diff --git a/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/periodic/flink/PeriodicDeploymentManagerTest.scala b/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/periodic/flink/PeriodicDeploymentManagerTest.scala index 1ddd4b360b9..d1aa93d39f0 100644 --- a/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/periodic/flink/PeriodicDeploymentManagerTest.scala +++ b/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/periodic/flink/PeriodicDeploymentManagerTest.scala @@ -6,6 +6,7 @@ import org.scalatest.matchers.should.Matchers import org.scalatest.prop.TableDrivenPropertyChecks import org.scalatest.{Inside, OptionValues} import pl.touk.nussknacker.engine.api.deployment.DeploymentUpdateStrategy.StateRestoringStrategy +import pl.touk.nussknacker.engine.api.deployment.ProcessStateDefinitionManager.ScenarioStatusWithScenarioContext import pl.touk.nussknacker.engine.api.deployment._ import pl.touk.nussknacker.engine.api.deployment.scheduler.services.{EmptyListener, ProcessConfigEnricher} import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus @@ -93,7 +94,14 @@ class PeriodicDeploymentManagerTest currentlyPresentedVersionId: Option[VersionId], ): List[ScenarioActionName] = { periodicDeploymentManager.processStateDefinitionManager - .processState(statusDetails, latestVersionId, deployedVersionId, currentlyPresentedVersionId) + .statusPresentation( + ScenarioStatusWithScenarioContext( + statusDetails, + latestVersionId, + deployedVersionId, + currentlyPresentedVersionId + ) + ) .allowedActions } @@ -138,7 +146,7 @@ class PeriodicDeploymentManagerTest ScenarioActionName.Deploy ) f.periodicDeploymentManager - .getProcessStates(idWithName.name) + .getScenarioDeploymentsStatuses(idWithName.name) .futureValue .value .loneElement @@ -171,17 +179,19 @@ class PeriodicDeploymentManagerTest val statusDetails = f.periodicDeploymentManager - .getProcessStates(processName) + .getScenarioDeploymentsStatuses(processName) .futureValue .value .loneElement statusDetails.mergedStatus shouldBe SimpleStateStatus.Finished - val state = f.periodicDeploymentManager.processStateDefinitionManager.processState( - statusDetails, - processVersion.versionId, - None, - Some(processVersion.versionId) + val state = f.periodicDeploymentManager.processStateDefinitionManager.statusPresentation( + ScenarioStatusWithScenarioContext( + statusDetails, + processVersion.versionId, + None, + Some(processVersion.versionId) + ) ) state.allowedActions shouldBe List(ScenarioActionName.Deploy, ScenarioActionName.Archive, ScenarioActionName.Rename) } diff --git a/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/periodic/flink/PeriodicProcessStateDefinitionManagerTest.scala b/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/periodic/flink/PeriodicProcessStateDefinitionManagerTest.scala index 600b9c5c284..551701c9a77 100644 --- a/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/periodic/flink/PeriodicProcessStateDefinitionManagerTest.scala +++ b/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/periodic/flink/PeriodicProcessStateDefinitionManagerTest.scala @@ -2,8 +2,8 @@ package pl.touk.nussknacker.ui.process.periodic.flink import org.scalatest.funsuite.AnyFunSuite import org.scalatest.matchers.should.Matchers -import pl.touk.nussknacker.engine.api.deployment.ProcessStateDefinitionManager.ProcessStatus -import pl.touk.nussknacker.engine.api.deployment.ScenarioActionName +import pl.touk.nussknacker.engine.api.deployment.ProcessStateDefinitionManager.ScenarioStatusWithScenarioContext +import pl.touk.nussknacker.engine.api.deployment.{ScenarioActionName, StatusDetails} import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus import pl.touk.nussknacker.engine.api.process.VersionId import pl.touk.nussknacker.ui.process.periodic.PeriodicProcessService.PeriodicDeploymentStatus @@ -70,8 +70,8 @@ class PeriodicProcessStateDefinitionManagerTest extends AnyFunSuite with Matcher test("not display custom tooltip for perform single execution when latest version is deployed") { PeriodicStateStatus.customActionTooltips( - ProcessStatus( - stateStatus = ScheduledStatus(nextRunAt = LocalDateTime.now()), + ScenarioStatusWithScenarioContext( + statusDetails = StatusDetails(ScheduledStatus(nextRunAt = LocalDateTime.now()), None), latestVersionId = VersionId(5), deployedVersionId = Some(VersionId(5)), currentlyPresentedVersionId = Some(VersionId(5)), @@ -83,8 +83,8 @@ class PeriodicProcessStateDefinitionManagerTest extends AnyFunSuite with Matcher "display custom tooltip for perform single execution when deployed version is different than currently displayed" ) { PeriodicStateStatus.customActionTooltips( - ProcessStatus( - stateStatus = ScheduledStatus(nextRunAt = LocalDateTime.now()), + ScenarioStatusWithScenarioContext( + statusDetails = StatusDetails(ScheduledStatus(nextRunAt = LocalDateTime.now()), None), latestVersionId = VersionId(5), deployedVersionId = Some(VersionId(4)), currentlyPresentedVersionId = Some(VersionId(5)), @@ -96,8 +96,8 @@ class PeriodicProcessStateDefinitionManagerTest extends AnyFunSuite with Matcher test("display custom tooltip for perform single execution in CANCELED state") { PeriodicStateStatus.customActionTooltips( - ProcessStatus( - stateStatus = SimpleStateStatus.Canceled, + ScenarioStatusWithScenarioContext( + statusDetails = StatusDetails(SimpleStateStatus.Canceled, None), latestVersionId = VersionId(5), deployedVersionId = Some(VersionId(4)), currentlyPresentedVersionId = Some(VersionId(5)), diff --git a/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/DevelopmentDeploymentManagerProvider.scala b/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/DevelopmentDeploymentManagerProvider.scala index 62eaf708cd0..0d933bb66d6 100644 --- a/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/DevelopmentDeploymentManagerProvider.scala +++ b/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/DevelopmentDeploymentManagerProvider.scala @@ -57,14 +57,14 @@ class DevelopmentDeploymentManager(dependencies: DeploymentManagerDependencies, waitForJobIsFinishedRetryPolicy = 20.seconds.toPausePolicy ) - implicit private class ProcessStateExpandable(processState: StatusDetails) { + implicit private class StatusDetailsExpandable(statusDetails: StatusDetails) { def withStateStatus(stateStatus: StateStatus): StatusDetails = { StatusDetails( stateStatus, - processState.deploymentId, - processState.externalDeploymentId, - processState.version, + statusDetails.deploymentId, + statusDetails.externalDeploymentId, + statusDetails.version, Some(System.currentTimeMillis()) ) } @@ -145,10 +145,10 @@ class DevelopmentDeploymentManager(dependencies: DeploymentManagerDependencies, Future.unit } - override def getProcessStates( - name: ProcessName + override def getScenarioDeploymentsStatuses( + scenarioName: ProcessName )(implicit freshnessPolicy: DataFreshnessPolicy): Future[WithDataFreshnessStatus[List[StatusDetails]]] = { - Future.successful(WithDataFreshnessStatus.fresh(memory.get(name).toList)) + Future.successful(WithDataFreshnessStatus.fresh(memory.get(scenarioName).toList)) } override def processStateDefinitionManager: ProcessStateDefinitionManager = @@ -163,15 +163,16 @@ class DevelopmentDeploymentManager(dependencies: DeploymentManagerDependencies, } private def changeState(name: ProcessName, stateStatus: StateStatus): Unit = - memory.get(name).foreach { processState => - val newProcessState = processState.withStateStatus(stateStatus) + memory.get(name).foreach { statusDetails => + val newProcessState = statusDetails.withStateStatus(stateStatus) memory.update(name, newProcessState) - logger.debug(s"Changed scenario $name state from ${processState.status.name} to ${stateStatus.name}.") + logger.debug(s"Changed scenario $name state from ${statusDetails.status.name} to ${stateStatus.name}.") } private def asyncChangeState(name: ProcessName, stateStatus: StateStatus): Unit = - memory.get(name).foreach { processState => - logger.debug(s"Starting async changing state for $name from ${processState.status.name} to ${stateStatus.name}..") + memory.get(name).foreach { statusDetails => + logger + .debug(s"Starting async changing state for $name from ${statusDetails.status.name} to ${stateStatus.name}..") actorSystem.scheduler.scheduleOnce( sleepingTimeSeconds, new Runnable { @@ -182,7 +183,7 @@ class DevelopmentDeploymentManager(dependencies: DeploymentManagerDependencies, } private def createAndSaveProcessState(stateStatus: StateStatus, processVersion: ProcessVersion): StatusDetails = { - val processState = StatusDetails( + val statusDetails = StatusDetails( stateStatus, None, Some(ExternalDeploymentId(UUID.randomUUID().toString)), @@ -190,8 +191,8 @@ class DevelopmentDeploymentManager(dependencies: DeploymentManagerDependencies, startTime = Some(System.currentTimeMillis()), ) - memory.update(processVersion.processName, processState) - processState + memory.update(processVersion.processName, statusDetails) + statusDetails } private def sleepingTimeSeconds = FiniteDuration( diff --git a/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/DevelopmentProcessStateDefinitionManager.scala b/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/DevelopmentProcessStateDefinitionManager.scala index bd0ca90b5b0..08fbcfc8eff 100644 --- a/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/DevelopmentProcessStateDefinitionManager.scala +++ b/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/DevelopmentProcessStateDefinitionManager.scala @@ -1,6 +1,6 @@ package pl.touk.nussknacker.development.manager -import pl.touk.nussknacker.engine.api.deployment.ProcessStateDefinitionManager.ProcessStatus +import pl.touk.nussknacker.engine.api.deployment.ProcessStateDefinitionManager.ScenarioStatusWithScenarioContext import pl.touk.nussknacker.engine.api.deployment.StateDefinitionDetails.UnknownIcon import pl.touk.nussknacker.engine.api.deployment.StateStatus.StatusName import pl.touk.nussknacker.engine.api.deployment._ @@ -22,10 +22,10 @@ object DevelopmentStateStatus { val PreparingResourcesActionName: ScenarioActionName = ScenarioActionName("PREPARING") val TestActionName: ScenarioActionName = ScenarioActionName("TEST") - val statusActionsPF: PartialFunction[ProcessStatus, List[ScenarioActionName]] = _.stateStatus match { - case DevelopmentStateStatus.AfterRunningStatus => List(ScenarioActionName.Cancel) - case DevelopmentStateStatus.PreparingResourcesStatus => List(ScenarioActionName.Deploy) - case DevelopmentStateStatus.TestStatus => List(ScenarioActionName.Deploy) + val statusActionsPF: PartialFunction[ScenarioStatusWithScenarioContext, List[ScenarioActionName]] = { + case input if input.status == DevelopmentStateStatus.AfterRunningStatus => List(ScenarioActionName.Cancel) + case input if input.status == DevelopmentStateStatus.PreparingResourcesStatus => List(ScenarioActionName.Deploy) + case input if input.status == DevelopmentStateStatus.TestStatus => List(ScenarioActionName.Deploy) } val customStateDefinitions: Map[StatusName, StateDefinitionDetails] = Map( diff --git a/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/MockableDeploymentManagerProvider.scala b/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/MockableDeploymentManagerProvider.scala index a686795e216..4456cebab1f 100644 --- a/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/MockableDeploymentManagerProvider.scala +++ b/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/MockableDeploymentManagerProvider.scala @@ -84,12 +84,12 @@ object MockableDeploymentManagerProvider { override def processStateDefinitionManager: ProcessStateDefinitionManager = SimpleProcessStateDefinitionManager - override def getProcessStates(name: ProcessName)( + override def getScenarioDeploymentsStatuses(scenarioName: ProcessName)( implicit freshnessPolicy: DataFreshnessPolicy ): Future[WithDataFreshnessStatus[List[StatusDetails]]] = { val statusDetails = MockableDeploymentManager.scenarioStatuses .get() - .getOrElse(name.value, BasicStatusDetails(SimpleStateStatus.NotDeployed, version = None)) + .getOrElse(scenarioName.value, BasicStatusDetails(SimpleStateStatus.NotDeployed, version = None)) Future.successful( WithDataFreshnessStatus.fresh( List( diff --git a/engine/flink/management/src/it/scala/pl/touk/nussknacker/engine/management/streaming/BaseFlinkDeploymentManagerSpec.scala b/engine/flink/management/src/it/scala/pl/touk/nussknacker/engine/management/streaming/BaseFlinkDeploymentManagerSpec.scala index 25962def989..9fe7482a40f 100644 --- a/engine/flink/management/src/it/scala/pl/touk/nussknacker/engine/management/streaming/BaseFlinkDeploymentManagerSpec.scala +++ b/engine/flink/management/src/it/scala/pl/touk/nussknacker/engine/management/streaming/BaseFlinkDeploymentManagerSpec.scala @@ -28,11 +28,7 @@ class MiniClusterFlinkDeploymentManagerSpec extends BaseFlinkDeploymentManagerSp override protected def useMiniClusterForDeployment: Boolean = true } -trait BaseFlinkDeploymentManagerSpec - extends AnyFunSuiteLike - with Matchers - with StreamingDockerTest - with StrictLogging { +trait BaseFlinkDeploymentManagerSpec extends AnyFunSuiteLike with Matchers with StreamingDockerTest with StrictLogging { import pl.touk.nussknacker.engine.kafka.KafkaTestUtils.richConsumer @@ -203,7 +199,7 @@ trait BaseFlinkDeploymentManagerSpec .processCommand(DMStopScenarioCommand(processName, savepointDir = None, user = userToAct)) .map(_.path) eventually { - val status = deploymentManager.getProcessStates(processName).futureValue + val status = deploymentManager.getScenarioDeploymentsStatuses(processName).futureValue status.value.map(_.status) shouldBe List(SimpleStateStatus.Canceled) } @@ -314,5 +310,5 @@ trait BaseFlinkDeploymentManagerSpec .toList private def processVersion(name: ProcessName): List[ProcessVersion] = - deploymentManager.getProcessStates(name).futureValue.value.flatMap(_.version) + deploymentManager.getScenarioDeploymentsStatuses(name).futureValue.value.flatMap(_.version) } diff --git a/engine/flink/management/src/it/scala/pl/touk/nussknacker/engine/management/streaming/JavaConfigDeploymentManagerSpec.scala b/engine/flink/management/src/it/scala/pl/touk/nussknacker/engine/management/streaming/JavaConfigDeploymentManagerSpec.scala index 16c8b3b24f3..3b9f74ebe28 100644 --- a/engine/flink/management/src/it/scala/pl/touk/nussknacker/engine/management/streaming/JavaConfigDeploymentManagerSpec.scala +++ b/engine/flink/management/src/it/scala/pl/touk/nussknacker/engine/management/streaming/JavaConfigDeploymentManagerSpec.scala @@ -46,7 +46,7 @@ class JavaConfigDeploymentManagerSpec extends AnyFunSuite with Matchers with Str ) eventually { - val jobStatus = deploymentManager.getProcessStates(process.name).futureValue.value + val jobStatus = deploymentManager.getScenarioDeploymentsStatuses(process.name).futureValue.value jobStatus.map(_.status) shouldBe List(SimpleStateStatus.Running) } diff --git a/engine/flink/management/src/it/scala/pl/touk/nussknacker/engine/management/streaming/StreamingDockerTest.scala b/engine/flink/management/src/it/scala/pl/touk/nussknacker/engine/management/streaming/StreamingDockerTest.scala index b6a2d5738cd..3bd053dd4be 100644 --- a/engine/flink/management/src/it/scala/pl/touk/nussknacker/engine/management/streaming/StreamingDockerTest.scala +++ b/engine/flink/management/src/it/scala/pl/touk/nussknacker/engine/management/streaming/StreamingDockerTest.scala @@ -61,7 +61,7 @@ trait StreamingDockerTest extends DockerTest with BeforeAndAfterAll with Matcher ): Assertion = { deployProcess(process, processVersion, stateRestoringStrategy) eventually { - val jobStatuses = deploymentManager.getProcessStates(process.name).futureValue.value + val jobStatuses = deploymentManager.getScenarioDeploymentsStatuses(process.name).futureValue.value logger.debug(s"Waiting for deploy: ${process.name}, $jobStatuses") jobStatuses.map(_.status) should contain(SimpleStateStatus.Running) @@ -89,7 +89,7 @@ trait StreamingDockerTest extends DockerTest with BeforeAndAfterAll with Matcher deploymentManager.processCommand(DMCancelScenarioCommand(processName, user = userToAct)).futureValue eventually { val statuses = deploymentManager - .getProcessStates(processName) + .getScenarioDeploymentsStatuses(processName) .futureValue .value val runningOrDuringCancelJobs = statuses diff --git a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkDeploymentManager.scala b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkDeploymentManager.scala index d8dbce3fb00..f93590e0a9e 100644 --- a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkDeploymentManager.scala +++ b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkDeploymentManager.scala @@ -180,7 +180,7 @@ class FlinkDeploymentManager( private def oldJobsToStop(processVersion: ProcessVersion): Future[List[StatusDetails]] = { implicit val freshnessPolicy: DataFreshnessPolicy = DataFreshnessPolicy.Fresh - getProcessStates(processVersion.processName) + getScenarioDeploymentsStatuses(processVersion.processName) .map(_.value.filter(details => SimpleStateStatus.DefaultFollowingDeployStatuses.contains(details.status))) } @@ -202,7 +202,7 @@ class FlinkDeploymentManager( action: ExternalDeploymentId => Future[T] ): Future[T] = { implicit val freshnessPolicy: DataFreshnessPolicy = DataFreshnessPolicy.Fresh - getProcessStates(processName).flatMap { statuses => + getScenarioDeploymentsStatuses(processName).flatMap { statuses => val runningDeploymentIds = statuses.value.filter(statusDetailsPredicate).collect { case StatusDetails(SimpleStateStatus.Running, _, Some(deploymentId), _, _, _, _) => deploymentId } @@ -241,10 +241,10 @@ class FlinkDeploymentManager( else Future.successful(()) - override def getProcessStates( - name: ProcessName + override def getScenarioDeploymentsStatuses( + scenarioName: ProcessName )(implicit freshnessPolicy: DataFreshnessPolicy): Future[WithDataFreshnessStatus[List[StatusDetails]]] = { - getAllProcessesStatesFromFlink().map(_.getOrElse(name, List.empty)) + getAllProcessesStatesFromFlink().map(_.getOrElse(scenarioName, List.empty)) } override val deploymentSynchronisationSupport: DeploymentSynchronisationSupport = @@ -311,7 +311,7 @@ class FlinkDeploymentManager( .Pause(config.maxChecks, config.delay) .apply { implicit val freshnessPolicy: DataFreshnessPolicy = DataFreshnessPolicy.Fresh - getProcessStates(processName).map { statuses => + getScenarioDeploymentsStatuses(processName).map { statuses => statuses.value .find(details => details.externalDeploymentId @@ -335,7 +335,7 @@ class FlinkDeploymentManager( protected def cancelScenario(command: DMCancelScenarioCommand): Future[Unit] = { import command._ implicit val freshnessPolicy: DataFreshnessPolicy = DataFreshnessPolicy.Fresh - getProcessStates(scenarioName).map(_.value).flatMap { statuses => + getScenarioDeploymentsStatuses(scenarioName).map(_.value).flatMap { statuses => cancelEachMatchingJob(scenarioName, None, statuses) } } @@ -343,7 +343,7 @@ class FlinkDeploymentManager( private def cancelDeployment(command: DMCancelDeploymentCommand): Future[Unit] = { import command._ implicit val freshnessPolicy: DataFreshnessPolicy = DataFreshnessPolicy.Fresh - getProcessStates(scenarioName).map(_.value).flatMap { statuses => + getScenarioDeploymentsStatuses(scenarioName).map(_.value).flatMap { statuses => cancelEachMatchingJob(scenarioName, Some(deploymentId), statuses.filter(_.deploymentId.contains(deploymentId))) } } diff --git a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkStateStatus.scala b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkStateStatus.scala index b7453dc58a7..7090438a074 100644 --- a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkStateStatus.scala +++ b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkStateStatus.scala @@ -1,6 +1,6 @@ package pl.touk.nussknacker.engine.management -import pl.touk.nussknacker.engine.api.deployment.ProcessStateDefinitionManager.ProcessStatus +import pl.touk.nussknacker.engine.api.deployment.ProcessStateDefinitionManager.ScenarioStatusWithScenarioContext import pl.touk.nussknacker.engine.api.deployment.ScenarioActionName import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus @@ -10,9 +10,9 @@ import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus */ object FlinkStateStatus { - val statusActionsPF: PartialFunction[ProcessStatus, List[ScenarioActionName]] = _.stateStatus match { - case SimpleStateStatus.DuringDeploy => List(ScenarioActionName.Cancel) - case SimpleStateStatus.Restarting => List(ScenarioActionName.Cancel) + val statusActionsPF: PartialFunction[ScenarioStatusWithScenarioContext, List[ScenarioActionName]] = { + case input if input.status == SimpleStateStatus.DuringDeploy => List(ScenarioActionName.Cancel) + case input if input.status == SimpleStateStatus.Restarting => List(ScenarioActionName.Cancel) } } diff --git a/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/FlinkDeploymentManagerSpec.scala b/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/FlinkDeploymentManagerSpec.scala index ae8eb1f7936..c8a3ed671b5 100644 --- a/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/FlinkDeploymentManagerSpec.scala +++ b/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/FlinkDeploymentManagerSpec.scala @@ -386,7 +386,7 @@ class FlinkDeploymentManagerSpec extends AnyFunSuite with Matchers with PatientS ) val manager = createManager(statuses) - val returnedStatuses = manager.getProcessStates(ProcessName("p1")).map(_.value).futureValue + val returnedStatuses = manager.getScenarioDeploymentsStatuses(ProcessName("p1")).map(_.value).futureValue InconsistentStateDetector.extractAtMostOneStatus(returnedStatuses) shouldBe Some( StatusDetails( ProblemStateStatus.MultipleJobsRunning, @@ -416,7 +416,7 @@ class FlinkDeploymentManagerSpec extends AnyFunSuite with Matchers with PatientS ) val manager = createManager(statuses) - val returnedStatuses = manager.getProcessStates(ProcessName("p1")).map(_.value).futureValue + val returnedStatuses = manager.getScenarioDeploymentsStatuses(ProcessName("p1")).map(_.value).futureValue InconsistentStateDetector.extractAtMostOneStatus(returnedStatuses) shouldBe Some( StatusDetails( ProblemStateStatus.MultipleJobsRunning, @@ -447,7 +447,7 @@ class FlinkDeploymentManagerSpec extends AnyFunSuite with Matchers with PatientS ) val manager = createManager(statuses) - val returnedStatuses = manager.getProcessStates(ProcessName("p1")).map(_.value).futureValue + val returnedStatuses = manager.getScenarioDeploymentsStatuses(ProcessName("p1")).map(_.value).futureValue InconsistentStateDetector.extractAtMostOneStatus(returnedStatuses) shouldBe Some( StatusDetails( SimpleStateStatus.Running, @@ -476,7 +476,7 @@ class FlinkDeploymentManagerSpec extends AnyFunSuite with Matchers with PatientS ) val manager = createManager(statuses) - val returnedStatuses = manager.getProcessStates(ProcessName("p1")).map(_.value).futureValue + val returnedStatuses = manager.getScenarioDeploymentsStatuses(ProcessName("p1")).map(_.value).futureValue InconsistentStateDetector.extractAtMostOneStatus(returnedStatuses) shouldBe Some( StatusDetails( SimpleStateStatus.Finished, @@ -506,7 +506,7 @@ class FlinkDeploymentManagerSpec extends AnyFunSuite with Matchers with PatientS ) val manager = createManager(statuses) - val returnedStatuses = manager.getProcessStates(ProcessName("p1")).map(_.value).futureValue + val returnedStatuses = manager.getScenarioDeploymentsStatuses(ProcessName("p1")).map(_.value).futureValue InconsistentStateDetector.extractAtMostOneStatus(returnedStatuses) shouldBe Some( StatusDetails( SimpleStateStatus.Restarting, @@ -553,7 +553,7 @@ class FlinkDeploymentManagerSpec extends AnyFunSuite with Matchers with PatientS ) val manager = createManager(statuses) - manager.getProcessStates(processName).map(_.value).futureValue shouldBe List( + manager.getScenarioDeploymentsStatuses(processName).map(_.value).futureValue shouldBe List( StatusDetails( SimpleStateStatus.Finished, Some(DeploymentId(deploymentId)), @@ -589,13 +589,13 @@ class FlinkDeploymentManagerSpec extends AnyFunSuite with Matchers with PatientS stubWithFixedDelay(durationLongerThanClientTimeout) a[SttpClientException.TimeoutException] shouldBe thrownBy { manager - .getProcessStates(ProcessName("p1")) + .getScenarioDeploymentsStatuses(ProcessName("p1")) .futureValueEnsuringInnerException(durationLongerThanClientTimeout) } stubWithFixedDelay(0.seconds) val resultWithoutDelay = manager - .getProcessStates(ProcessName("p1")) + .getScenarioDeploymentsStatuses(ProcessName("p1")) .map(_.value) .futureValue(Timeout(durationLongerThanClientTimeout.plus(1 second))) resultWithoutDelay shouldEqual List.empty diff --git a/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/FlinkProcessStateSpec.scala b/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/FlinkScenarioStatusDtoSpec.scala similarity index 64% rename from engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/FlinkProcessStateSpec.scala rename to engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/FlinkScenarioStatusDtoSpec.scala index f85af8d6ae9..3524a8cc3a4 100644 --- a/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/FlinkProcessStateSpec.scala +++ b/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/FlinkScenarioStatusDtoSpec.scala @@ -4,19 +4,25 @@ import org.scalatest.Inside import org.scalatest.funspec.AnyFunSpec import org.scalatest.matchers.should.Matchers import pl.touk.nussknacker.engine.api.ProcessVersion +import pl.touk.nussknacker.engine.api.deployment.ProcessStateDefinitionManager.{ + ScenarioStatusPresentationDetails, + ScenarioStatusWithScenarioContext +} import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus -import pl.touk.nussknacker.engine.api.deployment.{ProcessState, ScenarioActionName, StateStatus, StatusDetails} +import pl.touk.nussknacker.engine.api.deployment.{ScenarioActionName, StateStatus, StatusDetails} import pl.touk.nussknacker.engine.api.process.VersionId import pl.touk.nussknacker.engine.deployment.ExternalDeploymentId -class FlinkProcessStateSpec extends AnyFunSpec with Matchers with Inside { +class FlinkScenarioStatusDtoSpec extends AnyFunSpec with Matchers with Inside { - def createProcessState(stateStatus: StateStatus): ProcessState = - FlinkProcessStateDefinitionManager.processState( - StatusDetails(stateStatus, None, Some(ExternalDeploymentId("12")), Some(ProcessVersion.empty)), - VersionId(1), - None, - None, + def createProcessState(stateStatus: StateStatus): ScenarioStatusPresentationDetails = + FlinkProcessStateDefinitionManager.statusPresentation( + ScenarioStatusWithScenarioContext( + StatusDetails(stateStatus, None, Some(ExternalDeploymentId("12")), Some(ProcessVersion.empty)), + VersionId(1), + None, + None, + ) ) it("scenario state should be during deploy") { diff --git a/engine/lite/embeddedDeploymentManager/src/main/scala/pl/touk/nussknacker/engine/embedded/EmbeddedDeploymentManager.scala b/engine/lite/embeddedDeploymentManager/src/main/scala/pl/touk/nussknacker/engine/embedded/EmbeddedDeploymentManager.scala index 1f34fc80c65..1eabc76028a 100644 --- a/engine/lite/embeddedDeploymentManager/src/main/scala/pl/touk/nussknacker/engine/embedded/EmbeddedDeploymentManager.scala +++ b/engine/lite/embeddedDeploymentManager/src/main/scala/pl/touk/nussknacker/engine/embedded/EmbeddedDeploymentManager.scala @@ -167,13 +167,13 @@ class EmbeddedDeploymentManager( } } - override def getProcessStates( - name: ProcessName + override def getScenarioDeploymentsStatuses( + scenarioName: ProcessName )(implicit freshnessPolicy: DataFreshnessPolicy): Future[WithDataFreshnessStatus[List[StatusDetails]]] = { Future.successful( WithDataFreshnessStatus.fresh( deployments - .get(name) + .get(scenarioName) .map { interpreterData => StatusDetails( status = interpreterData.scenarioDeployment diff --git a/engine/lite/embeddedDeploymentManager/src/main/scala/pl/touk/nussknacker/engine/embedded/EmbeddedProcessStateDefinitionManager.scala b/engine/lite/embeddedDeploymentManager/src/main/scala/pl/touk/nussknacker/engine/embedded/EmbeddedProcessStateDefinitionManager.scala index 83315d6d799..caf2f64ff1e 100644 --- a/engine/lite/embeddedDeploymentManager/src/main/scala/pl/touk/nussknacker/engine/embedded/EmbeddedProcessStateDefinitionManager.scala +++ b/engine/lite/embeddedDeploymentManager/src/main/scala/pl/touk/nussknacker/engine/embedded/EmbeddedProcessStateDefinitionManager.scala @@ -1,6 +1,6 @@ package pl.touk.nussknacker.engine.embedded -import pl.touk.nussknacker.engine.api.deployment.ProcessStateDefinitionManager.ProcessStatus +import pl.touk.nussknacker.engine.api.deployment.ProcessStateDefinitionManager.ScenarioStatusWithScenarioContext import pl.touk.nussknacker.engine.api.deployment.simple.{SimpleProcessStateDefinitionManager, SimpleStateStatus} import pl.touk.nussknacker.engine.api.deployment.{OverridingProcessStateDefinitionManager, ScenarioActionName} @@ -11,7 +11,7 @@ import pl.touk.nussknacker.engine.api.deployment.{OverridingProcessStateDefiniti object EmbeddedProcessStateDefinitionManager extends OverridingProcessStateDefinitionManager( delegate = SimpleProcessStateDefinitionManager, - statusActionsPF = { case ProcessStatus(SimpleStateStatus.Restarting, _, _, _) => + statusActionsPF = { case ScenarioStatusWithScenarioContext(SimpleStateStatus.Restarting, _, _, _) => List(ScenarioActionName.Cancel) } ) diff --git a/engine/lite/embeddedDeploymentManager/src/test/scala/pl/touk/nussknacker/streaming/embedded/RequestResponseEmbeddedDeploymentManagerTest.scala b/engine/lite/embeddedDeploymentManager/src/test/scala/pl/touk/nussknacker/streaming/embedded/RequestResponseEmbeddedDeploymentManagerTest.scala index 5f4992e1a19..a62727d81b7 100644 --- a/engine/lite/embeddedDeploymentManager/src/test/scala/pl/touk/nussknacker/streaming/embedded/RequestResponseEmbeddedDeploymentManagerTest.scala +++ b/engine/lite/embeddedDeploymentManager/src/test/scala/pl/touk/nussknacker/streaming/embedded/RequestResponseEmbeddedDeploymentManagerTest.scala @@ -129,7 +129,9 @@ class RequestResponseEmbeddedDeploymentManagerTest fixture.deployScenario(scenario) eventually { - manager.getProcessStates(name).futureValue.value.map(_.status) shouldBe List(SimpleStateStatus.Running) + manager.getScenarioDeploymentsStatuses(name).futureValue.value.map(_.status) shouldBe List( + SimpleStateStatus.Running + ) } request.body("""{ productId: 15 }""").send(backend).body shouldBe Right("""{"transformed":15}""") @@ -149,7 +151,7 @@ class RequestResponseEmbeddedDeploymentManagerTest manager.processCommand(DMCancelScenarioCommand(name, User("a", "b"))).futureValue - manager.getProcessStates(name).futureValue.value shouldBe List.empty + manager.getScenarioDeploymentsStatuses(name).futureValue.value shouldBe List.empty request.body("""{ productId: 15 }""").send(backend).code shouldBe StatusCode.NotFound } diff --git a/engine/lite/embeddedDeploymentManager/src/test/scala/pl/touk/nussknacker/streaming/embedded/StreamingEmbeddedDeploymentManagerRestartTest.scala b/engine/lite/embeddedDeploymentManager/src/test/scala/pl/touk/nussknacker/streaming/embedded/StreamingEmbeddedDeploymentManagerRestartTest.scala index 814529cf5f1..33293c95639 100644 --- a/engine/lite/embeddedDeploymentManager/src/test/scala/pl/touk/nussknacker/streaming/embedded/StreamingEmbeddedDeploymentManagerRestartTest.scala +++ b/engine/lite/embeddedDeploymentManager/src/test/scala/pl/touk/nussknacker/streaming/embedded/StreamingEmbeddedDeploymentManagerRestartTest.scala @@ -42,7 +42,7 @@ class StreamingEmbeddedDeploymentManagerRestartTest extends BaseStreamingEmbedde kafkaServerWithDependencies.shutdownKafkaServer() eventually { - val jobStatuses = manager.getProcessStates(name).futureValue.value + val jobStatuses = manager.getScenarioDeploymentsStatuses(name).futureValue.value jobStatuses.map(_.status) shouldBe List(SimpleStateStatus.Restarting) } @@ -51,7 +51,9 @@ class StreamingEmbeddedDeploymentManagerRestartTest extends BaseStreamingEmbedde kafkaServerWithDependencies.startupKafkaServer() eventually { - manager.getProcessStates(name).futureValue.value.map(_.status) shouldBe List(SimpleStateStatus.Running) + manager.getScenarioDeploymentsStatuses(name).futureValue.value.map(_.status) shouldBe List( + SimpleStateStatus.Running + ) } } diff --git a/engine/lite/embeddedDeploymentManager/src/test/scala/pl/touk/nussknacker/streaming/embedded/StreamingEmbeddedDeploymentManagerTest.scala b/engine/lite/embeddedDeploymentManager/src/test/scala/pl/touk/nussknacker/streaming/embedded/StreamingEmbeddedDeploymentManagerTest.scala index aaae53dec82..0d56ec5017a 100644 --- a/engine/lite/embeddedDeploymentManager/src/test/scala/pl/touk/nussknacker/streaming/embedded/StreamingEmbeddedDeploymentManagerTest.scala +++ b/engine/lite/embeddedDeploymentManager/src/test/scala/pl/touk/nussknacker/streaming/embedded/StreamingEmbeddedDeploymentManagerTest.scala @@ -57,7 +57,9 @@ class StreamingEmbeddedDeploymentManagerTest } eventually { - manager.getProcessStates(name).futureValue.value.map(_.status) shouldBe List(SimpleStateStatus.Running) + manager.getScenarioDeploymentsStatuses(name).futureValue.value.map(_.status) shouldBe List( + SimpleStateStatus.Running + ) } val input = obj("productId" -> fromInt(10)) @@ -67,7 +69,7 @@ class StreamingEmbeddedDeploymentManagerTest wrapInFailingLoader { manager.processCommand(DMCancelScenarioCommand(name, User("a", "b"))).futureValue } - manager.getProcessStates(name).futureValue.value shouldBe List.empty + manager.getScenarioDeploymentsStatuses(name).futureValue.value shouldBe List.empty } test("Run persisted scenario deployments") { @@ -98,7 +100,9 @@ class StreamingEmbeddedDeploymentManagerTest val FixtureParam(manager, _, _, _) = prepareFixture(inputTopic, outputTopic, List(deployedScenarioData)) eventually { - manager.getProcessStates(name).futureValue.value.map(_.status) shouldBe List(SimpleStateStatus.Running) + manager.getScenarioDeploymentsStatuses(name).futureValue.value.map(_.status) shouldBe List( + SimpleStateStatus.Running + ) } val input = obj("productId" -> fromInt(10)) @@ -107,7 +111,7 @@ class StreamingEmbeddedDeploymentManagerTest kafkaClient.createConsumer().consumeWithJson[Json](outputTopic.name).take(1).head.message() shouldBe input manager.processCommand(DMCancelScenarioCommand(name, User("a", "b"))).futureValue - manager.getProcessStates(name).futureValue.value shouldBe List.empty + manager.getScenarioDeploymentsStatuses(name).futureValue.value shouldBe List.empty } test("Run persisted scenario deployment with scenario json incompatible with current component API") { @@ -141,7 +145,7 @@ class StreamingEmbeddedDeploymentManagerTest ) val FixtureParam(manager, _, _, _) = prepareFixture(inputTopic, outputTopic, List(deployedScenarioData)) - manager.getProcessStates(name).futureValue.value.map(_.status) should matchPattern { + manager.getScenarioDeploymentsStatuses(name).futureValue.value.map(_.status) should matchPattern { case ProblemStateStatus("Scenario compilation errors", _) :: Nil => } } @@ -228,7 +232,9 @@ class StreamingEmbeddedDeploymentManagerTest fixture.deployScenario(scenarioForOutput("next")) eventually { - manager.getProcessStates(name).futureValue.value.map(_.status) shouldBe List(SimpleStateStatus.Running) + manager.getScenarioDeploymentsStatuses(name).futureValue.value.map(_.status) shouldBe List( + SimpleStateStatus.Running + ) } kafkaClient.sendMessage(inputTopic.name, message("2")).futureValue @@ -239,7 +245,7 @@ class StreamingEmbeddedDeploymentManagerTest manager.processCommand(DMCancelScenarioCommand(name, User("a", "b"))).futureValue - manager.getProcessStates(name).futureValue.value shouldBe List.empty + manager.getScenarioDeploymentsStatuses(name).futureValue.value shouldBe List.empty } test("Performs test from file") { diff --git a/engine/lite/k8sDeploymentManager/src/main/scala/pl/touk/nussknacker/k8s/manager/K8sDeploymentManager.scala b/engine/lite/k8sDeploymentManager/src/main/scala/pl/touk/nussknacker/k8s/manager/K8sDeploymentManager.scala index 6ce5b511355..c4c27566b3d 100644 --- a/engine/lite/k8sDeploymentManager/src/main/scala/pl/touk/nussknacker/k8s/manager/K8sDeploymentManager.scala +++ b/engine/lite/k8sDeploymentManager/src/main/scala/pl/touk/nussknacker/k8s/manager/K8sDeploymentManager.scala @@ -315,15 +315,15 @@ class K8sDeploymentManager( } } - override def getProcessStates( - name: ProcessName + override def getScenarioDeploymentsStatuses( + scenarioName: ProcessName )(implicit freshnessPolicy: DataFreshnessPolicy): Future[WithDataFreshnessStatus[List[StatusDetails]]] = { val mapper = new K8sDeploymentStatusMapper(processStateDefinitionManager) for { deployments <- scenarioStateK8sClient - .listSelected[ListResource[Deployment]](requirementForName(name)) + .listSelected[ListResource[Deployment]](requirementForName(scenarioName)) .map(_.items) - pods <- scenarioStateK8sClient.listSelected[ListResource[Pod]](requirementForName(name)).map(_.items) + pods <- scenarioStateK8sClient.listSelected[ListResource[Pod]](requirementForName(scenarioName)).map(_.items) } yield { WithDataFreshnessStatus.fresh(deployments.map(mapper.status(_, pods))) } diff --git a/engine/lite/k8sDeploymentManager/src/main/scala/pl/touk/nussknacker/k8s/manager/K8sDeploymentStatusMapper.scala b/engine/lite/k8sDeploymentManager/src/main/scala/pl/touk/nussknacker/k8s/manager/K8sDeploymentStatusMapper.scala index ebbd1c9c926..7a06c6f184a 100644 --- a/engine/lite/k8sDeploymentManager/src/main/scala/pl/touk/nussknacker/k8s/manager/K8sDeploymentStatusMapper.scala +++ b/engine/lite/k8sDeploymentManager/src/main/scala/pl/touk/nussknacker/k8s/manager/K8sDeploymentStatusMapper.scala @@ -4,23 +4,11 @@ import com.typesafe.scalalogging.LazyLogging import io.circe.Json import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus.ProblemStateStatus -import pl.touk.nussknacker.engine.api.deployment.{ - ProcessState, - ProcessStateDefinitionManager, - StateStatus, - StatusDetails -} +import pl.touk.nussknacker.engine.api.deployment.{ProcessStateDefinitionManager, StateStatus, StatusDetails} import pl.touk.nussknacker.k8s.manager.K8sDeploymentManager.parseVersionAnnotation -import pl.touk.nussknacker.k8s.manager.K8sDeploymentStatusMapper.{ - availableCondition, - crashLoopBackOffReason, - newReplicaSetAvailable, - progressingCondition, - replicaFailureCondition, - trueConditionStatus -} -import skuber.{Container, Pod} +import pl.touk.nussknacker.k8s.manager.K8sDeploymentStatusMapper._ import skuber.apps.v1.Deployment +import skuber.{Container, Pod} object K8sDeploymentStatusMapper { diff --git a/engine/lite/k8sDeploymentManager/src/test/scala/pl/touk/nussknacker/k8s/manager/BaseK8sDeploymentManagerTest.scala b/engine/lite/k8sDeploymentManager/src/test/scala/pl/touk/nussknacker/k8s/manager/BaseK8sDeploymentManagerTest.scala index c244db2fc50..4487be9fe73 100644 --- a/engine/lite/k8sDeploymentManager/src/test/scala/pl/touk/nussknacker/k8s/manager/BaseK8sDeploymentManagerTest.scala +++ b/engine/lite/k8sDeploymentManager/src/test/scala/pl/touk/nussknacker/k8s/manager/BaseK8sDeploymentManagerTest.scala @@ -145,14 +145,14 @@ class BaseK8sDeploymentManagerTest } finally { manager.processCommand(DMCancelScenarioCommand(version.processName, DeploymentData.systemUser)).futureValue eventually { - manager.getProcessStates(version.processName).futureValue.value shouldBe List.empty + manager.getScenarioDeploymentsStatuses(version.processName).futureValue.value shouldBe List.empty } } } def waitForRunning(version: ProcessVersion): Assertion = { eventually { - val state = manager.getProcessStates(version.processName).map(_.value).futureValue + val state = manager.getScenarioDeploymentsStatuses(version.processName).map(_.value).futureValue logger.debug(s"Current process state: $state") state.flatMap(_.version) shouldBe List(version) state.map(_.status) shouldBe List(SimpleStateStatus.Running) diff --git a/engine/lite/k8sDeploymentManager/src/test/scala/pl/touk/nussknacker/k8s/manager/K8sDeploymentManagerKafkaTest.scala b/engine/lite/k8sDeploymentManager/src/test/scala/pl/touk/nussknacker/k8s/manager/K8sDeploymentManagerKafkaTest.scala index 344ae6e5850..6f20c09f4ce 100644 --- a/engine/lite/k8sDeploymentManager/src/test/scala/pl/touk/nussknacker/k8s/manager/K8sDeploymentManagerKafkaTest.scala +++ b/engine/lite/k8sDeploymentManager/src/test/scala/pl/touk/nussknacker/k8s/manager/K8sDeploymentManagerKafkaTest.scala @@ -102,7 +102,7 @@ class K8sDeploymentManagerKafkaTest def waitForRunning(version: ProcessVersion) = { eventually { - val state = manager.getProcessStates(version.processName).map(_.value).futureValue + val state = manager.getScenarioDeploymentsStatuses(version.processName).map(_.value).futureValue state.flatMap(_.version) shouldBe List(version) state.map(_.status) shouldBe List(SimpleStateStatus.Running) } @@ -391,7 +391,7 @@ class K8sDeploymentManagerKafkaTest private def cancelAndAssertCleanup(manager: K8sDeploymentManager, version: ProcessVersion) = { manager.processCommand(DMCancelScenarioCommand(version.processName, DeploymentData.systemUser)).futureValue eventually { - manager.getProcessStates(version.processName).map(_.value).futureValue shouldBe List.empty + manager.getScenarioDeploymentsStatuses(version.processName).map(_.value).futureValue shouldBe List.empty } assertNoGarbageLeft() } diff --git a/engine/lite/k8sDeploymentManager/src/test/scala/pl/touk/nussknacker/k8s/manager/K8sDeploymentManagerOnMocksTest.scala b/engine/lite/k8sDeploymentManager/src/test/scala/pl/touk/nussknacker/k8s/manager/K8sDeploymentManagerOnMocksTest.scala index b1f121d6fd2..60ff1d54d89 100644 --- a/engine/lite/k8sDeploymentManager/src/test/scala/pl/touk/nussknacker/k8s/manager/K8sDeploymentManagerOnMocksTest.scala +++ b/engine/lite/k8sDeploymentManager/src/test/scala/pl/touk/nussknacker/k8s/manager/K8sDeploymentManagerOnMocksTest.scala @@ -87,13 +87,13 @@ class K8sDeploymentManagerOnMocksTest stubWithFixedDelay(durationLongerThanClientTimeout) a[TcpIdleTimeoutException] shouldBe thrownBy { manager - .getProcessStates(ProcessName("foo")) + .getScenarioDeploymentsStatuses(ProcessName("foo")) .futureValueEnsuringInnerException(durationLongerThanClientTimeout) } stubWithFixedDelay(0 seconds) val result = manager - .getProcessStates(ProcessName("foo")) + .getScenarioDeploymentsStatuses(ProcessName("foo")) .map(_.value) .futureValueEnsuringInnerException(durationLongerThanClientTimeout) result shouldEqual List.empty diff --git a/engine/lite/k8sDeploymentManager/src/test/scala/pl/touk/nussknacker/k8s/manager/K8sDeploymentManagerReqRespTest.scala b/engine/lite/k8sDeploymentManager/src/test/scala/pl/touk/nussknacker/k8s/manager/K8sDeploymentManagerReqRespTest.scala index fcbf34b6896..de408912e74 100644 --- a/engine/lite/k8sDeploymentManager/src/test/scala/pl/touk/nussknacker/k8s/manager/K8sDeploymentManagerReqRespTest.scala +++ b/engine/lite/k8sDeploymentManager/src/test/scala/pl/touk/nussknacker/k8s/manager/K8sDeploymentManagerReqRespTest.scala @@ -179,7 +179,7 @@ class K8sDeploymentManagerReqRespTest ) .futureValue eventually { - val state = f.manager.getProcessStates(secondVersionInfo.processName).map(_.value).futureValue + val state = f.manager.getScenarioDeploymentsStatuses(secondVersionInfo.processName).map(_.value).futureValue state.flatMap(_.version).map(_.versionId.value) shouldBe List(secondVersion) state.map(_.status) shouldBe List(SimpleStateStatus.Running) } diff --git a/extensions-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/StateStatus.scala b/extensions-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/StateStatus.scala new file mode 100644 index 00000000000..2dc88d4cbb9 --- /dev/null +++ b/extensions-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/StateStatus.scala @@ -0,0 +1,20 @@ +package pl.touk.nussknacker.engine.api.deployment + +import pl.touk.nussknacker.engine.api.deployment.StateStatus.StatusName + +object StateStatus { + type StatusName = String + + // Temporary methods to simplify status creation + def apply(statusName: StatusName): StateStatus = NoAttributesStateStatus(statusName) + +} + +trait StateStatus { + // Status identifier, should be unique among all states registered within all processing types. + def name: StatusName +} + +case class NoAttributesStateStatus(name: StatusName) extends StateStatus { + override def toString: String = name +}