Skip to content
Snippets Groups Projects
Commit 1592adfa authored by Aaron Davidson's avatar Aaron Davidson
Browse files

Add documentation and address other comments

parent 7d44dec9
No related branches found
No related tags found
No related merge requests found
......@@ -55,14 +55,12 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD
* Otherwise, we assume the Block is mapped to a whole file identified by the BlockId directly.
*/
def getBlockLocation(blockId: BlockId): FileSegment = {
if (blockId.isShuffle) {
val segment = shuffleManager.getBlockLocation(blockId.asInstanceOf[ShuffleBlockId])
if (segment.isDefined) { return segment.get }
// If no special mapping found, assume standard block -> file mapping...
if (blockId.isShuffle && shuffleManager.consolidateShuffleFiles) {
shuffleManager.getBlockLocation(blockId.asInstanceOf[ShuffleBlockId])
} else {
val file = getFile(blockId.name)
new FileSegment(file, 0, file.length())
}
val file = getFile(blockId.name)
new FileSegment(file, 0, file.length())
}
def getFile(filename: String): File = {
......
......@@ -45,18 +45,24 @@ trait ShuffleBlocks {
}
/**
* Manages assigning disk-based block writers to shuffle tasks. Each shuffle task gets one writer
* per reducer.
* Manages assigning disk-based block writers to shuffle tasks. Each shuffle task gets one file
* per reducer (this set of files is called a ShuffleFileGroup).
*
* As an optimization to reduce the number of physical shuffle files produced, multiple shuffle
* blocks are aggregated into the same file. There is one "combined shuffle file" per reducer
* per concurrently executing shuffle task. As soon as a task finishes writing to its shuffle files,
* it releases them for another task.
* blocks are aggregated into the same file. There is one "combined shuffle file" (ShuffleFile) per
* reducer per concurrently executing shuffle task. As soon as a task finishes writing to its
* shuffle files, it releases them for another task.
* Regarding the implementation of this feature, shuffle files are identified by a 3-tuple:
* - shuffleId: The unique id given to the entire shuffle stage.
* - bucketId: The id of the output partition (i.e., reducer id)
* - fileId: The unique id identifying a group of "combined shuffle files." Only one task at a
* time owns a particular fileId, and this id is returned to a pool when the task finishes.
*
* Shuffle file metadata is stored in a space-efficient manner. Rather than simply mapping
* ShuffleBlockIds to FileSegments, each ShuffleFile maintains a list of offsets for each block
* stored in that file. In order to find the location of a shuffle block, we search all ShuffleFiles
* destined for the block's reducer.
*
*/
private[spark]
class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
......@@ -124,9 +130,9 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
}
}
def initializeShuffleMetadata(shuffleId: Int, numBuckets: Int) {
private def initializeShuffleMetadata(shuffleId: Int, numBuckets: Int) {
val prev = shuffleToFileGroupPoolMap.putIfAbsent(shuffleId, new ShuffleFileGroupPool())
if (prev == None) {
if (!prev.isDefined) {
val reducerToFilesMap = new Array[ConcurrentLinkedQueue[ShuffleFile]](numBuckets)
for (reducerId <- 0 until numBuckets) {
reducerToFilesMap(reducerId) = new ConcurrentLinkedQueue[ShuffleFile]()
......@@ -142,6 +148,8 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
var fileGroup = pool.getUnusedFileGroup()
// If we reuse a file group, ensure we maintain mapId monotonicity.
// This means we may create extra ShuffleFileGroups if we're trying to run a map task
// that is out-of-order with respect to its mapId (which may happen when failures occur).
val fileGroupsToReturn = mutable.ListBuffer[ShuffleFileGroup]()
while (fileGroup != null && fileGroup.maxMapId >= mapId) {
fileGroupsToReturn += fileGroup
......@@ -170,21 +178,19 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
/**
* Returns the physical file segment in which the given BlockId is located.
* If we have no special mapping, None will be returned.
* This function should only be called if shuffle file consolidation is enabled, as it is
* an error condition if we don't find the expected block.
*/
def getBlockLocation(id: ShuffleBlockId): Option[FileSegment] = {
def getBlockLocation(id: ShuffleBlockId): FileSegment = {
// Search all files associated with the given reducer.
// This process is O(m log n) for m threads and n mappers. Could be sweetened to "likely" O(m).
if (consolidateShuffleFiles) {
val filesForReducer = shuffleToReducerToFilesMap(id.shuffleId)(id.reduceId)
for (file <- filesForReducer) {
val segment = file.getFileSegmentFor(id.mapId)
if (segment != None) { return segment }
}
logInfo("Failed to find shuffle block: " + id)
val filesForReducer = shuffleToReducerToFilesMap(id.shuffleId)(id.reduceId)
for (file <- filesForReducer) {
val segment = file.getFileSegmentFor(id.mapId)
if (segment != None) { return segment.get }
}
None
throw new IllegalStateException("Failed to find shuffle block: " + id)
}
private def physicalFileName(shuffleId: Int, bucketId: Int, fileId: Int) = {
......@@ -204,6 +210,10 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
*/
private[spark]
class ShuffleFileGroup(val shuffleId: Int, val fileId: Int, val files: Array[ShuffleFile]) {
/**
* Contains the set of mappers that have written to this file group, in the same order as they
* have written to their respective files.
*/
private val mapIds = new PrimitiveVector[Int]()
files.foreach(_.setShuffleFileGroup(this))
......@@ -238,8 +248,9 @@ class ShuffleFile(val file: File) {
/**
* Consecutive offsets of blocks into the file, ordered by position in the file.
* This ordering allows us to compute block lengths by examining the following block offset.
* blockOffsets(i) contains the offset for the mapper in shuffleFileGroup.mapIds(i).
*/
val blockOffsets = new PrimitiveVector[Long]()
private val blockOffsets = new PrimitiveVector[Long]()
/** Back pointer to whichever ShuffleFileGroup this file is a part of. */
private var shuffleFileGroup : ShuffleFileGroup = _
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment