Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix scenario tests with fragment input validation #7159

Merged

Conversation

Elmacioro
Copy link
Contributor

@Elmacioro Elmacioro commented Nov 15, 2024

Describe your changes

Currently when we try to test a scenario which uses a fragment which has some validation set on parameter's expression there is an exception during serialization.

12:47:28.764 [nussknacker-designer-akka.actor.default-dispatcher-10] ERROR p.t.n.ui.api.NuDesignerErrorToHttp$ - Unknown error: The implementation of the RichFlatMapFunction is not serializable. The object probably contains or references non serializable fields.
org.apache.flink.api.common.InvalidProgramException: The implementation of the RichFlatMapFunction is not serializable. The object probably contains or references non serializable fields.
        at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:170)
        at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:69)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:2360)
        at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:202)
        at org.apache.flink.streaming.api.datastream.DataStream.flatMap(DataStream.java:631)
        at pl.touk.nussknacker.engine.process.registrar.FlinkProcessRegistrar.registerInterpretationPart$1(FlinkProcessRegistrar.scala:389)
        at pl.touk.nussknacker.engine.process.registrar.FlinkProcessRegistrar.registerSourcePart$1(FlinkProcessRegistrar.scala:189)
        at pl.touk.nussknacker.engine.process.registrar.FlinkProcessRegistrar.$anonfun$register$5(FlinkProcessRegistrar.scala:174)
        at scala.collection.LinearSeqOps.foldLeft(LinearSeq.scala:183)
...

This happens as deep down there is a classloader passed which cannot be serialized.
The problem started to occur with introduction of ValidationExpressionParameterValidator which holds ExpressionEvaluator which was created in an optimized way and thus contains listeners. Especially ResultsCollectingListener which has a variableEncoder which has the problematic classloader.

In proposed changes at the level of ProcessCompilerData I pass ExpressionEvaluator.unOptimizedEvaluator which is used only by the validator.
I also left the ExpressionEvaluator.optimizedEvaluator intact for the Interpreter and ProcessCompilerData creation as it's needed there.

I added a test replicating the issue. It passes with the proposed change but would fail in the same way as on the environment without it:

Zrzut ekranu z 2024-11-18 14-45-40

Checklist before merge

  • Related issue ID is placed at the beginning of PR title in [brackets] (can be GH issue or Nu Jira issue)
  • Code is cleaned from temporary changes and commented out lines
  • Parts of the code that are not easy to understand are documented in the code
  • Changes are covered by automated tests
  • Showcase in dev-application.conf added to demonstrate the feature
  • Documentation added or updated
  • Added entry in Changelog.md describing the change from the perspective of a public distribution user
  • Added MigrationGuide.md entry in the appropriate subcategory if introducing a breaking change
  • Verify that PR will be squashed during merge

Summary by CodeRabbit

Summary by CodeRabbit

  • New Features

    • Enhanced testing framework for fragments with parameter validation.
    • Introduced a new test case to ensure fragments execute without exceptions when parameter validation is defined.
    • Added a new Activities panel for better scenario activity tracking and introduced scenario labels for improved organization.
    • Upgraded Flink to version 1.19.1 with performance optimizations.
    • Improved user experience with a revamped UI for aggregation definitions and enhanced validation handling in ad-hoc tests.
  • Bug Fixes

    • Adjusted expression evaluation strategy in the process compilation preparation phase for improved performance.
  • Documentation

    • Updated changelog to reflect new version 1.18 and its significant features and improvements.

Copy link

coderabbitai bot commented Nov 15, 2024

Caution

Review failed

The pull request is closed.

📝 Walkthrough
📝 Walkthrough

Walkthrough

The pull request introduces enhancements to the testing framework for fragments with parameter validation in the FlinkTestMainSpec.scala file, including new test cases and methods. A new test case verifies that fragments with parameter validation execute without exceptions. Additionally, the ProcessCompilerData object is modified to change the evaluation strategy of the expression evaluator from optimized to unoptimized within the prepare method. The changelog is also updated to version 1.18, detailing new features, improvements, and breaking changes.

Changes

