diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index 65a2a7b04dd8f496c8eb916590c3f87bd8876a27..f7109f42838e94b1a5ff25d9352a55773a4b9c54 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -21,7 +21,6 @@ import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.catalog.SimpleCatalogRelation import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression -import org.apache.spark.sql.catalyst.plans.UsingJoin import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.types._ @@ -46,6 +45,16 @@ trait CheckAnalysis extends PredicateHelper { }).length > 1 } + protected def hasMapType(dt: DataType): Boolean = { + dt.existsRecursively(_.isInstanceOf[MapType]) + } + + protected def mapColumnInSetOperation(plan: LogicalPlan): Option[Attribute] = plan match { + case _: Intersect | _: Except | _: Distinct => + plan.output.find(a => hasMapType(a.dataType)) + case _ => None + } + private def checkLimitClause(limitExpr: Expression): Unit = { limitExpr match { case e if !e.foldable => failAnalysis( @@ -123,8 +132,7 @@ trait CheckAnalysis extends PredicateHelper { if (conditions.isEmpty && query.output.size != 1) { failAnalysis( s"Scalar subquery must return only one column, but got ${query.output.size}") - } - else if (conditions.nonEmpty) { + } else if (conditions.nonEmpty) { // Collect the columns from the subquery for further checking. var subqueryColumns = conditions.flatMap(_.references).filter(query.output.contains) @@ -202,7 +210,7 @@ trait CheckAnalysis extends PredicateHelper { s"filter expression '${f.condition.sql}' " + s"of type ${f.condition.dataType.simpleString} is not a boolean.") - case f @ Filter(condition, child) => + case Filter(condition, _) => splitConjunctivePredicates(condition).foreach { case _: PredicateSubquery | Not(_: PredicateSubquery) => case e if PredicateSubquery.hasNullAwarePredicateWithinNot(e) => @@ -376,6 +384,14 @@ trait CheckAnalysis extends PredicateHelper { |Conflicting attributes: ${conflictingAttributes.mkString(",")} """.stripMargin) + // TODO: although map type is not orderable, technically map type should be able to be + // used in equality comparison, remove this type check once we support it. + case o if mapColumnInSetOperation(o).isDefined => + val mapCol = mapColumnInSetOperation(o).get + failAnalysis("Cannot have map type columns in DataFrame which calls " + + s"set operations(intersect, except, etc.), but the type of column ${mapCol.name} " + + "is " + mapCol.dataType.simpleString) + case s: SimpleCatalogRelation => failAnalysis( s""" diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index 22dfc46acfc0f9277eba2565c133797dc34dbf61..ec201f3253786bcc14870106ee951ac85053133c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -1739,4 +1739,20 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { val df = spark.range(1).selectExpr("CAST(id as DECIMAL) as x").selectExpr("percentile(x, 0.5)") checkAnswer(df, Row(BigDecimal(0.0)) :: Nil) } + + test("SPARK-19893: cannot run set operations with map type") { + val df = spark.range(1).select(map(lit("key"), $"id").as("m")) + val e = intercept[AnalysisException](df.intersect(df)) + assert(e.message.contains( + "Cannot have map type columns in DataFrame which calls set operations")) + val e2 = intercept[AnalysisException](df.except(df)) + assert(e2.message.contains( + "Cannot have map type columns in DataFrame which calls set operations")) + withTempView("v") { + df.createOrReplaceTempView("v") + val e3 = intercept[AnalysisException](sql("SELECT DISTINCT m FROM v")) + assert(e3.message.contains( + "Cannot have map type columns in DataFrame which calls set operations")) + } + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala index afeb47828edeb31905343daeefa03a6dba1e9e8b..8592a2924eacfffbb21d61570545b301fa736bdd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala @@ -234,8 +234,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext { Seq(StringType, BinaryType, NullType, BooleanType, ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType, DecimalType(25, 5), DecimalType(6, 5), - DateType, TimestampType, - ArrayType(IntegerType), MapType(StringType, LongType), struct) + DateType, TimestampType, ArrayType(IntegerType), struct) val fields = dataTypes.zipWithIndex.map { case (dataType, index) => StructField(s"col$index", dataType, true) } @@ -244,10 +243,10 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext { // Create an RDD for the schema val rdd = - sparkContext.parallelize((1 to 10000), 10).map { i => + sparkContext.parallelize(1 to 10000, 10).map { i => Row( - s"str${i}: test cache.", - s"binary${i}: test cache.".getBytes(StandardCharsets.UTF_8), + s"str$i: test cache.", + s"binary$i: test cache.".getBytes(StandardCharsets.UTF_8), null, i % 2 == 0, i.toByte, @@ -255,13 +254,12 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext { i, Long.MaxValue - i.toLong, (i + 0.25).toFloat, - (i + 0.75), + i + 0.75, BigDecimal(Long.MaxValue.toString + ".12345"), new java.math.BigDecimal(s"${i % 9 + 1}" + ".23456"), new Date(i), new Timestamp(i * 1000000L), - (i to i + 10).toSeq, - (i to i + 10).map(j => s"map_key_$j" -> (Long.MaxValue - j)).toMap, + i to i + 10, Row((i - 0.25).toFloat, Seq(true, false, null))) } spark.createDataFrame(rdd, schema).createOrReplaceTempView("InMemoryCache_different_data_types")