From 4545df67cfd32b19c1da382fb77aa0c28f41e825 Mon Sep 17 00:00:00 2001
From: Mosharaf Chowdhury <mosharaf@mosharaf-ubuntu.(none)>
Date: Thu, 30 Dec 2010 17:10:20 -0800
Subject: [PATCH] Delay consumption until everything has been received;
 otherwise it interferes with network performance.

---
 conf/java-opts                                |  2 +-
 .../spark/CustomBlockedInMemoryShuffle.scala  | 23 +++++++++++++------
 .../spark/CustomBlockedLocalFileShuffle.scala | 23 +++++++++++++------
 .../spark/CustomParallelInMemoryShuffle.scala | 23 +++++++++++++------
 .../CustomParallelLocalFileShuffle.scala      | 23 +++++++++++++------
 .../spark/HttpBlockedLocalFileShuffle.scala   | 23 +++++++++++++------
 .../spark/HttpParallelLocalFileShuffle.scala  | 23 +++++++++++++------
 .../spark/ManualBlockedLocalFileShuffle.scala | 23 +++++++++++++------
 ...TrackedCustomBlockedLocalFileShuffle.scala | 23 +++++++++++++------
 ...rackedCustomParallelLocalFileShuffle.scala | 23 +++++++++++++------
 10 files changed, 145 insertions(+), 64 deletions(-)

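Note on the new control flow (identical across all ten shuffle
implementations): the consumer is now started only after threadPool.shutdown(),
and the fetch path blocks until receivedData drains. Below is a minimal,
self-contained Scala sketch of that flow; it is not the in-tree code, and
DelayedConsumptionSketch, the simulated fetch loop, and the local
MinKnockInterval value are stand-ins for the real Shuffle members.

    import java.util.concurrent.{Executors, LinkedBlockingQueue, TimeUnit}

    object DelayedConsumptionSketch {
      // Stand-in for the spark.shuffle.minKnockInterval setting
      // (100 in conf/java-opts)
      val MinKnockInterval = 100

      def main(args: Array[String]): Unit = {
        val totalSplits = 8
        val receivedData = new LinkedBlockingQueue[(Int, Array[Byte])]

        // Phase 1: receive everything first, so nothing competes with the
        // network transfer.
        val threadPool = Executors.newFixedThreadPool(4)
        for (i <- 0 until totalSplits) {
          threadPool.execute(new Runnable {
            def run(): Unit = receivedData.put((i, new Array[Byte](1024)))
          })
        }
        threadPool.shutdown()
        threadPool.awaitTermination(1, TimeUnit.MINUTES)

        // Phase 2: start the consumer only once every split is queued,
        // mirroring the move of "Start consumer" below threadPool.shutdown().
        val shuffleConsumer = new Thread {
          override def run(): Unit = {
            while (receivedData.size > 0) {
              val (splitIndex, recvByteArray) = receivedData.take()
              // mergeCombiners would run here
              println(s"consumed split $splitIndex: ${recvByteArray.length} bytes")
            }
          }
        }
        shuffleConsumer.setDaemon(true)
        shuffleConsumer.start()

        // Don't return until consumption is finished (the polling wait the
        // in-tree TODO plans to replace with a lock).
        while (receivedData.size > 0) {
          Thread.sleep(MinKnockInterval)
        }
      }
    }

The sketch keeps the patch's known caveat: receivedData.size can reach zero
while the consumer is still merging the last split, which is exactly what the
"Replace with a lock later" TODO is about (see the latch sketch after the
patch trailer).
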
diff --git a/conf/java-opts b/conf/java-opts
index 59a9d98fc9..3bd5c79803 100644
--- a/conf/java-opts
+++ b/conf/java-opts
@@ -1 +1 @@
--Dspark.shuffle.class=spark.CustomBlockedLocalFileShuffle -Dspark.shuffle.masterHostAddress=127.0.0.1 -Dspark.shuffle.masterTrackerPort=22222 -Dspark.shuffle.trackerStrategy=spark.BalanceConnectionsShuffleTrackerStrategy -Dspark.shuffle.maxRxConnections=2 -Dspark.shuffle.maxTxConnections=2 -Dspark.shuffle.blockSize=256 -Dspark.shuffle.minKnockInterval=100 -Dspark.shuffle.maxKnockInterval=2000 -Dspark.shuffle.maxChatTime=50
+-Dspark.shuffle.class=spark.CustomBlockedLocalFileShuffle -Dspark.shuffle.masterHostAddress=127.0.0.1 -Dspark.shuffle.masterTrackerPort=22222 -Dspark.shuffle.trackerStrategy=spark.BalanceConnectionsShuffleTrackerStrategy -Dspark.shuffle.maxRxConnections=2 -Dspark.shuffle.maxTxConnections=2 -Dspark.shuffle.blockSize=4096 -Dspark.shuffle.minKnockInterval=100 -Dspark.shuffle.maxKnockInterval=2000 -Dspark.shuffle.maxChatTime=500
diff --git a/src/scala/spark/CustomBlockedInMemoryShuffle.scala b/src/scala/spark/CustomBlockedInMemoryShuffle.scala
index 7f844883a5..771255e441 100644
--- a/src/scala/spark/CustomBlockedInMemoryShuffle.scala
+++ b/src/scala/spark/CustomBlockedInMemoryShuffle.scala
@@ -162,12 +162,6 @@ extends Shuffle[K, V, C] with Logging {
       receivedData = new LinkedBlockingQueue[(Int, Array[Byte])]
       combiners = new HashMap[K, C]
       
-      // Start consumer
-      var shuffleConsumer = new ShuffleConsumer(mergeCombiners)
-      shuffleConsumer.setDaemon(true)
-      shuffleConsumer.start()
-      logInfo("ShuffleConsumer started...")
-
       var threadPool = CustomBlockedInMemoryShuffle.newDaemonFixedThreadPool(
         CustomBlockedInMemoryShuffle.MaxRxConnections)
         
@@ -200,6 +194,21 @@ extends Shuffle[K, V, C] with Logging {
       }
 
       threadPool.shutdown()
+
+      // Start consumer
+      // TODO: Consumption is delayed until everything has been received;
+      // otherwise it interferes with network performance.
+      var shuffleConsumer = new ShuffleConsumer(mergeCombiners)
+      shuffleConsumer.setDaemon(true)
+      shuffleConsumer.start()
+      logInfo("ShuffleConsumer started...")
+
+      // Don't return until consumption is finished
+      // TODO: Replace with a lock later.
+      while (receivedData.size > 0) {
+        Thread.sleep(CustomBlockedInMemoryShuffle.MinKnockInterval)
+      }
+      
       combiners
     })
   }
@@ -227,7 +236,7 @@ extends Shuffle[K, V, C] with Logging {
   extends Thread with Logging {   
     override def run: Unit = {
       // Run until all splits are here
-      while (hasSplits < totalSplits) {
+      while (receivedData.size > 0) {
         var splitIndex = -1
         var recvByteArray: Array[Byte] = null
       
diff --git a/src/scala/spark/CustomBlockedLocalFileShuffle.scala b/src/scala/spark/CustomBlockedLocalFileShuffle.scala
index 96ac923b25..220289b007 100644
--- a/src/scala/spark/CustomBlockedLocalFileShuffle.scala
+++ b/src/scala/spark/CustomBlockedLocalFileShuffle.scala
@@ -148,12 +148,6 @@ extends Shuffle[K, V, C] with Logging {
       receivedData = new LinkedBlockingQueue[(Int, Array[Byte])]
       combiners = new HashMap[K, C]
       
-      // Start consumer
-      var shuffleConsumer = new ShuffleConsumer(mergeCombiners)
-      shuffleConsumer.setDaemon(true)
-      shuffleConsumer.start()
-      logInfo("ShuffleConsumer started...")
-
       var threadPool = CustomBlockedLocalFileShuffle.newDaemonFixedThreadPool(
         CustomBlockedLocalFileShuffle.MaxRxConnections)
         
@@ -187,6 +181,21 @@ extends Shuffle[K, V, C] with Logging {
       }
 
       threadPool.shutdown()
+
+      // Start consumer
+      // TODO: Consumption is delayed until everything has been received;
+      // otherwise it interferes with network performance.
+      var shuffleConsumer = new ShuffleConsumer(mergeCombiners)
+      shuffleConsumer.setDaemon(true)
+      shuffleConsumer.start()
+      logInfo("ShuffleConsumer started...")
+
+      // Don't return until consumption is finished
+      // TODO: Replace with a lock later.
+      while (receivedData.size > 0) {
+        Thread.sleep(CustomBlockedLocalFileShuffle.MinKnockInterval)
+      }
+      
       combiners
     })
   }
@@ -214,7 +223,7 @@ extends Shuffle[K, V, C] with Logging {
   extends Thread with Logging {   
     override def run: Unit = {
       // Run until all splits are here
-      while (hasSplits < totalSplits) {
+      while (receivedData.size > 0) {
         var splitIndex = -1
         var recvByteArray: Array[Byte] = null
       
diff --git a/src/scala/spark/CustomParallelInMemoryShuffle.scala b/src/scala/spark/CustomParallelInMemoryShuffle.scala
index 6cb7d64046..4806e14e80 100644
--- a/src/scala/spark/CustomParallelInMemoryShuffle.scala
+++ b/src/scala/spark/CustomParallelInMemoryShuffle.scala
@@ -108,12 +108,6 @@ extends Shuffle[K, V, C] with Logging {
       var threadPool = CustomParallelInMemoryShuffle.newDaemonFixedThreadPool(
         CustomParallelInMemoryShuffle.MaxRxConnections)
         
-      // Start consumer
-      var shuffleConsumer = new ShuffleConsumer(mergeCombiners)
-      shuffleConsumer.setDaemon(true)
-      shuffleConsumer.start()
-      logInfo("ShuffleConsumer started...")
-        
       while (hasSplits < totalSplits) {
         var numThreadsToCreate = Math.min(totalSplits, 
           CustomParallelInMemoryShuffle.MaxRxConnections) - 
@@ -144,6 +138,21 @@ extends Shuffle[K, V, C] with Logging {
       }
       
       threadPool.shutdown()
+
+      // Start consumer
+      // TODO: Consumption is delayed until everything has been received;
+      // otherwise it interferes with network performance.
+      var shuffleConsumer = new ShuffleConsumer(mergeCombiners)
+      shuffleConsumer.setDaemon(true)
+      shuffleConsumer.start()
+      logInfo("ShuffleConsumer started...")
+
+      // Don't return until consumption is finished
+      // TODO: Replace with a lock later.
+      while (receivedData.size > 0) {
+        Thread.sleep(CustomParallelInMemoryShuffle.MinKnockInterval)
+      }
+      
       combiners
     })
   }
@@ -171,7 +180,7 @@ extends Shuffle[K, V, C] with Logging {
   extends Thread with Logging {   
     override def run: Unit = {
       // Run until all splits are here
-      while (hasSplits < totalSplits) {
+      while (receivedData.size > 0) {
         var splitIndex = -1
         var recvByteArray: Array[Byte] = null
       
diff --git a/src/scala/spark/CustomParallelLocalFileShuffle.scala b/src/scala/spark/CustomParallelLocalFileShuffle.scala
index 0d17ace438..45f629e414 100644
--- a/src/scala/spark/CustomParallelLocalFileShuffle.scala
+++ b/src/scala/spark/CustomParallelLocalFileShuffle.scala
@@ -99,12 +99,6 @@ extends Shuffle[K, V, C] with Logging {
       var threadPool = CustomParallelLocalFileShuffle.newDaemonFixedThreadPool(
         CustomParallelLocalFileShuffle.MaxRxConnections)
         
-      // Start consumer
-      var shuffleConsumer = new ShuffleConsumer(mergeCombiners)
-      shuffleConsumer.setDaemon(true)
-      shuffleConsumer.start()
-      logInfo("ShuffleConsumer started...")
-        
       while (hasSplits < totalSplits) {
         var numThreadsToCreate = Math.min(totalSplits, 
           CustomParallelLocalFileShuffle.MaxRxConnections) - 
@@ -135,6 +129,21 @@ extends Shuffle[K, V, C] with Logging {
       }
       
       threadPool.shutdown()
+
+      // Start consumer
+      // TODO: Consumption is delayed until everything has been received;
+      // otherwise it interferes with network performance.
+      var shuffleConsumer = new ShuffleConsumer(mergeCombiners)
+      shuffleConsumer.setDaemon(true)
+      shuffleConsumer.start()
+      logInfo("ShuffleConsumer started...")
+
+      // Don't return until consumption is finished
+      // TODO: Replace with a lock later.
+      while (receivedData.size > 0) {
+        Thread.sleep(CustomParallelLocalFileShuffle.MinKnockInterval)
+      }
+      
       combiners
     })
   }
@@ -162,7 +171,7 @@ extends Shuffle[K, V, C] with Logging {
   extends Thread with Logging {   
     override def run: Unit = {
       // Run until all splits are here
-      while (hasSplits < totalSplits) {
+      while (receivedData.size > 0) {
         var splitIndex = -1
         var recvByteArray: Array[Byte] = null
       
diff --git a/src/scala/spark/HttpBlockedLocalFileShuffle.scala b/src/scala/spark/HttpBlockedLocalFileShuffle.scala
index 0e32a983b8..e6579fc5aa 100644
--- a/src/scala/spark/HttpBlockedLocalFileShuffle.scala
+++ b/src/scala/spark/HttpBlockedLocalFileShuffle.scala
@@ -132,12 +132,6 @@ extends Shuffle[K, V, C] with Logging {
       receivedData = new LinkedBlockingQueue[(Int, Array[Byte])]      
       combiners = new HashMap[K, C]
       
-      // Start consumer
-      var shuffleConsumer = new ShuffleConsumer(mergeCombiners)
-      shuffleConsumer.setDaemon(true)
-      shuffleConsumer.start()
-      logInfo("ShuffleConsumer started...")
-
       var threadPool = HttpBlockedLocalFileShuffle.newDaemonFixedThreadPool(
         HttpBlockedLocalFileShuffle.MaxRxConnections)
         
@@ -170,6 +164,21 @@ extends Shuffle[K, V, C] with Logging {
       }
 
       threadPool.shutdown()
+
+      // Start consumer
+      // TODO: Consumption is delayed until everything has been received;
+      // otherwise it interferes with network performance.
+      var shuffleConsumer = new ShuffleConsumer(mergeCombiners)
+      shuffleConsumer.setDaemon(true)
+      shuffleConsumer.start()
+      logInfo("ShuffleConsumer started...")
+
+      // Don't return until consumption is finished
+      // TODO: Replace with a lock later.
+      while (receivedData.size > 0) {
+        Thread.sleep(HttpBlockedLocalFileShuffle.MinKnockInterval)
+      }
+      
       combiners
     })
   }
@@ -197,7 +206,7 @@ extends Shuffle[K, V, C] with Logging {
   extends Thread with Logging {   
     override def run: Unit = {
       // Run until all splits are here
-      while (hasSplits < totalSplits) {
+      while (receivedData.size > 0) {
         var splitIndex = -1
         var recvByteArray: Array[Byte] = null
       
diff --git a/src/scala/spark/HttpParallelLocalFileShuffle.scala b/src/scala/spark/HttpParallelLocalFileShuffle.scala
index ca6a3c2cd0..b6ddee6d2f 100644
--- a/src/scala/spark/HttpParallelLocalFileShuffle.scala
+++ b/src/scala/spark/HttpParallelLocalFileShuffle.scala
@@ -96,12 +96,6 @@ extends Shuffle[K, V, C] with Logging {
       var threadPool = HttpParallelLocalFileShuffle.newDaemonFixedThreadPool(
         HttpParallelLocalFileShuffle.MaxRxConnections)
         
-      // Start consumer
-      var shuffleConsumer = new ShuffleConsumer(mergeCombiners)
-      shuffleConsumer.setDaemon(true)
-      shuffleConsumer.start()
-      logInfo("ShuffleConsumer started...")
-
       while (hasSplits < totalSplits) {
         var numThreadsToCreate =
           Math.min(totalSplits, HttpParallelLocalFileShuffle.MaxRxConnections) -
@@ -131,6 +125,21 @@ extends Shuffle[K, V, C] with Logging {
       }
 
       threadPool.shutdown()
+
+      // Start consumer
+      // TODO: Consumption is delayed until everything has been received;
+      // otherwise it interferes with network performance.
+      var shuffleConsumer = new ShuffleConsumer(mergeCombiners)
+      shuffleConsumer.setDaemon(true)
+      shuffleConsumer.start()
+      logInfo("ShuffleConsumer started...")
+
+      // Don't return until consumption is finished
+      // TODO: Replace with a lock later.
+      while (receivedData.size > 0) {
+        Thread.sleep(HttpParallelLocalFileShuffle.MinKnockInterval)
+      }
+      
       combiners
     })
   }
@@ -158,7 +167,7 @@ extends Shuffle[K, V, C] with Logging {
   extends Thread with Logging {   
     override def run: Unit = {
       // Run until all splits are here
-      while (hasSplits < totalSplits) {
+      while (receivedData.size > 0) {
         var splitIndex = -1
         var recvByteArray: Array[Byte] = null
       
diff --git a/src/scala/spark/ManualBlockedLocalFileShuffle.scala b/src/scala/spark/ManualBlockedLocalFileShuffle.scala
index ce20c0143a..13b90ce40f 100644
--- a/src/scala/spark/ManualBlockedLocalFileShuffle.scala
+++ b/src/scala/spark/ManualBlockedLocalFileShuffle.scala
@@ -139,12 +139,6 @@ extends Shuffle[K, V, C] with Logging {
       receivedData = new LinkedBlockingQueue[(Int, Array[Byte])]
       combiners = new HashMap[K, C]
       
-      // Start consumer
-      var shuffleConsumer = new ShuffleConsumer(mergeCombiners)
-      shuffleConsumer.setDaemon(true)
-      shuffleConsumer.start()
-      logInfo("ShuffleConsumer started...")
-
       var threadPool = ManualBlockedLocalFileShuffle.newDaemonFixedThreadPool(
         ManualBlockedLocalFileShuffle.MaxRxConnections)
         
@@ -177,6 +171,21 @@ extends Shuffle[K, V, C] with Logging {
       }
 
       threadPool.shutdown()
+
+      // Start consumer
+      // TODO: Consumption is delayed until everything has been received;
+      // otherwise it interferes with network performance.
+      var shuffleConsumer = new ShuffleConsumer(mergeCombiners)
+      shuffleConsumer.setDaemon(true)
+      shuffleConsumer.start()
+      logInfo("ShuffleConsumer started...")
+
+      // Don't return until consumption is finished
+      // TODO: Replace with a lock later.
+      while (receivedData.size > 0) {
+        Thread.sleep(ManualBlockedLocalFileShuffle.MinKnockInterval)
+      }
+      
       combiners
     })
   }
@@ -204,7 +213,7 @@ extends Shuffle[K, V, C] with Logging {
   extends Thread with Logging {   
     override def run: Unit = {
       // Run until all splits are here
-      while (hasSplits < totalSplits) {
+      while (receivedData.size > 0) {
         var splitIndex = -1
         var recvByteArray: Array[Byte] = null
       
diff --git a/src/scala/spark/TrackedCustomBlockedLocalFileShuffle.scala b/src/scala/spark/TrackedCustomBlockedLocalFileShuffle.scala
index f0534bebc1..936e8bee48 100644
--- a/src/scala/spark/TrackedCustomBlockedLocalFileShuffle.scala
+++ b/src/scala/spark/TrackedCustomBlockedLocalFileShuffle.scala
@@ -149,12 +149,6 @@ extends Shuffle[K, V, C] with Logging {
       receivedData = new LinkedBlockingQueue[(Int, Array[Byte])]
       combiners = new HashMap[K, C]
       
-      // Start consumer
-      var shuffleConsumer = new ShuffleConsumer(mergeCombiners)
-      shuffleConsumer.setDaemon(true)
-      shuffleConsumer.start()
-      logInfo("ShuffleConsumer started...")
-
       var threadPool = Shuffle.newDaemonFixedThreadPool(
         Shuffle.MaxRxConnections)
         
@@ -189,6 +183,21 @@ extends Shuffle[K, V, C] with Logging {
       }
 
       threadPool.shutdown()
+
+      // Start consumer
+      // TODO: Consumption is delayed until everything has been received;
+      // otherwise it interferes with network performance.
+      var shuffleConsumer = new ShuffleConsumer(mergeCombiners)
+      shuffleConsumer.setDaemon(true)
+      shuffleConsumer.start()
+      logInfo("ShuffleConsumer started...")
+
+      // Don't return until consumption is finished
+      // TODO: Replace with a lock later.
+      while (receivedData.size > 0) {
+        Thread.sleep(Shuffle.MinKnockInterval)
+      }
+      
       combiners
     })
   }
@@ -384,7 +393,7 @@ extends Shuffle[K, V, C] with Logging {
   extends Thread with Logging {   
     override def run: Unit = {
       // Run until all splits are here
-      while (hasSplits < totalSplits) {
+      while (receivedData.size > 0) {
         var splitIndex = -1
         var recvByteArray: Array[Byte] = null
       
diff --git a/src/scala/spark/TrackedCustomParallelLocalFileShuffle.scala b/src/scala/spark/TrackedCustomParallelLocalFileShuffle.scala
index 52b52e3600..54b4e8e130 100644
--- a/src/scala/spark/TrackedCustomParallelLocalFileShuffle.scala
+++ b/src/scala/spark/TrackedCustomParallelLocalFileShuffle.scala
@@ -99,12 +99,6 @@ extends Shuffle[K, V, C] with Logging {
       var threadPool = 
         Shuffle.newDaemonFixedThreadPool(Shuffle.MaxRxConnections)
         
-      // Start consumer
-      var shuffleConsumer = new ShuffleConsumer(mergeCombiners)
-      shuffleConsumer.setDaemon(true)
-      shuffleConsumer.start()
-      logInfo("ShuffleConsumer started...")
-        
       while (hasSplits < totalSplits) {
         var numThreadsToCreate = 
           Math.min(totalSplits, Shuffle.MaxRxConnections) - 
@@ -136,6 +130,21 @@ extends Shuffle[K, V, C] with Logging {
       }
       
       threadPool.shutdown()
+
+      // Start consumer
+      // TODO: Consumption is delayed until everything has been received;
+      // otherwise it interferes with network performance.
+      var shuffleConsumer = new ShuffleConsumer(mergeCombiners)
+      shuffleConsumer.setDaemon(true)
+      shuffleConsumer.start()
+      logInfo("ShuffleConsumer started...")
+
+      // Don't return until consumption is finished
+      // TODO: Replace with a lock later.
+      while (receivedData.size > 0) {
+        Thread.sleep(Shuffle.MinKnockInterval)
+      }
+      
       combiners
     })
   }
@@ -332,7 +341,7 @@ extends Shuffle[K, V, C] with Logging {
   extends Thread with Logging {
     override def run: Unit = {
       // Run until all splits are here
-      while (hasSplits < totalSplits) {
+      while (receivedData.size > 0) {
         var splitIndex = -1
         var recvByteArray: Array[Byte] = null
       
-- 
GitLab
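
One possible shape for the "Replace with a lock later" TODO, sketched with a
hypothetical CountDownLatch-based helper (consumeAll and its parameters are
illustrative, not in-tree): counting the latch down after each merge lets the
caller block without polling, and it closes the window where the queue is
already empty but the last merge is still running.

    import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue}

    object LatchSketch {
      // Hypothetical replacement for the polling wait: the caller blocks on a
      // latch that the consumer counts down after merging each split.
      def consumeAll(receivedData: LinkedBlockingQueue[(Int, Array[Byte])],
                     totalSplits: Int): Unit = {
        val done = new CountDownLatch(totalSplits)
        val consumer = new Thread {
          override def run(): Unit = {
            for (_ <- 0 until totalSplits) {
              val (splitIndex, recvByteArray) = receivedData.take()
              // mergeCombiners would run here, before the countDown
              done.countDown()
            }
          }
        }
        consumer.setDaemon(true)
        consumer.start()
        done.await() // returns only after the last split has been merged
      }
    }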