Skip to content

Commit

Permalink
Fix parsing of partial result when corrupted record field is present (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
srowen authored Feb 11, 2021
1 parent b96dfad commit f8d200c
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,16 +115,13 @@ private[xml] object StaxXmlParser extends Serializable {
case PermissiveMode =>
logger.debug("Malformed line cause:", cause)
// The logic below is borrowed from Apache Spark's FailureSafeParser.
val corruptFieldIndex = Try(schema.fieldIndex(options.columnNameOfCorruptRecord)).toOption
val actualSchema = StructType(schema.filterNot(_.name == options.columnNameOfCorruptRecord))
val resultRow = new Array[Any](schema.length)
var i = 0
while (i < actualSchema.length) {
val from = actualSchema(i)
resultRow(schema.fieldIndex(from.name)) = partialResult.map(_.get(i)).orNull
i += 1
schema.filterNot(_.name == options.columnNameOfCorruptRecord).foreach { from =>
val sourceIndex = schema.fieldIndex(from.name)
resultRow(sourceIndex) = partialResult.map(_.get(sourceIndex)).orNull
}
corruptFieldIndex.foreach(index => resultRow(index) = record)
val corruptFieldIndex = Try(schema.fieldIndex(options.columnNameOfCorruptRecord)).toOption
corruptFieldIndex.foreach(resultRow(_) = record)
Some(Row.fromSeq(resultRow))
}
}
Expand Down
30 changes: 30 additions & 0 deletions src/test/resources/manual_schema_corrupt_record.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
<row id='0' xml:space='preserve'>
<c2>1234</c2>
<c3>Mark</c3>
<c4>Mark</c4>
<c5>Mark</c5>
<c6>DOLLAR</c6>
<c7>RT</c7>
<c8>USD</c8>
<c9>1</c9>
<c11>3000</c11>
<c20 m='8'></c20>
<c20 m='9'></c20>
<c46>20210207</c46>
<c76>NO</c76>
<c78>20210207</c78>
<c85>14503</c85>
<c93>USD</c93>
<c95>USD</c95>
<c99>LEGACY</c99>
<c99 m='2'>IBAN</c99>
<c100>sm342</c100>
<c100 m='2'></c100>
<c108>NO</c108>
<c192>M</c192>
<c193>46_STREET1</c193>
<c194>0811241751</c194>
<c195>46_STREET1</c195>
<c196>SA0010001</c196>
<c197>1</c197>
</row>
52 changes: 52 additions & 0 deletions src/test/scala/com/databricks/spark/xml/XmlSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ final class XmlSuite extends AnyFunSuite with BeforeAndAfterAll {
private val whitespaceError = resDir + "whitespace_error.xml"
private val mapAttribute = resDir + "map-attribute.xml"
private val structWithOptChild = resDir + "struct_with_optional_child.xml"
private val manualSchemaCorruptRecord = resDir + "manual_schema_corrupt_record.xml"

private val booksTag = "book"
private val booksRootTag = "books"
Expand Down Expand Up @@ -1316,6 +1317,57 @@ final class XmlSuite extends AnyFunSuite with BeforeAndAfterAll {
assert(df.selectExpr("SIZE(Bar)").collect().head.getInt(0) === 2)
}

test("Manual schema with corrupt record field works on permissive mode failure") {
// See issue #517
val schema = StructType(List(
StructField("_id", StringType),
StructField("_space", StringType),
StructField("c2", DoubleType),
StructField("c3", StringType),
StructField("c4", StringType),
StructField("c5", StringType),
StructField("c6", StringType),
StructField("c7", StringType),
StructField("c8", StringType),
StructField("c9", DoubleType),
StructField("c11", DoubleType),
StructField("c20", ArrayType(StructType(List(
StructField("_VALUE", StringType),
StructField("_m", IntegerType)))
)),
StructField("c46", StringType),
StructField("c76", StringType),
StructField("c78", StringType),
StructField("c85", DoubleType),
StructField("c93", StringType),
StructField("c95", StringType),
StructField("c99", ArrayType(StructType(List(
StructField("_VALUE", StringType),
StructField("_m", IntegerType)))
)),
StructField("c100", ArrayType(StructType(List(
StructField("_VALUE", StringType),
StructField("_m", IntegerType)))
)),
StructField("c108", StringType),
StructField("c192", DoubleType),
StructField("c193", StringType),
StructField("c194", StringType),
StructField("c195", StringType),
StructField("c196", StringType),
StructField("c197", DoubleType),
StructField("_corrupt_record", StringType)))

val df = spark.read
.option("inferSchema", false)
.option("rowTag", "row")
.schema(schema)
.xml(manualSchemaCorruptRecord)

// Assert it works at all
assert(df.collect().head.getAs[String]("_corrupt_record") !== null)
}

private def getLines(path: Path): Seq[String] = {
val source = Source.fromFile(path.toFile)
try {
Expand Down

0 comments on commit f8d200c

Please sign in to comment.