Commit 1ba11b1c authored by Aaron Davidson

Minor cleanup in ShuffleBlockManager

parent 6201e5e2
@@ -23,7 +23,6 @@ import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.JavaConversions._
 
-import org.apache.spark.Logging
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.util.{MetadataCleanerType, MetadataCleaner, TimeStampedHashMap}
 import org.apache.spark.util.collection.{PrimitiveKeyOpenHashMap, PrimitiveVector}
@@ -59,7 +58,7 @@ private[spark] trait ShuffleWriterGroup {
  * files within a ShuffleFileGroups associated with the block's reducer.
  */
 private[spark]
-class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
+class ShuffleBlockManager(blockManager: BlockManager) {
   // Turning off shuffle file consolidation causes all shuffle Blocks to get their own file.
   // TODO: Remove this once the shuffle file consolidation feature is stable.
   val consolidateShuffleFiles =
@@ -83,7 +82,7 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
   private
   val metadataCleaner = new MetadataCleaner(MetadataCleanerType.SHUFFLE_BLOCK_MANAGER, this.cleanup)
 
-  def forMapTask(shuffleId: Int, mapId: Int, numBuckets: Int, serializer: Serializer) =
+  def forMapTask(shuffleId: Int, mapId: Int, numBuckets: Int, serializer: Serializer) = {
     new ShuffleWriterGroup {
       shuffleStates.putIfAbsent(shuffleId, new ShuffleState())
       private val shuffleState = shuffleStates(shuffleId)
@@ -133,6 +132,7 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
         shuffleState.unusedFileGroups.add(group)
       }
     }
+  }
 
   /**
    * Returns the physical file segment in which the given BlockId is located.
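Hunks 3 and 4 together wrap the body of forMapTask in an explicit block: the method still returns an anonymous ShuffleWriterGroup instance, it just gains a matching pair of braces, with the new closing brace added above. A minimal, self-contained sketch of the same pattern follows (hypothetical trait and names, not Spark's API):

// Sketch of the pattern: a method whose explicit block body returns an
// instance of an anonymous class implementing a trait. Names are hypothetical.
trait WriterGroup {
  def writers: Seq[String]
}

object Sketch {
  def forMapTask(mapId: Int, numBuckets: Int): WriterGroup = {
    new WriterGroup {
      // Initialization code can run here before the instance is returned.
      val writers: Seq[String] = (0 until numBuckets).map(b => s"shuffle_${mapId}_$b")
    }
  }  // The explicit closing brace ends the method body, as in the hunk above.

  def main(args: Array[String]): Unit =
    println(forMapTask(mapId = 3, numBuckets = 2).writers)  // Vector(shuffle_3_0, shuffle_3_1)
}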
@@ -177,7 +177,7 @@ object ShuffleBlockManager {
      * Note: mapIdToIndex(mapId) returns the index of the mapper into the vector for every
      * reducer.
      */
-    private val blockOffsetsByReducer = Array.tabulate[PrimitiveVector[Long]](files.length) { _ =>
+    private val blockOffsetsByReducer = Array.fill[PrimitiveVector[Long]](files.length) {
       new PrimitiveVector[Long]()
     }
 
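The last hunk swaps Array.tabulate for Array.fill: when the initializer never uses the slot index, Array.fill(n)(elem) expresses the same thing without the discarded `_ =>` parameter, and its by-name argument is still re-evaluated once per slot, so every reducer still gets its own vector. A minimal, self-contained sketch of the equivalence, using a plain ArrayBuffer as a stand-in for Spark's internal PrimitiveVector so it compiles on its own:

import scala.collection.mutable.ArrayBuffer

object FillVsTabulate {
  def main(args: Array[String]): Unit = {
    val n = 4
    // Old form: Array.tabulate passes the slot index, which the initializer ignores.
    val tabulated = Array.tabulate[ArrayBuffer[Long]](n) { _ => new ArrayBuffer[Long]() }
    // New form: Array.fill re-evaluates its by-name argument for every slot,
    // so each element is still a distinct, independently growable buffer.
    val filled = Array.fill[ArrayBuffer[Long]](n) { new ArrayBuffer[Long]() }
    // Both produce n independent, empty buffers.
    println(tabulated.length == filled.length && filled.forall(_.isEmpty))  // true
  }
}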