From b0ad45136102f3088a1b70236cd9be31f37438d9 Mon Sep 17 00:00:00 2001 From: Sean Owen Date: Wed, 12 Feb 2020 20:08:00 -0600 Subject: [PATCH] Don't silently swallow unclosed tag content; propagate for error handling instead --- .../com/databricks/spark/xml/XmlInputFormat.scala | 11 ++++++----- .../databricks/spark/xml/parsers/StaxXmlParser.scala | 8 ++++++-- src/test/resources/unclosed_tag.xml | 4 ++++ .../scala/com/databricks/spark/xml/XmlSuite.scala | 12 ++++++++++++ 4 files changed, 28 insertions(+), 7 deletions(-) create mode 100644 src/test/resources/unclosed_tag.xml diff --git a/src/main/scala/com/databricks/spark/xml/XmlInputFormat.scala b/src/main/scala/com/databricks/spark/xml/XmlInputFormat.scala index a4e0cc6f..a2e36d73 100644 --- a/src/main/scala/com/databricks/spark/xml/XmlInputFormat.scala +++ b/src/main/scala/com/databricks/spark/xml/XmlInputFormat.scala @@ -171,11 +171,12 @@ private[xml] class XmlRecordReader extends RecordReader[LongWritable, Text] { if (readUntilStartElement()) { try { buffer.append(currentStartTag) - if (readUntilEndElement(currentStartTag.endsWith(">"))) { - key.set(getFilePosition()) - value.set(buffer.toString()) - return true - } + // Don't check whether the end element was found. Even if not, return everything + // that was read, which will invariably cause a parse error later + readUntilEndElement(currentStartTag.endsWith(">")) + key.set(getFilePosition()) + value.set(buffer.toString()) + return true } finally { buffer = new StringBuilder() } diff --git a/src/main/scala/com/databricks/spark/xml/parsers/StaxXmlParser.scala b/src/main/scala/com/databricks/spark/xml/parsers/StaxXmlParser.scala index 853a4f7b..e702d939 100644 --- a/src/main/scala/com/databricks/spark/xml/parsers/StaxXmlParser.scala +++ b/src/main/scala/com/databricks/spark/xml/parsers/StaxXmlParser.scala @@ -101,11 +101,15 @@ private[xml] object StaxXmlParser extends Serializable { // create a row even if no corrupt record column is present parseMode match { case FailFastMode => + val abbreviatedRecord = + if (record.length() > 1000) record.substring(0, 1000) + "..." else record throw new IllegalArgumentException( - s"Malformed line in FAILFAST mode: ${record.replaceAll("\n", "")}", cause) + s"Malformed line in FAILFAST mode: ${abbreviatedRecord.replaceAll("\n", "")}", cause) case DropMalformedMode => val reason = if (cause != null) s"Reason: ${cause.getMessage}" else "" - logger.warn(s"Dropping malformed line: ${record.replaceAll("\n", "")}. $reason") + val abbreviatedRecord = + if (record.length() > 1000) record.substring(0, 1000) + "..." else record + logger.warn(s"Dropping malformed line: ${abbreviatedRecord.replaceAll("\n", "")}. $reason") logger.debug("Malformed line cause:", cause) None case PermissiveMode => diff --git a/src/test/resources/unclosed_tag.xml b/src/test/resources/unclosed_tag.xml new file mode 100644 index 00000000..04649a3d --- /dev/null +++ b/src/test/resources/unclosed_tag.xml @@ -0,0 +1,4 @@ + + + + \ No newline at end of file diff --git a/src/test/scala/com/databricks/spark/xml/XmlSuite.scala b/src/test/scala/com/databricks/spark/xml/XmlSuite.scala index 6bd41273..9a92faa8 100755 --- a/src/test/scala/com/databricks/spark/xml/XmlSuite.scala +++ b/src/test/scala/com/databricks/spark/xml/XmlSuite.scala @@ -81,6 +81,7 @@ final class XmlSuite extends FunSuite with BeforeAndAfterAll { private val basket = resDir + "basket.xml" private val basketInvalid = resDir + "basket_invalid.xml" private val basketXSD = resDir + "basket.xsd" + private val unclosedTag = resDir + "unclosed_tag.xml" private val booksTag = "book" private val booksRootTag = "books" @@ -302,6 +303,17 @@ final class XmlSuite extends FunSuite with BeforeAndAfterAll { assert(exceptionInParse.getMessage.contains("Malformed line in FAILFAST mode")) } + test("test FAILFAST with unclosed tag") { + val exceptionInParse = intercept[SparkException] { + spark.read + .option("rowTag", "book") + .option("mode", "FAILFAST") + .xml(unclosedTag) + .show() + } + assert(exceptionInParse.getMessage.contains("Malformed line in FAILFAST mode")) + } + test("DSL test for permissive mode for corrupt records") { val carsDf = new XmlReader() .withParseMode(PermissiveMode.name)