diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 07647508421a4c24de5dce8609e87241ebdc88a6..17e2611790d5a7b319563e1a4c51c5d57ad6c602 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -152,6 +152,37 @@ class Dataset[T] private[sql](
    */
   def count(): Long = toDF().count()
 
+  /**
+    * Returns a new [[Dataset]] that has exactly `numPartitions` partitions.
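+    *
+    * For example, given a Dataset `ds`:
+    * {{{
+    *   ds.repartition(8)  // shuffles the data into exactly 8 partitions
+    * }}}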
+    * @since 1.6.0
+    */
+  def repartition(numPartitions: Int): Dataset[T] = withPlan {
+    Repartition(numPartitions, shuffle = true, _)
+  }
+
+  /**
+    * Returns a new [[Dataset]] that has exactly `numPartitions` partitions, when fewer partitions
+    * are requested; if a larger number is requested, the Dataset stays at its current number of
+    * partitions.
+    * Similar to coalesce defined on an [[RDD]], this operation results in a narrow dependency,
+    * e.g. if you go from 1000 partitions to 100 partitions, there will not be a shuffle; instead,
+    * each of the 100 new partitions will claim 10 of the current partitions.
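+    *
+    * For example, given a Dataset `ds` with 1000 partitions:
+    * {{{
+    *   ds.coalesce(100)  // narrows to 100 partitions without triggering a shuffle
+    * }}}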
+    * @since 1.6.0
+    */
+  def coalesce(numPartitions: Int): Dataset[T] = withPlan {
+    Repartition(numPartitions, shuffle = false, _)
+  }
+
   /* *********************** *
    *  Functional Operations  *
    * *********************** */
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 13eede1b17d8be090f7e54766bbde5e84d7864db..c253fdbb8c99e3fcb5fb2552ce88c59729918af0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -52,6 +52,23 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
     assert(ds.takeAsList(1).get(0) == item)
   }
 
+  test("coalesce, repartition") {
+    val data = (1 to 100).map(i => ClassData(i.toString, i))
+    val ds = data.toDS()
+
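+    // repartition(10) should shuffle into exactly 10 partitions and preserve every row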
+    assert(ds.repartition(10).rdd.partitions.length == 10)
+    checkAnswer(
+      ds.repartition(10),
+      data: _*)
+
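+    // coalesce(1) should narrow to a single partition and preserve every row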
+    assert(ds.coalesce(1).rdd.partitions.length == 1)
+    checkAnswer(
+      ds.coalesce(1),
+      data: _*)
+  }
+
   test("as tuple") {
     val data = Seq(("a", 1), ("b", 2)).toDF("a", "b")
     checkAnswer(