From fbe4216e1e83d243a7f0521b76bfb20c25278281 Mon Sep 17 00:00:00 2001
From: Ioana Delaney <ioanamdelaney@gmail.com>
Date: Thu, 13 Apr 2017 22:27:04 +0800
Subject: [PATCH] [SPARK-20233][SQL] Apply star-join filter heuristics to
 dynamic programming join enumeration

## What changes were proposed in this pull request?

Implements star-join filter to reduce the search space for dynamic programming join enumeration. Consider the following join graph:

```
T1   D1 - T2 - T3
  \  /
   F1
   |
   D2

star-join: {F1, D1, D2}
non-star: {T1, T2, T3}
```

The following join combinations will be generated:
```
level 0: (F1), (D1), (D2), (T1), (T2), (T3)
level 1: {F1, D1}, {F1, D2}, {T2, T3}
level 2: {F1, D1, D2}
level 3: {F1, D1, D2, T1}, {F1, D1, D2, T2}
level 4: {F1, D1, D2, T1, T2}, {F1, D1, D2, T2, T3}
level 5: {F1, D1, D2, T1, T2, T3}
```

## How was this patch tested?

New test suite ```StarJoinCostBasedReorderSuite.scala```.

Author: Ioana Delaney <ioanamdelaney@gmail.com>

Closes #17546 from ioana-delaney/starSchemaCBOv3.
---
 .../optimizer/CostBasedJoinReorder.scala      | 144 +++++-
 .../optimizer/StarSchemaDetection.scala       |   2 +-
 .../apache/spark/sql/internal/SQLConf.scala   |   8 +
 .../StarJoinCostBasedReorderSuite.scala       | 426 ++++++++++++++++++
 4 files changed, 571 insertions(+), 9 deletions(-)
 create mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala
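The pruning rule sketched above boils down to a small set predicate. The following self-contained Scala sketch (an editor's illustration, not part of the patch; the integer ids are hypothetical) replays the three allowed combinations on the graph above:

```scala
// Sketch of the star-join pruning rule, using integer ids for the items:
// F1 = 0, D1 = 1, D2 = 2, T1 = 3, T2 = 4, T3 = 5 (hypothetical numbering).
object StarFilterSketch {
  val star = Set(0, 1, 2)     // {F1, D1, D2}
  val nonStar = Set(3, 4, 5)  // {T1, T2, T3}

  // A combination of two disjoint plans is kept only if it stays inside the
  // star join, already contains the whole star join, or stays inside the
  // non-star set.
  def allowed(one: Set[Int], other: Set[Int]): Boolean = {
    val join = one.union(other)
    one.intersect(other).isEmpty &&
      (join.subsetOf(star) || star.subsetOf(join) || join.subsetOf(nonStar))
  }

  def main(args: Array[String]): Unit = {
    println(allowed(Set(0), Set(1)))        // true: {F1, D1} builds the star join
    println(allowed(Set(4), Set(5)))        // true: {T2, T3} is pure non-star
    println(allowed(Set(0), Set(3)))        // false: mixes star and non-star too early
    println(allowed(Set(0, 1, 2), Set(3)))  // true: the star join is complete
  }
}
```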
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
index cbd506465a..c704c2e6d3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
@@ -54,8 +54,6 @@ case class CostBasedJoinReorder(conf: SQLConf) extends Rule[LogicalPlan] with Pr
 
   private def reorder(plan: LogicalPlan, output: Seq[Attribute]): LogicalPlan = {
     val (items, conditions) = extractInnerJoins(plan)
-    // TODO: Compute the set of star-joins and use them in the join enumeration
-    // algorithm to prune un-optimal plan choices.
     val result =
       // Do reordering if the number of items is appropriate and join conditions exist.
       // We also need to check if costs of all items can be evaluated.
@@ -150,12 +148,15 @@ object JoinReorderDP extends PredicateHelper with Logging {
       case (item, id) => Set(id) -> JoinPlan(Set(id), item, Set(), Cost(0, 0))
     }.toMap)
 
+    // Build filters from the join graph to be used by the search algorithm.
+    val filters = JoinReorderDPFilters.buildJoinGraphInfo(conf, items, conditions, itemIndex)
+
     // Build plans for next levels until the last level has only one plan. This plan contains
     // all items that can be joined, so there's no need to continue.
     val topOutputSet = AttributeSet(output)
-    while (foundPlans.size < items.length && foundPlans.last.size > 1) {
+    while (foundPlans.size < items.length) {
       // Build plans for the next level.
-      foundPlans += searchLevel(foundPlans, conf, conditions, topOutputSet)
+      foundPlans += searchLevel(foundPlans, conf, conditions, topOutputSet, filters)
     }
 
     val durationInMs = (System.nanoTime() - startTime) / (1000 * 1000)
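The hunk above wires the filter into the level-by-level search. As a sanity check of the search-space reduction, this editor's sketch (not part of the patch; the ids, the edge set, and the simplified connectivity test are assumptions) re-enumerates the commit-message graph and reproduces the plan counts quoted in the new test suite: 15 with the filter versus 24 without.

```scala
// Replays the DP enumeration on the example graph: f1 = 0, d1 = 1, d2 = 2,
// t1 = 3, t2 = 4, t3 = 5 (hypothetical ids derived from the join conditions).
object DPSearchSketch {
  val edges = Set((0, 1), (0, 2), (0, 3), (1, 4), (4, 5))
  val star = Set(0, 1, 2)
  val nonStar = Set(3, 4, 5)
  val numItems = 6

  // Stand-in for "there exists a join condition referencing both sides".
  def connected(a: Set[Int], b: Set[Int]): Boolean =
    edges.exists { case (x, y) => (a(x) && b(y)) || (a(y) && b(x)) }

  def starFilter(a: Set[Int], b: Set[Int]): Boolean = {
    val join = a.union(b)
    join.subsetOf(star) || star.subsetOf(join) || join.subsetOf(nonStar)
  }

  def countPlans(useFilter: Boolean): Int = {
    val levels = scala.collection.mutable.ArrayBuffer(
      (0 until numItems).map(Set(_)).toSet)
    while (levels.size < numItems) {
      val lev = levels.size
      // A plan at the new level combines a plan from level k with one from
      // level lev - k - 1, analogous to JoinReorderDP.searchLevel.
      val next = (for {
        k <- 0 until lev
        one <- levels(k)
        other <- levels(lev - k - 1)
        if one.intersect(other).isEmpty
        if connected(one, other)
        if !useFilter || starFilter(one, other)
      } yield one.union(other)).toSet
      levels += next
    }
    levels.map(_.size).sum
  }

  def main(args: Array[String]): Unit = {
    println(countPlans(useFilter = true))   // 15, matching the new test suite
    println(countPlans(useFilter = false))  // 24 without the filter
  }
}
```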
@@ -179,7 +180,8 @@ object JoinReorderDP extends PredicateHelper with Logging {
       existingLevels: Seq[JoinPlanMap],
       conf: SQLConf,
       conditions: Set[Expression],
-      topOutput: AttributeSet): JoinPlanMap = {
+      topOutput: AttributeSet,
+      filters: Option[JoinGraphInfo]): JoinPlanMap = {
 
     val nextLevel = mutable.Map.empty[Set[Int], JoinPlan]
     var k = 0
@@ -200,7 +202,7 @@ object JoinReorderDP extends PredicateHelper with Logging {
       }
 
       otherSideCandidates.foreach { otherSidePlan =>
-        buildJoin(oneSidePlan, otherSidePlan, conf, conditions, topOutput) match {
+        buildJoin(oneSidePlan, otherSidePlan, conf, conditions, topOutput, filters) match {
           case Some(newJoinPlan) =>
             // Check if it's the first plan for the item set, or it's a better plan than
             // the existing one due to lower cost.
@@ -218,14 +220,20 @@ object JoinReorderDP extends PredicateHelper with Logging {
   }
 
   /**
-   * Builds a new JoinPlan when both conditions hold:
+   * Builds a new JoinPlan if the following conditions hold:
    *   - the sets of items contained in left and right sides do not overlap.
    *   - there exists at least one join condition involving references from both sides.
+   *   - if star-join filter is enabled, allow the following combinations:
+   *         1) (oneJoinPlan U otherJoinPlan) is a subset of star-join
+   *         2) star-join is a subset of (oneJoinPlan U otherJoinPlan)
+   *         3) (oneJoinPlan U otherJoinPlan) is a subset of non star-join
+   *
    * @param oneJoinPlan One side JoinPlan for building a new JoinPlan.
    * @param otherJoinPlan The other side JoinPlan for building a new join node.
    * @param conf SQLConf for statistics computation.
    * @param conditions The overall set of join conditions.
    * @param topOutput The output attributes of the final plan.
+   * @param filters Join graph info to be used as filters by the search algorithm.
    * @return Builds and returns a new JoinPlan if both conditions hold. Otherwise, returns None.
    */
   private def buildJoin(
@@ -233,13 +241,27 @@ object JoinReorderDP extends PredicateHelper with Logging {
       otherJoinPlan: JoinPlan,
       conf: SQLConf,
       conditions: Set[Expression],
-      topOutput: AttributeSet): Option[JoinPlan] = {
+      topOutput: AttributeSet,
+      filters: Option[JoinGraphInfo]): Option[JoinPlan] = {
 
     if (oneJoinPlan.itemIds.intersect(otherJoinPlan.itemIds).nonEmpty) {
       // Should not join two overlapping item sets.
       return None
     }
 
+    if (filters.isDefined) {
+      // Apply star-join filter, which ensures that tables in a star schema relationship
+      // are planned together. The star-filter will eliminate joins among star and non-star
+      // tables until the star joins are built. The following combinations are allowed:
+      // 1. (oneJoinPlan U otherJoinPlan) is a subset of star-join
+      // 2. star-join is a subset of (oneJoinPlan U otherJoinPlan)
+      // 3. (oneJoinPlan U otherJoinPlan) is a subset of non star-join
+      val isValidJoinCombination =
+        JoinReorderDPFilters.starJoinFilter(oneJoinPlan.itemIds, otherJoinPlan.itemIds,
+          filters.get)
+      if (!isValidJoinCombination) return None
+    }
+
     val onePlan = oneJoinPlan.plan
     val otherPlan = otherJoinPlan.plan
     val joinConds = conditions
@@ -327,3 +349,109 @@ object JoinReorderDP extends PredicateHelper with Logging {
 case class Cost(card: BigInt, size: BigInt) {
   def +(other: Cost): Cost = Cost(this.card + other.card, this.size + other.size)
 }
+
+/**
+ * Implements optional filters to reduce the search space for join enumeration.
+ *
+ * 1) Star-join filters: Plan star-joins together since they are assumed
+ *    to have an optimal execution based on their RI relationship.
+ * 2) Cartesian products: Defer their planning later in the graph to avoid
+ *    large intermediate results (expanding joins, in general).
+ * 3) Composite inners: Don't generate "bushy tree" plans to avoid materializing
+ *    intermediate results.
+ *
+ * Filters (2) and (3) are not implemented.
+ */
+object JoinReorderDPFilters extends PredicateHelper {
+  /**
+   * Builds join graph information to be used by the filtering strategies.
+   * Currently, it builds the sets of star/non-star joins.
+   * It can be extended with the sets of connected/unconnected joins, which
+   * can be used to filter Cartesian products.
+   */
+  def buildJoinGraphInfo(
+      conf: SQLConf,
+      items: Seq[LogicalPlan],
+      conditions: Set[Expression],
+      itemIndex: Seq[(LogicalPlan, Int)]): Option[JoinGraphInfo] = {
+
+    if (conf.joinReorderDPStarFilter) {
+      // Compute the tables in a star-schema relationship.
+      val starJoin = StarSchemaDetection(conf).findStarJoins(items, conditions.toSeq)
+      val nonStarJoin = items.filterNot(starJoin.contains(_))
+
+      if (starJoin.nonEmpty && nonStarJoin.nonEmpty) {
+        val itemMap = itemIndex.toMap
+        Some(JoinGraphInfo(starJoin.map(itemMap).toSet, nonStarJoin.map(itemMap).toSet))
+      } else {
+        // Nothing interesting to return.
+        None
+      }
+    } else {
+      // Star schema filter is not enabled.
+      None
+    }
+  }
+
+  /**
+   * Applies the star-join filter that eliminates join combinations among star
+   * and non-star tables until the star join is built.
+   *
+   * Given the oneSideJoinPlan/otherSideJoinPlan, which represent all the plan
+   * permutations generated by the DP join enumeration, and the star/non-star plans,
+   * the following plan combinations are allowed:
+   *   1. (oneSideJoinPlan U otherSideJoinPlan) is a subset of star-join
+   *   2. star-join is a subset of (oneSideJoinPlan U otherSideJoinPlan)
+   *   3. (oneSideJoinPlan U otherSideJoinPlan) is a subset of non star-join
+   *
+   * It assumes the sets are disjoint.
+   *
+   * Example query graph:
+   *
+   *   t1   d1 - t2 - t3
+   *     \  /
+   *      f1
+   *      |
+   *      d2
+   *
+   * star: {d1, f1, d2}
+   * non-star: {t2, t1, t3}
+   *
+   * level 0: (f1 ), (d2 ), (t3 ), (d1 ), (t1 ), (t2 )
+   * level 1: {t3 t2 }, {f1 d2 }, {f1 d1 }
+   * level 2: {d2 f1 d1 }
+   * level 3: {t1 d1 f1 d2 }, {t2 d1 f1 d2 }
+   * level 4: {d1 t2 f1 t1 d2 }, {d1 t3 t2 f1 d2 }
+   * level 5: {d1 t3 t2 f1 t1 d2 }
+   *
+   * @param oneSideJoinPlan One side of the join represented as a set of plan ids.
+   * @param otherSideJoinPlan The other side of the join represented as a set of plan ids.
+   * @param filters Star and non-star plans represented as sets of plan ids
+   */
+  def starJoinFilter(
+      oneSideJoinPlan: Set[Int],
+      otherSideJoinPlan: Set[Int],
+      filters: JoinGraphInfo): Boolean = {
+    val starJoins = filters.starJoins
+    val nonStarJoins = filters.nonStarJoins
+    val join = oneSideJoinPlan.union(otherSideJoinPlan)
+
+    // Disjoint sets
+    oneSideJoinPlan.intersect(otherSideJoinPlan).isEmpty &&
+      // Either star or non-star is empty
+      (starJoins.isEmpty || nonStarJoins.isEmpty ||
+        // Join is a subset of the star-join
+        join.subsetOf(starJoins) ||
+        // Star-join is a subset of join
+        starJoins.subsetOf(join) ||
+        // Join is a subset of non-star
+        join.subsetOf(nonStarJoins))
+  }
+}
+
+/**
+ * Helper class that keeps information about the join graph as sets of item/plan ids.
+ * It currently stores the star/non-star plans. It can be
+ * extended with the set of connected/unconnected plans.
+ */
+case class JoinGraphInfo(starJoins: Set[Int], nonStarJoins: Set[Int])
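For the example graph in the scaladoc above, the information the filter consumes reduces to two id sets. This editor's sketch (not part of the patch; table-name strings stand in for the `LogicalPlan` items of the real `itemIndex`) illustrates the translation `buildJoinGraphInfo` performs:

```scala
// itemIndex assigns each join item an id; the star detection result is then
// translated into id sets. All names and values here are stand-ins.
val itemIndex: Seq[(String, Int)] = Seq("f1", "d1", "d2", "t1", "t2", "t3").zipWithIndex
val detectedStar = Set("f1", "d1", "d2")  // what findStarJoins would report

val itemMap = itemIndex.toMap
val starJoins = detectedStar.map(itemMap)                                   // Set(0, 1, 2)
val nonStarJoins =
  itemIndex.collect { case (t, id) if !detectedStar(t) => id }.toSet        // Set(3, 4, 5)
// These two sets correspond to JoinGraphInfo(starJoins, nonStarJoins) above.
```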
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/StarSchemaDetection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/StarSchemaDetection.scala
index 91cb004eae..97ee998838 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/StarSchemaDetection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/StarSchemaDetection.scala
@@ -76,7 +76,7 @@ case class StarSchemaDetection(conf: SQLConf) extends PredicateHelper {
 
     val emptyStarJoinPlan = Seq.empty[LogicalPlan]
 
-    if (!conf.starSchemaDetection || input.size < 2) {
+    if (input.size < 2) {
       emptyStarJoinPlan
     } else {
       // Find if the input plans are eligible for star join detection.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 6b0f495033..2e1798e22b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -736,6 +736,12 @@ object SQLConf {
     .checkValue(weight => weight >= 0 && weight <= 1, "The weight value must be in [0, 1].")
     .createWithDefault(0.7)
 
+  val JOIN_REORDER_DP_STAR_FILTER =
+    buildConf("spark.sql.cbo.joinReorder.dp.star.filter")
+      .doc("Applies star-join filter heuristics to cost based join enumeration.")
+      .booleanConf
+      .createWithDefault(false)
+
   val STARSCHEMA_DETECTION = buildConf("spark.sql.cbo.starSchemaDetection")
     .doc("When true, it enables join reordering based on star schema detection. ")
     .booleanConf
@@ -1011,6 +1017,8 @@ class SQLConf extends Serializable with Logging {
 
   def joinReorderCardWeight: Double = getConf(SQLConf.JOIN_REORDER_CARD_WEIGHT)
 
+  def joinReorderDPStarFilter: Boolean = getConf(SQLConf.JOIN_REORDER_DP_STAR_FILTER)
+
   def windowExecBufferSpillThreshold: Int = getConf(WINDOW_EXEC_BUFFER_SPILL_THRESHOLD)
 
   def sortMergeJoinExecBufferSpillThreshold: Int =
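The new flag defaults to false and only takes effect when cost-based optimization and join reordering are themselves enabled. A typical way to switch it on (editor's usage sketch; `spark` is assumed to be a `SparkSession`):

```scala
// Enable CBO, the DP join reorder, and the star-join filter from this patch.
spark.conf.set("spark.sql.cbo.enabled", "true")
spark.conf.set("spark.sql.cbo.joinReorder.enabled", "true")
spark.conf.set("spark.sql.cbo.joinReorder.dp.star.filter", "true")
// Reordering relies on table/column statistics, e.g. collected with:
//   ANALYZE TABLE f1 COMPUTE STATISTICS FOR COLUMNS f1_fk1, f1_fk2
```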
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala
new file mode 100644
index 0000000000..a23d6266b2
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala
@@ -0,0 +1,426 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
+import org.apache.spark.sql.catalyst.plans.{Inner, PlanTest}
+import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.catalyst.statsEstimation.{StatsEstimationTestBase, StatsTestPlan}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf._
+
+
+class StarJoinCostBasedReorderSuite extends PlanTest with StatsEstimationTestBase {
+
+  override val conf = new SQLConf().copy(
+    CBO_ENABLED -> true,
+    JOIN_REORDER_ENABLED -> true,
+    JOIN_REORDER_DP_STAR_FILTER -> true)
+
+  object Optimize extends RuleExecutor[LogicalPlan] {
+    val batches =
+      Batch("Operator Optimizations", FixedPoint(100),
+        CombineFilters,
+        PushDownPredicate,
+        ReorderJoin(conf),
+        PushPredicateThroughJoin,
+        ColumnPruning,
+        CollapseProject) ::
+      Batch("Join Reorder", Once,
+        CostBasedJoinReorder(conf)) :: Nil
+  }
+
+  private val columnInfo: AttributeMap[ColumnStat] = AttributeMap(Seq(
+    // F1 (fact table)
+    attr("f1_fk1") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(100),
+      nullCount = 0, avgLen = 4, maxLen = 4),
+    attr("f1_fk2") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(100),
+      nullCount = 0, avgLen = 4, maxLen = 4),
+    attr("f1_fk3") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(100),
+      nullCount = 0, avgLen = 4, maxLen = 4),
+    attr("f1_c1") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(100),
+      nullCount = 0, avgLen = 4, maxLen = 4),
+    attr("f1_c2") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(100),
+      nullCount = 0, avgLen = 4, maxLen = 4),
+
+    // D1 (dimension)
+    attr("d1_pk") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(100),
+      nullCount = 0, avgLen = 4, maxLen = 4),
+    attr("d1_c2") -> ColumnStat(distinctCount = 50, min = Some(1), max = Some(50),
+      nullCount = 0, avgLen = 4, maxLen = 4),
+    attr("d1_c3") -> ColumnStat(distinctCount = 50, min = Some(1), max = Some(50),
+      nullCount = 0, avgLen = 4, maxLen = 4),
+
+    // D2 (dimension)
+    attr("d2_pk") -> ColumnStat(distinctCount = 20, min = Some(1), max = Some(20),
+      nullCount = 0, avgLen = 4, maxLen = 4),
+    attr("d2_c2") -> ColumnStat(distinctCount = 10, min = Some(1), max = Some(10),
+      nullCount = 0, avgLen = 4, maxLen = 4),
+    attr("d2_c3") -> ColumnStat(distinctCount = 10, min = Some(1), max = Some(10),
+      nullCount = 0, avgLen = 4, maxLen = 4),
+
+    // D3 (dimension)
+    attr("d3_pk") -> ColumnStat(distinctCount = 10, min = Some(1), max = Some(10),
+      nullCount = 0, avgLen = 4, maxLen = 4),
+    attr("d3_c2") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
+      nullCount = 0, avgLen = 4, maxLen = 4),
+    attr("d3_c3") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
+      nullCount = 0, avgLen = 4, maxLen = 4),
+
+    // T1 (regular table i.e. outside star)
+    attr("t1_c1") -> ColumnStat(distinctCount = 20, min = Some(1), max = Some(20),
+      nullCount = 1, avgLen = 4, maxLen = 4),
+    attr("t1_c2") -> ColumnStat(distinctCount = 10, min = Some(1), max = Some(10),
+      nullCount = 1, avgLen = 4, maxLen = 4),
+    attr("t1_c3") -> ColumnStat(distinctCount = 10, min = Some(1), max = Some(10),
+      nullCount = 1, avgLen = 4, maxLen = 4),
+
+    // T2 (regular table)
+    attr("t2_c1") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
+      nullCount = 1, avgLen = 4, maxLen = 4),
+    attr("t2_c2") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
+      nullCount = 1, avgLen = 4, maxLen = 4),
+    attr("t2_c3") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
+      nullCount = 1, avgLen = 4, maxLen = 4),
+
+    // T3 (regular table)
+    attr("t3_c1") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
+      nullCount = 1, avgLen = 4, maxLen = 4),
+    attr("t3_c2") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
+      nullCount = 1, avgLen = 4, maxLen = 4),
+    attr("t3_c3") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
+      nullCount = 1, avgLen = 4, maxLen = 4),
+
+    // T4 (regular table)
+    attr("t4_c1") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
+      nullCount = 1, avgLen = 4, maxLen = 4),
+    attr("t4_c2") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
+      nullCount = 1, avgLen = 4, maxLen = 4),
+    attr("t4_c3") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
+      nullCount = 1, avgLen = 4, maxLen = 4),
+
+    // T5 (regular table)
+    attr("t5_c1") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
+      nullCount = 1, avgLen = 4, maxLen = 4),
+    attr("t5_c2") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
+      nullCount = 1, avgLen = 4, maxLen = 4),
+    attr("t5_c3") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
+      nullCount = 1, avgLen = 4, maxLen = 4),
+
+    // T6 (regular table)
+    attr("t6_c1") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
+      nullCount = 1, avgLen = 4, maxLen = 4),
+    attr("t6_c2") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
+      nullCount = 1, avgLen = 4, maxLen = 4),
+    attr("t6_c3") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
+      nullCount = 1, avgLen = 4, maxLen = 4)
+
+  ))
+
+  private val nameToAttr: Map[String, Attribute] = columnInfo.map(kv => kv._1.name -> kv._1)
+  private val nameToColInfo: Map[String, (Attribute, ColumnStat)] =
+    columnInfo.map(kv => kv._1.name -> kv)
+
+  private val f1 = StatsTestPlan(
+    outputList = Seq("f1_fk1", "f1_fk2", "f1_fk3", "f1_c1", "f1_c2").map(nameToAttr),
+    rowCount = 1000,
+    size = Some(1000 * (8 + 4 * 5)),
+    attributeStats = AttributeMap(Seq("f1_fk1", "f1_fk2", "f1_fk3", "f1_c1", "f1_c2")
+      .map(nameToColInfo)))
+
+  // To control the layout of the join plans, keep the size for the non-fact tables constant
+  // and vary the rowcount and the number of distinct values of the join columns.
+  private val d1 = StatsTestPlan(
+    outputList = Seq("d1_pk", "d1_c2", "d1_c3").map(nameToAttr),
+    rowCount = 100,
+    size = Some(3000),
+    attributeStats = AttributeMap(Seq("d1_pk", "d1_c2", "d1_c3").map(nameToColInfo)))
+
+  private val d2 = StatsTestPlan(
+    outputList = Seq("d2_pk", "d2_c2", "d2_c3").map(nameToAttr),
+    rowCount = 20,
+    size = Some(3000),
+    attributeStats = AttributeMap(Seq("d2_pk", "d2_c2", "d2_c3").map(nameToColInfo)))
+
+  private val d3 = StatsTestPlan(
+    outputList = Seq("d3_pk", "d3_c2", "d3_c3").map(nameToAttr),
+    rowCount = 10,
+    size = Some(3000),
+    attributeStats = AttributeMap(Seq("d3_pk", "d3_c2", "d3_c3").map(nameToColInfo)))
+
+  private val t1 = StatsTestPlan(
+    outputList = Seq("t1_c1", "t1_c2", "t1_c3").map(nameToAttr),
+    rowCount = 50,
+    size = Some(3000),
+    attributeStats = AttributeMap(Seq("t1_c1", "t1_c2", "t1_c3").map(nameToColInfo)))
+
+  private val t2 = StatsTestPlan(
+    outputList = Seq("t2_c1", "t2_c2", "t2_c3").map(nameToAttr),
+    rowCount = 10,
+    size = Some(3000),
+    attributeStats = AttributeMap(Seq("t2_c1", "t2_c2", "t2_c3").map(nameToColInfo)))
+
+  private val t3 = StatsTestPlan(
+    outputList = Seq("t3_c1", "t3_c2", "t3_c3").map(nameToAttr),
+    rowCount = 10,
+    size = Some(3000),
+    attributeStats = AttributeMap(Seq("t3_c1", "t3_c2", "t3_c3").map(nameToColInfo)))
+
+  private val t4 = StatsTestPlan(
+    outputList = Seq("t4_c1", "t4_c2", "t4_c3").map(nameToAttr),
+    rowCount = 10,
+    size = Some(3000),
+    attributeStats = AttributeMap(Seq("t4_c1", "t4_c2", "t4_c3").map(nameToColInfo)))
+
+  private val t5 = StatsTestPlan(
+    outputList = Seq("t5_c1", "t5_c2", "t5_c3").map(nameToAttr),
+    rowCount = 10,
+    size = Some(3000),
+    attributeStats = AttributeMap(Seq("t5_c1", "t5_c2", "t5_c3").map(nameToColInfo)))
+
+  private val t6 = StatsTestPlan(
+    outputList = Seq("t6_c1", "t6_c2", "t6_c3").map(nameToAttr),
+    rowCount = 10,
+    size = Some(3000),
+    attributeStats = AttributeMap(Seq("t6_c1", "t6_c2", "t6_c3").map(nameToColInfo)))
+
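A note on why the expected plans in the tests below join `f1` with the smallest dimension first: under the simplified inner-join estimate rows(A) * rows(B) / max(ndv(A.key), ndv(B.key)) (an editor's approximation of Spark's JoinEstimation, used here only for intuition), the statistics above give:

```scala
// Back-of-the-envelope check using the stats defined above
// (simplified estimate; the real computation lives in JoinEstimation):
val f1JoinD1 = 1000L * 100 / math.max(100, 100)  // f1_fk1 = d1_pk -> ~1000 rows
val f1JoinD2 = 1000L * 20  / math.max(100, 20)   // f1_fk2 = d2_pk -> ~200 rows
val f1JoinD3 = 1000L * 10  / math.max(100, 10)   // f1_fk3 = d3_pk -> ~100 rows
// Smaller intermediate results are preferred, which is why Test 1's expected
// plan joins d2 before d1.
```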
+  test("Test 1: Star query with two dimensions and two regular tables") {
+
+    //  d1     t1
+    //    \   /
+    //     f1
+    //    /  \
+    //  d2    t2
+    //
+    // star: {f1, d1, d2}
+    // non-star: {t1, t2}
+    //
+    // level 0: (t2 ), (d2 ), (f1 ), (d1 ), (t1 )
+    // level 1: {f1 d1 }, {d2 f1 }
+    // level 2: {d2 f1 d1 }
+    // level 3: {t2 d1 d2 f1 }, {t1 d1 d2 f1 }
+    // level 4: {f1 t1 t2 d1 d2 }
+    //
+    // Number of generated plans: 11 (vs. 20 w/o filter)
+    val query =
+      f1.join(t1).join(t2).join(d1).join(d2)
+        .where((nameToAttr("f1_c1") === nameToAttr("t1_c1")) &&
+          (nameToAttr("f1_c2") === nameToAttr("t2_c1")) &&
+          (nameToAttr("f1_fk1") === nameToAttr("d1_pk")) &&
+          (nameToAttr("f1_fk2") === nameToAttr("d2_pk")))
+
+    val expected =
+      f1.join(d2, Inner, Some(nameToAttr("f1_fk2") === nameToAttr("d2_pk")))
+        .join(d1, Inner, Some(nameToAttr("f1_fk1") === nameToAttr("d1_pk")))
+        .join(t2, Inner, Some(nameToAttr("f1_c2") === nameToAttr("t2_c1")))
+        .join(t1, Inner, Some(nameToAttr("f1_c1") === nameToAttr("t1_c1")))
+
+    assertEqualPlans(query, expected)
+  }
+
+  test("Test 2: Star with a linear branch") {
+
+    //  t1   d1 - t2 - t3
+    //    \  /
+    //     f1
+    //     |
+    //     d2
+    //
+    // star: {d1, f1, d2}
+    // non-star: {t2, t1, t3}
+    //
+    // level 0: (f1 ), (d2 ), (t3 ), (d1 ), (t1 ), (t2 )
+    // level 1: {t3 t2 }, {f1 d2 }, {f1 d1 }
+    // level 2: {d2 f1 d1 }
+    // level 3: {t1 d1 f1 d2 }, {t2 d1 f1 d2 }
+    // level 4: {d1 t2 f1 t1 d2 }, {d1 t3 t2 f1 d2 }
+    // level 5: {d1 t3 t2 f1 t1 d2 }
+    //
+    // Number of generated plans: 15 (vs 24)
+    val query =
+      d1.join(t1).join(t2).join(f1).join(d2).join(t3)
+        .where((nameToAttr("d1_pk") === nameToAttr("f1_fk1")) &&
+          (nameToAttr("t1_c1") === nameToAttr("f1_c1")) &&
+          (nameToAttr("d2_pk") === nameToAttr("f1_fk2")) &&
+          (nameToAttr("f1_fk2") === nameToAttr("d2_pk")) &&
+          (nameToAttr("d1_c2") === nameToAttr("t2_c1")) &&
+          (nameToAttr("t2_c2") === nameToAttr("t3_c1")))
+
+    val expected =
+      f1.join(d2, Inner, Some(nameToAttr("f1_fk2") === nameToAttr("d2_pk")))
+        .join(d1, Inner, Some(nameToAttr("f1_fk1") === nameToAttr("d1_pk")))
+        .join(t3.join(t2, Inner, Some(nameToAttr("t2_c2") === nameToAttr("t3_c1"))), Inner,
+          Some(nameToAttr("d1_c2") === nameToAttr("t2_c1")))
+        .join(t1, Inner, Some(nameToAttr("t1_c1") === nameToAttr("f1_c1")))
+
+    assertEqualPlans(query, expected)
+  }
+
+  test("Test 3: Star with derived branches") {
+
+    //          t3   t2
+    //          |    |
+    //     d1 - t4 - t1
+    //     |
+    //     f1
+    //     |
+    //     d2
+    //
+    // star: (d1 f1 d2 )
+    // non-star: (t4 t1 t2 t3 )
+    //
+    // level 0: (t1 ), (t3 ), (f1 ), (d1 ), (t2 ), (d2 ), (t4 )
+    // level 1: {f1 d2 }, {t1 t4 }, {t1 t2 }, {f1 d1 }, {t3 t4 }
+    // level 2: {d1 f1 d2 }, {t2 t1 t4 }, {t1 t3 t4 }
+    // level 3: {t4 d1 f1 d2 }, {t3 t4 t1 t2 }
+    // level 4: {d1 f1 t4 d2 t3 }, {d1 f1 t4 d2 t1 }
+    // level 5: {d1 f1 t4 d2 t1 t2 }, {d1 f1 t4 d2 t1 t3 }
+    // level 6: {d1 f1 t4 d2 t1 t2 t3 }
+    //
+    // Number of generated plans: 22 (vs. 34)
+    val query =
+      d1.join(t1).join(t2).join(t3).join(t4).join(f1).join(d2)
+        .where((nameToAttr("t1_c1") === nameToAttr("t2_c1")) &&
+          (nameToAttr("t3_c1") === nameToAttr("t4_c1")) &&
+          (nameToAttr("t1_c2") === nameToAttr("t4_c2")) &&
+          (nameToAttr("d1_c2") === nameToAttr("t4_c3")) &&
+          (nameToAttr("f1_fk1") === nameToAttr("d1_pk")) &&
+          (nameToAttr("f1_fk2") === nameToAttr("d2_pk")))
+
+    val expected =
+      f1.join(d2, Inner, Some(nameToAttr("f1_fk2") === nameToAttr("d2_pk")))
+        .join(d1, Inner, Some(nameToAttr("f1_fk1") === nameToAttr("d1_pk")))
+        .join(t3.join(t4, Inner, Some(nameToAttr("t3_c1") === nameToAttr("t4_c1"))), Inner,
+          Some(nameToAttr("d1_c2") === nameToAttr("t4_c3")))
+        .join(t1.join(t2, Inner, Some(nameToAttr("t1_c1") === nameToAttr("t2_c1"))), Inner,
+          Some(nameToAttr("t1_c2") === nameToAttr("t4_c2")))
+
+    assertEqualPlans(query, expected)
+  }
+
+  test("Test 4: Star with several branches") {
+
+    //    d1 - t3 - t4
+    //    |
+    //    f1 - d3 - t1 - t2
+    //    |
+    //    d2 - t5 - t6
+    //
+    // star: {d1 f1 d2 d3 }
+    // non-star: {t5 t3 t6 t2 t4 t1}
+    //
+    // level 0: (t4 ), (d2 ), (t5 ), (d3 ), (d1 ), (f1 ), (t2 ), (t6 ), (t1 ), (t3 )
+    // level 1: {t5 t6 }, {t4 t3 }, {d3 f1 }, {t2 t1 }, {d2 f1 }, {d1 f1 }
+    // level 2: {d2 d1 f1 }, {d2 d3 f1 }, {d3 d1 f1 }
+    // level 3: {d2 d1 d3 f1 }
+    // level 4: {d1 t3 d3 f1 d2 }, {d1 d3 f1 t1 d2 }, {d1 t5 d3 f1 d2 }
+    // level 5: {d1 t5 d3 f1 t1 d2 }, {d1 t3 t4 d3 f1 d2 }, {d1 t5 t6 d3 f1 d2 },
+    //          {d1 t5 t3 d3 f1 d2 }, {d1 t3 d3 f1 t1 d2 }, {d1 t2 d3 f1 t1 d2 }
+    // level 6: {d1 t5 t3 t4 d3 f1 d2 }, {d1 t3 t2 d3 f1 t1 d2 }, {d1 t5 t6 d3 f1 t1 d2 },
+    //          {d1 t5 t3 d3 f1 t1 d2 }, {d1 t5 t2 d3 f1 t1 d2 }, ...
+    // ...
+    // level 9: {d1 t5 t3 t6 t2 t4 d3 f1 t1 d2 }
+    //
+    // Number of generated plans: 46 (vs. 82)
+    val query =
+      d1.join(t3).join(t4).join(f1).join(d2).join(t5).join(t6).join(d3).join(t1).join(t2)
+        .where((nameToAttr("d1_c2") === nameToAttr("t3_c1")) &&
+          (nameToAttr("t3_c2") === nameToAttr("t4_c2")) &&
+          (nameToAttr("d1_pk") === nameToAttr("f1_fk1")) &&
+          (nameToAttr("f1_fk2") === nameToAttr("d2_pk")) &&
+          (nameToAttr("d2_c2") === nameToAttr("t5_c1")) &&
+          (nameToAttr("t5_c2") === nameToAttr("t6_c2")) &&
+          (nameToAttr("f1_fk3") === nameToAttr("d3_pk")) &&
+          (nameToAttr("d3_c2") === nameToAttr("t1_c1")) &&
+          (nameToAttr("t1_c2") === nameToAttr("t2_c2")))
+
+    val expected =
+      f1.join(d3, Inner, Some(nameToAttr("f1_fk3") === nameToAttr("d3_pk")))
+        .join(d1, Inner, Some(nameToAttr("f1_fk1") === nameToAttr("d1_pk")))
+        .join(d2, Inner, Some(nameToAttr("f1_fk2") === nameToAttr("d2_pk")))
+        .join(t4.join(t3, Inner, Some(nameToAttr("t3_c2") === nameToAttr("t4_c2"))), Inner,
+          Some(nameToAttr("d1_c2") === nameToAttr("t3_c1")))
+        .join(t2.join(t1, Inner, Some(nameToAttr("t1_c2") === nameToAttr("t2_c2"))), Inner,
+          Some(nameToAttr("d3_c2") === nameToAttr("t1_c1")))
+        .join(t5.join(t6, Inner, Some(nameToAttr("t5_c2") === nameToAttr("t6_c2"))), Inner,
+          Some(nameToAttr("d2_c2") === nameToAttr("t5_c1")))
+
+    assertEqualPlans(query, expected)
+  }
+
+  test("Test 5: RI star only") {
+
+    //    d1
+    //    |
+    //    f1
+    //   /  \
+    //  d2   d3
+    //
+    // star: {f1, d1, d2, d3}
+    // non-star: {}
+    //
+    // level 0: (d1), (f1), (d2), (d3)
+    // level 1: {f1 d3 }, {f1 d2 }, {d1 f1 }
+    // level 2: {d1 f1 d2 }, {d2 f1 d3 }, {d1 f1 d3 }
+    // level 3: {d1 d2 f1 d3 }
+    //
+    // Number of generated plans: 11 (= 11)
+    val query =
+      d1.join(d2).join(f1).join(d3)
+        .where((nameToAttr("f1_fk1") === nameToAttr("d1_pk")) &&
+          (nameToAttr("f1_fk2") === nameToAttr("d2_pk")) &&
+          (nameToAttr("f1_fk3") === nameToAttr("d3_pk")))
+
+    val expected =
+      f1.join(d3, Inner, Some(nameToAttr("f1_fk3") === nameToAttr("d3_pk")))
+        .join(d2, Inner, Some(nameToAttr("f1_fk2") === nameToAttr("d2_pk")))
+        .join(d1, Inner, Some(nameToAttr("f1_fk1") === nameToAttr("d1_pk")))
+
+    assertEqualPlans(query, expected)
+  }
+
+  test("Test 6: No RI star") {
+
+    //  f1 - t1 - t2 - t3
+    //
+    // star: {}
+    // non-star: {f1, t1, t2, t3}
+    //
+    // level 0: (t1), (f1), (t2), (t3)
+    // level 1: {f1 t3 }, {f1 t2 }, {t1 f1 }
+    // level 2: {t1 f1 t2 }, {t2 f1 t3 }, {t1 f1 t3 }
+    // level 3: {t1 t2 f1 t3 }
+    //
+    // Number of generated plans: 11 (= 11)
+    val query =
+      t1.join(f1).join(t2).join(t3)
+        .where((nameToAttr("f1_fk1") === nameToAttr("t1_c1")) &&
+          (nameToAttr("f1_fk2") === nameToAttr("t2_c1")) &&
+          (nameToAttr("f1_fk3") === nameToAttr("t3_c1")))
+
+    val expected =
+      f1.join(t3, Inner, Some(nameToAttr("f1_fk3") === nameToAttr("t3_c1")))
+        .join(t2, Inner, Some(nameToAttr("f1_fk2") === nameToAttr("t2_c1")))
+        .join(t1, Inner, Some(nameToAttr("f1_fk1") === nameToAttr("t1_c1")))
+
+    assertEqualPlans(query, expected)
+  }
+
+  private def assertEqualPlans(plan1: LogicalPlan, plan2: LogicalPlan): Unit = {
+    val optimized = Optimize.execute(plan1.analyze)
+    val expected = plan2.analyze
+    compareJoinOrder(optimized, expected)
+  }
+}
--
GitLab
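To run the new suite in isolation, an invocation along these lines should work (editor's suggestion; the exact sbt project name is an assumption and may vary by branch):

```
build/sbt "catalyst/testOnly *StarJoinCostBasedReorderSuite"
```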