Skip to content

Commit

Permalink
Merge pull request #1835 from TouK/staging
Browse files Browse the repository at this point in the history
Update demo
  • Loading branch information
dswiecki authored Jun 25, 2021
2 parents ac69d04 + 71f37f2 commit 3e89e3e
Show file tree
Hide file tree
Showing 8 changed files with 51 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ class GenericItSpec extends FunSuite with FlinkSpec with Matchers with KafkaSpec
Map("firstname" -> "Jan"), SecondRecordSchemaV1
)

private def jsonProcess(filter: String) =
private def jsonTypedProcess(filter: String) =
EspProcessBuilder
.id("json-test")
.parallelism(1)
Expand Down Expand Up @@ -203,14 +203,14 @@ class GenericItSpec extends FunSuite with FlinkSpec with Matchers with KafkaSpec
sendAsJson(givenMatchingJsonObj, JsonInTopic, timeAgo)

assertThrows[Exception] {
run(jsonProcess("#input.nestMap.notExist == ''")) {}
run(jsonTypedProcess("#input.nestMap.notExist == ''")) {}
}

assertThrows[Exception] {
run(jsonProcess("#input.list1[0].notExist == ''")) {}
run(jsonTypedProcess("#input.list1[0].notExist == ''")) {}
}

val validJsonProcess = jsonProcess("#input.first == 'Jan' and " +
val validJsonProcess = jsonTypedProcess("#input.first == 'Jan' and " +
"#input.nestMap.nestedField != 'dummy' and " +
"#input.list1[0].listField != 'dummy' and " +
"#input.list2[0] != 15")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,19 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema
import org.apache.kafka.clients.consumer.ConsumerRecord
import pl.touk.nussknacker.engine.api.process.{ProcessObjectDependencies, Source, TestDataGenerator}
import pl.touk.nussknacker.engine.api.CirceUtil
import pl.touk.nussknacker.engine.api.context.ProcessCompilationError.NodeId
import pl.touk.nussknacker.engine.api.context.ValidationContext
import pl.touk.nussknacker.engine.api.context.transformation.{DefinedEagerParameter, NodeDependencyValue}
import pl.touk.nussknacker.engine.api.definition.{JsonParameterEditor, MandatoryParameterValidator, NotBlankParameterValidator, Parameter}
import pl.touk.nussknacker.engine.api.process.ProcessObjectDependencies
import pl.touk.nussknacker.engine.api.test.{TestDataSplit, TestParsingUtils}
import pl.touk.nussknacker.engine.api.typed._
import pl.touk.nussknacker.engine.api.{CirceUtil, MethodToInvoke, ParamName}
import pl.touk.nussknacker.engine.flink.api.process.FlinkSourceFactory
import pl.touk.nussknacker.engine.flink.api.timestampwatermark.{LegacyTimestampWatermarkHandler, TimestampWatermarkHandler}
import pl.touk.nussknacker.engine.flink.util.source.EspDeserializationSchema
import pl.touk.nussknacker.engine.flink.util.timestamp.BoundedOutOfOrderPreviousElementAssigner
import pl.touk.nussknacker.engine.kafka.consumerrecord.FixedValueDeserializationSchemaFactory
import pl.touk.nussknacker.engine.kafka.serialization.NKKafkaDeserializationSchemaWrapper
import pl.touk.nussknacker.engine.kafka.source.{KafkaSource, KafkaSourceFactory}
import pl.touk.nussknacker.engine.kafka.{BasicRecordFormatter, KafkaConfig, KafkaUtils, RecordFormatter, RecordFormatterFactory}
import pl.touk.nussknacker.engine.kafka.source.KafkaSourceFactory
import pl.touk.nussknacker.engine.kafka.source.KafkaSourceFactory.TopicParamName
import pl.touk.nussknacker.engine.kafka.{BasicRecordFormatter, KafkaConfig, RecordFormatter, RecordFormatterFactory}
import pl.touk.nussknacker.engine.util.Implicits._
import pl.touk.nussknacker.engine.util.typing.TypingUtils

Expand All @@ -33,17 +34,25 @@ object sources {
class GenericJsonSourceFactory(processObjectDependencies: ProcessObjectDependencies) extends KafkaSourceFactory[String, java.util.Map[_, _]](
new FixedValueDeserializationSchemaFactory(JsonMapDeserialization), None, FixedRecordFormatterFactoryWrapper(JsonRecordFormatter), processObjectDependencies)

class GenericTypedJsonSourceFactory(processObjectDependencies: ProcessObjectDependencies)
extends FlinkSourceFactory[TypedMap] with Serializable {

@MethodToInvoke
def create(@ParamName("topic") topic: String, @ParamName("type") definition: java.util.Map[String, _]): Source[TypedMap] with TestDataGenerator = {
val kafkaConfig = KafkaConfig.parseProcessObjectDependencies(processObjectDependencies)
val deserializationSchema = new NKKafkaDeserializationSchemaWrapper(JsonTypedMapDeserialization)
val preparedTopics = List(KafkaUtils.prepareKafkaTopic(topic, processObjectDependencies))
new KafkaSource(preparedTopics, kafkaConfig, deserializationSchema, None, JsonRecordFormatter) with ReturningType {
override def returnType: typing.TypingResult = TypingUtils.typeMapDefinition(definition)
}
class GenericTypedJsonSourceFactory(processObjectDependencies: ProcessObjectDependencies) extends KafkaSourceFactory[String, TypedMap](
new FixedValueDeserializationSchemaFactory(JsonTypedMapDeserialization), None, FixedRecordFormatterFactoryWrapper(JsonRecordFormatter), processObjectDependencies) {

protected val TypeDefinitionParamName = "type"

protected val TypeParameter: Parameter = Parameter[java.util.Map[String, _]](TypeDefinitionParamName).copy(
validators = List(MandatoryParameterValidator, NotBlankParameterValidator),
editor = Some(JsonParameterEditor)
)

override protected def prepareInitialParameters: List[Parameter] = super.prepareInitialParameters ++ List(TypeParameter)

override protected def nextSteps(context: ValidationContext, dependencies: List[NodeDependencyValue])(implicit nodeId: NodeId): NodeTransformationDefinition = {
case step@TransformationStep((TopicParamName, DefinedEagerParameter(topic: String, _)) ::
(TypeDefinitionParamName, DefinedEagerParameter(definition: Any, _)) :: Nil, _) =>
val typingResult = TypingUtils.typeMapDefinition(definition.asInstanceOf[java.util.Map[String, _]])
prepareSourceFinalResults(context, dependencies, step.parameters, keyTypingResult, typingResult, Nil)
case step@TransformationStep((TopicParamName, top) :: (TypeDefinitionParamName, typ) :: Nil, _) =>
prepareSourceFinalErrors(context, dependencies, step.parameters, errors = Nil)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import pl.touk.nussknacker.engine.api.context.ProcessCompilationError.NodeId
import pl.touk.nussknacker.engine.api.context.{ProcessCompilationError, ValidationContext}
import pl.touk.nussknacker.engine.api.context.transformation.{BaseDefinedParameter, DefinedEagerParameter, DefinedSingleParameter, NodeDependencyValue, SingleInputGenericNodeTransformation}
import pl.touk.nussknacker.engine.api.definition.{WithExplicitTypesToExtract, _}
import pl.touk.nussknacker.engine.api.typed.typing.{Typed, TypedClass}
import pl.touk.nussknacker.engine.api.typed.typing.{Typed, TypedClass, TypingResult}
import pl.touk.nussknacker.engine.kafka.source.KafkaSourceFactory.KafkaSourceFactoryState
import pl.touk.nussknacker.engine.kafka.validator.WithCachedTopicsExistenceValidator

Expand Down Expand Up @@ -87,7 +87,7 @@ class KafkaSourceFactory[K: ClassTag, V: ClassTag](deserializationSchemaFactory:

protected def nextSteps(context: ValidationContext, dependencies: List[NodeDependencyValue])(implicit nodeId: ProcessCompilationError.NodeId): NodeTransformationDefinition = {
case step@TransformationStep((KafkaSourceFactory.TopicParamName, DefinedEagerParameter(topic: String, _)) :: _, None) =>
prepareSourceFinalResults(context, dependencies, step.parameters, topicsValidationErrors(topic))
prepareSourceFinalResults(context, dependencies, step.parameters, keyTypingResult, valueTypingResult, topicsValidationErrors(topic))
case step@TransformationStep((KafkaSourceFactory.TopicParamName, _) :: _, None) =>
// Edge case - for some reason Topic is not defined, e.g. when topic does not match DefinedEagerParameter(String, _):
// 1. FailedToDefineParameter
Expand All @@ -99,10 +99,11 @@ class KafkaSourceFactory[K: ClassTag, V: ClassTag](deserializationSchemaFactory:
protected def prepareSourceFinalResults(context: ValidationContext,
dependencies: List[NodeDependencyValue],
parameters: List[(String, DefinedParameter)],
errors: List[ProcessCompilationError],
contextInitializer: Option[KafkaContextInitializer[K, V, DefinedParameter]] = None
keyTypingResult: TypingResult,
valueTypingResult: TypingResult,
errors: List[ProcessCompilationError]
)(implicit nodeId: NodeId): FinalResults = {
val kafkaContextInitializer = contextInitializer.getOrElse(prepareContextInitializer(parameters))
val kafkaContextInitializer = prepareContextInitializer(parameters, keyTypingResult, valueTypingResult)
FinalResults(
finalContext = kafkaContextInitializer.validationContext(context, dependencies, parameters),
errors = errors,
Expand All @@ -119,7 +120,9 @@ class KafkaSourceFactory[K: ClassTag, V: ClassTag](deserializationSchemaFactory:
}

// Overwrite this for dynamic type definitions.
protected def prepareContextInitializer(params: List[(String, DefinedParameter)]): KafkaContextInitializer[K, V, DefinedParameter] =
protected def prepareContextInitializer(params: List[(String, DefinedParameter)],
keyTypingResult: TypingResult,
valueTypingResult: TypingResult): KafkaContextInitializer[K, V, DefinedParameter] =
new KafkaContextInitializer[K, V, DefinedSingleParameter](keyTypingResult, valueTypingResult)

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ class KafkaExceptionConsumerSpec extends FunSuite with FlinkSpec with KafkaSpec
val decoded = CirceUtil.decodeJsonUnsafe[KafkaExceptionInfo](consumed.message())
decoded.nodeId shouldBe Some("shouldFail")
decoded.processName shouldBe "testProcess"
decoded.message shouldBe Some("Unknown exception")
decoded.exceptionInput shouldBe Some("SpelExpressionEvaluationException:Expression [1/0 != 10] evaluation failed, message: / by zero")
decoded.message shouldBe Some("Expression [1/0 != 10] evaluation failed, message: / by zero")
decoded.exceptionInput shouldBe Some("1/0 != 10")
decoded.additionalData shouldBe Map("configurableKey" -> "sampleValue")

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import pl.touk.nussknacker.engine.api
import pl.touk.nussknacker.engine.api.Context
import pl.touk.nussknacker.engine.api.context.ValidationContext
import pl.touk.nussknacker.engine.api.dict.DictRegistry
import pl.touk.nussknacker.engine.api.exception.NonTransientException
import pl.touk.nussknacker.engine.api.expression.{ExpressionParseError, ExpressionParser, TypedExpression}
import pl.touk.nussknacker.engine.api.process.ClassExtractionSettings
import pl.touk.nussknacker.engine.api.typed.supertype.{CommonSupertypeFinder, SupertypeClassResolutionStrategy}
Expand Down Expand Up @@ -64,7 +65,7 @@ final case class ParsedSpelExpression(original: String, parser: () => Validated[
}

class SpelExpressionEvaluationException(val expression: String, val ctxId: String, cause: Throwable)
extends RuntimeException(s"Expression [$expression] evaluation failed, message: ${cause.getMessage}", cause)
extends NonTransientException(expression, s"Expression [$expression] evaluation failed, message: ${cause.getMessage}", cause = cause)

class SpelExpression(parsed: ParsedSpelExpression,
expectedReturnType: TypingResult,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -340,8 +340,7 @@ private[spel] class Typer(classLoader: ClassLoader, commonSupertypeFinder: Commo
}

private def extractIterativeType(parent: TypingResult): Validated[NonEmptyList[ExpressionParseError], TypingResult] = parent match {
case tc: SingleTypingResult if tc.objType.canBeSubclassOf(Typed[java.util.List[_]])
|| tc.objType.canBeSubclassOf(Typed[java.util.Set[_]])=> Valid(tc.objType.params.headOption.getOrElse(Unknown))
case tc: SingleTypingResult if tc.objType.canBeSubclassOf(Typed[java.util.Collection[_]]) => Valid(tc.objType.params.headOption.getOrElse(Unknown))
case tc: SingleTypingResult if tc.objType.canBeSubclassOf(Typed[java.util.Map[_, _]]) =>
Valid(TypedObjectTypingResult(List(
("key", tc.objType.params.headOption.getOrElse(Unknown)),
Expand Down
4 changes: 3 additions & 1 deletion ui/client/components/graph/SelectionContextProvider.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type UserAction = ((e: Event) => unknown) | null
interface UserActions {
copy: UserAction,
paste: UserAction,
canPaste: boolean,
cut: UserAction,
delete: UserAction,
undo: UserAction,
Expand Down Expand Up @@ -188,7 +189,8 @@ export default function SelectionContextProvider(props: PropsWithChildren<{ past
{category: events.categories.keyboard, action: events.actions.keyboard.copy},
),
)),
paste: canAccessClipboard && capabilities.write && ((e) => dispatch(
canPaste: !!canAccessClipboard,
paste: capabilities.write && ((e) => dispatch(
pasteSelection(
() => paste(e),
{category: events.categories.keyboard, action: events.actions.keyboard.paste},
Expand Down
4 changes: 2 additions & 2 deletions ui/client/components/toolbars/edit/buttons/PasteButton.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ import {CapabilitiesToolbarButton} from "../../../toolbarComponents/Capabilities
function PasteButton(): JSX.Element {
const {t} = useTranslation()

const {paste} = useSelectionActions()
const {paste, canPaste} = useSelectionActions()
return (
<CapabilitiesToolbarButton
write
name={t("panels.actions.edit-paste.button", "paste")}
icon={<Icon/>}
disabled={!paste}
disabled={!paste || !canPaste}
onClick={paste ? event => paste(event.nativeEvent) : null}
/>
)
Expand Down

0 comments on commit 3e89e3e

Please sign in to comment.