[SPARK-17764][SQL] Add to_json supporting to convert nested struct column to JSON string

## What changes were proposed in this pull request?

This PR proposes to add a `to_json` function, the counterpart of `from_json`, in Scala, Java and Python.

It would be useful to be able to convert the same column both from and to JSON. Also, some data sources do not support nested types; if we have to save a DataFrame to such a data source, this function gives us a workaround.

Usage is as follows:

``` scala
val df = Seq(Tuple1(Tuple1(1))).toDF("a")
df.select(to_json($"a").as("json")).show()
```

``` bash
+--------+
|    json|
+--------+
|{"_1":1}|
+--------+
```
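
For completeness, the motivating round trip (parse with the existing `from_json`, then serialize back with the new `to_json`) could look roughly like the sketch below; it is not taken from the patch and assumes `spark.implicits._` is in scope:

``` scala
import org.apache.spark.sql.functions.{from_json, to_json}
import org.apache.spark.sql.types.{IntegerType, StructType}

// Parse a JSON string column into a struct, then write it back out as JSON.
val schema = new StructType().add("a", IntegerType)
val df = Seq("""{"a": 1}""").toDF("json")

df.select(to_json(from_json($"json", schema)).as("roundtrip")).show()
// The single "roundtrip" column is expected to contain {"a":1}
```
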
## How was this patch tested?

Unit tests in `JsonFunctionsSuite` and `JsonExpressionsSuite`.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #15354 from HyukjinKwon/SPARK-17764.
HyukjinKwon authored and marmbrus committed Nov 1, 2016
1 parent cfac17e commit 01dd008
Showing 11 changed files with 177 additions and 16 deletions.
23 changes: 23 additions & 0 deletions python/pyspark/sql/functions.py
@@ -1744,6 +1744,29 @@ def from_json(col, schema, options={}):
return Column(jc)


@ignore_unicode_prefix
@since(2.1)
def to_json(col, options={}):
"""
Converts a column containing a [[StructType]] into a JSON string. Throws an exception,
in the case of an unsupported type.
:param col: name of column containing the struct
:param options: options to control converting. accepts the same options as the json datasource
>>> from pyspark.sql import Row
>>> from pyspark.sql.types import *
>>> data = [(1, Row(name='Alice', age=2))]
>>> df = spark.createDataFrame(data, ("key", "value"))
>>> df.select(to_json(df.value).alias("json")).collect()
[Row(json=u'{"age":2,"name":"Alice"}')]
"""

sc = SparkContext._active_spark_context
jc = sc._jvm.functions.to_json(_to_java_column(col), options)
return Column(jc)


@since(1.5)
def size(col):
"""
2 changes: 1 addition & 1 deletion python/pyspark/sql/readwriter.py
@@ -161,7 +161,7 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None):
"""
Loads a JSON file (`JSON Lines text format or newline-delimited JSON
<[http://jsonlines.org/>`_) or an RDD of Strings storing JSON objects (one object per
<http://jsonlines.org/>`_) or an RDD of Strings storing JSON objects (one object per
record) and returns the result as a :class`DataFrame`.
If the ``schema`` parameter is not specified, this function goes
2 changes: 1 addition & 1 deletion python/pyspark/sql/streaming.py
@@ -641,7 +641,7 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
timestampFormat=None):
"""
Loads a JSON file stream (`JSON Lines text format or newline-delimited JSON
<[http://jsonlines.org/>`_) and returns a :class`DataFrame`.
<http://jsonlines.org/>`_) and returns a :class`DataFrame`.
If the ``schema`` parameter is not specified, this function goes
through the input once to determine the input schema.
@@ -17,16 +17,17 @@

package org.apache.spark.sql.catalyst.expressions

import java.io.{ByteArrayOutputStream, StringWriter}
import java.io.{ByteArrayOutputStream, CharArrayWriter, StringWriter}

import scala.util.parsing.combinator.RegexParsers

import com.fasterxml.jackson.core._

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.json.{JacksonParser, JSONOptions, SparkSQLJsonProcessingException}
import org.apache.spark.sql.catalyst.json._
import org.apache.spark.sql.catalyst.util.ParseModes
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
@@ -494,3 +495,46 @@ case class JsonToStruct(schema: StructType, options: Map[String, String], child:

override def inputTypes: Seq[AbstractDataType] = StringType :: Nil
}

/**
* Converts a [[StructType]] to a json output string.
*/
case class StructToJson(options: Map[String, String], child: Expression)
extends Expression with CodegenFallback with ExpectsInputTypes {
override def nullable: Boolean = true

@transient
lazy val writer = new CharArrayWriter()

@transient
lazy val gen =
new JacksonGenerator(child.dataType.asInstanceOf[StructType], writer)

override def dataType: DataType = StringType
override def children: Seq[Expression] = child :: Nil

override def checkInputDataTypes(): TypeCheckResult = {
if (StructType.acceptsType(child.dataType)) {
try {
JacksonUtils.verifySchema(child.dataType.asInstanceOf[StructType])
TypeCheckResult.TypeCheckSuccess
} catch {
case e: UnsupportedOperationException =>
TypeCheckResult.TypeCheckFailure(e.getMessage)
}
} else {
TypeCheckResult.TypeCheckFailure(
s"$prettyName requires that the expression is a struct expression.")
}
}

override def eval(input: InternalRow): Any = {
gen.write(child.eval(input).asInstanceOf[InternalRow])
gen.flush()
val json = writer.toString
writer.reset()
UTF8String.fromString(json)
}

override def inputTypes: Seq[AbstractDataType] = StructType :: Nil
}
@@ -15,15 +15,14 @@
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources.json
package org.apache.spark.sql.catalyst.json

import java.io.Writer

import com.fasterxml.jackson.core._

import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.json.JSONOptions
import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
import org.apache.spark.sql.catalyst.util.{ArrayData, DateTimeUtils, MapData}
import org.apache.spark.sql.types._

@@ -19,6 +19,8 @@ package org.apache.spark.sql.catalyst.json

import com.fasterxml.jackson.core.{JsonParser, JsonToken}

import org.apache.spark.sql.types._

object JacksonUtils {
/**
* Advance the parser until a null or a specific token is found
@@ -29,4 +31,28 @@ object JacksonUtils {
case x => x != stopOn
}
}

/**
* Verify if the schema is supported in JSON parsing.
*/
def verifySchema(schema: StructType): Unit = {
def verifyType(name: String, dataType: DataType): Unit = dataType match {
case NullType | BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType |
DoubleType | StringType | TimestampType | DateType | BinaryType | _: DecimalType =>

case st: StructType => st.foreach(field => verifyType(field.name, field.dataType))

case at: ArrayType => verifyType(name, at.elementType)

case mt: MapType => verifyType(name, mt.keyType)

case udt: UserDefinedType[_] => verifyType(name, udt.sqlType)

case _ =>
throw new UnsupportedOperationException(
s"Unable to convert column $name of type ${dataType.simpleString} to JSON.")
}

schema.foreach(field => verifyType(field.name, field.dataType))
}
}
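
For illustration only (not part of the diff), the new schema check could be exercised directly along these lines; whether `JacksonUtils` is visible from user code is an assumption here:

``` scala
import org.apache.spark.sql.catalyst.json.JacksonUtils
import org.apache.spark.sql.types._

// Supported atomic, nested, array and map types pass the check silently.
val supported = new StructType()
  .add("id", IntegerType)
  .add("tags", ArrayType(StringType))
JacksonUtils.verifySchema(supported)

// An unsupported type such as CalendarIntervalType makes verifySchema throw
// UnsupportedOperationException, which StructToJson reports as a type-check failure.
val unsupported = new StructType().add("i", CalendarIntervalType)
// JacksonUtils.verifySchema(unsupported)  // would throw UnsupportedOperationException
```
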
@@ -343,4 +343,13 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
null
)
}

test("to_json") {
val schema = StructType(StructField("a", IntegerType) :: Nil)
val struct = Literal.create(create_row(1), schema)
checkEvaluation(
StructToJson(Map.empty, struct),
"""{"a":1}"""
)
}
}
2 changes: 1 addition & 1 deletion sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -37,6 +37,7 @@ import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.encoders._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.json.JacksonGenerator
import org.apache.spark.sql.catalyst.optimizer.CombineUnions
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
@@ -45,7 +46,6 @@ import org.apache.spark.sql.catalyst.util.usePrettyExpression
import org.apache.spark.sql.execution.{FileRelation, LogicalRDD, QueryExecution, SQLExecution}
import org.apache.spark.sql.execution.command.{CreateViewCommand, ExplainCommand, GlobalTempView, LocalTempView}
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.datasources.json.JacksonGenerator
import org.apache.spark.sql.execution.python.EvaluatePython
import org.apache.spark.sql.streaming.DataStreamWriter
import org.apache.spark.sql.types._
@@ -32,7 +32,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.json.{JacksonParser, JSONOptions}
import org.apache.spark.sql.catalyst.json.{JacksonGenerator, JacksonParser, JSONOptions}
import org.apache.spark.sql.catalyst.util.CompressionCodecs
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.text.TextOutputWriter
44 changes: 43 additions & 1 deletion sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -2883,10 +2883,10 @@ object functions {
* (Scala-specific) Parses a column containing a JSON string into a [[StructType]] with the
* specified schema. Returns `null`, in the case of an unparseable string.
*
* @param e a string column containing JSON data.
* @param schema the schema to use when parsing the json string
* @param options options to control how the json is parsed. accepts the same options and the
* json data source.
* @param e a string column containing JSON data.
*
* @group collection_funcs
* @since 2.1.0
@@ -2936,6 +2936,48 @@ def from_json(e: Column, schema: String, options: java.util.Map[String, String]): Column =
def from_json(e: Column, schema: String, options: java.util.Map[String, String]): Column =
from_json(e, DataType.fromJson(schema).asInstanceOf[StructType], options)


/**
* (Scala-specific) Converts a column containing a [[StructType]] into a JSON string with the
* specified schema. Throws an exception, in the case of an unsupported type.
*
* @param e a struct column.
* @param options options to control how the struct column is converted into a json string.
* accepts the same options and the json data source.
*
* @group collection_funcs
* @since 2.1.0
*/
def to_json(e: Column, options: Map[String, String]): Column = withExpr {
StructToJson(options, e.expr)
}

/**
* (Java-specific) Converts a column containing a [[StructType]] into a JSON string with the
* specified schema. Throws an exception, in the case of an unsupported type.
*
* @param e a struct column.
* @param options options to control how the struct column is converted into a json string.
* accepts the same options and the json data source.
*
* @group collection_funcs
* @since 2.1.0
*/
def to_json(e: Column, options: java.util.Map[String, String]): Column =
to_json(e, options.asScala.toMap)

/**
* Converts a column containing a [[StructType]] into a JSON string with the
* specified schema. Throws an exception, in the case of an unsupported type.
*
* @param e a struct column.
*
* @group collection_funcs
* @since 2.1.0
*/
def to_json(e: Column): Column =
to_json(e, Map.empty[String, String])

/**
* Returns length of array or map.
*
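
A short usage sketch of the new Scala `to_json` overloads added in `functions.scala` above (assumes `spark.implicits._` is in scope; the column names are illustrative):

``` scala
import org.apache.spark.sql.functions.{struct, to_json}

// Pack existing columns into a struct and serialize it as a JSON string column.
val people = Seq(("Alice", 2)).toDF("name", "age")
people.select(to_json(struct($"name", $"age")).as("json")).show(false)
// The "json" column is expected to contain {"name":"Alice","age":2}
```
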
@@ -17,9 +17,9 @@

package org.apache.spark.sql

import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.functions.{from_json, struct, to_json}
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types.{IntegerType, StructType}
import org.apache.spark.sql.types.{CalendarIntervalType, IntegerType, StructType}

class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
import testImplicits._
@@ -31,7 +31,6 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
Row("alice", "5"))
}


val tuples: Seq[(String, String)] =
("1", """{"f1": "value1", "f2": "value2", "f3": 3, "f5": 5.23}""") ::
("2", """{"f1": "value12", "f3": "value3", "f2": 2, "f4": 4.01}""") ::
@@ -97,7 +96,7 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
checkAnswer(expr, expected)
}

test("json_parser") {
test("from_json") {
val df = Seq("""{"a": 1}""").toDS()
val schema = new StructType().add("a", IntegerType)

@@ -106,7 +105,7 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
Row(Row(1)) :: Nil)
}

test("json_parser missing columns") {
test("from_json missing columns") {
val df = Seq("""{"a": 1}""").toDS()
val schema = new StructType().add("b", IntegerType)

@@ -115,12 +114,31 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
Row(Row(null)) :: Nil)
}

test("json_parser invalid json") {
test("from_json invalid json") {
val df = Seq("""{"a" 1}""").toDS()
val schema = new StructType().add("a", IntegerType)

checkAnswer(
df.select(from_json($"value", schema)),
Row(null) :: Nil)
}

test("to_json") {
val df = Seq(Tuple1(Tuple1(1))).toDF("a")

checkAnswer(
df.select(to_json($"a")),
Row("""{"_1":1}""") :: Nil)
}

test("to_json unsupported type") {
val df = Seq(Tuple1(Tuple1("interval -3 month 7 hours"))).toDF("a")
.select(struct($"a._1".cast(CalendarIntervalType).as("a")).as("c"))
val e = intercept[AnalysisException]{
// Unsupported type throws an exception
df.select(to_json($"c")).collect()
}
assert(e.getMessage.contains(
"Unable to convert column a of type calendarinterval to JSON."))
}
}
