
[BUG] Spark UT framework: SPARK-34212 Parquet should read decimals correctly #11433

Closed
2 tasks done
Tracked by #11403
Feng-Jiang28 opened this issue Sep 6, 2024 · 3 comments · Fixed by #12060

Feng-Jiang28 commented Sep 6, 2024

Exception: there is a mismatch between the decimal schema stored in the Parquet file and the schema requested when reading it back. The CPU (non-vectorized) reader handles the mismatch, while the GPU reader throws a QueryExecutionException.

Reproduction steps:

Widening case:

Start plugin:

$SPARK_HOME/bin/spark-shell --master local[*] --jars ${SPARK_RAPIDS_PLUGIN_JAR} --conf spark.plugins=com.nvidia.spark.SQLPlugin --conf spark.rapids.sql.enabled=true

CPU:

scala> spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")

scala> val df = sql("SELECT 1.0 a, CAST(1.23 AS DECIMAL(17, 2)) b, CAST(1.23 AS DECIMAL(36, 2)) c")
df: org.apache.spark.sql.DataFrame = [a: decimal(2,1), b: decimal(17,2) ... 1 more field]

scala> df.show()
+---+----+----+
|  a|   b|   c|
+---+----+----+
|1.0|1.23|1.23|
+---+----+----+


scala> df.write.parquet("/home/fejiang/Documents/temp")

scala> val schema1 = "a DECIMAL(3, 2), b DECIMAL(18, 3), c DECIMAL(37, 3)"
schema1: String = a DECIMAL(3, 2), b DECIMAL(18, 3), c DECIMAL(37, 3)

scala> val df2 = spark.read.schema(schema1).parquet("/home/fejiang/Documents/temp")
df2: org.apache.spark.sql.DataFrame = [a: decimal(3,2), b: decimal(18,3) ... 1 more field]

scala> df2.show()
+----+-----+-----+
|   a|    b|    c|
+----+-----+-----+
|1.00|1.230|1.230|
+----+-----+-----+


GPU:

scala> spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")

scala> val df = sql("SELECT 1.0 a, CAST(1.23 AS DECIMAL(17, 2)) b, CAST(1.23 AS DECIMAL(36, 2)) c")
df: org.apache.spark.sql.DataFrame = [a: decimal(2,1), b: decimal(17,2) ... 1 more field]

scala> df.show()
24/09/06 15:30:30 WARN GpuOverrides: 
  ! <RDDScanExec> cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.execution.RDDScanExec

+---+----+----+
|  a|   b|   c|
+---+----+----+
|1.0|1.23|1.23|
+---+----+----+


scala> df.write.parquet("/home/fejiang/Documents/temp")
24/09/06 15:30:31 WARN GpuOverrides: 
    ! <RDDScanExec> cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.execution.RDDScanExec


scala> val schema1 = "a DECIMAL(3, 2), b DECIMAL(18, 3), c DECIMAL(37, 3)"
schema1: String = a DECIMAL(3, 2), b DECIMAL(18, 3), c DECIMAL(37, 3)

scala> val df2 = spark.read.schema(schema1).parquet("/home/fejiang/Documents/temp")
df2: org.apache.spark.sql.DataFrame = [a: decimal(3,2), b: decimal(18,3) ... 1 more field]

scala> df2.show()
24/09/06 15:30:32 WARN GpuOverrides: 
!Exec <CollectLimitExec> cannot run on GPU because the Exec CollectLimitExec has been disabled, and is disabled by default because Collect Limit replacement can be slower on the GPU, if huge number of rows in a batch it could help by limiting the number of rows transferred from GPU to CPU. Set spark.rapids.sql.exec.CollectLimitExec to true if you wish to enable it
  @Partitioning <SinglePartition$> could run on GPU

24/09/06 15:30:32 ERROR Executor: Exception in task 0.0 in stage 3.0 (TID 3)
org.apache.spark.sql.execution.QueryExecutionException: Parquet column cannot be converted in file file:///home/fejiang/Documents/temp/part-00000-9f83dfb2-74c0-4b2a-9bd4-5faadaeb893d-c000.snappy.parquet. Column: a, Expected: decimal(3,2), Found: required int32 a (DECIMAL(2,1))
	at com.nvidia.spark.rapids.GpuParquetFileFilterHandler.throwTypeIncompatibleError(GpuParquetScan.scala:1025)
	at com.nvidia.spark.rapids.GpuParquetFileFilterHandler.$anonfun$filterBlocks$12(GpuParquetScan.scala:757)
	at com.nvidia.spark.rapids.GpuParquetFileFilterHandler.$anonfun$filterBlocks$12$adapted(GpuParquetScan.scala:757)
	at com.nvidia.spark.rapids.GpuParquetFileFilterHandler.$anonfun$checkSchemaCompat$6(GpuParquetScan.scala:878)
	at com.nvidia.spark.rapids.GpuParquetFileFilterHandler.checkPrimitiveCompat(GpuParquetScan.scala:1009)
	at com.nvidia.spark.rapids.GpuParquetFileFilterHandler.checkSchemaCompat(GpuParquetScan.scala:878)
	at com.nvidia.spark.rapids.GpuParquetFileFilterHandler.$anonfun$checkSchemaCompat$3(GpuParquetScan.scala:830)
	at com.nvidia.spark.rapids.GpuParquetFileFilterHandler.$anonfun$checkSchemaCompat$3$adapted(GpuParquetScan.scala:821)
	at scala.Option.foreach(Option.scala:407)
	at com.nvidia.spark.rapids.GpuParquetFileFilterHandler.$anonfun$checkSchemaCompat$2(GpuParquetScan.scala:821)
	at com.nvidia.spark.rapids.GpuParquetFileFilterHandler.$anonfun$checkSchemaCompat$2$adapted(GpuParquetScan.scala:820)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)

Narrowing case:

Start plugin:

$SPARK_HOME/bin/spark-shell --master local[*] --jars ${SPARK_RAPIDS_PLUGIN_JAR} --conf spark.plugins=com.nvidia.spark.SQLPlugin --conf spark.rapids.sql.enabled=true

CPU:

scala> spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")

scala> val df = sql("SELECT 1.0 a, CAST(1.23 AS DECIMAL(17, 2)) b, CAST(1.23 AS DECIMAL(36, 2)) c")
df: org.apache.spark.sql.DataFrame = [a: decimal(2,1), b: decimal(17,2) ... 1 more field]

scala> df.show()
+---+----+----+
|  a|   b|   c|
+---+----+----+
|1.0|1.23|1.23|
+---+----+----+


scala> df.write.parquet("/tmp/narrowSchema")

scala> val schema1 = "a DECIMAL(3, 0), b DECIMAL(18, 1), c DECIMAL(37, 1)"
schema1: String = a DECIMAL(3, 0), b DECIMAL(18, 1), c DECIMAL(37, 1)

scala> val df2 = spark.read.schema(schema1).parquet("/tmp/narrowSchema")
df2: org.apache.spark.sql.DataFrame = [a: decimal(3,0), b: decimal(18,1) ... 1 more field]

scala> df2.show
+---+---+---+
|  a|  b|  c|
+---+---+---+
|  1|1.2|1.2|
+---+---+---+



GPU:

scala> spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")

scala> val df = sql("SELECT 1.0 a, CAST(1.23 AS DECIMAL(17, 2)) b, CAST(1.23 AS DECIMAL(36, 2)) c")
df: org.apache.spark.sql.DataFrame = [a: decimal(2,1), b: decimal(17,2) ... 1 more field]

scala> df.show
25/01/21 06:21:48 WARN GpuOverrides: 
*Exec <ProjectExec> will run on GPU
  *Expression <Alias> 1.0 AS a#120 will run on GPU
  *Expression <Alias> 1.23 AS b#121 will run on GPU
  *Expression <Alias> 1.23 AS c#122 will run on GPU
  ! <RDDScanExec> cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.execution.RDDScanExec

+---+----+----+
|  a|   b|   c|
+---+----+----+
|1.0|1.23|1.23|
+---+----+----+



scala> df.write.mode("overwrite").parquet("/tmp/parquetWrite")
25/01/21 06:43:45 WARN GpuOverrides: 
*Exec <DataWritingCommandExec> will run on GPU
  *Output <InsertIntoHadoopFsRelationCommand> will run on GPU
  *Exec <ProjectExec> will run on GPU
    *Expression <Alias> 1.0 AS a#0 will run on GPU
    *Expression <Alias> 1.23 AS b#1 will run on GPU
    *Expression <Alias> 1.23 AS c#2 will run on GPU
    ! <RDDScanExec> cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.execution.RDDScanExec



scala> val schema1 = "a DECIMAL(3, 0), b DECIMAL(18, 1), c DECIMAL(37, 1)"
schema1: String = a DECIMAL(3, 0), b DECIMAL(18, 1), c DECIMAL(37, 1)

scala> val df2 = spark.read.schema(schema1).parquet("/tmp/parquetWrite")
df2: org.apache.spark.sql.DataFrame = [a: decimal(3,0), b: decimal(18,1) ... 1 more field]

scala> df2.show
25/01/21 06:45:12 WARN GpuOverrides: 
!Exec <CollectLimitExec> cannot run on GPU because the Exec CollectLimitExec has been disabled, and is disabled by default because Collect Limit replacement can be slower on the GPU, if huge number of rows in a batch it could help by limiting the number of rows transferred from GPU to CPU. Set spark.rapids.sql.exec.CollectLimitExec to true if you wish to enable it
  @Partitioning <SinglePartition$> could run on GPU
  *Exec <ProjectExec> will run on GPU
    *Expression <Alias> cast(a#153 as string) AS a#162 will run on GPU
      *Expression <Cast> cast(a#153 as string) will run on GPU
    *Expression <Alias> cast(b#154 as string) AS b#163 will run on GPU
      *Expression <Cast> cast(b#154 as string) will run on GPU
    *Expression <Alias> cast(c#155 as string) AS c#164 will run on GPU
      *Expression <Cast> cast(c#155 as string) will run on GPU
    *Exec <FileSourceScanExec> will run on GPU

25/01/21 06:45:12 ERROR Executor: Exception in task 0.0 in stage 13.0 (TID 13)
org.apache.spark.sql.execution.QueryExecutionException: Parquet column cannot be converted in file file:///tmp/parquetWrite/part-00000-560c392e-1c9f-4b3f-95ec-f3c37fd7ed61-c000.snappy.parquet. Column: a, Expected: decimal(3,0), Found: required int32 a (DECIMAL(2,1))
	at com.nvidia.spark.rapids.GpuParquetFileFilterHandler.throwTypeIncompatibleError(GpuParquetScan.scala:1023)
	at com.nvidia.spark.rapids.GpuParquetFileFilterHandler.$anonfun$filterBlocks$12(GpuParquetScan.scala:760)
	at com.nvidia.spark.rapids.GpuParquetFileFilterHandler.$anonfun$filterBlocks$12$adapted(GpuParquetScan.scala:760)
	at com.nvidia.spark.rapids.GpuParquetFileFilterHandler.$anonfun$checkSchemaCompat$6(GpuParquetScan.scala:881)
	at com.nvidia.spark.rapids.GpuParquetFileFilterHandler.checkPrimitiveCompat(GpuParquetScan.scala:1007)
	at com.nvidia.spark.rapids.GpuParquetFileFilterHandler.checkSchemaCompat(GpuParquetScan.scala:881)
	at com.nvidia.spark.rapids.GpuParquetFileFilterHandler.$anonfun$checkSchemaCompat$3(GpuParquetScan.scala:833)
	at com.nvidia.spark.rapids.GpuParquetFileFilterHandler.$anonfun$checkSchemaCompat$3$adapted(GpuParquetScan.scala:824)
	at scala.Option.foreach(Option.scala:407)
	at com.nvidia.spark.rapids.GpuParquetFileFilterHandler.$anonfun$checkSchemaCompat$2(GpuParquetScan.scala:824)
	at com.nvidia.spark.rapids.GpuParquetFileFilterHandler.$anonfun$checkSchemaCompat$2$adapted(GpuParquetScan.scala:823)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
	at com.nvidia.spark.rapids.GpuParquetFileFilterHandler.checkSchemaCompat(GpuParquetScan.scala:823)
	at com.nvidia.spark.rapids.GpuParquetFileFilterHandler.$anonfun$filterBlocks$11(GpuParquetScan.scala:759)
	at com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:30)
	at com.nvidia.spark.rapids.GpuParquetFileFilterHandler.$anonfun$filterBlocks$1(GpuParquetScan.scala:755)
	at com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:30)
	at com.nvidia.spark.rapids.GpuParquetFileFilterHandler.filterBlocks(GpuParquetScan.scala:679)
	at com.nvidia.spark.rapids.GpuParquetMultiFilePartitionReaderFactory.com$nvidia$spark$rapids$GpuParquetMultiFilePartitionReaderFactory$$filterBlocksForCoalescingReader(GpuParquetScan.scala:1159)

@Feng-Jiang28 Feng-Jiang28 added bug Something isn't working ? - Needs Triage Need team to review and classify labels Sep 6, 2024
@mattahrens mattahrens added good first issue Good for newcomers and removed ? - Needs Triage Need team to review and classify labels Sep 10, 2024
@Feng-Jiang28 Feng-Jiang28 changed the title SPARK-34212 Parquet should read decimals correctly [BUG] Spark UT framework: SPARK-34212 Parquet should read decimals correctly Oct 10, 2024
@mattahrens mattahrens added the ? - Needs Triage Need team to review and classify label Oct 29, 2024
@mattahrens mattahrens removed the ? - Needs Triage Need team to review and classify label Oct 31, 2024

nartal1 commented Nov 22, 2024

Will test whether all of the SPARK-34212 tests pass before closing this issue. The UT needs to be enabled before closing:

.exclude("SPARK-34212 Parquet should read decimals correctly", KNOWN_ISSUE("https://github.com/NVIDIA/spark-rapids/issues/11433"))


nartal1 commented Jan 21, 2025

SPARK-34212 also has tests that exercise narrowing of the precision, which is not implemented in spark-rapids.
In those tests, the CPU implementation takes the non-vectorized path (Spark-MR).
I have updated the issue description and added these as sub-tasks. Keeping this issue open for now.
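For illustration only (this is a sketch, not the plugin's code): the narrowing the tests expect can be expressed on either engine by reading the file with the schema it was actually written with and casting afterwards. The path and schemas below are reused from the repro above; whether CAST's rounding matches the behaviour of a narrowed read is exactly the question raised in the next comment.

// Sketch only: read with the stored decimal schema, then narrow via CAST.
val stored = "a DECIMAL(2, 1), b DECIMAL(17, 2), c DECIMAL(36, 2)"
val narrowed = spark.read.schema(stored).parquet("/tmp/parquetWrite")
  .selectExpr(
    "CAST(a AS DECIMAL(3, 0)) AS a",
    "CAST(b AS DECIMAL(18, 1)) AS b",
    "CAST(c AS DECIMAL(37, 1)) AS c")
narrowed.show()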


revans2 commented Jan 21, 2025

@nartal1 so what happens if the precision or scale is narrowed? Is it rounded like we do for cast? If so, then we can probably reuse that code without much work. If it is something else so that the results can be Hive compatible, then it should not be too bad, and we can probably figure it all out.
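One way to answer that empirically (a sketch, not from the issue itself): write a value whose narrowed form differs between rounding and truncation, for example 1.25, then compare the non-vectorized CPU read against an explicit CAST. The output path below is hypothetical.

spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")
// 1.25 at scale 1 distinguishes rounding (1.3 under HALF_UP) from truncation (1.2).
spark.sql("SELECT CAST(1.25 AS DECIMAL(17, 2)) AS b")
  .write.mode("overwrite").parquet("/tmp/decimalRounding")   // hypothetical path
spark.read.schema("b DECIMAL(18, 1)").parquet("/tmp/decimalRounding").show()
spark.sql("SELECT CAST(CAST(1.25 AS DECIMAL(17, 2)) AS DECIMAL(18, 1)) AS b").show()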
