From 4321ff9edda4961273ac4a5b02dc1aed03f05e47 Mon Sep 17 00:00:00 2001
From: hyukjinkwon <gurwls223@gmail.com>
Date: Mon, 13 Feb 2017 16:08:31 +0100
Subject: [PATCH] [SPARK-19544][SQL] Improve error message when some column
 types are compatible and others are not in set operations

## What changes were proposed in this pull request?

This PR proposes to fix the error message shown when some column types are compatible and others are not in set operations (union, intersect and except).

Currently, the code below:

```scala
Seq((1,("a", 1))).toDF.union(Seq((1L,("a", "b"))).toDF)
```

throws an exception saying that `LongType` and `IntegerType` are incompatible, even though those two types can be widened to a common type. The message should instead point at the genuinely incompatible `StructType`s, in a more readable format, as below:

**Before**

```
Union can only be performed on tables with the compatible column types.
LongType <> IntegerType at the first column of the second table;;
```

**After**

```
Union can only be performed on tables with the compatible column types.
struct<_1:string,_2:string> <> struct<_1:string,_2:int> at the second column of the second table;;
```

*A newline was inserted manually into each message above, for readability in this PR description only.
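
For context, here is a minimal sketch, not part of the patch, of the two ingredients behind the improvement (`dt1`/`dt2` in the commented line stand for the column types being compared):

```scala
import org.apache.spark.sql.types._

// The struct type of the second column in the first table of the example above.
val dt = StructType(Seq(
  StructField("_1", StringType),
  StructField("_2", IntegerType)))

// `catalogString` is the compact rendering the new message uses; the default
// rendering spells out StructType(StructField(...), ...) in full.
dt.catalogString  // struct<_1:string,_2:int>

// The new per-column check (sketch): fail only when no common wider type
// exists, so pairs that coercion can widen (e.g. IntegerType vs LongType)
// no longer mask the real incompatibility. `findWiderTypeForTwo` is
// `private[analysis]`, so this line only compiles within that package.
// TypeCoercion.findWiderTypeForTwo(dt1.asNullable, dt2.asNullable).isEmpty
```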

## How was this patch tested?

Unit tests in `AnalysisErrorSuite`, manual tests, and a build with Scala 2.10.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #16882 from HyukjinKwon/SPARK-19544.
---
 .../sql/catalyst/analysis/CheckAnalysis.scala |  6 ++---
 .../sql/catalyst/analysis/TypeCoercion.scala  | 24 ++++++++++---------
 .../analysis/AnalysisErrorSuite.scala         | 15 ++++++++++++
 .../sql/catalyst/analysis/TestRelations.scala |  7 ++++++
 4 files changed, 38 insertions(+), 14 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index b4a7c05ee0..532ecb8757 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -321,12 +321,12 @@ trait CheckAnalysis extends PredicateHelper {
               // Check if the data types match.
               dataTypes(child).zip(ref).zipWithIndex.foreach { case ((dt1, dt2), ci) =>
                 // SPARK-18058: we shall not care about the nullability of columns
-                if (!dt1.sameType(dt2)) {
+                if (TypeCoercion.findWiderTypeForTwo(dt1.asNullable, dt2.asNullable).isEmpty) {
                   failAnalysis(
                     s"""
                       |${operator.nodeName} can only be performed on tables with the compatible
-                      |column types. $dt1 <> $dt2 at the ${ordinalNumber(ci)} column of
-                      |the ${ordinalNumber(ti + 1)} table
+                      |column types. ${dt1.catalogString} <> ${dt2.catalogString} at the
+                      |${ordinalNumber(ci)} column of the ${ordinalNumber(ti + 1)} table
                     """.stripMargin.replace("\n", " ").trim())
                 }
               }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
index b636c31703..dfaac92e04 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
@@ -116,17 +116,19 @@ object TypeCoercion {
    * i.e. the main difference with [[findTightestCommonType]] is that here we allow some
    * loss of precision when widening decimal and double, and promotion to string.
    */
-  private def findWiderTypeForTwo(t1: DataType, t2: DataType): Option[DataType] = (t1, t2) match {
-    case (t1: DecimalType, t2: DecimalType) =>
-      Some(DecimalPrecision.widerDecimalType(t1, t2))
-    case (t: IntegralType, d: DecimalType) =>
-      Some(DecimalPrecision.widerDecimalType(DecimalType.forType(t), d))
-    case (d: DecimalType, t: IntegralType) =>
-      Some(DecimalPrecision.widerDecimalType(DecimalType.forType(t), d))
-    case (_: FractionalType, _: DecimalType) | (_: DecimalType, _: FractionalType) =>
-      Some(DoubleType)
-    case _ =>
-      findTightestCommonTypeToString(t1, t2)
+  private[analysis] def findWiderTypeForTwo(t1: DataType, t2: DataType): Option[DataType] = {
+    (t1, t2) match {
+      case (t1: DecimalType, t2: DecimalType) =>
+        Some(DecimalPrecision.widerDecimalType(t1, t2))
+      case (t: IntegralType, d: DecimalType) =>
+        Some(DecimalPrecision.widerDecimalType(DecimalType.forType(t), d))
+      case (d: DecimalType, t: IntegralType) =>
+        Some(DecimalPrecision.widerDecimalType(DecimalType.forType(t), d))
+      case (_: FractionalType, _: DecimalType) | (_: DecimalType, _: FractionalType) =>
+        Some(DoubleType)
+      case _ =>
+        findTightestCommonTypeToString(t1, t2)
+    }
   }
 
   private def findWiderCommonType(types: Seq[DataType]) = {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
index 96aff37a4b..c5e877d128 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
@@ -282,16 +282,31 @@ class AnalysisErrorSuite extends AnalysisTest {
     testRelation.union(nestedRelation),
     "union" :: "the compatible column types" :: Nil)
 
+  errorTest(
+    "union with a incompatible column type and compatible column types",
+    testRelation3.union(testRelation4),
+    "union"  :: "the compatible column types" :: "map" :: "decimal" :: Nil)
+
   errorTest(
     "intersect with incompatible column types",
     testRelation.intersect(nestedRelation),
     "intersect" :: "the compatible column types" :: Nil)
 
+  errorTest(
+    "intersect with a incompatible column type and compatible column types",
+    testRelation3.intersect(testRelation4),
+    "intersect" :: "the compatible column types" :: "map" :: "decimal" :: Nil)
+
   errorTest(
     "except with incompatible column types",
     testRelation.except(nestedRelation),
     "except" :: "the compatible column types" :: Nil)
 
+  errorTest(
+    "except with a incompatible column type and compatible column types",
+    testRelation3.except(testRelation4),
+    "except" :: "the compatible column types" :: "map" :: "decimal" :: Nil)
+
   errorTest(
     "SPARK-9955: correct error message for aggregate",
     // When parse SQL string, we will wrap aggregate expressions with UnresolvedAlias.
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TestRelations.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TestRelations.scala
index 3741a6ba95..e12e272aed 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TestRelations.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TestRelations.scala
@@ -37,6 +37,13 @@ object TestRelations {
     AttributeReference("g", DoubleType)(),
     AttributeReference("h", DecimalType(10, 2))())
 
+  // This is the same as `testRelation3`, but only `h` has an incompatible type.
+  val testRelation4 = LocalRelation(
+    AttributeReference("e", StringType)(),
+    AttributeReference("f", StringType)(),
+    AttributeReference("g", StringType)(),
+    AttributeReference("h", MapType(IntegerType, IntegerType))())
+
   val nestedRelation = LocalRelation(
     AttributeReference("top", StructType(
       StructField("duplicateField", StringType) ::
-- 
GitLab