File Change Summary
engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMainSpec.scala Enhanced testing framework by adding new test cases for fragments with parameter validation and new methods.
scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/compile/ProcessCompilerData.scala Modified the prepare method to use an unoptimized expression evaluator instead of an optimized one.
docs/Changelog.md Updated to reflect version 1.18, detailing new features, improvements, and breaking changes.

Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

@Elmacioro Elmacioro marked this pull request as ready for review November 18, 2024 15:06
@Elmacioro Elmacioro force-pushed the fix-scenario-tests-with-fragment-input-validation branch from 8a7be19 to c7832ab Compare November 18, 2024 15:09
@Elmacioro Elmacioro requested a review from arkadius November 18, 2024 15:11
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Outside diff range and nitpick comments (2)
engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMainSpec.scala (2)

667-682: Enhance the test with specific assertions for fragment behavior

While the test verifies that no exceptions are thrown when a process fragment with parameter validation is defined, it would be more robust to include assertions that check the actual behavior of the fragment. This ensures that not only does the process run without exceptions, but it also produces the expected results.


739-766: Consolidate object and class to improve code organization

Defining an object FlinkTestMainSpec separately in the same file as class FlinkTestMainSpec can be confusing. Consider moving the contents of the object into the class as private members. This aligns with Scala best practices and enhances readability.

Apply this diff to move the definitions into the class:

