From 1592adfa259860494353babfb48c80b7d1087379 Mon Sep 17 00:00:00 2001
From: Aaron Davidson <aaron@databricks.com>
Date: Sat, 2 Nov 2013 00:19:04 -0700
Subject: [PATCH] Add documentation and address other comments

---
 .../spark/storage/DiskBlockManager.scala      | 12 ++---
 .../spark/storage/ShuffleBlockManager.scala   | 49 ++++++++++++-------
 2 files changed, 35 insertions(+), 26 deletions(-)
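
Reviewer note (illustrative only, not part of the patch): the updated ShuffleBlockManager
comment describes keeping a list of block offsets per ShuffleFile rather than simply mapping
ShuffleBlockIds to FileSegments. A minimal sketch of that scheme, using simplified stand-ins
(ShuffleFileSketch and a case-class FileSegment) for the real PrimitiveVector-based classes:

    import java.io.File

    case class FileSegment(file: File, offset: Long, length: Long)

    class ShuffleFileSketch(val file: File,
                            mapIds: IndexedSeq[Int],        // group-wide write order
                            blockOffsets: IndexedSeq[Long]) {
      // blockOffsets(i) is where the block written by mapper mapIds(i) starts; its length
      // is the distance to the next offset, or to the end of the file for the last block.
      def getFileSegmentFor(mapId: Int): Option[FileSegment] = {
        val i = mapIds.indexOf(mapId)
        if (i == -1) {
          None
        } else {
          val offset = blockOffsets(i)
          val end = if (i + 1 < blockOffsets.length) blockOffsets(i + 1) else file.length()
          Some(FileSegment(file, offset, end - offset))
        }
      }
    }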

diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index bde3d1f592..fcd2e97982 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -55,14 +55,12 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD
    * Otherwise, we assume the Block is mapped to a whole file identified by the BlockId directly.
    */
   def getBlockLocation(blockId: BlockId): FileSegment = {
-    if (blockId.isShuffle) {
-      val segment = shuffleManager.getBlockLocation(blockId.asInstanceOf[ShuffleBlockId])
-      if (segment.isDefined) { return segment.get }
-      // If no special mapping found, assume standard block -> file mapping...
+    if (blockId.isShuffle && shuffleManager.consolidateShuffleFiles) {
+      shuffleManager.getBlockLocation(blockId.asInstanceOf[ShuffleBlockId])
+    } else {
+      val file = getFile(blockId.name)
+      new FileSegment(file, 0, file.length())
     }
-
-    val file = getFile(blockId.name)
-    new FileSegment(file, 0, file.length())
   }
 
   def getFile(filename: String): File = {
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
index d718c87cab..d1e3074683 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
@@ -45,18 +45,24 @@ trait ShuffleBlocks {
 }
 
 /**
- * Manages assigning disk-based block writers to shuffle tasks. Each shuffle task gets one writer
- * per reducer.
+ * Manages assigning disk-based block writers to shuffle tasks. Each shuffle task gets one file
+ * per reducer (this set of files is called a ShuffleFileGroup).
  *
  * As an optimization to reduce the number of physical shuffle files produced, multiple shuffle
- * blocks are aggregated into the same file. There is one "combined shuffle file" per reducer
- * per concurrently executing shuffle task. As soon as a task finishes writing to its shuffle files,
- * it releases them for another task.
+ * blocks are aggregated into the same file. There is one "combined shuffle file" (ShuffleFile) per
+ * reducer per concurrently executing shuffle task. As soon as a task finishes writing to its
+ * shuffle files, it releases them for another task.
  * Regarding the implementation of this feature, shuffle files are identified by a 3-tuple:
  *   - shuffleId: The unique id given to the entire shuffle stage.
  *   - bucketId: The id of the output partition (i.e., reducer id)
  *   - fileId: The unique id identifying a group of "combined shuffle files." Only one task at a
  *       time owns a particular fileId, and this id is returned to a pool when the task finishes.
+ *
+ * Shuffle file metadata is stored in a space-efficient manner. Rather than simply mapping
+ * ShuffleBlockIds to FileSegments, each ShuffleFile maintains a list of offsets for each block
+ * stored in that file. In order to find the location of a shuffle block, we search all ShuffleFiles
+ * destined for the block's reducer.
+ *
  */
 private[spark]
 class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
@@ -124,9 +130,9 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
     }
   }
 
-  def initializeShuffleMetadata(shuffleId: Int, numBuckets: Int) {
+  private def initializeShuffleMetadata(shuffleId: Int, numBuckets: Int) {
     val prev = shuffleToFileGroupPoolMap.putIfAbsent(shuffleId, new ShuffleFileGroupPool())
-    if (prev == None) {
+    if (prev.isEmpty) {
       val reducerToFilesMap = new Array[ConcurrentLinkedQueue[ShuffleFile]](numBuckets)
       for (reducerId <- 0 until numBuckets) {
         reducerToFilesMap(reducerId) = new ConcurrentLinkedQueue[ShuffleFile]()
@@ -142,6 +148,8 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
     var fileGroup = pool.getUnusedFileGroup()
 
     // If we reuse a file group, ensure we maintain mapId monotonicity.
+    // This means we may create extra ShuffleFileGroups if we're trying to run a map task
+    // that is out of order with respect to its mapId (which may happen when failures occur).
     val fileGroupsToReturn = mutable.ListBuffer[ShuffleFileGroup]()
     while (fileGroup != null && fileGroup.maxMapId >= mapId) {
       fileGroupsToReturn += fileGroup
@@ -170,21 +178,19 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
 
   /**
    * Returns the physical file segment in which the given BlockId is located.
-   * If we have no special mapping, None will be returned.
+   * This function should only be called if shuffle file consolidation is enabled: failing to
+   * find the expected block is an error and throws an IllegalStateException.
    */
-  def getBlockLocation(id: ShuffleBlockId): Option[FileSegment] = {
+  def getBlockLocation(id: ShuffleBlockId): FileSegment = {
     // Search all files associated with the given reducer.
     // This process is O(m log n) for m threads and n mappers. Could be sweetened to "likely" O(m).
-    if (consolidateShuffleFiles) {
-      val filesForReducer = shuffleToReducerToFilesMap(id.shuffleId)(id.reduceId)
-      for (file <- filesForReducer) {
-        val segment = file.getFileSegmentFor(id.mapId)
-        if (segment != None) { return segment }
-      }
-
-      logInfo("Failed to find shuffle block: " + id)
+    val filesForReducer = shuffleToReducerToFilesMap(id.shuffleId)(id.reduceId)
+    for (file <- filesForReducer) {
+      val segment = file.getFileSegmentFor(id.mapId)
+      if (segment.isDefined) { return segment.get }
     }
-    None
+
+    throw new IllegalStateException("Failed to find shuffle block: " + id)
   }
 
   private def physicalFileName(shuffleId: Int, bucketId: Int, fileId: Int) = {
@@ -204,6 +210,10 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
  */
 private[spark]
 class ShuffleFileGroup(val shuffleId: Int, val fileId: Int, val files: Array[ShuffleFile]) {
+  /**
+   * The mapIds of all mappers that have written to this file group, in the order in which
+   * they wrote to their respective files.
+   */
   private val mapIds = new PrimitiveVector[Int]()
 
   files.foreach(_.setShuffleFileGroup(this))
@@ -238,8 +248,9 @@ class ShuffleFile(val file: File) {
   /**
    * Consecutive offsets of blocks into the file, ordered by position in the file.
    * This ordering allows us to compute block lengths by examining the following block offset.
+   * blockOffsets(i) contains the offset for the mapper in shuffleFileGroup.mapIds(i).
    */
-  val blockOffsets = new PrimitiveVector[Long]()
+  private val blockOffsets = new PrimitiveVector[Long]()
 
   /** Back pointer to whichever ShuffleFileGroup this file is a part of. */
   private var shuffleFileGroup : ShuffleFileGroup = _
-- 
GitLab
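
Reviewer note (illustrative only, not part of the patch): with consolidation enabled,
getBlockLocation now scans every ShuffleFile destined for the block's reducer and treats a
miss as an error, mirroring the new IllegalStateException. Using the sketch types from the
note above:

    def getBlockLocationSketch(filesForReducer: Seq[ShuffleFileSketch],
                               mapId: Int): FileSegment = {
      for (file <- filesForReducer) {
        val segment = file.getFileSegmentFor(mapId)
        if (segment.isDefined) return segment.get
      }
      // Under consolidation every shuffle block must be recorded in some file's offsets.
      throw new IllegalStateException("Failed to find shuffle block for map " + mapId)
    }

The sketch's linear indexOf keeps it short; because a group's mapIds are kept monotonic, the
real per-file lookup can binary-search them, consistent with the O(m log n) comment in the
patch.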
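
Reviewer note (illustrative only, not part of the patch): the monotonicity comment added near
the file-group pool means a reused group must satisfy maxMapId < mapId; groups that fail the
check are held back (as the fileGroupsToReturn buffer suggests), and a task whose mapId is out
of order gets a fresh group instead. A sketch under assumed pool types (the real pool API may
differ):

    import scala.collection.mutable

    class ShuffleFileGroupSketch(var maxMapId: Int)

    def acquireGroupSketch(pool: mutable.Queue[ShuffleFileGroupSketch], mapId: Int,
                           newGroup: () => ShuffleFileGroupSketch): ShuffleFileGroupSketch = {
      val heldBack = mutable.ListBuffer[ShuffleFileGroupSketch]()
      var group = if (pool.nonEmpty) pool.dequeue() else null
      // Keep drawing until we find a group whose largest mapId is below ours.
      while (group != null && group.maxMapId >= mapId) {
        heldBack += group
        group = if (pool.nonEmpty) pool.dequeue() else null
      }
      pool ++= heldBack  // unusable groups stay available for other (later) map tasks
      if (group == null) newGroup() else group  // out-of-order mapIds force an extra group
    }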