Commit f20ed14e authored by Ali Ghodsi

Merged in from upstream to use TaskLocation instead of strings

parent 5cd21c41
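
The change replaces the plain host-name strings that getPreferredLocs used to return with spark.scheduler.TaskLocation values. The real TaskLocation class is not part of this diff; the stand-in below is a hedged sketch that assumes it carries a host name and an optional executor id, which is enough to show why callers that only need hosts can simply project that field out.

// Simplified stand-in for spark.scheduler.TaskLocation, for illustration only;
// the fields are assumptions (a host name plus an optional executor id), not the
// class as it appears in the Spark sources.
case class TaskLocation(host: String, executorId: Option[String] = None)

object TaskLocationSketch {
  def main(args: Array[String]): Unit = {
    val locs: Seq[TaskLocation] = Seq(
      TaskLocation("host-a", Some("exec-1")),
      TaskLocation("host-b"))

    // Code that only cares about hosts (CoalescedRDD below, for example) recovers
    // the old List[String] view by projecting out the host field.
    val hosts: Seq[String] = locs.map(_.host)
    println(hosts.mkString(", "))   // host-a, host-b
  }
}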
SparkContext.scala
@@ -56,8 +56,7 @@ import spark.deploy.LocalSparkCluster
 import spark.partial.{ApproximateEvaluator, PartialResult}
 import spark.rdd.{CheckpointRDD, HadoopRDD, NewHadoopRDD, UnionRDD, ParallelCollectionRDD,
   OrderedRDDFunctions}
-import spark.scheduler.{DAGScheduler, DAGSchedulerSource, ResultTask, ShuffleMapTask, SparkListener,
-  SplitInfo, Stage, StageInfo, TaskScheduler}
+import spark.scheduler._
 import spark.scheduler.cluster.{StandaloneSchedulerBackend, SparkDeploySchedulerBackend,
   ClusterScheduler, Schedulable, SchedulingMode}
 import spark.scheduler.local.LocalScheduler
@@ -65,6 +64,10 @@ import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
 import spark.storage.{StorageStatus, StorageUtils, RDDInfo, BlockManagerSource}
 import spark.ui.SparkUI
 import spark.util.{MetadataCleaner, TimeStampedHashMap}
+import scala.Some
+import spark.scheduler.StageInfo
+import spark.storage.RDDInfo
+import spark.storage.StorageStatus

 /**
  * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
@@ -620,7 +623,7 @@ class SparkContext(
    * @param partition to be looked up for locality
    * @return list of preferred locations for the partition
    */
-  private [spark] def getPreferredLocs(rdd: RDD[_], partition: Int): List[String] = {
+  private [spark] def getPreferredLocs(rdd: RDD[_], partition: Int): Seq[TaskLocation] = {
     dagScheduler.getPreferredLocs(rdd, partition)
   }
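
With the signature above, getPreferredLocs now hands back Seq[TaskLocation] instead of List[String]. The sketch below shows one way a caller could adapt; it assumes the code lives under the spark package (the method is private[spark]) and that TaskLocation exposes a host field, and the object and method names are hypothetical.

package spark

import spark.scheduler.TaskLocation

// Hypothetical helper: recovers the pre-change behavior of getPreferredLocs,
// i.e. a plain sequence of host names for a given partition.
object PreferredHostsSketch {
  def preferredHosts(sc: SparkContext, rdd: RDD[_], partition: Int): Seq[String] = {
    val locs: Seq[TaskLocation] = sc.getPreferredLocs(rdd, partition)
    locs.map(_.host)
  }
}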
CoalescedRDD.scala
@@ -52,7 +52,7 @@ case class CoalescedRDDPartition(
    */
   def localFraction: Double = {
     val loc = parents.count(p =>
-      rdd.context.getPreferredLocs(rdd, p.index).contains(preferredLocation))
+      rdd.context.getPreferredLocs(rdd, p.index).map(tl => tl.host).contains(preferredLocation))

     if (parents.size == 0) 0.0 else (loc.toDouble / parents.size.toDouble)
   }
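
localFraction now has to go through tl.host before comparing against preferredLocation, because the preferred locations are TaskLocation values while preferredLocation is still a host string. Below is a self-contained sketch of the same computation, with stubbed preferred-host data in place of a live SparkContext (all names hypothetical).

object LocalFractionSketch {
  def main(args: Array[String]): Unit = {
    val preferredLocation = "host-a"   // host this coalesced partition wants to live on
    // Preferred hosts of each parent partition, i.e. what
    // getPreferredLocs(rdd, p.index).map(tl => tl.host) would return.
    val parentPrefHosts: Seq[Seq[String]] = Seq(
      Seq("host-a", "host-b"),   // local
      Seq("host-c"),             // not local
      Seq("host-a"))             // local
    val loc = parentPrefHosts.count(_.contains(preferredLocation))
    val localFraction =
      if (parentPrefHosts.isEmpty) 0.0 else loc.toDouble / parentPrefHosts.size
    println(localFraction)   // 0.666...
  }
}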
@@ -167,8 +167,9 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double)
   var noLocality = true  // if true if no preferredLocations exists for parent RDD

   // gets the *current* preferred locations from the DAGScheduler (as opposed to the static ones)
-  def currPrefLocs(part: Partition): Seq[String] =
-    prev.context.getPreferredLocs(prev, part.index)
+  def currPrefLocs(part: Partition): Seq[String] = {
+    prev.context.getPreferredLocs(prev, part.index).map(tl => tl.host)
+  }

   // this class just keeps iterating and rotating infinitely over the partitions of the RDD
   // next() returns the next preferred machine that a partition is replicated on
@@ -282,8 +283,7 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double)
    * @return partition group (bin to be put in)
    */
   def pickBin(p: Partition): PartitionGroup = {
-    val pref = prev.context.getPreferredLocs(prev, p.index).
-      map(getLeastGroupHash(_)).sortWith(compare) // least loaded of the pref locations
+    val pref = currPrefLocs(p).map(getLeastGroupHash(_)).sortWith(compare) // least loaded pref locs
     val prefPart = if (pref == Nil) None else pref.head
     val r1 = rnd.nextInt(groupArr.size)
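
pickBin can now reuse currPrefLocs instead of re-deriving host strings from getPreferredLocs; the sorted head is the least loaded of the partition's preferred locations. Below is a heavily simplified sketch of that selection, using a plain host-to-size map in place of PartitionGroup, getLeastGroupHash and the random fallback (all names hypothetical).

object LeastLoadedPrefLocSketch {
  def main(args: Array[String]): Unit = {
    val groupSizes = Map("host-a" -> 3, "host-b" -> 1, "host-c" -> 2)  // current bin sizes per host
    val prefHosts  = Seq("host-a", "host-b")  // what currPrefLocs(p) would return for a partition p
    // Least loaded of the preferred locations, mirroring
    // currPrefLocs(p).map(getLeastGroupHash(_)).sortWith(compare) followed by pref.head.
    val prefPart: Option[String] =
      prefHosts.filter(groupSizes.contains).sortBy(groupSizes).headOption
    println(prefPart)   // Some(host-b)
  }
}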