Skip to content
Snippets Groups Projects
Commit 030acdd1 authored by Takeshi Yamamuro's avatar Takeshi Yamamuro Committed by Xiao Li
Browse files

[SPARK-19637][SQL] Add to_json in FunctionRegistry

## What changes were proposed in this pull request?
This PR adds an entry for `to_json` in `FunctionRegistry` and adds SQL support for `to_json`.

## How was this patch tested?
Added tests in `JsonFunctionsSuite`.

Author: Takeshi Yamamuro <yamamuro@apache.org>

Closes #16981 from maropu/SPARK-19637.
parent 932196d9
No related branches found
No related tags found
No related merge requests found
......@@ -421,6 +421,9 @@ object FunctionRegistry {
expression[BitwiseOr]("|"),
expression[BitwiseXor]("^"),
// json
expression[StructToJson]("to_json"),
// Cast aliases (SPARK-16730)
castAlias("boolean", BooleanType),
castAlias("tinyint", ByteType),
......
......@@ -23,11 +23,12 @@ import scala.util.parsing.combinator.RegexParsers
import com.fasterxml.jackson.core._
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.json._
import org.apache.spark.sql.catalyst.util.{GenericArrayData, ParseModes}
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData, ParseModes}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.Utils
......@@ -330,7 +331,7 @@ case class GetJsonObject(json: Expression, path: Expression)
// scalastyle:off line.size.limit
@ExpressionDescription(
usage = "_FUNC_(jsonStr, p1, p2, ..., pn) - Return a tuple like the function get_json_object, but it takes multiple names. All the input parameters and output column types are string.",
usage = "_FUNC_(jsonStr, p1, p2, ..., pn) - Returns a tuple like the function get_json_object, but it takes multiple names. All the input parameters and output column types are string.",
extended = """
Examples:
> SELECT _FUNC_('{"a":1, "b":2}', 'a', 'b');
......@@ -564,6 +565,17 @@ case class JsonToStruct(
/**
* Converts a [[StructType]] to a json output string.
*/
// scalastyle:off line.size.limit
@ExpressionDescription(
usage = "_FUNC_(expr[, options]) - Returns a json string with a given struct value",
extended = """
Examples:
> SELECT _FUNC_(named_struct('a', 1, 'b', 2));
{"a":1,"b":2}
> SELECT _FUNC_(named_struct('time', to_timestamp('2015-08-26', 'yyyy-MM-dd')), map('timestampFormat', 'dd/MM/yyyy'));
{"time":"26/08/2015"}
""")
// scalastyle:on line.size.limit
case class StructToJson(
options: Map[String, String],
child: Expression,
......@@ -573,6 +585,14 @@ case class StructToJson(
def this(options: Map[String, String], child: Expression) = this(options, child, None)
// Used in `FunctionRegistry`
def this(child: Expression) = this(Map.empty, child, None)
def this(child: Expression, options: Expression) =
this(
options = StructToJson.convertToMapData(options),
child = child,
timeZoneId = None)
@transient
lazy val writer = new CharArrayWriter()
......@@ -613,3 +633,20 @@ case class StructToJson(
override def inputTypes: Seq[AbstractDataType] = StructType :: Nil
}
object StructToJson {

  /**
   * Extracts a string-to-string options map from the second argument of `to_json`.
   *
   * Only a literal `map()` expression whose keys and values are all strings is
   * accepted. A `map()` of any other type, or any non-map expression, is rejected
   * with an [[AnalysisException]].
   */
  def convertToMapData(exp: Expression): Map[String, String] = {
    // The only shape we accept: map<string, string> with non-nullable values.
    val stringMapType = MapType(StringType, StringType, valueContainsNull = false)
    exp match {
      case createMap: CreateMap if createMap.dataType.acceptsType(stringMapType) =>
        // Safe to evaluate eagerly: CreateMap over string literals is foldable here.
        val evaluated = createMap.eval().asInstanceOf[ArrayBasedMapData]
        ArrayBasedMapData.toScalaMap(evaluated).map { kv =>
          kv._1.toString -> kv._2.toString
        }
      case createMap: CreateMap =>
        throw new AnalysisException(
          s"A type of keys and values in map() must be string, but got ${createMap.dataType}")
      case _ =>
        throw new AnalysisException("Must use a map() function for options")
    }
  }
}
-- to_json
-- Input queries for SQLQueryTestSuite; the expected results are recorded in the
-- corresponding auto-generated output file.
describe function to_json;
describe function extended to_json;
select to_json(named_struct('a', 1, 'b', 2));
select to_json(named_struct('time', to_timestamp('2015-08-26', 'yyyy-MM-dd')), map('timestampFormat', 'dd/MM/yyyy'));
-- Check if errors handled
-- The options argument must be a string-to-string map(); calling with no
-- arguments at all is also invalid.
select to_json(named_struct('a', 1, 'b', 2), named_struct('mode', 'PERMISSIVE'));
select to_json();
-- Automatically generated by SQLQueryTestSuite
-- Number of queries: 6
-- !query 0
describe function to_json
-- !query 0 schema
struct<function_desc:string>
-- !query 0 output
Class: org.apache.spark.sql.catalyst.expressions.StructToJson
Function: to_json
Usage: to_json(expr[, options]) - Returns a json string with a given struct value
-- !query 1
describe function extended to_json
-- !query 1 schema
struct<function_desc:string>
-- !query 1 output
Class: org.apache.spark.sql.catalyst.expressions.StructToJson
Extended Usage:
Examples:
> SELECT to_json(named_struct('a', 1, 'b', 2));
{"a":1,"b":2}
> SELECT to_json(named_struct('time', to_timestamp('2015-08-26', 'yyyy-MM-dd')), map('timestampFormat', 'dd/MM/yyyy'));
{"time":"26/08/2015"}
Function: to_json
Usage: to_json(expr[, options]) - Returns a json string with a given struct value
-- !query 2
select to_json(named_struct('a', 1, 'b', 2))
-- !query 2 schema
struct<structtojson(named_struct(a, 1, b, 2)):string>
-- !query 2 output
{"a":1,"b":2}
-- !query 3
select to_json(named_struct('time', to_timestamp('2015-08-26', 'yyyy-MM-dd')), map('timestampFormat', 'dd/MM/yyyy'))
-- !query 3 schema
struct<structtojson(named_struct(time, to_timestamp('2015-08-26', 'yyyy-MM-dd'))):string>
-- !query 3 output
{"time":"26/08/2015"}
-- !query 4
select to_json(named_struct('a', 1, 'b', 2), named_struct('mode', 'PERMISSIVE'))
-- !query 4 schema
struct<>
-- !query 4 output
org.apache.spark.sql.AnalysisException
Must use a map() function for options;; line 1 pos 7
-- !query 5
select to_json()
-- !query 5 schema
struct<>
-- !query 5 output
org.apache.spark.sql.AnalysisException
Invalid number of arguments for function to_json; line 1 pos 7
......@@ -197,4 +197,27 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
.select(to_json($"struct").as("json"))
checkAnswer(dfTwo, readBackTwo)
}
test("SPARK-19637 Support to_json in SQL") {
  // Basic case: to_json over a struct column via a SQL expression.
  val structDf = Seq(Tuple1(Tuple1(1))).toDF("a")
  checkAnswer(
    structDf.selectExpr("to_json(a)"),
    Row("""{"_1":1}""") :: Nil)

  // The options map is honored (custom timestampFormat).
  val timestampDf =
    Seq(Tuple1(Tuple1(java.sql.Timestamp.valueOf("2015-08-26 18:00:00.0")))).toDF("a")
  checkAnswer(
    timestampDf.selectExpr("to_json(a, map('timestampFormat', 'dd/MM/yyyy HH:mm'))"),
    Row("""{"_1":"26/08/2015 18:00"}""") :: Nil)

  // A non-map() options expression must be rejected at analysis time.
  val nonMapError = intercept[AnalysisException] {
    timestampDf.selectExpr("to_json(a, named_struct('a', 1))")
  }
  assert(nonMapError.getMessage.startsWith("Must use a map() function for options"))

  // A map() whose values are not strings must also be rejected.
  val nonStringMapError = intercept[AnalysisException] {
    timestampDf.selectExpr("to_json(a, map('a', 1))")
  }
  assert(nonStringMapError.getMessage.startsWith(
    "A type of keys and values in map() must be string, but got"))
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment