Skip to content
Snippets Groups Projects
Commit 5d460253 authored by Patrick Wendell
Browse files

Merge pull request #228 from pwendell/master

Document missing configs and set shuffle consolidation to false.
parents 72b69615 75d161b3
No related branches found
No related tags found
No related merge requests found
......@@ -62,7 +62,7 @@ class ShuffleBlockManager(blockManager: BlockManager) {
// Turning off shuffle file consolidation causes all shuffle Blocks to get their own file.
// TODO: Remove this once the shuffle file consolidation feature is stable.
val consolidateShuffleFiles =
System.getProperty("spark.shuffle.consolidateFiles", "true").toBoolean
System.getProperty("spark.shuffle.consolidateFiles", "false").toBoolean
private val bufferSize = System.getProperty("spark.shuffle.file.buffer.kb", "100").toInt * 1024
......
......@@ -5,9 +5,9 @@ import java.io.{FileWriter, File}
import scala.collection.mutable
import com.google.common.io.Files
import org.scalatest.{BeforeAndAfterEach, FunSuite}
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}
class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach {
class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach with BeforeAndAfterAll {
val rootDir0 = Files.createTempDir()
rootDir0.deleteOnExit()
......@@ -16,6 +16,12 @@ class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach {
val rootDirs = rootDir0.getName + "," + rootDir1.getName
println("Created root dirs: " + rootDirs)
// This suite focuses primarily on consolidation features,
// so we coerce consolidation if not already enabled.
val consolidateProp = "spark.shuffle.consolidateFiles"
val oldConsolidate = Option(System.getProperty(consolidateProp))
System.setProperty(consolidateProp, "true")
val shuffleBlockManager = new ShuffleBlockManager(null) {
var idToSegmentMap = mutable.Map[ShuffleBlockId, FileSegment]()
override def getBlockLocation(id: ShuffleBlockId) = idToSegmentMap(id)
......@@ -23,6 +29,10 @@ class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach {
var diskBlockManager: DiskBlockManager = _
override def afterAll() {
oldConsolidate.map(c => System.setProperty(consolidateProp, c))
}
override def beforeEach() {
diskBlockManager = new DiskBlockManager(shuffleBlockManager, rootDirs)
shuffleBlockManager.idToSegmentMap.clear()
......
......@@ -327,7 +327,42 @@ Apart from these, the following properties are also available, and may be useful
Too large a value decreases parallelism during broadcast (makes it slower); however, if it is too small, <code>BlockManager</code> might take a performance hit.
</td>
</tr>
<tr>
<td>spark.shuffle.consolidateFiles</td>
<td>false</td>
<td>
If set to "true", consolidates intermediate files created during a shuffle. Creating fewer files can improve filesystem performance if you run shuffles with large numbers of reduce tasks.
</td>
</tr>
<tr>
<td>spark.speculation</td>
<td>false</td>
<td>
If set to "true", performs speculative execution of tasks. This means if one or more tasks are running slowly in a stage, they will be re-launched.
</td>
</tr>
<tr>
<td>spark.speculation.interval</td>
<td>100</td>
<td>
How often Spark will check for tasks to speculate, in milliseconds.
</td>
</tr>
<tr>
<td>spark.speculation.quantile</td>
<td>0.75</td>
<td>
Percentage of tasks which must be complete before speculation is enabled for a particular stage.
</td>
</tr>
<tr>
<td>spark.speculation.multiplier</td>
<td>1.5</td>
<td>
How many times slower a task is than the median to be considered for speculation.
</td>
</tr>
</table>
# Environment Variables
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment