
Performance optimization for aggregates: do not save context of varia… #1886

Merged (7 commits, Jul 15, 2021)
1 change: 1 addition & 0 deletions docs/Changelog.md
@@ -51,6 +51,7 @@ Nussknacker versions
* [#1772](https://github.com/TouK/nussknacker/pull/1772) Fix for SpEL validation when we try to use a non-existing method reference
* [#1741](https://github.com/TouK/nussknacker/pull/1741) KafkaExceptionConsumer can be configured to send errors to Kafka
* [#1809](https://github.com/TouK/nussknacker/pull/1809) Performance optimization for aggregates: do not update state if added element is neutral for current state
* [#1886](https://github.com/TouK/nussknacker/pull/1886) Performance optimization for aggregates: do not save context in state
* [#1820](https://github.com/TouK/nussknacker/pull/1820) Added missing support for some logical types (LocalDate, LocalTime, UUID) in json encoding
* [#1799](https://github.com/TouK/nussknacker/pull/1799) ConfluentAvroToJsonFormatter produces and reads test data in valid json format with full kafka metadata and schema ids.
* [#1839](https://github.com/TouK/nussknacker/pull/1839) Set up `explicitUidInStatefulOperators` model's flag to `true` by default.
3 changes: 3 additions & 0 deletions docs/MigrationGuide.md
@@ -116,6 +116,9 @@ To see biggest differences please consult the [changelog](Changelog.md).
- `KafkaAvroValueDeserializationSchemaFactory` (source requires deserialization to `ConsumerRecord[K, V]`, there are only deserializers based on `KafkaAvroKeyValueDeserializationSchemaFactory`)
- `ConfluentKafkaAvroDeserializationSchemaFactory`, use `ConfluentKeyValueKafkaAvroDeserializationFactory`
- `TupleAvroKeyValueKafkaAvroDeserializerSchemaFactory`, this approach is deprecated due to #inputMeta variable that contains key data
* [#1886](https://github.com/TouK/nussknacker/pull/1886) `aggregate-sliding` with `emitWhenEventLeft = true`, `aggregate-tumbling` and `aggregate-session` components no longer
emit the full context of variables defined before the node (for performance reasons, and because it was not obvious which context would be emitted).
If you want to emit information other than the aggregated value and the key (available via the new `#key` variable), use a `#AGG.map` expression in `aggregateBy`, as in the sketch below.
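  A minimal sketch (hypothetical field and variable names; adjust to your scenario): to keep a value from before the aggregate available after it, aggregate it explicitly alongside the main value:

  ```
  aggregator:  #AGG.map({total: #AGG.sum, categories: #AGG.set})
  aggregateBy: {total: #input.amount, categories: #input.category}
  ```

  Downstream of the node, the output variable holds a map with these fields, and the grouping key is available as `#key`.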

To migrate `KafkaAvroSourceFactory`:
- Provide `KafkaConfig` with correct `useStringForKey` flag value. By default we want to handle keys as ordinary String and all topics related to such config require only value schema definitions (key schemas are ignored).
@@ -6,5 +6,6 @@ object VariableConstants {
final val InputMetaVariableName = "inputMeta"
final val MetaVariableName = "meta"
final val OutputVariableName = "output"
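// grouping key exposed to downstream nodes as the new #key variable (added in this PR)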
final val KeyVariableName = "key"

}
@@ -38,6 +38,12 @@ case class ValidationContext(localVariables: Map[String, TypingResult] = Map.emp
def withVariable(outputVar: OutputVar, value: TypingResult)(implicit nodeId: NodeId): ValidatedNel[PartSubGraphCompilationError, ValidationContext] =
withVariable(outputVar.outputName, value, Some(outputVar.fieldName))

def withVariableOverriden(name: String, value: TypingResult, paramName: Option[String])
(implicit nodeId: NodeId): ValidatedNel[PartSubGraphCompilationError, ValidationContext] = {
validateVariableFormat(name, paramName)
.map(_ => copy(localVariables = localVariables + (name -> value)))
}
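// Illustration (hypothetical usage, not part of this diff): unlike withVariable,
// which fails with OverwrittenVariable when the name already exists (see
// validateVariableExists below), withVariableOverriden only validates the name
// format and replaces any existing entry. Assuming an implicit nodeId and a ctx
// that already defines "key":
//   ctx.withVariable("key", Typed[String], paramName = None)          // Invalid(OverwrittenVariable)
//   ctx.withVariableOverriden("key", Typed[String], paramName = None) // Valid(updated context)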

private def validateVariableExists(name: String, paramName: Option[String])(implicit nodeId: NodeId): ValidatedNel[PartSubGraphCompilationError, String] =
if (variables.contains(name)) Invalid(OverwrittenVariable(name, paramName)).toValidatedNel else Valid(name)

@@ -90,4 +96,4 @@ object OutputVar {

def customNode(outputName: String): OutputVar =
OutputVar(CustomNodeFieldName, outputName)
}
}

This file was deleted.

@@ -6,6 +6,7 @@ import pl.touk.nussknacker.engine.api.LazyParameter
import pl.touk.nussknacker.engine.api.context.ProcessCompilationError.{CannotCreateObjectError, NodeId}
import pl.touk.nussknacker.engine.api.context.{ProcessCompilationError, ValidationContext}
import pl.touk.nussknacker.engine.api.typed.typing.TypingResult
import pl.touk.nussknacker.engine.flink.util.keyed.KeyEnricher

/*
This class serves two purposes:
@@ -49,9 +50,14 @@ abstract class Aggregator extends AggregateFunction[AnyRef, AnyRef, AnyRef] {

override final def merge(a: AnyRef, b: AnyRef): AnyRef = mergeAggregates(a.asInstanceOf[Aggregate], b.asInstanceOf[Aggregate])

final def toContextTransformation(variableName: String, aggregateBy: LazyParameter[_])(implicit nodeId: NodeId):
ValidationContext => ValidatedNel[ProcessCompilationError, ValidationContext] = validationCtx => computeOutputType(aggregateBy.returnType)
final def toContextTransformation(variableName: String, emitContext: Boolean, aggregateBy: LazyParameter[_])(implicit nodeId: NodeId):
ValidationContext => ValidatedNel[ProcessCompilationError, ValidationContext] = validationCtx =>
computeOutputType(aggregateBy.returnType)
//TODO: better error?
.leftMap(message => NonEmptyList.of(CannotCreateObjectError(message, nodeId.id)))
.andThen(validationCtx.withVariable(variableName, _, paramName = None))
.andThen { outputType =>
val ctx = if (emitContext) validationCtx else ValidationContext.empty
ctx.withVariable(variableName, outputType, paramName = None)
}.andThen(KeyEnricher.contextTransformation)

}
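The `KeyEnricher` imported above lives outside the shown hunks. A minimal sketch of its plausible shape, inferred from the usages in this diff (`enrichWithKey` is called with both a `StringKeyedValue` and a raw `String` key; `contextTransformation` registers the new `#key` variable, plausibly via the `withVariableOverriden` helper added in this PR); the real implementation may differ:

```scala
import cats.data.ValidatedNel
import pl.touk.nussknacker.engine.api.{Context => NkContext}
import pl.touk.nussknacker.engine.api.context.ProcessCompilationError.NodeId
import pl.touk.nussknacker.engine.api.context.{ProcessCompilationError, ValidationContext}
import pl.touk.nussknacker.engine.api.typed.typing.Typed
import pl.touk.nussknacker.engine.flink.util.keyed.StringKeyedValue

trait KeyEnricher {

  // Runtime side: put the grouping key into the emitted context under #key.
  def enrichWithKey(ctx: NkContext, keyedValue: StringKeyedValue[_]): NkContext =
    enrichWithKey(ctx, keyedValue.key)

  def enrichWithKey(ctx: NkContext, key: String): NkContext =
    ctx.withVariable("key", key) // "key" == VariableConstants.KeyVariableName
}

object KeyEnricher extends KeyEnricher {

  // Compile-time side: make #key visible during validation; overriding avoids a
  // clash when an upstream node already defined a variable named "key".
  def contextTransformation(validationContext: ValidationContext)
                           (implicit nodeId: NodeId): ValidatedNel[ProcessCompilationError, ValidationContext] =
    validationContext.withVariableOverriden("key", Typed[String], paramName = None)
}
```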
@@ -1,7 +1,6 @@
package pl.touk.nussknacker.engine.flink.util.transformer.aggregate

import java.util.concurrent.TimeUnit

import cats.data.NonEmptyList
import com.codahale.metrics.{Histogram, SlidingTimeWindowReservoir}
import org.apache.flink.api.common.functions.RuntimeContext
@@ -17,7 +16,7 @@ import pl.touk.nussknacker.engine.api.context.ProcessCompilationError.NodeId
import pl.touk.nussknacker.engine.api.typed.typing.TypingResult
import pl.touk.nussknacker.engine.api.{ValueWithContext, Context => NkContext}
import pl.touk.nussknacker.engine.flink.api.state.{LatelyEvictableStateFunction, StateHolder}
import pl.touk.nussknacker.engine.flink.util.keyed.StringKeyedValue
import pl.touk.nussknacker.engine.flink.util.keyed.{KeyEnricher, StringKeyedValue}
import pl.touk.nussknacker.engine.flink.util.metrics.MetricUtils
import pl.touk.nussknacker.engine.flink.util.orderedmap.FlinkRangeMap
import pl.touk.nussknacker.engine.flink.util.orderedmap.FlinkRangeMap._
@@ -45,7 +44,7 @@ class AggregatorFunction[MapT[K,V]](protected val aggregator: Aggregator, protec

}

trait AggregatorFunctionMixin[MapT[K,V]] { self: StateHolder[MapT[Long, AnyRef]] =>
trait AggregatorFunctionMixin[MapT[K,V]] extends KeyEnricher { self: StateHolder[MapT[Long, AnyRef]] =>

def getRuntimeContext: RuntimeContext

@@ -90,7 +89,7 @@ trait AggregatorFunctionMixin[MapT[K,V]] { self: StateHolder[MapT[Long, AnyRef]]
val newState = addElementToState(value, timestamp, timeService, out)
val finalVal = computeFinalValue(newState, timestamp)
timeHistogram.update(System.nanoTime() - start)
out.collect(ValueWithContext(finalVal, value.context))
out.collect(ValueWithContext(finalVal, enrichWithKey(value.context, value.value)))
}

protected def addElementToState(value: ValueWithContext[StringKeyedValue[AnyRef]],
@@ -28,7 +28,7 @@ class EmitExtraWindowWhenNoDataTumblingAggregatorFunction[MapT[K,V]](protected v
(implicit override val rangeMap: FlinkRangeMap[MapT])
extends KeyedProcessFunction[String, ValueWithContext[StringKeyedValue[AnyRef]], ValueWithContext[AnyRef]]
with StateHolder[MapT[Long, AnyRef]]
with AggregatorFunctionMixin[MapT] with AddedElementContextStateHolder[MapT] {
with AggregatorFunctionMixin[MapT] {

type FlinkCtx = KeyedProcessFunction[String, ValueWithContext[StringKeyedValue[AnyRef]], ValueWithContext[AnyRef]]#Context
type FlinkOnTimerCtx = KeyedProcessFunction[String, ValueWithContext[StringKeyedValue[AnyRef]], ValueWithContext[AnyRef]]#OnTimerContext
@@ -38,7 +38,6 @@ class EmitExtraWindowWhenNoDataTumblingAggregatorFunction[MapT[K,V]](protected v

override def open(parameters: Configuration): Unit = {
state = getRuntimeContext.getState(stateDescriptor)
addedElementContext = getRuntimeContext.getState(addedElementContextDescriptor)
}

override protected val minimalResolutionMs: Long = timeWindowLengthMillis
@@ -49,20 +48,15 @@ class EmitExtraWindowWhenNoDataTumblingAggregatorFunction[MapT[K,V]](protected v

override protected def handleElementAddedToState(newElementInStateTimestamp: Long, newElement: aggregator.Element, nkCtx: NkContext,
timerService: TimerService, out: Collector[ValueWithContext[AnyRef]]): Unit = {
addedElementContext.update(readAddedElementContextOrInitial().updated(newElementInStateTimestamp, nkCtx))
timerService.registerEventTimeTimer(newElementInStateTimestamp + timeWindowLengthMillis)
}

override def onTimer(timestamp: Long, ctx: FlinkOnTimerCtx, out: Collector[ValueWithContext[AnyRef]]): Unit = {
val previousTimestamp = timestamp - timeWindowLengthMillis
val currentStateValue = readStateOrInitial()
val finalVal = computeFinalValue(currentStateValue, previousTimestamp)
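// per-event contexts are no longer stored in state, so emit a fresh context that carries only #key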
out.collect(ValueWithContext(finalVal, enrichWithKey(NkContext(""), ctx.getCurrentKey)))

readAddedElementContextOrInitial().toRO(previousTimestamp).toScalaMapRO.lastOption.foreach {
case (_, nkCtx) =>
val finalVal = computeFinalValue(currentStateValue, previousTimestamp)
out.collect(ValueWithContext(finalVal, nkCtx))
}

val previousTimestampStateAndRest = stateForTimestampToReadUntilEnd(currentStateValue, previousTimestamp)
if (previousTimestampStateAndRest.toScalaMapRO.isEmpty) {
evictStates()
@@ -73,7 +67,6 @@ class EmitExtraWindowWhenNoDataTumblingAggregatorFunction[MapT[K,V]](protected v

override protected def updateState(stateValue: MapT[Long, AnyRef], stateValidity: Long, timeService: TimerService): Unit = {
state.update(stateValue)
invalidateAddedElementContextState(stateValue)
}

override protected def doMoveEvictionTime(time: Long, timeService: TimerService): Unit = {
@@ -82,7 +75,6 @@ class EmitExtraWindowWhenNoDataTumblingAggregatorFunction[MapT[K,V]](protected v

protected def evictStates(): Unit = {
state.clear()
addedElementContext.clear()
}

override protected def readState(): MapT[Long, AnyRef] = state.value()
@@ -23,14 +23,13 @@ class EmitWhenEventLeftAggregatorFunction[MapT[K,V]](protected val aggregator: A
override protected val aggregateTypeInformation: TypeInformation[AnyRef])
(implicit override val rangeMap: FlinkRangeMap[MapT])
extends LatelyEvictableStateFunction[ValueWithContext[StringKeyedValue[AnyRef]], ValueWithContext[AnyRef], MapT[Long, AnyRef]]
with AggregatorFunctionMixin[MapT] with AddedElementContextStateHolder[MapT] {
with AggregatorFunctionMixin[MapT] {

type FlinkCtx = KeyedProcessFunction[String, ValueWithContext[StringKeyedValue[AnyRef]], ValueWithContext[AnyRef]]#Context
type FlinkOnTimerCtx = KeyedProcessFunction[String, ValueWithContext[StringKeyedValue[AnyRef]], ValueWithContext[AnyRef]]#OnTimerContext

override def open(parameters: Configuration): Unit = {
super.open(parameters)
addedElementContext = getRuntimeContext.getState(addedElementContextDescriptor)
}

override def processElement(value: ValueWithContext[StringKeyedValue[AnyRef]], ctx: FlinkCtx, out: Collector[ValueWithContext[AnyRef]]): Unit = {
@@ -39,39 +38,24 @@

override protected def handleElementAddedToState(newElementInStateTimestamp: Long, newElement: aggregator.Element, nkCtx: NkContext,
timerService: TimerService, out: Collector[ValueWithContext[AnyRef]]): Unit = {
addedElementContext.update(readAddedElementContextOrInitial().updated(newElementInStateTimestamp, nkCtx))
timerService.registerEventTimeTimer(newElementInStateTimestamp + timeWindowLengthMillis)
}

override def onTimer(timestamp: Long, ctx: FlinkOnTimerCtx, out: Collector[ValueWithContext[AnyRef]]): Unit = {
val currentStateValue = readStateOrInitial()
handleElementLeftSlide(currentStateValue, timestamp, ctx.timerService(), out)
handleElementLeftSlide(currentStateValue, timestamp, ctx, out)
super.onTimer(timestamp, ctx, out)
}

protected def handleElementLeftSlide(currentStateValue: MapT[Long, aggregator.Aggregate], timestamp: Long,
timerService: TimerService, out: Collector[ValueWithContext[AnyRef]]): Unit = {
ctx: FlinkOnTimerCtx, out: Collector[ValueWithContext[AnyRef]]): Unit = {
val stateForRecentlySentEvent = currentStateValue.toScalaMapRO.lastOption.map {
case (lastTimestamp, _) => stateForTimestampToReadUntilEnd(currentStateValue, lastTimestamp) // shouldn't we save somewhere recently sent timestamp?
}.getOrElse(currentStateValue)
for {
lastEntryToRemove <- stateForRecentlySentEvent.toRO(timestamp - timeWindowLengthMillis).toScalaMapRO.lastOption
(lastTimestampToRemove, _) = lastEntryToRemove
matchingContext <- readAddedElementContextOrInitial().toScalaMapRO.get(lastTimestampToRemove)
} {
if (stateForRecentlySentEvent.toRO(timestamp - timeWindowLengthMillis).toScalaMapRO.nonEmpty) {
val finalVal = computeFinalValue(currentStateValue, timestamp)
out.collect(ValueWithContext(finalVal, matchingContext))
out.collect(ValueWithContext(finalVal, enrichWithKey(NkContext(""), ctx.getCurrentKey)))
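// as in the tumbling variant above: the stored per-event context is gone, so only #key is attached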
}
}

override protected def updateState(stateValue: MapT[Long, AnyRef], stateValidity: Long, timeService: TimerService): Unit = {
super.updateState(stateValue, stateValidity, timeService)
invalidateAddedElementContextState(stateValue)
}

override protected def evictStates(): Unit = {
super.evictStates()
addedElementContext.clear()
}

}

This file was deleted.
