
Performance optimization for aggregates: do not save context of varia… #1886

Merged: 7 commits, Jul 15, 2021
1 change: 1 addition & 0 deletions docs/Changelog.md
@@ -51,6 +51,7 @@ Nussknacker versions
* [#1772](https://github.com/TouK/nussknacker/pull/1772) Fix for SpEL validation when trying to use a non-existing method reference
* [#1741](https://github.com/TouK/nussknacker/pull/1741) KafkaExceptionConsumer can be configured to send errors to Kafka
* [#1809](https://github.com/TouK/nussknacker/pull/1809) Performance optimization for aggregates: do not update state if added element is neutral for current state
* [#1886](https://github.com/TouK/nussknacker/pull/1886) Performance optimization for aggregates: do not save context in state. Added `#AGG` utility for easier switching from simple aggregating functions like `'Sum'` to more complex `#AGG.map()`
* [#1820](https://github.com/TouK/nussknacker/pull/1820) Added missing support for some logical types (LocalDate, LocalTime, UUID) in json encoding
* [#1799](https://github.com/TouK/nussknacker/pull/1799) ConfluentAvroToJsonFormatter produces and reads test data in valid json format with full kafka metadata and schema ids.
* [#1839](https://github.com/TouK/nussknacker/pull/1839) Set up `explicitUidInStatefulOperators` model's flag to `true` by default.
10 changes: 7 additions & 3 deletions docs/MigrationGuide.md
@@ -142,9 +142,15 @@ To see biggest differences please consult the [changelog](Changelog.md).
```
{"keySchemaId":null,"valueSchemaId":1,"consumerRecord":{"key":null,"value":{"first":"Jan","last":"Kowalski"},"topic":"testAvroRecordTopic1","partition":0,"offset":0,"timestamp":1624279687756,"timestampType":"CreateTime","headers":{},"leaderEpoch":0}}
```
* [#1663](https://github.com/TouK/nussknacker/pull/1663) Default `FlinkExceptionHandler` implementations are deprecated, use `ConfigurableExceptionHandler` instead.
* [#1731](https://github.com/TouK/nussknacker/pull/1731) RocksDB config's flag `incrementalCheckpoints` is turned on by default.
* [#1825](https://github.com/TouK/nussknacker/pull/1825) Default dashboard renamed from `flink-esp` to `nussknacker-scenario`
* [#1836](https://github.com/TouK/nussknacker/pull/1836) Change default `kafka.consumerGroupNamingStrategy` to `processId-nodeId`.
* [#1357](https://github.com/TouK/nussknacker/pull/1357) Run mode added to nodes. The `ServiceInvoker` interface was extended with a new, implicit `runMode` parameter.
* [#1886](https://github.com/TouK/nussknacker/pull/1886) The aggregate-sliding (with emitWhenEventLeft = true), aggregate-tumbling and aggregate-session components no longer
  emit the full context of the variables defined before the node (for performance reasons, and because it was not obvious which context was emitted).
  If you want to emit information other than the aggregated value and the key (available via the new `#key` variable), use a `#AGG.map` expression in `aggregateBy`, as sketched below.
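  For example, switching a simple `'Sum'` aggregator to `#AGG.map` to carry one extra value through the aggregate could look like this (a sketch; the field names and the `#input` structure are illustrative):

  ```
  aggregator:  #AGG.map({total: #AGG.sum, lastSeen: #AGG.last})
  aggregateBy: {total: #input.amount, lastSeen: #input.eventDate}
  ```

  The output variable then holds a map with `total` and `lastSeen` entries, and the grouping key is available as `#key`.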

## In version 0.3.0

@@ -173,10 +179,8 @@ that will be hidden before parameter's evaluation
- Added methods: `cancelJob`, `submitJob`, `listJobs`, `runningJobs` to `FlinkMiniClusterHolder`
- Deprecated: `runningJobs`, from `MiniClusterExecutionEnvironment`
- Removed: `getClusterClient` from `FlinkMiniClusterHolder` interface, because of flink compatibility at Flink 1.9
- Renamed: `FlinkStreamingProcessRegistrar` to `FlinkProcessManager`
* [#1303](https://github.com/TouK/nussknacker/pull/1303) TypedObjectTypingResult has a new field: additionalInfo

## In version 0.2.0

1 change: 1 addition & 0 deletions docs/designingProcesses/Spel.md
@@ -231,3 +231,4 @@ If you need to invoke the same method in many places, probably the best solution
| `CONV` | General conversion functions |
| `DATE` | Date operations (parsing, printing) |
| `UTIL` | Various utilities (e.g. identifier generation) |
| `AGG` | Aggregator functions |
@@ -1,13 +1,38 @@
package pl.touk.nussknacker.engine.api

import java.util.UUID
import scala.util.Random

object Context {

// prefix is to distinguish between externally provided and internal (initially created) id
private val initialContextIdPrefix = "initial-"

/**
* For performance reasons an insecure random generator is used - see UUIDBenchmark for details. The random correlation id
* is used only for internal purposes, so it is not important in a security context.
*/
private val random = new Random()

/**
* Should be used for a newly created context - when there is no suitable external correlation / tracing id
*/
def withInitialId: Context = {
Context(initialContextIdPrefix + new UUID(random.nextLong(), random.nextLong()).toString)
}

def apply(id: String): Context = Context(id, Map.empty, None)

}

case class ContextId(value: String)

/**
* Context is a container for variables used in expression evaluation
* @param id correlation id / trace id used for tracing (logs, error presentation) and for the test mechanism; it should always be defined
* @param variables variables available during evaluation
* @param parentContext context used for scope handling, mainly for subprocess invocation purposes
*/
case class Context(id: String, variables: Map[String, Any], parentContext: Option[Context]) {

def apply[T](name: String): T =
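A minimal usage sketch of the new factory method (hedged: `withVariable` is part of `Context`'s API below the visible part of this diff):

```scala
// Sketch - how the new withInitialId is expected to be used:
val ctx: Context = Context.withInitialId          // id of the form "initial-<uuid>"
  .withVariable("input", 42)                      // returns a copy with the variable added
val n: Int = ctx[Int]("input")                    // apply[T] above reads it back
```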
@@ -6,5 +6,6 @@ object VariableConstants {
final val InputMetaVariableName = "inputMeta"
final val MetaVariableName = "meta"
final val OutputVariableName = "output"
final val KeyVariableName = "key"

}
@@ -40,7 +40,7 @@ sealed trait AbstractContextTransformation {
* ContextTransformation
* .definedBy(_.withVariable("foo", Typed[String])
* .implementedBy { () =>
* Future.success(Context("").withVariable("foo", "bar")
* Future.success(Context.withRandomId.withVariable("foo", "bar")
* }
* `
*/
@@ -128,4 +128,4 @@ trait JoinContextTransformationDef extends AbstractContextTransformationDef {
JoinContextTransformationDef.this.transform(contextByBranchId).andThen(nextTransformation.transform)
}

}
@@ -38,6 +38,12 @@ case class ValidationContext(localVariables: Map[String, TypingResult] = Map.emp
def withVariable(outputVar: OutputVar, value: TypingResult)(implicit nodeId: NodeId): ValidatedNel[PartSubGraphCompilationError, ValidationContext] =
withVariable(outputVar.outputName, value, Some(outputVar.fieldName))

def withVariableOverriden(name: String, value: TypingResult, paramName: Option[String])
(implicit nodeId: NodeId): ValidatedNel[PartSubGraphCompilationError, ValidationContext] = {
validateVariableFormat(name, paramName)
.map(_ => copy(localVariables = localVariables + (name -> value)))
}

private def validateVariableExists(name: String, paramName: Option[String])(implicit nodeId: NodeId): ValidatedNel[PartSubGraphCompilationError, String] =
if (variables.contains(name)) Invalid(OverwrittenVariable(name, paramName)).toValidatedNel else Valid(name)

@@ -90,4 +96,4 @@ object OutputVar {

def customNode(outputName: String): OutputVar =
OutputVar(CustomNodeFieldName, outputName)
}
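A hedged sketch of how the new `withVariableOverriden` differs from `withVariable` (the node id and `Typed` usage are illustrative):

```scala
implicit val nodeId: NodeId = NodeId("example-node")
val ctx = ValidationContext(localVariables = Map("key" -> Typed[String]))

// withVariable rejects an already-defined name with OverwrittenVariable:
ctx.withVariable("key", Typed[String], paramName = None)       // Invalid
// withVariableOverriden only re-validates the name format, then replaces the binding:
ctx.withVariableOverriden("key", Typed[Int], paramName = None) // Valid
```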
@@ -0,0 +1,33 @@
package pl.touk.nussknacker.engine.benchmarks.spel

import org.openjdk.jmh.annotations.{Benchmark, BenchmarkMode, Mode, OutputTimeUnit, Scope, State, Threads}
import pl.touk.nussknacker.engine.api.Context

import java.util.UUID
import java.util.concurrent.TimeUnit

/**
* results:
* - UUIDBenchmark.secureUUIDTest avgt 25 9.873 ± 0.467 us/op
* - UUIDBenchmark.unsecureUUIDTest avgt 25 6.227 ± 0.130 us/op
*/
@State(Scope.Benchmark)
class UUIDBenchmark {

@Threads(10)
@Benchmark
@BenchmarkMode(Array(Mode.AverageTime))
@OutputTimeUnit(TimeUnit.MICROSECONDS)
def unsecureUUIDTest(): AnyRef = {
Context.withInitialId
}

@Threads(10)
@Benchmark
@BenchmarkMode(Array(Mode.AverageTime))
@OutputTimeUnit(TimeUnit.MICROSECONDS)
def secureUUIDTest(): AnyRef = {
Context(UUID.randomUUID().toString)
}

}
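For reference, a minimal sketch of the two id-generation strategies compared above (it mirrors `Context.withInitialId`; the trade-off comments are editorial assumptions, not part of the diff):

```scala
import java.util.UUID
import scala.util.Random

val random = new Random()
// ~6.2 us/op above: plain Random, cheaper, but not a well-formed version-4 UUID
val insecureId: String = new UUID(random.nextLong(), random.nextLong()).toString
// ~9.9 us/op above: backed by SecureRandom, cryptographically strong
val secureId: String = UUID.randomUUID().toString
```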
@@ -10,6 +10,7 @@ import pl.touk.nussknacker.engine.avro.sink.{KafkaAvroSinkFactory, KafkaAvroSink
import pl.touk.nussknacker.engine.avro.source.KafkaAvroSourceFactory
import pl.touk.nussknacker.engine.flink.util.exception.ConfigurableExceptionHandlerFactory
import pl.touk.nussknacker.engine.flink.util.sink.EmptySink
import pl.touk.nussknacker.engine.flink.util.transformer.aggregate.AggregateHelper
import pl.touk.nussknacker.engine.flink.util.transformer.aggregate.sampleTransformers.{SessionWindowAggregateTransformer, SlidingAggregateTransformerV2, TumblingAggregateTransformer}
import pl.touk.nussknacker.engine.flink.util.transformer.outer.OuterJoinTransformer
import pl.touk.nussknacker.engine.flink.util.transformer.{DelayTransformer, PeriodicSourceFactory, PreviousValueTransformer, UnionTransformer, UnionWithMemoTransformer}
@@ -67,7 +68,8 @@ class GenericConfigCreator extends EmptyProcessConfigCreator {
"NUMERIC" -> defaultCategory(numeric),
"CONV" -> defaultCategory(conversion),
"DATE" -> defaultCategory(date),
"UTIL" -> defaultCategory(util)
"UTIL" -> defaultCategory(util),
"AGG" -> defaultCategory(new AggregateHelper)
),
List()
)
@@ -10,21 +10,25 @@
import pl.touk.nussknacker.engine.api.editor.DualEditorMode;
import scala.collection.JavaConverters;

// This class is in Java, because constants are used in expressions in editors - see
// `pl.touk.nussknacker.engine.flink.util.transformer.aggregate.SlidingAggregateTransformerV2`. and scala objects are
// not good for that. Be aware that. If you add some new aggregator please add it also there to make sure that it will
// be available in selectbox.
/**
* This class is in Java because the constants are used in expressions in editors - see
* `pl.touk.nussknacker.engine.flink.util.transformer.aggregate.SlidingAggregateTransformerV2` - and Scala objects are
* not well suited for that. Be aware that if you add a new aggregator, you should also add it there, to make sure
* it will be available in the selectbox.
*
* You should define the `#AGG` global variable, because it is used in editors.
*/
public class AggregateHelper {

public static final SimpleParameterEditor SIMPLE_EDITOR = new FixedValuesParameterEditor(JavaConverters.collectionAsScalaIterableConverter(Arrays.asList(
new FixedExpressionValue("T(" + AggregateHelper.class.getName() + ").FIRST", "First"),
new FixedExpressionValue("T(" + AggregateHelper.class.getName() + ").LAST", "Last"),
new FixedExpressionValue("T(" + AggregateHelper.class.getName() + ").MIN", "Min"),
new FixedExpressionValue("T(" + AggregateHelper.class.getName() + ").MAX", "Max"),
new FixedExpressionValue("T(" + AggregateHelper.class.getName() + ").SUM", "Sum"),
new FixedExpressionValue("T(" + AggregateHelper.class.getName() + ").LIST", "List"),
new FixedExpressionValue("T(" + AggregateHelper.class.getName() + ").SET", "Set"),
new FixedExpressionValue("T(" + AggregateHelper.class.getName() + ").APPROX_CARDINALITY", "ApproximateSetCardinality"))).asScala().toList());
new FixedExpressionValue("#AGG.sum", "First"),
new FixedExpressionValue("#AGG.last", "Last"),
new FixedExpressionValue("#AGG.min", "Min"),
new FixedExpressionValue("#AGG.max", "Max"),
new FixedExpressionValue("#AGG.sum", "Sum"),
new FixedExpressionValue("#AGG.list", "List"),
new FixedExpressionValue("#AGG.set", "Set"),
new FixedExpressionValue("#AGG.approxCardinality", "ApproximateSetCardinality"))).asScala().toList());

public static final DualParameterEditor DUAL_EDITOR = new DualParameterEditor(SIMPLE_EDITOR, DualEditorMode.SIMPLE);

@@ -55,7 +59,7 @@ public class AggregateHelper {

public Aggregator approxCardinality = APPROX_CARDINALITY;

public Aggregator map(@ParamName("parts" ) Map<String, Aggregator> parts) {
public Aggregator map(@ParamName("parts") Map<String, Aggregator> parts) {
return new aggregates.MapAggregator(parts);
}


This file was deleted.

@@ -6,6 +6,7 @@ import pl.touk.nussknacker.engine.api.LazyParameter
import pl.touk.nussknacker.engine.api.context.ProcessCompilationError.{CannotCreateObjectError, NodeId}
import pl.touk.nussknacker.engine.api.context.{ProcessCompilationError, ValidationContext}
import pl.touk.nussknacker.engine.api.typed.typing.TypingResult
import pl.touk.nussknacker.engine.flink.util.keyed.KeyEnricher

/*
This class serves two purposes:
@@ -49,9 +50,14 @@ abstract class Aggregator extends AggregateFunction[AnyRef, AnyRef, AnyRef] {

override final def merge(a: AnyRef, b: AnyRef): AnyRef = mergeAggregates(a.asInstanceOf[Aggregate], b.asInstanceOf[Aggregate])

final def toContextTransformation(variableName: String, aggregateBy: LazyParameter[_])(implicit nodeId: NodeId):
ValidationContext => ValidatedNel[ProcessCompilationError, ValidationContext] = validationCtx => computeOutputType(aggregateBy.returnType)
final def toContextTransformation(variableName: String, emitContext: Boolean, aggregateBy: LazyParameter[_])(implicit nodeId: NodeId):
ValidationContext => ValidatedNel[ProcessCompilationError, ValidationContext] = validationCtx =>
computeOutputType(aggregateBy.returnType)
//TODO: better error?
.leftMap(message => NonEmptyList.of(CannotCreateObjectError(message, nodeId.id)))
.andThen(validationCtx.withVariable(variableName, _, paramName = None))
.andThen { outputType =>
val ctx = if (emitContext) validationCtx else ValidationContext.empty
ctx.withVariable(variableName, outputType, paramName = None)
}.andThen(KeyEnricher.contextTransformation)

}
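A sketch of the practical effect of the new `emitContext` flag for downstream nodes (the output variable name is hypothetical):

```scala
// For an aggregate node with output variable "aggregate":
// emitContext = true  -> downstream validation sees #aggregate, #key and the upstream variables
// emitContext = false -> downstream validation sees only #aggregate and #key,
//                        because it starts from ValidationContext.empty (see above)
```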
@@ -1,7 +1,6 @@
package pl.touk.nussknacker.engine.flink.util.transformer.aggregate

import java.util.concurrent.TimeUnit

import cats.data.NonEmptyList
import com.codahale.metrics.{Histogram, SlidingTimeWindowReservoir}
import org.apache.flink.api.common.functions.RuntimeContext
@@ -17,7 +16,7 @@ import pl.touk.nussknacker.engine.api.context.ProcessCompilationError.NodeId
import pl.touk.nussknacker.engine.api.typed.typing.TypingResult
import pl.touk.nussknacker.engine.api.{ValueWithContext, Context => NkContext}
import pl.touk.nussknacker.engine.flink.api.state.{LatelyEvictableStateFunction, StateHolder}
import pl.touk.nussknacker.engine.flink.util.keyed.StringKeyedValue
import pl.touk.nussknacker.engine.flink.util.keyed.{KeyEnricher, StringKeyedValue}
import pl.touk.nussknacker.engine.flink.util.metrics.MetricUtils
import pl.touk.nussknacker.engine.flink.util.orderedmap.FlinkRangeMap
import pl.touk.nussknacker.engine.flink.util.orderedmap.FlinkRangeMap._
@@ -90,7 +89,8 @@ trait AggregatorFunctionMixin[MapT[K,V]] { self: StateHolder[MapT[Long, AnyRef]]
val newState = addElementToState(value, timestamp, timeService, out)
val finalVal = computeFinalValue(newState, timestamp)
timeHistogram.update(System.nanoTime() - start)
out.collect(ValueWithContext(finalVal, value.context))
out.collect(ValueWithContext(finalVal, KeyEnricher.enrichWithKey(value.context, value.value)))

}

protected def addElementToState(value: ValueWithContext[StringKeyedValue[AnyRef]],
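`KeyEnricher` itself is not shown in this diff; a hedged sketch of the shape implied by its two call sites (`enrichWithKey` at runtime above, `contextTransformation` at validation time in `Aggregator`), built on the new `VariableConstants.KeyVariableName`:

```scala
// Assumed shape, not the actual implementation:
object KeyEnricher {
  def enrichWithKey(ctx: NkContext, keyedValue: StringKeyedValue[AnyRef]): NkContext =
    ctx.withVariable(VariableConstants.KeyVariableName, keyedValue.key)

  def contextTransformation(ctx: ValidationContext)
                           (implicit nodeId: NodeId): ValidatedNel[ProcessCompilationError, ValidationContext] =
    ctx.withVariable(VariableConstants.KeyVariableName, Typed[String], paramName = None)
}
```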