[SPARK-17853][STREAMING][KAFKA][DOC] make it clear that reusing group.id is bad

## What changes were proposed in this pull request?

Documentation fix to make it clear that reusing group id for different streams is super duper bad, just like it is with the underlying Kafka consumer.

## How was this patch tested?

I built the Jekyll docs and made sure they looked ok.

Author: cody koeninger <cody@koeninger.org>

Closes #15442 from koeninger/SPARK-17853.
koeninger authored and rxin committed Oct 12, 2016
1 parent b512f04 commit c264ef9
7 changes: 5 additions & 2 deletions docs/streaming-kafka-0-10-integration.md
@@ -27,7 +27,7 @@ For Scala/Java applications using SBT/Maven project definitions, link your strea
"bootstrap.servers" -> "localhost:9092,anotherhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "example",
"group.id" -> "use_a_separate_group_id_for_each_stream",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
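
A minimal sketch of how a kafkaParams map like the one above is typically wired into `createDirectStream` (the topic names, app name, and batch interval here are illustrative assumptions, not part of the patch):

```scala
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._

val conf = new SparkConf().setAppName("KafkaDirectStreamSketch")
val streamingContext = new StreamingContext(conf, Seconds(5))

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092,anotherhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "use_a_separate_group_id_for_each_stream",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

// Illustrative topic names
val topics = Array("topicA", "topicB")
val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)
```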
@@ -48,7 +48,7 @@ Each item in the stream is a [ConsumerRecord](http://kafka.apache.org/0100/javad
</div>

For possible kafkaParams, see [Kafka consumer config docs](http://kafka.apache.org/documentation.html#newconsumerconfigs).
- Note that enable.auto.commit is disabled, for discussion see [Storing Offsets](streaming-kafka-0-10-integration.html#storing-offsets) below.
+ Note that the example sets enable.auto.commit to false, for discussion see [Storing Offsets](streaming-kafka-0-10-integration.html#storing-offsets) below.
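
With auto-commit turned off, a hedged sketch of committing offsets back to Kafka only after a batch has been processed, assuming `stream` is the direct stream created from these kafkaParams:

```scala
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

stream.foreachRDD { rdd =>
  // Offset ranges covered by this batch
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // ... process the batch here ...

  // Commit offsets only once processing has succeeded
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
```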

### LocationStrategies
The new Kafka consumer API will pre-fetch messages into buffers. Therefore it is important for performance reasons that the Spark integration keep cached consumers on executors (rather than recreating them for each batch), and prefer to schedule partitions on the host locations that have the appropriate consumers.
@@ -57,6 +57,9 @@ In most cases, you should use `LocationStrategies.PreferConsistent` as shown abo

The cache for consumers has a default maximum size of 64. If you expect to be handling more than (64 * number of executors) Kafka partitions, you can change this setting via `spark.streaming.kafka.consumer.cache.maxCapacity`
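
For instance, a minimal sketch of raising that capacity via SparkConf (the value 128 is only an illustration):

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("ManyPartitionsApp")
  .set("spark.streaming.kafka.consumer.cache.maxCapacity", "128")
```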

+ The cache is keyed by topicpartition and group.id, so use a **separate** `group.id` for each call to `createDirectStream`.
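
As a hedged sketch of what that means in practice, two streams in one application each get their own `group.id` (group and topic names are illustrative; `streamingContext` and `kafkaParams` are assumed from the earlier example):

```scala
// Assumes import org.apache.spark.streaming.kafka010._ and the kafkaParams shown above
val streamA = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](
    Array("topicA"), kafkaParams + ("group.id" -> "group_id_for_stream_a"))
)

val streamB = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](
    Array("topicB"), kafkaParams + ("group.id" -> "group_id_for_stream_b"))
)
```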


### ConsumerStrategies
The new Kafka consumer API has a number of different ways to specify topics, some of which require considerable post-object-instantiation setup. `ConsumerStrategies` provides an abstraction that allows Spark to obtain properly configured consumers even after restart from checkpoint.
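
A brief hedged sketch of the two most commonly used strategies (topic names, partitions, and offsets are illustrative assumptions; `kafkaParams` is assumed from the earlier example):

```scala
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.ConsumerStrategies

// Subscribe to a list of topics and let Kafka assign partitions
val subscribe = ConsumerStrategies.Subscribe[String, String](Array("topicA", "topicB"), kafkaParams)

// Or pin specific partitions, optionally with explicit starting offsets
val partitions = Array(new TopicPartition("topicA", 0), new TopicPartition("topicA", 1))
val offsets = Map(new TopicPartition("topicA", 0) -> 23L)
val assign = ConsumerStrategies.Assign[String, String](partitions, kafkaParams, offsets)
```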

