Commit 7035d880 authored by Yin Huai, committed by Michael Armbrust

[SPARK-9894] [SQL] Json writer should handle MapData.

https://issues.apache.org/jira/browse/SPARK-9894

Author: Yin Huai <yhuai@databricks.com>

Closes #8137 from yhuai/jsonMapData.
parent ab7e721c
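For context, a minimal sketch of the failure this patch fixes. In Spark 1.5 the internal row format carries map columns as MapData rather than scala.collection.Map, so the JSON writer's old pattern match no longer fired. The snippet below assumes a Spark 1.5-era sqlContext and sparkContext in scope; the output path is illustrative.

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Before this patch, the JSON writer matched map values against
// scala.collection.Map, but rows now carry the internal MapData
// representation, so writing a MapType column out as JSON failed.
val schema = new StructType().add("m", MapType(StringType, LongType))
val df = sqlContext.createDataFrame(
  sparkContext.parallelize(Seq(Row(Map("k1" -> 1L)))), schema)
df.write.format("json").save("/tmp/spark-9894-repro")  // failed before this patch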
@@ -107,12 +107,12 @@ private[sql] object JacksonGenerator {
         v.foreach(ty, (_, value) => valWriter(ty, value))
         gen.writeEndArray()
-      case (MapType(kv, vv, _), v: Map[_, _]) =>
+      case (MapType(kt, vt, _), v: MapData) =>
         gen.writeStartObject()
-        v.foreach { p =>
-          gen.writeFieldName(p._1.toString)
-          valWriter(vv, p._2)
-        }
+        v.foreach(kt, vt, { (k, v) =>
+          gen.writeFieldName(k.toString)
+          valWriter(vt, v)
+        })
         gen.writeEndObject()
       case (StructType(ty), v: InternalRow) =>
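Unlike scala.collection.Map, MapData exposes no key/value pairs directly: its foreach takes the key and value DataTypes so each element can be read out of the underlying arrays. A standalone helper mirroring what that traversal amounts to, written as a sketch; the imports assume the org.apache.spark.sql.catalyst.util package these classes settled into around this period.

import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
import org.apache.spark.sql.types.DataType

// Keys and values sit in two parallel ArrayData columns; each element is
// extracted with its DataType, which is why the new writer passes kt and vt.
def foreachEntry(
    map: MapData,
    keyType: DataType,
    valueType: DataType,
    f: (Any, Any) => Unit): Unit = {
  val keys: ArrayData = map.keyArray()
  val values: ArrayData = map.valueArray()
  var i = 0
  while (i < map.numElements()) {
    f(keys.get(i, keyType), values.get(i, valueType))
    i += 1
  }
}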
new file: JsonHadoopFsRelationSuite.scala
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.sources

import org.apache.hadoop.fs.Path

import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

class JsonHadoopFsRelationSuite extends HadoopFsRelationTest {
  override val dataSourceName: String = "json"

  import sqlContext._

  test("save()/load() - partitioned table - simple queries - partition columns in data") {
    withTempDir { file =>
      val basePath = new Path(file.getCanonicalPath)
      val fs = basePath.getFileSystem(SparkHadoopUtil.get.conf)
      val qualifiedBasePath = fs.makeQualified(basePath)

      for (p1 <- 1 to 2; p2 <- Seq("foo", "bar")) {
        val partitionDir = new Path(qualifiedBasePath, s"p1=$p1/p2=$p2")
        sparkContext
          .parallelize(for (i <- 1 to 3) yield s"""{"a":$i,"b":"val_$i"}""")
          .saveAsTextFile(partitionDir.toString)
      }

      val dataSchemaWithPartition =
        StructType(dataSchema.fields :+ StructField("p1", IntegerType, nullable = true))

      checkQueries(
        read.format(dataSourceName)
          .option("dataSchema", dataSchemaWithPartition.json)
          .load(file.getCanonicalPath))
    }
  }

  test("SPARK-9894: save complex types to JSON") {
    withTempDir { file =>
      file.delete()

      val schema =
        new StructType()
          .add("array", ArrayType(LongType))
          .add("map", MapType(StringType, new StructType().add("innerField", LongType)))
      val data =
        Row(Seq(1L, 2L, 3L), Map("m1" -> Row(4L))) ::
          Row(Seq(5L, 6L, 7L), Map("m2" -> Row(10L))) :: Nil
      val df = createDataFrame(sparkContext.parallelize(data), schema)

      // Write the data out.
      df.write.format(dataSourceName).save(file.getCanonicalPath)

      // Read it back and check the result.
      checkAnswer(
        read.format(dataSourceName).schema(schema).load(file.getCanonicalPath),
        df
      )
    }
  }
}
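For reference, the two rows in the round-trip test above should land on disk as JSON lines along the following shape. The df.toJSON call is a quick way to inspect them, assuming toJSON shares the JacksonGenerator code path in this release.

// Given the df built in the SPARK-9894 test above
// (field order within each object may vary):
df.toJSON.collect().foreach(println)
// {"array":[1,2,3],"map":{"m1":{"innerField":4}}}
// {"array":[5,6,7],"map":{"m2":{"innerField":10}}}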
@@ -50,33 +50,3 @@ class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest {
     }
   }
 }
-
-class JsonHadoopFsRelationSuite extends HadoopFsRelationTest {
-  override val dataSourceName: String =
-    classOf[org.apache.spark.sql.execution.datasources.json.DefaultSource].getCanonicalName
-
-  import sqlContext._
-
-  test("save()/load() - partitioned table - simple queries - partition columns in data") {
-    withTempDir { file =>
-      val basePath = new Path(file.getCanonicalPath)
-      val fs = basePath.getFileSystem(SparkHadoopUtil.get.conf)
-      val qualifiedBasePath = fs.makeQualified(basePath)
-
-      for (p1 <- 1 to 2; p2 <- Seq("foo", "bar")) {
-        val partitionDir = new Path(qualifiedBasePath, s"p1=$p1/p2=$p2")
-        sparkContext
-          .parallelize(for (i <- 1 to 3) yield s"""{"a":$i,"b":"val_$i"}""")
-          .saveAsTextFile(partitionDir.toString)
-      }
-
-      val dataSchemaWithPartition =
-        StructType(dataSchema.fields :+ StructField("p1", IntegerType, nullable = true))
-
-      checkQueries(
-        read.format(dataSourceName)
-          .option("dataSchema", dataSchemaWithPartition.json)
-          .load(file.getCanonicalPath))
-    }
-  }
-}