Skip to content
Snippets Groups Projects
Commit 5fdcbdc0 authored by Cheng Hao, committed by Michael Armbrust
Browse files

[SPARK-4625] [SQL] Add sort by for DSL & SimpleSqlParser

Add `sort by` support for both DSL & SqlParser.

This PR is related to #3386; whichever one is merged first will require the other to be rebased.

Author: Cheng Hao <hao.cheng@intel.com>

Closes #3481 from chenghao-intel/sortby and squashes the following commits:

041004f [Cheng Hao] Add sort by for DSL & SimpleSqlParser
parent cf50631a
No related branches found
No related tags found
No related merge requests found
......@@ -85,6 +85,7 @@ class SqlParser extends AbstractSparkSQLParser {
protected val ON = Keyword("ON")
protected val OR = Keyword("OR")
protected val ORDER = Keyword("ORDER")
// Keyword for the `SORT BY` clause; consumed by `sortType` alongside ORDER.
protected val SORT = Keyword("SORT")
protected val OUTER = Keyword("OUTER")
protected val OVERWRITE = Keyword("OVERWRITE")
protected val REGEXP = Keyword("REGEXP")
......@@ -140,7 +141,7 @@ class SqlParser extends AbstractSparkSQLParser {
(WHERE ~> expression).? ~
(GROUP ~ BY ~> rep1sep(expression, ",")).? ~
(HAVING ~> expression).? ~
(ORDER ~ BY ~> ordering).? ~
sortType.? ~
(LIMIT ~> expression).? ^^ {
case d ~ p ~ r ~ f ~ g ~ h ~ o ~ l =>
val base = r.getOrElse(NoRelation)
......@@ -150,7 +151,7 @@ class SqlParser extends AbstractSparkSQLParser {
.getOrElse(Project(assignAliases(p), withFilter))
val withDistinct = d.map(_ => Distinct(withProjection)).getOrElse(withProjection)
val withHaving = h.map(Filter(_, withDistinct)).getOrElse(withDistinct)
val withOrder = o.map(Sort(_, withHaving)).getOrElse(withHaving)
val withOrder = o.map(_(withHaving)).getOrElse(withHaving)
val withLimit = l.map(Limit(_, withOrder)).getOrElse(withOrder)
withLimit
}
......@@ -202,6 +203,11 @@ class SqlParser extends AbstractSparkSQLParser {
| FULL ~ OUTER.? ^^^ FullOuter
)
/**
 * Parses the sort clause of a query and yields a function that wraps a child plan in the
 * matching operator: `ORDER BY` builds a `Sort`, `SORT BY` builds a `SortPartitions`.
 */
protected lazy val sortType: Parser[LogicalPlan => LogicalPlan] = {
  // `ORDER BY <ordering>` => Sort over the child plan.
  def orderByClause: Parser[LogicalPlan => LogicalPlan] =
    (ORDER ~ BY ~> ordering) ^^ { sortOrders =>
      (child: LogicalPlan) => Sort(sortOrders, child)
    }

  // `SORT BY <ordering>` => SortPartitions over the child plan.
  def sortByClause: Parser[LogicalPlan => LogicalPlan] =
    (SORT ~ BY ~> ordering) ^^ { sortOrders =>
      (child: LogicalPlan) => SortPartitions(sortOrders, child)
    }

  orderByClause | sortByClause
}
protected lazy val ordering: Parser[Seq[SortOrder]] =
( rep1sep(singleOrder, ",")
| rep1sep(expression, ",") ~ direction.? ^^ {
......
......@@ -246,6 +246,8 @@ package object dsl {
def orderBy(sortExprs: SortOrder*): Sort = {
  // Wrap the current logical plan in a Sort node using the given sort orders.
  Sort(sortExprs, logicalPlan)
}
def sortBy(sortExprs: SortOrder*): SortPartitions = {
  // Wrap the current logical plan in a SortPartitions node using the given sort orders.
  SortPartitions(sortExprs, logicalPlan)
}
def groupBy(groupingExprs: Expression*)(aggregateExprs: Expression*) = {
val aliasedExprs = aggregateExprs.map {
case ne: NamedExpression => ne
......
......@@ -216,6 +216,19 @@ class SchemaRDD(
def orderBy(sortExprs: SortOrder*): SchemaRDD = {
  // Build the Sort node first, then hand it to a new SchemaRDD on the same context.
  val sortedPlan = Sort(sortExprs, logicalPlan)
  new SchemaRDD(sqlContext, sortedPlan)
}
/**
 * Sorts the rows by the given expressions within each partition. The result is a new
 * SchemaRDD whose logical plan is this plan wrapped in a `SortPartitions` node.
 * {{{
 *   schemaRDD.sortBy('a)
 *   schemaRDD.sortBy('a, 'b)
 *   schemaRDD.sortBy('a.asc, 'b.desc)
 * }}}
 *
 * @param sortExprs the sort orders to apply, in priority order
 * @group Query
 */
def sortBy(sortExprs: SortOrder*): SchemaRDD = {
  val partitionSortedPlan = SortPartitions(sortExprs, logicalPlan)
  new SchemaRDD(sqlContext, partitionSortedPlan)
}
@deprecated("use limit with integer argument", "1.1.0")
def limit(limitExpr: Expression): SchemaRDD = {
  // Wrap the current plan in a Limit node driven by the given expression.
  val limitedPlan = Limit(limitExpr, logicalPlan)
  new SchemaRDD(sqlContext, limitedPlan)
}
......
......@@ -120,6 +120,24 @@ class DslQuerySuite extends QueryTest {
mapData.collect().sortBy(_.data(1)).reverse.toSeq)
}
test("sorting #2") {
  // Exercises the DSL `sortBy` with every asc/desc combination of columns a and b.
  // NOTE(review): sortBy is documented as sorting within partition; confirm that
  // checkAnswer's comparison makes these fully ordered expectations meaningful.
  val ascAsc = Seq((1,1), (1,2), (2,1), (2,2), (3,1), (3,2))
  val ascDesc = Seq((1,2), (1,1), (2,2), (2,1), (3,2), (3,1))
  val descDesc = Seq((3,2), (3,1), (2,2), (2,1), (1,2), (1,1))
  val descAsc = Seq((3,1), (3,2), (2,1), (2,2), (1,1), (1,2))

  checkAnswer(testData2.sortBy('a.asc, 'b.asc), ascAsc)
  checkAnswer(testData2.sortBy('a.asc, 'b.desc), ascDesc)
  checkAnswer(testData2.sortBy('a.desc, 'b.desc), descDesc)
  checkAnswer(testData2.sortBy('a.desc, 'b.asc), descAsc)
}
test("limit") {
checkAnswer(
testData.limit(10),
......
......@@ -42,6 +42,13 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
TimeZone.setDefault(origZone)
}
test("SPARK-4625 support SORT BY in SimpleSQLParser & DSL") {
  // Each expected row is a single-column Seq holding the value of `a`.
  val expectedRows = Seq(1, 1, 2, 2, 3, 3).map(Seq(_))
  checkAnswer(sql("SELECT a FROM testData2 SORT BY a"), expectedRows)
}
test("grouping on nested fields") {
jsonRDD(sparkContext.parallelize("""{"nested": {"attribute": 1}, "value": 2}""" :: Nil))
.registerTempTable("rows")
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment