Commit ead4ba0e authored by Xiao Li

[SPARK-15453][SQL][FOLLOW-UP] FileSourceScanExec to extract `outputOrdering` information

### What changes were proposed in this pull request?
`outputOrdering` also depends on whether a bucket contains more than one file. The test cases fail when we try to move them to sql/core. This PR fixes the test cases introduced in https://github.com/apache/spark/pull/14864 and adds a test case to verify [the related logic](https://github.com/tejasapatil/spark/blob/070c24994747c0479fb2520774ede27ff1cf8cac/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L197-L206).
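
For context, the rule being exercised is that a bucketed scan may only advertise its sort columns as `outputOrdering` when every bucket is backed by at most one file; with several files per bucket, each file is locally sorted but their concatenation is not. The sketch below is not the Spark source, just a minimal self-contained illustration of that rule — the `SortOrder` case class, the `outputOrdering` helper, and the `filesPerBucket` map are invented for the example.

```scala
// Hedged sketch of the single-file-per-bucket condition (not the actual Spark code).
object OutputOrderingRule {

  // Simplified stand-in for Spark's SortOrder expression (hypothetical type).
  final case class SortOrder(column: String)

  /**
   * Returns the ordering a bucketed scan could advertise.
   *
   * @param sortColumnNames sort columns declared in the bucket spec
   * @param filesPerBucket  number of data files backing each bucket id (assumed input shape)
   */
  def outputOrdering(
      sortColumnNames: Seq[String],
      filesPerBucket: Map[Int, Int]): Seq[SortOrder] = {
    val singleFilePartitions = filesPerBucket.values.forall(_ <= 1)
    if (sortColumnNames.nonEmpty && singleFilePartitions) {
      // Every bucket has at most one file, so the per-file sort is also a per-bucket sort.
      sortColumnNames.map(SortOrder(_))
    } else {
      // Multiple files per bucket: no global ordering can be claimed, a Sort stays in the plan.
      Nil
    }
  }

  def main(args: Array[String]): Unit = {
    // One file per bucket: ordering is reported, the extra Sort can be avoided.
    println(outputOrdering(Seq("i", "j"), Map(0 -> 1, 1 -> 1)))
    // Bucket 1 has two files: no ordering, the planner must keep the Sort.
    println(outputOrdering(Seq("i", "j"), Map(0 -> 1, 1 -> 2)))
  }
}
```

The new tests below vary `numPartitions` between 1 and 50 before writing precisely to hit the one-file and many-files-per-bucket branches of this condition.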

### How was this patch tested?
N/A

Author: Xiao Li <gatorsmile@gmail.com>

Closes #16994 from gatorsmile/bucketingTS.
parent d0ecca60
@@ -227,6 +227,12 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
   private val df1 = (0 until 50).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k").as("df1")
   private val df2 = (0 until 50).map(i => (i % 7, i % 11, i.toString)).toDF("i", "j", "k").as("df2")

+  case class BucketedTableTestSpec(
+      bucketSpec: Option[BucketSpec],
+      numPartitions: Int = 10,
+      expectedShuffle: Boolean = true,
+      expectedSort: Boolean = true)
+
   /**
    * A helper method to test the bucket read functionality using join. It will save `df1` and `df2`
    * to hive tables, bucketed or not, according to the given bucket specifics. Next we will join
@@ -234,14 +240,15 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
    * exists as user expected according to the `shuffleLeft` and `shuffleRight`.
    */
   private def testBucketing(
-      bucketSpecLeft: Option[BucketSpec],
-      bucketSpecRight: Option[BucketSpec],
+      bucketedTableTestSpecLeft: BucketedTableTestSpec,
+      bucketedTableTestSpecRight: BucketedTableTestSpec,
       joinType: String = "inner",
-      joinCondition: (DataFrame, DataFrame) => Column,
-      shuffleLeft: Boolean,
-      shuffleRight: Boolean,
-      sortLeft: Boolean = true,
-      sortRight: Boolean = true): Unit = {
+      joinCondition: (DataFrame, DataFrame) => Column): Unit = {
+    val BucketedTableTestSpec(bucketSpecLeft, numPartitionsLeft, shuffleLeft, sortLeft) =
+      bucketedTableTestSpecLeft
+    val BucketedTableTestSpec(bucketSpecRight, numPartitionsRight, shuffleRight, sortRight) =
+      bucketedTableTestSpecRight
+
     withTable("bucketed_table1", "bucketed_table2") {
       def withBucket(
           writer: DataFrameWriter[Row],
@@ -263,8 +270,10 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
       }.getOrElse(writer)
     }

-      withBucket(df1.write.format("parquet"), bucketSpecLeft).saveAsTable("bucketed_table1")
-      withBucket(df2.write.format("parquet"), bucketSpecRight).saveAsTable("bucketed_table2")
+      withBucket(df1.repartition(numPartitionsLeft).write.format("parquet"), bucketSpecLeft)
+        .saveAsTable("bucketed_table1")
+      withBucket(df2.repartition(numPartitionsRight).write.format("parquet"), bucketSpecRight)
+        .saveAsTable("bucketed_table2")

       withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0",
         SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") {
@@ -291,10 +300,10 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
         // check existence of sort
         assert(
           joinOperator.left.find(_.isInstanceOf[SortExec]).isDefined == sortLeft,
-          s"expected sort in plan to be $shuffleLeft but found\n${joinOperator.left}")
+          s"expected sort in the left child to be $sortLeft but found\n${joinOperator.left}")
         assert(
           joinOperator.right.find(_.isInstanceOf[SortExec]).isDefined == sortRight,
-          s"expected sort in plan to be $shuffleRight but found\n${joinOperator.right}")
+          s"expected sort in the right child to be $sortRight but found\n${joinOperator.right}")
       }
     }
   }
@@ -305,138 +314,174 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
   test("avoid shuffle when join 2 bucketed tables") {
     val bucketSpec = Some(BucketSpec(8, Seq("i", "j"), Nil))
+    val bucketedTableTestSpecLeft = BucketedTableTestSpec(bucketSpec, expectedShuffle = false)
+    val bucketedTableTestSpecRight = BucketedTableTestSpec(bucketSpec, expectedShuffle = false)
     testBucketing(
-      bucketSpecLeft = bucketSpec,
-      bucketSpecRight = bucketSpec,
-      joinCondition = joinCondition(Seq("i", "j")),
-      shuffleLeft = false,
-      shuffleRight = false
+      bucketedTableTestSpecLeft = bucketedTableTestSpecLeft,
+      bucketedTableTestSpecRight = bucketedTableTestSpecRight,
+      joinCondition = joinCondition(Seq("i", "j"))
     )
   }

   // Enable it after fix https://issues.apache.org/jira/browse/SPARK-12704
   ignore("avoid shuffle when join keys are a super-set of bucket keys") {
     val bucketSpec = Some(BucketSpec(8, Seq("i"), Nil))
+    val bucketedTableTestSpecLeft = BucketedTableTestSpec(bucketSpec, expectedShuffle = false)
+    val bucketedTableTestSpecRight = BucketedTableTestSpec(bucketSpec, expectedShuffle = false)
     testBucketing(
-      bucketSpecLeft = bucketSpec,
-      bucketSpecRight = bucketSpec,
-      joinCondition = joinCondition(Seq("i", "j")),
-      shuffleLeft = false,
-      shuffleRight = false
+      bucketedTableTestSpecLeft = bucketedTableTestSpecLeft,
+      bucketedTableTestSpecRight = bucketedTableTestSpecRight,
+      joinCondition = joinCondition(Seq("i", "j"))
    )
  }
test("only shuffle one side when join bucketed table and non-bucketed table") { test("only shuffle one side when join bucketed table and non-bucketed table") {
val bucketSpec = Some(BucketSpec(8, Seq("i", "j"), Nil)) val bucketSpec = Some(BucketSpec(8, Seq("i", "j"), Nil))
val bucketedTableTestSpecLeft = BucketedTableTestSpec(bucketSpec, expectedShuffle = false)
val bucketedTableTestSpecRight = BucketedTableTestSpec(None, expectedShuffle = true)
testBucketing( testBucketing(
bucketSpecLeft = bucketSpec, bucketedTableTestSpecLeft = bucketedTableTestSpecLeft,
bucketSpecRight = None, bucketedTableTestSpecRight = bucketedTableTestSpecRight,
joinCondition = joinCondition(Seq("i", "j")), joinCondition = joinCondition(Seq("i", "j"))
shuffleLeft = false,
shuffleRight = true
) )
} }
test("only shuffle one side when 2 bucketed tables have different bucket number") { test("only shuffle one side when 2 bucketed tables have different bucket number") {
val bucketSpec1 = Some(BucketSpec(8, Seq("i", "j"), Nil)) val bucketSpecLeft = Some(BucketSpec(8, Seq("i", "j"), Nil))
val bucketSpec2 = Some(BucketSpec(5, Seq("i", "j"), Nil)) val bucketSpecRight = Some(BucketSpec(5, Seq("i", "j"), Nil))
val bucketedTableTestSpecLeft = BucketedTableTestSpec(bucketSpecLeft, expectedShuffle = false)
val bucketedTableTestSpecRight = BucketedTableTestSpec(bucketSpecRight, expectedShuffle = true)
testBucketing( testBucketing(
bucketSpecLeft = bucketSpec1, bucketedTableTestSpecLeft = bucketedTableTestSpecLeft,
bucketSpecRight = bucketSpec2, bucketedTableTestSpecRight = bucketedTableTestSpecRight,
joinCondition = joinCondition(Seq("i", "j")), joinCondition = joinCondition(Seq("i", "j"))
shuffleLeft = false,
shuffleRight = true
) )
} }
test("only shuffle one side when 2 bucketed tables have different bucket keys") { test("only shuffle one side when 2 bucketed tables have different bucket keys") {
val bucketSpec1 = Some(BucketSpec(8, Seq("i"), Nil)) val bucketSpecLeft = Some(BucketSpec(8, Seq("i"), Nil))
val bucketSpec2 = Some(BucketSpec(8, Seq("j"), Nil)) val bucketSpecRight = Some(BucketSpec(8, Seq("j"), Nil))
val bucketedTableTestSpecLeft = BucketedTableTestSpec(bucketSpecLeft, expectedShuffle = false)
val bucketedTableTestSpecRight = BucketedTableTestSpec(bucketSpecRight, expectedShuffle = true)
testBucketing( testBucketing(
bucketSpecLeft = bucketSpec1, bucketedTableTestSpecLeft = bucketedTableTestSpecLeft,
bucketSpecRight = bucketSpec2, bucketedTableTestSpecRight = bucketedTableTestSpecRight,
joinCondition = joinCondition(Seq("i")), joinCondition = joinCondition(Seq("i"))
shuffleLeft = false,
shuffleRight = true
) )
} }
test("shuffle when join keys are not equal to bucket keys") { test("shuffle when join keys are not equal to bucket keys") {
val bucketSpec = Some(BucketSpec(8, Seq("i"), Nil)) val bucketSpec = Some(BucketSpec(8, Seq("i"), Nil))
val bucketedTableTestSpecLeft = BucketedTableTestSpec(bucketSpec, expectedShuffle = true)
val bucketedTableTestSpecRight = BucketedTableTestSpec(bucketSpec, expectedShuffle = true)
testBucketing( testBucketing(
bucketSpecLeft = bucketSpec, bucketedTableTestSpecLeft = bucketedTableTestSpecLeft,
bucketSpecRight = bucketSpec, bucketedTableTestSpecRight = bucketedTableTestSpecRight,
joinCondition = joinCondition(Seq("j")), joinCondition = joinCondition(Seq("j"))
shuffleLeft = true,
shuffleRight = true
) )
} }
test("shuffle when join 2 bucketed tables with bucketing disabled") { test("shuffle when join 2 bucketed tables with bucketing disabled") {
val bucketSpec = Some(BucketSpec(8, Seq("i", "j"), Nil)) val bucketSpec = Some(BucketSpec(8, Seq("i", "j"), Nil))
val bucketedTableTestSpecLeft = BucketedTableTestSpec(bucketSpec, expectedShuffle = true)
val bucketedTableTestSpecRight = BucketedTableTestSpec(bucketSpec, expectedShuffle = true)
withSQLConf(SQLConf.BUCKETING_ENABLED.key -> "false") { withSQLConf(SQLConf.BUCKETING_ENABLED.key -> "false") {
testBucketing( testBucketing(
bucketSpecLeft = bucketSpec, bucketedTableTestSpecLeft = bucketedTableTestSpecLeft,
bucketSpecRight = bucketSpec, bucketedTableTestSpecRight = bucketedTableTestSpecRight,
joinCondition = joinCondition(Seq("i", "j")), joinCondition = joinCondition(Seq("i", "j"))
shuffleLeft = true,
shuffleRight = true
) )
} }
} }
test("avoid shuffle and sort when bucket and sort columns are join keys") { test("check sort and shuffle when bucket and sort columns are join keys") {
// In case of bucketing, its possible to have multiple files belonging to the
// same bucket in a given relation. Each of these files are locally sorted
// but those files combined together are not globally sorted. Given that,
// the RDD partition will not be sorted even if the relation has sort columns set
// Therefore, we still need to keep the Sort in both sides.
val bucketSpec = Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j"))) val bucketSpec = Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j")))
val bucketedTableTestSpecLeft1 = BucketedTableTestSpec(
bucketSpec, numPartitions = 50, expectedShuffle = false, expectedSort = true)
val bucketedTableTestSpecRight1 = BucketedTableTestSpec(
bucketSpec, numPartitions = 1, expectedShuffle = false, expectedSort = false)
testBucketing( testBucketing(
bucketSpecLeft = bucketSpec, bucketedTableTestSpecLeft = bucketedTableTestSpecLeft1,
bucketSpecRight = bucketSpec, bucketedTableTestSpecRight = bucketedTableTestSpecRight1,
joinCondition = joinCondition(Seq("i", "j")), joinCondition = joinCondition(Seq("i", "j"))
shuffleLeft = false, )
shuffleRight = false,
sortLeft = false, val bucketedTableTestSpecLeft2 = BucketedTableTestSpec(
sortRight = false bucketSpec, numPartitions = 1, expectedShuffle = false, expectedSort = false)
val bucketedTableTestSpecRight2 = BucketedTableTestSpec(
bucketSpec, numPartitions = 50, expectedShuffle = false, expectedSort = true)
testBucketing(
bucketedTableTestSpecLeft = bucketedTableTestSpecLeft2,
bucketedTableTestSpecRight = bucketedTableTestSpecRight2,
joinCondition = joinCondition(Seq("i", "j"))
)
val bucketedTableTestSpecLeft3 = BucketedTableTestSpec(
bucketSpec, numPartitions = 50, expectedShuffle = false, expectedSort = true)
val bucketedTableTestSpecRight3 = BucketedTableTestSpec(
bucketSpec, numPartitions = 50, expectedShuffle = false, expectedSort = true)
testBucketing(
bucketedTableTestSpecLeft = bucketedTableTestSpecLeft3,
bucketedTableTestSpecRight = bucketedTableTestSpecRight3,
joinCondition = joinCondition(Seq("i", "j"))
)
val bucketedTableTestSpecLeft4 = BucketedTableTestSpec(
bucketSpec, numPartitions = 1, expectedShuffle = false, expectedSort = false)
val bucketedTableTestSpecRight4 = BucketedTableTestSpec(
bucketSpec, numPartitions = 1, expectedShuffle = false, expectedSort = false)
testBucketing(
bucketedTableTestSpecLeft = bucketedTableTestSpecLeft4,
bucketedTableTestSpecRight = bucketedTableTestSpecRight4,
joinCondition = joinCondition(Seq("i", "j"))
) )
} }
test("avoid shuffle and sort when sort columns are a super set of join keys") { test("avoid shuffle and sort when sort columns are a super set of join keys") {
val bucketSpec1 = Some(BucketSpec(8, Seq("i"), Seq("i", "j"))) val bucketSpecLeft = Some(BucketSpec(8, Seq("i"), Seq("i", "j")))
val bucketSpec2 = Some(BucketSpec(8, Seq("i"), Seq("i", "k"))) val bucketSpecRight = Some(BucketSpec(8, Seq("i"), Seq("i", "k")))
val bucketedTableTestSpecLeft = BucketedTableTestSpec(
bucketSpecLeft, numPartitions = 1, expectedShuffle = false, expectedSort = false)
val bucketedTableTestSpecRight = BucketedTableTestSpec(
bucketSpecRight, numPartitions = 1, expectedShuffle = false, expectedSort = false)
testBucketing( testBucketing(
bucketSpecLeft = bucketSpec1, bucketedTableTestSpecLeft = bucketedTableTestSpecLeft,
bucketSpecRight = bucketSpec2, bucketedTableTestSpecRight = bucketedTableTestSpecRight,
joinCondition = joinCondition(Seq("i")), joinCondition = joinCondition(Seq("i"))
shuffleLeft = false,
shuffleRight = false,
sortLeft = false,
sortRight = false
) )
} }
test("only sort one side when sort columns are different") { test("only sort one side when sort columns are different") {
val bucketSpec1 = Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j"))) val bucketSpecLeft = Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j")))
val bucketSpec2 = Some(BucketSpec(8, Seq("i", "j"), Seq("k"))) val bucketSpecRight = Some(BucketSpec(8, Seq("i", "j"), Seq("k")))
val bucketedTableTestSpecLeft = BucketedTableTestSpec(
bucketSpecLeft, numPartitions = 1, expectedShuffle = false, expectedSort = false)
val bucketedTableTestSpecRight = BucketedTableTestSpec(
bucketSpecRight, numPartitions = 1, expectedShuffle = false, expectedSort = true)
testBucketing( testBucketing(
bucketSpecLeft = bucketSpec1, bucketedTableTestSpecLeft = bucketedTableTestSpecLeft,
bucketSpecRight = bucketSpec2, bucketedTableTestSpecRight = bucketedTableTestSpecRight,
joinCondition = joinCondition(Seq("i", "j")), joinCondition = joinCondition(Seq("i", "j"))
shuffleLeft = false,
shuffleRight = false,
sortLeft = false,
sortRight = true
) )
} }
test("only sort one side when sort columns are same but their ordering is different") { test("only sort one side when sort columns are same but their ordering is different") {
val bucketSpec1 = Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j"))) val bucketSpecLeft = Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j")))
val bucketSpec2 = Some(BucketSpec(8, Seq("i", "j"), Seq("j", "i"))) val bucketSpecRight = Some(BucketSpec(8, Seq("i", "j"), Seq("j", "i")))
val bucketedTableTestSpecLeft = BucketedTableTestSpec(
bucketSpecLeft, numPartitions = 1, expectedShuffle = false, expectedSort = false)
val bucketedTableTestSpecRight = BucketedTableTestSpec(
bucketSpecRight, numPartitions = 1, expectedShuffle = false, expectedSort = true)
testBucketing( testBucketing(
bucketSpecLeft = bucketSpec1, bucketedTableTestSpecLeft = bucketedTableTestSpecLeft,
bucketSpecRight = bucketSpec2, bucketedTableTestSpecRight = bucketedTableTestSpecRight,
joinCondition = joinCondition(Seq("i", "j")), joinCondition = joinCondition(Seq("i", "j"))
shuffleLeft = false,
shuffleRight = false,
sortLeft = false,
sortRight = true
) )
} }
@@ -470,20 +515,20 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet

   test("SPARK-17698 Join predicates should not contain filter clauses") {
     val bucketSpec = Some(BucketSpec(8, Seq("i"), Seq("i")))
+    val bucketedTableTestSpecLeft = BucketedTableTestSpec(
+      bucketSpec, numPartitions = 1, expectedShuffle = false, expectedSort = false)
+    val bucketedTableTestSpecRight = BucketedTableTestSpec(
+      bucketSpec, numPartitions = 1, expectedShuffle = false, expectedSort = false)
     testBucketing(
-      bucketSpecLeft = bucketSpec,
-      bucketSpecRight = bucketSpec,
+      bucketedTableTestSpecLeft = bucketedTableTestSpecLeft,
+      bucketedTableTestSpecRight = bucketedTableTestSpecRight,
       joinType = "fullouter",
       joinCondition = (left: DataFrame, right: DataFrame) => {
         val joinPredicates = Seq("i").map(col => left(col) === right(col)).reduce(_ && _)
         val filterLeft = left("i") === Literal("1")
         val filterRight = right("i") === Literal("1")
         joinPredicates && filterLeft && filterRight
-      },
-      shuffleLeft = false,
-      shuffleRight = false,
-      sortLeft = false,
-      sortRight = false
+      }
     )
   }