Skip to content
Snippets Groups Projects
Commit c4c8f72e authored by Mosharaf Chowdhury's avatar Mosharaf Chowdhury
Browse files

Fixed an indexing bug in HttpBlockedLocalFileShuffle. It still doesn't work on...

Fixed an indexing bug in HttpBlockedLocalFileShuffle. It still doesn't work on EC2 with >5 nodes cluster.
parent a5a8b704
No related branches found
No related tags found
No related merge requests found
-Dspark.shuffle.class=spark.CustomBlockedLocalFileShuffle -Dspark.blockedLocalFileShuffle.maxRxConnections=2 -Dspark.blockedLocalFileShuffle.blockSize=256 -Dspark.blockedLocalFileShuffle.minKnockInterval=50 -Dspark.parallelLocalFileShuffle.maxRxConnections=2 -Dspark.parallelLocalFileShuffle.maxTxConnections=2 -Dspark.parallelLocalFileShuffle.minKnockInterval=50 -Dspark.parallelLocalFileShuffle.maxKnockInterval=2000 -Dspark.shuffle.class=spark.HttpBlockedLocalFileShuffle -Dspark.blockedLocalFileShuffle.maxRxConnections=2 -Dspark.blockedLocalFileShuffle.blockSize=256 -Dspark.blockedLocalFileShuffle.minKnockInterval=50 -Dspark.parallelLocalFileShuffle.maxRxConnections=2 -Dspark.parallelLocalFileShuffle.maxTxConnections=2 -Dspark.parallelLocalFileShuffle.minKnockInterval=50 -Dspark.parallelLocalFileShuffle.maxKnockInterval=2000
...@@ -33,7 +33,7 @@ extends Shuffle[K, V, C] with Logging { ...@@ -33,7 +33,7 @@ extends Shuffle[K, V, C] with Logging {
@transient var hasSplitsBitVector: BitSet = null @transient var hasSplitsBitVector: BitSet = null
@transient var splitsInRequestBitVector: BitSet = null @transient var splitsInRequestBitVector: BitSet = null
@transient var receivedData: LinkedBlockingQueue[(Int, Int, Array[Byte])] = null @transient var receivedData: LinkedBlockingQueue[(Int, Array[Byte])] = null
@transient var combiners: HashMap[K,C] = null @transient var combiners: HashMap[K,C] = null
override def compute(input: RDD[(K, V)], override def compute(input: RDD[(K, V)],
...@@ -130,7 +130,7 @@ extends Shuffle[K, V, C] with Logging { ...@@ -130,7 +130,7 @@ extends Shuffle[K, V, C] with Logging {
hasSplitsBitVector = new BitSet(totalSplits) hasSplitsBitVector = new BitSet(totalSplits)
splitsInRequestBitVector = new BitSet(totalSplits) splitsInRequestBitVector = new BitSet(totalSplits)
receivedData = new LinkedBlockingQueue[(Int, Int, Array[Byte])] receivedData = new LinkedBlockingQueue[(Int, Array[Byte])]
combiners = new HashMap[K, C] combiners = new HashMap[K, C]
// Start consumer // Start consumer
...@@ -199,16 +199,13 @@ extends Shuffle[K, V, C] with Logging { ...@@ -199,16 +199,13 @@ extends Shuffle[K, V, C] with Logging {
override def run: Unit = { override def run: Unit = {
// Run until all splits are here // Run until all splits are here
while (hasSplits < totalSplits) { while (hasSplits < totalSplits) {
var inputId = -1
var splitIndex = -1 var splitIndex = -1
var recvByteArray: Array[Byte] = null var recvByteArray: Array[Byte] = null
try { try {
var tempTuple = var tempPair = receivedData.take().asInstanceOf[(Int, Array[Byte])]
receivedData.take().asInstanceOf[(Int, Int, Array[Byte])] splitIndex = tempPair._1
inputId = tempTuple._1 recvByteArray = tempPair._2
splitIndex = tempTuple._2
recvByteArray = tempTuple._3
} catch { } catch {
case e: Exception => { case e: Exception => {
logInfo("Exception during taking data from receivedData") logInfo("Exception during taking data from receivedData")
...@@ -232,10 +229,10 @@ extends Shuffle[K, V, C] with Logging { ...@@ -232,10 +229,10 @@ extends Shuffle[K, V, C] with Logging {
inputStream.close() inputStream.close()
// Consumption completed. Update stats. // Consumption completed. Update stats.
hasBlocksInSplit(inputId) = hasBlocksInSplit(inputId) + 1 hasBlocksInSplit(splitIndex) = hasBlocksInSplit(splitIndex) + 1
// Split has been received only if all the blocks have been received // Split has been received only if all the blocks have been received
if (hasBlocksInSplit(inputId) == totalBlocksInSplit(inputId)) { if (hasBlocksInSplit(splitIndex) == totalBlocksInSplit(splitIndex)) {
hasSplitsBitVector.synchronized { hasSplitsBitVector.synchronized {
hasSplitsBitVector.set(splitIndex) hasSplitsBitVector.set(splitIndex)
} }
...@@ -257,22 +254,22 @@ extends Shuffle[K, V, C] with Logging { ...@@ -257,22 +254,22 @@ extends Shuffle[K, V, C] with Logging {
override def run: Unit = { override def run: Unit = {
try { try {
// First get the INDEX file if totalBlocksInSplit(inputId) is unknown // First get the INDEX file if totalBlocksInSplit(splitIndex) is unknown
if (totalBlocksInSplit(inputId) == -1) { if (totalBlocksInSplit(splitIndex) == -1) {
val url = "%s/shuffle/%d/%d/INDEX-%d".format(serverUri, shuffleId, val url = "%s/shuffle/%d/%d/INDEX-%d".format(serverUri, shuffleId,
inputId, myId) inputId, myId)
val inputStream = new ObjectInputStream(new URL(url).openStream()) val inputStream = new ObjectInputStream(new URL(url).openStream())
try { try {
while (true) { while (true) {
blocksInSplit(inputId) += blocksInSplit(splitIndex) +=
inputStream.readObject().asInstanceOf[Long] inputStream.readObject().asInstanceOf[Long]
} }
} catch { } catch {
case e: EOFException => {} case e: EOFException => {}
} }
totalBlocksInSplit(inputId) = blocksInSplit(inputId).size totalBlocksInSplit(splitIndex) = blocksInSplit(splitIndex).size
inputStream.close() inputStream.close()
} }
...@@ -284,11 +281,11 @@ extends Shuffle[K, V, C] with Logging { ...@@ -284,11 +281,11 @@ extends Shuffle[K, V, C] with Logging {
url.openConnection().asInstanceOf[HttpURLConnection] url.openConnection().asInstanceOf[HttpURLConnection]
// Set the range to download // Set the range to download
val blockStartsAt = hasBlocksInSplit(inputId) match { val blockStartsAt = hasBlocksInSplit(splitIndex) match {
case 0 => 0 case 0 => 0
case _ => blocksInSplit(inputId)(hasBlocksInSplit(inputId) - 1) + 1 case _ => blocksInSplit(splitIndex)(hasBlocksInSplit(splitIndex) - 1) + 1
} }
val blockEndsAt = blocksInSplit(inputId)(hasBlocksInSplit(inputId)) val blockEndsAt = blocksInSplit(splitIndex)(hasBlocksInSplit(splitIndex))
httpConnection.setRequestProperty("Range", httpConnection.setRequestProperty("Range",
"bytes=" + blockStartsAt + "-" + blockEndsAt) "bytes=" + blockStartsAt + "-" + blockEndsAt)
...@@ -320,7 +317,7 @@ extends Shuffle[K, V, C] with Logging { ...@@ -320,7 +317,7 @@ extends Shuffle[K, V, C] with Logging {
// Make it available to the consumer // Make it available to the consumer
try { try {
receivedData.put((inputId, splitIndex, recvByteArray)) receivedData.put((splitIndex, recvByteArray))
} catch { } catch {
case e: Exception => { case e: Exception => {
logInfo("Exception during putting data into receivedData") logInfo("Exception during putting data into receivedData")
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment