Add Benchmarking to evaluate the core tools performance #10

Open

wants to merge 31 commits into base: dev
Changes from all commits (31 commits)
5fa3fff
Add Benchmarking to evaluate the core tools performance
amahussein Jul 1, 2024
cf099b6
Adding changes for argument parsing
sayedbilalbari Jul 2, 2024
1f5155d
changing warmup time to warmup iterations
sayedbilalbari Jul 2, 2024
ad4e562
Removing unsed imports
sayedbilalbari Jul 2, 2024
303ee60
Adding Qualification Benchmark
sayedbilalbari Jul 2, 2024
6137021
Changes for review comments
sayedbilalbari Jul 2, 2024
4f85b8d
Changes for review comments
sayedbilalbari Jul 2, 2024
0131c71
removing name param from scallop options
sayedbilalbari Jul 2, 2024
8e3b18b
Merge pull request #11 from amahussein/spark-rapids-tools-1120-FEA_ARGS
bilalbari Jul 2, 2024
1026e69
Adding GC Metrics (#12)
bilalbari Jul 5, 2024
d9a8be1
Review changes
sayedbilalbari Jul 9, 2024
9033c82
Correcting scalastyle failure
sayedbilalbari Jul 9, 2024
9199881
Correcting passed argument name for semantic clarity
sayedbilalbari Jul 10, 2024
3001cb3
Short flag + desc - update
sayedbilalbari Jul 10, 2024
118a505
Updating short flag usage
sayedbilalbari Jul 10, 2024
eaa6a2c
Refactor for correcting structure
sayedbilalbari Jul 10, 2024
b6e7b3f
Review comments changes
sayedbilalbari Jul 11, 2024
73d3992
Review comment changes
sayedbilalbari Jul 11, 2024
3d9290f
Adding separator as a val
sayedbilalbari Jul 11, 2024
d6bc2da
Adding README for benchmark
sayedbilalbari Jul 11, 2024
3a8d4c5
Merge branch 'dev' of github.com:NVIDIA/spark-rapids-tools into spark…
sayedbilalbari Jul 12, 2024
25a4d80
Review changes for README
sayedbilalbari Jul 12, 2024
d0bc2c8
Add evaluate_summary command to internal CLI (#1185)
leewyang Jul 12, 2024
9b59546
Corrected wording in README
sayedbilalbari Jul 12, 2024
4470d7e
Updated models for new training data (#1186)
leewyang Jul 12, 2024
2000e28
Updated README review changes
sayedbilalbari Jul 12, 2024
f4d3293
Correcting README typo
sayedbilalbari Jul 12, 2024
a7db0dd
Correcting README typo
sayedbilalbari Jul 12, 2024
c424cc9
README typo - RAPIDS_TOOLs -> SPARK_RAPIDS..
sayedbilalbari Jul 12, 2024
9fe291e
Merge branch 'dev' of github.com:NVIDIA/spark-rapids-tools into spark…
sayedbilalbari Jul 15, 2024
17c2a05
Adding license header + README changes
sayedbilalbari Jul 15, 2024
@@ -0,0 +1,156 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.rapids.tool.benchmarks

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration.NANOSECONDS

import org.apache.spark.sql.rapids.tool.util.{MemoryMetricsTracker, ToolsTimer}

/**
* This code is mostly copied from org.apache.spark.benchmark.BenchmarkBase
*
* Utility class to benchmark components. An example of how to use this is:
* val benchmark = new Benchmark("My Benchmark", minNumIters, warmUpIterations)
* benchmark.addCase("V1")(<function>)
* benchmark.addCase("V2")(<function>)
* benchmark.run()
* This will output the average time to run each function along with the JVM GC metrics.
*/
class Benchmark(
name: String = "Benchmarker",
minNumIters: Int,
warmUpIterations: Int,
outputPerIteration: Boolean = false) {
import Benchmark._

val benchmarks: mutable.ArrayBuffer[Case] = mutable.ArrayBuffer.empty[Benchmark.Case]

/**
* Adds a case to run when run() is called. The given function will be run for several
* iterations to collect timing statistics.
*
* @param name of the benchmark case
* @param numIters if non-zero, forces exactly this many iterations to be run
*/
def addCase(name: String, numIters: Int = 0)(f: Int => Unit): Unit = {
addTimerCase(name, numIters) { timer =>
timer.startTiming()
f(timer.iteration)
timer.stopTiming()
}
}

/**
* Adds a case with manual timing control. When the function is run, timing does not start
* until timer.startTiming() is called within the given function. The corresponding
* timer.stopTiming() method must be called before the function returns.
*
* @param name of the benchmark case
* @param numIters if non-zero, forces exactly this many iterations to be run
*/
def addTimerCase(name: String, numIters: Int = 0)(f: ToolsTimer => Unit): Unit = {
benchmarks += Benchmark.Case(name, f, numIters)
}

/**
* Runs the benchmark and outputs the results to stdout. This should be copied and added as
* a comment with the benchmark. Although the results vary from machine to machine, it should
* provide some baseline.
*/
def run(): Seq[Result] = {
require(benchmarks.nonEmpty)
val separator = "-" * 80
println(separator)
println("Running benchmark: " + name)
println(separator)
val results = benchmarks.map { c =>
println(" RUNNING CASE : " + c.name)
println(separator)
measure(c.name, c.numIters)(c.fn)
}
println
results
}

/**
* Runs a single function `f` for iters, returning the average time the function took and
* the rate of the function.
*/
def measure(name: String, overrideNumIters: Int)(f: ToolsTimer => Unit): Result = {
System.gc() // ensures garbage from previous cases don't impact this one
val separator = "-" * 80
for (wi <- 0 until warmUpIterations) {
f(new ToolsTimer(-1))
}
val minIters = if (overrideNumIters != 0) overrideNumIters else minNumIters
val runTimes = ArrayBuffer[Long]()
val gcCounts = ArrayBuffer[Long]()
val gcTimes = ArrayBuffer[Long]()
// Track GC count and GC time across iterations
for (i <- 0 until minIters) {
val timer = new ToolsTimer(i)
val memoryTracker = new MemoryMetricsTracker
f(timer)
val runTime = timer.totalTime()
runTimes += runTime
gcCounts += memoryTracker.getTotalGCCount
gcTimes += memoryTracker.getTotalGCTime
if (outputPerIteration) {
println(separator)
println(s"Iteration $i took ${NANOSECONDS.toMicros(runTime)} microseconds")
println(separator)
}
}
println(separator)
println(s" Stopped after $minIters iterations, ${NANOSECONDS.toMillis(runTimes.sum)} ms")
println(separator)
assert(runTimes.nonEmpty)
val bestRuntime = runTimes.min
val avgRuntime = runTimes.sum / runTimes.size
val stdevRunTime = if (runTimes.size > 1) {
math.sqrt(runTimes.map(time => (time - avgRuntime) *
(time - avgRuntime)).sum / (runTimes.size - 1))
} else {
0
}
val maxGcCount = gcCounts.max
val avgGcCount = gcCounts.sum / minIters
val avgGcTime = gcTimes.sum / minIters
val maxGcTime = gcTimes.max
// Standard deviation of the GC count is computed around the average GC count
val stdevGcCount = if (gcCounts.size > 1) {
math.sqrt(gcCounts.map(gc => (gc - avgGcCount) *
(gc - avgGcCount)).sum / (gcCounts.size - 1))
} else {
0
}
Benchmark.Result(name, avgRuntime / 1000000.0,
bestRuntime / 1000000.0,
stdevRunTime / 1000000.0,
JVMMemoryParams(avgGcTime, avgGcCount, stdevGcCount, maxGcCount, maxGcTime))
}
}


object Benchmark {
case class Case(name: String, fn: ToolsTimer => Unit, numIters: Int)
case class JVMMemoryParams(avgGCTime: Double, avgGCCount: Double,
stdDevGCCount: Double, maxGCCount: Long, maxGcTime: Long)
case class Result(caseName: String, avgMs: Double, bestMs: Double, stdevMs: Double,
memoryParams: JVMMemoryParams)
}
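To make the usage pattern described in the Scaladoc above concrete, here is a minimal sketch (not part of this change) that wires two cases into `Benchmark` and runs them. The object name and the workloads are placeholders chosen for illustration; real suites benchmark the tools' event-log processing.

```scala
package org.apache.spark.rapids.tool.benchmarks

// Hypothetical usage sketch, not part of this PR.
object BenchmarkUsageExample {
  def main(args: Array[String]): Unit = {
    val benchmark = new Benchmark("String building", minNumIters = 5, warmUpIterations = 2)

    // addCase: the whole case body is timed automatically each iteration.
    benchmark.addCase("StringBuilder") { _ =>
      val sb = new StringBuilder
      (0 until 100000).foreach(i => sb.append(i))
    }

    // addTimerCase: only the region between startTiming()/stopTiming() is
    // measured, so per-iteration setup can be excluded.
    benchmark.addTimerCase("mkString") { timer =>
      val data = (0 until 100000).map(_.toString) // setup, not timed
      timer.startTiming()
      data.mkString
      timer.stopTiming()
    }

    // Prints the per-case timing report and returns the collected results.
    benchmark.run()
  }
}
```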
@@ -0,0 +1,50 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.rapids.tool.benchmarks

import org.rogach.scallop.{ScallopConf, ScallopOption}

class BenchmarkArgs(arguments: Seq[String]) extends ScallopConf(arguments) {

banner("""
Benchmarker class for running various benchmarks.
""")

val iterations: ScallopOption[Int] = opt[Int](short = 'i', default = Some(5),
descr = "Total iterations to run excluding warmup (used for the average time " +
"calculation). Default is 5 iterations.", validate = _ > 0)
val warmupIterations: ScallopOption[Int] = opt[Int](short = 'w',
default = Some(3), descr = "Total number of warmup iterations to run. Can take " +
"any input >= 0. Warmup is important for benchmarking to ensure that initial " +
"JVM operations (classloading etc.) do not skew the results.", validate = _ >= 0)
val outputDirectory: ScallopOption[String] = opt[String](short = 'o',
default = Some("."), descr = "Base output directory for benchmark results. " +
"Default is the current directory. The final output will go into a subdirectory " +
"called rapids-tools-benchmark. It will override any directory with the same name.")
val outputFormat: ScallopOption[String] = opt[String](short = 'f',
default = Some("text"), descr = "Output format for the benchmark results. For text, " +
"the result output will be tabular. In case of json, the results " +
"will be JSON formatted. Currently supported formats are text, json.")
// The short flag `i` would conflict with `iterations`, so `a` is used instead.
val inputArgs: ScallopOption[String] = opt[String](short = 'a',
required = false,
descr = "Input arguments to pass to the benchmark suite. Used as common arguments across " +
"benchmarks. The format is space separated arguments. For example " +
"--output-directory /tmp --per-sql /tmp/eventlogs")
verify()
}
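As a quick illustration of how these options are consumed, the following sketch (not part of this change) parses a sample argument vector and reads back the parsed values; the object name and the sample paths are placeholders, and only the options defined above are accessed.

```scala
package org.apache.spark.rapids.tool.benchmarks

// Hypothetical sketch: parsing a sample argument vector with BenchmarkArgs.
object BenchmarkArgsExample {
  def main(args: Array[String]): Unit = {
    val parsed = new BenchmarkArgs(Seq(
      "-i", "3",             // 3 measured iterations
      "-w", "2",             // 2 warmup iterations
      "-o", "/tmp/bench",    // base output directory
      "-a", "/tmp/eventlogs" // arguments forwarded to the benchmark suite
    ))
    println(parsed.iterations())        // 3
    println(parsed.warmupIterations())  // 2
    println(parsed.outputDirectory())   // /tmp/bench
    println(parsed.outputFormat())      // text (default)
    println(parsed.inputArgs())         // /tmp/eventlogs
  }
}
```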
@@ -0,0 +1,149 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.rapids.tool.benchmarks

import java.io.{File, FileOutputStream, PrintStream}

import org.apache.commons.io.output.TeeOutputStream

import org.apache.spark.sql.rapids.tool.util.RuntimeUtil

/**
* This code is mostly copied from org.apache.spark.benchmark.BenchmarkBase
*
* A base class for running a benchmark suite and writing the results to a file
* (under the output directory) as well as to stdout.
*/
abstract class BenchmarkBase {
private var output: Option[PrintStream] = None
private var benchmark: Option[Benchmark] = None
/**
* Main process of the whole benchmark.
* Implementations of this method are supposed to use the wrapper method `runBenchmark`
* for each benchmark scenario.
*/
def runBenchmarkSuite(inputArgs: Array[String]): Unit

final def runBenchmark(benchmarkName: String)(func: => Any): Unit = {
val separator = "=" * 96
val testHeader = (separator + '\n' + benchmarkName + '\n' + separator + '\n' + '\n').getBytes
output.foreach(_.write(testHeader))
func
output.foreach(_.write('\n'))
}
def prefix: String = "rapids-tools-benchmark"
def suffix: String = ""

/**
* Add a benchmark case to the suite
* @param name Name of the benchmark case
* @param numIters Number of iterations to run
* @param f Function to run
*/
def addCase(name: String, numIters: Int = 0)(f: Int => Unit): Unit = {
benchmark.get.addCase(name, numIters)(f)
}

/**
* Method to trigger the benchmarker cases run method
*/
def run(): Unit = {
// Getting correct output stream
val printStream = output.getOrElse(System.out)
// Running the underlying benchmark
val results: Seq[Benchmark.Result] = benchmark.get.run()
// Generating the output report
val firstBest = results.head.bestMs
val nameLen = Math.max(40, results.map(_.caseName.length).max)
printStream.printf(s"%-${nameLen}s %14s %14s %11s %20s %18s %18s %18s %18s %10s\n",
"Benchmark :", "Best Time(ms)", "Avg Time(ms)", "Stdev(ms)","Avg GC Time(ms)",
"Avg GC Count", "Stdev GC Count","Max GC Time(ms)","Max GC Count", "Relative")
printStream.println("-" * (nameLen + 160))
results.foreach { result =>
printStream.printf(s"%-${nameLen}s %14s %14s %11s %20s %18s %18s %18s %18s %10s\n",
result.caseName,
"%5.0f" format result.bestMs,
"%4.0f" format result.avgMs,
"%5.0f" format result.stdevMs,
"%5.1f" format result.memoryParams.avgGCTime,
"%5.1f" format result.memoryParams.avgGCCount,
"%5.0f" format result.memoryParams.stdDevGCCount,
"%5d" format result.memoryParams.maxGcTime,
"%5d" format result.memoryParams.maxGCCount,
"%3.2fX" format (firstBest / result.bestMs))
}
printStream.println()
}

/**
* Method to print the system run specific information
* @param warmUpIterations Total warm up iterations
* @param iterations Total runtime iterations
* @param inputArgs Input arguments
*/
private def printSystemInformation(warmUpIterations: Int, iterations: Int,
inputArgs: String ): Unit = {
val jvmInfo = RuntimeUtil.getJVMOSInfo
output.get.printf(s"%-26s : %s \n","JVM Name", jvmInfo("jvm.name"))
output.get.printf(s"%-26s : %s \n","Java Version", jvmInfo("jvm.version"))
output.get.printf(s"%-26s : %s \n","OS Name", jvmInfo("os.name"))
output.get.printf(s"%-26s : %s \n","OS Version", jvmInfo("os.version"))
output.get.printf(s"%-26s : %s MB \n","MaxHeapMemory",
(Runtime.getRuntime.maxMemory()/1024/1024).toString)
output.get.printf(s"%-26s : %s \n","Total Warm Up Iterations",
warmUpIterations.toString)
output.get.printf(s"%-26s : %s \n","Total Runtime Iterations",
iterations.toString)
output.get.printf(s"%-26s : %s \n \n","Input Arguments", inputArgs)
}

/**
* Any shutdown code to ensure a clean shutdown
*/
def afterAll(): Unit = {}

def main(args: Array[String]): Unit = {

val benchArgs = new BenchmarkArgs(args)
val dirRoot = benchArgs.outputDirectory().stripSuffix("/")
val resultFileName = "results.txt"
val dir = new File(s"$dirRoot/$prefix/")
if (!dir.exists()) {
dir.mkdirs()
}
val file = new File(dir, resultFileName)
if (!file.exists()) {
file.createNewFile()
}
// Creating a new output stream
// Using TeeOutputStream to multiplex output to both file and stdout
val outputStream = new FileOutputStream(file)
output = Some(new PrintStream(new TeeOutputStream(System.out, outputStream)))
benchmark = Some(new Benchmark(minNumIters = benchArgs.iterations(),
warmUpIterations = benchArgs.warmupIterations(),
outputPerIteration = true))
// Printing the system information
printSystemInformation(benchArgs.warmupIterations(),
benchArgs.iterations(), benchArgs.inputArgs.getOrElse(""))
// Passing the input arguments to the suite function.
// `inputArgs` is optional, so fall back to an empty string when it is not provided.
runBenchmarkSuite(benchArgs.inputArgs.getOrElse("").split("\\s+").filter(_.nonEmpty))
// Flushing the report before closing the underlying file stream
output.foreach(_.flush())
outputStream.close()
afterAll()
}
}
@@ -0,0 +1,40 @@
# Benchmarking Tools

This package contains the relevant classes to write and run benchmarks for `SPARK RAPIDS TOOLS` for Apache Spark.

## Writing a benchmark
* Extend `BenchmarkBase` and override the `runBenchmarkSuite` function.
* Group logically similar benchmarks inside a `runBenchmark` block, which adds a header with the provided name to the output.
* For each case to be tested, use the `Benchmark` class's `addCase` method to register the variants of the same benchmark.
* Call the `run` function to run the benchmark (a minimal sketch follows this list).
* Refer to the included example benchmark, [`SingleThreadedQualToolBenchmark`](./SingleThreadedQualToolBenchmark.scala), for implementation details.
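
A minimal sketch of what such a benchmark could look like (the object and case names below are placeholders, not part of the codebase; see `SingleThreadedQualToolBenchmark` for the real implementation):

```scala
package org.apache.spark.rapids.tool.benchmarks

// Illustrative skeleton only; object and case names are placeholders.
object MyToolBenchmark extends BenchmarkBase {
  override def runBenchmarkSuite(inputArgs: Array[String]): Unit = {
    // Each runBenchmark block adds a named header to the output report
    runBenchmark("Event log processing") {
      // The Int passed to each case is the iteration index
      addCase("baseline") { _ =>
        // code under test, e.g. run the tool against the event logs passed via -a
      }
      addCase("alternative") { _ =>
        // a second implementation to compare against
      }
      // Execute all registered cases and print the timing/GC report
      run()
    }
  }
}
```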

## Running the benchmark
Run the compiled benchmark class with the `java` command. The following params are supported:
* `-i` : total number of iterations to run when calculating the average metrics
* `-w` : total number of warmup iterations to run before calculating the final metrics (warmup ensures the final results are not skewed by initial JVM class-loading times)
* `-o` : base output directory for the final result file. Defaults to the current directory; results are written to a `rapids-tools-benchmark` subdirectory
* `-f` : output format of the stored result. Currently supports text; JSON support will be added in future iterations
* `-a` : input arguments to pass to the underlying benchmark classes

#### Running the Benchmark class directly
```shell
java -cp $CLASSPATH:$SPARK_HOME/jars/*:$MAVEN_ARTIFACT_JAR \
org.apache.spark.rapids.tool.benchmarks.SingleThreadedQualToolBenchmark \
-i 3 -w 3 -a " $EVENT_LOGS_DIR"
```
* `$CLASSPATH` : Path to the compiled class directory. Ex - `<SPARK_RAPIDS_TOOLS_REPO_PATH>/core/target/*`
* `$MAVEN_ARTIFACT_JAR` : Path to the maven-artifact jar. Download the jar from [here](https://mvnrepository.com/artifact/org.apache.maven/maven-artifact/3.9.0)
* `$EVENT_LOGS_DIR` : Path to the event logs directory

#### Running the Benchmark class using tools jar
```shell
java -cp $SPARK_RAPIDS_TOOLS_JAR:$SPARK_HOME/jars/* \
org.apache.spark.rapids.tool.benchmarks.SingleThreadedQualToolBenchmark \
-i 3 -w 3 -a " $EVENT_LOGS_DIR"
```
* `$SPARK_RAPIDS_TOOLS_JAR` : Path to the spark rapids tools jar
* `$EVENT_LOGS_DIR` : Path to the event logs directory

#### NOTES
* `$SPARK_HOME/jars/*` : Include the Spark jars in the classpath when benchmarking the `QUALIFICATION`/`PROFILING` tools