Skip to content
Snippets Groups Projects
Commit 0647ec97 authored by Aaron Davidson's avatar Aaron Davidson
Browse files

Clean up shuffle files once their metadata is gone

Previously, we would only clean the in-memory metadata for consolidated
shuffle files.

Additionally, fixes a bug where the Metadata Cleaner was ignoring type-
specific TTLs.
parent 440e531a
No related branches found
No related tags found
No related merge requests found
...@@ -70,10 +70,16 @@ class ShuffleBlockManager(blockManager: BlockManager) { ...@@ -70,10 +70,16 @@ class ShuffleBlockManager(blockManager: BlockManager) {
* Contains all the state related to a particular shuffle. This includes a pool of unused * Contains all the state related to a particular shuffle. This includes a pool of unused
* ShuffleFileGroups, as well as all ShuffleFileGroups that have been created for the shuffle. * ShuffleFileGroups, as well as all ShuffleFileGroups that have been created for the shuffle.
*/ */
private class ShuffleState() { private class ShuffleState(val numBuckets: Int) {
val nextFileId = new AtomicInteger(0) val nextFileId = new AtomicInteger(0)
val unusedFileGroups = new ConcurrentLinkedQueue[ShuffleFileGroup]() val unusedFileGroups = new ConcurrentLinkedQueue[ShuffleFileGroup]()
val allFileGroups = new ConcurrentLinkedQueue[ShuffleFileGroup]() val allFileGroups = new ConcurrentLinkedQueue[ShuffleFileGroup]()
/**
* The mapIds of all map tasks completed on this Executor for this shuffle.
* NB: This is only populated if consolidateShuffleFiles is FALSE. We don't need it otherwise.
*/
val completedMapTasks = new ConcurrentLinkedQueue[Int]()
} }
type ShuffleId = Int type ShuffleId = Int
...@@ -84,7 +90,7 @@ class ShuffleBlockManager(blockManager: BlockManager) { ...@@ -84,7 +90,7 @@ class ShuffleBlockManager(blockManager: BlockManager) {
def forMapTask(shuffleId: Int, mapId: Int, numBuckets: Int, serializer: Serializer) = { def forMapTask(shuffleId: Int, mapId: Int, numBuckets: Int, serializer: Serializer) = {
new ShuffleWriterGroup { new ShuffleWriterGroup {
shuffleStates.putIfAbsent(shuffleId, new ShuffleState()) shuffleStates.putIfAbsent(shuffleId, new ShuffleState(numBuckets))
private val shuffleState = shuffleStates(shuffleId) private val shuffleState = shuffleStates(shuffleId)
private var fileGroup: ShuffleFileGroup = null private var fileGroup: ShuffleFileGroup = null
...@@ -109,6 +115,8 @@ class ShuffleBlockManager(blockManager: BlockManager) { ...@@ -109,6 +115,8 @@ class ShuffleBlockManager(blockManager: BlockManager) {
fileGroup.recordMapOutput(mapId, offsets) fileGroup.recordMapOutput(mapId, offsets)
} }
recycleFileGroup(fileGroup) recycleFileGroup(fileGroup)
} else {
shuffleState.completedMapTasks.add(mapId)
} }
} }
...@@ -154,7 +162,18 @@ class ShuffleBlockManager(blockManager: BlockManager) { ...@@ -154,7 +162,18 @@ class ShuffleBlockManager(blockManager: BlockManager) {
} }
private def cleanup(cleanupTime: Long) { private def cleanup(cleanupTime: Long) {
shuffleStates.clearOldValues(cleanupTime) shuffleStates.clearOldValues(cleanupTime, (shuffleId, state) => {
if (consolidateShuffleFiles) {
for (fileGroup <- state.allFileGroups; file <- fileGroup.files) {
file.delete()
}
} else {
for (mapId <- state.completedMapTasks; reduceId <- 0 until state.numBuckets) {
val blockId = new ShuffleBlockId(shuffleId, mapId, reduceId)
blockManager.diskBlockManager.getFile(blockId).delete()
}
}
})
} }
} }
......
...@@ -27,7 +27,7 @@ import org.apache.spark.Logging ...@@ -27,7 +27,7 @@ import org.apache.spark.Logging
class MetadataCleaner(cleanerType: MetadataCleanerType.MetadataCleanerType, cleanupFunc: (Long) => Unit) extends Logging { class MetadataCleaner(cleanerType: MetadataCleanerType.MetadataCleanerType, cleanupFunc: (Long) => Unit) extends Logging {
val name = cleanerType.toString val name = cleanerType.toString
private val delaySeconds = MetadataCleaner.getDelaySeconds private val delaySeconds = MetadataCleaner.getDelaySeconds(cleanerType)
private val periodSeconds = math.max(10, delaySeconds / 10) private val periodSeconds = math.max(10, delaySeconds / 10)
private val timer = new Timer(name + " cleanup timer", true) private val timer = new Timer(name + " cleanup timer", true)
......
...@@ -104,19 +104,28 @@ class TimeStampedHashMap[A, B] extends Map[A, B]() with Logging { ...@@ -104,19 +104,28 @@ class TimeStampedHashMap[A, B] extends Map[A, B]() with Logging {
def toMap: immutable.Map[A, B] = iterator.toMap def toMap: immutable.Map[A, B] = iterator.toMap
/** /**
* Removes old key-value pairs that have timestamp earlier than `threshTime` * Removes old key-value pairs that have timestamp earlier than `threshTime`,
* calling the supplied function on each such entry before removing.
*/ */
def clearOldValues(threshTime: Long) { def clearOldValues(threshTime: Long, f: (A, B) => Unit) {
val iterator = internalMap.entrySet().iterator() val iterator = internalMap.entrySet().iterator()
while(iterator.hasNext) { while (iterator.hasNext) {
val entry = iterator.next() val entry = iterator.next()
if (entry.getValue._2 < threshTime) { if (entry.getValue._2 < threshTime) {
f(entry.getKey, entry.getValue._1)
logDebug("Removing key " + entry.getKey) logDebug("Removing key " + entry.getKey)
iterator.remove() iterator.remove()
} }
} }
} }
/**
* Removes old key-value pairs that have timestamp earlier than `threshTime`
*/
def clearOldValues(threshTime: Long) {
clearOldValues(threshTime, (_, _) => ())
}
private def currentTime: Long = System.currentTimeMillis() private def currentTime: Long = System.currentTimeMillis()
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment