Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't silently swallow unclosed tag content; propagate for error handling instead #437

Merged
merged 1 commit into from
Feb 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions src/main/scala/com/databricks/spark/xml/XmlInputFormat.scala
Original file line number Diff line number Diff line change
Expand Up @@ -171,11 +171,12 @@ private[xml] class XmlRecordReader extends RecordReader[LongWritable, Text] {
if (readUntilStartElement()) {
try {
buffer.append(currentStartTag)
if (readUntilEndElement(currentStartTag.endsWith(">"))) {
key.set(getFilePosition())
value.set(buffer.toString())
return true
}
// Don't check whether the end element was found. Even if not, return everything
// that was read, which will invariably cause a parse error later
readUntilEndElement(currentStartTag.endsWith(">"))
key.set(getFilePosition())
value.set(buffer.toString())
return true
} finally {
buffer = new StringBuilder()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,15 @@ private[xml] object StaxXmlParser extends Serializable {
// create a row even if no corrupt record column is present
parseMode match {
case FailFastMode =>
val abbreviatedRecord =
if (record.length() > 1000) record.substring(0, 1000) + "..." else record
throw new IllegalArgumentException(
s"Malformed line in FAILFAST mode: ${record.replaceAll("\n", "")}", cause)
s"Malformed line in FAILFAST mode: ${abbreviatedRecord.replaceAll("\n", "")}", cause)
case DropMalformedMode =>
val reason = if (cause != null) s"Reason: ${cause.getMessage}" else ""
logger.warn(s"Dropping malformed line: ${record.replaceAll("\n", "")}. $reason")
val abbreviatedRecord =
if (record.length() > 1000) record.substring(0, 1000) + "..." else record
logger.warn(s"Dropping malformed line: ${abbreviatedRecord.replaceAll("\n", "")}. $reason")
logger.debug("Malformed line cause:", cause)
None
case PermissiveMode =>
Expand Down
4 changes: 4 additions & 0 deletions src/test/resources/unclosed_tag.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
<?xml version="1.0"?>
<catalog>
<book id="Malformed attribute with " caracter ">
</catalog>
12 changes: 12 additions & 0 deletions src/test/scala/com/databricks/spark/xml/XmlSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ final class XmlSuite extends FunSuite with BeforeAndAfterAll {
private val basket = resDir + "basket.xml"
private val basketInvalid = resDir + "basket_invalid.xml"
private val basketXSD = resDir + "basket.xsd"
private val unclosedTag = resDir + "unclosed_tag.xml"

private val booksTag = "book"
private val booksRootTag = "books"
Expand Down Expand Up @@ -302,6 +303,17 @@ final class XmlSuite extends FunSuite with BeforeAndAfterAll {
assert(exceptionInParse.getMessage.contains("Malformed line in FAILFAST mode"))
}

test("test FAILFAST with unclosed tag") {
val exceptionInParse = intercept[SparkException] {
spark.read
.option("rowTag", "book")
.option("mode", "FAILFAST")
.xml(unclosedTag)
.show()
}
assert(exceptionInParse.getMessage.contains("Malformed line in FAILFAST mode"))
}

test("DSL test for permissive mode for corrupt records") {
val carsDf = new XmlReader()
.withParseMode(PermissiveMode.name)
Expand Down