Skip to content

Commit

Permalink
[NU-1979] Scenario testing improvement: configurable timeout added (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
arkadius authored Feb 7, 2025
1 parent 85792ca commit 8c1c554
Show file tree
Hide file tree
Showing 43 changed files with 557 additions and 339 deletions.
11 changes: 6 additions & 5 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -1127,8 +1127,6 @@ lazy val testUtils = (project in utils("test-utils"))
"com.softwaremill.sttp.tapir" %% "tapir-core" % tapirV,
"com.softwaremill.sttp.tapir" %% "tapir-apispec-docs" % tapirV,
"com.softwaremill.sttp.apispec" %% "openapi-circe-yaml" % openapiCirceYamlV,
// for patience -> retry conversion
"com.softwaremill.retry" %% "retry" % retryV,
) ++ restAssuredDependency(scalaVersion.value)
}
)
Expand Down Expand Up @@ -1211,9 +1209,11 @@ lazy val flinkMiniCluster = (project in flink("minicluster"))
}
)
.dependsOn(
extensionsApi % Provided,
utilsInternal % Provided,
testUtils % Test,
extensionsApi % Provided,
utilsInternal % Provided,
// For ResultsCollectingListener purpose
scenarioCompiler % Provided,
testUtils % Test,
)

lazy val flinkTestUtils = (project in flink("test-utils"))
Expand Down Expand Up @@ -1911,6 +1911,7 @@ lazy val deploymentManagerApi = (project in file("designer/deployment-manager-ap
libraryDependencies ++= {
Seq(
"com.typesafe.akka" %% "akka-actor" % akkaV,
"org.typelevel" %% "cats-effect" % catsEffectV,
"com.softwaremill.sttp.client3" %% "core" % sttpV,
"com.github.ben-manes.caffeine" % "caffeine" % caffeineCacheV,
"org.scalatestplus" %% "mockito-5-10" % scalaTestPlusV % Test
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package pl.touk.nussknacker.engine

import akka.actor.ActorSystem
import cats.effect.unsafe.IORuntime
import pl.touk.nussknacker.engine.api.deployment.{
ProcessingTypeActionService,
ProcessingTypeDeployedScenariosProvider,
Expand All @@ -16,11 +17,13 @@ case class DeploymentManagerDependencies(
actionService: ProcessingTypeActionService,
scenarioActivityManager: ScenarioActivityManager,
executionContext: ExecutionContext,
ioRuntime: IORuntime,
actorSystem: ActorSystem,
sttpBackend: SttpBackend[Future, Any],
configsFromProvider: Map[DesignerWideComponentId, ComponentAdditionalConfig] = Map.empty
) {
implicit def implicitExecutionContext: ExecutionContext = executionContext
implicit def implicitIORuntime: IORuntime = ioRuntime
implicit def implicitActorSystem: ActorSystem = actorSystem
implicit def implicitSttpBackend: SttpBackend[Future, Any] = sttpBackend
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,13 @@ import pl.touk.nussknacker.ui.process.newdeployment.synchronize.{
DeploymentsStatusesSynchronizer
}
import pl.touk.nussknacker.ui.process.newdeployment.{DeploymentRepository, DeploymentService}
import pl.touk.nussknacker.ui.process.processingtype.ProcessingTypeData
import pl.touk.nussknacker.ui.process.processingtype.ProcessingTypeData.SchedulingForProcessingType
import pl.touk.nussknacker.ui.process.processingtype.{ModelClassLoaderProvider, ProcessingTypeData}
import pl.touk.nussknacker.ui.process.processingtype.loader.ProcessingTypeDataLoader
import pl.touk.nussknacker.ui.process.processingtype.provider.ReloadableProcessingTypeDataProvider
import pl.touk.nussknacker.ui.process.processingtype.{ModelClassLoaderProvider, ProcessingTypeData}
import pl.touk.nussknacker.ui.process.repository._
import pl.touk.nussknacker.ui.process.repository.activities.{DbScenarioActivityRepository, ScenarioActivityRepository}
import pl.touk.nussknacker.ui.process.scenarioactivity.FetchScenarioActivityService
import pl.touk.nussknacker.ui.process.repository.stickynotes.DbStickyNotesRepository
import pl.touk.nussknacker.ui.process.scenarioactivity.FetchScenarioActivityService
import pl.touk.nussknacker.ui.process.test.{PreliminaryScenarioTestDataSerDe, ScenarioTestService}
import pl.touk.nussknacker.ui.process.version.{ScenarioGraphVersionRepository, ScenarioGraphVersionService}
import pl.touk.nussknacker.ui.processreport.ProcessCounter
Expand Down Expand Up @@ -723,6 +721,8 @@ class AkkaHttpBasedRouteProvider(
featureTogglesConfig: FeatureTogglesConfig,
globalNotificationRepository: InMemoryTimeseriesRepository[Notification],
modelClassLoaderProvider: ModelClassLoaderProvider
)(
implicit executionContextWithIORuntime: ExecutionContextWithIORuntime
): Resource[IO, ReloadableProcessingTypeDataProvider] = {
Resource
.make(
Expand Down Expand Up @@ -763,7 +763,7 @@ class AkkaHttpBasedRouteProvider(
dbioActionRunner: DBIOActionRunner,
sttpBackend: SttpBackend[Future, Any],
processingType: ProcessingType
) = {
)(implicit executionContextWithIORuntime: ExecutionContextWithIORuntime) = {
val additionalConfigsFromProvider = additionalUIConfigProvider.getAllForProcessingType(processingType)
DeploymentManagerDependencies(
DefaultProcessingTypeDeployedScenariosProvider(dbRef, processingType),
Expand All @@ -775,7 +775,8 @@ class AkkaHttpBasedRouteProvider(
scenarioActivityRepository,
dbioActionRunner,
),
system.dispatcher,
executionContextWithIORuntime,
executionContextWithIORuntime.ioRuntime,
system,
sttpBackend,
additionalConfigsFromProvider
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,12 @@ import pl.touk.nussknacker.engine.api.process.ProcessName
import pl.touk.nussknacker.engine.api.{ProcessVersion, StreamMetaData}
import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
import pl.touk.nussknacker.engine.deployment._
import pl.touk.nussknacker.engine.flink.minicluster.FlinkMiniClusterConfig
import pl.touk.nussknacker.engine.management.{FlinkDeploymentManager, FlinkStreamingDeploymentManagerProvider}
import pl.touk.nussknacker.engine.flink.minicluster.scenariotesting.ScenarioStateVerificationConfig
import pl.touk.nussknacker.engine.management.{
FlinkConfig,
FlinkDeploymentManager,
FlinkStreamingDeploymentManagerProvider
}
import pl.touk.nussknacker.engine.util.loader.{DeploymentManagersClassLoader, ModelClassLoader}
import pl.touk.nussknacker.test.config.ConfigWithScalaVersion
import pl.touk.nussknacker.test.utils.domain.TestFactory
Expand Down Expand Up @@ -64,7 +68,7 @@ class MockDeploymentManager private (
scenarioActivityManager: ScenarioActivityManager = NoOpScenarioActivityManager,
customProcessStateDefinitionManager: Option[ProcessStateDefinitionManager],
deploymentManagersClassLoader: (DeploymentManagersClassLoader, IO[Unit]),
)(implicit executionContext: ExecutionContext, IORuntime: IORuntime)
)(implicit executionContext: ExecutionContext, ioRuntime: IORuntime)
extends FlinkDeploymentManager(
ModelData(
ProcessingTypeConfig.read(ConfigWithScalaVersion.StreamingProcessTypeConfig),
Expand All @@ -79,13 +83,13 @@ class MockDeploymentManager private (
deployedScenariosProvider,
actionService,
scenarioActivityManager,
ExecutionContext.global,
executionContext,
ioRuntime,
ActorSystem("MockDeploymentManager"),
SttpBackendStub.asynchronousFuture
),
shouldVerifyBeforeDeploy = false,
mainClassName = "UNUSED",
scenarioTestingConfig = FlinkMiniClusterConfig()
flinkConfig = FlinkConfig(None, scenarioStateVerification = ScenarioStateVerificationConfig(enabled = false))
) {

import MockDeploymentManager._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ object TestFactory {
new ProcessingTypeActionServiceStub,
NoOpScenarioActivityManager,
actorSystem.dispatcher,
IORuntime.global,
actorSystem,
SttpBackendStub.asynchronousFuture
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package pl.touk.nussknacker.ui.process.newdeployment

import cats.effect.unsafe.IORuntime
import org.scalatest.BeforeAndAfterEach
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.matchers.should.Matchers
Expand All @@ -22,7 +23,8 @@ import pl.touk.nussknacker.ui.process.repository.DBIOActionRunner
import pl.touk.nussknacker.ui.process.repository.ProcessRepository.CreateProcessAction

import java.time.{Clock, Instant, ZoneOffset}
import scala.concurrent.ExecutionContext.Implicits.global

import scala.concurrent.ExecutionContext
import scala.util.Failure

class DeploymentServiceTest
Expand All @@ -35,6 +37,9 @@ class DeploymentServiceTest
with EitherValuesDetailedMessage
with BeforeAndAfterEach {

private implicit val ec: ExecutionContext = ExecutionContext.global
private implicit val ioRuntime: IORuntime = IORuntime.global

override protected val dbioRunner: DBIOActionRunner = DBIOActionRunner(testDbRef)

private val writeScenarioRepository = TestFactory.newWriteProcessRepository(testDbRef, clock, modelVersions = None)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -773,8 +773,9 @@ class PeriodicProcessServiceIntegrationTest

def tryWithFailedListener[T](action: () => Future[T]): Unit = {
f.failListener = true
val exception = intercept[TestFailedException](action().futureValue)
exception.getCause shouldBe a[PeriodicProcessException]
intercept[TestFailedException](action().futureValue) should matchPattern {
case ex: TestFailedException if ex.getCause.isInstanceOf[PeriodicProcessException] =>
}
f.failListener = false
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -436,10 +436,14 @@ class PeriodicProcessServiceTest
tryToSchedule(cronInFuture) shouldBe (())
tryToSchedule(MultipleScheduleProperty(Map("s1" -> cronInFuture, "s2" -> cronInPast))) shouldBe (())

intercept[TestFailedException](tryToSchedule(cronInPast)).getCause shouldBe a[PeriodicProcessException]
intercept[TestFailedException](tryToSchedule(cronInPast)) should matchPattern {
case ex: TestFailedException if ex.getCause.isInstanceOf[PeriodicProcessException] =>
}
intercept[TestFailedException](
tryToSchedule(MultipleScheduleProperty(Map("s1" -> cronInPast, "s2" -> cronInPast)))
).getCause shouldBe a[PeriodicProcessException]
) should matchPattern {
case ex: TestFailedException if ex.getCause.isInstanceOf[PeriodicProcessException] =>
}
}

test("pickMostImportantActiveDeployment - should return correct deployment for multiple schedules") {
Expand Down
12 changes: 7 additions & 5 deletions docs/MigrationGuide.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ To see the biggest differences please consult the [changelog](Changelog.md).
* maxContentLength - max length of a sticky notes content (characters)
* maxNotesCount - max count of sticky notes inside one scenario/fragment
* enabled - if set to false stickyNotes feature is disabled, stickyNotes cant be created, they are also not loaded to graph
* [#7534](https://github.com/TouK/nussknacker/pull/7534) `shouldVerifyBeforeDeploy` configuration entry available for Flink deployment
was renamed to `scenarioStateVerification.enabled`

### Other changes

Expand Down Expand Up @@ -46,7 +48,6 @@ To see the biggest differences please consult the [changelog](Changelog.md).
deploymentConfig: {
type: "flinkPeriodic"
restUrl: "http://jobmanager:8081"
shouldVerifyBeforeDeploy: true
deploymentManager {
db: { <config of the custom db data source> },
processingType: streaming,
Expand All @@ -69,13 +70,14 @@ To see the biggest differences please consult the [changelog](Changelog.md).
legacyDb: { <OPTIONAL config of the custom db data source> },
}
restUrl: "http://jobmanager:8081"
shouldVerifyBeforeDeploy: true
}
```
* [#7335](https://github.com/TouK/nussknacker/pull/7335) Deployment managers are loaded using separate class loader (not the Application ClassLoader - `/opt/nussknacker/managers/*` should be removed from CLASSPATH definition). The default location for deployment managers jars is the `managers` folder inside the working directory.
* [#7458](https://github.com/TouK/nussknacker/pull/7458) Flink scenario testing mechanism and scenario state verification mechanism: by default mini cluster is created once and reused each time
To revert previous behaviour (creating minicluster each time), change `deploymentConfig.miniCluster.reuseMiniClusterForScenarioTesting` or/and
`deploymentConfig.miniCluster.reuseMiniClusterForScenarioStateVerification` to `false`
* [#7458](https://github.com/TouK/nussknacker/pull/7458) [#7534](https://github.com/TouK/nussknacker/pull/7534) Flink scenario testing mechanism and scenario state verification mechanism changes
* By default, shared mini cluster is created once and reused each time. To revert previous behaviour (creating minicluster each time),
switch `deploymentConfig.scenarioTesting.reuseSharedMiniCluster` or/and `deploymentConfig.scenarioStateVerification.reuseSharedMiniCluster` to `false`
* Scenario testing and scenario state verification is now limited by a timeout to ensure proper resources cleaning. In some cases it might be needed to change the timeout
value. To do that, set `deploymentConfig.scenarioTesting.timeout` or/and `deploymentConfig.scenarioStateVerification.timeout` to desired values. Notice that this properties should be configured along with `akka.http.server.request-timeout`
* [#7468](https://github.com/TouK/nussknacker/pull/7468) When a namespace is configured, Kafka consumer groups are also namespaced.
This change should have been introduced as of starting from Nussknacker 1.15 when a feature flag `useNamingStrategyForConsumerGroupId`
was removed to temporarily disable consumer group namespacing.
Expand Down
Loading

0 comments on commit 8c1c554

Please sign in to comment.