Skip to content
Snippets Groups Projects
Commit 14865d7f authored by wangzhenhua's avatar wangzhenhua Committed by Xiao Li
Browse files

[SPARK-17080][SQL][FOLLOWUP] Improve documentation, change buildJoin method...

[SPARK-17080][SQL][FOLLOWUP] Improve documentation, change buildJoin method structure and add a debug log

## What changes were proposed in this pull request?

1. Improve documentation for class `Cost` and `JoinReorderDP` and method `buildJoin()`.
2. Change code structure of `buildJoin()` to make the logic clearer.
3. Add a debug-level log that records join-reordering information: the elapsed time, the number of items, and the number of plans in the memo.

## How was this patch tested?

Not related.

Author: wangzhenhua <wangzhenhua@huawei.com>

Closes #17353 from wzhfy/reorderFollow.
parent 650d03cf
No related branches found
No related tags found
No related merge requests found
......@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.optimizer
import scala.collection.mutable
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeSet, Expression, PredicateHelper}
import org.apache.spark.sql.catalyst.plans.{Inner, InnerLike}
import org.apache.spark.sql.catalyst.plans.logical.{BinaryNode, Join, LogicalPlan, Project}
......@@ -51,7 +52,7 @@ case class CostBasedJoinReorder(conf: SQLConf) extends Rule[LogicalPlan] with Pr
}
}
def reorder(plan: LogicalPlan, output: AttributeSet): LogicalPlan = {
private def reorder(plan: LogicalPlan, output: AttributeSet): LogicalPlan = {
val (items, conditions) = extractInnerJoins(plan)
// TODO: Compute the set of star-joins and use them in the join enumeration
// algorithm to prune un-optimal plan choices.
......@@ -69,7 +70,7 @@ case class CostBasedJoinReorder(conf: SQLConf) extends Rule[LogicalPlan] with Pr
}
/**
* Extract consecutive inner joinable items and join conditions.
* Extracts items of consecutive inner joins and join conditions.
* This method works for bushy trees and left/right deep trees.
*/
private def extractInnerJoins(plan: LogicalPlan): (Seq[LogicalPlan], Set[Expression]) = {
......@@ -119,18 +120,21 @@ case class CostBasedJoinReorder(conf: SQLConf) extends Rule[LogicalPlan] with Pr
* When building m-way joins, we only keep the best plan (with the lowest cost) for the same set
* of m items. E.g., for 3-way joins, we keep only the best plan for items {A, B, C} among
* plans (A J B) J C, (A J C) J B and (B J C) J A.
*
* Thus the plans maintained for each level when reordering four items A, B, C, D are as follows:
* We also prune cartesian product candidates when building a new plan if there exists no join
* condition involving references from both left and right. This pruning strategy significantly
* reduces the search space.
* E.g., given A J B J C J D with join conditions A.k1 = B.k1 and B.k2 = C.k2 and C.k3 = D.k3,
* plans maintained for each level are as follows:
* level 0: p({A}), p({B}), p({C}), p({D})
* level 1: p({A, B}), p({A, C}), p({A, D}), p({B, C}), p({B, D}), p({C, D})
* level 2: p({A, B, C}), p({A, B, D}), p({A, C, D}), p({B, C, D})
* level 1: p({A, B}), p({B, C}), p({C, D})
* level 2: p({A, B, C}), p({B, C, D})
* level 3: p({A, B, C, D})
* where p({A, B, C, D}) is the final output plan.
*
* For cost evaluation, since physical costs for operators are not available currently, we use
* cardinalities and sizes to compute costs.
*/
object JoinReorderDP extends PredicateHelper {
object JoinReorderDP extends PredicateHelper with Logging {
def search(
conf: SQLConf,
......@@ -138,6 +142,7 @@ object JoinReorderDP extends PredicateHelper {
conditions: Set[Expression],
topOutput: AttributeSet): LogicalPlan = {
val startTime = System.nanoTime()
// Level i maintains all found plans for i + 1 items.
// Create the initial plans: each plan is a single item with zero cost.
val itemIndex = items.zipWithIndex
......@@ -152,6 +157,10 @@ object JoinReorderDP extends PredicateHelper {
foundPlans += searchLevel(foundPlans, conf, conditions, topOutput)
}
val durationInMs = (System.nanoTime() - startTime) / (1000 * 1000)
logDebug(s"Join reordering finished. Duration: $durationInMs ms, number of items: " +
s"${items.length}, number of plans in memo: ${foundPlans.map(_.size).sum}")
// The last level must have one and only one plan, because all items are joinable.
assert(foundPlans.size == items.length && foundPlans.last.size == 1)
foundPlans.last.head._2.plan
......@@ -183,18 +192,15 @@ object JoinReorderDP extends PredicateHelper {
}
otherSideCandidates.foreach { otherSidePlan =>
// Should not join two overlapping item sets.
if (oneSidePlan.itemIds.intersect(otherSidePlan.itemIds).isEmpty) {
val joinPlan = buildJoin(oneSidePlan, otherSidePlan, conf, conditions, topOutput)
if (joinPlan.isDefined) {
val newJoinPlan = joinPlan.get
buildJoin(oneSidePlan, otherSidePlan, conf, conditions, topOutput) match {
case Some(newJoinPlan) =>
// Check if it's the first plan for the item set, or it's a better plan than
// the existing one due to lower cost.
val existingPlan = nextLevel.get(newJoinPlan.itemIds)
if (existingPlan.isEmpty || newJoinPlan.betterThan(existingPlan.get, conf)) {
nextLevel.update(newJoinPlan.itemIds, newJoinPlan)
}
}
case None =>
}
}
}
......@@ -203,7 +209,17 @@ object JoinReorderDP extends PredicateHelper {
nextLevel.toMap
}
/** Build a new join node. */
/**
* Builds a new JoinPlan when both conditions hold:
* - the sets of items contained in left and right sides do not overlap.
* - there exists at least one join condition involving references from both sides.
* @param oneJoinPlan One side JoinPlan for building a new JoinPlan.
* @param otherJoinPlan The other side JoinPlan for building a new join node.
* @param conf SQLConf for statistics computation.
* @param conditions The overall set of join conditions.
* @param topOutput The output attributes of the final plan.
* @return Builds and returns a new JoinPlan if both conditions hold. Otherwise, returns None.
*/
private def buildJoin(
oneJoinPlan: JoinPlan,
otherJoinPlan: JoinPlan,
......@@ -211,6 +227,11 @@ object JoinReorderDP extends PredicateHelper {
conditions: Set[Expression],
topOutput: AttributeSet): Option[JoinPlan] = {
if (oneJoinPlan.itemIds.intersect(otherJoinPlan.itemIds).nonEmpty) {
// Should not join two overlapping item sets.
return None
}
val onePlan = oneJoinPlan.plan
val otherPlan = otherJoinPlan.plan
val joinConds = conditions
......@@ -220,33 +241,33 @@ object JoinReorderDP extends PredicateHelper {
if (joinConds.isEmpty) {
// Cartesian product is very expensive, so we exclude them from candidate plans.
// This also significantly reduces the search space.
None
return None
}
// Put the deeper side on the left, tend to build a left-deep tree.
val (left, right) = if (oneJoinPlan.itemIds.size >= otherJoinPlan.itemIds.size) {
(onePlan, otherPlan)
} else {
// Put the deeper side on the left, tend to build a left-deep tree.
val (left, right) = if (oneJoinPlan.itemIds.size >= otherJoinPlan.itemIds.size) {
(onePlan, otherPlan)
(otherPlan, onePlan)
}
val newJoin = Join(left, right, Inner, joinConds.reduceOption(And))
val collectedJoinConds = joinConds ++ oneJoinPlan.joinConds ++ otherJoinPlan.joinConds
val remainingConds = conditions -- collectedJoinConds
val neededAttr = AttributeSet(remainingConds.flatMap(_.references)) ++ topOutput
val neededFromNewJoin = newJoin.outputSet.filter(neededAttr.contains)
val newPlan =
if ((newJoin.outputSet -- neededFromNewJoin).nonEmpty) {
Project(neededFromNewJoin.toSeq, newJoin)
} else {
(otherPlan, onePlan)
newJoin
}
val newJoin = Join(left, right, Inner, joinConds.reduceOption(And))
val collectedJoinConds = joinConds ++ oneJoinPlan.joinConds ++ otherJoinPlan.joinConds
val remainingConds = conditions -- collectedJoinConds
val neededAttr = AttributeSet(remainingConds.flatMap(_.references)) ++ topOutput
val neededFromNewJoin = newJoin.outputSet.filter(neededAttr.contains)
val newPlan =
if ((newJoin.outputSet -- neededFromNewJoin).nonEmpty) {
Project(neededFromNewJoin.toSeq, newJoin)
} else {
newJoin
}
val itemIds = oneJoinPlan.itemIds.union(otherJoinPlan.itemIds)
// Now the root node of onePlan/otherPlan becomes an intermediate join (if it's a non-leaf
// item), so the cost of the new join should also include its own cost.
val newPlanCost = oneJoinPlan.planCost + oneJoinPlan.rootCost(conf) +
otherJoinPlan.planCost + otherJoinPlan.rootCost(conf)
Some(JoinPlan(itemIds, newPlan, collectedJoinConds, newPlanCost))
}
val itemIds = oneJoinPlan.itemIds.union(otherJoinPlan.itemIds)
// Now the root node of onePlan/otherPlan becomes an intermediate join (if it's a non-leaf
// item), so the cost of the new join should also include its own cost.
val newPlanCost = oneJoinPlan.planCost + oneJoinPlan.rootCost(conf) +
otherJoinPlan.planCost + otherJoinPlan.rootCost(conf)
Some(JoinPlan(itemIds, newPlan, collectedJoinConds, newPlanCost))
}
/** Map[set of item ids, join plan for these items] */
......@@ -278,10 +299,10 @@ object JoinReorderDP extends PredicateHelper {
}
def betterThan(other: JoinPlan, conf: SQLConf): Boolean = {
if (other.planCost.rows == 0 || other.planCost.size == 0) {
if (other.planCost.card == 0 || other.planCost.size == 0) {
false
} else {
val relativeRows = BigDecimal(this.planCost.rows) / BigDecimal(other.planCost.rows)
val relativeRows = BigDecimal(this.planCost.card) / BigDecimal(other.planCost.card)
val relativeSize = BigDecimal(this.planCost.size) / BigDecimal(other.planCost.size)
relativeRows * conf.joinReorderCardWeight +
relativeSize * (1 - conf.joinReorderCardWeight) < 1
......@@ -290,7 +311,11 @@ object JoinReorderDP extends PredicateHelper {
}
}
/** This class defines the cost model. */
case class Cost(rows: BigInt, size: BigInt) {
def +(other: Cost): Cost = Cost(this.rows + other.rows, this.size + other.size)
/**
 * Cost model of a plan, expressed as a pair of independent measures.
 * Two costs are combined by adding each measure separately.
 * @param card Cardinality (number of rows).
 * @param size Size in bytes.
 */
case class Cost(card: BigInt, size: BigInt) {
  /** Returns a new Cost whose cardinality and size are the sums of both operands' values. */
  def +(other: Cost): Cost = {
    val totalCard = card + other.card
    val totalSize = size + other.size
    Cost(totalCard, totalSize)
  }
}
......@@ -708,6 +708,7 @@ object SQLConf {
buildConf("spark.sql.cbo.joinReorder.dp.threshold")
.doc("The maximum number of joined nodes allowed in the dynamic programming algorithm.")
.intConf
.checkValue(number => number > 0, "The maximum number must be a positive integer.")
.createWithDefault(12)
val JOIN_REORDER_CARD_WEIGHT =
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment