Skip to content

Commit

Permalink
[NU-1979] Snenario status resolving moved from DM to core
Browse files Browse the repository at this point in the history
  • Loading branch information
arkadius committed Feb 14, 2025
1 parent db90430 commit b48260d
Show file tree
Hide file tree
Showing 18 changed files with 108 additions and 317 deletions.
Original file line number Diff line number Diff line change
@@ -1,49 +1,13 @@
package pl.touk.nussknacker.engine.api.deployment

import com.typesafe.config.Config
import pl.touk.nussknacker.engine.api.deployment.inconsistency.InconsistentStateDetector
import pl.touk.nussknacker.engine.api.deployment.scheduler.services._
import pl.touk.nussknacker.engine.api.process.{ProcessIdWithName, ProcessName, VersionId}
import pl.touk.nussknacker.engine.api.process.{ProcessIdWithName, ProcessName}
import pl.touk.nussknacker.engine.newdeployment
import pl.touk.nussknacker.engine.util.WithDataFreshnessStatusUtils.WithDataFreshnessStatusOps

import java.time.Instant
import scala.concurrent.ExecutionContext.Implicits._
import scala.concurrent.Future

trait DeploymentManagerInconsistentStateHandlerMixIn {
self: DeploymentManager =>

final override def resolve(
idWithName: ProcessIdWithName,
statusDetails: List[StatusDetails],
lastStateAction: Option[ProcessAction],
latestVersionId: VersionId,
deployedVersionId: Option[VersionId],
currentlyPresentedVersionId: Option[VersionId],
): Future[ProcessState] = {
val engineStateResolvedWithLastAction = flattenStatus(lastStateAction, statusDetails)
Future.successful(
processStateDefinitionManager.processState(
engineStateResolvedWithLastAction,
latestVersionId,
deployedVersionId,
currentlyPresentedVersionId
)
)
}

// This method is protected to make possible to override it with own logic handling different edge cases like
// other state on engine than based on lastStateAction
protected def flattenStatus(
lastStateAction: Option[ProcessAction],
statusDetails: List[StatusDetails]
): StatusDetails = {
InconsistentStateDetector.resolve(statusDetails, lastStateAction)
}

}

trait DeploymentManager extends AutoCloseable {

def deploymentSynchronisationSupport: DeploymentSynchronisationSupport
Expand All @@ -54,28 +18,6 @@ trait DeploymentManager extends AutoCloseable {

def processCommand[Result](command: DMScenarioCommand[Result]): Future[Result]

final def getProcessState(
idWithName: ProcessIdWithName,
lastStateAction: Option[ProcessAction],
latestVersionId: VersionId,
deployedVersionId: Option[VersionId],
currentlyPresentedVersionId: Option[VersionId],
)(
implicit freshnessPolicy: DataFreshnessPolicy
): Future[WithDataFreshnessStatus[ProcessState]] = {
for {
statusDetailsWithFreshness <- getProcessStates(idWithName.name)
stateWithFreshness <- resolve(
idWithName,
statusDetailsWithFreshness.value,
lastStateAction,
latestVersionId,
deployedVersionId,
currentlyPresentedVersionId,
).map(statusDetailsWithFreshness.withValue)
} yield stateWithFreshness
}

/**
* We provide a special wrapper called WithDataFreshnessStatus to ensure that fetched data is restored
* from the cache or not. If you use any kind of cache in your DM implementation please wrap result data
Expand All @@ -85,18 +27,6 @@ trait DeploymentManager extends AutoCloseable {
implicit freshnessPolicy: DataFreshnessPolicy
): Future[WithDataFreshnessStatus[List[StatusDetails]]]

/**
* Resolves possible inconsistency with lastAction and formats status using `ProcessStateDefinitionManager`
*/
def resolve(
idWithName: ProcessIdWithName,
statusDetails: List[StatusDetails],
lastStateAction: Option[ProcessAction],
latestVersionId: VersionId,
deployedVersionId: Option[VersionId],
currentlyPresentedVersionId: Option[VersionId],
): Future[ProcessState]

def processStateDefinitionManager: ProcessStateDefinitionManager

protected final def notImplemented: Future[Nothing] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ trait ProcessStateDefinitionManager {
/**
* Enhances raw [[StateStatus]] with scenario properties, including deployment info.
*/
// FIXME abr: extract other class without most of fields from ProcessState
def processState(
statusDetails: StatusDetails,
latestVersionId: VersionId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import com.github.benmanes.caffeine.cache.{AsyncCache, Caffeine}
import com.typesafe.config.Config
import com.typesafe.scalalogging.LazyLogging
import pl.touk.nussknacker.engine.api.deployment._
import pl.touk.nussknacker.engine.api.process.{ProcessIdWithName, ProcessName, VersionId}
import pl.touk.nussknacker.engine.api.process.ProcessName

import scala.compat.java8.FutureConverters._
import scala.concurrent.ExecutionContext.Implicits._
Expand All @@ -24,23 +24,6 @@ class CachingProcessStateDeploymentManager(
.expireAfterWrite(java.time.Duration.ofMillis(cacheTTL.toMillis))
.buildAsync[ProcessName, List[StatusDetails]]

override def resolve(
idWithName: ProcessIdWithName,
statusDetails: List[StatusDetails],
lastStateAction: Option[ProcessAction],
latestVersionId: VersionId,
deployedVersionId: Option[VersionId],
currentlyPresentedVersionId: Option[VersionId],
): Future[ProcessState] =
delegate.resolve(
idWithName,
statusDetails,
lastStateAction,
latestVersionId,
deployedVersionId,
currentlyPresentedVersionId
)

override def getProcessStates(
name: ProcessName
)(implicit freshnessPolicy: DataFreshnessPolicy): Future[WithDataFreshnessStatus[List[StatusDetails]]] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus.Proble
import pl.touk.nussknacker.engine.api.deployment.{ProcessAction, ProcessActionState, ScenarioActionName, StatusDetails}
import pl.touk.nussknacker.engine.deployment.DeploymentId

// FIXME abr: move to core
object InconsistentStateDetector extends InconsistentStateDetector

class InconsistentStateDetector extends LazyLogging {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,51 +7,44 @@ import pl.touk.nussknacker.engine.api.component.ScenarioPropertyConfig
import pl.touk.nussknacker.engine.api.definition._
import pl.touk.nussknacker.engine.api.deployment._
import pl.touk.nussknacker.engine.api.deployment.simple.{SimpleProcessStateDefinitionManager, SimpleStateStatus}
import pl.touk.nussknacker.engine.api.process.{ProcessIdWithName, ProcessName, VersionId}
import pl.touk.nussknacker.engine.api.process.ProcessName
import pl.touk.nussknacker.engine.{
BaseModelData,
DeploymentManagerDependencies,
DeploymentManagerProvider,
MetaDataInitializer
}

import scala.concurrent.Future
import scala.collection.concurrent.TrieMap
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ExecutionContext, Future}

class DeploymentManagerStub extends BaseDeploymentManager with StubbingCommands {

// We map lastStateAction to state to avoid some corner/blocking cases with the deleting/canceling scenario on tests..
override def resolve(
idWithName: ProcessIdWithName,
statusDetails: List[StatusDetails],
lastStateAction: Option[ProcessAction],
latestVersionId: VersionId,
deployedVersionId: Option[VersionId],
currentlyPresentedVersionId: Option[VersionId],
): Future[ProcessState] = {
val lastStateActionStatus = lastStateAction match {
case Some(action) if action.actionName == ScenarioActionName.Deploy =>
SimpleStateStatus.Running
case Some(action) if action.actionName == ScenarioActionName.Cancel =>
SimpleStateStatus.Canceled
case _ =>
SimpleStateStatus.NotDeployed
}
Future.successful(
processStateDefinitionManager.processState(
StatusDetails(lastStateActionStatus, None),
latestVersionId,
deployedVersionId,
currentlyPresentedVersionId,
)
)
class DeploymentManagerStub(implicit ec: ExecutionContext) extends BaseDeploymentManager {

private val scenarioStatusMap = TrieMap.empty[ProcessName, StateStatus]

override def processCommand[Result](command: DMScenarioCommand[Result]): Future[Result] = command match {
case _: DMValidateScenarioCommand => Future.successful(())
case run: DMRunDeploymentCommand =>
Future {
scenarioStatusMap.put(run.processVersion.processName, SimpleStateStatus.Running)
None
}
case cancel: DMCancelScenarioCommand =>
Future.successful {
scenarioStatusMap.put(cancel.scenarioName, SimpleStateStatus.Canceled)
()
}
case _: DMStopScenarioCommand | _: DMStopDeploymentCommand | _: DMCancelDeploymentCommand |
_: DMMakeScenarioSavepointCommand | _: DMRunOffScheduleCommand | _: DMTestScenarioCommand =>
notImplemented
}

override def getProcessStates(
name: ProcessName
)(implicit freshnessPolicy: DataFreshnessPolicy): Future[WithDataFreshnessStatus[List[StatusDetails]]] = {
Future.successful(
WithDataFreshnessStatus.fresh(List.empty)
WithDataFreshnessStatus.fresh(scenarioStatusMap.get(name).map(StatusDetails(_, None)).toList)
)
}

Expand All @@ -67,21 +60,6 @@ class DeploymentManagerStub extends BaseDeploymentManager with StubbingCommands

}

trait StubbingCommands { self: DeploymentManager =>

override def processCommand[Result](command: DMScenarioCommand[Result]): Future[Result] = command match {
case _: DMValidateScenarioCommand => Future.successful(())
case _: DMRunDeploymentCommand => Future.successful(None)
case _: DMStopDeploymentCommand => Future.successful(SavepointResult(""))
case _: DMStopScenarioCommand => Future.successful(SavepointResult(""))
case _: DMCancelDeploymentCommand => Future.successful(())
case _: DMCancelScenarioCommand => Future.successful(())
case _: DMMakeScenarioSavepointCommand => Future.successful(SavepointResult(""))
case _: DMRunOffScheduleCommand | _: DMTestScenarioCommand => notImplemented
}

}

//This provider can be used for testing. Override methods to implement more complex behaviour
//Provider is registered via ServiceLoader, so it can be used e.g. to run simple docker configuration
class DeploymentManagerProviderStub extends DeploymentManagerProvider {
Expand All @@ -91,7 +69,10 @@ class DeploymentManagerProviderStub extends DeploymentManagerProvider {
deploymentManagerDependencies: DeploymentManagerDependencies,
config: Config,
scenarioStateCacheTTL: Option[FiniteDuration]
): ValidatedNel[String, DeploymentManager] = Validated.valid(new DeploymentManagerStub)
): ValidatedNel[String, DeploymentManager] = {
import deploymentManagerDependencies._
Validated.valid(new DeploymentManagerStub)
}

override def name: String = "stub"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import com.typesafe.scalalogging.LazyLogging
import db.util.DBIOActionInstances._
import pl.touk.nussknacker.engine.api.deployment.ScenarioActionName.{Cancel, Deploy}
import pl.touk.nussknacker.engine.api.deployment._
import pl.touk.nussknacker.engine.api.deployment.inconsistency.InconsistentStateDetector
import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus.ProblemStateStatus
import pl.touk.nussknacker.engine.api.deployment.simple.{SimpleProcessStateDefinitionManager, SimpleStateStatus}
import pl.touk.nussknacker.engine.api.process._
Expand Down Expand Up @@ -255,17 +256,21 @@ private class ScenarioStateProviderImpl(
processDetails,
inProgressActionNames,
currentlyPresentedVersionId,
manager =>
manager
.resolve(
processDetails.idWithName,
prefetchedStatusDetails.value,
processDetails.lastStateAction,
processDetails.processVersionId,
processDetails.lastDeployedAction.map(_.processVersionId),
currentlyPresentedVersionId,
)
.map(prefetchedStatusDetails.withValue)
{ manager =>
// FIXME abr: handle finished
Future {
prefetchedStatusDetails.map { prefetchedStatusDetailsValue =>
val resolved =
InconsistentStateDetector.resolve(prefetchedStatusDetailsValue, processDetails.lastStateAction)
manager.processStateDefinitionManager.processState(
resolved,
processDetails.processVersionId,
processDetails.lastDeployedAction.map(_.processVersionId),
currentlyPresentedVersionId
)
}
}
}
)
}

Expand Down Expand Up @@ -396,14 +401,14 @@ private class ScenarioStateProviderImpl(
implicit freshnessPolicy: DataFreshnessPolicy
): Future[WithDataFreshnessStatus[ProcessState]] = {

// FIXME abr: handle finished
val state = deploymentManager
.getProcessState(
processIdWithName,
lastStateAction,
latestVersionId,
deployedVersionId,
currentlyPresentedVersionId,
)
.getProcessStates(processIdWithName.name)
.map(_.map { statusDetails =>
val resolved = InconsistentStateDetector.resolve(statusDetails, lastStateAction)
deploymentManager.processStateDefinitionManager
.processState(resolved, latestVersionId, deployedVersionId, currentlyPresentedVersionId)
})
.recover { case NonFatal(e) =>
logger.warn(s"Failed to get status of ${processIdWithName.name}: ${e.getMessage}", e)
failedToGetProcessState(latestVersionId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,9 @@ import pl.touk.nussknacker.engine.DeploymentManagerDependencies
import pl.touk.nussknacker.engine.api.deployment._
import pl.touk.nussknacker.engine.api.deployment.scheduler.model.{ScheduleProperty => ApiScheduleProperty}
import pl.touk.nussknacker.engine.api.deployment.scheduler.services._
import pl.touk.nussknacker.engine.api.process.{ProcessIdWithName, ProcessName, VersionId}
import pl.touk.nussknacker.engine.api.process.{ProcessIdWithName, ProcessName}
import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
import pl.touk.nussknacker.engine.deployment.ExternalDeploymentId
import pl.touk.nussknacker.ui.process.periodic.PeriodicProcessService.PeriodicProcessStatus
import pl.touk.nussknacker.ui.process.periodic.Utils._
import pl.touk.nussknacker.ui.process.repository.PeriodicProcessesRepository

Expand Down Expand Up @@ -200,34 +199,6 @@ class PeriodicDeploymentManager private[periodic] (
service.getStatusDetails(name).map(_.map(List(_)))
}

override def resolve(
idWithName: ProcessIdWithName,
statusDetailsList: List[StatusDetails],
lastStateAction: Option[ProcessAction],
latestVersionId: VersionId,
deployedVersionId: Option[VersionId],
currentlyPresentedVersionId: Option[VersionId],
): Future[ProcessState] = {
val statusDetails = statusDetailsList match {
case head :: _ =>
head
case Nil =>
val status = PeriodicProcessStatus(List.empty, List.empty)
status.mergedStatusDetails.copy(status = status)
}
// TODO: add "real" presentation of deployments in GUI
val mergedStatus = processStateDefinitionManager
.processState(
statusDetails.copy(status =
statusDetails.status.asInstanceOf[PeriodicProcessStatus].mergedStatusDetails.status
),
latestVersionId,
deployedVersionId,
currentlyPresentedVersionId,
)
Future.successful(mergedStatus.copy(tooltip = processStateDefinitionManager.statusTooltip(statusDetails.status)))
}

override def processStateDefinitionManager: ProcessStateDefinitionManager =
new PeriodicProcessStateDefinitionManager(delegate.processStateDefinitionManager)

Expand Down
Loading

0 comments on commit b48260d

Please sign in to comment.