diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 99d665fafec8978c0321a9ccd47f09b97b815b38..7008e8fadffc3d3f61695aba7928be2d345134a8 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -173,8 +173,7 @@ class DataFrame(object):
 
         >>> df.explain()
         == Physical Plan ==
-        WholeStageCodegen
-        :  +- Scan ExistingRDD[age#0,name#1]
+        Scan ExistingRDD[age#0,name#1]
 
         >>> df.explain(True)
         == Parsed Logical Plan ==
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
index c222571a3464b9136f152c168a30dd7c5120eea0..920e989d058dc44d65056c173d1a6475015384ac 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
@@ -280,12 +280,12 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
    * can do better should override this function.
    */
   def sameResult(plan: PlanType): Boolean = {
-    val canonicalizedLeft = this.canonicalized
-    val canonicalizedRight = plan.canonicalized
-    canonicalizedLeft.getClass == canonicalizedRight.getClass &&
-      canonicalizedLeft.children.size == canonicalizedRight.children.size &&
-      canonicalizedLeft.cleanArgs == canonicalizedRight.cleanArgs &&
-      (canonicalizedLeft.children, canonicalizedRight.children).zipped.forall(_ sameResult _)
+    val left = this.canonicalized
+    val right = plan.canonicalized
+    left.getClass == right.getClass &&
+      left.children.size == right.children.size &&
+      left.cleanArgs == right.cleanArgs &&
+      (left.children, right.children).zipped.forall(_ sameResult _)
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index 3662ed74d2eae0e7209492fdf18cbf034c73a2b6..d363cb000d39a52671b5fb297fa5cf49b6c5913e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -101,17 +101,76 @@ private[sql] case class LogicalRDD(
 private[sql] case class PhysicalRDD(
     output: Seq[Attribute],
     rdd: RDD[InternalRow],
-    override val nodeName: String,
-    override val metadata: Map[String, String] = Map.empty,
-    isUnsafeRow: Boolean = false,
-    override val outputPartitioning: Partitioning = UnknownPartitioning(0))
+    override val nodeName: String) extends LeafNode {
+
+  private[sql] override lazy val metrics = Map(
+    "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
+
+  protected override def doExecute(): RDD[InternalRow] = {
+    val numOutputRows = longMetric("numOutputRows")
+    rdd.mapPartitionsInternal { iter =>
+      val proj = UnsafeProjection.create(schema)
+      iter.map { r =>
+        numOutputRows += 1
+        proj(r)
+      }
+    }
+  }
+
+  override def simpleString: String = {
+    s"Scan $nodeName${output.mkString("[", ",", "]")}"
+  }
+}
+
+/** Physical plan node for scanning data from a relation. */
+private[sql] case class DataSourceScan(
+    output: Seq[Attribute],
+    rdd: RDD[InternalRow],
+    @transient relation: BaseRelation,
+    override val metadata: Map[String, String] = Map.empty)
   extends LeafNode with CodegenSupport {
 
+  override val nodeName: String = relation.toString
+
+  // Ignore rdd when checking results
+  override def sameResult(plan: SparkPlan ): Boolean = plan match {
+    case other: DataSourceScan => relation == other.relation && metadata == other.metadata
+    case _ => false
+  }
+
   private[sql] override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
 
+  val outputUnsafeRows = relation match {
+    case r: HadoopFsRelation if r.fileFormat.isInstanceOf[ParquetSource] =>
+      !SQLContext.getActive().get.conf.getConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED)
+    case _: HadoopFsRelation => true
+    case _ => false
+  }
+
+  override val outputPartitioning = {
+    val bucketSpec = relation match {
+      // TODO: this should be closer to bucket planning.
+      case r: HadoopFsRelation if r.sqlContext.conf.bucketingEnabled() => r.bucketSpec
+      case _ => None
+    }
+
+    def toAttribute(colName: String): Attribute = output.find(_.name == colName).getOrElse {
+      throw new AnalysisException(s"bucket column $colName not found in existing columns " +
+        s"(${output.map(_.name).mkString(", ")})")
+    }
+
+    bucketSpec.map { spec =>
+      val numBuckets = spec.numBuckets
+      val bucketColumns = spec.bucketColumnNames.map(toAttribute)
+      HashPartitioning(bucketColumns, numBuckets)
+    }.getOrElse {
+      UnknownPartitioning(0)
+    }
+  }
+
   protected override def doExecute(): RDD[InternalRow] = {
-    val unsafeRow = if (isUnsafeRow) {
+    val unsafeRow = if (outputUnsafeRows) {
       rdd
     } else {
       rdd.mapPartitionsInternal { iter =>
@@ -187,7 +246,7 @@ private[sql] case class PhysicalRDD(
     ctx.INPUT_ROW = row
     ctx.currentVars = null
     val columns2 = exprs.map(_.gen(ctx))
-    val inputRow = if (isUnsafeRow) row else null
+    val inputRow = if (outputUnsafeRows) row else null
     val scanRows = ctx.freshName("processRows")
     ctx.addNewFunction(scanRows,
       s"""
@@ -221,42 +280,8 @@ private[sql] case class PhysicalRDD(
   }
 }
 
-private[sql] object PhysicalRDD {
+private[sql] object DataSourceScan {
   // Metadata keys
   val INPUT_PATHS = "InputPaths"
   val PUSHED_FILTERS = "PushedFilters"
-
-  def createFromDataSource(
-      output: Seq[Attribute],
-      rdd: RDD[InternalRow],
-      relation: BaseRelation,
-      metadata: Map[String, String] = Map.empty): PhysicalRDD = {
-
-    val outputUnsafeRows = relation match {
-      case r: HadoopFsRelation if r.fileFormat.isInstanceOf[ParquetSource] =>
-        !SQLContext.getActive().get.conf.getConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED)
-      case _: HadoopFsRelation => true
-      case _ => false
-    }
-
-    val bucketSpec = relation match {
-      // TODO: this should be closer to bucket planning.
-      case r: HadoopFsRelation if r.sqlContext.conf.bucketingEnabled() => r.bucketSpec
-      case _ => None
-    }
-
-    def toAttribute(colName: String): Attribute = output.find(_.name == colName).getOrElse {
-      throw new AnalysisException(s"bucket column $colName not found in existing columns " +
-        s"(${output.map(_.name).mkString(", ")})")
-    }
-
-    bucketSpec.map { spec =>
-      val numBuckets = spec.numBuckets
-      val bucketColumns = spec.bucketColumnNames.map(toAttribute)
-      val partitioning = HashPartitioning(bucketColumns, numBuckets)
-      PhysicalRDD(output, rdd, relation.toString, metadata, outputUnsafeRows, partitioning)
-    }.getOrElse {
-      PhysicalRDD(output, rdd, relation.toString, metadata, outputUnsafeRows)
-    }
-  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
index 52c2971b73f1b40bb7a2ced90dc81aaf820f6926..8fb4705581a384b029f2d934cf79f77a23789a38 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
@@ -41,6 +41,7 @@ trait CodegenSupport extends SparkPlan {
     case _: BroadcastHashJoin => "bhj"
     case _: SortMergeJoin => "smj"
     case _: PhysicalRDD => "rdd"
+    case _: DataSourceScan => "scan"
     case _ => nodeName.toLowerCase
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 2944a8f86f16983f7b245405c36abdba19c7097a..1adf3b6676555389464d606f8900a866bcaf3a27 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.PhysicalRDD.{INPUT_PATHS, PUSHED_FILTERS}
+import org.apache.spark.sql.execution.DataSourceScan.{INPUT_PATHS, PUSHED_FILTERS}
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.command.ExecutedCommand
 import org.apache.spark.sql.execution.vectorized.{ColumnarBatch, ColumnVectorUtils}
@@ -239,7 +239,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
       }
 
     case l @ LogicalRelation(baseRelation: TableScan, _, _) =>
-      execution.PhysicalRDD.createFromDataSource(
+      execution.DataSourceScan(
         l.output, toCatalystRDD(l, baseRelation.buildScan()), baseRelation) :: Nil
 
     case i @ logical.InsertIntoTable(l @ LogicalRelation(t: InsertableRelation, _, _),
@@ -639,7 +639,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
         // Don't request columns that are only referenced by pushed filters.
         .filterNot(handledSet.contains)
 
-      val scan = execution.PhysicalRDD.createFromDataSource(
+      val scan = execution.DataSourceScan(
         projects.map(_.toAttribute),
         scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
         relation.relation, metadata)
@@ -649,7 +649,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
       val requestedColumns =
         (projectSet ++ filterSet -- handledSet).map(relation.attributeMap).toSeq
 
-      val scan = execution.PhysicalRDD.createFromDataSource(
+      val scan = execution.DataSourceScan(
         requestedColumns,
         scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
         relation.relation, metadata)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
index 94d318e7027894feed6c2a3c456d5f36acbb1d96..8a36d3224003adfeb2a12102607cda5e178e2358 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
@@ -93,6 +93,10 @@ private[sql] object SparkPlanGraph {
       case "Subquery" if subgraph != null =>
         // Subquery should not be included in WholeStageCodegen
         buildSparkPlanGraphNode(planInfo, nodeIdGenerator, nodes, edges, parent, null, exchanges)
+      case "ReusedExchange" =>
+        // Point to the re-used exchange
+        val node = exchanges(planInfo.children.head)
+        edges += SparkPlanGraphEdge(node.id, parent.id)
       case name =>
         val metrics = planInfo.metrics.map { metric =>
           SQLPlanMetric(metric.name, metric.accumulatorId,
@@ -106,7 +110,7 @@ private[sql] object SparkPlanGraph {
         } else {
           subgraph.nodes += node
         }
-        if (name == "ShuffleExchange" || name == "BroadcastExchange") {
+        if (name.contains("Exchange")) {
           exchanges += planInfo -> node
         }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala
index 3780cbbcc9631d2df4f4e272c2073b1456dba0e0..9130e77ea572411bdcaa9d6dcfef7d907da43a11 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala
@@ -82,7 +82,24 @@ case class LessThanOrEqual(attribute: String, value: Any) extends Filter
  *
  * @since 1.3.0
  */
-case class In(attribute: String, values: Array[Any]) extends Filter
+case class In(attribute: String, values: Array[Any]) extends Filter {
+  override def hashCode(): Int = {
+    var h = attribute.hashCode
+    values.foreach { v =>
+      h *= 41
+      h += v.hashCode()
+    }
+    h
+  }
+  override def equals(o: Any): Boolean = o match {
+    case In(a, vs) =>
+      a == attribute && vs.length == values.length && vs.zip(values).forall(x => x._1 == x._2)
+    case _ => false
+  }
+  override def toString: String = {
+    s"In($attribute, [${values.mkString(",")}]"
+  }
+}
 
 /**
  * A filter that evaluates to `true` iff the attribute evaluates to null.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index dfffa582120ccb5cf4ed665a7d0305bba5769620..1ef517324d7cbb64bafc312db8d3eacaff713465 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -26,7 +26,7 @@ import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.{DataFrame, Row}
-import org.apache.spark.sql.execution.PhysicalRDD
+import org.apache.spark.sql.execution.DataSourceScan
 import org.apache.spark.sql.execution.command.ExplainCommand
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD
@@ -210,8 +210,8 @@ class JDBCSuite extends SparkFunSuite
       // the plan only has PhysicalRDD to scan JDBCRelation.
       assert(parentPlan.isInstanceOf[org.apache.spark.sql.execution.WholeStageCodegen])
       val node = parentPlan.asInstanceOf[org.apache.spark.sql.execution.WholeStageCodegen]
-      assert(node.child.isInstanceOf[org.apache.spark.sql.execution.PhysicalRDD])
-      assert(node.child.asInstanceOf[PhysicalRDD].nodeName.contains("JDBCRelation"))
+      assert(node.child.isInstanceOf[org.apache.spark.sql.execution.DataSourceScan])
+      assert(node.child.asInstanceOf[DataSourceScan].nodeName.contains("JDBCRelation"))
       df
     }
     assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID < 1")).collect().size == 0)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
index 2ff79a2316bdca1228c3f70a6e5aa535bc6f404f..19e34b45bff6705fa757f847020ef025300a9a5c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
@@ -312,7 +312,7 @@ class FilteredScanSuite extends DataSourceTest with SharedSQLContext with Predic
       try {
         val queryExecution = sql(sqlString).queryExecution
         val rawPlan = queryExecution.executedPlan.collect {
-          case p: execution.PhysicalRDD => p
+          case p: execution.DataSourceScan => p
         } match {
           case Seq(p) => p
           case _ => fail(s"More than one PhysicalRDD found\n$queryExecution")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala
index db722975379a25e9fa1e924a155da2ed09bdc430..62f991fc5dc6152aabfee6d87dc5f794c5f7bfc0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala
@@ -124,7 +124,7 @@ class PrunedScanSuite extends DataSourceTest with SharedSQLContext {
       try {
         val queryExecution = sql(sqlString).queryExecution
         val rawPlan = queryExecution.executedPlan.collect {
-          case p: execution.PhysicalRDD => p
+          case p: execution.DataSourceScan => p
         } match {
           case Seq(p) => p
           case _ => fail(s"More than one PhysicalRDD found\n$queryExecution")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index a0f09d6c4a36e3571ee7663955118b35ed218bc7..8fdbbd94c807e863c51063bba364d3e3ce28bb6f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.hive
 import java.io.File
 
 import org.apache.spark.sql._
-import org.apache.spark.sql.execution.PhysicalRDD
+import org.apache.spark.sql.execution.DataSourceScan
 import org.apache.spark.sql.execution.command.ExecutedCommand
 import org.apache.spark.sql.execution.datasources.{InsertIntoDataSource, InsertIntoHadoopFsRelation, LogicalRelation}
 import org.apache.spark.sql.hive.execution.HiveTableScan
@@ -196,7 +196,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
       }.isEmpty)
     assert(
       sql("SELECT * FROM normal_parquet").queryExecution.sparkPlan.collect {
-        case _: PhysicalRDD => true
+        case _: DataSourceScan => true
       }.nonEmpty)
   }
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index 35573f62dc63384792ba24064e9a8e8a68457b0f..a0be55cfba94c7960ba7e08af0c9939769fe0f97 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -22,7 +22,7 @@ import java.io.File
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
-import org.apache.spark.sql.execution.PhysicalRDD
+import org.apache.spark.sql.execution.DataSourceScan
 import org.apache.spark.sql.execution.datasources.{BucketSpec, DataSourceStrategy}
 import org.apache.spark.sql.execution.exchange.ShuffleExchange
 import org.apache.spark.sql.execution.joins.SortMergeJoin
@@ -93,7 +93,7 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
 
       // Filter could hide the bug in bucket pruning. Thus, skipping all the filters
       val plan = bucketedDataFrame.filter(filterCondition).queryExecution.executedPlan
-      val rdd = plan.find(_.isInstanceOf[PhysicalRDD])
+      val rdd = plan.find(_.isInstanceOf[DataSourceScan])
       assert(rdd.isDefined, plan)
 
       val checkedResult = rdd.get.execute().mapPartitionsWithIndex { case (index, iter) =>