diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
index c4827b81e8b6396dd6f2655aa9e3f0bef7d5b318..df688fa0e58aea3f3d852932067f4ef28155c772 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
@@ -86,7 +86,13 @@ object ResolveHints {
 
     def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
       case h: Hint if BROADCAST_HINT_NAMES.contains(h.name.toUpperCase(Locale.ROOT)) =>
-        applyBroadcastHint(h.child, h.parameters.toSet)
+        if (h.parameters.isEmpty) {
+          // If there is no table alias specified, turn the entire subtree into a BroadcastHint.
+          BroadcastHint(h.child)
+        } else {
+          // Otherwise, find the query plans within the subtree that should be broadcasted.
+          applyBroadcastHint(h.child, h.parameters.toSet)
+        }
     }
   }
 
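The resolver change above distinguishes two cases: a broadcast hint with no parameters wraps the entire child subtree in a BroadcastHint, while a parameterized hint delegates to applyBroadcastHint, which marks only the matching relations inside the subtree. A minimal sketch of how the two forms could be issued through the new Dataset API (the alias "t" is purely illustrative, and matching parameters against relation names/aliases is assumed to follow applyBroadcastHint's existing behavior):

    // No parameters: the whole right-hand subtree becomes a broadcast candidate.
    df1.join(df2.hint("broadcast"), "key")

    // With parameters: only the relation aliased "t" inside the subtree is marked.
    df1.join(df2.as("t").hint("broadcast", "t"), "key")
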
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 147e7651ce55ba9adf73b2a01dbc18a86df63979..620c8bd54ba004b54a68d064defe2a097e816919 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -1160,6 +1160,22 @@ class Dataset[T] private[sql](
    */
   def apply(colName: String): Column = col(colName)
 
+  /**
+   * Specifies some hint on the current Dataset. As an example, the following code specifies
+   * that one of the plans can be broadcasted:
+   *
+   * {{{
+   *   df1.join(df2.hint("broadcast"))
+   * }}}
+   *
+   * @group basic
+   * @since 2.2.0
+   */
+  @scala.annotation.varargs
+  def hint(name: String, parameters: String*): Dataset[T] = withTypedPlan {
+    Hint(name, parameters, logicalPlan)
+  }
+
   /**
    * Selects column based on the column name and return it as a [[Column]].
    *
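For the broadcast case, the new string-based hint is expected to behave like the existing org.apache.spark.sql.functions.broadcast function, since both should end up as a BroadcastHint on the logical plan; a rough sketch of the two spellings (the DataFrame and column names are illustrative only):

    import org.apache.spark.sql.functions.broadcast

    // Existing function-based hint.
    largeDF.join(broadcast(smallDF), "key")

    // New name-based hint added by this patch.
    largeDF.join(smallDF.hint("broadcast"), "key")
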
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
index 541ffb58e727fd4fe61661b0f7c8a17f244d3d71..4a52af6c32c377c7c2006408a14a75d5e57a64a9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
@@ -151,7 +151,7 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext {
       Row(1, 1, 1, 1) :: Row(2, 1, 2, 2) :: Nil)
   }
 
-  test("broadcast join hint") {
+  test("broadcast join hint using broadcast function") {
     val df1 = Seq((1, "1"), (2, "2")).toDF("key", "value")
     val df2 = Seq((1, "1"), (2, "2")).toDF("key", "value")
 
@@ -174,6 +174,22 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext {
     }
   }
 
+  test("broadcast join hint using Dataset.hint") {
+    // make sure a giant join is not broadcast automatically
+    val plan1 =
+      spark.range(10e10.toLong)
+        .join(spark.range(10e10.toLong), "id")
+        .queryExecution.executedPlan
+    assert(plan1.collect { case p: BroadcastHashJoinExec => p }.size == 0)
+
+    // now with a hint it should be broadcasted
+    val plan2 =
+      spark.range(10e10.toLong)
+        .join(spark.range(10e10.toLong).hint("broadcast"), "id")
+        .queryExecution.executedPlan
+    assert(plan2.collect { case p: BroadcastHashJoinExec => p }.size == 1)
+  }
+
   test("join - outer join conversion") {
     val df = Seq((1, 2, "1"), (3, 4, "3")).toDF("int", "int2", "str").as("a")
     val df2 = Seq((1, 3, "1"), (5, 6, "5")).toDF("int", "int2", "str").as("b")
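
As a usage note mirroring the test above, the effect of the hint can also be checked with explain(); assuming the hint is honored, the physical plan should contain a BroadcastHashJoin rather than a SortMergeJoin:

    val joined = spark.range(10e10.toLong)
      .join(spark.range(10e10.toLong).hint("broadcast"), "id")

    // Should print a plan containing BroadcastHashJoin for the hinted side.
    joined.explain()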