diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index d58b8acefdade6812dfcebdede1a3c887160da61..d130962c63918d17fd1beb72e81f8a4c341cc441 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1336,7 +1336,7 @@ class Analyzer(
 
         // Category 1:
-        // BroadcastHint, Distinct, LeafNode, Repartition, and SubqueryAlias
-        case _: BroadcastHint | _: Distinct | _: LeafNode | _: Repartition | _: SubqueryAlias =>
+        // ResolvedHint, Distinct, LeafNode, Repartition, and SubqueryAlias
+        case _: ResolvedHint | _: Distinct | _: LeafNode | _: Repartition | _: SubqueryAlias =>
 
         // Category 2:
         // These operators can be anywhere in a correlated subquery.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index ea4560aac7259fe0baf363fa4adb0ad8851f3220..2e3ac3e474866606d6da5fa199fcc7594669ed76 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -399,7 +399,7 @@ trait CheckAnalysis extends PredicateHelper {
                  |in operator ${operator.simpleString}
                """.stripMargin)
 
-          case _: Hint =>
+          case _: UnresolvedHint =>
             throw new IllegalStateException(
               "Internal error: logical hint operator should have been removed during analysis")
 
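Note: together with the `ResolveHints` changes below, this check establishes the invariant that no `UnresolvedHint` survives analysis. A minimal sketch of the invariant being enforced (illustrative only, not the surrounding CheckAnalysis code; `analyzedPlan` is a placeholder for the analyzer's output):

```scala
// Sketch: walking an analyzed plan must never find an UnresolvedHint.
analyzedPlan.foreach {
  case _: UnresolvedHint =>
    // A leftover hint here is an analyzer bug, not a user error.
    throw new IllegalStateException(
      "Internal error: logical hint operator should have been removed during analysis")
  case _ => // all other operators are checked elsewhere
}
```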
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
index df688fa0e58aea3f3d852932067f4ef28155c772..9dfd84cbc99410e49a560d393bec173795a70ac5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
@@ -57,11 +57,11 @@ object ResolveHints {
       val newNode = CurrentOrigin.withOrigin(plan.origin) {
         plan match {
           case u: UnresolvedRelation if toBroadcast.exists(resolver(_, u.tableIdentifier.table)) =>
-            BroadcastHint(plan)
+            ResolvedHint(plan, isBroadcastable = Option(true))
           case r: SubqueryAlias if toBroadcast.exists(resolver(_, r.alias)) =>
-            BroadcastHint(plan)
+            ResolvedHint(plan, isBroadcastable = Option(true))
 
-          case _: BroadcastHint | _: View | _: With | _: SubqueryAlias =>
+          case _: ResolvedHint | _: View | _: With | _: SubqueryAlias =>
             // Don't traverse down these nodes.
             // For an existing broadcast hint, there is no point going down (if we do, we either
-            // won't change the structure, or will introduce another broadcast hint that is useless.
+            // won't change the structure, or will introduce another broadcast hint that is useless).
@@ -85,10 +85,10 @@ object ResolveHints {
     }
 
     def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
-      case h: Hint if BROADCAST_HINT_NAMES.contains(h.name.toUpperCase(Locale.ROOT)) =>
+      case h: UnresolvedHint if BROADCAST_HINT_NAMES.contains(h.name.toUpperCase(Locale.ROOT)) =>
         if (h.parameters.isEmpty) {
-          // If there is no table alias specified, turn the entire subtree into a BroadcastHint.
+          // If there is no table alias specified, turn the entire subtree into a broadcast hint.
-          BroadcastHint(h.child)
+          ResolvedHint(h.child, isBroadcastable = Option(true))
         } else {
           // Otherwise, find within the subtree query plans that should be broadcasted.
           applyBroadcastHint(h.child, h.parameters.toSet)
@@ -102,7 +102,7 @@ object ResolveHints {
    */
   object RemoveAllHints extends Rule[LogicalPlan] {
     def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
-      case h: Hint => h.child
+      case h: UnresolvedHint => h.child
     }
   }
 
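The two rules in this file now split the work cleanly: the broadcast rule above converts recognized hint names into `ResolvedHint`, and `RemoveAllHints` drops whatever is left. A condensed sketch of that pipeline, under the same imports as this file (`resolveThenStrip` is a made-up helper, not part of the patch):

```scala
// Sketch of the two-phase hint handling; not the production rules.
def resolveThenStrip(plan: LogicalPlan): LogicalPlan = {
  // Phase 1: recognized names become resolved hints (mirrors the broadcast rule above).
  val resolved = plan transformUp {
    case h: UnresolvedHint if h.name.equalsIgnoreCase("MAPJOIN") =>
      ResolvedHint(h.child, isBroadcastable = Option(true))
  }
  // Phase 2: unrecognized hints are silently dropped (mirrors RemoveAllHints).
  resolved transformUp { case h: UnresolvedHint => h.child }
}
```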
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 1802cd4bb131b59bb696f5ec6d289fad2e3270c6..ae2f6bfa94ae78f559226a9db2d6a9580736825b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -862,7 +862,7 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
     // Note that some operators (e.g. project, aggregate, union) are being handled separately
     // (earlier in this rule).
     case _: AppendColumns => true
-    case _: BroadcastHint => true
+    case _: ResolvedHint => true
     case _: Distinct => true
     case _: Generate => true
     case _: Pivot => true
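`ResolvedHint` slots into the pushdown whitelist exactly where `BroadcastHint` used to be, so deterministic predicates still push through a hint; the "broadcast hint" test in `FilterPushdownSuite` below exercises precisely this. In that suite's terms:

```scala
// Mirrors the FilterPushdownSuite case further down: the deterministic conjunct
// pushes below the hint, the non-deterministic one stays above it.
val query = ResolvedHint(testRelation)
  .where('a === 2L && 'b + Rand(10).as("rnd") === 3)
val optimized = Optimize.execute(query.analyze)
// optimized == ResolvedHint(testRelation.where('a === 2L))
//                .where('b + Rand(10).as("rnd") === 3).analyze
```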
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
index d3ef5ea8409193f42f033503096bf2a4318cbf89..8931eb2c8f3b117395224eb473a0576d2ae6198e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
@@ -478,7 +478,7 @@ object FoldablePropagation extends Rule[LogicalPlan] {
     case _: Distinct => true
     case _: AppendColumns => true
     case _: AppendColumnsWithObject => true
-    case _: BroadcastHint => true
+    case _: ResolvedHint => true
     case _: RepartitionByExpression => true
     case _: Repartition => true
     case _: Sort => true
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index f033fd4834c9625a53606715ce927ce440b0539b..7d2e3a6fe7580b9ab3238a83f823078496742965 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -533,13 +533,13 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
   }
 
   /**
-   * Add a [[Hint]] to a logical plan.
+   * Add an [[UnresolvedHint]] to a logical plan.
    */
   private def withHints(
       ctx: HintContext,
       query: LogicalPlan): LogicalPlan = withOrigin(ctx) {
     val stmt = ctx.hintStatement
-    Hint(stmt.hintName.getText, stmt.parameters.asScala.map(_.getText), query)
+    UnresolvedHint(stmt.hintName.getText, stmt.parameters.asScala.map(_.getText), query)
   }
 
   /**
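For reference, the parser-level effect (these exact plan shapes are asserted in `PlanParserSuite` below):

```scala
// What withHints now produces, e.g. for a MAPJOIN comment hint:
parsePlan("SELECT /*+ MAPJOIN(u) */ * FROM t")
// => UnresolvedHint("MAPJOIN", Seq("u"), table("t").select(star()))
```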
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
index d39b0ef7e1d8abbc5d806d5d41a0ebc23ceb545f..ef925f92ecc7e1788a86c01fa4d5b9d47eff2f30 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
@@ -65,8 +65,8 @@ object PhysicalOperation extends PredicateHelper {
         val substitutedCondition = substitute(aliases)(condition)
         (fields, filters ++ splitConjunctivePredicates(substitutedCondition), other, aliases)
 
-      case BroadcastHint(child) =>
-        collectProjectsAndFilters(child)
+      case h: ResolvedHint =>
+        collectProjectsAndFilters(h.child)
 
       case other =>
         (None, Nil, other, Map.empty)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
index 3d4efef953a64e1c9bbc7d9d794e081c85fa2e58..81bb374cb0500271184d6c0e153b8c77ee4c869c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
@@ -68,6 +68,11 @@ case class Statistics(
       s"isBroadcastable=$isBroadcastable"
     ).filter(_.nonEmpty).mkString(", ")
   }
+
+  /** Must be called when computing stats for a join operator, to reset hints that should not propagate through a join. */
+  def resetHintsForJoin(): Statistics = copy(
+    isBroadcastable = false
+  )
 }
 
 
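Pulling the reset into a named method keeps the "which fields are hints" knowledge inside `Statistics` itself, so a future hint field only needs to be added in one place. A small sketch of the intended behavior (field names as above; all other `Statistics` fields keep their defaults):

```scala
// Sketch: a broadcastable join input must not make the join output broadcastable,
// because the join result can be arbitrarily larger than either side.
val leftStats = Statistics(sizeInBytes = BigInt(1) << 20, isBroadcastable = true)
val joinStats = leftStats.copy(sizeInBytes = leftStats.sizeInBytes * 1000)
  .resetHintsForJoin()
assert(!joinStats.isBroadcastable)
```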
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index d291ca0020838843a57943641e62da08c65fb82d..9f34b371740bd98231f4fab54fe9ff2555d19c19 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -364,7 +364,7 @@ case class Join(
       case _ =>
         // Make sure we don't propagate isBroadcastable in other joins, because
         // they could explode the size.
-        super.computeStats(conf).copy(isBroadcastable = false)
+        super.computeStats(conf).resetHintsForJoin()
     }
 
     if (conf.cboEnabled) {
@@ -375,26 +375,6 @@ case class Join(
   }
 }
 
-/**
- * A hint for the optimizer that we should broadcast the `child` if used in a join operator.
- */
-case class BroadcastHint(child: LogicalPlan) extends UnaryNode {
-  override def output: Seq[Attribute] = child.output
-
-  // set isBroadcastable to true so the child will be broadcasted
-  override def computeStats(conf: SQLConf): Statistics =
-    child.stats(conf).copy(isBroadcastable = true)
-}
-
-/**
- * A general hint for the child. This node will be eliminated post analysis.
- * A pair of (name, parameters).
- */
-case class Hint(name: String, parameters: Seq[String], child: LogicalPlan) extends UnaryNode {
-  override lazy val resolved: Boolean = false
-  override def output: Seq[Attribute] = child.output
-}
-
 /**
  * Insert some data into a table. Note that this plan is unresolved and has to be replaced by the
  * concrete implementations during analysis.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala
new file mode 100644
index 0000000000000000000000000000000000000000..9bcbfbb4d1397898337a80a5db0dc01aaee41f73
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * A general hint for the child that is not yet resolved. This node is generated by the parser and
+ * must be converted into a [[ResolvedHint]] or removed during analysis.
+ * A hint is a pair of (name, parameters).
+ */
+case class UnresolvedHint(name: String, parameters: Seq[String], child: LogicalPlan)
+  extends UnaryNode {
+
+  override lazy val resolved: Boolean = false
+  override def output: Seq[Attribute] = child.output
+}
+
+/**
+ * A resolved hint node. The analyzer should convert every [[UnresolvedHint]] into a [[ResolvedHint]].
+ */
+case class ResolvedHint(
+    child: LogicalPlan,
+    isBroadcastable: Option[Boolean] = None)
+  extends UnaryNode {
+
+  override def output: Seq[Attribute] = child.output
+
+  override def computeStats(conf: SQLConf): Statistics = {
+    val stats = child.stats(conf)
+    isBroadcastable.map(x => stats.copy(isBroadcastable = x)).getOrElse(stats)
+  }
+}
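The `Option[Boolean]` is what makes the node generic: `None` means "no opinion", so later hint kinds can reuse `ResolvedHint` without disturbing statistics they do not set. Concretely (with `child: LogicalPlan` and `conf: SQLConf` assumed in scope):

```scala
// computeStats semantics for the three possible flag values:
val forceOn  = ResolvedHint(child, isBroadcastable = Option(true))  // stats.isBroadcastable = true
val forceOff = ResolvedHint(child, isBroadcastable = Option(false)) // stats.isBroadcastable = false
val neutral  = ResolvedHint(child)                                  // child.stats(conf), unchanged
```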
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveHintsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveHintsSuite.scala
index d101e2227462d9d0797e3f89efcb3cfa223930f9..bb914e11a139a0859d38b322a163e7afef5b1a3c 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveHintsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveHintsSuite.scala
@@ -28,68 +28,70 @@ class ResolveHintsSuite extends AnalysisTest {
 
   test("invalid hints should be ignored") {
     checkAnalysis(
-      Hint("some_random_hint_that_does_not_exist", Seq("TaBlE"), table("TaBlE")),
+      UnresolvedHint("some_random_hint_that_does_not_exist", Seq("TaBlE"), table("TaBlE")),
       testRelation,
       caseSensitive = false)
   }
 
   test("case-sensitive or insensitive parameters") {
     checkAnalysis(
-      Hint("MAPJOIN", Seq("TaBlE"), table("TaBlE")),
-      BroadcastHint(testRelation),
+      UnresolvedHint("MAPJOIN", Seq("TaBlE"), table("TaBlE")),
+      ResolvedHint(testRelation, isBroadcastable = Option(true)),
       caseSensitive = false)
 
     checkAnalysis(
-      Hint("MAPJOIN", Seq("table"), table("TaBlE")),
-      BroadcastHint(testRelation),
+      UnresolvedHint("MAPJOIN", Seq("table"), table("TaBlE")),
+      ResolvedHint(testRelation, isBroadcastable = Option(true)),
       caseSensitive = false)
 
     checkAnalysis(
-      Hint("MAPJOIN", Seq("TaBlE"), table("TaBlE")),
-      BroadcastHint(testRelation),
+      UnresolvedHint("MAPJOIN", Seq("TaBlE"), table("TaBlE")),
+      ResolvedHint(testRelation, isBroadcastable = Option(true)),
       caseSensitive = true)
 
     checkAnalysis(
-      Hint("MAPJOIN", Seq("table"), table("TaBlE")),
+      UnresolvedHint("MAPJOIN", Seq("table"), table("TaBlE")),
       testRelation,
       caseSensitive = true)
   }
 
   test("multiple broadcast hint aliases") {
     checkAnalysis(
-      Hint("MAPJOIN", Seq("table", "table2"), table("table").join(table("table2"))),
-      Join(BroadcastHint(testRelation), BroadcastHint(testRelation2), Inner, None),
+      UnresolvedHint("MAPJOIN", Seq("table", "table2"), table("table").join(table("table2"))),
+      Join(ResolvedHint(testRelation, isBroadcastable = Option(true)),
+        ResolvedHint(testRelation2, isBroadcastable = Option(true)), Inner, None),
       caseSensitive = false)
   }
 
   test("do not traverse past existing broadcast hints") {
     checkAnalysis(
-      Hint("MAPJOIN", Seq("table"), BroadcastHint(table("table").where('a > 1))),
-      BroadcastHint(testRelation.where('a > 1)).analyze,
+      UnresolvedHint("MAPJOIN", Seq("table"),
+        ResolvedHint(table("table").where('a > 1), isBroadcastable = Option(true))),
+      ResolvedHint(testRelation.where('a > 1), isBroadcastable = Option(true)).analyze,
       caseSensitive = false)
   }
 
   test("should work for subqueries") {
     checkAnalysis(
-      Hint("MAPJOIN", Seq("tableAlias"), table("table").as("tableAlias")),
-      BroadcastHint(testRelation),
+      UnresolvedHint("MAPJOIN", Seq("tableAlias"), table("table").as("tableAlias")),
+      ResolvedHint(testRelation, isBroadcastable = Option(true)),
       caseSensitive = false)
 
     checkAnalysis(
-      Hint("MAPJOIN", Seq("tableAlias"), table("table").subquery('tableAlias)),
-      BroadcastHint(testRelation),
+      UnresolvedHint("MAPJOIN", Seq("tableAlias"), table("table").subquery('tableAlias)),
+      ResolvedHint(testRelation, isBroadcastable = Option(true)),
       caseSensitive = false)
 
     // Negative case: if the alias doesn't match, don't match the original table name.
     checkAnalysis(
-      Hint("MAPJOIN", Seq("table"), table("table").as("tableAlias")),
+      UnresolvedHint("MAPJOIN", Seq("table"), table("table").as("tableAlias")),
       testRelation,
       caseSensitive = false)
   }
 
   test("do not traverse past subquery alias") {
     checkAnalysis(
-      Hint("MAPJOIN", Seq("table"), table("table").where('a > 1).subquery('tableAlias)),
+      UnresolvedHint("MAPJOIN", Seq("table"), table("table").where('a > 1).subquery('tableAlias)),
       testRelation.where('a > 1).analyze,
       caseSensitive = false)
   }
@@ -102,7 +104,8 @@ class ResolveHintsSuite extends AnalysisTest {
           |SELECT /*+ BROADCAST(ctetable) */ * FROM ctetable
         """.stripMargin
       ),
-      BroadcastHint(testRelation.where('a > 1).select('a)).select('a).analyze,
+      ResolvedHint(testRelation.where('a > 1).select('a), isBroadcastable = Option(true))
+        .select('a).analyze,
       caseSensitive = false)
   }
 
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
index 589607e3ad5cb43de538040d8393f2e5b210bba6..a0a0daea7d075c871f68f1497a16bbaa6a3390d7 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
@@ -321,15 +321,14 @@ class ColumnPruningSuite extends PlanTest {
       Project(Seq($"x.key", $"y.key"),
         Join(
           SubqueryAlias("x", input),
-          BroadcastHint(SubqueryAlias("y", input)), Inner, None)).analyze
+          ResolvedHint(SubqueryAlias("y", input)), Inner, None)).analyze
 
     val optimized = Optimize.execute(query)
 
     val expected =
       Join(
         Project(Seq($"x.key"), SubqueryAlias("x", input)),
-        BroadcastHint(
-          Project(Seq($"y.key"), SubqueryAlias("y", input))),
+        ResolvedHint(Project(Seq($"y.key"), SubqueryAlias("y", input))),
         Inner, None).analyze
 
     comparePlans(optimized, expected)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index 950aa2379517efa46c615ee6a907382f73d6f7e7..d4d281e7e05dbcb112f3b79ff3a787923169d09d 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -798,12 +798,12 @@ class FilterPushdownSuite extends PlanTest {
   }
 
   test("broadcast hint") {
-    val originalQuery = BroadcastHint(testRelation)
+    val originalQuery = ResolvedHint(testRelation)
       .where('a === 2L && 'b + Rand(10).as("rnd") === 3)
 
     val optimized = Optimize.execute(originalQuery.analyze)
 
-    val correctAnswer = BroadcastHint(testRelation.where('a === 2L))
+    val correctAnswer = ResolvedHint(testRelation.where('a === 2L))
       .where('b + Rand(10).as("rnd") === 3)
       .analyze
 
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
index a43d78c7bd4474c8e47f7a738246478196475f96..105407d43bf3935e5f4c60df63d164e36a4c2a4d 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
@@ -129,14 +129,14 @@ class JoinOptimizationSuite extends PlanTest {
       Project(Seq($"x.key", $"y.key"),
         Join(
           SubqueryAlias("x", input),
-          BroadcastHint(SubqueryAlias("y", input)), Cross, None)).analyze
+          ResolvedHint(SubqueryAlias("y", input)), Cross, None)).analyze
 
     val optimized = Optimize.execute(query)
 
     val expected =
       Join(
         Project(Seq($"x.key"), SubqueryAlias("x", input)),
-        BroadcastHint(Project(Seq($"y.key"), SubqueryAlias("y", input))),
+        ResolvedHint(Project(Seq($"y.key"), SubqueryAlias("y", input))),
         Cross, None).analyze
 
     comparePlans(optimized, expected)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
index d78741d032f38356c4569b1d1aaa0bf73b9e03bd..134e7614608819eff1f53a5d3880a14029f670d1 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
@@ -534,30 +534,31 @@ class PlanParserSuite extends PlanTest {
 
     comparePlans(
       parsePlan("SELECT /*+ HINT */ * FROM t"),
-      Hint("HINT", Seq.empty, table("t").select(star())))
+      UnresolvedHint("HINT", Seq.empty, table("t").select(star())))
 
     comparePlans(
       parsePlan("SELECT /*+ BROADCASTJOIN(u) */ * FROM t"),
-      Hint("BROADCASTJOIN", Seq("u"), table("t").select(star())))
+      UnresolvedHint("BROADCASTJOIN", Seq("u"), table("t").select(star())))
 
     comparePlans(
       parsePlan("SELECT /*+ MAPJOIN(u) */ * FROM t"),
-      Hint("MAPJOIN", Seq("u"), table("t").select(star())))
+      UnresolvedHint("MAPJOIN", Seq("u"), table("t").select(star())))
 
     comparePlans(
       parsePlan("SELECT /*+ STREAMTABLE(a,b,c) */ * FROM t"),
-      Hint("STREAMTABLE", Seq("a", "b", "c"), table("t").select(star())))
+      UnresolvedHint("STREAMTABLE", Seq("a", "b", "c"), table("t").select(star())))
 
     comparePlans(
       parsePlan("SELECT /*+ INDEX(t, emp_job_ix) */ * FROM t"),
-      Hint("INDEX", Seq("t", "emp_job_ix"), table("t").select(star())))
+      UnresolvedHint("INDEX", Seq("t", "emp_job_ix"), table("t").select(star())))
 
     comparePlans(
       parsePlan("SELECT /*+ MAPJOIN(`default.t`) */ * from `default.t`"),
-      Hint("MAPJOIN", Seq("default.t"), table("default.t").select(star())))
+      UnresolvedHint("MAPJOIN", Seq("default.t"), table("default.t").select(star())))
 
     comparePlans(
       parsePlan("SELECT /*+ MAPJOIN(t) */ a from t where true group by a order by a"),
-      Hint("MAPJOIN", Seq("t"), table("t").where(Literal(true)).groupBy('a)('a)).orderBy('a.asc))
+      UnresolvedHint("MAPJOIN", Seq("t"),
+        table("t").where(Literal(true)).groupBy('a)('a)).orderBy('a.asc))
   }
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/BasicStatsEstimationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/BasicStatsEstimationSuite.scala
index b06871f96f0d8e3f89d0242d69f8bfdd4ee4c835..81b91e63b8f675e8b778077f7e32f529d90fd86c 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/BasicStatsEstimationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/BasicStatsEstimationSuite.scala
@@ -45,7 +45,7 @@ class BasicStatsEstimationSuite extends StatsEstimationTestBase {
       expectedStatsCboOn = filterStatsCboOn,
       expectedStatsCboOff = filterStatsCboOff)
 
-    val broadcastHint = BroadcastHint(filter)
+    val broadcastHint = ResolvedHint(filter, isBroadcastable = Option(true))
     checkStats(
       broadcastHint,
       expectedStatsCboOn = filterStatsCboOn.copy(isBroadcastable = true),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 53773f18ce553831756a6840d9d1f278764db2b5..cbab029b87b2a7a81c75bf6f47e066097f8e3e59 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -1174,7 +1174,7 @@ class Dataset[T] private[sql](
    */
   @scala.annotation.varargs
   def hint(name: String, parameters: String*): Dataset[T] = withTypedPlan {
-    Hint(name, parameters, logicalPlan)
+    UnresolvedHint(name, parameters, logicalPlan)
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 73541c22c6308ad8f1d569efabb9f83b8ed247c8..5981b49da277edd73c4a1c16aa74eb790e728ae3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -433,7 +433,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       case ExternalRDD(outputObjAttr, rdd) => ExternalRDDScanExec(outputObjAttr, rdd) :: Nil
       case r: LogicalRDD =>
         RDDScanExec(r.output, r.rdd, "ExistingRDD", r.outputPartitioning, r.outputOrdering) :: Nil
-      case BroadcastHint(child) => planLater(child) :: Nil
+      case h: ResolvedHint => planLater(h.child) :: Nil
       case _ => Nil
     }
   }
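At physical planning time the hint node is transparent: its effect has already been baked into `Statistics.isBroadcastable`, which `JoinSelection` consults when choosing a broadcast join. Roughly, paraphrasing the existing `canBroadcast` logic in this file (untouched by this patch):

```scala
// Paraphrase of JoinSelection's existing check, for orientation only:
// broadcast if a hint forced the flag, or if the estimated size fits under
// spark.sql.autoBroadcastJoinThreshold.
private def canBroadcast(plan: LogicalPlan): Boolean = {
  plan.stats(conf).isBroadcastable ||
    (plan.stats(conf).sizeInBytes >= 0 &&
      plan.stats(conf).sizeInBytes <= conf.autoBroadcastJoinThreshold)
}
```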
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 5edf03666ac222d58c0f503d7c19ff9be1e599ff..563eae0b6483fa86c3af2a52cdfec25ae2f74e7a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.analysis.{Star, UnresolvedFunction}
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
-import org.apache.spark.sql.catalyst.plans.logical.BroadcastHint
+import org.apache.spark.sql.catalyst.plans.logical.ResolvedHint
 import org.apache.spark.sql.execution.SparkSqlParser
 import org.apache.spark.sql.expressions.UserDefinedFunction
 import org.apache.spark.sql.internal.SQLConf
@@ -1019,7 +1019,8 @@ object functions {
    * @since 1.5.0
    */
   def broadcast[T](df: Dataset[T]): Dataset[T] = {
-    Dataset[T](df.sparkSession, BroadcastHint(df.logicalPlan))(df.exprEnc)
+    Dataset[T](df.sparkSession,
+      ResolvedHint(df.logicalPlan, isBroadcastable = Option(true)))(df.exprEnc)
   }
 
   /**
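Caller-visible behavior of `broadcast()` is unchanged; only the plan node it produces differs:

```scala
// Usage stays the same as before this patch (largeDF/smallDF are placeholders):
import org.apache.spark.sql.functions.broadcast
val joined = largeDF.join(broadcast(smallDF), "key")
// joined's optimized plan now contains ResolvedHint(..., isBroadcastable = Some(true))
// where it previously contained BroadcastHint(...).
```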
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
index 26c45e092dc6505af7f9c2e0f43c054352039f74..afb8ced53e25c3daa7ff816c398c40f6b9d41029 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
@@ -157,7 +157,7 @@ class BroadcastJoinSuite extends QueryTest with SQLTestUtils {
   }
 
   test("broadcast hint in SQL") {
-    import org.apache.spark.sql.catalyst.plans.logical.{BroadcastHint, Join}
+    import org.apache.spark.sql.catalyst.plans.logical.{Join, ResolvedHint}
 
     spark.range(10).createOrReplaceTempView("t")
     spark.range(10).createOrReplaceTempView("u")
@@ -170,12 +170,12 @@ class BroadcastJoinSuite extends QueryTest with SQLTestUtils {
       val plan3 = sql(s"SELECT /*+ $name(v) */ * FROM t JOIN u ON t.id = u.id").queryExecution
         .optimizedPlan
 
-      assert(plan1.asInstanceOf[Join].left.isInstanceOf[BroadcastHint])
-      assert(!plan1.asInstanceOf[Join].right.isInstanceOf[BroadcastHint])
-      assert(!plan2.asInstanceOf[Join].left.isInstanceOf[BroadcastHint])
-      assert(plan2.asInstanceOf[Join].right.isInstanceOf[BroadcastHint])
-      assert(!plan3.asInstanceOf[Join].left.isInstanceOf[BroadcastHint])
-      assert(!plan3.asInstanceOf[Join].right.isInstanceOf[BroadcastHint])
+      assert(plan1.asInstanceOf[Join].left.isInstanceOf[ResolvedHint])
+      assert(!plan1.asInstanceOf[Join].right.isInstanceOf[ResolvedHint])
+      assert(!plan2.asInstanceOf[Join].left.isInstanceOf[ResolvedHint])
+      assert(plan2.asInstanceOf[Join].right.isInstanceOf[ResolvedHint])
+      assert(!plan3.asInstanceOf[Join].left.isInstanceOf[ResolvedHint])
+      assert(!plan3.asInstanceOf[Join].right.isInstanceOf[ResolvedHint])
     }
   }