diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
index 6346db389463e74b3d7ff3bdead3a5e610cf492e..2f1b049ce4839f631c737d2d6cc0f9947ac2c93c 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
@@ -23,7 +23,6 @@ import java.util.concurrent.atomic.AtomicInteger
 import scala.collection.JavaConversions._
 
-import org.apache.spark.Logging
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.util.{MetadataCleanerType, MetadataCleaner, TimeStampedHashMap}
 import org.apache.spark.util.collection.{PrimitiveKeyOpenHashMap, PrimitiveVector}
 
@@ -59,7 +58,7 @@ private[spark] trait ShuffleWriterGroup {
  * files within a ShuffleFileGroups associated with the block's reducer.
  */
 private[spark]
-class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
+class ShuffleBlockManager(blockManager: BlockManager) {
   // Turning off shuffle file consolidation causes all shuffle Blocks to get their own file.
   // TODO: Remove this once the shuffle file consolidation feature is stable.
   val consolidateShuffleFiles =
@@ -83,7 +82,7 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
   private val metadataCleaner =
     new MetadataCleaner(MetadataCleanerType.SHUFFLE_BLOCK_MANAGER, this.cleanup)
 
-  def forMapTask(shuffleId: Int, mapId: Int, numBuckets: Int, serializer: Serializer) =
+  def forMapTask(shuffleId: Int, mapId: Int, numBuckets: Int, serializer: Serializer) = {
     new ShuffleWriterGroup {
       shuffleStates.putIfAbsent(shuffleId, new ShuffleState())
       private val shuffleState = shuffleStates(shuffleId)
@@ -133,6 +132,7 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
         shuffleState.unusedFileGroups.add(group)
       }
     }
+  }
 
   /**
    * Returns the physical file segment in which the given BlockId is located.
@@ -177,7 +177,7 @@ object ShuffleBlockManager {
     * Note: mapIdToIndex(mapId) returns the index of the mapper into the vector for every
     * reducer.
     */
-    private val blockOffsetsByReducer = Array.tabulate[PrimitiveVector[Long]](files.length) { _ =>
+    private val blockOffsetsByReducer = Array.fill[PrimitiveVector[Long]](files.length) {
      new PrimitiveVector[Long]()
    }
 