Skip to content

Commit

Permalink
improve test, fix corrupt record
Browse files Browse the repository at this point in the history
  • Loading branch information
Davies Liu committed May 21, 2015
1 parent 6836a80 commit 0dbf559
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,8 @@ private[sql] object JsonRDD extends Logging {

parsed
} catch {
case e: JsonProcessingException => Map(columnNameOfCorruptRecords -> record) :: Nil
case e: JsonProcessingException =>
Map(columnNameOfCorruptRecords -> UTF8String(record)) :: Nil
}
}
})
Expand Down
29 changes: 22 additions & 7 deletions sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1074,15 +1074,30 @@ class JsonSuite extends QueryTest {
}

test("SPARK-7565 MapType in JsonRDD") {
val useStreaming = getConf(SQLConf.USE_JACKSON_STREAMING_API, "true")
val oldColumnNameOfCorruptRecord = TestSQLContext.conf.columnNameOfCorruptRecord
TestSQLContext.setConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD, "_unparsed")

val schemaWithSimpleMap = StructType(
StructField("map", MapType(StringType, IntegerType, true), false) :: Nil)
val df = read.schema(schemaWithSimpleMap).json(mapType1)
val temp = Utils.createTempDir()
df.write.mode("overwrite").parquet(temp.getPath)

setConf("spark.sql.json.useJacksonStreamingAPI", "false")
val df2 = read.schema(schemaWithSimpleMap).json(mapType1)
df2.write.mode("overwrite").parquet(temp.getPath)
try{
for (useStreaming <- List("true", "false")) {
setConf(SQLConf.USE_JACKSON_STREAMING_API, useStreaming)
val temp = Utils.createTempDir().getPath

val df = read.schema(schemaWithSimpleMap).json(mapType1)
df.write.mode("overwrite").parquet(temp)
// order of MapType is not defined
assert(read.parquet(temp).count() == 5)

val df2 = read.json(corruptRecords)
df2.write.mode("overwrite").parquet(temp)
checkAnswer(read.parquet(temp), df2.collect())
}
} finally {
setConf(SQLConf.USE_JACKSON_STREAMING_API, useStreaming)
setConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD, oldColumnNameOfCorruptRecord)
}
}

}

0 comments on commit 0dbf559

Please sign in to comment.