From 5dadda86456e1d3918e320b83aec7e2f1352d95d Mon Sep 17 00:00:00 2001
From: Yanjie Gao <gaoyanjie55@163.com>
Date: Fri, 4 Jul 2014 02:43:57 -0700
Subject: [PATCH] [SPARK-2234][SQL]Spark SQL basicOperators  add Except
 operator

Hi all,
I want to submit a Except operator in basicOperators.scala
In SQL case.SQL support two table do except operator.
select * from table1
except
select * from table2
This operator support the substract function .Return an table with the elements from `this` that are not in `other`.This operator should limit the input SparkPlan Seq only has two member.The check will later support
JIRA:https://issues.apache.org/jira/browse/SPARK-2234

Author: Yanjie Gao <gaoyanjie55@163.com>
Author: YanjieGao <396154235@qq.com>
Author: root <root@node4.(none)>
Author: gaoyanjie <gaoyanjie55@163.com>

Closes #1151 from YanjieGao/patch-6 and squashes the following commits:

f19f899 [YanjieGao] add a new blank line in basicoperators.scala
2ff7d73 [YanjieGao] resolve the identation in SqlParser and SparkStrategies
fdb5227 [YanjieGao] Merge remote branch 'upstream/master' into patch-6
9940d19 [YanjieGao] make comment less than 100c
09c7413 [YanjieGao] pr 1151 SqlParser add cache ,basic Operator rename Except and modify comment
b4b5867 [root] Merge remote branch 'upstream/master' into patch-6
b4c3869 [Yanjie Gao] change SparkStrategies Sparkcontext to SqlContext
7e0ec29 [Yanjie Gao] delete multi test
7e7c83f [Yanjie Gao] delete conflict except
b01beb8 [YanjieGao] resolve conflict sparkstrategies and basicOperators
4dc8166 [YanjieGao] resolve conflict
fa68a98 [Yanjie Gao] Update joins.scala
8e6bb00 [Yanjie Gao] delete conflict except
dd9ba5e [Yanjie Gao] Update joins.scala
a0d4e73 [Yanjie Gao] delete skew join
60f5ddd [Yanjie Gao] update less than 100c
0e72233 [Yanjie Gao] update SQLQuerySuite on master branch
7f916b5 [Yanjie Gao] update execution/basicOperators on master branch
a28dece [Yanjie Gao] Update logical/basicOperators on master branch
a639935 [Yanjie Gao] Update SparkStrategies.scala
3bf7def [Yanjie Gao] update SqlParser on master branch
26f833f [Yanjie Gao] update SparkStrategies.scala on master branch
8dd063f [Yanjie Gao] Update logical/basicOperators on master branch
9847dcf [Yanjie Gao] update SqlParser on masterbranch
d6a4604 [Yanjie Gao] Update joins.scala
424c507 [Yanjie Gao] Update joins.scala
7680742 [Yanjie Gao] Update SqlParser.scala
a7193d8 [gaoyanjie] [SPARK-2234][SQL]Spark SQL basicOperators add Except operator #1151
5c8a224 [Yanjie Gao] update the line less than 100c
ee066b3 [Yanjie Gao] Update basicOperators.scala
32a80ab [Yanjie Gao] remove except in HiveQl
cf232eb [Yanjie Gao] update 1comment 2space3 left.out
f1ea3f3 [Yanjie Gao] remove comment
7ea9b91 [Yanjie Gao] remove annotation
7f3d613 [Yanjie Gao] update .map(_.copy())
670a1bb [Yanjie Gao] Update HiveQl.scala
3fe7746 [Yanjie Gao] Update SQLQuerySuite.scala
a36eb0a [Yanjie Gao] Update basicOperators.scala
7859e56 [Yanjie Gao] Update SparkStrategies.scala
052346d [Yanjie Gao] Subtract is conflict with Subtract(e1,e2)
aab3785 [Yanjie Gao] Update SQLQuerySuite.scala
4bf80b1 [Yanjie Gao] update subtract to except
4bdd520 [Yanjie Gao] Update SqlParser.scala
2d4bfbd [Yanjie Gao] Update SQLQuerySuite.scala
0808921 [Yanjie Gao] SQLQuerySuite
a8a1948 [Yanjie Gao] SparkStrategies
1fe96c0 [Yanjie Gao] HiveQl.scala update
3305e40 [Yanjie Gao] SqlParser
7a98c37 [Yanjie Gao] Update basicOperators.scala
cf5b9d0 [Yanjie Gao] Update basicOperators.scala
8945835 [Yanjie Gao] object SkewJoin extends Strategy
2b98962 [Yanjie Gao] Update SqlParser.scala
dd32980 [Yanjie Gao] update1
68815b2 [Yanjie Gao] Reformat the code style
4eb43ec [Yanjie Gao] Update basicOperators.scala
aa06072 [Yanjie Gao] Reformat the code sytle
---
 .../org/apache/spark/sql/catalyst/SqlParser.scala |  3 +++
 .../catalyst/plans/logical/basicOperators.scala   |  6 ++++++
 .../spark/sql/execution/SparkStrategies.scala     |  2 ++
 .../spark/sql/execution/basicOperators.scala      | 15 +++++++++++++++
 .../org/apache/spark/sql/SQLQuerySuite.scala      | 14 ++++++++++++++
 5 files changed, 40 insertions(+)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
index 61762fa2a7..ecb1112955 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
@@ -118,6 +118,8 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
   protected val UNCACHE = Keyword("UNCACHE")
   protected val UNION = Keyword("UNION")
   protected val WHERE = Keyword("WHERE")
+  protected val EXCEPT = Keyword("EXCEPT")
+
 
   // Use reflection to find the reserved words defined in this class.
   protected val reservedWords =
@@ -138,6 +140,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
   protected lazy val query: Parser[LogicalPlan] = (
     select * (
         UNION ~ ALL ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Union(q1, q2) } |
+        EXCEPT ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Except(q1, q2)} |
         UNION ~ opt(DISTINCT) ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Distinct(Union(q1, q2)) }
       )
     | insert | cache
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index 3e0639867b..bac5a72464 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -89,6 +89,12 @@ case class Join(
   }
 }
 
+case class Except(left: LogicalPlan, right: LogicalPlan) extends BinaryNode {
+  def output = left.output
+
+  def references = Set.empty
+}
+
 case class InsertIntoTable(
     table: LogicalPlan,
     partition: Map[String, Option[String]],
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 0925605b7c..9e036e127b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -273,6 +273,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         execution.Limit(limit, planLater(child))(sqlContext) :: Nil
       case Unions(unionChildren) =>
         execution.Union(unionChildren.map(planLater))(sqlContext) :: Nil
+      case logical.Except(left,right) =>                                        
+        execution.Except(planLater(left),planLater(right)) :: Nil   
       case logical.Generate(generator, join, outer, _, child) =>
         execution.Generate(generator, join = join, outer = outer, planLater(child)) :: Nil
       case logical.NoRelation =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index a278f1ca98..4b59e0b4e5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -205,3 +205,18 @@ object ExistingRdd {
 case class ExistingRdd(output: Seq[Attribute], rdd: RDD[Row]) extends LeafNode {
   override def execute() = rdd
 }
+
+/**
+ * :: DeveloperApi ::
+ * Returns a table with the elements from left that are not in right using
+ * the built-in spark subtract function.
+ */
+@DeveloperApi
+case class Except(left: SparkPlan, right: SparkPlan) extends BinaryNode {
+  override def output = left.output
+
+  override def execute() = {
+    left.execute().map(_.copy()).subtract(right.execute().map(_.copy()))
+  }
+}
+
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 2c1cb18670..5c6701e203 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -371,6 +371,20 @@ class SQLQuerySuite extends QueryTest {
         (3, null)))
   }
 
+  test("EXCEPT") {
+
+    checkAnswer(
+      sql("SELECT * FROM lowerCaseData EXCEPT SELECT * FROM upperCaseData "),
+      (1, "a") ::
+      (2, "b") ::
+      (3, "c") ::
+      (4, "d") :: Nil)
+    checkAnswer(
+      sql("SELECT * FROM lowerCaseData EXCEPT SELECT * FROM lowerCaseData "), Nil)
+    checkAnswer(
+      sql("SELECT * FROM upperCaseData EXCEPT SELECT * FROM upperCaseData "), Nil)
+  }
+
   test("SET commands semantics using sql()") {
     TestSQLContext.settings.synchronized {
       clear()
-- 
GitLab