
[SPARK-4874] [CORE] Collect record count metrics #4067

Closed
wants to merge 11 commits

Conversation

ksakellis

Collects record counts for both Input/Output and Shuffle Metrics. For the input/output metrics, it simply increments the counter every time the iterators are accessed.

For shuffle, on the write side we count the metrics post-aggregation (after a map-side combine), and on the read side we count the metrics pre-aggregation. This lets the bytes read/written metrics and the records read/written metrics line up.

For backwards compatibility, if we deserialize an older event that doesn't have record metrics, we set the metric to -1.
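
For illustration, a minimal sketch of the counting pattern described above; the class and parameter names here are hypothetical, not the exact ones introduced by this patch:

package org.apache.spark.util

// Wrap the underlying record iterator and bump a per-task counter every time a
// record is produced, so that records and bytes are measured at the same point.
private[spark] class RecordCountingIterator[A](
    sub: Iterator[A],
    incRecordsRead: Long => Unit) extends Iterator[A] {

  override def hasNext: Boolean = sub.hasNext

  override def next(): A = {
    val record = sub.next()
    incRecordsRead(1L)  // e.g. updates InputMetrics or ShuffleReadMetrics for this task
    record
  }
}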

@@ -17,6 +17,8 @@

package org.apache.spark

import org.apache.spark.util.AfterNextInterceptingIterator
Contributor

this should go with the other org.apache.spark imports

@SparkQA

SparkQA commented Jan 16, 2015

Test build #25628 has finished for PR 4067 at commit 571cb69.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

* @tparam A the iterable type
*/
private[spark]
class InterceptingIterator[A](sub: Iterator[A]) extends Iterator[A] {
Contributor

Can we avoid this? Seems fairly expensive by adding a lot more method calls ...

Author

So this is supposed to be a generic way of intercepting iterators. If we don't have this, I'd have to do something custom like CompletionIterator; I was trying to make something reusable.

Contributor

Yeah, the thing is there are only a very limited number of places where you'd need to increment the counters. I'm not sure this super-generic design is worth it, unless you want to do a lot of performance studies of the differences...
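
For comparison, the inline alternative being suggested would look roughly like this (hypothetical call site; readMetrics.incRecordsRead stands in for whatever increment method the metrics end up exposing):

val countedIter = recordIter.map { record =>
  // Bump the per-task counter directly where the iterator is consumed,
  // instead of introducing a generic wrapper class.
  readMetrics.incRecordsRead(1L)
  record
}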

@rxin
Contributor

rxin commented Jan 16, 2015

Can you also paste some screenshots on what the UI changes look like? Thanks.

val readMetrics = context.taskMetrics().createShuffleReadMetricsForDependency()
override def afterNext(next: T) : T = {
readMetrics.recordsRead += 1
logError("Read record " + next)
Contributor

this is not intended to be here, is it?

Author

whoops.. nope.

* Total records read.
*/
def recordsRead: Long = _recordsRead.get()
@volatile @transient var bytesReadCallback: Option[() => Long] = None
Contributor

can you explain what this does?
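
For context, a rough sketch of what such a callback is for (hypothetical usage, not the exact code in this patch): when set, it lets the task ask the underlying file system how many bytes the current thread has read so far, independently of the record iterator.

// Snapshot the bytes read by this thread so far, if a callback was registered.
val bytesSoFar: Option[Long] = bytesReadCallback.map(getBytesRead => getBytesRead())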

@ksakellis
Author

This change was dependent on #3120, which just got merged, and now there are some merge conflicts. I need to fix those first and will update the PR.

@@ -31,6 +31,8 @@ class BlockObjectWriterSuite extends FunSuite {
new JavaSerializer(new SparkConf()), 1024, os => os, true, writeMetrics)

writer.write(Long.box(20))
// Record metrics update on every write
assert(writeMetrics.recordsWritten == 1)
Contributor

you'd want to use === instead of ==
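
For reference, the suggested form, matching the project's test style:

assert(writeMetrics.recordsWritten === 1)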

@rxin
Contributor

rxin commented Jan 16, 2015

Hey @ksakellis - Thanks for working on this.

I took a very quick look at the patch. Overall I feel the patch should be fairly straightforward, but the specific implementations might've gone a bit overboard with Scala language features (a lot of Options, orElse, closures, etc.) and design (too many new classes introduced). If we can reduce those, the PR would be a lot easier to understand.

_bytesRead.addAndGet(bytes)
}

def addRecordsRead(records: Long) = {
Contributor

maybe incrementRecordsRead and incrementBytesRead are better names?

@ksakellis
Author

@rxin I updated the PR after doing a rebase and also incorporated some of your feedback. You made two general comments:

  1. The specific implementations might've gone a bit overboard with Scala language features.
  2. Too many new classes were introduced.

Can you please be more specific here by pointing to the particular code? These comments are not very actionable as is.

@rxin
Contributor

rxin commented Jan 16, 2015

Hi again - I can't find my previous comment since the line is no longer in the diff, due to the other PR being merged. Can you still add a comment for that one (the part with Option and orElse and set ...)? I want to make sure that if we read that code one year from now, we can still understand what's going on.

@rxin
Contributor

rxin commented Jan 16, 2015

The Scala stuff was mostly about the previous PR that got merged (and is now no longer showing up as part of this diff).

@ksakellis
Author

So is this the code you were referring to in HadoopRDD?

      // Find a function that will return the FileSystem bytes read by this thread. Do this before
      // creating RecordReader, because RecordReader's constructor might read some bytes
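      // Prefer a callback already registered on the metrics; otherwise try to build one from the
      // Hadoop FileSplit. If neither source can provide one, this stays None.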
      val bytesReadCallback = inputMetrics.bytesReadCallback.orElse(
        split.inputSplit.value match {
          case split: FileSplit =>
            SparkHadoopUtil.get.getFSBytesReadOnThreadCallback(split.getPath, jobConf)
          case _ => None
        }
      )

@ksakellis
Author

screen shot 2015-01-16 at 12 04 04 am
Shows a stage that has input metrics (reading from a file) and writes data for the next stage.

screen shot 2015-01-16 at 12 04 15 am
Shows a stage that has both shuffle reading and writing - no input or output metrics.

screen shot 2015-01-16 at 12 04 25 am
Shows a stage that outputs to a file.

@SparkQA

SparkQA commented Jan 16, 2015

Test build #25638 timed out for PR 4067 at commit 1572054 after a configured wait of 120m.

@SparkQA

SparkQA commented Jan 16, 2015

Test build #25641 has finished for PR 4067 at commit 3c2d021.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class AfterNextInterceptingIterator[A](sub: Iterator[A]) extends Iterator[A]

@pwendell
Contributor

What about combining the input size and records in the same column? Overall, this will help with the expansion in the number of columns. The title could be "Input Size / Records".

@ksakellis
Author

If we do that, we wouldn't be able to sort on the number of records and bytes independently.

@pwendell
Contributor

Yes - you'd only be able to sort on bytes. Wouldn't that be okay? These would likely track closely in most cases.

@ksakellis
Author

A big motivation to add recordsRead/Written was to detect data skew. In these cases bytes and records might not track very closely.

Thinking more about this, I suspect that having an Avg. Record Size column (bytesRead / recordsRead) would be what you'd want to sort on. We could add this metric to the UI, make it sortable, and then combine the bytesRead and recordsRead metrics into one column. Thoughts?

@@ -238,6 +245,10 @@ case class InputMetrics(readMethod: DataReadMethod.Value) {
_bytesRead.addAndGet(bytes)
}

def addRecordsRead(records: Long) = {
Contributor

This should be incRecordsRead in keeping with SPARK-3288.

@SparkQA

SparkQA commented Feb 6, 2015

Test build #26876 has finished for PR 4067 at commit e156560.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ksakellis
Author

Jenkins, retest this please

@SparkQA

SparkQA commented Feb 6, 2015

Test build #26896 has finished for PR 4067 at commit e156560.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Kostas Sakellis added 10 commits February 5, 2015 22:22
Collects record counts for both Input/Output and Shuffle
Metrics. For the input/output metrics, it simply increments
the counter every time the iterators are accessed.

For shuffle on the write side, we count the metrics post
aggregation (after a map side combine) and on the read
side we count the metrics pre aggregation. This allows
both the bytes read/written metrics and the
records read/written to line up.

For backwards compatibility, if we deserialize an older
event that doesn't have record metrics, we set the
metric to -1.
Also made the availability of the # records more
complete.
- Hide columns in the executor summary table if no data
- Revert change to show output metrics for Hadoop < 2.4
- Other CR feedback.
@@ -25,7 +25,7 @@ import org.apache.spark._
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.FetchFailedException
import org.apache.spark.storage.{BlockId, BlockManagerId, ShuffleBlockFetcherIterator, ShuffleBlockId}
-import org.apache.spark.util.CompletionIterator
+import org.apache.spark.util.{CompletionIterator}
Contributor

you don't need braces here if it is a single import.

@SparkQA

SparkQA commented Feb 6, 2015

Test build #26904 has finished for PR 4067 at commit dad4d57.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 6, 2015

Test build #26906 has finished for PR 4067 at commit bd919be.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@pwendell
Contributor

pwendell commented Feb 6, 2015

Jenkins, test this please. This LGTM pending tests.

@SparkQA

SparkQA commented Feb 6, 2015

Test build #26936 has finished for PR 4067 at commit bd919be.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@pwendell
Contributor

pwendell commented Feb 6, 2015

Merging this, thanks Kos.

asfgit pushed a commit that referenced this pull request Feb 6, 2015
Collects record counts for both Input/Output and Shuffle Metrics. For the input/output metrics, it simply increments the counter every time the iterators are accessed.

For shuffle, on the write side we count the metrics post-aggregation (after a map-side combine), and on the read side we count the metrics pre-aggregation. This lets the bytes read/written metrics and the records read/written metrics line up.

For backwards compatibility, if we deserialize an older event that doesn't have record metrics, we set the metric to -1.

Author: Kostas Sakellis <kostas@cloudera.com>

Closes #4067 from ksakellis/kostas-spark-4874 and squashes the following commits:

bd919be [Kostas Sakellis] Changed 'Records Read' in shuffleReadMetrics json output to 'Total Records Read'
dad4d57 [Kostas Sakellis] Add a comment and check to BlockObjectWriter so that it cannot be reopened.
6f236a1 [Kostas Sakellis] Renamed _recordsWritten in ShuffleWriteMetrics to be more consistent
70620a0 [Kostas Sakellis] CR Feedback
17faa3a [Kostas Sakellis] Removed AtomicLong in favour of using Long
b6f9923 [Kostas Sakellis] Merge AfterNextInterceptingIterator with InterruptableIterator to save a function call
46c8186 [Kostas Sakellis] Combined Bytes and # records into one column
57551c1 [Kostas Sakellis] Conforms to SPARK-3288
6cdb44e [Kostas Sakellis] Removed the generic InterceptingIterator and replaced it with a specific implementation
1aa273c [Kostas Sakellis] CR Feedback
1bb78b1 [Kostas Sakellis] [SPARK-4874] [CORE] Collect record count metrics

(cherry picked from commit dcd1e42)
Signed-off-by: Patrick Wendell <patrick@databricks.com>
asfgit closed this in dcd1e42 on Feb 6, 2015
@JoshRosen
Contributor

It looks like the "InputOutputMetricsSuite input metrics with mixed read methods" and "InputOutputMetricsSuite input metrics with interleaved reads" tests may have started failing in the hadoop-2.2 build since this patch:

https://amplab.cs.berkeley.edu/jenkins/view/Spark/job/Spark-1.3-SBT/AMPLAB_JENKINS_BUILD_PROFILE=hadoop2.2,label=centos/29/testReport/

@ksakellis
Author

Yikes, @JoshRosen, I'm looking into this.

@aguyyala

aguyyala commented Apr 27, 2016

@ksakellis, @SparkQA, @preaudc How do I collect these metrics on the console (Spark shell or a spark-submit job) right after the task or job is done?

We are using Spark to load data from MySQL to Cassandra, and it is quite large (e.g. ~200 GB and 600M rows). When the task is done, we want to verify exactly how many rows Spark processed. We can get the number from the Spark UI, but how can we retrieve that number ("Output Records Written") from the Spark shell or in a spark-submit job?

Sample command to load from MySQL to Cassandra:

val pt = sqlcontext.read.format("jdbc")
  .option("url", "jdbc:mysql://...:3306/...")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", "payment_types")
  .option("user", "hadoop")
  .option("password", "...")
  .load()
pt.save("org.apache.spark.sql.cassandra", SaveMode.Overwrite, options = Map("table" -> "payment_types", "keyspace" -> "test"))

I want to retrieve all the Spark UI metrics for the above task, mainly output size and records written.

Please help.

Thanks for your time!
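
One way to retrieve these counters programmatically (a rough sketch, not from the original thread): register a SparkListener on the shell's SparkContext (sc) and aggregate the per-task output metrics as tasks finish. This assumes a Spark version where TaskMetrics.outputMetrics is non-optional; in some 1.x releases it is an Option and needs to be unwrapped.

import java.util.concurrent.atomic.AtomicLong
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Accumulates "records written" across all tasks that finish after registration.
val totalRecordsWritten = new AtomicLong(0L)

sc.addSparkListener(new SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val metrics = taskEnd.taskMetrics
    if (metrics != null) {
      totalRecordsWritten.addAndGet(metrics.outputMetrics.recordsWritten)
    }
  }
})

// ... run the MySQL-to-Cassandra save here, then read the aggregated counter:
println(s"Output records written: ${totalRecordsWritten.get()}")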
