Commit 4f93d0ca authored by Andrew Or

[SPARK-4759] Fix driver hanging from coalescing partitions

The driver sometimes hangs when we coalesce RDD partitions. See the JIRA for more details and a reproduction.

This happens because our use of the empty string as the default preferred location in `CoalescedRDDPartition` causes the `TaskSetManager` to schedule the corresponding task on host `""` (the empty string). The intended semantics, however, are that the partition has no preferred location, and the TSM should schedule the corresponding task accordingly.
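As a rough sketch of the fix (the helper name below is hypothetical, not code from this patch): modeling the preferred location as an Option[String] makes "no preference" explicit instead of leaking an empty host name to the scheduler.

// Sketch only: `locationsFor` is a hypothetical helper, not part of this patch.
def locationsFor(preferredLocation: Option[String]): Seq[String] =
  preferredLocation.toSeq  // Seq() when there is no preference, Seq(host) otherwise

locationsFor(None)            // Seq(): the TaskSetManager is free to pick any host
locationsFor(Some("host-1"))  // Seq("host-1")
// Returning List("") for "no preference", as the old code did, makes the
// TaskSetManager wait for a host literally named "", which is what hangs the driver.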

Author: Andrew Or <andrew@databricks.com>

Closes #3633 from andrewor14/coalesce-preferred-loc and squashes the following commits:

e520d6b [Andrew Or] Oops
3ebf8bd [Andrew Or] A few comments
f370a4e [Andrew Or] Fix tests
2f7dfb6 [Andrew Or] Avoid using empty string as default preferred location
parent 447ae2de
@@ -35,11 +35,10 @@ import org.apache.spark.util.Utils
  * @param preferredLocation the preferred location for this partition
  */
 private[spark] case class CoalescedRDDPartition(
     index: Int,
     @transient rdd: RDD[_],
     parentsIndices: Array[Int],
-    @transient preferredLocation: String = ""
-  ) extends Partition {
+    @transient preferredLocation: Option[String] = None) extends Partition {
   var parents: Seq[Partition] = parentsIndices.map(rdd.partitions(_))
 
   @throws(classOf[IOException])
@@ -55,9 +54,10 @@ private[spark] case class CoalescedRDDPartition(
   * @return locality of this coalesced partition between 0 and 1
   */
  def localFraction: Double = {
-    val loc = parents.count(p =>
-      rdd.context.getPreferredLocs(rdd, p.index).map(tl => tl.host).contains(preferredLocation))
+    val loc = parents.count { p =>
+      val parentPreferredLocations = rdd.context.getPreferredLocs(rdd, p.index).map(_.host)
+      preferredLocation.exists(parentPreferredLocations.contains)
+    }
     if (parents.size == 0) 0.0 else (loc.toDouble / parents.size.toDouble)
   }
 }
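With the Option-based preferredLocation, a partition that has no preference now counts as non-local in localFraction, because exists on None is always false. A standalone sketch of that Option behavior (host names are made up):

val parentPreferredLocations = Seq("host-1", "host-2")
Some("host-1").exists(parentPreferredLocations.contains)          // true: preference is local
Some("host-9").exists(parentPreferredLocations.contains)          // false: preference not among the parents
(None: Option[String]).exists(parentPreferredLocations.contains)  // false: no preference at all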
@@ -73,9 +73,9 @@ private[spark] case class CoalescedRDDPartition(
  * @param balanceSlack used to trade-off balance and locality. 1.0 is all locality, 0 is all balance
  */
 private[spark] class CoalescedRDD[T: ClassTag](
-                                               @transient var prev: RDD[T],
-                                               maxPartitions: Int,
-                                               balanceSlack: Double = 0.10)
+    @transient var prev: RDD[T],
+    maxPartitions: Int,
+    balanceSlack: Double = 0.10)
   extends RDD[T](prev.context, Nil) {  // Nil since we implement getDependencies
 
   override def getPartitions: Array[Partition] = {
@@ -113,7 +113,7 @@ private[spark] class CoalescedRDD[T: ClassTag](
   * @return the machine most preferred by split
   */
  override def getPreferredLocations(partition: Partition): Seq[String] = {
-    List(partition.asInstanceOf[CoalescedRDDPartition].preferredLocation)
+    partition.asInstanceOf[CoalescedRDDPartition].preferredLocation.toSeq
   }
 }
@@ -147,7 +147,7 @@ private[spark] class CoalescedRDD[T: ClassTag](
  *
  */
-private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double) {
+private class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double) {
 
   def compare(o1: PartitionGroup, o2: PartitionGroup): Boolean = o1.size < o2.size
   def compare(o1: Option[PartitionGroup], o2: Option[PartitionGroup]): Boolean =
@@ -341,8 +341,14 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanc
   }
 }
 
-private[spark] case class PartitionGroup(prefLoc: String = "") {
+private case class PartitionGroup(prefLoc: Option[String] = None) {
   var arr = mutable.ArrayBuffer[Partition]()
   def size = arr.size
 }
+
+private object PartitionGroup {
+  def apply(prefLoc: String): PartitionGroup = {
+    require(prefLoc != "", "Preferred location must not be empty")
+    PartitionGroup(Some(prefLoc))
+  }
+}
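The new PartitionGroup companion keeps a string-based constructor while rejecting the old empty-string sentinel; a small usage sketch, assuming the definitions above are in scope:

PartitionGroup(Some("host-1"))  // group with an explicit preferred location
PartitionGroup()                // group with no preferred location (prefLoc = None)
PartitionGroup("host-1")        // companion apply wraps the host in Some(...)
PartitionGroup("")              // throws IllegalArgumentException instead of silently
                                // creating a group whose "preferred host" is ""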
@@ -294,7 +294,7 @@ class RDDSuite extends FunSuite with SharedSparkContext {
   test("coalesced RDDs with locality") {
     val data3 = sc.makeRDD(List((1,List("a","c")), (2,List("a","b","c")), (3,List("b"))))
     val coal3 = data3.coalesce(3)
-    val list3 = coal3.partitions.map(p => p.asInstanceOf[CoalescedRDDPartition].preferredLocation)
+    val list3 = coal3.partitions.flatMap(_.asInstanceOf[CoalescedRDDPartition].preferredLocation)
     assert(list3.sorted === Array("a","b","c"), "Locality preferences are dropped")
 
     // RDD with locality preferences spread (non-randomly) over 6 machines, m0 through m5
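The test switches from map to flatMap because preferredLocation is now an Option; flattening drops the None entries, so only partitions that actually have a preference end up in list3. A standalone sketch of that behavior:

val prefs: Array[Option[String]] = Array(Some("c"), Some("a"), None, Some("b"))
prefs.flatMap(p => p).sorted    // Array("a", "b", "c"): the None entry disappears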