diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala
index 4a274d51b62c3..fca7d16012cee 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala
@@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.datasources.v2.state
 
 import java.io.{File, FileWriter}
+import java.nio.ByteOrder
 
 import org.apache.hadoop.conf.Configuration
 import org.scalatest.Assertions
 
@@ -794,6 +795,11 @@ abstract class StateDataSourceReadSuite extends StateDataSourceTestBase with Ass
   }
 
   test("flatMapGroupsWithState, state ver 1") {
+    // Skip this test on big endian platforms because the timestampTimeoutAttribute of
+    // StateManagerImplV1 is declared as IntegerType instead of LongType which breaks
+    // serialization on big endian. This can't be fixed because it would be a breaking
+    // schema change.
+    assume(ByteOrder.nativeOrder().equals(ByteOrder.LITTLE_ENDIAN))
     testFlatMapGroupsWithState(1)
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
index f1feb62b7622a..d785cc1a7f446 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql.streaming
 
 import java.io.File
+import java.nio.ByteOrder
 import java.sql.Timestamp
 
 import org.apache.commons.io.FileUtils
 
@@ -458,7 +459,17 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest {
     checkAnswer(df, Seq(("a", 2), ("b", 1)).toDF())
   }
 
+  // Skip the v1 tests with timeout on big endian platforms because the
+  // timestampTimeoutAttribute of StateManagerImplV1 is declared as IntegerType instead
+  // of LongType which breaks serialization on big endian. This can't be fixed because it
+  // would be a breaking schema change.
+  def isStateFormatSupported(stateFormatVersion: Int): Boolean = {
+    stateFormatVersion != 1 || ByteOrder.nativeOrder().equals(ByteOrder.LITTLE_ENDIAN)
+  }
+
   testWithAllStateVersions("flatMapGroupsWithState - streaming with processing time timeout") {
+    assume(
+      isStateFormatSupported(sqlConf.getConf(SQLConf.FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION)))
     // Function to maintain the count as state and set the proc. time timeout delay of 10 seconds.
     // It returns the count if changed, or -1 if the state was removed by timeout.
     val stateFunc = (key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
@@ -526,6 +537,8 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest {
   }
 
   testWithAllStateVersions("flatMapGroupsWithState - streaming w/ event time timeout + watermark") {
+    assume(
+      isStateFormatSupported(sqlConf.getConf(SQLConf.FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION)))
     val inputData = MemoryStream[(String, Int)]
     val result =
       inputData.toDS()
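
For context, the endianness problem these guards work around can be reproduced outside Spark. The sketch below is illustration only, not Spark's UnsafeRow or state-store code; the object and method names (EndiannessSketch, intReadAtBase) are made up. It shows why reading a 64-bit timeout timestamp through a field declared as a 32-bit integer recovers different values depending on native byte order.

// Illustration only: writes a long timestamp into an 8-byte slot, then reads a 4-byte
// int at offset 0, the way a fixed-width reader for an IntegerType column would.
import java.nio.{ByteBuffer, ByteOrder}

object EndiannessSketch {
  // Write a long into an 8-byte slot, then read a 4-byte int at offset 0.
  def intReadAtBase(value: Long, order: ByteOrder): Int = {
    val slot = ByteBuffer.allocate(8).order(order)
    slot.putLong(0, value)
    slot.getInt(0)
  }

  def main(args: Array[String]): Unit = {
    val timeoutTimestampMs = 1000L * 60 * 60 // an arbitrary example timeout value
    // On little endian the int at offset 0 is the low word of the long, so small values
    // survive the round trip; on big endian it is the high word, so the same state
    // bytes decode differently across platforms.
    println(s"little endian reads: ${intReadAtBase(timeoutTimestampMs, ByteOrder.LITTLE_ENDIAN)}")
    println(s"big endian reads:    ${intReadAtBase(timeoutTimestampMs, ByteOrder.BIG_ENDIAN)}")
  }
}

Running the sketch prints the full value on little endian and 0 on big endian, which is why the v1 state format tests are skipped rather than fixed: correcting the declared type would change the persisted schema.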