From 1f68dc73f69fa577619d9fdf99482c108e69dbe1 Mon Sep 17 00:00:00 2001
From: Nicholas Chammas
Date: Mon, 22 Jan 2024 09:50:03 +0900
Subject: [PATCH] [SPARK-46775][DOCS] Fix formatting of Kinesis docs

### What changes were proposed in this pull request?

- Convert the mixed indentation styles (tabs and spaces) to spaces only.
- Add syntax highlighting to the code blocks.
- Fix a couple of broken links to API docs.

### Why are the changes needed?

This makes the docs a bit easier to read and edit.

### Does this PR introduce _any_ user-facing change?

Yes, it changes the formatting of this documentation.

### How was this patch tested?

I built the docs and manually reviewed them.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #44802 from nchammas/kinesis-docs.

Authored-by: Nicholas Chammas
Signed-off-by: Hyukjin Kwon
---
 docs/streaming-kinesis-integration.md | 399 ++++++++++++++------------
 1 file changed, 208 insertions(+), 191 deletions(-)

diff --git a/docs/streaming-kinesis-integration.md b/docs/streaming-kinesis-integration.md
index ed19ddcc9b087..0396d3cc64d14 100644
--- a/docs/streaming-kinesis-integration.md
+++ b/docs/streaming-kinesis-integration.md
@@ -32,201 +32,216 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or m
 1. **Linking:** For Scala/Java applications using SBT/Maven project definitions, link your streaming application against the following artifact (see [Linking section](streaming-programming-guide.html#linking) in the main programming guide for further information).

-		groupId = org.apache.spark
-		artifactId = spark-streaming-kinesis-asl_{{site.SCALA_BINARY_VERSION}}
-		version = {{site.SPARK_VERSION_SHORT}}
+        groupId = org.apache.spark
+        artifactId = spark-streaming-kinesis-asl_{{site.SCALA_BINARY_VERSION}}
+        version = {{site.SPARK_VERSION_SHORT}}

-	For Python applications, you will have to add this above library and its dependencies when deploying your application. See the *Deploying* subsection below.
-	**Note that by linking to this library, you will include [ASL](https://aws.amazon.com/asl/)-licensed code in your application.**
+    For Python applications, you will have to add the above library and its dependencies when deploying your application. See the *Deploying* subsection below.
+    **Note that by linking to this library, you will include [ASL](https://aws.amazon.com/asl/)-licensed code in your application.**

 2. **Programming:** In the streaming application code, import `KinesisInputDStream` and create the input DStream of byte array as follows:

-
+    <div class="codetabs">

-
- from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream - - kinesisStream = KinesisUtils.createStream( - streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL], - [region name], [initial position], [checkpoint interval], [metricsLevel.DETAILED], StorageLevel.MEMORY_AND_DISK_2) - - See the [API docs](api/python/reference/pyspark.streaming.html#kinesis) - and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/connector/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py). Refer to the [Running the Example](#running-the-example) subsection for instructions to run the example. - - - CloudWatch metrics level and dimensions. See [the AWS documentation about monitoring KCL](https://docs.aws.amazon.com/streams/latest/dev/monitoring-with-kcl.html) for details. Default is MetricsLevel.DETAILED - -
- -
- import org.apache.spark.storage.StorageLevel - import org.apache.spark.streaming.kinesis.KinesisInputDStream - import org.apache.spark.streaming.{Seconds, StreamingContext} - import org.apache.spark.streaming.kinesis.KinesisInitialPositions - - val kinesisStream = KinesisInputDStream.builder - .streamingContext(streamingContext) - .endpointUrl([endpoint URL]) - .regionName([region name]) - .streamName([streamName]) - .initialPosition([initial position]) - .checkpointAppName([Kinesis app name]) - .checkpointInterval([checkpoint interval]) - .metricsLevel([metricsLevel.DETAILED]) - .storageLevel(StorageLevel.MEMORY_AND_DISK_2) - .build() - - See the [API docs](api/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.html) - and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/connector/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala). Refer to the [Running the Example](#running-the-example) subsection for instructions on how to run the example. - -
- -
- import org.apache.spark.storage.StorageLevel; - import org.apache.spark.streaming.kinesis.KinesisInputDStream; - import org.apache.spark.streaming.Seconds; - import org.apache.spark.streaming.StreamingContext; - import org.apache.spark.streaming.kinesis.KinesisInitialPositions; - - KinesisInputDStream kinesisStream = KinesisInputDStream.builder() - .streamingContext(streamingContext) - .endpointUrl([endpoint URL]) - .regionName([region name]) - .streamName([streamName]) - .initialPosition([initial position]) - .checkpointAppName([Kinesis app name]) - .checkpointInterval([checkpoint interval]) - .metricsLevel([metricsLevel.DETAILED]) - .storageLevel(StorageLevel.MEMORY_AND_DISK_2) - .build(); - - See the [API docs](api/java/index.html?org/apache/spark/streaming/kinesis/KinesisInputDStream.html) - and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/connector/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java). Refer to the [Running the Example](#running-the-example) subsection for instructions to run the example. - -
- -
- - You may also provide the following settings. This is currently only supported in Scala and Java. - - - A "message handler function" that takes a Kinesis `Record` and returns a generic object `T`, in case you would like to use other data included in a `Record` such as partition key. - -
-
- import collection.JavaConverters._ - import org.apache.spark.storage.StorageLevel - import org.apache.spark.streaming.kinesis.KinesisInputDStream - import org.apache.spark.streaming.{Seconds, StreamingContext} - import org.apache.spark.streaming.kinesis.KinesisInitialPositions - import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration - import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel - - val kinesisStream = KinesisInputDStream.builder - .streamingContext(streamingContext) - .endpointUrl([endpoint URL]) - .regionName([region name]) - .streamName([streamName]) - .initialPosition([initial position]) - .checkpointAppName([Kinesis app name]) - .checkpointInterval([checkpoint interval]) - .storageLevel(StorageLevel.MEMORY_AND_DISK_2) - .metricsLevel(MetricsLevel.DETAILED) - .metricsEnabledDimensions(KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet) - .buildWithMessageHandler([message handler]) - -
-
- import org.apache.spark.storage.StorageLevel; - import org.apache.spark.streaming.kinesis.KinesisInputDStream; - import org.apache.spark.streaming.Seconds; - import org.apache.spark.streaming.StreamingContext; - import org.apache.spark.streaming.kinesis.KinesisInitialPositions; - import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; - import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel; - import scala.collection.JavaConverters; - - KinesisInputDStream kinesisStream = KinesisInputDStream.builder() - .streamingContext(streamingContext) - .endpointUrl([endpoint URL]) - .regionName([region name]) - .streamName([streamName]) - .initialPosition([initial position]) - .checkpointAppName([Kinesis app name]) - .checkpointInterval([checkpoint interval]) - .storageLevel(StorageLevel.MEMORY_AND_DISK_2) - .metricsLevel(MetricsLevel.DETAILED) - .metricsEnabledDimensions(JavaConverters.asScalaSetConverter(KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS).asScala().toSet()) - .buildWithMessageHandler([message handler]); - -
-
- - - `streamingContext`: StreamingContext containing an application name used by Kinesis to tie this Kinesis application to the Kinesis stream - - - `[Kinesis app name]`: The application name that will be used to checkpoint the Kinesis - sequence numbers in DynamoDB table. - - The application name must be unique for a given account and region. - - If the table exists but has incorrect checkpoint information (for a different stream, or - old expired sequenced numbers), then there may be temporary errors. - - - `[Kinesis stream name]`: The Kinesis stream that this streaming application will pull data from. - - - `[endpoint URL]`: Valid Kinesis endpoints URL can be found [here](http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region). - - - `[region name]`: Valid Kinesis region names can be found [here](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-regions-availability-zones.html). - - - `[checkpoint interval]`: The interval (e.g., Duration(2000) = 2 seconds) at which the Kinesis Client Library saves its position in the stream. For starters, set it to the same as the batch interval of the streaming application. - - - `[initial position]`: Can be either `KinesisInitialPositions.TrimHorizon` or `KinesisInitialPositions.Latest` or `KinesisInitialPositions.AtTimestamp` (see [`Kinesis Checkpointing`](#kinesis-checkpointing) section and [`Amazon Kinesis API documentation`](http://docs.aws.amazon.com/streams/latest/dev/developing-consumers-with-sdk.html) for more details). - - - `[message handler]`: A function that takes a Kinesis `Record` and outputs generic `T`. - - In other versions of the API, you can also specify the AWS access key and secret key directly. + ```python + from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream + + kinesisStream = KinesisUtils.createStream( + streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL], + [region name], [initial position], [checkpoint interval], [metricsLevel.DETAILED], + StorageLevel.MEMORY_AND_DISK_2) + ``` + + See the [API docs](api/python/reference/pyspark.streaming.html#kinesis) + and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/connector/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py). Refer to the [Running the Example](#running-the-example) subsection for instructions to run the example. + + - CloudWatch metrics level and dimensions. See [the AWS documentation about monitoring KCL](https://docs.aws.amazon.com/streams/latest/dev/monitoring-with-kcl.html) for details. Default is `MetricsLevel.DETAILED`. + +
+    </div>
+
+    <div data-lang="scala" markdown="1">
+    ```scala
+    import org.apache.spark.storage.StorageLevel
+    import org.apache.spark.streaming.kinesis.KinesisInputDStream
+    import org.apache.spark.streaming.{Seconds, StreamingContext}
+    import org.apache.spark.streaming.kinesis.KinesisInitialPositions
+
+    val kinesisStream = KinesisInputDStream.builder
+        .streamingContext(streamingContext)
+        .endpointUrl([endpoint URL])
+        .regionName([region name])
+        .streamName([streamName])
+        .initialPosition([initial position])
+        .checkpointAppName([Kinesis app name])
+        .checkpointInterval([checkpoint interval])
+        .metricsLevel([metricsLevel.DETAILED])
+        .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
+        .build()
+    ```
+
+    See the [API docs](api/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream$.html)
+    and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/connector/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala). Refer to the [Running the Example](#running-the-example) subsection for instructions on how to run the example.
+
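+    As a concrete sketch, here is the same builder call with every placeholder
+    replaced by made-up values (the endpoint, region, stream name, app name, and
+    10-second checkpoint interval below are illustrative only):
+
+    ```scala
+    import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel
+
+    // Hypothetical setup: read "myStream" in us-east-1 from its latest position,
+    // checkpointing every 10 seconds under the application name "myKinesisApp".
+    val exampleStream = KinesisInputDStream.builder
+        .streamingContext(streamingContext)
+        .endpointUrl("https://kinesis.us-east-1.amazonaws.com")
+        .regionName("us-east-1")
+        .streamName("myStream")
+        .initialPosition(new KinesisInitialPositions.Latest())
+        .checkpointAppName("myKinesisApp")
+        .checkpointInterval(Seconds(10))
+        .metricsLevel(MetricsLevel.DETAILED)
+        .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
+        .build()
+    ```
+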
+    </div>
+
+    <div data-lang="java" markdown="1">
+    ```java
+    import org.apache.spark.storage.StorageLevel;
+    import org.apache.spark.streaming.kinesis.KinesisInputDStream;
+    import org.apache.spark.streaming.Seconds;
+    import org.apache.spark.streaming.StreamingContext;
+    import org.apache.spark.streaming.kinesis.KinesisInitialPositions;
+
+    KinesisInputDStream<byte[]> kinesisStream = KinesisInputDStream.builder()
+        .streamingContext(streamingContext)
+        .endpointUrl([endpoint URL])
+        .regionName([region name])
+        .streamName([streamName])
+        .initialPosition([initial position])
+        .checkpointAppName([Kinesis app name])
+        .checkpointInterval([checkpoint interval])
+        .metricsLevel([metricsLevel.DETAILED])
+        .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
+        .build();
+    ```
+
+    See the [API docs](api/java/org/apache/spark/streaming/kinesis/package-summary.html)
+    and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/connector/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java). Refer to the [Running the Example](#running-the-example) subsection for instructions to run the example.
+
+    </div>
+    </div>
+
+    You may also provide the following settings. This is currently only supported in Scala and Java.
+
+    - A "message handler function" that takes a Kinesis `Record` and returns a generic object `T`, in case you would like to use other data included in a `Record` such as partition key. (A sketch of one possible handler follows the Scala snippet below.)
+
+    <div class="codetabs">
+    <div data-lang="scala" markdown="1">
+    ```scala
+    import collection.JavaConverters._
+    import org.apache.spark.storage.StorageLevel
+    import org.apache.spark.streaming.kinesis.KinesisInputDStream
+    import org.apache.spark.streaming.{Seconds, StreamingContext}
+    import org.apache.spark.streaming.kinesis.KinesisInitialPositions
+    import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration
+    import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel
+
+    val kinesisStream = KinesisInputDStream.builder
+        .streamingContext(streamingContext)
+        .endpointUrl([endpoint URL])
+        .regionName([region name])
+        .streamName([streamName])
+        .initialPosition([initial position])
+        .checkpointAppName([Kinesis app name])
+        .checkpointInterval([checkpoint interval])
+        .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
+        .metricsLevel(MetricsLevel.DETAILED)
+        .metricsEnabledDimensions(KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet)
+        .buildWithMessageHandler([message handler])
+    ```
+
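+    For illustration, here is a minimal sketch of one possible message handler (the
+    handler name and the UTF-8 payload assumption are illustrative, not part of the
+    API) that pairs each record's partition key with its payload:
+
+    ```scala
+    import java.nio.charset.StandardCharsets
+    import com.amazonaws.services.kinesis.model.Record
+
+    // Hypothetical handler: emit (partition key, payload decoded as UTF-8 text).
+    val messageHandler = (record: Record) =>
+        (record.getPartitionKey, StandardCharsets.UTF_8.decode(record.getData).toString)
+    ```
+
+    Passed as `.buildWithMessageHandler(messageHandler)`, this would yield a DStream of `(String, String)` pairs instead of raw byte arrays.
+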
+    </div>
+    <div data-lang="java" markdown="1">
+    ```java
+    import org.apache.spark.storage.StorageLevel;
+    import org.apache.spark.streaming.kinesis.KinesisInputDStream;
+    import org.apache.spark.streaming.Seconds;
+    import org.apache.spark.streaming.StreamingContext;
+    import org.apache.spark.streaming.kinesis.KinesisInitialPositions;
+    import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
+    import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
+    import scala.collection.JavaConverters;
+
+    KinesisInputDStream<T> kinesisStream = KinesisInputDStream.builder()
+        .streamingContext(streamingContext)
+        .endpointUrl([endpoint URL])
+        .regionName([region name])
+        .streamName([streamName])
+        .initialPosition([initial position])
+        .checkpointAppName([Kinesis app name])
+        .checkpointInterval([checkpoint interval])
+        .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
+        .metricsLevel(MetricsLevel.DETAILED)
+        .metricsEnabledDimensions(
+            JavaConverters.asScalaSetConverter(
+                KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS
+            )
+            .asScala().toSet()
+        )
+        .buildWithMessageHandler([message handler]);
+    ```
+
+    </div>
+    </div>
+
+    - `streamingContext`: StreamingContext containing an application name used by Kinesis to tie this Kinesis application to the Kinesis stream
+
+    - `[Kinesis app name]`: The application name that will be used to checkpoint the Kinesis
+      sequence numbers in a DynamoDB table.
+        - The application name must be unique for a given account and region.
+        - If the table exists but has incorrect checkpoint information (for a different stream, or
+          old expired sequence numbers), then there may be temporary errors.
+
+    - `[Kinesis stream name]`: The Kinesis stream that this streaming application will pull data from.
+
+    - `[endpoint URL]`: Valid Kinesis endpoint URLs can be found [here](http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region).
+
+    - `[region name]`: Valid Kinesis region names can be found [here](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-regions-availability-zones.html).
+
+    - `[checkpoint interval]`: The interval (e.g., Duration(2000) = 2 seconds) at which the Kinesis Client Library saves its position in the stream. For starters, set it to the same as the batch interval of the streaming application.
+
+    - `[initial position]`: Can be either `KinesisInitialPositions.TrimHorizon` or `KinesisInitialPositions.Latest` or `KinesisInitialPositions.AtTimestamp` (see [`Kinesis Checkpointing`](#kinesis-checkpointing) section and [`Amazon Kinesis API documentation`](http://docs.aws.amazon.com/streams/latest/dev/developing-consumers-with-sdk.html) for more details).
+
+    - `[message handler]`: A function that takes a Kinesis `Record` and outputs generic `T`.
+
+    In other versions of the API, you can also specify the AWS access key and secret key directly.

 3. **Deploying:** As with any Spark application, `spark-submit` is used to launch your application. However, the details are slightly different for Scala/Java applications and Python applications.

-	For Scala and Java applications, if you are using SBT or Maven for project management, then package `spark-streaming-kinesis-asl_{{site.SCALA_BINARY_VERSION}}` and its dependencies into the application JAR. Make sure `spark-core_{{site.SCALA_BINARY_VERSION}}` and `spark-streaming_{{site.SCALA_BINARY_VERSION}}` are marked as `provided` dependencies as those are already present in a Spark installation. Then use `spark-submit` to launch your application (see [Deploying section](streaming-programming-guide.html#deploying-applications) in the main programming guide).
+    For Scala and Java applications, if you are using SBT or Maven for project management, then package `spark-streaming-kinesis-asl_{{site.SCALA_BINARY_VERSION}}` and its dependencies into the application JAR. Make sure `spark-core_{{site.SCALA_BINARY_VERSION}}` and `spark-streaming_{{site.SCALA_BINARY_VERSION}}` are marked as `provided` dependencies as those are already present in a Spark installation. Then use `spark-submit` to launch your application (see [Deploying section](streaming-programming-guide.html#deploying-applications) in the main programming guide). A sketch of the corresponding SBT settings is shown below.

-	For Python applications which lack SBT/Maven project management, `spark-streaming-kinesis-asl_{{site.SCALA_BINARY_VERSION}}` and its dependencies can be directly added to `spark-submit` using `--packages` (see [Application Submission Guide](submitting-applications.html)).
That is, + For Python applications which lack SBT/Maven project management, `spark-streaming-kinesis-asl_{{site.SCALA_BINARY_VERSION}}` and its dependencies can be directly added to `spark-submit` using `--packages` (see [Application Submission Guide](submitting-applications.html)). That is, - ./bin/spark-submit --packages org.apache.spark:spark-streaming-kinesis-asl_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION_SHORT}} ... + ./bin/spark-submit --packages org.apache.spark:spark-streaming-kinesis-asl_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION_SHORT}} ... - Alternatively, you can also download the JAR of the Maven artifact `spark-streaming-kinesis-asl-assembly` from the - [Maven repository](http://search.maven.org/#search|ga|1|a%3A%22spark-streaming-kinesis-asl-assembly_{{site.SCALA_BINARY_VERSION}}%22%20AND%20v%3A%22{{site.SPARK_VERSION_SHORT}}%22) and add it to `spark-submit` with `--jars`. + Alternatively, you can also download the JAR of the Maven artifact `spark-streaming-kinesis-asl-assembly` from the + [Maven repository](http://search.maven.org/#search|ga|1|a%3A%22spark-streaming-kinesis-asl-assembly_{{site.SCALA_BINARY_VERSION}}%22%20AND%20v%3A%22{{site.SPARK_VERSION_SHORT}}%22) and add it to `spark-submit` with `--jars`. -
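+
+    For SBT users, here is a minimal sketch of the matching dependency declarations (assuming an sbt build; the point illustrated is the `provided` scoping described above):
+
+    ```scala
+    // Hypothetical build.sbt fragment: spark-core and spark-streaming are provided
+    // by the Spark installation at runtime, while the Kinesis connector must be
+    // bundled into (or shipped alongside) the application JAR.
+    libraryDependencies ++= Seq(
+      "org.apache.spark" %% "spark-core" % "{{site.SPARK_VERSION_SHORT}}" % "provided",
+      "org.apache.spark" %% "spark-streaming" % "{{site.SPARK_VERSION_SHORT}}" % "provided",
+      "org.apache.spark" %% "spark-streaming-kinesis-asl" % "{{site.SPARK_VERSION_SHORT}}"
+    )
+    ```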

- Spark Streaming Kinesis Architecture + Spark Streaming Kinesis Architecture - -

+

- *Points to remember at runtime:* + *Points to remember at runtime:* - - Kinesis data processing is ordered per partition and occurs at-least once per message. + - Kinesis data processing is ordered per partition and occurs at-least once per message. - - Multiple applications can read from the same Kinesis stream. Kinesis will maintain the application-specific shard and checkpoint info in DynamoDB. + - Multiple applications can read from the same Kinesis stream. Kinesis will maintain the application-specific shard and checkpoint info in DynamoDB. - - A single Kinesis stream shard is processed by one input DStream at a time. + - A single Kinesis stream shard is processed by one input DStream at a time. - - A single Kinesis input DStream can read from multiple shards of a Kinesis stream by creating multiple KinesisRecordProcessor threads. + - A single Kinesis input DStream can read from multiple shards of a Kinesis stream by creating multiple KinesisRecordProcessor threads. - - Multiple input DStreams running in separate processes/instances can read from a Kinesis stream. + - Multiple input DStreams running in separate processes/instances can read from a Kinesis stream. - - You never need more Kinesis input DStreams than the number of Kinesis stream shards as each input DStream will create at least one KinesisRecordProcessor thread that handles a single shard. + - You never need more Kinesis input DStreams than the number of Kinesis stream shards as each input DStream will create at least one KinesisRecordProcessor thread that handles a single shard. - - Horizontal scaling is achieved by adding/removing Kinesis input DStreams (within a single process or across multiple processes/instances) - up to the total number of Kinesis stream shards per the previous point. + - Horizontal scaling is achieved by adding/removing Kinesis input DStreams (within a single process or across multiple processes/instances) - up to the total number of Kinesis stream shards per the previous point. - - The Kinesis input DStream will balance the load between all DStreams - even across processes/instances. + - The Kinesis input DStream will balance the load between all DStreams - even across processes/instances. - - The Kinesis input DStream will balance the load during re-shard events (merging and splitting) due to changes in load. + - The Kinesis input DStream will balance the load during re-shard events (merging and splitting) due to changes in load. - - As a best practice, it's recommended that you avoid re-shard jitter by over-provisioning when possible. + - As a best practice, it's recommended that you avoid re-shard jitter by over-provisioning when possible. - - Each Kinesis input DStream maintains its own checkpoint info. See the Kinesis Checkpointing section for more details. + - Each Kinesis input DStream maintains its own checkpoint info. See the Kinesis Checkpointing section for more details. - - There is no correlation between the number of Kinesis stream shards and the number of RDD partitions/shards created across the Spark cluster during input DStream processing. These are 2 independent partitioning schemes. + - There is no correlation between the number of Kinesis stream shards and the number of RDD partitions/shards created across the Spark cluster during input DStream processing. These are 2 independent partitioning schemes. #### Running the Example To run the example, @@ -239,37 +254,39 @@ To run the example, - In the Spark root directory, run the example as -
+    <div class="codetabs">

-
- - ./bin/spark-submit --jars 'connector/kinesis-asl-assembly/target/spark-streaming-kinesis-asl-assembly_*.jar' \ - connector/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py \ - [Kinesis app name] [Kinesis stream name] [endpoint URL] [region name] - -
- -
- - ./bin/run-example --packages org.apache.spark:spark-streaming-kinesis-asl_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION_SHORT}} streaming.KinesisWordCountASL [Kinesis app name] [Kinesis stream name] [endpoint URL] - -
- -
- - ./bin/run-example --packages org.apache.spark:spark-streaming-kinesis-asl_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION_SHORT}} streaming.JavaKinesisWordCountASL [Kinesis app name] [Kinesis stream name] [endpoint URL] - -
- -
+    <div data-lang="python" markdown="1">
+    ```sh
+    ./bin/spark-submit --jars 'connector/kinesis-asl-assembly/target/spark-streaming-kinesis-asl-assembly_*.jar' \
+        connector/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py \
+        [Kinesis app name] [Kinesis stream name] [endpoint URL] [region name]
+    ```
+    </div>
+
+    <div data-lang="scala" markdown="1">
+    ```sh
+    ./bin/run-example --packages org.apache.spark:spark-streaming-kinesis-asl_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION_SHORT}} streaming.KinesisWordCountASL [Kinesis app name] [Kinesis stream name] [endpoint URL]
+    ```
+    </div>
+
+    <div data-lang="java" markdown="1">
+    ```sh
+    ./bin/run-example --packages org.apache.spark:spark-streaming-kinesis-asl_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION_SHORT}} streaming.JavaKinesisWordCountASL [Kinesis app name] [Kinesis stream name] [endpoint URL]
+    ```
+    </div>

+    </div>

+    This will wait for data to be received from the Kinesis stream.

 - To generate random string data to put onto the Kinesis stream, in another terminal, run the associated Kinesis data producer.

-	./bin/run-example streaming.KinesisWordProducerASL [Kinesis stream name] [endpoint URL] 1000 10
+    ```sh
+    ./bin/run-example streaming.KinesisWordProducerASL [Kinesis stream name] [endpoint URL] 1000 10
+    ```

-	This will push 1000 lines per second of 10 random numbers per line to the Kinesis stream. This data should then be received and processed by the running example.
+    This will push 1000 lines per second of 10 random numbers per line to the Kinesis stream. This data should then be received and processed by the running example.

 #### Record De-aggregation