Skip to content

Commit

Permalink
TypedObjectTypingResult - use regular Map in place of ListMap, order …
Browse files Browse the repository at this point in the history
…no longer matters - step #2 (related changes)
  • Loading branch information
rafa-minimal committed Jun 12, 2023
1 parent d0b4870 commit a77e110
Show file tree
Hide file tree
Showing 37 changed files with 151 additions and 188 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import pl.touk.nussknacker.engine.api.typed.TypingType.{TypingType, decoder}
import pl.touk.nussknacker.engine.api.typed.typing._
import pl.touk.nussknacker.engine.util.Implicits.RichScalaMap

import scala.collection.immutable.ListMap
import scala.util.{Failure, Success, Try}

//TODO: refactor way of encoding to easier handle decoding.
Expand Down Expand Up @@ -120,7 +119,7 @@ class TypingResultDecoder(loadClass: String => Class[_]) {

private def typedObjectTypingResult(obj: HCursor): Decoder.Result[TypingResult] = for {
valueClass <- typedClass(obj)
fields <- obj.downField("fields").as[ListMap[String, TypingResult]]
fields <- obj.downField("fields").as[Map[String, TypingResult]]
additional <- obj.downField("additionalInfo").as[Option[Map[String, AdditionalDataValue]]].map(_.getOrElse(Map.empty))
} yield TypedObjectTypingResult(fields, valueClass, additional)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,5 @@ package pl.touk.nussknacker.engine.api.typed

import pl.touk.nussknacker.engine.api.typed.typing.TypingResult

import scala.collection.immutable.ListMap

object TypedObjectDefinition {

def apply(fields: List[(String, TypingResult)]): TypedObjectDefinition =
TypedObjectDefinition(ListMap(fields: _*))
}

case class TypedObjectDefinition(fields: ListMap[String, TypingResult])
case class TypedObjectDefinition(fields: Map[String, TypingResult])
Original file line number Diff line number Diff line change
Expand Up @@ -92,31 +92,21 @@ class CommonSupertypeFinder(classResolutionStrategy: SupertypeClassResolutionStr
}

private def unionOfFields(l: TypedObjectTypingResult, r: TypedObjectTypingResult)
(implicit numberPromotionStrategy: NumberTypesPromotionStrategy): List[(String, TypingResult)] = {
val leftFields = l.fields.toList
val rightFields = r.fields.toList
val leftFieldNames = leftFields.map(_._1)
val (rightIntersect, rightDoesNotIntersect) = rightFields.partition {
case (rightFieldName, _) => leftFieldNames.contains(rightFieldName)
}

val leftFieldsWithRightCommonFields = leftFields.map { case (name, leftType) =>
name -> (leftType :: rightIntersect.filter(name == _._1).map(_._2))
}.flatMap {
case (fieldName, leftType :: rightType :: Nil) if leftType == rightType =>
fieldName -> leftType :: Nil
case (fieldName, leftType :: rightType :: Nil) =>
val leastUpperBound = commonSupertype(leftType, rightType)
if (leastUpperBound == Typed.empty)
Nil // fields type collision - skipping this field
else
(fieldName, leastUpperBound) :: Nil
case (fieldName, types) =>
val leftField = types.head
fieldName -> leftField :: Nil
}
leftFieldsWithRightCommonFields ++ rightDoesNotIntersect
}
(implicit numberPromotionStrategy: NumberTypesPromotionStrategy): Map[String, TypingResult] =
(l.fields.toList ++ r.fields.toList).groupBy(_._1)
.map { case (key, value) => key -> value.map(_._2) }
.flatMap {
case (fieldName, leftType :: rightType :: Nil) =>
val common = commonSupertype(leftType, rightType)
if (common == Typed.empty)
None // fields type collision - skipping this field
else
Some(fieldName -> common)
case (fieldName, singleType :: Nil) =>
Some(fieldName -> singleType)
case (_, longerList) =>
throw new IllegalArgumentException("Computing union of more than two fields: " + longerList) // shouldn't happen
}

// This implementation is because TypedObjectTypingResult has underlying TypedClass instead of TypingResult
private def klassCommonSupertypeReturningTypedClass(left: TypedClass, right: TypedClass)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,8 @@ import org.scalatest.prop.TableDrivenPropertyChecks
import org.scalatest.LoneElement
import org.scalatest.matchers.should.Matchers
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.matchers.should.Matchers
import pl.touk.nussknacker.engine.api.typed.typing._

import scala.collection.immutable.ListMap

class TypedFromInstanceTest extends AnyFunSuite with Matchers with LoneElement with TableDrivenPropertyChecks {

Expand Down Expand Up @@ -42,7 +40,7 @@ class TypedFromInstanceTest extends AnyFunSuite with Matchers with LoneElement w
}

test("should type map types") {
val fieldTypes = ListMap(
val fieldTypes = Map(
"a" -> TypedObjectWithValue(Typed.typedClass[Int], 1),
"b" -> TypedObjectWithValue(Typed.typedClass[String], "string")
)
Expand Down Expand Up @@ -81,12 +79,12 @@ class TypedFromInstanceTest extends AnyFunSuite with Matchers with LoneElement w
checkTypingResult(listOfSimpleObjects.asJava, classOf[java.util.List[_]], Typed(classOf[Number]))

val listOfTypedMaps = List(TypedMap(Map("a" -> 1, "b" -> "B")), TypedMap(Map("a" -> 1)))
val typedMapTypingResult = TypedObjectTypingResult(ListMap("a" -> Typed(classOf[Integer])))
val typedMapTypingResult = TypedObjectTypingResult(Map("a" -> Typed(classOf[Integer])))
checkTypingResult(listOfTypedMaps, classOf[List[_]], typedMapTypingResult)
checkTypingResult(listOfTypedMaps.asJava, classOf[java.util.List[_]], typedMapTypingResult)
checkNotASubclassOfOtherParamTypingResult(listOfTypedMaps, TypedObjectTypingResult(ListMap("c" -> Typed(classOf[Integer]))))
checkNotASubclassOfOtherParamTypingResult(listOfTypedMaps, TypedObjectTypingResult(ListMap("b" -> Typed(classOf[Integer]))))
checkNotASubclassOfOtherParamTypingResult(listOfTypedMaps, TypedObjectTypingResult(ListMap("a" -> Typed(classOf[String]))))
checkNotASubclassOfOtherParamTypingResult(listOfTypedMaps, TypedObjectTypingResult(Map("c" -> Typed(classOf[Integer]))))
checkNotASubclassOfOtherParamTypingResult(listOfTypedMaps, TypedObjectTypingResult(Map("b" -> Typed(classOf[Integer]))))
checkNotASubclassOfOtherParamTypingResult(listOfTypedMaps, TypedObjectTypingResult(Map("a" -> Typed(classOf[String]))))
}

test("should find element type for lists of different elements") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.matchers.should.Matchers
import pl.touk.nussknacker.engine.api.typed.typing.{AdditionalDataValue, Typed, TypedNull, TypedObjectTypingResult, TypedObjectWithValue, TypedUnion, Unknown}

import scala.collection.immutable.ListMap

class TypingResultDecoderSpec extends AnyFunSuite with Matchers {

Expand All @@ -27,8 +26,8 @@ class TypingResultDecoderSpec extends AnyFunSuite with Matchers {
TypedUnion(Set(Typed.typedClass[String], Typed.typedClass[java.lang.Long])),
//this wont' work, handling primitives should be done with more sophisticated classloading
//Typed[Long]
TypedObjectTypingResult(ListMap("field1" -> Typed[String], "field2" -> Unknown)),
TypedObjectTypingResult(ListMap("field1" -> Typed[String]), Typed.typedClass[Map[String, Any]],
TypedObjectTypingResult(Map("field1" -> Typed[String], "field2" -> Unknown)),
TypedObjectTypingResult(Map("field1" -> Typed[String]), Typed.typedClass[Map[String, Any]],
Map[String, AdditionalDataValue]("ad1" -> "aaa", "ad2" -> 22L, "ad3" -> true))
).foreach { typing =>
decoder.decodeTypingResults.decodeJson(TypeEncoders.typingResultEncoder(typing)) shouldBe Right(typing)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import pl.touk.nussknacker.engine.api.typed.typing._

class TypingResultErrorMessagesSpec extends AnyFunSuite with Matchers with OptionValues with Inside {

private def typeMap(args: (String, TypingResult)*) = TypedObjectTypingResult(args.toList)
private def typeMap(args: (String, TypingResult)*) = TypedObjectTypingResult(args.toMap)

private def list(arg: TypingResult) = Typed.genericTypeClass[java.util.List[_]](List(arg))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,12 @@ import pl.touk.nussknacker.engine.api.typed.supertype.{CommonSupertypeFinder, Nu
import pl.touk.nussknacker.engine.api.typed.typing.{Typed, TypedClass, TypedNull, TypedObjectTypingResult, TypedUnion, TypingResult, Unknown}

import java.util
import scala.collection.immutable.ListMap

class TypingResultSpec extends AnyFunSuite with Matchers with OptionValues with Inside {

private val commonSuperTypeFinder = new CommonSupertypeFinder(SupertypeClassResolutionStrategy.Intersection, true)

private def typeMap(args: (String, TypingResult)*) = TypedObjectTypingResult(args.toList)
private def typeMap(args: (String, TypingResult)*) = TypedObjectTypingResult(args.toMap)

private def list(arg: TypingResult) = Typed.genericTypeClass[java.util.List[_]](List(arg))

Expand Down Expand Up @@ -115,13 +114,13 @@ class TypingResultSpec extends AnyFunSuite with Matchers with OptionValues with
commonSuperTypeFinder.commonSupertype(Unknown, Typed[Long]) shouldEqual Unknown

commonSuperTypeFinder.commonSupertype(
TypedObjectTypingResult(ListMap("foo" -> Typed[String], "bar" -> Typed[Int], "baz" -> Typed[String])),
TypedObjectTypingResult(ListMap("foo" -> Typed[String], "bar" -> Typed[Long], "baz2" -> Typed[String]))) shouldEqual
TypedObjectTypingResult(ListMap("foo" -> Typed[String], "bar" -> Typed[java.lang.Long], "baz" -> Typed[String], "baz2" -> Typed[String]))
TypedObjectTypingResult(Map("foo" -> Typed[String], "bar" -> Typed[Int], "baz" -> Typed[String])),
TypedObjectTypingResult(Map("foo" -> Typed[String], "bar" -> Typed[Long], "baz2" -> Typed[String]))) shouldEqual
TypedObjectTypingResult(Map("foo" -> Typed[String], "bar" -> Typed[java.lang.Long], "baz" -> Typed[String], "baz2" -> Typed[String]))

commonSuperTypeFinder.commonSupertype(
TypedObjectTypingResult(ListMap("foo" -> Typed[String])), TypedObjectTypingResult(ListMap("foo" -> Typed[Long]))) shouldEqual
TypedObjectTypingResult(ListMap.empty[String, TypingResult])
TypedObjectTypingResult(Map("foo" -> Typed[String])), TypedObjectTypingResult(Map("foo" -> Typed[Long]))) shouldEqual
TypedObjectTypingResult(Map.empty[String, TypingResult])
}

test("find common supertype for complex types with inheritance in classes hierarchy") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class IgniteQueryHelper(getConnection: () => Connection) extends LazyLogging {
.map { case (tableName, entries) =>
val columnTypings = entries.map { case (_, columnName, klassName, _) => columnName -> Typed.typedClass(Class.forName(klassName)) }

tableName -> TableDefinition(typedObjectDefinition = TypedObjectDefinition(columnTypings))
tableName -> TableDefinition(typedObjectDefinition = TypedObjectDefinition(columnTypings.toMap))
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ object TableDefinition {
}

case class TableDefinition(columnDefs: List[ColumnDefinition]) {
val rowType: TypedObjectTypingResult = TypedObjectTypingResult(columnDefs.map(col => col.name -> col.typing))
val rowType: TypedObjectTypingResult = TypedObjectTypingResult(columnDefs.map(col => col.name -> col.typing).toMap)

val resultSetType: TypingResult = Typed.genericTypeClass(classOf[java.util.List[_]], rowType :: Nil)
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import pl.touk.nussknacker.ui.api.helpers._
import pl.touk.nussknacker.ui.process.marshall.ProcessConverter
import pl.touk.nussknacker.ui.process.subprocess.{SubprocessDetails, SubprocessResolver}

import scala.collection.immutable.{ListMap, Map}
import scala.jdk.CollectionConverters._

class ProcessValidationSpec extends AnyFunSuite with Matchers {
Expand Down Expand Up @@ -213,7 +212,7 @@ class ProcessValidationSpec extends AnyFunSuite with Matchers {

val result = validator.validate(process)

result.errors.globalErrors shouldBe List((NodeValidationError("DuplicatedNodeIds", "Two nodes cannot have same id", "Duplicate node ids: switchID", None, RenderNotAllowed)))
result.errors.globalErrors shouldBe List(NodeValidationError("DuplicatedNodeIds", "Two nodes cannot have same id", "Duplicate node ids: switchID", None, RenderNotAllowed))
result.errors.invalidNodes shouldBe empty
result.warnings shouldBe ValidationWarnings.success
}
Expand Down Expand Up @@ -429,7 +428,7 @@ class ProcessValidationSpec extends AnyFunSuite with Matchers {
validationResult.errors.invalidNodes shouldBe Symbol("empty")
validationResult.nodeResults("sink2").variableTypes("input") shouldBe typing.Unknown
validationResult.nodeResults("sink2").variableTypes("var2") shouldBe Typed.fromInstance("42")
validationResult.nodeResults("sink2").variableTypes("subOut2") shouldBe TypedObjectTypingResult(ListMap(
validationResult.nodeResults("sink2").variableTypes("subOut2") shouldBe TypedObjectTypingResult(Map(
"bar" -> Typed.fromInstance("42")
))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class UnionWithMemoTransformer(timestampAssigner: Option[TimestampWatermarkHandl
val finalContext = finalContextValidated.toOption.get

val mapTypeInfo = context.typeInformationDetection
.forType(TypedObjectTypingResult(valueByBranchId.mapValuesNow(_.returnType).toList, Typed.typedClass[java.util.Map[_, _]]))
.forType(TypedObjectTypingResult(valueByBranchId.mapValuesNow(_.returnType), Typed.typedClass[java.util.Map[_, _]]))
.asInstanceOf[TypeInformation[java.util.Map[String, AnyRef]]]

val processedTypeInfo = context.typeInformationDetection.forValueWithContext(finalContext, KeyedValueType.info(mapTypeInfo))
Expand Down Expand Up @@ -98,10 +98,10 @@ class UnionWithMemoTransformer(timestampAssigner: Option[TimestampWatermarkHandl

val validatedContext = ContextTransformation.findUniqueParentContext(inputContexts).map { parent =>
val newType = TypedObjectTypingResult(
(KeyField -> Typed[String]) :: inputContexts.map {
inputContexts.map {
case (branchId, _) =>
ContextTransformation.sanitizeBranchName(branchId) -> valueByBranchId(branchId).returnType
}.toList
} + (KeyField -> Typed[String])
)
ValidationContext(Map(variableName -> newType), Map.empty, parent)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ object aggregates {
.map(key -> _)
.leftMap(m => NonEmptyList.of(s"$key - $m"))
}.toList.sequence.leftMap(list => s"Invalid fields: ${list.toList.mkString(", ")}")
validationRes.map(fields => TypedObjectTypingResult(fields, objType = objType))
validationRes.map(fields => TypedObjectTypingResult(fields.toMap, objType = objType))
case TypedObjectTypingResult(inputFields, _, _) =>
Invalid(s"Fields do not match, aggregateBy: ${inputFields.keys.mkString(", ")}, aggregator: ${scalaFields.keys.mkString(", ")}")
case _ =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,9 @@ class FullOuterJoinTransformer(timestampAssigner: Option[TimestampWatermarkHandl
.map(id -> _)
}.toList.sequence.map(_.toMap)
validatedAggregatorReturnTypes.map(outputTypeByBranchId => {
val outputTypes = outputTypeByBranchId.toList
.map{case (k, v) => ContextTransformation.sanitizeBranchName(k) -> v}
.++(List(KeyFieldName -> Typed.typedClass[String]))
TypedObjectTypingResult(outputTypes, Typed.typedClass[java.util.Map[String, AnyRef]])
val outputTypes = outputTypeByBranchId
.map { case (k, v) => ContextTransformation.sanitizeBranchName(k) -> v } + (KeyFieldName -> Typed.typedClass[String])
TypedObjectTypingResult(outputTypes)
})

case _ => Validated.validNel(Unknown)
Expand Down Expand Up @@ -111,7 +110,7 @@ class FullOuterJoinTransformer(timestampAssigner: Option[TimestampWatermarkHandl

val types = aggregateByByBranchId.mapValuesNow(_.returnType)
val optionTypes = types.mapValuesNow(t => Typed.genericTypeClass(classOf[Option[_]], List(t)))
val inputType = TypedObjectTypingResult(optionTypes.toList, objType = Typed.typedClass[java.util.Map[_, _]])
val inputType = TypedObjectTypingResult(optionTypes)

val storedType = aggregator.computeStoredTypeUnsafe(inputType)
val storedTypeInfo = context.typeInformationDetection.forType[AnyRef](storedType)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,11 @@ class AggregatesSpec extends AnyFunSuite with TableDrivenPropertyChecks with Mat

val namedAggregators = aggregators.indices.map(id => s"field$id").zip(aggregators).tail.toMap

val mapAggregator = new MapAggregator(namedAggregators.mapValuesNow(_._1.asInstanceOf[Aggregator]).asJava)
val input = TypedObjectTypingResult(namedAggregators.mapValuesNow(_._2).toList, objType = Typed.typedClass[JMap[_, _]])
val mapAggregator = new MapAggregator(namedAggregators.mapValuesNow(_._1).asJava.asInstanceOf[JMap[String, Aggregator]])
val input = TypedObjectTypingResult(namedAggregators.mapValuesNow(_._2), objType = Typed.typedClass[JMap[_, _]])
val el = namedAggregators.mapValuesNow(_._3).asJava
val stored = TypedObjectTypingResult(namedAggregators.mapValuesNow(_._4).toList, objType = Typed.genericTypeClass(classOf[Map[_, _]], List(Typed[String], Unknown)))
val output = TypedObjectTypingResult(namedAggregators.mapValuesNow(_._5).toList, objType = Typed.genericTypeClass(classOf[JMap[_, _]], List(Typed[String], Unknown)))
val stored = TypedObjectTypingResult(namedAggregators.mapValuesNow(_._4), objType = Typed.genericTypeClass(classOf[Map[_, _]], List(Typed[String], Unknown)))
val output = TypedObjectTypingResult(namedAggregators.mapValuesNow(_._5), objType = Typed.genericTypeClass(classOf[JMap[_, _]], List(Typed[String], Unknown)))
checkAggregator(mapAggregator, input, el, stored, output)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,8 @@ import java.time.{Duration, OffsetDateTime, ZoneId}
import java.util
import java.util.Arrays.asList
import scala.jdk.CollectionConverters._
import scala.collection.immutable.ListMap
import pl.touk.nussknacker.engine.util.Implicits.RichScalaMap

import java.util.TimeZone

class TransformersTest extends AnyFunSuite with FlinkSpec with Matchers with Inside {

Expand All @@ -59,7 +57,7 @@ class TransformersTest extends AnyFunSuite with FlinkSpec with Matchers with Ins
validateOk("#AGG.set", "#input.str", Typed.fromDetailedType[java.util.Set[String]])
validateOk("#AGG.map({f1: #AGG.sum, f2: #AGG.set})",
"{f1: #input.eId, f2: #input.str}",
TypedObjectTypingResult(ListMap("f1" -> Typed[java.lang.Long], "f2" -> Typed.fromDetailedType[java.util.Set[String]])))
TypedObjectTypingResult(Map("f1" -> Typed[java.lang.Long], "f2" -> Typed.fromDetailedType[java.util.Set[String]])))

validateError("#AGG.sum", "#input.str", "Invalid aggregate type: String, should be: Number")
validateError("#AGG.map({f1: #AGG.set, f2: #AGG.set})", "{f1: #input.str}", "Fields do not match, aggregateBy: f1, aggregator: f1, f2")
Expand Down Expand Up @@ -165,7 +163,6 @@ class TransformersTest extends AnyFunSuite with FlinkSpec with Matchers with Ins
}

test("set tumbling aggregate") {
val id1 = ZoneId.of("Europe/Warsaw")
val id = "1"
val model = modelData(List(
TestRecordHours(id, 10, 1, "a"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class TypingResultAwareTypeInformationDetection(customisation: TypingResultAware
)

def forContext(validationContext: ValidationContext): TypeInformation[Context] = {
val variables = forType(TypedObjectTypingResult(validationContext.localVariables.toList, Typed.typedClass[Map[String, AnyRef]]))
val variables = forType(TypedObjectTypingResult(validationContext.localVariables, Typed.typedClass[Map[String, AnyRef]]))
.asInstanceOf[TypeInformation[Map[String, Any]]]
val parentCtx = validationContext.parent.map(forContext)

Expand Down
Loading

0 comments on commit a77e110

Please sign in to comment.