diff --git a/conf/java-opts b/conf/java-opts index 45aae318d25ed8e046b9a18f5ed5bad041b19007..cd8338fff1921b5cb55990424da4c60ccf3281c4 100644 --- a/conf/java-opts +++ b/conf/java-opts @@ -1,10 +1,10 @@ -Dspark.shuffle.class=spark.TrackedCustomBlockedInMemoryShuffle -Dspark.shuffle.masterHostAddress=127.0.0.1 -Dspark.shuffle.masterTrackerPort=22222 --Dspark.shuffle.trackerStrategy=spark.LimitConnectionsShuffleTrackerStrategy +-Dspark.shuffle.trackerStrategy=spark.BalanceRemainingShuffleTrackerStrategy -Dspark.shuffle.maxRxConnections=40 -Dspark.shuffle.maxTxConnections=120 --Dspark.shuffle.blockSize=4096 +-Dspark.shuffle.blockSize=512 -Dspark.shuffle.minKnockInterval=100 -Dspark.shuffle.maxKnockInterval=5000 -Dspark.shuffle.maxChatTime=500 diff --git a/src/scala/spark/Shuffle.scala b/src/scala/spark/Shuffle.scala index 0a547fb71509d01408b7d81b14a6d59cbfdf16fe..f2d790f72702b07bf11e09529d3821d154b6e381 100644 --- a/src/scala/spark/Shuffle.scala +++ b/src/scala/spark/Shuffle.scala @@ -23,6 +23,10 @@ trait Shuffle[K, V, C] { */ private object Shuffle extends Logging { + // Tracker communication constants + val ReducerEntering = 0 + val ReducerLeaving = 1 + // ShuffleTracker info private var MasterHostAddress_ = System.getProperty( "spark.shuffle.masterHostAddress", InetAddress.getLocalHost.getHostAddress) @@ -54,7 +58,6 @@ extends Logging { private var ThrottleFraction_ = System.getProperty( "spark.shuffle.throttleFraction", "2.0").toDouble - def MasterHostAddress = MasterHostAddress_ def MasterTrackerPort = MasterTrackerPort_ diff --git a/src/scala/spark/ShuffleTrackerStrategy.scala b/src/scala/spark/ShuffleTrackerStrategy.scala index 38c48c478127bc350dbbe933a4a7c1ec91d0a708..3d9cd7b75d4167104f49f5fa1498e9f394b9dca5 100644 --- a/src/scala/spark/ShuffleTrackerStrategy.scala +++ b/src/scala/spark/ShuffleTrackerStrategy.scala @@ -1,7 +1,8 @@ package spark -import java.util.Random +import java.util.{BitSet, Random} +import scala.collection.mutable.ArrayBuffer import scala.util.Sorting._ /** @@ -11,11 +12,11 @@ trait ShuffleTrackerStrategy { // Initialize def initialize(outputLocs_ : Array[SplitInfo]): Unit - // Select a split and send it back - def selectSplit(reducerSplitInfo: SplitInfo): Int + // Select a set of splits and send back + def selectSplit(reducerSplitInfo: SplitInfo): ArrayBuffer[Int] // Update internal stats if things could be sent back successfully - def AddReducerToSplit(reducerSplitInfo: SplitInfo, splitIndex: Int): Unit + def AddReducerToSplit(reducerSplitInfo: SplitInfo, splitIndices: ArrayBuffer[Int]): Unit // A reducer is done. Update internal stats def deleteReducerFrom(reducerSplitInfo: SplitInfo, @@ -50,9 +51,11 @@ extends ShuffleTrackerStrategy with Logging { totalConnectionsPerLoc = Array.tabulate(numSources)(_ => 0) } - def selectSplit(reducerSplitInfo: SplitInfo): Int = synchronized { + def selectSplit(reducerSplitInfo: SplitInfo): ArrayBuffer[Int] = synchronized { var minConnections = Int.MaxValue - var splitIndex = -1 + var minIndex = -1 + + var splitIndices = ArrayBuffer[Int]() for (i <- 0 until numSources) { // TODO: Use of MaxRxConnections instead of MaxTxConnections is @@ -62,15 +65,19 @@ extends ShuffleTrackerStrategy with Logging { totalConnectionsPerLoc(i) < minConnections && !reducerSplitInfo.hasSplitsBitVector.get(i)) { minConnections = totalConnectionsPerLoc(i) - splitIndex = i + minIndex = i } } + + if (minIndex != -1) { + splitIndices += minIndex + } - return splitIndex + return splitIndices } - def AddReducerToSplit(reducerSplitInfo: SplitInfo, splitIndex: Int): Unit = synchronized { - if (splitIndex != -1) { + def AddReducerToSplit(reducerSplitInfo: SplitInfo, splitIndices: ArrayBuffer[Int]): Unit = synchronized { + splitIndices.foreach { splitIndex => curConnectionsPerLoc(splitIndex) = curConnectionsPerLoc(splitIndex) + 1 totalConnectionsPerLoc(splitIndex) = totalConnectionsPerLoc(splitIndex) + 1 @@ -111,17 +118,17 @@ extends ShuffleTrackerStrategy with Logging { numMappers = outputLocs.size } - def selectSplit(reducerSplitInfo: SplitInfo): Int = synchronized { + def selectSplit(reducerSplitInfo: SplitInfo): ArrayBuffer[Int] = synchronized { var splitIndex = -1 do { splitIndex = ranGen.nextInt(numMappers) } while (reducerSplitInfo.hasSplitsBitVector.get(splitIndex)) - return splitIndex + return ArrayBuffer(splitIndex) } - def AddReducerToSplit(reducerSplitInfo: SplitInfo, splitIndex: Int): Unit = synchronized { + def AddReducerToSplit(reducerSplitInfo: SplitInfo, splitIndices: ArrayBuffer[Int]): Unit = synchronized { } def deleteReducerFrom(reducerSplitInfo: SplitInfo, @@ -176,7 +183,7 @@ extends ShuffleTrackerStrategy with Logging { totalConnectionsPerLoc = Array.tabulate(numMappers)(_ => 0) } - def selectSplit(reducerSplitInfo: SplitInfo): Int = synchronized { + def selectSplit(reducerSplitInfo: SplitInfo): ArrayBuffer[Int] = synchronized { var splitIndex = -1 // Estimate time remaining to finish receiving for all reducer/mapper pairs @@ -260,11 +267,11 @@ extends ShuffleTrackerStrategy with Logging { } } - return splitIndex + return ArrayBuffer(splitIndex) } - def AddReducerToSplit(reducerSplitInfo: SplitInfo, splitIndex: Int): Unit = synchronized { - if (splitIndex != -1) { + def AddReducerToSplit(reducerSplitInfo: SplitInfo, splitIndices: ArrayBuffer[Int]): Unit = synchronized { + splitIndices.foreach { splitIndex => curConnectionsPerLoc(splitIndex) += 1 totalConnectionsPerLoc(splitIndex) += 1 } @@ -345,8 +352,8 @@ extends ShuffleTrackerStrategy with Logging { maxConnectionsPerReducer = Array.tabulate(numReducers)(_ => Shuffle.MaxRxConnections) } - def selectSplit(reducerSplitInfo: SplitInfo): Int = synchronized { - var splitIndex = -1 + def selectSplit(reducerSplitInfo: SplitInfo): ArrayBuffer[Int] = synchronized { + var splitIndices = ArrayBuffer[Int]() // Estimate time remaining to finish receiving for all reducer/mapper pairs // If speed is unknown or zero then make it 1 to give a large estimate @@ -389,17 +396,33 @@ extends ShuffleTrackerStrategy with Logging { // Send back a splitIndex if this reducer is within its limit if (curConnectionsPerReducer(reducerSplitInfo.splitId) < maxConnectionsPerReducer(reducerSplitInfo.splitId)) { + + var i = maxConnectionsPerReducer(reducerSplitInfo.splitId) - + curConnectionsPerReducer(reducerSplitInfo.splitId) + + var temp = reducerSplitInfo.hasSplitsBitVector.clone.asInstanceOf[BitSet] + temp.flip(0, numMappers) + + i = Math.min(i, temp.cardinality) - do { - splitIndex = ranGen.nextInt(numMappers) - } while (reducerSplitInfo.hasSplitsBitVector.get(splitIndex)) + while (i > 0) { + var splitIndex = -1 + + do { + splitIndex = ranGen.nextInt(numMappers) + } while (reducerSplitInfo.hasSplitsBitVector.get(splitIndex)) + + reducerSplitInfo.hasSplitsBitVector.set(splitIndex) + splitIndices += splitIndex + i -= 1 + } } - return splitIndex + return splitIndices } - def AddReducerToSplit(reducerSplitInfo: SplitInfo, splitIndex: Int): Unit = synchronized { - if (splitIndex != -1) { + def AddReducerToSplit(reducerSplitInfo: SplitInfo, splitIndices: ArrayBuffer[Int]): Unit = synchronized { + splitIndices.foreach { splitIndex => curConnectionsPerReducer(reducerSplitInfo.splitId) += 1 } } diff --git a/src/scala/spark/TrackedCustomBlockedInMemoryShuffle.scala b/src/scala/spark/TrackedCustomBlockedInMemoryShuffle.scala index 7afbfb34c7913f04811df370e1eb7ba508ac08aa..62a33067c26508863f2ec2b4ac88904f8d9ab128 100644 --- a/src/scala/spark/TrackedCustomBlockedInMemoryShuffle.scala +++ b/src/scala/spark/TrackedCustomBlockedInMemoryShuffle.scala @@ -188,20 +188,22 @@ extends Shuffle[K, V, C] with Logging { // Receive which split to pull from the tracker logInfo("Talking to tracker...") val startTime = System.currentTimeMillis - val splitIndex = getTrackerSelectedSplit(myId) - logInfo("Got %d from tracker in %d millis".format(splitIndex, System.currentTimeMillis - startTime)) + val splitIndices = getTrackerSelectedSplit(myId) + logInfo("Got %s from tracker in %d millis".format(splitIndices, System.currentTimeMillis - startTime)) - if (splitIndex != -1) { - val selectedSplitInfo = outputLocs(splitIndex) - val requestSplit = - "%d/%d/%d".format(shuffleId, selectedSplitInfo.splitId, myId) - - threadPool.execute(new ShuffleClient(splitIndex, selectedSplitInfo, - requestSplit, myId)) - - // splitIndex is in transit. Will be unset in the ShuffleClient - splitsInRequestBitVector.synchronized { - splitsInRequestBitVector.set(splitIndex) + if (splitIndices.size > 0) { + splitIndices.foreach { splitIndex => + val selectedSplitInfo = outputLocs(splitIndex) + val requestSplit = + "%d/%d/%d".format(shuffleId, selectedSplitInfo.splitId, myId) + + threadPool.execute(new ShuffleClient(splitIndex, selectedSplitInfo, + requestSplit, myId)) + + // splitIndex is in transit. Will be unset in the ShuffleClient + splitsInRequestBitVector.synchronized { + splitsInRequestBitVector.set(splitIndex) + } } } else { // Tracker replied back with a NO. Sleep for a while. @@ -282,13 +284,13 @@ extends Shuffle[K, V, C] with Logging { } // Talks to the tracker and receives instruction - private def getTrackerSelectedSplit(myId: Int): Int = { + private def getTrackerSelectedSplit(myId: Int): ArrayBuffer[Int] = { // Local status of hasSplitsBitVector and splitsInRequestBitVector val localSplitInfo = getLocalSplitInfo(myId) // DO NOT talk to the tracker if all the required splits are already busy if (localSplitInfo.hasSplitsBitVector.cardinality == totalSplits) { - return -1 + return ArrayBuffer[Int]() } val clientSocketToTracker = new Socket(Shuffle.MasterHostAddress, @@ -299,30 +301,30 @@ extends Shuffle[K, V, C] with Logging { val oisTracker = new ObjectInputStream(clientSocketToTracker.getInputStream) - var selectedSplitIndex = -1 + var selectedSplitIndices = ArrayBuffer[Int]() // Setup the timeout mechanism var timeOutTask = new TimerTask { override def run: Unit = { logInfo("Waited enough for tracker response... Take random response...") - // sockets will be closed in finally + // sockets will be closed in finally + // TODO: Sometimes timer wont go off // TODO: Selecting randomly here. Tracker won't know about it and get an // asssertion failure when this thread leaves - selectedSplitIndex = selectRandomSplit + selectedSplitIndices = ArrayBuffer(selectRandomSplit) } } var timeOutTimer = new Timer // TODO: Which timeout to use? - timeOutTimer.schedule(timeOutTask, Shuffle.MinKnockInterval) + // timeOutTimer.schedule(timeOutTask, Shuffle.MinKnockInterval) try { // Send intention - oosTracker.writeObject( - TrackedCustomBlockedInMemoryShuffle.ReducerEntering) + oosTracker.writeObject(Shuffle.ReducerEntering) oosTracker.flush() // Send what this reducer has @@ -330,7 +332,7 @@ extends Shuffle[K, V, C] with Logging { oosTracker.flush() // Receive reply from the tracker - selectedSplitIndex = oisTracker.readObject.asInstanceOf[Int] + selectedSplitIndices = oisTracker.readObject.asInstanceOf[ArrayBuffer[Int]] // Turn the timer OFF timeOutTimer.cancel() @@ -344,7 +346,7 @@ extends Shuffle[K, V, C] with Logging { clientSocketToTracker.close() } - return selectedSplitIndex + return selectedSplitIndices } class ShuffleTracker(outputLocs: Array[SplitInfo]) @@ -391,31 +393,29 @@ extends Shuffle[K, V, C] with Logging { // Receive intention val reducerIntention = ois.readObject.asInstanceOf[Int] - if (reducerIntention == - TrackedCustomBlockedInMemoryShuffle.ReducerEntering) { + if (reducerIntention == Shuffle.ReducerEntering) { // Receive what the reducer has val reducerSplitInfo = ois.readObject.asInstanceOf[SplitInfo] - // Select split and update stats if necessary - var selectedSplitIndex = -1 + // Select splits and update stats if necessary + var selectedSplitIndices = ArrayBuffer[Int]() trackerStrategy.synchronized { - selectedSplitIndex = trackerStrategy.selectSplit( + selectedSplitIndices = trackerStrategy.selectSplit( reducerSplitInfo) } // Send reply back - oos.writeObject(selectedSplitIndex) + oos.writeObject(selectedSplitIndices) oos.flush() // Update internal stats, only if receiver got the reply trackerStrategy.synchronized { trackerStrategy.AddReducerToSplit(reducerSplitInfo, - selectedSplitIndex) + selectedSplitIndices) } } - else if (reducerIntention == - TrackedCustomBlockedInMemoryShuffle.ReducerLeaving) { + else if (reducerIntention == Shuffle.ReducerLeaving) { val reducerSplitInfo = ois.readObject.asInstanceOf[SplitInfo] @@ -647,8 +647,7 @@ extends Shuffle[K, V, C] with Logging { try { // Send intention - oosTracker.writeObject( - TrackedCustomBlockedInMemoryShuffle.ReducerLeaving) + oosTracker.writeObject(Shuffle.ReducerLeaving) oosTracker.flush() // Send reducerSplitInfo @@ -698,10 +697,6 @@ extends Shuffle[K, V, C] with Logging { } object TrackedCustomBlockedInMemoryShuffle extends Logging { - // Tracker communication constants - val ReducerEntering = 0 - val ReducerLeaving = 1 - // Cache for keeping the splits around val splitsCache = new HashMap[String, Array[Byte]] diff --git a/src/scala/spark/TrackedCustomBlockedLocalFileShuffle.scala b/src/scala/spark/TrackedCustomBlockedLocalFileShuffle.scala index 26fa23ad0c2257f1745d902c3c7c7dc25f5513de..eb2a11c9669fbac65b582d8e7b5d8480db61122e 100644 --- a/src/scala/spark/TrackedCustomBlockedLocalFileShuffle.scala +++ b/src/scala/spark/TrackedCustomBlockedLocalFileShuffle.scala @@ -176,20 +176,22 @@ extends Shuffle[K, V, C] with Logging { // Receive which split to pull from the tracker logInfo("Talking to tracker...") val startTime = System.currentTimeMillis - val splitIndex = getTrackerSelectedSplit(myId) - logInfo("Got %d from tracker in %d millis".format(splitIndex, System.currentTimeMillis - startTime)) + val splitIndices = getTrackerSelectedSplit(myId) + logInfo("Got %s from tracker in %d millis".format(splitIndices, System.currentTimeMillis - startTime)) - if (splitIndex != -1) { - val selectedSplitInfo = outputLocs(splitIndex) - val requestSplit = - "%d/%d/%d".format(shuffleId, selectedSplitInfo.splitId, myId) - - threadPool.execute(new ShuffleClient(splitIndex, selectedSplitInfo, - requestSplit, myId)) - - // splitIndex is in transit. Will be unset in the ShuffleClient - splitsInRequestBitVector.synchronized { - splitsInRequestBitVector.set(splitIndex) + if (splitIndices.size > 0) { + splitIndices.foreach { splitIndex => + val selectedSplitInfo = outputLocs(splitIndex) + val requestSplit = + "%d/%d/%d".format(shuffleId, selectedSplitInfo.splitId, myId) + + threadPool.execute(new ShuffleClient(splitIndex, selectedSplitInfo, + requestSplit, myId)) + + // splitIndex is in transit. Will be unset in the ShuffleClient + splitsInRequestBitVector.synchronized { + splitsInRequestBitVector.set(splitIndex) + } } } else { // Tracker replied back with a NO. Sleep for a while. @@ -270,13 +272,13 @@ extends Shuffle[K, V, C] with Logging { } // Talks to the tracker and receives instruction - private def getTrackerSelectedSplit(myId: Int): Int = { + private def getTrackerSelectedSplit(myId: Int): ArrayBuffer[Int] = { // Local status of hasSplitsBitVector and splitsInRequestBitVector val localSplitInfo = getLocalSplitInfo(myId) // DO NOT talk to the tracker if all the required splits are already busy if (localSplitInfo.hasSplitsBitVector.cardinality == totalSplits) { - return -1 + return ArrayBuffer[Int]() } val clientSocketToTracker = new Socket(Shuffle.MasterHostAddress, @@ -287,30 +289,30 @@ extends Shuffle[K, V, C] with Logging { val oisTracker = new ObjectInputStream(clientSocketToTracker.getInputStream) - var selectedSplitIndex = -1 + var selectedSplitIndices = ArrayBuffer[Int]() // Setup the timeout mechanism var timeOutTask = new TimerTask { override def run: Unit = { logInfo("Waited enough for tracker response... Take random response...") - // sockets will be closed in finally + // sockets will be closed in finally + // TODO: Sometimes timer wont go off // TODO: Selecting randomly here. Tracker won't know about it and get an // asssertion failure when this thread leaves - selectedSplitIndex = selectRandomSplit + selectedSplitIndices = ArrayBuffer(selectRandomSplit) } } var timeOutTimer = new Timer // TODO: Which timeout to use? - timeOutTimer.schedule(timeOutTask, Shuffle.MinKnockInterval) + // timeOutTimer.schedule(timeOutTask, Shuffle.MinKnockInterval) try { // Send intention - oosTracker.writeObject( - TrackedCustomBlockedLocalFileShuffle.ReducerEntering) + oosTracker.writeObject(Shuffle.ReducerEntering) oosTracker.flush() // Send what this reducer has @@ -318,7 +320,7 @@ extends Shuffle[K, V, C] with Logging { oosTracker.flush() // Receive reply from the tracker - selectedSplitIndex = oisTracker.readObject.asInstanceOf[Int] + selectedSplitIndices = oisTracker.readObject.asInstanceOf[ArrayBuffer[Int]] // Turn the timer OFF timeOutTimer.cancel() @@ -332,7 +334,7 @@ extends Shuffle[K, V, C] with Logging { clientSocketToTracker.close() } - return selectedSplitIndex + return selectedSplitIndices } class ShuffleTracker(outputLocs: Array[SplitInfo]) @@ -379,31 +381,29 @@ extends Shuffle[K, V, C] with Logging { // Receive intention val reducerIntention = ois.readObject.asInstanceOf[Int] - if (reducerIntention == - TrackedCustomBlockedLocalFileShuffle.ReducerEntering) { + if (reducerIntention == Shuffle.ReducerEntering) { // Receive what the reducer has val reducerSplitInfo = ois.readObject.asInstanceOf[SplitInfo] - // Select split and update stats if necessary - var selectedSplitIndex = -1 + // Select splits and update stats if necessary + var selectedSplitIndices = ArrayBuffer[Int]() trackerStrategy.synchronized { - selectedSplitIndex = trackerStrategy.selectSplit( + selectedSplitIndices = trackerStrategy.selectSplit( reducerSplitInfo) } // Send reply back - oos.writeObject(selectedSplitIndex) + oos.writeObject(selectedSplitIndices) oos.flush() // Update internal stats, only if receiver got the reply trackerStrategy.synchronized { trackerStrategy.AddReducerToSplit(reducerSplitInfo, - selectedSplitIndex) + selectedSplitIndices) } } - else if (reducerIntention == - TrackedCustomBlockedLocalFileShuffle.ReducerLeaving) { + else if (reducerIntention == Shuffle.ReducerLeaving) { val reducerSplitInfo = ois.readObject.asInstanceOf[SplitInfo] @@ -635,8 +635,7 @@ extends Shuffle[K, V, C] with Logging { try { // Send intention - oosTracker.writeObject( - TrackedCustomBlockedLocalFileShuffle.ReducerLeaving) + oosTracker.writeObject(Shuffle.ReducerLeaving) oosTracker.flush() // Send reducerSplitInfo @@ -686,10 +685,6 @@ extends Shuffle[K, V, C] with Logging { } object TrackedCustomBlockedLocalFileShuffle extends Logging { - // Tracker communication constants - val ReducerEntering = 0 - val ReducerLeaving = 1 - private var initialized = false private var nextShuffleId = new AtomicLong(0) diff --git a/src/scala/spark/TrackedCustomParallelLocalFileShuffle.scala b/src/scala/spark/TrackedCustomParallelLocalFileShuffle.scala index 3badcc4da2085d526afccd590775b4e6097cdde9..9869626e5943b45c9ad228357e7888c3682f747d 100644 --- a/src/scala/spark/TrackedCustomParallelLocalFileShuffle.scala +++ b/src/scala/spark/TrackedCustomParallelLocalFileShuffle.scala @@ -113,20 +113,23 @@ extends Shuffle[K, V, C] with Logging { while (hasSplits < totalSplits && numThreadsToCreate > 0) { // Receive which split to pull from the tracker logInfo("Talking to tracker...") - val splitIndex = getTrackerSelectedSplit(myId) - logInfo("Got %d from tracker...".format(splitIndex)) + val startTime = System.currentTimeMillis + val splitIndices = getTrackerSelectedSplit(myId) + logInfo("Got %s from tracker in %d millis".format(splitIndices, System.currentTimeMillis - startTime)) - if (splitIndex != -1) { - val selectedSplitInfo = outputLocs(splitIndex) - val requestSplit = - "%d/%d/%d".format(shuffleId, selectedSplitInfo.splitId, myId) - - threadPool.execute(new ShuffleClient(splitIndex, selectedSplitInfo, - requestSplit, myId)) - - // splitIndex is in transit. Will be unset in the ShuffleClient - splitsInRequestBitVector.synchronized { - splitsInRequestBitVector.set(splitIndex) + if (splitIndices.size > 0) { + splitIndices.foreach { splitIndex => + val selectedSplitInfo = outputLocs(splitIndex) + val requestSplit = + "%d/%d/%d".format(shuffleId, selectedSplitInfo.splitId, myId) + + threadPool.execute(new ShuffleClient(splitIndex, selectedSplitInfo, + requestSplit, myId)) + + // splitIndex is in transit. Will be unset in the ShuffleClient + splitsInRequestBitVector.synchronized { + splitsInRequestBitVector.set(splitIndex) + } } } else { // Tracker replied back with a NO. Sleep for a while. @@ -200,13 +203,13 @@ extends Shuffle[K, V, C] with Logging { } // Talks to the tracker and receives instruction - private def getTrackerSelectedSplit(myId: Int): Int = { + private def getTrackerSelectedSplit(myId: Int): ArrayBuffer[Int] = { // Local status of hasSplitsBitVector and splitsInRequestBitVector val localSplitInfo = getLocalSplitInfo(myId) // DO NOT talk to the tracker if all the required splits are already busy if (localSplitInfo.hasSplitsBitVector.cardinality == totalSplits) { - return -1 + return ArrayBuffer[Int]() } val clientSocketToTracker = new Socket(Shuffle.MasterHostAddress, @@ -217,30 +220,30 @@ extends Shuffle[K, V, C] with Logging { val oisTracker = new ObjectInputStream(clientSocketToTracker.getInputStream) - var selectedSplitIndex = -1 + var selectedSplitIndices = ArrayBuffer[Int]() // Setup the timeout mechanism var timeOutTask = new TimerTask { override def run: Unit = { logInfo("Waited enough for tracker response... Take random response...") - // sockets will be closed in finally + // sockets will be closed in finally + // TODO: Sometimes timer wont go off // TODO: Selecting randomly here. Tracker won't know about it and get an // asssertion failure when this thread leaves - selectedSplitIndex = selectRandomSplit + selectedSplitIndices = ArrayBuffer(selectRandomSplit) } } var timeOutTimer = new Timer // TODO: Which timeout to use? - timeOutTimer.schedule(timeOutTask, Shuffle.MinKnockInterval) + // timeOutTimer.schedule(timeOutTask, Shuffle.MinKnockInterval) try { // Send intention - oosTracker.writeObject( - TrackedCustomParallelLocalFileShuffle.ReducerEntering) + oosTracker.writeObject(Shuffle.ReducerEntering) oosTracker.flush() // Send what this reducer has @@ -248,7 +251,7 @@ extends Shuffle[K, V, C] with Logging { oosTracker.flush() // Receive reply from the tracker - selectedSplitIndex = oisTracker.readObject.asInstanceOf[Int] + selectedSplitIndices = oisTracker.readObject.asInstanceOf[ArrayBuffer[Int]] // Turn the timer OFF timeOutTimer.cancel() @@ -262,7 +265,7 @@ extends Shuffle[K, V, C] with Logging { clientSocketToTracker.close() } - return selectedSplitIndex + return selectedSplitIndices } class ShuffleTracker(outputLocs: Array[SplitInfo]) @@ -309,31 +312,29 @@ extends Shuffle[K, V, C] with Logging { // Receive intention val reducerIntention = ois.readObject.asInstanceOf[Int] - if (reducerIntention == - TrackedCustomParallelLocalFileShuffle.ReducerEntering) { + if (reducerIntention == Shuffle.ReducerEntering) { // Receive what the reducer has val reducerSplitInfo = ois.readObject.asInstanceOf[SplitInfo] // Select split and update stats if necessary - var selectedSplitIndex = -1 + var selectedSplitIndices = ArrayBuffer[Int]() trackerStrategy.synchronized { - selectedSplitIndex = trackerStrategy.selectSplit( + selectedSplitIndices = trackerStrategy.selectSplit( reducerSplitInfo) } // Send reply back - oos.writeObject(selectedSplitIndex) + oos.writeObject(selectedSplitIndices) oos.flush() // Update internal stats, only if receiver got the reply trackerStrategy.synchronized { trackerStrategy.AddReducerToSplit(reducerSplitInfo, - selectedSplitIndex) + selectedSplitIndices) } } - else if (reducerIntention == - TrackedCustomParallelLocalFileShuffle.ReducerLeaving) { + else if (reducerIntention == Shuffle.ReducerLeaving) { val reducerSplitInfo = ois.readObject.asInstanceOf[SplitInfo] @@ -557,8 +558,7 @@ extends Shuffle[K, V, C] with Logging { try { // Send intention - oosTracker.writeObject( - TrackedCustomParallelLocalFileShuffle.ReducerLeaving) + oosTracker.writeObject(Shuffle.ReducerLeaving) oosTracker.flush() // Send reducerSplitInfo @@ -606,10 +606,6 @@ extends Shuffle[K, V, C] with Logging { } object TrackedCustomParallelLocalFileShuffle extends Logging { - // Tracker communication constants - val ReducerEntering = 0 - val ReducerLeaving = 1 - private var initialized = false private var nextShuffleId = new AtomicLong(0)