Skip to content

Commit

Permalink
test fix
Browse files Browse the repository at this point in the history
  • Loading branch information
arkadius committed Feb 14, 2025
1 parent f68b964 commit dea343a
Show file tree
Hide file tree
Showing 13 changed files with 258 additions and 196 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ class PeriodicDeploymentManager private[periodic] (
override def getProcessStates(
name: ProcessName
)(implicit freshnessPolicy: DataFreshnessPolicy): Future[WithDataFreshnessStatus[List[StatusDetails]]] = {
service.getStatusDetails(name).map(_.map(List(_)))
service.getMergedStatusDetails(name).map(_.map(List(_)))
}

override def processStateDefinitionManager: ProcessStateDefinitionManager =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,7 @@ class PeriodicProcessService(

private def now(): LocalDateTime = LocalDateTime.now(clock)

def getStatusDetails(
def getMergedStatusDetails(
name: ProcessName
)(implicit freshnessPolicy: DataFreshnessPolicy): Future[WithDataFreshnessStatus[StatusDetails]] = {
delegateDeploymentManager.getProcessStates(name).flatMap { statusesWithFreshness =>
Expand Down Expand Up @@ -746,10 +746,8 @@ class PeriodicProcessService(

object PeriodicProcessService {

private implicit val localDateOrdering: Ordering[LocalDateTime] = Ordering.by(identity[ChronoLocalDateTime[_]])

// TODO: some configuration?
private val MaxDeploymentsStatus = 5
private[periodic] val MaxDeploymentsStatus = 5

private val DeploymentStatusesToReschedule =
Set(PeriodicProcessDeploymentStatus.Deployed, PeriodicProcessDeploymentStatus.Failed)
Expand All @@ -761,45 +759,52 @@ object PeriodicProcessService {
// for each historical and active deployments. mergedStatusDetails and methods below are for purpose of presentation
// of single, merged status similar to this available for streaming job. This merged status should be a straightforward derivative
// of these deployments statuses so it will be easy to figure out it by user.
case class PeriodicProcessStatus(
private case class PeriodicProcessStatus(
activeDeploymentsStatuses: List[PeriodicDeploymentStatus],
inactiveDeploymentsStatuses: List[PeriodicDeploymentStatus]
) extends LazyLogging {

def limitedAndSortedDeployments: List[PeriodicDeploymentStatus] =
(activeDeploymentsStatuses ++ inactiveDeploymentsStatuses.take(
MaxDeploymentsStatus - activeDeploymentsStatuses.size
)).sorted(PeriodicDeploymentStatus.ordering.reverse)

// Currently we don't present deployments - theirs statuses are available only in tooltip - because of that we have to pick
// one "merged" status that will be presented to users
def mergedStatusDetails: StatusDetails = {
pickMostImportantActiveDeployment
def toPeriodicProcessStatusWithMergedStatus(mergedStatus: StateStatus) = PeriodicProcessStatusWithMergedStatus(
activeDeploymentsStatuses,
inactiveDeploymentsStatuses,
mergedStatus
)

def createStatusDetails(mergedStatus: StateStatus, periodicDeploymentIdOpt: Option[PeriodicProcessDeploymentId]) =
StatusDetails(
status = toPeriodicProcessStatusWithMergedStatus(mergedStatus),
deploymentId = periodicDeploymentIdOpt.map(_.toString).map(DeploymentId(_)),
)

pickMostImportantActiveDeployment(activeDeploymentsStatuses)
.map { deploymentStatus =>
def createStatusDetails(status: StateStatus) = StatusDetails(
status = status,
deploymentId = Some(DeploymentId(deploymentStatus.deploymentId.toString)),
)
if (deploymentStatus.isWaitingForReschedule) {
deploymentStatus.runtimeStatusOpt
.map(_.copy(status = WaitingForScheduleStatus))
.getOrElse(createStatusDetails(WaitingForScheduleStatus))
.map(_.copy(status = toPeriodicProcessStatusWithMergedStatus(WaitingForScheduleStatus)))
.getOrElse(createStatusDetails(WaitingForScheduleStatus, Some(deploymentStatus.deploymentId)))
} else if (deploymentStatus.status == PeriodicProcessDeploymentStatus.Scheduled) {
createStatusDetails(ScheduledStatus(deploymentStatus.runAt))
createStatusDetails(ScheduledStatus(deploymentStatus.runAt), Some(deploymentStatus.deploymentId))
} else if (Set(PeriodicProcessDeploymentStatus.Failed, PeriodicProcessDeploymentStatus.FailedOnDeploy)
.contains(deploymentStatus.status)) {
createStatusDetails(ProblemStateStatus.Failed)
createStatusDetails(ProblemStateStatus.Failed, Some(deploymentStatus.deploymentId))
} else if (deploymentStatus.status == PeriodicProcessDeploymentStatus.RetryingDeploy) {
createStatusDetails(SimpleStateStatus.DuringDeploy)
createStatusDetails(SimpleStateStatus.DuringDeploy, Some(deploymentStatus.deploymentId))
} else {
deploymentStatus.runtimeStatusOpt.getOrElse {
createStatusDetails(WaitingForScheduleStatus)
}
deploymentStatus.runtimeStatusOpt
.map(runtimeDetails =>
runtimeDetails.copy(status = toPeriodicProcessStatusWithMergedStatus(runtimeDetails.status))
)
.getOrElse {
createStatusDetails(WaitingForScheduleStatus, Some(deploymentStatus.deploymentId))
}
}
}
.getOrElse {
if (inactiveDeploymentsStatuses.isEmpty) {
StatusDetails(SimpleStateStatus.NotDeployed, None)
createStatusDetails(SimpleStateStatus.NotDeployed, None)
} else {
val latestInactiveProcessId =
inactiveDeploymentsStatuses.maxBy(_.scheduleId.processId.value).scheduleId.processId
Expand All @@ -810,52 +815,64 @@ object PeriodicProcessService {
if (latestDeploymentsForEachScheduleOfLatestProcessId.forall(
_.status == PeriodicProcessDeploymentStatus.Finished
)) {
StatusDetails(SimpleStateStatus.Finished, None)
createStatusDetails(SimpleStateStatus.Finished, None)
} else {
StatusDetails(SimpleStateStatus.Canceled, None)
createStatusDetails(SimpleStateStatus.Canceled, None)
}
}
}
}

/**
* Returns latest deployment. It can be in any status (consult [[PeriodicProcessDeploymentStatus]]).
* For multiple schedules only single schedule is returned in the following order:
* <ol>
* <li>If there are any deployed scenarios, then the first one is returned. Please be aware that deployment of previous
* schedule could fail.</li>
* <li>If there are any failed scenarios, then the last one is returned. We want to inform user, that some deployments
* failed and the scenario should be rescheduled/retried manually.
* <li>If there are any scheduled scenarios, then the first one to be run is returned.
* <li>If there are any finished scenarios, then the last one is returned. It should not happen because the scenario
* should be deactivated earlier.
* </ol>
*/
def pickMostImportantActiveDeployment: Option[PeriodicDeploymentStatus] = {
val lastActiveDeploymentStatusForEachSchedule =
latestDeploymentForEachSchedule(activeDeploymentsStatuses).sorted

def first(status: PeriodicProcessDeploymentStatus) =
lastActiveDeploymentStatusForEachSchedule.find(_.status == status)

def last(status: PeriodicProcessDeploymentStatus) =
lastActiveDeploymentStatusForEachSchedule.reverse.find(_.status == status)

first(PeriodicProcessDeploymentStatus.Deployed)
.orElse(last(PeriodicProcessDeploymentStatus.Failed))
.orElse(last(PeriodicProcessDeploymentStatus.RetryingDeploy))
.orElse(last(PeriodicProcessDeploymentStatus.FailedOnDeploy))
.orElse(first(PeriodicProcessDeploymentStatus.Scheduled))
.orElse(last(PeriodicProcessDeploymentStatus.Finished))
}
}

private def latestDeploymentForEachSchedule(deploymentsStatuses: List[PeriodicDeploymentStatus]) = {
deploymentsStatuses
.groupBy(_.scheduleId)
.values
.toList
.map(_.min(PeriodicDeploymentStatus.ordering.reverse))
}
/**
* Returns latest deployment. It can be in any status (consult [[PeriodicProcessDeploymentStatus]]).
* For multiple schedules only single schedule is returned in the following order:
* <ol>
* <li>If there are any deployed scenarios, then the first one is returned. Please be aware that deployment of previous
* schedule could fail.</li>
* <li>If there are any failed scenarios, then the last one is returned. We want to inform user, that some deployments
* failed and the scenario should be rescheduled/retried manually.
* <li>If there are any scheduled scenarios, then the first one to be run is returned.
* <li>If there are any finished scenarios, then the last one is returned. It should not happen because the scenario
* should be deactivated earlier.
* </ol>
*/
private[periodic] def pickMostImportantActiveDeployment(
activeDeploymentsStatuses: List[PeriodicDeploymentStatus]
): Option[PeriodicDeploymentStatus] = {
val lastActiveDeploymentStatusForEachSchedule =
latestDeploymentForEachSchedule(activeDeploymentsStatuses).sorted

def first(status: PeriodicProcessDeploymentStatus) =
lastActiveDeploymentStatusForEachSchedule.find(_.status == status)

def last(status: PeriodicProcessDeploymentStatus) =
lastActiveDeploymentStatusForEachSchedule.reverse.find(_.status == status)

first(PeriodicProcessDeploymentStatus.Deployed)
.orElse(last(PeriodicProcessDeploymentStatus.Failed))
.orElse(last(PeriodicProcessDeploymentStatus.RetryingDeploy))
.orElse(last(PeriodicProcessDeploymentStatus.FailedOnDeploy))
.orElse(first(PeriodicProcessDeploymentStatus.Scheduled))
.orElse(last(PeriodicProcessDeploymentStatus.Finished))
}

private def latestDeploymentForEachSchedule(deploymentsStatuses: List[PeriodicDeploymentStatus]) = {
deploymentsStatuses
.groupBy(_.scheduleId)
.values
.toList
.map(_.min(PeriodicDeploymentStatus.ordering.reverse))
}

case class PeriodicProcessStatusWithMergedStatus(
activeDeploymentsStatuses: List[PeriodicDeploymentStatus],
inactiveDeploymentsStatuses: List[PeriodicDeploymentStatus],
mergedStatus: StateStatus
) extends StateStatus {

override def name: StatusName = mergedStatus.name

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,13 @@ import pl.touk.nussknacker.engine.api.deployment.{
ScenarioActionName,
StateStatus
}
import pl.touk.nussknacker.ui.process.periodic.PeriodicProcessService.{PeriodicDeploymentStatus, PeriodicProcessStatus}
import pl.touk.nussknacker.ui.process.periodic.PeriodicProcessService.{
MaxDeploymentsStatus,
PeriodicDeploymentStatus,
PeriodicProcessStatusWithMergedStatus
}

import java.net.URI

class PeriodicProcessStateDefinitionManager(delegate: ProcessStateDefinitionManager)
extends OverridingProcessStateDefinitionManager(
Expand All @@ -20,19 +26,55 @@ class PeriodicProcessStateDefinitionManager(delegate: ProcessStateDefinitionMana
delegate = delegate
) {

override def statusActions(processStatus: ProcessStateDefinitionManager.ProcessStatus): List[ScenarioActionName] = {
super.statusActions(processStatus.copy(stateStatus = extractPeriodicStatus(processStatus.stateStatus).mergedStatus))
}

override def actionTooltips(
processStatus: ProcessStateDefinitionManager.ProcessStatus
): Map[ScenarioActionName, String] = {
super.actionTooltips(
processStatus.copy(stateStatus = extractPeriodicStatus(processStatus.stateStatus).mergedStatus)
)
}

override def statusIcon(stateStatus: StateStatus): URI = {
super.statusIcon(extractPeriodicStatus(stateStatus).mergedStatus)
}

override def statusDescription(stateStatus: StateStatus): String = {
super.statusDescription(extractPeriodicStatus(stateStatus).mergedStatus)
}

override def statusTooltip(stateStatus: StateStatus): String = {
val periodicStatus = extractPeriodicStatus(stateStatus)
PeriodicProcessStateDefinitionManager.statusTooltip(
activeDeploymentsStatuses = periodicStatus.activeDeploymentsStatuses,
inactiveDeploymentsStatuses = periodicStatus.inactiveDeploymentsStatuses
)
}

private def extractPeriodicStatus(stateStatus: StateStatus) = {
stateStatus match {
case periodic: PeriodicProcessStatus => PeriodicProcessStateDefinitionManager.statusTooltip(periodic)
case other => throw new IllegalStateException(s"Unexpected status: $other")
case periodic: PeriodicProcessStatusWithMergedStatus =>
periodic
case other => throw new IllegalStateException(s"Unexpected status: $other")
}
}

}

object PeriodicProcessStateDefinitionManager {

def statusTooltip(processStatus: PeriodicProcessStatus): String = {
processStatus.limitedAndSortedDeployments
def statusTooltip(
activeDeploymentsStatuses: List[PeriodicDeploymentStatus],
inactiveDeploymentsStatuses: List[PeriodicDeploymentStatus]
): String = {
val limitedAndSortedDeployments: List[PeriodicDeploymentStatus] =
(activeDeploymentsStatuses ++ inactiveDeploymentsStatuses.take(
MaxDeploymentsStatus - activeDeploymentsStatuses.size
)).sorted(PeriodicDeploymentStatus.ordering.reverse)
limitedAndSortedDeployments
.map { case d @ PeriodicDeploymentStatus(_, scheduleId, _, runAt, status, _, _) =>
val refinedStatus = {
if (d.isCanceled) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import io.restassured.module.scala.RestAssuredSupport.AddThenToResponse
import org.hamcrest.Matchers._
import org.scalatest.freespec.AnyFreeSpecLike
import org.scalatest.matchers.must.Matchers.be
import pl.touk.nussknacker.development.manager.BasicStatusDetails
import pl.touk.nussknacker.development.manager.MockableDeploymentManagerProvider.MockableDeploymentManager
import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus
import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus.ProblemStateStatus
Expand Down Expand Up @@ -42,7 +43,7 @@ class AppApiHttpServiceBusinessSpec
createDeployedExampleScenario(ProcessName("id1"))

MockableDeploymentManager.configureScenarioStatuses(
Map("id1" -> SimpleStateStatus.Running)
Map("id1" -> BasicStatusDetails(SimpleStateStatus.Running, Some(VersionId(1))))
)
}
.when()
Expand Down Expand Up @@ -70,9 +71,8 @@ class AppApiHttpServiceBusinessSpec

MockableDeploymentManager.configureScenarioStatuses(
Map(
"id1" -> ProblemStateStatus.FailedToGet,
"id2" -> SimpleStateStatus.Running,
"id3" -> ProblemStateStatus.shouldBeRunning(VersionId(1L), "admin"),
"id1" -> BasicStatusDetails(ProblemStateStatus.Failed, None),
"id2" -> BasicStatusDetails(SimpleStateStatus.Running, Some(VersionId(1))),
)
)
}
Expand All @@ -94,10 +94,6 @@ class AppApiHttpServiceBusinessSpec
.applicationState {
createDeployedCanceledExampleScenario(ProcessName("id1"))
createDeployedExampleScenario(ProcessName("id2"))

MockableDeploymentManager.configureScenarioStatuses(
Map("id2" -> ProblemStateStatus.shouldBeRunning(VersionId(1L), "admin"))
)
}
.when()
.basicAuthAllPermUser()
Expand All @@ -120,8 +116,8 @@ class AppApiHttpServiceBusinessSpec

MockableDeploymentManager.configureScenarioStatuses(
Map(
"id1" -> SimpleStateStatus.Running,
"id2" -> SimpleStateStatus.Running,
"id1" -> BasicStatusDetails(SimpleStateStatus.Running, Some(VersionId(1))),
"id2" -> BasicStatusDetails(SimpleStateStatus.Running, Some(VersionId(1))),
)
)
}
Expand All @@ -144,7 +140,7 @@ class AppApiHttpServiceBusinessSpec
createDeployedExampleScenario(ProcessName("id1"))

MockableDeploymentManager.configureScenarioStatuses(
Map("id1" -> SimpleStateStatus.Running)
Map("id1" -> BasicStatusDetails(SimpleStateStatus.Running, Some(VersionId(1))))
)
}
.when()
Expand All @@ -169,7 +165,7 @@ class AppApiHttpServiceBusinessSpec
createDeployedExampleScenario(ProcessName("id1"))

MockableDeploymentManager.configureScenarioStatuses(
Map("id1" -> SimpleStateStatus.Running)
Map("id1" -> BasicStatusDetails(SimpleStateStatus.Running, Some(VersionId(1))))
)
}
.basicAuthAllPermUser()
Expand Down Expand Up @@ -259,9 +255,8 @@ class AppApiHttpServiceBusinessSpec

MockableDeploymentManager.configureScenarioStatuses(
Map(
"id1" -> ProblemStateStatus.FailedToGet,
"id2" -> SimpleStateStatus.Running,
"id3" -> ProblemStateStatus.shouldBeRunning(VersionId(1L), "admin"),
"id1" -> BasicStatusDetails(ProblemStateStatus.Failed, None),
"id2" -> BasicStatusDetails(SimpleStateStatus.Running, Some(VersionId(1))),
)
)
}
Expand Down
Loading

0 comments on commit dea343a

Please sign in to comment.