Skip to content

Commit

Permalink
Take into account StreamDecoder.haveLeftoverChar in trying to exactly …
Browse files Browse the repository at this point in the history
…always correctly determine how much has been read (#468)
  • Loading branch information
srowen authored Aug 25, 2020
1 parent 68b92b3 commit f28f1d2
Showing 1 changed file with 7 additions and 1 deletion.
8 changes: 7 additions & 1 deletion src/main/scala/com/databricks/spark/xml/XmlInputFormat.scala
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ private[xml] class XmlRecordReader extends RecordReader[LongWritable, Text] {
// Mutable reader state. Fields initialized with `_` start at their default (null) and are
// assigned later, outside this declaration list.

// Character reader over the input; its private `sd` field is looked up reflectively to reach
// the underlying decoder object.
private var reader: Reader = _
// When non-null, the current position is taken directly from `getPos` on this object.
private var filePosition: Seekable = _
// Its byte count is the basis for the computed position when `filePosition` is null.
private var countingIn: CountingInputStream = _
// Reads the decoder's private `haveLeftoverChar` flag via reflection on each call; when it
// returns true, 1 is subtracted from the computed position.
private var readerLeftoverCharFn: () => Boolean = _
// The decoder's private `bb` buffer, obtained via reflection; its `remaining()` count is
// subtracted from the computed position.
private var readerByteBuffer: ByteBuffer = _
// NOTE(review): usage is not visible in this excerpt — presumably tied to compressed input; confirm.
private var decompressor: Decompressor = _
// Starts as an empty StringBuilder. NOTE(review): usage is not visible in this excerpt.
private var buffer = new StringBuilder()
Expand Down Expand Up @@ -127,6 +128,9 @@ private[xml] class XmlRecordReader extends RecordReader[LongWritable, Text] {
val sdField = reader.getClass.getDeclaredField("sd")
sdField.setAccessible(true)
val sd = sdField.get(reader)
val readerLeftoverCharField = sd.getClass.getDeclaredField("haveLeftoverChar")
readerLeftoverCharField.setAccessible(true)
readerLeftoverCharFn = () => { readerLeftoverCharField.get(sd).asInstanceOf[Boolean] }
val bbField = sd.getClass.getDeclaredField("bb")
bbField.setAccessible(true)
readerByteBuffer = bbField.get(sd).asInstanceOf[ByteBuffer]
Expand All @@ -149,7 +153,9 @@ private[xml] class XmlRecordReader extends RecordReader[LongWritable, Text] {
if (filePosition != null) {
filePosition.getPos
} else {
start + countingIn.getByteCount - readerByteBuffer.remaining()
start + countingIn.getByteCount -
readerByteBuffer.remaining() -
(if (readerLeftoverCharFn()) 1 else 0)
}
}

Expand Down

0 comments on commit f28f1d2

Please sign in to comment.