diff --git a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala
index c057de9b3f4df2d9b4754f0db27fd26aee5aac1e..d9902f96dfd4ec15e179cd17bf8820f4ead7ce22 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala
@@ -17,9 +17,7 @@
 
 package org.apache.spark.shuffle
 
-import java.io.File
 import java.util.concurrent.ConcurrentLinkedQueue
-import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.JavaConverters._
 
@@ -28,10 +26,8 @@ import org.apache.spark.executor.ShuffleWriteMetrics
 import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
 import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.serializer.Serializer
-import org.apache.spark.shuffle.FileShuffleBlockResolver.ShuffleFileGroup
 import org.apache.spark.storage._
 import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedHashMap}
-import org.apache.spark.util.collection.{PrimitiveKeyOpenHashMap, PrimitiveVector}
 
 /** A group of writers for a ShuffleMapTask, one writer per reducer. */
 private[spark] trait ShuffleWriterGroup {
@@ -43,24 +39,7 @@ private[spark] trait ShuffleWriterGroup {
 
 /**
  * Manages assigning disk-based block writers to shuffle tasks. Each shuffle task gets one file
- * per reducer (this set of files is called a ShuffleFileGroup).
- *
- * As an optimization to reduce the number of physical shuffle files produced, multiple shuffle
- * blocks are aggregated into the same file. There is one "combined shuffle file" per reducer
- * per concurrently executing shuffle task. As soon as a task finishes writing to its shuffle
- * files, it releases them for another task.
- * Regarding the implementation of this feature, shuffle files are identified by a 3-tuple:
- *   - shuffleId: The unique id given to the entire shuffle stage.
- *   - bucketId: The id of the output partition (i.e., reducer id)
- *   - fileId: The unique id identifying a group of "combined shuffle files." Only one task at a
- *       time owns a particular fileId, and this id is returned to a pool when the task finishes.
- * Each shuffle file is then mapped to a FileSegment, which is a 3-tuple (file, offset, length)
- * that specifies where in a given file the actual block data is located.
- *
- * Shuffle file metadata is stored in a space-efficient manner. Rather than simply mapping
- * ShuffleBlockIds directly to FileSegments, each ShuffleFileGroup maintains a list of offsets for
- * each block stored in each file. In order to find the location of a shuffle block, we search the
- * files within a ShuffleFileGroups associated with the block's reducer.
+ * per reducer.
  */
 // Note: Changes to the format in this file should be kept in sync with
 // org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getHashBasedShuffleBlockData().
@@ -71,26 +50,15 @@ private[spark] class FileShuffleBlockResolver(conf: SparkConf)
 
   private lazy val blockManager = SparkEnv.get.blockManager
 
-  // Turning off shuffle file consolidation causes all shuffle Blocks to get their own file.
-  // TODO: Remove this once the shuffle file consolidation feature is stable.
-  private val consolidateShuffleFiles =
-    conf.getBoolean("spark.shuffle.consolidateFiles", false)
-
   // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
   private val bufferSize = conf.getSizeAsKb("spark.shuffle.file.buffer", "32k").toInt * 1024
 
   /**
-   * Contains all the state related to a particular shuffle. This includes a pool of unused
-   * ShuffleFileGroups, as well as all ShuffleFileGroups that have been created for the shuffle.
+   * Contains all the state related to a particular shuffle.
    */
-  private class ShuffleState(val numBuckets: Int) {
-    val nextFileId = new AtomicInteger(0)
-    val unusedFileGroups = new ConcurrentLinkedQueue[ShuffleFileGroup]()
-    val allFileGroups = new ConcurrentLinkedQueue[ShuffleFileGroup]()
-
+  private class ShuffleState(val numReducers: Int) {
     /**
      * The mapIds of all map tasks completed on this Executor for this shuffle.
-     * NB: This is only populated if consolidateShuffleFiles is FALSE. We don't need it otherwise.
      */
     val completedMapTasks = new ConcurrentLinkedQueue[Int]()
   }
@@ -104,24 +72,16 @@ private[spark] class FileShuffleBlockResolver(conf: SparkConf)
    * Get a ShuffleWriterGroup for the given map task, which will register it as complete
    * when the writers are closed successfully
    */
-  def forMapTask(shuffleId: Int, mapId: Int, numBuckets: Int, serializer: Serializer,
+  def forMapTask(shuffleId: Int, mapId: Int, numReducers: Int, serializer: Serializer,
       writeMetrics: ShuffleWriteMetrics): ShuffleWriterGroup = {
     new ShuffleWriterGroup {
-      shuffleStates.putIfAbsent(shuffleId, new ShuffleState(numBuckets))
+      shuffleStates.putIfAbsent(shuffleId, new ShuffleState(numReducers))
       private val shuffleState = shuffleStates(shuffleId)
-      private var fileGroup: ShuffleFileGroup = null
 
       val openStartTime = System.nanoTime
       val serializerInstance = serializer.newInstance()
-      val writers: Array[DiskBlockObjectWriter] = if (consolidateShuffleFiles) {
-        fileGroup = getUnusedFileGroup()
-        Array.tabulate[DiskBlockObjectWriter](numBuckets) { bucketId =>
-          val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
-          blockManager.getDiskWriter(blockId, fileGroup(bucketId), serializerInstance, bufferSize,
-            writeMetrics)
-        }
-      } else {
-        Array.tabulate[DiskBlockObjectWriter](numBuckets) { bucketId =>
+      val writers: Array[DiskBlockObjectWriter] = {
+        Array.tabulate[DiskBlockObjectWriter](numReducers) { bucketId =>
           val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
           val blockFile = blockManager.diskBlockManager.getFile(blockId)
           // Because of previous failures, the shuffle file may already exist on this machine.
@@ -142,58 +102,14 @@ private[spark] class FileShuffleBlockResolver(conf: SparkConf)
       writeMetrics.incShuffleWriteTime(System.nanoTime - openStartTime)
 
       override def releaseWriters(success: Boolean) {
-        if (consolidateShuffleFiles) {
-          if (success) {
-            val offsets = writers.map(_.fileSegment().offset)
-            val lengths = writers.map(_.fileSegment().length)
-            fileGroup.recordMapOutput(mapId, offsets, lengths)
-          }
-          recycleFileGroup(fileGroup)
-        } else {
-          shuffleState.completedMapTasks.add(mapId)
-        }
-      }
-
-      private def getUnusedFileGroup(): ShuffleFileGroup = {
-        val fileGroup = shuffleState.unusedFileGroups.poll()
-        if (fileGroup != null) fileGroup else newFileGroup()
-      }
-
-      private def newFileGroup(): ShuffleFileGroup = {
-        val fileId = shuffleState.nextFileId.getAndIncrement()
-        val files = Array.tabulate[File](numBuckets) { bucketId =>
-          val filename = physicalFileName(shuffleId, bucketId, fileId)
-          blockManager.diskBlockManager.getFile(filename)
-        }
-        val fileGroup = new ShuffleFileGroup(shuffleId, fileId, files)
-        shuffleState.allFileGroups.add(fileGroup)
-        fileGroup
-      }
-
-      private def recycleFileGroup(group: ShuffleFileGroup) {
-        shuffleState.unusedFileGroups.add(group)
+        shuffleState.completedMapTasks.add(mapId)
       }
     }
   }
 
   override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
-    if (consolidateShuffleFiles) {
-      // Search all file groups associated with this shuffle.
-      val shuffleState = shuffleStates(blockId.shuffleId)
-      val iter = shuffleState.allFileGroups.iterator
-      while (iter.hasNext) {
-        val segmentOpt = iter.next.getFileSegmentFor(blockId.mapId, blockId.reduceId)
-        if (segmentOpt.isDefined) {
-          val segment = segmentOpt.get
-          return new FileSegmentManagedBuffer(
-            transportConf, segment.file, segment.offset, segment.length)
-        }
-      }
-      throw new IllegalStateException("Failed to find shuffle block: " + blockId)
-    } else {
-      val file = blockManager.diskBlockManager.getFile(blockId)
-      new FileSegmentManagedBuffer(transportConf, file, 0, file.length)
-    }
+    val file = blockManager.diskBlockManager.getFile(blockId)
+    new FileSegmentManagedBuffer(transportConf, file, 0, file.length)
   }
 
   /** Remove all the blocks / files and metadata related to a particular shuffle. */
@@ -209,17 +125,9 @@ private[spark] class FileShuffleBlockResolver(conf: SparkConf)
   private def removeShuffleBlocks(shuffleId: ShuffleId): Boolean = {
     shuffleStates.get(shuffleId) match {
       case Some(state) =>
-        if (consolidateShuffleFiles) {
-          for (fileGroup <- state.allFileGroups.asScala;
-               file <- fileGroup.files) {
-            file.delete()
-          }
-        } else {
-          for (mapId <- state.completedMapTasks.asScala;
-               reduceId <- 0 until state.numBuckets) {
-            val blockId = new ShuffleBlockId(shuffleId, mapId, reduceId)
-            blockManager.diskBlockManager.getFile(blockId).delete()
-          }
+        for (mapId <- state.completedMapTasks.asScala; reduceId <- 0 until state.numReducers) {
+          val blockId = new ShuffleBlockId(shuffleId, mapId, reduceId)
+          blockManager.diskBlockManager.getFile(blockId).delete()
         }
         logInfo("Deleted all files for shuffle " + shuffleId)
         true
@@ -229,10 +137,6 @@ private[spark] class FileShuffleBlockResolver(conf: SparkConf)
     }
   }
 
-  private def physicalFileName(shuffleId: Int, bucketId: Int, fileId: Int) = {
-    "merged_shuffle_%d_%d_%d".format(shuffleId, bucketId, fileId)
-  }
-
   private def cleanup(cleanupTime: Long) {
     shuffleStates.clearOldValues(cleanupTime, (shuffleId, state) => removeShuffleBlocks(shuffleId))
   }
@@ -241,59 +145,3 @@ private[spark] class FileShuffleBlockResolver(conf: SparkConf)
     metadataCleaner.cancel()
   }
 }
-
-private[spark] object FileShuffleBlockResolver {
-  /**
-   * A group of shuffle files, one per reducer.
-   * A particular mapper will be assigned a single ShuffleFileGroup to write its output to.
-   */
-  private class ShuffleFileGroup(val shuffleId: Int, val fileId: Int, val files: Array[File]) {
-    private var numBlocks: Int = 0
-
-    /**
-     * Stores the absolute index of each mapId in the files of this group. For instance,
-     * if mapId 5 is the first block in each file, mapIdToIndex(5) = 0.
-     */
-    private val mapIdToIndex = new PrimitiveKeyOpenHashMap[Int, Int]()
-
-    /**
-     * Stores consecutive offsets and lengths of blocks into each reducer file, ordered by
-     * position in the file.
-     * Note: mapIdToIndex(mapId) returns the index of the mapper into the vector for every
-     * reducer.
-     */
-    private val blockOffsetsByReducer = Array.fill[PrimitiveVector[Long]](files.length) {
-      new PrimitiveVector[Long]()
-    }
-    private val blockLengthsByReducer = Array.fill[PrimitiveVector[Long]](files.length) {
-      new PrimitiveVector[Long]()
-    }
-
-    def apply(bucketId: Int): File = files(bucketId)
-
-    def recordMapOutput(mapId: Int, offsets: Array[Long], lengths: Array[Long]) {
-      assert(offsets.length == lengths.length)
-      mapIdToIndex(mapId) = numBlocks
-      numBlocks += 1
-      for (i <- 0 until offsets.length) {
-        blockOffsetsByReducer(i) += offsets(i)
-        blockLengthsByReducer(i) += lengths(i)
-      }
-    }
-
-    /** Returns the FileSegment associated with the given map task, or None if no entry exists. */
-    def getFileSegmentFor(mapId: Int, reducerId: Int): Option[FileSegment] = {
-      val file = files(reducerId)
-      val blockOffsets = blockOffsetsByReducer(reducerId)
-      val blockLengths = blockLengthsByReducer(reducerId)
-      val index = mapIdToIndex.getOrElse(mapId, -1)
-      if (index >= 0) {
-        val offset = blockOffsets(index)
-        val length = blockLengths(index)
-        Some(new FileSegment(file, offset, length))
-      } else {
-        None
-      }
-    }
-  }
-}
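Annotation: with consolidation gone, every shuffle block owns exactly one physical file, and getBlockData degenerates to a name lookup plus a whole-file read. Below is a minimal, self-contained Scala sketch of that layout; the class and method names are illustrative (not Spark's API), and only the "shuffle_<shuffleId>_<mapId>_<reduceId>" naming mirrors the real ShuffleBlockId.

    import java.io.File

    // Illustrative model of the post-removal layout: one file per (shuffleId, mapId, reduceId).
    final case class SketchBlockId(shuffleId: Int, mapId: Int, reduceId: Int) {
      // Mirrors ShuffleBlockId's "shuffle_<shuffleId>_<mapId>_<reduceId>" naming.
      def name: String = s"shuffle_${shuffleId}_${mapId}_${reduceId}"
    }

    final class OneFilePerReducerLayout(localDir: File) {
      // Resolving a block needs no per-group offset metadata: the block name is the file name.
      def fileFor(id: SketchBlockId): File = new File(localDir, id.name)

      // Equivalent of the simplified getBlockData: serve the whole file, starting at offset 0.
      def blockExtent(id: SketchBlockId): (File, Long, Long) = {
        val f = fileFor(id)
        (f, 0L, f.length())
      }
    }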
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index d31aa68eb6954211c74621e8f5211716a361133a..bca3942f8c5553318e96cbd3b41a71f5fdcd3235 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -106,15 +106,6 @@ private[spark] class BlockManager(
     }
   }
 
-  // Check that we're not using external shuffle service with consolidated shuffle files.
-  if (externalShuffleServiceEnabled
-      && conf.getBoolean("spark.shuffle.consolidateFiles", false)
-      && shuffleManager.isInstanceOf[HashShuffleManager]) {
-    throw new UnsupportedOperationException("Cannot use external shuffle service with consolidated"
-      + " shuffle files in hash-based shuffle. Please disable spark.shuffle.consolidateFiles or "
-      + " switch to sort-based shuffle.")
-  }
-
   var blockManagerId: BlockManagerId = _
 
   // Address of the server that serves this executor's shuffle files. This is either an external
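Annotation: the startup guard deleted above rejected one specific combination of settings; with the consolidateFiles flag gone it has nothing left to protect against. A hedged illustration of the configuration that used to throw follows (the setting names are real Spark configuration keys; the values are only an example):

    import org.apache.spark.SparkConf

    object ConsolidationGuardExample {
      // This combination used to fail fast in BlockManager with an UnsupportedOperationException;
      // spark.shuffle.consolidateFiles is now simply an unrecognized, unused setting.
      val conf: SparkConf = new SparkConf()
        .set("spark.shuffle.manager", "hash")
        .set("spark.shuffle.service.enabled", "true")
        .set("spark.shuffle.consolidateFiles", "true")
    }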
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index 1f4595628216662f4b59c17fb296d10ce0aaa0f7..feb9533604ffbc823d19c903b36623be6c75f5fa 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -154,9 +154,6 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
 
   override def remove(blockId: BlockId): Boolean = {
     val file = diskManager.getFile(blockId.name)
-    // If consolidation mode is used With HashShuffleMananger, the physical filename for the block
-    // is different from blockId.name. So the file returns here will not be exist, thus we avoid to
-    // delete the whole consolidated file by mistake.
     if (file.exists()) {
       file.delete()
     } else {
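Annotation: the comment removed from DiskStore.remove described a hazard that no longer exists. Now that a block id's name always matches its physical file, a name-based delete touches only that block. A minimal sketch of the invariant, with an illustrative helper name (not Spark's API):

    import java.io.File

    object RemoveByName {
      // With one physical file per block id, deleting by name can never clip data belonging
      // to other blocks, because no file is shared between block ids any more.
      def removeBlockFile(localDir: File, blockName: String): Boolean = {
        val file = new File(localDir, blockName)
        file.exists() && file.delete()
      }
    }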
diff --git a/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala
deleted file mode 100644
index 491dc3659e184cad12a4e210ec79d6a66aeb26ef..0000000000000000000000000000000000000000
--- a/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.shuffle.hash
-
-import java.io.{File, FileWriter}
-
-import scala.language.reflectiveCalls
-
-import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkEnv, SparkFunSuite}
-import org.apache.spark.executor.ShuffleWriteMetrics
-import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
-import org.apache.spark.serializer.JavaSerializer
-import org.apache.spark.shuffle.FileShuffleBlockResolver
-import org.apache.spark.storage.{ShuffleBlockId, FileSegment}
-
-class HashShuffleManagerSuite extends SparkFunSuite with LocalSparkContext {
-  private val testConf = new SparkConf(false)
-
-  private def checkSegments(expected: FileSegment, buffer: ManagedBuffer) {
-    assert(buffer.isInstanceOf[FileSegmentManagedBuffer])
-    val segment = buffer.asInstanceOf[FileSegmentManagedBuffer]
-    assert(expected.file.getCanonicalPath === segment.getFile.getCanonicalPath)
-    assert(expected.offset === segment.getOffset)
-    assert(expected.length === segment.getLength)
-  }
-
-  test("consolidated shuffle can write to shuffle group without messing existing offsets/lengths") {
-
-    val conf = new SparkConf(false)
-    // reset after EACH object write. This is to ensure that there are bytes appended after
-    // an object is written. So if the codepaths assume writeObject is end of data, this should
-    // flush those bugs out. This was common bug in ExternalAppendOnlyMap, etc.
-    conf.set("spark.serializer.objectStreamReset", "1")
-    conf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
-    conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.hash.HashShuffleManager")
-
-    sc = new SparkContext("local", "test", conf)
-
-    val shuffleBlockResolver =
-      SparkEnv.get.shuffleManager.shuffleBlockResolver.asInstanceOf[FileShuffleBlockResolver]
-
-    val shuffle1 = shuffleBlockResolver.forMapTask(1, 1, 1, new JavaSerializer(conf),
-      new ShuffleWriteMetrics)
-    for (writer <- shuffle1.writers) {
-      writer.write("test1", "value")
-      writer.write("test2", "value")
-    }
-    for (writer <- shuffle1.writers) {
-      writer.commitAndClose()
-    }
-
-    val shuffle1Segment = shuffle1.writers(0).fileSegment()
-    shuffle1.releaseWriters(success = true)
-
-    val shuffle2 = shuffleBlockResolver.forMapTask(1, 2, 1, new JavaSerializer(conf),
-      new ShuffleWriteMetrics)
-
-    for (writer <- shuffle2.writers) {
-      writer.write("test3", "value")
-      writer.write("test4", "vlue")
-    }
-    for (writer <- shuffle2.writers) {
-      writer.commitAndClose()
-    }
-    val shuffle2Segment = shuffle2.writers(0).fileSegment()
-    shuffle2.releaseWriters(success = true)
-
-    // Now comes the test :
-    // Write to shuffle 3; and close it, but before registering it, check if the file lengths for
-    // previous task (forof shuffle1) is the same as 'segments'. Earlier, we were inferring length
-    // of block based on remaining data in file : which could mess things up when there is
-    // concurrent read and writes happening to the same shuffle group.
-
-    val shuffle3 = shuffleBlockResolver.forMapTask(1, 3, 1, new JavaSerializer(testConf),
-      new ShuffleWriteMetrics)
-    for (writer <- shuffle3.writers) {
-      writer.write("test3", "value")
-      writer.write("test4", "value")
-    }
-    for (writer <- shuffle3.writers) {
-      writer.commitAndClose()
-    }
-    // check before we register.
-    checkSegments(shuffle2Segment, shuffleBlockResolver.getBlockData(ShuffleBlockId(1, 2, 0)))
-    shuffle3.releaseWriters(success = true)
-    checkSegments(shuffle2Segment, shuffleBlockResolver.getBlockData(ShuffleBlockId(1, 2, 0)))
-    shuffleBlockResolver.removeShuffle(1)
-  }
-
-  def writeToFile(file: File, numBytes: Int) {
-    val writer = new FileWriter(file, true)
-    for (i <- 0 until numBytes) writer.write(i)
-    writer.close()
-  }
-}
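Annotation: the deleted suite pinned down the bookkeeping that consolidation required: offsets and lengths were recorded per map task at commit time, so a concurrent writer appending to the same reducer file could not change what an earlier block resolved to. The following is a self-contained model of that bookkeeping, simplified from the removed ShuffleFileGroup; the names here are illustrative only.

    import scala.collection.mutable

    final case class Segment(offset: Long, length: Long)

    // Simplified stand-in for the removed ShuffleFileGroup metadata: one row of
    // (offset, length) per completed map task, per reducer file.
    final class FileGroupModel(numReducers: Int) {
      private val mapIdToIndex = mutable.Map.empty[Int, Int]
      private val offsets = Array.fill(numReducers)(mutable.ArrayBuffer.empty[Long])
      private val lengths = Array.fill(numReducers)(mutable.ArrayBuffer.empty[Long])

      def recordMapOutput(mapId: Int, offs: Array[Long], lens: Array[Long]): Unit = {
        require(offs.length == numReducers && lens.length == numReducers)
        mapIdToIndex(mapId) = mapIdToIndex.size
        for (r <- 0 until numReducers) {
          offsets(r) += offs(r)
          lengths(r) += lens(r)
        }
      }

      // Lookups use the recorded length, never "bytes remaining in the file", which is
      // exactly the property the deleted test asserted across overlapping map tasks.
      def segmentFor(mapId: Int, reducerId: Int): Option[Segment] =
        mapIdToIndex.get(mapId).map(i => Segment(offsets(reducerId)(i), lengths(reducerId)(i)))
    }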
diff --git a/docs/configuration.md b/docs/configuration.md
index 1a701f18881fe099f1d4f27ef1ca032471411cd6..3700051efb4488b705245b8cdce8722c0cb368e1 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -390,16 +390,6 @@ Apart from these, the following properties are also available, and may be useful
     <code>spark.io.compression.codec</code>.
   </td>
 </tr>
-<tr>
-  <td><code>spark.shuffle.consolidateFiles</code></td>
-  <td>false</td>
-  <td>
-    If set to "true", consolidates intermediate files created during a shuffle. Creating fewer
-    files can improve filesystem performance for shuffles with large numbers of reduce tasks. It
-    is recommended to set this to "true" when using ext4 or xfs filesystems. On ext3, this option
-    might degrade performance on machines with many (>8) cores due to filesystem limitations.
-  </td>
-</tr>
 <tr>
   <td><code>spark.shuffle.file.buffer</code></td>
   <td>32k</td>
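Annotation: for readers wondering what the removed option bought, the arithmetic from the old docstring is simple: hash-based shuffle without consolidation writes one file per map task per reducer, while consolidation capped the count at one per concurrent task slot per reducer. Illustrative numbers only:

    object ShuffleFileCounts {
      // Example executor: 1000 map tasks scheduled on it, 8 concurrent task slots, 200 reducers.
      val (mapsOnExecutor, reducers, slots) = (1000, 200, 8)
      val filesWithoutConsolidation: Int = mapsOnExecutor * reducers // 200000 files on this executor
      val filesWithConsolidation: Int = slots * reducers             // 1600 files it previously needed
    }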
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
index 79beec4429a99afb8943a012826c86dae7267ed9..c5f93bb47f55c9133dde220ef596442fc5715797 100644
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
@@ -50,9 +50,6 @@ import org.apache.spark.network.util.TransportConf;
  * of Executors. Each Executor must register its own configuration about where it stores its files
  * (local dirs) and how (shuffle manager). The logic for retrieval of individual files is replicated
  * from Spark's FileShuffleBlockResolver and IndexShuffleBlockResolver.
- *
- * Executors with shuffle file consolidation are not currently supported, as the index is stored in
- * the Executor's memory, unlike the IndexShuffleBlockResolver.
  */
 public class ExternalShuffleBlockResolver {
   private static final Logger logger = LoggerFactory.getLogger(ExternalShuffleBlockResolver.class);
@@ -254,7 +251,6 @@ public class ExternalShuffleBlockResolver {
    * Hash-based shuffle data is simply stored as one file per block.
    * This logic is from FileShuffleBlockResolver.
    */
-  // TODO: Support consolidated hash shuffle files
   private ManagedBuffer getHashBasedShuffleBlockData(ExecutorShuffleInfo executor, String blockId) {
     File shuffleFile = getFile(executor.localDirs, executor.subDirsPerLocalDir, blockId);
     return new FileSegmentManagedBuffer(conf, shuffleFile, 0, shuffleFile.length());
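Annotation: after this change the external shuffle service only ever serves whole per-block files for hash-based shuffle. Roughly how the lookup behaves, as a hedged Scala sketch; the hashing below is a simplified stand-in for the service's actual getFile logic (which spreads files across localDirs and numbered sub-directories), and the method name is illustrative.

    import java.io.File

    object HashShuffleLookupSketch {
      // Simplified stand-in for ExternalShuffleBlockResolver#getFile: pick a local dir and a
      // sub-directory from a hash of the block name, then serve that single file in full.
      def resolveBlockFile(localDirs: Array[String], subDirsPerLocalDir: Int, blockName: String): File = {
        val hash = blockName.hashCode & Integer.MAX_VALUE
        val localDir = localDirs(hash % localDirs.length)
        val subDir = (hash / localDirs.length) % subDirsPerLocalDir
        new File(new File(localDir, "%02x".format(subDir)), blockName)
      }
    }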
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 1c96b0958586f44b7c4f4e1757839d363a02605a..814a11e588ceb8ead1821a5393e421c6f034335a 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -70,6 +70,10 @@ object MimaExcludes {
           "org.apache.spark.scheduler.AskPermissionToCommitOutput.this"),
         ProblemFilters.exclude[IncompatibleMethTypeProblem](
           "org.apache.spark.scheduler.AskPermissionToCommitOutput.apply")
+      ) ++
+      Seq(
+        ProblemFilters.exclude[MissingClassProblem](
+          "org.apache.spark.shuffle.FileShuffleBlockResolver$ShuffleFileGroup")
       )
     case v if v.startsWith("1.5") =>
       Seq(