diff --git a/src/main/scala/com/databricks/spark/xml/XmlInputFormat.scala b/src/main/scala/com/databricks/spark/xml/XmlInputFormat.scala index a4e0cc6..a2e36d7 100644 --- a/src/main/scala/com/databricks/spark/xml/XmlInputFormat.scala +++ b/src/main/scala/com/databricks/spark/xml/XmlInputFormat.scala @@ -171,11 +171,12 @@ private[xml] class XmlRecordReader extends RecordReader[LongWritable, Text] { if (readUntilStartElement()) { try { buffer.append(currentStartTag) - if (readUntilEndElement(currentStartTag.endsWith(">"))) { - key.set(getFilePosition()) - value.set(buffer.toString()) - return true - } + // Don't check whether the end element was found. Even if not, return everything + // that was read, which will invariably cause a parse error later + readUntilEndElement(currentStartTag.endsWith(">")) + key.set(getFilePosition()) + value.set(buffer.toString()) + return true } finally { buffer = new StringBuilder() } diff --git a/src/main/scala/com/databricks/spark/xml/parsers/StaxXmlParser.scala b/src/main/scala/com/databricks/spark/xml/parsers/StaxXmlParser.scala index 853a4f7..e702d93 100644 --- a/src/main/scala/com/databricks/spark/xml/parsers/StaxXmlParser.scala +++ b/src/main/scala/com/databricks/spark/xml/parsers/StaxXmlParser.scala @@ -101,11 +101,15 @@ private[xml] object StaxXmlParser extends Serializable { // create a row even if no corrupt record column is present parseMode match { case FailFastMode => + val abbreviatedRecord = + if (record.length() > 1000) record.substring(0, 1000) + "..." else record throw new IllegalArgumentException( - s"Malformed line in FAILFAST mode: ${record.replaceAll("\n", "")}", cause) + s"Malformed line in FAILFAST mode: ${abbreviatedRecord.replaceAll("\n", "")}", cause) case DropMalformedMode => val reason = if (cause != null) s"Reason: ${cause.getMessage}" else "" - logger.warn(s"Dropping malformed line: ${record.replaceAll("\n", "")}. $reason") + val abbreviatedRecord = + if (record.length() > 1000) record.substring(0, 1000) + "..." else record + logger.warn(s"Dropping malformed line: ${abbreviatedRecord.replaceAll("\n", "")}. $reason") logger.debug("Malformed line cause:", cause) None case PermissiveMode => diff --git a/src/test/resources/unclosed_tag.xml b/src/test/resources/unclosed_tag.xml new file mode 100644 index 0000000..04649a3 --- /dev/null +++ b/src/test/resources/unclosed_tag.xml @@ -0,0 +1,4 @@ + + + + \ No newline at end of file diff --git a/src/test/scala/com/databricks/spark/xml/XmlSuite.scala b/src/test/scala/com/databricks/spark/xml/XmlSuite.scala index 070640d..4792a14 100755 --- a/src/test/scala/com/databricks/spark/xml/XmlSuite.scala +++ b/src/test/scala/com/databricks/spark/xml/XmlSuite.scala @@ -81,6 +81,7 @@ final class XmlSuite extends FunSuite with BeforeAndAfterAll { private val basket = resDir + "basket.xml" private val basketInvalid = resDir + "basket_invalid.xml" private val basketXSD = resDir + "basket.xsd" + private val unclosedTag = resDir + "unclosed_tag.xml" private val booksTag = "book" private val booksRootTag = "books" @@ -302,6 +303,17 @@ final class XmlSuite extends FunSuite with BeforeAndAfterAll { assert(exceptionInParse.getMessage.contains("Malformed line in FAILFAST mode")) } + test("test FAILFAST with unclosed tag") { + val exceptionInParse = intercept[SparkException] { + spark.read + .option("rowTag", "book") + .option("mode", "FAILFAST") + .xml(unclosedTag) + .show() + } + assert(exceptionInParse.getMessage.contains("Malformed line in FAILFAST mode")) + } + test("DSL test for permissive mode for corrupt records") { val carsDf = new XmlReader() .withParseMode(PermissiveMode.name)