Rename keyBy to groupBy #1920

Merged
merged 3 commits into from
Jul 21, 2021
Changes from 2 commits
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@
"nodeType" : "aggregate-sliding",
"parameters" : [
{
"name" : "keyBy",
"name" : "groupBy",
"expression" : {
"language" : "spel",
"expression" : "#input.clientId"
Expand Down
8 changes: 4 additions & 4 deletions docs/designingProcesses/FlinkCustomTransformers.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ subsequent events from any branch during `stateTimeout`. Produced object has val
![aggregate_window](../img/aggregate_window.png)

This element defines generic aggregation of values in sliding time window of given length. Parameters are:
- keyBy - expression defining key for which we compute aggregate, e.g. `#input.userId`
- groupBy - expression defining key for which we compute aggregate, e.g. `#input.userId`
- aggregator - type of aggregation (see below)
- aggregateBy - value which will be aggregated (e.g. `#input.callDuration`, `#input.productId`)
- windowLength - length of time window, window will cover range: (exclusive) now-windowLength, (inclusive) now
Expand Down Expand Up @@ -81,11 +81,11 @@ Example, for aggregate-tumbling node with length of 10 minutes, aggregation max,
![previous_value_window](../img/previous_value_window.png)

Simple transformation which stores arbitrary value for given key. This element has two parameters:
- keyBy - expression defining key for which we compute aggregate, e.g. `#input.userId`
- groupBy - expression defining key for which we compute aggregate, e.g. `#input.userId`
- value - stored value

So, for example, given stream of events which contain users with their current location, when we set
- keyBy is `#input.userId`
- groupBy is `#input.userId`
- value is `#input.location`
then the value of output variable is the previous location for current user. If this is the first appearance of this user,
**current** location will be returned
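
The previous-value behaviour described above can be sketched in plain Scala, outside Flink. This is only an illustration under assumptions: `Event`, `previousValues` and the in-memory map are hypothetical stand-ins for the keyed state that the real transformer keeps per `groupBy` key.

```scala
object PreviousValueSketch {
  // Hypothetical event shape: a user together with their current location.
  final case class Event(userId: String, location: String)

  // For each event, emit the value previously stored for its key.
  // On the first appearance of a key, the current value is emitted instead.
  def previousValues(events: Seq[Event]): Seq[String] = {
    val lastSeen = scala.collection.mutable.Map.empty[String, String]
    events.map { e =>
      val output = lastSeen.getOrElse(e.userId, e.location)
      lastSeen.update(e.userId, e.location) // remember the current value for the next event
      output
    }
  }
}
```

Here `userId` plays the role of the `groupBy` expression and `location` the role of `value`.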
Expand All @@ -104,4 +104,4 @@ Events from MAIN branch will be enriched with output variable having aggregated

## Delay

Delay event processing for a given delay duration
Delay event processing for a given delay duration
2 changes: 1 addition & 1 deletion docs/operations_guide/Operations.md
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,7 @@ Data serialization is one of the most costly operations in the Nussknacker/Flink



* Data is reshuffled between nodes during keyBy operations (e.g. before aggregations)
* Data is reshuffled between nodes during groupBy operations (e.g. before aggregations)
* Data is serialized to disk during state processing (e.g. aggregations)

Nussknacker needs to know the exact type of processed data during compilation of scenario. Avoid using sources and expressions which do not provide such information.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@ abstract class CustomStreamTransformer extends Component {
def canHaveManyInputs: Boolean = false

// For now it is only supported by Flink streaming runtime
def canBeEnding: Boolean = false
def canBeEnding: Boolean = false

}

/**
* Lazy parameter is representation of parameter of custom node which should be evaluated for each record:
* ```def execute(@ParamName("keyBy") keyBy: LazyParameter[String], @ParamName ("length") length: String)```
* In this case, length is computed as constant during process compilation, while keyBy is evaluated for each event
* ```def execute(@ParamName("groupBy") groupBy: LazyParameter[String], @ParamName ("length") length: String)```
* In this case, length is computed as constant during process compilation, while groupBy is evaluated for each event
* Cannot be evaluated directly (no method like 'evaluate'),
* as evaluation may need lifecycle handling, to use it see LazyParameterInterpreter
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pl.touk.nussknacker.genericmodel.GenericMigrations
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package pl.touk.nussknacker.genericmodel

import pl.touk.nussknacker.engine.migration.{ProcessMigration, ProcessMigrations}
import pl.touk.nussknacker.genericmodel.migrations.GroupByMigration

class GenericMigrations extends ProcessMigrations {

override def processMigrations: Map[Int, ProcessMigration] = ProcessMigrations.listOf(GroupByMigration).processMigrations
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package pl.touk.nussknacker.genericmodel.migrations

import pl.touk.nussknacker.engine.api.MetaData
import pl.touk.nussknacker.engine.graph.node.{CustomNode, NodeData}
import pl.touk.nussknacker.engine.migration.NodeMigration

object GroupByMigration extends NodeMigration {

override val description = "GroupByMigration"

private val keyByParameterName = "keyBy"

override def failOnNewValidationError: Boolean = false

override def migrateNode(metadata: MetaData): PartialFunction[NodeData, NodeData] = {
case node@CustomNode(_, _, nodeType, parameters, _)
if parameters.exists(_.name == keyByParameterName) && nodeType.startsWith("aggregate-") =>
node.copy(parameters = node.parameters.map(p => if (p.name == keyByParameterName) p.copy(name = "groupBy") else p))
}
}
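
The renaming logic of the migration above can be tried in isolation with a minimal sketch. It assumes simplified, hypothetical `Param` and `Node` case classes in place of Nussknacker's `Parameter` and `CustomNode`, and a hypothetical `migrate` helper that applies the partial function where it is defined.

```scala
object GroupByMigrationSketch {
  // Hypothetical, simplified stand-ins for the real parameter and node types.
  final case class Param(name: String, expression: String)
  final case class Node(nodeType: String, parameters: List[Param])

  private val oldName = "keyBy"
  private val newName = "groupBy"

  // Defined only for aggregate-* nodes that still carry a keyBy parameter.
  val migrateNode: PartialFunction[Node, Node] = {
    case node @ Node(nodeType, parameters)
        if nodeType.startsWith("aggregate-") && parameters.exists(_.name == oldName) =>
      node.copy(parameters = parameters.map(p => if (p.name == oldName) p.copy(name = newName) else p))
  }

  // Nodes outside the partial function's domain are returned unchanged.
  def migrate(node: Node): Node = migrateNode.applyOrElse(node, (n: Node) => n)
}
```

With this guard, a node whose type does not start with `aggregate-` keeps its `keyBy` parameter even if it has one.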
Original file line number Diff line number Diff line change
Expand Up @@ -26,35 +26,35 @@ object LastVariableFilterTransformer extends CustomStreamTransformer with Single

private val valueParameter = ParameterWithExtractor.lazyMandatory[AnyRef](valueParameterName)

private val keyByParameterName = "keyBy"
private val groupByParameterName = "groupBy"

private val keyByParameter = ParameterWithExtractor.lazyMandatory[String](keyByParameterName)
private val groupByParameter = ParameterWithExtractor.lazyMandatory[String](groupByParameterName)

private def conditionParameter(valueType: TypingResult) = Parameter(conditionParameterName, Typed[Boolean])
.copy(isLazyParameter = true, additionalVariables = Map("current" -> valueType, "previous" -> valueType))

type State = Nothing

override def contextTransformation(context: ValidationContext, dependencies: List[NodeDependencyValue])(implicit nodeId: ProcessCompilationError.NodeId): NodeTransformationDefinition = {
case TransformationStep(Nil, _) => NextParameters(keyByParameter.parameter :: valueParameter.parameter ::Nil)
case TransformationStep((`keyByParameterName`,_ ) :: (`valueParameterName`, DefinedLazyParameter(expr)) :: Nil, _) => NextParameters(conditionParameter(expr.returnType)::Nil)
case TransformationStep(Nil, _) => NextParameters(groupByParameter.parameter :: valueParameter.parameter ::Nil)
case TransformationStep((`groupByParameterName`,_ ) :: (`valueParameterName`, DefinedLazyParameter(expr)) :: Nil, _) => NextParameters(conditionParameter(expr.returnType)::Nil)
//if we cannot determine value, we'll assume its type is Unknown
case TransformationStep((`keyByParameterName`, _) :: (`valueParameterName`, FailedToDefineParameter) :: Nil, _) => NextParameters(conditionParameter(Unknown)::Nil)
case TransformationStep((`keyByParameterName`, _) :: (`valueParameterName`, _) :: (`conditionParameterName`, _) :: Nil, _) => FinalResults(context)
case TransformationStep((`groupByParameterName`, _) :: (`valueParameterName`, FailedToDefineParameter) :: Nil, _) => NextParameters(conditionParameter(Unknown)::Nil)
case TransformationStep((`groupByParameterName`, _) :: (`valueParameterName`, _) :: (`conditionParameterName`, _) :: Nil, _) => FinalResults(context)
}

override def initialParameters: List[Parameter] = List(keyByParameter.parameter, valueParameter.parameter, conditionParameter(Unknown))
override def initialParameters: List[Parameter] = List(groupByParameter.parameter, valueParameter.parameter, conditionParameter(Unknown))

override def nodeDependencies: List[NodeDependency] = List(OutputVariableNameDependency)

override def implementation(params: Map[String, Any], dependencies: List[NodeDependencyValue], finalState: Option[State]): FlinkCustomStreamTransformation= {
val value = valueParameter.extractValue(params)
val condition = params(conditionParameterName).asInstanceOf[LazyParameter[java.lang.Boolean]]
val keyBy = keyByParameter.extractValue(params)
val groupBy = groupByParameter.extractValue(params)

FlinkCustomStreamTransformation((str: DataStream[Context], ctx: FlinkCustomNodeContext) => {
str
.map(new StringKeyedValueMapper(ctx.lazyParameterHelper, keyBy, value))
.map(new StringKeyedValueMapper(ctx.lazyParameterHelper, groupBy, value))
.keyBy(_.value.key)
.process(new ConditionalUpdateFunction(condition, ctx.lazyParameterHelper))
})
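
The `contextTransformation` cases in this file select each step by parameter name, which relies on how Scala treats identifiers inside patterns. The sketch below is illustrative only; the object and method names are hypothetical. It shows that a backquoted identifier is compared by equality with an existing value, while a bare lowercase identifier is a fresh variable that matches any input.

```scala
object PatternSketch {
  val groupByParameterName = "groupBy"

  // Backquoted: stable identifier pattern, compared with == against the val above.
  def matchesExact(name: String): Boolean = name match {
    case `groupByParameterName` => true
    case _                      => false
  }

  // Bare lowercase name: variable pattern, binds whatever value is passed in.
  def matchesAnything(name: String): Boolean = name match {
    case anyName => true
  }
}
```

A misspelled name without backquotes therefore still compiles, but it no longer checks which parameter is being matched.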
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ import org.apache.flink.streaming.api.scala._
case object StatefulTransformer extends CustomStreamTransformer with LazyLogging {

@MethodToInvoke
def execute(@ParamName("keyBy") keyBy: LazyParameter[String]): FlinkCustomStreamTransformation
def execute(@ParamName("groupBy") groupBy: LazyParameter[String]): FlinkCustomStreamTransformation
= FlinkCustomStreamTransformation((start: DataStream[Context], ctx: FlinkCustomNodeContext) => {
start
.map(ctx.lazyParameterHelper.lazyMapFunction(keyBy))
.map(ctx.lazyParameterHelper.lazyMapFunction(groupBy))
.keyBy(_.value)
.mapWithState[ValueWithContext[AnyRef], List[String]] { case (StringFromIr(ir, sr), oldState) =>
logger.info(s"received: $sr, current state: $oldState")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ object StatefulSampleProcess {
.id(id)
.exceptionHandler()
.source("state", "oneSource")
.customNode("stateful", "stateVar", "stateful", "keyBy" -> "#input")
.customNode("stateful", "stateVar", "stateful", "groupBy" -> "#input")
.sink("end", "#stateVar": Expression, "kafka-string", "topic" -> s"'output-$id'")
}

Expand All @@ -35,7 +35,7 @@ object StatefulSampleProcess {
.exceptionHandler()
.source("state", "oneSource")
.customNode("transform", "aggregate", "aggregate",
"keyBy" -> "'test'",
"groupBy" -> "'test'",
"aggregator" -> s"#AGG.map({x: $aggegatorExpression})",
"aggregateBy" -> "{ x: 1 }",
"windowLength" -> "T(java.time.Duration).parse('PT1H')",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ case object PreviousValueTransformer extends CustomStreamTransformer with Explic
type Value = AnyRef

@MethodToInvoke(returnType = classOf[Value])
def execute(@ParamName("keyBy") keyBy: LazyParameter[CharSequence],
def execute(@ParamName("groupBy") groupBy: LazyParameter[CharSequence],
@ParamName("value") value: LazyParameter[Value])
= FlinkCustomStreamTransformation((start: DataStream[Context], ctx: FlinkCustomNodeContext) =>
setUidToNodeIdIfNeed(ctx,
start
.map(ctx.lazyParameterHelper.lazyMapFunction(keyBy))
.map(ctx.lazyParameterHelper.lazyMapFunction(groupBy))
.keyBy(vCtx => Option(vCtx.value).map(_.toString).orNull)
.map(new PreviousValueFunction(value, ctx.lazyParameterHelper))), value.returnType)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import scala.concurrent.duration._
object TransformStateTransformer extends CustomStreamTransformer with ExplicitUidInOperatorsSupport {

@MethodToInvoke(returnType = classOf[AnyRef])
def invoke(@ParamName("key") key: LazyParameter[CharSequence],
def invoke(@ParamName("groupBy") groupBy: LazyParameter[CharSequence],
@ParamName("transformWhen") transformWhen: LazyParameter[java.lang.Boolean],
@AdditionalVariables(Array(new AdditionalVariable(name = "previous", clazz = classOf[AnyRef])))
@ParamName("newValue") newValue: LazyParameter[AnyRef],
Expand All @@ -44,7 +44,7 @@ object TransformStateTransformer extends CustomStreamTransformer with ExplicitUi
implicit val nctx: FlinkCustomNodeContext = nodeContext
setUidToNodeIdIfNeed(nodeContext,
stream
.keyBy(key)
.groupBy(groupBy)
.process(new TransformStateFunction[String](
nodeContext.lazyParameterHelper, transformWhen, newValue, stateTimeoutSeconds.seconds)))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ object sampleTransformers {
object SlidingAggregateTransformerV2 extends CustomStreamTransformer with ExplicitUidInOperatorsSupport {

@MethodToInvoke(returnType = classOf[AnyRef])
def execute(@ParamName("keyBy") keyBy: LazyParameter[CharSequence],
def execute(@ParamName("groupBy") groupBy: LazyParameter[CharSequence],
@ParamName("aggregator")
@DualEditor(simpleEditor = new SimpleEditor(
`type` = SimpleEditorType.FIXED_VALUES_EDITOR,
Expand All @@ -48,7 +48,7 @@ object sampleTransformers {
@ParamName("emitWhenEventLeft") emitWhenEventLeft: Boolean,
@OutputVariableName variableName: String)(implicit nodeId: NodeId): ContextTransformation = {
val windowDuration = Duration(length.toMillis, TimeUnit.MILLISECONDS)
transformers.slidingTransformer(keyBy, aggregateBy, aggregator, windowDuration, variableName, emitWhenEventLeft, explicitUidInStatefulOperators)
transformers.slidingTransformer(groupBy, aggregateBy, aggregator, windowDuration, variableName, emitWhenEventLeft, explicitUidInStatefulOperators)
}
}

Expand All @@ -60,7 +60,7 @@ object sampleTransformers {
object TumblingAggregateTransformer extends CustomStreamTransformer with ExplicitUidInOperatorsSupport {

@MethodToInvoke(returnType = classOf[AnyRef])
def execute(@ParamName("keyBy") keyBy: LazyParameter[CharSequence],
def execute(@ParamName("groupBy") groupBy: LazyParameter[CharSequence],
@ParamName("aggregator")
@DualEditor(simpleEditor = new SimpleEditor(
`type` = SimpleEditorType.FIXED_VALUES_EDITOR,
Expand All @@ -79,7 +79,7 @@ object sampleTransformers {
@ParamName("emitWhen") trigger: TumblingWindowTrigger,
@OutputVariableName variableName: String)(implicit nodeId: NodeId): ContextTransformation = {
val windowDuration = Duration(length.toMillis, TimeUnit.MILLISECONDS)
transformers.tumblingTransformer(keyBy, aggregateBy, aggregator, windowDuration, variableName, trigger, explicitUidInStatefulOperators)
transformers.tumblingTransformer(groupBy, aggregateBy, aggregator, windowDuration, variableName, trigger, explicitUidInStatefulOperators)
}

}
Expand All @@ -92,7 +92,7 @@ object sampleTransformers {
object SessionWindowAggregateTransformer extends CustomStreamTransformer with ExplicitUidInOperatorsSupport {

@MethodToInvoke(returnType = classOf[AnyRef])
def execute(@ParamName("keyBy") keyBy: LazyParameter[CharSequence],
def execute(@ParamName("groupBy") groupBy: LazyParameter[CharSequence],
@ParamName("aggregator")
@DualEditor(simpleEditor = new SimpleEditor(
`type` = SimpleEditorType.FIXED_VALUES_EDITOR,
Expand All @@ -113,7 +113,7 @@ object sampleTransformers {
@OutputVariableName variableName: String)(implicit nodeId: NodeId): ContextTransformation = {
val sessionTimeoutDuration = Duration(sessionTimeout.toMillis, TimeUnit.MILLISECONDS)
transformers.sessionWindowTransformer(
keyBy, aggregateBy, aggregator, sessionTimeoutDuration, endSessionCondition, trigger, variableName)
groupBy, aggregateBy, aggregator, sessionTimeoutDuration, endSessionCondition, trigger, variableName)
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,15 @@ import scala.concurrent.duration.Duration
// in the future - see ExplicitUidInOperatorsCompat for more info
object transformers {

def slidingTransformer(keyBy: LazyParameter[CharSequence],
def slidingTransformer(groupBy: LazyParameter[CharSequence],
aggregateBy: LazyParameter[AnyRef],
aggregator: Aggregator,
windowLength: Duration,
variableName: String)(implicit nodeId: NodeId): ContextTransformation =
slidingTransformer(keyBy, aggregateBy, aggregator, windowLength, variableName, emitWhenEventLeft = false,
slidingTransformer(groupBy, aggregateBy, aggregator, windowLength, variableName, emitWhenEventLeft = false,
ExplicitUidInOperatorsSupport.defaultExplicitUidInStatefulOperators)

def slidingTransformer(keyBy: LazyParameter[CharSequence],
def slidingTransformer(groupBy: LazyParameter[CharSequence],
aggregateBy: LazyParameter[AnyRef],
aggregator: Aggregator,
windowLength: Duration,
Expand All @@ -52,22 +52,22 @@ object transformers {
else
new AggregatorFunction[SortedMap](aggregator, windowLength.toMillis, nodeId, aggregateBy.returnType, typeInfos.storedTypeInfo)
start
.keyByWithValue(keyBy, _ => aggregateBy)
.groupByWithValue(groupBy, _ => aggregateBy)
.process(aggregatorFunction)
.setUidWithName(ctx, explicitUidInStatefulOperators)
}))
}

def tumblingTransformer(keyBy: LazyParameter[CharSequence],
def tumblingTransformer(groupBy: LazyParameter[CharSequence],
aggregateBy: LazyParameter[AnyRef],
aggregator: Aggregator,
windowLength: Duration,
variableName: String)(implicit nodeId: NodeId): ContextTransformation = {
tumblingTransformer(keyBy, aggregateBy, aggregator, windowLength, variableName, TumblingWindowTrigger.OnEnd,
tumblingTransformer(groupBy, aggregateBy, aggregator, windowLength, variableName, TumblingWindowTrigger.OnEnd,
ExplicitUidInOperatorsSupport.defaultExplicitUidInStatefulOperators)
}

def tumblingTransformer(keyBy: LazyParameter[CharSequence],
def tumblingTransformer(groupBy: LazyParameter[CharSequence],
aggregateBy: LazyParameter[AnyRef],
aggregator: Aggregator,
windowLength: Duration,
Expand All @@ -83,7 +83,7 @@ object transformers {
val typeInfos = AggregatorTypeInformations(ctx, aggregator, aggregateBy)

val keyedStream = start
.keyByWithValue(keyBy, _ => aggregateBy)
.groupByWithValue(groupBy, _ => aggregateBy)
(tumblingWindowTrigger match {
case TumblingWindowTrigger.OnEvent =>
keyedStream
Expand All @@ -106,7 +106,7 @@ object transformers {
}))

//Experimental component, API may change in the future
def sessionWindowTransformer(keyBy: LazyParameter[CharSequence],
def sessionWindowTransformer(groupBy: LazyParameter[CharSequence],
aggregateBy: LazyParameter[AnyRef],
aggregator: Aggregator,
sessionTimeout: Duration,
Expand All @@ -128,7 +128,7 @@ object transformers {
case SessionWindowTrigger.OnEnd => baseTrigger
}
start
.keyByWithValue(keyBy, _.product(aggregateBy, endSessionCondition))
.groupByWithValue(groupBy, _.product(aggregateBy, endSessionCondition))
.window(EventTimeSessionWindows.withGap(Time.milliseconds(sessionTimeout.toMillis)))
.trigger(trigger)
.aggregate(
Expand Down