From cd5da428537b8dfa0bbb9592d344316c26d8f625 Mon Sep 17 00:00:00 2001
From: Yin Huai <yhuai@databricks.com>
Date: Mon, 19 Jan 2015 10:44:12 -0800
Subject: [PATCH] [SPARK-5284][SQL] Insert into Hive throws NPE when an inner
 complex type field has a null value

JIRA: https://issues.apache.org/jira/browse/SPARK-5284
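
The failure can be reproduced with a nested schema whose inner struct, array,
and map fields are all null; the snippet below mirrors the regression test
added in this patch (the Hive table name is illustrative):

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.hive.test.TestHive._
    import org.apache.spark.sql.types._

    // One row whose inner struct, array, and map values are all null.
    val schema = StructType(
      StructField("s",
        StructType(
          StructField("innerStruct", StructType(StructField("s1", StringType, true) :: Nil)) ::
            StructField("innerArray", ArrayType(IntegerType), true) ::
            StructField("innerMap", MapType(StringType, IntegerType)) :: Nil), true) :: Nil)
    val rowRdd = sparkContext.parallelize(Row(Row(null, null, null)) :: Nil)
    applySchema(rowRdd, schema).registerTempTable("testTable")

    sql(
      """CREATE TABLE tmpNullInnerFields
        |  (s struct<innerStruct: struct<s1:string>,
        |            innerArray:array<int>,
        |            innerMap: map<string, int>>)
      """.stripMargin).collect

    // Before this fix, the INSERT below throws a NullPointerException while
    // wrapping the null inner struct/array/map values for the Hive SerDe.
    sql("INSERT OVERWRITE TABLE tmpNullInnerFields SELECT * FROM testTable")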

Author: Yin Huai <yhuai@databricks.com>

Closes #4077 from yhuai/SPARK-5284 and squashes the following commits:

fceacd6 [Yin Huai] Check if a value is null when the field has a complex type.
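The fix makes the wrappers returned by HiveInspectors.wrapperFor null-safe for
complex types: a struct, list, or map wrapper now returns null for a null input
instead of dereferencing it. A minimal, self-contained sketch of that pattern
(the helper names here are illustrative, not the actual HiveInspectors code):

    import scala.collection.JavaConversions.{mapAsJavaMap, seqAsJavaList}

    // Wrap a Scala Seq into a Java List, element by element, passing null through.
    def listWrapper(elemWrapper: Any => Any): Any => Any =
      (o: Any) =>
        if (o != null) seqAsJavaList(o.asInstanceOf[Seq[_]].map(elemWrapper)) else null

    // Wrap a Scala Map into a Java Map, wrapping keys and values, passing null through.
    def mapWrapper(keyWrapper: Any => Any, valueWrapper: Any => Any): Any => Any =
      (o: Any) =>
        if (o != null) {
          mapAsJavaMap(o.asInstanceOf[Map[_, _]].map { case (k, v) =>
            keyWrapper(k) -> valueWrapper(v)
          })
        } else {
          null
        }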
---
 .../spark/sql/hive/HiveInspectors.scala       | 26 +++++++++----
 .../sql/hive/execution/SQLQuerySuite.scala    | 37 ++++++++++++++++++-
 2 files changed, 54 insertions(+), 9 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
index d87c4945c8..eeabfdd857 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
@@ -346,16 +346,20 @@ private[hive] trait HiveInspectors {
     case soi: StandardStructObjectInspector =>
       val wrappers = soi.getAllStructFieldRefs.map(ref => wrapperFor(ref.getFieldObjectInspector))
       (o: Any) => {
-        val struct = soi.create()
-        (soi.getAllStructFieldRefs, wrappers, o.asInstanceOf[Row]).zipped.foreach {
-          (field, wrapper, data) => soi.setStructFieldData(struct, field, wrapper(data))
+        if (o != null) {
+          val struct = soi.create()
+          (soi.getAllStructFieldRefs, wrappers, o.asInstanceOf[Row]).zipped.foreach {
+            (field, wrapper, data) => soi.setStructFieldData(struct, field, wrapper(data))
+          }
+          struct
+        } else {
+          null
         }
-        struct
       }
 
     case loi: ListObjectInspector =>
       val wrapper = wrapperFor(loi.getListElementObjectInspector)
-      (o: Any) => seqAsJavaList(o.asInstanceOf[Seq[_]].map(wrapper))
+      (o: Any) => if (o != null) seqAsJavaList(o.asInstanceOf[Seq[_]].map(wrapper)) else null
 
     case moi: MapObjectInspector =>
       // The Predef.Map is scala.collection.immutable.Map.
@@ -364,9 +368,15 @@ private[hive] trait HiveInspectors {
 
       val keyWrapper = wrapperFor(moi.getMapKeyObjectInspector)
       val valueWrapper = wrapperFor(moi.getMapValueObjectInspector)
-      (o: Any) => mapAsJavaMap(o.asInstanceOf[Map[_, _]].map { case (key, value) =>
-        keyWrapper(key) -> valueWrapper(value)
-      })
+      (o: Any) => {
+        if (o != null) {
+          mapAsJavaMap(o.asInstanceOf[Map[_, _]].map { case (key, value) =>
+            keyWrapper(key) -> valueWrapper(value)
+          })
+        } else {
+          null
+        }
+      }
 
     case _ =>
       identity[Any]
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index c1c3683f84..d41eb9e870 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -21,7 +21,7 @@ import org.apache.spark.sql.QueryTest
 
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.hive.test.TestHive._
-import org.apache.spark.util.Utils
+import org.apache.spark.sql.types._
 
 case class Nested1(f1: Nested2)
 case class Nested2(f2: Nested3)
@@ -214,4 +214,39 @@ class SQLQuerySuite extends QueryTest {
         Seq.empty[Row])
     }
   }
+
+  test("SPARK-5284 Insert into Hive throws NPE when a inner complex type field has a null value") {
+    val schema = StructType(
+      StructField("s",
+        StructType(
+          StructField("innerStruct", StructType(StructField("s1", StringType, true) :: Nil)) ::
+            StructField("innerArray", ArrayType(IntegerType), true) ::
+            StructField("innerMap", MapType(StringType, IntegerType)) :: Nil), true) :: Nil)
+    val row = Row(Row(null, null, null))
+
+    val rowRdd = sparkContext.parallelize(row :: Nil)
+
+    applySchema(rowRdd, schema).registerTempTable("testTable")
+
+    sql(
+      """CREATE TABLE nullValuesInInnerComplexTypes
+        |  (s struct<innerStruct: struct<s1:string>,
+        |            innerArray:array<int>,
+        |            innerMap: map<string, int>>)
+      """.stripMargin).collect
+
+    sql(
+      """
+        |INSERT OVERWRITE TABLE nullValuesInInnerComplexTypes
+        |SELECT * FROM testTable
+      """.stripMargin)
+
+    checkAnswer(
+      sql("SELECT * FROM nullValuesInInnerComplexTypes"),
+      Seq(Seq(Seq(null, null, null)))
+    )
+
+    sql("DROP TABLE nullValuesInInnerComplexTypes")
+    dropTempTable("testTable")
+  }
 }
-- 
GitLab