Commit 156df87e authored by Andrew Ash, committed by Reynold Xin

SPARK-1757 Failing test for saving null primitives with .saveAsParquetFile()

https://issues.apache.org/jira/browse/SPARK-1757

The first test succeeds, but the second test fails with this exception:

```
[info] - save and load case class RDD with Nones as parquet *** FAILED *** (14 milliseconds)
[info]   java.lang.RuntimeException: Unsupported datatype StructType(List())
[info]   at scala.sys.package$.error(package.scala:27)
[info]   at org.apache.spark.sql.parquet.ParquetTypesConverter$.fromDataType(ParquetRelation.scala:201)
[info]   at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$1.apply(ParquetRelation.scala:235)
[info]   at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$1.apply(ParquetRelation.scala:235)
[info]   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
[info]   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
[info]   at scala.collection.immutable.List.foreach(List.scala:318)
[info]   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
[info]   at scala.collection.AbstractTraversable.map(Traversable.scala:105)
[info]   at org.apache.spark.sql.parquet.ParquetTypesConverter$.convertFromAttributes(ParquetRelation.scala:234)
[info]   at org.apache.spark.sql.parquet.ParquetTypesConverter$.writeMetaData(ParquetRelation.scala:267)
[info]   at org.apache.spark.sql.parquet.ParquetRelation$.createEmpty(ParquetRelation.scala:143)
[info]   at org.apache.spark.sql.parquet.ParquetRelation$.create(ParquetRelation.scala:122)
[info]   at org.apache.spark.sql.execution.SparkStrategies$ParquetOperations$.apply(SparkStrategies.scala:139)
[info]   at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
[info]   at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
[info]   at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
[info]   at org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(QueryPlanner.scala:59)
[info]   at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:264)
[info]   at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:264)
[info]   at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:265)
[info]   at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:265)
[info]   at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:268)
[info]   at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:268)
[info]   at org.apache.spark.sql.SchemaRDDLike$class.saveAsParquetFile(SchemaRDDLike.scala:66)
[info]   at org.apache.spark.sql.SchemaRDD.saveAsParquetFile(SchemaRDD.scala:98)
```
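
For context, a minimal sketch that reproduces the failure, assuming the Spark 1.0-era API visible in the stack trace (the `SQLContext.createSchemaRDD` implicit conversion plus `SchemaRDD.saveAsParquetFile`); the `OptionalData` case class and output path are illustrative, not from the patch:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

// Illustrative case class: Option-typed primitive fields trigger the bug.
case class OptionalData(intField: Option[Int], longField: Option[Long])

object Repro {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "SPARK-1757-repro")
    val sqlContext = new SQLContext(sc)
    import sqlContext.createSchemaRDD // implicit RDD[Product] -> SchemaRDD

    val rdd = sc.parallelize(OptionalData(None, None) :: Nil)
    // Before this fix: java.lang.RuntimeException:
    //   Unsupported datatype StructType(List())
    rdd.saveAsParquetFile("/tmp/optional.parquet")
  }
}
```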

Author: Andrew Ash <andrew@andrewash.com>
Author: Michael Armbrust <michael@databricks.com>

Closes #690 from ash211/rdd-parquet-save and squashes the following commits:

747a0b9 [Andrew Ash] Merge pull request #1 from marmbrus/pr/690
54bd00e [Michael Armbrust] Need to put Option first since Option <: Seq. (See the match-order sketch after this list.)
8f3f281 [Andrew Ash] SPARK-1757 Add failing test for saving SparkSQL Schemas with Option[?] fields as parquet
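
On the match-order note in 54bd00e: strictly, `Option` is not a subtype of `Seq` in Scala, but it is a subtype of `Product` (`Some` and `None` are case classes), so before this change an `Option` type fell through to the generic `Product` case, whose constructor has no value parameters to turn into columns, yielding the empty `StructType(List())` in the stack trace above. A self-contained sketch of the ordering issue, with `describe` as a hypothetical stand-in for `schemaFor`:

```scala
import scala.reflect.runtime.universe._

// Guarded cases are tried top to bottom, so the specific Option case
// must precede the general Product case: Option[_] <:< Product holds.
def describe(tpe: Type): String = tpe match {
  case t if t <:< typeOf[Option[_]] =>
    val TypeRef(_, _, Seq(optType)) = t
    s"nullable ${describe(optType)}" // unwrap and recurse on the element type
  case t if t <:< typeOf[Product] =>
    "struct" // case classes, tuples... and, if listed first, Option too
  case t if t <:< typeOf[Int] => "int"
  case _ => "other"
}

// With Product listed first, Option[Int] would be treated as a struct
// with no constructor fields -- the StructType(List()) failure mode.
println(describe(typeOf[Option[Int]])) // prints: nullable int
```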
Parent: 9cf9f189
```diff
@@ -41,6 +41,9 @@ object ScalaReflection {

   /** Returns a catalyst DataType for the given Scala Type using reflection. */
   def schemaFor(tpe: `Type`): DataType = tpe match {
+    case t if t <:< typeOf[Option[_]] =>
+      val TypeRef(_, _, Seq(optType)) = t
+      schemaFor(optType)
     case t if t <:< typeOf[Product] =>
       val params = t.member("<init>": TermName).asMethod.paramss
       StructType(
@@ -59,9 +62,6 @@ object ScalaReflection {
     case t if t <:< typeOf[String] => StringType
     case t if t <:< typeOf[Timestamp] => TimestampType
     case t if t <:< typeOf[BigDecimal] => DecimalType
-    case t if t <:< typeOf[Option[_]] =>
-      val TypeRef(_, _, Seq(optType)) = t
-      schemaFor(optType)
     case t if t <:< typeOf[java.lang.Integer] => IntegerType
     case t if t <:< typeOf[java.lang.Long] => LongType
     case t if t <:< typeOf[java.lang.Double] => DoubleType
```
```diff
@@ -42,6 +42,20 @@ import org.apache.spark.sql.test.TestSQLContext._

 case class TestRDDEntry(key: Int, value: String)

+case class NullReflectData(
+    intField: java.lang.Integer,
+    longField: java.lang.Long,
+    floatField: java.lang.Float,
+    doubleField: java.lang.Double,
+    booleanField: java.lang.Boolean)
+
+case class OptionalReflectData(
+    intField: Option[Int],
+    longField: Option[Long],
+    floatField: Option[Float],
+    doubleField: Option[Double],
+    booleanField: Option[Boolean])
+
 class ParquetQuerySuite extends QueryTest with FunSuite with BeforeAndAfterAll {
   import TestData._
   TestData // Load test data tables.
@@ -195,5 +209,35 @@ class ParquetQuerySuite extends QueryTest with FunSuite with BeforeAndAfterAll {
     Utils.deleteRecursively(ParquetTestData.testDir)
     ParquetTestData.writeFile()
   }
+
+  test("save and load case class RDD with nulls as parquet") {
+    val data = NullReflectData(null, null, null, null, null)
+    val rdd = sparkContext.parallelize(data :: Nil)
+
+    val file = getTempFilePath("parquet")
+    val path = file.toString
+    rdd.saveAsParquetFile(path)
+    val readFile = parquetFile(path)
+
+    val rdd_saved = readFile.collect()
+    assert(rdd_saved(0) === Seq.fill(5)(null))
+    Utils.deleteRecursively(file)
+    assert(true)
+  }
+
+  test("save and load case class RDD with Nones as parquet") {
+    val data = OptionalReflectData(null, null, null, null, null)
+    val rdd = sparkContext.parallelize(data :: Nil)
+
+    val file = getTempFilePath("parquet")
+    val path = file.toString
+    rdd.saveAsParquetFile(path)
+    val readFile = parquetFile(path)
+
+    val rdd_saved = readFile.collect()
+    assert(rdd_saved(0) === Seq.fill(5)(null))
+    Utils.deleteRecursively(file)
+    assert(true)
+  }
 }
```
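
With the `Option` case now ahead of the `Product` case, an Option-typed field resolves to its element's `DataType` instead of an empty struct. A quick, hedged check of that behavior (assuming `ScalaReflection` lives under `org.apache.spark.sql.catalyst` in this version, and using the `schemaFor(tpe: Type)` overload shown in the diff):

```scala
import scala.reflect.runtime.universe._
import org.apache.spark.sql.catalyst.ScalaReflection

// After the reorder, Option[T] unwraps to the schema of T rather than
// matching the Product case and producing StructType(List()).
println(ScalaReflection.schemaFor(typeOf[Option[Int]]))    // IntegerType
println(ScalaReflection.schemaFor(typeOf[Option[String]])) // StringType
```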