Add store and replay exec feature
Signed-off-by: Chong Gao <res_life@163.com>
Chong Gao committed May 17, 2024
1 parent 44c6e6b commit 2702c0a
Showing 5 changed files with 344 additions and 1 deletion.
64 changes: 64 additions & 0 deletions docs/dev/replay-exec.md
@@ -0,0 +1,64 @@
This doc describes how to dump an Exec's runtime meta and column batch data to a local directory,
and how to replay the Exec from the dumped meta and data. When we hit a perf issue, we can use this
tool to dump the runtime meta/data. Because the dumped column batch is less than 2 GB, it is easy to
collect NSYS and NCU profiles when replaying the dumped runtime meta/data.

# Dump an Exec meta and column batch data

## compile the Spark-Rapids jar
e.g.: compile for Spark 3.3.0 (buildver 330)
```
mvn clean install -DskipTests -pl dist -am -DallowConventionalDistJar=true -Dbuildver=330
```

## enable dump
e.g.:
```
// specify the Exec for dumping, currently only supports `project`
spark.conf.set("spark.rapids.sql.test.replay.exec.type", "project")
// specify the local dump directory, default value is "/tmp"
// this directory should be created first
spark.conf.set("spark.rapids.sql.test.replay.exec.dumpDir", "/tmp/replay-exec")
// Only dump the column batch when its execution time exceeds this threshold in milliseconds
spark.conf.set("spark.rapids.sql.test.replay.exec.threshold.timeMS", 100)
// If specified, only dump when the Exec SQL contains this filter pattern
// Default value is empty, which means no filter
// This example enables dumping only when the Exec SQL contains "get_json_object"
spark.conf.set("spark.rapids.sql.test.replay.exec.filter.include", "get_json_object")
```
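These settings can also be supplied at submit time with `--conf` instead of calling `spark.conf.set` in the job.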

## run a Spark job
Dumping only happens while a GPU Exec is running, so Spark-Rapids must be enabled.
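For example, a minimal spark-shell snippet like the following (a hypothetical query, shown only for
illustration) runs a projection containing `get_json_object`, so it matches the filter configured above:
```
// run in spark-shell with the Rapids plugin enabled and the configs above set
val df = Seq("""{"a": 1}""", """{"a": 2}""").toDF("json")
df.selectExpr("get_json_object(json, '$.a') AS a").collect()
```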
After the job is done, the dump directory will contain files like:
```
/tmp/replay-exec:
- GpuProjectExec.meta // the serialized GpuProjectExec case class
- cb_types.meta // the data types of the column batch
- cb_data_101656570.parquet // the data of the column batch
```

# Replay saved Exec runtime meta and data

## Set environment variable
```
export PLUGIN_JAR=path_to_spark_rapids_jar
export SPARK_HOME=path_to_spark_home_330
```

## replay command

### Project replay
```
$SPARK_HOME/bin/spark-submit \
--class com.nvidia.spark.ProjectExecReplayer \
--conf spark.rapids.sql.explain=ALL \
--master local[*] \
--jars ${PLUGIN_JAR} \
${PLUGIN_JAR} <path_to_saved_replay_dir>
```
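On success the replayer logs the Project expressions being replayed and the number of rows in the projection result (see `ProjectExecReplayer.scala` below).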

### Agg replay
TODO
7 changes: 7 additions & 0 deletions integration_tests/src/main/python/arithmetic_ops_test.py
@@ -129,6 +129,13 @@ def test_subtraction(data_gen):
f.col('b') - f.lit(None).cast(data_type),
f.col('a') - f.col('b')))

@pytest.mark.parametrize('data_gen', _arith_data_gens, ids=idfn)
def test_my_test(data_gen):
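# NOTE: simple addition-only projection; presumably a convenience test for exercising
# the dump/replay feature added in this commit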
assert_gpu_and_cpu_are_equal_collect(
lambda spark: binary_op_df(spark, data_gen).select(
f.col('a') + f.col('b')))


@pytest.mark.parametrize('lhs', [byte_gen, short_gen, int_gen, long_gen, DecimalGen(6, 5),
DecimalGen(6, 4), DecimalGen(5, 4), DecimalGen(5, 3), DecimalGen(4, 2), DecimalGen(3, -2),
DecimalGen(16, 7), DecimalGen(19, 0), DecimalGen(30, 10)], ids=idfn)
153 changes: 153 additions & 0 deletions sql-plugin/src/main/scala/com/nvidia/spark/ProjectExecReplayer.scala
@@ -0,0 +1,153 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark

import java.io.File
import java.nio.ByteBuffer
import java.nio.file.{Files, Paths}

import scala.reflect.ClassTag

import ai.rapids.cudf.{NvtxColor, Table}
import com.nvidia.spark.rapids.{GpuBindReferences, GpuColumnVector, GpuMetric, GpuProjectExec, GpuProjectExecLike, NvtxWithMetrics, SpillableColumnarBatch, SpillPriorities}
import com.nvidia.spark.rapids.Arm.withResource
import com.nvidia.spark.rapids.GpuMetric.{DESCRIPTION_OP_TIME, MODERATE_LEVEL, OP_TIME}

import org.apache.spark.SparkEnv
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.{Attribute, NamedExpression}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.vectorized.ColumnarBatch

/**
* This is a copy of `GpuProjectExec`, with an added `replay` method
* adapted from `GpuProjectExec.internalDoExecuteColumnar`.
*/
case class ProjectExecReplayer(
projectList: List[NamedExpression],
child: SparkPlan)(
useTieredProject: Boolean = false
) extends GpuProjectExecLike {

override def otherCopyArgs: Seq[AnyRef] =
Seq[AnyRef](useTieredProject.asInstanceOf[java.lang.Boolean])

override def output: Seq[Attribute] = projectList.map(_.toAttribute)

override lazy val additionalMetrics: Map[String, GpuMetric] = Map(
OP_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_OP_TIME))

override def internalDoExecuteColumnar(): RDD[ColumnarBatch] = {
throw new UnsupportedOperationException()
}

def replay(cb: ColumnarBatch): ColumnarBatch = {
val opTime = gpuLongMetric(OP_TIME)
val boundProjectList = GpuBindReferences.bindGpuReferencesTiered(projectList, child.output,
useTieredProject)
withResource(new NvtxWithMetrics("ProjectExec", NvtxColor.CYAN, opTime)) { _ =>
val sb = SpillableColumnarBatch(cb, SpillPriorities.ACTIVE_ON_DECK_PRIORITY)

// this call will close the passed-in `sb`
boundProjectList.projectAndCloseWithRetrySingleBatch(sb)
}
}
}

object ProjectExecReplayer extends Logging {
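/**
 * Deserializes an object that was written with Spark's closure serializer.
 * (The dump side, not shown in this file, presumably serializes the meta with the
 * same SparkEnv closure serializer.)
 */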
private def deserializeObject[T: ClassTag](readPath: String): T = {
val bytes = Files.readAllBytes(Paths.get(readPath))
val buffer = ByteBuffer.wrap(bytes)
SparkEnv.get.closureSerializer.newInstance().deserialize(buffer)
}

/**
* The replay data dir should contain:
* - GpuProjectExec.meta
* - cb_types.meta
* - cb_data_xxx.parquet
* @param args args(0) specifies the dir that contains the replay data
*/
def main(args: Array[String]): Unit = {
// check arguments and get paths
if (args.length < 1) {
logError("Project Exec replayer: Specify a replay dir that contains replay data")
return
}
val replayDir = args(0)
val cbTypesPath = replayDir + "/cb_types.meta"
if (!(new File(cbTypesPath).exists() && new File(cbTypesPath).isFile)) {
logError(s"Project Exec replayer: there is no cb_types.meta file in $replayDir")
return
}
val projectMetaPath = replayDir + "/GpuProjectExec.meta"
if (!(new File(projectMetaPath).exists() && new File(projectMetaPath).isFile)) {
logError(s"Project Exec replayer: there is no GpuProjectExec.meta file in $replayDir")
return
}
// find a Parquet file, e.g.: cb_data_101656570.parquet
val parquets = new File(replayDir).listFiles(f => f.getName.startsWith("cb_data_")
&& f.getName.endsWith(".parquet"))
if (parquets == null || parquets.isEmpty) {
logError(s"Project Exec replayer: there is no cb_data_xxx.parquet file in $replayDir")
return
}
// only replay the first Parquet file
val cbPath = parquets(0).getAbsolutePath

logWarning("Project Exec replayer: start running.")

// start a Spark session with Spark-Rapids initialization
SparkSession.builder()
.master("local[*]")
.config("spark.plugins", "com.nvidia.spark.SQLPlugin")
.appName("Test Baidu get_json_object diffs")
.getOrCreate()

logWarning("Project Exec replayer: started a Spark session")

// restore project meta
val restoredProject: GpuProjectExec = deserializeObject[GpuProjectExec](projectMetaPath)
// print project `projectList`
restoredProject.projectList.foreach { expr =>
logWarning(s"Project Exec replayer: Project expression: ${expr.sql}")
}
logWarning("Project Exec replayer: restored Project Exec meta")

// restore column batch data
val restoredCbTypes = deserializeObject[Array[DataType]](cbTypesPath)
withResource(Table.readParquet(new File(cbPath))) { restoredTable =>
// `restoredCb` will be closed inside `replay`
val restoredCb = GpuColumnVector.from(restoredTable, restoredCbTypes)
logWarning("Project Exec replayer: restored column batch data")

logWarning("Project Exec replayer: begin to replay")
// replay project
val replayer =
ProjectExecReplayer(restoredProject.projectList, restoredProject.child)(false)
withResource(replayer.replay(restoredCb)) { retCB =>
logWarning(s"Project Exec replayer: project result has ${retCB.numRows()} rows.")
}

logWarning("Project Exec replayer: project replay completed successfully!!!")
}
}
}
45 changes: 45 additions & 0 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
@@ -2188,6 +2188,42 @@ val SHUFFLE_COMPRESSION_LZ4_CHUNK_SIZE = conf("spark.rapids.shuffle.compression.
.integerConf
.createWithDefault(1024)

/**
* Refer to the dev doc `replay-exec.md`.
* Currently only supports "project"; "agg" will be supported later.
*/
val TEST_REPLAY_EXEC_TYPE =
conf("spark.rapids.sql.test.replay.exec.type")
.doc("Only for tests: Define the Exec type for dumping")
.internal()
.stringConf
.createWithDefault("")

val TEST_REPLAY_EXEC_DUMP_DIR =
conf("spark.rapids.sql.test.replay.exec.dumpDir")
.doc("Only for tests: Define the directory when dumping Exec runtime " +
"meta and column batch data")
.internal()
.stringConf
.createWithDefault("/tmp")

val TEST_REPLAY_EXEC_THRESHOLD_TIME_MS =
conf("spark.rapids.sql.test.replay.exec.threshold.timeMS")
.doc("Only for tests: Only dump the column bach when executing time against it " +
" exceeds this threshold time in MS")
.internal()
.integerConf
.createWithDefault(100)

val TEST_REPLAY_EXEC_FILTER_INCLUDE =
conf("spark.rapids.sql.test.replay.exec.filter.include")
.doc("Only for tests: Only dump when the Exec SQL contains this filter pattern")
.internal()
.stringConf
.createWithDefault("")

private def printSectionHeader(category: String): Unit =
println(s"\n### $category")

@@ -2973,6 +3009,15 @@ class RapidsConf(conf: Map[String, String]) extends Logging {

lazy val testGetJsonObjectSaveRows: Int = get(TEST_GET_JSON_OBJECT_SAVE_ROWS)

lazy val testReplayExecDumpDir: String = get(TEST_REPLAY_EXEC_DUMP_DIR)

lazy val testReplayExecType: String = get(TEST_REPLAY_EXEC_TYPE)

lazy val testReplayExecThresholdTimeMS: Int = get(TEST_REPLAY_EXEC_THRESHOLD_TIME_MS)

lazy val testReplayExecFilterInclude: String = get(TEST_REPLAY_EXEC_FILTER_INCLUDE)

private val optimizerDefaults = Map(
// this is not accurate because CPU projections do have a cost due to appending values
// to each row that is produced, but this needs to be a really small number because