-class FlinkTestMainSpec extends AnyWordSpec with Matchers with Inside with BeforeAndAfterEach with OptionValues {
+class FlinkTestMainSpec extends AnyWordSpec with Matchers with Inside with BeforeAndAfterEach with OptionValues {
+  private val fragmentWithValidationName = "fragmentWithValidation"
+
+  private val processWithFragmentParameterValidation: CanonicalProcess = {
+    // existing code from the object
+  }
+
   // Rest of the class code...
-}
-
-object FlinkTestMainSpec {
-  private val fragmentWithValidationName = "fragmentWithValidation"
-
-  private val processWithFragmentParameterValidation: CanonicalProcess = {
-    // existing code...
-  }
-}
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between 5ebf989 and c7832ab.

📒 Files selected for processing (2)
  • engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMainSpec.scala (4 hunks)
  • utils/utils/src/main/scala/pl/touk/nussknacker/engine/util/json/ToJsonEncoder.scala (1 hunks)
✅ Files skipped from review due to trivial changes (1)
  • utils/utils/src/main/scala/pl/touk/nussknacker/engine/util/json/ToJsonEncoder.scala

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Outside diff range and nitpick comments (1)
scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/compile/ProcessCompilerData.scala (1)

43-43: Document performance implications of using unoptimized evaluator.

While switching to unOptimizedEvaluator may resolve the serialization issue in tests, it's important to document this decision as it represents a performance trade-off.

Add a comment explaining why we're using the unoptimized evaluator:

    val expressionEvaluator =
+     // Using unoptimized evaluator to avoid serialization issues in tests
      ExpressionEvaluator.unOptimizedEvaluator(globalVariablesPreparer)
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between c7832ab and 0600213.

📒 Files selected for processing (2)
  • engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMainSpec.scala (4 hunks)
  • scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/compile/ProcessCompilerData.scala (1 hunks)
🔇 Additional comments (2)
scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/compile/ProcessCompilerData.scala (1)

43-43: Verify that this change resolves the serialization issue.

The switch to unOptimizedEvaluator and removal of listeners parameter might help with serialization, but we should verify this thoroughly.

Let's verify the impact:

✅ Verification successful

Based on the search results, I can now provide a definitive response regarding the switch to unoptimized evaluator:

The change to unoptimized evaluator is appropriate for test scenarios

The switch from optimizedEvaluator to unOptimizedEvaluator and removal of listeners is justified because:

  1. This is in test context where performance is less critical
  2. The change is consistent with other test-related code that uses unoptimized evaluator (as seen in multiple test files)
  3. The removal of listeners helps with serialization which is important for Flink test execution
  4. The change is localized to test scenarios with fragment input validation

The evidence shows this is a valid change that prioritizes test stability over performance optimization, which is the right tradeoff for test scenarios. The unoptimized evaluator is already used in other test contexts throughout the codebase.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for related test cases and their outcomes
rg -A 5 "RichFlatMapFunction.*serializ" 

# Look for other uses of optimizedEvaluator to ensure we haven't missed any cases
ast-grep --pattern 'ExpressionEvaluator.optimizedEvaluator($$$)'

Length of output: 338


Script:

#!/bin/bash
# Let's check for RichFlatMapFunction usage and serialization context
rg -B 5 -A 5 "RichFlatMapFunction"

# Let's also check where unOptimizedEvaluator is used
ast-grep --pattern 'ExpressionEvaluator.unOptimizedEvaluator($$$)'

# Check for any serialization-related tests or issues
rg -B 3 -A 3 "NotSerializableException|Serializable"

Length of output: 266090

engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMainSpec.scala (1)

140-161: Add documentation and improve type safety

The fragment definition could benefit from:

  1. ScalaDoc explaining the purpose and usage of this test fragment
  2. More type-safe alternatives for empty collections

Consider these improvements:

+  /**
+   * Creates a test fragment with parameter validation.
+   * This fragment is used to verify that the parameter validation mechanism
+   * works correctly in the context of fragment processing.
+   */
   private val processWithFragmentParameterValidation: CanonicalProcess = {
     val fragmentParamName = ParameterName("param")
     val fragmentParam = FragmentParameter(fragmentParamName, FragmentClazzRef[String]).copy(
       valueCompileTimeValidation = Some(
         ParameterValueCompileTimeValidation(
           validationExpression = Expression.spel("true"),
           validationFailedMessage = Some("param validation failed")
         )
       )
     )

     CanonicalProcess(
       MetaData(fragmentWithValidationName, FragmentSpecificData()),
       List(
         FlatNode(
-          FragmentInputDefinition("start", List(fragmentParam))
+          FragmentInputDefinition("start", fragmentParam :: Nil)
         ),
-        FlatNode(FragmentOutputDefinition("out1", "output", List.empty))
+        FlatNode(FragmentOutputDefinition("out1", "output", Nil))
       ),
-      List.empty
+      Nil
     )
   }

Additionally, according to the PR objectives, there's a serialization issue with RichFlatMapFunction. However, I don't see any @transient annotations being added to address this. Could you clarify if this is being handled in a different file?

Let me check for any related changes:

@Elmacioro Elmacioro force-pushed the fix-scenario-tests-with-fragment-input-validation branch 2 times, most recently from dd3b6c1 to 5a0e87d Compare November 19, 2024 15:11
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between dd3b6c1 and 5a0e87d.

📒 Files selected for processing (2)
  • engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMainSpec.scala (6 hunks)
  • scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/compile/ProcessCompilerData.scala (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/compile/ProcessCompilerData.scala
🔇 Additional comments (1)
engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMainSpec.scala (1)

808-829: 🛠️ Refactor suggestion

Provide a meaningful validation expression

The current validation expression Expression.spel("true") will always pass validation, making it ineffective for testing the validation mechanism. Consider using an expression that actually validates the parameter value.

Apply this diff to use a meaningful validation expression:

     val processWithFragmentParameterValidation: CanonicalProcess = {
       val fragmentParamName = ParameterName("param")
       val fragmentParam = FragmentParameter(fragmentParamName, FragmentClazzRef[String]).copy(
         valueCompileTimeValidation = Some(
           ParameterValueCompileTimeValidation(
-            validationExpression = Expression.spel("true"),
+            validationExpression = Expression.spel("#param != null and #param.length() > 0"),
             validationFailedMessage = Some("param validation failed")
           )
         )
       )

Likely invalid or redundant comment.

@Elmacioro Elmacioro changed the base branch from staging to release/1.18 November 20, 2024 22:20
@Elmacioro Elmacioro force-pushed the fix-scenario-tests-with-fragment-input-validation branch from b4c2d43 to 1d3611c Compare November 20, 2024 22:21
@github-actions github-actions bot added the docs label Nov 20, 2024
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Outside diff range and nitpick comments (1)
engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMainSpec.scala (1)

24-29: Consider consolidating imports from the same package

The imports from pl.touk.nussknacker.engine.api.parameter and pl.touk.nussknacker.engine.graph.node packages can be consolidated to improve readability.

-import pl.touk.nussknacker.engine.api.parameter.ParameterName
-import pl.touk.nussknacker.engine.api.parameter.ParameterValueCompileTimeValidation
+import pl.touk.nussknacker.engine.api.parameter.{ParameterName, ParameterValueCompileTimeValidation}
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between b4c2d43 and 1d3611c.

📒 Files selected for processing (3)
  • docs/Changelog.md (1 hunks)
  • engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMainSpec.scala (4 hunks)
  • scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/compile/ProcessCompilerData.scala (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/compile/ProcessCompilerData.scala
🔇 Additional comments (4)
engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMainSpec.scala (3)

668-683: 🛠️ Refactor suggestion

Enhance test coverage for fragment parameter validation

The current test only verifies the happy path. Consider:

  1. Testing with invalid parameters to ensure validation works
  2. Verifying validation error messages
  3. Using a more descriptive test name that indicates what validation scenario is being tested
-    "should not throw exception when process fragment has parameter validation defined" in {
+    "should validate fragment parameters according to validation rules" in {
       val scenario = ScenarioBuilder
         .streaming("scenario1")
         .source(sourceNodeId, "input")
         .fragmentOneOut("sub", fragmentWithValidationName, "output", "fragmentResult", "param" -> "'asd'".spel)
         .emptySink("out", "valueMonitor", "Value" -> "1".spel)

       val resolved = FragmentResolver(List(processWithFragmentParameterValidation)).resolve(scenario)

-      val results = runFlinkTest(
+      // Test valid parameter
+      val validResults = runFlinkTest(
         resolved.valueOr { _ => throw new IllegalArgumentException("Won't happen") },
         ScenarioTestData(List(ScenarioTestJsonRecord(sourceNodeId, Json.fromString("0|1|2|3|4|5|6")))),
         useIOMonadInInterpreter
       )
-      results.exceptions.length shouldBe 0
+      validResults.exceptions shouldBe empty
+      
+      // Test invalid parameter
+      val scenarioWithInvalidParam = scenario.copy(nodes = scenario.nodes.map {
+        case n if n.id == "sub" => n.copy(parameters = Map("param" -> "''".spel))
+        case n => n
+      })
+      val resolvedInvalid = FragmentResolver(List(processWithFragmentParameterValidation)).resolve(scenarioWithInvalidParam)
+      val invalidResults = runFlinkTest(
+        resolvedInvalid.valueOr { _ => throw new IllegalArgumentException("Won't happen") },
+        ScenarioTestData(List(ScenarioTestJsonRecord(sourceNodeId, Json.fromString("0|1|2|3|4|5|6")))),
+        useIOMonadInInterpreter
+      )
+      invalidResults.exceptions should not be empty
+      invalidResults.exceptions.head.getMessage should include("param validation failed")
     }

Likely invalid or redundant comment.


744-765: ⚠️ Potential issue

Improve fragment validation implementation

The current implementation has several areas for improvement:

  1. The validation expression Expression.spel("true") always returns true, making the validation ineffective
  2. The validation message could be more descriptive
  3. Consider adding documentation for the fragment parameter type and validation rules
     val fragmentParam = FragmentParameter(fragmentParamName, FragmentClazzRef[String]).copy(
       valueCompileTimeValidation = Some(
         ParameterValueCompileTimeValidation(
-          validationExpression = Expression.spel("true"),
-          validationFailedMessage = Some("param validation failed")
+          validationExpression = Expression.spel("#param != null and #param.length() > 0"),
+          validationFailedMessage = Some("Parameter 'param' must not be null or empty")
         )
       )
     )

Likely invalid or redundant comment.


675-676: Verify serialization handling in FragmentResolver

Since the original issue was related to serialization of RichFlatMapFunction, let's verify that the FragmentResolver properly handles serialization.

docs/Changelog.md (1)

103-103: LGTM: Changelog entry is accurate and well-placed.

The entry properly documents the fix for scenario tests with fragment input validation, with correct PR reference and placement in version 1.18 section.

Copy link
Member

@arkadius arkadius left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants