Skip to content

Commit

Permalink
PeriodicDeploymentManagerTest uses DeploymentService - skip Inconsist…
Browse files Browse the repository at this point in the history
…entStateDetector use for periodics
  • Loading branch information
arkadius committed Feb 19, 2025
1 parent 80efd7b commit cd1bcbd
Show file tree
Hide file tree
Showing 17 changed files with 381 additions and 313 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,10 @@ object ProcessStateDefinitionManager {
* It is used as an argument of ProcessStateDefinitionManager methods
*
* @param scenarioStatus current scenario state
* @param latestVersionId latest saved versionId for the scenario
* @param deployedVersionId currently deployed versionId of the scenario
*/
final case class ScenarioStatusWithScenarioContext(
scenarioStatus: StateStatus,
latestVersionId: VersionId,
deployedVersionId: Option[VersionId],
currentlyPresentedVersionId: Option[VersionId],
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,19 @@ class OverridingProcessStateDefinitionManagerTest extends AnyFunSuite with Match
definitionsMap(CustomState.name).description shouldBe "Custom description"
definitionsMap(CustomStateThatOverrides.name).description shouldBe "Custom description that overrides"

def toInput(status: StateStatus) =
ScenarioStatusWithScenarioContext(status, VersionId(1), None, None)
def toStatusWithContext(status: StateStatus) =
ScenarioStatusWithScenarioContext(status, None, None)

// Description assigned to a scenario, with custom calculations
manager.statusDescription(toInput(DefaultState)) shouldBe "Calculated description for default, e.g. schedule date"
manager.statusDescription(toInput(CustomState)) shouldBe "Calculated description for custom, e.g. schedule date"
manager.statusDescription(toInput(CustomStateThatOverrides)) shouldBe "Custom description that overrides"
manager.statusDescription(
toStatusWithContext(DefaultState)
) shouldBe "Calculated description for default, e.g. schedule date"
manager.statusDescription(
toStatusWithContext(CustomState)
) shouldBe "Calculated description for custom, e.g. schedule date"
manager.statusDescription(
toStatusWithContext(CustomStateThatOverrides)
) shouldBe "Custom description that overrides"
}

}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package pl.touk.nussknacker.engine.api.deployment

