Fixed SPARK-1678
CompressibleColumnAccessor.hasNext and RunLengthEncoding.decoder.hasNext were not correctly implemented.
liancheng committed Apr 30, 2014
1 parent ff5be9a commit d537a36
Showing 4 changed files with 39 additions and 2 deletions.
@@ -32,5 +32,7 @@ private[sql] trait CompressibleColumnAccessor[T <: NativeType] extends ColumnAcc
     decoder = CompressionScheme(underlyingBuffer.getInt()).decoder(buffer, columnType)
   }
 
-  abstract override def extractSingle(buffer: ByteBuffer): T#JvmType = decoder.next()
+  abstract override def hasNext = super.hasNext || decoder.hasNext
+
+  override def extractSingle(buffer: ByteBuffer): T#JvmType = decoder.next()
 }
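
The hasNext added above is a stackable-trait modification: the abstract override lets the compressible accessor defer to whichever concrete accessor it is mixed into (super.hasNext) while also reporting values the decoder still holds for the current run. The self-contained sketch below shows that pattern with made-up Toy* names; it only illustrates the Scala mechanism, not Spark's actual classes.

// Minimal sketch of the stackable-trait (abstract override) pattern used above;
// every name here is a hypothetical stand-in, not one of Spark's types.
trait ToyAccessor {
  def hasNext: Boolean
}

// Stands in for the compression decoder: it may still hold values of the
// current run even after the raw buffer has been fully consumed.
class ToyDecoder(private var pending: Int) {
  def hasNext: Boolean = pending > 0
  def next(): Int = { pending -= 1; 42 } // dummy value, only hasNext matters here
}

trait ToyCompressibleAccessor extends ToyAccessor {
  protected def decoder: ToyDecoder

  // Stackable modification: defer to whichever concrete accessor this trait is
  // mixed into, but also report true while the decoder has undelivered values.
  abstract override def hasNext: Boolean = super.hasNext || decoder.hasNext
}

// Concrete accessor backed by a plain buffer of `size` values.
class ToyBufferAccessor(size: Int) extends ToyAccessor {
  private var remaining = size
  override def hasNext: Boolean = remaining > 0
}

object StackedAccessorDemo {
  def main(args: Array[String]): Unit = {
    val accessor = new ToyBufferAccessor(0) with ToyCompressibleAccessor {
      val decoder = new ToyDecoder(2)
    }
    // The raw buffer is empty, but two decoded values are still pending,
    // so the stacked hasNext correctly stays true.
    println(accessor.hasNext) // true
  }
}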
@@ -157,7 +157,7 @@ private[sql] case object RunLengthEncoding extends CompressionScheme {
       currentValue
     }
 
-    override def hasNext = buffer.hasRemaining
+    override def hasNext = valueCount < run || buffer.hasRemaining
   }
 }
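
The RunLengthEncoding change is easier to see in isolation. Each run is written to the buffer as a (value, run length) pair, so once the last pair has been read, buffer.hasRemaining is already false even though the run may still have values left to emit; the decoder must also track its position inside the current run. The sketch below uses a hypothetical ToyRunLengthDecoder, not Spark's decoder, to show the buggy and the corrected hasNext side by side.

// Self-contained sketch, not Spark code: ToyRunLengthDecoder mirrors the
// run-length decoding loop above with made-up names.
import java.nio.ByteBuffer

class ToyRunLengthDecoder(buffer: ByteBuffer) {
  private var run = 0          // length of the current run
  private var valueCount = 0   // how many values of the current run were emitted
  private var currentValue = 0

  def next(): Int = {
    if (valueCount == run) {
      // Start a new run: read the repeated value and its repetition count.
      currentValue = buffer.getInt()
      run = buffer.getInt()
      valueCount = 1
    } else {
      valueCount += 1
    }
    currentValue
  }

  // Pre-fix behaviour: reports false as soon as the buffer is drained,
  // even if the current run still has undelivered values.
  def hasNextBuggy: Boolean = buffer.hasRemaining

  // Corrected behaviour, matching the change in this commit.
  def hasNext: Boolean = valueCount < run || buffer.hasRemaining
}

object ToyRunLengthDecoderDemo {
  def main(args: Array[String]): Unit = {
    // Encode the sequence 7, 7, 7 as a single run: value 7 repeated 3 times.
    val buf = ByteBuffer.allocate(8).putInt(7).putInt(3)
    buf.flip()

    val decoder = new ToyRunLengthDecoder(buf)
    val decoded = scala.collection.mutable.ArrayBuffer.empty[Int]
    while (decoder.hasNext) decoded += decoder.next()

    // Prints 7, 7, 7; a loop driven by hasNextBuggy would stop after the first 7,
    // because reading the run header already exhausted the 8-byte buffer.
    println(decoded.mkString(", "))
  }
}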

sql/core/src/test/scala/org/apache/spark/sql/TestData.scala (11 additions, 0 deletions)
@@ -73,4 +73,15 @@ object TestData {
     ArrayData(Seq(1,2,3), Seq(Seq(1,2,3))) ::
     ArrayData(Seq(2,3,4), Seq(Seq(2,3,4))) :: Nil)
   arrayData.registerAsTable("arrayData")
+
+  case class StringData(s: String)
+  val repeatedData =
+    TestSQLContext.sparkContext.parallelize(List.fill(2)(StringData("test")))
+  repeatedData.registerAsTable("repeatedData")
+
+  val nullableRepeatedData =
+    TestSQLContext.sparkContext.parallelize(
+      List.fill(2)(StringData(null)) ++
+      List.fill(2)(StringData("test")))
+  nullableRepeatedData.registerAsTable("nullableRepeatedData")
 }
@@ -49,4 +49,28 @@ class InMemoryColumnarQuerySuite extends QueryTest {
     checkAnswer(scan, testData.collect().toSeq)
     checkAnswer(scan, testData.collect().toSeq)
   }
+
+  test("SPARK-1678 regression: compression must not lose repeated values") {
+    checkAnswer(
+      sql("SELECT * FROM repeatedData"),
+      repeatedData.collect().toSeq)
+
+    TestSQLContext.cacheTable("repeatedData")
+
+    checkAnswer(
+      sql("SELECT * FROM repeatedData"),
+      repeatedData.collect().toSeq)
+  }
+
+  test("with null values") {
+    checkAnswer(
+      sql("SELECT * FROM nullableRepeatedData"),
+      nullableRepeatedData.collect().toSeq)
+
+    TestSQLContext.cacheTable("nullableRepeatedData")
+
+    checkAnswer(
+      sql("SELECT * FROM nullableRepeatedData"),
+      nullableRepeatedData.collect().toSeq)
+  }
 }
