Make the maximum string length limit configurable in JsonProtocol
roczei committed Nov 18, 2024
1 parent 9858ab6 commit ed2706f
Showing 3 changed files with 34 additions and 5 deletions.
@@ -28,8 +28,12 @@ import org.apache.spark.util.SparkErrorUtils.tryWithResource
 
 private[spark] trait JsonUtils {
 
-  protected val mapper: ObjectMapper = new ObjectMapper().registerModule(DefaultScalaModule)
-    .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
+  protected val mapper: ObjectMapper = configureMapper(new ObjectMapper())
+
+  protected def configureMapper(m: ObjectMapper): ObjectMapper = {
+    m.registerModule(DefaultScalaModule)
+      .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
+  }
 
   def toJsonString(block: JsonGenerator => Unit): String = {
     tryWithResource(new ByteArrayOutputStream()) { baos =>
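Splitting construction into an overridable configureMapper hook lets a subtype substitute its own ObjectMapper (for instance, one built around a custom JsonFactory) while still inheriting the shared configuration: DefaultScalaModule registered and FAIL_ON_UNKNOWN_PROPERTIES disabled. A minimal sketch of the pattern; the CustomJsonUtils object is hypothetical and not part of the commit:

package org.apache.spark.util

import com.fasterxml.jackson.core.JsonFactory
import com.fasterxml.jackson.databind.ObjectMapper

// Hypothetical subtype: supplies a mapper backed by its own JsonFactory;
// configureMapper then applies the shared settings to that mapper.
private[spark] object CustomJsonUtils extends JsonUtils {
  protected override val mapper: ObjectMapper =
    configureMapper(new ObjectMapper(new JsonFactory()))
}

JsonProtocol adopts exactly this shape in the third changed file below.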
@@ -2809,4 +2809,13 @@ package object config {
     .version("4.0.0")
     .timeConf(TimeUnit.MILLISECONDS)
     .createOptional
+
+  val SPARK_JSON_PROTOCOL_MAX_STRING_LENGTH =
+    ConfigBuilder("spark.jsonProtocol.maxStringLen")
+      .doc("")
+      .version("4.0.0")
+      .intConf
+      .checkValue(v => v > 0, "The value should be a positive integer.")
+      .createWithDefault(Int.MaxValue)
+
 }
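With the entry registered, the limit is tunable like any other Spark setting, and the checkValue guard rejects zero and negative values up front; the default of Int.MaxValue effectively leaves strings uncapped. Note that JsonProtocol (third file, below) reads the value from a freshly constructed SparkConf, which loads spark.* JVM system properties, so the setting has to reach the driver JVM through that channel. A usage sketch under that assumption; the 100,000,000-character figure is an arbitrary example:

object ConfigureMaxStringLen {
  def main(args: Array[String]): Unit = {
    // Must be set before the JsonProtocol object is first initialized,
    // since its mapper reads the config once, at object-creation time.
    sys.props("spark.jsonProtocol.maxStringLen") = "100000000"
  }
}

// Equivalent channels for supplying the system property:
//   spark-submit --conf spark.jsonProtocol.maxStringLen=100000000 ...
//   spark.jsonProtocol.maxStringLen 100000000   (in spark-defaults.conf)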
core/src/main/scala/org/apache/spark/util/JsonProtocol.scala (19 additions & 3 deletions)
@@ -22,12 +22,13 @@ import java.util.{Properties, UUID}
 import scala.collection.Map
 import scala.jdk.CollectionConverters._
 
-import com.fasterxml.jackson.core.JsonGenerator
-import com.fasterxml.jackson.databind.JsonNode
+import com.fasterxml.jackson.core.{JsonFactory, JsonGenerator, StreamReadConstraints}
+import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
 import org.json4s.jackson.JsonMethods.compact
 
 import org.apache.spark._
 import org.apache.spark.executor._
+import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.metrics.ExecutorMetricType
 import org.apache.spark.rdd.{DeterministicLevel, RDDOperationScope}
@@ -63,12 +64,27 @@ private[spark] class JsonProtocolOptions(conf: SparkConf) {
  * - Any new JSON fields should be optional; use `jsonOption` when reading these fields
  *   in `*FromJson` methods.
  */
-private[spark] object JsonProtocol extends JsonUtils {
+private[spark] object JsonProtocol extends JsonUtils with Logging {
   // TODO: Remove this file and put JSON serialization into each individual class.
 
   private[util]
   val defaultOptions: JsonProtocolOptions = new JsonProtocolOptions(new SparkConf(false))
 
+  protected override val mapper: ObjectMapper = {
+    val conf = new SparkConf
+    val maxJsonStringLength: Int = conf.get(SPARK_JSON_PROTOCOL_MAX_STRING_LENGTH)
+    val streamReadConstraints: StreamReadConstraints = StreamReadConstraints
+      .builder()
+      .maxStringLength(maxJsonStringLength)
+      .build()
+    val jsonFactory = new JsonFactory().setStreamReadConstraints(streamReadConstraints)
+    logInfo(s"maxJsonStringLength: ${maxJsonStringLength}")
+    configureMapper(new ObjectMapper(jsonFactory))
+  }
+
+  logInfo(s"mapper.getMaxStringLength: ${mapper.getFactory.streamReadConstraints()
+    .getMaxStringLength}")
+
   /** ------------------------------------------------- *
    *  JSON serialization methods for SparkListenerEvents |
    *  -------------------------------------------------- */
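Background for the mapper override: since Jackson 2.15, parsers enforce default StreamReadConstraints, including a cap on the length of any single JSON string value (20,000,000 characters in recent releases), and exceeding it raises a StreamConstraintsException. The commit rebuilds the constraints from the new Spark config, so event logs containing very large strings can still be replayed. A self-contained sketch of the mechanism outside Spark, using a deliberately tiny 16-character cap for demonstration:

import com.fasterxml.jackson.core.{JsonFactory, StreamReadConstraints}
import com.fasterxml.jackson.core.exc.StreamConstraintsException
import com.fasterxml.jackson.databind.ObjectMapper

object MaxStringLenDemo extends App {
  // Build a parser factory whose per-string limit is 16 characters.
  val constraints = StreamReadConstraints.builder().maxStringLength(16).build()
  val factory = new JsonFactory().setStreamReadConstraints(constraints)
  val mapper = new ObjectMapper(factory)

  mapper.readTree("""{"k": "short"}""") // parses: the value is under the cap
  try mapper.readTree(s"""{"k": "${"x" * 32}"}""") // a 32-character value
  catch { case e: StreamConstraintsException => println(e.getMessage) }
}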
