From 05308426f0f51273be95fb1ca2cb1ec19d83cec8 Mon Sep 17 00:00:00 2001 From: Yin Huai <huai@cse.ohio-state.edu> Date: Sun, 26 Oct 2014 16:30:15 -0700 Subject: [PATCH] [SPARK-4052][SQL] Use scala.collection.Map for pattern matching instead of using Predef.Map (it is scala.collection.immutable.Map) Please check https://issues.apache.org/jira/browse/SPARK-4052 for cases triggering this bug. Author: Yin Huai <huai@cse.ohio-state.edu> Closes #2899 from yhuai/SPARK-4052 and squashes the following commits: 1188f70 [Yin Huai] Address liancheng's comments. b6712be [Yin Huai] Use scala.collection.Map instead of Predef.Map (scala.collection.immutable.Map). --- .../spark/sql/catalyst/ScalaReflection.scala | 3 +++ .../scala/org/apache/spark/sql/TestData.scala | 2 +- .../hive/execution/InsertIntoHiveTable.scala | 4 ++++ .../sql/hive/InsertIntoHiveTableSuite.scala | 18 ++++++++++++++++++ 4 files changed, 26 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala index 3d4296f9d7..7d930fccd5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala @@ -27,6 +27,9 @@ import org.apache.spark.sql.catalyst.types._ * Provides experimental support for generating catalyst schemas for scala objects. */ object ScalaReflection { + // The Predef.Map is scala.collection.immutable.Map. + // Since the map values can be mutable, we explicitly import scala.collection.Map at here. + import scala.collection.Map import scala.reflect.runtime.universe._ case class Schema(dataType: DataType, nullable: Boolean) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala b/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala index 1c21afc17e..6c38575b13 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala @@ -99,7 +99,7 @@ object TestData { ArrayData(Seq(2,3,4), Seq(Seq(2,3,4))) :: Nil) arrayData.registerTempTable("arrayData") - case class MapData(data: Map[Int, String]) + case class MapData(data: scala.collection.Map[Int, String]) val mapData = TestSQLContext.sparkContext.parallelize( MapData(Map(1 -> "a1", 2 -> "b1", 3 -> "c1", 4 -> "d1", 5 -> "e1")) :: diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index 7db5fd804d..79234f8a66 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -93,6 +93,10 @@ case class InsertIntoHiveTable( (o: Any) => seqAsJavaList(o.asInstanceOf[Seq[_]].map(wrapper)) case moi: MapObjectInspector => + // The Predef.Map is scala.collection.immutable.Map. + // Since the map values can be mutable, we explicitly import scala.collection.Map at here. + import scala.collection.Map + val keyWrapper = wrapperFor(moi.getMapKeyObjectInspector) val valueWrapper = wrapperFor(moi.getMapValueObjectInspector) (o: Any) => mapAsJavaMap(o.asInstanceOf[Map[_, _]].map { case (key, value) => diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala index 7e323146f9..18dc937dd2 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.hive import org.apache.spark.sql.QueryTest +import org.apache.spark.sql._ import org.apache.spark.sql.hive.test.TestHive /* Implicits */ @@ -73,4 +74,21 @@ class InsertIntoHiveTableSuite extends QueryTest { createTable[TestData]("createAndInsertTest") createTable[TestData]("createAndInsertTest") } + + test("SPARK-4052: scala.collection.Map as value type of MapType") { + val schema = StructType(StructField("m", MapType(StringType, StringType), true) :: Nil) + val rowRDD = TestHive.sparkContext.parallelize( + (1 to 100).map(i => Row(scala.collection.mutable.HashMap(s"key$i" -> s"value$i")))) + val schemaRDD = applySchema(rowRDD, schema) + schemaRDD.registerTempTable("tableWithMapValue") + sql("CREATE TABLE hiveTableWithMapValue(m MAP <STRING, STRING>)") + sql("INSERT OVERWRITE TABLE hiveTableWithMapValue SELECT m FROM tableWithMapValue") + + checkAnswer( + sql("SELECT * FROM hiveTableWithMapValue"), + rowRDD.collect().toSeq + ) + + sql("DROP TABLE hiveTableWithMapValue") + } } -- GitLab