Commit c5483e39 authored by Mosharaf Chowdhury

- ParallelLocalFileShuffle does NOT use HttpPipelining at all.

 - Config option related to pipelining has been removed.
 - Summary: Basic -> Pipelining / Parallel -> NO pipelining
parent 56d8a2af
--Dspark.shuffle.class=spark.ParallelLocalFileShuffle -Dspark.shuffle.UseHttpPipelining=true -Dspark.parallelLocalFileShuffle.MaxConnections=2
+-Dspark.shuffle.class=spark.ParallelLocalFileShuffle -Dspark.parallelLocalFileShuffle.MaxConnections=2
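
The pipelining flag drops out of the example command line above because ParallelLocalFileShuffle no longer reads it. A minimal sketch, assuming these -D options are consumed as Java system properties; the default values shown are illustrative assumptions, not taken from this commit:

// Sketch only: how -D options like the ones above are read as Java
// system properties. Default values here are illustrative assumptions.
object ShuffleConfigSketch {
  // Which Shuffle implementation to instantiate.
  val shuffleClass: String =
    System.getProperty("spark.shuffle.class", "spark.ParallelLocalFileShuffle")

  // Upper bound on concurrent fetch connections per reducer.
  val maxConnections: Int =
    System.getProperty("spark.parallelLocalFileShuffle.MaxConnections", "2").toInt

  // Before this commit, ParallelLocalFileShuffle also read:
  //   System.getProperty("spark.shuffle.UseHttpPipelining", "true").toBoolean
  // After it, that property is never consulted.
}
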
@@ -68,35 +68,12 @@ class ParallelLocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
       (myIndex, ParallelLocalFileShuffle.serverUri)
     }).collect()
 
-    // Load config option to decide whether or not to use HTTP pipelining
-    val UseHttpPipelining =
-      System.getProperty("spark.shuffle.UseHttpPipelining", "true").toBoolean
-
-    // Build a traversable list of pairs of server URI and split. Needs to be
-    // of type TraversableOnce[(String, ArrayBuffer[Int])]
-    val splitsByUri = if (UseHttpPipelining) {
-      // Build a hashmap from server URI to list of splits (to facillitate
-      // fetching all the URIs on a server within a single connection)
-      val splitsByUriHM = new HashMap[String, ArrayBuffer[Int]]
-      for ((inputId, serverUri) <- outputLocs) {
-        splitsByUriHM.getOrElseUpdate(serverUri, ArrayBuffer()) += inputId
-      }
-      splitsByUriHM.toArray
-    } else {
-      // Don't use HTTP pipelining
-      val splitsByUriAB = new ArrayBuffer[(String, ArrayBuffer[Int])]
-      for ((inputId, serverUri) <- outputLocs) {
-        splitsByUriAB += ((serverUri, new ArrayBuffer[Int] += inputId))
-      }
-      splitsByUriAB.toArray
-    }
-
-    // TODO: Could broadcast splitsByUri
+    // TODO: Could broadcast outputLocs
 
     // Return an RDD that does each of the merges for a given partition
     val indexes = sc.parallelize(0 until numOutputSplits, numOutputSplits)
     return indexes.flatMap((myId: Int) => {
-      totalSplits = splitsByUri.size
+      totalSplits = outputLocs.size
       hasSplits = 0
       hasSplitsBitVector = new BitSet (totalSplits)
       splitsInRequestBitVector = new BitSet (totalSplits)
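
The hunk above deletes the choice between two fetch plans. A self-contained sketch of both, using the (inputId, serverUri) shape of outputLocs from the diff; the sample data and object name are made up for illustration:

import scala.collection.mutable.{ArrayBuffer, HashMap}

object FetchPlanSketch {
  // (inputId, serverUri) pairs, as produced by the map tasks above.
  val outputLocs: Array[(Int, String)] = Array(
    (0, "http://host-a:4000"),
    (1, "http://host-a:4000"),
    (2, "http://host-b:4000"))

  // Pipelined plan (the branch this commit deletes): every split hosted by a
  // server is grouped under one entry, so it can be fetched over one connection.
  def pipelinedPlan: Array[(String, ArrayBuffer[Int])] = {
    val byUri = new HashMap[String, ArrayBuffer[Int]]
    for ((inputId, serverUri) <- outputLocs)
      byUri.getOrElseUpdate(serverUri, ArrayBuffer()) += inputId
    byUri.toArray
  }

  // Non-pipelined plan (the only behaviour kept): one entry, and therefore
  // one connection, per split.
  def perSplitPlan: Array[(String, ArrayBuffer[Int])] =
    outputLocs.map { case (inputId, serverUri) => (serverUri, ArrayBuffer(inputId)) }

  def main(args: Array[String]): Unit = {
    println(pipelinedPlan.toList)  // e.g. (http://host-a:4000, ArrayBuffer(0, 1)), ...
    println(perSplitPlan.toList)   // one single-element buffer per split
  }
}

After this commit the grouping branch is gone and ParallelLocalFileShuffle always works from the flat outputLocs array, i.e. one connection per split.
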
@@ -115,10 +92,10 @@ class ParallelLocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
         val splitIndex = selectRandomSplit
 
         if (splitIndex != -1) {
-          val (serverUri, inputIds) = splitsByUri (splitIndex)
+          val (inputId, serverUri) = outputLocs (splitIndex)
 
           threadPool.execute ( new ShuffleClient (serverUri, shuffleId.toInt,
-            inputIds, myId, splitIndex, mergeCombiners))
+            inputId, myId, splitIndex, mergeCombiners))
 
           // splitIndex is in transit. Will be unset in the ShuffleClient
           splitsInRequestBitVector.synchronized {
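
After the change above, each ShuffleClient submitted to the thread pool receives a single (inputId, serverUri) pair. A rough sketch of that dispatch pattern, assuming a fixed-size pool bounded by the MaxConnections property; the pool construction itself is not shown in this diff and is an assumption:

import java.util.concurrent.{ExecutorService, Executors}

object FetchDispatchSketch {
  // Assumption: concurrent fetches are bounded by a fixed-size pool sized
  // from -Dspark.parallelLocalFileShuffle.MaxConnections (default is a guess).
  val maxConnections: Int =
    System.getProperty("spark.parallelLocalFileShuffle.MaxConnections", "2").toInt
  val threadPool: ExecutorService = Executors.newFixedThreadPool(maxConnections)

  // One task per split: after this commit a client fetches exactly one
  // (shuffleId, inputId, myId) file instead of a list of inputIds.
  def fetch(serverUri: String, shuffleId: Int, inputId: Int, myId: Int): Unit =
    threadPool.execute(new Runnable {
      def run(): Unit =
        println("would fetch %s/shuffle/%d/%d/%d".format(serverUri, shuffleId, inputId, myId))
    })
}
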
@@ -148,47 +125,46 @@ class ParallelLocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
     }
 
     if (requiredSplits.size > 0) {
-      requiredSplits(ParallelLocalFileShuffle.ranGen.nextInt (requiredSplits.size))
+      requiredSplits(ParallelLocalFileShuffle.ranGen.nextInt (
+        requiredSplits.size))
     } else {
       -1
     }
   }
 
   class ShuffleClient (serverUri: String, shuffleId: Int,
-    inputIds: ArrayBuffer[Int], myId: Int, splitIndex: Int,
+    inputId: Int, myId: Int, splitIndex: Int,
     mergeCombiners: (C, C) => C)
   extends Thread with Logging {
     private var receptionSucceeded = false
 
     override def run: Unit = {
       try {
-        for (inputId <- inputIds) {
-          val url =
-            "%s/shuffle/%d/%d/%d".format(serverUri, shuffleId, inputId, myId)
-
-          val readStartTime = System.currentTimeMillis
-          logInfo ("BEGIN READ: " + url)
-
-          val inputStream = new ObjectInputStream(new URL(url).openStream())
-          try {
-            while (true) {
-              val (k, c) = inputStream.readObject().asInstanceOf[(K, C)]
-              combiners.synchronized {
-                combiners(k) = combiners.get(k) match {
-                  case Some(oldC) => mergeCombiners(oldC, c)
-                  case None => c
-                }
-              }
-            }
-          } catch {
-            case e: EOFException => {}
-          }
-          inputStream.close()
-
-          logInfo ("END READ: " + url)
-          val readTime = (System.currentTimeMillis - readStartTime)
-          logInfo ("Reading " + url + " took " + readTime + " millis.")
-        }
+        val url =
+          "%s/shuffle/%d/%d/%d".format(serverUri, shuffleId, inputId, myId)
+
+        val readStartTime = System.currentTimeMillis
+        logInfo ("BEGIN READ: " + url)
+
+        val inputStream = new ObjectInputStream(new URL(url).openStream())
+        try {
+          while (true) {
+            val (k, c) = inputStream.readObject().asInstanceOf[(K, C)]
+            combiners.synchronized {
+              combiners(k) = combiners.get(k) match {
+                case Some(oldC) => mergeCombiners(oldC, c)
+                case None => c
+              }
+            }
+          }
+        } catch {
+          case e: EOFException => {}
+        }
+        inputStream.close()
+
+        logInfo ("END READ: " + url)
+        val readTime = (System.currentTimeMillis - readStartTime)
+        logInfo ("Reading " + url + " took " + readTime + " millis.")
 
         // Reception completed. Update stats.
         hasSplitsBitVector.synchronized {
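
For reference, the post-commit fetch path of ShuffleClient.run restated as a standalone sketch: open one URL per split, deserialize (K, C) pairs until EOF, and merge each into a shared combiners map. The concrete K and C types, and the omitted logging and bookkeeping, are simplifications and not part of the commit:

import java.io.{EOFException, ObjectInputStream}
import java.net.URL
import scala.collection.mutable.HashMap

object SingleSplitFetchSketch {
  // Simplified stand-ins for the shuffle's K and C type parameters.
  type K = String
  type C = Int

  val combiners = new HashMap[K, C]

  def fetchAndMerge(serverUri: String, shuffleId: Int, inputId: Int, myId: Int,
                    mergeCombiners: (C, C) => C): Unit = {
    // One connection per split: exactly one URL is read, no inner loop over inputIds.
    val url = "%s/shuffle/%d/%d/%d".format(serverUri, shuffleId, inputId, myId)
    val inputStream = new ObjectInputStream(new URL(url).openStream())
    try {
      while (true) {
        // The map side wrote serialized (K, C) pairs; read until EOF.
        val (k, c) = inputStream.readObject().asInstanceOf[(K, C)]
        combiners.synchronized {
          combiners(k) = combiners.get(k) match {
            case Some(oldC) => mergeCombiners(oldC, c)
            case None => c
          }
        }
      }
    } catch {
      case _: EOFException => () // normal end of the serialized stream
    } finally {
      inputStream.close()
    }
  }
}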