diff --git a/src/scala/spark/ShuffleTrackerStrategy.scala b/src/scala/spark/ShuffleTrackerStrategy.scala index cfc5fcc1b00e85385c8f82c309746c35fd7a91d6..6eabfd7dc7fd6259dd5c2168710a31bf8eb44324 100644 --- a/src/scala/spark/ShuffleTrackerStrategy.scala +++ b/src/scala/spark/ShuffleTrackerStrategy.scala @@ -151,7 +151,7 @@ extends ShuffleTrackerStrategy with Logging { private var hasBlocksPerInputSplit: Array[Array[Int]] = null // Stored in bytes per millisecond - private var speedPerInputSplit: Array[Array[Int]] = null + private var speedPerInputSplit: Array[Array[Double]] = null private var curConnectionsPerLoc: Array[Int] = null private var totalConnectionsPerLoc: Array[Int] = null @@ -172,7 +172,7 @@ extends ShuffleTrackerStrategy with Logging { hasBlocksPerInputSplit = Array.tabulate(numReducers, numMappers)((_,_) => 0) // Initialize to -1 - speedPerInputSplit = Array.tabulate(numReducers, numMappers)((_,_) => -1) + speedPerInputSplit = Array.tabulate(numReducers, numMappers)((_,_) => -1.0) curConnectionsPerLoc = Array.tabulate(numMappers)(_ => 0) totalConnectionsPerLoc = Array.tabulate(numMappers)(_ => 0) @@ -183,22 +183,21 @@ extends ShuffleTrackerStrategy with Logging { // Estimate time remaining to finish receiving for all reducer/mapper pairs // If speed is unknown or zero then make it 1 to give a large estimate - var individualEstimates = Array.tabulate(numReducers, numMappers)((_,_) => 0) + var individualEstimates = Array.tabulate(numReducers, numMappers)((_,_) => 0.0) for (i <- 0 until numReducers; j <- 0 until numMappers) { var blocksRemaining = totalBlocksPerInputSplit(i)(j) - hasBlocksPerInputSplit(i)(j) assert(blocksRemaining >= 0) individualEstimates(i)(j) = - { if (blocksRemaining < 0) 0 else blocksRemaining } * - Shuffle.BlockSize / - { if (speedPerInputSplit(i)(j) <= 0) 1 else speedPerInputSplit(i)(j) } + 1.0 * blocksRemaining * Shuffle.BlockSize / + { if (speedPerInputSplit(i)(j) <= 0.0) 1.0 else speedPerInputSplit(i)(j) } } - // Check if all 
individualEstimates entries have non-zero values + // Check if all speedPerInputSplit entries have been measured (are non-negative) var estimationComplete = true for (i <- 0 until numReducers; j <- 0 until numMappers) { - if (speedPerInputSplit(i)(j) < 0) { + if (speedPerInputSplit(i)(j) < 0.0) { estimationComplete = false } } @@ -285,7 +284,7 @@ extends ShuffleTrackerStrategy with Logging { // TODO: We are forgetting the old speed. Can use averaging at some point. if (receptionStat.bytesReceived > 0) { speedPerInputSplit(reducerSplitInfo.splitId)(receptionStat.serverSplitIndex) = - receptionStat.bytesReceived / (receptionStat.timeSpent + 1) + 1.0 * receptionStat.bytesReceived / (receptionStat.timeSpent + 1.0) } // Update current connections to the mapper