diff --git a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/DeploymentManager.scala b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/DeploymentManager.scala index ac96afec600..3bcef90e187 100644 --- a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/DeploymentManager.scala +++ b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/DeploymentManager.scala @@ -47,7 +47,7 @@ sealed trait StateQueryForAllScenariosSupport trait StateQueryForAllScenariosSupported extends StateQueryForAllScenariosSupport { - def getAllProcessesStates()( + def getAllDeploymentStatuses()( implicit freshnessPolicy: DataFreshnessPolicy ): Future[WithDataFreshnessStatus[Map[ProcessName, List[StatusDetails]]]] diff --git a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/StatusDetails.scala b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/StatusDetails.scala index 974d38421a1..272292fce1a 100644 --- a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/StatusDetails.scala +++ b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/StatusDetails.scala @@ -12,6 +12,11 @@ case class StatusDetails( startTime: Option[Long] = None, errors: List[String] = List.empty ) { + + def deploymentIdUnsafe: DeploymentId = + deploymentId.getOrElse(throw new IllegalStateException(s"deploymentId is missing")) + def externalDeploymentIdUnsafe: ExternalDeploymentId = externalDeploymentId.getOrElse(throw new IllegalStateException(s"externalDeploymentId is missing")) + } diff --git a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/inconsistency/InconsistentStateDetector.scala b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/inconsistency/InconsistentStateDetector.scala deleted file mode 100644 index 7b4cbd19eae..00000000000 --- a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/inconsistency/InconsistentStateDetector.scala +++ /dev/null @@ -1,155 +0,0 @@ -package pl.touk.nussknacker.engine.api.deployment.inconsistency - -import com.typesafe.scalalogging.LazyLogging -import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus -import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus.ProblemStateStatus -import pl.touk.nussknacker.engine.api.deployment.{ProcessAction, ProcessActionState, ScenarioActionName, StatusDetails} -import pl.touk.nussknacker.engine.deployment.DeploymentId - -// FIXME abr: move to core -object InconsistentStateDetector extends InconsistentStateDetector - -class InconsistentStateDetector extends LazyLogging { - - def resolve(statusDetails: List[StatusDetails], lastStateAction: Option[ProcessAction]): StatusDetails = { - val status = (doExtractAtMostOneStatus(statusDetails), lastStateAction) match { - case (Left(state), _) => state - case (Right(Some(state)), _) if shouldAlwaysReturnStatus(state) => state - case (Right(Some(state)), _) if state.status == SimpleStateStatus.Restarting => - handleRestartingState(state, lastStateAction) - case (Right(statusDetailsOpt), Some(action)) - if action.actionName == ScenarioActionName.Deploy && action.state == ProcessActionState.ExecutionFinished => - handleLastActionFinishedDeploy(statusDetailsOpt, action) - case (Right(statusDetailsOpt), Some(action)) if action.actionName == ScenarioActionName.Deploy => - handleLastActionDeploy(statusDetailsOpt, action) - case (Right(Some(state)), _) if isFollowingDeployStatus(state) => - handleFollowingDeployState(state, lastStateAction) - case (Right(statusDetailsOpt), Some(action)) if action.actionName == ScenarioActionName.Cancel => - handleCanceledState(statusDetailsOpt) - case (Right(Some(state)), _) => handleState(state, lastStateAction) - case (Right(None), Some(a)) => StatusDetails(SimpleStateStatus.NotDeployed, Some(DeploymentId.fromActionId(a.id))) - case (Right(None), None) => StatusDetails(SimpleStateStatus.NotDeployed, None) - } - logger.debug(s"Resolved $statusDetails , lastStateAction: $lastStateAction to status $status") - status - } - - // TODO: This method is exposed to make transition between Option[StatusDetails] and List[StatusDetails] easier to perform. - // After full migration to List[StatusDetails], this method should be removed - def extractAtMostOneStatus(statusDetails: List[StatusDetails]): Option[StatusDetails] = - doExtractAtMostOneStatus(statusDetails).fold(Some(_), identity) - - private def doExtractAtMostOneStatus( - statusDetails: List[StatusDetails] - ): Either[StatusDetails, Option[StatusDetails]] = { - val notFinalStatuses = statusDetails.filterNot(isFinalOrTransitioningToFinalStatus) - (statusDetails, notFinalStatuses) match { - case (Nil, Nil) => Right(None) - case (_, singleNotFinished :: Nil) => Right(Some(singleNotFinished)) - case (_, firstNotFinished :: _ :: _) => - Left( - firstNotFinished.copy( - status = ProblemStateStatus.MultipleJobsRunning, - errors = List(s"Expected one job, instead: ${notFinalStatuses - .map(details => details.externalDeploymentId.map(_.value).getOrElse("missing") + " - " + details.status) - .mkString(", ")}") - ) - ) - case (firstFinished :: _, Nil) => Right(Some(firstFinished)) - } - } - - private def handleState(statusDetails: StatusDetails, lastStateAction: Option[ProcessAction]): StatusDetails = - statusDetails.status match { - case SimpleStateStatus.Restarting | SimpleStateStatus.DuringCancel | SimpleStateStatus.Finished - if lastStateAction.isEmpty => - statusDetails.copy(status = ProblemStateStatus.ProcessWithoutAction) - case _ => statusDetails - } - - // This method handles some corner cases for canceled process -> with last action = Canceled - private def handleCanceledState(statusDetailsOpt: Option[StatusDetails]): StatusDetails = - statusDetailsOpt - // Missing deployment is fine for cancelled action as well because of retention of states - .getOrElse(StatusDetails(SimpleStateStatus.Canceled, None)) - - private def handleRestartingState( - statusDetails: StatusDetails, - lastStateAction: Option[ProcessAction] - ): StatusDetails = - lastStateAction match { - case Some(action) if action.actionName == ScenarioActionName.Deploy => statusDetails - case _ => handleState(statusDetails, lastStateAction) - } - - // This method handles some corner cases for following deploy state mismatch last action version - private def handleFollowingDeployState( - statusDetails: StatusDetails, - lastStateAction: Option[ProcessAction] - ): StatusDetails = - lastStateAction match { - case Some(action) if action.actionName != ScenarioActionName.Deploy => - statusDetails.copy(status = ProblemStateStatus.shouldNotBeRunning(true)) - case Some(_) => - statusDetails - case None => - statusDetails.copy(status = ProblemStateStatus.shouldNotBeRunning(false)) - } - - private def handleLastActionFinishedDeploy( - statusDetailsOpt: Option[StatusDetails], - action: ProcessAction - ): StatusDetails = - statusDetailsOpt match { - case Some(state) => - state - case None => - // Some engines like Flink have jobs retention. Because of that we restore finished state. See FlinkDeploymentManager.postprocess - StatusDetails(SimpleStateStatus.Finished, Some(DeploymentId.fromActionId(action.id))) - } - - // This method handles some corner cases for deployed action mismatch state version - private def handleLastActionDeploy(statusDetailsOpt: Option[StatusDetails], action: ProcessAction): StatusDetails = - statusDetailsOpt match { - case Some(state) => - state.version match { - case _ if !isFollowingDeployStatus(state) && !isFinishedStatus(state) => - logger.debug( - s"handleLastActionDeploy: is not following deploy status nor finished, but it should be. $state" - ) - state.copy(status = ProblemStateStatus.shouldBeRunning(action.processVersionId, action.user)) - case Some(ver) if ver.versionId != action.processVersionId => - state.copy(status = - ProblemStateStatus.mismatchDeployedVersion(ver.versionId, action.processVersionId, action.user) - ) - case Some(ver) if ver.versionId == action.processVersionId => - state - case None => // TODO: we should remove Option from ProcessVersion? - state.copy(status = ProblemStateStatus.missingDeployedVersion(action.processVersionId, action.user)) - case _ => - state.copy(status = ProblemStateStatus.Failed) // Generic error in other cases - } - case None => - logger.debug( - s"handleLastActionDeploy for empty statusDetails. Action.processVersionId: ${action.processVersionId}" - ) - StatusDetails(ProblemStateStatus.shouldBeRunning(action.processVersionId, action.user), None) - } - - // Methods below are protected in case of other state machine implementation for a given DeploymentManager - protected def shouldAlwaysReturnStatus(state: StatusDetails): Boolean = { - ProblemStateStatus.isProblemStatus(state.status) - } - - protected def isFollowingDeployStatus(state: StatusDetails): Boolean = { - SimpleStateStatus.DefaultFollowingDeployStatuses.contains(state.status) - } - - protected def isFinalOrTransitioningToFinalStatus(state: StatusDetails): Boolean = - SimpleStateStatus.isFinalOrTransitioningToFinalStatus(state.status) - - protected def isFinishedStatus(state: StatusDetails): Boolean = { - state.status == SimpleStateStatus.Finished - } - -} diff --git a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/simple/SimpleStateStatus.scala b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/simple/SimpleStateStatus.scala index 60258356c92..5093a5ce12c 100644 --- a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/simple/SimpleStateStatus.scala +++ b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/simple/SimpleStateStatus.scala @@ -7,7 +7,6 @@ import pl.touk.nussknacker.engine.api.process.VersionId import java.net.URI -// FIXME abr separate core statuses and DM statuses - the same for presentation object SimpleStateStatus { def fromDeploymentStatus(deploymentStatus: DeploymentStatus): StateStatus = { diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/api/ManagementResources.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/api/ManagementResources.scala index 164604f1442..62d066ca7b5 100644 --- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/api/ManagementResources.scala +++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/api/ManagementResources.scala @@ -76,7 +76,6 @@ class ManagementResources( dispatcher: DeploymentManagerDispatcher, metricRegistry: MetricRegistry, scenarioTestServices: ProcessingTypeDataProvider[ScenarioTestService, _], - typeToConfig: ProcessingTypeDataProvider[ModelData, _] )(implicit val ec: ExecutionContext) extends Directives with LazyLogging diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/api/ProcessesResources.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/api/ProcessesResources.scala index 6368523e2b7..86d57ade244 100644 --- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/api/ProcessesResources.scala +++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/api/ProcessesResources.scala @@ -22,7 +22,7 @@ import pl.touk.nussknacker.ui.process.ProcessService.{ } import pl.touk.nussknacker.ui.process.ScenarioWithDetailsConversions._ import pl.touk.nussknacker.ui.process._ -import pl.touk.nussknacker.ui.process.deployment.ScenarioStatusProvider +import pl.touk.nussknacker.ui.process.scenariostatus.ScenarioStatusProvider import pl.touk.nussknacker.ui.security.api.LoggedUser import pl.touk.nussknacker.ui.util._ diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/ProcessService.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/ProcessService.scala index 1b7dbe04f55..ffe282ede66 100644 --- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/ProcessService.scala +++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/ProcessService.scala @@ -25,7 +25,7 @@ import pl.touk.nussknacker.ui.api.ProcessesResources.ProcessUnmarshallingError import pl.touk.nussknacker.ui.api.ScenarioStatusPresenter import pl.touk.nussknacker.ui.process.ProcessService._ import pl.touk.nussknacker.ui.process.ScenarioWithDetailsConversions._ -import pl.touk.nussknacker.ui.process.deployment.ScenarioStatusProvider +import pl.touk.nussknacker.ui.process.scenariostatus.ScenarioStatusProvider import pl.touk.nussknacker.ui.process.exception.{ProcessIllegalAction, ProcessValidationError} import pl.touk.nussknacker.ui.process.label.ScenarioLabel import pl.touk.nussknacker.ui.process.marshall.CanonicalProcessConverter diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/deployment/ActionService.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/deployment/ActionService.scala index 89229c9ebde..5b8ab696bc4 100644 --- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/deployment/ActionService.scala +++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/deployment/ActionService.scala @@ -6,7 +6,6 @@ import pl.touk.nussknacker.engine.api.Comment import pl.touk.nussknacker.engine.api.deployment._ import pl.touk.nussknacker.engine.api.modelinfo.ModelInfo import pl.touk.nussknacker.engine.api.process._ -import pl.touk.nussknacker.restmodel.scenariodetails.ScenarioStatusDto import pl.touk.nussknacker.ui.api.{DeploymentCommentSettings, ListenerApiUser} import pl.touk.nussknacker.ui.listener.ProcessChangeEvent.{OnActionExecutionFinished, OnActionFailed, OnActionSuccess} import pl.touk.nussknacker.ui.listener.{ProcessChangeListener, User => ListenerUser} @@ -14,6 +13,7 @@ import pl.touk.nussknacker.ui.process.exception.ProcessIllegalAction import pl.touk.nussknacker.ui.process.processingtype.provider.ProcessingTypeDataProvider import pl.touk.nussknacker.ui.process.repository.ProcessDBQueryRepository.ProcessNotFoundError import pl.touk.nussknacker.ui.process.repository._ +import pl.touk.nussknacker.ui.process.scenariostatus.{ScenarioStatusProvider, ScenarioStatusWithAllowedActions} import pl.touk.nussknacker.ui.security.api.{AdminUser, LoggedUser, NussknackerInternalUser} import slick.dbio.DBIOAction @@ -229,11 +229,11 @@ class ActionService( private def checkIfCanPerformActionInState( actionName: ScenarioActionName, processDetails: ScenarioWithDetailsEntity[LatestScenarioDetailsShape], - statusWithAllowedActions: StatusWithAllowedActions + statusWithAllowedActions: ScenarioStatusWithAllowedActions ): Unit = { if (!statusWithAllowedActions.allowedActions.contains(actionName)) { logger.debug( - s"Action: $actionName on process: ${processDetails.name} not allowed in ${statusWithAllowedActions.status} state" + s"Action: $actionName on process: ${processDetails.name} not allowed in ${statusWithAllowedActions.scenarioStatus} state" ) throw ProcessIllegalAction(actionName, processDetails.name, statusWithAllowedActions) } diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/exception/ProcessIllegalAction.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/exception/ProcessIllegalAction.scala index bb6306a06da..cf30df470ea 100644 --- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/exception/ProcessIllegalAction.scala +++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/exception/ProcessIllegalAction.scala @@ -3,7 +3,7 @@ package pl.touk.nussknacker.ui.process.exception import pl.touk.nussknacker.engine.api.deployment.{ScenarioActionName, StateStatus} import pl.touk.nussknacker.engine.api.process.ProcessName import pl.touk.nussknacker.ui.IllegalOperationError -import pl.touk.nussknacker.ui.process.deployment.StatusWithAllowedActions +import pl.touk.nussknacker.ui.process.scenariostatus.ScenarioStatusWithAllowedActions final case class ProcessIllegalAction(message: String) extends IllegalOperationError(message, details = "") @@ -12,10 +12,10 @@ object ProcessIllegalAction { def apply( actionName: ScenarioActionName, processName: ProcessName, - statusWithAllowedActions: StatusWithAllowedActions + ScenarioStatusWithAllowedActions: ScenarioStatusWithAllowedActions ): ProcessIllegalAction = ProcessIllegalAction( - s"Action: $actionName is not allowed in scenario ($processName) state: ${statusWithAllowedActions.status}, allowed actions: ${statusWithAllowedActions.allowedActions + s"Action: $actionName is not allowed in scenario ($processName) state: ${ScenarioStatusWithAllowedActions.scenarioStatus}, allowed actions: ${ScenarioStatusWithAllowedActions.allowedActions .map(_.value) .mkString(",")}." ) diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/periodic/PeriodicProcessService.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/periodic/PeriodicProcessService.scala index ef18474d558..a4d0ae57476 100644 --- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/periodic/PeriodicProcessService.scala +++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/periodic/PeriodicProcessService.scala @@ -572,11 +572,11 @@ class PeriodicProcessService( case supported: StateQueryForAllScenariosSupported => new StateQueryForAllScenariosSupported { - override def getAllProcessesStates()( + override def getAllDeploymentStatuses()( implicit freshnessPolicy: DataFreshnessPolicy ): Future[WithDataFreshnessStatus[Map[ProcessName, List[StatusDetails]]]] = { for { - allStatusDetailsInDelegate <- supported.getAllProcessesStates() + allStatusDetailsInDelegate <- supported.getAllDeploymentStatuses() allStatusDetailsInPeriodic <- mergeStatusWithDeployments(allStatusDetailsInDelegate.value) result = allStatusDetailsInPeriodic.map { case (name, status) => (name, List(status)) } } yield allStatusDetailsInDelegate.map(_ => result) diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/scenariostatus/InconsistentStateDetector.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/scenariostatus/InconsistentStateDetector.scala new file mode 100644 index 00000000000..31f5c1786c9 --- /dev/null +++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/scenariostatus/InconsistentStateDetector.scala @@ -0,0 +1,158 @@ +package pl.touk.nussknacker.ui.process.scenariostatus + +import com.typesafe.scalalogging.LazyLogging +import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus +import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus.ProblemStateStatus +import pl.touk.nussknacker.engine.api.deployment.{ProcessAction, ProcessActionState, ScenarioActionName, StatusDetails} +import pl.touk.nussknacker.engine.deployment.DeploymentId + +object InconsistentStateDetector extends InconsistentStateDetector + +class InconsistentStateDetector extends LazyLogging { + + def resolve(deploymentStatuses: List[StatusDetails], lastStateAction: Option[ProcessAction]): StatusDetails = { + val status = (doExtractAtMostOneStatus(deploymentStatuses), lastStateAction) match { + case (Left(deploymentStatus), _) => deploymentStatus + case (Right(Some(deploymentStatus)), _) if shouldAlwaysReturnStatus(deploymentStatus) => deploymentStatus + case (Right(Some(deploymentStatus)), _) if deploymentStatus.status == SimpleStateStatus.Restarting => + handleRestartingState(deploymentStatus, lastStateAction) + case (Right(deploymentStatusOpt), Some(action)) + if action.actionName == ScenarioActionName.Deploy && action.state == ProcessActionState.ExecutionFinished => + handleLastActionFinishedDeploy(deploymentStatusOpt, action) + case (Right(deploymentStatusOpt), Some(action)) if action.actionName == ScenarioActionName.Deploy => + handleLastActionDeploy(deploymentStatusOpt, action) + case (Right(Some(deploymentStatus)), _) if isFollowingDeployStatus(deploymentStatus) => + handleFollowingDeployState(deploymentStatus, lastStateAction) + case (Right(deploymentStatusOpt), Some(action)) if action.actionName == ScenarioActionName.Cancel => + handleCanceledState(deploymentStatusOpt) + case (Right(Some(deploymentStatus)), _) => handleSingleDeploymentStatus(deploymentStatus, lastStateAction) + case (Right(None), Some(a)) => StatusDetails(SimpleStateStatus.NotDeployed, Some(DeploymentId.fromActionId(a.id))) + case (Right(None), None) => StatusDetails(SimpleStateStatus.NotDeployed, None) + } + logger.debug(s"Resolved $deploymentStatuses , lastStateAction: $lastStateAction to status $status") + status + } + + // TODO: This method is exposed to make transition between Option[StatusDetails] and List[StatusDetails] easier to perform. + // After full migration to List[StatusDetails], this method should be removed + def extractAtMostOneStatus(deploymentStatuses: List[StatusDetails]): Option[StatusDetails] = + doExtractAtMostOneStatus(deploymentStatuses).fold(Some(_), identity) + + private def doExtractAtMostOneStatus( + deploymentStatuses: List[StatusDetails] + ): Either[StatusDetails, Option[StatusDetails]] = { + val notFinalStatuses = deploymentStatuses.filterNot(isFinalOrTransitioningToFinalStatus) + (deploymentStatuses, notFinalStatuses) match { + case (Nil, Nil) => Right(None) + case (_, singleNotFinished :: Nil) => Right(Some(singleNotFinished)) + case (_, firstNotFinished :: _ :: _) => + Left( + firstNotFinished.copy( + status = ProblemStateStatus.MultipleJobsRunning, + errors = List(s"Expected one job, instead: ${notFinalStatuses + .map(details => details.deploymentId.map(_.value).getOrElse("missing") + " - " + details.status) + .mkString(", ")}") + ) + ) + case (firstFinished :: _, Nil) => Right(Some(firstFinished)) + } + } + + private def handleSingleDeploymentStatus( + deploymentStatus: StatusDetails, + lastStateAction: Option[ProcessAction] + ): StatusDetails = + deploymentStatus.status match { + case SimpleStateStatus.Restarting | SimpleStateStatus.DuringCancel | SimpleStateStatus.Finished + if lastStateAction.isEmpty => + deploymentStatus.copy(status = ProblemStateStatus.ProcessWithoutAction) + case _ => deploymentStatus + } + + // This method handles some corner cases for canceled process -> with last action = Canceled + private def handleCanceledState(deploymentStatusOpt: Option[StatusDetails]): StatusDetails = + deploymentStatusOpt + // Missing deployment is fine for cancelled action as well because of retention of states + .getOrElse(StatusDetails(SimpleStateStatus.Canceled, None)) + + private def handleRestartingState( + deploymentStatus: StatusDetails, + lastStateAction: Option[ProcessAction] + ): StatusDetails = + lastStateAction match { + case Some(action) if action.actionName == ScenarioActionName.Deploy => deploymentStatus + case _ => handleSingleDeploymentStatus(deploymentStatus, lastStateAction) + } + + // This method handles some corner cases for following deploy status mismatch last action version + private def handleFollowingDeployState( + deploymentStatus: StatusDetails, + lastStateAction: Option[ProcessAction] + ): StatusDetails = + lastStateAction match { + case Some(action) if action.actionName != ScenarioActionName.Deploy => + deploymentStatus.copy(status = ProblemStateStatus.shouldNotBeRunning(true)) + case Some(_) => + deploymentStatus + case None => + deploymentStatus.copy(status = ProblemStateStatus.shouldNotBeRunning(false)) + } + + private def handleLastActionFinishedDeploy( + deploymentStatusOpt: Option[StatusDetails], + action: ProcessAction + ): StatusDetails = + deploymentStatusOpt match { + case Some(deploymentStatus) => + deploymentStatus + case None => + // Some engines like Flink have jobs retention. Because of that we restore finished status. See FlinkDeploymentManager.postprocess + StatusDetails(SimpleStateStatus.Finished, Some(DeploymentId.fromActionId(action.id))) + } + + // This method handles some corner cases for deployed action mismatch version + private def handleLastActionDeploy(deploymentStatusOpt: Option[StatusDetails], action: ProcessAction): StatusDetails = + deploymentStatusOpt match { + case Some(deploymentStatuses) => + deploymentStatuses.version match { + case _ if !isFollowingDeployStatus(deploymentStatuses) && !isFinishedStatus(deploymentStatuses) => + logger.debug( + s"handleLastActionDeploy: is not following deploy status nor finished, but it should be. $deploymentStatuses" + ) + deploymentStatuses.copy(status = ProblemStateStatus.shouldBeRunning(action.processVersionId, action.user)) + case Some(ver) if ver.versionId != action.processVersionId => + deploymentStatuses.copy(status = + ProblemStateStatus.mismatchDeployedVersion(ver.versionId, action.processVersionId, action.user) + ) + case Some(ver) if ver.versionId == action.processVersionId => + deploymentStatuses + case None => // TODO: we should remove Option from ProcessVersion? + deploymentStatuses.copy(status = + ProblemStateStatus.missingDeployedVersion(action.processVersionId, action.user) + ) + case _ => + deploymentStatuses.copy(status = ProblemStateStatus.Failed) // Generic error in other cases + } + case None => + logger.debug( + s"handleLastActionDeploy for empty deploymentStatus. Action.processVersionId: ${action.processVersionId}" + ) + StatusDetails(ProblemStateStatus.shouldBeRunning(action.processVersionId, action.user), None) + } + + private def shouldAlwaysReturnStatus(deploymentStatus: StatusDetails): Boolean = { + ProblemStateStatus.isProblemStatus(deploymentStatus.status) + } + + private def isFollowingDeployStatus(deploymentStatus: StatusDetails): Boolean = { + SimpleStateStatus.DefaultFollowingDeployStatuses.contains(deploymentStatus.status) + } + + private def isFinalOrTransitioningToFinalStatus(deploymentStatus: StatusDetails): Boolean = + SimpleStateStatus.isFinalOrTransitioningToFinalStatus(deploymentStatus.status) + + private def isFinishedStatus(deploymentStatus: StatusDetails): Boolean = { + deploymentStatus.status == SimpleStateStatus.Finished + } + +} diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/deployment/ScenarioStatusProvider.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/scenariostatus/ScenarioStatusProvider.scala similarity index 86% rename from designer/server/src/main/scala/pl/touk/nussknacker/ui/process/deployment/ScenarioStatusProvider.scala rename to designer/server/src/main/scala/pl/touk/nussknacker/ui/process/scenariostatus/ScenarioStatusProvider.scala index 8369918892d..38aa52ac3a6 100644 --- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/deployment/ScenarioStatusProvider.scala +++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/scenariostatus/ScenarioStatusProvider.scala @@ -1,4 +1,4 @@ -package pl.touk.nussknacker.ui.process.deployment +package pl.touk.nussknacker.ui.process.scenariostatus import akka.actor.ActorSystem import cats.Traverse @@ -9,7 +9,6 @@ import db.util.DBIOActionInstances._ import pl.touk.nussknacker.engine.api.deployment.ProcessStateDefinitionManager.ScenarioStatusWithScenarioContext import pl.touk.nussknacker.engine.api.deployment.ScenarioActionName.{Cancel, Deploy} import pl.touk.nussknacker.engine.api.deployment._ -import pl.touk.nussknacker.engine.api.deployment.inconsistency.InconsistentStateDetector import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus.ProblemStateStatus import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus.ProblemStateStatus.FailedToGet @@ -17,9 +16,10 @@ import pl.touk.nussknacker.engine.api.process._ import pl.touk.nussknacker.engine.deployment.DeploymentId import pl.touk.nussknacker.engine.util.WithDataFreshnessStatusUtils.WithDataFreshnessStatusMapOps import pl.touk.nussknacker.ui.BadRequestError -import pl.touk.nussknacker.ui.process.deployment.ScenarioStatusProvider.FragmentStateException +import pl.touk.nussknacker.ui.process.deployment.DeploymentManagerDispatcher import pl.touk.nussknacker.ui.process.repository.ProcessDBQueryRepository.ProcessNotFoundError import pl.touk.nussknacker.ui.process.repository._ +import pl.touk.nussknacker.ui.process.scenariostatus.ScenarioStatusProvider.FragmentStateException import pl.touk.nussknacker.ui.security.api.LoggedUser import pl.touk.nussknacker.ui.util.FutureUtils._ import slick.dbio.{DBIO, DBIOAction} @@ -43,12 +43,12 @@ trait ScenarioStatusProvider { def getAllowedActionsForScenarioStatus( processDetails: ScenarioWithDetailsEntity[_] - )(implicit user: LoggedUser, freshnessPolicy: DataFreshnessPolicy): Future[StatusWithAllowedActions] + )(implicit user: LoggedUser, freshnessPolicy: DataFreshnessPolicy): Future[ScenarioStatusWithAllowedActions] def getAllowedActionsForScenarioStatusDBIO(processDetails: ScenarioWithDetailsEntity[_])( implicit user: LoggedUser, freshnessPolicy: DataFreshnessPolicy - ): DB[StatusWithAllowedActions] + ): DB[ScenarioStatusWithAllowedActions] } @@ -104,20 +104,20 @@ private class ScenarioStatusProviderImpl( val scenarios = processTraverse.toList dbioRunner.run( for { - actionsInProgress <- getInProgressActionTypesForScenarios(scenarios) - prefetchedStates <- DBIO.from(getPrefetchedStatesForSupportedManagers(scenarios)) - statusesDetails <- processTraverse + actionsInProgress <- getInProgressActionTypesForScenarios(scenarios) + prefetchedDeploymentStatuses <- DBIO.from(getPrefetchedDeploymentStatusesForSupportedManagers(scenarios)) + finalDeploymentStatuses <- processTraverse .map { case process if process.isFragment => DBIO.successful(Option.empty[StatusDetails]) case process => - val prefetchedState = for { - prefetchedStatesForProcessingType <- prefetchedStates.get(process.processingType) - // State is prefetched for all scenarios for the given processing type. + val prefetchedDeploymentStatusesFroScenario = for { + prefetchedStatusesForProcessingType <- prefetchedDeploymentStatuses.get(process.processingType) + // Deployment statuses are prefetched for all scenarios for the given processing type. // If there is no information available for a specific scenario name, // then it means that DM is not aware of this scenario, and we should default to List.empty[StatusDetails]. - prefetchedState = prefetchedStatesForProcessingType.getOrElse(process.name, List.empty) - } yield prefetchedState - (prefetchedState match { + prefetchedStatusesForScenario = prefetchedStatusesForProcessingType.getOrElse(process.name, List.empty) + } yield prefetchedStatusesForScenario + (prefetchedDeploymentStatusesFroScenario match { case Some(prefetchedStatusDetails) => getProcessStateUsingPrefetchedStatus( process, @@ -132,19 +132,19 @@ private class ScenarioStatusProviderImpl( }).map(Some(_)) } .sequence[DB, Option[StatusDetails]] - } yield statusesDetails + } yield finalDeploymentStatuses ) } override def getAllowedActionsForScenarioStatus( processDetails: ScenarioWithDetailsEntity[_] - )(implicit user: LoggedUser, freshnessPolicy: DataFreshnessPolicy): Future[StatusWithAllowedActions] = { + )(implicit user: LoggedUser, freshnessPolicy: DataFreshnessPolicy): Future[ScenarioStatusWithAllowedActions] = { dbioRunner.run(getAllowedActionsForScenarioStatusDBIO(processDetails)) } override def getAllowedActionsForScenarioStatusDBIO( processDetails: ScenarioWithDetailsEntity[_] - )(implicit user: LoggedUser, freshnessPolicy: DataFreshnessPolicy): DB[StatusWithAllowedActions] = { + )(implicit user: LoggedUser, freshnessPolicy: DataFreshnessPolicy): DB[ScenarioStatusWithAllowedActions] = { for { inProgressActionNames <- actionRepository.getInProgressActionNames(processDetails.processId) statusDetails <- getProcessStateFetchingStatusFromManager( @@ -152,7 +152,7 @@ private class ScenarioStatusProviderImpl( inProgressActionNames ) allowedActions = getAllowedActions(statusDetails, processDetails, None) - } yield StatusWithAllowedActions(statusDetails, allowedActions) + } yield ScenarioStatusWithAllowedActions(statusDetails, allowedActions) } private def getAllowedActions( @@ -181,7 +181,7 @@ private class ScenarioStatusProviderImpl( processDetails, inProgressActionNames, manager => - getStateFromDeploymentManager( + getDeploymentStatusesFromDeploymentManager( manager, processDetails.idWithName, processDetails.lastStateAction, @@ -193,7 +193,7 @@ private class ScenarioStatusProviderImpl( // State is prefetched only when: // - DM has capability StateQueryForAllScenariosSupported // - the query is about more than one scenario handled by that DM - private def getPrefetchedStatesForSupportedManagers( + private def getPrefetchedDeploymentStatusesForSupportedManagers( scenarios: List[ScenarioWithDetailsEntity[_]], )( implicit user: LoggedUser, @@ -215,18 +215,18 @@ private class ScenarioStatusProviderImpl( case supported: StateQueryForAllScenariosSupported => Some(supported) case NoStateQueryForAllScenariosSupport => None } - } yield getAllProcessesStates(processingType, managerWithCapability)) + } yield getAllDeploymentStatuses(processingType, managerWithCapability)) .getOrElse(Future.successful(None)) } } .map(_.flatten.toMap) } - private def getAllProcessesStates(processingType: ProcessingType, manager: StateQueryForAllScenariosSupported)( + private def getAllDeploymentStatuses(processingType: ProcessingType, manager: StateQueryForAllScenariosSupported)( implicit freshnessPolicy: DataFreshnessPolicy, ): Future[Option[(ProcessingType, WithDataFreshnessStatus[Map[ProcessName, List[StatusDetails]]])]] = { manager - .getAllProcessesStates() + .getAllDeploymentStatuses() .map(states => Some((processingType, states))) .recover { case NonFatal(e) => logger.warn( @@ -257,7 +257,7 @@ private class ScenarioStatusProviderImpl( private def getProcessStateUsingPrefetchedStatus( processDetails: ScenarioWithDetailsEntity[_], inProgressActionNames: Set[ScenarioActionName], - prefetchedStatusDetails: WithDataFreshnessStatus[List[StatusDetails]], + prefetchedDeploymentStatuses: WithDataFreshnessStatus[List[StatusDetails]], )(implicit user: LoggedUser): DB[StatusDetails] = { getScenarioStatusDetails( processDetails, @@ -265,9 +265,9 @@ private class ScenarioStatusProviderImpl( { _ => // FIXME abr: handle finished, it has no sense for periodic but it shouldn't hurt us Future { - prefetchedStatusDetails.map { prefetchedStatusDetailsValue => + prefetchedDeploymentStatuses.map { prefetchedDeploymentStatusesValue => // FIXME abr: resolved states shouldn't be handled here - InconsistentStateDetector.resolve(prefetchedStatusDetailsValue, processDetails.lastStateAction) + InconsistentStateDetector.resolve(prefetchedDeploymentStatusesValue, processDetails.lastStateAction) } } } @@ -277,7 +277,7 @@ private class ScenarioStatusProviderImpl( private def getScenarioStatusDetails( processDetails: ScenarioWithDetailsEntity[_], inProgressActionNames: Set[ScenarioActionName], - fetchState: DeploymentManager => Future[WithDataFreshnessStatus[StatusDetails]], + fetchDeploymentStatuses: DeploymentManager => Future[WithDataFreshnessStatus[StatusDetails]], )(implicit user: LoggedUser): DB[StatusDetails] = { dispatcher .deploymentManager(processDetails.processingType) @@ -290,7 +290,7 @@ private class ScenarioStatusProviderImpl( logger.debug(s"Status for: '${processDetails.name}' is: ${SimpleStateStatus.DuringDeploy}") DBIOAction.successful( StatusDetails(SimpleStateStatus.DuringDeploy, None) - ) // FIXME abr: deploymentId from inProgressActionNames + ) } else if (inProgressActionNames.contains(ScenarioActionName.Cancel)) { logger.debug(s"Status for: '${processDetails.name}' is: ${SimpleStateStatus.DuringCancel}") DBIOAction.successful(StatusDetails(SimpleStateStatus.DuringCancel, None)) @@ -298,7 +298,7 @@ private class ScenarioStatusProviderImpl( processDetails.lastStateAction match { case Some(_) => DBIOAction - .from(fetchState(manager)) + .from(fetchDeploymentStatuses(manager)) .map { statusWithFreshness => logger.debug( s"Status for: '${processDetails.name}' is: ${statusWithFreshness.value.status}, cached: ${statusWithFreshness.cached}, last status action: ${processDetails.lastStateAction @@ -339,7 +339,7 @@ private class ScenarioStatusProviderImpl( } } - private def getStateFromDeploymentManager( + private def getDeploymentStatusesFromDeploymentManager( deploymentManager: DeploymentManager, processIdWithName: ProcessIdWithName, lastStateAction: Option[ProcessAction] @@ -385,8 +385,11 @@ private class ScenarioStatusProviderImpl( } -final case class StatusWithAllowedActions(statusDetails: StatusDetails, allowedActions: Set[ScenarioActionName]) { +final case class ScenarioStatusWithAllowedActions( + scenarioStatusDetails: StatusDetails, + allowedActions: Set[ScenarioActionName] +) { - def status: StateStatus = statusDetails.status + def scenarioStatus: StateStatus = scenarioStatusDetails.status } diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/server/AkkaHttpBasedRouteProvider.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/server/AkkaHttpBasedRouteProvider.scala index 6a27252b3fc..e15b21ddc34 100644 --- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/server/AkkaHttpBasedRouteProvider.scala +++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/server/AkkaHttpBasedRouteProvider.scala @@ -56,7 +56,6 @@ import pl.touk.nussknacker.ui.process.deployment.{ DeploymentService => LegacyDeploymentService, RepositoryBasedScenarioActivityManager, ScenarioResolver, - ScenarioStatusProvider, ScenarioTestExecutorServiceImpl } import pl.touk.nussknacker.ui.process.fragment.{DefaultFragmentRepository, FragmentResolver} @@ -76,6 +75,7 @@ import pl.touk.nussknacker.ui.process.repository._ import pl.touk.nussknacker.ui.process.repository.activities.{DbScenarioActivityRepository, ScenarioActivityRepository} import pl.touk.nussknacker.ui.process.repository.stickynotes.DbStickyNotesRepository import pl.touk.nussknacker.ui.process.scenarioactivity.FetchScenarioActivityService +import pl.touk.nussknacker.ui.process.scenariostatus.ScenarioStatusProvider import pl.touk.nussknacker.ui.process.test.{PreliminaryScenarioTestDataSerDe, ScenarioTestService} import pl.touk.nussknacker.ui.process.version.{ScenarioGraphVersionRepository, ScenarioGraphVersionService} import pl.touk.nussknacker.ui.processreport.ProcessCounter @@ -255,7 +255,7 @@ class AkkaHttpBasedRouteProvider( processingTypeData.designerModelData.modelData.additionalConfigsFromProvider } - val scenarioStateProvider = + val scenarioStatusProvider = ScenarioStatusProvider( dmDispatcher, processRepository, @@ -269,7 +269,7 @@ class AkkaHttpBasedRouteProvider( actionRepository, dbioRunner, processChangeListener, - scenarioStateProvider, + scenarioStatusProvider, featureTogglesConfig.deploymentCommentSettings, modelInfos, designerClock @@ -313,7 +313,7 @@ class AkkaHttpBasedRouteProvider( val scenarioStatusPresenter = new ScenarioStatusPresenter(dmDispatcher) val processService = new DBProcessService( - scenarioStateProvider, + scenarioStatusProvider, scenarioStatusPresenter, newProcessPreparer, processingTypeDataProvider.mapCombined(_.parametersService), @@ -522,7 +522,7 @@ class AkkaHttpBasedRouteProvider( val routes = List( new ProcessesResources( processService = processService, - scenarioStatusProvider = scenarioStateProvider, + scenarioStatusProvider = scenarioStatusProvider, scenarioStatusPresenter = scenarioStatusPresenter, processToolbarService = configProcessToolbarService, processAuthorizer = processAuthorizer, @@ -542,7 +542,6 @@ class AkkaHttpBasedRouteProvider( dmDispatcher, metricsRegistry, scenarioTestService, - processingTypeDataProvider.mapValues(_.designerModelData.modelData) ), new ValidationResources(processService, processResolver), new DefinitionResources( diff --git a/designer/server/src/test/scala/pl/touk/nussknacker/test/base/it/NuResourcesTest.scala b/designer/server/src/test/scala/pl/touk/nussknacker/test/base/it/NuResourcesTest.scala index eaf174a0820..3db6cbbe9ec 100644 --- a/designer/server/src/test/scala/pl/touk/nussknacker/test/base/it/NuResourcesTest.scala +++ b/designer/server/src/test/scala/pl/touk/nussknacker/test/base/it/NuResourcesTest.scala @@ -44,9 +44,8 @@ import pl.touk.nussknacker.test.utils.domain.{ProcessTestData, TestFactory} import pl.touk.nussknacker.test.utils.scalas.AkkaHttpExtensions.toRequestEntity import pl.touk.nussknacker.ui.api.ManagementResources.RunDeploymentRequest import pl.touk.nussknacker.ui.api._ +import pl.touk.nussknacker.ui.config.{DesignerConfig, FeatureTogglesConfig} import pl.touk.nussknacker.ui.config.scenariotoolbar.CategoriesScenarioToolbarsConfigParser -import pl.touk.nussknacker.ui.config.FeatureTogglesConfig -import pl.touk.nussknacker.ui.config.DesignerConfig import pl.touk.nussknacker.ui.process.ProcessService.{CreateScenarioCommand, UpdateScenarioCommand} import pl.touk.nussknacker.ui.process._ import pl.touk.nussknacker.ui.process.deployment._ @@ -59,6 +58,7 @@ import pl.touk.nussknacker.ui.process.processingtype.provider.ProcessingTypeData import pl.touk.nussknacker.ui.process.repository.ProcessRepository.CreateProcessAction import pl.touk.nussknacker.ui.process.repository._ import pl.touk.nussknacker.ui.process.repository.activities.ScenarioActivityRepository +import pl.touk.nussknacker.ui.process.scenariostatus.ScenarioStatusProvider import pl.touk.nussknacker.ui.process.test.{PreliminaryScenarioTestDataSerDe, ScenarioTestService} import pl.touk.nussknacker.ui.processreport.ProcessCounter import pl.touk.nussknacker.ui.security.api.{LoggedUser, RealLoggedUser} @@ -269,7 +269,6 @@ trait NuResourcesTest dispatcher = dmDispatcher, metricRegistry = new MetricRegistry, scenarioTestServices = scenarioTestServiceByProcessingType, - typeToConfig = typeToConfig.mapValues(_.designerModelData.modelData) ) override def beforeEach(): Unit = { diff --git a/designer/server/src/test/scala/pl/touk/nussknacker/ui/api/ManagementResourcesSpec.scala b/designer/server/src/test/scala/pl/touk/nussknacker/ui/api/ManagementResourcesSpec.scala index 0479f94f3f2..68f9124e002 100644 --- a/designer/server/src/test/scala/pl/touk/nussknacker/ui/api/ManagementResourcesSpec.scala +++ b/designer/server/src/test/scala/pl/touk/nussknacker/ui/api/ManagementResourcesSpec.scala @@ -319,7 +319,7 @@ class ManagementResourcesSpec status shouldBe StatusCodes.Conflict } getProcess(invalidScenario.name) ~> check { - decodeDetails.state.value.status shouldEqual SimpleStateStatus.NotDeployed + decodeDetails.state.value.statusName shouldEqual SimpleStateStatus.NotDeployed.name } } } diff --git a/designer/server/src/test/scala/pl/touk/nussknacker/ui/definition/component/DefaultComponentServiceSpec.scala b/designer/server/src/test/scala/pl/touk/nussknacker/ui/definition/component/DefaultComponentServiceSpec.scala index 4a5423bd68b..5b6d13b6249 100644 --- a/designer/server/src/test/scala/pl/touk/nussknacker/ui/definition/component/DefaultComponentServiceSpec.scala +++ b/designer/server/src/test/scala/pl/touk/nussknacker/ui/definition/component/DefaultComponentServiceSpec.scala @@ -42,7 +42,7 @@ import pl.touk.nussknacker.ui.definition.component.ComponentModelData._ import pl.touk.nussknacker.ui.definition.component.ComponentTestProcessData._ import pl.touk.nussknacker.ui.definition.component.DynamicComponentProvider._ import pl.touk.nussknacker.ui.process.DBProcessService -import pl.touk.nussknacker.ui.process.deployment.ScenarioStatusProvider +import pl.touk.nussknacker.ui.process.scenariostatus.ScenarioStatusProvider import pl.touk.nussknacker.ui.process.fragment.DefaultFragmentRepository import pl.touk.nussknacker.ui.process.processingtype.ProcessingTypeData.SchedulingForProcessingType import pl.touk.nussknacker.ui.process.processingtype.loader.ProcessingTypeDataLoader diff --git a/designer/server/src/test/scala/pl/touk/nussknacker/ui/notifications/NotificationServiceTest.scala b/designer/server/src/test/scala/pl/touk/nussknacker/ui/notifications/NotificationServiceTest.scala index 1d28b7e6668..8f827338fc9 100644 --- a/designer/server/src/test/scala/pl/touk/nussknacker/ui/notifications/NotificationServiceTest.scala +++ b/designer/server/src/test/scala/pl/touk/nussknacker/ui/notifications/NotificationServiceTest.scala @@ -41,6 +41,7 @@ import pl.touk.nussknacker.ui.process.repository.{ ScenarioWithDetailsEntity } import pl.touk.nussknacker.ui.process.scenarioactivity.FetchScenarioActivityService +import pl.touk.nussknacker.ui.process.scenariostatus.ScenarioStatusProvider import pl.touk.nussknacker.ui.security.api.LoggedUser import pl.touk.nussknacker.ui.util.InMemoryTimeseriesRepository import pl.touk.nussknacker.ui.validation.UIProcessValidator @@ -294,7 +295,7 @@ class NotificationServiceTest config, clock ) - val scenarioStateProvider = ScenarioStatusProvider( + val scenarioStatusProvider = ScenarioStatusProvider( managerDispatcher, dbProcessRepository, actionRepository, @@ -307,7 +308,7 @@ class NotificationServiceTest actionRepository, dbioRunner, mock[ProcessChangeListener], - scenarioStateProvider, + scenarioStatusProvider, None, TestFactory.modelInfoProvider, clock diff --git a/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/DBProcessServiceSpec.scala b/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/DBProcessServiceSpec.scala index c8701b86823..afb28c9d16f 100644 --- a/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/DBProcessServiceSpec.scala +++ b/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/DBProcessServiceSpec.scala @@ -22,7 +22,7 @@ import pl.touk.nussknacker.ui.NuDesignerError import pl.touk.nussknacker.ui.NuDesignerError.XError import pl.touk.nussknacker.ui.api.ProcessesResources.ProcessUnmarshallingError import pl.touk.nussknacker.ui.api.ScenarioStatusPresenter -import pl.touk.nussknacker.ui.process.deployment.ScenarioStatusProvider +import pl.touk.nussknacker.ui.process.scenariostatus.ScenarioStatusProvider import pl.touk.nussknacker.ui.process.exception.ProcessIllegalAction import pl.touk.nussknacker.ui.process.marshall.CanonicalProcessConverter import pl.touk.nussknacker.ui.process.repository.ScenarioWithDetailsEntity diff --git a/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/deployment/DeploymentServiceSpec.scala b/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/deployment/DeploymentServiceSpec.scala index 898804773f7..8f5a92bccb0 100644 --- a/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/deployment/DeploymentServiceSpec.scala +++ b/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/deployment/DeploymentServiceSpec.scala @@ -29,14 +29,15 @@ import pl.touk.nussknacker.test.utils.scalas.DBIOActionValues import pl.touk.nussknacker.test.{EitherValuesDetailedMessage, NuScalaTestAssertions, PatientScalaFutures} import pl.touk.nussknacker.ui.api.DeploymentCommentSettings import pl.touk.nussknacker.ui.listener.ProcessChangeEvent.{OnActionExecutionFinished, OnActionSuccess} -import pl.touk.nussknacker.ui.process.deployment.ScenarioStatusProvider.FragmentStateException +import pl.touk.nussknacker.ui.process.ScenarioQuery import pl.touk.nussknacker.ui.process.periodic.flink.FlinkClientStub import pl.touk.nussknacker.ui.process.processingtype.ValueWithRestriction import pl.touk.nussknacker.ui.process.processingtype.provider.ProcessingTypeDataProvider.noCombinedDataFun import pl.touk.nussknacker.ui.process.processingtype.provider.{ProcessingTypeDataProvider, ProcessingTypeDataState} import pl.touk.nussknacker.ui.process.repository.ProcessRepository.CreateProcessAction import pl.touk.nussknacker.ui.process.repository.{CommentValidationError, DBIOActionRunner} -import pl.touk.nussknacker.ui.process.{ScenarioQuery, ScenarioWithDetailsConversions} +import pl.touk.nussknacker.ui.process.scenariostatus.ScenarioStatusProvider +import pl.touk.nussknacker.ui.process.scenariostatus.ScenarioStatusProvider.FragmentStateException import pl.touk.nussknacker.ui.security.api.LoggedUser import slick.dbio.DBIOAction @@ -92,7 +93,7 @@ class DeploymentServiceSpec private val listener = new TestProcessChangeListener - private val scenarioStateProvider = createScenarioStateProvider(scenarioStateTimeout = None) + private val scenarioStatusProvider = createScenarioStatusProvider(scenarioStateTimeout = None) private val actionService = createActionService(deploymentCommentSettings = None) @@ -127,14 +128,14 @@ class DeploymentServiceSpec actionRepository, dbioRunner, listener, - scenarioStateProvider, + scenarioStatusProvider, deploymentCommentSettings, modelInfoProvider, clock ) } - private def createScenarioStateProvider(scenarioStateTimeout: Option[FiniteDuration]) = + private def createScenarioStatusProvider(scenarioStateTimeout: Option[FiniteDuration]) = ScenarioStatusProvider(dmDispatcher, fetchingProcessRepository, actionRepository, dbioRunner, scenarioStateTimeout) // TODO: temporary step - we would like to extract the validation and the comment validation tests to external validators @@ -187,7 +188,7 @@ class DeploymentServiceSpec ) eventually { - val status = scenarioStateProvider + val status = scenarioStatusProvider .getScenarioStatus(processIdWithName, Some(initialVersionId)) .futureValue .status @@ -218,7 +219,7 @@ class DeploymentServiceSpec ) eventually { - scenarioStateProvider + scenarioStatusProvider .getScenarioStatus(processIdWithName, Some(initialVersionId)) .futureValue .status shouldBe SimpleStateStatus.Running @@ -238,7 +239,7 @@ class DeploymentServiceSpec .futureValue eventually { - val status = scenarioStateProvider + val status = scenarioStatusProvider .getScenarioStatus(processIdWithName, Some(initialVersionId)) .futureValue .status @@ -269,14 +270,14 @@ class DeploymentServiceSpec ) ) .futureValue - scenarioStateProvider + scenarioStatusProvider .getScenarioStatus(processIdWithName, Some(initialVersionId)) .futureValue .status shouldBe SimpleStateStatus.DuringDeploy } eventually { - scenarioStateProvider + scenarioStatusProvider .getScenarioStatus(processIdWithName, Some(initialVersionId)) .futureValue .status shouldBe SimpleStateStatus.Running @@ -290,7 +291,7 @@ class DeploymentServiceSpec deploymentManager.withWaitForCancelFinish { deploymentService.processCommand(CancelScenarioCommand(CommonCommandData(processId, None, user))) eventually { - scenarioStateProvider + scenarioStatusProvider .getScenarioStatus(processId, Some(initialVersionId)) .futureValue .status shouldBe SimpleStateStatus.DuringCancel @@ -318,7 +319,7 @@ class DeploymentServiceSpec val (processId, deployActionId) = prepareDeployedProcess(processName).dbioActionValues checkIsFollowingDeploy( - scenarioStateProvider.getScenarioStatus(processId, Some(initialVersionId)).futureValue, + scenarioStatusProvider.getScenarioStatus(processId, Some(initialVersionId)).futureValue, expected = true ) fetchingProcessRepository @@ -331,11 +332,11 @@ class DeploymentServiceSpec // we simulate what happens when retrieveStatus is called multiple times to check only one comment is added (1 to 5).foreach { _ => checkIsFollowingDeploy( - scenarioStateProvider.getScenarioStatus(processId, Some(initialVersionId)).futureValue, + scenarioStatusProvider.getScenarioStatus(processId, Some(initialVersionId)).futureValue, expected = false ) } - val finishedStatus = scenarioStateProvider.getScenarioStatus(processId, Some(initialVersionId)).futureValue + val finishedStatus = scenarioStatusProvider.getScenarioStatus(processId, Some(initialVersionId)).futureValue finishedStatus.status shouldBe SimpleStateStatus.Finished getAllowedActions(finishedStatus) shouldBe Set( ScenarioActionName.Deploy, @@ -355,12 +356,12 @@ class DeploymentServiceSpec deploymentManager.withEmptyProcessState(processName) { val stateAfterJobRetention = - scenarioStateProvider.getScenarioStatus(processId, Some(initialVersionId)).futureValue + scenarioStatusProvider.getScenarioStatus(processId, Some(initialVersionId)).futureValue stateAfterJobRetention.status shouldBe SimpleStateStatus.Finished } archiveProcess(processId).dbioActionValues - scenarioStateProvider + scenarioStatusProvider .getScenarioStatus(processId, Some(initialVersionId)) .futureValue .status shouldBe SimpleStateStatus.Finished @@ -376,7 +377,7 @@ class DeploymentServiceSpec .dbioActionValues .flatMap(_.lastStateAction) .map(_.actionName) shouldBe expectedAction - scenarioStateProvider + scenarioStatusProvider .getScenarioStatus(processIdWithName, Some(initialVersionId)) .futureValue .status shouldBe expectedStatus @@ -461,7 +462,7 @@ class DeploymentServiceSpec listener.events shouldBe Symbol("empty") // during short period of time, status will be during deploy - because parallelism validation are done in the same critical section as deployment eventually { - scenarioStateProvider + scenarioStatusProvider .getScenarioStatus(processIdWithName, Some(initialVersionId)) .futureValue .status shouldBe SimpleStateStatus.NotDeployed @@ -474,7 +475,7 @@ class DeploymentServiceSpec val (processId, _) = prepareCanceledProcess(processName).dbioActionValues deploymentManager.withProcessStateStatus(processName, SimpleStateStatus.Canceled) { - scenarioStateProvider + scenarioStatusProvider .getScenarioStatus(processId, Some(initialVersionId)) .futureValue .status shouldBe SimpleStateStatus.Canceled @@ -492,7 +493,7 @@ class DeploymentServiceSpec .lastStateAction should not be None deploymentManager.withEmptyProcessState(processName) { - scenarioStateProvider + scenarioStatusProvider .getScenarioStatus(processId, Some(initialVersionId)) .futureValue .status shouldBe SimpleStateStatus.Canceled @@ -517,7 +518,7 @@ class DeploymentServiceSpec .lastStateAction should not be None deploymentManager.withEmptyProcessState(processName) { - scenarioStateProvider + scenarioStatusProvider .getScenarioStatus(processId, Some(initialVersionId)) .futureValue .status shouldBe SimpleStateStatus.Canceled @@ -536,7 +537,7 @@ class DeploymentServiceSpec val (processId, _) = prepareCanceledProcess(processName).dbioActionValues deploymentManager.withProcessStateStatus(processName, SimpleStateStatus.Running) { - val state = scenarioStateProvider.getScenarioStatus(processId, Some(initialVersionId)).futureValue + val state = scenarioStatusProvider.getScenarioStatus(processId, Some(initialVersionId)).futureValue val expectedStatus = ProblemStateStatus.shouldNotBeRunning(true) state.status shouldBe expectedStatus @@ -549,7 +550,7 @@ class DeploymentServiceSpec val processId = prepareProcess(processName).dbioActionValues deploymentManager.withProcessStateStatus(processName, SimpleStateStatus.Running) { - val state = scenarioStateProvider.getScenarioStatus(processId, Some(initialVersionId)).futureValue + val state = scenarioStatusProvider.getScenarioStatus(processId, Some(initialVersionId)).futureValue state.status shouldBe SimpleStateStatus.NotDeployed } } @@ -559,7 +560,7 @@ class DeploymentServiceSpec val (processId, _) = prepareCanceledProcess(processName).dbioActionValues deploymentManager.withProcessStateStatus(processName, SimpleStateStatus.DuringCancel) { - val state = scenarioStateProvider.getScenarioStatus(processId, Some(initialVersionId)).futureValue + val state = scenarioStatusProvider.getScenarioStatus(processId, Some(initialVersionId)).futureValue state.status shouldBe SimpleStateStatus.DuringCancel } @@ -573,7 +574,7 @@ class DeploymentServiceSpec StatusDetails(SimpleStateStatus.Restarting, None, Some(ExternalDeploymentId("12")), Some(ProcessVersion.empty)) deploymentManager.withProcessStates(processName, List(state)) { - val state = scenarioStateProvider.getScenarioStatus(processId, Some(initialVersionId)).futureValue + val state = scenarioStatusProvider.getScenarioStatus(processId, Some(initialVersionId)).futureValue state.status shouldBe SimpleStateStatus.Restarting getAllowedActions(state) shouldBe Set(ScenarioActionName.Cancel) @@ -585,7 +586,7 @@ class DeploymentServiceSpec val (processId, _) = prepareDeployedProcess(processName).dbioActionValues deploymentManager.withProcessStateStatus(processName, SimpleStateStatus.Canceled) { - val state = scenarioStateProvider.getScenarioStatus(processId, Some(initialVersionId)).futureValue + val state = scenarioStatusProvider.getScenarioStatus(processId, Some(initialVersionId)).futureValue val expectedStatus = ProblemStateStatus.shouldBeRunning(VersionId(1L), "admin") state.status shouldBe expectedStatus @@ -598,7 +599,7 @@ class DeploymentServiceSpec val (processId, _) = prepareDeployedProcess(processName).dbioActionValues deploymentManager.withEmptyProcessState(processName) { - val state = scenarioStateProvider.getScenarioStatus(processId, Some(initialVersionId)).futureValue + val state = scenarioStatusProvider.getScenarioStatus(processId, Some(initialVersionId)).futureValue val expectedStatus = ProblemStateStatus.shouldBeRunning(VersionId(1L), "admin") state.status shouldBe expectedStatus @@ -621,7 +622,7 @@ class DeploymentServiceSpec ) deploymentManager.withProcessStateVersion(processName, SimpleStateStatus.Running, version) { - val state = scenarioStateProvider.getScenarioStatus(processId, Some(initialVersionId)).futureValue + val state = scenarioStatusProvider.getScenarioStatus(processId, Some(initialVersionId)).futureValue val expectedStatus = ProblemStateStatus.mismatchDeployedVersion(VersionId(2L), VersionId(1L), "admin") state.status shouldBe expectedStatus @@ -645,7 +646,7 @@ class DeploymentServiceSpec // FIXME: doesnt check recover from failed verifications ??? deploymentManager.withProcessStateVersion(processName, ProblemStateStatus.Failed, version) { - val state = scenarioStateProvider.getScenarioStatus(processId, Some(initialVersionId)).futureValue + val state = scenarioStatusProvider.getScenarioStatus(processId, Some(initialVersionId)).futureValue state.status shouldBe ProblemStateStatus.Failed getAllowedActions(state) shouldBe Set(ScenarioActionName.Deploy, ScenarioActionName.Cancel) @@ -657,7 +658,7 @@ class DeploymentServiceSpec val (processId, _) = prepareDeployedProcess(processName).dbioActionValues deploymentManager.withProcessStateVersion(processName, SimpleStateStatus.Running, Option.empty) { - val state = scenarioStateProvider.getScenarioStatus(processId, Some(initialVersionId)).futureValue + val state = scenarioStatusProvider.getScenarioStatus(processId, Some(initialVersionId)).futureValue val expectedStatus = ProblemStateStatus.missingDeployedVersion(VersionId(1L), "admin") state.status shouldBe expectedStatus @@ -671,7 +672,7 @@ class DeploymentServiceSpec // FIXME: doesnt check recover from failed future of findJobStatus ??? deploymentManager.withProcessStateVersion(processName, ProblemStateStatus.FailedToGet, Option.empty) { - val state = scenarioStateProvider.getScenarioStatus(processId, Some(initialVersionId)).futureValue + val state = scenarioStatusProvider.getScenarioStatus(processId, Some(initialVersionId)).futureValue val expectedStatus = ProblemStateStatus.FailedToGet state.status shouldBe expectedStatus @@ -689,7 +690,7 @@ class DeploymentServiceSpec .lastStateAction shouldBe None deploymentManager.withEmptyProcessState(processName) { - scenarioStateProvider + scenarioStatusProvider .getScenarioStatus(ProcessIdWithName(processId.id, processName), Some(initialVersionId)) .futureValue .status shouldBe SimpleStateStatus.NotDeployed @@ -711,7 +712,7 @@ class DeploymentServiceSpec .lastStateAction shouldBe None deploymentManager.withEmptyProcessState(processName) { - scenarioStateProvider + scenarioStatusProvider .getScenarioStatus(processId, Some(initialVersionId)) .futureValue .status shouldBe SimpleStateStatus.NotDeployed @@ -733,7 +734,7 @@ class DeploymentServiceSpec .lastStateAction shouldBe None deploymentManager.withProcessStateStatus(processName, SimpleStateStatus.Running) { - scenarioStateProvider + scenarioStatusProvider .getScenarioStatus(ProcessIdWithName(processId.id, processName), Some(initialVersionId)) .futureValue .status shouldBe SimpleStateStatus.NotDeployed @@ -749,7 +750,7 @@ class DeploymentServiceSpec val processName: ProcessName = generateProcessName val (processId, _) = prepareArchivedProcess(processName, None).dbioActionValues - val state = scenarioStateProvider.getScenarioStatus(processId, Some(initialVersionId)).futureValue + val state = scenarioStatusProvider.getScenarioStatus(processId, Some(initialVersionId)).futureValue state.status shouldBe SimpleStateStatus.NotDeployed } @@ -760,7 +761,7 @@ class DeploymentServiceSpec val (processId, _) = prepareArchivedProcess(processName, None).dbioActionValues deploymentManager.withProcessStateStatus(processName, SimpleStateStatus.Running) { - val state = scenarioStateProvider.getScenarioStatus(processId, Some(initialVersionId)).futureValue + val state = scenarioStatusProvider.getScenarioStatus(processId, Some(initialVersionId)).futureValue state.status shouldBe SimpleStateStatus.NotDeployed } } @@ -769,7 +770,7 @@ class DeploymentServiceSpec val processName: ProcessName = generateProcessName val (processId, _) = prepareArchivedProcess(processName, Some(Cancel)).dbioActionValues - val state = scenarioStateProvider.getScenarioStatus(processId, Some(initialVersionId)).futureValue + val state = scenarioStatusProvider.getScenarioStatus(processId, Some(initialVersionId)).futureValue state.status shouldBe SimpleStateStatus.Canceled } @@ -778,7 +779,7 @@ class DeploymentServiceSpec val (processId, _) = prepareArchivedProcess(processName, Some(Cancel)).dbioActionValues deploymentManager.withProcessStateStatus(processName, SimpleStateStatus.Running) { - val state = scenarioStateProvider.getScenarioStatus(processId, Some(initialVersionId)).futureValue + val state = scenarioStatusProvider.getScenarioStatus(processId, Some(initialVersionId)).futureValue state.status shouldBe SimpleStateStatus.Canceled } } @@ -787,7 +788,7 @@ class DeploymentServiceSpec val processName: ProcessName = generateProcessName val (processId, _) = preparedUnArchivedProcess(processName, None).dbioActionValues - val state = scenarioStateProvider.getScenarioStatus(processId, Some(initialVersionId)).futureValue + val state = scenarioStatusProvider.getScenarioStatus(processId, Some(initialVersionId)).futureValue state.status shouldBe SimpleStateStatus.NotDeployed } @@ -798,7 +799,7 @@ class DeploymentServiceSpec .addInProgressAction(processId.id, ScenarioActionName.Deploy, Some(VersionId(1)), None) .dbioActionValues - val state = scenarioStateProvider.getScenarioStatus(processId, Some(initialVersionId)).futureValue + val state = scenarioStatusProvider.getScenarioStatus(processId, Some(initialVersionId)).futureValue state.status shouldBe SimpleStateStatus.DuringDeploy } @@ -809,7 +810,7 @@ class DeploymentServiceSpec .fetchLatestProcessesDetails[Unit](ScenarioQuery.empty) .dbioActionValues - val statesBasedOnCachedInProgressActionTypes = scenarioStateProvider + val statesBasedOnCachedInProgressActionTypes = scenarioStatusProvider .getScenariosStatuses(processesDetails) .futureValue .map(_.map(_.status.name)) @@ -826,7 +827,7 @@ class DeploymentServiceSpec .map(pd => Option(pd) .filterNot(_.isFragment) - .map(scenarioStateProvider.getAllowedActionsForScenarioStatus(_).map(_.status.name)) + .map(scenarioStatusProvider.getAllowedActionsForScenarioStatus(_).map(_.scenarioStatus.name)) .sequence ) .sequence @@ -842,7 +843,7 @@ class DeploymentServiceSpec val (processId, _) = prepareArchivedProcess(processName, None).dbioActionValues deploymentManager.withProcessStateStatus(processName, SimpleStateStatus.Running) { - val state = scenarioStateProvider.getScenarioStatus(processId, Some(initialVersionId)).futureValue + val state = scenarioStatusProvider.getScenarioStatus(processId, Some(initialVersionId)).futureValue state.status shouldBe SimpleStateStatus.NotDeployed } } @@ -851,7 +852,7 @@ class DeploymentServiceSpec val processName: ProcessName = generateProcessName val (processId, _) = prepareArchivedProcess(processName, Some(Deploy)).dbioActionValues - val state = scenarioStateProvider.getScenarioStatus(processId, Some(initialVersionId)).futureValue + val state = scenarioStatusProvider.getScenarioStatus(processId, Some(initialVersionId)).futureValue state.status shouldBe ProblemStateStatus.ArchivedShouldBeCanceled } @@ -860,7 +861,7 @@ class DeploymentServiceSpec val (processId, _) = prepareArchivedProcess(processName, Some(Cancel)).dbioActionValues deploymentManager.withEmptyProcessState(processName) { - val state = scenarioStateProvider.getScenarioStatus(processId, Some(initialVersionId)).futureValue + val state = scenarioStatusProvider.getScenarioStatus(processId, Some(initialVersionId)).futureValue state.status shouldBe SimpleStateStatus.Canceled } } @@ -870,7 +871,7 @@ class DeploymentServiceSpec val (processId, _) = preparedUnArchivedProcess(processName, Some(Cancel)).dbioActionValues deploymentManager.withProcessStateStatus(processName, SimpleStateStatus.Running) { - val state = scenarioStateProvider.getScenarioStatus(processId, Some(initialVersionId)).futureValue + val state = scenarioStatusProvider.getScenarioStatus(processId, Some(initialVersionId)).futureValue val expectedStatus = ProblemStateStatus.shouldNotBeRunning(true) state.status shouldBe expectedStatus getAllowedActions(state) shouldBe Set(ScenarioActionName.Deploy, ScenarioActionName.Cancel) @@ -883,7 +884,7 @@ class DeploymentServiceSpec deploymentManager.withEmptyProcessState(processName) { val initialStatus = SimpleStateStatus.NotDeployed - scenarioStateProvider + scenarioStatusProvider .getScenarioStatus(processIdWithName, Some(initialVersionId)) .futureValue .status shouldBe initialStatus @@ -897,13 +898,13 @@ class DeploymentServiceSpec ) ) .futureValue - scenarioStateProvider + scenarioStatusProvider .getScenarioStatus(processIdWithName, Some(initialVersionId)) .futureValue .status shouldBe SimpleStateStatus.DuringDeploy actionService.invalidateInProgressActions() - scenarioStateProvider + scenarioStatusProvider .getScenarioStatus(processIdWithName, Some(initialVersionId)) .futureValue .status shouldBe initialStatus @@ -916,7 +917,7 @@ class DeploymentServiceSpec val (processId, _) = prepareDeployedProcess(processName).dbioActionValues val timeout = 1.second - val serviceWithTimeout = createScenarioStateProvider(Some(timeout)) + val serviceWithTimeout = createScenarioStatusProvider(Some(timeout)) val durationLongerThanTimeout = timeout.plus(patienceConfig.timeout) deploymentManager.withDelayBeforeStateReturn(durationLongerThanTimeout) { @@ -933,7 +934,7 @@ class DeploymentServiceSpec val id = prepareFragment(processName).dbioActionValues assertThrowsWithParent[FragmentStateException.type] { - scenarioStateProvider.getScenarioStatus(id, Some(initialVersionId)).futureValue + scenarioStatusProvider.getScenarioStatus(id, Some(initialVersionId)).futureValue } } diff --git a/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/periodic/PeriodicProcessesFetchingTest.scala b/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/periodic/PeriodicProcessesFetchingTest.scala index 2ca848596e6..fee68f9155c 100644 --- a/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/periodic/PeriodicProcessesFetchingTest.scala +++ b/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/periodic/PeriodicProcessesFetchingTest.scala @@ -114,7 +114,7 @@ class PeriodicProcessesFetchingTest val statuses = f.periodicProcessService.stateQueryForAllScenariosSupport .asInstanceOf[StateQueryForAllScenariosSupported] - .getAllProcessesStates() + .getAllDeploymentStatuses() .futureValue .value diff --git a/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/periodic/flink/DeploymentManagerStub.scala b/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/periodic/flink/DeploymentManagerStub.scala index 421fad924d4..919dea5822f 100644 --- a/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/periodic/flink/DeploymentManagerStub.scala +++ b/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/periodic/flink/DeploymentManagerStub.scala @@ -73,7 +73,7 @@ class DeploymentManagerStub extends BaseDeploymentManager { override def stateQueryForAllScenariosSupport: StateQueryForAllScenariosSupport = new StateQueryForAllScenariosSupported { - override def getAllProcessesStates()( + override def getAllDeploymentStatuses()( implicit freshnessPolicy: DataFreshnessPolicy ): Future[WithDataFreshnessStatus[Map[ProcessName, List[StatusDetails]]]] = Future.successful(WithDataFreshnessStatus.fresh(jobStatus.toMap)) diff --git a/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/scenariostatus/InconsistentStateDetectorTest.scala b/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/scenariostatus/InconsistentStateDetectorTest.scala new file mode 100644 index 00000000000..91c192d94a5 --- /dev/null +++ b/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/scenariostatus/InconsistentStateDetectorTest.scala @@ -0,0 +1,86 @@ +package pl.touk.nussknacker.ui.process.scenariostatus + +import org.scalatest.funsuite.AnyFunSuiteLike +import org.scalatest.matchers.should.Matchers +import pl.touk.nussknacker.engine.api.deployment.StatusDetails +import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus +import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus.ProblemStateStatus +import pl.touk.nussknacker.engine.deployment.DeploymentId + +import java.util.UUID + +class InconsistentStateDetectorTest extends AnyFunSuiteLike with Matchers { + + test("return failed status if two deployments running") { + val firstDeploymentStatus = StatusDetails(SimpleStateStatus.Running, Some(DeploymentId(UUID.randomUUID().toString))) + val secondDeploymentStatus = + StatusDetails(SimpleStateStatus.Running, Some(DeploymentId(UUID.randomUUID().toString))) + + InconsistentStateDetector.extractAtMostOneStatus(List(firstDeploymentStatus, secondDeploymentStatus)) shouldBe Some( + StatusDetails( + ProblemStateStatus.MultipleJobsRunning, + firstDeploymentStatus.deploymentId, + errors = List( + s"Expected one job, instead: ${firstDeploymentStatus.deploymentIdUnsafe} - RUNNING, ${secondDeploymentStatus.deploymentIdUnsafe} - RUNNING" + ) + ) + ) + } + + test("return failed status if two in non-terminal state") { + val firstDeploymentStatus = StatusDetails(SimpleStateStatus.Running, Some(DeploymentId(UUID.randomUUID().toString))) + val secondDeploymentStatus = + StatusDetails(SimpleStateStatus.Restarting, Some(DeploymentId(UUID.randomUUID().toString))) + + InconsistentStateDetector.extractAtMostOneStatus(List(firstDeploymentStatus, secondDeploymentStatus)) shouldBe Some( + StatusDetails( + ProblemStateStatus.MultipleJobsRunning, + firstDeploymentStatus.deploymentId, + errors = List( + s"Expected one job, instead: ${firstDeploymentStatus.deploymentIdUnsafe} - RUNNING, ${secondDeploymentStatus.deploymentIdUnsafe} - RESTARTING" + ) + ) + ) + } + + test("return running status if cancelled job has last-modification date later then running job") { + val runningDeploymentStatus = + StatusDetails(SimpleStateStatus.Running, Some(DeploymentId(UUID.randomUUID().toString))) + val canceledDeploymentStatus = + StatusDetails(SimpleStateStatus.Canceled, Some(DeploymentId(UUID.randomUUID().toString))) + val duringCancelDeploymentStatus = + StatusDetails(SimpleStateStatus.DuringCancel, Some(DeploymentId(UUID.randomUUID().toString))) + + InconsistentStateDetector.extractAtMostOneStatus( + List(runningDeploymentStatus, canceledDeploymentStatus, duringCancelDeploymentStatus) + ) shouldBe Some( + StatusDetails( + SimpleStateStatus.Running, + runningDeploymentStatus.deploymentId, + ) + ) + } + + test("return last terminal state if not running") { + val firstFinishedDeploymentStatus = + StatusDetails(SimpleStateStatus.Finished, Some(DeploymentId(UUID.randomUUID().toString))) + val secondFinishedDeploymentStatus = + StatusDetails(SimpleStateStatus.Finished, Some(DeploymentId(UUID.randomUUID().toString))) + + InconsistentStateDetector.extractAtMostOneStatus( + List(firstFinishedDeploymentStatus, secondFinishedDeploymentStatus) + ) shouldBe Some(firstFinishedDeploymentStatus) + } + + test("return non-terminal state if not running") { + val finishedDeploymentStatus = + StatusDetails(SimpleStateStatus.Finished, Some(DeploymentId(UUID.randomUUID().toString))) + val nonTerminalDeploymentStatus = + StatusDetails(SimpleStateStatus.Restarting, Some(DeploymentId(UUID.randomUUID().toString))) + + InconsistentStateDetector.extractAtMostOneStatus( + List(finishedDeploymentStatus, nonTerminalDeploymentStatus) + ) shouldBe Some(nonTerminalDeploymentStatus) + } + +} diff --git a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkDeploymentManager.scala b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkDeploymentManager.scala index 3595a921948..09deb8334b1 100644 --- a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkDeploymentManager.scala +++ b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkDeploymentManager.scala @@ -273,7 +273,7 @@ class FlinkDeploymentManager( override def stateQueryForAllScenariosSupport: StateQueryForAllScenariosSupport = new StateQueryForAllScenariosSupported { - override def getAllProcessesStates()( + override def getAllDeploymentStatuses()( implicit freshnessPolicy: DataFreshnessPolicy ): Future[WithDataFreshnessStatus[Map[ProcessName, List[StatusDetails]]]] = getAllProcessesStatesFromFlink() diff --git a/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/FlinkDeploymentManagerSpec.scala b/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/FlinkDeploymentManagerSpec.scala index c8a3ed671b5..ff7b9280f8c 100644 --- a/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/FlinkDeploymentManagerSpec.scala +++ b/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/FlinkDeploymentManagerSpec.scala @@ -7,6 +7,7 @@ import com.github.tomakehurst.wiremock.client.WireMock._ import com.typesafe.config.ConfigFactory import io.circe.Json.{fromString, fromValues} import org.apache.flink.api.common.JobStatus +import org.scalatest.LoneElement import org.scalatest.concurrent.PatienceConfiguration.Timeout import org.scalatest.funsuite.AnyFunSuite import org.scalatest.matchers.should.Matchers @@ -14,9 +15,7 @@ import pl.touk.nussknacker.engine.DeploymentManagerDependencies import pl.touk.nussknacker.engine.api.component.NodesDeploymentData import pl.touk.nussknacker.engine.api.deployment.DeploymentUpdateStrategy.StateRestoringStrategy import pl.touk.nussknacker.engine.api.deployment._ -import pl.touk.nussknacker.engine.api.deployment.inconsistency.InconsistentStateDetector import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus -import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus.ProblemStateStatus import pl.touk.nussknacker.engine.api.process.{ProcessId, ProcessName, VersionId} import pl.touk.nussknacker.engine.api.{MetaData, ProcessVersion, StreamMetaData} import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess @@ -39,7 +38,7 @@ import scala.concurrent.duration._ import scala.concurrent.{ExecutionContext, Future} //TODO move some tests to FlinkHttpClientTest -class FlinkDeploymentManagerSpec extends AnyFunSuite with Matchers with PatientScalaFutures { +class FlinkDeploymentManagerSpec extends AnyFunSuite with Matchers with PatientScalaFutures with LoneElement { private implicit val freshnessPolicy: DataFreshnessPolicy = DataFreshnessPolicy.Fresh @@ -378,155 +377,6 @@ class FlinkDeploymentManagerSpec extends AnyFunSuite with Matchers with PatientS history.filter(_.operation == "cancel") shouldBe Nil } - // TODO: extract test for InconsistentStateDetector - test("return failed status if two jobs running") { - statuses = List( - JobOverview("2343", "p1", 10L, 10L, JobStatus.RUNNING.name(), tasksOverview(running = 1)), - JobOverview("1111", "p1", 30L, 30L, JobStatus.RUNNING.name(), tasksOverview(running = 1)) - ) - - val manager = createManager(statuses) - val returnedStatuses = manager.getScenarioDeploymentsStatuses(ProcessName("p1")).map(_.value).futureValue - InconsistentStateDetector.extractAtMostOneStatus(returnedStatuses) shouldBe Some( - StatusDetails( - ProblemStateStatus.MultipleJobsRunning, - Some(DeploymentId("1111")), - Some(ExternalDeploymentId("1111")), - Some( - ProcessVersion( - VersionId(1), - ProcessName("p1"), - ProcessId(123), - List.empty, - "user1", - None - ) - ), - startTime = Some(30L), - errors = List("Expected one job, instead: 1111 - RUNNING, 2343 - RUNNING") - ) - ) - } - - // TODO: extract test for InconsistentStateDetector - test("return failed status if two in non-terminal state") { - statuses = List( - JobOverview("2343", "p1", 10L, 10L, JobStatus.RUNNING.name(), tasksOverview(running = 1)), - JobOverview("1111", "p1", 30L, 30L, JobStatus.RESTARTING.name(), tasksOverview()) - ) - - val manager = createManager(statuses) - val returnedStatuses = manager.getScenarioDeploymentsStatuses(ProcessName("p1")).map(_.value).futureValue - InconsistentStateDetector.extractAtMostOneStatus(returnedStatuses) shouldBe Some( - StatusDetails( - ProblemStateStatus.MultipleJobsRunning, - Some(DeploymentId("1111")), - Some(ExternalDeploymentId("1111")), - Some( - ProcessVersion( - VersionId(1), - ProcessName("p1"), - ProcessId(123), - List.empty, - "user1", - None - ) - ), - startTime = Some(30L), - errors = List("Expected one job, instead: 1111 - RESTARTING, 2343 - RUNNING") - ) - ) - } - - // TODO: extract test for InconsistentStateDetector - test("return running status if cancelled job has last-modification date later then running job") { - statuses = List( - JobOverview("2343", "p1", 20L, 10L, JobStatus.RUNNING.name(), tasksOverview(running = 1)), - JobOverview("1111", "p1", 30L, 5L, JobStatus.CANCELED.name(), tasksOverview(canceled = 1)), - JobOverview("2222", "p1", 30L, 5L, JobStatus.CANCELLING.name(), tasksOverview(canceling = 1)) - ) - - val manager = createManager(statuses) - val returnedStatuses = manager.getScenarioDeploymentsStatuses(ProcessName("p1")).map(_.value).futureValue - InconsistentStateDetector.extractAtMostOneStatus(returnedStatuses) shouldBe Some( - StatusDetails( - SimpleStateStatus.Running, - Some(DeploymentId("2343")), - Some(ExternalDeploymentId("2343")), - Some( - ProcessVersion( - VersionId(1), - ProcessName("p1"), - ProcessId(123), - List.empty, - "user1", - None - ) - ), - startTime = Some(10L) - ) - ) - } - - // TODO: extract test for InconsistentStateDetector - test("return last terminal state if not running") { - statuses = List( - JobOverview("2343", "p1", 40L, 10L, JobStatus.FINISHED.name(), tasksOverview(finished = 1)), - JobOverview("1111", "p1", 35L, 30L, JobStatus.FINISHED.name(), tasksOverview(finished = 1)) - ) - - val manager = createManager(statuses) - val returnedStatuses = manager.getScenarioDeploymentsStatuses(ProcessName("p1")).map(_.value).futureValue - InconsistentStateDetector.extractAtMostOneStatus(returnedStatuses) shouldBe Some( - StatusDetails( - SimpleStateStatus.Finished, - Some(DeploymentId("2343")), - Some(ExternalDeploymentId("2343")), - Some( - ProcessVersion( - VersionId(1), - ProcessName("p1"), - ProcessId(123), - List.empty, - "user1", - None - ) - ), - startTime = Some(10L) - ) - ) - - } - - // TODO: extract test for InconsistentStateDetector - test("return non-terminal state if not running") { - statuses = List( - JobOverview("2343", "p1", 40L, 10L, JobStatus.FINISHED.name(), tasksOverview(finished = 1)), - JobOverview("1111", "p1", 35L, 30L, JobStatus.RESTARTING.name(), tasksOverview()) - ) - - val manager = createManager(statuses) - val returnedStatuses = manager.getScenarioDeploymentsStatuses(ProcessName("p1")).map(_.value).futureValue - InconsistentStateDetector.extractAtMostOneStatus(returnedStatuses) shouldBe Some( - StatusDetails( - SimpleStateStatus.Restarting, - Some(DeploymentId("1111")), - Some(ExternalDeploymentId("1111")), - Some( - ProcessVersion( - VersionId(1), - ProcessName("p1"), - ProcessId(123), - List.empty, - "user1", - None - ) - ), - startTime = Some(30L) - ) - ) - } - test("return process version the same as configured") { val jid = "2343" val processName = ProcessName("p1") diff --git a/extensions-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/StateStatus.scala b/extensions-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/StateStatus.scala index 2dc88d4cbb9..ebf1743cb90 100644 --- a/extensions-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/StateStatus.scala +++ b/extensions-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/StateStatus.scala @@ -2,6 +2,14 @@ package pl.touk.nussknacker.engine.api.deployment import pl.touk.nussknacker.engine.api.deployment.StateStatus.StatusName +// This class represents both deployment status and scenario status. +// TODO: we should use DeploymentStatus in StatusDetails which is returned by DeploymentManager.getScenarioDeploymentsStatuses +// but before we do this, scenario scheduling mechanism should stop using DeploymentManager's API +trait StateStatus { + // Status identifier, should be unique among all states registered within all processing types. + def name: StatusName +} + object StateStatus { type StatusName = String @@ -10,11 +18,6 @@ object StateStatus { } -trait StateStatus { - // Status identifier, should be unique among all states registered within all processing types. - def name: StatusName -} - case class NoAttributesStateStatus(name: StatusName) extends StateStatus { override def toString: String = name }