Skip to content

Commit

Permalink
Tests for applySchema.
Browse files Browse the repository at this point in the history
  • Loading branch information
yhuai committed Jul 23, 2014
1 parent aa92e84 commit 624765c
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 0 deletions.
2 changes: 2 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
*/
@DeveloperApi
def applySchema(rowRDD: RDD[Row], schema: StructType): SchemaRDD = {
// TODO: use MutableProjection when rowRDD is another SchemaRDD and the applied
// schema differs from the existing schema on any field data type.
val logicalPlan = SparkLogicalPlan(ExistingRdd(schema.toAttributes, rowRDD))
new SchemaRDD(this, logicalPlan)
}
Expand Down
32 changes: 32 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -431,4 +431,36 @@ class SQLQuerySuite extends QueryTest {
)
clear()
}

test("apply schema") {
val schema = StructType(
StructField("f1", IntegerType, false) ::
StructField("f2", StringType, false) ::
StructField("f3", BooleanType, false) ::
StructField("f4", IntegerType, true) :: Nil)

val rowRDD = unparsedStrings.map { r =>
val values = r.split(",").map(_.trim)
val v4 = try values(3).toInt catch {
case _: NumberFormatException => null
}
Row(values(0).toInt, values(1), values(2).toBoolean, v4)
}

val schemaRDD = applySchema(rowRDD, schema)
schemaRDD.registerAsTable("applySchema")
checkAnswer(
sql("SELECT * FROM applySchema"),
(1, "A1", true, null) ::
(2, "B2", false, null) ::
(3, "C3", true, null) ::
(4, "D4", true, 2147483644) :: Nil)

checkAnswer(
sql("SELECT f1, f4 FROM applySchema"),
(1, null) ::
(2, null) ::
(3, null) ::
(4, 2147483644) :: Nil)
}
}
7 changes: 7 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/TestData.scala
Original file line number Diff line number Diff line change
Expand Up @@ -128,4 +128,11 @@ object TestData {

case class TableName(tableName: String)
TestSQLContext.sparkContext.parallelize(TableName("test") :: Nil).registerAsTable("tableName")

val unparsedStrings =
TestSQLContext.sparkContext.parallelize(
"1, A1, true, null" ::
"2, B2, false, null" ::
"3, C3, true, null" ::
"4, D4, true, 2147483644" :: Nil)
}

0 comments on commit 624765c

Please sign in to comment.