Skip to content

Commit

Permalink
SPARK-51252 Switch to atomic and fix spark test prefix
Browse files Browse the repository at this point in the history
  • Loading branch information
zecookiez authored and Zeyu Chen committed Feb 24, 2025
1 parent e219d2b commit 10cbd2e
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.streaming.state
import java.io._
import java.util
import java.util.Locale
import java.util.concurrent.atomic.LongAdder
import java.util.concurrent.atomic.{AtomicLong, LongAdder}

import scala.collection.mutable
import scala.jdk.CollectionConverters._
Expand Down Expand Up @@ -221,8 +221,7 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with

val instanceMetrics = Map(
instanceMetricSnapshotLastUpload.withNewId(
stateStoreId.partitionId, stateStoreId.storeName
) -> lastSnapshotUploadedVersion
stateStoreId.partitionId, stateStoreId.storeName) -> lastSnapshotUploadedVersion.get()
)

StateStoreMetrics(
Expand Down Expand Up @@ -433,9 +432,9 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
private val loadedMapCacheHitCount: LongAdder = new LongAdder
private val loadedMapCacheMissCount: LongAdder = new LongAdder

// This is updated when the maintenance task writes the snapshot file, -1 represents no version
// has ever been uploaded.
private var lastSnapshotUploadedVersion: Long = -1L
// This is updated when the maintenance task writes the snapshot file and read by the task
// thread. -1 represents no version has ever been uploaded.
private val lastSnapshotUploadedVersion: AtomicLong = new AtomicLong(-1L)

private lazy val metricStateOnCurrentVersionSizeBytes: StateStoreCustomSizeMetric =
StateStoreCustomSizeMetric("stateOnCurrentVersionSizeBytes",
Expand Down Expand Up @@ -698,7 +697,7 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
logInfo(log"Written snapshot file for version ${MDC(LogKeys.FILE_VERSION, version)} of " +
log"${MDC(LogKeys.STATE_STORE_PROVIDER, this)} at ${MDC(LogKeys.FILE_NAME, targetFile)} " +
log"for ${MDC(LogKeys.OP_TYPE, opType)}")
lastSnapshotUploadedVersion = version
lastSnapshotUploadedVersion.set(version)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class StateStoreInstanceMetricSuite extends StreamTest with AlsoTestWithRocksDBF

Seq(
("SPARK-51097", "RocksDBStateStoreProvider", classOf[RocksDBStateStoreProvider].getName),
("SPARK-51251", "HDFSBackedStateStoreProvider", classOf[HDFSBackedStateStoreProvider].getName)
("SPARK-51252", "HDFSBackedStateStoreProvider", classOf[HDFSBackedStateStoreProvider].getName)
).foreach {
case (ticketPrefix, providerName, providerClassName) =>
testWithChangelogCheckpointingEnabled(
Expand Down Expand Up @@ -120,7 +120,7 @@ class StateStoreInstanceMetricSuite extends StreamTest with AlsoTestWithRocksDBF
classOf[RocksDBSkipMaintenanceOnCertainPartitionsProvider].getName
),
(
"SPARK-51251",
"SPARK-51252",
"HDFSBackedSkipMaintenanceOnCertainPartitionsProvider",
classOf[HDFSBackedSkipMaintenanceOnCertainPartitionsProvider].getName
)
Expand Down Expand Up @@ -194,7 +194,7 @@ class StateStoreInstanceMetricSuite extends StreamTest with AlsoTestWithRocksDBF

Seq(
("SPARK-51097", "RocksDBStateStoreProvider", classOf[RocksDBStateStoreProvider].getName),
("SPARK-51251", "HDFSBackedStateStoreProvider", classOf[HDFSBackedStateStoreProvider].getName)
("SPARK-51252", "HDFSBackedStateStoreProvider", classOf[HDFSBackedStateStoreProvider].getName)
).foreach {
case (ticketPrefix, providerName, providerClassName) =>
testWithChangelogCheckpointingEnabled(
Expand Down Expand Up @@ -261,7 +261,7 @@ class StateStoreInstanceMetricSuite extends StreamTest with AlsoTestWithRocksDBF
classOf[RocksDBSkipMaintenanceOnCertainPartitionsProvider].getName
),
(
"SPARK-51251",
"SPARK-51252",
"HDFSBackedSkipMaintenanceOnCertainPartitionsProvider",
classOf[HDFSBackedSkipMaintenanceOnCertainPartitionsProvider].getName
)
Expand Down

0 comments on commit 10cbd2e

Please sign in to comment.