diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 073d2b1512b954b7a6b34174065c620dd63e67df..286d8549bfe27b4edd93c755bb62e6fc555261bc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -556,7 +556,7 @@ class Dataset[T] private[sql](
    *   1983  03    0.410516        0.442194
    *   1984  04    0.450090        0.483521
    * }}}
- *
+   *
    * @param numRows Number of rows to show
    * @param truncate If set to more than 0, truncates strings to `truncate` characters and
    *                    all cells will be aligned right.
@@ -1524,7 +1524,7 @@ class Dataset[T] private[sql](
    * @group typedrel
    * @since 2.0.0
    */
-  def union(other: Dataset[T]): Dataset[T] = withTypedPlan {
+  def union(other: Dataset[T]): Dataset[T] = withSetOperator {
     // This breaks caching, but it's usually ok because it addresses a very specific use case:
     // using union to union many files or partitions.
     CombineUnions(Union(logicalPlan, other.logicalPlan))
@@ -1540,7 +1540,7 @@ class Dataset[T] private[sql](
    * @group typedrel
    * @since 1.6.0
    */
-  def intersect(other: Dataset[T]): Dataset[T] = withTypedPlan {
+  def intersect(other: Dataset[T]): Dataset[T] = withSetOperator {
     Intersect(logicalPlan, other.logicalPlan)
   }
 
@@ -1554,7 +1554,7 @@ class Dataset[T] private[sql](
    * @group typedrel
    * @since 2.0.0
    */
-  def except(other: Dataset[T]): Dataset[T] = withTypedPlan {
+  def except(other: Dataset[T]): Dataset[T] = withSetOperator {
     Except(logicalPlan, other.logicalPlan)
   }
 
@@ -2725,4 +2725,14 @@ class Dataset[T] private[sql](
   @inline private def withTypedPlan[U : Encoder](logicalPlan: => LogicalPlan): Dataset[U] = {
     Dataset(sparkSession, logicalPlan)
   }
+
+  /** A convenient function to wrap a set-based logical plan and produce a Dataset. */
+  @inline private def withSetOperator[U : Encoder](logicalPlan: => LogicalPlan): Dataset[U] = {
+    if (classTag.runtimeClass.isAssignableFrom(classOf[Row])) {
+      // Set operators widen types (change the schema), so we cannot reuse the row encoder.
+      Dataset.ofRows(sparkSession, logicalPlan).asInstanceOf[Dataset[U]]
+    } else {
+      Dataset(sparkSession, logicalPlan)
+    }
+  }
 }
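
The key change above routes `union`, `intersect`, and `except` through the new `withSetOperator` helper. As the inline comment notes, set operators let the analyzer widen each column pair to a common type, so the result's schema can differ from both inputs'; reusing the caller's row encoder (as `withTypedPlan` did) left it bound to the stale pre-widening schema, and `collect()` on a DataFrame then failed at runtime. Below is a minimal standalone sketch of the widening scenario, assuming a local Spark 2.x session; the object name and column labels are illustrative, not part of the patch.

    import java.sql.{Date, Timestamp}
    import org.apache.spark.sql.SparkSession

    object SetOpWideningRepro {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[*]").appName("repro").getOrCreate()
        import spark.implicits._

        // Column types deliberately differ pairwise: Date vs Timestamp,
        // BigDecimal vs Double, Timestamp vs String.
        val left = Seq((new Date(0), BigDecimal.valueOf(1), new Timestamp(2)))
          .toDF("a", "b", "c")
        val right = Seq((new Timestamp(3), 10.5d, "string"))
          .toDF("a", "b", "c")

        // The analyzer widens each column pair to a common type, so the
        // union's schema differs from left.schema; before this fix, collect()
        // failed because the reused row encoder still described the original
        // pre-widening schema.
        val unioned = left.union(right)
        unioned.printSchema()
        unioned.collect()

        spark.stop()
      }
    }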
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 16cc36820848509f47240c4b90d31c686e5e8bc0..e87baa454c8b33b3e4f3d2ef69ab750cbe56ba03 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql
 
 import java.io.File
 import java.nio.charset.StandardCharsets
+import java.sql.{Date, Timestamp}
 import java.util.UUID
 
 import scala.util.Random
@@ -1615,4 +1616,19 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
       qe.assertAnalyzed()
     }
   }
+
+  test("SPARK-17123: Performing set operations that combine non-scala native types") {
+    val dates = Seq(
+      (new Date(0), BigDecimal.valueOf(1), new Timestamp(2)),
+      (new Date(3), BigDecimal.valueOf(4), new Timestamp(5))
+    ).toDF("date", "decimal", "timestamp")
+
+    val widenTypedRows = Seq(
+      (new Timestamp(2), 10.5D, "string")
+    ).toDF("date", "decimal", "timestamp")
+
+    dates.union(widenTypedRows).collect()
+    dates.except(widenTypedRows).collect()
+    dates.intersect(widenTypedRows).collect()
+  }
 }
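
The guard in `withSetOperator` relies on the Dataset's `classTag`: for a DataFrame the element type is `Row`, so the encoder must be rebuilt against the widened schema via `Dataset.ofRows`, while a typed `Dataset[T]` keeps its existing encoder. A small sketch of that check in isolation (`RowEncodedCheck` and `isRowEncoded` are hypothetical names, not part of the Spark API):

    import scala.reflect.{classTag, ClassTag}
    import org.apache.spark.sql.Row

    object RowEncodedCheck {
      // Standalone version of the guard inside withSetOperator: true only
      // when the element type is Row, i.e. the Dataset is an untyped
      // DataFrame whose encoder must be rebuilt after set-operator widening.
      def isRowEncoded[U: ClassTag]: Boolean =
        classTag[U].runtimeClass.isAssignableFrom(classOf[Row])

      def main(args: Array[String]): Unit = {
        println(isRowEncoded[Row])    // true  -> rebuild via Dataset.ofRows
        println(isRowEncoded[String]) // false -> keep the typed encoder
      }
    }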