Skip to content

Commit

Permalink
[SPARK-39636][CORE][UI] Fix multiple bugs in JsonProtocol, impacting …
Browse files Browse the repository at this point in the history
…off heap StorageLevels and Task/Executor ResourceRequests

### What changes were proposed in this pull request?

This PR fixes three longstanding bugs in Spark's `JsonProtocol`:

- `TaskResourceRequest` loses precision for `amount` < 0.5. The `amount` is a floating point number which is either between 0 and 0.5 or is a positive integer, but the JSON read path assumes it is an integer.
- `ExecutorResourceRequest` integer overflows for values larger than Int.MaxValue because the write path writes longs but the read path assumes integers.
- Off heap StorageLevels are not handled properly: the `useOffHeap` field isn't included in the JSON, so this StorageLevel cannot be round-tripped through JSON. This could cause the History Server to display inaccurate "off heap memory used" stats on the executors page.

I discovered these bugs while working on #36885.

### Why are the changes needed?

JsonProtocol should be able to roundtrip events through JSON without loss of information.

### Does this PR introduce _any_ user-facing change?

Yes: it fixes bugs that impact information shown in the History Server Web UI. The new StorageLevel JSON field will be visible to tools which process raw event log JSON.

### How was this patch tested?

Updated existing unit tests to cover the changed logic.

Closes #37027 from JoshRosen/jsonprotocol-bugfixes.

Authored-by: Josh Rosen <joshrosen@databricks.com>
Signed-off-by: Josh Rosen <joshrosen@databricks.com>
  • Loading branch information
