From ead4ba0eb5841e42e6a57c1a1865bf89564e8ff9 Mon Sep 17 00:00:00 2001 From: Xiao Li <gatorsmile@gmail.com> Date: Mon, 20 Feb 2017 09:04:22 -0800 Subject: [PATCH] [SPARK-15453][SQL][FOLLOW-UP] FileSourceScanExec to extract `outputOrdering` information ### What changes were proposed in this pull request? `outputOrdering` is also dependent on whether the bucket has more than one files. The test cases fail when we try to move them to sql/core. This PR is to fix the test cases introduced in https://github.com/apache/spark/pull/14864 and add a test case to verify [the related logics](https://github.com/tejasapatil/spark/blob/070c24994747c0479fb2520774ede27ff1cf8cac/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L197-L206). ### How was this patch tested? N/A Author: Xiao Li <gatorsmile@gmail.com> Closes #16994 from gatorsmile/bucketingTS. --- .../spark/sql/sources/BucketedReadSuite.scala | 229 +++++++++++------- 1 file changed, 137 insertions(+), 92 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala index d9ddcbd57c..4fc72b9e47 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala @@ -227,6 +227,12 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet private val df1 = (0 until 50).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k").as("df1") private val df2 = (0 until 50).map(i => (i % 7, i % 11, i.toString)).toDF("i", "j", "k").as("df2") + case class BucketedTableTestSpec( + bucketSpec: Option[BucketSpec], + numPartitions: Int = 10, + expectedShuffle: Boolean = true, + expectedSort: Boolean = true) + /** * A helper method to test the bucket read functionality using join. It will save `df1` and `df2` * to hive tables, bucketed or not, according to the given bucket specifics. Next we will join @@ -234,14 +240,15 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet * exists as user expected according to the `shuffleLeft` and `shuffleRight`. */ private def testBucketing( - bucketSpecLeft: Option[BucketSpec], - bucketSpecRight: Option[BucketSpec], + bucketedTableTestSpecLeft: BucketedTableTestSpec, + bucketedTableTestSpecRight: BucketedTableTestSpec, joinType: String = "inner", - joinCondition: (DataFrame, DataFrame) => Column, - shuffleLeft: Boolean, - shuffleRight: Boolean, - sortLeft: Boolean = true, - sortRight: Boolean = true): Unit = { + joinCondition: (DataFrame, DataFrame) => Column): Unit = { + val BucketedTableTestSpec(bucketSpecLeft, numPartitionsLeft, shuffleLeft, sortLeft) = + bucketedTableTestSpecLeft + val BucketedTableTestSpec(bucketSpecRight, numPartitionsRight, shuffleRight, sortRight) = + bucketedTableTestSpecRight + withTable("bucketed_table1", "bucketed_table2") { def withBucket( writer: DataFrameWriter[Row], @@ -263,8 +270,10 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet }.getOrElse(writer) } - withBucket(df1.write.format("parquet"), bucketSpecLeft).saveAsTable("bucketed_table1") - withBucket(df2.write.format("parquet"), bucketSpecRight).saveAsTable("bucketed_table2") + withBucket(df1.repartition(numPartitionsLeft).write.format("parquet"), bucketSpecLeft) + .saveAsTable("bucketed_table1") + withBucket(df2.repartition(numPartitionsRight).write.format("parquet"), bucketSpecRight) + .saveAsTable("bucketed_table2") withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0", SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") { @@ -291,10 +300,10 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet // check existence of sort assert( joinOperator.left.find(_.isInstanceOf[SortExec]).isDefined == sortLeft, - s"expected sort in plan to be $shuffleLeft but found\n${joinOperator.left}") + s"expected sort in the left child to be $sortLeft but found\n${joinOperator.left}") assert( joinOperator.right.find(_.isInstanceOf[SortExec]).isDefined == sortRight, - s"expected sort in plan to be $shuffleRight but found\n${joinOperator.right}") + s"expected sort in the right child to be $sortRight but found\n${joinOperator.right}") } } } @@ -305,138 +314,174 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet test("avoid shuffle when join 2 bucketed tables") { val bucketSpec = Some(BucketSpec(8, Seq("i", "j"), Nil)) + val bucketedTableTestSpecLeft = BucketedTableTestSpec(bucketSpec, expectedShuffle = false) + val bucketedTableTestSpecRight = BucketedTableTestSpec(bucketSpec, expectedShuffle = false) testBucketing( - bucketSpecLeft = bucketSpec, - bucketSpecRight = bucketSpec, - joinCondition = joinCondition(Seq("i", "j")), - shuffleLeft = false, - shuffleRight = false + bucketedTableTestSpecLeft = bucketedTableTestSpecLeft, + bucketedTableTestSpecRight = bucketedTableTestSpecRight, + joinCondition = joinCondition(Seq("i", "j")) ) } // Enable it after fix https://issues.apache.org/jira/browse/SPARK-12704 ignore("avoid shuffle when join keys are a super-set of bucket keys") { val bucketSpec = Some(BucketSpec(8, Seq("i"), Nil)) + val bucketedTableTestSpecLeft = BucketedTableTestSpec(bucketSpec, expectedShuffle = false) + val bucketedTableTestSpecRight = BucketedTableTestSpec(bucketSpec, expectedShuffle = false) testBucketing( - bucketSpecLeft = bucketSpec, - bucketSpecRight = bucketSpec, - joinCondition = joinCondition(Seq("i", "j")), - shuffleLeft = false, - shuffleRight = false + bucketedTableTestSpecLeft = bucketedTableTestSpecLeft, + bucketedTableTestSpecRight = bucketedTableTestSpecRight, + joinCondition = joinCondition(Seq("i", "j")) ) } test("only shuffle one side when join bucketed table and non-bucketed table") { val bucketSpec = Some(BucketSpec(8, Seq("i", "j"), Nil)) + val bucketedTableTestSpecLeft = BucketedTableTestSpec(bucketSpec, expectedShuffle = false) + val bucketedTableTestSpecRight = BucketedTableTestSpec(None, expectedShuffle = true) testBucketing( - bucketSpecLeft = bucketSpec, - bucketSpecRight = None, - joinCondition = joinCondition(Seq("i", "j")), - shuffleLeft = false, - shuffleRight = true + bucketedTableTestSpecLeft = bucketedTableTestSpecLeft, + bucketedTableTestSpecRight = bucketedTableTestSpecRight, + joinCondition = joinCondition(Seq("i", "j")) ) } test("only shuffle one side when 2 bucketed tables have different bucket number") { - val bucketSpec1 = Some(BucketSpec(8, Seq("i", "j"), Nil)) - val bucketSpec2 = Some(BucketSpec(5, Seq("i", "j"), Nil)) + val bucketSpecLeft = Some(BucketSpec(8, Seq("i", "j"), Nil)) + val bucketSpecRight = Some(BucketSpec(5, Seq("i", "j"), Nil)) + val bucketedTableTestSpecLeft = BucketedTableTestSpec(bucketSpecLeft, expectedShuffle = false) + val bucketedTableTestSpecRight = BucketedTableTestSpec(bucketSpecRight, expectedShuffle = true) testBucketing( - bucketSpecLeft = bucketSpec1, - bucketSpecRight = bucketSpec2, - joinCondition = joinCondition(Seq("i", "j")), - shuffleLeft = false, - shuffleRight = true + bucketedTableTestSpecLeft = bucketedTableTestSpecLeft, + bucketedTableTestSpecRight = bucketedTableTestSpecRight, + joinCondition = joinCondition(Seq("i", "j")) ) } test("only shuffle one side when 2 bucketed tables have different bucket keys") { - val bucketSpec1 = Some(BucketSpec(8, Seq("i"), Nil)) - val bucketSpec2 = Some(BucketSpec(8, Seq("j"), Nil)) + val bucketSpecLeft = Some(BucketSpec(8, Seq("i"), Nil)) + val bucketSpecRight = Some(BucketSpec(8, Seq("j"), Nil)) + val bucketedTableTestSpecLeft = BucketedTableTestSpec(bucketSpecLeft, expectedShuffle = false) + val bucketedTableTestSpecRight = BucketedTableTestSpec(bucketSpecRight, expectedShuffle = true) testBucketing( - bucketSpecLeft = bucketSpec1, - bucketSpecRight = bucketSpec2, - joinCondition = joinCondition(Seq("i")), - shuffleLeft = false, - shuffleRight = true + bucketedTableTestSpecLeft = bucketedTableTestSpecLeft, + bucketedTableTestSpecRight = bucketedTableTestSpecRight, + joinCondition = joinCondition(Seq("i")) ) } test("shuffle when join keys are not equal to bucket keys") { val bucketSpec = Some(BucketSpec(8, Seq("i"), Nil)) + val bucketedTableTestSpecLeft = BucketedTableTestSpec(bucketSpec, expectedShuffle = true) + val bucketedTableTestSpecRight = BucketedTableTestSpec(bucketSpec, expectedShuffle = true) testBucketing( - bucketSpecLeft = bucketSpec, - bucketSpecRight = bucketSpec, - joinCondition = joinCondition(Seq("j")), - shuffleLeft = true, - shuffleRight = true + bucketedTableTestSpecLeft = bucketedTableTestSpecLeft, + bucketedTableTestSpecRight = bucketedTableTestSpecRight, + joinCondition = joinCondition(Seq("j")) ) } test("shuffle when join 2 bucketed tables with bucketing disabled") { val bucketSpec = Some(BucketSpec(8, Seq("i", "j"), Nil)) + val bucketedTableTestSpecLeft = BucketedTableTestSpec(bucketSpec, expectedShuffle = true) + val bucketedTableTestSpecRight = BucketedTableTestSpec(bucketSpec, expectedShuffle = true) withSQLConf(SQLConf.BUCKETING_ENABLED.key -> "false") { testBucketing( - bucketSpecLeft = bucketSpec, - bucketSpecRight = bucketSpec, - joinCondition = joinCondition(Seq("i", "j")), - shuffleLeft = true, - shuffleRight = true + bucketedTableTestSpecLeft = bucketedTableTestSpecLeft, + bucketedTableTestSpecRight = bucketedTableTestSpecRight, + joinCondition = joinCondition(Seq("i", "j")) ) } } - test("avoid shuffle and sort when bucket and sort columns are join keys") { + test("check sort and shuffle when bucket and sort columns are join keys") { + // In case of bucketing, its possible to have multiple files belonging to the + // same bucket in a given relation. Each of these files are locally sorted + // but those files combined together are not globally sorted. Given that, + // the RDD partition will not be sorted even if the relation has sort columns set + // Therefore, we still need to keep the Sort in both sides. val bucketSpec = Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j"))) + + val bucketedTableTestSpecLeft1 = BucketedTableTestSpec( + bucketSpec, numPartitions = 50, expectedShuffle = false, expectedSort = true) + val bucketedTableTestSpecRight1 = BucketedTableTestSpec( + bucketSpec, numPartitions = 1, expectedShuffle = false, expectedSort = false) testBucketing( - bucketSpecLeft = bucketSpec, - bucketSpecRight = bucketSpec, - joinCondition = joinCondition(Seq("i", "j")), - shuffleLeft = false, - shuffleRight = false, - sortLeft = false, - sortRight = false + bucketedTableTestSpecLeft = bucketedTableTestSpecLeft1, + bucketedTableTestSpecRight = bucketedTableTestSpecRight1, + joinCondition = joinCondition(Seq("i", "j")) + ) + + val bucketedTableTestSpecLeft2 = BucketedTableTestSpec( + bucketSpec, numPartitions = 1, expectedShuffle = false, expectedSort = false) + val bucketedTableTestSpecRight2 = BucketedTableTestSpec( + bucketSpec, numPartitions = 50, expectedShuffle = false, expectedSort = true) + testBucketing( + bucketedTableTestSpecLeft = bucketedTableTestSpecLeft2, + bucketedTableTestSpecRight = bucketedTableTestSpecRight2, + joinCondition = joinCondition(Seq("i", "j")) + ) + + val bucketedTableTestSpecLeft3 = BucketedTableTestSpec( + bucketSpec, numPartitions = 50, expectedShuffle = false, expectedSort = true) + val bucketedTableTestSpecRight3 = BucketedTableTestSpec( + bucketSpec, numPartitions = 50, expectedShuffle = false, expectedSort = true) + testBucketing( + bucketedTableTestSpecLeft = bucketedTableTestSpecLeft3, + bucketedTableTestSpecRight = bucketedTableTestSpecRight3, + joinCondition = joinCondition(Seq("i", "j")) + ) + + val bucketedTableTestSpecLeft4 = BucketedTableTestSpec( + bucketSpec, numPartitions = 1, expectedShuffle = false, expectedSort = false) + val bucketedTableTestSpecRight4 = BucketedTableTestSpec( + bucketSpec, numPartitions = 1, expectedShuffle = false, expectedSort = false) + testBucketing( + bucketedTableTestSpecLeft = bucketedTableTestSpecLeft4, + bucketedTableTestSpecRight = bucketedTableTestSpecRight4, + joinCondition = joinCondition(Seq("i", "j")) ) } test("avoid shuffle and sort when sort columns are a super set of join keys") { - val bucketSpec1 = Some(BucketSpec(8, Seq("i"), Seq("i", "j"))) - val bucketSpec2 = Some(BucketSpec(8, Seq("i"), Seq("i", "k"))) + val bucketSpecLeft = Some(BucketSpec(8, Seq("i"), Seq("i", "j"))) + val bucketSpecRight = Some(BucketSpec(8, Seq("i"), Seq("i", "k"))) + val bucketedTableTestSpecLeft = BucketedTableTestSpec( + bucketSpecLeft, numPartitions = 1, expectedShuffle = false, expectedSort = false) + val bucketedTableTestSpecRight = BucketedTableTestSpec( + bucketSpecRight, numPartitions = 1, expectedShuffle = false, expectedSort = false) testBucketing( - bucketSpecLeft = bucketSpec1, - bucketSpecRight = bucketSpec2, - joinCondition = joinCondition(Seq("i")), - shuffleLeft = false, - shuffleRight = false, - sortLeft = false, - sortRight = false + bucketedTableTestSpecLeft = bucketedTableTestSpecLeft, + bucketedTableTestSpecRight = bucketedTableTestSpecRight, + joinCondition = joinCondition(Seq("i")) ) } test("only sort one side when sort columns are different") { - val bucketSpec1 = Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j"))) - val bucketSpec2 = Some(BucketSpec(8, Seq("i", "j"), Seq("k"))) + val bucketSpecLeft = Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j"))) + val bucketSpecRight = Some(BucketSpec(8, Seq("i", "j"), Seq("k"))) + val bucketedTableTestSpecLeft = BucketedTableTestSpec( + bucketSpecLeft, numPartitions = 1, expectedShuffle = false, expectedSort = false) + val bucketedTableTestSpecRight = BucketedTableTestSpec( + bucketSpecRight, numPartitions = 1, expectedShuffle = false, expectedSort = true) testBucketing( - bucketSpecLeft = bucketSpec1, - bucketSpecRight = bucketSpec2, - joinCondition = joinCondition(Seq("i", "j")), - shuffleLeft = false, - shuffleRight = false, - sortLeft = false, - sortRight = true + bucketedTableTestSpecLeft = bucketedTableTestSpecLeft, + bucketedTableTestSpecRight = bucketedTableTestSpecRight, + joinCondition = joinCondition(Seq("i", "j")) ) } test("only sort one side when sort columns are same but their ordering is different") { - val bucketSpec1 = Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j"))) - val bucketSpec2 = Some(BucketSpec(8, Seq("i", "j"), Seq("j", "i"))) + val bucketSpecLeft = Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j"))) + val bucketSpecRight = Some(BucketSpec(8, Seq("i", "j"), Seq("j", "i"))) + val bucketedTableTestSpecLeft = BucketedTableTestSpec( + bucketSpecLeft, numPartitions = 1, expectedShuffle = false, expectedSort = false) + val bucketedTableTestSpecRight = BucketedTableTestSpec( + bucketSpecRight, numPartitions = 1, expectedShuffle = false, expectedSort = true) testBucketing( - bucketSpecLeft = bucketSpec1, - bucketSpecRight = bucketSpec2, - joinCondition = joinCondition(Seq("i", "j")), - shuffleLeft = false, - shuffleRight = false, - sortLeft = false, - sortRight = true + bucketedTableTestSpecLeft = bucketedTableTestSpecLeft, + bucketedTableTestSpecRight = bucketedTableTestSpecRight, + joinCondition = joinCondition(Seq("i", "j")) ) } @@ -470,20 +515,20 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet test("SPARK-17698 Join predicates should not contain filter clauses") { val bucketSpec = Some(BucketSpec(8, Seq("i"), Seq("i"))) + val bucketedTableTestSpecLeft = BucketedTableTestSpec( + bucketSpec, numPartitions = 1, expectedShuffle = false, expectedSort = false) + val bucketedTableTestSpecRight = BucketedTableTestSpec( + bucketSpec, numPartitions = 1, expectedShuffle = false, expectedSort = false) testBucketing( - bucketSpecLeft = bucketSpec, - bucketSpecRight = bucketSpec, + bucketedTableTestSpecLeft = bucketedTableTestSpecLeft, + bucketedTableTestSpecRight = bucketedTableTestSpecRight, joinType = "fullouter", joinCondition = (left: DataFrame, right: DataFrame) => { val joinPredicates = Seq("i").map(col => left(col) === right(col)).reduce(_ && _) val filterLeft = left("i") === Literal("1") val filterRight = right("i") === Literal("1") joinPredicates && filterLeft && filterRight - }, - shuffleLeft = false, - shuffleRight = false, - sortLeft = false, - sortRight = false + } ) } -- GitLab