Skip to content
Snippets Groups Projects
Commit 6201e5e2 authored by Aaron Davidson's avatar Aaron Davidson
Browse files

Refactor ShuffleBlockManager to reduce public interface

- ShuffleBlocks has been removed and replaced by ShuffleWriterGroup.
- ShuffleWriterGroup no longer contains a reference to a ShuffleFileGroup.
- ShuffleFile has been removed and its contents are now within ShuffleFileGroup.
- ShuffleBlockManager.forShuffle has been replaced by a more stateful forMapTask.
parent b0cf19fe
No related branches found
No related tags found
No related merge requests found
...@@ -146,27 +146,26 @@ private[spark] class ShuffleMapTask( ...@@ -146,27 +146,26 @@ private[spark] class ShuffleMapTask(
metrics = Some(context.taskMetrics) metrics = Some(context.taskMetrics)
val blockManager = SparkEnv.get.blockManager val blockManager = SparkEnv.get.blockManager
var shuffle: ShuffleBlocks = null val shuffleBlockManager = blockManager.shuffleBlockManager
var buckets: ShuffleWriterGroup = null var shuffle: ShuffleWriterGroup = null
var success = false var success = false
try { try {
// Obtain all the block writers for shuffle blocks. // Obtain all the block writers for shuffle blocks.
val ser = SparkEnv.get.serializerManager.get(dep.serializerClass) val ser = SparkEnv.get.serializerManager.get(dep.serializerClass)
shuffle = blockManager.shuffleBlockManager.forShuffle(dep.shuffleId, numOutputSplits, ser) shuffle = shuffleBlockManager.forMapTask(dep.shuffleId, partitionId, numOutputSplits, ser)
buckets = shuffle.acquireWriters(partitionId)
// Write the map output to its associated buckets. // Write the map output to its associated buckets.
for (elem <- rdd.iterator(split, context)) { for (elem <- rdd.iterator(split, context)) {
val pair = elem.asInstanceOf[Product2[Any, Any]] val pair = elem.asInstanceOf[Product2[Any, Any]]
val bucketId = dep.partitioner.getPartition(pair._1) val bucketId = dep.partitioner.getPartition(pair._1)
buckets.writers(bucketId).write(pair) shuffle.writers(bucketId).write(pair)
} }
// Commit the writes. Get the size of each bucket block (total block size). // Commit the writes. Get the size of each bucket block (total block size).
var totalBytes = 0L var totalBytes = 0L
var totalTime = 0L var totalTime = 0L
val compressedSizes: Array[Byte] = buckets.writers.map { writer: BlockObjectWriter => val compressedSizes: Array[Byte] = shuffle.writers.map { writer: BlockObjectWriter =>
writer.commit() writer.commit()
val size = writer.fileSegment().length val size = writer.fileSegment().length
totalBytes += size totalBytes += size
...@@ -185,15 +184,15 @@ private[spark] class ShuffleMapTask( ...@@ -185,15 +184,15 @@ private[spark] class ShuffleMapTask(
} catch { case e: Exception => } catch { case e: Exception =>
// If there is an exception from running the task, revert the partial writes // If there is an exception from running the task, revert the partial writes
// and throw the exception upstream to Spark. // and throw the exception upstream to Spark.
if (buckets != null) { if (shuffle != null) {
buckets.writers.foreach(_.revertPartialWrites()) shuffle.writers.foreach(_.revertPartialWrites())
} }
throw e throw e
} finally { } finally {
// Release the writers back to the shuffle block manager. // Release the writers back to the shuffle block manager.
if (shuffle != null && buckets != null) { if (shuffle != null && shuffle.writers != null) {
buckets.writers.foreach(_.close()) shuffle.writers.foreach(_.close())
shuffle.releaseWriters(buckets, success) shuffle.releaseWriters(success)
} }
// Execute the callbacks on task completion. // Execute the callbacks on task completion.
context.executeOnCompleteCallbacks() context.executeOnCompleteCallbacks()
......
...@@ -18,31 +18,23 @@ ...@@ -18,31 +18,23 @@
package org.apache.spark.storage package org.apache.spark.storage
import java.io.File import java.io.File
import java.util
import java.util.concurrent.ConcurrentLinkedQueue import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicInteger
import scala.collection.JavaConversions._ import scala.collection.JavaConversions._
import scala.collection.mutable
import org.apache.spark.Logging import org.apache.spark.Logging
import org.apache.spark.serializer.Serializer import org.apache.spark.serializer.Serializer
import org.apache.spark.util.{MetadataCleanerType, MetadataCleaner, TimeStampedHashMap} import org.apache.spark.util.{MetadataCleanerType, MetadataCleaner, TimeStampedHashMap}
import org.apache.spark.util.collection.{PrimitiveKeyOpenHashMap, PrimitiveVector} import org.apache.spark.util.collection.{PrimitiveKeyOpenHashMap, PrimitiveVector}
import org.apache.spark.storage.ShuffleBlockManager.ShuffleFileGroup
private[spark] /** A group of writers for a ShuffleMapTask, one writer per reducer. */
class ShuffleWriterGroup( private[spark] trait ShuffleWriterGroup {
val mapId: Int, val writers: Array[BlockObjectWriter]
val fileGroup: ShuffleFileGroup,
val writers: Array[BlockObjectWriter])
private[spark]
trait ShuffleBlocks {
/** Get a group of writers for this map task. */
def acquireWriters(mapId: Int): ShuffleWriterGroup
/** @param success Indicates all writes were successful. If false, no blocks will be recorded. */ /** @param success Indicates all writes were successful. If false, no blocks will be recorded. */
def releaseWriters(group: ShuffleWriterGroup, success: Boolean) def releaseWriters(success: Boolean)
} }
/** /**
...@@ -50,9 +42,9 @@ trait ShuffleBlocks { ...@@ -50,9 +42,9 @@ trait ShuffleBlocks {
* per reducer (this set of files is called a ShuffleFileGroup). * per reducer (this set of files is called a ShuffleFileGroup).
* *
* As an optimization to reduce the number of physical shuffle files produced, multiple shuffle * As an optimization to reduce the number of physical shuffle files produced, multiple shuffle
* blocks are aggregated into the same file. There is one "combined shuffle file" (ShuffleFile) per * blocks are aggregated into the same file. There is one "combined shuffle file" per reducer
* reducer per concurrently executing shuffle task. As soon as a task finishes writing to its * per concurrently executing shuffle task. As soon as a task finishes writing to its shuffle
* shuffle files, it releases them for another task. * files, it releases them for another task.
* Regarding the implementation of this feature, shuffle files are identified by a 3-tuple: * Regarding the implementation of this feature, shuffle files are identified by a 3-tuple:
* - shuffleId: The unique id given to the entire shuffle stage. * - shuffleId: The unique id given to the entire shuffle stage.
* - bucketId: The id of the output partition (i.e., reducer id) * - bucketId: The id of the output partition (i.e., reducer id)
...@@ -62,10 +54,9 @@ trait ShuffleBlocks { ...@@ -62,10 +54,9 @@ trait ShuffleBlocks {
* that specifies where in a given file the actual block data is located. * that specifies where in a given file the actual block data is located.
* *
* Shuffle file metadata is stored in a space-efficient manner. Rather than simply mapping * Shuffle file metadata is stored in a space-efficient manner. Rather than simply mapping
* ShuffleBlockIds directly to FileSegments, each ShuffleFile maintains a list of offsets for each * ShuffleBlockIds directly to FileSegments, each ShuffleFileGroup maintains a list of offsets for
* block stored in that file. In order to find the location of a shuffle block, we search all * each block stored in each file. In order to find the location of a shuffle block, we search the
* ShuffleFiles destined for the block's reducer. * files within the ShuffleFileGroups associated with the block's reducer.
*
*/ */
private[spark] private[spark]
class ShuffleBlockManager(blockManager: BlockManager) extends Logging { class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
...@@ -74,102 +65,74 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging { ...@@ -74,102 +65,74 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
val consolidateShuffleFiles = val consolidateShuffleFiles =
System.getProperty("spark.shuffle.consolidateFiles", "true").toBoolean System.getProperty("spark.shuffle.consolidateFiles", "true").toBoolean
private val bufferSize = System.getProperty("spark.shuffle.file.buffer.kb", "100").toInt * 1024
/** /**
* Contains a pool of unused ShuffleFileGroups. * Contains all the state related to a particular shuffle. This includes a pool of unused
* One group is needed per concurrent thread (mapper) operating on the same shuffle. * ShuffleFileGroups, as well as all ShuffleFileGroups that have been created for the shuffle.
*/ */
private class ShuffleFileGroupPool { private class ShuffleState() {
private val nextFileId = new AtomicInteger(0) val nextFileId = new AtomicInteger(0)
private val unusedFileGroups = new ConcurrentLinkedQueue[ShuffleFileGroup]() val unusedFileGroups = new ConcurrentLinkedQueue[ShuffleFileGroup]()
val allFileGroups = new ConcurrentLinkedQueue[ShuffleFileGroup]()
def getNextFileId() = nextFileId.getAndIncrement()
def getUnusedFileGroup() = unusedFileGroups.poll()
def returnFileGroup(group: ShuffleFileGroup) = unusedFileGroups.add(group)
} }
type ShuffleId = Int type ShuffleId = Int
private val shuffleToFileGroupPoolMap = new TimeStampedHashMap[ShuffleId, ShuffleFileGroupPool] private val shuffleStates = new TimeStampedHashMap[ShuffleId, ShuffleState]
/**
* Maps reducers (of a particular shuffle) to the set of files that have blocks destined for them.
* Each reducer will have one ShuffleFile per concurrent thread that executed during mapping.
*/
private val shuffleToReducerToFilesMap =
new TimeStampedHashMap[ShuffleId, Array[ConcurrentLinkedQueue[ShuffleFile]]]
private private
val metadataCleaner = new MetadataCleaner(MetadataCleanerType.SHUFFLE_BLOCK_MANAGER, this.cleanup) val metadataCleaner = new MetadataCleaner(MetadataCleanerType.SHUFFLE_BLOCK_MANAGER, this.cleanup)
def forShuffle(shuffleId: Int, numBuckets: Int, serializer: Serializer): ShuffleBlocks = { def forMapTask(shuffleId: Int, mapId: Int, numBuckets: Int, serializer: Serializer) =
initializeShuffleMetadata(shuffleId, numBuckets) new ShuffleWriterGroup {
shuffleStates.putIfAbsent(shuffleId, new ShuffleState())
new ShuffleBlocks { private val shuffleState = shuffleStates(shuffleId)
override def acquireWriters(mapId: Int): ShuffleWriterGroup = { private var fileGroup: ShuffleFileGroup = null
val bufferSize = System.getProperty("spark.shuffle.file.buffer.kb", "100").toInt * 1024
var fileGroup: ShuffleFileGroup = null val writers: Array[BlockObjectWriter] = if (consolidateShuffleFiles) {
val writers = if (consolidateShuffleFiles) { fileGroup = getUnusedFileGroup()
fileGroup = getUnusedFileGroup(shuffleId, mapId, numBuckets) Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
Array.tabulate[BlockObjectWriter](numBuckets) { bucketId => val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
val blockId = ShuffleBlockId(shuffleId, mapId, bucketId) blockManager.getDiskWriter(blockId, fileGroup(bucketId), serializer, bufferSize)
blockManager.getDiskWriter(blockId, fileGroup(bucketId).file, serializer, bufferSize) }
} } else {
} else { Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
Array.tabulate[BlockObjectWriter](numBuckets) { bucketId => val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
val blockId = ShuffleBlockId(shuffleId, mapId, bucketId) val blockFile = blockManager.diskBlockManager.getFile(blockId)
val blockFile = blockManager.diskBlockManager.getFile(blockId) blockManager.getDiskWriter(blockId, blockFile, serializer, bufferSize)
blockManager.getDiskWriter(blockId, blockFile, serializer, bufferSize)
}
} }
new ShuffleWriterGroup(mapId, fileGroup, writers)
} }
override def releaseWriters(group: ShuffleWriterGroup, success: Boolean) { override def releaseWriters(success: Boolean) {
if (consolidateShuffleFiles) { if (consolidateShuffleFiles) {
val fileGroup = group.fileGroup
if (success) { if (success) {
fileGroup.addMapper(group.mapId) val offsets = writers.map(_.fileSegment().offset)
for ((writer, shuffleFile) <- group.writers.zip(fileGroup.files)) { fileGroup.recordMapOutput(mapId, offsets)
shuffleFile.recordMapOutput(writer.fileSegment().offset)
}
} }
recycleFileGroup(shuffleId, fileGroup) recycleFileGroup(fileGroup)
} }
} }
}
}
private def initializeShuffleMetadata(shuffleId: Int, numBuckets: Int) { private def getUnusedFileGroup(): ShuffleFileGroup = {
val prev = shuffleToFileGroupPoolMap.putIfAbsent(shuffleId, new ShuffleFileGroupPool()) val fileGroup = shuffleState.unusedFileGroups.poll()
if (!prev.isDefined) { if (fileGroup != null) fileGroup else newFileGroup()
val reducerToFilesMap = new Array[ConcurrentLinkedQueue[ShuffleFile]](numBuckets) }
for (reducerId <- 0 until numBuckets) {
reducerToFilesMap(reducerId) = new ConcurrentLinkedQueue[ShuffleFile]() private def newFileGroup(): ShuffleFileGroup = {
val fileId = shuffleState.nextFileId.getAndIncrement()
val files = Array.tabulate[File](numBuckets) { bucketId =>
val filename = physicalFileName(shuffleId, bucketId, fileId)
blockManager.diskBlockManager.getFile(filename)
}
val fileGroup = new ShuffleFileGroup(fileId, shuffleId, files)
shuffleState.allFileGroups.add(fileGroup)
fileGroup
} }
shuffleToReducerToFilesMap.put(shuffleId, reducerToFilesMap)
}
}
private def getUnusedFileGroup(shuffleId: Int, mapId: Int, numBuckets: Int): ShuffleFileGroup = { private def recycleFileGroup(group: ShuffleFileGroup) {
val pool = shuffleToFileGroupPoolMap(shuffleId) shuffleState.unusedFileGroups.add(group)
val fileGroup = pool.getUnusedFileGroup()
if (fileGroup == null) {
val fileId = pool.getNextFileId()
val files = Array.tabulate[ShuffleFile](numBuckets) { bucketId =>
val filename = physicalFileName(shuffleId, bucketId, fileId)
val file = blockManager.diskBlockManager.getFile(filename)
val shuffleFile = new ShuffleFile(file)
shuffleToReducerToFilesMap(shuffleId)(bucketId).add(shuffleFile)
shuffleFile
} }
new ShuffleFileGroup(shuffleId, fileId, files)
} else {
fileGroup
} }
}
private def recycleFileGroup(shuffleId: Int, fileGroup: ShuffleFileGroup) {
shuffleToFileGroupPoolMap(shuffleId).returnFileGroup(fileGroup)
}
/** /**
* Returns the physical file segment in which the given BlockId is located. * Returns the physical file segment in which the given BlockId is located.
...@@ -177,13 +140,12 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging { ...@@ -177,13 +140,12 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
* an error condition if we don't find the expected block. * an error condition if we don't find the expected block.
*/ */
def getBlockLocation(id: ShuffleBlockId): FileSegment = { def getBlockLocation(id: ShuffleBlockId): FileSegment = {
// Search all files associated with the given reducer. // Search all file groups associated with this shuffle.
val filesForReducer = shuffleToReducerToFilesMap(id.shuffleId)(id.reduceId) val shuffleState = shuffleStates(id.shuffleId)
for (file <- filesForReducer) { for (fileGroup <- shuffleState.allFileGroups) {
val segment = file.getFileSegmentFor(id.mapId) val segment = fileGroup.getFileSegmentFor(id.mapId, id.reduceId)
if (segment != None) { return segment.get } if (segment.isDefined) { return segment.get }
} }
throw new IllegalStateException("Failed to find shuffle block: " + id) throw new IllegalStateException("Failed to find shuffle block: " + id)
} }
...@@ -192,78 +154,62 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging { ...@@ -192,78 +154,62 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
} }
private def cleanup(cleanupTime: Long) { private def cleanup(cleanupTime: Long) {
shuffleToFileGroupPoolMap.clearOldValues(cleanupTime) shuffleStates.clearOldValues(cleanupTime)
shuffleToReducerToFilesMap.clearOldValues(cleanupTime)
} }
} }
/**
* A group of shuffle files, one per reducer.
* A particular mapper will be assigned a single ShuffleFileGroup to write its output to.
*/
private[spark]
class ShuffleFileGroup(val shuffleId: Int, val fileId: Int, val files: Array[ShuffleFile]) {
/**
* Stores the absolute index of each mapId in the files of this group. For instance,
* if mapId 5 is the first block in each file, mapIdToIndex(5) = 0.
*/
private val mapIdToIndex = new PrimitiveKeyOpenHashMap[Int, Int]()
files.foreach(_.setShuffleFileGroup(this))
def apply(bucketId: Int) = files(bucketId)
def addMapper(mapId: Int) {
mapIdToIndex(mapId) = mapIdToIndex.size
}
def indexOf(mapId: Int): Int = mapIdToIndex.getOrElse(mapId, -1)
}
/**
* A single, consolidated shuffle file that may contain many actual blocks. All blocks are destined
* to the same reducer.
*/
private[spark] private[spark]
class ShuffleFile(val file: File) { object ShuffleBlockManager {
/** /**
* Consecutive offsets of blocks into the file, ordered by position in the file. * A group of shuffle files, one per reducer.
* This ordering allows us to compute block lengths by examining the following block offset. * A particular mapper will be assigned a single ShuffleFileGroup to write its output to.
* Note: shuffleFileGroup.indexOf(mapId) returns the index of the mapper into this array.
*/ */
private val blockOffsets = new PrimitiveVector[Long]() private class ShuffleFileGroup(val shuffleId: Int, val fileId: Int, val files: Array[File]) {
/**
* Stores the absolute index of each mapId in the files of this group. For instance,
* if mapId 5 is the first block in each file, mapIdToIndex(5) = 0.
*/
private val mapIdToIndex = new PrimitiveKeyOpenHashMap[Int, Int]()
/**
* Stores consecutive offsets of blocks into each reducer file, ordered by position in the file.
* This ordering allows us to compute block lengths by examining the following block offset.
* Note: mapIdToIndex(mapId) returns the index of the mapper into the vector for every
* reducer.
*/
private val blockOffsetsByReducer = Array.tabulate[PrimitiveVector[Long]](files.length) { _ =>
new PrimitiveVector[Long]()
}
/** Back pointer to whichever ShuffleFileGroup this file is a part of. */ def numBlocks = mapIdToIndex.size
private var shuffleFileGroup : ShuffleFileGroup = _
// Required due to circular dependency between ShuffleFileGroup and ShuffleFile. def apply(bucketId: Int) = files(bucketId)
def setShuffleFileGroup(group: ShuffleFileGroup) {
assert(shuffleFileGroup == null)
shuffleFileGroup = group
}
def recordMapOutput(offset: Long) { def recordMapOutput(mapId: Int, offsets: Array[Long]) {
blockOffsets += offset mapIdToIndex(mapId) = numBlocks
} for (i <- 0 until offsets.length) {
blockOffsetsByReducer(i) += offsets(i)
}
}
/** /** Returns the FileSegment associated with the given map task, or None if no entry exists. */
* Returns the FileSegment associated with the given map task, or def getFileSegmentFor(mapId: Int, reducerId: Int): Option[FileSegment] = {
* None if this ShuffleFile does not have an entry for it. val file = files(reducerId)
*/ val blockOffsets = blockOffsetsByReducer(reducerId)
def getFileSegmentFor(mapId: Int): Option[FileSegment] = { val index = mapIdToIndex.getOrElse(mapId, -1)
val index = shuffleFileGroup.indexOf(mapId) if (index >= 0) {
if (index >= 0) { val offset = blockOffsets(index)
val offset = blockOffsets(index) val length =
val length = if (index + 1 < numBlocks) {
if (index + 1 < blockOffsets.length) { blockOffsets(index + 1) - offset
blockOffsets(index + 1) - offset } else {
} else { file.length() - offset
file.length() - offset }
} assert(length >= 0)
assert(length >= 0) Some(new FileSegment(file, offset, length))
Some(new FileSegment(file, offset, length)) } else {
} else { None
None }
} }
} }
} }
...@@ -38,19 +38,19 @@ object StoragePerfTester { ...@@ -38,19 +38,19 @@ object StoragePerfTester {
val blockManager = sc.env.blockManager val blockManager = sc.env.blockManager
def writeOutputBytes(mapId: Int, total: AtomicLong) = { def writeOutputBytes(mapId: Int, total: AtomicLong) = {
val shuffle = blockManager.shuffleBlockManager.forShuffle(1, numOutputSplits, val shuffle = blockManager.shuffleBlockManager.forMapTask(1, mapId, numOutputSplits,
new KryoSerializer()) new KryoSerializer())
val buckets = shuffle.acquireWriters(mapId) val writers = shuffle.writers
for (i <- 1 to recordsPerMap) { for (i <- 1 to recordsPerMap) {
buckets.writers(i % numOutputSplits).write(writeData) writers(i % numOutputSplits).write(writeData)
} }
buckets.writers.map {w => writers.map {w =>
w.commit() w.commit()
total.addAndGet(w.fileSegment().length) total.addAndGet(w.fileSegment().length)
w.close() w.close()
} }
shuffle.releaseWriters(buckets, true) shuffle.releaseWriters(true)
} }
val start = System.currentTimeMillis() val start = System.currentTimeMillis()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment