Commit c035c0f2 authored by Kay Ousterhout

[SPARK-5360] [SPARK-6606] Eliminate duplicate objects in serialized CoGroupedRDD

CoGroupPartition, part of CoGroupedRDD, includes references to each RDD that the CoGroupedRDD narrowly depends on, and a reference to the ShuffleHandle. The partition is serialized separately from the RDD, so when the RDD and partition arrive on the worker, the references in the partition and in the RDD no longer point to the same object.

This is a relatively minor performance issue (the closure can be 2x larger than it needs to be because the RDDs and partitions are serialized twice; see the numbers below), but it is more annoying as a developer issue (which is how I ran into it): if any state is stored in the RDD or ShuffleHandle on the worker side, subtle bugs can appear because the references to the RDD / ShuffleHandle held by the RDD and by the partition point to separate objects. I'm not sure whether this is enough of a potential future problem to justify changing this old and central part of the code, so I'm hoping to get input from others here.
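
To make the mechanism concrete, here is a standalone sketch (plain JVM serialization only, nothing Spark-specific; the Shared and Holder names are invented for the example): two holders of the same object, serialized through separate streams, deserialize into two distinct copies, which is exactly what happens to the RDD / ShuffleHandle referenced from both the CoGroupedRDD and its partition.

import java.io._

class Shared extends Serializable
class Holder(val shared: Shared) extends Serializable

// Serialize and immediately deserialize an object, as if shipping it to a worker.
def roundTrip[T](obj: T): T = {
  val bytes = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(bytes)
  out.writeObject(obj)
  out.close()
  new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray)).readObject().asInstanceOf[T]
}

val shared = new Shared                // stands in for the RDD / ShuffleHandle
val a = roundTrip(new Holder(shared))  // serialized with the task, like the CoGroupedRDD
val b = roundTrip(new Holder(shared))  // serialized separately, like the CoGroupPartition
a.shared eq b.shared                   // false: the worker ends up with two separate objects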

I did some simple experiments to see how much this affects closure size. For this example:
$ val a = sc.parallelize(1 to 10).map((_, 1))
$ val b = sc.parallelize(1 to 2).map(x => (x, 2*x))
$ a.cogroup(b).collect()
the closure was 1902 bytes with current Spark, and 1129 bytes after my change. The difference comes from eliminating duplicate serialization of the shuffle handle.

For this example:
$ val sortedA = a.sortByKey()
$ val sortedB = b.sortByKey()
$ sortedA.cogroup(sortedB).collect()
the closure was 3491 bytes with current Spark, and 1333 bytes after my change. Here, the difference comes from eliminating duplicate serialization of the two RDDs for the narrow dependencies.

The ShuffleHandle includes the ShuffleDependency, so this difference will get larger if a ShuffleDependency includes a serializer, a key ordering, or an aggregator (all set to None by default). It would also get bigger for a big RDD -- although I can't think of any examples where the RDD object gets large.  The difference is not affected by the size of the function the user specifies, which (based on my understanding) is typically the source of large task closures.
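
As a rough point of reference, the byte counts above can be approximated outside of Spark's internals by Java-serializing the relevant objects and counting bytes. This helper is an assumed sketch of such a measurement (serializedSize is not part of Spark), not the exact method used for the numbers above; calling it on the objects that end up in a task (for example the partition and the RDD) gives a feel for where the duplicated bytes come from.

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Java-serialize an object graph and return the number of bytes produced.
def serializedSize(obj: AnyRef): Int = {
  val bytes = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(bytes)
  out.writeObject(obj)
  out.close()
  bytes.size()
}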

Author: Kay Ousterhout <kayousterhout@gmail.com>

Closes #4145 from kayousterhout/SPARK-5360 and squashes the following commits:

85156c3 [Kay Ousterhout] Better comment the narrowDeps parameter
cff0209 [Kay Ousterhout] Fixed spelling issue
658e1af [Kay Ousterhout] [SPARK-5360] Eliminate duplicate objects in serialized CoGroupedRDD
parent 5fea3e5c
@@ -29,15 +29,16 @@ import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.util.collection.{ExternalAppendOnlyMap, AppendOnlyMap, CompactBuffer}
 import org.apache.spark.util.Utils
 import org.apache.spark.serializer.Serializer
-import org.apache.spark.shuffle.ShuffleHandle
 
-private[spark] sealed trait CoGroupSplitDep extends Serializable
-
+/** The references to rdd and splitIndex are transient because redundant information is stored
+  * in the CoGroupedRDD object. Because CoGroupedRDD is serialized separately from
+  * CoGroupPartition, if rdd and splitIndex aren't transient, they'll be included twice in the
+  * task closure. */
 private[spark] case class NarrowCoGroupSplitDep(
-    rdd: RDD[_],
-    splitIndex: Int,
+    @transient rdd: RDD[_],
+    @transient splitIndex: Int,
     var split: Partition
-  ) extends CoGroupSplitDep {
+  ) extends Serializable {
 
   @throws(classOf[IOException])
   private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException {
@@ -47,9 +48,16 @@ private[spark] case class NarrowCoGroupSplitDep(
   }
 }
 
-private[spark] case class ShuffleCoGroupSplitDep(handle: ShuffleHandle) extends CoGroupSplitDep
-
-private[spark] class CoGroupPartition(idx: Int, val deps: Array[CoGroupSplitDep])
+/**
+ * Stores information about the narrow dependencies used by a CoGroupedRdd.
+ *
+ * @param narrowDeps maps to the dependencies variable in the parent RDD: for each one to one
+ *                   dependency in dependencies, narrowDeps has a NarrowCoGroupSplitDep (describing
+ *                   the partition for that dependency) at the corresponding index. The size of
+ *                   narrowDeps should always be equal to the number of parents.
+ */
+private[spark] class CoGroupPartition(
+    idx: Int, val narrowDeps: Array[Option[NarrowCoGroupSplitDep]])
   extends Partition with Serializable {
   override val index: Int = idx
   override def hashCode(): Int = idx
@@ -105,9 +113,9 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
         // Assume each RDD contributed a single dependency, and get it
         dependencies(j) match {
           case s: ShuffleDependency[_, _, _] =>
-            new ShuffleCoGroupSplitDep(s.shuffleHandle)
+            None
           case _ =>
-            new NarrowCoGroupSplitDep(rdd, i, rdd.partitions(i))
+            Some(new NarrowCoGroupSplitDep(rdd, i, rdd.partitions(i)))
         }
       }.toArray)
     }
@@ -120,20 +128,21 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
     val sparkConf = SparkEnv.get.conf
     val externalSorting = sparkConf.getBoolean("spark.shuffle.spill", true)
     val split = s.asInstanceOf[CoGroupPartition]
-    val numRdds = split.deps.length
+    val numRdds = dependencies.length
 
     // A list of (rdd iterator, dependency number) pairs
     val rddIterators = new ArrayBuffer[(Iterator[Product2[K, Any]], Int)]
-    for ((dep, depNum) <- split.deps.zipWithIndex) dep match {
-      case NarrowCoGroupSplitDep(rdd, _, itsSplit) =>
+    for ((dep, depNum) <- dependencies.zipWithIndex) dep match {
+      case oneToOneDependency: OneToOneDependency[Product2[K, Any]] =>
+        val dependencyPartition = split.narrowDeps(depNum).get.split
         // Read them from the parent
-        val it = rdd.iterator(itsSplit, context).asInstanceOf[Iterator[Product2[K, Any]]]
+        val it = oneToOneDependency.rdd.iterator(dependencyPartition, context)
         rddIterators += ((it, depNum))
 
-      case ShuffleCoGroupSplitDep(handle) =>
+      case shuffleDependency: ShuffleDependency[_, _, _] =>
         // Read map outputs of shuffle
         val it = SparkEnv.get.shuffleManager
-          .getReader(handle, split.index, split.index + 1, context)
+          .getReader(shuffleDependency.shuffleHandle, split.index, split.index + 1, context)
           .read()
         rddIterators += ((it, depNum))
     }
@@ -81,9 +81,9 @@ private[spark] class SubtractedRDD[K: ClassTag, V: ClassTag, W: ClassTag](
       array(i) = new CoGroupPartition(i, Seq(rdd1, rdd2).zipWithIndex.map { case (rdd, j) =>
         dependencies(j) match {
           case s: ShuffleDependency[_, _, _] =>
-            new ShuffleCoGroupSplitDep(s.shuffleHandle)
+            None
           case _ =>
-            new NarrowCoGroupSplitDep(rdd, i, rdd.partitions(i))
+            Some(new NarrowCoGroupSplitDep(rdd, i, rdd.partitions(i)))
         }
       }.toArray)
     }
@@ -105,20 +105,26 @@ private[spark] class SubtractedRDD[K: ClassTag, V: ClassTag, W: ClassTag](
         seq
       }
     }
 
-    def integrate(dep: CoGroupSplitDep, op: Product2[K, V] => Unit): Unit = dep match {
-      case NarrowCoGroupSplitDep(rdd, _, itsSplit) =>
-        rdd.iterator(itsSplit, context).asInstanceOf[Iterator[Product2[K, V]]].foreach(op)
+    def integrate(depNum: Int, op: Product2[K, V] => Unit) = {
+      dependencies(depNum) match {
+        case oneToOneDependency: OneToOneDependency[_] =>
+          val dependencyPartition = partition.narrowDeps(depNum).get.split
+          oneToOneDependency.rdd.iterator(dependencyPartition, context)
+            .asInstanceOf[Iterator[Product2[K, V]]].foreach(op)
 
-      case ShuffleCoGroupSplitDep(handle) =>
-        val iter = SparkEnv.get.shuffleManager
-          .getReader(handle, partition.index, partition.index + 1, context)
-          .read()
-        iter.foreach(op)
+        case shuffleDependency: ShuffleDependency[_, _, _] =>
+          val iter = SparkEnv.get.shuffleManager
+            .getReader(
+              shuffleDependency.shuffleHandle, partition.index, partition.index + 1, context)
+            .read()
+          iter.foreach(op)
+      }
     }
+
     // the first dep is rdd1; add all values to the map
-    integrate(partition.deps(0), t => getSeq(t._1) += t._2)
+    integrate(0, t => getSeq(t._1) += t._2)
     // the second dep is rdd2; remove all of its keys
-    integrate(partition.deps(1), t => map.remove(t._1))
+    integrate(1, t => map.remove(t._1))
     map.iterator.map { t => t._2.iterator.map { (t._1, _) } }.flatten
   }