Commit 33549170 authored by Tejas Patil, committed by Wenchen Fan

[SPARK-15453][SQL] FileSourceScanExec to extract `outputOrdering` information

## What changes were proposed in this pull request?

JIRA: https://issues.apache.org/jira/browse/SPARK-15453

Extract sort ordering information in `FileSourceScanExec` so that the planner can make use of it. My motivation for this change was to bring sort merge join on par with Hive's sort-merge-bucket (SMB) join when the source tables are bucketed and sorted.

Query:

```
val df = (0 until 16).map(i => (i % 8, i * 2, i.toString)).toDF("i", "j", "k").coalesce(1)
df.write.bucketBy(8, "j", "k").sortBy("j", "k").saveAsTable("table8")
df.write.bucketBy(8, "j", "k").sortBy("j", "k").saveAsTable("table9")
spark.sql("SELECT * FROM table8 a JOIN table9 b ON a.j=b.j AND a.k=b.k").explain(true)
```
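For reference, the snippet assumes a Spark 2.x session with Hive support (`spark` as provided by `spark-shell`). A minimal sketch of the setup that keeps the planner from broadcasting these tiny tables, so the sort merge join actually shows up:

```scala
// spark-shell already imports spark.implicits._, which provides toDF.
// Lower the broadcast threshold so SortMergeJoin is chosen even for
// small tables; spark.sql.autoBroadcastJoinThreshold is a standard conf.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
```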

Before:

```
== Physical Plan ==
*SortMergeJoin [j#120, k#121], [j#123, k#124], Inner
:- *Sort [j#120 ASC, k#121 ASC], false, 0
:  +- *Project [i#119, j#120, k#121]
:     +- *Filter (isnotnull(k#121) && isnotnull(j#120))
:        +- *FileScan orc default.table8[i#119,j#120,k#121] Batched: false, Format: ORC, InputPaths: file:/Users/tejasp/Desktop/dev/tp-spark/spark-warehouse/table8, PartitionFilters: [], PushedFilters: [IsNotNull(k), IsNotNull(j)], ReadSchema: struct<i:int,j:int,k:string>
+- *Sort [j#123 ASC, k#124 ASC], false, 0
   +- *Project [i#122, j#123, k#124]
      +- *Filter (isnotnull(k#124) && isnotnull(j#123))
         +- *FileScan orc default.table9[i#122,j#123,k#124] Batched: false, Format: ORC, InputPaths: file:/Users/tejasp/Desktop/dev/tp-spark/spark-warehouse/table9, PartitionFilters: [], PushedFilters: [IsNotNull(k), IsNotNull(j)], ReadSchema: struct<i:int,j:int,k:string>
```

After (note that the `Sort` operators are no longer present):

```
== Physical Plan ==
*SortMergeJoin [j#49, k#50], [j#52, k#53], Inner
:- *Project [i#48, j#49, k#50]
:  +- *Filter (isnotnull(k#50) && isnotnull(j#49))
:     +- *FileScan orc default.table8[i#48,j#49,k#50] Batched: false, Format: ORC, InputPaths: file:/Users/tejasp/Desktop/dev/tp-spark/spark-warehouse/table8, PartitionFilters: [], PushedFilters: [IsNotNull(k), IsNotNull(j)], ReadSchema: struct<i:int,j:int,k:string>
+- *Project [i#51, j#52, k#53]
   +- *Filter (isnotnull(j#52) && isnotnull(k#53))
      +- *FileScan orc default.table9[i#51,j#52,k#53] Batched: false, Format: ORC, InputPaths: file:/Users/tejasp/Desktop/dev/tp-spark/spark-warehouse/table9, PartitionFilters: [], PushedFilters: [IsNotNull(j), IsNotNull(k)], ReadSchema: struct<i:int,j:int,k:string>
```
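To check this programmatically rather than by eyeballing the plan, one option is a sketch like the following (not part of this patch; it reuses the `SortExec`/`find` pattern from the test changes below):

```scala
import org.apache.spark.sql.execution.SortExec

val plan = spark.sql("SELECT * FROM table8 a JOIN table9 b ON a.j=b.j AND a.k=b.k")
  .queryExecution.executedPlan

// TreeNode.find walks the physical plan; with this patch no SortExec
// should remain under the SortMergeJoin for these bucketed+sorted tables.
assert(plan.find(_.isInstanceOf[SortExec]).isEmpty, s"unexpected Sort in:\n$plan")
```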

## How was this patch tested?

Added test cases in `BucketedReadSuite` and ran all the other tests in that suite.

Author: Tejas Patil <tejasp@fb.com>

Closes #14864 from tejasapatil/SPARK-15453_smb_optimization.
@@ -23,12 +23,11 @@ import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path}
 
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{Row, SparkSession, SQLContext}
+import org.apache.spark.sql.{AnalysisException, SparkSession}
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
-import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat => ParquetSource}
@@ -156,24 +155,72 @@ case class FileSourceScanExec(
     false
   }
 
-  override val outputPartitioning: Partitioning = {
+  @transient private lazy val selectedPartitions = relation.location.listFiles(partitionFilters)
+
+  override val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) = {
     val bucketSpec = if (relation.sparkSession.sessionState.conf.bucketingEnabled) {
       relation.bucketSpec
     } else {
       None
     }
-    bucketSpec.map { spec =>
-      val numBuckets = spec.numBuckets
-      val bucketColumns = spec.bucketColumnNames.flatMap { n =>
-        output.find(_.name == n)
-      }
-      if (bucketColumns.size == spec.bucketColumnNames.size) {
-        HashPartitioning(bucketColumns, numBuckets)
-      } else {
-        UnknownPartitioning(0)
-      }
-    }.getOrElse {
-      UnknownPartitioning(0)
+
+    bucketSpec match {
+      case Some(spec) =>
+        // For bucketed columns:
+        // -----------------------
+        // `HashPartitioning` would be used only when:
+        // 1. ALL the bucketing columns are being read from the table
+        //
+        // For sorted columns:
+        // ---------------------
+        // Sort ordering should be used when ALL these criteria match:
+        // 1. `HashPartitioning` is being used
+        // 2. A prefix (or all) of the sort columns are being read from the table.
+        //
+        // Sort ordering would be over the prefix subset of `sort columns` being read
+        // from the table.
+        // e.g.
+        // Assume (col0, col2, col3) are the columns read from the table.
+        // If sort columns are (col0, col1), then the sort ordering would be (col0).
+        // If sort columns are (col1, col0), then the sort ordering would be empty as per
+        // rule #2 above.
+        def toAttribute(colName: String): Option[Attribute] =
+          output.find(_.name == colName)
+
+        val bucketColumns = spec.bucketColumnNames.flatMap(n => toAttribute(n))
+        if (bucketColumns.size == spec.bucketColumnNames.size) {
+          val partitioning = HashPartitioning(bucketColumns, spec.numBuckets)
+          val sortColumns =
+            spec.sortColumnNames.map(x => toAttribute(x)).takeWhile(x => x.isDefined).map(_.get)
+
+          val sortOrder = if (sortColumns.nonEmpty) {
+            // In case of bucketing, it's possible to have multiple files belonging to the
+            // same bucket in a given relation. Each of these files is locally sorted, but
+            // those files combined together are not globally sorted. Given that, the RDD
+            // partition will not be sorted even if the relation has sort columns set.
+            // The current solution is to check if all the buckets have a single file in them.
+            val files = selectedPartitions.flatMap(partition => partition.files)
+            val bucketToFilesGrouping =
+              files.map(_.getPath.getName).groupBy(file => BucketingUtils.getBucketId(file))
+            val singleFilePartitions = bucketToFilesGrouping.forall(p => p._2.length <= 1)
+
+            if (singleFilePartitions) {
+              // TODO: Currently Spark does not support writing columns sorted in descending
+              // order, so Ascending order is used here. This can be fixed in the future.
+              sortColumns.map(attribute => SortOrder(attribute, Ascending))
+            } else {
+              Nil
+            }
+          } else {
+            Nil
+          }
+
+          (partitioning, sortOrder)
+        } else {
+          (UnknownPartitioning(0), Nil)
+        }
+
+      case _ =>
+        (UnknownPartitioning(0), Nil)
     }
   }
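The prefix rule described in the comment block above boils down to a `takeWhile` over the declared sort columns. A minimal standalone sketch (hypothetical helper name, not part of the patch):

```scala
// Keep the longest prefix of the declared sort columns that is actually
// read from the table; an ordering can only be claimed for such a prefix.
def usableSortPrefix(readColumns: Seq[String], sortColumns: Seq[String]): Seq[String] =
  sortColumns.takeWhile(readColumns.contains)

// Mirrors the example in the comment block:
usableSortPrefix(Seq("col0", "col2", "col3"), Seq("col0", "col1"))  // Seq("col0")
usableSortPrefix(Seq("col0", "col2", "col3"), Seq("col1", "col0"))  // Seq()
```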
@@ -187,8 +234,6 @@ case class FileSourceScanExec(
     "InputPaths" -> relation.location.paths.mkString(", "))
 
   private lazy val inputRDD: RDD[InternalRow] = {
-    val selectedPartitions = relation.location.listFiles(partitionFilters)
-
     val readFile: (PartitionedFile) => Iterator[InternalRow] =
       relation.fileFormat.buildReaderWithPartitionValues(
         sparkSession = relation.sparkSession,
@@ -23,7 +23,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
-import org.apache.spark.sql.execution.DataSourceScanExec
+import org.apache.spark.sql.execution.{DataSourceScanExec, SortExec}
 import org.apache.spark.sql.execution.datasources.DataSourceStrategy
 import org.apache.spark.sql.execution.exchange.ShuffleExchange
 import org.apache.spark.sql.execution.joins.SortMergeJoinExec
@@ -237,7 +237,9 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
       bucketSpecRight: Option[BucketSpec],
       joinColumns: Seq[String],
       shuffleLeft: Boolean,
-      shuffleRight: Boolean): Unit = {
+      shuffleRight: Boolean,
+      sortLeft: Boolean = true,
+      sortRight: Boolean = true): Unit = {
     withTable("bucketed_table1", "bucketed_table2") {
       def withBucket(
           writer: DataFrameWriter[Row],
@@ -247,6 +249,15 @@
             spec.numBuckets,
             spec.bucketColumnNames.head,
             spec.bucketColumnNames.tail: _*)
+
+          if (spec.sortColumnNames.nonEmpty) {
+            writer.sortBy(
+              spec.sortColumnNames.head,
+              spec.sortColumnNames.tail: _*
+            )
+          } else {
+            writer
+          }
         }.getOrElse(writer)
       }
@@ -267,12 +278,21 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
         assert(joined.queryExecution.executedPlan.isInstanceOf[SortMergeJoinExec])
         val joinOperator = joined.queryExecution.executedPlan.asInstanceOf[SortMergeJoinExec]
 
+        // check existence of shuffle
         assert(
           joinOperator.left.find(_.isInstanceOf[ShuffleExchange]).isDefined == shuffleLeft,
           s"expected shuffle in plan to be $shuffleLeft but found\n${joinOperator.left}")
         assert(
           joinOperator.right.find(_.isInstanceOf[ShuffleExchange]).isDefined == shuffleRight,
           s"expected shuffle in plan to be $shuffleRight but found\n${joinOperator.right}")
+
+        // check existence of sort
+        assert(
+          joinOperator.left.find(_.isInstanceOf[SortExec]).isDefined == sortLeft,
+          s"expected sort in plan to be $sortLeft but found\n${joinOperator.left}")
+        assert(
+          joinOperator.right.find(_.isInstanceOf[SortExec]).isDefined == sortRight,
+          s"expected sort in plan to be $sortRight but found\n${joinOperator.right}")
       }
     }
   }
@@ -321,6 +341,45 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
     }
   }
 
+  test("avoid shuffle and sort when bucket and sort columns are join keys") {
+    val bucketSpec = Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j")))
+    testBucketing(
+      bucketSpec, bucketSpec, Seq("i", "j"),
+      shuffleLeft = false, shuffleRight = false,
+      sortLeft = false, sortRight = false
+    )
+  }
+
+  test("avoid shuffle and sort when sort columns are a superset of join keys") {
+    val bucketSpec1 = Some(BucketSpec(8, Seq("i"), Seq("i", "j")))
+    val bucketSpec2 = Some(BucketSpec(8, Seq("i"), Seq("i", "k")))
+    testBucketing(
+      bucketSpec1, bucketSpec2, Seq("i"),
+      shuffleLeft = false, shuffleRight = false,
+      sortLeft = false, sortRight = false
+    )
+  }
+
+  test("only sort one side when sort columns are different") {
+    val bucketSpec1 = Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j")))
+    val bucketSpec2 = Some(BucketSpec(8, Seq("i", "j"), Seq("k")))
+    testBucketing(
+      bucketSpec1, bucketSpec2, Seq("i", "j"),
+      shuffleLeft = false, shuffleRight = false,
+      sortLeft = false, sortRight = true
+    )
+  }
+
+  test("only sort one side when sort columns are the same but their ordering is different") {
+    val bucketSpec1 = Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j")))
+    val bucketSpec2 = Some(BucketSpec(8, Seq("i", "j"), Seq("j", "i")))
+    testBucketing(
+      bucketSpec1, bucketSpec2, Seq("i", "j"),
+      shuffleLeft = false, shuffleRight = false,
+      sortLeft = false, sortRight = true
+    )
+  }
+
   test("avoid shuffle when grouping keys are equal to bucket keys") {
     withTable("bucketed_table") {
       df1.write.format("parquet").bucketBy(8, "i", "j").saveAsTable("bucketed_table")