Skip to content

Commit

Permalink
Rollback to actions use only during scenario status resolving
Browse files Browse the repository at this point in the history
  • Loading branch information
arkadius committed Feb 21, 2025
1 parent b7c3ec3 commit 51a10dd
Show file tree
Hide file tree
Showing 11 changed files with 87 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class ActionService(
_ <- validateExpectedProcessingType(expectedProcessingType, processId)
lastStateAction <- actionRepository.getFinishedProcessActions(
processId,
Some(ScenarioActionName.StateActions)
Some(ScenarioActionName.ScenarioStatusActions)
)
} yield lastStateAction.headOption
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import akka.actor.ActorSystem
import pl.touk.nussknacker.engine.api.deployment.{DataFreshnessPolicy, DeploymentStatusDetails, WithDataFreshnessStatus}
import pl.touk.nussknacker.engine.api.process.{ProcessName, ProcessingType}
import pl.touk.nussknacker.ui.process.deployment.DeploymentManagerDispatcher
import pl.touk.nussknacker.ui.process.repository.ScenarioIdData
import pl.touk.nussknacker.ui.security.api.LoggedUser
import pl.touk.nussknacker.ui.util.FutureUtils.FutureOps

Expand All @@ -16,8 +17,7 @@ object DeploymentManagerReliableStatusesWrapper {
implicit class Ops(dmDispatcher: DeploymentManagerDispatcher) {

def getScenarioDeploymentsStatusesWithErrorWrappingAndTimeoutOpt(
processingType: ProcessingType,
scenarioName: ProcessName,
scenarioIdData: ScenarioIdData,
timeoutOpt: Option[FiniteDuration]
)(
implicit user: LoggedUser,
Expand All @@ -28,18 +28,20 @@ object DeploymentManagerReliableStatusesWrapper {
val deploymentStatusesOptFuture
: Future[Either[GetDeploymentsStatusesError, WithDataFreshnessStatus[List[DeploymentStatusDetails]]]] =
dmDispatcher
.deploymentManager(processingType)
.deploymentManager(scenarioIdData.processingType)
.map(
_.getScenarioDeploymentsStatuses(scenarioName)
_.getScenarioDeploymentsStatuses(scenarioIdData.name)
.map(Right(_))
.recover { case NonFatal(e) => Left(GetDeploymentsStatusesFailure(scenarioName, e)) }
.recover { case NonFatal(e) => Left(GetDeploymentsStatusesFailure(scenarioIdData.name, e)) }
)
.getOrElse(
Future.successful(Left(ProcessingTypeIsNotConfigured(scenarioIdData.name, scenarioIdData.processingType)))
)
.getOrElse(Future.successful(Left(ProcessingTypeIsNotConfigured(scenarioName, processingType))))

timeoutOpt
.map { timeout =>
deploymentStatusesOptFuture
.withTimeout(timeout, timeoutResult = Left(GetDeploymentsStatusTimeout(scenarioName)))
.withTimeout(timeout, timeoutResult = Left(GetDeploymentsStatusTimeout(scenarioIdData.name)))
}
.getOrElse(deploymentStatusesOptFuture)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,28 @@ import pl.touk.nussknacker.engine.api.process.{ProcessName, ProcessingType}
import pl.touk.nussknacker.engine.util.WithDataFreshnessStatusUtils.WithDataFreshnessStatusMapOps
import pl.touk.nussknacker.ui.process.deployment.DeploymentManagerDispatcher
import pl.touk.nussknacker.ui.process.deployment.deploymentstatus.DeploymentManagerReliableStatusesWrapper.Ops
import pl.touk.nussknacker.ui.process.repository.ScenarioWithDetailsEntity
import pl.touk.nussknacker.ui.process.repository._
import pl.touk.nussknacker.ui.security.api.LoggedUser

import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ExecutionContext, Future}
import scala.util.control.NonFatal

// This class provides statuses for every deployment of scenario
// Currently it doesn't return correct value in situation when deployment is requested but not yet visible on the engine side
// To fix this we have to change the model of actions(activities) which holds separate action/activity for deploy and for cancel
// and they are not related.
// It also don't return status of deployments that are finished but not visible on the engine side because of retention
// FIXME abr: Take into an account in progress and finished deployments that are not visible on the engine side
class DeploymentStatusesProvider(dispatcher: DeploymentManagerDispatcher, scenarioStateTimeout: Option[FiniteDuration])(
// This class returns information about deployments basen on information from DeploymentManager's
// To have full information about DeploymentStatus'es, these information have to be merged with data from local store
// Data from local store are needed in certain situation:
// 1. when scenario deployment is requested but not yet seen on engine side (deploy action is in progress)
// 2. when scenario job was finished and was removed by retention mechanism
// 3. when scenario job have been canceled and was removed by retention mechanism
// Currently, for local store is used ActionRepository. It is quite problematic for determining the statuses. For example,
// case 3. is barely possible because cancel action is not correlated with deploy action so for two deploys done by one
// we won't know which one should be canceled
// TODO: Extract a new service that would should merged perspective for of deployment statuses. To do that,
// we need to change (or refactor) the local storage
class DeploymentStatusesProvider(
dispatcher: DeploymentManagerDispatcher,
scenarioStateTimeout: Option[FiniteDuration]
)(
implicit system: ActorSystem
) extends LazyLogging {

Expand All @@ -32,13 +40,13 @@ class DeploymentStatusesProvider(dispatcher: DeploymentManagerDispatcher, scenar
// - the query is about more than one scenario handled by that DM - for one scenario prefetching would be non-optimal
// and this is a common case for this method because it is invoked for Id Traverse - see usages
def getBulkQueriedDeploymentStatusesForSupportedManagers(
scenarios: List[ScenarioWithDetailsEntity[_]],
scenarios: List[ScenarioIdData]
)(
implicit user: LoggedUser,
freshnessPolicy: DataFreshnessPolicy
): Future[BulkQueriedDeploymentStatuses] = {
// We assume that prefetching gives profits for at least two scenarios
val processingTypesWithMoreThanOneScenario = scenarios.groupBy(_.processingType).filter(_._2.size >= 2).keySet
val processingTypesWithMoreThanOneScenario = scenarios.groupBy(_.processingType).filter(_._2.size >= 2).keys

Future
.sequence {
Expand All @@ -49,7 +57,9 @@ class DeploymentStatusesProvider(dispatcher: DeploymentManagerDispatcher, scenar
case supported: DeploymentsStatusesQueryForAllScenariosSupported => Some(supported)
case NoDeploymentsStatusesQueryForAllScenariosSupport => None
}
} yield getAllDeploymentStatuses(processingType, managerWithCapability))
} yield getAllDeploymentStatusesRecoveringFailure(processingType, managerWithCapability).map(
_.map(processingType -> _)
))
.getOrElse(Future.successful(None))
}
}
Expand All @@ -58,36 +68,34 @@ class DeploymentStatusesProvider(dispatcher: DeploymentManagerDispatcher, scenar
}

def getDeploymentStatuses(
processingType: ProcessingType,
scenarioName: ProcessName,
prefetchedDeploymentStatuses: Option[BulkQueriedDeploymentStatuses],
scenarioIdData: ScenarioIdData,
prefetchedDeploymentStatuses: Option[BulkQueriedDeploymentStatuses]
)(
implicit user: LoggedUser,
freshnessPolicy: DataFreshnessPolicy
): Future[Either[GetDeploymentsStatusesError, WithDataFreshnessStatus[List[DeploymentStatusDetails]]]] = {
prefetchedDeploymentStatuses
.flatMap(_.getDeploymentStatuses(processingType, scenarioName))
.flatMap(_.getDeploymentStatuses(scenarioIdData))
.map { prefetchedStatusDetails =>
Future.successful(Right(prefetchedStatusDetails))
}
.getOrElse {
dispatcher.getScenarioDeploymentsStatusesWithErrorWrappingAndTimeoutOpt(
processingType,
scenarioName,
scenarioIdData,
scenarioStateTimeout
)
}
}

private def getAllDeploymentStatuses(
private def getAllDeploymentStatusesRecoveringFailure(
processingType: ProcessingType,
manager: DeploymentsStatusesQueryForAllScenariosSupported
)(
implicit freshnessPolicy: DataFreshnessPolicy,
): Future[Option[(ProcessingType, WithDataFreshnessStatus[Map[ProcessName, List[DeploymentStatusDetails]]])]] = {
): Future[Option[WithDataFreshnessStatus[Map[ProcessName, List[DeploymentStatusDetails]]]]] = {
manager
.getAllScenariosDeploymentsStatuses()
.map(states => Some((processingType, states)))
.map(Some(_))
.recover { case NonFatal(e) =>
logger.warn(
s"Failed to get statuses of all scenarios in deployment manager for $processingType: ${e.getMessage}",
Expand All @@ -106,15 +114,14 @@ class BulkQueriedDeploymentStatuses(
) {

def getDeploymentStatuses(
processingType: ProcessingType,
scenarioName: ProcessName
scenarioIdData: ScenarioIdData
): Option[WithDataFreshnessStatus[List[DeploymentStatusDetails]]] =
for {
prefetchedStatusesForProcessingType <- bulkQueriedStatusesByProcessingType.get(processingType)
prefetchedStatusesForProcessingType <- bulkQueriedStatusesByProcessingType.get(scenarioIdData.processingType)
// Deployment statuses are prefetched for all scenarios for the given processing type.
// If there is no information available for a specific scenario name,
// then it means that DM is not aware of this scenario, and we should default to List.empty[StatusDetails] instead of None
prefetchedStatusesForScenario = prefetchedStatusesForProcessingType.getOrElse(scenarioName, List.empty)
prefetchedStatusesForScenario = prefetchedStatusesForProcessingType.getOrElse(scenarioIdData.name, List.empty)
} yield prefetchedStatusesForScenario

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,7 @@ package pl.touk.nussknacker.ui.process.deployment.scenariostatus
import com.typesafe.scalalogging.LazyLogging
import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus
import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus.ProblemStateStatus
import pl.touk.nussknacker.engine.api.deployment.{
DeploymentStatusDetails,
ProcessAction,
ProcessActionState,
ScenarioActionName,
StateStatus
}
import pl.touk.nussknacker.engine.api.deployment._
import pl.touk.nussknacker.engine.deployment.DeploymentId

object InconsistentStateDetector extends InconsistentStateDetector
Expand All @@ -18,21 +12,21 @@ class InconsistentStateDetector extends LazyLogging {

def resolveScenarioStatus(
deploymentStatuses: List[DeploymentStatusDetails],
lastStateAction: ProcessAction
lastStateAction: ScenarioStatusActionDetails
): StateStatus = {
val status = (doExtractAtMostOneStatus(deploymentStatuses), lastStateAction) match {
case (Left(deploymentStatus), _) => deploymentStatus.status
case (Right(None), action)
if action.actionName == ScenarioActionName.Deploy && action.state == ProcessActionState.ExecutionFinished =>
case (Right(None), lastAction)
if lastAction.actionName == ScenarioActionName.Deploy && lastAction.state == ProcessActionState.ExecutionFinished =>
// Some engines like Flink have jobs retention. Because of that we restore finished status
SimpleStateStatus.Finished
case (Right(Some(deploymentStatus)), _) if shouldAlwaysReturnStatus(deploymentStatus) => deploymentStatus.status
case (Right(deploymentStatusOpt), action) if action.actionName == ScenarioActionName.Deploy =>
handleLastActionDeploy(deploymentStatusOpt, action)
case (Right(deploymentStatusOpt), lastAction) if lastAction.actionName == ScenarioActionName.Deploy =>
handleLastActionDeploy(deploymentStatusOpt, lastAction)
case (Right(Some(deploymentStatus)), _) if isFollowingDeployStatus(deploymentStatus) =>
handleFollowingDeployState(deploymentStatus, lastStateAction)
case (Right(deploymentStatusOpt), action) if action.actionName == ScenarioActionName.Cancel =>
handleCanceledState(deploymentStatusOpt)
handleFollowingDeployEngineSideStatus(deploymentStatus, lastStateAction)
case (Right(deploymentStatusOpt), lastAction) if lastAction.actionName == ScenarioActionName.Cancel =>
handleLastActionCancel(deploymentStatusOpt)
case (Right(Some(deploymentStatus)), _) => deploymentStatus.status
case (Right(None), _) => SimpleStateStatus.NotDeployed
}
Expand Down Expand Up @@ -69,13 +63,15 @@ class InconsistentStateDetector extends LazyLogging {
}

// This method handles some corner cases for canceled process -> with last action = Canceled
private def handleCanceledState(deploymentStatusOpt: Option[DeploymentStatusDetails]): StateStatus =
deploymentStatusOpt.map(_.status).getOrElse(SimpleStateStatus.Canceled)
private def handleLastActionCancel(deploymentStatusOpt: Option[DeploymentStatusDetails]): StateStatus =
deploymentStatusOpt
.map(_.status)
.getOrElse(SimpleStateStatus.Canceled)

// This method handles some corner cases for following deploy status mismatch last action version
private def handleFollowingDeployState(
private def handleFollowingDeployEngineSideStatus(
deploymentStatus: DeploymentStatusDetails,
lastStateAction: ProcessAction
lastStateAction: ScenarioStatusActionDetails
): StateStatus = {
if (lastStateAction.actionName != ScenarioActionName.Deploy)
ProblemStateStatus.shouldNotBeRunning(true)
Expand All @@ -86,7 +82,7 @@ class InconsistentStateDetector extends LazyLogging {
// This method handles some corner cases for deployed action mismatch version
private def handleLastActionDeploy(
deploymentStatusOpt: Option[DeploymentStatusDetails],
action: ProcessAction
action: ScenarioStatusActionDetails
): StateStatus =
deploymentStatusOpt match {
case Some(deploymentStatuses) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ class ScenarioStatusProvider(
for {
actionsInProgress <- getInProgressActionTypesForScenarios(scenarios)
prefetchedDeploymentStatuses <- DBIO.from(
deploymentStatusesProvider.getBulkQueriedDeploymentStatusesForSupportedManagers(scenarios)
deploymentStatusesProvider.getBulkQueriedDeploymentStatusesForSupportedManagers(scenarios.map(_.idData))
)
finalScenariosStatuses <- processTraverse
.map {
Expand All @@ -87,8 +87,7 @@ class ScenarioStatusProvider(
process,
inProgressActionNames,
deploymentStatusesProvider.getDeploymentStatuses(
process.processingType,
process.name,
process.idData,
Some(prefetchedDeploymentStatuses)
)
)
Expand Down Expand Up @@ -138,8 +137,7 @@ class ScenarioStatusProvider(
processDetails,
inProgressActionNames,
deploymentStatusesProvider.getDeploymentStatuses(
processDetails.processingType,
processDetails.name,
processDetails.idData,
prefetchedDeploymentStatuses = None
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ abstract class DBFetchingProcessRepository[F[_]: Monad](
)
lastStateActionPerProcess <- fetchActionsOrEmpty(
actionRepository
.getLastActionPerProcess(ProcessActionState.FinishedStates, Some(ScenarioActionName.StateActions))
.getLastActionPerProcess(ProcessActionState.FinishedStates, Some(ScenarioActionName.ScenarioStatusActions))
)
// For last deploy action we are interested in Deploys that are Finished (not ExecutionFinished) and that are not Cancelled
// so that the presence of such an action means that the process is currently deployed
Expand Down Expand Up @@ -248,11 +248,11 @@ abstract class DBFetchingProcessRepository[F[_]: Monad](
process = process,
processVersion = processVersion,
lastActionData = actions.headOption,
lastStateActionData = actions.find(a => ScenarioActionName.StateActions.contains(a.actionName)),
lastStateActionData = actions.find(a => ScenarioActionName.ScenarioStatusActions.contains(a.actionName)),
// For last deploy action we are interested in Deploys that are Finished (not ExecutionFinished) and that are not Cancelled
// so that the presence of such an action means that the process is currently deployed
lastDeployedActionData = actions
.find(action => Set(ScenarioActionName.Deploy, ScenarioActionName.Cancel).contains(action.actionName))
.find(action => ScenarioActionName.ScenarioStatusActions.contains(action.actionName))
.filter(action =>
action.actionName == ScenarioActionName.Deploy && action.state == ProcessActionState.Finished
),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package pl.touk.nussknacker.ui.process.repository

import pl.touk.nussknacker.engine.api.deployment.ProcessAction
import pl.touk.nussknacker.engine.api.deployment.ProcessActionState.ProcessActionState
import pl.touk.nussknacker.engine.api.deployment.{ProcessAction, ProcessActionId, ScenarioActionName}
import pl.touk.nussknacker.engine.api.graph.ScenarioGraph
import pl.touk.nussknacker.engine.api.process.{
ProcessId,
Expand Down Expand Up @@ -47,6 +48,8 @@ final case class ScenarioWithDetailsEntity[ScenarioShape](
) extends ListenerScenarioWithDetails {
lazy val idWithName: ProcessIdWithName = ProcessIdWithName(processId, name)

lazy val idData: ScenarioIdData = ScenarioIdData(processId, name, processingType)

def mapScenario[NewShape](action: ScenarioShape => NewShape): ScenarioWithDetailsEntity[NewShape] =
copy(json = action(json))

Expand All @@ -68,3 +71,7 @@ final case class ScenarioWithDetailsEntity[ScenarioShape](
}

}

// It is a set of id-like data that allow to identify scenario both in local storage and on engine side
// On engine side it is needed to have processingType (to navigate to correct DeploymentManager) and scenario name
final case class ScenarioIdData(id: ProcessId, name: ProcessName, processingType: ProcessingType)
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ import cats.implicits.catsSyntaxEitherId
import com.typesafe.scalalogging.LazyLogging
import db.util.DBIOActionInstances.DB
import pl.touk.nussknacker.engine.api.Comment
import pl.touk.nussknacker.engine.api.modelinfo.ModelInfo
import pl.touk.nussknacker.engine.api.component.ProcessingMode
import pl.touk.nussknacker.engine.api.deployment.ScenarioAttachment.{AttachmentFilename, AttachmentId}
import pl.touk.nussknacker.engine.api.deployment._
import pl.touk.nussknacker.engine.api.modelinfo.ModelInfo
import pl.touk.nussknacker.engine.api.process.ProcessId
import pl.touk.nussknacker.ui.api.description.scenarioActivity.Dtos.Legacy
import pl.touk.nussknacker.ui.db.entity.{
Expand All @@ -22,7 +22,6 @@ import pl.touk.nussknacker.ui.process.repository.DbioRepository
import pl.touk.nussknacker.ui.process.repository.activities.ScenarioActivityRepository.{
CommentModificationMetadata,
DeleteAttachmentError,
ModifyActivityError,
ModifyCommentError
}
import pl.touk.nussknacker.ui.security.api.LoggedUser
Expand Down Expand Up @@ -350,10 +349,6 @@ class DbScenarioActivityRepository private (override protected val dbRef: DbRef,
private lazy val attachmentInsertQuery =
attachmentsTable returning attachmentsTable.map(_.id) into ((item, id) => item.copy(id = id))

private def validateActivityExistsForScenario(entity: ScenarioActivityEntityData) = {
fromEntity(entity).left.map(_ => ModifyActivityError.CouldNotModifyActivity).map(_._2)
}

private def modifyActivityByActivityId[ERROR](
activityId: ScenarioActivityId,
activityDoesNotExistError: ERROR,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,13 +113,6 @@ object ScenarioActivityRepository {
case object CouldNotDeleteAttachment extends DeleteAttachmentError
}

sealed trait ModifyActivityError

object ModifyActivityError {
case object ActivityDoesNotExist extends ModifyActivityError
case object CouldNotModifyActivity extends ModifyActivityError
}

final case class CommentModificationMetadata(commentForScenarioDeployed: Boolean)

}
Loading

0 comments on commit 51a10dd

Please sign in to comment.