Commit 1d62f8ac authored by Wenchen Fan

[SPARK-19659][CORE][FOLLOW-UP] Fetch big blocks to disk when shuffle-read

## What changes were proposed in this pull request?

This PR includes some minor improvements to the comments and tests from https://github.com/apache/spark/pull/16989.
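
As context, a minimal self-contained sketch of the behavior that #16989 introduced and this follow-up documents: a fetch request whose size exceeds `maxReqSizeShuffleToMem` is written to disk instead of memory. The object and method names below are illustrative, not Spark's; only the decision rule and the values used by the test below (threshold 200, block sizes 100 and 300) come from the patch.

```scala
object FetchToDiskSketch {
  final case class FetchRequest(blockId: String, size: Long)

  // The core rule: oversized requests go to disk, the rest stay in memory.
  def fetchesToDisk(req: FetchRequest, maxReqSizeShuffleToMem: Long): Boolean =
    req.size > maxReqSizeShuffleToMem

  def main(args: Array[String]): Unit = {
    val threshold = 200L
    println(fetchesToDisk(FetchRequest("shuffle_0_0_0", 100L), threshold)) // false: memory
    println(fetchesToDisk(FetchRequest("shuffle_0_1_0", 300L), threshold)) // true: disk
  }
}
```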

## How was this patch tested?

N/A

Author: Wenchen Fan <wenchen@databricks.com>

Closes #18117 from cloud-fan/follow.
parent 4af37812
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala:

```diff
@@ -214,11 +214,12 @@ final class ShuffleBlockFetcherIterator(
         }
       }
 
-    // Shuffle remote blocks to disk when the request is too large.
-    // TODO: Encryption and compression should be considered.
+    // Fetch remote shuffle blocks to disk when the request is too large. Since the shuffle data is
+    // already encrypted and compressed over the wire(w.r.t. the related configs), we can just fetch
+    // the data and write it to file directly.
     if (req.size > maxReqSizeShuffleToMem) {
-      val shuffleFiles = blockIds.map {
-        bId => blockManager.diskBlockManager.createTempLocalBlock()._2
+      val shuffleFiles = blockIds.map { _ =>
+        blockManager.diskBlockManager.createTempLocalBlock()._2
       }.toArray
       shuffleFilesSet ++= shuffleFiles
       shuffleClient.fetchBlocks(address.host, address.port, address.executorId, blockIds.toArray,
```
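
A self-contained sketch of the pattern in the hunk above: one fresh temp file is allocated per requested block, and the closure deliberately ignores the block id (hence `_ =>` instead of the old `bId =>`). `createTempLocalBlock` here is a made-up stand-in for `DiskBlockManager.createTempLocalBlock()`, including its naming scheme.

```scala
import java.io.File
import java.util.UUID

object TempFilePerBlockSketch {
  // Stand-in for DiskBlockManager.createTempLocalBlock(): returns an id plus a
  // not-yet-created file for it. The "temp_local_" prefix is an assumption.
  def createTempLocalBlock(dir: File): (String, File) = {
    val id = s"temp_local_${UUID.randomUUID()}"
    (id, new File(dir, id))
  }

  def main(args: Array[String]): Unit = {
    val dir = new File(System.getProperty("java.io.tmpdir"))
    val blockIds = Seq("shuffle_0_0_0", "shuffle_0_1_0", "shuffle_0_2_0")
    // Each block gets its own temp file; the block id itself is unused.
    val shuffleFiles = blockIds.map { _ => createTempLocalBlock(dir)._2 }.toArray
    shuffleFiles.foreach(f => println(f.getPath))
  }
}
```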
core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala:

```diff
@@ -36,6 +36,7 @@ import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
 import org.apache.spark.network.shuffle.BlockFetchingListener
 import org.apache.spark.network.util.LimitedInputStream
 import org.apache.spark.shuffle.FetchFailedException
+import org.apache.spark.util.Utils
 
 class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodTester {
```
```diff
@@ -420,9 +421,10 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
     doReturn(localBmId).when(blockManager).blockManagerId
 
     val diskBlockManager = mock(classOf[DiskBlockManager])
+    val tmpDir = Utils.createTempDir()
     doReturn{
-      var blockId = new TempLocalBlockId(UUID.randomUUID())
-      (blockId, new File(blockId.name))
+      val blockId = TempLocalBlockId(UUID.randomUUID())
+      (blockId, new File(tmpDir, blockId.name))
     }.when(diskBlockManager).createTempLocalBlock()
     doReturn(diskBlockManager).when(blockManager).diskBlockManager
 
```
```diff
@@ -443,34 +445,34 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
       }
     })
 
+    def fetchShuffleBlock(blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])]): Unit = {
+      // Set `maxBytesInFlight` and `maxReqsInFlight` to `Int.MaxValue`, so that during the
+      // construction of `ShuffleBlockFetcherIterator`, all requests to fetch remote shuffle blocks
+      // are issued. The `maxReqSizeShuffleToMem` is hard-coded as 200 here.
+      new ShuffleBlockFetcherIterator(
+        TaskContext.empty(),
+        transfer,
+        blockManager,
+        blocksByAddress,
+        (_, in) => in,
+        maxBytesInFlight = Int.MaxValue,
+        maxReqsInFlight = Int.MaxValue,
+        maxReqSizeShuffleToMem = 200,
+        detectCorrupt = true)
+    }
+
     val blocksByAddress1 = Seq[(BlockManagerId, Seq[(BlockId, Long)])](
       (remoteBmId, remoteBlocks.keys.map(blockId => (blockId, 100L)).toSeq))
-    // Set maxReqSizeShuffleToMem to be 200.
-    val iterator1 = new ShuffleBlockFetcherIterator(
-      TaskContext.empty(),
-      transfer,
-      blockManager,
-      blocksByAddress1,
-      (_, in) => in,
-      Int.MaxValue,
-      Int.MaxValue,
-      200,
-      true)
+    fetchShuffleBlock(blocksByAddress1)
+    // `maxReqSizeShuffleToMem` is 200, which is greater than the block size 100, so don't fetch
+    // shuffle block to disk.
     assert(shuffleFiles === null)
 
     val blocksByAddress2 = Seq[(BlockManagerId, Seq[(BlockId, Long)])](
       (remoteBmId, remoteBlocks.keys.map(blockId => (blockId, 300L)).toSeq))
-    // Set maxReqSizeShuffleToMem to be 200.
-    val iterator2 = new ShuffleBlockFetcherIterator(
-      TaskContext.empty(),
-      transfer,
-      blockManager,
-      blocksByAddress2,
-      (_, in) => in,
-      Int.MaxValue,
-      Int.MaxValue,
-      200,
-      true)
+    fetchShuffleBlock(blocksByAddress2)
+    // `maxReqSizeShuffleToMem` is 200, which is smaller than the block size 300, so fetch
+    // shuffle block to disk.
     assert(shuffleFiles != null)
   }
 }
```
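
The test refactor above collapses two near-identical constructor calls into one local helper, with named arguments flagging the hard-coded values. A tiny self-contained illustration of that dedup pattern; all names and values here are made up to mirror the test.

```scala
object HelperDedupSketch {
  final case class Fetcher(blockSize: Long, maxReqSizeShuffleToMem: Long) {
    def fetchesToDisk: Boolean = blockSize > maxReqSizeShuffleToMem
  }

  def main(args: Array[String]): Unit = {
    // The helper pins the shared configuration once; each scenario states only
    // what varies, and the named argument documents the magic number.
    def fetchShuffleBlock(blockSize: Long): Fetcher =
      Fetcher(blockSize, maxReqSizeShuffleToMem = 200L)

    assert(!fetchShuffleBlock(100L).fetchesToDisk) // 100 <= 200: stays in memory
    assert(fetchShuffleBlock(300L).fetchesToDisk)  // 300 > 200: spills to disk
    println("ok")
  }
}
```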