From b48260deb43f3bb3a76410f897141a87c799d03c Mon Sep 17 00:00:00 2001 From: Arek Burdach Date: Fri, 14 Feb 2025 17:38:07 +0100 Subject: [PATCH] [NU-1979] Snenario status resolving moved from DM to core --- .../api/deployment/DeploymentManager.scala | 72 +----------------- .../ProcessStateDefinitionManager.scala | 1 + ...CachingProcessStateDeploymentManager.scala | 19 +---- .../InconsistentStateDetector.scala | 1 + .../testing/DeploymentManagerStub.scala | 73 +++++++------------ .../deployment/ScenarioStateProvider.scala | 41 ++++++----- .../periodic/PeriodicDeploymentManager.scala | 31 +------- .../periodic/PeriodicProcessService.scala | 11 +-- ...eriodicProcessStateDefinitionManager.scala | 9 ++- .../InvalidDeploymentManagerStub.scala | 20 +---- .../NotificationServiceTest.scala | 19 ++--- .../flink/DeploymentManagerStub.scala | 32 +++----- .../flink/PeriodicDeploymentManagerTest.scala | 26 ++++--- ...DevelopmentDeploymentManagerProvider.scala | 3 +- .../MockableDeploymentManagerProvider.scala | 32 ++------ .../management/FlinkDeploymentManager.scala | 29 +------- .../embedded/EmbeddedDeploymentManager.scala | 3 +- .../k8s/manager/K8sDeploymentManager.scala | 3 +- 18 files changed, 108 insertions(+), 317 deletions(-) diff --git a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/DeploymentManager.scala b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/DeploymentManager.scala index ce2fa774423..2274ae5aa88 100644 --- a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/DeploymentManager.scala +++ b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/DeploymentManager.scala @@ -1,49 +1,13 @@ package pl.touk.nussknacker.engine.api.deployment import com.typesafe.config.Config -import pl.touk.nussknacker.engine.api.deployment.inconsistency.InconsistentStateDetector import pl.touk.nussknacker.engine.api.deployment.scheduler.services._ -import pl.touk.nussknacker.engine.api.process.{ProcessIdWithName, ProcessName, VersionId} +import pl.touk.nussknacker.engine.api.process.{ProcessIdWithName, ProcessName} import pl.touk.nussknacker.engine.newdeployment -import pl.touk.nussknacker.engine.util.WithDataFreshnessStatusUtils.WithDataFreshnessStatusOps import java.time.Instant -import scala.concurrent.ExecutionContext.Implicits._ import scala.concurrent.Future -trait DeploymentManagerInconsistentStateHandlerMixIn { - self: DeploymentManager => - - final override def resolve( - idWithName: ProcessIdWithName, - statusDetails: List[StatusDetails], - lastStateAction: Option[ProcessAction], - latestVersionId: VersionId, - deployedVersionId: Option[VersionId], - currentlyPresentedVersionId: Option[VersionId], - ): Future[ProcessState] = { - val engineStateResolvedWithLastAction = flattenStatus(lastStateAction, statusDetails) - Future.successful( - processStateDefinitionManager.processState( - engineStateResolvedWithLastAction, - latestVersionId, - deployedVersionId, - currentlyPresentedVersionId - ) - ) - } - - // This method is protected to make possible to override it with own logic handling different edge cases like - // other state on engine than based on lastStateAction - protected def flattenStatus( - lastStateAction: Option[ProcessAction], - statusDetails: List[StatusDetails] - ): StatusDetails = { - InconsistentStateDetector.resolve(statusDetails, lastStateAction) - } - -} - trait DeploymentManager extends AutoCloseable { def deploymentSynchronisationSupport: DeploymentSynchronisationSupport @@ -54,28 +18,6 @@ trait DeploymentManager extends AutoCloseable { def processCommand[Result](command: DMScenarioCommand[Result]): Future[Result] - final def getProcessState( - idWithName: ProcessIdWithName, - lastStateAction: Option[ProcessAction], - latestVersionId: VersionId, - deployedVersionId: Option[VersionId], - currentlyPresentedVersionId: Option[VersionId], - )( - implicit freshnessPolicy: DataFreshnessPolicy - ): Future[WithDataFreshnessStatus[ProcessState]] = { - for { - statusDetailsWithFreshness <- getProcessStates(idWithName.name) - stateWithFreshness <- resolve( - idWithName, - statusDetailsWithFreshness.value, - lastStateAction, - latestVersionId, - deployedVersionId, - currentlyPresentedVersionId, - ).map(statusDetailsWithFreshness.withValue) - } yield stateWithFreshness - } - /** * We provide a special wrapper called WithDataFreshnessStatus to ensure that fetched data is restored * from the cache or not. If you use any kind of cache in your DM implementation please wrap result data @@ -85,18 +27,6 @@ trait DeploymentManager extends AutoCloseable { implicit freshnessPolicy: DataFreshnessPolicy ): Future[WithDataFreshnessStatus[List[StatusDetails]]] - /** - * Resolves possible inconsistency with lastAction and formats status using `ProcessStateDefinitionManager` - */ - def resolve( - idWithName: ProcessIdWithName, - statusDetails: List[StatusDetails], - lastStateAction: Option[ProcessAction], - latestVersionId: VersionId, - deployedVersionId: Option[VersionId], - currentlyPresentedVersionId: Option[VersionId], - ): Future[ProcessState] - def processStateDefinitionManager: ProcessStateDefinitionManager protected final def notImplemented: Future[Nothing] = diff --git a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/ProcessStateDefinitionManager.scala b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/ProcessStateDefinitionManager.scala index 19ca137a742..709a34615f6 100644 --- a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/ProcessStateDefinitionManager.scala +++ b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/ProcessStateDefinitionManager.scala @@ -56,6 +56,7 @@ trait ProcessStateDefinitionManager { /** * Enhances raw [[StateStatus]] with scenario properties, including deployment info. */ + // FIXME abr: extract other class without most of fields from ProcessState def processState( statusDetails: StatusDetails, latestVersionId: VersionId, diff --git a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/cache/CachingProcessStateDeploymentManager.scala b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/cache/CachingProcessStateDeploymentManager.scala index 1689041c83b..9b520ee6f3d 100644 --- a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/cache/CachingProcessStateDeploymentManager.scala +++ b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/cache/CachingProcessStateDeploymentManager.scala @@ -4,7 +4,7 @@ import com.github.benmanes.caffeine.cache.{AsyncCache, Caffeine} import com.typesafe.config.Config import com.typesafe.scalalogging.LazyLogging import pl.touk.nussknacker.engine.api.deployment._ -import pl.touk.nussknacker.engine.api.process.{ProcessIdWithName, ProcessName, VersionId} +import pl.touk.nussknacker.engine.api.process.ProcessName import scala.compat.java8.FutureConverters._ import scala.concurrent.ExecutionContext.Implicits._ @@ -24,23 +24,6 @@ class CachingProcessStateDeploymentManager( .expireAfterWrite(java.time.Duration.ofMillis(cacheTTL.toMillis)) .buildAsync[ProcessName, List[StatusDetails]] - override def resolve( - idWithName: ProcessIdWithName, - statusDetails: List[StatusDetails], - lastStateAction: Option[ProcessAction], - latestVersionId: VersionId, - deployedVersionId: Option[VersionId], - currentlyPresentedVersionId: Option[VersionId], - ): Future[ProcessState] = - delegate.resolve( - idWithName, - statusDetails, - lastStateAction, - latestVersionId, - deployedVersionId, - currentlyPresentedVersionId - ) - override def getProcessStates( name: ProcessName )(implicit freshnessPolicy: DataFreshnessPolicy): Future[WithDataFreshnessStatus[List[StatusDetails]]] = { diff --git a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/inconsistency/InconsistentStateDetector.scala b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/inconsistency/InconsistentStateDetector.scala index c2f713a020a..7b4cbd19eae 100644 --- a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/inconsistency/InconsistentStateDetector.scala +++ b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/inconsistency/InconsistentStateDetector.scala @@ -6,6 +6,7 @@ import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus.Proble import pl.touk.nussknacker.engine.api.deployment.{ProcessAction, ProcessActionState, ScenarioActionName, StatusDetails} import pl.touk.nussknacker.engine.deployment.DeploymentId +// FIXME abr: move to core object InconsistentStateDetector extends InconsistentStateDetector class InconsistentStateDetector extends LazyLogging { diff --git a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/testing/DeploymentManagerStub.scala b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/testing/DeploymentManagerStub.scala index 1919c1d4b35..8a463ad5bf6 100644 --- a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/testing/DeploymentManagerStub.scala +++ b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/testing/DeploymentManagerStub.scala @@ -7,7 +7,7 @@ import pl.touk.nussknacker.engine.api.component.ScenarioPropertyConfig import pl.touk.nussknacker.engine.api.definition._ import pl.touk.nussknacker.engine.api.deployment._ import pl.touk.nussknacker.engine.api.deployment.simple.{SimpleProcessStateDefinitionManager, SimpleStateStatus} -import pl.touk.nussknacker.engine.api.process.{ProcessIdWithName, ProcessName, VersionId} +import pl.touk.nussknacker.engine.api.process.ProcessName import pl.touk.nussknacker.engine.{ BaseModelData, DeploymentManagerDependencies, @@ -15,43 +15,36 @@ import pl.touk.nussknacker.engine.{ MetaDataInitializer } -import scala.concurrent.Future +import scala.collection.concurrent.TrieMap import scala.concurrent.duration.FiniteDuration +import scala.concurrent.{ExecutionContext, Future} -class DeploymentManagerStub extends BaseDeploymentManager with StubbingCommands { - - // We map lastStateAction to state to avoid some corner/blocking cases with the deleting/canceling scenario on tests.. - override def resolve( - idWithName: ProcessIdWithName, - statusDetails: List[StatusDetails], - lastStateAction: Option[ProcessAction], - latestVersionId: VersionId, - deployedVersionId: Option[VersionId], - currentlyPresentedVersionId: Option[VersionId], - ): Future[ProcessState] = { - val lastStateActionStatus = lastStateAction match { - case Some(action) if action.actionName == ScenarioActionName.Deploy => - SimpleStateStatus.Running - case Some(action) if action.actionName == ScenarioActionName.Cancel => - SimpleStateStatus.Canceled - case _ => - SimpleStateStatus.NotDeployed - } - Future.successful( - processStateDefinitionManager.processState( - StatusDetails(lastStateActionStatus, None), - latestVersionId, - deployedVersionId, - currentlyPresentedVersionId, - ) - ) +class DeploymentManagerStub(implicit ec: ExecutionContext) extends BaseDeploymentManager { + + private val scenarioStatusMap = TrieMap.empty[ProcessName, StateStatus] + + override def processCommand[Result](command: DMScenarioCommand[Result]): Future[Result] = command match { + case _: DMValidateScenarioCommand => Future.successful(()) + case run: DMRunDeploymentCommand => + Future { + scenarioStatusMap.put(run.processVersion.processName, SimpleStateStatus.Running) + None + } + case cancel: DMCancelScenarioCommand => + Future.successful { + scenarioStatusMap.put(cancel.scenarioName, SimpleStateStatus.Canceled) + () + } + case _: DMStopScenarioCommand | _: DMStopDeploymentCommand | _: DMCancelDeploymentCommand | + _: DMMakeScenarioSavepointCommand | _: DMRunOffScheduleCommand | _: DMTestScenarioCommand => + notImplemented } override def getProcessStates( name: ProcessName )(implicit freshnessPolicy: DataFreshnessPolicy): Future[WithDataFreshnessStatus[List[StatusDetails]]] = { Future.successful( - WithDataFreshnessStatus.fresh(List.empty) + WithDataFreshnessStatus.fresh(scenarioStatusMap.get(name).map(StatusDetails(_, None)).toList) ) } @@ -67,21 +60,6 @@ class DeploymentManagerStub extends BaseDeploymentManager with StubbingCommands } -trait StubbingCommands { self: DeploymentManager => - - override def processCommand[Result](command: DMScenarioCommand[Result]): Future[Result] = command match { - case _: DMValidateScenarioCommand => Future.successful(()) - case _: DMRunDeploymentCommand => Future.successful(None) - case _: DMStopDeploymentCommand => Future.successful(SavepointResult("")) - case _: DMStopScenarioCommand => Future.successful(SavepointResult("")) - case _: DMCancelDeploymentCommand => Future.successful(()) - case _: DMCancelScenarioCommand => Future.successful(()) - case _: DMMakeScenarioSavepointCommand => Future.successful(SavepointResult("")) - case _: DMRunOffScheduleCommand | _: DMTestScenarioCommand => notImplemented - } - -} - //This provider can be used for testing. Override methods to implement more complex behaviour //Provider is registered via ServiceLoader, so it can be used e.g. to run simple docker configuration class DeploymentManagerProviderStub extends DeploymentManagerProvider { @@ -91,7 +69,10 @@ class DeploymentManagerProviderStub extends DeploymentManagerProvider { deploymentManagerDependencies: DeploymentManagerDependencies, config: Config, scenarioStateCacheTTL: Option[FiniteDuration] - ): ValidatedNel[String, DeploymentManager] = Validated.valid(new DeploymentManagerStub) + ): ValidatedNel[String, DeploymentManager] = { + import deploymentManagerDependencies._ + Validated.valid(new DeploymentManagerStub) + } override def name: String = "stub" diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/deployment/ScenarioStateProvider.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/deployment/ScenarioStateProvider.scala index f40f4c0360c..bc80022bd0a 100644 --- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/deployment/ScenarioStateProvider.scala +++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/deployment/ScenarioStateProvider.scala @@ -8,6 +8,7 @@ import com.typesafe.scalalogging.LazyLogging import db.util.DBIOActionInstances._ import pl.touk.nussknacker.engine.api.deployment.ScenarioActionName.{Cancel, Deploy} import pl.touk.nussknacker.engine.api.deployment._ +import pl.touk.nussknacker.engine.api.deployment.inconsistency.InconsistentStateDetector import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus.ProblemStateStatus import pl.touk.nussknacker.engine.api.deployment.simple.{SimpleProcessStateDefinitionManager, SimpleStateStatus} import pl.touk.nussknacker.engine.api.process._ @@ -255,17 +256,21 @@ private class ScenarioStateProviderImpl( processDetails, inProgressActionNames, currentlyPresentedVersionId, - manager => - manager - .resolve( - processDetails.idWithName, - prefetchedStatusDetails.value, - processDetails.lastStateAction, - processDetails.processVersionId, - processDetails.lastDeployedAction.map(_.processVersionId), - currentlyPresentedVersionId, - ) - .map(prefetchedStatusDetails.withValue) + { manager => + // FIXME abr: handle finished + Future { + prefetchedStatusDetails.map { prefetchedStatusDetailsValue => + val resolved = + InconsistentStateDetector.resolve(prefetchedStatusDetailsValue, processDetails.lastStateAction) + manager.processStateDefinitionManager.processState( + resolved, + processDetails.processVersionId, + processDetails.lastDeployedAction.map(_.processVersionId), + currentlyPresentedVersionId + ) + } + } + } ) } @@ -396,14 +401,14 @@ private class ScenarioStateProviderImpl( implicit freshnessPolicy: DataFreshnessPolicy ): Future[WithDataFreshnessStatus[ProcessState]] = { + // FIXME abr: handle finished val state = deploymentManager - .getProcessState( - processIdWithName, - lastStateAction, - latestVersionId, - deployedVersionId, - currentlyPresentedVersionId, - ) + .getProcessStates(processIdWithName.name) + .map(_.map { statusDetails => + val resolved = InconsistentStateDetector.resolve(statusDetails, lastStateAction) + deploymentManager.processStateDefinitionManager + .processState(resolved, latestVersionId, deployedVersionId, currentlyPresentedVersionId) + }) .recover { case NonFatal(e) => logger.warn(s"Failed to get status of ${processIdWithName.name}: ${e.getMessage}", e) failedToGetProcessState(latestVersionId) diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/periodic/PeriodicDeploymentManager.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/periodic/PeriodicDeploymentManager.scala index 5a143896590..c3bb2a25cc3 100644 --- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/periodic/PeriodicDeploymentManager.scala +++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/periodic/PeriodicDeploymentManager.scala @@ -7,10 +7,9 @@ import pl.touk.nussknacker.engine.DeploymentManagerDependencies import pl.touk.nussknacker.engine.api.deployment._ import pl.touk.nussknacker.engine.api.deployment.scheduler.model.{ScheduleProperty => ApiScheduleProperty} import pl.touk.nussknacker.engine.api.deployment.scheduler.services._ -import pl.touk.nussknacker.engine.api.process.{ProcessIdWithName, ProcessName, VersionId} +import pl.touk.nussknacker.engine.api.process.{ProcessIdWithName, ProcessName} import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess import pl.touk.nussknacker.engine.deployment.ExternalDeploymentId -import pl.touk.nussknacker.ui.process.periodic.PeriodicProcessService.PeriodicProcessStatus import pl.touk.nussknacker.ui.process.periodic.Utils._ import pl.touk.nussknacker.ui.process.repository.PeriodicProcessesRepository @@ -200,34 +199,6 @@ class PeriodicDeploymentManager private[periodic] ( service.getStatusDetails(name).map(_.map(List(_))) } - override def resolve( - idWithName: ProcessIdWithName, - statusDetailsList: List[StatusDetails], - lastStateAction: Option[ProcessAction], - latestVersionId: VersionId, - deployedVersionId: Option[VersionId], - currentlyPresentedVersionId: Option[VersionId], - ): Future[ProcessState] = { - val statusDetails = statusDetailsList match { - case head :: _ => - head - case Nil => - val status = PeriodicProcessStatus(List.empty, List.empty) - status.mergedStatusDetails.copy(status = status) - } - // TODO: add "real" presentation of deployments in GUI - val mergedStatus = processStateDefinitionManager - .processState( - statusDetails.copy(status = - statusDetails.status.asInstanceOf[PeriodicProcessStatus].mergedStatusDetails.status - ), - latestVersionId, - deployedVersionId, - currentlyPresentedVersionId, - ) - Future.successful(mergedStatus.copy(tooltip = processStateDefinitionManager.statusTooltip(statusDetails.status))) - } - override def processStateDefinitionManager: ProcessStateDefinitionManager = new PeriodicProcessStateDefinitionManager(delegate.processStateDefinitionManager) diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/periodic/PeriodicProcessService.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/periodic/PeriodicProcessService.scala index 93512bb1a2f..585bbeb86c0 100644 --- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/periodic/PeriodicProcessService.scala +++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/periodic/PeriodicProcessService.scala @@ -615,7 +615,7 @@ class PeriodicProcessService( ) } yield { val status = PeriodicProcessStatus(toDeploymentStatuses(activeSchedules), toDeploymentStatuses(inactiveSchedules)) - status.mergedStatusDetails.copy(status = status) + status.mergedStatusDetails } } @@ -651,8 +651,7 @@ class PeriodicProcessService( toDeploymentStatuses(processName, activeSchedulesForProcess), toDeploymentStatuses(processName, inactiveSchedulesForProcess) ) - val mergedStatus = status.mergedStatusDetails.copy(status = status) - (processName, mergedStatus) + (processName, status.mergedStatusDetails) }.toMap } } @@ -765,17 +764,13 @@ object PeriodicProcessService { case class PeriodicProcessStatus( activeDeploymentsStatuses: List[PeriodicDeploymentStatus], inactiveDeploymentsStatuses: List[PeriodicDeploymentStatus] - ) extends StateStatus - with LazyLogging { + ) extends LazyLogging { def limitedAndSortedDeployments: List[PeriodicDeploymentStatus] = (activeDeploymentsStatuses ++ inactiveDeploymentsStatuses.take( MaxDeploymentsStatus - activeDeploymentsStatuses.size )).sorted(PeriodicDeploymentStatus.ordering.reverse) - // We present merged name to be possible to filter scenario by status - override def name: StatusName = mergedStatusDetails.status.name - // Currently we don't present deployments - theirs statuses are available only in tooltip - because of that we have to pick // one "merged" status that will be presented to users def mergedStatusDetails: StatusDetails = { diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/periodic/PeriodicProcessStateDefinitionManager.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/periodic/PeriodicProcessStateDefinitionManager.scala index 17c3642a8c8..f999a485af0 100644 --- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/periodic/PeriodicProcessStateDefinitionManager.scala +++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/periodic/PeriodicProcessStateDefinitionManager.scala @@ -1,7 +1,12 @@ package pl.touk.nussknacker.ui.process.periodic import pl.touk.nussknacker.engine.api.deployment.ProcessStateDefinitionManager.defaultVisibleActions -import pl.touk.nussknacker.engine.api.deployment.{OverridingProcessStateDefinitionManager, ProcessStateDefinitionManager, ScenarioActionName, StateStatus} +import pl.touk.nussknacker.engine.api.deployment.{ + OverridingProcessStateDefinitionManager, + ProcessStateDefinitionManager, + ScenarioActionName, + StateStatus +} import pl.touk.nussknacker.ui.process.periodic.PeriodicProcessService.{PeriodicDeploymentStatus, PeriodicProcessStatus} class PeriodicProcessStateDefinitionManager(delegate: ProcessStateDefinitionManager) @@ -18,7 +23,7 @@ class PeriodicProcessStateDefinitionManager(delegate: ProcessStateDefinitionMana override def statusTooltip(stateStatus: StateStatus): String = { stateStatus match { case periodic: PeriodicProcessStatus => PeriodicProcessStateDefinitionManager.statusTooltip(periodic) - case _ => super.statusTooltip(stateStatus) + case other => throw new IllegalStateException(s"Unexpected status: $other") } } diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/processingtype/InvalidDeploymentManagerStub.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/processingtype/InvalidDeploymentManagerStub.scala index ee66bfe9b5b..526b12b548a 100644 --- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/processingtype/InvalidDeploymentManagerStub.scala +++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/processingtype/InvalidDeploymentManagerStub.scala @@ -3,7 +3,7 @@ package pl.touk.nussknacker.ui.process.processingtype import pl.touk.nussknacker.engine.api.deployment._ import pl.touk.nussknacker.engine.api.deployment.simple.SimpleProcessStateDefinitionManager import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus.ProblemStateStatus -import pl.touk.nussknacker.engine.api.process.{ProcessIdWithName, ProcessName, VersionId} +import pl.touk.nussknacker.engine.api.process.ProcessName import pl.touk.nussknacker.ui.process.exception.ProcessIllegalAction import scala.concurrent.Future @@ -24,24 +24,6 @@ object InvalidDeploymentManagerStub extends DeploymentManager { Future.successful(WithDataFreshnessStatus.fresh(List(stubbedStatus))) } - override def resolve( - idWithName: ProcessIdWithName, - statusDetails: List[StatusDetails], - lastStateAction: Option[ProcessAction], - latestVersionId: VersionId, - deployedVersionId: Option[VersionId], - currentlyPresentedVersionId: Option[VersionId], - ): Future[ProcessState] = { - Future.successful( - processStateDefinitionManager.processState( - stubbedStatus, - latestVersionId, - deployedVersionId, - currentlyPresentedVersionId - ) - ) - } - override def processStateDefinitionManager: ProcessStateDefinitionManager = SimpleProcessStateDefinitionManager override def processCommand[Result](command: DMScenarioCommand[Result]): Future[Result] = command match { diff --git a/designer/server/src/test/scala/pl/touk/nussknacker/ui/notifications/NotificationServiceTest.scala b/designer/server/src/test/scala/pl/touk/nussknacker/ui/notifications/NotificationServiceTest.scala index aeebefc873e..a2ef9face5f 100644 --- a/designer/server/src/test/scala/pl/touk/nussknacker/ui/notifications/NotificationServiceTest.scala +++ b/designer/server/src/test/scala/pl/touk/nussknacker/ui/notifications/NotificationServiceTest.scala @@ -4,15 +4,14 @@ import akka.actor.ActorSystem import org.mockito.ArgumentMatchers.any import org.mockito.Mockito.when import org.scalatest.exceptions.TestFailedException -import org.scalatest.{BeforeAndAfterAll, OptionValues} import org.scalatest.funsuite.AnyFunSuite import org.scalatest.matchers.should.Matchers +import org.scalatest.{BeforeAndAfterAll, OptionValues} import org.scalatestplus.mockito.MockitoSugar import pl.touk.nussknacker.engine.api.component.NodesDeploymentData import pl.touk.nussknacker.engine.api.deployment.DeploymentUpdateStrategy.StateRestoringStrategy import pl.touk.nussknacker.engine.api.deployment._ import pl.touk.nussknacker.engine.api.deployment.simple.{SimpleProcessStateDefinitionManager, SimpleStateStatus} -import pl.touk.nussknacker.engine.api.modelinfo.ModelInfo import pl.touk.nussknacker.engine.api.process.{ProcessIdWithName, ProcessName, VersionId} import pl.touk.nussknacker.engine.build.ScenarioBuilder import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess @@ -102,8 +101,8 @@ class NotificationServiceTest val id = saveSampleProcess(processName) val processIdWithName = ProcessIdWithName(id, processName) - val deploymentManager = mock[DeploymentManager] - val (deploymentService, actionService, notificationService) = createServices(deploymentManager) + val deploymentManager = mock[DeploymentManager] + val (deploymentService, _, notificationService) = createServices(deploymentManager) def notificationsFor(user: LoggedUser): List[Notification] = notificationService @@ -288,16 +287,8 @@ class NotificationServiceTest ) private def createServices(deploymentManager: DeploymentManager) = { - when( - deploymentManager.getProcessState( - any[ProcessIdWithName], - any[Option[ProcessAction]], - any[VersionId], - any[Option[VersionId]], - any[Option[VersionId]], - )(any[DataFreshnessPolicy]) - ) - .thenReturn(Future.successful(WithDataFreshnessStatus.fresh(notDeployed))) + when(deploymentManager.getProcessStates(any[ProcessName])(any[DataFreshnessPolicy])) + .thenReturn(Future.successful(WithDataFreshnessStatus.fresh(List.empty[StatusDetails]))) val managerDispatcher = mock[DeploymentManagerDispatcher] when(managerDispatcher.deploymentManager(any[String])(any[LoggedUser])).thenReturn(Some(deploymentManager)) when(managerDispatcher.deploymentManagerUnsafe(any[String])(any[LoggedUser])).thenReturn(deploymentManager) diff --git a/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/periodic/flink/DeploymentManagerStub.scala b/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/periodic/flink/DeploymentManagerStub.scala index 3aa6e2c0e8b..c494da1c3ec 100644 --- a/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/periodic/flink/DeploymentManagerStub.scala +++ b/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/periodic/flink/DeploymentManagerStub.scala @@ -1,16 +1,14 @@ package pl.touk.nussknacker.ui.process.periodic.flink import pl.touk.nussknacker.engine.api.deployment._ -import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus -import pl.touk.nussknacker.engine.api.process.{ProcessIdWithName, ProcessName, VersionId} +import pl.touk.nussknacker.engine.api.process.ProcessName import pl.touk.nussknacker.engine.deployment.{DeploymentId, ExternalDeploymentId} -import pl.touk.nussknacker.engine.testing.StubbingCommands import pl.touk.nussknacker.ui.process.periodic.model.PeriodicProcessDeploymentId import scala.collection.concurrent.TrieMap import scala.concurrent.Future -class DeploymentManagerStub extends BaseDeploymentManager with StubbingCommands { +class DeploymentManagerStub extends BaseDeploymentManager { val jobStatus: TrieMap[ProcessName, List[StatusDetails]] = TrieMap.empty @@ -64,23 +62,6 @@ class DeploymentManagerStub extends BaseDeploymentManager with StubbingCommands ) } - override def resolve( - idWithName: ProcessIdWithName, - statusDetails: List[StatusDetails], - lastStateAction: Option[ProcessAction], - latestVersionId: VersionId, - deployedVersionId: Option[VersionId], - currentlyPresentedVersionId: Option[VersionId], - ): Future[ProcessState] = - Future.successful( - processStateDefinitionManager.processState( - statusDetails.headOption.getOrElse(StatusDetails(SimpleStateStatus.NotDeployed, None)), - latestVersionId, - deployedVersionId, - currentlyPresentedVersionId, - ) - ) - override def getProcessStates( name: ProcessName )(implicit freshnessPolicy: DataFreshnessPolicy): Future[WithDataFreshnessStatus[List[StatusDetails]]] = { @@ -101,4 +82,13 @@ class DeploymentManagerStub extends BaseDeploymentManager with StubbingCommands } + override def processCommand[Result](command: DMScenarioCommand[Result]): Future[Result] = command match { + case _: DMValidateScenarioCommand => Future.successful(()) + case _: DMRunDeploymentCommand => Future.successful(None) + case _: DMCancelScenarioCommand => Future.successful(()) + case _: DMStopScenarioCommand | _: DMStopDeploymentCommand | _: DMCancelDeploymentCommand | + _: DMMakeScenarioSavepointCommand | _: DMRunOffScheduleCommand | _: DMTestScenarioCommand => + notImplemented + } + } diff --git a/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/periodic/flink/PeriodicDeploymentManagerTest.scala b/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/periodic/flink/PeriodicDeploymentManagerTest.scala index 1ab3c591fec..1b825bbcec2 100644 --- a/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/periodic/flink/PeriodicDeploymentManagerTest.scala +++ b/designer/server/src/test/scala/pl/touk/nussknacker/ui/process/periodic/flink/PeriodicDeploymentManagerTest.scala @@ -7,7 +7,7 @@ import org.scalatest.prop.TableDrivenPropertyChecks import org.scalatest.{Inside, OptionValues} import pl.touk.nussknacker.engine.api.deployment.DeploymentUpdateStrategy.StateRestoringStrategy import pl.touk.nussknacker.engine.api.deployment._ -import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus +import pl.touk.nussknacker.engine.api.deployment.simple.{SimpleProcessStateDefinitionManager, SimpleStateStatus} import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus.ProblemStateStatus import pl.touk.nussknacker.engine.api.process.{ProcessId, ProcessIdWithName, ProcessName, VersionId} import pl.touk.nussknacker.engine.api.{MetaData, ProcessVersion, StreamMetaData} @@ -125,7 +125,7 @@ class PeriodicDeploymentManagerTest state shouldEqual SimpleStateStatus.NotDeployed } - test("getProcessState - should be scheduled when scenario scheduled and no job on Flink") { + test("getProcessStates - should be scheduled when scenario scheduled and no job on Flink") { val f = new Fixture f.repository.addActiveProcess(processName, PeriodicProcessDeploymentStatus.Scheduled) @@ -136,15 +136,10 @@ class PeriodicDeploymentManagerTest ScenarioActionName.Deploy ) f.periodicDeploymentManager - .getProcessState( - idWithName, - None, - processVersion.versionId, - Some(processVersion.versionId), - Some(processVersion.versionId) - ) + .getProcessStates(idWithName.name) .futureValue .value + .loneElement .status shouldBe a[ScheduledStatus] } @@ -172,13 +167,20 @@ class PeriodicDeploymentManagerTest f.delegateDeploymentManagerStub.setStateStatus(processName, SimpleStateStatus.Finished, Some(deploymentId)) f.periodicProcessService.deactivate(processName).futureValue - val state = + val statusDetails = f.periodicDeploymentManager - .getProcessState(idWithName, None, processVersion.versionId, None, Some(processVersion.versionId)) + .getProcessStates(processName) .futureValue .value + .loneElement - state.status shouldBe SimpleStateStatus.Finished + statusDetails.status shouldBe SimpleStateStatus.Finished + val state = f.periodicDeploymentManager.processStateDefinitionManager.processState( + statusDetails, + processVersion.versionId, + None, + Some(processVersion.versionId) + ) state.allowedActions shouldBe List(ScenarioActionName.Deploy, ScenarioActionName.Archive, ScenarioActionName.Rename) } diff --git a/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/DevelopmentDeploymentManagerProvider.scala b/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/DevelopmentDeploymentManagerProvider.scala index a27c2a17523..62eaf708cd0 100644 --- a/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/DevelopmentDeploymentManagerProvider.scala +++ b/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/DevelopmentDeploymentManagerProvider.scala @@ -26,8 +26,7 @@ import scala.util.{Failure, Success} class DevelopmentDeploymentManager(dependencies: DeploymentManagerDependencies, modelData: BaseModelData) extends DeploymentManager - with LazyLogging - with DeploymentManagerInconsistentStateHandlerMixIn { + with LazyLogging { import SimpleStateStatus._ import dependencies._ diff --git a/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/MockableDeploymentManagerProvider.scala b/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/MockableDeploymentManagerProvider.scala index b5c44dcc349..53b44e1dce1 100644 --- a/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/MockableDeploymentManagerProvider.scala +++ b/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/MockableDeploymentManagerProvider.scala @@ -11,20 +11,19 @@ import pl.touk.nussknacker.engine._ import pl.touk.nussknacker.engine.api.component.ScenarioPropertyConfig import pl.touk.nussknacker.engine.api.deployment._ import pl.touk.nussknacker.engine.api.deployment.simple.{SimpleProcessStateDefinitionManager, SimpleStateStatus} -import pl.touk.nussknacker.engine.api.process.{ProcessIdWithName, ProcessName, VersionId} +import pl.touk.nussknacker.engine.api.process.{ProcessIdWithName, ProcessName} import pl.touk.nussknacker.engine.deployment.ExternalDeploymentId import pl.touk.nussknacker.engine.flink.minicluster.FlinkMiniClusterFactory import pl.touk.nussknacker.engine.flink.minicluster.scenariotesting.FlinkMiniClusterScenarioTestRunner import pl.touk.nussknacker.engine.flink.minicluster.util.DurationToRetryPolicyConverterOps._ import pl.touk.nussknacker.engine.management.FlinkStreamingPropertiesConfig import pl.touk.nussknacker.engine.newdeployment.DeploymentId -import pl.touk.nussknacker.engine.testing.StubbingCommands import pl.touk.nussknacker.engine.testmode.TestProcess.TestResults import java.time.Instant import java.util.concurrent.atomic.AtomicReference -import scala.concurrent.{ExecutionContext, Future} import scala.concurrent.duration.{DurationInt, FiniteDuration} +import scala.concurrent.{ExecutionContext, Future} import scala.util.Try class MockableDeploymentManagerProvider extends DeploymentManagerProvider { @@ -61,8 +60,7 @@ object MockableDeploymentManagerProvider { implicit executionContext: ExecutionContext, ioRuntime: IORuntime ) extends DeploymentManager - with ManagerSpecificScenarioActivitiesStoredByManager - with StubbingCommands { + with ManagerSpecificScenarioActivitiesStoredByManager { private lazy val miniClusterWithServicesOpt = modelDataOpt.map { modelData => FlinkMiniClusterFactory.createMiniClusterWithServices( @@ -82,24 +80,6 @@ object MockableDeploymentManagerProvider { ) } - override def resolve( - idWithName: ProcessIdWithName, - statusDetails: List[StatusDetails], - lastStateAction: Option[ProcessAction], - latestVersionId: VersionId, - deployedVersionId: Option[VersionId], - currentlyPresentedVersionId: Option[VersionId], - ): Future[ProcessState] = { - Future.successful( - processStateDefinitionManager.processState( - statusDetails.head, - latestVersionId, - deployedVersionId, - currentlyPresentedVersionId - ) - ) - } - override def processStateDefinitionManager: ProcessStateDefinitionManager = SimpleProcessStateDefinitionManager @@ -112,6 +92,7 @@ object MockableDeploymentManagerProvider { override def processCommand[Result](command: DMScenarioCommand[Result]): Future[Result] = { command match { + case _: DMValidateScenarioCommand => Future.successful(()) case DMRunDeploymentCommand(_, deploymentData, _, _) => Future { deploymentData.deploymentId.toNewDeploymentIdOpt @@ -129,8 +110,9 @@ object MockableDeploymentManagerProvider { s"Tests results not mocked for scenario [${processVersion.processName.value}] and no model data provided" ) ) - case other => - super.processCommand(other) + case _: DMCancelScenarioCommand | _: DMStopScenarioCommand | _: DMStopDeploymentCommand | + _: DMCancelDeploymentCommand | _: DMMakeScenarioSavepointCommand | _: DMRunOffScheduleCommand => + notImplemented } } diff --git a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkDeploymentManager.scala b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkDeploymentManager.scala index fdfa1864a66..d8dbce3fb00 100644 --- a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkDeploymentManager.scala +++ b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkDeploymentManager.scala @@ -8,10 +8,9 @@ import org.apache.flink.api.common.{JobID, JobStatus} import pl.touk.nussknacker.engine.api.ProcessVersion import pl.touk.nussknacker.engine.api.deployment.DeploymentUpdateStrategy.StateRestoringStrategy import pl.touk.nussknacker.engine.api.deployment._ -import pl.touk.nussknacker.engine.api.deployment.inconsistency.InconsistentStateDetector import pl.touk.nussknacker.engine.api.deployment.scheduler.services._ import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus -import pl.touk.nussknacker.engine.api.process.{ProcessIdWithName, ProcessName, VersionId} +import pl.touk.nussknacker.engine.api.process.{ProcessIdWithName, ProcessName} import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess import pl.touk.nussknacker.engine.deployment.{DeploymentId, ExternalDeploymentId} import pl.touk.nussknacker.engine.flink.minicluster.FlinkMiniClusterWithServices @@ -59,31 +58,6 @@ class FlinkDeploymentManager( private val statusDeterminer = new FlinkStatusDetailsDeterminer(modelData.namingStrategy, client.getJobConfig) - /** - * Gets status from engine, handles finished state, resolves possible inconsistency with lastAction and formats status using `ProcessStateDefinitionManager` - */ - override def resolve( - idWithName: ProcessIdWithName, - statusDetails: List[StatusDetails], - lastStateAction: Option[ProcessAction], - latestVersionId: VersionId, - deployedVersionId: Option[VersionId], - currentlyPresentedVersionId: Option[VersionId], - ): Future[ProcessState] = { - for { - actionAfterPostprocessOpt <- postprocess(idWithName, statusDetails) - engineStateResolvedWithLastAction = InconsistentStateDetector.resolve( - statusDetails, - actionAfterPostprocessOpt.orElse(lastStateAction) - ) - } yield processStateDefinitionManager.processState( - engineStateResolvedWithLastAction, - latestVersionId, - deployedVersionId, - currentlyPresentedVersionId, - ) - } - // Flink has a retention for job overviews so we can't rely on this to distinguish between statuses: // - job is finished without troubles // - job has failed @@ -91,6 +65,7 @@ class FlinkDeploymentManager( // and treat another case as ProblemStateStatus.shouldBeRunning (see InconsistentStateDetector) // TODO: We should synchronize the status of deployment more explicitly as we already do in periodic case // See PeriodicProcessService.synchronizeDeploymentsStates and remove the InconsistentStateDetector + // FIXME abr move to reconciliation private def postprocess( idWithName: ProcessIdWithName, statusDetailsList: List[StatusDetails] diff --git a/engine/lite/embeddedDeploymentManager/src/main/scala/pl/touk/nussknacker/engine/embedded/EmbeddedDeploymentManager.scala b/engine/lite/embeddedDeploymentManager/src/main/scala/pl/touk/nussknacker/engine/embedded/EmbeddedDeploymentManager.scala index 8352a7b9e0e..1f34fc80c65 100644 --- a/engine/lite/embeddedDeploymentManager/src/main/scala/pl/touk/nussknacker/engine/embedded/EmbeddedDeploymentManager.scala +++ b/engine/lite/embeddedDeploymentManager/src/main/scala/pl/touk/nussknacker/engine/embedded/EmbeddedDeploymentManager.scala @@ -27,8 +27,7 @@ class EmbeddedDeploymentManager( deploymentStrategy: DeploymentStrategy )(implicit ec: ExecutionContext) extends LiteDeploymentManager - with LazyLogging - with DeploymentManagerInconsistentStateHandlerMixIn { + with LazyLogging { private val retrieveDeployedScenariosTimeout = 10.seconds diff --git a/engine/lite/k8sDeploymentManager/src/main/scala/pl/touk/nussknacker/k8s/manager/K8sDeploymentManager.scala b/engine/lite/k8sDeploymentManager/src/main/scala/pl/touk/nussknacker/k8s/manager/K8sDeploymentManager.scala index 5c83b0b9aa1..6ce5b511355 100644 --- a/engine/lite/k8sDeploymentManager/src/main/scala/pl/touk/nussknacker/k8s/manager/K8sDeploymentManager.scala +++ b/engine/lite/k8sDeploymentManager/src/main/scala/pl/touk/nussknacker/k8s/manager/K8sDeploymentManager.scala @@ -49,8 +49,7 @@ class K8sDeploymentManager( rawConfig: Config, dependencies: DeploymentManagerDependencies ) extends LiteDeploymentManager - with LazyLogging - with DeploymentManagerInconsistentStateHandlerMixIn { + with LazyLogging { import dependencies._