From 1e26fb39534cc5b79c6980e0974bedbf72e19c69 Mon Sep 17 00:00:00 2001
From: Mosharaf Chowdhury <mosharaf@mosharaf-ubuntu.(none)>
Date: Thu, 30 Dec 2010 13:16:35 -0800
Subject: [PATCH] CustomBlockedLocalFileShuffle: reducers are reading multiple
 blocks per connections instead of just one. Sometimes ShuffleServer fails to
 start for small shuffle data with small block size in local VM. No problem
 with large block size.

---
 conf/java-opts                                |   2 +-
 .../spark/CustomBlockedInMemoryShuffle.scala  |   6 +-
 .../spark/CustomBlockedLocalFileShuffle.scala | 232 ++++++++++--------
 src/scala/spark/Shuffle.scala                 |   9 +
 ...TrackedCustomBlockedLocalFileShuffle.scala |   6 +-
 5 files changed, 145 insertions(+), 110 deletions(-)

diff --git a/conf/java-opts b/conf/java-opts
index 21795fbd8d..59a9d98fc9 100644
--- a/conf/java-opts
+++ b/conf/java-opts
@@ -1 +1 @@
--Dspark.shuffle.class=spark.CustomBlockedLocalFileShuffle -Dspark.shuffle.masterHostAddress=127.0.0.1 -Dspark.shuffle.masterTrackerPort=22222 -Dspark.shuffle.trackerStrategy=spark.BalanceConnectionsShuffleTrackerStrategy -Dspark.shuffle.maxRxConnections=2 -Dspark.shuffle.maxTxConnections=2 -Dspark.shuffle.blockSize=4096 -Dspark.shuffle.minKnockInterval=100 -Dspark.shuffle.maxKnockInterval=2000
+-Dspark.shuffle.class=spark.CustomBlockedLocalFileShuffle -Dspark.shuffle.masterHostAddress=127.0.0.1 -Dspark.shuffle.masterTrackerPort=22222 -Dspark.shuffle.trackerStrategy=spark.BalanceConnectionsShuffleTrackerStrategy -Dspark.shuffle.maxRxConnections=2 -Dspark.shuffle.maxTxConnections=2 -Dspark.shuffle.blockSize=256 -Dspark.shuffle.minKnockInterval=100 -Dspark.shuffle.maxKnockInterval=2000 -Dspark.shuffle.maxChatTime=50
diff --git a/src/scala/spark/CustomBlockedInMemoryShuffle.scala b/src/scala/spark/CustomBlockedInMemoryShuffle.scala
index 4a8aae5732..7f844883a5 100644
--- a/src/scala/spark/CustomBlockedInMemoryShuffle.scala
+++ b/src/scala/spark/CustomBlockedInMemoryShuffle.scala
@@ -298,6 +298,9 @@ extends Shuffle[K, V, C] with Logging {
         // Receive BLOCKNUM
         totalBlocksInSplit(splitIndex) = oisSource.readObject.asInstanceOf[Int]
 
+        // Turn the timer OFF, if the sender responds before timeout
+        timeOutTimer.cancel()
+
         // Request specific block
         oosSource.writeObject(hasBlocksInSplit(splitIndex))
         
@@ -305,9 +308,6 @@ extends Shuffle[K, V, C] with Logging {
         var requestedFileLen = oisSource.readObject.asInstanceOf[Int]
         logInfo("Received requestedFileLen = " + requestedFileLen)
 
-        // Turn the timer OFF, if the sender responds before timeout
-        timeOutTimer.cancel()
-        
         val requestSplit = "%d/%d/%d-%d".format(shuffleId, inputId, myId, 
           hasBlocksInSplit(splitIndex))
         
diff --git a/src/scala/spark/CustomBlockedLocalFileShuffle.scala b/src/scala/spark/CustomBlockedLocalFileShuffle.scala
index 2853e29cd9..96ac923b25 100644
--- a/src/scala/spark/CustomBlockedLocalFileShuffle.scala
+++ b/src/scala/spark/CustomBlockedLocalFileShuffle.scala
@@ -285,69 +285,75 @@ extends Shuffle[K, V, C] with Logging {
         // Receive BLOCKNUM
         totalBlocksInSplit(splitIndex) = oisSource.readObject.asInstanceOf[Int]
 
-        // Request specific block
-        oosSource.writeObject(hasBlocksInSplit(splitIndex))
-        
-        // Good to go. First, receive the length of the requested file
-        var requestedFileLen = oisSource.readObject.asInstanceOf[Int]
-        logInfo("Received requestedFileLen = " + requestedFileLen)
-
         // Turn the timer OFF, if the sender responds before timeout
         timeOutTimer.cancel()
-        
-        val requestPath = "%d/%d/%d-%d".format(shuffleId, inputId, myId, 
-          hasBlocksInSplit(splitIndex))
-        
-        // Receive the file
-        if (requestedFileLen != -1) {
-          val readStartTime = System.currentTimeMillis
-          logInfo("BEGIN READ: http://%s:%d/shuffle/%s".format(hostAddress, listenPort, requestPath))
-
-          // Receive data in an Array[Byte]
-          var recvByteArray = new Array[Byte](requestedFileLen)
-          var alreadyRead = 0
-          var bytesRead = 0
+
+        while (hasBlocksInSplit(splitIndex) < totalBlocksInSplit(splitIndex)) {
+          // Set receptionSucceeded to false before trying for each block
+          receptionSucceeded = false
+
+          // Request specific block
+          oosSource.writeObject(hasBlocksInSplit(splitIndex))
           
-          while (alreadyRead != requestedFileLen) {
-            bytesRead = isSource.read(recvByteArray, alreadyRead, 
-              requestedFileLen - alreadyRead)
-            if (bytesRead > 0) {
-              alreadyRead  = alreadyRead + bytesRead
-            }
-          } 
+          // Good to go. First, receive the length of the requested file
+          var requestedFileLen = oisSource.readObject.asInstanceOf[Int]
+          logInfo("Received requestedFileLen = " + requestedFileLen)
+
+          val requestPath = "%d/%d/%d-%d".format(shuffleId, inputId, myId, 
+            hasBlocksInSplit(splitIndex))
           
-          // Make it available to the consumer
-          try {
-            receivedData.put((splitIndex, recvByteArray))
-          } catch {
-            case e: Exception => {
-              logInfo("Exception during putting data into receivedData")
+          // Receive the file
+          if (requestedFileLen != -1) {
+            val readStartTime = System.currentTimeMillis
+            logInfo("BEGIN READ: http://%s:%d/shuffle/%s".format(hostAddress, listenPort, requestPath))
+
+            // Receive data in an Array[Byte]
+            var recvByteArray = new Array[Byte](requestedFileLen)
+            var alreadyRead = 0
+            var bytesRead = 0
+            
+            while (alreadyRead != requestedFileLen) {
+              bytesRead = isSource.read(recvByteArray, alreadyRead, 
+                requestedFileLen - alreadyRead)
+              if (bytesRead > 0) {
+                alreadyRead  = alreadyRead + bytesRead
+              }
+            } 
+            
+            // Make it available to the consumer
+            try {
+              receivedData.put((splitIndex, recvByteArray))
+            } catch {
+              case e: Exception => {
+                logInfo("Exception during putting data into receivedData")
+              }
             }
-          }
-                  
-          // TODO: Updating stats before consumption is completed
-          hasBlocksInSplit(splitIndex) = hasBlocksInSplit(splitIndex) + 1
-          
-          // Split has been received only if all the blocks have been received
-          if (hasBlocksInSplit(splitIndex) == totalBlocksInSplit(splitIndex)) {
-            hasSplitsBitVector.synchronized {
-              hasSplitsBitVector.set(splitIndex)
+                    
+            // TODO: Updating stats before consumption is completed
+            hasBlocksInSplit(splitIndex) = hasBlocksInSplit(splitIndex) + 1
+            
+            // Split has been received only if all the blocks have been received
+            if (hasBlocksInSplit(splitIndex) == totalBlocksInSplit(splitIndex)) {
+              hasSplitsBitVector.synchronized {
+                hasSplitsBitVector.set(splitIndex)
+              }
+              hasSplits += 1
             }
-            hasSplits += 1
-          }
 
-          // We have received splitIndex
-          splitsInRequestBitVector.synchronized {
-            splitsInRequestBitVector.set(splitIndex, false)
-          }
+            // We have received splitIndex
+            splitsInRequestBitVector.synchronized {
+              splitsInRequestBitVector.set(splitIndex, false)
+            }
 
-          receptionSucceeded = true
+            // Consistent state in accounting variables
+            receptionSucceeded = true
 
-          logInfo("END READ: http://%s:%d/shuffle/%s".format(hostAddress, listenPort, requestPath))
-          val readTime = System.currentTimeMillis - readStartTime
-          logInfo("Reading http://%s:%d/shuffle/%s".format(hostAddress, listenPort, requestPath) + " took " + readTime + " millis.")
-        } else {
+            logInfo("END READ: http://%s:%d/shuffle/%s".format(hostAddress, listenPort, requestPath))
+            val readTime = System.currentTimeMillis - readStartTime
+            logInfo("Reading http://%s:%d/shuffle/%s".format(hostAddress, listenPort, requestPath) + " took " + readTime + " millis.")
+          } else {
             throw new SparkException("ShuffleServer " + hostAddress + " does not have " + requestPath)
+          } 
         }
       } catch {
         // EOFException is expected to happen because sender can break
@@ -573,9 +579,9 @@ object CustomBlockedLocalFileShuffle extends Logging {
           val (shuffleId, myIndex, outputId) = 
             ois.readObject.asInstanceOf[(Int, Int, Int)]
             
-          var requestPath = "%d/%d/%d".format(shuffleId, myIndex, outputId)
+          var requestPathBase = "%d/%d/%d".format(shuffleId, myIndex, outputId)
 
-          logInfo("requestPath: " + requestPath)
+          logInfo("requestPathBase: " + requestPathBase)
           
           // Read BLOCKNUM file and send back the total number of blocks
           val blockNumFilePath = "%s/%d/%d/BLOCKNUM-%d".format(shuffleDir, 
@@ -586,61 +592,81 @@ object CustomBlockedLocalFileShuffle extends Logging {
           blockNumIn.close()
           
           oos.writeObject(BLOCKNUM)
-          
-          // Receive specific block request
-          val blockId = ois.readObject.asInstanceOf[Int]
-          
-          // Ready to send
-          requestPath = requestPath + "-" + blockId
-          
-          // Open the file
-          var requestedFile: File = null
-          var requestedFileLen = -1
-          try {
-            requestedFile = new File(shuffleDir + "/" + requestPath)
-            requestedFileLen = requestedFile.length.toInt
-          } catch {
-            case e: Exception => { }
-          }
-          
-          // Send the length of the requestPath to let the receiver know that 
-          // transfer is about to start
-          // In the case of receiver timeout and connection close, this will
-          // throw a java.net.SocketException: Broken pipe
-          oos.writeObject(requestedFileLen)
-          oos.flush()
-          
-          logInfo("requestedFileLen = " + requestedFileLen)
 
-          // Read and send the requested file
-          if (requestedFileLen != -1) {
-            // Read
-            var byteArray = new Array[Byte](requestedFileLen)
-            val bis = 
-              new BufferedInputStream(new FileInputStream(requestedFile))
-
-            var bytesRead = bis.read(byteArray, 0, byteArray.length)
-            var alreadyRead = bytesRead
-
-            while (alreadyRead < requestedFileLen) {
-              bytesRead = bis.read(byteArray, alreadyRead,
-                (byteArray.length - alreadyRead))
-              if(bytesRead > 0) {
-                alreadyRead = alreadyRead + bytesRead
-              }
-            }            
-            bis.close()
+          val startTime = System.currentTimeMillis
+          var curTime = startTime
+          var keepSending = true
+          var numBlocksToSend = Shuffle.MaxChatBlocks
+          
+          while (keepSending && numBlocksToSend > 0) {
+            // Receive specific block request
+            val blockId = ois.readObject.asInstanceOf[Int]
             
-            // Send
-            bos.write(byteArray, 0, byteArray.length)
-            bos.flush()
-          } else {
-            // Close the connection
+            // Ready to send
+            val requestPath = requestPathBase + "-" + blockId
+            
+            // Open the file
+            var requestedFile: File = null
+            var requestedFileLen = -1
+            try {
+              requestedFile = new File(shuffleDir + "/" + requestPath)
+              requestedFileLen = requestedFile.length.toInt
+            } catch {
+              case e: Exception => { }
+            }
+            
+            // Send the length of the requestPath to let the receiver know that 
+            // transfer is about to start
+            // In the case of receiver timeout and connection close, this will
+            // throw a java.net.SocketException: Broken pipe
+            oos.writeObject(requestedFileLen)
+            oos.flush()
+            
+            logInfo("requestedFileLen = " + requestedFileLen)
+
+            // Read and send the requested file
+            if (requestedFileLen != -1) {
+              // Read
+              var byteArray = new Array[Byte](requestedFileLen)
+              val bis = 
+                new BufferedInputStream(new FileInputStream(requestedFile))
+
+              var bytesRead = bis.read(byteArray, 0, byteArray.length)
+              var alreadyRead = bytesRead
+
+              while (alreadyRead < requestedFileLen) {
+                bytesRead = bis.read(byteArray, alreadyRead,
+                  (byteArray.length - alreadyRead))
+                if(bytesRead > 0) {
+                  alreadyRead = alreadyRead + bytesRead
+                }
+              }            
+              bis.close()
+              
+              // Send
+              bos.write(byteArray, 0, byteArray.length)
+              bos.flush()
+              
+              // Update loop variables
+              numBlocksToSend = numBlocksToSend - 1
+              
+              curTime = System.currentTimeMillis
+              // Revoke sending only if there is anyone waiting in the queue
+              if (curTime - startTime >= Shuffle.MaxChatTime &&
+                  threadPool.getQueue.size > 0) {
+                keepSending = false
+              }
+            } else {
+              // Close the connection
+            }
           }
         } catch {
           // If something went wrong, e.g., the worker at the other end died etc
           // then close everything up
           // Exception can happen if the receiver stops receiving
+          // EOFException is expected to happen because receiver can break
+          // connection as soon as it has all the blocks
+          case eofe: java.io.EOFException => { }
           case e: Exception => {
             logInfo("ShuffleServerThread had a " + e)
           }
diff --git a/src/scala/spark/Shuffle.scala b/src/scala/spark/Shuffle.scala
index 31ae097665..be78bf9a3c 100644
--- a/src/scala/spark/Shuffle.scala
+++ b/src/scala/spark/Shuffle.scala
@@ -44,6 +44,12 @@ extends Logging {
   private var MaxTxConnections_ = System.getProperty(
     "spark.shuffle.maxTxConnections", "8").toInt
 
+  // Upper limit on receiving in blocked implementations (whichever comes first)
+  private var MaxChatTime_ = System.getProperty(
+    "spark.shuffle.maxChatTime", "250").toInt
+  private var MaxChatBlocks_ = System.getProperty(
+    "spark.shuffle.maxChatBlocks", "1024").toInt
+
   def MasterHostAddress = MasterHostAddress_
   def MasterTrackerPort = MasterTrackerPort_
 
@@ -55,6 +61,9 @@ extends Logging {
   def MaxRxConnections = MaxRxConnections_
   def MaxTxConnections = MaxTxConnections_
 
+  def MaxChatTime = MaxChatTime_
+  def MaxChatBlocks = MaxChatBlocks_
+  
   // Returns a standard ThreadFactory except all threads are daemons
   private def newDaemonThreadFactory: ThreadFactory = {
     new ThreadFactory {
diff --git a/src/scala/spark/TrackedCustomBlockedLocalFileShuffle.scala b/src/scala/spark/TrackedCustomBlockedLocalFileShuffle.scala
index 156c9d4dcc..f0534bebc1 100644
--- a/src/scala/spark/TrackedCustomBlockedLocalFileShuffle.scala
+++ b/src/scala/spark/TrackedCustomBlockedLocalFileShuffle.scala
@@ -458,6 +458,9 @@ extends Shuffle[K, V, C] with Logging {
         // Receive BLOCKNUM
         totalBlocksInSplit(splitIndex) = oisSource.readObject.asInstanceOf[Int]
 
+        // Turn the timer OFF, if the sender responds before timeout
+        timeOutTimer.cancel()
+        
         // Request specific block
         oosSource.writeObject(hasBlocksInSplit(splitIndex))
         
@@ -465,9 +468,6 @@ extends Shuffle[K, V, C] with Logging {
         var requestedFileLen = oisSource.readObject.asInstanceOf[Int]
         logInfo("Received requestedFileLen = " + requestedFileLen)
 
-        // Turn the timer OFF, if the sender responds before timeout
-        timeOutTimer.cancel()
-        
         // Create a temp variable to be used in different places
         val requestPath = "http://%s:%d/shuffle/%s-%d".format(
           serversplitInfo.hostAddress, serversplitInfo.listenPort, requestSplit, 
-- 
GitLab