diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index 08e88f8c74c58..bb1d6c23bb10c 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -3471,6 +3471,12 @@ ], "sqlState" : "42803" }, + "MISSING_TIMEOUT_CONFIGURATION" : { + "message" : [ + "The operation has timed out, but no timeout duration is configured. To set a processing time-based timeout, use 'GroupState.setTimeoutDuration()' in your 'mapGroupsWithState' or 'flatMapGroupsWithState' operation. For event-time-based timeout, use 'GroupState.setTimeoutTimestamp()' and define a watermark using 'Dataset.withWatermark()'." + ], + "sqlState" : "HY000" + }, "MISSING_WINDOW_SPECIFICATION" : { "message" : [ "Window specification is not defined in the WINDOW clause for . For more information about WINDOW clauses, please refer to '/sql-ref-syntax-qry-select-window.html'." @@ -8377,11 +8383,6 @@ "continuous mode is not supported!" ] }, - "_LEGACY_ERROR_TEMP_3168" : { - "message" : [ - "hasTimedOut is true however there's no timeout configured" - ] - }, "_LEGACY_ERROR_TEMP_3169" : { "message" : [ "AcceptsLatestSeenOffset is not supported with DSv1 streaming source: " diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala index cb283699b4e32..d4e93642b8164 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala @@ -218,7 +218,7 @@ private[sql] object GroupStateImpl { throw new IllegalArgumentException("eventTimeWatermarkMs must be 0 or positive if present") } if (hasTimedOut && timeoutConf == NoTimeout) { - throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3168") + throw new SparkUnsupportedOperationException("MISSING_TIMEOUT_CONFIGURATION") } new GroupStateImpl[S]( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/GroupStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/GroupStateSuite.scala index 80c87d3297b01..69362dd60d889 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/GroupStateSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/GroupStateSuite.scala @@ -302,13 +302,13 @@ class GroupStateSuite extends SparkFunSuite { TestGroupState.create[Int]( Optional.of(5), NoTimeout, 100L, Optional.empty[Long], hasTimedOut = true) }, - condition = "_LEGACY_ERROR_TEMP_3168", + condition = "MISSING_TIMEOUT_CONFIGURATION", parameters = Map.empty) checkError( exception = intercept[SparkUnsupportedOperationException] { GroupStateImpl.createForStreaming[Int](Some(5), 100L, NO_TIMESTAMP, NoTimeout, true, false) }, - condition = "_LEGACY_ERROR_TEMP_3168", + condition = "MISSING_TIMEOUT_CONFIGURATION", parameters = Map.empty) }