diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/DataWritingCommandExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/DataWritingCommandExecParser.scala
index 4c4439eb9..042d27fc2 100644
--- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/DataWritingCommandExecParser.scala
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/DataWritingCommandExecParser.scala
@@ -244,17 +244,48 @@ object DataWritingCommandExecParser {
       args.headOption.map(_.split("\\s+").last.trim).getOrElse(StringUtils.UNKNOWN_EXTRACT)
     // Extract the data format from the third argument
     val thirdArg = args.lift(2).getOrElse("").trim
-    val format = if (thirdArg.startsWith("[")) {
-      // Optional parameter is present in the eventlog. Get the fourth parameter by skipping the
-      // optional parameter string.
+    val rawFormat = if (thirdArg.startsWith("[")) {
+      // Optional parameters are present in the eventlog.
+      // Skip the optional-parameter string (`[params,*], FileFormat`)
+      // and pick the FileFormat that follows it.
       thirdArg.split("(?<=],)")
         .map(_.trim).lift(1).getOrElse("").split(",").headOption.getOrElse("").trim
     } else {
       thirdArg.split(",").headOption.getOrElse("").trim
     }
+    val format = extractFormatName(rawFormat)
     (path, format)
   }
 
+  /**
+   * Extracts the file format from a class object string, such as
+   * "com.nvidia.spark.rapids.GpuParquetFileFormat@9f5022c".
+   *
+   * This function handles cases where the RAPIDS plugin logs raw object names
+   * instead of a user-friendly file format name. For example, it extracts "Parquet" from
+   * "com.nvidia.spark.rapids.GpuParquetFileFormat@9f5022c".
+   * See: https://github.com/NVIDIA/spark-rapids-tools/issues/1561
+   *
+   * If the input string does not match the expected pattern, the function returns the original
+   * string as a fallback.
+   *
+   * @param formatStr The raw format string, typically containing the class name of the file
+   *                  format.
+   * @return A user-friendly file format name (e.g., "Parquet") or the original string if no
+   *         match is found.
+   */
+  def extractFormatName(formatStr: String): String = {
+    // Extract the file format from the full object string:
+    // 1. `.*\.` - matches the package prefix up to the last literal dot
+    // 2. `Gpu([a-zA-Z]+)FileFormat` - captures the format name from the class name
+    // 3. `(@.*)?` - optionally matches the trailing `@` and object hash
+    val formatRegex = """.*\.Gpu([a-zA-Z]+)FileFormat(@.*)?""".r
+    formatStr match {
+      case formatRegex(fileFormat, _) => fileFormat
+      case _ => formatStr // Return original if no match
+    }
+  }
+
   // Helper function to determine the write mode (e.g., Append, Overwrite) from the description.
   def extractWriteMode(description: String): String = {
     val modes = Map(
diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/WriteOperationParserSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/WriteOperationParserSuite.scala
index e87c4644c..b742639ce 100644
--- a/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/WriteOperationParserSuite.scala
+++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/WriteOperationParserSuite.scala
@@ -103,6 +103,39 @@ class WriteOperationParserSuite extends FunSuite {
     )
   }
 
+  test("getWriteOpMetaFromNode - Gpu logs profiler case") {
+    val testFileFormats = Seq(
+      ("com.nvidia.spark.rapids.GpuParquetFileFormat@9f5022c", "Parquet"),
+      ("com.nvidia.spark.rapids.GpuOrcFileFormat@123abc", "Orc"),
+      ("com.nvidia.spark.rapids.GpuHiveTextFileFormat@123abc", "HiveText"),
+      ("com.nvidia.spark.rapids.GpuHiveParquetFileFormat@123abc", "HiveParquet"),
+      ("com.nvidia.spark.rapids.GpuDeltaFileFormat@123abc", "Delta")
+    )
+    testFileFormats.foreach { case (format, expectedDataFormat) =>
+      val node = new SparkPlanGraphNode(
+        id = 1,
+        name = "Execute GpuInsertIntoHadoopFsRelationCommand",
+        desc = s"Execute GpuInsertIntoHadoopFsRelationCommand gs://path/to/database/table1, " +
+          s"false, $format, " +
+          "[serialization.format=1, mergeschema=false, __hive_compatible_bucketed_table_insertion__=true], " +
+          "Append, `spark_catalog`.`database`.`table`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, " +
+          "org.apache.spark.sql.execution.datasources.InMemoryFileIndex(gs://path/to/database/table1), " +
+          "[col01, col02, col03]",
+        Seq.empty
+      )
+      testGetWriteOpMetaFromNode(
+        node,
+        expectedExecName = "InsertIntoHadoopFsRelationCommand",
+        expectedDataFormat = expectedDataFormat,
+        expectedOutputPath = "gs://path/to/database/table1",
+        expectedOutputColumns = "col01;col02;col03",
+        expectedWriteMode = "Append",
+        expectedTableName = "table1",
+        expectedDatabaseName = "database"
+      )
+    }
+  }
+
   test("AppendDataExecV1 - delta format") {
     val node = new SparkPlanGraphNode(
       id = 3,
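---

Reviewer note: for quick reference, a minimal standalone sketch of the new format-name
extraction. The regex is copied verbatim from the patch above; the object name
`FormatNameSketch` and the sample inputs are illustrative only, not part of the change.

    object FormatNameSketch {
      // Same pattern as the patch's extractFormatName: package prefix,
      // "Gpu<Format>FileFormat", and an optional "@<hash>" suffix.
      private val formatRegex = """.*\.Gpu([a-zA-Z]+)FileFormat(@.*)?""".r

      def extractFormatName(formatStr: String): String = formatStr match {
        case formatRegex(fileFormat, _) => fileFormat
        case _ => formatStr // fall back to the raw string when the pattern does not match
      }

      def main(args: Array[String]): Unit = {
        // A GPU class-object string is reduced to the bare format name
        assert(extractFormatName("com.nvidia.spark.rapids.GpuParquetFileFormat@9f5022c") == "Parquet")
        // The "@<hash>" suffix is optional
        assert(extractFormatName("com.nvidia.spark.rapids.GpuOrcFileFormat") == "Orc")
        // Non-GPU format strings pass through unchanged
        assert(extractFormatName("Parquet") == "Parquet")
      }
    }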