From 0d589ba00b5d539fbfef5174221de046a70548cd Mon Sep 17 00:00:00 2001
From: Reynold Xin <rxin@databricks.com>
Date: Tue, 23 May 2017 18:44:49 +0200
Subject: [PATCH] [SPARK-20857][SQL] Generic resolved hint node

## What changes were proposed in this pull request?
This patch renames BroadcastHint to ResolvedHint (and Hint to UnresolvedHint) so the hint framework is more generic and would allow us to introduce other hint types in the future without introducing new hint nodes.

## How was this patch tested?
Updated test cases.

Author: Reynold Xin <rxin@databricks.com>

Closes #18072 from rxin/SPARK-20857.
---
 .../sql/catalyst/analysis/Analyzer.scala      |  2 +-
 .../sql/catalyst/analysis/CheckAnalysis.scala |  2 +-
 .../sql/catalyst/analysis/ResolveHints.scala  | 12 ++---
 .../sql/catalyst/optimizer/Optimizer.scala    |  2 +-
 .../sql/catalyst/optimizer/expressions.scala  |  2 +-
 .../sql/catalyst/parser/AstBuilder.scala      |  4 +-
 .../sql/catalyst/planning/patterns.scala      |  4 +-
 .../catalyst/plans/logical/Statistics.scala   |  5 ++
 .../plans/logical/basicLogicalOperators.scala | 22 +--------
 .../sql/catalyst/plans/logical/hints.scala    | 49 +++++++++++++++++++
 .../catalyst/analysis/ResolveHintsSuite.scala | 41 +++++++++-------
 .../optimizer/ColumnPruningSuite.scala        |  5 +-
 .../optimizer/FilterPushdownSuite.scala       |  4 +-
 .../optimizer/JoinOptimizationSuite.scala     |  4 +-
 .../sql/catalyst/parser/PlanParserSuite.scala | 15 +++---
 .../BasicStatsEstimationSuite.scala           |  2 +-
 .../scala/org/apache/spark/sql/Dataset.scala  |  2 +-
 .../spark/sql/execution/SparkStrategies.scala |  2 +-
 .../org/apache/spark/sql/functions.scala      |  5 +-
 .../execution/joins/BroadcastJoinSuite.scala  | 14 +++---
 20 files changed, 118 insertions(+), 80 deletions(-)
 create mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index d58b8acefd..d130962c63 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1336,7 +1336,7 @@ class Analyzer(
 
         // Category 1:
         // BroadcastHint, Distinct, LeafNode, Repartition, and SubqueryAlias
-        case _: BroadcastHint | _: Distinct | _: LeafNode | _: Repartition | _: SubqueryAlias =>
+        case _: ResolvedHint | _: Distinct | _: LeafNode | _: Repartition | _: SubqueryAlias =>
 
         // Category 2:
         // These operators can be anywhere in a correlated subquery.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index ea4560aac7..2e3ac3e474 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -399,7 +399,7 @@ trait CheckAnalysis extends PredicateHelper {
                  |in operator ${operator.simpleString}
                """.stripMargin)
 
-          case _: Hint =>
+          case _: UnresolvedHint =>
             throw new IllegalStateException(
               "Internal error: logical hint operator should have been removed during analysis")
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
index df688fa0e5..9dfd84cbc9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
@@ -57,11 +57,11 @@ object ResolveHints {
       val newNode = CurrentOrigin.withOrigin(plan.origin) {
         plan match {
           case u: UnresolvedRelation if toBroadcast.exists(resolver(_, u.tableIdentifier.table)) =>
-            BroadcastHint(plan)
+            ResolvedHint(plan, isBroadcastable = Option(true))
           case r: SubqueryAlias if toBroadcast.exists(resolver(_, r.alias)) =>
-            BroadcastHint(plan)
+            ResolvedHint(plan, isBroadcastable = Option(true))
 
-          case _: BroadcastHint | _: View | _: With | _: SubqueryAlias =>
+          case _: ResolvedHint | _: View | _: With | _: SubqueryAlias =>
             // Don't traverse down these nodes.
             // For an existing broadcast hint, there is no point going down (if we do, we either
             // won't change the structure, or will introduce another broadcast hint that is useless.
@@ -85,10 +85,10 @@ object ResolveHints {
     }
 
     def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
-      case h: Hint if BROADCAST_HINT_NAMES.contains(h.name.toUpperCase(Locale.ROOT)) =>
+      case h: UnresolvedHint if BROADCAST_HINT_NAMES.contains(h.name.toUpperCase(Locale.ROOT)) =>
         if (h.parameters.isEmpty) {
           // If there is no table alias specified, turn the entire subtree into a BroadcastHint.
-          BroadcastHint(h.child)
+          ResolvedHint(h.child, isBroadcastable = Option(true))
         } else {
           // Otherwise, find within the subtree query plans that should be broadcasted.
           applyBroadcastHint(h.child, h.parameters.toSet)
@@ -102,7 +102,7 @@ object ResolveHints {
    */
   object RemoveAllHints extends Rule[LogicalPlan] {
     def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
-      case h: Hint => h.child
+      case h: UnresolvedHint => h.child
     }
   }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 1802cd4bb1..ae2f6bfa94 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -862,7 +862,7 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
     // Note that some operators (e.g. project, aggregate, union) are being handled separately
     // (earlier in this rule).
     case _: AppendColumns => true
-    case _: BroadcastHint => true
+    case _: ResolvedHint => true
     case _: Distinct => true
     case _: Generate => true
     case _: Pivot => true
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
index d3ef5ea840..8931eb2c8f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
@@ -478,7 +478,7 @@ object FoldablePropagation extends Rule[LogicalPlan] {
     case _: Distinct => true
     case _: AppendColumns => true
     case _: AppendColumnsWithObject => true
-    case _: BroadcastHint => true
+    case _: ResolvedHint => true
     case _: RepartitionByExpression => true
     case _: Repartition => true
     case _: Sort => true
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index f033fd4834..7d2e3a6fe7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -533,13 +533,13 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
   }
 
   /**
-   * Add a [[Hint]] to a logical plan.
+   * Add a [[UnresolvedHint]] to a logical plan.
    */
   private def withHints(
       ctx: HintContext,
       query: LogicalPlan): LogicalPlan = withOrigin(ctx) {
     val stmt = ctx.hintStatement
-    Hint(stmt.hintName.getText, stmt.parameters.asScala.map(_.getText), query)
+    UnresolvedHint(stmt.hintName.getText, stmt.parameters.asScala.map(_.getText), query)
   }
 
   /**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
index d39b0ef7e1..ef925f92ec 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
@@ -65,8 +65,8 @@ object PhysicalOperation extends PredicateHelper {
         val substitutedCondition = substitute(aliases)(condition)
         (fields, filters ++ splitConjunctivePredicates(substitutedCondition), other, aliases)
 
-      case BroadcastHint(child) =>
-        collectProjectsAndFilters(child)
+      case h: ResolvedHint =>
+        collectProjectsAndFilters(h.child)
 
       case other =>
         (None, Nil, other, Map.empty)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
index 3d4efef953..81bb374cb0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
@@ -68,6 +68,11 @@ case class Statistics(
       s"isBroadcastable=$isBroadcastable"
     ).filter(_.nonEmpty).mkString(", ")
   }
+
+  /** Must be called when computing stats for a join operator to reset hints. */
+  def resetHintsForJoin(): Statistics = copy(
+    isBroadcastable = false
+  )
 }
 
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index d291ca0020..9f34b37174 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -364,7 +364,7 @@ case class Join(
       case _ =>
         // Make sure we don't propagate isBroadcastable in other joins, because
         // they could explode the size.
-        super.computeStats(conf).copy(isBroadcastable = false)
+        super.computeStats(conf).resetHintsForJoin()
     }
 
     if (conf.cboEnabled) {
@@ -375,26 +375,6 @@ case class Join(
   }
 }
 
-/**
- * A hint for the optimizer that we should broadcast the `child` if used in a join operator.
- */
-case class BroadcastHint(child: LogicalPlan) extends UnaryNode {
-  override def output: Seq[Attribute] = child.output
-
-  // set isBroadcastable to true so the child will be broadcasted
-  override def computeStats(conf: SQLConf): Statistics =
-    child.stats(conf).copy(isBroadcastable = true)
-}
-
-/**
- * A general hint for the child. This node will be eliminated post analysis.
- * A pair of (name, parameters).
- */
-case class Hint(name: String, parameters: Seq[String], child: LogicalPlan) extends UnaryNode {
-  override lazy val resolved: Boolean = false
-  override def output: Seq[Attribute] = child.output
-}
-
 /**
  * Insert some data into a table. Note that this plan is unresolved and has to be replaced by the
  * concrete implementations during analysis.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala
new file mode 100644
index 0000000000..9bcbfbb4d1
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * A general hint for the child that is not yet resolved. This node is generated by the parser and
+ * should be removed This node will be eliminated post analysis.
+ * A pair of (name, parameters).
+ */
+case class UnresolvedHint(name: String, parameters: Seq[String], child: LogicalPlan)
+  extends UnaryNode {
+
+  override lazy val resolved: Boolean = false
+  override def output: Seq[Attribute] = child.output
+}
+
+/**
+ * A resolved hint node. The analyzer should convert all [[UnresolvedHint]] into [[ResolvedHint]].
+ */
+case class ResolvedHint(
+    child: LogicalPlan,
+    isBroadcastable: Option[Boolean] = None)
+  extends UnaryNode {
+
+  override def output: Seq[Attribute] = child.output
+
+  override def computeStats(conf: SQLConf): Statistics = {
+    val stats = child.stats(conf)
+    isBroadcastable.map(x => stats.copy(isBroadcastable = x)).getOrElse(stats)
+  }
+}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveHintsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveHintsSuite.scala
index d101e22274..bb914e11a1 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveHintsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveHintsSuite.scala
@@ -28,68 +28,70 @@ class ResolveHintsSuite extends AnalysisTest {
 
   test("invalid hints should be ignored") {
     checkAnalysis(
-      Hint("some_random_hint_that_does_not_exist", Seq("TaBlE"), table("TaBlE")),
+      UnresolvedHint("some_random_hint_that_does_not_exist", Seq("TaBlE"), table("TaBlE")),
       testRelation,
       caseSensitive = false)
   }
 
   test("case-sensitive or insensitive parameters") {
     checkAnalysis(
-      Hint("MAPJOIN", Seq("TaBlE"), table("TaBlE")),
-      BroadcastHint(testRelation),
+      UnresolvedHint("MAPJOIN", Seq("TaBlE"), table("TaBlE")),
+      ResolvedHint(testRelation, isBroadcastable = Option(true)),
       caseSensitive = false)
 
     checkAnalysis(
-      Hint("MAPJOIN", Seq("table"), table("TaBlE")),
-      BroadcastHint(testRelation),
+      UnresolvedHint("MAPJOIN", Seq("table"), table("TaBlE")),
+      ResolvedHint(testRelation, isBroadcastable = Option(true)),
       caseSensitive = false)
 
     checkAnalysis(
-      Hint("MAPJOIN", Seq("TaBlE"), table("TaBlE")),
-      BroadcastHint(testRelation),
+      UnresolvedHint("MAPJOIN", Seq("TaBlE"), table("TaBlE")),
+      ResolvedHint(testRelation, isBroadcastable = Option(true)),
       caseSensitive = true)
 
     checkAnalysis(
-      Hint("MAPJOIN", Seq("table"), table("TaBlE")),
+      UnresolvedHint("MAPJOIN", Seq("table"), table("TaBlE")),
       testRelation,
       caseSensitive = true)
   }
 
   test("multiple broadcast hint aliases") {
     checkAnalysis(
-      Hint("MAPJOIN", Seq("table", "table2"), table("table").join(table("table2"))),
-      Join(BroadcastHint(testRelation), BroadcastHint(testRelation2), Inner, None),
+      UnresolvedHint("MAPJOIN", Seq("table", "table2"), table("table").join(table("table2"))),
+      Join(ResolvedHint(testRelation, isBroadcastable = Option(true)),
+        ResolvedHint(testRelation2, isBroadcastable = Option(true)), Inner, None),
       caseSensitive = false)
   }
 
   test("do not traverse past existing broadcast hints") {
     checkAnalysis(
-      Hint("MAPJOIN", Seq("table"), BroadcastHint(table("table").where('a > 1))),
-      BroadcastHint(testRelation.where('a > 1)).analyze,
+      UnresolvedHint("MAPJOIN", Seq("table"),
+        ResolvedHint(table("table").where('a > 1), isBroadcastable = Option(true))),
+      ResolvedHint(testRelation.where('a > 1), isBroadcastable = Option(true)).analyze,
       caseSensitive = false)
   }
 
   test("should work for subqueries") {
     checkAnalysis(
-      Hint("MAPJOIN", Seq("tableAlias"), table("table").as("tableAlias")),
-      BroadcastHint(testRelation),
+      UnresolvedHint("MAPJOIN", Seq("tableAlias"), table("table").as("tableAlias")),
+      ResolvedHint(testRelation, isBroadcastable = Option(true)),
       caseSensitive = false)
 
     checkAnalysis(
-      Hint("MAPJOIN", Seq("tableAlias"), table("table").subquery('tableAlias)),
-      BroadcastHint(testRelation),
+      UnresolvedHint("MAPJOIN", Seq("tableAlias"), table("table").subquery('tableAlias)),
+      ResolvedHint(testRelation, isBroadcastable = Option(true)),
       caseSensitive = false)
 
     // Negative case: if the alias doesn't match, don't match the original table name.
     checkAnalysis(
-      Hint("MAPJOIN", Seq("table"), table("table").as("tableAlias")),
+      UnresolvedHint("MAPJOIN", Seq("table"), table("table").as("tableAlias")),
       testRelation,
       caseSensitive = false)
   }
 
   test("do not traverse past subquery alias") {
     checkAnalysis(
-      Hint("MAPJOIN", Seq("table"), table("table").where('a > 1).subquery('tableAlias)),
+      UnresolvedHint("MAPJOIN", Seq("table"), table("table").where('a > 1).subquery('tableAlias)),
       testRelation.where('a > 1).analyze,
       caseSensitive = false)
   }
@@ -102,7 +104,8 @@ class ResolveHintsSuite extends AnalysisTest {
           |SELECT /*+ BROADCAST(ctetable) */ * FROM ctetable
         """.stripMargin
       ),
-      BroadcastHint(testRelation.where('a > 1).select('a)).select('a).analyze,
+      ResolvedHint(testRelation.where('a > 1).select('a), isBroadcastable = Option(true))
+        .select('a).analyze,
       caseSensitive = false)
   }
 
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
index 589607e3ad..a0a0daea7d 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
@@ -321,15 +321,14 @@ class ColumnPruningSuite extends PlanTest {
       Project(Seq($"x.key", $"y.key"),
         Join(
           SubqueryAlias("x", input),
-          BroadcastHint(SubqueryAlias("y", input)), Inner, None)).analyze
+          ResolvedHint(SubqueryAlias("y", input)), Inner, None)).analyze
 
     val optimized = Optimize.execute(query)
 
     val expected =
       Join(
         Project(Seq($"x.key"), SubqueryAlias("x", input)),
-        BroadcastHint(
-          Project(Seq($"y.key"), SubqueryAlias("y", input))),
+        ResolvedHint(Project(Seq($"y.key"), SubqueryAlias("y", input))),
         Inner, None).analyze
 
     comparePlans(optimized, expected)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index 950aa23795..d4d281e7e0 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -798,12 +798,12 @@ class FilterPushdownSuite extends PlanTest {
   }
 
   test("broadcast hint") {
-    val originalQuery = BroadcastHint(testRelation)
+    val originalQuery = ResolvedHint(testRelation)
       .where('a === 2L && 'b + Rand(10).as("rnd") === 3)
 
     val optimized = Optimize.execute(originalQuery.analyze)
 
-    val correctAnswer = BroadcastHint(testRelation.where('a === 2L))
+    val correctAnswer = ResolvedHint(testRelation.where('a === 2L))
       .where('b + Rand(10).as("rnd") === 3)
       .analyze
 
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
index a43d78c7bd..105407d43b 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
@@ -129,14 +129,14 @@ class JoinOptimizationSuite extends PlanTest {
       Project(Seq($"x.key", $"y.key"),
         Join(
           SubqueryAlias("x", input),
-          BroadcastHint(SubqueryAlias("y", input)), Cross, None)).analyze
+          ResolvedHint(SubqueryAlias("y", input)), Cross, None)).analyze
 
     val optimized = Optimize.execute(query)
 
     val expected =
       Join(
         Project(Seq($"x.key"), SubqueryAlias("x", input)),
-        BroadcastHint(Project(Seq($"y.key"), SubqueryAlias("y", input))),
+        ResolvedHint(Project(Seq($"y.key"), SubqueryAlias("y", input))),
         Cross, None).analyze
 
     comparePlans(optimized, expected)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
index d78741d032..134e761460 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
@@ -534,30 +534,31 @@ class PlanParserSuite extends PlanTest {
 
     comparePlans(
       parsePlan("SELECT /*+ HINT */ * FROM t"),
-      Hint("HINT", Seq.empty, table("t").select(star())))
+      UnresolvedHint("HINT", Seq.empty, table("t").select(star())))
 
     comparePlans(
       parsePlan("SELECT /*+ BROADCASTJOIN(u) */ * FROM t"),
-      Hint("BROADCASTJOIN", Seq("u"), table("t").select(star())))
+      UnresolvedHint("BROADCASTJOIN", Seq("u"), table("t").select(star())))
 
     comparePlans(
       parsePlan("SELECT /*+ MAPJOIN(u) */ * FROM t"),
-      Hint("MAPJOIN", Seq("u"), table("t").select(star())))
+      UnresolvedHint("MAPJOIN", Seq("u"), table("t").select(star())))
 
     comparePlans(
       parsePlan("SELECT /*+ STREAMTABLE(a,b,c) */ * FROM t"),
-      Hint("STREAMTABLE", Seq("a", "b", "c"), table("t").select(star())))
+      UnresolvedHint("STREAMTABLE", Seq("a", "b", "c"), table("t").select(star())))
 
     comparePlans(
       parsePlan("SELECT /*+ INDEX(t, emp_job_ix) */ * FROM t"),
-      Hint("INDEX", Seq("t", "emp_job_ix"), table("t").select(star())))
+      UnresolvedHint("INDEX", Seq("t", "emp_job_ix"), table("t").select(star())))
 
     comparePlans(
       parsePlan("SELECT /*+ MAPJOIN(`default.t`) */ * from `default.t`"),
-      Hint("MAPJOIN", Seq("default.t"), table("default.t").select(star())))
+      UnresolvedHint("MAPJOIN", Seq("default.t"), table("default.t").select(star())))
 
     comparePlans(
       parsePlan("SELECT /*+ MAPJOIN(t) */ a from t where true group by a order by a"),
-      Hint("MAPJOIN", Seq("t"), table("t").where(Literal(true)).groupBy('a)('a)).orderBy('a.asc))
+      UnresolvedHint("MAPJOIN", Seq("t"),
+        table("t").where(Literal(true)).groupBy('a)('a)).orderBy('a.asc))
   }
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/BasicStatsEstimationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/BasicStatsEstimationSuite.scala
index b06871f96f..81b91e63b8 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/BasicStatsEstimationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/BasicStatsEstimationSuite.scala
@@ -45,7 +45,7 @@ class BasicStatsEstimationSuite extends StatsEstimationTestBase {
       expectedStatsCboOn = filterStatsCboOn,
       expectedStatsCboOff = filterStatsCboOff)
 
-    val broadcastHint = BroadcastHint(filter)
+    val broadcastHint = ResolvedHint(filter, isBroadcastable = Option(true))
     checkStats(
       broadcastHint,
       expectedStatsCboOn = filterStatsCboOn.copy(isBroadcastable = true),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 53773f18ce..cbab029b87 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -1174,7 +1174,7 @@ class Dataset[T] private[sql](
    */
   @scala.annotation.varargs
   def hint(name: String, parameters: String*): Dataset[T] = withTypedPlan {
-    Hint(name, parameters, logicalPlan)
+    UnresolvedHint(name, parameters, logicalPlan)
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 73541c22c6..5981b49da2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -433,7 +433,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       case ExternalRDD(outputObjAttr, rdd) => ExternalRDDScanExec(outputObjAttr, rdd) :: Nil
       case r: LogicalRDD =>
         RDDScanExec(r.output, r.rdd, "ExistingRDD", r.outputPartitioning, r.outputOrdering) :: Nil
-      case BroadcastHint(child) => planLater(child) :: Nil
+      case h: ResolvedHint => planLater(h.child) :: Nil
       case _ => Nil
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 5edf03666a..563eae0b64 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.analysis.{Star, UnresolvedFunction}
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
-import org.apache.spark.sql.catalyst.plans.logical.BroadcastHint
+import org.apache.spark.sql.catalyst.plans.logical.ResolvedHint
 import org.apache.spark.sql.execution.SparkSqlParser
 import org.apache.spark.sql.expressions.UserDefinedFunction
 import org.apache.spark.sql.internal.SQLConf
@@ -1019,7 +1019,8 @@ object functions {
    * @since 1.5.0
    */
   def broadcast[T](df: Dataset[T]): Dataset[T] = {
-    Dataset[T](df.sparkSession, BroadcastHint(df.logicalPlan))(df.exprEnc)
+    Dataset[T](df.sparkSession,
+      ResolvedHint(df.logicalPlan, isBroadcastable = Option(true)))(df.exprEnc)
   }
 
   /**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
index 26c45e092d..afb8ced53e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
@@ -157,7 +157,7 @@ class BroadcastJoinSuite extends QueryTest with SQLTestUtils {
   }
 
   test("broadcast hint in SQL") {
-    import org.apache.spark.sql.catalyst.plans.logical.{BroadcastHint, Join}
+    import org.apache.spark.sql.catalyst.plans.logical.{ResolvedHint, Join}
 
     spark.range(10).createOrReplaceTempView("t")
     spark.range(10).createOrReplaceTempView("u")
@@ -170,12 +170,12 @@ class BroadcastJoinSuite extends QueryTest with SQLTestUtils {
       val plan3 = sql(s"SELECT /*+ $name(v) */ * FROM t JOIN u ON t.id = u.id").queryExecution
         .optimizedPlan
 
-      assert(plan1.asInstanceOf[Join].left.isInstanceOf[BroadcastHint])
-      assert(!plan1.asInstanceOf[Join].right.isInstanceOf[BroadcastHint])
-      assert(!plan2.asInstanceOf[Join].left.isInstanceOf[BroadcastHint])
-      assert(plan2.asInstanceOf[Join].right.isInstanceOf[BroadcastHint])
-      assert(!plan3.asInstanceOf[Join].left.isInstanceOf[BroadcastHint])
-      assert(!plan3.asInstanceOf[Join].right.isInstanceOf[BroadcastHint])
+      assert(plan1.asInstanceOf[Join].left.isInstanceOf[ResolvedHint])
+      assert(!plan1.asInstanceOf[Join].right.isInstanceOf[ResolvedHint])
+      assert(!plan2.asInstanceOf[Join].left.isInstanceOf[ResolvedHint])
+      assert(plan2.asInstanceOf[Join].right.isInstanceOf[ResolvedHint])
+      assert(!plan3.asInstanceOf[Join].left.isInstanceOf[ResolvedHint])
+      assert(!plan3.asInstanceOf[Join].right.isInstanceOf[ResolvedHint])
     }
   }
 
-- 
GitLab