Commit 88142e8

Merge remote-tracking branch 'nv/dev' into issue1552-bilal
sayedbilalbari committed Feb 27, 2025
2 parents 595edca + be68be4
Showing 2 changed files with 67 additions and 3 deletions.
DataWritingCommandExecParser.scala
@@ -244,17 +244,48 @@ object DataWritingCommandExecParser {
     args.headOption.map(_.split("\\s+").last.trim).getOrElse(StringUtils.UNKNOWN_EXTRACT)
     // Extract the data format from the third argument
     val thirdArg = args.lift(2).getOrElse("").trim
-    val format = if (thirdArg.startsWith("[")) {
-      // Optional parameter is present in the eventlog. Get the fourth parameter by skipping the
-      // optional parameter string.
+    val rawFormat = if (thirdArg.startsWith("[")) {
+      // Optional parameters are present in the eventlog.
+      // Skip the optional-parameter string (`[params,*], FileFormat`)
+      // and pick the FileFormat.
       thirdArg.split("(?<=],)")
         .map(_.trim).lift(1).getOrElse("").split(",").headOption.getOrElse("").trim
     } else {
       thirdArg.split(",").headOption.getOrElse("").trim
     }
+    val format = extractFormatName(rawFormat)
     (path, format)
   }

+  /**
+   * Extracts the file format from a class-object string such as
+   * "com.nvidia.spark.rapids.GpuParquetFileFormat@9f5022c".
+   *
+   * This handles cases where the RAPIDS plugin logs a raw object name instead of a
+   * user-friendly file format name. For example, it extracts "Parquet" from
+   * "com.nvidia.spark.rapids.GpuParquetFileFormat@9f5022c".
+   * See: https://github.com/NVIDIA/spark-rapids-tools/issues/1561
+   *
+   * If the input string does not match the expected pattern, the original string is
+   * returned as a fallback.
+   *
+   * @param formatStr The raw format string, typically containing the class name of the
+   *                  file format.
+   * @return A user-friendly file format name (e.g., "Parquet"), or the original string
+   *         if no match is found.
+   */
+  def extractFormatName(formatStr: String): String = {
+    // Extract the file format from the full object string:
+    // 1. `.*\.`                     - matches the package prefix up to the last literal dot
+    // 2. `Gpu([a-zA-Z]+)FileFormat` - captures the format name between `Gpu` and `FileFormat`
+    // 3. `(@.*)?`                   - optionally matches the trailing `@<hashCode>` suffix
+    val formatRegex = """.*\.Gpu([a-zA-Z]+)FileFormat(@.*)?""".r
+    formatStr match {
+      case formatRegex(fileFormat, _) => fileFormat
+      case _ => formatStr // Return the original string if no match
+    }
+  }

   // Helper function to determine the write mode (e.g., Append, Overwrite) from the description.
   def extractWriteMode(description: String): String = {
     val modes = Map(
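For context, below is a minimal, self-contained sketch (not part of the commit) showing how the two new parsing pieces behave together. The `FormatParsingDemo` object and the sample `thirdArg` string are illustrative assumptions; the regex and the lookbehind split mirror the logic in the patch above.

// Illustrative-only demo; mirrors the logic added in this commit.
object FormatParsingDemo {
  // Same regex as extractFormatName in the patch.
  private val formatRegex = """.*\.Gpu([a-zA-Z]+)FileFormat(@.*)?""".r

  def extractFormatName(formatStr: String): String = formatStr match {
    case formatRegex(fileFormat, _) => fileFormat
    case _ => formatStr // fall back to the original string
  }

  def main(args: Array[String]): Unit = {
    // Case 1: the optional parameter block precedes the file format
    // (shape `[params,*], FileFormat`, per the comment in the patch).
    // The sample string is an assumption, not taken from a real eventlog.
    val thirdArg =
      "[serialization.format=1, mergeschema=false], " +
        "com.nvidia.spark.rapids.GpuParquetFileFormat@9f5022c, Append"
    // The lookbehind split `(?<=],)` cuts immediately after `],`, so index 1
    // holds everything following the optional parameters; the first
    // comma-separated token of that remainder is the raw format.
    val rawFormat = thirdArg.split("(?<=],)")
      .map(_.trim).lift(1).getOrElse("").split(",").headOption.getOrElse("").trim
    println(rawFormat)                    // com.nvidia.spark.rapids.GpuParquetFileFormat@9f5022c
    println(extractFormatName(rawFormat)) // Parquet

    // Case 2: CPU eventlogs already carry a plain format name, which the
    // regex fallback passes through unchanged.
    println(extractFormatName("Parquet")) // Parquet
  }
}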
WriteOperationParserSuite.scala
@@ -103,6 +103,39 @@ class WriteOperationParserSuite extends FunSuite {
     )
   }

test("getWriteOpMetaFromNode - Gpu logs profiler case") {
val testFileFormats = Seq(
("com.nvidia.spark.rapids.GpuParquetFileFormat@9f5022c", "Parquet"),
("com.nvidia.spark.rapids.GpuOrcFileFormat@123abc", "Orc"),
("com.nvidia.spark.rapids.GpuHiveTextFileFormat@123abc", "HiveText"),
("com.nvidia.spark.rapids.GpuHiveParquetFileFormat@123abc", "HiveParquet"),
("com.nvidia.spark.rapids.GpuDeltaFileFormat@123abc", "Delta")
)
testFileFormats.foreach { case (format, expectedDataFormat) =>
val node = new SparkPlanGraphNode(
id = 1,
name = "Execute GpuInsertIntoHadoopFsRelationCommand",
desc = s"Execute GpuInsertIntoHadoopFsRelationCommand gs://path/to/database/table1, " +
s"false, $format, " +
"[serialization.format=1, mergeschema=false, __hive_compatible_bucketed_table_insertion__=true], " +
"Append, `spark_catalog`.`database`.`table`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, " +
"org.apache.spark.sql.execution.datasources.InMemoryFileIndex(gs://path/to/database/table1), " +
"[col01, col02, col03]",
Seq.empty
)
testGetWriteOpMetaFromNode(
node,
expectedExecName = "InsertIntoHadoopFsRelationCommand",
expectedDataFormat = expectedDataFormat,
expectedOutputPath = "gs://path/to/database/table1",
expectedOutputColumns = "col01;col02;col03",
expectedWriteMode = "Append",
expectedTableName = "table1",
expectedDatabaseName = "database"
)
}
}

test("AppendDataExecV1 - delta format") {
val node = new SparkPlanGraphNode(
id = 3,
