Skip to content

Commit

Permalink
TestDeploymentServiceFactory, removed ActionService -> DeploymentMana…
Browse files Browse the repository at this point in the history
…gerDispatcher dependency
  • Loading branch information
arkadius committed Feb 19, 2025
1 parent ac015aa commit 5cf00fd
Show file tree
Hide file tree
Showing 14 changed files with 141 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@ import java.net.URI
* May contain longer, detailed status description.
*/
@JsonCodec case class ScenarioStatusDto(
// This field is not used by frontend but is useful for scripting
statusName: String,
// TODO it is a temporary solution - it should be removed after full migration to statusName
status: LegacyScenarioStatusNameDto,
// TODO: flatten it
status: ScenarioStatusNameWrapperDto,
visibleActions: List[ScenarioActionName],
allowedActions: List[ScenarioActionName],
actionTooltips: Map[ScenarioActionName, String],
Expand All @@ -31,7 +29,7 @@ import java.net.URI
description: String,
)

@JsonCodec case class LegacyScenarioStatusNameDto(name: String)
@JsonCodec case class ScenarioStatusNameWrapperDto(name: String)

object ScenarioStatusDto {
implicit val uriEncoder: Encoder[URI] = Encoder.encodeString.contramap(_.toString)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import pl.touk.nussknacker.engine.api.process.{ProcessName, ProcessingType}
import pl.touk.nussknacker.engine.util.ExecutionContextWithIORuntime
import pl.touk.nussknacker.engine.util.Implicits.RichTupleList
import pl.touk.nussknacker.engine.version.BuildInfo
import pl.touk.nussknacker.restmodel.scenariodetails.ScenarioStatusDto
import pl.touk.nussknacker.restmodel.scenariodetails.{ScenarioStatusDto, ScenarioStatusNameWrapperDto}
import pl.touk.nussknacker.ui.api.description.AppApiEndpoints
import pl.touk.nussknacker.ui.api.description.AppApiEndpoints.Dtos._
import pl.touk.nussknacker.ui.process.ProcessService.GetScenarioWithDetailsOptions
Expand Down Expand Up @@ -172,8 +172,12 @@ class AppApiHttpService(
GetScenarioWithDetailsOptions.detailsOnly.copy(fetchState = true)
)
statusMap = processes.flatMap(process => process.state.map(process.name -> _)).toMap
// TODO: we should use domain objects instead of DTOs
withProblem = statusMap.collect {
case (name, processStatus @ ScenarioStatusDto(ProblemStateStatus.name, _, _, _, _, _, _, _)) =>
case (
name,
processStatus @ ScenarioStatusDto(ScenarioStatusNameWrapperDto(ProblemStateStatus.name), _, _, _, _, _, _)
) =>
(name, processStatus)
}
} yield withProblem
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import pl.touk.nussknacker.engine.api.deployment.ProcessStateDefinitionManager.S
import pl.touk.nussknacker.engine.api.deployment.StateStatus
import pl.touk.nussknacker.engine.api.process.VersionId
import pl.touk.nussknacker.restmodel.scenariodetails.{
LegacyScenarioStatusNameDto,
ScenarioStatusDto,
ScenarioStatusNameWrapperDto,
ScenarioWithDetails
}
import pl.touk.nussknacker.ui.process.deployment.DeploymentManagerDispatcher
Expand All @@ -30,8 +30,7 @@ class ScenarioStatusPresenter(dispatcher: DeploymentManagerDispatcher) {
)
)
ScenarioStatusDto(
statusName = scenarioStatus.name,
status = LegacyScenarioStatusNameDto(scenarioStatus.name),
status = ScenarioStatusNameWrapperDto(scenarioStatus.name),
visibleActions = presentation.visibleActions,
allowedActions = presentation.allowedActions.toList.sortBy(_.value),
actionTooltips = presentation.actionTooltips,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,8 @@ import scala.util.{Failure, Success}
// Responsibility of this class is to wrap deployment actions with persistent, transactional context.
// It ensures that all actions are done consistently: do validations and ensures that only allowed actions
// will be executed in given state. It sends notifications about finished actions.
// Also thanks to it we are able to check if state on remote engine is the same as persisted state.
// Also thanks to that, we are able to check if state on remote engine is the same as persisted state.
class ActionService(
dispatcher: DeploymentManagerDispatcher,
processRepository: FetchingProcessRepository[DB],
actionRepository: ScenarioActionRepository,
dbioRunner: DBIOActionRunner,
Expand Down Expand Up @@ -138,18 +137,12 @@ class ActionService(
p => Some(modelInfos.forProcessingTypeUnsafe(p.processingType))
)

def processAction[COMMAND <: ScenarioCommand[RESULT], RESULT](
command: COMMAND,
actionName: ScenarioActionName,
dmCommandCreator: CommandContext[LatestScenarioDetailsShape] => DMScenarioCommand[RESULT],
def processAction[COMMAND <: ScenarioCommand[RESULT], RESULT](command: COMMAND, actionName: ScenarioActionName)(
runAction: CommandContext[LatestScenarioDetailsShape] => Future[RESULT],
): Future[RESULT] = {
import command.commonData._
processActionWithCustomFinalization[COMMAND, RESULT](command, actionName) { case (ctx, actionFinalizer) =>
val dmCommand = dmCommandCreator(ctx)
actionFinalizer.handleResult {
dispatcher
.deploymentManagerUnsafe(ctx.latestScenarioDetails.processingType)
.processCommand(dmCommand)
runAction(ctx)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,30 +50,34 @@ class DeploymentService(
// TODO: This inconsistent action-state handling needs a fix.
actionService
.actionProcessorForVersion[Unit](_.lastDeployedAction.map(_.processVersionId))
.processAction(
command = command,
actionName = ScenarioActionName.Cancel,
dmCommandCreator = _ =>
DMCancelScenarioCommand(
command.commonData.processIdWithName.name,
command.commonData.user.toManagerUser
.processAction[CancelScenarioCommand, Unit](command = command, actionName = ScenarioActionName.Cancel) { ctx =>
import command.commonData._
dispatcher
.deploymentManagerUnsafe(ctx.latestScenarioDetails.processingType)
.processCommand(
DMCancelScenarioCommand(command.commonData.processIdWithName.name, command.commonData.user.toManagerUser)
)
)
}
}

private def runOffSchedule(command: RunOffScheduleCommand): Future[RunOffScheduleResult] = {
actionService
.actionProcessorForLatestVersion[CanonicalProcess]
.processAction(
.processAction[RunOffScheduleCommand, RunOffScheduleResult](
command = command,
actionName = ScenarioActionName.RunOffSchedule,
dmCommandCreator = ctx =>
DMRunOffScheduleCommand(
ctx.latestScenarioDetails.toEngineProcessVersion,
ctx.latestScenarioDetails.json,
command.commonData.user.toManagerUser,
actionName = ScenarioActionName.RunOffSchedule
) { ctx =>
import command.commonData._
dispatcher
.deploymentManagerUnsafe(ctx.latestScenarioDetails.processingType)
.processCommand(
DMRunOffScheduleCommand(
ctx.latestScenarioDetails.toEngineProcessVersion,
ctx.latestScenarioDetails.json,
command.commonData.user.toManagerUser,
)
)
)
}
}

private def runDeployment(command: RunDeploymentCommand): Future[Future[Option[ExternalDeploymentId]]] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,6 @@ class AkkaHttpBasedRouteProvider(
dbioRunner,
)
val actionService = new ActionService(
dmDispatcher,
processRepository,
actionRepository,
dbioRunner,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ object UsageStatisticsReportsSettingsService extends LazyLogging {
isFragment = scenario.isFragment,
processingMode = scenario.processingMode,
deploymentManagerType = deploymentManagerTypeByProcessingType(scenario.processingType),
status = scenario.state.map(_.statusName),
status = scenario.state.map(_.status.name),
nodesCount = scenario.scenarioGraph.map(_.nodes.length).getOrElse(0),
scenarioCategory = scenario.processCategory,
scenarioVersion = scenario.processVersionId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,6 @@ trait NuResourcesTest
protected val scenarioStatusPresenter = new ScenarioStatusPresenter(dmDispatcher)

protected val actionService: ActionService = new ActionService(
dmDispatcher,
fetchingProcessRepository,
actionRepository,
dbioRunner,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ class ManagementResourcesSpec
status shouldBe StatusCodes.Conflict
}
getProcess(invalidScenario.name) ~> check {
decodeDetails.state.value.statusName shouldEqual SimpleStateStatus.NotDeployed.name
decodeDetails.state.value.status.name shouldEqual SimpleStateStatus.NotDeployed.name
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,6 @@ class NotificationServiceTest
dbioRunner
)
val actionService = new ActionService(
managerDispatcher,
dbProcessRepository,
actionRepository,
dbioRunner,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,6 @@ class DeploymentServiceSpec

private def createActionService(deploymentCommentSettings: Option[DeploymentCommentSettings]) = {
new ActionService(
dmDispatcher,
fetchingProcessRepository,
actionRepository,
dbioRunner,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package pl.touk.nussknacker.ui.process.deployment

import akka.actor.ActorSystem
import pl.touk.nussknacker.engine.api.deployment.DeploymentManager
import pl.touk.nussknacker.engine.api.process.ProcessingType
import pl.touk.nussknacker.test.mock.TestProcessChangeListener
import pl.touk.nussknacker.test.utils.domain.TestFactory
import pl.touk.nussknacker.test.utils.domain.TestFactory._
import pl.touk.nussknacker.ui.api.DeploymentCommentSettings
import pl.touk.nussknacker.ui.db.DbRef
import pl.touk.nussknacker.ui.process.deployment.deploymentstatus.DeploymentStatusesProvider
import pl.touk.nussknacker.ui.process.deployment.scenariostatus.ScenarioStatusProvider
import pl.touk.nussknacker.ui.process.processingtype.ValueWithRestriction
import pl.touk.nussknacker.ui.process.processingtype.provider.ProcessingTypeDataProvider.noCombinedDataFun
import pl.touk.nussknacker.ui.process.processingtype.provider.{ProcessingTypeDataProvider, ProcessingTypeDataState}

import java.time.Clock
import scala.concurrent.duration.FiniteDuration

object TestDeploymentServiceFactory {

val processingType = "streaming"

def create(
testDbRef: DbRef,
// deploymentManager is as
getDeploymentManager: () => DeploymentManager,
scenarioStateTimeout: Option[FiniteDuration] = None,
deploymentCommentSettings: Option[DeploymentCommentSettings] = None
)(implicit actorSystem: ActorSystem): TestDeploymentServiceServices = {
import actorSystem.dispatcher
val clock = Clock.systemUTC()
val dbioRunner = newDBIOActionRunner(testDbRef)
val fetchingProcessRepository = newFetchingProcessRepository(testDbRef)
val actionRepository = newActionProcessRepository(testDbRef)

val processingTypeDataProvider: ProcessingTypeDataProvider[DeploymentManager, Nothing] =
new ProcessingTypeDataProvider[DeploymentManager, Nothing] {

override val state: ProcessingTypeDataState[DeploymentManager, Nothing] =
new ProcessingTypeDataState[DeploymentManager, Nothing] {

override def all: Map[ProcessingType, ValueWithRestriction[DeploymentManager]] = {
Map(
processingType -> ValueWithRestriction.anyUser(getDeploymentManager())
)
}

override def getCombined: () => Nothing = noCombinedDataFun
override def stateIdentity: Any = getDeploymentManager()
}

}

val dmDispatcher = {
val futureFetchingProcessRepository = newFutureFetchingScenarioRepository(testDbRef)
new DeploymentManagerDispatcher(processingTypeDataProvider, futureFetchingProcessRepository)
}

val scenarioStatusProvider = {
val deploymentsStatusesProvider =
new DeploymentStatusesProvider(dmDispatcher, scenarioStateTimeout)
new ScenarioStatusProvider(
deploymentsStatusesProvider,
dmDispatcher,
fetchingProcessRepository,
actionRepository,
dbioRunner
)
}

val actionService = {
val listener = new TestProcessChangeListener
new ActionService(
fetchingProcessRepository,
actionRepository,
dbioRunner,
listener,
scenarioStatusProvider,
deploymentCommentSettings,
modelInfoProvider,
clock
)
}

val deploymentService = new DeploymentService(
dmDispatcher,
processValidatorByProcessingType,
TestFactory.scenarioResolverByProcessingType,
actionService,
additionalComponentConfigsByProcessingType,
)
TestDeploymentServiceServices(scenarioStatusProvider, actionService, deploymentService)
}

}

case class TestDeploymentServiceServices(
scenarioStatusProvider: ScenarioStatusProvider,
actionService: ActionService,
deploymentService: DeploymentService
)
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ class BaseK8sDeploymentManagerTest
eventually {
val state = manager.getScenarioDeploymentsStatuses(version.processName).map(_.value).futureValue
logger.debug(s"Current process state: $state")
state.flatMap(_.version) shouldBe List(version)
state.flatMap(_.version) shouldBe List(version.versionId)
state.map(_.status) shouldBe List(SimpleStateStatus.Running)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ class K8sDeploymentManagerKafkaTest
def waitForRunning(version: ProcessVersion) = {
eventually {
val state = manager.getScenarioDeploymentsStatuses(version.processName).map(_.value).futureValue
state.flatMap(_.version) shouldBe List(version)
state.flatMap(_.version) shouldBe List(version.versionId)
state.map(_.status) shouldBe List(SimpleStateStatus.Running)
}
}
Expand Down

0 comments on commit 5cf00fd

Please sign in to comment.