From 92d2a9a13a944747ef4c04158f4f78f7a9e3d405 Mon Sep 17 00:00:00 2001 From: Mosharaf Chowdhury <mosharaf@mosharaf-ubuntu.(none)> Date: Wed, 22 Dec 2010 11:28:50 -0800 Subject: [PATCH] Removed unnecessary stuff from HttpParallelLocalFileShuffle --- conf/java-opts | 2 +- .../spark/HttpParallelLocalFileShuffle.scala | 53 +++++++------------ 2 files changed, 19 insertions(+), 36 deletions(-) diff --git a/conf/java-opts b/conf/java-opts index 7f64da5b86..b851798d09 100644 --- a/conf/java-opts +++ b/conf/java-opts @@ -1 +1 @@ --Dspark.shuffle.class=spark.HttpParallelLocalFileShuffle -Dspark.blockedLocalFileShuffle.maxRxConnections=2 -Dspark.blockedLocalFileShuffle.blockSize=256 -Dspark.blockedLocalFileShuffle.minKnockInterval=50 -Dspark.parallelLocalFileShuffle.maxRxConnections=2 -Dspark.parallelLocalFileShuffle.maxTxConnections=2 -Dspark.parallelLocalFileShuffle.minKnockInterval=1000 -Dspark.parallelLocalFileShuffle.maxKnockInterval=5000 +-Dspark.shuffle.class=spark.HttpParallelLocalFileShuffle -Dspark.blockedLocalFileShuffle.maxRxConnections=2 -Dspark.blockedLocalFileShuffle.blockSize=256 -Dspark.blockedLocalFileShuffle.minKnockInterval=50 -Dspark.parallelLocalFileShuffle.maxRxConnections=2 -Dspark.parallelLocalFileShuffle.maxTxConnections=2 -Dspark.parallelLocalFileShuffle.minKnockInterval=50 -Dspark.parallelLocalFileShuffle.maxKnockInterval=2000 diff --git a/src/scala/spark/HttpParallelLocalFileShuffle.scala b/src/scala/spark/HttpParallelLocalFileShuffle.scala index b9a9a8af2a..629cb075ea 100644 --- a/src/scala/spark/HttpParallelLocalFileShuffle.scala +++ b/src/scala/spark/HttpParallelLocalFileShuffle.scala @@ -73,14 +73,6 @@ extends Shuffle[K, V, C] with Logging { val writeTime = System.currentTimeMillis - writeStartTime logInfo("Writing " + file + " of size " + file.length + " bytes took " + writeTime + " millis.") - // Write the SPLITSIZE file - val splitSizeFile = HttpParallelLocalFileShuffle.getSplitSizeOutputFile( - shuffleId, myIndex, i) - val splitSizeOut = - 
new ObjectOutputStream(new FileOutputStream(splitSizeFile)) - splitSizeOut.writeObject(file.length.toInt) - - splitSizeOut.close() out.close() } @@ -218,31 +210,28 @@ extends Shuffle[K, V, C] with Logging { override def run: Unit = { try { - // First read the SPLITSIZE file - var requestedFileLen = -1 + // Open connection + val urlString = + "%s/shuffle/%d/%d/%d".format(serverUri, shuffleId, inputId, myId) + val url = new URL(urlString) + val httpConnection = + url.openConnection().asInstanceOf[HttpURLConnection] - var url = "%s/shuffle/%d/%d/SPLITSIZE-%d".format(serverUri, shuffleId, - inputId, myId) - val inputStream = new ObjectInputStream(new URL(url).openStream()) - - try { - requestedFileLen = inputStream.readObject().asInstanceOf[Int] - } catch { - case e: EOFException => {} - } - inputStream.close() - - url = "%s/shuffle/%d/%d/%d".format(serverUri, shuffleId, inputId, myId) + // Connect to the server + httpConnection.connect() + // Receive file length + var requestedFileLen = httpConnection.getContentLength + val readStartTime = System.currentTimeMillis - logInfo("BEGIN READ: " + url) + logInfo("BEGIN READ: " + urlString) // Receive data in an Array[Byte] var recvByteArray = new Array[Byte](requestedFileLen) var alreadyRead = 0 var bytesRead = 0 - val isSource = new URL(url).openStream() + val isSource = httpConnection.getInputStream() while (alreadyRead != requestedFileLen) { bytesRead = isSource.read(recvByteArray, alreadyRead, requestedFileLen - alreadyRead) @@ -251,6 +240,9 @@ extends Shuffle[K, V, C] with Logging { } } + // Disconnect + httpConnection.disconnect() + // Make it available to the consumer try { receivedData.put((splitIndex, recvByteArray)) @@ -264,9 +256,9 @@ extends Shuffle[K, V, C] with Logging { receptionSucceeded = true - logInfo("END READ: " + url) + logInfo("END READ: " + urlString) val readTime = System.currentTimeMillis - readStartTime - logInfo("Reading " + url + " took " + readTime + " millis.") + logInfo("Reading " + urlString 
+ " took " + readTime + " millis.") } catch { // EOFException is expected to happen because sender can break // connection due to timeout @@ -380,15 +372,6 @@ object HttpParallelLocalFileShuffle extends Logging { return file } - def getSplitSizeOutputFile(shuffleId: Long, inputId: Int, - outputId: Int): File = { - initializeIfNeeded() - val dir = new File(shuffleDir, shuffleId + "/" + inputId) - dir.mkdirs() - val file = new File(dir, "SPLITSIZE-" + outputId) - return file - } - def getServerUri(): String = { initializeIfNeeded() serverUri -- GitLab