diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
index 1923f7c71a48fd4329d6860f7b2861ce762e999c..45d3b8b9b8725936e79cd9c320e375560e7f798f 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
@@ -65,7 +65,8 @@ private[spark] class HashShuffleWriter[K, V](
   }
 
   /** Close this writer, passing along whether the map completed */
-  override def stop(success: Boolean): Option[MapStatus] = {
+  override def stop(initiallySuccess: Boolean): Option[MapStatus] = {
+    var success = initiallySuccess
     try {
       if (stopping) {
         return None
@@ -73,15 +74,16 @@ private[spark] class HashShuffleWriter[K, V](
       stopping = true
       if (success) {
         try {
-          return Some(commitWritesAndBuildStatus())
+          Some(commitWritesAndBuildStatus())
         } catch {
           case e: Exception =>
+            success = false
             revertWrites()
             throw e
         }
       } else {
         revertWrites()
-        return None
+        None
       }
     } finally {
       // Release the writers back to the shuffle block manager.
@@ -100,8 +102,7 @@ private[spark] class HashShuffleWriter[K, V](
     var totalBytes = 0L
     var totalTime = 0L
     val compressedSizes = shuffle.writers.map { writer: BlockObjectWriter =>
-      writer.commit()
-      writer.close()
+      writer.commitAndClose()
       val size = writer.fileSegment().length
       totalBytes += size
       totalTime += writer.timeWriting()
@@ -120,8 +121,7 @@ private[spark] class HashShuffleWriter[K, V](
   private def revertWrites(): Unit = {
     if (shuffle != null && shuffle.writers != null) {
       for (writer <- shuffle.writers) {
-        writer.revertPartialWrites()
-        writer.close()
+        writer.revertPartialWritesAndClose()
       }
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
index 42fcd07fa18bc486a5bfc1ff53d014b6c285e862..9a356d0dbaf17665ec77a05c545eb26d24e49f3b 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
@@ -94,8 +94,7 @@ private[spark] class SortShuffleWriter[K, V, C](
         for (elem <- elements) {
           writer.write(elem)
         }
-        writer.commit()
-        writer.close()
+        writer.commitAndClose()
         val segment = writer.fileSegment()
         offsets(id + 1) = segment.offset + segment.length
         lengths(id) = segment.length
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
index a2687e6be4e34de1b440f1c3a59753abbc1cb4f8..01d46e1ffc9600e049de01d5be27b0069ddcb049 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
@@ -39,16 +39,16 @@ private[spark] abstract class BlockObjectWriter(val blockId: BlockId) {
   def isOpen: Boolean
 
   /**
-   * Flush the partial writes and commit them as a single atomic block. Return the
-   * number of bytes written for this commit.
+   * Flush the partial writes and commit them as a single atomic block.
    */
-  def commit(): Long
+  def commitAndClose(): Unit
 
   /**
    * Reverts writes that haven't been flushed yet. Callers should invoke this function
-   * when there are runtime exceptions.
+   * when there are runtime exceptions. This method will not throw, though it may be
+   * unsuccessful in truncating written data.
    */
-  def revertPartialWrites()
+  def revertPartialWritesAndClose()
 
   /**
    * Writes an object.
@@ -57,6 +57,7 @@ private[spark] abstract class BlockObjectWriter(val blockId: BlockId) {
 
   /**
    * Returns the file segment of committed data that this Writer has written.
+   * This is only valid after commitAndClose() has been called.
    */
   def fileSegment(): FileSegment
 
@@ -108,7 +109,7 @@ private[spark] class DiskBlockObjectWriter(
   private var ts: TimeTrackingOutputStream = null
   private var objOut: SerializationStream = null
   private val initialPosition = file.length()
-  private var lastValidPosition = initialPosition
+  private var finalPosition: Long = -1
   private var initialized = false
   private var _timeWriting = 0L
 
@@ -116,7 +117,6 @@ private[spark] class DiskBlockObjectWriter(
     fos = new FileOutputStream(file, true)
     ts = new TimeTrackingOutputStream(fos)
     channel = fos.getChannel()
-    lastValidPosition = initialPosition
     bs = compressStream(new BufferedOutputStream(ts, bufferSize))
     objOut = serializer.newInstance().serializeStream(bs)
     initialized = true
@@ -147,28 +147,36 @@ private[spark] class DiskBlockObjectWriter(
 
   override def isOpen: Boolean = objOut != null
 
-  override def commit(): Long = {
+  override def commitAndClose(): Unit = {
     if (initialized) {
       // NOTE: Because Kryo doesn't flush the underlying stream we explicitly flush both the
       //       serializer stream and the lower level stream.
       objOut.flush()
       bs.flush()
-      val prevPos = lastValidPosition
-      lastValidPosition = channel.position()
-      lastValidPosition - prevPos
-    } else {
-      // lastValidPosition is zero if stream is uninitialized
-      lastValidPosition
+      close()
     }
+    finalPosition = file.length()
   }
 
-  override def revertPartialWrites() {
-    if (initialized) {
-      // Discard current writes. We do this by flushing the outstanding writes and
-      // truncate the file to the last valid position.
-      objOut.flush()
-      bs.flush()
-      channel.truncate(lastValidPosition)
+  // Discard current writes. We do this by flushing the outstanding writes and then
+  // truncating the file to its initial position.
+  override def revertPartialWritesAndClose() {
+    try {
+      if (initialized) {
+        objOut.flush()
+        bs.flush()
+        close()
+      }
+
+      val truncateStream = new FileOutputStream(file, true)
+      try {
+        truncateStream.getChannel.truncate(initialPosition)
+      } finally {
+        truncateStream.close()
+      }
+    } catch {
+      case e: Exception =>
+        logError("Uncaught exception while reverting partial writes to file " + file, e)
     }
   }
 
@@ -188,6 +196,7 @@ private[spark] class DiskBlockObjectWriter(
 
   // Only valid if called after commit()
   override def bytesWritten: Long = {
-    lastValidPosition - initialPosition
+    assert(finalPosition != -1, "bytesWritten is only valid after successful commit()")
+    finalPosition - initialPosition
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
index 7beb55c411e7147e84c800176c5b4501b768796f..28aa35bc7e14782a6a25e9e5b1a00e4d2eae6d68 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
@@ -144,7 +144,8 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
         if (consolidateShuffleFiles) {
           if (success) {
             val offsets = writers.map(_.fileSegment().offset)
-            fileGroup.recordMapOutput(mapId, offsets)
+            val lengths = writers.map(_.fileSegment().length)
+            fileGroup.recordMapOutput(mapId, offsets, lengths)
           }
           recycleFileGroup(fileGroup)
         } else {
@@ -247,6 +248,8 @@ object ShuffleBlockManager {
    * A particular mapper will be assigned a single ShuffleFileGroup to write its output to.
    */
   private class ShuffleFileGroup(val shuffleId: Int, val fileId: Int, val files: Array[File]) {
+    private var numBlocks: Int = 0
+
     /**
      * Stores the absolute index of each mapId in the files of this group. For instance,
      * if mapId 5 is the first block in each file, mapIdToIndex(5) = 0.
@@ -254,23 +257,27 @@ object ShuffleBlockManager {
     private val mapIdToIndex = new PrimitiveKeyOpenHashMap[Int, Int]()
 
     /**
-     * Stores consecutive offsets of blocks into each reducer file, ordered by position in the file.
-     * This ordering allows us to compute block lengths by examining the following block offset.
+     * Stores consecutive offsets and lengths of blocks into each reducer file, ordered by
+     * position in the file.
      * Note: mapIdToIndex(mapId) returns the index of the mapper into the vector for every
      * reducer.
      */
     private val blockOffsetsByReducer = Array.fill[PrimitiveVector[Long]](files.length) {
       new PrimitiveVector[Long]()
     }
-
-    def numBlocks = mapIdToIndex.size
+    private val blockLengthsByReducer = Array.fill[PrimitiveVector[Long]](files.length) {
+      new PrimitiveVector[Long]()
+    }
 
     def apply(bucketId: Int) = files(bucketId)
 
-    def recordMapOutput(mapId: Int, offsets: Array[Long]) {
+    def recordMapOutput(mapId: Int, offsets: Array[Long], lengths: Array[Long]) {
+      assert(offsets.length == lengths.length)
       mapIdToIndex(mapId) = numBlocks
+      numBlocks += 1
       for (i <- 0 until offsets.length) {
         blockOffsetsByReducer(i) += offsets(i)
+        blockLengthsByReducer(i) += lengths(i)
       }
     }
 
@@ -278,16 +285,11 @@ object ShuffleBlockManager {
     def getFileSegmentFor(mapId: Int, reducerId: Int): Option[FileSegment] = {
       val file = files(reducerId)
       val blockOffsets = blockOffsetsByReducer(reducerId)
+      val blockLengths = blockLengthsByReducer(reducerId)
       val index = mapIdToIndex.getOrElse(mapId, -1)
       if (index >= 0) {
         val offset = blockOffsets(index)
-        val length =
-          if (index + 1 < numBlocks) {
-            blockOffsets(index + 1) - offset
-          } else {
-            file.length() - offset
-          }
-        assert(length >= 0)
+        val length = blockLengths(index)
         Some(new FileSegment(file, offset, length))
       } else {
         None
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index b34512ef9eb603bfa1bacf6012d8a21c1a8d0510..cb67a1c039f20c80f8b18877d26a0b5e12b45afd 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -199,7 +199,7 @@ class ExternalAppendOnlyMap[K, V, C](
 
     // Flush the disk writer's contents to disk, and update relevant variables
     def flush() = {
-      writer.commit()
+      writer.commitAndClose()
       val bytesWritten = writer.bytesWritten
       batchSizes.append(bytesWritten)
       _diskBytesSpilled += bytesWritten
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index 54c33107441362bf56bb5a022f2fb2f2cfc1cc4c..6e415a2bd8ce2406db42ed9d8f44e3a7a3efb10f 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -270,9 +270,10 @@ private[spark] class ExternalSorter[K, V, C](
     // How many elements we have in each partition
     val elementsPerPartition = new Array[Long](numPartitions)
 
-    // Flush the disk writer's contents to disk, and update relevant variables
+    // Flush the disk writer's contents to disk, and update relevant variables.
+    // The writer is closed at the end of this process, and cannot be reused.
     def flush() = {
-      writer.commit()
+      writer.commitAndClose()
       val bytesWritten = writer.bytesWritten
       batchSizes.append(bytesWritten)
       _diskBytesSpilled += bytesWritten
@@ -293,7 +294,6 @@ private[spark] class ExternalSorter[K, V, C](
 
         if (objectsWritten == serializerBatchSize) {
           flush()
-          writer.close()
           writer = blockManager.getDiskWriter(blockId, file, ser, fileBufferSize)
         }
       }
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
index aaa77140497326f6db25e508d18e02ef54cc19c7..985ac9394738ce24138434601d2a3394314ca26d 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
@@ -22,11 +22,14 @@ import java.io.{File, FileWriter}
 import scala.collection.mutable
 import scala.language.reflectiveCalls
 
+import akka.actor.Props
 import com.google.common.io.Files
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}
 
 import org.apache.spark.SparkConf
-import org.apache.spark.util.Utils
+import org.apache.spark.scheduler.LiveListenerBus
+import org.apache.spark.serializer.JavaSerializer
+import org.apache.spark.util.{AkkaUtils, Utils}
 
 class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach with BeforeAndAfterAll {
   private val testConf = new SparkConf(false)
@@ -121,6 +124,88 @@ class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach with Before
     newFile.delete()
   }
 
+  private def checkSegments(segment1: FileSegment, segment2: FileSegment) {
+    assert (segment1.file.getCanonicalPath === segment2.file.getCanonicalPath)
+    assert (segment1.offset === segment2.offset)
+    assert (segment1.length === segment2.length)
+  }
+
+  test("consolidated shuffle can write to shuffle group without messing existing offsets/lengths") {
+
+    val serializer = new JavaSerializer(testConf)
+    val confCopy = testConf.clone
+    // reset after EACH object write. This is to ensure that there are bytes appended after
+    // an object is written. So if the codepaths assume writeObject is end of data, this should
+    // flush those bugs out. This was common bug in ExternalAppendOnlyMap, etc.
+    confCopy.set("spark.serializer.objectStreamReset", "1")
+
+    val securityManager = new org.apache.spark.SecurityManager(confCopy)
+    // Do not use the shuffleBlockManager above !
+    val (actorSystem, boundPort) = AkkaUtils.createActorSystem("test", "localhost", 0, confCopy,
+      securityManager)
+    val master = new BlockManagerMaster(
+      actorSystem.actorOf(Props(new BlockManagerMasterActor(true, confCopy, new LiveListenerBus))),
+      confCopy)
+    val store = new BlockManager("<driver>", actorSystem, master , serializer, confCopy,
+      securityManager, null)
+
+    try {
+
+      val shuffleManager = store.shuffleBlockManager
+
+      val shuffle1 = shuffleManager.forMapTask(1, 1, 1, serializer)
+      for (writer <- shuffle1.writers) {
+        writer.write("test1")
+        writer.write("test2")
+      }
+      for (writer <- shuffle1.writers) {
+        writer.commitAndClose()
+      }
+
+      val shuffle1Segment = shuffle1.writers(0).fileSegment()
+      shuffle1.releaseWriters(success = true)
+
+      val shuffle2 = shuffleManager.forMapTask(1, 2, 1, new JavaSerializer(testConf))
+
+      for (writer <- shuffle2.writers) {
+        writer.write("test3")
+        writer.write("test4")
+      }
+      for (writer <- shuffle2.writers) {
+        writer.commitAndClose()
+      }
+      val shuffle2Segment = shuffle2.writers(0).fileSegment()
+      shuffle2.releaseWriters(success = true)
+
+      // Now comes the test :
+      // Write to shuffle 3; and close it, but before registering it, check if the file lengths for
+      // previous task (forof shuffle1) is the same as 'segments'. Earlier, we were inferring length
+      // of block based on remaining data in file : which could mess things up when there is concurrent read
+      // and writes happening to the same shuffle group.
+
+      val shuffle3 = shuffleManager.forMapTask(1, 3, 1, new JavaSerializer(testConf))
+      for (writer <- shuffle3.writers) {
+        writer.write("test3")
+        writer.write("test4")
+      }
+      for (writer <- shuffle3.writers) {
+        writer.commitAndClose()
+      }
+      // check before we register.
+      checkSegments(shuffle2Segment, shuffleManager.getBlockLocation(ShuffleBlockId(1, 2, 0)))
+      shuffle3.releaseWriters(success = true)
+      checkSegments(shuffle2Segment, shuffleManager.getBlockLocation(ShuffleBlockId(1, 2, 0)))
+      shuffleManager.removeShuffle(1)
+    } finally {
+
+      if (store != null) {
+        store.stop()
+      }
+      actorSystem.shutdown()
+      actorSystem.awaitTermination()
+    }
+  }
+
   def assertSegmentEquals(blockId: BlockId, filename: String, offset: Int, length: Int) {
     val segment = diskBlockManager.getBlockLocation(blockId)
     assert(segment.file.getName === filename)
diff --git a/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala b/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala
index 8e8c35615a7113dfe12d23e9d9ce82549466b310..8a05fcb449aa61b4025c707c1974539d7ece1189 100644
--- a/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala
+++ b/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala
@@ -61,10 +61,9 @@ object StoragePerfTester {
       for (i <- 1 to recordsPerMap) {
         writers(i % numOutputSplits).write(writeData)
       }
-      writers.map {w =>
-        w.commit()
+      writers.map { w =>
+        w.commitAndClose()
         total.addAndGet(w.fileSegment().length)
-        w.close()
       }
 
       shuffle.releaseWriters(true)