Skip to content

Commit

Permalink
TestDeploymentServiceFactory, removed ActionService -> DeploymentMana…
Browse files Browse the repository at this point in the history
…gerDispatcher dependency
  • Loading branch information
arkadius committed Feb 19, 2025
1 parent ac015aa commit b0ebb97
Show file tree
Hide file tree
Showing 14 changed files with 158 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@ import java.net.URI
* May contain longer, detailed status description.
*/
@JsonCodec case class ScenarioStatusDto(
// This field is not used by frontend but is useful for scripting
statusName: String,
// TODO it is a temporary solution - it should be removed after full migration to statusName
status: LegacyScenarioStatusNameDto,
// TODO: flatten it
status: ScenarioStatusNameWrapperDto,
visibleActions: List[ScenarioActionName],
allowedActions: List[ScenarioActionName],
actionTooltips: Map[ScenarioActionName, String],
Expand All @@ -31,7 +29,7 @@ import java.net.URI
description: String,
)

@JsonCodec case class LegacyScenarioStatusNameDto(name: String)
@JsonCodec case class ScenarioStatusNameWrapperDto(name: String)

object ScenarioStatusDto {
implicit val uriEncoder: Encoder[URI] = Encoder.encodeString.contramap(_.toString)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import pl.touk.nussknacker.engine.api.process.{ProcessName, ProcessingType}
import pl.touk.nussknacker.engine.util.ExecutionContextWithIORuntime
import pl.touk.nussknacker.engine.util.Implicits.RichTupleList
import pl.touk.nussknacker.engine.version.BuildInfo
import pl.touk.nussknacker.restmodel.scenariodetails.ScenarioStatusDto
import pl.touk.nussknacker.restmodel.scenariodetails.{ScenarioStatusDto, ScenarioStatusNameWrapperDto}
import pl.touk.nussknacker.ui.api.description.AppApiEndpoints
import pl.touk.nussknacker.ui.api.description.AppApiEndpoints.Dtos._
import pl.touk.nussknacker.ui.process.ProcessService.GetScenarioWithDetailsOptions
Expand Down Expand Up @@ -172,8 +172,12 @@ class AppApiHttpService(
GetScenarioWithDetailsOptions.detailsOnly.copy(fetchState = true)
)
statusMap = processes.flatMap(process => process.state.map(process.name -> _)).toMap
// TODO: we should use domain objects instead of DTOs
withProblem = statusMap.collect {
case (name, processStatus @ ScenarioStatusDto(ProblemStateStatus.name, _, _, _, _, _, _, _)) =>
case (
name,
processStatus @ ScenarioStatusDto(ScenarioStatusNameWrapperDto(ProblemStateStatus.name), _, _, _, _, _, _)
) =>
(name, processStatus)
}
} yield withProblem
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import pl.touk.nussknacker.engine.api.deployment.ProcessStateDefinitionManager.S
import pl.touk.nussknacker.engine.api.deployment.StateStatus
import pl.touk.nussknacker.engine.api.process.VersionId
import pl.touk.nussknacker.restmodel.scenariodetails.{
LegacyScenarioStatusNameDto,
ScenarioStatusDto,
ScenarioStatusNameWrapperDto,
ScenarioWithDetails
}
import pl.touk.nussknacker.ui.process.deployment.DeploymentManagerDispatcher
Expand All @@ -30,8 +30,7 @@ class ScenarioStatusPresenter(dispatcher: DeploymentManagerDispatcher) {
)
)
ScenarioStatusDto(
statusName = scenarioStatus.name,
status = LegacyScenarioStatusNameDto(scenarioStatus.name),
status = ScenarioStatusNameWrapperDto(scenarioStatus.name),
visibleActions = presentation.visibleActions,
allowedActions = presentation.allowedActions.toList.sortBy(_.value),
actionTooltips = presentation.actionTooltips,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,8 @@ import scala.util.{Failure, Success}
// Responsibility of this class is to wrap deployment actions with persistent, transactional context.
// It ensures that all actions are done consistently: do validations and ensures that only allowed actions
// will be executed in given state. It sends notifications about finished actions.
// Also thanks to it we are able to check if state on remote engine is the same as persisted state.
// Also thanks to that, we are able to check if state on remote engine is the same as persisted state.
class ActionService(
dispatcher: DeploymentManagerDispatcher,
processRepository: FetchingProcessRepository[DB],
actionRepository: ScenarioActionRepository,
dbioRunner: DBIOActionRunner,
Expand Down Expand Up @@ -138,18 +137,12 @@ class ActionService(
p => Some(modelInfos.forProcessingTypeUnsafe(p.processingType))
)

def processAction[COMMAND <: ScenarioCommand[RESULT], RESULT](
command: COMMAND,
actionName: ScenarioActionName,
dmCommandCreator: CommandContext[LatestScenarioDetailsShape] => DMScenarioCommand[RESULT],
def processAction[COMMAND <: ScenarioCommand[RESULT], RESULT](command: COMMAND, actionName: ScenarioActionName)(
runAction: CommandContext[LatestScenarioDetailsShape] => Future[RESULT],
): Future[RESULT] = {
import command.commonData._
processActionWithCustomFinalization[COMMAND, RESULT](command, actionName) { case (ctx, actionFinalizer) =>
val dmCommand = dmCommandCreator(ctx)
actionFinalizer.handleResult {
dispatcher
.deploymentManagerUnsafe(ctx.latestScenarioDetails.processingType)
.processCommand(dmCommand)
runAction(ctx)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,30 +50,34 @@ class DeploymentService(
// TODO: This inconsistent action-state handling needs a fix.
actionService
.actionProcessorForVersion[Unit](_.lastDeployedAction.map(_.processVersionId))
.processAction(
command = command,
actionName = ScenarioActionName.Cancel,
dmCommandCreator = _ =>
DMCancelScenarioCommand(
command.commonData.processIdWithName.name,
command.commonData.user.toManagerUser
.processAction[CancelScenarioCommand, Unit](command = command, actionName = ScenarioActionName.Cancel) { ctx =>
import command.commonData._
dispatcher
.deploymentManagerUnsafe(ctx.latestScenarioDetails.processingType)
.processCommand(
DMCancelScenarioCommand(command.commonData.processIdWithName.name, command.commonData.user.toManagerUser)
)
)
}
}

private def runOffSchedule(command: RunOffScheduleCommand): Future[RunOffScheduleResult] = {
actionService
.actionProcessorForLatestVersion[CanonicalProcess]
.processAction(
.processAction[RunOffScheduleCommand, RunOffScheduleResult](
command = command,
actionName = ScenarioActionName.RunOffSchedule,
dmCommandCreator = ctx =>
DMRunOffScheduleCommand(
ctx.latestScenarioDetails.toEngineProcessVersion,
ctx.latestScenarioDetails.json,
command.commonData.user.toManagerUser,
actionName = ScenarioActionName.RunOffSchedule
) { ctx =>
import command.commonData._
dispatcher
.deploymentManagerUnsafe(ctx.latestScenarioDetails.processingType)
.processCommand(
DMRunOffScheduleCommand(
ctx.latestScenarioDetails.toEngineProcessVersion,
ctx.latestScenarioDetails.json,
command.commonData.user.toManagerUser,
)
)
)
}
}

private def runDeployment(command: RunDeploymentCommand): Future[Future[Option[ExternalDeploymentId]]] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,6 @@ class AkkaHttpBasedRouteProvider(
dbioRunner,
)
val actionService = new ActionService(
dmDispatcher,
processRepository,
actionRepository,
dbioRunner,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ object UsageStatisticsReportsSettingsService extends LazyLogging {
isFragment = scenario.isFragment,
processingMode = scenario.processingMode,
deploymentManagerType = deploymentManagerTypeByProcessingType(scenario.processingType),
status = scenario.state.map(_.statusName),
status = scenario.state.map(_.status.name),
nodesCount = scenario.scenarioGraph.map(_.nodes.length).getOrElse(0),
scenarioCategory = scenario.processCategory,
scenarioVersion = scenario.processVersionId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,6 @@ trait NuResourcesTest
protected val scenarioStatusPresenter = new ScenarioStatusPresenter(dmDispatcher)

protected val actionService: ActionService = new ActionService(
dmDispatcher,
fetchingProcessRepository,
actionRepository,
dbioRunner,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ class ManagementResourcesSpec
status shouldBe StatusCodes.Conflict
}
getProcess(invalidScenario.name) ~> check {
decodeDetails.state.value.statusName shouldEqual SimpleStateStatus.NotDeployed.name
decodeDetails.state.value.status.name shouldEqual SimpleStateStatus.NotDeployed.name
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,6 @@ class NotificationServiceTest
dbioRunner
)
val actionService = new ActionService(
managerDispatcher,
dbProcessRepository,
actionRepository,
dbioRunner,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ import pl.touk.nussknacker.ui.process.deployment.deploymentstatus.DeploymentStat
import pl.touk.nussknacker.ui.process.deployment.scenariostatus.{FragmentStateException, ScenarioStatusProvider}
import pl.touk.nussknacker.ui.process.periodic.flink.FlinkClientStub
import pl.touk.nussknacker.ui.process.processingtype.ValueWithRestriction
import pl.touk.nussknacker.ui.process.processingtype.provider.ProcessingTypeDataProvider.noCombinedDataFun
import pl.touk.nussknacker.ui.process.processingtype.provider.{ProcessingTypeDataProvider, ProcessingTypeDataState}
import pl.touk.nussknacker.ui.process.processingtype.provider.ProcessingTypeDataProvider
import pl.touk.nussknacker.ui.process.repository.ProcessRepository.CreateProcessAction
import pl.touk.nussknacker.ui.process.repository.{CommentValidationError, DBIOActionRunner}
import pl.touk.nussknacker.ui.security.api.LoggedUser
Expand Down Expand Up @@ -64,50 +63,35 @@ class DeploymentServiceSpec
private implicit val user: LoggedUser = TestFactory.adminUser("user")
private implicit val ds: ExecutionContextExecutor = system.dispatcher

private var deploymentManager: MockDeploymentManager = _
override protected val dbioRunner: DBIOActionRunner = newDBIOActionRunner(testDbRef)
private val fetchingProcessRepository = newFetchingProcessRepository(testDbRef)
private val futureFetchingProcessRepository = newFutureFetchingScenarioRepository(testDbRef)
private val writeProcessRepository = newWriteProcessRepository(testDbRef, clock)
private val actionRepository = newActionProcessRepository(testDbRef)
private val activityRepository = newScenarioActivityRepository(testDbRef, clock)
override protected val dbioRunner: DBIOActionRunner = newDBIOActionRunner(testDbRef)
private val fetchingProcessRepository = newFetchingProcessRepository(testDbRef)
private val futureFetchingProcessRepository = newFutureFetchingScenarioRepository(testDbRef)
private val writeProcessRepository = newWriteProcessRepository(testDbRef, clock)
private val actionRepository = newActionProcessRepository(testDbRef)
private val activityRepository = newScenarioActivityRepository(testDbRef, clock)

private val processingTypeDataProvider: ProcessingTypeDataProvider[DeploymentManager, Nothing] =
new ProcessingTypeDataProvider[DeploymentManager, Nothing] {

override val state: ProcessingTypeDataState[DeploymentManager, Nothing] =
new ProcessingTypeDataState[DeploymentManager, Nothing] {

override def all: Map[ProcessingType, ValueWithRestriction[DeploymentManager]] = Map(
"streaming" -> ValueWithRestriction.anyUser(deploymentManager)
)
private val listener = new TestProcessChangeListener
private val actionService = createActionService(deploymentCommentSettings = None)

override def getCombined: () => Nothing = noCombinedDataFun
override def stateIdentity: Any = deploymentManager
}
private val deploymentManager: MockDeploymentManager = MockDeploymentManager.create(
defaultProcessStateStatus = SimpleStateStatus.Running,
scenarioActivityManager = new RepositoryBasedScenarioActivityManager(activityRepository, dbioRunner),
)

}
private val processingTypeDataProvider =
ProcessingTypeDataProvider.withEmptyCombinedData(
Map("streaming" -> ValueWithRestriction.anyUser(deploymentManager))
)

private val dmDispatcher =
new DeploymentManagerDispatcher(processingTypeDataProvider, futureFetchingProcessRepository)

private val listener = new TestProcessChangeListener

private val scenarioStatusProvider = createScenarioStatusProvider(scenarioStateTimeout = None)

private val actionService = createActionService(deploymentCommentSettings = None)

private val deploymentService = createDeploymentService()

private val initialVersionId = ProcessVersion.empty.versionId

deploymentManager = MockDeploymentManager.create(
defaultProcessStateStatus = SimpleStateStatus.Running,
deployedScenariosProvider = DefaultProcessingTypeDeployedScenariosProvider(testDbRef, "streaming"),
actionService = new DefaultProcessingTypeActionService("streaming", actionService),
scenarioActivityManager = new RepositoryBasedScenarioActivityManager(activityRepository, dbioRunner),
)

private def createDeploymentService(
deploymentCommentSettings: Option[DeploymentCommentSettings] = None,
) = {
Expand All @@ -123,7 +107,6 @@ class DeploymentServiceSpec

private def createActionService(deploymentCommentSettings: Option[DeploymentCommentSettings]) = {
new ActionService(
dmDispatcher,
fetchingProcessRepository,
actionRepository,
dbioRunner,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package pl.touk.nussknacker.ui.process.deployment

import akka.actor.ActorSystem
import pl.touk.nussknacker.engine.api.deployment.DeploymentManager
import pl.touk.nussknacker.engine.api.process.ProcessingType
import pl.touk.nussknacker.test.mock.TestProcessChangeListener
import pl.touk.nussknacker.test.utils.domain.TestFactory
import pl.touk.nussknacker.test.utils.domain.TestFactory._
import pl.touk.nussknacker.ui.api.DeploymentCommentSettings
import pl.touk.nussknacker.ui.db.DbRef
import pl.touk.nussknacker.ui.process.deployment.deploymentstatus.DeploymentStatusesProvider
import pl.touk.nussknacker.ui.process.deployment.scenariostatus.ScenarioStatusProvider
import pl.touk.nussknacker.ui.process.processingtype.ValueWithRestriction
import pl.touk.nussknacker.ui.process.processingtype.provider.ProcessingTypeDataProvider.noCombinedDataFun
import pl.touk.nussknacker.ui.process.processingtype.provider.{ProcessingTypeDataProvider, ProcessingTypeDataState}

import java.time.Clock
import scala.concurrent.duration.FiniteDuration

object TestDeploymentServiceFactory {

val processingType = "streaming"

def create(
testDbRef: DbRef,
// deploymentManager is as
getDeploymentManager: () => DeploymentManager,
scenarioStateTimeout: Option[FiniteDuration] = None,
deploymentCommentSettings: Option[DeploymentCommentSettings] = None
)(implicit actorSystem: ActorSystem): TestDeploymentServiceServices = {
import actorSystem.dispatcher
val clock = Clock.systemUTC()
val dbioRunner = newDBIOActionRunner(testDbRef)
val fetchingProcessRepository = newFetchingProcessRepository(testDbRef)
val actionRepository = newActionProcessRepository(testDbRef)

val processingTypeDataProvider: ProcessingTypeDataProvider[DeploymentManager, Nothing] =
new ProcessingTypeDataProvider[DeploymentManager, Nothing] {

override val state: ProcessingTypeDataState[DeploymentManager, Nothing] =
new ProcessingTypeDataState[DeploymentManager, Nothing] {

override def all: Map[ProcessingType, ValueWithRestriction[DeploymentManager]] = {
Map(
processingType -> ValueWithRestriction.anyUser(getDeploymentManager())
)
}

override def getCombined: () => Nothing = noCombinedDataFun
override def stateIdentity: Any = getDeploymentManager()
}

}

val dmDispatcher = {
val futureFetchingProcessRepository = newFutureFetchingScenarioRepository(testDbRef)
new DeploymentManagerDispatcher(processingTypeDataProvider, futureFetchingProcessRepository)
}

val scenarioStatusProvider = {
val deploymentsStatusesProvider =
new DeploymentStatusesProvider(dmDispatcher, scenarioStateTimeout)
new ScenarioStatusProvider(
deploymentsStatusesProvider,
dmDispatcher,
fetchingProcessRepository,
actionRepository,
dbioRunner
)
}

val actionService = {
val listener = new TestProcessChangeListener
new ActionService(
fetchingProcessRepository,
actionRepository,
dbioRunner,
listener,
scenarioStatusProvider,
deploymentCommentSettings,
modelInfoProvider,
clock
)
}

val deploymentService = new DeploymentService(
dmDispatcher,
processValidatorByProcessingType,
TestFactory.scenarioResolverByProcessingType,
actionService,
additionalComponentConfigsByProcessingType,
)
TestDeploymentServiceServices(scenarioStatusProvider, actionService, deploymentService)
}

}

case class TestDeploymentServiceServices(
scenarioStatusProvider: ScenarioStatusProvider,
actionService: ActionService,
deploymentService: DeploymentService
)
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ class BaseK8sDeploymentManagerTest
eventually {
val state = manager.getScenarioDeploymentsStatuses(version.processName).map(_.value).futureValue
logger.debug(s"Current process state: $state")
state.flatMap(_.version) shouldBe List(version)
state.flatMap(_.version) shouldBe List(version.versionId)
state.map(_.status) shouldBe List(SimpleStateStatus.Running)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ class K8sDeploymentManagerKafkaTest
def waitForRunning(version: ProcessVersion) = {
eventually {
val state = manager.getScenarioDeploymentsStatuses(version.processName).map(_.value).futureValue
state.flatMap(_.version) shouldBe List(version)
state.flatMap(_.version) shouldBe List(version.versionId)
state.map(_.status) shouldBe List(SimpleStateStatus.Running)
}
}
Expand Down

0 comments on commit b0ebb97

Please sign in to comment.