[SPARK-4092] [CORE] Fix InputMetrics for coalesce'd Rdds #3120
Conversation
Can one of the admins verify this patch?
import org.scalatest.FunSuite

import org.apache.spark.util.Utils
imports should be ordered alphabetically, so this should go after the org.apache.spark.scheduler ones
Had a few nitpicks. Otherwise, this looks good to me.
Jenkins, test this please
context.taskMetrics.inputMetrics = Some(blockResult.inputMetrics)
context.taskMetrics.inputMetrics.get.bytesRead += prevBytesRead
Can there be a race here, or is this code always called from one thread?
Since this code does not change any state in CacheManager itself, it should not affect the thread safety of the outer object. So what is important is, will multiple threads call getOrCompute and pass in the same TaskContext (two threads operating on the same task). I don't think that happens since only a single thread operates on each task. Please let me know if I'm missing something.
Yeah that's a good point -- you're right.
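Taken together, the lines under review amount to roughly the carry-over logic below. This is a self-contained sketch using simplified stand-in classes — `InputMetrics`, `TaskMetrics`, and `recordBlockRead` here are illustrative, not Spark's actual API surface:

```scala
// Simplified stand-ins for Spark's executor-side metrics classes; field and
// type names are illustrative, not Spark's real classes.
class InputMetrics(val readMethod: String) {
  var bytesRead: Long = 0L
}

class TaskMetrics {
  var inputMetrics: Option[InputMetrics] = None
}

// Carry-over logic from the snippet above: when a task reads another cached
// block, keep the bytes already accumulated for the same read method instead
// of silently resetting them.
def recordBlockRead(tm: TaskMetrics, blockMetrics: InputMetrics): Unit = {
  val prevBytesRead = tm.inputMetrics
    .filter(_.readMethod == blockMetrics.readMethod)
    .map(_.bytesRead)
    .getOrElse(0L)
  tm.inputMetrics = Some(blockMetrics)
  tm.inputMetrics.get.bytesRead += prevBytesRead
}
```

Since `recordBlockRead` only mutates the `TaskMetrics` passed in, the thread-safety argument above holds: as long as a single thread operates on each task's `TaskMetrics`, there is no race.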
Can one of the admins verify this patch?
Jenkins, this is OK to test
Jenkins, test this please.
Test build #23136 has started for PR 3120 at commit
Test build #23136 has finished for PR 3120 at commit
Test FAILed.
@ksakellis it looks like this has a merge conflict now -- would you mind updating this PR?
Force-pushed from f1a615f to 39353cb
@ash211 Just updated the PR.
@pwendell Can you please comment on Kay's suggestion?
Force-pushed from 39353cb to e567029
@kayousterhout The test you pointed out is actually not valid because of how cartesian is implemented. I added a comment to the interleaved-reads unit test to describe the reasoning.
@kayousterhout @pwendell ping?
Force-pushed from e567029 to a2ca793
Jenkins, test this please.
Test build #25404 has started for PR 3120 at commit
val prevBytesRead = existingMetrics
  .filter(_.readMethod == blockResult.inputMetrics.readMethod)
  .map(_.bytesRead)
  .getOrElse(0L)
So what happens if we have input types that intermix here? For instance, what if they interleave between two input sources... will they just keep clobbering each other? It might be better to just choose a single input metric and stick with it, i.e., if we happen to be reading a block that wasn't derived from the same input as the one before it, just ignore it.
val blockInput = blockResult.inputMetrics
context.taskMetrics.inputMetrics match {
  case Some(existingInput) =>
    if (existingInput.readMethod == blockInput.readMethod) {
      existingInput.bytesRead += blockInput.bytesRead
    }
    // NOTE: If we have interleaving of two input types in one task, we
    // currently ignore blocks associated with all but one type (whichever
    // type was seen first). See SPARK-XXX.
  case None =>
    context.taskMetrics.inputMetrics = Some(blockInput)
}
It's easier to document that behavior and also add a unit test for it.
Actually after looking at Hadoop RDD - it might be necessary to just clobber here to preserve consistency with that case. But it could still be nicer to write this with a match.
What if there are 3 input sources that interleave here? Suppose you have (1) input from cache, (2) input from Hadoop, and (3) input from cache. My understanding is that when (2) starts being read, it will clobber the input metrics from (1). Then, when (3) is read, it will again clobber the input metrics, so the metrics won't properly reflect the total data read from cache (they'll only reflect the data read from (3)). Is that right?
Yes, when the cache is being used there will be clobbering. There are a few solutions:
- We could stop filtering on readMethod and just append blindly. That way we don't override any metrics, but the eventual read method will not be correct (either first wins or last wins, whichever we choose).
- We could model input metrics like we do shuffle metrics, where we collect an array of them and finally sum them up. This is a bigger change.
Is it possible to do @pwendell's suggestion, where you check the type of the input metrics and only append if it's the same type? I'd actually be slightly in favor of just returning a list of input metrics, one for each input type, because the other solutions seem a little hacky -- but I defer to @sryza / @pwendell here (who I think had argued in the past that this extra complexity wasn't worth it).
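For comparison, the "list of input metrics, one per input type" alternative discussed here could look roughly like the sketch below. This is hypothetical — it is not what the PR implements, and none of these names exist in Spark:

```scala
import scala.collection.mutable

// Hypothetical per-read-method accumulator: interleaved Memory/Hadoop reads
// each accumulate independently, so nothing is clobbered.
class PerMethodInputMetrics {
  private val bytesByMethod = mutable.LinkedHashMap.empty[String, Long]

  def addBytesRead(readMethod: String, bytes: Long): Unit =
    bytesByMethod(readMethod) = bytesByMethod.getOrElse(readMethod, 0L) + bytes

  def bytesRead(readMethod: String): Long =
    bytesByMethod.getOrElse(readMethod, 0L)

  // Summed at reporting time, as with shuffle metrics.
  def totalBytesRead: Long = bytesByMethod.values.sum
}
```

In the three-source scenario above (cache, Hadoop, cache again), both cache reads would land in the same "Memory" bucket and the Hadoop read in its own, so the totals stay correct.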
Test build #25404 has finished for PR 3120 at commit
Test FAILed.
@@ -153,34 +157,19 @@ class NewHadoopRDD[K, V](
  throw new java.util.NoSuchElementException("End of stream")
}
havePair = false

// Update bytes read metric every few records
if (recordsSinceMetricsUpdate == HadoopRDD.RECORDS_BETWEEN_BYTES_READ_METRIC_UPDATES
This was done intentionally to help keep the callback updates out of the InputMetrics class and isolate them to HadoopRDD. This notion of callbacks makes the InputMetrics class more complicated and mutable. Since it's an exposed class, we really wanted to keep the interface clean and simple, even if it meant some extra engineering in HadoopRDD. So could this part of the change be reverted back to how it was before (without changing the InputMetrics/TaskMetrics classes)?
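The callback design being debated can be sketched as follows: the metrics object pulls the latest counter from a registered supplier (e.g. Hadoop FileSystem statistics) instead of being pushed an update every few records. All names here are assumptions for illustration, not Spark's actual fields:

```scala
// Illustrative bytes-read callback pattern; not Spark's real InputMetrics.
class CallbackInputMetrics {
  var bytesRead: Long = 0L
  private var bytesReadCallback: Option[() => Long] = None

  // Register a supplier for the current bytes-read counter, e.g. backed by
  // Hadoop FileSystem statistics in the real code.
  def setBytesReadCallback(f: () => Long): Unit =
    bytesReadCallback = Some(f)

  // Pull the latest value from the underlying source, if one is registered.
  def updateBytesRead(): Unit =
    bytesReadCallback.foreach(f => bytesRead = f())
}
```

The trade-off in the thread above is exactly this: the callback makes the class mutable and more complex, but lets each HadoopRDD's metrics refresh themselves without clobbering another RDD's counters.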
@pwendell There is a long thread in this PR between @sryza and @kayousterhout about why we need to add the callback to the input metrics. The reason is to prevent clobbering between different HadoopRDDs, for example CartesianRDD - this is why there is a specific unit test for that case. I don't think we can do anything correctly if we don't have the callbacks in the InputMetrics.
Okay, that's fine then. I looked and it's all private[spark] so actually there is no change to visibility.
When calculating the input metrics, there was an assumption that one task only reads from one block - this is not true for some operations, including coalesce. This patch simply increments the task's input metrics if previous ones of the same read method existed. A limitation of this patch is that if a task reads from two blocks with different read methods, one will override the other.
Also added a test for interleaving reads.
Tasks now only store/accumulate input metrics from the same read method. If a task has interleaved reads from more than one block with different read methods, we choose to store the first read method's metrics. https://issues.apache.org/jira/browse/SPARK-5225 addresses keeping track of all input metrics. This change also centralizes this logic in TaskMetrics and gates how InputMetrics can be added to TaskMetrics.
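The "first read method wins" gating described here can be sketched as below. Class and method names are hypothetical stand-ins, not the PR's actual identifiers:

```scala
// Minimal stand-in for an input-metrics holder.
class InputMetricsSketch(val readMethod: String) {
  var bytesRead: Long = 0L
}

// Gated access: TaskMetrics hands out metrics only for the first read method
// this task has seen; reads with any other method are ignored.
class GatedTaskMetrics {
  private var inputMetrics: Option[InputMetricsSketch] = None

  def inputMetricsFor(readMethod: String): Option[InputMetricsSketch] = synchronized {
    inputMetrics match {
      case Some(m) if m.readMethod == readMethod => Some(m)
      case Some(_) => None // different read method: dropped (see SPARK-5225)
      case None =>
        inputMetrics = Some(new InputMetricsSketch(readMethod))
        inputMetrics
    }
  }

  def totalBytesRead: Long = inputMetrics.map(_.bytesRead).getOrElse(0L)
}
```

Centralizing the decision in one synchronized accessor is what "gates how InputMetrics can be added" — callers can no longer overwrite the field directly.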
Force-pushed from a2ca793 to 54e6658
Jenkins, test this please.
LGTM pending tests
Test build #25623 has started for PR 3120 at commit
Test build #25623 has finished for PR 3120 at commit
Test PASSed.
@@ -146,6 +185,10 @@ class TaskMetrics extends Serializable {
  }
  _shuffleReadMetrics = Some(merged)
}

private[spark] def updateInputMetrics() = synchronized {
In your next PR, can you fix this by adding an explicit return type?
So this follows the method above, updateShuffleReadMetrics, which doesn't have a return type either. Should I change both then?
would be great to do that!
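For reference, the requested style change is just making the inferred Unit return type explicit. The signatures below are illustrative, not the actual diff:

```scala
object ReturnTypeExample {
  // Before: return type left to inference (what the review flagged).
  private def updateShuffleReadMetricsInferred() = synchronized { }

  // After: explicit Unit return type, as asked for in the review.
  def updateInputMetrics(): Unit = synchronized { }
}
```

Explicit return types on non-trivial methods are the usual Scala style for library code, since an inferred type can silently change when the body does.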