diff --git a/conf/java-opts b/conf/java-opts index 59a9d98fc93b6ca28e14b1dd0ff9bc15c2442607..3bd5c7980388b73bc87c1e372e2e7f045b7968e8 100644 --- a/conf/java-opts +++ b/conf/java-opts @@ -1 +1 @@ --Dspark.shuffle.class=spark.CustomBlockedLocalFileShuffle -Dspark.shuffle.masterHostAddress=127.0.0.1 -Dspark.shuffle.masterTrackerPort=22222 -Dspark.shuffle.trackerStrategy=spark.BalanceConnectionsShuffleTrackerStrategy -Dspark.shuffle.maxRxConnections=2 -Dspark.shuffle.maxTxConnections=2 -Dspark.shuffle.blockSize=256 -Dspark.shuffle.minKnockInterval=100 -Dspark.shuffle.maxKnockInterval=2000 -Dspark.shuffle.maxChatTime=50 +-Dspark.shuffle.class=spark.CustomBlockedLocalFileShuffle -Dspark.shuffle.masterHostAddress=127.0.0.1 -Dspark.shuffle.masterTrackerPort=22222 -Dspark.shuffle.trackerStrategy=spark.BalanceConnectionsShuffleTrackerStrategy -Dspark.shuffle.maxRxConnections=2 -Dspark.shuffle.maxTxConnections=2 -Dspark.shuffle.blockSize=4096 -Dspark.shuffle.minKnockInterval=100 -Dspark.shuffle.maxKnockInterval=2000 -Dspark.shuffle.maxChatTime=500 diff --git a/src/scala/spark/CustomBlockedInMemoryShuffle.scala b/src/scala/spark/CustomBlockedInMemoryShuffle.scala index 7f844883a545b839cf01f51f5eece03fed67ee5e..771255e4414b5c39850e06038edce160c91b9b5c 100644 --- a/src/scala/spark/CustomBlockedInMemoryShuffle.scala +++ b/src/scala/spark/CustomBlockedInMemoryShuffle.scala @@ -162,12 +162,6 @@ extends Shuffle[K, V, C] with Logging { receivedData = new LinkedBlockingQueue[(Int, Array[Byte])] combiners = new HashMap[K, C] - // Start consumer - var shuffleConsumer = new ShuffleConsumer(mergeCombiners) - shuffleConsumer.setDaemon(true) - shuffleConsumer.start() - logInfo("ShuffleConsumer started...") - var threadPool = CustomBlockedInMemoryShuffle.newDaemonFixedThreadPool( CustomBlockedInMemoryShuffle.MaxRxConnections) @@ -200,6 +194,21 @@ extends Shuffle[K, V, C] with Logging { } threadPool.shutdown() + + // Start consumer + // TODO: Consumption is delayed until everything has been 
received. + // Otherwise it interferes with network performance + var shuffleConsumer = new ShuffleConsumer(mergeCombiners) + shuffleConsumer.setDaemon(true) + shuffleConsumer.start() + logInfo("ShuffleConsumer started...") + + // Don't return until consumption is finished + // TODO: Replace with a lock later. + while (receivedData.size > 0) { + Thread.sleep(CustomBlockedInMemoryShuffle.MinKnockInterval) + } + combiners }) } @@ -227,7 +236,7 @@ extends Shuffle[K, V, C] with Logging { extends Thread with Logging { override def run: Unit = { // Run until all splits are here - while (hasSplits < totalSplits) { + while (receivedData.size > 0) { var splitIndex = -1 var recvByteArray: Array[Byte] = null diff --git a/src/scala/spark/CustomBlockedLocalFileShuffle.scala b/src/scala/spark/CustomBlockedLocalFileShuffle.scala index 96ac923b25b35a10cc1a46f7c84fd5252ec6c364..220289b007a8ad24313e95e3beb6ce066ab22936 100644 --- a/src/scala/spark/CustomBlockedLocalFileShuffle.scala +++ b/src/scala/spark/CustomBlockedLocalFileShuffle.scala @@ -148,12 +148,6 @@ extends Shuffle[K, V, C] with Logging { receivedData = new LinkedBlockingQueue[(Int, Array[Byte])] combiners = new HashMap[K, C] - // Start consumer - var shuffleConsumer = new ShuffleConsumer(mergeCombiners) - shuffleConsumer.setDaemon(true) - shuffleConsumer.start() - logInfo("ShuffleConsumer started...") - var threadPool = CustomBlockedLocalFileShuffle.newDaemonFixedThreadPool( CustomBlockedLocalFileShuffle.MaxRxConnections) @@ -187,6 +181,21 @@ extends Shuffle[K, V, C] with Logging { } threadPool.shutdown() + + // Start consumer + // TODO: Consumption is delayed until everything has been received. + // Otherwise it interferes with network performance + var shuffleConsumer = new ShuffleConsumer(mergeCombiners) + shuffleConsumer.setDaemon(true) + shuffleConsumer.start() + logInfo("ShuffleConsumer started...") + + // Don't return until consumption is finished + // TODO: Replace with a lock later. 
+ while (receivedData.size > 0) { + Thread.sleep(CustomBlockedLocalFileShuffle.MinKnockInterval) + } + combiners }) } @@ -214,7 +223,7 @@ extends Shuffle[K, V, C] with Logging { extends Thread with Logging { override def run: Unit = { // Run until all splits are here - while (hasSplits < totalSplits) { + while (receivedData.size > 0) { var splitIndex = -1 var recvByteArray: Array[Byte] = null diff --git a/src/scala/spark/CustomParallelInMemoryShuffle.scala b/src/scala/spark/CustomParallelInMemoryShuffle.scala index 6cb7d64046cd77194bc8eaf866fe968e63bead83..4806e14e80e7ab2f0d713859de2b1abd18de20ff 100644 --- a/src/scala/spark/CustomParallelInMemoryShuffle.scala +++ b/src/scala/spark/CustomParallelInMemoryShuffle.scala @@ -108,12 +108,6 @@ extends Shuffle[K, V, C] with Logging { var threadPool = CustomParallelInMemoryShuffle.newDaemonFixedThreadPool( CustomParallelInMemoryShuffle.MaxRxConnections) - // Start consumer - var shuffleConsumer = new ShuffleConsumer(mergeCombiners) - shuffleConsumer.setDaemon(true) - shuffleConsumer.start() - logInfo("ShuffleConsumer started...") - while (hasSplits < totalSplits) { var numThreadsToCreate = Math.min(totalSplits, CustomParallelInMemoryShuffle.MaxRxConnections) - @@ -144,6 +138,21 @@ extends Shuffle[K, V, C] with Logging { } threadPool.shutdown() + + // Start consumer + // TODO: Consumption is delayed until everything has been received. + // Otherwise it interferes with network performance + var shuffleConsumer = new ShuffleConsumer(mergeCombiners) + shuffleConsumer.setDaemon(true) + shuffleConsumer.start() + logInfo("ShuffleConsumer started...") + + // Don't return until consumption is finished + // TODO: Replace with a lock later. 
+ while (receivedData.size > 0) { + Thread.sleep(CustomParallelInMemoryShuffle.MinKnockInterval) + } + combiners }) } @@ -171,7 +180,7 @@ extends Shuffle[K, V, C] with Logging { extends Thread with Logging { override def run: Unit = { // Run until all splits are here - while (hasSplits < totalSplits) { + while (receivedData.size > 0) { var splitIndex = -1 var recvByteArray: Array[Byte] = null diff --git a/src/scala/spark/CustomParallelLocalFileShuffle.scala b/src/scala/spark/CustomParallelLocalFileShuffle.scala index 0d17ace4389f92169d464185b57046940764b5ae..45f629e414d277cab5ba662b4d1897317c6ab19a 100644 --- a/src/scala/spark/CustomParallelLocalFileShuffle.scala +++ b/src/scala/spark/CustomParallelLocalFileShuffle.scala @@ -99,12 +99,6 @@ extends Shuffle[K, V, C] with Logging { var threadPool = CustomParallelLocalFileShuffle.newDaemonFixedThreadPool( CustomParallelLocalFileShuffle.MaxRxConnections) - // Start consumer - var shuffleConsumer = new ShuffleConsumer(mergeCombiners) - shuffleConsumer.setDaemon(true) - shuffleConsumer.start() - logInfo("ShuffleConsumer started...") - while (hasSplits < totalSplits) { var numThreadsToCreate = Math.min(totalSplits, CustomParallelLocalFileShuffle.MaxRxConnections) - @@ -135,6 +129,21 @@ extends Shuffle[K, V, C] with Logging { } threadPool.shutdown() + + // Start consumer + // TODO: Consumption is delayed until everything has been received. + // Otherwise it interferes with network performance + var shuffleConsumer = new ShuffleConsumer(mergeCombiners) + shuffleConsumer.setDaemon(true) + shuffleConsumer.start() + logInfo("ShuffleConsumer started...") + + // Don't return until consumption is finished + // TODO: Replace with a lock later. 
+ while (receivedData.size > 0) { + Thread.sleep(CustomParallelLocalFileShuffle.MinKnockInterval) + } + combiners }) } @@ -162,7 +171,7 @@ extends Shuffle[K, V, C] with Logging { extends Thread with Logging { override def run: Unit = { // Run until all splits are here - while (hasSplits < totalSplits) { + while (receivedData.size > 0) { var splitIndex = -1 var recvByteArray: Array[Byte] = null diff --git a/src/scala/spark/HttpBlockedLocalFileShuffle.scala b/src/scala/spark/HttpBlockedLocalFileShuffle.scala index 0e32a983b8105edbc7921990bbf6ea7613ad0d03..e6579fc5aa4e2fec6723e24445a202f43e57e002 100644 --- a/src/scala/spark/HttpBlockedLocalFileShuffle.scala +++ b/src/scala/spark/HttpBlockedLocalFileShuffle.scala @@ -132,12 +132,6 @@ extends Shuffle[K, V, C] with Logging { receivedData = new LinkedBlockingQueue[(Int, Array[Byte])] combiners = new HashMap[K, C] - // Start consumer - var shuffleConsumer = new ShuffleConsumer(mergeCombiners) - shuffleConsumer.setDaemon(true) - shuffleConsumer.start() - logInfo("ShuffleConsumer started...") - var threadPool = HttpBlockedLocalFileShuffle.newDaemonFixedThreadPool( HttpBlockedLocalFileShuffle.MaxRxConnections) @@ -170,6 +164,21 @@ extends Shuffle[K, V, C] with Logging { } threadPool.shutdown() + + // Start consumer + // TODO: Consumption is delayed until everything has been received. + // Otherwise it interferes with network performance + var shuffleConsumer = new ShuffleConsumer(mergeCombiners) + shuffleConsumer.setDaemon(true) + shuffleConsumer.start() + logInfo("ShuffleConsumer started...") + + // Don't return until consumption is finished + // TODO: Replace with a lock later. 
+ while (receivedData.size > 0) { + Thread.sleep(HttpBlockedLocalFileShuffle.MinKnockInterval) + } + combiners }) } @@ -197,7 +206,7 @@ extends Shuffle[K, V, C] with Logging { extends Thread with Logging { override def run: Unit = { // Run until all splits are here - while (hasSplits < totalSplits) { + while (receivedData.size > 0) { var splitIndex = -1 var recvByteArray: Array[Byte] = null diff --git a/src/scala/spark/HttpParallelLocalFileShuffle.scala b/src/scala/spark/HttpParallelLocalFileShuffle.scala index ca6a3c2cd0359217bf4d9983fbfc204e676ba6ff..b6ddee6d2fc32e8dca46abaaf5a12720000150d6 100644 --- a/src/scala/spark/HttpParallelLocalFileShuffle.scala +++ b/src/scala/spark/HttpParallelLocalFileShuffle.scala @@ -96,12 +96,6 @@ extends Shuffle[K, V, C] with Logging { var threadPool = HttpParallelLocalFileShuffle.newDaemonFixedThreadPool( HttpParallelLocalFileShuffle.MaxRxConnections) - // Start consumer - var shuffleConsumer = new ShuffleConsumer(mergeCombiners) - shuffleConsumer.setDaemon(true) - shuffleConsumer.start() - logInfo("ShuffleConsumer started...") - while (hasSplits < totalSplits) { var numThreadsToCreate = Math.min(totalSplits, HttpParallelLocalFileShuffle.MaxRxConnections) - @@ -131,6 +125,21 @@ extends Shuffle[K, V, C] with Logging { } threadPool.shutdown() + + // Start consumer + // TODO: Consumption is delayed until everything has been received. + // Otherwise it interferes with network performance + var shuffleConsumer = new ShuffleConsumer(mergeCombiners) + shuffleConsumer.setDaemon(true) + shuffleConsumer.start() + logInfo("ShuffleConsumer started...") + + // Don't return until consumption is finished + // TODO: Replace with a lock later. 
+ while (receivedData.size > 0) { + Thread.sleep(HttpParallelLocalFileShuffle.MinKnockInterval) + } + combiners }) } @@ -158,7 +167,7 @@ extends Shuffle[K, V, C] with Logging { extends Thread with Logging { override def run: Unit = { // Run until all splits are here - while (hasSplits < totalSplits) { + while (receivedData.size > 0) { var splitIndex = -1 var recvByteArray: Array[Byte] = null diff --git a/src/scala/spark/ManualBlockedLocalFileShuffle.scala b/src/scala/spark/ManualBlockedLocalFileShuffle.scala index ce20c0143afce5cdedbe0a0fe5301139ceca1e7e..13b90ce40ff2ebd6ab7015822747cb816972ea9d 100644 --- a/src/scala/spark/ManualBlockedLocalFileShuffle.scala +++ b/src/scala/spark/ManualBlockedLocalFileShuffle.scala @@ -139,12 +139,6 @@ extends Shuffle[K, V, C] with Logging { receivedData = new LinkedBlockingQueue[(Int, Array[Byte])] combiners = new HashMap[K, C] - // Start consumer - var shuffleConsumer = new ShuffleConsumer(mergeCombiners) - shuffleConsumer.setDaemon(true) - shuffleConsumer.start() - logInfo("ShuffleConsumer started...") - var threadPool = ManualBlockedLocalFileShuffle.newDaemonFixedThreadPool( ManualBlockedLocalFileShuffle.MaxRxConnections) @@ -177,6 +171,21 @@ extends Shuffle[K, V, C] with Logging { } threadPool.shutdown() + + // Start consumer + // TODO: Consumption is delayed until everything has been received. + // Otherwise it interferes with network performance + var shuffleConsumer = new ShuffleConsumer(mergeCombiners) + shuffleConsumer.setDaemon(true) + shuffleConsumer.start() + logInfo("ShuffleConsumer started...") + + // Don't return until consumption is finished + // TODO: Replace with a lock later. 
+ while (receivedData.size > 0) { + Thread.sleep(ManualBlockedLocalFileShuffle.MinKnockInterval) + } + combiners }) } @@ -204,7 +213,7 @@ extends Shuffle[K, V, C] with Logging { extends Thread with Logging { override def run: Unit = { // Run until all splits are here - while (hasSplits < totalSplits) { + while (receivedData.size > 0) { var splitIndex = -1 var recvByteArray: Array[Byte] = null diff --git a/src/scala/spark/TrackedCustomBlockedLocalFileShuffle.scala b/src/scala/spark/TrackedCustomBlockedLocalFileShuffle.scala index f0534bebc11ad12e996e18e8e7783288bce1aec2..936e8bee48fa7a2bf86176239377e0c12640bfd7 100644 --- a/src/scala/spark/TrackedCustomBlockedLocalFileShuffle.scala +++ b/src/scala/spark/TrackedCustomBlockedLocalFileShuffle.scala @@ -149,12 +149,6 @@ extends Shuffle[K, V, C] with Logging { receivedData = new LinkedBlockingQueue[(Int, Array[Byte])] combiners = new HashMap[K, C] - // Start consumer - var shuffleConsumer = new ShuffleConsumer(mergeCombiners) - shuffleConsumer.setDaemon(true) - shuffleConsumer.start() - logInfo("ShuffleConsumer started...") - var threadPool = Shuffle.newDaemonFixedThreadPool( Shuffle.MaxRxConnections) @@ -189,6 +183,21 @@ extends Shuffle[K, V, C] with Logging { } threadPool.shutdown() + + // Start consumer + // TODO: Consumption is delayed until everything has been received. + // Otherwise it interferes with network performance + var shuffleConsumer = new ShuffleConsumer(mergeCombiners) + shuffleConsumer.setDaemon(true) + shuffleConsumer.start() + logInfo("ShuffleConsumer started...") + + // Don't return until consumption is finished + // TODO: Replace with a lock later. 
+ while (receivedData.size > 0) { + Thread.sleep(CustomBlockedLocalFileShuffle.MinKnockInterval) + } + combiners }) } @@ -384,7 +393,7 @@ extends Shuffle[K, V, C] with Logging { extends Thread with Logging { override def run: Unit = { // Run until all splits are here - while (hasSplits < totalSplits) { + while (receivedData.size > 0) { var splitIndex = -1 var recvByteArray: Array[Byte] = null diff --git a/src/scala/spark/TrackedCustomParallelLocalFileShuffle.scala b/src/scala/spark/TrackedCustomParallelLocalFileShuffle.scala index 52b52e36008e7c364d5a074fdbc16f4f3ae8d36a..54b4e8e13006302276e469cc843fda611ac947b7 100644 --- a/src/scala/spark/TrackedCustomParallelLocalFileShuffle.scala +++ b/src/scala/spark/TrackedCustomParallelLocalFileShuffle.scala @@ -99,12 +99,6 @@ extends Shuffle[K, V, C] with Logging { var threadPool = Shuffle.newDaemonFixedThreadPool(Shuffle.MaxRxConnections) - // Start consumer - var shuffleConsumer = new ShuffleConsumer(mergeCombiners) - shuffleConsumer.setDaemon(true) - shuffleConsumer.start() - logInfo("ShuffleConsumer started...") - while (hasSplits < totalSplits) { var numThreadsToCreate = Math.min(totalSplits, Shuffle.MaxRxConnections) - @@ -136,6 +130,21 @@ extends Shuffle[K, V, C] with Logging { } threadPool.shutdown() + + // Start consumer + // TODO: Consumption is delayed until everything has been received. + // Otherwise it interferes with network performance + var shuffleConsumer = new ShuffleConsumer(mergeCombiners) + shuffleConsumer.setDaemon(true) + shuffleConsumer.start() + logInfo("ShuffleConsumer started...") + + // Don't return until consumption is finished + // TODO: Replace with a lock later. 
+ while (receivedData.size > 0) { + Thread.sleep(CustomBlockedLocalFileShuffle.MinKnockInterval) + } + combiners }) } @@ -332,7 +341,7 @@ extends Shuffle[K, V, C] with Logging { extends Thread with Logging { override def run: Unit = { // Run until all splits are here - while (hasSplits < totalSplits) { + while (receivedData.size > 0) { var splitIndex = -1 var recvByteArray: Array[Byte] = null