From e219d2b23d84e1e2d5fe09c3733bd13ddb1391e1 Mon Sep 17 00:00:00 2001
From: Zeyu Chen
Date: Fri, 21 Feb 2025 13:39:40 -0800
Subject: [PATCH] SPARK-51252 Clean up tests and fix code formatting

---
 .../streaming/ClientStreamingQuerySuite.scala |   3 +-
 .../state/StateStoreInstanceMetricSuite.scala | 706 +++++++-----------
 2 files changed, 257 insertions(+), 452 deletions(-)

diff --git a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/streaming/ClientStreamingQuerySuite.scala b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/streaming/ClientStreamingQuerySuite.scala
index e7e8016d61671..9e27904b6da87 100644
--- a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/streaming/ClientStreamingQuerySuite.scala
+++ b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/streaming/ClientStreamingQuerySuite.scala
@@ -57,8 +57,7 @@ class ClientStreamingQuerySuite extends QueryTest with RemoteSparkSession with L
     // This verifies standard streaming API by starting a streaming query with windowed count.
     withSQLConf(
       "spark.sql.shuffle.partitions" -> "1", // Avoid too many reducers.
-      "spark.sql.streaming.stateStore.numStateStoreInstanceMetricsToReport" -> "0"
-    ) {
+      "spark.sql.streaming.stateStore.numStateStoreInstanceMetricsToReport" -> "0") {
       val readDF = spark.readStream
         .format("rate")
         .option("rowsPerSecond", "10")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreInstanceMetricSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreInstanceMetricSuite.scala
index 63a717127ad4b..b5f2682db1d47 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreInstanceMetricSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreInstanceMetricSuite.scala
@@ -60,471 +60,277 @@ class StateStoreInstanceMetricSuite extends StreamTest with AlsoTestWithRocksDBF
     s"$SNAPSHOT_LAG_METRIC_PREFIX${partitionId}_$storeName"
   }
 
-  testWithChangelogCheckpointingEnabled(
-    "SPARK-51097: Verify snapshot lag metrics are updated correctly with RocksDBStateStoreProvider"
-  ) {
-    withSQLConf(
-      SQLConf.STATE_STORE_PROVIDER_CLASS.key -> classOf[RocksDBStateStoreProvider].getName,
-      SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
-      SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key -> "10",
-      SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
-      SQLConf.STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT.key -> "3"
-    ) {
-      withTempDir { checkpointDir =>
-        val inputData = MemoryStream[String]
-        val result = inputData.toDS().dropDuplicates()
+  Seq(
+    ("SPARK-51097", "RocksDBStateStoreProvider", classOf[RocksDBStateStoreProvider].getName),
+    ("SPARK-51252", "HDFSBackedStateStoreProvider", classOf[HDFSBackedStateStoreProvider].getName)
+  ).foreach {
+    case (ticketPrefix, providerName, providerClassName) =>
+      testWithChangelogCheckpointingEnabled(
+        s"$ticketPrefix: Verify snapshot lag metrics are updated correctly with $providerName"
+      ) {
+        withSQLConf(
+          SQLConf.STATE_STORE_PROVIDER_CLASS.key -> providerClassName,
+          SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
+          SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key -> "10",
+          SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
+          SQLConf.STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT.key -> "3"
+        ) {
+          withTempDir { checkpointDir =>
+            val inputData = MemoryStream[String]
+            val result = inputData.toDS().dropDuplicates()
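+            // With STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT = 1 and a 100ms maintenance
+            // interval, every commit should be followed shortly by a snapshot upload.
+            // Each instance metric key comes from snapshotLagMetricName, e.g.
+            // s"${SNAPSHOT_LAG_METRIC_PREFIX}0_default" for partition 0 of the
+            // deduplication store, and its value is the last uploaded snapshot version.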
-
-        testStream(result, outputMode = OutputMode.Update)(
-          StartStream(checkpointLocation = checkpointDir.getCanonicalPath),
-          AddData(inputData, "a"),
-          ProcessAllAvailable(),
-          AddData(inputData, "b"),
-          ProcessAllAvailable(),
-          CheckNewAnswer("a", "b"),
-          Execute { q =>
-            // Make sure only smallest K active metrics are published
-            eventually(timeout(10.seconds)) {
-              val instanceMetrics = q.lastProgress
-                .stateOperators(0)
-                .customMetrics
-                .asScala
-                .view
-                .filterKeys(_.startsWith(SNAPSHOT_LAG_METRIC_PREFIX))
-              // Determined by STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT
-              assert(
-                instanceMetrics.size == q.sparkSession.conf
-                  .get(SQLConf.STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT)
-              )
-              assert(instanceMetrics.forall(_._2 == 1))
-            }
-          },
-          StopStream
-        )
-      }
-    }
-  }
+
+            testStream(result, outputMode = OutputMode.Update)(
+              StartStream(checkpointLocation = checkpointDir.getCanonicalPath),
+              AddData(inputData, "a"),
+              ProcessAllAvailable(),
+              AddData(inputData, "b"),
+              ProcessAllAvailable(),
+              AddData(inputData, "b"),
+              ProcessAllAvailable(),
+              CheckNewAnswer("a", "b"),
+              Execute { q =>
+                // Make sure only smallest K active metrics are published
+                eventually(timeout(10.seconds)) {
+                  val instanceMetrics = q.lastProgress
+                    .stateOperators(0)
+                    .customMetrics
+                    .asScala
+                    .view
+                    .filterKeys(_.startsWith(SNAPSHOT_LAG_METRIC_PREFIX))
+                  // Determined by STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT
+                  assert(
+                    instanceMetrics.size == q.sparkSession.conf
+                      .get(SQLConf.STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT)
+                  )
+                  // All state store instances should have uploaded a version
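+                  // (HDFSBackedStateStoreProvider uses a strict inequality when deciding
+                  // whether a snapshot is needed, so its first upload happens at version 2
+                  // instead of 1; running a third batch lets both providers settle on
+                  // version 2 here.)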
+                  assert(instanceMetrics.forall(_._2 == 2))
+                }
+              },
+              StopStream
+            )
+          }
+        }
+      }
+  }

-  testWithChangelogCheckpointingEnabled(
-    "SPARK-51097: Verify snapshot lag metrics are updated correctly with " +
-      "RocksDBSkipMaintenanceOnCertainPartitionsProvider"
-  ) {
-    withSQLConf(
-      SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
-        classOf[RocksDBSkipMaintenanceOnCertainPartitionsProvider].getName,
-      SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
-      SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key -> "10",
-      SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
-      SQLConf.STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT.key -> "3"
-    ) {
-      withTempDir { checkpointDir =>
-        val inputData = MemoryStream[String]
-        val result = inputData.toDS().dropDuplicates()
-
-        testStream(result, outputMode = OutputMode.Update)(
-          StartStream(checkpointLocation = checkpointDir.getCanonicalPath),
-          AddData(inputData, "a"),
-          ProcessAllAvailable(),
-          AddData(inputData, "b"),
-          ProcessAllAvailable(),
-          CheckNewAnswer("a", "b"),
-          Execute { q =>
-            // Partitions getting skipped (id 0 and 1) do not have an uploaded version, leaving
-            // those instance metrics as -1.
-            eventually(timeout(10.seconds)) {
-              assert(
-                q.lastProgress
-                  .stateOperators(0)
-                  .customMetrics
-                  .get(snapshotLagMetricName(0)) === -1
-              )
-              assert(
-                q.lastProgress
-                  .stateOperators(0)
-                  .customMetrics
-                  .get(snapshotLagMetricName(1)) === -1
-              )
-              // Make sure only smallest K active metrics are published
-              val instanceMetrics = q.lastProgress
-                .stateOperators(0)
-                .customMetrics
-                .asScala
-                .view
-                .filterKeys(_.startsWith(SNAPSHOT_LAG_METRIC_PREFIX))
-              // Determined by STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT
-              assert(
-                instanceMetrics.size == q.sparkSession.conf
-                  .get(SQLConf.STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT)
-              )
-              // Two metrics published are -1, the remainder should all be 1 as they
-              // uploaded properly.
-              assert(
-                instanceMetrics.count(_._2 == 1) == q.sparkSession.conf
-                  .get(SQLConf.STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT) - 2
-              )
-            }
-          },
-          StopStream
-        )
-      }
-    }
-  }
-
-  testWithChangelogCheckpointingEnabled(
-    "SPARK-51097: Verify snapshot lag metrics are updated correctly for join queries with " +
-      "RocksDBStateStoreProvider"
-  ) {
-    withSQLConf(
-      SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
-        classOf[RocksDBStateStoreProvider].getName,
-      SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
-      SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key -> "10",
-      SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
-      SQLConf.STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT.key -> "10"
-    ) {
-      withTempDir { checkpointDir =>
-        val input1 = MemoryStream[Int]
-        val input2 = MemoryStream[Int]
-
-        val df1 = input1.toDF().select($"value" as "leftKey", ($"value" * 2) as "leftValue")
-        val df2 = input2
-          .toDF()
-          .select($"value" as "rightKey", ($"value" * 3) as "rightValue")
-        val joined = df1.join(df2, expr("leftKey = rightKey"))
-
-        testStream(joined)(
-          StartStream(checkpointLocation = checkpointDir.getCanonicalPath),
-          AddData(input1, 1, 5),
-          ProcessAllAvailable(),
-          AddData(input2, 1, 5, 10),
-          ProcessAllAvailable(),
-          CheckNewAnswer((1, 2, 1, 3), (5, 10, 5, 15)),
-          Execute { q =>
-            eventually(timeout(10.seconds)) {
-              // Make sure only smallest K active metrics are published.
-              // There are 5 * 4 = 20 metrics in total because of join, but only 10 are published.
-              val instanceMetrics = q.lastProgress
-                .stateOperators(0)
-                .customMetrics
-                .asScala
-                .view
-                .filterKeys(_.startsWith(SNAPSHOT_LAG_METRIC_PREFIX))
-              // Determined by STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT
-              assert(
-                instanceMetrics.size == q.sparkSession.conf
-                  .get(SQLConf.STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT)
-              )
-              // All state store instances should have uploaded a version
-              assert(instanceMetrics.forall(_._2 == 1))
-            }
-          },
-          StopStream
-        )
-      }
-    }
-  }
-
-  testWithChangelogCheckpointingEnabled(
-    "SPARK-51097: Verify snapshot lag metrics are updated correctly for join queries with " +
-      "RocksDBSkipMaintenanceOnCertainPartitionsProvider"
-  ) {
-    withSQLConf(
-      SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
-        classOf[RocksDBSkipMaintenanceOnCertainPartitionsProvider].getName,
-      SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
-      SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key -> "10",
-      SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
-      SQLConf.STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT.key -> "10"
-    ) {
-      withTempDir { checkpointDir =>
-        val input1 = MemoryStream[Int]
-        val input2 = MemoryStream[Int]
-
-        val df1 = input1.toDF().select($"value" as "leftKey", ($"value" * 2) as "leftValue")
-        val df2 = input2
-          .toDF()
-          .select($"value" as "rightKey", ($"value" * 3) as "rightValue")
-        val joined = df1.join(df2, expr("leftKey = rightKey"))
-
-        testStream(joined)(
-          StartStream(checkpointLocation = checkpointDir.getCanonicalPath),
-          AddData(input1, 1, 5),
-          ProcessAllAvailable(),
-          AddData(input2, 1, 5, 10),
-          ProcessAllAvailable(),
-          CheckNewAnswer((1, 2, 1, 3), (5, 10, 5, 15)),
-          Execute { q =>
-            eventually(timeout(10.seconds)) {
-              // Make sure only smallest K active metrics are published.
-              // There are 5 * 4 = 20 metrics in total because of join, but only 10 are published.
-              val allInstanceMetrics = q.lastProgress
-                .stateOperators(0)
-                .customMetrics
-                .asScala
-                .view
-                .filterKeys(_.startsWith(SNAPSHOT_LAG_METRIC_PREFIX))
-              val badInstanceMetrics = allInstanceMetrics.filterKeys(
-                k =>
-                  k.startsWith(snapshotLagMetricName(0, "")) ||
-                    k.startsWith(snapshotLagMetricName(1, ""))
-              )
-              // Determined by STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT
-              assert(
-                allInstanceMetrics.size == q.sparkSession.conf
-                  .get(SQLConf.STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT)
-              )
-              // Two ids are blocked, each with four state stores
-              assert(badInstanceMetrics.count(_._2 == -1) == 2 * 4)
-              // The rest should have uploaded a version
-              assert(
-                allInstanceMetrics.count(_._2 == 1) == q.sparkSession.conf
-                  .get(SQLConf.STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT) - 2 * 4
-              )
-            }
-          },
-          StopStream
-        )
-      }
-    }
-  }
-
-  testWithChangelogCheckpointingEnabled(
-    "SPARK-51252: Verify snapshot lag metrics are updated correctly with " +
-      "HDFSBackedStateStoreProvider"
-  ) {
-    withSQLConf(
-      SQLConf.STATE_STORE_PROVIDER_CLASS.key -> classOf[HDFSBackedStateStoreProvider].getName,
-      SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
-      SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key -> "10",
-      SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
-      SQLConf.STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT.key -> "5"
-    ) {
-      withTempDir { checkpointDir =>
-        val inputData = MemoryStream[String]
-        val result = inputData.toDS().dropDuplicates()
+  Seq(
+    (
+      "SPARK-51097",
+      "RocksDBSkipMaintenanceOnCertainPartitionsProvider",
+      classOf[RocksDBSkipMaintenanceOnCertainPartitionsProvider].getName
+    ),
+    (
+      "SPARK-51252",
+      "HDFSBackedSkipMaintenanceOnCertainPartitionsProvider",
+      classOf[HDFSBackedSkipMaintenanceOnCertainPartitionsProvider].getName
+    )
+  ).foreach {
+    case (ticketPrefix, providerName, providerClassName) =>
+      testWithChangelogCheckpointingEnabled(
+        s"$ticketPrefix: Verify snapshot lag metrics are updated correctly with $providerName"
+      ) {
+        withSQLConf(
+          SQLConf.STATE_STORE_PROVIDER_CLASS.key -> providerClassName,
+          SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
+          SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key -> "10",
+          SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
+          SQLConf.STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT.key -> "3"
+        ) {
+          withTempDir { checkpointDir =>
+            val inputData = MemoryStream[String]
+            val result = inputData.toDS().dropDuplicates()
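+            // The SkipMaintenanceOnCertainPartitions providers suppress maintenance for
+            // partitions 0 and 1, so those instances never upload a snapshot and keep
+            // reporting the -1 sentinel checked below.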
-
-        testStream(result, outputMode = OutputMode.Update)(
-          StartStream(checkpointLocation = checkpointDir.getCanonicalPath),
-          AddData(inputData, "a"),
-          ProcessAllAvailable(),
-          AddData(inputData, "b"),
-          ProcessAllAvailable(),
-          AddData(inputData, "b"),
-          ProcessAllAvailable(),
-          CheckNewAnswer("a", "b"),
-          Execute { q =>
-            // Make sure only smallest K active metrics are published
-            eventually(timeout(10.seconds)) {
-              val instanceMetrics = q.lastProgress
-                .stateOperators(0)
-                .customMetrics
-                .asScala
-                .view
-                .filterKeys(_.startsWith(SNAPSHOT_LAG_METRIC_PREFIX))
-              // Determined by STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT
-              assert(
-                instanceMetrics.size == q.sparkSession.conf
-                  .get(SQLConf.STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT)
-              )
-              // Because of how HDFSBackedStateStoreProvider uses strict inequality
-              // to determine if a snapshot is needed, the first upload is actually
-              // done at version 2 instead of 1 (RocksDB uses non-strict inequalities).
-              assert(instanceMetrics.forall(_._2 == 2))
-            }
-          },
-          StopStream
-        )
-      }
-    }
-  }
+
+            testStream(result, outputMode = OutputMode.Update)(
+              StartStream(checkpointLocation = checkpointDir.getCanonicalPath),
+              AddData(inputData, "a"),
+              ProcessAllAvailable(),
+              AddData(inputData, "b"),
+              ProcessAllAvailable(),
+              AddData(inputData, "b"),
+              ProcessAllAvailable(),
+              CheckNewAnswer("a", "b"),
+              Execute { q =>
+                // Partitions getting skipped (id 0 and 1) do not have an uploaded version,
+                // leaving those instance metrics as -1.
+                eventually(timeout(10.seconds)) {
+                  assert(
+                    q.lastProgress
+                      .stateOperators(0)
+                      .customMetrics
+                      .get(snapshotLagMetricName(0)) === -1
+                  )
+                  assert(
+                    q.lastProgress
+                      .stateOperators(0)
+                      .customMetrics
+                      .get(snapshotLagMetricName(1)) === -1
+                  )
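+                  // With a report limit of 3 and two skipped partitions, the published
+                  // set should end up as two -1 entries plus one healthy instance, which
+                  // is what the size and count assertions below encode.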
+                  // Make sure only smallest K active metrics are published
+                  val instanceMetrics = q.lastProgress
+                    .stateOperators(0)
+                    .customMetrics
+                    .asScala
+                    .view
+                    .filterKeys(_.startsWith(SNAPSHOT_LAG_METRIC_PREFIX))
+                  // Determined by STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT
+                  assert(
+                    instanceMetrics.size == q.sparkSession.conf
+                      .get(SQLConf.STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT)
+                  )
+                  // Two metrics published are -1, the remainder should all be set to version 2
+                  // as they uploaded properly.
+                  assert(
+                    instanceMetrics.count(_._2 == 2) == q.sparkSession.conf
+                      .get(SQLConf.STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT) - 2
+                  )
+                }
+              },
+              StopStream
+            )
+          }
+        }
+      }
+  }

-  testWithChangelogCheckpointingEnabled(
-    "SPARK-51252: Verify snapshot lag metrics are updated correctly with " +
-      "HDFSBackedSkipMaintenanceOnCertainPartitionsProvider"
-  ) {
-    withSQLConf(
-      SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
-        classOf[HDFSBackedSkipMaintenanceOnCertainPartitionsProvider].getName,
-      SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
-      SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key -> "10",
-      SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
-      SQLConf.STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT.key -> "5"
-    ) {
-      withTempDir { checkpointDir =>
-        val inputData = MemoryStream[String]
-        val result = inputData.toDS().dropDuplicates()
-
-        testStream(result, outputMode = OutputMode.Update)(
-          StartStream(checkpointLocation = checkpointDir.getCanonicalPath),
-          AddData(inputData, "a"),
-          ProcessAllAvailable(),
-          AddData(inputData, "b"),
-          ProcessAllAvailable(),
-          AddData(inputData, "b"),
-          ProcessAllAvailable(),
-          CheckNewAnswer("a", "b"),
-          Execute { q =>
-            // Partitions getting skipped (id 0 and 1) do not have an uploaded version, leaving
-            // those instance metrics as -1.
-            eventually(timeout(10.seconds)) {
-              assert(
-                q.lastProgress
-                  .stateOperators(0)
-                  .customMetrics
-                  .get(snapshotLagMetricName(0)) === -1
-              )
-              assert(
-                q.lastProgress
-                  .stateOperators(0)
-                  .customMetrics
-                  .get(snapshotLagMetricName(1)) === -1
-              )
-              // Make sure only smallest K active metrics are published
-              val instanceMetrics = q.lastProgress
-                .stateOperators(0)
-                .customMetrics
-                .asScala
-                .view
-                .filterKeys(_.startsWith(SNAPSHOT_LAG_METRIC_PREFIX))
-              // Determined by STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT
-              assert(
-                instanceMetrics.size == q.sparkSession.conf
-                  .get(SQLConf.STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT)
-              )
-              // Two metrics published are -1, the remainder should all be 1 as they
-              // uploaded properly.
-              // Because of how HDFSBackedStateStoreProvider uses strict inequality
-              // to determine if a snapshot is needed, the first upload is actually
-              // done at version 2 instead of 1.
-              assert(
-                instanceMetrics.count(_._2 == 2) == q.sparkSession.conf
-                  .get(SQLConf.STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT) - 2
-              )
-            }
-          },
-          StopStream
-        )
-      }
-    }
-  }
-
-  testWithChangelogCheckpointingEnabled(
-    "SPARK-51252: Verify snapshot lag metrics are updated correctly for join queries with " +
-      "HDFSBackedStateStoreProvider"
-  ) {
-    withSQLConf(
-      SQLConf.STATE_STORE_PROVIDER_CLASS.key -> classOf[HDFSBackedStateStoreProvider].getName,
-      SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
-      SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key -> "10",
-      SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
-      SQLConf.STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT.key -> "10"
-    ) {
-      withTempDir { checkpointDir =>
-        val input1 = MemoryStream[Int]
-        val input2 = MemoryStream[Int]
+  Seq(
+    ("SPARK-51097", "RocksDBStateStoreProvider", classOf[RocksDBStateStoreProvider].getName),
+    ("SPARK-51252", "HDFSBackedStateStoreProvider", classOf[HDFSBackedStateStoreProvider].getName)
+  ).foreach {
+    case (ticketPrefix, providerName, providerClassName) =>
+      testWithChangelogCheckpointingEnabled(
+        s"$ticketPrefix: Verify snapshot lag metrics are updated correctly for join queries with " +
+          s"$providerName"
+      ) {
+        withSQLConf(
+          SQLConf.STATE_STORE_PROVIDER_CLASS.key -> providerClassName,
+          SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
+          SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key -> "10",
+          SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
+          SQLConf.STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT.key -> "10"
+        ) {
+          withTempDir { checkpointDir =>
+            val input1 = MemoryStream[Int]
+            val input2 = MemoryStream[Int]
+
+            val df1 = input1.toDF().select($"value" as "leftKey", ($"value" * 2) as "leftValue")
+            val df2 = input2
+              .toDF()
+              .select($"value" as "rightKey", ($"value" * 3) as "rightValue")
+            val joined = df1.join(df2, expr("leftKey = rightKey"))
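+            // A stream-stream join keeps four stores per partition (keyToNumValues and
+            // keyWithIndexToValue for each side), so with the test session's 5 shuffle
+            // partitions there are 5 * 4 = 20 instances competing for 10 published slots.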
-
-        val df1 = input1.toDF().select($"value" as "leftKey", ($"value" * 2) as "leftValue")
-        val df2 = input2
-          .toDF()
-          .select($"value" as "rightKey", ($"value" * 3) as "rightValue")
-        val joined = df1.join(df2, expr("leftKey = rightKey"))
-
-        testStream(joined)(
-          StartStream(checkpointLocation = checkpointDir.getCanonicalPath),
-          AddData(input1, 1, 5),
-          ProcessAllAvailable(),
-          AddData(input2, 1, 5, 10),
-          ProcessAllAvailable(),
-          AddData(input1, 2, 3),
-          ProcessAllAvailable(),
-          CheckNewAnswer((1, 2, 1, 3), (5, 10, 5, 15)),
-          Execute { q =>
-            eventually(timeout(10.seconds)) {
-              // Make sure only smallest K active metrics are published.
-              // There are 5 * 4 = 20 metrics in total because of join, but only 10 are published.
-              val instanceMetrics = q.lastProgress
-                .stateOperators(0)
-                .customMetrics
-                .asScala
-                .view
-                .filterKeys(_.startsWith(SNAPSHOT_LAG_METRIC_PREFIX))
-              // Determined by STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT
-              assert(
-                instanceMetrics.size == q.sparkSession.conf
-                  .get(SQLConf.STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT)
-              )
-              // All state store instances should have uploaded a snapshot at version 2
-              assert(instanceMetrics.forall(_._2 == 2))
-            }
-          },
-          StopStream
-        )
-      }
-    }
-  }
+
+            testStream(joined)(
+              StartStream(checkpointLocation = checkpointDir.getCanonicalPath),
+              AddData(input1, 1, 5),
+              ProcessAllAvailable(),
+              AddData(input2, 1, 5, 10),
+              ProcessAllAvailable(),
+              AddData(input1, 2, 3),
+              ProcessAllAvailable(),
+              CheckNewAnswer((1, 2, 1, 3), (5, 10, 5, 15)),
+              Execute { q =>
+                eventually(timeout(10.seconds)) {
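+                  // Snapshot uploads happen on the background maintenance thread, so the
+                  // assertions poll with eventually instead of checking right away.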
+                  // Make sure only smallest K active metrics are published.
+                  // There are 5 * 4 = 20 metrics in total because of join, but only 10
+                  // are published.
+                  val instanceMetrics = q.lastProgress
+                    .stateOperators(0)
+                    .customMetrics
+                    .asScala
+                    .view
+                    .filterKeys(_.startsWith(SNAPSHOT_LAG_METRIC_PREFIX))
+                  // Determined by STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT
+                  assert(
+                    instanceMetrics.size == q.sparkSession.conf
+                      .get(SQLConf.STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT)
+                  )
+                  // All state store instances should have uploaded a version
+                  assert(instanceMetrics.forall(_._2 == 2))
+                }
+              },
+              StopStream
+            )
+          }
+        }
+      }
+  }

-  testWithChangelogCheckpointingEnabled(
-    "SPARK-51252: Verify snapshot lag metrics are updated correctly for join queries with " +
-      "HDFSBackedSkipMaintenanceOnCertainPartitionsProvider"
-  ) {
-    withSQLConf(
-      SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
-        classOf[HDFSBackedSkipMaintenanceOnCertainPartitionsProvider].getName,
-      SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
-      SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key -> "10",
-      SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
-      SQLConf.STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT.key -> "10"
-    ) {
-      withTempDir { checkpointDir =>
-        val input1 = MemoryStream[Int]
-        val input2 = MemoryStream[Int]
+  Seq(
+    (
+      "SPARK-51097",
+      "RocksDBSkipMaintenanceOnCertainPartitionsProvider",
+      classOf[RocksDBSkipMaintenanceOnCertainPartitionsProvider].getName
+    ),
+    (
+      "SPARK-51252",
+      "HDFSBackedSkipMaintenanceOnCertainPartitionsProvider",
+      classOf[HDFSBackedSkipMaintenanceOnCertainPartitionsProvider].getName
+    )
+  ).foreach {
+    case (ticketPrefix, providerName, providerClassName) =>
+      testWithChangelogCheckpointingEnabled(
+        s"$ticketPrefix: Verify snapshot lag metrics are updated correctly for join queries with " +
+          s"$providerName"
+      ) {
+        withSQLConf(
+          SQLConf.STATE_STORE_PROVIDER_CLASS.key -> providerClassName,
+          SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
+          SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key -> "10",
+          SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
+          SQLConf.STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT.key -> "10"
+        ) {
+          withTempDir { checkpointDir =>
+            val input1 = MemoryStream[Int]
+            val input2 = MemoryStream[Int]
+
+            val df1 = input1.toDF().select($"value" as "leftKey", ($"value" * 2) as "leftValue")
+            val df2 = input2
+              .toDF()
+              .select($"value" as "rightKey", ($"value" * 3) as "rightValue")
+            val joined = df1.join(df2, expr("leftKey = rightKey"))
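+            // Expected accounting here: 2 skipped partition ids * 4 join stores = 8
+            // instances stuck at -1, and the remaining 10 - 8 = 2 published entries
+            // should carry an uploaded snapshot version.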
+
+            testStream(joined)(
+              StartStream(checkpointLocation = checkpointDir.getCanonicalPath),
+              AddData(input1, 1, 5),
+              ProcessAllAvailable(),
+              AddData(input2, 1, 5, 10),
+              ProcessAllAvailable(),
+              AddData(input1, 2, 3),
+              ProcessAllAvailable(),
+              CheckNewAnswer((1, 2, 1, 3), (5, 10, 5, 15)),
+              Execute { q =>
+                eventually(timeout(10.seconds)) {
+                  // Make sure only smallest K active metrics are published.
+                  // There are 5 * 4 = 20 metrics in total because of join, but only 10
+                  // are published.
+                  val allInstanceMetrics = q.lastProgress
+                    .stateOperators(0)
+                    .customMetrics
+                    .asScala
+                    .view
+                    .filterKeys(_.startsWith(SNAPSHOT_LAG_METRIC_PREFIX))
+                  val badInstanceMetrics = allInstanceMetrics.filterKeys(
+                    k =>
+                      k.startsWith(snapshotLagMetricName(0, "")) ||
+                        k.startsWith(snapshotLagMetricName(1, ""))
+                  )
+                  // Determined by STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT
+                  assert(
+                    allInstanceMetrics.size == q.sparkSession.conf
+                      .get(SQLConf.STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT)
+                  )
+                  // Two ids are blocked, each with four state stores
+                  assert(badInstanceMetrics.count(_._2 == -1) == 2 * 4)
+                  // The rest should have uploaded a version
+                  assert(
+                    allInstanceMetrics.count(_._2 == 2) == q.sparkSession.conf
+                      .get(SQLConf.STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT) - 2 * 4
+                  )
+                }
+              },
+              StopStream
+            )
+          }
+        }
+      }
+  }
 }