Skip to content

Commit

Permalink
Additional integration test for allowed classes
Browse files Browse the repository at this point in the history
  • Loading branch information
Arkadiusz Panczyk authored and apanczyk committed Jul 16, 2021
1 parent 09225a3 commit 1c0da7b
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package pl.touk.nussknacker.engine.management.sample

import java.time.LocalDateTime
import com.typesafe.config.Config
import io.circe.parser.decode
import io.circe.{Decoder, Encoder}
import io.circe.parser.decode
import net.ceedubs.ficus.Ficus._
import org.apache.flink.api.common.serialization.{DeserializationSchema, SimpleStringSchema}
import org.apache.flink.streaming.api.scala._
Expand All @@ -18,7 +19,7 @@ import pl.touk.nussknacker.engine.avro.source.KafkaAvroSourceFactory
import pl.touk.nussknacker.engine.flink.api.process._
import pl.touk.nussknacker.engine.flink.util.exception.ConfigurableExceptionHandlerFactory
import pl.touk.nussknacker.engine.flink.util.sink.EmptySink
import pl.touk.nussknacker.engine.flink.util.source.EspDeserializationSchema
import pl.touk.nussknacker.engine.flink.util.source.{EspDeserializationSchema, ReturningClassInstanceSource, ReturningTestCaseClass}
import pl.touk.nussknacker.engine.flink.util.transformer.aggregate.AggregateHelper
import pl.touk.nussknacker.engine.flink.util.transformer.aggregate.sampleTransformers.SlidingAggregateTransformerV2
import pl.touk.nussknacker.engine.flink.util.transformer.outer.OuterJoinTransformer
Expand All @@ -39,7 +40,6 @@ import pl.touk.nussknacker.engine.management.sample.transformer._
import pl.touk.nussknacker.engine.util.LoggingListener

import java.nio.charset.StandardCharsets
import java.time.LocalDateTime
import scala.reflect.ClassTag

object DevProcessConfigCreator {
Expand Down Expand Up @@ -96,7 +96,8 @@ class DevProcessConfigCreator extends ProcessConfigCreator {
"communicationSource" -> categories(DynamicParametersSource),
"csv-source" -> categories(FlinkSourceFactory.noParam(new CsvSource)),
"genericSourceWithCustomVariables" -> categories(GenericSourceWithCustomVariablesSample),
"sql-source" -> categories(SqlSource)
"sql-source" -> categories(SqlSource),
"classInstanceSource" -> all(new ReturningClassInstanceSource)
)
}

Expand Down Expand Up @@ -193,10 +194,14 @@ class DevProcessConfigCreator extends ProcessConfigCreator {
"TypedConfig" -> all(ConfigTypedGlobalVariable)
)

val allowedClasses = List(
classOf[ReturningTestCaseClass]
)

ExpressionConfig(
globalProcessVariables,
List.empty,
List.empty,
allowedClasses,
LanguageConfiguration(List()),
dictionaries = Map(
TestDictionary.id -> categories(TestDictionary.definition),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package pl.touk.nussknacker.engine.flink.util.source

import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import pl.touk.nussknacker.engine.api.editor.{DualEditor, DualEditorMode, SimpleEditor, SimpleEditorType}
import pl.touk.nussknacker.engine.api.typed.typing.Typed
import pl.touk.nussknacker.engine.api.{MethodToInvoke, ParamName}
import pl.touk.nussknacker.engine.flink.api.process.FlinkSourceFactory

class ReturningClassInstanceSource extends FlinkSourceFactory[Any] {

@MethodToInvoke
def source(@ParamName("Allowed class")
@DualEditor(
simpleEditor = new SimpleEditor(`type` = SimpleEditorType.STRING_EDITOR),
defaultMode = DualEditorMode.SIMPLE
) allowedClass: String) =
new CollectionSource[Any](StreamExecutionEnvironment.getExecutionEnvironment.getConfig, List.empty, None, Typed.typedClass(Class.forName(allowedClass)))

}
case class ReturningTestCaseClass(someMethod: String)
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,21 @@ class DefinitionResourcesSpec extends FunSpec with ScalatestRouteTest with FailF

private implicit final val string: FromEntityUnmarshaller[String] = Unmarshaller.stringUnmarshaller.forContentTypes(ContentTypeRange.*)

it("should return definition data for allowed classes") {
getProcessDefinitionData(existingProcessingType, Map.empty[String, Long].asJson) ~> check {
status shouldBe StatusCodes.OK

val typesInformation = responseAs[Json].hcursor
.downField("processDefinition")
.downField("typesInformation")
.downAt(_.hcursor.downField("clazzName").get[String]("display").right.value == "ReturningTestCaseClass")
.downField("clazzName")
.downField("display")

typesInformation.focus.get shouldBe Json.fromString("ReturningTestCaseClass")
}
}

it("should handle missing processing type") {
getProcessDefinitionData("foo", Map.empty[String, Long].asJson) ~> check {
status shouldBe StatusCodes.NotFound
Expand Down

0 comments on commit 1c0da7b

Please sign in to comment.