Skip to content

Commit

Permalink
ScenarioDeploymentsReconciler init
Browse files Browse the repository at this point in the history
  • Loading branch information
arkadius committed Feb 20, 2025
1 parent b2357ff commit 10cd096
Show file tree
Hide file tree
Showing 14 changed files with 286 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ final case class ScenarioWithDetails(
createdAt: Instant,
createdBy: String,
override val labels: List[String],
// TODO: remove actions, are they still used anywhere?
lastDeployedAction: Option[ProcessAction],
lastStateAction: Option[ProcessAction],
lastAction: Option[ProcessAction],
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package pl.touk.nussknacker.ui.process.deployment

import db.util.DBIOActionInstances.DB
import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus
import pl.touk.nussknacker.engine.api.deployment.{DataFreshnessPolicy, DeploymentStatusDetails}
import pl.touk.nussknacker.ui.process.deployment.deploymentstatus.{DeploymentStatusesProvider, EngineDeploymentStatus}
import pl.touk.nussknacker.ui.security.api.{LoggedUser, NussknackerInternalUser}

import scala.concurrent.ExecutionContext

class ScenarioDeploymentsReconciler(deploymentStatusesProvider: DeploymentStatusesProvider)(
implicit ec: ExecutionContext
) {

// FIXME abr
def markLocalDeploymentsAsExecutionFinished(): DB[Unit] = {
implicit val user: LoggedUser = NussknackerInternalUser.instance
implicit val freshnessPolicy: DataFreshnessPolicy = DataFreshnessPolicy.Fresh
for {
nonExecutionFinishedDeployments <-
deploymentStatusesProvider.getAllLocallyAvailableNonExecutionFinishedDeployments
actionsIdsToMarkActionAsActionExecutionFinished = nonExecutionFinishedDeployments.collect {
case EngineDeploymentStatus(DeploymentStatusDetails(status, _, _), Some(localDeploymentAction))
if status == SimpleStateStatus.Finished =>
localDeploymentAction.id
}
} yield ()

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,32 +2,85 @@ package pl.touk.nussknacker.ui.process.deployment.deploymentstatus

import akka.actor.ActorSystem
import com.typesafe.scalalogging.LazyLogging
import db.util.DBIOActionInstances.DB
import pl.touk.nussknacker.engine.api.deployment._
import pl.touk.nussknacker.engine.api.process.{ProcessId, ProcessIdWithName, ProcessName, ProcessingType}
import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus
import pl.touk.nussknacker.engine.api.process.{ProcessId, ProcessName, ProcessingType}
import pl.touk.nussknacker.engine.deployment.DeploymentId
import pl.touk.nussknacker.engine.util.WithDataFreshnessStatusUtils.WithDataFreshnessStatusMapOps
import pl.touk.nussknacker.ui.process.deployment.DeploymentManagerDispatcher
import pl.touk.nussknacker.ui.process.deployment.deploymentstatus.DeploymentManagerReliableStatusesWrapper.Ops
import pl.touk.nussknacker.ui.process.repository.{ScenarioActionsSummary, ScenarioWithDetailsEntity}
import pl.touk.nussknacker.ui.security.api.LoggedUser
import pl.touk.nussknacker.ui.process.repository._
import pl.touk.nussknacker.ui.security.api.{LoggedUser, NussknackerInternalUser}
import slick.dbio.DBIOAction

import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ExecutionContext, Future}
import scala.util.control.NonFatal

class DeploymentStatusesProvider(dispatcher: DeploymentManagerDispatcher, scenarioStateTimeout: Option[FiniteDuration])(
// This class returns information about all deployments basen on two sources of information:
// - engine side - data are fetched by DeploymentManager
// - local store - data are queried from ActionRepository
// Data from local store are needed in certain situation:
// 1. when scenario deployment is requested but not yet seen on engine side (deploy action is in progress)
// 2. when scenario job was finished and was removed by retention mechanism
// 3. when scenario job have been canceled and was removed by retention mechanism
// TODO: Due to the fact, that we have no deployment's entity (only actions) and cancel action is not correlated with deploy action
// we can't recover information about canceled jobs that were removed by retention mechanism (3.) We should fix it
class DeploymentStatusesProvider(
actionRepository: ScenarioActionReadOnlyRepository,
dispatcher: DeploymentManagerDispatcher,
scenarioStateTimeout: Option[FiniteDuration]
)(
implicit system: ActorSystem
) extends LazyLogging {

private implicit val ec: ExecutionContext = system.dispatcher

def getAllLocallyAvailableNonExecutionFinishedDeployments(
implicit user: LoggedUser,
freshnessPolicy: DataFreshnessPolicy
): DB[List[DeploymentStatusWithReconciliationInfo]] = {
for {
finishedDeploysSummaryWithIdData <- actionRepository.getAllScenariosActionsWithScenarioIdData(
Set(ScenarioActionName.Deploy),
Set(ProcessActionState.Finished)
)
scenarioIds = finishedDeploysSummaryWithIdData.map(_.scenarioIdData)
deployActionsSummaries = finishedDeploysSummaryWithIdData
.map(summaryWithScenarioIdData =>
summaryWithScenarioIdData.scenarioIdData.id -> summaryWithScenarioIdData.summary
)
.toMap
prefetchedDeploymentStatuses <- DBIOAction.from(
getBulkQueriedDeploymentStatusesForSupportedManagers(scenarioIds, deployActionsSummaries)
)
finalDeploymentStatuses <- DBIOAction.from(Future.sequence(finishedDeploysSummaryWithIdData.map {
case ScenarioActionsSummaryWithScenarioIdData(actionsSummary, idData) =>
doGetDeploymentStatuses(idData, actionsSummary, Some(prefetchedDeploymentStatuses)).map(idData -> _)
}))
finalDeploymentStatusesFlatten = finalDeploymentStatuses.flatMap {
case (idData, Left(error)) =>
logger.warn(
s"Error during fetching deployments statuses for scenario ${idData.name}. Scenario deployments will be ignored in further processing",
error
)
List.empty
case (_, Right(WithDataFreshnessStatus(scenarioDeployments, _))) =>
scenarioDeployments.filter(_.isAvailableLocally)
}
} yield finalDeploymentStatusesFlatten

}

// DeploymentManager's may support fetching state of all scenarios at once
// State is prefetched only when:
// - DM has capability DeploymentsStatusesQueryForAllScenariosSupport
// - the query is about more than one scenario handled by that DM - for one scenario prefetching would be non-optimal
// and this is a common case for this method because it is invoked for Id Traverse - see usages
def getBulkQueriedDeploymentStatusesForSupportedManagers(
scenarios: List[ScenarioWithDetailsEntity[_]],
stateActionsSummaries: Map[ProcessId, ScenarioActionsSummary],
scenarios: List[ScenarioIdData],
deployActionsSummaries: Map[ProcessId, ScenarioActionsSummary],
)(
implicit user: LoggedUser,
freshnessPolicy: DataFreshnessPolicy
Expand All @@ -49,10 +102,10 @@ class DeploymentStatusesProvider(dispatcher: DeploymentManagerDispatcher, scenar
scenariosGroupedByProcessingType.map { scenario =>
val scenarioDeploymentsFromManager = allDeploymentsFromManager.getOrElse(scenario.name, List.empty)
val scenarioStateSummaries =
stateActionsSummaries.getOrElse(scenario.processId, ScenarioActionsSummary.empty)
deployActionsSummaries.getOrElse(scenario.id, ScenarioActionsSummary.empty)
val mergedDeploymentStatuses =
mergedStatusesFromManagerWithLocalDeployments(scenarioDeploymentsFromManager, scenarioStateSummaries)
scenario.processId -> mergedDeploymentStatuses
scenario.id -> mergedDeploymentStatuses
}.toMap
}
})
Expand All @@ -62,24 +115,38 @@ class DeploymentStatusesProvider(dispatcher: DeploymentManagerDispatcher, scenar
}

def getDeploymentStatuses(
processingType: ProcessingType,
idWithName: ProcessIdWithName,
scenarioIdData: ScenarioIdData,
scenarioStateActionsSummary: ScenarioActionsSummary,
prefetchedDeploymentStatuses: Option[BulkQueriedDeploymentStatuses],
prefetchedDeploymentStatuses: Option[BulkQueriedDeploymentStatuses]
)(
implicit user: LoggedUser,
freshnessPolicy: DataFreshnessPolicy
): Future[Either[GetDeploymentsStatusesError, WithDataFreshnessStatus[List[DeploymentStatusDetails]]]] = {
doGetDeploymentStatuses(scenarioIdData, scenarioStateActionsSummary, prefetchedDeploymentStatuses).map(
_.map(_.map(_.map(_.status)))
)
}

private def doGetDeploymentStatuses(
scenarioIdData: ScenarioIdData,
scenarioStateActionsSummary: ScenarioActionsSummary,
prefetchedDeploymentStatuses: Option[BulkQueriedDeploymentStatuses],
)(
implicit user: LoggedUser,
freshnessPolicy: DataFreshnessPolicy
): Future[
Either[GetDeploymentsStatusesError, WithDataFreshnessStatus[List[DeploymentStatusWithReconciliationInfo]]]
] = {
prefetchedDeploymentStatuses
.flatMap(_.getDeploymentStatuses(processingType, idWithName.id))
.flatMap(_.getDeploymentStatuses(scenarioIdData.processingType, scenarioIdData.id))
.map { prefetchedStatusDetails =>
Future.successful(Right(prefetchedStatusDetails))
}
.getOrElse {
dispatcher
.getScenarioDeploymentsStatusesWithErrorWrappingAndTimeoutOpt(
processingType,
idWithName.name,
scenarioIdData.processingType,
scenarioIdData.name,
scenarioStateTimeout
)
.map(_.map(_.map(mergedStatusesFromManagerWithLocalDeployments(_, scenarioStateActionsSummary))))
Expand All @@ -89,15 +156,23 @@ class DeploymentStatusesProvider(dispatcher: DeploymentManagerDispatcher, scenar
private def mergedStatusesFromManagerWithLocalDeployments(
deploymentsFromManager: List[DeploymentStatusDetails],
scenarioStateActionsSummary: ScenarioActionsSummary
): List[DeploymentStatusDetails] = {
): List[DeploymentStatusWithReconciliationInfo] = {
val deploymentIdsFromManager = deploymentsFromManager.flatMap(_.deploymentId).toSet
val missingInProgressDeployments = scenarioStateActionsSummary.inProgressDeployments.filterNot(d =>
deploymentIdsFromManager.contains(d.deploymentIdUnsafe)
)
val missingExecutionFinishedDeployments = scenarioStateActionsSummary.executionFinishedDeployments.filterNot(d =>
deploymentIdsFromManager.contains(d.deploymentIdUnsafe)
val missingInProgressDeployments = scenarioStateActionsSummary.inProgressDeployments
.filterNot(d => deploymentIdsFromManager.contains(DeploymentId.fromActionId(d.id)))
.map(LocalOnlyDeploymentStatus(_))
val missingExecutionFinishedDeployments = scenarioStateActionsSummary.executionFinishedDeployments
.filterNot(d => deploymentIdsFromManager.contains(DeploymentId.fromActionId(d.id)))
.map(LocalOnlyDeploymentStatus(_))
val deploymentExistingOnEngine = deploymentsFromManager.map(deploymentStatusFromManager =>
EngineDeploymentStatus(
deploymentStatusFromManager,
deploymentStatusFromManager.deploymentId
.flatMap(_.toActionIdOpt)
.flatMap(scenarioStateActionsSummary.getFinishedDeployment)
)
)
deploymentsFromManager ++ missingInProgressDeployments ++ missingExecutionFinishedDeployments
deploymentExistingOnEngine ++ missingInProgressDeployments ++ missingExecutionFinishedDeployments
}

private def getAllDeploymentStatusesRecoveringFailure(
Expand All @@ -122,20 +197,57 @@ class DeploymentStatusesProvider(dispatcher: DeploymentManagerDispatcher, scenar

class BulkQueriedDeploymentStatuses(
bulkQueriedStatusesByProcessingType: Map[ProcessingType, WithDataFreshnessStatus[
Map[ProcessId, List[DeploymentStatusDetails]]
Map[ProcessId, List[DeploymentStatusWithReconciliationInfo]]
]]
) {

def getDeploymentStatuses(
processingType: ProcessingType,
scenarioId: ProcessId
): Option[WithDataFreshnessStatus[List[DeploymentStatusDetails]]] =
): Option[WithDataFreshnessStatus[List[DeploymentStatusWithReconciliationInfo]]] =
for {
prefetchedStatusesForProcessingType <- bulkQueriedStatusesByProcessingType.get(processingType)
statusesForProcessingType <- bulkQueriedStatusesByProcessingType.get(processingType)
// Deployment statuses are prefetched for all scenarios for the given processing type.
// If there is no information available for a specific scenario name,
// then it means that DM is not aware of this scenario, and we should default to List.empty[StatusDetails] instead of None
prefetchedStatusesForScenario = prefetchedStatusesForProcessingType.getOrElse(scenarioId, List.empty)
} yield prefetchedStatusesForScenario
statusesForScenario = statusesForProcessingType.getOrElse(scenarioId, List.empty)
} yield statusesForScenario

}

sealed trait DeploymentStatusWithReconciliationInfo {
def status: DeploymentStatusDetails
def availableOnEngine: Boolean
def localDeploymentAction: Option[BasicActionDetails]

def isAvailableLocally: Boolean = localDeploymentAction.isDefined
}

final case class LocalOnlyDeploymentStatus(localStatusValue: BasicActionDetails)
extends DeploymentStatusWithReconciliationInfo {

override val availableOnEngine: Boolean = false
override def localDeploymentAction: Option[BasicActionDetails] = Some(localStatusValue)

override lazy val status: DeploymentStatusDetails = {
val deploymentStatus = localStatusValue.state match {
case ProcessActionState.InProgress => SimpleStateStatus.DuringDeploy
case ProcessActionState.Finished => SimpleStateStatus.Running
case ProcessActionState.ExecutionFinished => SimpleStateStatus.Finished
// Failed actions shouldn't be used anywhere but for notifications mechanisms purpose
case ProcessActionState.Failed =>
throw new IllegalStateException(s"ProcessActionState.Failed occurred in deployment status context")
}
DeploymentStatusDetails(
status = deploymentStatus,
deploymentId = Some(DeploymentId.fromActionId(localStatusValue.id)),
version = Some(localStatusValue.scenarioVersionId)
)
}

}

case class EngineDeploymentStatus(status: DeploymentStatusDetails, localDeploymentAction: Option[BasicActionDetails])
extends DeploymentStatusWithReconciliationInfo {
override def availableOnEngine: Boolean = true
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,10 @@ class ScenarioStatusProvider(
dbioRunner.run(for {
processDetailsOpt <- processRepository.fetchLatestProcessDetailsForProcessId[Unit](processIdWithName.id)
processDetails <- existsOrFail(processDetailsOpt, ProcessNotFoundError(processIdWithName.name))
stateActionsSummary <- actionRepository.getSuccessScenarioActions(
stateActionsSummary <- actionRepository.getScenarioActions(
processDetails.processId,
ScenarioActionName.ScenarioStateActions
ScenarioActionName.ScenarioStateActions,
ProcessActionState.SuccessStates
)
scenarioStatus <- getScenarioStatusFetchingDeploymentsStatusesFromManager(processDetails, stateActionsSummary)
} yield scenarioStatus)
Expand All @@ -59,13 +60,14 @@ class ScenarioStatusProvider(
val scenarios = processTraverse.toList
dbioRunner.run(
for {
stateActionsSummary <- actionRepository.getSuccessScenarioActions(
stateActionsSummary <- actionRepository.getScenariosActions(
scenarios.map(_.processId),
ScenarioActionName.ScenarioStateActions
ScenarioActionName.ScenarioStateActions,
ProcessActionState.SuccessStates
)
prefetchedDeploymentStatuses <- DBIO.from(
deploymentStatusesProvider.getBulkQueriedDeploymentStatusesForSupportedManagers(
scenarios,
scenarios.map(_.idData),
stateActionsSummary
)
)
Expand Down Expand Up @@ -93,8 +95,7 @@ class ScenarioStatusProvider(
process,
scenarioStateActionsSummary,
deploymentStatusesProvider.getDeploymentStatuses(
process.processingType,
process.idWithName,
process.idData,
scenarioStateActionsSummary,
Some(prefetchedDeploymentStatuses)
)
Expand All @@ -111,9 +112,10 @@ class ScenarioStatusProvider(
processDetails: ScenarioWithDetailsEntity[_]
)(implicit user: LoggedUser, freshnessPolicy: DataFreshnessPolicy): DB[ScenarioStatusWithAllowedActions] = {
for {
actionsSummary <- actionRepository.getSuccessScenarioActions(
actionsSummary <- actionRepository.getScenarioActions(
processDetails.processId,
ScenarioActionName.ScenarioStateActions,
ProcessActionState.SuccessStates
)
statusDetails <- getScenarioStatusFetchingDeploymentsStatusesFromManager(processDetails, actionsSummary)
allowedActions = getAllowedActions(statusDetails, processDetails, None)
Expand Down Expand Up @@ -145,8 +147,7 @@ class ScenarioStatusProvider(
processDetails,
stateActionsSummary,
deploymentStatusesProvider.getDeploymentStatuses(
processDetails.processingType,
processDetails.idWithName,
processDetails.idData,
stateActionsSummary,
prefetchedDeploymentStatuses = None
)
Expand Down
Loading

0 comments on commit 10cd096

Please sign in to comment.