diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala index 576ef63dbfb3d42a421705bf9660da8714ae0203..6c568cc2b0914e6d90fbbbc83cb8974dfa493419 100644 --- a/core/src/main/scala/spark/storage/BlockManager.scala +++ b/core/src/main/scala/spark/storage/BlockManager.scala @@ -411,8 +411,10 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m var bytesInFlight = 0L def sendRequest(req: FetchRequest) { + logDebug("Sending request for %d blocks (%s) from %s".format( + req.blocks.size, Utils.memoryBytesToString(req.size), req.address.ip)) val cmId = new ConnectionManagerId(req.address.ip, req.address.port) - val blockMessageArray = new BlockMessageArray(req.blocks.map{ + val blockMessageArray = new BlockMessageArray(req.blocks.map { case (blockId, size) => BlockMessage.fromGetBlock(GetBlock(blockId)) }) bytesInFlight += req.size @@ -450,10 +452,11 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m localBlockIds ++= blockInfos.map(_._1) } else { remoteBlockIds ++= blockInfos.map(_._1) - // Make our requests at least maxBytesInFlight / 4 in length; the reason to keep them - // smaller than maxBytesInFlight is to allow multiple, parallel fetches from up to 4 + // Make our requests at least maxBytesInFlight / 5 in length; the reason to keep them + // smaller than maxBytesInFlight is to allow multiple, parallel fetches from up to 5 // nodes, rather than blocking on reading output from one node. - val minRequestSize = math.max(maxBytesInFlight / 4, 1L) + val minRequestSize = math.max(maxBytesInFlight / 5, 1L) + logInfo("maxBytesInFlight: " + maxBytesInFlight + ", minRequest: " + minRequestSize) val iterator = blockInfos.iterator var curRequestSize = 0L var curBlocks = new ArrayBuffer[(String, Long)]