Commit 4352a2fd authored by Yin Huai, committed by Michael Armbrust

[SPARK-2376][SQL] Selecting list values inside nested JSON objects raises java.lang.IllegalArgumentException

JIRA: https://issues.apache.org/jira/browse/SPARK-2376

Author: Yin Huai <huai@cse.ohio-state.edu>

Closes #1320 from yhuai/SPARK-2376 and squashes the following commits:

0107417 [Yin Huai] Merge remote-tracking branch 'upstream/master' into SPARK-2376
480803d [Yin Huai] Correctly handling JSON arrays in PySpark.
parent f0496ee1
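For context, the failure this commit addresses: when a query result contained JSON array values, converting the rows back to Python raised java.lang.IllegalArgumentException, because javaToPython only knew how to hand atomic values and nested structs to the pickler. Below is a minimal sketch of the scenario, assuming a Spark 1.0-era PySpark shell where sc (SparkContext) and sqlCtx (SQLContext) already exist; the field names follow the updated doctests in this commit, and this is not necessarily the exact reproduction from the JIRA.

    # Hypothetical reproduction sketch; sc and sqlCtx are assumed to exist.
    json = sc.parallelize([
        '{"field1": 1, "field3": {"field4": 11, "field5": [10, 11]}}',
        '{"field1": 2, "field3": {"field4": 22, "field5": []}, '
        '"field6": [{"field7": "row2"}]}'])
    srdd = sqlCtx.jsonRDD(json)
    sqlCtx.registerRDDAsTable(srdd, "table1")
    # Before this fix, collecting rows that carry array values failed while
    # pickling, because the Scala sequences backing the arrays were handed to
    # the pickler unconverted; after it, they arrive as Python lists.
    sqlCtx.sql("SELECT field1, field3, field6 FROM table1").collect()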
@@ -152,10 +152,12 @@ class SQLContext:
 >>> ofn.close()
 >>> srdd = sqlCtx.jsonFile(jsonFile)
 >>> sqlCtx.registerRDDAsTable(srdd, "table1")
->>> srdd2 = sqlCtx.sql("SELECT field1 AS f1, field2 as f2, field3 as f3 from table1")
->>> srdd2.collect() == [{"f1": 1, "f2": "row1", "f3":{"field4":11}},
-... {"f1": 2, "f2": "row2", "f3":{"field4":22}},
-... {"f1": 3, "f2": "row3", "f3":{"field4":33}}]
+>>> srdd2 = sqlCtx.sql(
+... "SELECT field1 AS f1, field2 as f2, field3 as f3, field6 as f4 from table1")
+>>> srdd2.collect() == [
+... {"f1":1, "f2":"row1", "f3":{"field4":11, "field5": None}, "f4":None},
+... {"f1":2, "f2":None, "f3":{"field4":22, "field5": [10, 11]}, "f4":[{"field7": "row2"}]},
+... {"f1":None, "f2":"row3", "f3":{"field4":33, "field5": []}, "f4":None}]
 True
 """
 jschema_rdd = self._ssql_ctx.jsonFile(path)
@@ -167,10 +169,12 @@ class SQLContext:
 >>> srdd = sqlCtx.jsonRDD(json)
 >>> sqlCtx.registerRDDAsTable(srdd, "table1")
->>> srdd2 = sqlCtx.sql("SELECT field1 AS f1, field2 as f2, field3 as f3 from table1")
->>> srdd2.collect() == [{"f1": 1, "f2": "row1", "f3":{"field4":11}},
-... {"f1": 2, "f2": "row2", "f3":{"field4":22}},
-... {"f1": 3, "f2": "row3", "f3":{"field4":33}}]
+>>> srdd2 = sqlCtx.sql(
+... "SELECT field1 AS f1, field2 as f2, field3 as f3, field6 as f4 from table1")
+>>> srdd2.collect() == [
+... {"f1":1, "f2":"row1", "f3":{"field4":11, "field5": None}, "f4":None},
+... {"f1":2, "f2":None, "f3":{"field4":22, "field5": [10, 11]}, "f4":[{"field7": "row2"}]},
+... {"f1":None, "f2":"row3", "f3":{"field4":33, "field5": []}, "f4":None}]
 True
 """
 def func(split, iterator):
@@ -492,8 +496,8 @@ def _test():
 globs['rdd'] = sc.parallelize([{"field1" : 1, "field2" : "row1"},
 {"field1" : 2, "field2": "row2"}, {"field1" : 3, "field2": "row3"}])
 jsonStrings = ['{"field1": 1, "field2": "row1", "field3":{"field4":11}}',
-'{"field1" : 2, "field2": "row2", "field3":{"field4":22}}',
-'{"field1" : 3, "field2": "row3", "field3":{"field4":33}}']
+'{"field1" : 2, "field3":{"field4":22, "field5": [10, 11]}, "field6":[{"field7": "row2"}]}',
+'{"field1" : null, "field2": "row3", "field3":{"field4":33, "field5": []}}']
 globs['jsonStrings'] = jsonStrings
 globs['json'] = sc.parallelize(jsonStrings)
 globs['nestedRdd1'] = sc.parallelize([
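The updated doctests above pin down the Python-side representation the fix produces: a field absent from a JSON record comes back as None, an array of JSON objects becomes a list of dicts, and an empty JSON array stays an empty list. A small plain-Python illustration of that target shape, with values copied from the doctest expectations (no Spark needed):

    expected = [
        {"f1": 1, "f2": "row1", "f3": {"field4": 11, "field5": None}, "f4": None},
        {"f1": 2, "f2": None, "f3": {"field4": 22, "field5": [10, 11]},
         "f4": [{"field7": "row2"}]},
        {"f1": None, "f2": "row3", "f3": {"field4": 33, "field5": []}, "f4": None}]
    # Arrays of JSON objects surface as lists of dicts, so nested list values
    # can be reached with ordinary Python indexing once collected:
    assert expected[1]["f3"]["field5"] == [10, 11]
    assert expected[1]["f4"][0]["field7"] == "row2"

The remaining hunks are in the Scala SchemaRDD, where rowToMap and javaToPython learn to convert array-typed values before pickling.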
@@ -17,6 +17,11 @@
 package org.apache.spark.sql
+import java.util.{Map => JMap, List => JList, Set => JSet}
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import net.razorvine.pickle.Pickler
 import org.apache.spark.{Dependency, OneToOneDependency, Partition, Partitioner, TaskContext}
@@ -27,10 +32,9 @@ import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
-import org.apache.spark.sql.catalyst.types.{DataType, StructType, BooleanType}
+import org.apache.spark.sql.catalyst.types.{ArrayType, BooleanType, StructType}
 import org.apache.spark.sql.execution.{ExistingRdd, SparkLogicalPlan}
 import org.apache.spark.api.java.JavaRDD
-import java.util.{Map => JMap}
 /**
  * :: AlphaComponent ::
@@ -359,6 +363,28 @@ class SchemaRDD(
         case (obj, (name, dataType)) =>
           dataType match {
             case struct: StructType => map.put(name, rowToMap(obj.asInstanceOf[Row], struct))
+            case array @ ArrayType(struct: StructType) =>
+              val arrayValues = obj match {
+                case seq: Seq[Any] =>
+                  seq.map(element => rowToMap(element.asInstanceOf[Row], struct)).asJava
+                case list: JList[Any] =>
+                  list.map(element => rowToMap(element.asInstanceOf[Row], struct))
+                case set: JSet[Any] =>
+                  set.map(element => rowToMap(element.asInstanceOf[Row], struct))
+                case array if array != null && array.getClass.isArray =>
+                  array.asInstanceOf[Array[Any]].map {
+                    element => rowToMap(element.asInstanceOf[Row], struct)
+                  }
+                case other => other
+              }
+              map.put(name, arrayValues)
+            case array: ArrayType => {
+              val arrayValues = obj match {
+                case seq: Seq[Any] => seq.asJava
+                case other => other
+              }
+              map.put(name, arrayValues)
+            }
             case other => map.put(name, obj)
           }
       }
@@ -366,22 +392,11 @@ class SchemaRDD(
       map
     }
-    // TODO: Actually, the schema of a row should be represented by a StructType instead of
-    // a Seq[Attribute]. Once we have finished that change, we can just use rowToMap to
-    // construct the Map for python.
-    val fields: Seq[(String, DataType)] = this.queryExecution.analyzed.output.map(
-      field => (field.name, field.dataType))
+    val rowSchema = StructType.fromAttributes(this.queryExecution.analyzed.output)
     this.mapPartitions { iter =>
       val pickle = new Pickler
       iter.map { row =>
-        val map: JMap[String, Any] = new java.util.HashMap
-        row.zip(fields).foreach { case (obj, (name, dataType)) =>
-          dataType match {
-            case struct: StructType => map.put(name, rowToMap(obj.asInstanceOf[Row], struct))
-            case other => map.put(name, obj)
-          }
-        }
-        map
+        rowToMap(row, rowSchema)
       }.grouped(10).map(batched => pickle.dumps(batched.toArray))
     }
   }
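To summarize the Scala side: rowToMap now recurses into array-typed columns, turning arrays of structs into collections of maps and plain arrays into Java lists, and javaToPython simply applies rowToMap to every row before pickling. A rough Python analogue of that recursion, purely for illustration; the simplified type tags below are assumptions for readability, not Catalyst types.

    # Illustrative analogue of the Scala rowToMap (not Spark code).
    # fields is a list of (name, data_type) pairs, where data_type is one of
    #   ("struct", nested_fields), ("array_struct", nested_fields),
    #   ("array",), or ("atom",).
    def row_to_map(row, fields):
        result = {}
        for value, (name, data_type) in zip(row, fields):
            kind = data_type[0]
            if value is None:
                result[name] = None
            elif kind == "struct":
                # Nested struct: recurse into the nested row.
                result[name] = row_to_map(value, data_type[1])
            elif kind == "array_struct":
                # Array of structs: convert every element to a map.
                result[name] = [row_to_map(elem, data_type[1]) for elem in value]
            elif kind == "array":
                # Plain array: just make sure it is a concrete list.
                result[name] = list(value)
            else:
                result[name] = value
        return result

    fields = [("f1", ("atom",)),
              ("f3", ("struct", [("field4", ("atom",)), ("field5", ("array",))])),
              ("f4", ("array_struct", [("field7", ("atom",))]))]
    row = (2, (22, [10, 11]), [("row2",)])
    print(row_to_map(row, fields))
    # {'f1': 2, 'f3': {'field4': 22, 'field5': [10, 11]}, 'f4': [{'field7': 'row2'}]}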