Skip to content
Snippets Groups Projects
Commit 8220d526 authored by Michael Armbrust, committed by Reynold Xin
Browse files

[SPARK-6972][SQL] Add Coalesce to DataFrame

Author: Michael Armbrust <michael@databricks.com>

Closes #5545 from marmbrus/addCoalesce and squashes the following commits:

9fdf3f6 [Michael Armbrust] [SPARK-6972][SQL] Add Coalesce to DataFrame
parent e5949c28
No related branches found
No related tags found
No related merge requests found
...@@ -908,6 +908,20 @@ class DataFrame private[sql]( ...@@ -908,6 +908,20 @@ class DataFrame private[sql](
schema, needsConversion = false) schema, needsConversion = false)
} }
/**
 * Returns a new [[DataFrame]] coalesced to `numPartitions` partitions.
 * As with coalesce on an [[RDD]], the result has a narrow dependency on its parent: going from
 * 1000 partitions to 100 does not shuffle; each of the 100 new partitions instead claims 10 of
 * the current partitions.
 * @param numPartitions the requested number of partitions for the returned [[DataFrame]]
 * @group rdd
 */
override def coalesce(numPartitions: Int): DataFrame = {
  // NOTE(review): the RDD-level coalesce is called with only a partition count, so asking for
  // more partitions than currently exist presumably leaves the count unchanged -- confirm
  // against the RDD.coalesce documentation.
  val coalescedRows = queryExecution.toRdd.coalesce(numPartitions)
  // Rebuild a DataFrame over the coalesced rows with the schema unchanged.
  sqlContext.createDataFrame(coalescedRows, schema, needsConversion = false)
}
/** /**
* Returns a new [[DataFrame]] that contains only the unique rows from this [[DataFrame]]. * Returns a new [[DataFrame]] that contains only the unique rows from this [[DataFrame]].
* @group dfops * @group dfops
......
...@@ -61,5 +61,7 @@ private[sql] trait RDDApi[T] { ...@@ -61,5 +61,7 @@ private[sql] trait RDDApi[T] {
def repartition(numPartitions: Int): DataFrame def repartition(numPartitions: Int): DataFrame
/** Returns a [[DataFrame]] coalesced to `numPartitions` partitions. */
def coalesce(numPartitions: Int): DataFrame
def distinct: DataFrame def distinct: DataFrame
} }
...@@ -178,6 +178,14 @@ class DataFrameSuite extends QueryTest { ...@@ -178,6 +178,14 @@ class DataFrameSuite extends QueryTest {
testData.select('key).collect().toSeq) testData.select('key).collect().toSeq)
} }
test("coalesce") {
  // Project the key column and coalesce it down to a single partition.
  val keys = testData.select('key)
  val merged = keys.coalesce(1)

  // The coalesced DataFrame must be backed by exactly one partition...
  assert(merged.rdd.partitions.size === 1)

  // ...and must still contain the same rows as the un-coalesced projection.
  checkAnswer(merged.select('key), keys.collect().toSeq)
}
test("groupBy") { test("groupBy") {
checkAnswer( checkAnswer(
testData2.groupBy("a").agg($"a", sum($"b")), testData2.groupBy("a").agg($"a", sum($"b")),
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment