From f6c447f87592286a6f58aee5e0b2dc8dcb470d0c Mon Sep 17 00:00:00 2001
From: Evan Racah <ejracah@gmail.com>
Date: Wed, 2 Sep 2015 22:13:18 -0700
Subject: [PATCH] Removed code duplication in ShuffleBlockFetcherIterator

Added fetchUpToMaxBytes() to prevent having to update both code blocks when a change is made.

Author: Evan Racah <ejracah@gmail.com>

Closes #8514 from eracah/master.
---
 .../storage/ShuffleBlockFetcherIterator.scala  | 18 ++++++++++--------
 1 file changed, 10 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index a759ceb96e..0d0448feb5 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -260,10 +260,7 @@ final class ShuffleBlockFetcherIterator(
     fetchRequests ++= Utils.randomize(remoteRequests)
 
     // Send out initial requests for blocks, up to our maxBytesInFlight
-    while (fetchRequests.nonEmpty &&
-      (bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) {
-      sendRequest(fetchRequests.dequeue())
-    }
+    fetchUpToMaxBytes()
 
     val numFetches = remoteRequests.size - fetchRequests.size
     logInfo("Started " + numFetches + " remote fetches in" + Utils.getUsedTimeMs(startTime))
@@ -296,10 +293,7 @@ final class ShuffleBlockFetcherIterator(
       case _ =>
     }
     // Send fetch requests up to maxBytesInFlight
-    while (fetchRequests.nonEmpty &&
-      (bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) {
-      sendRequest(fetchRequests.dequeue())
-    }
+    fetchUpToMaxBytes()
 
     result match {
       case FailureFetchResult(blockId, address, e) =>
@@ -315,6 +309,14 @@ final class ShuffleBlockFetcherIterator(
     }
   }
 
+  private def fetchUpToMaxBytes(): Unit = {
+    // Send fetch requests up to maxBytesInFlight
+    while (fetchRequests.nonEmpty &&
+      (bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) {
+      sendRequest(fetchRequests.dequeue())
+    }
+  }
+
   private def throwFetchFailedException(blockId: BlockId, address: BlockManagerId, e: Throwable) = {
     blockId match {
       case ShuffleBlockId(shufId, mapId, reduceId) =>
-- 
GitLab