diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index d78b4c2f8909cf847216800286b4948841782ec8..adad85806d1ea3f48ac22760f15165299c09c46d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -685,53 +685,7 @@ class DataFrame private[sql]( * @since 1.3.0 */ @scala.annotation.varargs - def groupBy(cols: Column*): GroupedData = { - GroupedData(this, cols.map(_.expr), GroupedData.GroupByType) - } - - /** - * Create a multi-dimensional rollup for the current [[DataFrame]] using the specified columns, - * so we can run aggregation on them. - * See [[GroupedData]] for all the available aggregate functions. - * - * {{{ - * // Compute the average for all numeric columns rolluped by department and group. - * df.rollup($"department", $"group").avg() - * - * // Compute the max age and average salary, rolluped by department and gender. - * df.rollup($"department", $"gender").agg(Map( - * "salary" -> "avg", - * "age" -> "max" - * )) - * }}} - * @group dfops - * @since 1.4.0 - */ - @scala.annotation.varargs - def rollup(cols: Column*): GroupedData = { - GroupedData(this, cols.map(_.expr), GroupedData.RollupType) - } - - /** - * Create a multi-dimensional cube for the current [[DataFrame]] using the specified columns, - * so we can run aggregation on them. - * See [[GroupedData]] for all the available aggregate functions. - * - * {{{ - * // Compute the average for all numeric columns cubed by department and group. - * df.cube($"department", $"group").avg() - * - * // Compute the max age and average salary, cubed by department and gender. - * df.cube($"department", $"gender").agg(Map( - * "salary" -> "avg", - * "age" -> "max" - * )) - * }}} - * @group dfops - * @since 1.4.0 - */ - @scala.annotation.varargs - def cube(cols: Column*): GroupedData = GroupedData(this, cols.map(_.expr), GroupedData.CubeType) + def groupBy(cols: Column*): GroupedData = new GroupedData(this, cols.map(_.expr)) /** * Groups the [[DataFrame]] using the specified columns, so we can run aggregation on them. @@ -756,61 +710,7 @@ class DataFrame private[sql]( @scala.annotation.varargs def groupBy(col1: String, cols: String*): GroupedData = { val colNames: Seq[String] = col1 +: cols - GroupedData(this, colNames.map(colName => resolve(colName)), GroupedData.GroupByType) - } - - /** - * Create a multi-dimensional rollup for the current [[DataFrame]] using the specified columns, - * so we can run aggregation on them. - * See [[GroupedData]] for all the available aggregate functions. - * - * This is a variant of rollup that can only group by existing columns using column names - * (i.e. cannot construct expressions). - * - * {{{ - * // Compute the average for all numeric columns rolluped by department and group. - * df.rollup("department", "group").avg() - * - * // Compute the max age and average salary, rolluped by department and gender. - * df.rollup($"department", $"gender").agg(Map( - * "salary" -> "avg", - * "age" -> "max" - * )) - * }}} - * @group dfops - * @since 1.4.0 - */ - @scala.annotation.varargs - def rollup(col1: String, cols: String*): GroupedData = { - val colNames: Seq[String] = col1 +: cols - GroupedData(this, colNames.map(colName => resolve(colName)), GroupedData.RollupType) - } - - /** - * Create a multi-dimensional cube for the current [[DataFrame]] using the specified columns, - * so we can run aggregation on them. - * See [[GroupedData]] for all the available aggregate functions. - * - * This is a variant of cube that can only group by existing columns using column names - * (i.e. cannot construct expressions). - * - * {{{ - * // Compute the average for all numeric columns cubed by department and group. - * df.cube("department", "group").avg() - * - * // Compute the max age and average salary, cubed by department and gender. - * df.cube($"department", $"gender").agg(Map( - * "salary" -> "avg", - * "age" -> "max" - * )) - * }}} - * @group dfops - * @since 1.4.0 - */ - @scala.annotation.varargs - def cube(col1: String, cols: String*): GroupedData = { - val colNames: Seq[String] = col1 +: cols - GroupedData(this, colNames.map(colName => resolve(colName)), GroupedData.CubeType) + new GroupedData(this, colNames.map(colName => resolve(colName))) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala b/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala index f730e4ae00e2b11eb05a692a8eea1fa93389b683..1381b9f1a60807a568d8798ff58439bbc906bf0e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala @@ -23,40 +23,9 @@ import scala.language.implicitConversions import org.apache.spark.annotation.Experimental import org.apache.spark.sql.catalyst.analysis.Star import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.plans.logical.{Rollup, Cube, Aggregate} +import org.apache.spark.sql.catalyst.plans.logical.Aggregate import org.apache.spark.sql.types.NumericType -/** - * Companion object for GroupedData - */ -private[sql] object GroupedData { - def apply( - df: DataFrame, - groupingExprs: Seq[Expression], - groupType: GroupType): GroupedData = { - new GroupedData(df, groupingExprs, groupType: GroupType) - } - - /** - * The Grouping Type - */ - trait GroupType - - /** - * To indicate it's the GroupBy - */ - object GroupByType extends GroupType - - /** - * To indicate it's the CUBE - */ - object CubeType extends GroupType - - /** - * To indicate it's the ROLLUP - */ - object RollupType extends GroupType -} /** * :: Experimental :: @@ -65,37 +34,19 @@ private[sql] object GroupedData { * @since 1.3.0 */ @Experimental -class GroupedData protected[sql]( - df: DataFrame, - groupingExprs: Seq[Expression], - private val groupType: GroupedData.GroupType) { +class GroupedData protected[sql](df: DataFrame, groupingExprs: Seq[Expression]) { - private[this] def toDF(aggExprs: Seq[NamedExpression]): DataFrame = { - val aggregates = if (df.sqlContext.conf.dataFrameRetainGroupColumns) { - val retainedExprs = groupingExprs.map { - case expr: NamedExpression => expr - case expr: Expression => Alias(expr, expr.prettyString)() - } - retainedExprs ++ aggExprs - } else { - aggExprs - } - - groupType match { - case GroupedData.GroupByType => - DataFrame( - df.sqlContext, Aggregate(groupingExprs, aggregates, df.logicalPlan)) - case GroupedData.RollupType => - DataFrame( - df.sqlContext, Rollup(groupingExprs, df.logicalPlan, aggregates)) - case GroupedData.CubeType => - DataFrame( - df.sqlContext, Cube(groupingExprs, df.logicalPlan, aggregates)) + private[sql] implicit def toDF(aggExprs: Seq[NamedExpression]): DataFrame = { + val namedGroupingExprs = groupingExprs.map { + case expr: NamedExpression => expr + case expr: Expression => Alias(expr, expr.prettyString)() } + DataFrame( + df.sqlContext, Aggregate(groupingExprs, namedGroupingExprs ++ aggExprs, df.logicalPlan)) } private[this] def aggregateNumericColumns(colNames: String*)(f: Expression => Expression) - : DataFrame = { + : Seq[NamedExpression] = { val columnExprs = if (colNames.isEmpty) { // No columns specified. Use all numeric columns. @@ -112,10 +63,10 @@ class GroupedData protected[sql]( namedExpr } } - toDF(columnExprs.map { c => + columnExprs.map { c => val a = f(c) Alias(a, a.prettyString)() - }) + } } private[this] def strToExpr(expr: String): (Expression => Expression) = { @@ -168,10 +119,10 @@ class GroupedData protected[sql]( * @since 1.3.0 */ def agg(exprs: Map[String, String]): DataFrame = { - toDF(exprs.map { case (colName, expr) => + exprs.map { case (colName, expr) => val a = strToExpr(expr)(df(colName).expr) Alias(a, a.prettyString)() - }.toSeq) + }.toSeq } /** @@ -224,10 +175,19 @@ class GroupedData protected[sql]( */ @scala.annotation.varargs def agg(expr: Column, exprs: Column*): DataFrame = { - toDF((expr +: exprs).map(_.expr).map { + val aggExprs = (expr +: exprs).map(_.expr).map { case expr: NamedExpression => expr case expr: Expression => Alias(expr, expr.prettyString)() - }) + } + if (df.sqlContext.conf.dataFrameRetainGroupColumns) { + val retainedExprs = groupingExprs.map { + case expr: NamedExpression => expr + case expr: Expression => Alias(expr, expr.prettyString)() + } + DataFrame(df.sqlContext, Aggregate(groupingExprs, retainedExprs ++ aggExprs, df.logicalPlan)) + } else { + DataFrame(df.sqlContext, Aggregate(groupingExprs, aggExprs, df.logicalPlan)) + } } /** @@ -236,7 +196,7 @@ class GroupedData protected[sql]( * * @since 1.3.0 */ - def count(): DataFrame = toDF(Seq(Alias(Count(Literal(1)), "count")())) + def count(): DataFrame = Seq(Alias(Count(Literal(1)), "count")()) /** * Compute the average value for each numeric columns for each group. This is an alias for `avg`. @@ -296,5 +256,5 @@ class GroupedData protected[sql]( @scala.annotation.varargs def sum(colNames: String*): DataFrame = { aggregateNumericColumns(colNames:_*)(Sum) - } + } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameAnalyticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameAnalyticsSuite.scala deleted file mode 100644 index 3ad05f482504c6df0e0cc2338f21ec413eb9fc53..0000000000000000000000000000000000000000 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameAnalyticsSuite.scala +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.hive - -import org.apache.spark.sql.QueryTest -import org.apache.spark.sql.functions._ -import org.apache.spark.sql.hive.test.TestHive -import org.apache.spark.sql.hive.test.TestHive._ -import org.apache.spark.sql.hive.test.TestHive.implicits._ - -case class TestData2Int(a: Int, b: Int) - -// TODO ideally we should put the test suite into the package `sql`, as -// `hive` package is optional in compiling, however, `SQLContext.sql` doesn't -// support the `cube` or `rollup` yet. -class HiveDataFrameAnalyticsSuite extends QueryTest { - val testData = - TestHive.sparkContext.parallelize( - TestData2Int(1, 2) :: - TestData2Int(2, 4) :: Nil).toDF() - - testData.registerTempTable("mytable") - - test("rollup") { - checkAnswer( - testData.rollup($"a" + $"b", $"b").agg(sum($"a" - $"b")), - sql("select a + b, b, sum(a - b) from mytable group by a + b, b with rollup").collect() - ) - - checkAnswer( - testData.rollup("a", "b").agg(sum("b")), - sql("select a, b, sum(b) from mytable group by a, b with rollup").collect() - ) - } - - test("cube") { - checkAnswer( - testData.cube($"a" + $"b", $"b").agg(sum($"a" - $"b")), - sql("select a + b, b, sum(a - b) from mytable group by a + b, b with cube").collect() - ) - - checkAnswer( - testData.cube("a", "b").agg(sum("b")), - sql("select a, b, sum(b) from mytable group by a, b with cube").collect() - ) - } -}