From d6ca11e9353565c8c846b7573e5727a995732927 Mon Sep 17 00:00:00 2001 From: Jonathan Albrecht Date: Fri, 21 Feb 2025 07:12:11 +0900 Subject: [PATCH] [SPARK-51092][SS] Skip the v1 FlatMapGroupsWithState tests with timeout on big endian platforms ### What changes were proposed in this pull request? Skip the v1 FlatMapGroupsWithState tests with timeout on big endian platforms. ### Why are the changes needed? The timestampTimeoutAttribute of StateManagerImplV1 is declared as IntegerType instead of LongType which breaks serialization on big endian platforms. This can't be fixed because it would be a breaking schema change so skip the tests instead. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Tested with existing tests on amd64 (little endian) and s390x (big endian) Below is the test result from s390x: ``` - flatMapGroupsWithState - streaming with processing time timeout - state format version 1 !!! CANCELED !!! FlatMapGroupsWithStateSuite.this.isStateFormatSupported(FlatMapGroupsWithStateSuite.this.sqlConf.getConf[Int](org.apache.spark.sql.internal.SQLConf.FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION)) was false (FlatMapGroupsWithStateSuite.scala:471) -- - flatMapGroupsWithState - streaming with processing time timeout - state format version 2 -- - flatMapGroupsWithState - streaming with processing time timeout - state format version 1 (RocksDBStateStore) !!! CANCELED !!! FlatMapGroupsWithStateSuite.this.isStateFormatSupported(FlatMapGroupsWithStateSuite.this.sqlConf.getConf[Int](org.apache.spark.sql.internal.SQLConf.FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION)) was false (FlatMapGroupsWithStateSuite.scala:471) -- - flatMapGroupsWithState - streaming with processing time timeout - state format version 1 (RocksDBStateStore with changelog checkpointing) !!! CANCELED !!! FlatMapGroupsWithStateSuite.this.isStateFormatSupported(FlatMapGroupsWithStateSuite.this.sqlConf.getConf[Int](org.apache.spark.sql.internal.SQLConf.FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION)) was false (FlatMapGroupsWithStateSuite.scala:471) -- - flatMapGroupsWithState - streaming with processing time timeout - state format version 2 (RocksDBStateStore) -- - flatMapGroupsWithState - streaming with processing time timeout - state format version 2 (RocksDBStateStore with changelog checkpointing) -- - flatMapGroupsWithState - streaming w/ event time timeout + watermark - state format version 1 !!! CANCELED !!! FlatMapGroupsWithStateSuite.this.isStateFormatSupported(FlatMapGroupsWithStateSuite.this.sqlConf.getConf[Int](org.apache.spark.sql.internal.SQLConf.FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION)) was false (FlatMapGroupsWithStateSuite.scala:539) -- - flatMapGroupsWithState - streaming w/ event time timeout + watermark - state format version 2 -- - flatMapGroupsWithState - streaming w/ event time timeout + watermark - state format version 1 (RocksDBStateStore) !!! CANCELED !!! FlatMapGroupsWithStateSuite.this.isStateFormatSupported(FlatMapGroupsWithStateSuite.this.sqlConf.getConf[Int](org.apache.spark.sql.internal.SQLConf.FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION)) was false (FlatMapGroupsWithStateSuite.scala:539) -- - flatMapGroupsWithState - streaming w/ event time timeout + watermark - state format version 1 (RocksDBStateStore with changelog checkpointing) !!! CANCELED !!! FlatMapGroupsWithStateSuite.this.isStateFormatSupported(FlatMapGroupsWithStateSuite.this.sqlConf.getConf[Int](org.apache.spark.sql.internal.SQLConf.FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION)) was false (FlatMapGroupsWithStateSuite.scala:539) -- - flatMapGroupsWithState - streaming w/ event time timeout + watermark - state format version 2 (RocksDBStateStore) -- - flatMapGroupsWithState - streaming w/ event time timeout + watermark - state format version 2 (RocksDBStateStore with changelog checkpointing) -- - flatMapGroupsWithState, state ver 1 !!! CANCELED !!! java.nio.ByteOrder.nativeOrder().equals(java.nio.ByteOrder.LITTLE_ENDIAN) was false (StateDataSourceReadSuite.scala:802) -- - flatMapGroupsWithState, state ver 2 -- - flatMapGroupsWithState, state ver 1 !!! CANCELED !!! java.nio.ByteOrder.nativeOrder().equals(java.nio.ByteOrder.LITTLE_ENDIAN) was false (StateDataSourceReadSuite.scala:802) -- - flatMapGroupsWithState, state ver 2 -- - flatMapGroupsWithState, state ver 1 !!! CANCELED !!! java.nio.ByteOrder.nativeOrder().equals(java.nio.ByteOrder.LITTLE_ENDIAN) was false (StateDataSourceReadSuite.scala:802) -- - flatMapGroupsWithState, state ver 2 ``` ### Was this patch authored or co-authored using generative AI tooling? No Closes #49811 from jonathan-albrecht-ibm/master-endian-flatMapGroups. Authored-by: Jonathan Albrecht Signed-off-by: Jungtaek Lim --- .../v2/state/StateDataSourceReadSuite.scala | 6 ++++++ .../sql/streaming/FlatMapGroupsWithStateSuite.scala | 13 +++++++++++++ 2 files changed, 19 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala index 4a274d51b62c3..fca7d16012cee 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.datasources.v2.state import java.io.{File, FileWriter} +import java.nio.ByteOrder import org.apache.hadoop.conf.Configuration import org.scalatest.Assertions @@ -794,6 +795,11 @@ abstract class StateDataSourceReadSuite extends StateDataSourceTestBase with Ass } test("flatMapGroupsWithState, state ver 1") { + // Skip this test on big endian platforms because the timestampTimeoutAttribute of + // StateManagerImplV1 is declared as IntegerType instead of LongType which breaks + // serialization on big endian. This can't be fixed because it would be a breaking + // schema change. + assume(ByteOrder.nativeOrder().equals(ByteOrder.LITTLE_ENDIAN)) testFlatMapGroupsWithState(1) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala index f1feb62b7622a..d785cc1a7f446 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.streaming import java.io.File +import java.nio.ByteOrder import java.sql.Timestamp import org.apache.commons.io.FileUtils @@ -458,7 +459,17 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest { checkAnswer(df, Seq(("a", 2), ("b", 1)).toDF()) } + // Skip the v1 tests with timeout on big endian platforms because the + // timestampTimeoutAttribute of StateManagerImplV1 is declared as IntegerType instead + // of LongType which breaks serialization on big endian. This can't be fixed because it + // would be a breaking schema change. + def isStateFormatSupported(stateFormatVersion: Int): Boolean = { + stateFormatVersion != 1 || ByteOrder.nativeOrder().equals(ByteOrder.LITTLE_ENDIAN) + } + testWithAllStateVersions("flatMapGroupsWithState - streaming with processing time timeout") { + assume( + isStateFormatSupported(sqlConf.getConf(SQLConf.FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION))) // Function to maintain the count as state and set the proc. time timeout delay of 10 seconds. // It returns the count if changed, or -1 if the state was removed by timeout. val stateFunc = (key: String, values: Iterator[String], state: GroupState[RunningCount]) => { @@ -526,6 +537,8 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest { } testWithAllStateVersions("flatMapGroupsWithState - streaming w/ event time timeout + watermark") { + assume( + isStateFormatSupported(sqlConf.getConf(SQLConf.FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION))) val inputData = MemoryStream[(String, Int)] val result = inputData.toDS()