diff --git a/conf/java-opts b/conf/java-opts index fc6064d109c98c35708e7ce6c0145425c7aa07d9..21795fbd8d3b676ee2989d77dbc654ad14c7b7b7 100644 --- a/conf/java-opts +++ b/conf/java-opts @@ -1 +1 @@ --Dspark.shuffle.class=spark.TrackedCustomBlockedLocalFileShuffle -Dspark.shuffle.masterHostAddress=127.0.0.1 -Dspark.shuffle.masterTrackerPort=22222 -Dspark.shuffle.trackerStrategy=spark.BalanceConnectionsShuffleTrackerStrategy -Dspark.shuffle.maxRxConnections=2 -Dspark.shuffle.maxTxConnections=2 -Dspark.shuffle.blockSize=4096 -Dspark.shuffle.minKnockInterval=100 -Dspark.shuffle.maxKnockInterval=2000 +-Dspark.shuffle.class=spark.CustomBlockedLocalFileShuffle -Dspark.shuffle.masterHostAddress=127.0.0.1 -Dspark.shuffle.masterTrackerPort=22222 -Dspark.shuffle.trackerStrategy=spark.BalanceConnectionsShuffleTrackerStrategy -Dspark.shuffle.maxRxConnections=2 -Dspark.shuffle.maxTxConnections=2 -Dspark.shuffle.blockSize=4096 -Dspark.shuffle.minKnockInterval=100 -Dspark.shuffle.maxKnockInterval=2000 diff --git a/src/scala/spark/CustomBlockedInMemoryShuffle.scala b/src/scala/spark/CustomBlockedInMemoryShuffle.scala index 66827078281c5fe74e552391ff4c670211c3d852..4a8aae57325b3983f9434e16029eb5a86883bf87 100644 --- a/src/scala/spark/CustomBlockedInMemoryShuffle.scala +++ b/src/scala/spark/CustomBlockedInMemoryShuffle.scala @@ -256,22 +256,6 @@ extends Shuffle[K, V, C] with Logging { case e: EOFException => { } } inputStream.close() - - // Consumption completed. Update stats. - hasBlocksInSplit(splitIndex) = hasBlocksInSplit(splitIndex) + 1 - - // Split has been received only if all the blocks have been received - if (hasBlocksInSplit(splitIndex) == totalBlocksInSplit(splitIndex)) { - hasSplitsBitVector.synchronized { - hasSplitsBitVector.set(splitIndex) - } - hasSplits += 1 - } - - // We have received splitIndex - splitsInRequestBitVector.synchronized { - splitsInRequestBitVector.set(splitIndex, false) - } } } } @@ -354,7 +338,21 @@ extends Shuffle[K, V, C] with Logging { } } - // NOTE: Update of bitVectors are now done by the consumer + // TODO: Updating stats before consumption is completed + hasBlocksInSplit(splitIndex) = hasBlocksInSplit(splitIndex) + 1 + + // Split has been received only if all the blocks have been received + if (hasBlocksInSplit(splitIndex) == totalBlocksInSplit(splitIndex)) { + hasSplitsBitVector.synchronized { + hasSplitsBitVector.set(splitIndex) + } + hasSplits += 1 + } + + // We have received splitIndex + splitsInRequestBitVector.synchronized { + splitsInRequestBitVector.set(splitIndex, false) + } receptionSucceeded = true diff --git a/src/scala/spark/CustomBlockedLocalFileShuffle.scala b/src/scala/spark/CustomBlockedLocalFileShuffle.scala index e40e8fccb7e23b5802578c70b46eeaa0e527dbab..2853e29cd9a82954db7176a81105d0e63a090c4b 100644 --- a/src/scala/spark/CustomBlockedLocalFileShuffle.scala +++ b/src/scala/spark/CustomBlockedLocalFileShuffle.scala @@ -243,22 +243,6 @@ extends Shuffle[K, V, C] with Logging { case e: EOFException => { } } inputStream.close() - - // Consumption completed. Update stats. - hasBlocksInSplit(splitIndex) = hasBlocksInSplit(splitIndex) + 1 - - // Split has been received only if all the blocks have been received - if (hasBlocksInSplit(splitIndex) == totalBlocksInSplit(splitIndex)) { - hasSplitsBitVector.synchronized { - hasSplitsBitVector.set(splitIndex) - } - hasSplits += 1 - } - - // We have received splitIndex - splitsInRequestBitVector.synchronized { - splitsInRequestBitVector.set(splitIndex, false) - } } } } @@ -341,7 +325,21 @@ extends Shuffle[K, V, C] with Logging { } } - // NOTE: Update of bitVectors are now done by the consumer + // TODO: Updating stats before consumption is completed + hasBlocksInSplit(splitIndex) = hasBlocksInSplit(splitIndex) + 1 + + // Split has been received only if all the blocks have been received + if (hasBlocksInSplit(splitIndex) == totalBlocksInSplit(splitIndex)) { + hasSplitsBitVector.synchronized { + hasSplitsBitVector.set(splitIndex) + } + hasSplits += 1 + } + + // We have received splitIndex + splitsInRequestBitVector.synchronized { + splitsInRequestBitVector.set(splitIndex, false) + } receptionSucceeded = true diff --git a/src/scala/spark/CustomParallelInMemoryShuffle.scala b/src/scala/spark/CustomParallelInMemoryShuffle.scala index 7d4c6f1cc386840ea7c52bcfce60163fa0a8e508..6cb7d64046cd77194bc8eaf866fe968e63bead83 100644 --- a/src/scala/spark/CustomParallelInMemoryShuffle.scala +++ b/src/scala/spark/CustomParallelInMemoryShuffle.scala @@ -200,18 +200,6 @@ extends Shuffle[K, V, C] with Logging { case e: EOFException => { } } inputStream.close() - - // Consumption completed. Update stats. - hasSplitsBitVector.synchronized { - hasSplitsBitVector.set(splitIndex) - } - hasSplits += 1 - - // We have received splitIndex - splitsInRequestBitVector.synchronized { - splitsInRequestBitVector.set(splitIndex, false) - } - } } } @@ -285,7 +273,16 @@ extends Shuffle[K, V, C] with Logging { } } - // NOTE: Update of bitVectors are now done by the consumer + // TODO: Updating stats before consumption is completed + hasSplitsBitVector.synchronized { + hasSplitsBitVector.set(splitIndex) + } + hasSplits += 1 + + // We have received splitIndex + splitsInRequestBitVector.synchronized { + splitsInRequestBitVector.set(splitIndex, false) + } receptionSucceeded = true diff --git a/src/scala/spark/CustomParallelLocalFileShuffle.scala b/src/scala/spark/CustomParallelLocalFileShuffle.scala index d41057b0d95a3e4305e2dccbd88758478eca6018..0d17ace4389f92169d464185b57046940764b5ae 100644 --- a/src/scala/spark/CustomParallelLocalFileShuffle.scala +++ b/src/scala/spark/CustomParallelLocalFileShuffle.scala @@ -191,18 +191,6 @@ extends Shuffle[K, V, C] with Logging { case e: EOFException => { } } inputStream.close() - - // Consumption completed. Update stats. - hasSplitsBitVector.synchronized { - hasSplitsBitVector.set(splitIndex) - } - hasSplits += 1 - - // We have received splitIndex - splitsInRequestBitVector.synchronized { - splitsInRequestBitVector.set(splitIndex, false) - } - } } } @@ -276,7 +264,16 @@ extends Shuffle[K, V, C] with Logging { } } - // NOTE: Update of bitVectors are now done by the consumer + // TODO: Updating stats before consumption is completed + hasSplitsBitVector.synchronized { + hasSplitsBitVector.set(splitIndex) + } + hasSplits += 1 + + // We have received splitIndex + splitsInRequestBitVector.synchronized { + splitsInRequestBitVector.set(splitIndex, false) + } receptionSucceeded = true diff --git a/src/scala/spark/HttpBlockedLocalFileShuffle.scala b/src/scala/spark/HttpBlockedLocalFileShuffle.scala index bd6f274264068669a69a4f139aee6b8336de2fd2..0e32a983b8105edbc7921990bbf6ea7613ad0d03 100644 --- a/src/scala/spark/HttpBlockedLocalFileShuffle.scala +++ b/src/scala/spark/HttpBlockedLocalFileShuffle.scala @@ -226,22 +226,6 @@ extends Shuffle[K, V, C] with Logging { case e: EOFException => { } } inputStream.close() - - // Consumption completed. Update stats. - hasBlocksInSplit(splitIndex) = hasBlocksInSplit(splitIndex) + 1 - - // Split has been received only if all the blocks have been received - if (hasBlocksInSplit(splitIndex) == totalBlocksInSplit(splitIndex)) { - hasSplitsBitVector.synchronized { - hasSplitsBitVector.set(splitIndex) - } - hasSplits += 1 - } - - // We have received splitIndex - splitsInRequestBitVector.synchronized { - splitsInRequestBitVector.set(splitIndex, false) - } } } } @@ -323,7 +307,21 @@ extends Shuffle[K, V, C] with Logging { } } - // NOTE: Update of bitVectors are now done by the consumer + // TODO: Updating stats before consumption is completed + hasBlocksInSplit(splitIndex) = hasBlocksInSplit(splitIndex) + 1 + + // Split has been received only if all the blocks have been received + if (hasBlocksInSplit(splitIndex) == totalBlocksInSplit(splitIndex)) { + hasSplitsBitVector.synchronized { + hasSplitsBitVector.set(splitIndex) + } + hasSplits += 1 + } + + // We have received splitIndex + splitsInRequestBitVector.synchronized { + splitsInRequestBitVector.set(splitIndex, false) + } receptionSucceeded = true diff --git a/src/scala/spark/HttpParallelLocalFileShuffle.scala b/src/scala/spark/HttpParallelLocalFileShuffle.scala index cdc961da32931862e185b006d0d8540c11b75ea8..ca6a3c2cd0359217bf4d9983fbfc204e676ba6ff 100644 --- a/src/scala/spark/HttpParallelLocalFileShuffle.scala +++ b/src/scala/spark/HttpParallelLocalFileShuffle.scala @@ -187,18 +187,6 @@ extends Shuffle[K, V, C] with Logging { case e: EOFException => { } } inputStream.close() - - // Consumption completed. Update stats. - hasSplitsBitVector.synchronized { - hasSplitsBitVector.set(splitIndex) - } - hasSplits += 1 - - // We have received splitIndex - splitsInRequestBitVector.synchronized { - splitsInRequestBitVector.set(splitIndex, false) - } - } } } @@ -252,7 +240,16 @@ extends Shuffle[K, V, C] with Logging { } } - // NOTE: Update of bitVectors are now done by the consumer + // TODO: Updating stats before consumption is completed + hasSplitsBitVector.synchronized { + hasSplitsBitVector.set(splitIndex) + } + hasSplits += 1 + + // We have received splitIndex + splitsInRequestBitVector.synchronized { + splitsInRequestBitVector.set(splitIndex, false) + } receptionSucceeded = true diff --git a/src/scala/spark/ManualBlockedLocalFileShuffle.scala b/src/scala/spark/ManualBlockedLocalFileShuffle.scala index 03b7b384150ace9a6c6f4ea6dd4230ef0bc8ecb8..ce20c0143afce5cdedbe0a0fe5301139ceca1e7e 100644 --- a/src/scala/spark/ManualBlockedLocalFileShuffle.scala +++ b/src/scala/spark/ManualBlockedLocalFileShuffle.scala @@ -233,22 +233,6 @@ extends Shuffle[K, V, C] with Logging { case e: EOFException => { } } inputStream.close() - - // Consumption completed. Update stats. - hasBlocksInSplit(splitIndex) = hasBlocksInSplit(splitIndex) + 1 - - // Split has been received only if all the blocks have been received - if (hasBlocksInSplit(splitIndex) == totalBlocksInSplit(splitIndex)) { - hasSplitsBitVector.synchronized { - hasSplitsBitVector.set(splitIndex) - } - hasSplits += 1 - } - - // We have received splitIndex - splitsInRequestBitVector.synchronized { - splitsInRequestBitVector.set(splitIndex, false) - } } } } @@ -315,7 +299,21 @@ extends Shuffle[K, V, C] with Logging { } } - // NOTE: Update of bitVectors are now done by the consumer + // TODO: Updating stats before consumption is completed + hasBlocksInSplit(splitIndex) = hasBlocksInSplit(splitIndex) + 1 + + // Split has been received only if all the blocks have been received + if (hasBlocksInSplit(splitIndex) == totalBlocksInSplit(splitIndex)) { + hasSplitsBitVector.synchronized { + hasSplitsBitVector.set(splitIndex) + } + hasSplits += 1 + } + + // We have received splitIndex + splitsInRequestBitVector.synchronized { + splitsInRequestBitVector.set(splitIndex, false) + } receptionSucceeded = true