Skip to content
Snippets Groups Projects
Commit 92d2a9a1 authored by Mosharaf Chowdhury's avatar Mosharaf Chowdhury
Browse files

Removed unnecessary stuff from HttpParallelLocalFileShuffle

parent 4ab268ee
No related branches found
No related tags found
No related merge requests found
-Dspark.shuffle.class=spark.HttpParallelLocalFileShuffle -Dspark.blockedLocalFileShuffle.maxRxConnections=2 -Dspark.blockedLocalFileShuffle.blockSize=256 -Dspark.blockedLocalFileShuffle.minKnockInterval=50 -Dspark.parallelLocalFileShuffle.maxRxConnections=2 -Dspark.parallelLocalFileShuffle.maxTxConnections=2 -Dspark.parallelLocalFileShuffle.minKnockInterval=1000 -Dspark.parallelLocalFileShuffle.maxKnockInterval=5000 -Dspark.shuffle.class=spark.HttpParallelLocalFileShuffle -Dspark.blockedLocalFileShuffle.maxRxConnections=2 -Dspark.blockedLocalFileShuffle.blockSize=256 -Dspark.blockedLocalFileShuffle.minKnockInterval=50 -Dspark.parallelLocalFileShuffle.maxRxConnections=2 -Dspark.parallelLocalFileShuffle.maxTxConnections=2 -Dspark.parallelLocalFileShuffle.minKnockInterval=50 -Dspark.parallelLocalFileShuffle.maxKnockInterval=2000
...@@ -73,14 +73,6 @@ extends Shuffle[K, V, C] with Logging { ...@@ -73,14 +73,6 @@ extends Shuffle[K, V, C] with Logging {
val writeTime = System.currentTimeMillis - writeStartTime val writeTime = System.currentTimeMillis - writeStartTime
logInfo("Writing " + file + " of size " + file.length + " bytes took " + writeTime + " millis.") logInfo("Writing " + file + " of size " + file.length + " bytes took " + writeTime + " millis.")
// Write the SPLITSIZE file
val splitSizeFile = HttpParallelLocalFileShuffle.getSplitSizeOutputFile(
shuffleId, myIndex, i)
val splitSizeOut =
new ObjectOutputStream(new FileOutputStream(splitSizeFile))
splitSizeOut.writeObject(file.length.toInt)
splitSizeOut.close()
out.close() out.close()
} }
...@@ -218,31 +210,28 @@ extends Shuffle[K, V, C] with Logging { ...@@ -218,31 +210,28 @@ extends Shuffle[K, V, C] with Logging {
override def run: Unit = { override def run: Unit = {
try { try {
// First read the SPLITSIZE file // Open connection
var requestedFileLen = -1 val urlString =
"%s/shuffle/%d/%d/%d".format(serverUri, shuffleId, inputId, myId)
val url = new URL(urlString)
val httpConnection =
url.openConnection().asInstanceOf[HttpURLConnection]
var url = "%s/shuffle/%d/%d/SPLITSIZE-%d".format(serverUri, shuffleId, // Connect to the server
inputId, myId) httpConnection.connect()
val inputStream = new ObjectInputStream(new URL(url).openStream())
try {
requestedFileLen = inputStream.readObject().asInstanceOf[Int]
} catch {
case e: EOFException => {}
}
inputStream.close()
url = "%s/shuffle/%d/%d/%d".format(serverUri, shuffleId, inputId, myId)
// Receive file length
var requestedFileLen = httpConnection.getContentLength
val readStartTime = System.currentTimeMillis val readStartTime = System.currentTimeMillis
logInfo("BEGIN READ: " + url) logInfo("BEGIN READ: " + urlString)
// Receive data in an Array[Byte] // Receive data in an Array[Byte]
var recvByteArray = new Array[Byte](requestedFileLen) var recvByteArray = new Array[Byte](requestedFileLen)
var alreadyRead = 0 var alreadyRead = 0
var bytesRead = 0 var bytesRead = 0
val isSource = new URL(url).openStream() val isSource = httpConnection.getInputStream()
while (alreadyRead != requestedFileLen) { while (alreadyRead != requestedFileLen) {
bytesRead = isSource.read(recvByteArray, alreadyRead, bytesRead = isSource.read(recvByteArray, alreadyRead,
requestedFileLen - alreadyRead) requestedFileLen - alreadyRead)
...@@ -251,6 +240,9 @@ extends Shuffle[K, V, C] with Logging { ...@@ -251,6 +240,9 @@ extends Shuffle[K, V, C] with Logging {
} }
} }
// Disconnect
httpConnection.disconnect()
// Make it available to the consumer // Make it available to the consumer
try { try {
receivedData.put((splitIndex, recvByteArray)) receivedData.put((splitIndex, recvByteArray))
...@@ -264,9 +256,9 @@ extends Shuffle[K, V, C] with Logging { ...@@ -264,9 +256,9 @@ extends Shuffle[K, V, C] with Logging {
receptionSucceeded = true receptionSucceeded = true
logInfo("END READ: " + url) logInfo("END READ: " + urlString)
val readTime = System.currentTimeMillis - readStartTime val readTime = System.currentTimeMillis - readStartTime
logInfo("Reading " + url + " took " + readTime + " millis.") logInfo("Reading " + urlString + " took " + readTime + " millis.")
} catch { } catch {
// EOFException is expected to happen because sender can break // EOFException is expected to happen because sender can break
// connection due to timeout // connection due to timeout
...@@ -380,15 +372,6 @@ object HttpParallelLocalFileShuffle extends Logging { ...@@ -380,15 +372,6 @@ object HttpParallelLocalFileShuffle extends Logging {
return file return file
} }
def getSplitSizeOutputFile(shuffleId: Long, inputId: Int,
outputId: Int): File = {
initializeIfNeeded()
val dir = new File(shuffleDir, shuffleId + "/" + inputId)
dir.mkdirs()
val file = new File(dir, "SPLITSIZE-" + outputId)
return file
}
def getServerUri(): String = { def getServerUri(): String = {
initializeIfNeeded() initializeIfNeeded()
serverUri serverUri
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment