Skip to content

Commit

Permalink
DeploymentStatusDetails: removed externalDeploymentId and startTime
Browse files Browse the repository at this point in the history
  • Loading branch information
arkadius committed Feb 19, 2025
1 parent acf8e43 commit 28d2486
Show file tree
Hide file tree
Showing 36 changed files with 359 additions and 307 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ trait DeploymentsStatusesQueryForAllScenariosSupported extends DeploymentsStatus

}

case object NoDeploymentsStatusesQueryForAllScenariosSupport$ extends DeploymentsStatusesQueryForAllScenariosSupport
case object NoDeploymentsStatusesQueryForAllScenariosSupport extends DeploymentsStatusesQueryForAllScenariosSupport

sealed trait DeploymentSynchronisationSupport

Expand Down
Original file line number Diff line number Diff line change
@@ -1,25 +1,16 @@
package pl.touk.nussknacker.engine.api.deployment

import pl.touk.nussknacker.engine.api.ProcessVersion
import pl.touk.nussknacker.engine.deployment.{DeploymentId, ExternalDeploymentId}
import pl.touk.nussknacker.engine.deployment.DeploymentId

case class DeploymentStatusDetails(
status: StateStatus,
// It is optional because some deployment managers (k8s) don't support it
deploymentId: Option[DeploymentId],
// TODO: This field is used only internally in FlinkDeploymentManager - it should be removed from the interface
// In most cases it holds exactly the same information as deploymentId -
// the only difference is scheduling mechanism where deploymentId holds Long and can't be used as Flink's JobID
externalDeploymentId: Option[ExternalDeploymentId] = None,
version: Option[ProcessVersion] = None,
// TODO: This field is used only internally in FlinkDeploymentManager - it should be removed from the interface
startTime: Option[Long] = None,
version: Option[ProcessVersion],
) {

def deploymentIdUnsafe: DeploymentId =
deploymentId.getOrElse(throw new IllegalStateException(s"deploymentId is missing"))

def externalDeploymentIdUnsafe: ExternalDeploymentId =
externalDeploymentId.getOrElse(throw new IllegalStateException(s"externalDeploymentId is missing"))

}
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ object SimpleStateStatus {
tooltip = Some(
nonFinalDeploymentIds
.map { case (deploymentId, deploymentStatus) =>
s"$deploymentId-$deploymentStatus"
s"$deploymentId - $deploymentStatus"
}
.mkString("Expected one job, instead: ", ", ", "")
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ class DeploymentManagerStub(implicit ec: ExecutionContext) extends BaseDeploymen
scenarioName: ProcessName
)(implicit freshnessPolicy: DataFreshnessPolicy): Future[WithDataFreshnessStatus[List[DeploymentStatusDetails]]] = {
Future.successful(
WithDataFreshnessStatus.fresh(scenarioStatusMap.get(scenarioName).map(DeploymentStatusDetails(_, None)).toList)
WithDataFreshnessStatus.fresh(
scenarioStatusMap.get(scenarioName).map(DeploymentStatusDetails(_, None, None)).toList
)
)
}

Expand All @@ -53,7 +55,7 @@ class DeploymentManagerStub(implicit ec: ExecutionContext) extends BaseDeploymen
override def deploymentSynchronisationSupport: DeploymentSynchronisationSupport = NoDeploymentSynchronisationSupport

override def deploymentsStatusesQueryForAllScenariosSupport: DeploymentsStatusesQueryForAllScenariosSupport =
NoDeploymentsStatusesQueryForAllScenariosSupport$
NoDeploymentsStatusesQueryForAllScenariosSupport

override def schedulingSupport: SchedulingSupport = NoSchedulingSupport

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import org.scalatestplus.mockito.MockitoSugar
import pl.touk.nussknacker.engine.api.deployment._
import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus
import pl.touk.nussknacker.engine.api.process.ProcessName
import pl.touk.nussknacker.engine.deployment.ExternalDeploymentId
import pl.touk.nussknacker.engine.deployment.DeploymentId
import pl.touk.nussknacker.test.PatientScalaFutures

import java.util.UUID
Expand All @@ -30,7 +30,7 @@ class CachingProcessStateDeploymentManagerSpec
delegate,
10 seconds,
NoDeploymentSynchronisationSupport,
NoDeploymentsStatusesQueryForAllScenariosSupport$,
NoDeploymentsStatusesQueryForAllScenariosSupport,
NoSchedulingSupport,
)

Expand All @@ -50,7 +50,7 @@ class CachingProcessStateDeploymentManagerSpec
delegate,
10 seconds,
NoDeploymentSynchronisationSupport,
NoDeploymentsStatusesQueryForAllScenariosSupport$,
NoDeploymentsStatusesQueryForAllScenariosSupport,
NoSchedulingSupport,
)

Expand All @@ -69,7 +69,7 @@ class CachingProcessStateDeploymentManagerSpec
delegate,
10 seconds,
NoDeploymentSynchronisationSupport,
NoDeploymentsStatusesQueryForAllScenariosSupport$,
NoDeploymentsStatusesQueryForAllScenariosSupport,
NoSchedulingSupport,
)

Expand All @@ -87,7 +87,7 @@ class CachingProcessStateDeploymentManagerSpec
def getProcessStatesDeploymentIdNow(freshnessPolicy: DataFreshnessPolicy): WithDataFreshnessStatus[List[String]] =
dm.getScenarioDeploymentsStatuses(ProcessName("foo"))(freshnessPolicy)
.futureValue
.map(_.map(_.externalDeploymentId.value.value))
.map(_.map(_.deploymentId.value.value))

}

Expand All @@ -97,8 +97,8 @@ class CachingProcessStateDeploymentManagerSpec
_: InvocationOnMock =>
val randomState = DeploymentStatusDetails(
SimpleStateStatus.Running,
deploymentId = None,
externalDeploymentId = Some(ExternalDeploymentId(UUID.randomUUID().toString))
deploymentId = Some(DeploymentId(UUID.randomUUID().toString)),
version = None,
)
Future.successful(WithDataFreshnessStatus.fresh(List(randomState)))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class DeploymentStatusesProvider(dispatcher: DeploymentManagerDispatcher, scenar
manager <- dispatcher.deploymentManager(processingType)
managerWithCapability <- manager.deploymentsStatusesQueryForAllScenariosSupport match {
case supported: DeploymentsStatusesQueryForAllScenariosSupported => Some(supported)
case NoDeploymentsStatusesQueryForAllScenariosSupport$ => None
case NoDeploymentsStatusesQueryForAllScenariosSupport => None
}
} yield getAllDeploymentStatuses(processingType, managerWithCapability))
.getOrElse(Future.successful(None))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import pl.touk.nussknacker.ui.process.repository.PeriodicProcessesRepository
import java.time.{Clock, Instant}
import scala.concurrent.{ExecutionContext, Future}

// FIXME abr: test on DeploymentService / ScenarioStatusProvider level
object PeriodicDeploymentManager {

def apply(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -584,8 +584,8 @@ class PeriodicProcessService(

}

case NoDeploymentsStatusesQueryForAllScenariosSupport$ =>
NoDeploymentsStatusesQueryForAllScenariosSupport$
case NoDeploymentsStatusesQueryForAllScenariosSupport =>
NoDeploymentsStatusesQueryForAllScenariosSupport
}

private def mergeStatusWithDeployments(
Expand Down Expand Up @@ -779,6 +779,7 @@ object PeriodicProcessService {
DeploymentStatusDetails(
status = toPeriodicProcessStatusWithMergedStatus(mergedStatus),
deploymentId = periodicDeploymentIdOpt.map(_.toString).map(DeploymentId(_)),
version = None
)

pickMostImportantActiveDeployment(activeDeploymentsStatuses)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ object InvalidDeploymentManagerStub extends DeploymentManager {

private val stubbedStatus = DeploymentStatusDetails(
ProblemStateStatus("Error in deployment configuration", allowedActions = Set.empty),
deploymentId = None
deploymentId = None,
version = None
)

override def getScenarioDeploymentsStatuses(scenarioName: ProcessName)(
Expand All @@ -35,7 +36,7 @@ object InvalidDeploymentManagerStub extends DeploymentManager {
override def deploymentSynchronisationSupport: DeploymentSynchronisationSupport = NoDeploymentSynchronisationSupport

override def deploymentsStatusesQueryForAllScenariosSupport: DeploymentsStatusesQueryForAllScenariosSupport =
NoDeploymentsStatusesQueryForAllScenariosSupport$
NoDeploymentsStatusesQueryForAllScenariosSupport

override def schedulingSupport: SchedulingSupport = NoSchedulingSupport

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import cats.data.Validated.valid
import cats.data.ValidatedNel
import cats.effect.unsafe.IORuntime
import com.typesafe.config.Config
import org.apache.flink.api.common.{JobID, JobStatus}
import org.apache.flink.configuration.Configuration
import sttp.client3.testing.SttpBackendStub
import pl.touk.nussknacker.engine._
Expand All @@ -16,13 +17,14 @@ import pl.touk.nussknacker.engine.deployment._
import pl.touk.nussknacker.engine.flink.minicluster.FlinkMiniClusterFactory
import pl.touk.nussknacker.engine.flink.minicluster.scenariotesting.ScenarioStateVerificationConfig
import pl.touk.nussknacker.engine.management.jobrunner.FlinkScenarioJobRunner
import pl.touk.nussknacker.engine.management.rest.flinkRestModel.{JobOverview, JobTasksOverview}
import pl.touk.nussknacker.engine.management.{FlinkConfig, FlinkDeploymentManager, FlinkDeploymentManagerProvider}
import pl.touk.nussknacker.engine.util.loader.{DeploymentManagersClassLoader, ModelClassLoader}
import pl.touk.nussknacker.test.config.ConfigWithScalaVersion
import pl.touk.nussknacker.test.mock.MockDeploymentManager.{
sampleCustomActionActivity,
sampleDeploymentId,
sampleStatusDetails
sampleDeploymentStatusDetails
}
import pl.touk.nussknacker.test.utils.domain.TestFactory
import pl.touk.nussknacker.ui.process.periodic.flink.FlinkClientStub
Expand All @@ -38,7 +40,7 @@ import scala.util.Try
class MockDeploymentManager private (
modelData: ModelData,
deploymentManagerDependencies: DeploymentManagerDependencies,
defaultProcessStateStatus: StateStatus,
defaultDeploymentStatus: StateStatus,
scenarioActivityManager: ScenarioActivityManager,
customProcessStateDefinitionManager: Option[ProcessStateDefinitionManager],
closeCreatedDeps: () => Unit,
Expand Down Expand Up @@ -75,16 +77,33 @@ class MockDeploymentManager private (
case None => super.processStateDefinitionManager
}

override def getScenarioDeploymentsStatuses(
override protected def getScenarioDeploymentsStatusesWithJobOverview(
scenarioName: ProcessName
)(implicit freshnessPolicy: DataFreshnessPolicy): Future[WithDataFreshnessStatus[List[DeploymentStatusDetails]]] = {
)(
implicit freshnessPolicy: DataFreshnessPolicy
): Future[WithDataFreshnessStatus[List[(DeploymentStatusDetails, JobOverview)]]] = {
Future {
Thread.sleep(delayBeforeStateReturn.toMillis)
WithDataFreshnessStatus.fresh(
managerProcessStates.getOrDefault(
scenarioName,
List(sampleStatusDetails(defaultProcessStateStatus, sampleDeploymentId))
)
managerProcessStates
.getOrDefault(
scenarioName,
List(sampleDeploymentStatusDetails(defaultDeploymentStatus, sampleDeploymentId))
)
.map { deploymentStatus =>
val tasksOverview = JobTasksOverview(1, 0, 0, 0, 1, 0, 0, 0, 0, 0, None)
val deploymentIdUuid =
deploymentStatus.deploymentId.map(id => UUID.fromString(id.value)).getOrElse(UUID.randomUUID())
val jobOverview = JobOverview(
new JobID(deploymentIdUuid.getLeastSignificantBits, deploymentIdUuid.getLeastSignificantBits),
"not-important",
-1,
-1,
JobStatus.RUNNING.name(),
tasksOverview
)
(deploymentStatus, jobOverview)
}
)
}
}
Expand All @@ -105,7 +124,7 @@ class MockDeploymentManager private (
// We override this field, because currently, this mock returns fallback for not defined scenarios states.
// To make deploymentsStatusesQueryForAllScenariosSupport consistent with this approach, we should remove this fallback.
override def deploymentsStatusesQueryForAllScenariosSupport: DeploymentsStatusesQueryForAllScenariosSupport =
NoDeploymentsStatusesQueryForAllScenariosSupport$
NoDeploymentsStatusesQueryForAllScenariosSupport

override def close(): Unit = {
super.close()
Expand All @@ -120,7 +139,7 @@ object FlinkScenarioJobRunnerStub extends FlinkScenarioJobRunner {
override def runScenarioJob(
command: DMRunDeploymentCommand,
savepointPathOpt: Option[String]
): Future[Option[ExternalDeploymentId]] =
): Future[Option[JobID]] =
Future.failed(new IllegalAccessException("This implementation shouldn't be used"))

}
Expand Down Expand Up @@ -170,12 +189,12 @@ object MockDeploymentManager {
)
}

private[mock] def sampleStatusDetails(
private[mock] def sampleDeploymentStatusDetails(
status: StateStatus,
deploymentId: DeploymentId,
version: Option[ProcessVersion] = Some(ProcessVersion.empty)
): DeploymentStatusDetails =
DeploymentStatusDetails(status, Some(deploymentId), Some(ExternalDeploymentId("1")), version)
DeploymentStatusDetails(status, Some(deploymentId), version)

// Pass correct deploymentId
private[mock] def sampleDeploymentId: DeploymentId = DeploymentId(UUID.randomUUID().toString)
Expand Down Expand Up @@ -268,13 +287,13 @@ object MockDeploymentManagerSyntaxSugar {
status: StateStatus,
deploymentId: DeploymentId = sampleDeploymentId
)(action: => T): T = {
withProcessStates(processName, List(sampleStatusDetails(status, deploymentId)))(action)
withProcessStates(processName, List(sampleDeploymentStatusDetails(status, deploymentId)))(action)
}

def withProcessStateVersion[T](processName: ProcessName, status: StateStatus, version: Option[ProcessVersion])(
action: => T
): T = {
withProcessStates(processName, List(sampleStatusDetails(status, sampleDeploymentId, version)))(action)
withProcessStates(processName, List(sampleDeploymentStatusDetails(status, sampleDeploymentId, version)))(action)
}

def withEmptyProcessState[T](processName: ProcessName)(action: => T): T = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus.Proble
import pl.touk.nussknacker.engine.api.process._
import pl.touk.nussknacker.engine.api.{Comment, ProcessVersion}
import pl.touk.nussknacker.engine.build.ScenarioBuilder
import pl.touk.nussknacker.engine.deployment.{DeploymentId, ExternalDeploymentId}
import pl.touk.nussknacker.engine.deployment.DeploymentId
import pl.touk.nussknacker.test.base.db.WithHsqlDbTesting
import pl.touk.nussknacker.test.base.it.WithClock
import pl.touk.nussknacker.test.mock.MockDeploymentManagerSyntaxSugar.Ops
Expand Down Expand Up @@ -547,10 +547,9 @@ class DeploymentServiceSpec

val state =
DeploymentStatusDetails(
SimpleStateStatus.Restarting,
None,
Some(ExternalDeploymentId("12")),
Some(ProcessVersion.empty)
status = SimpleStateStatus.Restarting,
deploymentId = None,
version = Some(ProcessVersion.empty)
)

deploymentManager.withProcessStates(processName, List(state)) {
Expand Down
Loading

0 comments on commit 28d2486

Please sign in to comment.