Commit 0494c80e authored by Yin Huai, committed by Cheng Lian

[SPARK-10495] [SQL] Read date values in JSON data stored by Spark 1.5.0.

https://issues.apache.org/jira/browse/SPARK-10681

Author: Yin Huai <yhuai@databricks.com>

Closes #8806 from yhuai/SPARK-10495.
parent 72869883
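To make the bug concrete, here is a minimal, hypothetical repro sketch (the `sqlContext` handle and the sample record are assumed for illustration, not taken from the commit): Spark 1.5.0's JSON data source wrote DateType columns as days-since-epoch strings, which the reader mis-parsed before this fix.

// Hypothetical repro sketch, assuming a SQLContext named `sqlContext` is in scope.
import org.apache.spark.sql.types.{DateType, StructField, StructType}

val schema = StructType(StructField("d", DateType, nullable = true) :: Nil)
// A record as written by Spark 1.5.0's JSON data source: "16436" is the
// internal Int representation (days since epoch) of 2015-01-01, not a
// "yyyy-MM-dd" string.
val json = sqlContext.sparkContext.parallelize("""{"d":"16436"}""" :: Nil)
// Before this commit, the DateType case mis-parsed "16436"; with it, both
// encodings read back as java.sql.Date.valueOf("2015-01-01").
sqlContext.read.schema(schema).json(json).collect()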
@@ -73,6 +73,38 @@ private[sql] object JacksonGenerator {
valWriter(field.dataType, v)
}
gen.writeEndObject()
// For UDTs, udt.serialize produces Catalyst internal types. So, we need the following three cases.
case (ArrayType(ty, _), v: ArrayData) =>
gen.writeStartArray()
v.foreach(ty, (_, value) => valWriter(ty, value))
gen.writeEndArray()
case (MapType(kt, vt, _), v: MapData) =>
gen.writeStartObject()
v.foreach(kt, vt, { (k, v) =>
gen.writeFieldName(k.toString)
valWriter(vt, v)
})
gen.writeEndObject()
case (StructType(ty), v: InternalRow) =>
gen.writeStartObject()
var i = 0
while (i < ty.length) {
val field = ty(i)
val value = v.get(i, field.dataType)
if (value != null) {
gen.writeFieldName(field.name)
valWriter(field.dataType, value)
}
i += 1
}
gen.writeEndObject()
case (dt, v) =>
sys.error(
s"Failed to convert value $v (class of ${v.getClass}}) with the type of $dt to JSON.")
}
valWriter(rowSchema, row)
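A simplified, self-contained sketch of the recursive (dataType, value) writer pattern used above (the toy DT algebra is hypothetical; only the Jackson calls are real API): each Catalyst container type produced by udt.serialize needs an explicit case so it does not fall through to the error case.

import java.io.StringWriter
import com.fasterxml.jackson.core.{JsonFactory, JsonGenerator}

sealed trait DT
case object IntDT extends DT
case class ArrDT(element: DT) extends DT

// Match on (dataType, value) pairs and recurse into containers.
def write(gen: JsonGenerator, dt: DT, v: Any): Unit = (dt, v) match {
  case (IntDT, i: Int) => gen.writeNumber(i)
  case (ArrDT(e), xs: Seq[_]) =>
    gen.writeStartArray()
    xs.foreach(x => write(gen, e, x))
    gen.writeEndArray()
  case (t, x) =>
    sys.error(s"Failed to convert value $x (class of ${x.getClass}) with the type of $t to JSON.")
}

val out = new StringWriter()
val gen = new JsonFactory().createGenerator(out)
write(gen, ArrDT(IntDT), Seq(1, 2, 3))
gen.close()
// out.toString == "[1,2,3]"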
@@ -133,6 +165,10 @@ private[sql] object JacksonGenerator {
i += 1
}
gen.writeEndObject()
case (dt, v) =>
sys.error(
s"Failed to convert value $v (class of ${v.getClass}}) with the type of $dt to JSON.")
}
valWriter(rowSchema, row)
......
@@ -62,10 +62,23 @@ private[sql] object JacksonParser {
// guard the non string type
null
case (VALUE_STRING, BinaryType) =>
parser.getBinaryValue
case (VALUE_STRING, DateType) =>
DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(parser.getText).getTime)
val stringValue = parser.getText
if (stringValue.contains("-")) {
// The format of this string should be "yyyy-MM-dd".
DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(parser.getText).getTime)
} else {
// In Spark 1.5.0, we stored the date as the number of days since epoch in a string.
// So, we just convert it to Int.
stringValue.toInt
}
case (VALUE_STRING, TimestampType) =>
// This one will lose the microseconds part.
// See https://issues.apache.org/jira/browse/SPARK-10681.
DateTimeUtils.stringToTime(parser.getText).getTime * 1000L
case (VALUE_NUMBER_INT, TimestampType) =>
......
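As a hedged illustration of the two date encodings the DateType case above accepts, here is a self-contained sketch in which java.time stands in for Spark's DateTimeUtils:

import java.time.LocalDate

def parseJsonDate(s: String): Int =
  if (s.contains("-")) {
    // Calendar-style string, e.g. as produced by toJSON: "2015-01-01".
    LocalDate.parse(s).toEpochDay.toInt
  } else {
    // Spark 1.5.0 stored the internal representation directly, e.g. "16436".
    s.toInt
  }

assert(parseJsonDate("2015-01-01") == 16436)
assert(parseJsonDate("16436") == 16436)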
@@ -24,7 +24,7 @@ import com.fasterxml.jackson.core.JsonFactory
import org.apache.spark.rdd.RDD
import org.scalactic.Tolerance._
import org.apache.spark.sql.{QueryTest, Row, SQLConf}
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.datasources.{ResolvedDataSource, LogicalRelation}
import org.apache.spark.sql.execution.datasources.json.InferSchema.compatibleType
@@ -1159,4 +1159,105 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
"SELECT count(a) FROM test_myjson_with_part where d1 = 1"), Row(9))
})
}
test("backward compatibility") {
// This test makes sure our JSON support can read JSON data generated by previous versions
// of Spark, both through the toJSON method and through the JSON data source.
// The data is generated by the following program.
// Here are a few notes:
// - Spark 1.5.0 cannot save timestamp data. So, we manually added the timestamp field
// (col13) to the JSON objects.
// - Spark versions before 1.5.1 do not generate UDT values. So, we manually added the
// UDT value to the JSON objects generated by those versions (col17).
// - If the type is NullType, we do not write data out.
// Create the schema.
val struct =
StructType(
StructField("f1", FloatType, true) ::
StructField("f2", ArrayType(BooleanType), true) :: Nil)
val dataTypes =
Seq(
StringType, BinaryType, NullType, BooleanType,
ByteType, ShortType, IntegerType, LongType,
FloatType, DoubleType, DecimalType(25, 5), DecimalType(6, 5),
DateType, TimestampType,
ArrayType(IntegerType), MapType(StringType, LongType), struct,
new MyDenseVectorUDT())
val fields = dataTypes.zipWithIndex.map { case (dataType, index) =>
StructField(s"col$index", dataType, nullable = true)
}
val schema = StructType(fields)
val constantValues =
Seq(
"a string in binary".getBytes("UTF-8"),
null,
true,
1.toByte,
2.toShort,
3,
Long.MaxValue,
0.25.toFloat,
0.75,
new java.math.BigDecimal("1234.23456"),
new java.math.BigDecimal("1.23456"),
java.sql.Date.valueOf("2015-01-01"),
java.sql.Timestamp.valueOf("2015-01-01 23:50:59.123"),
Seq(2, 3, 4),
Map("a string" -> 2000L),
Row(4.75.toFloat, Seq(false, true)),
new MyDenseVector(Array(0.25, 2.25, 4.25)))
val data =
Row.fromSeq(Seq("Spark " + sqlContext.sparkContext.version) ++ constantValues) :: Nil
// Data generated by previous versions.
// scalastyle:off
val existingJSONData =
"""{"col0":"Spark 1.2.2","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
"""{"col0":"Spark 1.3.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
"""{"col0":"Spark 1.3.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
"""{"col0":"Spark 1.4.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
"""{"col0":"Spark 1.4.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
"""{"col0":"Spark 1.5.0","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
"""{"col0":"Spark 1.5.0","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"16436","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" :: Nil
// scalastyle:on
// Generate data for the current version.
val df = sqlContext.createDataFrame(sqlContext.sparkContext.parallelize(data, 1), schema)
withTempPath { path =>
df.write.format("json").mode("overwrite").save(path.getCanonicalPath)
// df.toJSON converts internal rows to external rows first and then generates
// JSON objects. In contrast, df.write.format("json") writes internal rows directly.
val allJSON =
existingJSONData ++
df.toJSON.collect() ++
sparkContext.textFile(path.getCanonicalPath).collect()
Utils.deleteRecursively(path)
sparkContext.parallelize(allJSON, 1).saveAsTextFile(path.getCanonicalPath)
// Read data back with the schema specified.
val col0Values =
Seq(
"Spark 1.2.2",
"Spark 1.3.1",
"Spark 1.3.1",
"Spark 1.4.1",
"Spark 1.4.1",
"Spark 1.5.0",
"Spark 1.5.0",
"Spark " + sqlContext.sparkContext.version,
"Spark " + sqlContext.sparkContext.version)
val expectedResult = col0Values.map { v =>
Row.fromSeq(Seq(v) ++ constantValues)
}
checkAnswer(
sqlContext.read.format("json").schema(schema).load(path.getCanonicalPath),
expectedResult
)
}
}
}