Skip to content
Snippets Groups Projects
Commit 7201f033 authored by Guillaume Poulin's avatar Guillaume Poulin Committed by Sean Owen
Browse files

[SPARK-12425][STREAMING] DStream union optimisation

Use PartitionerAwareUnionRDD when possbile for optimizing shuffling and
preserving the partitioner.

Author: Guillaume Poulin <poulin.guillaume@gmail.com>

Closes #10382 from gpoulin/dstream_union_optimisation.
parent a172e11c
No related branches found
No related tags found
No related merge requests found
......@@ -568,11 +568,7 @@ abstract class RDD[T: ClassTag](
* times (use `.distinct()` to eliminate them).
*/
def union(other: RDD[T]): RDD[T] = withScope {
if (partitioner.isDefined && other.partitioner == partitioner) {
new PartitionerAwareUnionRDD(sc, Array(this, other))
} else {
new UnionRDD(sc, Array(this, other))
}
sc.union(this, other)
}
/**
......
......@@ -21,7 +21,7 @@ import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
import org.apache.spark.SparkException
import org.apache.spark.rdd.{RDD, UnionRDD}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Duration, Time}
private[streaming]
......@@ -45,7 +45,7 @@ class UnionDStream[T: ClassTag](parents: Array[DStream[T]])
s" time $validTime")
}
if (rdds.nonEmpty) {
Some(new UnionRDD(ssc.sc, rdds))
Some(ssc.sc.union(rdds))
} else {
None
}
......
......@@ -19,7 +19,7 @@ package org.apache.spark.streaming.dstream
import scala.reflect.ClassTag
import org.apache.spark.rdd.{PartitionerAwareUnionRDD, RDD, UnionRDD}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.Duration
......@@ -63,13 +63,6 @@ class WindowedDStream[T: ClassTag](
override def compute(validTime: Time): Option[RDD[T]] = {
val currentWindow = new Interval(validTime - windowDuration + parent.slideDuration, validTime)
val rddsInWindow = parent.slice(currentWindow)
val windowRDD = if (rddsInWindow.flatMap(_.partitioner).distinct.length == 1) {
logDebug("Using partition aware union for windowing at " + validTime)
new PartitionerAwareUnionRDD(ssc.sc, rddsInWindow)
} else {
logDebug("Using normal union for windowing at " + validTime)
new UnionRDD(ssc.sc, rddsInWindow)
}
Some(windowRDD)
Some(ssc.sc.union(rddsInWindow))
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment