diff --git a/conf/java-opts b/conf/java-opts index e52b9e8681ce91e6a5e969b4eb3af9bd72488c81..45aae318d25ed8e046b9a18f5ed5bad041b19007 100644 --- a/conf/java-opts +++ b/conf/java-opts @@ -1,7 +1,7 @@ -Dspark.shuffle.class=spark.TrackedCustomBlockedInMemoryShuffle -Dspark.shuffle.masterHostAddress=127.0.0.1 -Dspark.shuffle.masterTrackerPort=22222 --Dspark.shuffle.trackerStrategy=spark.SelectRandomShuffleTrackerStrategy +-Dspark.shuffle.trackerStrategy=spark.LimitConnectionsShuffleTrackerStrategy -Dspark.shuffle.maxRxConnections=40 -Dspark.shuffle.maxTxConnections=120 -Dspark.shuffle.blockSize=4096 diff --git a/src/scala/spark/ShuffleTrackerStrategy.scala b/src/scala/spark/ShuffleTrackerStrategy.scala index 6eabfd7dc7fd6259dd5c2168710a31bf8eb44324..38c48c478127bc350dbbe933a4a7c1ec91d0a708 100644 --- a/src/scala/spark/ShuffleTrackerStrategy.scala +++ b/src/scala/spark/ShuffleTrackerStrategy.scala @@ -118,8 +118,6 @@ extends ShuffleTrackerStrategy with Logging { splitIndex = ranGen.nextInt(numMappers) } while (reducerSplitInfo.hasSplitsBitVector.get(splitIndex)) - logInfo("%d".format(splitIndex)) - return splitIndex } @@ -189,8 +187,7 @@ extends ShuffleTrackerStrategy with Logging { hasBlocksPerInputSplit(i)(j) assert(blocksRemaining >= 0) - individualEstimates(i)(j) = - 1.0 * blocksRemaining * Shuffle.BlockSize / + individualEstimates(i)(j) = 1.0 * blocksRemaining * Shuffle.BlockSize / { if (speedPerInputSplit(i)(j) <= 0.0) 1.0 else speedPerInputSplit(i)(j) } } @@ -268,9 +265,8 @@ extends ShuffleTrackerStrategy with Logging { def AddReducerToSplit(reducerSplitInfo: SplitInfo, splitIndex: Int): Unit = synchronized { if (splitIndex != -1) { - curConnectionsPerLoc(splitIndex) = curConnectionsPerLoc(splitIndex) + 1 - totalConnectionsPerLoc(splitIndex) = - totalConnectionsPerLoc(splitIndex) + 1 + curConnectionsPerLoc(splitIndex) += 1 + totalConnectionsPerLoc(splitIndex) += 1 } } @@ -287,9 +283,10 @@ extends ShuffleTrackerStrategy with Logging { 1.0 * 
receptionStat.bytesReceived / (receptionStat.timeSpent + 1.0) } + logInfo("%d received %d bytes in %d millis".format(reducerSplitInfo.splitId, receptionStat.bytesReceived, receptionStat.timeSpent)) + // Update current connections to the mapper - curConnectionsPerLoc(receptionStat.serverSplitIndex) = - curConnectionsPerLoc(receptionStat.serverSplitIndex) - 1 + curConnectionsPerLoc(receptionStat.serverSplitIndex) -= 1 // TODO: This assertion can legally fail when ShuffleClient times out while // waiting for tracker response and decides to go to a random server @@ -301,3 +298,137 @@ extends ShuffleTrackerStrategy with Logging { } } } + +/** + * Shuffle tracker strategy that allows reducers to create receiving threads + * depending on their estimated time remaining + */ +class LimitConnectionsShuffleTrackerStrategy +extends ShuffleTrackerStrategy with Logging { + // Number of mappers + private var numMappers = -1 + // Number of reducers + private var numReducers = -1 + private var outputLocs: Array[SplitInfo] = null + + private var ranGen = new Random + + // Data structures from reducers' perspectives + private var totalBlocksPerInputSplit: Array[Array[Int]] = null + private var hasBlocksPerInputSplit: Array[Array[Int]] = null + + // Stored in bytes per millisecond + private var speedPerInputSplit: Array[Array[Double]] = null + + private var curConnectionsPerReducer: Array[Int] = null + private var maxConnectionsPerReducer: Array[Int] = null + + // The order of elements in the outputLocs (splitIndex) is used to pass + // information back and forth between the tracker, mappers, and reducers + def initialize(outputLocs_ : Array[SplitInfo]): Unit = { + outputLocs = outputLocs_ + + numMappers = outputLocs.size + + // All the outputLocs have totalBlocksPerOutputSplit of same size + numReducers = outputLocs(0).totalBlocksPerOutputSplit.size + + // Now initialize the data structures + totalBlocksPerInputSplit = Array.tabulate(numReducers, numMappers)((i,j) => + 
outputLocs(j).totalBlocksPerOutputSplit(i)) + hasBlocksPerInputSplit = Array.tabulate(numReducers, numMappers)((_,_) => 0) + + // Initialize to -1 + speedPerInputSplit = Array.tabulate(numReducers, numMappers)((_,_) => -1.0) + + curConnectionsPerReducer = Array.tabulate(numReducers)(_ => 0) + maxConnectionsPerReducer = Array.tabulate(numReducers)(_ => Shuffle.MaxRxConnections) + } + + def selectSplit(reducerSplitInfo: SplitInfo): Int = synchronized { + var splitIndex = -1 + + // Estimate time remaining to finish receiving for all reducer/mapper pairs + // If speed is unknown or zero then make it 1 to give a large estimate + var individualEstimates = Array.tabulate(numReducers, numMappers)((_,_) => 0.0) + for (i <- 0 until numReducers; j <- 0 until numMappers) { + var blocksRemaining = totalBlocksPerInputSplit(i)(j) - + hasBlocksPerInputSplit(i)(j) + assert(blocksRemaining >= 0) + + individualEstimates(i)(j) = 1.0 * blocksRemaining * Shuffle.BlockSize / + { if (speedPerInputSplit(i)(j) <= 0.0) 1.0 else speedPerInputSplit(i)(j) } + } + + // Check if all speedPerInputSplit entries have non-zero values + var estimationComplete = true + for (i <- 0 until numReducers; j <- 0 until numMappers) { + if (speedPerInputSplit(i)(j) < 0.0) { + estimationComplete = false + } + } + + if (estimationComplete) { + // Estimate time remaining to finish receiving for each reducer + var completionEstimates = Array.tabulate(numReducers)( + individualEstimates(_).foldLeft(Double.MinValue)(Math.max(_,_))) + + val fastestEstimate = + completionEstimates.foldLeft(Double.MaxValue)(Math.min(_,_)) + val slowestEstimate = + completionEstimates.foldLeft(Double.MinValue)(Math.max(_,_)) + + // Set maxConnectionsPerReducer for all reducers proportional to their + // estimated time remaining with slowestEstimate reducer having the max + for (i <- 0 until numReducers) { + maxConnectionsPerReducer(i) = + ((completionEstimates(i) / slowestEstimate) * Shuffle.MaxRxConnections).toInt + } + } + + // Send 
back a splitIndex if this reducer is within its limit + if (curConnectionsPerReducer(reducerSplitInfo.splitId) < + maxConnectionsPerReducer(reducerSplitInfo.splitId)) { + + do { + splitIndex = ranGen.nextInt(numMappers) + } while (reducerSplitInfo.hasSplitsBitVector.get(splitIndex)) + } + + return splitIndex + } + + def AddReducerToSplit(reducerSplitInfo: SplitInfo, splitIndex: Int): Unit = synchronized { + if (splitIndex != -1) { + curConnectionsPerReducer(reducerSplitInfo.splitId) += 1 + } + } + + def deleteReducerFrom(reducerSplitInfo: SplitInfo, + receptionStat: ReceptionStats): Unit = synchronized { + // Update hasBlocksPerInputSplit for reducerSplitInfo + hasBlocksPerInputSplit(reducerSplitInfo.splitId) = + reducerSplitInfo.hasBlocksPerInputSplit + + // Store the last known speed. Add 1 to avoid divide-by-zero. Ignore 0 bytes + // TODO: We are forgetting the old speed. Can use averaging at some point. + if (receptionStat.bytesReceived > 0) { + speedPerInputSplit(reducerSplitInfo.splitId)(receptionStat.serverSplitIndex) = + 1.0 * receptionStat.bytesReceived / (receptionStat.timeSpent + 1.0) + } + + logInfo("%d received %d bytes in %d millis".format(reducerSplitInfo.splitId, receptionStat.bytesReceived, receptionStat.timeSpent)) + + // Update the number of connections currently held by this reducer + curConnectionsPerReducer(reducerSplitInfo.splitId) -= 1 + + // TODO: This assertion can legally fail when ShuffleClient times out while + // waiting for tracker response and decides to go to a random server + // assert(curConnectionsPerReducer(reducerSplitInfo.splitId) >= 0) + + // Just in case + if (curConnectionsPerReducer(reducerSplitInfo.splitId) < 0) { + curConnectionsPerReducer(reducerSplitInfo.splitId) = 0 + } + } +} diff --git a/src/scala/spark/TrackedCustomBlockedInMemoryShuffle.scala b/src/scala/spark/TrackedCustomBlockedInMemoryShuffle.scala index 7c7281c9d765aa325e1841407cf64f219ffd073f..7afbfb34c7913f04811df370e1eb7ba508ac08aa 100644 --- 
a/src/scala/spark/TrackedCustomBlockedInMemoryShuffle.scala +++ b/src/scala/spark/TrackedCustomBlockedInMemoryShuffle.scala @@ -187,8 +187,9 @@ extends Shuffle[K, V, C] with Logging { while (hasSplits < totalSplits && numThreadsToCreate > 0) { // Receive which split to pull from the tracker logInfo("Talking to tracker...") + val startTime = System.currentTimeMillis val splitIndex = getTrackerSelectedSplit(myId) - logInfo("Got %d from tracker...".format(splitIndex)) + logInfo("Got %d from tracker in %d millis".format(splitIndex, System.currentTimeMillis - startTime)) if (splitIndex != -1) { val selectedSplitInfo = outputLocs(splitIndex) diff --git a/src/scala/spark/TrackedCustomBlockedLocalFileShuffle.scala b/src/scala/spark/TrackedCustomBlockedLocalFileShuffle.scala index 798aba9598b217a59a8e77066c6c7ffb3386eddf..26fa23ad0c2257f1745d902c3c7c7dc25f5513de 100644 --- a/src/scala/spark/TrackedCustomBlockedLocalFileShuffle.scala +++ b/src/scala/spark/TrackedCustomBlockedLocalFileShuffle.scala @@ -175,8 +175,9 @@ extends Shuffle[K, V, C] with Logging { while (hasSplits < totalSplits && numThreadsToCreate > 0) { // Receive which split to pull from the tracker logInfo("Talking to tracker...") + val startTime = System.currentTimeMillis val splitIndex = getTrackerSelectedSplit(myId) - logInfo("Got %d from tracker...".format(splitIndex)) + logInfo("Got %d from tracker in %d millis".format(splitIndex, System.currentTimeMillis - startTime)) if (splitIndex != -1) { val selectedSplitInfo = outputLocs(splitIndex)