diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 86269eac52db0cad92df2806c67bcdbc95133d44..ea4ddcc2e265d76b3454eb4a1e969d19c1900e27 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1055,7 +1055,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   /** Build the union of a list of RDDs. */
   def union[T: ClassTag](rdds: Seq[RDD[T]]): RDD[T] = {
     val partitioners = rdds.flatMap(_.partitioner).toSet
-    if (partitioners.size == 1) {
+    if (rdds.forall(_.partitioner.isDefined) && partitioners.size == 1) {
       new PartitionerAwareUnionRDD(this, rdds)
     } else {
       new UnionRDD(this, rdds)
diff --git a/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
index 92b0641d0fb6e8912fa4e8a4d21e6ceda540e048..7598ff617b3995a7cbd4a103f2f9af895232cc52 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
@@ -60,6 +60,7 @@ class PartitionerAwareUnionRDD[T: ClassTag](
     var rdds: Seq[RDD[T]]
   ) extends RDD[T](sc, rdds.map(x => new OneToOneDependency(x))) {
   require(rdds.length > 0)
+  require(rdds.forall(_.partitioner.isDefined))
   require(rdds.flatMap(_.partitioner).toSet.size == 1,
     "Parent RDDs have different partitioners: " + rdds.flatMap(_.partitioner))
 
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index df42faab645054d5b2dda6c2105dd7ae4a61314c..ef8c36a28655b174882869936f608ff333d8ab37 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -99,6 +99,27 @@ class RDDSuite extends FunSuite with SharedSparkContext {
     assert(sc.union(Seq(nums, nums)).collect().toList === List(1, 2, 3, 4, 1, 2, 3, 4))
   }
 
+  test("SparkContext.union creates UnionRDD if at least one RDD has no partitioner") {
+    val rddWithPartitioner = sc.parallelize(Seq(1->true)).partitionBy(new HashPartitioner(1))
+    val rddWithNoPartitioner = sc.parallelize(Seq(2->true))
+    val unionRdd = sc.union(rddWithNoPartitioner, rddWithPartitioner)
+    assert(unionRdd.isInstanceOf[UnionRDD[_]])
+  }
+
+  test("SparkContext.union creates PartitionAwareUnionRDD if all RDDs have partitioners") {
+    val rddWithPartitioner = sc.parallelize(Seq(1->true)).partitionBy(new HashPartitioner(1))
+    val unionRdd = sc.union(rddWithPartitioner, rddWithPartitioner)
+    assert(unionRdd.isInstanceOf[PartitionerAwareUnionRDD[_]])
+  }
+
+  test("PartitionAwareUnionRDD raises exception if at least one RDD has no partitioner") {
+    val rddWithPartitioner = sc.parallelize(Seq(1->true)).partitionBy(new HashPartitioner(1))
+    val rddWithNoPartitioner = sc.parallelize(Seq(2->true))
+    intercept[IllegalArgumentException] {
+      new PartitionerAwareUnionRDD(sc, Seq(rddWithNoPartitioner, rddWithPartitioner))
+    }
+  }
+
   test("partitioner aware union") {
     def makeRDDWithPartitioner(seq: Seq[Int]): RDD[Int] = {
       sc.makeRDD(seq, 1)