[SPARK-5035] [Streaming] ReceiverMessage trait should extend Serializable #3857

Closed
wants to merge 1 commit

Conversation

JoshRosen
Contributor

Spark Streaming's ReceiverMessage trait should extend Serializable in order to fix a subtle bug that only occurs when running on a real cluster:

If you attempt to send a fire-and-forget message to a remote Akka actor and that message cannot be serialized, then this seems to lead to more-or-less silent failures. As an optimization, Akka skips message serialization for messages sent within the same JVM. As a result, Spark's unit tests will never fail due to non-serializable Akka messages, but these will cause mostly-silent failures when running on a real cluster.

Before this patch, here was the code for ReceiverMessage:

```
/** Messages sent to the NetworkReceiver. */
private[streaming] sealed trait ReceiverMessage
private[streaming] object StopReceiver extends ReceiverMessage
```

Since ReceiverMessage does not extend Serializable and StopReceiver is a regular object, not a case object, StopReceiver will throw serialization errors. As a result, graceful receiver shutdown is broken on real clusters (and local-cluster mode) but works in local modes. If you want to reproduce this, try running the word count example from the Streaming Programming Guide in the Spark shell:

```
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
val ssc = new StreamingContext(sc, Seconds(10))
// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)
// Split each line into words
val words = lines.flatMap(_.split(" "))
import org.apache.spark.streaming.StreamingContext._
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()
ssc.start()
Thread.sleep(10000)
ssc.stop(true, true)
```

Prior to this patch, this would work correctly in local mode but fail when running against a real cluster (it would report that some receivers were not shut down).
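For reference, the patch itself presumably amounts to a one-line change: making the trait extend Serializable. A minimal sketch of the patched declaration (the exact file contents are not shown in this conversation):

```
/** Messages sent to the NetworkReceiver. */
private[streaming] sealed trait ReceiverMessage extends Serializable
private[streaming] object StopReceiver extends ReceiverMessage
```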

@SparkQA

SparkQA commented Dec 31, 2014

Test build #24954 has started for PR 3857 at commit 71d0eae.

  • This patch merges cleanly.

@JoshRosen
Contributor Author

/cc @tdas.

This might fix both https://issues.apache.org/jira/browse/SPARK-4986 and https://issues.apache.org/jira/browse/SPARK-2892, although there could possibly be more pieces to solving those (e.g. replace 10 second timeout with a configurable timeout).

I want to give a huge thanks to @cleaton for filing SPARK-4986 and for coming up with a workaround patch for it, which helped to spot this issue.

@JoshRosen
Contributor Author

EDIT: Disregard; local-cluster would have caught this.

Also, this was a really nasty bug because it seems very hard to test for this in Spark's own unit tests. Akka has a configuration option to force all messages to be serialized, even between local actors, but unfortunately this breaks Spark core because we send some non-serializable SparkContext references when initializing the DAGScheduler actor.
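(For reference, the Akka option being referred to is presumably `akka.actor.serialize-messages`. A minimal sketch of enabling it when constructing a test ActorSystem, independent of Spark's own actor-system setup:)

```
import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

// Force Akka to serialize every message, even between actors in the same JVM,
// so that non-serializable messages fail fast in single-JVM tests.
val config = ConfigFactory.parseString("akka.actor.serialize-messages = on")
val system = ActorSystem("serialization-check", config)
```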

Can we force serialization by spinning up separate actor systems for the master / worker / executor processes when running in local-cluster mode? Or is there some other way that we can selectively force serialization in order to uncover these sorts of issues?

We can definitely reproduce these sorts of issues in my spark-integration-tests system, since that uses multiple JVMs, but for completeness's sake I guess we'd need that tool's suites to send all of the remote messages (so this could be a lot of test code duplication).

Maybe the simplest (general) preventative test would have been something that just calls the Java serializer on an instance of each message class, so that we test serializability independently of Akka. Checking for serializability (either manually or through handwritten tests) should be part of our review checklist when adding new Akka messages.
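A minimal sketch of what such a check could look like, using plain java.io serialization rather than Akka (the helper name here is just for illustration):

```
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Round-trip an object through Java serialization; this throws
// NotSerializableException if the object (or anything it references)
// cannot be serialized, which is what a remote Akka send would hit.
def assertJavaSerializable(message: AnyRef): Unit = {
  val buffer = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(buffer)
  out.writeObject(message)
  out.close()
  val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
  in.readObject()
}

// For example, applied to each message we send over Akka:
assertJavaSerializable(StopReceiver)
```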

@JoshRosen
Contributor Author

Also, just to sanity check and make sure I haven't overlooked something, at least one other person besides me should run the spark-shell reproduction listed in the PR description.

@SparkQA

SparkQA commented Dec 31, 2014

Test build #24954 has finished for PR 3857 at commit 71d0eae.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24954/
Test PASSed.

@tdas
Contributor

tdas commented Dec 31, 2014

Good catch! Merging this right away!

@JoshRosen
Contributor Author

Actually, my commit message claimed that the graceful shutdown worked in local-cluster mode, but that's probably not true since the executors would be in separate JVMs there.

@tdas
Contributor

tdas commented Dec 31, 2014

Can you try to check whether https://issues.apache.org/jira/browse/SPARK-2892 is solved with this? Use a local cluster (not "local-cluster").

@tdas
Contributor

tdas commented Dec 31, 2014

And if this is to be merged, this should be backported all the way to branch-1.0

@JoshRosen
Contributor Author

Yep, turns out that prior to this patch the test fails in local-cluster mode:

```
[info] - stop gracefully *** FAILED *** (18 seconds, 445 milliseconds)
[info]   1 did not equal 6680633, and 1 did not equal 6680634 Received records = 1, processed records = 6680632 (StreamingContextSuite.scala:198)
[info]   org.scalatest.exceptions.TestFailedException:
[info]   at org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:500)
[info]   at org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1555)
```

I tested out the NetworkWordCount example described in https://issues.apache.org/jira/browse/SPARK-2892 and was able to both reproduce the issue reported there and verify that it's been fixed by this patch.

@tdas
Contributor

tdas commented Dec 31, 2014

Fantastic. Then I am merging this. Thanks @JoshRosen and @cleaton for finding and fixing this.

asfgit pushed a commit that referenced this pull request Jan 1, 2015
…able

Spark Streaming's ReceiverMessage trait should extend Serializable in order to fix a subtle bug that only occurs when running on a real cluster:

If you attempt to send a fire-and-forget message to a remote Akka actor and that message cannot be serialized, then this seems to lead to more-or-less silent failures. As an optimization, Akka skips message serialization for messages sent within the same JVM. As a result, Spark's unit tests will never fail due to non-serializable Akka messages, but these will cause mostly-silent failures when running on a real cluster.

Before this patch, here was the code for ReceiverMessage:

```
/** Messages sent to the NetworkReceiver. */
private[streaming] sealed trait ReceiverMessage
private[streaming] object StopReceiver extends ReceiverMessage
```

Since ReceiverMessage does not extend Serializable and StopReceiver is a regular `object`, not a `case object`, StopReceiver will throw serialization errors. As a result, graceful receiver shutdown is broken on real clusters (and local-cluster mode) but works in local modes. If you want to reproduce this, try running the word count example from the Streaming Programming Guide in the Spark shell:

```
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
val ssc = new StreamingContext(sc, Seconds(10))
// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)
// Split each line into words
val words = lines.flatMap(_.split(" "))
import org.apache.spark.streaming.StreamingContext._
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()
ssc.start()
Thread.sleep(10000)
ssc.stop(true, true)
```

Prior to this patch, this would work correctly in local mode but fail when running against a real cluster (it would report that some receivers were not shut down).

Author: Josh Rosen <joshrosen@databricks.com>

Closes #3857 from JoshRosen/SPARK-5035 and squashes the following commits:

71d0eae [Josh Rosen] [SPARK-5035] ReceiverMessage trait should extend Serializable.

(cherry picked from commit fe6efac)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
@asfgit asfgit closed this in fe6efac Jan 1, 2015
asfgit pushed a commit that referenced this pull request Jan 1, 2015
asfgit pushed a commit that referenced this pull request Jan 1, 2015
@cleaton

cleaton commented Jan 1, 2015

@tdas I will respond here since JIRA is down. Yes, this partly fixes the issue with graceful shutdown mentioned in SPARK-4986. There are still cases where the receivers do not have time to shut down (within the hardcoded 10-second thread join wait). I have created a PR that solves SPARK-4986 when used together with this fix.

Could you take a look at #3868 to see if the approach is OK or if there is something you want changed (might have to add more config options). Thanks.
