[NU-1481] Table aggregation component (#5757)
added table api aggregation component and refactored flink source api
mslabek authored Apr 18, 2024
1 parent a626588 commit 5b90c1a
Showing 49 changed files with 1,151 additions and 418 deletions.
5 changes: 4 additions & 1 deletion build.sbt
@@ -1756,6 +1756,10 @@ lazy val flinkBaseComponentsTests = (project in flink("components/base-tests"))
.settings(commonSettings)
.settings(
name := "nussknacker-flink-base-components-tests",
libraryDependencies ++= Seq(
"org.apache.flink" % "flink-connector-files" % flinkV % Test,
"org.apache.flink" % "flink-csv" % flinkV % Test
)
)
.dependsOn(
flinkComponentsTestkit % Test,
@@ -1792,7 +1796,6 @@ lazy val experimentalFlinkTableApiComponents = (project in flink("components/dev
"org.apache.flink" % "flink-table-runtime" % flinkV,
"org.apache.flink" % "flink-connector-kafka" % flinkConnectorKafkaV,
"org.apache.flink" % "flink-json" % flinkV,
"org.scalatest" %% "scalatest" % scalaTestV % Test
)
}
)
@@ -20,6 +20,7 @@ scenarioTypes {
]
components.flinkTable {
tableDefinitionFilePath: "designer/server/src/test/resources/config/business-cases/tables-definition.sql"
enableFlinkBatchExecutionMode: true
}
}
category: "Category1"
@@ -1,23 +1,22 @@
CREATE TABLE transactions
(
datatime TIMESTAMP,
datetime TIMESTAMP,
client_id STRING,
amount DECIMAL(15, 2),
amount INT,
`date` STRING
) PARTITIONED BY (`date`) WITH (
'connector' = 'filesystem',
'path' = 'file:///transactions',
'format' = 'csv'
);
);

CREATE TABLE transactions_summary
(
datatime TIMESTAMP,
client_id STRING,
amount DECIMAL(15, 2),
amount INT,
`date` STRING
) PARTITIONED BY (`date`) WITH (
'connector' = 'filesystem',
'path' = 'file:///output/transactions_summary',
'format' = 'csv'
)
);
@@ -1,3 +1,3 @@
"2024-01-01 10:00:00",client1,1.12
"2024-01-01 10:01:00",client2,2.21
"2024-01-01 10:00:00",client1,1
"2024-01-01 10:01:00",client2,2
"2024-01-01 10:02:00",client1,3
@@ -1,3 +1,3 @@
"2024-01-02 10:00:00",client1,1.12
"2024-01-02 10:01:00",client2,2.21
"2024-01-02 10:00:00",client1,1
"2024-01-02 10:01:00",client2,2
"2024-01-02 10:02:00",client1,3
@@ -7,17 +7,51 @@ import org.scalatest.Suite
import org.scalatest.concurrent.Eventually
import org.scalatest.matchers.should.Matchers
import org.testcontainers.containers.BindMode
import pl.touk.nussknacker.engine.build.ScenarioBuilder
import pl.touk.nussknacker.engine.flink.test.docker.FileSystemBind
import pl.touk.nussknacker.engine.graph.expression.Expression
import pl.touk.nussknacker.test.config.WithFlinkContainersDeploymentManager

import scala.jdk.CollectionConverters._
import java.nio.file.Files

trait BaseDeploymentApiHttpServiceBusinessSpec extends WithFlinkContainersDeploymentManager {
self: Suite with LazyLogging with Matchers with Eventually =>

protected val scenarioName = "batch-test"

protected val sourceNodeId = "fooSourceNodeId"

protected val scenario = ScenarioBuilder
.streaming(scenarioName)
.source(sourceNodeId, "table", "Table" -> Expression.spel("'transactions'"))
.customNode(
id = "aggregate",
outputVar = "agg",
customNodeRef = "aggregate",
"groupBy" -> Expression.spel("#input.client_id + ',' + #input.date"),
"aggregateBy" -> Expression.spel("#input.amount"),
"aggregator" -> Expression.spel("'Sum'"),
)
// TODO: get rid of concatenating the key and pass the datetime to the output table
.buildSimpleVariable(
"var",
"keyValues",
Expression.spel("#key.split(',')")
)
.emptySink(
id = "sink",
typ = "table",
"Table" -> Expression.spel("'transactions_summary'"),
"Value" -> Expression.spel(
"{client_id: #keyValues[0], date: #keyValues[1], amount: #agg}"
)
)

private lazy val outputDirectory =
Files.createTempDirectory(s"nussknacker-${getClass.getSimpleName}-transactions_summary-")

// TODO: use DECIMAL(15, 2) type instead of INT after adding support for all primitive types
private lazy val tablesDefinitionBind = FileSystemBind(
"designer/server/src/test/resources/config/business-cases/tables-definition.sql",
"/opt/flink/designer/server/src/test/resources/config/business-cases/tables-definition.sql",
@@ -40,7 +74,9 @@ trait BaseDeploymentApiHttpServiceBusinessSpec extends WithFlinkContainersDeploy
List(
tablesDefinitionBind,
// input must also be available on the JM side to allow it to split the work into multiple subtasks
inputTransactionsBind
inputTransactionsBind,
// output directory has to be available on JM to allow writing the final output file and deleting the temp files
outputTransactionsSummaryBind
)

override protected def taskManagerExtraFSBinds: List[FileSystemBind] = {
@@ -62,7 +98,7 @@ trait BaseDeploymentApiHttpServiceBusinessSpec extends WithFlinkContainersDeploy
// finished deploy doesn't mean that processing is finished
// TODO (next PRs): we need to wait for the job completed status instead
val transactionSummaryDirectories = eventually {
val directories = Option(outputDirectory.toFile.listFiles()).toList.flatten
val directories = Option(outputDirectory.toFile.listFiles().filter(!_.isHidden)).toList.flatten
directories should have size 1
directories
}
@@ -71,18 +107,15 @@ trait BaseDeploymentApiHttpServiceBusinessSpec extends WithFlinkContainersDeploy
matchingPartitionDirectory.getName shouldEqual "date=2024-01-01"

eventually {
val partitionFiles = Option(matchingPartitionDirectory.listFiles()).toList.flatten
val partitionFiles = Option(matchingPartitionDirectory.listFiles().filter(!_.isHidden)).toList.flatten
partitionFiles should have size 1
val firstFile = partitionFiles.head

val content =
FileUtils.readFileToString(firstFile, StandardCharset.UTF_8)
val content = FileUtils.readLines(firstFile, StandardCharset.UTF_8).asScala.toSet

// TODO (next PRs): aggregate by clientId
content should include(
""""2024-01-01 10:00:00",client1,1.12
|"2024-01-01 10:01:00",client2,2.21
|"2024-01-01 10:02:00",client1,3""".stripMargin
content shouldBe Set(
"client1,4",
"client2,2"
)
}
}
@@ -5,8 +5,6 @@ import io.restassured.RestAssured.`given`
import io.restassured.module.scala.RestAssuredSupport.AddThenToResponse
import org.scalatest.freespec.AnyFreeSpecLike
import org.scalatest.matchers.should.Matchers
import pl.touk.nussknacker.engine.build.ScenarioBuilder
import pl.touk.nussknacker.engine.graph.expression.Expression
import pl.touk.nussknacker.test.base.it.{NuItTest, WithBatchConfigScenarioHelper}
import pl.touk.nussknacker.test.config.{WithBatchDesignerConfig, WithBusinessCaseRestAssuredUsersExtensions}
import pl.touk.nussknacker.test.{NuRestAssureMatchers, RestAssuredVerboseLogging, VeryPatientScalaFutures}
@@ -24,20 +22,6 @@ class DeploymentApiHttpServiceBusinessSpec
with VeryPatientScalaFutures
with Matchers {

private val scenarioName = "batch-test"

private val sourceNodeId = "fooSourceNodeId"

private val scenario = ScenarioBuilder
.streaming(scenarioName)
.source(sourceNodeId, "table", "Table" -> Expression.spel("'transactions'"))
.emptySink(
"sink",
"table",
"Table" -> Expression.spel("'transactions_summary'"),
"Value" -> Expression.spel("#input")
)

private val correctDeploymentRequest = s"""{
| "nodesDeploymentData": {
| "$sourceNodeId": "`date` = '2024-01-01'"
@@ -25,20 +25,6 @@ class DeploymentApiHttpServiceDeploymentCommentSpec
with VeryPatientScalaFutures
with Matchers {

private val scenarioName = "batch-test"

private val sourceNodeId = "fooSourceNodeId"

private val scenario = ScenarioBuilder
.streaming(scenarioName)
.source(sourceNodeId, "table", "Table" -> Expression.spel("'transactions'"))
.emptySink(
"sink",
"table",
"Table" -> Expression.spel("'transactions_summary'"),
"Value" -> Expression.spel("#input")
)

private val configuredPhrase = "foo"

override def designerConfig: Config = {
12 changes: 12 additions & 0 deletions docs/MigrationGuide.md
@@ -31,6 +31,18 @@ To see the biggest differences please consult the [changelog](Changelog.md).
* [#5783](https://github.com/TouK/nussknacker/pull/5783) Return type of `allowedProcessingMode` method in `Component` trait has been changed to `AllowedProcessingModes` type which is one of:
* `AllowedProcessingModes.All` in case of all processing modes allowed
* `AllowedProcessingModes.SetOf(nonEmptySetOfAllowedProcessingModes)` in case only set of processing modes is allowed
* [#5757](https://github.com/TouK/nussknacker/pull/5757) Refactored API around `FlinkSource`
* Added `StandardFlinkSource` with more granular additional traits replacing the need for `FlinkIntermediateRawSource`
* Removed `BasicFlinkSource` and `FlinkIntermediateRawSource`. Sources extending these traits should now extend
`StandardFlinkSource`. For reference on how to migrate, see changes in `FlinkKafkaSource` or `CollectionSource`
* Renamed `FlinkSource`'s `sourceStream` method to `contextStream`
* Removed `EmptySourceFunction`
* [#5757](https://github.com/TouK/nussknacker/pull/5757) Added support for bounded sources and Flink runtime mode in
Flink tests
* `CollectionSource` now takes Flink's `Boundedness` (default `Unbounded`) and `RuntimeExecutionMode` (default `None`)
as parameters. It is encouraged to set the `Boundedness` to bounded where applicable
* `Boundedness` and `RuntimeExecutionMode` can also be set in `FlinkTestScenarioRunner` via a new overload of the
`runWithData` method (see the sketch below)
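
A minimal sketch of the new test setup (a hypothetical `runner` of type `FlinkTestScenarioRunner` and an existing `scenario` are assumed; the exact shape of the new `runWithData` overload is an assumption based on the entries above, not the verified signature):

```scala
import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.api.connector.source.Boundedness

// Hypothetical usage: run test data through a bounded source in Flink batch mode
val results = runner.runWithData(
  scenario,
  List(1, 2, 3),
  Boundedness.BOUNDED,
  Some(RuntimeExecutionMode.BATCH)
)
```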

### Configuration changes

53 changes: 33 additions & 20 deletions docs/developers_guide/FlinkComponents.md
@@ -2,34 +2,45 @@

[Sources, sinks and custom transformations](./Basics.md#components-and-componentproviders) are based on Flink API.
In order to implement any of those you need to provide:
- a Flink function or Flink `DataStream` transformation
- a Flink `DataStreamSource` in case of sources and a `DataStream` into `DataStreamSink` transformation in case of sinks
- a Nussknacker [specification](./Components.md#specification)

## Sources

Implementing a fully functional source ([BasicFlinkSource](https://github.com/TouK/nussknacker/blob/staging/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/process/FlinkSource.scala))
is more complicated than a sink, since the following things have to be provided:
- a Flink [SourceFunction](https://nightlies.apache.org/flink/flink-docs-stable/api/java/org/apache/flink/streaming/api/functions/source/SourceFunction.html)
- Flink [type information](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#flinks-typeinformation-class)
for serializing/deserializing emitted data (e.g. `#input`)
- a [timestamp watermark handler](https://github.com/TouK/nussknacker/blob/staging/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/timestampwatermark/TimestampWatermarkHandler.scala)
### Standard implementation
The recommended way to implement a source is through [StandardFlinkSource](https://github.com/TouK/nussknacker/blob/staging/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/process/FlinkSource.scala)
interface. Your source only has to implement a `sourceStream` method that provides [DataStreamSource](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/datastream/DataStreamSource.html)
based on [StreamExecutionEnvironment](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.html).

This approach provides a standard transformation of the input value into a Nussknacker `Context` and allows the implementation to customize these steps:
- [Timestamp watermark handler](https://github.com/TouK/nussknacker/blob/staging/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/timestampwatermark/TimestampWatermarkHandler.scala)
so that events are correctly processed downstream, for example to avoid (or force!) dropping late events by aggregates. Read more about
[notion of time](../scenarios_authoring/DataSourcesAndSinks.md#notion-of-time--flink-engine-only)
and [watermarks](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/event-time/generating_watermarks/)
- (optionally) generating test data support (trait `FlinkSourceTestSupport`) to ease [scenarios authoring](../scenarios_authoring/TestingAndDebugging.md)
- (optionally) custom [context initializer](https://github.com/TouK/nussknacker/blob/staging/components-api/src/main/scala/pl/touk/nussknacker/engine/api/process/ContextInitializer.scala)
- [Context initializer](https://github.com/TouK/nussknacker/blob/staging/components-api/src/main/scala/pl/touk/nussknacker/engine/api/process/ContextInitializer.scala)
to emit more variables than `#input`. For example, built-in Kafka sources emit the `#inputMeta` variable with Kafka record metadata such as partition, topic, offset, etc.
Another example could be a file source that emits the current line number as a new variable along with the content (as the `#input` variable)
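
A minimal sketch of such a source, assuming `StandardFlinkSource` and `FlinkCustomNodeContext` live in `pl.touk.nussknacker.engine.flink.api.process` and that `sourceStream` receives the node context as its second argument (the names below are illustrative, not the verified API):

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.datastream.DataStreamSource
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import pl.touk.nussknacker.engine.flink.api.process.{FlinkCustomNodeContext, StandardFlinkSource}

import scala.jdk.CollectionConverters._

// Hypothetical source that emits a fixed list of strings as #input
class MyListSource(elements: List[String]) extends StandardFlinkSource[String] {

  // Builds the Flink DataStreamSource; watermarks and the context initializer
  // can additionally be customized through the hooks described above
  override def sourceStream(
      env: StreamExecutionEnvironment,
      flinkNodeContext: FlinkCustomNodeContext
  ): DataStreamSource[String] =
    env.fromCollection(elements.asJava, TypeInformation.of(classOf[String]))
}
```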

Nussknacker also provides a more generic [FlinkSource](https://github.com/TouK/nussknacker/blob/staging/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/process/FlinkSource.scala)
for implementing sources. The difference is instead of implementing a Flink `SourceFunction`, arbitrary `DataStream[Context]`
can be returned, however you have to remember to assign timestamps, watermarks and initialize the context.
### Generic implementation
Nussknacker also provides a more generic interface for implementing sources - [FlinkSource](https://github.com/TouK/nussknacker/blob/staging/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/process/FlinkSource.scala).
Instead of providing a Flink `DataStreamSource`, you can provide an arbitrary `DataStream[Context]` directly. However, you
have to remember to assign timestamps, watermarks, and initialize the context.

When using Flink engine, all sources returned by [SourceFactory](https://github.com/TouK/nussknacker/blob/staging/components-api/src/main/scala/pl/touk/nussknacker/engine/api/process/Source.scala)
have to implement `FlinkSource` (or its subtrait `BasicFlinkSource`).
### Test support
To enable testing functionality in scenarios using your source implementation, your source needs to implement certain test-specific interfaces:
- Basic test support - `FlinkSourceTestSupport` - besides the more general `SourceTestSupport`, the implementation:
- has to provide a Flink `TypeInformation` for serializing/deserializing data emitted from source (e.g. `#input`)
- optionally can provide a `TimestampWatermarkHandler` that will be used only for tests
- Test data generation - `TestDataGenerator`
- Ad hoc test support - `TestWithParametersSupport`

### Examples
Read more about testing functionality in [this section](../scenarios_authoring/TestingAndDebugging.md).
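
To illustrate, the hypothetical source sketched earlier could expose basic test support roughly as follows (the trait's package and the member names `typeInformation` / `timestampAssignerForTest` are assumptions inferred from the list above; the remaining members required by `SourceTestSupport` are omitted, hence the abstract class):

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import pl.touk.nussknacker.engine.flink.api.process.FlinkSourceTestSupport
import pl.touk.nussknacker.engine.flink.api.timestampwatermark.TimestampWatermarkHandler

// Sketch only - member names are assumptions based on the description above
abstract class MyTestableListSource(elements: List[String])
    extends MyListSource(elements)
    with FlinkSourceTestSupport[String] {

  // TypeInformation used to serialize/deserialize records emitted from the source (e.g. #input)
  override def typeInformation: TypeInformation[String] =
    TypeInformation.of(classOf[String])

  // no test-specific watermark handling in this sketch
  override def timestampAssignerForTest: Option[TimestampWatermarkHandler[String]] = None
}
```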

### Specification
Your Nussknacker source component specification should be a [SourceFactory](https://github.com/TouK/nussknacker/blob/staging/components-api/src/main/scala/pl/touk/nussknacker/engine/api/process/Source.scala)
returning your source implementation.
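
For the hypothetical source sketched above, the specification could look roughly like this (a sketch assuming the annotation-based, single-parameter style; the exact factory API may differ):

```scala
import pl.touk.nussknacker.engine.api.process.SourceFactory
import pl.touk.nussknacker.engine.api.{MethodToInvoke, ParamName}

import scala.jdk.CollectionConverters._

// Hypothetical factory exposing MyListSource as a component with a single "elements" parameter
object MyListSourceFactory extends SourceFactory {

  @MethodToInvoke
  def create(@ParamName("elements") elements: java.util.List[String]): MyListSource =
    new MyListSource(elements.asScala.toList)
}
```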

### Examples
- [Periodic source](../scenarios_authoring/DataSourcesAndSinks.md#periodic) and its [implementation](https://github.com/TouK/nussknacker/blob/staging/engine/flink/components/base/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/PeriodicSourceFactory.scala)
- [FlinkKafkaSource](https://github.com/TouK/nussknacker/blob/staging/engine/flink/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/source/flink/FlinkKafkaSource.scala)
and its factory returning the source implementation along with the fixed specification (e.g. based on a Scala case class) [KafkaSourceFactory](https://github.com/TouK/nussknacker/blob/staging/utils/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/source/KafkaSourceFactory.scala)
@@ -43,6 +54,7 @@ All of them can be used to implement a Nussknacker source.

## Sinks

### Implementation
Sinks are easier to implement than sources. Nussknacker provides a [factory](https://github.com/TouK/nussknacker/blob/staging/engine/flink/components-utils/src/main/scala/pl/touk/nussknacker/engine/flink/util/sink/SingleValueSinkFactory.scala)
for sinks that take only one parameter. The only thing that has to be provided is a Flink [SinkFunction](https://nightlies.apache.org/flink/flink-docs-stable/api/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.html).
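
For illustration, wiring a one-parameter sink this way could look roughly like this (the exact `SingleValueSinkFactory` constructor is an assumption based on the sentence above; `PrintSinkFunction` is just a stand-in Flink sink function):

```scala
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction
import pl.touk.nussknacker.engine.flink.util.sink.SingleValueSinkFactory

// Hypothetical sink component that prints the single configured value to stdout
val printSinkFactory = new SingleValueSinkFactory(new PrintSinkFunction[AnyRef]())
```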

@@ -52,17 +64,18 @@ The following things are required:
- `registerSink` - a method that turns `DataStream[ValueWithContext[Value]]` into `DataStreamSink`. It's the place where
a Flink `SinkFunction` should be registered

### Specification
Similarly to sources, all sinks returned by [SinkFactory](https://github.com/TouK/nussknacker/blob/staging/components-api/src/main/scala/pl/touk/nussknacker/engine/api/process/Sink.scala)
have to implement `FlinkSink` (or its subtrait `BasicFlinkSink`).

Again, Flink provides [basic](https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/overview/#data-sinks) sinks
and [connectors](https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/datastream/overview) which can be used while implementing
your own Nussknacker sinks.

Examples:
### Examples
- [FlinkKafkaUniversalSink](https://github.com/TouK/nussknacker/blob/staging/engine/flink/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/sink/flink/FlinkKafkaUniversalSink.scala)
and its factory [UniversalKafkaSinkFactory](https://github.com/TouK/nussknacker/blob/staging/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/sink/UniversalKafkaSinkFactory.scala)

Flink provides [basic](https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/overview/#data-sinks) sinks
and [connectors](https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/datastream/overview) which can be used while implementing
your own Nussknacker sinks.

## Custom stream transformations

A custom transformation can arbitrarily change `DataStream[Context]`; it is implemented with [FlinkCustomStreamTransformation](https://github.com/TouK/nussknacker/blob/staging/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/process/FlinkCustomStreamTransformation.scala).
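
A minimal sketch of such a transformation (assuming an `apply` helper that takes a function of the stream and the node context; the exact signature, and whether explicit `TypeInformation` must be supplied for the result, may differ):

```scala
import org.apache.flink.streaming.api.datastream.DataStream
import pl.touk.nussknacker.engine.api.{Context, ValueWithContext}
import pl.touk.nussknacker.engine.flink.api.process.{FlinkCustomNodeContext, FlinkCustomStreamTransformation}

// Hypothetical transformation that attaches a constant output value to every incoming Context;
// in practice the output TypeInformation may need to be provided explicitly (e.g. from the node context)
val constantTransformation = FlinkCustomStreamTransformation {
  (stream: DataStream[Context], nodeContext: FlinkCustomNodeContext) =>
    stream.map((ctx: Context) => ValueWithContext[AnyRef]("constant", ctx))
}
```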
@@ -0,0 +1,26 @@
package pl.touk.nussknacker.engine.flink.api.process

import org.apache.flink.api.common.functions.{RichMapFunction, RuntimeContext}
import org.apache.flink.configuration.Configuration
import pl.touk.nussknacker.engine.api.Context
import pl.touk.nussknacker.engine.api.process.{ContextInitializer, ContextInitializingFunction}
import pl.touk.nussknacker.engine.api.runtimecontext.EngineRuntimeContext

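/**
 * Adapts a ContextInitializer to a Flink RichMapFunction: `open` obtains a context id
 * generator for the given node and builds the initializing strategy, and `map` turns each
 * raw input record into a Nussknacker Context.
 */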
class FlinkContextInitializingFunction[Raw](
contextInitializer: ContextInitializer[Raw],
nodeId: String,
convertToEngineRuntimeContext: RuntimeContext => EngineRuntimeContext
) extends RichMapFunction[Raw, Context] {

private var initializingStrategy: ContextInitializingFunction[Raw] = _

override def open(parameters: Configuration): Unit = {
val contextIdGenerator = convertToEngineRuntimeContext(getRuntimeContext).contextIdGenerator(nodeId)
initializingStrategy = contextInitializer.initContext(contextIdGenerator)
}

override def map(input: Raw): Context = {
initializingStrategy(input)
}

}