Commit 7e26b576 authored by Michael Armbrust, committed by Reynold Xin

[SPARK-2441][SQL] Add more efficient distinct operator.

Author: Michael Armbrust <michael@databricks.com>

Closes #1366 from marmbrus/partialDistinct and squashes the following commits:

12a31ab [Michael Armbrust] Add more efficient distinct operator.
parent 7a013529
@@ -247,8 +247,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case logical.Distinct(child) =>
-        execution.Aggregate(
-          partial = false, child.output, child.output, planLater(child))(sqlContext) :: Nil
+        execution.Distinct(partial = false,
+          execution.Distinct(partial = true, planLater(child))) :: Nil
       case logical.Sort(sortExprs, child) =>
         // This sort is a global sort. Its requiredDistribution will be an OrderedDistribution.
         execution.Sort(sortExprs, global = true, planLater(child)):: Nil
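For intuition, here is a small, self-contained sketch (not part of this commit; the object name, data, and partition counts are made up) of the same two-phase idea on a plain RDD: deduplicate within each partition first, shuffle so equal values land in one partition, then deduplicate again so the result is globally distinct.

import scala.collection.mutable

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.SparkContext._

// Illustrative only: mirrors Distinct(partial = true) followed by a shuffle and
// Distinct(partial = false), using plain RDD operations on strings.
object TwoPhaseDistinctSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("two-phase-distinct").setMaster("local[2]"))
    val rows = sc.parallelize(Seq("a", "b", "a", "c", "b", "a"), 4)

    val distinct = rows
      // Partial phase: a per-partition HashSet drops local duplicates without any shuffle.
      .mapPartitions { iter =>
        val seen = new mutable.HashSet[String]()
        iter.filter(seen.add)
      }
      // Cluster equal values into the same partition (what ClusteredDistribution demands).
      .map(value => (value, ()))
      .partitionBy(new HashPartitioner(4))
      .map(_._1)
      // Final phase: the same per-partition dedup is now globally correct.
      .mapPartitions { iter =>
        val seen = new mutable.HashSet[String]()
        iter.filter(seen.add)
      }

    distinct.collect().sorted.foreach(println)
    sc.stop()
  }
}

The partial pass typically shrinks the data before the shuffle, which is where the efficiency gain over the previous full Aggregate-based plan comes from.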
@@ -27,7 +27,7 @@ import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.ScalaReflection
 import org.apache.spark.sql.catalyst.errors._
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.physical.{OrderedDistribution, UnspecifiedDistribution}
+import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, OrderedDistribution, UnspecifiedDistribution}
 import org.apache.spark.util.MutablePair

 /**
@@ -248,6 +248,37 @@ object ExistingRdd {
 case class ExistingRdd(output: Seq[Attribute], rdd: RDD[Row]) extends LeafNode {
   override def execute() = rdd
 }

+/**
+ * :: DeveloperApi ::
+ * Computes the set of distinct input rows using a HashSet.
+ * @param partial when true the distinct operation is performed partially, per partition, without
+ *                shuffling the data.
+ * @param child the input query plan.
+ */
+@DeveloperApi
+case class Distinct(partial: Boolean, child: SparkPlan) extends UnaryNode {
+  override def output = child.output
+
+  override def requiredChildDistribution =
+    if (partial) UnspecifiedDistribution :: Nil else ClusteredDistribution(child.output) :: Nil
+
+  override def execute() = {
+    child.execute().mapPartitions { iter =>
+      val hashSet = new scala.collection.mutable.HashSet[Row]()
+
+      var currentRow: Row = null
+      while (iter.hasNext) {
+        currentRow = iter.next()
+        if (!hashSet.contains(currentRow)) {
+          hashSet.add(currentRow.copy())
+        }
+      }
+
+      hashSet.iterator
+    }
+  }
+}
+
 /**
  * :: DeveloperApi ::
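A usage sketch (again not part of the commit; the case class, table name, and data are hypothetical) showing how a DISTINCT query would exercise the new operator through the Spark SQL 1.x API:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Defined at the top level so Spark SQL's ScalaReflection can derive the schema.
case class Person(name: String, age: Int)

object DistinctQuerySketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("distinct-sketch").setMaster("local[2]"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.createSchemaRDD

    val people = sc.parallelize(Seq(Person("alice", 30), Person("alice", 30), Person("bob", 25)))
    people.registerAsTable("people")

    val distinctNames = sqlContext.sql("SELECT DISTINCT name FROM people")
    // With this patch the physical plan should show a final Distinct(partial = false, ...)
    // over a shuffle over Distinct(partial = true, ...), instead of a single Aggregate
    // grouping on all output columns.
    println(distinctNames.queryExecution.executedPlan)
    distinctNames.collect().foreach(println)

    sc.stop()
  }
}

Because the final Distinct declares ClusteredDistribution(child.output) as its required child distribution, the planner is expected to insert the exchange between the two operators; the partial Distinct runs before that shuffle and the final one after it.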