Skip to content

Commit

Permalink
Removed unused miniCluster.streamExecutionEnvConfig configuration option
Browse files Browse the repository at this point in the history
  • Loading branch information
arkadius committed Feb 20, 2025
1 parent 401dec8 commit 10dc416
Show file tree
Hide file tree
Showing 10 changed files with 4 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class MockDeploymentManager private (
FlinkConfig(None, scenarioStateVerification = ScenarioStateVerificationConfig(enabled = false)),
Some(
FlinkMiniClusterFactory
.createMiniClusterWithServices(modelData.modelClassLoader, new Configuration, new Configuration)
.createMiniClusterWithServices(modelData.modelClassLoader, new Configuration)
),
FlinkClientStub,
FlinkScenarioJobRunnerStub
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ class PeriodicProcessStateDefinitionManagerTest extends AnyFunSuite with Matcher
PeriodicStateStatus.customActionTooltips(
ScenarioStatusWithScenarioContext(
scenarioStatus = ScheduledStatus(nextRunAt = LocalDateTime.now()),
latestVersionId = VersionId(5),
deployedVersionId = Some(VersionId(5)),
currentlyPresentedVersionId = Some(VersionId(5)),
)
Expand All @@ -85,7 +84,6 @@ class PeriodicProcessStateDefinitionManagerTest extends AnyFunSuite with Matcher
PeriodicStateStatus.customActionTooltips(
ScenarioStatusWithScenarioContext(
scenarioStatus = ScheduledStatus(nextRunAt = LocalDateTime.now()),
latestVersionId = VersionId(5),
deployedVersionId = Some(VersionId(4)),
currentlyPresentedVersionId = Some(VersionId(5)),
)
Expand All @@ -98,7 +96,6 @@ class PeriodicProcessStateDefinitionManagerTest extends AnyFunSuite with Matcher
PeriodicStateStatus.customActionTooltips(
ScenarioStatusWithScenarioContext(
scenarioStatus = SimpleStateStatus.Canceled,
latestVersionId = VersionId(5),
deployedVersionId = Some(VersionId(4)),
currentlyPresentedVersionId = Some(VersionId(5)),
)
Expand Down
1 change: 0 additions & 1 deletion docs/configuration/ScenarioDeploymentConfiguration.md
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,6 @@ Deployment Manager of type `flinkStreaming` has the following configuration opti
| scenarioStateRequestTimeout | duration | 3 seconds | Request timeout for fetching scenario state from Flink |
| jobConfigsCacheSize | int | 1000 | Maximum number of cached job configuration elements. |
| miniCluster.config | map of strings | [:] | Configuration that will be passed to shared `MiniCluster` |
| miniCluster.streamExecutionEnvConfig | map of strings | [:] | Configuration that will be passed to shared `StreamExecutionEnvironment` used along with `MiniCluster` | |
| miniCluster.waitForJobManagerRestAPIAvailableTimeout | duration | 10 seconds | How long Nussknacker should wait fo Flink Mini Cluster REST endpoint. It is only used when `useMiniClusterForDeployment` is enabled | |
| scenarioTesting.reuseSharedMiniCluster | boolean | true | Reuses shared mini cluster for each scenario testing attempt |
| scenarioTesting.timeout | duration | 55 seconds | Timeout for scenario testing. When scenario test is not finished during this time, testing job will be canceled. This property should be configured along with `akka.http.server.request-timeout` for proper effect. |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ object MockableDeploymentManagerProvider {
FlinkMiniClusterFactory.createMiniClusterWithServices(
modelData.modelClassLoader,
new Configuration,
new Configuration
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ trait BaseFlinkDeploymentManagerSpec extends AnyFunSuiteLike with Matchers with
}

test("use deploymentId passed as a jobId") {
val processName = ProcessName("runningFlink")
val processName = ProcessName("jobWithDeploymentIdAsAUuid")

val version = VersionId(15)
val process = SampleProcess.prepareProcess(processName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,5 @@ import scala.concurrent.duration.{DurationInt, FiniteDuration}

final case class FlinkMiniClusterConfig(
config: Configuration = new Configuration,
streamExecutionEnvConfig: Configuration = new Configuration,
waitForJobManagerRestAPIAvailableTimeout: FiniteDuration = 10.seconds
)
Original file line number Diff line number Diff line change
Expand Up @@ -48,27 +48,24 @@ object FlinkMiniClusterFactory extends LazyLogging {
stateVerificationConfig: ScenarioStateVerificationConfig,
): Option[FlinkMiniClusterWithServices] = {
if (useMiniClusterForDeployment || scenarioTestingConfig.reuseSharedMiniCluster || stateVerificationConfig.reuseSharedMiniCluster) {
Some(createMiniClusterWithServices(modelClassLoader, config.config, config.streamExecutionEnvConfig))
Some(createMiniClusterWithServices(modelClassLoader, config.config))
} else {
None
}
}

def createUnitTestsMiniClusterWithServices(
miniClusterConfigOverrides: Configuration = new Configuration,
streamExecutionConfigOverrides: Configuration = new Configuration
): FlinkMiniClusterWithServices = {
createMiniClusterWithServices(
ModelClassLoader.flinkWorkAroundEmptyClassloader,
miniClusterConfigOverrides,
streamExecutionConfigOverrides
)
}

def createMiniClusterWithServices(
modelClassLoader: URLClassLoader,
miniClusterConfigOverrides: Configuration,
streamExecutionConfigOverrides: Configuration
): FlinkMiniClusterWithServices = {
val miniClusterConfig = DefaultMiniClusterConfig
miniClusterConfig.addAll(miniClusterConfigOverrides)
Expand All @@ -85,7 +82,6 @@ object FlinkMiniClusterFactory extends LazyLogging {
FlinkMiniClusterStreamExecutionEnvironmentFactory.createStreamExecutionEnvironment(
miniCluster,
modelClassLoader,
streamExecutionConfigOverrides,
attached
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ object FlinkMiniClusterStreamExecutionEnvironmentFactory {
def createStreamExecutionEnvironment(
miniCluster: MiniCluster,
modelClassLoader: URLClassLoader,
configuration: Configuration,
attached: Boolean
): StreamExecutionEnvironment = {
val pipelineExecutorServiceLoader = createPipelineExecutorServiceLoader(miniCluster, modelClassLoader)
val configuration = new Configuration()
configuration.set(DeploymentOptions.TARGET, pipelineExecutorName)
configuration.set(PipelineOptions.CLASSPATHS, modelClassLoader.getURLs.map(_.toString).toList.asJava)
configuration.set[java.lang.Boolean](DeploymentOptions.ATTACHED, attached)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ class LegacyFallbackToSingleUseMiniClusterHandler(modelClassLoader: URLClassLoad
FlinkMiniClusterFactory.createMiniClusterWithServices(
modelClassLoader,
legacyMiniClusterConfigOverrides,
new Configuration()
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ trait FlinkSpec extends BeforeAndAfterAll with BeforeAndAfter with WithConfig {
flinkMiniCluster = FlinkMiniClusterFactory.createMiniClusterWithServices(
ModelClassLoader.flinkWorkAroundEmptyClassloader,
prepareFlinkConfiguration(),
new Configuration()
)
}

Expand Down

0 comments on commit 10dc416

Please sign in to comment.