Commit
Rename keyBy to groupBy (#1920)
* Rename keyBy to groupBy

* Add migration

* Review

Co-authored-by: Damian Święcki <dsw@touk.pl>
dswiecki and Damian Święcki authored Jul 21, 2021
1 parent 24a2ac4 commit c2d9e6d
Showing 26 changed files with 220 additions and 88 deletions.
@@ -138,7 +138,7 @@
"nodeType" : "aggregate-sliding",
"parameters" : [
{
"name" : "keyBy",
"name" : "groupBy",
"expression" : {
"language" : "spel",
"expression" : "#input.clientId"
8 changes: 4 additions & 4 deletions docs/designingProcesses/FlinkCustomTransformers.md
@@ -36,7 +36,7 @@ subsequent events from any branch during `stateTimeout`. Produced object has val
![aggregate_window](../img/aggregate_window.png)

This element defines a generic aggregation of values in a sliding time window of a given length. Its parameters are (see the sketch after this list):
-- keyBy - expression defining key for which we compute aggregate, e.g. `#input.userId`
+- groupBy - expression defining key for which we compute aggregate, e.g. `#input.userId`
- aggregator - type of aggregation (see below)
- aggregateBy - value which will be aggregated (e.g. `#input.callDuration`, `#input.productId`)
- windowLength - length of the time window; the window covers the range (now - windowLength, now], exclusive at the start and inclusive at the end
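
For illustration, a hedged sketch of such a node in the Scala scenario-builder DSL used later in this commit (the builder name, node ids and expressions are illustrative assumptions, not part of this change):

```scala
// Sketch: total call duration per user over a 1-hour sliding window.
EspProcessBuilder
  .id("sample-sliding-aggregate")
  .exceptionHandler()
  .source("source", "oneSource")
  .customNode("aggregate", "callStats", "aggregate-sliding",
    "groupBy"      -> "#input.userId",                       // key expression
    "aggregator"   -> "#AGG.sum",                            // aggregation type
    "aggregateBy"  -> "#input.callDuration",                 // value to aggregate
    "windowLength" -> "T(java.time.Duration).parse('PT1H')") // 1-hour window
  .sink("end", "#callStats": Expression, "kafka-string", "topic" -> "'output'")
```
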
@@ -81,11 +81,11 @@ Example, for aggregate-tumbling node with length of 10 minutes, aggregation max,
![previous_value_window](../img/previous_value_window.png)

A simple transformation which stores an arbitrary value for a given key. This element has two parameters:
-- keyBy - expression defining key for which we compute aggregate, e.g. `#input.userId`
+- groupBy - expression defining key for which we compute aggregate, e.g. `#input.userId`
- value - stored value

So, for example, given a stream of events containing users with their current location, when we set
-- keyBy is `#input.userId`
+- groupBy is `#input.userId`
- value is `#input.location`
then the value of the output variable is the previous location for the current user. If this is the first appearance of this user,
the **current** location will be returned
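
A hedged sketch of that example in the same scenario DSL (the registered node type name "previousValue" and all ids are assumptions for illustration):

```scala
// Stores each user's location; the output variable holds the previous one
// (or the current one on the user's first appearance).
.customNode("prev", "previousLocation", "previousValue",
  "groupBy" -> "#input.userId",
  "value"   -> "#input.location")
```
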
@@ -104,4 +104,4 @@ Events from MAIN branch will be enriched with output variable having aggregated

## Delay

Delay event processing for a given delay duration
2 changes: 1 addition & 1 deletion docs/operations_guide/Operations.md
@@ -514,7 +514,7 @@ Data serialization is one of the most costly operations in the Nussknacker/Flink



-* Data is reshuffled between nodes during keyBy operations (e.g. before aggregations)
+* Data is reshuffled between nodes during groupBy operations (e.g. before aggregations)
* Data is serialized to disk during state processing (e.g. aggregations)

Nussknacker needs to know the exact type of processed data during scenario compilation. Avoid sources and expressions which do not provide such information.
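
A hedged sketch of what "exact type" means here (the import path is assumed from this codebase's conventions; `Typed` and `Unknown` appear in the code changed below):

```scala
import pl.touk.nussknacker.engine.api.typed.typing.{Typed, Unknown}

// A concretely typed value lets Nussknacker validate expressions and choose
// an efficient serializer; Unknown forces generic, costlier serialization.
val wellTyped = Typed[String] // exact type known at scenario compilation
val opaque    = Unknown       // only the runtime class will be known
```
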
@@ -31,14 +31,14 @@ abstract class CustomStreamTransformer extends Component {
def canHaveManyInputs: Boolean = false

// For now it is only supported by Flink streaming runtime
def canBeEnding: Boolean = false

}

/**
* A lazy parameter is a representation of a custom node parameter which should be evaluated for each record:
- * ```def execute(@ParamName("keyBy") keyBy: LazyParameter[String], @ParamName ("length") length: String)```
- * In this case, length is computed as constant during process compilation, while keyBy is evaluated for each event
+ * ```def execute(@ParamName("groupBy") groupBy: LazyParameter[String], @ParamName ("length") length: String)```
+ * In this case, length is computed as constant during process compilation, while groupBy is evaluated for each event
* It cannot be evaluated directly (there is no 'evaluate' method),
* as evaluation may need lifecycle handling; to use it, see LazyParameterInterpreter
*
@@ -0,0 +1 @@
pl.touk.nussknacker.genericmodel.GenericMigrations
@@ -0,0 +1,9 @@
package pl.touk.nussknacker.genericmodel

import pl.touk.nussknacker.engine.migration.{ProcessMigration, ProcessMigrations}
import pl.touk.nussknacker.genericmodel.migrations.GroupByMigration

class GenericMigrations extends ProcessMigrations {

override def processMigrations: Map[Int, ProcessMigration] = ProcessMigrations.listOf(GroupByMigration).processMigrations
}
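
This class is picked up via the META-INF/services entry added above. A hedged sketch of how a later migration would be appended (SomeNextMigration is invented for illustration):

```scala
// Hypothetical future state of the class above - migrations are appended
// to the list, keeping existing numbering stable.
class GenericMigrations extends ProcessMigrations {
  override def processMigrations: Map[Int, ProcessMigration] =
    ProcessMigrations.listOf(GroupByMigration, SomeNextMigration).processMigrations
}
```
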
@@ -0,0 +1,20 @@
package pl.touk.nussknacker.genericmodel.migrations

import pl.touk.nussknacker.engine.api.MetaData
import pl.touk.nussknacker.engine.graph.node.{CustomNode, NodeData}
import pl.touk.nussknacker.engine.migration.NodeMigration

object GroupByMigration extends NodeMigration {

override val description = "GroupByMigration"

private val keyByParameterName = "keyBy"

override def failOnNewValidationError: Boolean = false

override def migrateNode(metadata: MetaData): PartialFunction[NodeData, NodeData] = {
case node@CustomNode(_, _, nodeType, parameters, _)
if parameters.exists(_.name == keyByParameterName) && nodeType.startsWith("aggregate-") =>
node.copy(parameters = node.parameters.map(p => if (p.name == keyByParameterName) p.copy(name = "groupBy") else p))
}
}
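
A hedged sketch of the NodeMigration contract this object fulfils, using the imports from the file above (how the engine actually invokes migrations is not shown in this diff):

```scala
// migrateNode returns a PartialFunction; lifting it yields None for nodes
// the migration does not apply to, so callers can fall back to the original.
def applyMigration(node: NodeData, metaData: MetaData): NodeData =
  GroupByMigration.migrateNode(metaData).lift(node).getOrElse(node)
```
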
@@ -26,35 +26,35 @@ object LastVariableFilterTransformer extends CustomStreamTransformer with Single

private val valueParameter = ParameterWithExtractor.lazyMandatory[AnyRef](valueParameterName)

-private val keyByParameterName = "keyBy"
+private val groupByParameterName = "groupBy"

-private val keyByParameter = ParameterWithExtractor.lazyMandatory[String](keyByParameterName)
+private val groupByParameter = ParameterWithExtractor.lazyMandatory[String](groupByParameterName)

private def conditionParameter(valueType: TypingResult) = Parameter(conditionParameterName, Typed[Boolean])
.copy(isLazyParameter = true, additionalVariables = Map("current" -> valueType, "previous" -> valueType))

type State = Nothing

override def contextTransformation(context: ValidationContext, dependencies: List[NodeDependencyValue])(implicit nodeId: ProcessCompilationError.NodeId): NodeTransformationDefinition = {
-case TransformationStep(Nil, _) => NextParameters(keyByParameter.parameter :: valueParameter.parameter :: Nil)
-case TransformationStep((`keyByParameterName`, _) :: (`valueParameterName`, DefinedLazyParameter(expr)) :: Nil, _) => NextParameters(conditionParameter(expr.returnType) :: Nil)
+case TransformationStep(Nil, _) => NextParameters(groupByParameter.parameter :: valueParameter.parameter :: Nil)
+case TransformationStep((`groupByParameterName`, _) :: (`valueParameterName`, DefinedLazyParameter(expr)) :: Nil, _) => NextParameters(conditionParameter(expr.returnType) :: Nil)
//if we cannot determine value, we'll assume its type is Unknown
-case TransformationStep((`keyByParameterName`, _) :: (`valueParameterName`, FailedToDefineParameter) :: Nil, _) => NextParameters(conditionParameter(Unknown) :: Nil)
-case TransformationStep((`keyByParameterName`, _) :: (`valueParameterName`, _) :: (`conditionParameterName`, _) :: Nil, _) => FinalResults(context)
+case TransformationStep((`groupByParameterName`, _) :: (`valueParameterName`, FailedToDefineParameter) :: Nil, _) => NextParameters(conditionParameter(Unknown) :: Nil)
+case TransformationStep((`groupByParameterName`, _) :: (`valueParameterName`, _) :: (`conditionParameterName`, _) :: Nil, _) => FinalResults(context)
}

-override def initialParameters: List[Parameter] = List(keyByParameter.parameter, valueParameter.parameter, conditionParameter(Unknown))
+override def initialParameters: List[Parameter] = List(groupByParameter.parameter, valueParameter.parameter, conditionParameter(Unknown))

override def nodeDependencies: List[NodeDependency] = List(OutputVariableNameDependency)

override def implementation(params: Map[String, Any], dependencies: List[NodeDependencyValue], finalState: Option[State]): FlinkCustomStreamTransformation = {
val value = valueParameter.extractValue(params)
val condition = params(conditionParameterName).asInstanceOf[LazyParameter[java.lang.Boolean]]
-val keyBy = keyByParameter.extractValue(params)
+val groupBy = groupByParameter.extractValue(params)

FlinkCustomStreamTransformation((str: DataStream[Context], ctx: FlinkCustomNodeContext) => {
str
-.map(new StringKeyedValueMapper(ctx.lazyParameterHelper, keyBy, value))
+.map(new StringKeyedValueMapper(ctx.lazyParameterHelper, groupBy, value))
.keyBy(_.value.key)
.process(new ConditionalUpdateFunction(condition, ctx.lazyParameterHelper))
})
@@ -9,10 +9,10 @@ import org.apache.flink.streaming.api.scala._
case object StatefulTransformer extends CustomStreamTransformer with LazyLogging {

@MethodToInvoke
def execute(@ParamName("keyBy") keyBy: LazyParameter[String]): FlinkCustomStreamTransformation
def execute(@ParamName("groupBy") groupBy: LazyParameter[String]): FlinkCustomStreamTransformation
= FlinkCustomStreamTransformation((start: DataStream[Context], ctx: FlinkCustomNodeContext) => {
start
-.map(ctx.lazyParameterHelper.lazyMapFunction(keyBy))
+.map(ctx.lazyParameterHelper.lazyMapFunction(groupBy))
.keyBy(_.value)
.mapWithState[ValueWithContext[AnyRef], List[String]] { case (StringFromIr(ir, sr), oldState) =>
logger.info(s"received: $sr, current state: $oldState")
@@ -16,7 +16,7 @@ object StatefulSampleProcess {
.id(id)
.exceptionHandler()
.source("state", "oneSource")
.customNode("stateful", "stateVar", "stateful", "keyBy" -> "#input")
.customNode("stateful", "stateVar", "stateful", "groupBy" -> "#input")
.sink("end", "#stateVar": Expression, "kafka-string", "topic" -> s"'output-$id'")
}

@@ -35,7 +35,7 @@
.exceptionHandler()
.source("state", "oneSource")
.customNode("transform", "aggregate", "aggregate",
"keyBy" -> "'test'",
"groupBy" -> "'test'",
"aggregator" -> s"#AGG.map({x: $aggegatorExpression})",
"aggregateBy" -> "{ x: 1 }",
"windowLength" -> "T(java.time.Duration).parse('PT1H')",
@@ -13,12 +13,12 @@ case object PreviousValueTransformer extends CustomStreamTransformer with Explic
type Value = AnyRef

@MethodToInvoke(returnType = classOf[Value])
def execute(@ParamName("keyBy") keyBy: LazyParameter[CharSequence],
def execute(@ParamName("gropuBy") groupBy: LazyParameter[CharSequence],
@ParamName("value") value: LazyParameter[Value])
= FlinkCustomStreamTransformation((start: DataStream[Context], ctx: FlinkCustomNodeContext) =>
setUidToNodeIdIfNeed(ctx,
start
-.map(ctx.lazyParameterHelper.lazyMapFunction(keyBy))
+.map(ctx.lazyParameterHelper.lazyMapFunction(groupBy))
.keyBy(vCtx => Option(vCtx.value).map(_.toString).orNull)
.map(new PreviousValueFunction(value, ctx.lazyParameterHelper))), value.returnType)

@@ -30,7 +30,7 @@ import scala.concurrent.duration._
object TransformStateTransformer extends CustomStreamTransformer with ExplicitUidInOperatorsSupport {

@MethodToInvoke(returnType = classOf[AnyRef])
def invoke(@ParamName("key") key: LazyParameter[CharSequence],
def invoke(@ParamName("groupBy") groupBy: LazyParameter[CharSequence],
@ParamName("transformWhen") transformWhen: LazyParameter[java.lang.Boolean],
@AdditionalVariables(Array(new AdditionalVariable(name = "previous", clazz = classOf[AnyRef])))
@ParamName("newValue") newValue: LazyParameter[AnyRef],
@@ -44,7 +44,7 @@ object TransformStateTransformer extends CustomStreamTransformer with ExplicitUi
implicit val nctx: FlinkCustomNodeContext = nodeContext
setUidToNodeIdIfNeed(nodeContext,
stream
-.keyBy(key)
+.groupBy(groupBy)
.process(new TransformStateFunction[String](
nodeContext.lazyParameterHelper, transformWhen, newValue, stateTimeoutSeconds.seconds)))
}
@@ -29,7 +29,7 @@ object sampleTransformers {
object SlidingAggregateTransformerV2 extends CustomStreamTransformer with ExplicitUidInOperatorsSupport {

@MethodToInvoke(returnType = classOf[AnyRef])
def execute(@ParamName("keyBy") keyBy: LazyParameter[CharSequence],
def execute(@ParamName("groupBy") groupBy: LazyParameter[CharSequence],
@ParamName("aggregator")
@DualEditor(simpleEditor = new SimpleEditor(
`type` = SimpleEditorType.FIXED_VALUES_EDITOR,
@@ -48,7 +48,7 @@
@ParamName("emitWhenEventLeft") emitWhenEventLeft: Boolean,
@OutputVariableName variableName: String)(implicit nodeId: NodeId): ContextTransformation = {
val windowDuration = Duration(length.toMillis, TimeUnit.MILLISECONDS)
-transformers.slidingTransformer(keyBy, aggregateBy, aggregator, windowDuration, variableName, emitWhenEventLeft, explicitUidInStatefulOperators)
+transformers.slidingTransformer(groupBy, aggregateBy, aggregator, windowDuration, variableName, emitWhenEventLeft, explicitUidInStatefulOperators)
}
}

@@ -60,7 +60,7 @@
object TumblingAggregateTransformer extends CustomStreamTransformer with ExplicitUidInOperatorsSupport {

@MethodToInvoke(returnType = classOf[AnyRef])
def execute(@ParamName("keyBy") keyBy: LazyParameter[CharSequence],
def execute(@ParamName("groupBy") groupBy: LazyParameter[CharSequence],
@ParamName("aggregator")
@DualEditor(simpleEditor = new SimpleEditor(
`type` = SimpleEditorType.FIXED_VALUES_EDITOR,
@@ -79,7 +79,7 @@
@ParamName("emitWhen") trigger: TumblingWindowTrigger,
@OutputVariableName variableName: String)(implicit nodeId: NodeId): ContextTransformation = {
val windowDuration = Duration(length.toMillis, TimeUnit.MILLISECONDS)
-transformers.tumblingTransformer(keyBy, aggregateBy, aggregator, windowDuration, variableName, trigger, explicitUidInStatefulOperators)
+transformers.tumblingTransformer(groupBy, aggregateBy, aggregator, windowDuration, variableName, trigger, explicitUidInStatefulOperators)
}

}
@@ -92,7 +92,7 @@ object sampleTransformers {
object SessionWindowAggregateTransformer extends CustomStreamTransformer with ExplicitUidInOperatorsSupport {

@MethodToInvoke(returnType = classOf[AnyRef])
def execute(@ParamName("keyBy") keyBy: LazyParameter[CharSequence],
def execute(@ParamName("groupBy") groupBy: LazyParameter[CharSequence],
@ParamName("aggregator")
@DualEditor(simpleEditor = new SimpleEditor(
`type` = SimpleEditorType.FIXED_VALUES_EDITOR,
@@ -113,7 +113,7 @@
@OutputVariableName variableName: String)(implicit nodeId: NodeId): ContextTransformation = {
val sessionTimeoutDuration = Duration(sessionTimeout.toMillis, TimeUnit.MILLISECONDS)
transformers.sessionWindowTransformer(
-keyBy, aggregateBy, aggregator, sessionTimeoutDuration, endSessionCondition, trigger, variableName)
+groupBy, aggregateBy, aggregator, sessionTimeoutDuration, endSessionCondition, trigger, variableName)
}

}
@@ -24,15 +24,15 @@ import scala.concurrent.duration.Duration
// in the future - see ExplicitUidInOperatorsCompat for more info
object transformers {

-def slidingTransformer(keyBy: LazyParameter[CharSequence],
+def slidingTransformer(groupBy: LazyParameter[CharSequence],
aggregateBy: LazyParameter[AnyRef],
aggregator: Aggregator,
windowLength: Duration,
variableName: String)(implicit nodeId: NodeId): ContextTransformation =
-slidingTransformer(keyBy, aggregateBy, aggregator, windowLength, variableName, emitWhenEventLeft = false,
+slidingTransformer(groupBy, aggregateBy, aggregator, windowLength, variableName, emitWhenEventLeft = false,
ExplicitUidInOperatorsSupport.defaultExplicitUidInStatefulOperators)

-def slidingTransformer(keyBy: LazyParameter[CharSequence],
+def slidingTransformer(groupBy: LazyParameter[CharSequence],
aggregateBy: LazyParameter[AnyRef],
aggregator: Aggregator,
windowLength: Duration,
@@ -52,22 +52,22 @@
else
new AggregatorFunction[SortedMap](aggregator, windowLength.toMillis, nodeId, aggregateBy.returnType, typeInfos.storedTypeInfo)
start
-.keyByWithValue(keyBy, _ => aggregateBy)
+.groupByWithValue(groupBy, _ => aggregateBy)
.process(aggregatorFunction)
.setUidWithName(ctx, explicitUidInStatefulOperators)
}))
}

-def tumblingTransformer(keyBy: LazyParameter[CharSequence],
+def tumblingTransformer(groupBy: LazyParameter[CharSequence],
aggregateBy: LazyParameter[AnyRef],
aggregator: Aggregator,
windowLength: Duration,
variableName: String)(implicit nodeId: NodeId): ContextTransformation = {
-tumblingTransformer(keyBy, aggregateBy, aggregator, windowLength, variableName, TumblingWindowTrigger.OnEnd,
+tumblingTransformer(groupBy, aggregateBy, aggregator, windowLength, variableName, TumblingWindowTrigger.OnEnd,
ExplicitUidInOperatorsSupport.defaultExplicitUidInStatefulOperators)
}

-def tumblingTransformer(keyBy: LazyParameter[CharSequence],
+def tumblingTransformer(groupBy: LazyParameter[CharSequence],
aggregateBy: LazyParameter[AnyRef],
aggregator: Aggregator,
windowLength: Duration,
@@ -83,7 +83,7 @@
val typeInfos = AggregatorTypeInformations(ctx, aggregator, aggregateBy)

val keyedStream = start
-.keyByWithValue(keyBy, _ => aggregateBy)
+.groupByWithValue(groupBy, _ => aggregateBy)
(tumblingWindowTrigger match {
case TumblingWindowTrigger.OnEvent =>
keyedStream
@@ -106,7 +106,7 @@
}))

//Experimental component, API may change in the future
-def sessionWindowTransformer(keyBy: LazyParameter[CharSequence],
+def sessionWindowTransformer(groupBy: LazyParameter[CharSequence],
aggregateBy: LazyParameter[AnyRef],
aggregator: Aggregator,
sessionTimeout: Duration,
@@ -128,7 +128,7 @@
case SessionWindowTrigger.OnEnd => baseTrigger
}
start
-.keyByWithValue(keyBy, _.product(aggregateBy, endSessionCondition))
+.groupByWithValue(groupBy, _.product(aggregateBy, endSessionCondition))
.window(EventTimeSessionWindows.withGap(Time.milliseconds(sessionTimeout.toMillis)))
.trigger(trigger)
.aggregate(