From 14865d7ff78db5cf9a3e8626204c8e7ed059c353 Mon Sep 17 00:00:00 2001 From: wangzhenhua <wangzhenhua@huawei.com> Date: Tue, 21 Mar 2017 08:44:09 -0700 Subject: [PATCH] [SPARK-17080][SQL][FOLLOWUP] Improve documentation, change buildJoin method structure and add a debug log ## What changes were proposed in this pull request? 1. Improve documentation for class `Cost` and `JoinReorderDP` and method `buildJoin()`. 2. Change code structure of `buildJoin()` to make the logic clearer. 3. Add a debug-level log to record information for join reordering, including time cost, the number of items and the number of plans in memo. ## How was this patch tested? Not related. Author: wangzhenhua <wangzhenhua@huawei.com> Closes #17353 from wzhfy/reorderFollow. --- .../optimizer/CostBasedJoinReorder.scala | 109 +++++++++++------- .../apache/spark/sql/internal/SQLConf.scala | 1 + 2 files changed, 68 insertions(+), 42 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala index 521c468fe1..fc37720809 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.optimizer import scala.collection.mutable +import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeSet, Expression, PredicateHelper} import org.apache.spark.sql.catalyst.plans.{Inner, InnerLike} import org.apache.spark.sql.catalyst.plans.logical.{BinaryNode, Join, LogicalPlan, Project} @@ -51,7 +52,7 @@ case class CostBasedJoinReorder(conf: SQLConf) extends Rule[LogicalPlan] with Pr } } - def reorder(plan: LogicalPlan, output: AttributeSet): LogicalPlan = { + private def reorder(plan: LogicalPlan, output: AttributeSet): LogicalPlan = { val (items, conditions) = extractInnerJoins(plan) // TODO: Compute the set of star-joins and use them in the join enumeration // algorithm to prune un-optimal plan choices. @@ -69,7 +70,7 @@ case class CostBasedJoinReorder(conf: SQLConf) extends Rule[LogicalPlan] with Pr } /** - * Extract consecutive inner joinable items and join conditions. + * Extracts items of consecutive inner joins and join conditions. * This method works for bushy trees and left/right deep trees. */ private def extractInnerJoins(plan: LogicalPlan): (Seq[LogicalPlan], Set[Expression]) = { @@ -119,18 +120,21 @@ case class CostBasedJoinReorder(conf: SQLConf) extends Rule[LogicalPlan] with Pr * When building m-way joins, we only keep the best plan (with the lowest cost) for the same set * of m items. E.g., for 3-way joins, we keep only the best plan for items {A, B, C} among * plans (A J B) J C, (A J C) J B and (B J C) J A. - * - * Thus the plans maintained for each level when reordering four items A, B, C, D are as follows: + * We also prune cartesian product candidates when building a new plan if there exists no join + * condition involving references from both left and right. This pruning strategy significantly + * reduces the search space. + * E.g., given A J B J C J D with join conditions A.k1 = B.k1 and B.k2 = C.k2 and C.k3 = D.k3, + * plans maintained for each level are as follows: * level 0: p({A}), p({B}), p({C}), p({D}) - * level 1: p({A, B}), p({A, C}), p({A, D}), p({B, C}), p({B, D}), p({C, D}) - * level 2: p({A, B, C}), p({A, B, D}), p({A, C, D}), p({B, C, D}) + * level 1: p({A, B}), p({B, C}), p({C, D}) + * level 2: p({A, B, C}), p({B, C, D}) * level 3: p({A, B, C, D}) * where p({A, B, C, D}) is the final output plan. * * For cost evaluation, since physical costs for operators are not available currently, we use * cardinalities and sizes to compute costs. */ -object JoinReorderDP extends PredicateHelper { +object JoinReorderDP extends PredicateHelper with Logging { def search( conf: SQLConf, @@ -138,6 +142,7 @@ object JoinReorderDP extends PredicateHelper { conditions: Set[Expression], topOutput: AttributeSet): LogicalPlan = { + val startTime = System.nanoTime() // Level i maintains all found plans for i + 1 items. // Create the initial plans: each plan is a single item with zero cost. val itemIndex = items.zipWithIndex @@ -152,6 +157,10 @@ object JoinReorderDP extends PredicateHelper { foundPlans += searchLevel(foundPlans, conf, conditions, topOutput) } + val durationInMs = (System.nanoTime() - startTime) / (1000 * 1000) + logDebug(s"Join reordering finished. Duration: $durationInMs ms, number of items: " + + s"${items.length}, number of plans in memo: ${foundPlans.map(_.size).sum}") + // The last level must have one and only one plan, because all items are joinable. assert(foundPlans.size == items.length && foundPlans.last.size == 1) foundPlans.last.head._2.plan @@ -183,18 +192,15 @@ object JoinReorderDP extends PredicateHelper { } otherSideCandidates.foreach { otherSidePlan => - // Should not join two overlapping item sets. - if (oneSidePlan.itemIds.intersect(otherSidePlan.itemIds).isEmpty) { - val joinPlan = buildJoin(oneSidePlan, otherSidePlan, conf, conditions, topOutput) - if (joinPlan.isDefined) { - val newJoinPlan = joinPlan.get + buildJoin(oneSidePlan, otherSidePlan, conf, conditions, topOutput) match { + case Some(newJoinPlan) => // Check if it's the first plan for the item set, or it's a better plan than // the existing one due to lower cost. val existingPlan = nextLevel.get(newJoinPlan.itemIds) if (existingPlan.isEmpty || newJoinPlan.betterThan(existingPlan.get, conf)) { nextLevel.update(newJoinPlan.itemIds, newJoinPlan) } - } + case None => } } } @@ -203,7 +209,17 @@ object JoinReorderDP extends PredicateHelper { nextLevel.toMap } - /** Build a new join node. */ + /** + * Builds a new JoinPlan when both conditions hold: + * - the sets of items contained in left and right sides do not overlap. + * - there exists at least one join condition involving references from both sides. + * @param oneJoinPlan One side JoinPlan for building a new JoinPlan. + * @param otherJoinPlan The other side JoinPlan for building a new join node. + * @param conf SQLConf for statistics computation. + * @param conditions The overall set of join conditions. + * @param topOutput The output attributes of the final plan. + * @return Builds and returns a new JoinPlan if both conditions hold. Otherwise, returns None. + */ private def buildJoin( oneJoinPlan: JoinPlan, otherJoinPlan: JoinPlan, @@ -211,6 +227,11 @@ object JoinReorderDP extends PredicateHelper { conditions: Set[Expression], topOutput: AttributeSet): Option[JoinPlan] = { + if (oneJoinPlan.itemIds.intersect(otherJoinPlan.itemIds).nonEmpty) { + // Should not join two overlapping item sets. + return None + } + val onePlan = oneJoinPlan.plan val otherPlan = otherJoinPlan.plan val joinConds = conditions @@ -220,33 +241,33 @@ object JoinReorderDP extends PredicateHelper { if (joinConds.isEmpty) { // Cartesian product is very expensive, so we exclude them from candidate plans. // This also significantly reduces the search space. - None + return None + } + + // Put the deeper side on the left, tend to build a left-deep tree. + val (left, right) = if (oneJoinPlan.itemIds.size >= otherJoinPlan.itemIds.size) { + (onePlan, otherPlan) } else { - // Put the deeper side on the left, tend to build a left-deep tree. - val (left, right) = if (oneJoinPlan.itemIds.size >= otherJoinPlan.itemIds.size) { - (onePlan, otherPlan) + (otherPlan, onePlan) + } + val newJoin = Join(left, right, Inner, joinConds.reduceOption(And)) + val collectedJoinConds = joinConds ++ oneJoinPlan.joinConds ++ otherJoinPlan.joinConds + val remainingConds = conditions -- collectedJoinConds + val neededAttr = AttributeSet(remainingConds.flatMap(_.references)) ++ topOutput + val neededFromNewJoin = newJoin.outputSet.filter(neededAttr.contains) + val newPlan = + if ((newJoin.outputSet -- neededFromNewJoin).nonEmpty) { + Project(neededFromNewJoin.toSeq, newJoin) } else { - (otherPlan, onePlan) + newJoin } - val newJoin = Join(left, right, Inner, joinConds.reduceOption(And)) - val collectedJoinConds = joinConds ++ oneJoinPlan.joinConds ++ otherJoinPlan.joinConds - val remainingConds = conditions -- collectedJoinConds - val neededAttr = AttributeSet(remainingConds.flatMap(_.references)) ++ topOutput - val neededFromNewJoin = newJoin.outputSet.filter(neededAttr.contains) - val newPlan = - if ((newJoin.outputSet -- neededFromNewJoin).nonEmpty) { - Project(neededFromNewJoin.toSeq, newJoin) - } else { - newJoin - } - val itemIds = oneJoinPlan.itemIds.union(otherJoinPlan.itemIds) - // Now the root node of onePlan/otherPlan becomes an intermediate join (if it's a non-leaf - // item), so the cost of the new join should also include its own cost. - val newPlanCost = oneJoinPlan.planCost + oneJoinPlan.rootCost(conf) + - otherJoinPlan.planCost + otherJoinPlan.rootCost(conf) - Some(JoinPlan(itemIds, newPlan, collectedJoinConds, newPlanCost)) - } + val itemIds = oneJoinPlan.itemIds.union(otherJoinPlan.itemIds) + // Now the root node of onePlan/otherPlan becomes an intermediate join (if it's a non-leaf + // item), so the cost of the new join should also include its own cost. + val newPlanCost = oneJoinPlan.planCost + oneJoinPlan.rootCost(conf) + + otherJoinPlan.planCost + otherJoinPlan.rootCost(conf) + Some(JoinPlan(itemIds, newPlan, collectedJoinConds, newPlanCost)) } /** Map[set of item ids, join plan for these items] */ @@ -278,10 +299,10 @@ object JoinReorderDP extends PredicateHelper { } def betterThan(other: JoinPlan, conf: SQLConf): Boolean = { - if (other.planCost.rows == 0 || other.planCost.size == 0) { + if (other.planCost.card == 0 || other.planCost.size == 0) { false } else { - val relativeRows = BigDecimal(this.planCost.rows) / BigDecimal(other.planCost.rows) + val relativeRows = BigDecimal(this.planCost.card) / BigDecimal(other.planCost.card) val relativeSize = BigDecimal(this.planCost.size) / BigDecimal(other.planCost.size) relativeRows * conf.joinReorderCardWeight + relativeSize * (1 - conf.joinReorderCardWeight) < 1 @@ -290,7 +311,11 @@ object JoinReorderDP extends PredicateHelper { } } -/** This class defines the cost model. */ -case class Cost(rows: BigInt, size: BigInt) { - def +(other: Cost): Cost = Cost(this.rows + other.rows, this.size + other.size) +/** + * This class defines the cost model for a plan. + * @param card Cardinality (number of rows). + * @param size Size in bytes. + */ +case class Cost(card: BigInt, size: BigInt) { + def +(other: Cost): Cost = Cost(this.card + other.card, this.size + other.size) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index b6e0b8ccbe..d5006c1646 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -708,6 +708,7 @@ object SQLConf { buildConf("spark.sql.cbo.joinReorder.dp.threshold") .doc("The maximum number of joined nodes allowed in the dynamic programming algorithm.") .intConf + .checkValue(number => number > 0, "The maximum number must be a positive integer.") .createWithDefault(12) val JOIN_REORDER_CARD_WEIGHT = -- GitLab