diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 6a7d1fae3320eb666d173f176f1cfa93a8ef4a2f..b7a2473dfe9200fc587982925adb282d2a1ebd19 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2201,6 +2201,24 @@ private[spark] object Utils extends Logging {
     shutdownHooks.remove(ref)
   }
 
+  /**
+   * To avoid calling `Utils.getCallSite` for every single RDD created in the body,
+   * set a dummy call site that RDDs use instead. This is purely a performance optimization.
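+   *
+   * A minimal usage sketch (assuming `sc` is an active `SparkContext`):
+   * {{{
+   *   val rdd = Utils.withDummyCallSite(sc) {
+   *     // RDDs created in this block skip the stack walk in `Utils.getCallSite`
+   *     sc.parallelize(1 to 100).map(_ + 1)
+   *   }
+   * }}}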
+   */
+  def withDummyCallSite[T](sc: SparkContext)(body: => T): T = {
+    val oldShortCallSite = sc.getLocalProperty(CallSite.SHORT_FORM)
+    val oldLongCallSite = sc.getLocalProperty(CallSite.LONG_FORM)
+    try {
+      sc.setLocalProperty(CallSite.SHORT_FORM, "")
+      sc.setLocalProperty(CallSite.LONG_FORM, "")
+      body
+    } finally {
+      // Restore the old call site properties (a null value removes the property)
+      sc.setLocalProperty(CallSite.SHORT_FORM, oldShortCallSite)
+      sc.setLocalProperty(CallSite.LONG_FORM, oldLongCallSite)
+    }
+  }
+
 }
 
 private [util] class SparkShutdownHookManager {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index 32986aa3ecc209d00865cc6e911a1a466dc8d9c7..cb1e60883df1ee27d3c748f9e2b66642d498826a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -33,6 +33,7 @@ import parquet.hadoop._
 import parquet.hadoop.metadata.CompressionCodecName
 import parquet.hadoop.util.ContextUtil
 
+import org.apache.spark.{Partition => SparkPartition, SerializableWritable, Logging, SparkException}
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.rdd.RDD._
@@ -40,7 +41,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.sql.{Row, SQLConf, SQLContext}
-import org.apache.spark.{Partition => SparkPartition, SparkEnv, SerializableWritable, Logging, SparkException}
+import org.apache.spark.util.Utils
 
 private[sql] class DefaultSource extends HadoopFsRelationProvider {
   override def createRelation(
@@ -264,57 +265,58 @@ private[sql] class ParquetRelation2(
 
     val footers = inputFiles.map(f => metadataCache.footers(f.getPath))
 
-    // TODO Stop using `FilteringParquetRowInputFormat` and overriding `getPartition`.
-    // After upgrading to Parquet 1.6.0, we should be able to stop caching `FileStatus` objects and
-    // footers.  Especially when a global arbitrative schema (either from metastore or data source
-    // DDL) is available.
-    new SqlNewHadoopRDD(
-      sc = sqlContext.sparkContext,
-      broadcastedConf = broadcastedConf,
-      initDriverSideJobFuncOpt = Some(setInputPaths),
-      initLocalJobFuncOpt = Some(initLocalJobFuncOpt),
-      inputFormatClass = classOf[FilteringParquetRowInputFormat],
-      keyClass = classOf[Void],
-      valueClass = classOf[Row]) {
-
-      val cacheMetadata = useMetadataCache
-
-      @transient val cachedStatuses = inputFiles.map { f =>
-        // In order to encode the authority of a Path containing special characters such as /,
-        // we need to use the string returned by the URI of the path to create a new Path.
-        val pathWithAuthority = new Path(f.getPath.toUri.toString)
-
-        new FileStatus(
-          f.getLen, f.isDir, f.getReplication, f.getBlockSize, f.getModificationTime,
-          f.getAccessTime, f.getPermission, f.getOwner, f.getGroup, pathWithAuthority)
-      }.toSeq
-
-      @transient val cachedFooters = footers.map { f =>
-        // In order to encode the authority of a Path containing special characters such as /,
-        // we need to use the string returned by the URI of the path to create a new Path.
-        new Footer(new Path(f.getFile.toUri.toString), f.getParquetMetadata)
-      }.toSeq
-
-      // Overridden so we can inject our own cached files statuses.
-      override def getPartitions: Array[SparkPartition] = {
-        val inputFormat = if (cacheMetadata) {
-          new FilteringParquetRowInputFormat {
-            override def listStatus(jobContext: JobContext): JList[FileStatus] = cachedStatuses
-
-            override def getFooters(jobContext: JobContext): JList[Footer] = cachedFooters
+    Utils.withDummyCallSite(sqlContext.sparkContext) {
+      // TODO Stop using `FilteringParquetRowInputFormat` and overriding `getPartition`.
+      // After upgrading to Parquet 1.6.0, we should be able to stop caching `FileStatus`
+      // objects and footers, especially when a global authoritative schema (either from
+      // metastore or data source DDL) is available.
+      new SqlNewHadoopRDD(
+        sc = sqlContext.sparkContext,
+        broadcastedConf = broadcastedConf,
+        initDriverSideJobFuncOpt = Some(setInputPaths),
+        initLocalJobFuncOpt = Some(initLocalJobFuncOpt),
+        inputFormatClass = classOf[FilteringParquetRowInputFormat],
+        keyClass = classOf[Void],
+        valueClass = classOf[Row]) {
+
+        val cacheMetadata = useMetadataCache
+
+        @transient val cachedStatuses = inputFiles.map { f =>
+          // In order to encode the authority of a Path containing special characters such as /,
+          // we need to use the string returned by the URI of the path to create a new Path.
+          val pathWithAuthority = new Path(f.getPath.toUri.toString)
+
+          new FileStatus(
+            f.getLen, f.isDir, f.getReplication, f.getBlockSize, f.getModificationTime,
+            f.getAccessTime, f.getPermission, f.getOwner, f.getGroup, pathWithAuthority)
+        }.toSeq
+
+        @transient val cachedFooters = footers.map { f =>
+          // In order to encode the authority of a Path containing special characters such as /,
+          // we need to use the string returned by the URI of the path to create a new Path.
+          new Footer(new Path(f.getFile.toUri.toString), f.getParquetMetadata)
+        }.toSeq
+
+        // Overridden so we can inject our own cached file statuses.
+        override def getPartitions: Array[SparkPartition] = {
+          val inputFormat = if (cacheMetadata) {
+            new FilteringParquetRowInputFormat {
+              override def listStatus(jobContext: JobContext): JList[FileStatus] = cachedStatuses
+              override def getFooters(jobContext: JobContext): JList[Footer] = cachedFooters
+            }
+          } else {
+            new FilteringParquetRowInputFormat
           }
-        } else {
-          new FilteringParquetRowInputFormat
-        }
 
-        val jobContext = newJobContext(getConf(isDriverSide = true), jobId)
-        val rawSplits = inputFormat.getSplits(jobContext)
+          val jobContext = newJobContext(getConf(isDriverSide = true), jobId)
+          val rawSplits = inputFormat.getSplits(jobContext)
 
-        Array.tabulate[SparkPartition](rawSplits.size) { i =>
-          new SqlNewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
+          Array.tabulate[SparkPartition](rawSplits.size) { i =>
+            new SqlNewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
+          }
         }
-      }
-    }.values
+      }.values
+    }
   }
 
   private class MetadataCache {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
index 550090d22d551628fd716304cc6252ce891d5f37..c03649d00bbaec161b617708e5701ed1a21703c7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
@@ -17,9 +17,9 @@
 
 package org.apache.spark.sql.sources
 
-import org.apache.spark.{SerializableWritable, Logging}
+import org.apache.spark.{Logging, SerializableWritable, TaskContext}
 import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.rdd.{RDD, UnionRDD}
+import org.apache.spark.rdd.{MapPartitionsRDD, RDD, UnionRDD}
 import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.types.{StringType, StructType, UTF8String}
 import org.apache.spark.sql.{SaveMode, Strategy, execution, sources}
+import org.apache.spark.util.Utils
 
 /**
  * A Strategy for planning scans over data sources defined using the sources API.
@@ -197,7 +198,10 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
         }
       }
 
-      dataRows.mapPartitions { iterator =>
+      // Since we know for sure that this closure is serializable, we can avoid the overhead
+      // of cleaning a closure for each RDD by creating our own MapPartitionsRDD. Functionally
+      // this is equivalent to calling `dataRows.mapPartitions(mapPartitionsFunc)` (SPARK-7718).
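+      // The function below takes (TaskContext, partition index, row iterator), matching the
+      // function type that MapPartitionsRDD's constructor expects.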
+      val mapPartitionsFunc = (_: TaskContext, _: Int, iterator: Iterator[Row]) => {
         val dataTypes = requiredColumns.map(schema(_).dataType)
         val mutableRow = new SpecificMutableRow(dataTypes)
         iterator.map { dataRow =>
@@ -209,6 +213,14 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
           mutableRow.asInstanceOf[expressions.Row]
         }
       }
+
+      // This is an internal RDD whose call site the user should not be concerned with.
+      // Since we create many of these (one per partition), the time spent computing
+      // the call site may add up.
+      Utils.withDummyCallSite(dataRows.sparkContext) {
+        new MapPartitionsRDD(dataRows, mapPartitionsFunc, preservesPartitioning = false)
+      }
     } else {
       dataRows
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala
index 0c7bb6e50cd98c9f4361218ef5389461da530738..a74a98631da35b216d52c23502476c2024fa8230 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala
@@ -75,10 +75,6 @@ private[sql] class SqlNewHadoopRDD[K, V](
   with SparkHadoopMapReduceUtil
   with Logging {
 
-  if (initLocalJobFuncOpt.isDefined) {
-    sc.clean(initLocalJobFuncOpt.get)
-  }
-
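+  // The job functions passed to this RDD are known to be serializable, so we deliberately
+  // skip `sc.clean` on them, avoiding per-RDD closure-cleaning overhead (SPARK-7718).
+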
   protected def getJob(): Job = {
     val conf: Configuration = broadcastedConf.value.value
     // "new Job" will make a copy of the conf. Then, it is