Skip to content

Commit

Permalink
DeploymentsStatusProvider
Browse files Browse the repository at this point in the history
  • Loading branch information
arkadius committed Feb 18, 2025
1 parent db151cb commit 4d49ac4
Show file tree
Hide file tree
Showing 10 changed files with 227 additions and 216 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -218,8 +218,7 @@ class ProcessesResources(
scenarioDetails <- processService
.getLatestProcessWithDetails(processId, GetScenarioWithDetailsOptions.detailsOnly)
.map(_.toEntity)
statusDetails <- scenarioStatusProvider
.getScenarioStatus(processId, currentlyPresentedVersionId)
statusDetails <- scenarioStatusProvider.getScenarioStatus(processId)
dto = scenarioStatusPresenter.toDto(statusDetails, scenarioDetails, currentlyPresentedVersionId)
} yield dto
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ object DeploymentManagerReliableStatusesWrapper {

implicit class Ops(dmDispatcher: DeploymentManagerDispatcher) {

def getScenarioDeploymentsStatusesWithTimeoutOpt(
def getScenarioDeploymentsStatusesWithErrorWrappingAndTimeoutOpt(
processingType: ProcessingType,
scenarioName: ProcessName,
timeoutOpt: Option[FiniteDuration]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package pl.touk.nussknacker.ui.process.deployment.deploymentstatus

import akka.actor.ActorSystem
import com.typesafe.scalalogging.LazyLogging
import pl.touk.nussknacker.engine.api.deployment._
import pl.touk.nussknacker.engine.api.process.{ProcessName, ProcessingType}
import pl.touk.nussknacker.engine.util.WithDataFreshnessStatusUtils.WithDataFreshnessStatusMapOps
import pl.touk.nussknacker.ui.process.deployment.DeploymentManagerDispatcher
import pl.touk.nussknacker.ui.process.deployment.deploymentstatus.DeploymentManagerReliableStatusesWrapper.Ops
import pl.touk.nussknacker.ui.process.repository.ScenarioWithDetailsEntity
import pl.touk.nussknacker.ui.security.api.LoggedUser

import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ExecutionContext, Future}
import scala.util.control.NonFatal

class DeploymentStatusesProvider(dispatcher: DeploymentManagerDispatcher, scenarioStateTimeout: Option[FiniteDuration])(
implicit system: ActorSystem
) extends LazyLogging {

private implicit val ec: ExecutionContext = system.dispatcher

// DeploymentManager's may support fetching state of all scenarios at once
// State is prefetched only when:
// - DM has capability StateQueryForAllScenariosSupported
// - the query is about more than one scenario handled by that DM - for one scenario prefetching would be non-optimal
// and this is a common case for this method because it is invoked for Id Traverse - see usages
def getPrefetchedDeploymentStatusesForSupportedManagers(
scenarios: List[ScenarioWithDetailsEntity[_]],
)(
implicit user: LoggedUser,
freshnessPolicy: DataFreshnessPolicy
): Future[PrefetchedDeploymentStatuses] = {
// We assume that prefetching gives profits for at least two scenarios
val processingTypesWithMoreThanOneScenario = scenarios.groupBy(_.processingType).filter(_._2.size >= 2).keySet

Future
.sequence {
processingTypesWithMoreThanOneScenario.map { processingType =>
(for {
manager <- dispatcher.deploymentManager(processingType)
managerWithCapability <- manager.stateQueryForAllScenariosSupport match {
case supported: StateQueryForAllScenariosSupported => Some(supported)
case NoStateQueryForAllScenariosSupport => None
}
} yield getAllDeploymentStatuses(processingType, managerWithCapability))
.getOrElse(Future.successful(None))
}
}
.map(_.flatten.toMap)
.map(new PrefetchedDeploymentStatuses(_))
}

def getDeploymentStatuses(
processingType: ProcessingType,
scenarioName: ProcessName,
prefetchedDeploymentStatuses: Option[PrefetchedDeploymentStatuses],
)(
implicit user: LoggedUser,
freshnessPolicy: DataFreshnessPolicy
): Future[Either[GetDeploymentsStatusesError, WithDataFreshnessStatus[List[StatusDetails]]]] = {
prefetchedDeploymentStatuses
.flatMap(_.get(processingType, scenarioName))
.map { prefetchedStatusDetails =>
Future.successful(Right(prefetchedStatusDetails))
}
.getOrElse {
dispatcher.getScenarioDeploymentsStatusesWithErrorWrappingAndTimeoutOpt(
processingType,
scenarioName,
scenarioStateTimeout
)
}
}

private def getAllDeploymentStatuses(processingType: ProcessingType, manager: StateQueryForAllScenariosSupported)(
implicit freshnessPolicy: DataFreshnessPolicy,
): Future[Option[(ProcessingType, WithDataFreshnessStatus[Map[ProcessName, List[StatusDetails]]])]] = {
manager
.getAllDeploymentStatuses()
.map(states => Some((processingType, states)))
.recover { case NonFatal(e) =>
logger.warn(
s"Failed to get statuses of all scenarios in deployment manager for $processingType: ${e.getMessage}",
e
)
None
}
}

}

class PrefetchedDeploymentStatuses(
prefetchedStatusesByProcessingType: Map[ProcessingType, WithDataFreshnessStatus[
Map[ProcessName, List[StatusDetails]]
]]
) {

def get(
processingType: ProcessingType,
scenarioName: ProcessName
): Option[WithDataFreshnessStatus[List[StatusDetails]]] =
for {
prefetchedStatusesForProcessingType <- prefetchedStatusesByProcessingType.get(processingType)
// Deployment statuses are prefetched for all scenarios for the given processing type.
// If there is no information available for a specific scenario name,
// then it means that DM is not aware of this scenario, and we should default to List.empty[StatusDetails] instead of None
prefetchedStatusesForScenario = prefetchedStatusesForProcessingType.getOrElse(scenarioName, List.empty)
} yield prefetchedStatusesForScenario

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,7 @@ class InconsistentStateDetector extends LazyLogging {
status
}

// TODO: This method is exposed to make transition between Option[StatusDetails] and List[StatusDetails] easier to perform.
// After full migration to List[StatusDetails], this method should be removed
def extractAtMostOneStatus(deploymentStatuses: List[StatusDetails]): Option[StatusDetails] =
private[scenariostatus] def extractAtMostOneStatus(deploymentStatuses: List[StatusDetails]): Option[StatusDetails] =
doExtractAtMostOneStatus(deploymentStatuses).fold(Some(_), identity)

private def doExtractAtMostOneStatus(
Expand Down
Loading

0 comments on commit 4d49ac4

Please sign in to comment.