diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 56face2992bcfc6c823a7ac3ef05bcefb832fef1..4f2adb006fbc7cb851196dea373ae6fe0488ce95 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -32,6 +32,7 @@ private[spark] object SQLConf {
   val CODEGEN_ENABLED = "spark.sql.codegen"
   val DIALECT = "spark.sql.dialect"
   val PARQUET_BINARY_AS_STRING = "spark.sql.parquet.binaryAsString"
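+  // When enabled, Parquet footers and HDFS block locations are cached across queries
+  // (see FilteringParquetRowInputFormat).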
+  val PARQUET_CACHE_METADATA = "spark.sql.parquet.cacheMetadata"
 
   // This is only used for the thriftserver
   val THRIFTSERVER_POOL = "spark.sql.thriftserver.scheduler.pool"
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
index 759a2a586b926f7d3bf46b65686c6e212bc61fc3..c6dca10f6ad7ccbeb70e906a8931d202158322f1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
@@ -17,22 +17,23 @@
 
 package org.apache.spark.sql.parquet
 
-import scala.collection.JavaConversions._
-import scala.collection.mutable
-import scala.util.Try
-
 import java.io.IOException
 import java.lang.{Long => JLong}
 import java.text.SimpleDateFormat
-import java.util.{Date, List => JList}
+import java.util.concurrent.{Callable, TimeUnit}
+import java.util.{ArrayList, Collections, Date, List => JList}
 
+import scala.collection.JavaConversions._
+import scala.collection.mutable
+import scala.util.Try
+
+import com.google.common.cache.CacheBuilder
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.fs.{BlockLocation, FileStatus, Path}
 import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
 import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat}
 import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
-
 import parquet.hadoop._
 import parquet.hadoop.api.{InitContext, ReadSupport}
 import parquet.hadoop.metadata.GlobalMetaData
@@ -41,7 +42,7 @@ import parquet.io.ParquetDecodingException
 import parquet.schema.MessageType
 
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.SQLConf
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Row}
 import org.apache.spark.sql.execution.{LeafNode, SparkPlan, UnaryNode}
 import org.apache.spark.{Logging, SerializableWritable, TaskContext}
@@ -96,6 +97,11 @@ case class ParquetTableScan(
       ParquetFilters.serializeFilterExpressions(columnPruningPred, conf)
     }
 
+    // Tell FilteringParquetRowInputFormat whether it may cache Parquet and file system metadata
+    conf.set(
+      SQLConf.PARQUET_CACHE_METADATA,
+      sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA, "false"))
+
     sc.newAPIHadoopRDD(
       conf,
       classOf[FilteringParquetRowInputFormat],
@@ -323,10 +329,40 @@ private[parquet] class FilteringParquetRowInputFormat
   }
 
   override def getFooters(jobContext: JobContext): JList[Footer] = {
+    import FilteringParquetRowInputFormat.footerCache
+
     if (footers eq null) {
+      val conf = ContextUtil.getConfiguration(jobContext)
+      val cacheMetadata = conf.getBoolean(SQLConf.PARQUET_CACHE_METADATA, false)
       val statuses = listStatus(jobContext)
       fileStatuses = statuses.map(file => file.getPath -> file).toMap
-      footers = getFooters(ContextUtil.getConfiguration(jobContext), statuses)
+      if (statuses.isEmpty) {
+        footers = Collections.emptyList[Footer]
+      } else if (!cacheMetadata) {
+        // Read the footers from HDFS
+        footers = getFooters(conf, statuses)
+      } else {
+        // Read only the footers that are not in the footerCache
+        val foundFooters = footerCache.getAllPresent(statuses)
+        val toFetch = new ArrayList[FileStatus]
+        for (s <- statuses) {
+          if (!foundFooters.containsKey(s)) {
+            toFetch.add(s)
+          }
+        }
+        val newFooters = new mutable.HashMap[FileStatus, Footer]
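+        // Read the missing footers in a single batch and add them to the shared cache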
+        if (toFetch.size > 0) {
+          val fetched = getFooters(conf, toFetch)
+          for ((status, i) <- toFetch.zipWithIndex) {
+            newFooters(status) = fetched.get(i)
+          }
+          footerCache.putAll(newFooters)
+        }
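+        // Preserve the original file order, mixing newly read and previously cached footers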
+        footers = new ArrayList[Footer](statuses.size)
+        for (status <- statuses) {
+          footers.add(newFooters.getOrElse(status, foundFooters.get(status)))
+        }
+      }
     }
 
     footers
@@ -339,6 +375,10 @@ private[parquet] class FilteringParquetRowInputFormat
       configuration: Configuration,
       footers: JList[Footer]): JList[ParquetInputSplit] = {
 
+    import FilteringParquetRowInputFormat.blockLocationCache
+
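+    // When enabled, block locations are served from a time-bounded cache instead of
+    // querying the file system for every footer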
+    val cacheMetadata = configuration.getBoolean(SQLConf.PARQUET_CACHE_METADATA, false)
+
     val maxSplitSize: JLong = configuration.getLong("mapred.max.split.size", Long.MaxValue)
     val minSplitSize: JLong =
       Math.max(getFormatMinSplitSize(), configuration.getLong("mapred.min.split.size", 0L))
@@ -366,16 +406,23 @@ private[parquet] class FilteringParquetRowInputFormat
     for (footer <- footers) {
       val fs = footer.getFile.getFileSystem(configuration)
       val file = footer.getFile
-      val fileStatus = fileStatuses.getOrElse(file, fs.getFileStatus(file))
+      val status = fileStatuses.getOrElse(file, fs.getFileStatus(file))
       val parquetMetaData = footer.getParquetMetadata
       val blocks = parquetMetaData.getBlocks
-      val fileBlockLocations = fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen)
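+      // Look up block locations in the cache (when enabled) to avoid a file system call per file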
+      val blockLocations =
+        if (cacheMetadata) {
+          blockLocationCache.get(status, new Callable[Array[BlockLocation]] {
+            def call(): Array[BlockLocation] = fs.getFileBlockLocations(status, 0, status.getLen)
+          })
+        } else {
+          fs.getFileBlockLocations(status, 0, status.getLen)
+        }
       splits.addAll(
         generateSplits.invoke(
           null,
           blocks,
-          fileBlockLocations,
-          fileStatus,
+          blockLocations,
+          status,
           parquetMetaData.getFileMetaData,
           readContext.getRequestedSchema.toString,
           readContext.getReadSupportMetadata,
@@ -387,6 +434,17 @@ private[parquet] class FilteringParquetRowInputFormat
   }
 }
 
+private[parquet] object FilteringParquetRowInputFormat {
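+  // Caches live in the companion object so they are shared by every input format instance
+  // in the driver JVM; footers of immutable Parquet files are reused until evicted by size.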
+  private val footerCache = CacheBuilder.newBuilder()
+    .maximumSize(20000)
+    .build[FileStatus, Footer]()
+
+  private val blockLocationCache = CacheBuilder.newBuilder()
+    .maximumSize(20000)
+    .expireAfterWrite(15, TimeUnit.MINUTES)  // Expire locations since HDFS files might move
+    .build[FileStatus, Array[BlockLocation]]()
+}
+
 private[parquet] object FileSystemHelper {
   def listFiles(pathStr: String, conf: Configuration): Seq[Path] = {
     val origPath = new Path(pathStr)