Skip to content
Snippets Groups Projects
Commit 11589c39 authored by Mridul Muralidharan's avatar Mridul Muralidharan
Browse files

Fix ZippedRDD as part Matei's suggestion

parent dfde9ce9
No related branches found
No related tags found
No related merge requests found
package spark.rdd
import spark.{OneToOneDependency, RDD, SparkContext, Partition, TaskContext}
import spark.{Utils, OneToOneDependency, RDD, SparkContext, Partition, TaskContext}
import java.io.{ObjectOutputStream, IOException}
......@@ -49,9 +49,20 @@ class ZippedRDD[T: ClassManifest, U: ClassManifest](
override def getPreferredLocations(s: Partition): Seq[String] = {
val (partition1, partition2) = s.asInstanceOf[ZippedPartition[T, U]].partitions
// TODO: becomes complicated - intersect on hostPort if available, else fallback to host (removing intersected hostPort's).
// Since I am not very sure about this RDD, leaving it to others to comment better !
rdd1.preferredLocations(partition1).intersect(rdd2.preferredLocations(partition2))
val pref1 = rdd1.preferredLocations(partition1)
val pref2 = rdd2.preferredLocations(partition2)
// both partitions are instance local.
val instanceLocalLocations = pref1.intersect(pref2)
// remove locations which are already handled via instanceLocalLocations, and intersect where both partitions are node local.
val nodeLocalPref1 = pref1.filter(loc => ! instanceLocalLocations.contains(loc)).map(loc => Utils.parseHostPort(loc)._1)
val nodeLocalPref2 = pref2.filter(loc => ! instanceLocalLocations.contains(loc)).map(loc => Utils.parseHostPort(loc)._1)
val nodeLocalLocations = nodeLocalPref1.intersect(nodeLocalPref2)
// Can have mix of instance local (hostPort) and node local (host) locations as preference !
instanceLocalLocations ++ nodeLocalLocations
}
override def clearDependencies() {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment