Skip to content
Snippets Groups Projects
Commit f6c447f8 authored by Evan Racah's avatar Evan Racah Committed by Andrew Or
Browse files

Removed code duplication in ShuffleBlockFetcherIterator

Added fetchUpToMaxBytes() to prevent having to update both code blocks when a change is made.

Author: Evan Racah <ejracah@gmail.com>

Closes #8514 from eracah/master.
parent 0985d2c3
No related branches found
No related tags found
No related merge requests found
...@@ -260,10 +260,7 @@ final class ShuffleBlockFetcherIterator( ...@@ -260,10 +260,7 @@ final class ShuffleBlockFetcherIterator(
fetchRequests ++= Utils.randomize(remoteRequests) fetchRequests ++= Utils.randomize(remoteRequests)
// Send out initial requests for blocks, up to our maxBytesInFlight // Send out initial requests for blocks, up to our maxBytesInFlight
while (fetchRequests.nonEmpty && fetchUpToMaxBytes()
(bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) {
sendRequest(fetchRequests.dequeue())
}
val numFetches = remoteRequests.size - fetchRequests.size val numFetches = remoteRequests.size - fetchRequests.size
logInfo("Started " + numFetches + " remote fetches in" + Utils.getUsedTimeMs(startTime)) logInfo("Started " + numFetches + " remote fetches in" + Utils.getUsedTimeMs(startTime))
...@@ -296,10 +293,7 @@ final class ShuffleBlockFetcherIterator( ...@@ -296,10 +293,7 @@ final class ShuffleBlockFetcherIterator(
case _ => case _ =>
} }
// Send fetch requests up to maxBytesInFlight // Send fetch requests up to maxBytesInFlight
while (fetchRequests.nonEmpty && fetchUpToMaxBytes()
(bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) {
sendRequest(fetchRequests.dequeue())
}
result match { result match {
case FailureFetchResult(blockId, address, e) => case FailureFetchResult(blockId, address, e) =>
...@@ -315,6 +309,14 @@ final class ShuffleBlockFetcherIterator( ...@@ -315,6 +309,14 @@ final class ShuffleBlockFetcherIterator(
} }
} }
private def fetchUpToMaxBytes(): Unit = {
// Send fetch requests up to maxBytesInFlight
while (fetchRequests.nonEmpty &&
(bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) {
sendRequest(fetchRequests.dequeue())
}
}
private def throwFetchFailedException(blockId: BlockId, address: BlockManagerId, e: Throwable) = { private def throwFetchFailedException(blockId: BlockId, address: BlockManagerId, e: Throwable) = {
blockId match { blockId match {
case ShuffleBlockId(shufId, mapId, reduceId) => case ShuffleBlockId(shufId, mapId, reduceId) =>
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment