[SPARK-20244][Core] Handle incorrect bytesRead metrics when using PySpark #17617

Closed
wants to merge 5 commits into from

Conversation

jerryshao
Contributor

What changes were proposed in this pull request?

Hadoop FileSystem's statistics are based on thread-local variables, which is fine if the RDD computation chain runs in a single thread. But if a child RDD creates another thread to consume the iterator obtained from a Hadoop RDD, the bytesRead computation will be wrong, because the iterator's next() and close() may then run in different threads. This can happen when using PySpark with PythonRDD.

So here we build a map to track the bytesRead for each thread and add them together. This method could be used in three RDDs: HadoopRDD, NewHadoopRDD and FileScanRDD. I assume FileScanRDD cannot be called directly, so I only fixed HadoopRDD and NewHadoopRDD.
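
Roughly, the fix keeps a per-thread map and sums it on read. A minimal sketch of the idea (simplified; updateAndSumBytesRead and readCurrentThreadBytes are illustrative names, not the actual patch):

    import java.util.concurrent.ConcurrentHashMap
    import scala.collection.JavaConverters._

    val bytesReadMap = new ConcurrentHashMap[Long, Long]()

    // Record the calling thread's running total, then sum across all
    // threads that have reported so far.
    def updateAndSumBytesRead(readCurrentThreadBytes: () => Long): Long = {
      bytesReadMap.put(Thread.currentThread().getId, readCurrentThreadBytes())
      bytesReadMap.asScala.values.sum
    }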

How was this patch tested?

Unit test and local cluster verification.

@jerryshao jerryshao changed the title [SPARK-20244][Core] Handle get bytesRead from different thread in Hadoop RDD [SPARK-20244][Core] Handle incorrect bytesRead metrics when using PySpark Apr 12, 2017
@SparkQA

SparkQA commented Apr 12, 2017

Test build #75729 has started for PR 17617 at commit d6f3c42.

@jerryshao
Contributor Author

Jenkins, retest this please.

@SparkQA

SparkQA commented Apr 12, 2017

Test build #75738 has finished for PR 17617 at commit d6f3c42.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@holdenk
Contributor

holdenk commented May 2, 2017

Interesting, more accurate reporting is good, but I haven't looked at this block of code in a while; maybe @srowen has the context necessary to take a look?

@jerryshao
Contributor Author

@holdenk, the basic problem is that Spark uses Hadoop FileSystem's statistics API to get bytesRead and bytesWritten per task. This statistics API is implemented with thread-local variables, which is fine for Scala/Java RDD computations, since the computation executes in the same thread as the task. But for PythonRDD, Spark creates another thread to consume the data, so the current way of counting bytesRead yields a wrong number.

This is a generic problem whenever the task thread and the RDD computation thread are not the same: because of the thread-local variables, the calculated bytesRead metric will be wrong.
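
For example, here is a minimal standalone illustration of the problem, not code from the patch (the file path is hypothetical):

    import java.net.URI
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}
    import scala.collection.JavaConverters._

    // Sum of bytesRead recorded against the *calling* thread only.
    def currentThreadBytesRead(): Long =
      FileSystem.getAllStatistics.asScala.map(_.getThreadStatistics.getBytesRead).sum

    val fs = FileSystem.get(new URI("file:///"), new Configuration())

    // Consume the file on a separate thread, like PythonRDD's writer thread does.
    val reader = new Thread(new Runnable {
      override def run(): Unit = {
        val in = fs.open(new Path("/tmp/some-file")) // hypothetical path
        try { while (in.read() != -1) {} } finally { in.close() }
      }
    })
    reader.start()
    reader.join()

    // Expect 0 here: the bytes were recorded against the reader thread's
    // statistics, which the task thread cannot see.
    println(currentThreadBytesRead())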

Contributor

@jiangxb1987 jiangxb1987 left a comment

This change sounds valid, but I tried the test cases on the current master branch and they aren't failing.

val f = () => threadStats.map(_.getBytesRead).sum
val baselineBytesRead = f()
() => f() - baselineBytesRead
val f = () => FileSystem.getAllStatistics.asScala.map(_.getThreadStatistics.getBytesRead).sum
Contributor

Why are you changing this?

Contributor Author

In the previous code, threadStats and the f function could be executed in two different threads, so the metrics we got could be wrong.

context.addTaskCompletionListener { context =>
  // Update the bytes read before closing to make sure lingering bytesRead statistics in
  // this thread get correctly added.
  updateBytesRead()
Contributor

Will this duplicate what we do in close()?

Contributor Author

@jerryshao jerryshao May 27, 2017

close() can be called in another thread, as I recall, so I added this here to avoid lingering bytesRead in the task running thread (some bytes can be read when creating the InputFormat); also, there is no harm in calling updateBytesRead again.

Change-Id: I76c6ff84904211e3fae4dcd11772fb7fa5ec503c
@jerryshao
Contributor Author

@jiangxb1987 the UT I wrote didn't actually reflect this issue; I just updated the UT, please review, thanks!

@SparkQA

SparkQA commented May 27, 2017

Test build #77453 has finished for PR 17617 at commit a23633d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

@jiangxb1987 jiangxb1987 left a comment

LGTM except for some nits in the test cases.

    val bytesRead = runAndReturnBytesRead {
      sc.textFile(tmpFilePath, 4).mapPartitions { iter =>
        val buf = new ArrayBuffer[String]()
        val thread = new Thread() {
Contributor

nit: We could use ThreadUtils.runInNewThread() to make this shorter, like:

        ThreadUtils.runInNewThread("TestThread") {
          iter.flatMap(_.split(" ")).foreach(buf.append(_))
        }

    sc.newAPIHadoopFile(tmpFilePath, classOf[NewTextInputFormat], classOf[LongWritable],
      classOf[Text]).mapPartitions { iter =>
      val buf = new ArrayBuffer[String]()
      val thread = new Thread() {
Contributor

nit: Same as above, we could rewrite to:

        ThreadUtils.runInNewThread("TestThread") {
          iter.map(_._2.toString).flatMap(_.split(" ")).foreach(buf.append(_))
        }

@jiangxb1987
Contributor

ping @jerryshao

Change-Id: Ie8cc1f19719956184afea2ba04a59f9221469da7
@SparkQA

SparkQA commented May 31, 2017

Test build #77567 has finished for PR 17617 at commit 8b16017.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jiangxb1987
Contributor

LGTM, cc @cloud-fan @ueshin

@@ -143,14 +144,18 @@ class SparkHadoopUtil extends Logging {
* Returns a function that can be called to find Hadoop FileSystem bytes read. If
* getFSBytesReadOnThreadCallback is called from thread r at time t, the returned callback will
* return the bytes read on r since t.
*
* @return None if the required method can't be found.
*/
private[spark] def getFSBytesReadOnThreadCallback(): () => Long = {
Contributor

let's update the doc to say that the returned function may be called in multiple threads.


() => {
  bytesReadMap.put(Thread.currentThread().getId, f())
  bytesReadMap.asScala.map { case (k, v) =>
Contributor

This is not atomic; shall we synchronize on bytesReadMap when calculating the sum?

Contributor Author

I see. Let me fix it.

val baseline = (Thread.currentThread().getId, f())
val bytesReadMap = new ConcurrentHashMap[Long, Long]()

() => {
Contributor

I think it's better to create an anonymous Function0 instance, treat bytesReadMap as a member variable, and document the multi-thread semantics of the apply method.

Contributor Author

That's a good idea, let me change the code.
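
For reference, a sketch of the suggested shape, reusing the names from the diff above (the merged code may differ in detail):

    import org.apache.hadoop.fs.FileSystem
    import scala.collection.JavaConverters._
    import scala.collection.mutable

    private[spark] def getFSBytesReadOnThreadCallback(): () => Long = {
      // f() returns the bytes read so far on the calling thread.
      val f = () => FileSystem.getAllStatistics.asScala
        .map(_.getThreadStatistics.getBytesRead).sum
      val baseline = (Thread.currentThread().getId, f())

      new Function0[Long] {
        // Written and read from multiple threads, hence the synchronized
        // block in apply().
        private val bytesReadMap = new mutable.HashMap[Long, Long]()

        /**
         * Returns the bytes read by every thread that has called this
         * function so far, minus the creating thread's baseline.
         */
        override def apply(): Long = bytesReadMap.synchronized {
          bytesReadMap.put(Thread.currentThread().getId, f())
          bytesReadMap.map { case (k, v) =>
            v - (if (k == baseline._1) baseline._2 else 0)
          }.sum
        }
      }
    }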

Change-Id: I5eba16903914932392e05ba56c27808c36b033b3
Member

@ueshin ueshin left a comment

LGTM except for a minor comment.

@@ -21,8 +21,10 @@ import java.io.IOException
import java.security.PrivilegedExceptionAction
import java.text.DateFormat
import java.util.{Arrays, Comparator, Date, Locale}
import java.util.concurrent.ConcurrentHashMap
Member

nit: unneeded import.

@SparkQA

SparkQA commented May 31, 2017

Test build #77581 has finished for PR 17617 at commit 1e3fb8a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Change-Id: Id3b501645fca858ec4636cee30163ea39fe7ce4f
@SparkQA

SparkQA commented May 31, 2017

Test build #77591 has finished for PR 17617 at commit 2b08b48.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

private val bytesReadMap = new mutable.HashMap[Long, Long]()

/**
* Returns a function that can be called to calculate Hadoop FileSystem bytes read.
Contributor

@cloud-fan cloud-fan May 31, 2017

move these comments before new Function0[Long] or before def getFSBytesReadOnThreadCallback. The apply here doesn't return a function...

Contributor Author

Done.

        buf.iterator
      }.count()
    }
    assert(bytesRead != 0)
Contributor

this assert is unnecessary.

Contributor Author

Done.

Change-Id: I6e7870698108a52d577a59478a0f88bc645d1133
@SparkQA

SparkQA commented Jun 1, 2017

Test build #77621 has finished for PR 17617 at commit c068f43.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

asfgit pushed a commit that referenced this pull request Jun 1, 2017
…park

## What changes were proposed in this pull request?

Hadoop FileSystem's statistics are based on thread-local variables, which is fine if the RDD computation chain runs in a single thread. But if a child RDD creates another thread to consume the iterator obtained from a Hadoop RDD, the bytesRead computation will be wrong, because the iterator's `next()` and `close()` may then run in different threads. This can happen when using PySpark with PythonRDD.

So here we build a map to track the `bytesRead` for each thread and add them together. This method could be used in three RDDs: `HadoopRDD`, `NewHadoopRDD` and `FileScanRDD`. I assume `FileScanRDD` cannot be called directly, so I only fixed `HadoopRDD` and `NewHadoopRDD`.

## How was this patch tested?

Unit test and local cluster verification.

Author: jerryshao <sshao@hortonworks.com>

Closes #17617 from jerryshao/SPARK-20244.

(cherry picked from commit 5854f77)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
@cloud-fan
Contributor

thanks, merging to master/2.2!

@asfgit asfgit closed this in 5854f77 Jun 1, 2017
@jerryshao
Contributor Author

Thanks @jiangxb1987 @cloud-fan @ueshin for your review!

@@ -143,14 +144,29 @@ class SparkHadoopUtil extends Logging {
* Returns a function that can be called to find Hadoop FileSystem bytes read. If
* getFSBytesReadOnThreadCallback is called from thread r at time t, the returned callback will
* return the bytes read on r since t.
*
* @return None if the required method can't be found.
Contributor

Why remove this line instead of the doc?

Contributor

this doesn't return a None, but the doc is still correct about the behavior.

yoonlee95 pushed a commit to yoonlee95/spark that referenced this pull request Aug 17, 2017
…park


Author: jerryshao <sshao@hortonworks.com>

Closes apache#17617 from jerryshao/SPARK-20244.

Conflicts:
	core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala