Skip to content
Snippets Groups Projects
Commit 75d161b3 authored by Patrick Wendell's avatar Patrick Wendell
Browse files

Forcing shuffle consolidation in DiskBlockManagerSuite

parent 1450b8ef
No related branches found
No related tags found
No related merge requests found
......@@ -5,9 +5,9 @@ import java.io.{FileWriter, File}
import scala.collection.mutable
import com.google.common.io.Files
import org.scalatest.{BeforeAndAfterEach, FunSuite}
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}
class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach {
class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach with BeforeAndAfterAll {
val rootDir0 = Files.createTempDir()
rootDir0.deleteOnExit()
......@@ -16,6 +16,12 @@ class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach {
val rootDirs = rootDir0.getName + "," + rootDir1.getName
println("Created root dirs: " + rootDirs)
// This suite focuses primarily on consolidation features,
// so we coerce consolidation if not already enabled.
val consolidateProp = "spark.shuffle.consolidateFiles"
val oldConsolidate = Option(System.getProperty(consolidateProp))
System.setProperty(consolidateProp, "true")
val shuffleBlockManager = new ShuffleBlockManager(null) {
var idToSegmentMap = mutable.Map[ShuffleBlockId, FileSegment]()
override def getBlockLocation(id: ShuffleBlockId) = idToSegmentMap(id)
......@@ -23,6 +29,10 @@ class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach {
var diskBlockManager: DiskBlockManager = _
override def afterAll() {
  // Restore the JVM-wide spark.shuffle.consolidateFiles property to its
  // pre-suite state so our coercion to "true" doesn't leak into other suites.
  oldConsolidate match {
    // Property existed before: put the original value back.
    case Some(c) => System.setProperty(consolidateProp, c)
    // Property was unset before: remove our coerced value entirely
    // (previously it was silently left as "true").
    case None    => System.clearProperty(consolidateProp)
  }
}
override def beforeEach() {
diskBlockManager = new DiskBlockManager(shuffleBlockManager, rootDirs)
shuffleBlockManager.idToSegmentMap.clear()
......
0% — Loading, or reload the page if this stalls.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment