From 527fc5d0c990daaacad4740f62cfe6736609b77b Mon Sep 17 00:00:00 2001
From: Reynold Xin <rxin@databricks.com>
Date: Wed, 3 May 2017 09:22:25 -0700
Subject: [PATCH] [SPARK-20576][SQL] Support generic hint function in
 Dataset/DataFrame

## What changes were proposed in this pull request?
We allow users to specify hints (currently only "broadcast" is supported) in SQL and DataFrames. However, while SQL has a standard hint syntax (/*+ ... */), DataFrames have no equivalent, and users are sometimes confused because they cannot find a way to apply a broadcast hint. This ticket adds a generic hint function on Dataset/DataFrame that allows using the same hints on DataFrames as in SQL.

As an example, after this patch, the following applies a broadcast hint to a DataFrame using the new hint function:

```
df1.join(df2.hint("broadcast"))
```
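
To make the intended usage concrete, here is a hedged, self-contained sketch (the SparkSession setup, the example data, and the alias "t" are illustrative and not part of this patch). With no parameters the hint marks the whole hinted plan; with parameters it is restricted to the named relations inside that plan, as implemented in the ResolveHints change below:

```
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("broadcast-hint-sketch")
  .getOrCreate()
import spark.implicits._

val df1 = Seq((1, "a"), (2, "b")).toDF("key", "value")
val df2 = Seq((1, "x"), (2, "y")).toDF("key", "value")

// No parameters: mark all of df2 as a broadcast candidate, mirroring the SQL /*+ ... */ hint.
val joined = df1.join(df2.hint("broadcast"), "key")

// With parameters: hint the joined plan and name the relation to broadcast by its alias.
val joinedByAlias = df1.join(df2.as("t"), "key").hint("broadcast", "t")
```

Either form only marks the logical plan; whether a broadcast join is actually used is still decided by the planner.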

## How was this patch tested?
Added a test case in DataFrameJoinSuite.
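
For reviewers who want to spot-check the behavior interactively (e.g. in spark-shell), a hedged sketch of the same check the new test performs, reusing the spark session from the sketch above:

```
import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec

// Two huge ranges would normally not be broadcast, but with the hint the
// physical plan should contain a BroadcastHashJoinExec.
val planned = spark.range(10e10.toLong)
  .join(spark.range(10e10.toLong).hint("broadcast"), "id")
  .queryExecution.executedPlan

assert(planned.collect { case p: BroadcastHashJoinExec => p }.nonEmpty)
```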

Author: Reynold Xin <rxin@databricks.com>

Closes #17839 from rxin/SPARK-20576.
---
 .../sql/catalyst/analysis/ResolveHints.scala   |  8 +++++++-
 .../scala/org/apache/spark/sql/Dataset.scala   | 16 ++++++++++++++++
 .../apache/spark/sql/DataFrameJoinSuite.scala  | 18 +++++++++++++++++-
 3 files changed, 40 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
index c4827b81e8..df688fa0e5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
@@ -86,7 +86,13 @@ object ResolveHints {
 
     def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
       case h: Hint if BROADCAST_HINT_NAMES.contains(h.name.toUpperCase(Locale.ROOT)) =>
-        applyBroadcastHint(h.child, h.parameters.toSet)
+        if (h.parameters.isEmpty) {
+          // If there is no table alias specified, turn the entire subtree into a BroadcastHint.
+          BroadcastHint(h.child)
+        } else {
+          // Otherwise, find the query plans within the subtree that should be broadcast.
+          applyBroadcastHint(h.child, h.parameters.toSet)
+        }
     }
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 147e7651ce..620c8bd54b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -1160,6 +1160,22 @@ class Dataset[T] private[sql](
    */
   def apply(colName: String): Column = col(colName)
 
+  /**
+   * Specifies some hint on the current Dataset. As an example, the following code specifies
+   * that one of the plans can be broadcast:
+   *
+   * {{{
+   *   df1.join(df2.hint("broadcast"))
+   * }}}
+   *
+   * @group basic
+   * @since 2.2.0
+   */
+  @scala.annotation.varargs
+  def hint(name: String, parameters: String*): Dataset[T] = withTypedPlan {
+    Hint(name, parameters, logicalPlan)
+  }
+
   /**
    * Selects column based on the column name and return it as a [[Column]].
    *
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
index 541ffb58e7..4a52af6c32 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
@@ -151,7 +151,7 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext {
       Row(1, 1, 1, 1) :: Row(2, 1, 2, 2) :: Nil)
   }
 
-  test("broadcast join hint") {
+  test("broadcast join hint using broadcast function") {
     val df1 = Seq((1, "1"), (2, "2")).toDF("key", "value")
     val df2 = Seq((1, "1"), (2, "2")).toDF("key", "value")
 
@@ -174,6 +174,22 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext {
     }
   }
 
+  test("broadcast join hint using Dataset.hint") {
+    // make sure a giant join is not broadcastable
+    val plan1 =
+      spark.range(10e10.toLong)
+        .join(spark.range(10e10.toLong), "id")
+        .queryExecution.executedPlan
+    assert(plan1.collect { case p: BroadcastHashJoinExec => p }.size == 0)
+
+    // now with a hint it should be broadcasted
+    val plan2 =
+      spark.range(10e10.toLong)
+        .join(spark.range(10e10.toLong).hint("broadcast"), "id")
+        .queryExecution.executedPlan
+    assert(plan2.collect { case p: BroadcastHashJoinExec => p }.size == 1)
+  }
+
   test("join - outer join conversion") {
     val df = Seq((1, 2, "1"), (3, 4, "3")).toDF("int", "int2", "str").as("a")
     val df2 = Seq((1, 3, "1"), (5, 6, "5")).toDF("int", "int2", "str").as("b")
-- 
GitLab