diff --git a/core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala index 5add4fc433fb3c272d25daed79e8b37ef12d567e..e3113205bebdc95c8939e1b4f54b848753f692f3 100644 --- a/core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala +++ b/core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala @@ -95,16 +95,21 @@ final class NioBlockTransferService(conf: SparkConf, securityManager: SecurityMa future.onSuccess { case message => val bufferMessage = message.asInstanceOf[BufferMessage] val blockMessageArray = BlockMessageArray.fromBufferMessage(bufferMessage) - - for (blockMessage <- blockMessageArray) { - if (blockMessage.getType != BlockMessage.TYPE_GOT_BLOCK) { - listener.onBlockFetchFailure( - new SparkException(s"Unexpected message ${blockMessage.getType} received from $cmId")) - } else { - val blockId = blockMessage.getId - val networkSize = blockMessage.getData.limit() - listener.onBlockFetchSuccess( - blockId.toString, new NioByteBufferManagedBuffer(blockMessage.getData)) + // SPARK-4064: In some cases(eg. Remote block was removed) blockMessageArray may be empty. + if (blockMessageArray.isEmpty) { + listener.onBlockFetchFailure( + new SparkException(s"Received empty message from $cmId")) + } else { + for (blockMessage <- blockMessageArray) { + val msgType = blockMessage.getType + if (msgType != BlockMessage.TYPE_GOT_BLOCK) { + listener.onBlockFetchFailure( + new SparkException(s"Unexpected message ${msgType} received from $cmId")) + } else { + val blockId = blockMessage.getId + listener.onBlockFetchSuccess( + blockId.toString, new NioByteBufferManagedBuffer(blockMessage.getData)) + } } } }(cm.futureExecContext)