diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index b4a7c05ee0fd1482ae9d06f6922f2415993aca17..532ecb8757e9cd3d1dae0cc8db9116e5ace6ecca 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -321,12 +321,12 @@ trait CheckAnalysis extends PredicateHelper {
               // Check if the data types match.
               dataTypes(child).zip(ref).zipWithIndex.foreach { case ((dt1, dt2), ci) =>
                 // SPARK-18058: we shall not care about the nullability of columns
-                if (!dt1.sameType(dt2)) {
+                if (TypeCoercion.findWiderTypeForTwo(dt1.asNullable, dt2.asNullable).isEmpty) {
                   failAnalysis(
                     s"""
                       |${operator.nodeName} can only be performed on tables with the compatible
-                      |column types. $dt1 <> $dt2 at the ${ordinalNumber(ci)} column of
-                      |the ${ordinalNumber(ti + 1)} table
+                      |column types. ${dt1.catalogString} <> ${dt2.catalogString} at the
+                      |${ordinalNumber(ci)} column of the ${ordinalNumber(ti + 1)} table
                     """.stripMargin.replace("\n", " ").trim())
                 }
               }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
index b636c317031523a2d17b25cd80a0dbb613644bd4..dfaac92e04a2d3dc86345418945eb66e26d27f07 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
@@ -116,17 +116,19 @@ object TypeCoercion {
    * i.e. the main difference with [[findTightestCommonType]] is that here we allow some
    * loss of precision when widening decimal and double, and promotion to string.
    */
-  private def findWiderTypeForTwo(t1: DataType, t2: DataType): Option[DataType] = (t1, t2) match {
-    case (t1: DecimalType, t2: DecimalType) =>
-      Some(DecimalPrecision.widerDecimalType(t1, t2))
-    case (t: IntegralType, d: DecimalType) =>
-      Some(DecimalPrecision.widerDecimalType(DecimalType.forType(t), d))
-    case (d: DecimalType, t: IntegralType) =>
-      Some(DecimalPrecision.widerDecimalType(DecimalType.forType(t), d))
-    case (_: FractionalType, _: DecimalType) | (_: DecimalType, _: FractionalType) =>
-      Some(DoubleType)
-    case _ =>
-      findTightestCommonTypeToString(t1, t2)
+  private[analysis] def findWiderTypeForTwo(t1: DataType, t2: DataType): Option[DataType] = {
+    (t1, t2) match {
+      case (t1: DecimalType, t2: DecimalType) =>
+        Some(DecimalPrecision.widerDecimalType(t1, t2))
+      case (t: IntegralType, d: DecimalType) =>
+        Some(DecimalPrecision.widerDecimalType(DecimalType.forType(t), d))
+      case (d: DecimalType, t: IntegralType) =>
+        Some(DecimalPrecision.widerDecimalType(DecimalType.forType(t), d))
+      case (_: FractionalType, _: DecimalType) | (_: DecimalType, _: FractionalType) =>
+        Some(DoubleType)
+      case _ =>
+        findTightestCommonTypeToString(t1, t2)
+    }
   }
 
   private def findWiderCommonType(types: Seq[DataType]) = {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
index 96aff37a4b4f90c0ed0a82930b3a4d2c51f9042e..c5e877d12811cc1b8c16eb9f0ecf6eaf9487187c 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
@@ -282,16 +282,31 @@ class AnalysisErrorSuite extends AnalysisTest {
     testRelation.union(nestedRelation),
     "union" :: "the compatible column types" :: Nil)
 
+  errorTest(
+    "union with a incompatible column type and compatible column types",
+    testRelation3.union(testRelation4),
+    "union"  :: "the compatible column types" :: "map" :: "decimal" :: Nil)
+
   errorTest(
     "intersect with incompatible column types",
     testRelation.intersect(nestedRelation),
     "intersect" :: "the compatible column types" :: Nil)
 
+  errorTest(
+    "intersect with a incompatible column type and compatible column types",
+    testRelation3.intersect(testRelation4),
+    "intersect" :: "the compatible column types" :: "map" :: "decimal" :: Nil)
+
   errorTest(
     "except with incompatible column types",
     testRelation.except(nestedRelation),
     "except" :: "the compatible column types" :: Nil)
 
+  errorTest(
+    "except with a incompatible column type and compatible column types",
+    testRelation3.except(testRelation4),
+    "except" :: "the compatible column types" :: "map" :: "decimal" :: Nil)
+
   errorTest(
     "SPARK-9955: correct error message for aggregate",
     // When parse SQL string, we will wrap aggregate expressions with UnresolvedAlias.
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TestRelations.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TestRelations.scala
index 3741a6ba95a862491e7d22df0c57574c9aae0c67..e12e272aedffe58e8593ae01629ed86406c5043e 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TestRelations.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TestRelations.scala
@@ -37,6 +37,13 @@ object TestRelations {
     AttributeReference("g", DoubleType)(),
     AttributeReference("h", DecimalType(10, 2))())
 
+  // This is the same with `testRelation3` but only `h` is incompatible type.
+  val testRelation4 = LocalRelation(
+    AttributeReference("e", StringType)(),
+    AttributeReference("f", StringType)(),
+    AttributeReference("g", StringType)(),
+    AttributeReference("h", MapType(IntegerType, IntegerType))())
+
   val nestedRelation = LocalRelation(
     AttributeReference("top", StructType(
       StructField("duplicateField", StringType) ::