Skip to content

Commit

Permalink
[SPARK-48148][FOLLOWUP] Fix JSON parser feature flag
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

#46408 attempts to set the feature flag `INCLUDE_SOURCE_IN_LOCATION` in the JSON parser and reverts the flag to the original value. The reverting code is incorrect and accidentally sets the `AUTO_CLOSE_SOURCE` feature to false. The reason is that `overrideStdFeatures(value, mask)` sets the feature flags selected by `mask` to `value`. `originalMask` is a value of 0/1. When it is 1, it selects `AUTO_CLOSE_SOURCE`, whose ordinal is 0 ([reference](https://github.com/FasterXML/jackson-core/blob/172369cc390ace0f68a5032701634bdc984c2af8/src/main/java/com/fasterxml/jackson/core/JsonParser.java#L112)). The old code doesn't revert `INCLUDE_SOURCE_IN_LOCATION` to the original value either. As a result, when the JSON parser is closed, the underlying input stream is not closed, which can lead to memory leak.

### Why are the changes needed?

Perform the originally intended feature, and avoid memory leak.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New unit test. It would fail without the change in the PR.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #49018 from chenhao-db/fix_json_parser_flag.

Authored-by: Chenhao Li <chenhao.li@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
  • Loading branch information
chenhao-db authored and HyukjinKwon committed Dec 1, 2024
1 parent b45e3c0 commit 7d46fdb
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -292,13 +292,8 @@ class JacksonParser(

case _: StringType => (parser: JsonParser) => {
// This must be enabled if we will retrieve the bytes directly from the raw content:
val includeSourceInLocation = JsonParser.Feature.INCLUDE_SOURCE_IN_LOCATION
val originalMask = if (includeSourceInLocation.enabledIn(parser.getFeatureMask)) {
1
} else {
0
}
parser.overrideStdFeatures(includeSourceInLocation.getMask, includeSourceInLocation.getMask)
val oldFeature = parser.getFeatureMask
parser.setFeatureMask(oldFeature | JsonParser.Feature.INCLUDE_SOURCE_IN_LOCATION.getMask)
val result = parseJsonToken[UTF8String](parser, dataType) {
case VALUE_STRING =>
UTF8String.fromString(parser.getText)
Expand Down Expand Up @@ -344,7 +339,7 @@ class JacksonParser(
}
}
// Reset back to the original configuration:
parser.overrideStdFeatures(includeSourceInLocation.getMask, originalMask)
parser.setFeatureMask(oldFeature)
result
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,19 @@ import org.apache.spark.sql.types.StructType
import org.apache.spark.unsafe.types.UTF8String

class JacksonParserSuite extends SparkFunSuite {
test("feature mask should remain unchanged") {
val options = new JSONOptions(Map.empty[String, String], "GMT", "")
val parser = new JacksonParser(StructType.fromDDL("a string"), options, false, Nil)
val input = """{"a": {"b": 1}}""".getBytes
// The creating function is usually called inside `parser.parse`, but we need the JSON parser
// here for testing purpose.
val jsonParser = options.buildJsonFactory().createParser(input)
val oldFeature = jsonParser.getFeatureMask
val result = parser.parse[Array[Byte]](input, (_, _) => jsonParser, UTF8String.fromBytes)
assert(result === Seq(InternalRow(UTF8String.fromString("""{"b": 1}"""))))
assert(jsonParser.getFeatureMask == oldFeature)
}

test("skipping rows using pushdown filters") {
def check(
input: String = """{"i":1, "s": "a"}""",
Expand Down

0 comments on commit 7d46fdb

Please sign in to comment.