Skip to content

Commit

Permalink
Don't silently swallow unclosed tag content; propagate for error hand…
Browse files Browse the repository at this point in the history
…ling instead (#437)

See databricks/spark-xml#436 (comment) for context. This stops silently swallowing unclosed tags at the end of input and instead propagates the remaining content for normal error handling.

Closes #436
  • Loading branch information
beluisterql authored Feb 14, 2020
1 parent a5c1a6e commit 8abb78f
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 7 deletions.
11 changes: 6 additions & 5 deletions src/main/scala/com/databricks/spark/xml/XmlInputFormat.scala
Original file line number Diff line number Diff line change
Expand Up @@ -171,11 +171,12 @@ private[xml] class XmlRecordReader extends RecordReader[LongWritable, Text] {
if (readUntilStartElement()) {
try {
buffer.append(currentStartTag)
if (readUntilEndElement(currentStartTag.endsWith(">"))) {
key.set(getFilePosition())
value.set(buffer.toString())
return true
}
// Don't check whether the end element was found. Even if not, return everything
// that was read, which will invariably cause a parse error later
readUntilEndElement(currentStartTag.endsWith(">"))
key.set(getFilePosition())
value.set(buffer.toString())
return true
} finally {
buffer = new StringBuilder()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,15 @@ private[xml] object StaxXmlParser extends Serializable {
// create a row even if no corrupt record column is present
parseMode match {
case FailFastMode =>
val abbreviatedRecord =
if (record.length() > 1000) record.substring(0, 1000) + "..." else record
throw new IllegalArgumentException(
s"Malformed line in FAILFAST mode: ${record.replaceAll("\n", "")}", cause)
s"Malformed line in FAILFAST mode: ${abbreviatedRecord.replaceAll("\n", "")}", cause)
case DropMalformedMode =>
val reason = if (cause != null) s"Reason: ${cause.getMessage}" else ""
logger.warn(s"Dropping malformed line: ${record.replaceAll("\n", "")}. $reason")
val abbreviatedRecord =
if (record.length() > 1000) record.substring(0, 1000) + "..." else record
logger.warn(s"Dropping malformed line: ${abbreviatedRecord.replaceAll("\n", "")}. $reason")
logger.debug("Malformed line cause:", cause)
None
case PermissiveMode =>
Expand Down
4 changes: 4 additions & 0 deletions src/test/resources/unclosed_tag.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
<?xml version="1.0"?>
<catalog>
<book id="Malformed attribute with " caracter ">
</catalog>
12 changes: 12 additions & 0 deletions src/test/scala/com/databricks/spark/xml/XmlSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ final class XmlSuite extends FunSuite with BeforeAndAfterAll {
private val basket = resDir + "basket.xml"
private val basketInvalid = resDir + "basket_invalid.xml"
private val basketXSD = resDir + "basket.xsd"
private val unclosedTag = resDir + "unclosed_tag.xml"

private val booksTag = "book"
private val booksRootTag = "books"
Expand Down Expand Up @@ -302,6 +303,17 @@ final class XmlSuite extends FunSuite with BeforeAndAfterAll {
assert(exceptionInParse.getMessage.contains("Malformed line in FAILFAST mode"))
}

// The fixture contains a <book> start tag with no matching end tag; reading it
// in FAILFAST mode is expected to surface the malformed-record error.
test("test FAILFAST with unclosed tag") {
  val thrown = intercept[SparkException] {
    val failFastReader = spark.read.option("rowTag", "book").option("mode", "FAILFAST")
    val parsed = failFastReader.xml(unclosedTag)
    // show() forces evaluation, which is where the failure is expected to appear.
    parsed.show()
  }
  val message = thrown.getMessage
  assert(message.contains("Malformed line in FAILFAST mode"))
}

test("DSL test for permissive mode for corrupt records") {
val carsDf = new XmlReader()
.withParseMode(PermissiveMode.name)
Expand Down

0 comments on commit 8abb78f

Please sign in to comment.