
Performance optimization for aggregates: do not save context of varia… #1886

Merged · 7 commits · Jul 15, 2021
2 changes: 2 additions & 0 deletions docs/Changelog.md
@@ -51,9 +51,11 @@ Nussknacker versions
* [#1772](https://github.com/TouK/nussknacker/pull/1772) Fix for SpEL validation when trying to use a non-existent method reference
* [#1741](https://github.com/TouK/nussknacker/pull/1741) KafkaExceptionConsumer can be configured to send errors to Kafka
* [#1809](https://github.com/TouK/nussknacker/pull/1809) Performance optimization for aggregates: do not update state if added element is neutral for current state
* [#1886](https://github.com/TouK/nussknacker/pull/1886) Performance optimization for aggregates: do not save context in state
* [#1820](https://github.com/TouK/nussknacker/pull/1820) Added missing support for some logical types (LocalDate, LocalTime, UUID) in JSON encoding
* [#1799](https://github.com/TouK/nussknacker/pull/1799) ConfluentAvroToJsonFormatter produces and reads test data in valid JSON format, with full Kafka metadata and schema ids.
* [#1839](https://github.com/TouK/nussknacker/pull/1839) Set the `explicitUidInStatefulOperators` model flag to `true` by default.
* [#1886](https://github.com/TouK/nussknacker/pull/1886) Added `#AGG` utility for easier switching from simple aggregating functions like `'Sum'` to more complex `#AGG.map()`

0.3.1 (not released yet)
------------------------
11 changes: 7 additions & 4 deletions docs/MigrationGuide.md
@@ -142,8 +142,13 @@ To see biggest differences please consult the [changelog](Changelog.md).
```
{"keySchemaId":null,"valueSchemaId":1,"consumerRecord":{"key":null,"value":{"first":"Jan","last":"Kowalski"},"topic":"testAvroRecordTopic1","partition":0,"offset":0,"timestamp":1624279687756,"timestampType":"CreateTime","headers":{},"leaderEpoch":0}}
```
* [#1663](https://github.com/TouK/nussknacker/pull/1663) Default `FlinkExceptionHandler` implementations are deprecated, use `ConfigurableExceptionHandler` instead.
* [#1731](https://github.com/TouK/nussknacker/pull/1731) RocksDB config flag `incrementalCheckpoints` is turned on by default.
* [#1825](https://github.com/TouK/nussknacker/pull/1825) Default dashboard renamed from `flink-esp` to `nussknacker-scenario`
* [#1836](https://github.com/TouK/nussknacker/pull/1836) Change default `kafka.consumerGroupNamingStrategy` to `processId-nodeId`.
* [#1886](https://github.com/TouK/nussknacker/pull/1886) The `aggregate-sliding` (with `emitWhenEventLeft = true`), `aggregate-tumbling` and `aggregate-session` components no longer emit the full context of variables defined before the node, both for performance reasons and because it was not obvious which context was being emitted.
If you want to emit information other than the aggregated value and the key (available via the new `#key` variable), use a `#AGG.map` expression in `aggregateBy`; see the example below.
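A hypothetical `aggregateBy` / `aggregator` pair (the field names are illustrative, not taken from this PR):

```
aggregateBy: {total: #input.amount, clients: #input.clientId}
aggregator:  #AGG.map({total: #AGG.sum, clients: #AGG.set})
```

Downstream nodes can then read the aggregated fields from the output variable (e.g. `#yourOutputVar.total`) and the grouping key via `#key`.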

## In version 0.3.0

@@ -172,10 +177,8 @@ that will be hidden before parameter's evaluation
- Added methods: `cancelJob`, `submitJob`, `listJobs`, `runningJobs` to `FlinkMiniClusterHolder`
- Deprecated: `runningJobs` from `MiniClusterExecutionEnvironment`
- Removed: `getClusterClient` from `FlinkMiniClusterHolder` interface, because of compatibility issues with Flink 1.9
- Renamed: `FlinkStreamingProcessRegistrar` to `FlinkProcessManager`
* [#1303](https://github.com/TouK/nussknacker/pull/1303) TypedObjectTypingResult has a new field: additionalInfo
* [#1663](https://github.com/TouK/nussknacker/pull/1663) Default `FlinkExceptionHandler` implementations are deprecated, use `ConfigurableExceptionHandler` instead.
* [#1731](https://github.com/TouK/nussknacker/pull/1731) RocksDB config flag `incrementalCheckpoints` is turned on by default.

## In version 0.2.0

1 change: 1 addition & 0 deletions docs/designingProcesses/Spel.md
@@ -231,3 +231,4 @@ If you need to invoke the same method in many places, probably the best solution
| `CONV` | General conversion functions |
| `DATE` | Date operations (parsing, printing) |
| `UTIL` | Various utilities (e.g. identifier generation) |
| `AGG` | Aggregator functions |
@@ -6,5 +6,6 @@ object VariableConstants {
final val InputMetaVariableName = "inputMeta"
final val MetaVariableName = "meta"
final val OutputVariableName = "output"
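// backs the new #key variable emitted by the aggregate components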
final val KeyVariableName = "key"

}
@@ -38,6 +38,12 @@ case class ValidationContext(localVariables: Map[String, TypingResult] = Map.emp
def withVariable(outputVar: OutputVar, value: TypingResult)(implicit nodeId: NodeId): ValidatedNel[PartSubGraphCompilationError, ValidationContext] =
withVariable(outputVar.outputName, value, Some(outputVar.fieldName))

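// Unlike withVariable, skips the OverwrittenVariable check: only the variable-name
// format is validated, so an already-declared variable (such as #key) can be redefined.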
def withVariableOverriden(name: String, value: TypingResult, paramName: Option[String])
(implicit nodeId: NodeId): ValidatedNel[PartSubGraphCompilationError, ValidationContext] = {
validateVariableFormat(name, paramName)
.map(_ => copy(localVariables = localVariables + (name -> value)))
}

private def validateVariableExists(name: String, paramName: Option[String])(implicit nodeId: NodeId): ValidatedNel[PartSubGraphCompilationError, String] =
if (variables.contains(name)) Invalid(OverwrittenVariable(name, paramName)).toValidatedNel else Valid(name)

@@ -90,4 +96,4 @@ object OutputVar {

def customNode(outputName: String): OutputVar =
OutputVar(CustomNodeFieldName, outputName)
}
@@ -10,6 +10,7 @@ import pl.touk.nussknacker.engine.avro.sink.{KafkaAvroSinkFactory, KafkaAvroSink
import pl.touk.nussknacker.engine.avro.source.KafkaAvroSourceFactory
import pl.touk.nussknacker.engine.flink.util.exception.ConfigurableExceptionHandlerFactory
import pl.touk.nussknacker.engine.flink.util.sink.EmptySink
import pl.touk.nussknacker.engine.flink.util.transformer.aggregate.AggregateHelper
import pl.touk.nussknacker.engine.flink.util.transformer.aggregate.sampleTransformers.{SessionWindowAggregateTransformer, SlidingAggregateTransformerV2, TumblingAggregateTransformer}
import pl.touk.nussknacker.engine.flink.util.transformer.outer.OuterJoinTransformer
import pl.touk.nussknacker.engine.flink.util.transformer.{DelayTransformer, PeriodicSourceFactory, PreviousValueTransformer, UnionTransformer, UnionWithMemoTransformer}
@@ -67,7 +68,8 @@ class GenericConfigCreator extends EmptyProcessConfigCreator {
"NUMERIC" -> defaultCategory(numeric),
"CONV" -> defaultCategory(conversion),
"DATE" -> defaultCategory(date),
"UTIL" -> defaultCategory(util)
"UTIL" -> defaultCategory(util),
"AGG" -> defaultCategory(new AggregateHelper)
),
List()
)
@@ -17,14 +17,14 @@
public class AggregateHelper {

public static final SimpleParameterEditor SIMPLE_EDITOR = new FixedValuesParameterEditor(JavaConverters.collectionAsScalaIterableConverter(Arrays.asList(
new FixedExpressionValue("T(" + AggregateHelper.class.getName() + ").FIRST", "First"),
new FixedExpressionValue("T(" + AggregateHelper.class.getName() + ").LAST", "Last"),
new FixedExpressionValue("T(" + AggregateHelper.class.getName() + ").MIN", "Min"),
new FixedExpressionValue("T(" + AggregateHelper.class.getName() + ").MAX", "Max"),
new FixedExpressionValue("T(" + AggregateHelper.class.getName() + ").SUM", "Sum"),
new FixedExpressionValue("T(" + AggregateHelper.class.getName() + ").LIST", "List"),
new FixedExpressionValue("T(" + AggregateHelper.class.getName() + ").SET", "Set"),
new FixedExpressionValue("T(" + AggregateHelper.class.getName() + ").APPROX_CARDINALITY", "ApproximateSetCardinality"))).asScala().toList());
new FixedExpressionValue("#AGG.first", "First"),
new FixedExpressionValue("#AGG.last", "Last"),
new FixedExpressionValue("#AGG.min", "Min"),
new FixedExpressionValue("#AGG.max", "Max"),
new FixedExpressionValue("#AGG.sum", "Sum"),
new FixedExpressionValue("#AGG.list", "List"),
new FixedExpressionValue("#AGG.set", "Set"),
new FixedExpressionValue("#AGG.approxCardinality", "ApproximateSetCardinality"))).asScala().toList());
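// The editor now inserts #AGG accessor expressions instead of the fully-qualified
// T(...) static references above; both resolve to the same aggregator instances
// defined further down in this class.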

public static final DualParameterEditor DUAL_EDITOR = new DualParameterEditor(SIMPLE_EDITOR, DualEditorMode.SIMPLE);

@@ -55,7 +55,7 @@ public class AggregateHelper {

public Aggregator approxCardinality = APPROX_CARDINALITY;

public Aggregator map(@ParamName("parts" ) Map<String, Aggregator> parts) {
public Aggregator map(@ParamName("parts") Map<String, Aggregator> parts) {
return new aggregates.MapAggregator(parts);
}


This file was deleted.

@@ -6,6 +6,7 @@ import pl.touk.nussknacker.engine.api.LazyParameter
import pl.touk.nussknacker.engine.api.context.ProcessCompilationError.{CannotCreateObjectError, NodeId}
import pl.touk.nussknacker.engine.api.context.{ProcessCompilationError, ValidationContext}
import pl.touk.nussknacker.engine.api.typed.typing.TypingResult
import pl.touk.nussknacker.engine.flink.util.keyed.KeyEnricher

/*
This class serves two purposes:
@@ -49,9 +50,14 @@ abstract class Aggregator extends AggregateFunction[AnyRef, AnyRef, AnyRef] {

override final def merge(a: AnyRef, b: AnyRef): AnyRef = mergeAggregates(a.asInstanceOf[Aggregate], b.asInstanceOf[Aggregate])

final def toContextTransformation(variableName: String, aggregateBy: LazyParameter[_])(implicit nodeId: NodeId):
ValidationContext => ValidatedNel[ProcessCompilationError, ValidationContext] = validationCtx => computeOutputType(aggregateBy.returnType)
final def toContextTransformation(variableName: String, emitContext: Boolean, aggregateBy: LazyParameter[_])(implicit nodeId: NodeId):
ValidationContext => ValidatedNel[ProcessCompilationError, ValidationContext] = validationCtx =>
computeOutputType(aggregateBy.returnType)
//TODO: better error?
.leftMap(message => NonEmptyList.of(CannotCreateObjectError(message, nodeId.id)))
.andThen(validationCtx.withVariable(variableName, _, paramName = None))
.andThen { outputType =>
val ctx = if (emitContext) validationCtx else ValidationContext.empty
ctx.withVariable(variableName, outputType, paramName = None)
}.andThen(KeyEnricher.contextTransformation)

}
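The `KeyEnricher` referenced above is not shown in this excerpt. Below is a minimal sketch of the contract implied by its call sites (`enrichWithKey(ctx, keyedValue)`, `enrichWithKey(NkContext(""), key)` and `KeyEnricher.contextTransformation`); the method bodies, imports and the `StringKeyedValue.key` accessor are assumptions, not the actual implementation:

```scala
package pl.touk.nussknacker.engine.flink.util.keyed

import cats.data.ValidatedNel
import pl.touk.nussknacker.engine.api.context.ProcessCompilationError.NodeId
import pl.touk.nussknacker.engine.api.context.{ProcessCompilationError, ValidationContext}
import pl.touk.nussknacker.engine.api.typed.typing.Typed
import pl.touk.nussknacker.engine.api.{VariableConstants, Context => NkContext}

trait KeyEnricher {

  // Exposes the grouping key to downstream nodes under the #key variable.
  protected def enrichWithKey(ctx: NkContext, keyedValue: StringKeyedValue[AnyRef]): NkContext =
    enrichWithKey(ctx, keyedValue.key)

  protected def enrichWithKey(ctx: NkContext, key: String): NkContext =
    ctx.withVariable(VariableConstants.KeyVariableName, key)
}

object KeyEnricher {

  // Declares #key during validation; withVariableOverriden (added in this PR)
  // is used so that an upstream variable named "key" does not fail validation.
  def contextTransformation(validationCtx: ValidationContext)
                           (implicit nodeId: NodeId): ValidatedNel[ProcessCompilationError, ValidationContext] =
    validationCtx.withVariableOverriden(VariableConstants.KeyVariableName, Typed[String], paramName = None)
}
```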
@@ -1,7 +1,6 @@
package pl.touk.nussknacker.engine.flink.util.transformer.aggregate

import java.util.concurrent.TimeUnit

import cats.data.NonEmptyList
import com.codahale.metrics.{Histogram, SlidingTimeWindowReservoir}
import org.apache.flink.api.common.functions.RuntimeContext
@@ -17,7 +16,7 @@ import pl.touk.nussknacker.engine.api.context.ProcessCompilationError.NodeId
import pl.touk.nussknacker.engine.api.typed.typing.TypingResult
import pl.touk.nussknacker.engine.api.{ValueWithContext, Context => NkContext}
import pl.touk.nussknacker.engine.flink.api.state.{LatelyEvictableStateFunction, StateHolder}
import pl.touk.nussknacker.engine.flink.util.keyed.StringKeyedValue
import pl.touk.nussknacker.engine.flink.util.keyed.{KeyEnricher, StringKeyedValue}
import pl.touk.nussknacker.engine.flink.util.metrics.MetricUtils
import pl.touk.nussknacker.engine.flink.util.orderedmap.FlinkRangeMap
import pl.touk.nussknacker.engine.flink.util.orderedmap.FlinkRangeMap._
@@ -45,7 +44,7 @@ class AggregatorFunction[MapT[K,V]](protected val aggregator: Aggregator, protec

}

trait AggregatorFunctionMixin[MapT[K,V]] { self: StateHolder[MapT[Long, AnyRef]] =>
trait AggregatorFunctionMixin[MapT[K,V]] extends KeyEnricher { self: StateHolder[MapT[Long, AnyRef]] =>

def getRuntimeContext: RuntimeContext

@@ -90,7 +89,7 @@ trait AggregatorFunctionMixin[MapT[K,V]] { self: StateHolder[MapT[Long, AnyRef]]
val newState = addElementToState(value, timestamp, timeService, out)
val finalVal = computeFinalValue(newState, timestamp)
timeHistogram.update(System.nanoTime() - start)
out.collect(ValueWithContext(finalVal, value.context))
out.collect(ValueWithContext(finalVal, enrichWithKey(value.context, value.value)))
}

protected def addElementToState(value: ValueWithContext[StringKeyedValue[AnyRef]],
@@ -28,7 +28,7 @@ class EmitExtraWindowWhenNoDataTumblingAggregatorFunction[MapT[K,V]](protected v
(implicit override val rangeMap: FlinkRangeMap[MapT])
extends KeyedProcessFunction[String, ValueWithContext[StringKeyedValue[AnyRef]], ValueWithContext[AnyRef]]
with StateHolder[MapT[Long, AnyRef]]
with AggregatorFunctionMixin[MapT] with AddedElementContextStateHolder[MapT] {
with AggregatorFunctionMixin[MapT] {

type FlinkCtx = KeyedProcessFunction[String, ValueWithContext[StringKeyedValue[AnyRef]], ValueWithContext[AnyRef]]#Context
type FlinkOnTimerCtx = KeyedProcessFunction[String, ValueWithContext[StringKeyedValue[AnyRef]], ValueWithContext[AnyRef]]#OnTimerContext
@@ -38,7 +38,6 @@ class EmitExtraWindowWhenNoDataTumblingAggregatorFunction[MapT[K,V]](protected v

override def open(parameters: Configuration): Unit = {
state = getRuntimeContext.getState(stateDescriptor)
addedElementContext = getRuntimeContext.getState(addedElementContextDescriptor)
}

override protected val minimalResolutionMs: Long = timeWindowLengthMillis
@@ -49,20 +48,15 @@ class EmitExtraWindowWhenNoDataTumblingAggregatorFunction[MapT[K,V]](protected v

override protected def handleElementAddedToState(newElementInStateTimestamp: Long, newElement: aggregator.Element, nkCtx: NkContext,
timerService: TimerService, out: Collector[ValueWithContext[AnyRef]]): Unit = {
addedElementContext.update(readAddedElementContextOrInitial().updated(newElementInStateTimestamp, nkCtx))
timerService.registerEventTimeTimer(newElementInStateTimestamp + timeWindowLengthMillis)
}

override def onTimer(timestamp: Long, ctx: FlinkOnTimerCtx, out: Collector[ValueWithContext[AnyRef]]): Unit = {
val previousTimestamp = timestamp - timeWindowLengthMillis
val currentStateValue = readStateOrInitial()
val finalVal = computeFinalValue(currentStateValue, previousTimestamp)
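// Per-element contexts are no longer kept in state, so this extra window is
// emitted with a fresh empty Context carrying only the grouping key (#key).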
out.collect(ValueWithContext(finalVal, enrichWithKey(NkContext(""), ctx.getCurrentKey)))

readAddedElementContextOrInitial().toRO(previousTimestamp).toScalaMapRO.lastOption.foreach {
case (_, nkCtx) =>
val finalVal = computeFinalValue(currentStateValue, previousTimestamp)
out.collect(ValueWithContext(finalVal, nkCtx))
}

val previousTimestampStateAndRest = stateForTimestampToReadUntilEnd(currentStateValue, previousTimestamp)
if (previousTimestampStateAndRest.toScalaMapRO.isEmpty) {
evictStates()
@@ -73,7 +67,6 @@ class EmitExtraWindowWhenNoDataTumblingAggregatorFunction[MapT[K,V]](protected v

override protected def updateState(stateValue: MapT[Long, AnyRef], stateValidity: Long, timeService: TimerService): Unit = {
state.update(stateValue)
invalidateAddedElementContextState(stateValue)
}

override protected def doMoveEvictionTime(time: Long, timeService: TimerService): Unit = {
@@ -82,7 +75,6 @@ class EmitExtraWindowWhenNoDataTumblingAggregatorFunction[MapT[K,V]](protected v

protected def evictStates(): Unit = {
state.clear()
addedElementContext.clear()
}

override protected def readState(): MapT[Long, AnyRef] = state.value()