From 8703898d3f2c6b6e08b3ef91da67876589aba184 Mon Sep 17 00:00:00 2001
From: Aaron Davidson <aaron@databricks.com>
Date: Sun, 3 Nov 2013 00:34:53 -0700
Subject: [PATCH] Address Reynold's comments

---
 .../spark/storage/ShuffleBlockManager.scala   | 28 +++++++++++--------
 1 file changed, 16 insertions(+), 12 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
index d1e3074683..57b1a28543 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
@@ -45,7 +45,7 @@ trait ShuffleBlocks {
 }
 
 /**
- * Manages assigning disk-based block writers to shuffle tasks. Each shuffle task gets one file and
+ * Manages assigning disk-based block writers to shuffle tasks. Each shuffle task gets one file
  * per reducer (this set of files is called a ShuffleFileGroup).
  *
  * As an optimization to reduce the number of physical shuffle files produced, multiple shuffle
@@ -57,11 +57,13 @@ trait ShuffleBlocks {
  *   - bucketId: The id of the output partition (i.e., reducer id)
  *   - fileId: The unique id identifying a group of "combined shuffle files." Only one task at a
  *       time owns a particular fileId, and this id is returned to a pool when the task finishes.
+ * Each shuffle file is then mapped to a FileSegment, which is a 3-tuple (file, offset, length)
+ * that specifies where in a given file the actual block data is located.
  *
  * Shuffle file metadata is stored in a space-efficient manner. Rather than simply mapping
- * ShuffleBlockIds to FileSegments, each ShuffleFile maintains a list of offsets for each block
- * stored in that file. In order to find the location of a shuffle block, we search all ShuffleFiles
- * destined for the block's reducer.
+ * ShuffleBlockIds directly to FileSegments, each ShuffleFile maintains a list of offsets for each
+ * block stored in that file. In order to find the location of a shuffle block, we search all
+ * ShuffleFiles destined for the block's reducer.
  *
  */
 private[spark]
@@ -98,18 +100,22 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
   private
   val metadataCleaner = new MetadataCleaner(MetadataCleanerType.SHUFFLE_BLOCK_MANAGER, this.cleanup)
 
-  def forShuffle(shuffleId: Int, numBuckets: Int, serializer: Serializer) = {
+  def forShuffle(shuffleId: Int, numBuckets: Int, serializer: Serializer): ShuffleBlocks = {
     initializeShuffleMetadata(shuffleId, numBuckets)
 
     new ShuffleBlocks {
       override def acquireWriters(mapId: Int): ShuffleWriterGroup = {
         val bufferSize = System.getProperty("spark.shuffle.file.buffer.kb", "100").toInt * 1024
-        val fileGroup = getUnusedFileGroup(shuffleId, mapId, numBuckets)
-        val writers = Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
-          val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
-          if (consolidateShuffleFiles) {
+        var fileGroup: ShuffleFileGroup = null
+        val writers = if (consolidateShuffleFiles) {
+          fileGroup = getUnusedFileGroup(shuffleId, mapId, numBuckets)
+          Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
+            val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
             blockManager.getDiskWriter(blockId, fileGroup(bucketId).file, serializer, bufferSize)
-          } else {
+          }
+        } else {
+          Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
+            val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
             val blockFile = blockManager.diskBlockManager.getFile(blockId)
             blockManager.getDiskWriter(blockId, blockFile, serializer, bufferSize)
           }
@@ -142,8 +148,6 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
   }
 
   private def getUnusedFileGroup(shuffleId: Int, mapId: Int, numBuckets: Int): ShuffleFileGroup = {
-    if (!consolidateShuffleFiles) { return null }
-
     val pool = shuffleToFileGroupPoolMap(shuffleId)
     var fileGroup = pool.getUnusedFileGroup()
 
-- 
GitLab