diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 34371c9659423bfd1e4f75059e53c50b4f2502fc..73e4bfd78e577f0f4436afac7b17f7399ffbf2da 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -46,7 +46,9 @@ object MimaExcludes {
               "org.apache.spark.api.java.JavaRDDLike.partitioner"),
             // Mima false positive (was a private[spark] class)
             ProblemFilters.exclude[MissingClassProblem](
-              "org.apache.spark.util.collection.PairIterator")
+              "org.apache.spark.util.collection.PairIterator"),
+            // SQL execution is considered private.
+            excludePackage("org.apache.spark.sql.execution")
           )
         case v if v.startsWith("1.4") =>
           Seq(
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 5c6379b8d44b06dd0343b0c6bf4d1893b8812ef9..0a17b10c521e547b1c1529c66cee61a694fa6817 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -36,6 +36,8 @@ object DefaultOptimizer extends Optimizer {
     // SubQueries are only needed for analysis and can be removed before execution.
     Batch("Remove SubQueries", FixedPoint(100),
       EliminateSubQueries) ::
+    Batch("Distinct", FixedPoint(100),
+      ReplaceDistinctWithAggregate) ::
     Batch("Operator Reordering", FixedPoint(100),
       UnionPushdown,
       CombineFilters,
@@ -696,3 +698,15 @@ object ConvertToLocalRelation extends Rule[LogicalPlan] {
       LocalRelation(projectList.map(_.toAttribute), data.map(projection))
   }
 }
+
+/**
+ * Replaces the logical [[Distinct]] operator with an [[Aggregate]] that groups by all of the
+ * {{{
+ *   SELECT DISTINCT f1, f2 FROM t  ==>  SELECT f1, f2 FROM t GROUP BY f1, f2
+ * }}}
+ */
+object ReplaceDistinctWithAggregate extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case Distinct(child) => Aggregate(child.output, child.output, child)
+  }
+}
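A minimal REPL-style sketch of what the new rule does, using the Catalyst DSL
(this mirrors the test suite added below; `ReplaceDistinctWithAggregate` and the
plan classes involved are the ones touched in this diff):

```scala
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.optimizer.ReplaceDistinctWithAggregate
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Distinct, LocalRelation}

val r = LocalRelation('a.int, 'b.int)

// The rule turns Distinct(r) into an Aggregate that both groups by and
// projects all of the child's output attributes, i.e.
// SELECT DISTINCT a, b FROM r  ==>  SELECT a, b FROM r GROUP BY a, b.
val rewritten = ReplaceDistinctWithAggregate(Distinct(r))
assert(rewritten == Aggregate(r.output, r.output, r))
```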
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index 33a9e55a47deeb3e5b1468c4919c967d3b8fbe1c..e77e5c27b687ae445f178bd55eb83f68851d80bd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -339,6 +339,9 @@ case class Sample(
   override def output: Seq[Attribute] = child.output
 }
 
+/**
+ * A logical operator that removes duplicate rows from the output of its child.
+ */
 case class Distinct(child: LogicalPlan) extends UnaryNode {
   override def output: Seq[Attribute] = child.output
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceDistinctWithAggregateSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceDistinctWithAggregateSuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..df29a62ff0e1564a5f0117d7ca4a025c4c216dd1
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceDistinctWithAggregateSuite.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.plans.PlanTest
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Distinct, LocalRelation, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+
+class ReplaceDistinctWithAggregateSuite extends PlanTest {
+
+  object Optimize extends RuleExecutor[LogicalPlan] {
+    val batches = Batch("ReplaceDistinctWithAggregate", Once, ReplaceDistinctWithAggregate) :: Nil
+  }
+
+  test("replace distinct with aggregate") {
+    val input = LocalRelation('a.int, 'b.int)
+
+    val query = Distinct(input)
+    val optimized = Optimize.execute(query.analyze)
+
+    val correctAnswer = Aggregate(input.output, input.output, input)
+
+    comparePlans(optimized, correctAnswer)
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index d1a54ada7b191674486c94748e5c964b8f406fe6..4a224153e1a37644522a9b64dde86b80d1ff5662 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -1311,7 +1311,7 @@ class DataFrame private[sql](
    * @group dfops
    * @since 1.3.0
    */
-  override def distinct: DataFrame = Distinct(logicalPlan)
+  override def distinct: DataFrame = dropDuplicates()
 
   /**
    * @group basic
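With this change, `distinct` is just an alias for `dropDuplicates()` (which, when
called with no arguments, deduplicates across all columns). An illustrative
sketch, assuming a `sqlContext` is in scope and using hypothetical toy data:

```scala
import sqlContext.implicits._

// Toy data with duplicates spanning all columns.
val df = Seq((1, "a"), (1, "a"), (2, "b")).toDF("x", "y")

df.distinct.show()         // (1, "a"), (2, "b")
df.dropDuplicates().show() // identical output: distinct delegates to it
```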
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index d0a1ad00560d3f747611584d39af12aa65f79eb4..7a1331a39151a5043a9a53b39a88e6d9787b734d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -284,8 +284,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       case r: RunnableCommand => ExecutedCommand(r) :: Nil
 
       case logical.Distinct(child) =>
-        execution.Distinct(partial = false,
-          execution.Distinct(partial = true, planLater(child))) :: Nil
+        throw new IllegalStateException(
+          "Logical Distinct operator should have been replaced by an Aggregate in the optimizer")
       case logical.Repartition(numPartitions, shuffle, child) =>
         execution.Repartition(numPartitions, shuffle, planLater(child)) :: Nil
       case logical.SortPartitions(sortExprs, child) =>
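The planner case is now a sanity check rather than a real code path: by the time
a plan reaches the planner, the optimizer batch added above should already have
rewritten every `Distinct` into an `Aggregate`. A sketch of that invariant,
assuming the plan is built with the Catalyst DSL as in the new test suite:

```scala
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.optimizer.DefaultOptimizer
import org.apache.spark.sql.catalyst.plans.logical.{Distinct, LocalRelation}

val optimized = DefaultOptimizer.execute(Distinct(LocalRelation('a.int, 'b.int)))

// No Distinct node should survive optimization; it is replaced by Aggregate.
assert(optimized.collect { case d: Distinct => d }.isEmpty)
```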
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index a30ade86441ca18f044463a3c236a44c3c161658..fb42072f9d5a7c30fe83704c33474445e0a24c2c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -230,37 +230,6 @@ case class ExternalSort(
   override def outputOrdering: Seq[SortOrder] = sortOrder
 }
 
-/**
- * :: DeveloperApi ::
- * Computes the set of distinct input rows using a HashSet.
- * @param partial when true the distinct operation is performed partially, per partition, without
- *                shuffling the data.
- * @param child the input query plan.
- */
-@DeveloperApi
-case class Distinct(partial: Boolean, child: SparkPlan) extends UnaryNode {
-  override def output: Seq[Attribute] = child.output
-
-  override def requiredChildDistribution: Seq[Distribution] =
-    if (partial) UnspecifiedDistribution :: Nil else ClusteredDistribution(child.output) :: Nil
-
-  protected override def doExecute(): RDD[Row] = {
-    child.execute().mapPartitions { iter =>
-      val hashSet = new scala.collection.mutable.HashSet[Row]()
-
-      var currentRow: Row = null
-      while (iter.hasNext) {
-        currentRow = iter.next()
-        if (!hashSet.contains(currentRow)) {
-          hashSet.add(currentRow.copy())
-        }
-      }
-
-      hashSet.iterator
-    }
-  }
-}
-
 /**
  * :: DeveloperApi ::
  * Return a new RDD that has exactly `numPartitions` partitions.