diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
index 4dc13b036cd4ea5d1ecad89ab66268f1b8f14ee3..9061d3f5fee4dd8b350da1ef0c7de1a9becc0ba4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
@@ -374,8 +374,6 @@ private[parquet] class AppendingParquetOutputFormat(offset: Int)
 private[parquet] class FilteringParquetRowInputFormat
   extends parquet.hadoop.ParquetInputFormat[Row] with Logging {
 
-  private var footers: JList[Footer] = _
-
   private var fileStatuses = Map.empty[Path, FileStatus]
 
   override def createRecordReader(
@@ -396,46 +394,15 @@ private[parquet] class FilteringParquetRowInputFormat
     }
   }
 
-  override def getFooters(jobContext: JobContext): JList[Footer] = {
-    import org.apache.spark.sql.parquet.FilteringParquetRowInputFormat.footerCache
-
-    if (footers eq null) {
-      val conf = ContextUtil.getConfiguration(jobContext)
-      val cacheMetadata = conf.getBoolean(SQLConf.PARQUET_CACHE_METADATA, true)
-      val statuses = listStatus(jobContext)
-      fileStatuses = statuses.map(file => file.getPath -> file).toMap
-      if (statuses.isEmpty) {
-        footers = Collections.emptyList[Footer]
-      } else if (!cacheMetadata) {
-        // Read the footers from HDFS
-        footers = getFooters(conf, statuses)
-      } else {
-        // Read only the footers that are not in the footerCache
-        val foundFooters = footerCache.getAllPresent(statuses)
-        val toFetch = new ArrayList[FileStatus]
-        for (s <- statuses) {
-          if (!foundFooters.containsKey(s)) {
-            toFetch.add(s)
-          }
-        }
-        val newFooters = new mutable.HashMap[FileStatus, Footer]
-        if (toFetch.size > 0) {
-          val startFetch = System.currentTimeMillis
-          val fetched = getFooters(conf, toFetch)
-          logInfo(s"Fetched $toFetch footers in ${System.currentTimeMillis - startFetch} ms")
-          for ((status, i) <- toFetch.zipWithIndex) {
-            newFooters(status) = fetched.get(i)
-          }
-          footerCache.putAll(newFooters)
-        }
-        footers = new ArrayList[Footer](statuses.size)
-        for (status <- statuses) {
-          footers.add(newFooters.getOrElse(status, foundFooters.get(status)))
-        }
-      }
-    }
+  // This is only a temporary solution since we need to use fileStatuses in
+  // both getClientSideSplits and getTaskSideSplits. It can be removed once
+  // we get rid of these two methods.
+  override def getSplits(jobContext: JobContext): JList[InputSplit] = {
+    // First set fileStatuses.
+    val statuses = listStatus(jobContext)
+    fileStatuses = statuses.map(file => file.getPath -> file).toMap
 
-    footers
+    super.getSplits(jobContext)
   }
 
   // TODO Remove this method and related code once PARQUET-16 is fixed
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index 16b771344bfcd355629ee8247b10d6fd652c837f..e648618468d5d7f2113b5de5a16739abf13b598f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -200,7 +200,7 @@ private[sql] case class ParquetRelation2(
     private var commonMetadataStatuses: Array[FileStatus] = _
 
     // Parquet footer cache.
-    private var footers: Map[FileStatus, Footer] = _
+    var footers: Map[FileStatus, Footer] = _
 
     // `FileStatus` objects of all data files (Parquet part-files).
     var dataStatuses: Array[FileStatus] = _
@@ -400,6 +400,7 @@ private[sql] case class ParquetRelation2(
     } else {
       metadataCache.dataStatuses.toSeq
     }
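+    // Footers of the selected data files, looked up from the metadata cache so that the
+    // latest footers are handed to the input format below (SPARK-6016).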
+    val selectedFooters = selectedFiles.map(metadataCache.footers)
 
     // FileInputFormat cannot handle empty lists.
     if (selectedFiles.nonEmpty) {
@@ -447,11 +448,16 @@ private[sql] case class ParquetRelation2(
         @transient
         val cachedStatus = selectedFiles
 
+        @transient
+        val cachedFooters = selectedFooters
+
         // Overridden so we can inject our own cached files statuses.
         override def getPartitions: Array[SparkPartition] = {
           val inputFormat = if (cacheMetadata) {
             new FilteringParquetRowInputFormat {
               override def listStatus(jobContext: JobContext): JList[FileStatus] = cachedStatus
+
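+              // Overridden, like listStatus above, so we can inject the footers that were
+              // just looked up for the selected files (SPARK-6016).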
+              override def getFooters(jobContext: JobContext): JList[Footer] = cachedFooters
             }
           } else {
             new FilteringParquetRowInputFormat
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
index 80fd5cda20e2054c3bf77a0cafc37b61c909e2a5..6a9d9daf6750c7ea864512d80d8f37de7d5c9318 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.hive.execution.{InsertIntoHiveTable, HiveTableScan}
 import org.apache.spark.sql.hive.test.TestHive._
 import org.apache.spark.sql.hive.test.TestHive.implicits._
 import org.apache.spark.sql.sources.{InsertIntoDataSource, LogicalRelation}
+import org.apache.spark.sql.SaveMode
 
 // The data where the partitioning key exists only in the directory structure.
 case class ParquetData(intField: Int, stringField: String)
@@ -409,6 +410,32 @@ class ParquetSourceSuiteBase extends ParquetPartitioningTest {
       )
     """)
   }
+
+  test("SPARK-6016 make sure to use the latest footers") {
+    sql("drop table if exists spark_6016_fix")
+
+    // Create a DataFrame with two partitions, so the created table will have two parquet files.
+    val df1 = jsonRDD(sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i}"""), 2))
+    df1.saveAsTable("spark_6016_fix", "parquet", SaveMode.Overwrite)
+    checkAnswer(
+      sql("select * from spark_6016_fix"),
+      (1 to 10).map(i => Row(i))
+    )
+
+    // Create a DataFrame with four partitions, so the created table will have four parquet files.
+    val df2 = jsonRDD(sparkContext.parallelize((1 to 10).map(i => s"""{"b":$i}"""), 4))
+    df2.saveAsTable("spark_6016_fix", "parquet", SaveMode.Overwrite)
+    // Before the fix for SPARK-6016, we would still be caching the two outdated footers of df1.
+    // Then, since the new table has four parquet files, we would read new footers for only two
+    // of the files and merge the metadata in the footers of all four (two outdated ones and two
+    // latest ones), which caused an error.
+    checkAnswer(
+      sql("select * from spark_6016_fix"),
+      (1 to 10).map(i => Row(i))
+    )
+
+    sql("drop table spark_6016_fix")
+  }
 }
 
 class ParquetDataSourceOnSourceSuite extends ParquetSourceSuiteBase {