import org.scalatest.Inside
import org.scalatest.funspec.AnyFunSpec
import org.scalatest.funsuite.AnyFunSuiteLike
import org.scalatest.matchers.should.Matchers
import pl.touk.nussknacker.engine.api.deployment.ProcessStateDefinitionManager.{
Expand All @@ -17,7 +16,6 @@ class SimpleScenarioStatusDtoSpec extends AnyFunSuiteLike with Matchers with Ins
SimpleProcessStateDefinitionManager.statusPresentation(
ScenarioStatusWithScenarioContext(
status,
VersionId(1),
None,
None,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ class ScenarioStatusPresenter(dispatcher: DeploymentManagerDispatcher) {
.statusPresentation(
ScenarioStatusWithScenarioContext(
scenarioStatus = scenarioStatus,
latestVersionId = processDetails.processVersionId,
deployedVersionId = processDetails.lastDeployedAction.map(_.processVersionId),
currentlyPresentedVersionId = currentlyPresentedVersionId
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import pl.touk.nussknacker.ui.process.deployment.deploymentstatus.{
GetDeploymentsStatusesError,
PrefetchedDeploymentStatuses
}
import pl.touk.nussknacker.ui.process.periodic.PeriodicProcessService.PeriodicScenarioStatus
import pl.touk.nussknacker.ui.process.periodic.PeriodicStateStatus
import pl.touk.nussknacker.ui.process.repository.ProcessDBQueryRepository.ProcessNotFoundError
import pl.touk.nussknacker.ui.process.repository._
import pl.touk.nussknacker.ui.security.api.LoggedUser
Expand Down Expand Up @@ -123,7 +125,6 @@ class ScenarioStatusProvider(
.statusActions(
ScenarioStatusWithScenarioContext(
scenarioStatus = scenarioStatus,
latestVersionId = processDetails.processVersionId,
deployedVersionId = processDetails.lastDeployedAction.map(_.processVersionId),
currentlyPresentedVersionId = currentlyPresentedVersionId
)
Expand Down Expand Up @@ -195,7 +196,14 @@ class ScenarioStatusProvider(
s"Deployment statuses for: '${processDetails.name}' are: ${statusWithFreshness.value}, cached: ${statusWithFreshness.cached}, last status action: ${processDetails.lastStateAction
.map(_.actionName)})"
)
InconsistentStateDetector.resolveScenarioStatus(statusWithFreshness.value, lastStateActionValue)
statusWithFreshness.value match {
// periodic mechanism already returns a scenario status, so we don't need to resolve it
// TODO: PeriodicDeploymentManager shouldn't be a DeploymentManager, we should treat it as a separate
// mechanism for both action commands and scenario status resolving
case DeploymentStatusDetails(periodic: PeriodicScenarioStatus, _, _) :: Nil => periodic
case _ =>
InconsistentStateDetector.resolveScenarioStatus(statusWithFreshness.value, lastStateActionValue)
}
}
case None => // We assume that the process never deployed should have no state at the engine
logStatusAndReturn(SimpleStateStatus.NotDeployed)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import pl.touk.nussknacker.ui.process.repository.PeriodicProcessesRepository
import java.time.{Clock, Instant}
import scala.concurrent.{ExecutionContext, Future}

// FIXME abr: test on DeploymentService / ScenarioStatusProvider level
object PeriodicDeploymentManager {

def apply(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -616,8 +616,7 @@ class PeriodicProcessService(
MaxDeploymentsStatus
)
} yield {
val status = PeriodicProcessStatus(toDeploymentStatuses(activeSchedules), toDeploymentStatuses(inactiveSchedules))
status.mergedStatusDetails
mergedDeploymentStatus(toDeploymentStatuses(activeSchedules), toDeploymentStatuses(inactiveSchedules))
}
}

Expand Down Expand Up @@ -649,11 +648,11 @@ class PeriodicProcessService(
allProcessNames.map { processName =>
val activeSchedulesForProcess = activeSchedules.getOrElse(processName, SchedulesState(Map.empty))
val inactiveSchedulesForProcess = inactiveSchedules.getOrElse(processName, SchedulesState(Map.empty))
val status = PeriodicProcessStatus(
val status = mergedDeploymentStatus(
toDeploymentStatuses(processName, activeSchedulesForProcess),
toDeploymentStatuses(processName, inactiveSchedulesForProcess)
)
(processName, status.mergedStatusDetails)
(processName, status)
}.toMap
}
}
Expand Down Expand Up @@ -761,71 +760,68 @@ object PeriodicProcessService {
// for each historical and active deployments. mergedStatusDetails and methods below are for purpose of presentation
// of single, merged status similar to this available for streaming job. This merged status should be a straightforward derivative
// of these deployments statuses so it will be easy to figure out it by user.
private case class PeriodicProcessStatus(

// Currently we don't present deployments - theirs statuses are available only in tooltip - because of that we have to pick
// one "merged" status that will be presented to users
private def mergedDeploymentStatus(
activeDeploymentsStatuses: List[PeriodicDeploymentStatus],
inactiveDeploymentsStatuses: List[PeriodicDeploymentStatus]
) extends LazyLogging {

// Currently we don't present deployments - theirs statuses are available only in tooltip - because of that we have to pick
// one "merged" status that will be presented to users
def mergedStatusDetails: DeploymentStatusDetails = {
def toPeriodicProcessStatusWithMergedStatus(mergedStatus: StateStatus) = PeriodicProcessStatusWithMergedStatus(
activeDeploymentsStatuses,
inactiveDeploymentsStatuses,
mergedStatus
)
): DeploymentStatusDetails = {
def toPeriodicProcessStatusWithMergedStatus(mergedStatus: StateStatus) = PeriodicScenarioStatus(
activeDeploymentsStatuses,
inactiveDeploymentsStatuses,
mergedStatus
)

def createStatusDetails(mergedStatus: StateStatus, periodicDeploymentIdOpt: Option[PeriodicProcessDeploymentId]) =
DeploymentStatusDetails(
status = toPeriodicProcessStatusWithMergedStatus(mergedStatus),
deploymentId = periodicDeploymentIdOpt.map(_.toString).map(DeploymentId(_)),
version = None
)
def createStatusDetails(mergedStatus: StateStatus, periodicDeploymentIdOpt: Option[PeriodicProcessDeploymentId]) =
DeploymentStatusDetails(
status = toPeriodicProcessStatusWithMergedStatus(mergedStatus),
deploymentId = periodicDeploymentIdOpt.map(_.toString).map(DeploymentId(_)),
version = None
)

pickMostImportantActiveDeployment(activeDeploymentsStatuses)
.map { deploymentStatus =>
if (deploymentStatus.isWaitingForReschedule) {
deploymentStatus.runtimeStatusOpt
.map(_.copy(status = toPeriodicProcessStatusWithMergedStatus(WaitingForScheduleStatus)))
.getOrElse(createStatusDetails(WaitingForScheduleStatus, Some(deploymentStatus.deploymentId)))
} else if (deploymentStatus.status == PeriodicProcessDeploymentStatus.Scheduled) {
createStatusDetails(ScheduledStatus(deploymentStatus.runAt), Some(deploymentStatus.deploymentId))
} else if (Set(PeriodicProcessDeploymentStatus.Failed, PeriodicProcessDeploymentStatus.FailedOnDeploy)
.contains(deploymentStatus.status)) {
createStatusDetails(ProblemStateStatus.Failed, Some(deploymentStatus.deploymentId))
} else if (deploymentStatus.status == PeriodicProcessDeploymentStatus.RetryingDeploy) {
createStatusDetails(SimpleStateStatus.DuringDeploy, Some(deploymentStatus.deploymentId))
} else {
deploymentStatus.runtimeStatusOpt
.map(runtimeDetails =>
runtimeDetails.copy(status = toPeriodicProcessStatusWithMergedStatus(runtimeDetails.status))
)
.getOrElse {
createStatusDetails(WaitingForScheduleStatus, Some(deploymentStatus.deploymentId))
}
}
}
.getOrElse {
if (inactiveDeploymentsStatuses.isEmpty) {
createStatusDetails(SimpleStateStatus.NotDeployed, None)
} else {
val latestInactiveProcessId =
inactiveDeploymentsStatuses.maxBy(_.scheduleId.processId.value).scheduleId.processId
val latestDeploymentsForEachScheduleOfLatestProcessId = latestDeploymentForEachSchedule(
inactiveDeploymentsStatuses.filter(_.scheduleId.processId == latestInactiveProcessId)
pickMostImportantActiveDeployment(activeDeploymentsStatuses)
.map { deploymentStatus =>
if (deploymentStatus.isWaitingForReschedule) {
deploymentStatus.runtimeStatusOpt
.map(_.copy(status = toPeriodicProcessStatusWithMergedStatus(WaitingForScheduleStatus)))
.getOrElse(createStatusDetails(WaitingForScheduleStatus, Some(deploymentStatus.deploymentId)))
} else if (deploymentStatus.status == PeriodicProcessDeploymentStatus.Scheduled) {
createStatusDetails(ScheduledStatus(deploymentStatus.runAt), Some(deploymentStatus.deploymentId))
} else if (Set(PeriodicProcessDeploymentStatus.Failed, PeriodicProcessDeploymentStatus.FailedOnDeploy)
.contains(deploymentStatus.status)) {
createStatusDetails(ProblemStateStatus.Failed, Some(deploymentStatus.deploymentId))
} else if (deploymentStatus.status == PeriodicProcessDeploymentStatus.RetryingDeploy) {
createStatusDetails(SimpleStateStatus.DuringDeploy, Some(deploymentStatus.deploymentId))
} else {
deploymentStatus.runtimeStatusOpt
.map(runtimeDetails =>
runtimeDetails.copy(status = toPeriodicProcessStatusWithMergedStatus(runtimeDetails.status))
)

if (latestDeploymentsForEachScheduleOfLatestProcessId.forall(
_.status == PeriodicProcessDeploymentStatus.Finished
)) {
createStatusDetails(SimpleStateStatus.Finished, None)
} else {
createStatusDetails(SimpleStateStatus.Canceled, None)
.getOrElse {
createStatusDetails(WaitingForScheduleStatus, Some(deploymentStatus.deploymentId))
}
}
}
}
}
.getOrElse {
if (inactiveDeploymentsStatuses.isEmpty) {
createStatusDetails(SimpleStateStatus.NotDeployed, None)
} else {
val latestInactiveProcessId =
inactiveDeploymentsStatuses.maxBy(_.scheduleId.processId.value).scheduleId.processId
val latestDeploymentsForEachScheduleOfLatestProcessId = latestDeploymentForEachSchedule(
inactiveDeploymentsStatuses.filter(_.scheduleId.processId == latestInactiveProcessId)
)

if (latestDeploymentsForEachScheduleOfLatestProcessId.forall(
_.status == PeriodicProcessDeploymentStatus.Finished
)) {
createStatusDetails(SimpleStateStatus.Finished, None)
} else {
createStatusDetails(SimpleStateStatus.Canceled, None)
}
}
}
}

/**
Expand Down Expand Up @@ -869,7 +865,7 @@ object PeriodicProcessService {
.map(_.min(PeriodicDeploymentStatus.ordering.reverse))
}

case class PeriodicProcessStatusWithMergedStatus(
case class PeriodicScenarioStatus(
activeDeploymentsStatuses: List[PeriodicDeploymentStatus],
inactiveDeploymentsStatuses: List[PeriodicDeploymentStatus],
mergedStatus: StateStatus
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import pl.touk.nussknacker.engine.api.deployment.{
import pl.touk.nussknacker.ui.process.periodic.PeriodicProcessService.{
MaxDeploymentsStatus,
PeriodicDeploymentStatus,
PeriodicProcessStatusWithMergedStatus
PeriodicScenarioStatus
}

import java.net.URI
Expand Down Expand Up @@ -73,7 +73,7 @@ class PeriodicProcessStateDefinitionManager(delegate: ProcessStateDefinitionMana
}

private def extractPeriodicStatus(stateStatus: StateStatus) = {
Option(stateStatus) collect { case periodic: PeriodicProcessStatusWithMergedStatus =>
Option(stateStatus) collect { case periodic: PeriodicScenarioStatus =>
periodic
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -958,7 +958,6 @@ class DeploymentServiceSpec
private def getAllowedActions(status: StateStatus) = deploymentManager.processStateDefinitionManager.statusActions(
ScenarioStatusWithScenarioContext(
scenarioStatus = status,
latestVersionId = VersionId(1),
deployedVersionId = None,
currentlyPresentedVersionId = None
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ object TestDeploymentServiceFactory {

implicit val actorSystem: ActorSystem = ActorSystem("TestDeploymentServiceFactory")
implicit val ec: ExecutionContext = actorSystem.dispatcher
private val clock = Clock.systemUTC()
val clock: Clock = Clock.systemUTC()

val processingType = "streaming"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import pl.touk.nussknacker.test.base.it.WithClock
import pl.touk.nussknacker.test.utils.domain.TestFactory
import pl.touk.nussknacker.test.utils.domain.TestFactory.newWriteProcessRepository
import pl.touk.nussknacker.test.utils.scalas.DBIOActionValues
import pl.touk.nussknacker.ui.process.periodic.PeriodicProcessService.PeriodicProcessStatusWithMergedStatus
import pl.touk.nussknacker.ui.process.periodic.PeriodicProcessService.PeriodicScenarioStatus
import pl.touk.nussknacker.ui.process.periodic.flink.{DeploymentManagerStub, ScheduledExecutionPerformerStub}
import pl.touk.nussknacker.ui.process.periodic.legacy.db.{LegacyDbInitializer, SlickLegacyPeriodicProcessesRepository}
import pl.touk.nussknacker.ui.process.periodic.model._
Expand Down Expand Up @@ -299,7 +299,7 @@ class PeriodicProcessServiceIntegrationTest
)
afterDeployDeployment.runAt shouldBe localTime(expectedScheduleTime)

f.delegateDeploymentManagerStub.setStateStatus(
f.delegateDeploymentManagerStub.setDeploymentStatus(
processName,
SimpleStateStatus.Finished,
Some(afterDeployDeployment.id)
Expand Down Expand Up @@ -392,7 +392,7 @@ class PeriodicProcessServiceIntegrationTest
// finish all
stateAfterDeploy.values.foreach(schedulesState => {
val deployment = schedulesState.firstScheduleData.latestDeployments.head
f.delegateDeploymentManagerStub.setStateStatus(
f.delegateDeploymentManagerStub.setDeploymentStatus(
processName,
SimpleStateStatus.Finished,
Some(deployment.id)
Expand Down Expand Up @@ -565,12 +565,12 @@ class PeriodicProcessServiceIntegrationTest

val deployment = toDeploy.find(_.scheduleName.value.contains(firstSchedule)).value
service.deploy(deployment).futureValue
f.delegateDeploymentManagerStub.setStateStatus(processName, SimpleStateStatus.Running, Some(deployment.id))
f.delegateDeploymentManagerStub.setDeploymentStatus(processName, SimpleStateStatus.Running, Some(deployment.id))

val toDeployAfterDeploy = service.findToBeDeployed.futureValue
toDeployAfterDeploy should have length 0

f.delegateDeploymentManagerStub.setStateStatus(processName, SimpleStateStatus.Finished, Some(deployment.id))
f.delegateDeploymentManagerStub.setDeploymentStatus(processName, SimpleStateStatus.Finished, Some(deployment.id))
service.handleFinished.futureValue

val toDeployAfterFinish = service.findToBeDeployed.futureValue
Expand Down Expand Up @@ -677,7 +677,7 @@ class PeriodicProcessServiceIntegrationTest
.futureValue
.value
.status
.asInstanceOf[PeriodicProcessStatusWithMergedStatus]
.asInstanceOf[PeriodicScenarioStatus]
.activeDeploymentsStatuses
)
.value
Expand Down Expand Up @@ -802,7 +802,7 @@ class PeriodicProcessServiceIntegrationTest
toDeploy should have length 1
val deployment = toDeploy.head
service.deploy(deployment).futureValue
f.delegateDeploymentManagerStub.setStateStatus(processName, SimpleStateStatus.Finished, Some(deployment.id))
f.delegateDeploymentManagerStub.setDeploymentStatus(processName, SimpleStateStatus.Finished, Some(deployment.id))

tryWithFailedListener { () =>
service.deactivate(processName)
Expand Down Expand Up @@ -834,7 +834,7 @@ class PeriodicProcessServiceIntegrationTest
val deployment = toDeploy.head
service.deploy(deployment).futureValue

f.delegateDeploymentManagerStub.setStateStatus(processName, ProblemStateStatus.Failed, Some(deployment.id))
f.delegateDeploymentManagerStub.setDeploymentStatus(processName, ProblemStateStatus.Failed, Some(deployment.id))

// this one is cyclically called by RescheduleActor
service.handleFinished.futureValue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class DeploymentManagerStub extends BaseDeploymentManager {
)
}

def setStateStatus(
def setDeploymentStatus(
processName: ProcessName,
status: StateStatus,
deploymentIdOpt: Option[PeriodicProcessDeploymentId]
Expand Down
Loading

0 comments on commit cd1bcbd

Please sign in to comment.