JoshRosen committed Jun 30, 2022
1 parent 91f95a7 commit a39fc87
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 6 deletions.
17 changes: 14 additions & 3 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,7 @@ private[spark] object JsonProtocol {
def storageLevelToJson(storageLevel: StorageLevel): JValue = {
("Use Disk" -> storageLevel.useDisk) ~
("Use Memory" -> storageLevel.useMemory) ~
("Use Off Heap" -> storageLevel.useOffHeap) ~
("Deserialized" -> storageLevel.deserialized) ~
("Replication" -> storageLevel.replication)
}
Expand Down Expand Up @@ -750,15 +751,15 @@ private[spark] object JsonProtocol {

def executorResourceRequestFromJson(json: JValue): ExecutorResourceRequest = {
val rName = (json \ "Resource Name").extract[String]
val amount = (json \ "Amount").extract[Int]
val amount = (json \ "Amount").extract[Long]
val discoveryScript = (json \ "Discovery Script").extract[String]
val vendor = (json \ "Vendor").extract[String]
new ExecutorResourceRequest(rName, amount, discoveryScript, vendor)
}

def taskResourceRequestFromJson(json: JValue): TaskResourceRequest = {
val rName = (json \ "Resource Name").extract[String]
val amount = (json \ "Amount").extract[Int]
val amount = (json \ "Amount").extract[Double]
new TaskResourceRequest(rName, amount)
}

Expand Down Expand Up @@ -1202,9 +1203,19 @@ private[spark] object JsonProtocol {
def storageLevelFromJson(json: JValue): StorageLevel = {
val useDisk = (json \ "Use Disk").extract[Boolean]
val useMemory = (json \ "Use Memory").extract[Boolean]
// The "Use Off Heap" field was added in Spark 3.4.0
val useOffHeap = jsonOption(json \ "Use Off Heap") match {
case Some(value) => value.extract[Boolean]
case None => false
}
val deserialized = (json \ "Deserialized").extract[Boolean]
val replication = (json \ "Replication").extract[Int]
StorageLevel(useDisk, useMemory, deserialized, replication)
StorageLevel(
useDisk = useDisk,
useMemory = useMemory,
useOffHeap = useOffHeap,
deserialized = deserialized,
replication = replication)
}

def blockStatusFromJson(json: JValue): BlockStatus = {
Expand Down
53 changes: 50 additions & 3 deletions core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,14 @@ class JsonProtocolSuite extends SparkFunSuite {
321L, 654L, 765L, 256912L, 123456L, 123456L, 61728L,
30364L, 15182L, 10L, 90L, 2L, 20L, 80001L)))
val rprofBuilder = new ResourceProfileBuilder()
val taskReq = new TaskResourceRequests().cpus(1).resource("gpu", 1)
val execReq =
new ExecutorResourceRequests().cores(2).resource("gpu", 2, "myscript")
val taskReq = new TaskResourceRequests()
.cpus(1)
.resource("gpu", 1)
.resource("fgpa", 0.5)
val execReq: ExecutorResourceRequests = new ExecutorResourceRequests()
.cores(2)
.resource("gpu", 2, "myscript")
.resource("myCustomResource", amount = Int.MaxValue + 1L, discoveryScript = "myscript2")
rprofBuilder.require(taskReq).require(execReq)
val resourceProfile = rprofBuilder.build
resourceProfile.setResourceProfileId(21)
Expand Down Expand Up @@ -203,6 +208,7 @@ class JsonProtocolSuite extends SparkFunSuite {
testStorageLevel(StorageLevel.MEMORY_AND_DISK_2)
testStorageLevel(StorageLevel.MEMORY_AND_DISK_SER)
testStorageLevel(StorageLevel.MEMORY_AND_DISK_SER_2)
testStorageLevel(StorageLevel.OFF_HEAP)

// JobResult
val exception = new Exception("Out of Memory! Please restock film.")
Expand Down Expand Up @@ -319,6 +325,21 @@ class JsonProtocolSuite extends SparkFunSuite {
val newMetrics = JsonProtocol.taskMetricsFromJson(oldJson)
}

test("StorageLevel backward compatibility") {
// "Use Off Heap" was added in Spark 3.4.0
val level = StorageLevel(
useDisk = false,
useMemory = true,
useOffHeap = true,
deserialized = false,
replication = 1
)
val newJson = JsonProtocol.storageLevelToJson(level)
val oldJson = newJson.removeField { case (field, _) => field == "Use Off Heap" }
val newLevel = JsonProtocol.storageLevelFromJson(oldJson)
assert(newLevel.useOffHeap === false)
}

test("BlockManager events backward compatibility") {
// SparkListenerBlockManagerAdded/Removed in Spark 1.0.0 do not have a "time" property.
val blockManagerAdded = SparkListenerBlockManagerAdded(1L,
Expand Down Expand Up @@ -1189,6 +1210,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
| "Use Off Heap": false,
| "Deserialized": true,
| "Replication": 1
| },
Expand Down Expand Up @@ -1437,6 +1459,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
| "Use Off Heap": false,
| "Deserialized": false,
| "Replication": 2
| },
Expand Down Expand Up @@ -1563,6 +1586,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
| "Use Off Heap": false,
| "Deserialized": false,
| "Replication": 2
| },
Expand Down Expand Up @@ -1689,6 +1713,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
| "Use Off Heap": false,
| "Deserialized": false,
| "Replication": 2
| },
Expand Down Expand Up @@ -1722,6 +1747,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
| "Use Off Heap": false,
| "Deserialized": true,
| "Replication": 1
| },
Expand Down Expand Up @@ -1769,6 +1795,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
| "Use Off Heap": false,
| "Deserialized": true,
| "Replication": 1
| },
Expand All @@ -1787,6 +1814,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
| "Use Off Heap": false,
| "Deserialized": true,
| "Replication": 1
| },
Expand Down Expand Up @@ -1834,6 +1862,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
| "Use Off Heap": false,
| "Deserialized": true,
| "Replication": 1
| },
Expand All @@ -1852,6 +1881,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
| "Use Off Heap": false,
| "Deserialized": true,
| "Replication": 1
| },
Expand All @@ -1870,6 +1900,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
| "Use Off Heap": false,
| "Deserialized": true,
| "Replication": 1
| },
Expand Down Expand Up @@ -1917,6 +1948,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
| "Use Off Heap": false,
| "Deserialized": true,
| "Replication": 1
| },
Expand All @@ -1935,6 +1967,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
| "Use Off Heap": false,
| "Deserialized": true,
| "Replication": 1
| },
Expand All @@ -1953,6 +1986,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
| "Use Off Heap": false,
| "Deserialized": true,
| "Replication": 1
| },
Expand All @@ -1971,6 +2005,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
| "Use Off Heap": false,
| "Deserialized": true,
| "Replication": 1
| },
Expand Down Expand Up @@ -2291,6 +2326,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
| "Use Off Heap": false,
| "Deserialized": false,
| "Replication": 2
| },
Expand Down Expand Up @@ -2489,6 +2525,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Storage Level": {
| "Use Disk": false,
| "Use Memory": true,
| "Use Off Heap": false,
| "Deserialized": true,
| "Replication": 1
| },
Expand Down Expand Up @@ -2578,6 +2615,12 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Discovery Script":"",
| "Vendor":""
| },
| "myCustomResource":{
| "Resource Name":"myCustomResource",
| "Amount": 2147483648,
| "Discovery Script": "myscript2",
| "Vendor" : ""
| },
| "gpu":{
| "Resource Name":"gpu",
| "Amount":2,
Expand All @@ -2593,6 +2636,10 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "gpu":{
| "Resource Name":"gpu",
| "Amount":1.0
| },
| "fgpa":{
| "Resource Name":"fgpa",
| "Amount":0.5
| }
| }
|}
Expand Down

0 comments on commit a39fc87

Please sign in to comment.