Commit 76e4a556 authored by wangzhenhua, committed by Wenchen Fan

[SPARK-20678][SQL] Ndv for columns not in filter condition should also be updated

## What changes were proposed in this pull request?

In filter estimation, we update column stats for the columns that appear in the filter condition. However, if the number of rows decreases after the filter (i.e. the overall selectivity is less than 1), we need to update (scale down) the number of distinct values (NDV) for all columns, whether or not they appear in the filter condition.

This PR also fixes an inconsistency in the rounding mode used for ndv and rowCount.
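
For context, the heart of the change is the new `updateNdv` helper added to `EstimationUtils` (first hunk below). The following is a minimal, self-contained sketch of its behavior; the enclosing object and the `main` driver are invented here for illustration:

```scala
import scala.math.BigDecimal.RoundingMode

object NdvScalingSketch {
  // Round up, so that a small-but-nonzero expected count never collapses to
  // zero. Using CEILING both here and for rowCount is what makes the rounding
  // modes consistent.
  def ceil(bigDecimal: BigDecimal): BigInt =
    bigDecimal.setScale(0, RoundingMode.CEILING).toBigInt()

  // Scale ndv down in proportion to the row-count reduction. A filter or join
  // cannot introduce new distinct values, so ndv is never scaled up.
  def updateNdv(oldNumRows: BigInt, newNumRows: BigInt, oldNdv: BigInt): BigInt =
    if (newNumRows < oldNumRows) {
      ceil(BigDecimal(oldNdv) * BigDecimal(newNumRows) / BigDecimal(oldNumRows))
    } else {
      oldNdv
    }

  def main(args: Array[String]): Unit = {
    // 10 rows shrink to 4 after a filter: ndv 7 becomes ceil(7 * 4 / 10) = 3.
    println(updateNdv(oldNumRows = 10, newNumRows = 4, oldNdv = 7))   // 3
    // Row count unchanged or grown (e.g. after a join): ndv stays at 7.
    println(updateNdv(oldNumRows = 10, newNumRows = 12, oldNdv = 7))  // 7
  }
}
```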

## How was this patch tested?

Added new tests.
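
As a concrete illustration, here is the arithmetic behind the new test `update ndv for columns based on overall selectivity` (last hunk of `FilterEstimationSuite` below). It assumes the suite's column stats: 10 rows, with `cint` and `cint4` both uniform over [1, 10] and ndv = 10; the object name and intermediate vals are invented for this sketch:

```scala
import scala.math.BigDecimal.RoundingMode

object SelectivitySketch extends App {
  // Range-based selectivity of cint > 3 over the assumed range [1, 10]:
  val pGt = BigDecimal(10 - 3) / (10 - 1)   // 7/9
  // Range-based selectivity of cint4 <= 6 over the assumed range [1, 10]:
  val pLe = BigDecimal(6 - 1) / (10 - 1)    // 5/9
  // AND combines multiplicatively; row counts round up (CEILING):
  val rows = (BigDecimal(10) * pGt * pLe).setScale(0, RoundingMode.CEILING)
  println(rows)  // ceil(10 * 35/81) = 5

  // The overall selectivity (5 of 10 rows survive) then rescales the ndv of
  // every output column, including cstring, which the predicate never touches:
  //   updateNdv(oldNumRows = 10, newNumRows = 5, oldNdv = 10) = ceil(10 * 5 / 10) = 5
}
```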

Author: wangzhenhua <wangzhenhua@huawei.com>

Closes #17918 from wzhfy/scaleDownNdvAfterFilter.
parent 789bdbe3
@@ -43,6 +43,18 @@ object EstimationUtils {
avgLen = dataType.defaultSize, maxLen = dataType.defaultSize)
}
/**
* Updates (scales down) the number of distinct values if the number of rows decreases after
* some operation (such as filter, join). Otherwise keep it unchanged.
*/
def updateNdv(oldNumRows: BigInt, newNumRows: BigInt, oldNdv: BigInt): BigInt = {
if (newNumRows < oldNumRows) {
ceil(BigDecimal(oldNdv) * BigDecimal(newNumRows) / BigDecimal(oldNumRows))
} else {
oldNdv
}
}
def ceil(bigDecimal: BigDecimal): BigInt = bigDecimal.setScale(0, RoundingMode.CEILING).toBigInt()
/** Get column stats for output attributes. */
@@ -19,12 +19,12 @@ package org.apache.spark.sql.catalyst.plans.logical.statsEstimation
import scala.collection.immutable.HashSet
import scala.collection.mutable
import scala.math.BigDecimal.RoundingMode
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, TrueLiteral}
import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, Filter, LeafNode, Statistics}
import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
@@ -32,14 +32,7 @@ case class FilterEstimation(plan: Filter, catalystConf: SQLConf) extends Logging
private val childStats = plan.child.stats(catalystConf)
/**
* We will update the corresponding ColumnStats for a column after we apply a predicate condition.
* For example, column c has [min, max] value as [0, 100]. In a range condition such as
* (c > 40 AND c <= 50), we need to set the column's [min, max] value to [40, 100] after we
* evaluate the first condition c > 40. We need to set the column's [min, max] value to [40, 50]
* after we evaluate the second condition c <= 50.
*/
private val colStatsMap = new ColumnStatsMap
private val colStatsMap = new ColumnStatsMap(childStats.attributeStats)
/**
* Returns an option of Statistics for a Filter logical plan node.
@@ -53,24 +46,19 @@ case class FilterEstimation(plan: Filter, catalystConf: SQLConf) extends Logging
def estimate: Option[Statistics] = {
if (childStats.rowCount.isEmpty) return None
// Save a mutable copy of colStats so that we can later change it recursively.
colStatsMap.setInitValues(childStats.attributeStats)
// Estimate selectivity of this filter predicate, and update column stats if needed.
// For not-supported condition, set filter selectivity to a conservative estimate 100%
val filterSelectivity: Double = calculateFilterSelectivity(plan.condition).getOrElse(1.0)
val filterSelectivity = calculateFilterSelectivity(plan.condition).getOrElse(BigDecimal(1.0))
val newColStats = if (filterSelectivity == 0) {
val filteredRowCount: BigInt = ceil(BigDecimal(childStats.rowCount.get) * filterSelectivity)
val newColStats = if (filteredRowCount == 0) {
// The output is empty, we don't need to keep column stats.
AttributeMap[ColumnStat](Nil)
} else {
colStatsMap.toColumnStats
colStatsMap.outputColumnStats(rowsBeforeFilter = childStats.rowCount.get,
rowsAfterFilter = filteredRowCount)
}
val filteredRowCount: BigInt =
EstimationUtils.ceil(BigDecimal(childStats.rowCount.get) * filterSelectivity)
val filteredSizeInBytes: BigInt =
EstimationUtils.getOutputSize(plan.output, filteredRowCount, newColStats)
val filteredSizeInBytes: BigInt = getOutputSize(plan.output, filteredRowCount, newColStats)
Some(childStats.copy(sizeInBytes = filteredSizeInBytes, rowCount = Some(filteredRowCount),
attributeStats = newColStats))
@@ -92,16 +80,17 @@ case class FilterEstimation(plan: Filter, catalystConf: SQLConf) extends Logging
* @return an optional double value to show the percentage of rows meeting a given condition.
* It returns None if the condition is not supported.
*/
def calculateFilterSelectivity(condition: Expression, update: Boolean = true): Option[Double] = {
def calculateFilterSelectivity(condition: Expression, update: Boolean = true)
: Option[BigDecimal] = {
condition match {
case And(cond1, cond2) =>
val percent1 = calculateFilterSelectivity(cond1, update).getOrElse(1.0)
val percent2 = calculateFilterSelectivity(cond2, update).getOrElse(1.0)
val percent1 = calculateFilterSelectivity(cond1, update).getOrElse(BigDecimal(1.0))
val percent2 = calculateFilterSelectivity(cond2, update).getOrElse(BigDecimal(1.0))
Some(percent1 * percent2)
case Or(cond1, cond2) =>
val percent1 = calculateFilterSelectivity(cond1, update = false).getOrElse(1.0)
val percent2 = calculateFilterSelectivity(cond2, update = false).getOrElse(1.0)
val percent1 = calculateFilterSelectivity(cond1, update = false).getOrElse(BigDecimal(1.0))
val percent2 = calculateFilterSelectivity(cond2, update = false).getOrElse(BigDecimal(1.0))
Some(percent1 + percent2 - (percent1 * percent2))
// Not-operator pushdown
@@ -143,7 +132,7 @@ case class FilterEstimation(plan: Filter, catalystConf: SQLConf) extends Logging
* @return an optional double value to show the percentage of rows meeting a given condition.
* It returns None if the condition is not supported.
*/
def calculateSingleCondition(condition: Expression, update: Boolean): Option[Double] = {
def calculateSingleCondition(condition: Expression, update: Boolean): Option[BigDecimal] = {
condition match {
case l: Literal =>
evaluateLiteral(l)
@@ -237,7 +226,7 @@ case class FilterEstimation(plan: Filter, catalystConf: SQLConf) extends Logging
def evaluateNullCheck(
attr: Attribute,
isNull: Boolean,
update: Boolean): Option[Double] = {
update: Boolean): Option[BigDecimal] = {
if (!colStatsMap.contains(attr)) {
logDebug("[CBO] No statistics for " + attr)
return None
@@ -256,7 +245,7 @@ case class FilterEstimation(plan: Filter, catalystConf: SQLConf) extends Logging
} else {
colStat.copy(nullCount = 0)
}
colStatsMap(attr) = newStats
colStatsMap.update(attr, newStats)
}
val percent = if (isNull) {
@@ -265,7 +254,7 @@ case class FilterEstimation(plan: Filter, catalystConf: SQLConf) extends Logging
1.0 - nullPercent
}
Some(percent.toDouble)
Some(percent)
}
/**
@@ -283,7 +272,7 @@ case class FilterEstimation(plan: Filter, catalystConf: SQLConf) extends Logging
op: BinaryComparison,
attr: Attribute,
literal: Literal,
update: Boolean): Option[Double] = {
update: Boolean): Option[BigDecimal] = {
if (!colStatsMap.contains(attr)) {
logDebug("[CBO] No statistics for " + attr)
return None
@@ -317,7 +306,7 @@ case class FilterEstimation(plan: Filter, catalystConf: SQLConf) extends Logging
def evaluateEquality(
attr: Attribute,
literal: Literal,
update: Boolean): Option[Double] = {
update: Boolean): Option[BigDecimal] = {
if (!colStatsMap.contains(attr)) {
logDebug("[CBO] No statistics for " + attr)
return None
@@ -341,10 +330,10 @@ case class FilterEstimation(plan: Filter, catalystConf: SQLConf) extends Logging
colStat.copy(distinctCount = 1, min = Some(literal.value),
max = Some(literal.value), nullCount = 0)
}
colStatsMap(attr) = newStats
colStatsMap.update(attr, newStats)
}
Some((1.0 / BigDecimal(ndv)).toDouble)
Some(1.0 / BigDecimal(ndv))
} else {
Some(0.0)
}
@@ -361,7 +350,7 @@ case class FilterEstimation(plan: Filter, catalystConf: SQLConf) extends Logging
* @param literal a literal value (or constant)
* @return an optional double value to show the percentage of rows meeting a given condition
*/
def evaluateLiteral(literal: Literal): Option[Double] = {
def evaluateLiteral(literal: Literal): Option[BigDecimal] = {
literal match {
case Literal(null, _) => Some(0.0)
case FalseLiteral => Some(0.0)
@@ -386,7 +375,7 @@ case class FilterEstimation(plan: Filter, catalystConf: SQLConf) extends Logging
def evaluateInSet(
attr: Attribute,
hSet: Set[Any],
update: Boolean): Option[Double] = {
update: Boolean): Option[BigDecimal] = {
if (!colStatsMap.contains(attr)) {
logDebug("[CBO] No statistics for " + attr)
return None
@@ -417,7 +406,7 @@ case class FilterEstimation(plan: Filter, catalystConf: SQLConf) extends Logging
if (update) {
val newStats = colStat.copy(distinctCount = newNdv, min = Some(newMin),
max = Some(newMax), nullCount = 0)
colStatsMap(attr) = newStats
colStatsMap.update(attr, newStats)
}
// We assume the whole set since there is no min/max information for String/Binary type
@@ -425,13 +414,13 @@ case class FilterEstimation(plan: Filter, catalystConf: SQLConf) extends Logging
newNdv = ndv.min(BigInt(hSet.size))
if (update) {
val newStats = colStat.copy(distinctCount = newNdv, nullCount = 0)
colStatsMap(attr) = newStats
colStatsMap.update(attr, newStats)
}
}
// return the filter selectivity. Without advanced statistics such as histograms,
// we have to assume uniform distribution.
Some(math.min(1.0, (BigDecimal(newNdv) / BigDecimal(ndv)).toDouble))
Some((BigDecimal(newNdv) / BigDecimal(ndv)).min(1.0))
}
/**
@@ -449,7 +438,7 @@ case class FilterEstimation(plan: Filter, catalystConf: SQLConf) extends Logging
op: BinaryComparison,
attr: Attribute,
literal: Literal,
update: Boolean): Option[Double] = {
update: Boolean): Option[BigDecimal] = {
val colStat = colStatsMap(attr)
val statsRange = Range(colStat.min, colStat.max, attr.dataType).asInstanceOf[NumericRange]
@@ -518,7 +507,7 @@ case class FilterEstimation(plan: Filter, catalystConf: SQLConf) extends Logging
val newValue = Some(literal.value)
var newMax = colStat.max
var newMin = colStat.min
var newNdv = (ndv * percent).setScale(0, RoundingMode.HALF_UP).toBigInt()
var newNdv = ceil(ndv * percent)
if (newNdv < 1) newNdv = 1
op match {
@@ -532,11 +521,11 @@ case class FilterEstimation(plan: Filter, catalystConf: SQLConf) extends Logging
val newStats =
colStat.copy(distinctCount = newNdv, min = newMin, max = newMax, nullCount = 0)
colStatsMap(attr) = newStats
colStatsMap.update(attr, newStats)
}
}
Some(percent.toDouble)
Some(percent)
}
/**
@@ -557,7 +546,7 @@ case class FilterEstimation(plan: Filter, catalystConf: SQLConf) extends Logging
op: BinaryComparison,
attrLeft: Attribute,
attrRight: Attribute,
update: Boolean): Option[Double] = {
update: Boolean): Option[BigDecimal] = {
if (!colStatsMap.contains(attrLeft)) {
logDebug("[CBO] No statistics for " + attrLeft)
@@ -654,10 +643,10 @@ case class FilterEstimation(plan: Filter, catalystConf: SQLConf) extends Logging
// Need to adjust new min/max after the filter condition is applied
val ndvLeft = BigDecimal(colStatLeft.distinctCount)
var newNdvLeft = (ndvLeft * percent).setScale(0, RoundingMode.HALF_UP).toBigInt()
var newNdvLeft = ceil(ndvLeft * percent)
if (newNdvLeft < 1) newNdvLeft = 1
val ndvRight = BigDecimal(colStatRight.distinctCount)
var newNdvRight = (ndvRight * percent).setScale(0, RoundingMode.HALF_UP).toBigInt()
var newNdvRight = ceil(ndvRight * percent)
if (newNdvRight < 1) newNdvRight = 1
var newMaxLeft = colStatLeft.max
@@ -750,24 +739,57 @@ case class FilterEstimation(plan: Filter, catalystConf: SQLConf) extends Logging
}
}
Some(percent.toDouble)
Some(percent)
}
}
class ColumnStatsMap {
private val baseMap: mutable.Map[ExprId, (Attribute, ColumnStat)] = mutable.HashMap.empty
/**
* This class contains the original column stats from the child, and maintains the updated
* column stats.
* We will update the corresponding ColumnStats for a column after we apply a predicate condition.
* For example, column c has [min, max] value as [0, 100]. In a range condition such as
* (c > 40 AND c <= 50), we need to set the column's [min, max] value to [40, 100] after we
* evaluate the first condition c > 40. We also need to set the column's [min, max] value to
* [40, 50] after we evaluate the second condition c <= 50.
*
* @param originalMap Original column stats from child.
*/
case class ColumnStatsMap(originalMap: AttributeMap[ColumnStat]) {
def setInitValues(colStats: AttributeMap[ColumnStat]): Unit = {
baseMap.clear()
baseMap ++= colStats.baseMap
}
/** This map maintains the latest column stats. */
private val updatedMap: mutable.Map[ExprId, (Attribute, ColumnStat)] = mutable.HashMap.empty
def contains(a: Attribute): Boolean = baseMap.contains(a.exprId)
def contains(a: Attribute): Boolean = updatedMap.contains(a.exprId) || originalMap.contains(a)
def apply(a: Attribute): ColumnStat = baseMap(a.exprId)._2
/**
* Gets column stat for the given attribute. Prefer the column stat in updatedMap than that in
* originalMap, because updatedMap has the latest (updated) column stats.
*/
def apply(a: Attribute): ColumnStat = {
if (updatedMap.contains(a.exprId)) {
updatedMap(a.exprId)._2
} else {
originalMap(a)
}
}
def update(a: Attribute, stats: ColumnStat): Unit = baseMap.update(a.exprId, a -> stats)
/** Updates column stats in updatedMap. */
def update(a: Attribute, stats: ColumnStat): Unit = updatedMap.update(a.exprId, a -> stats)
def toColumnStats: AttributeMap[ColumnStat] = AttributeMap(baseMap.values.toSeq)
/**
* Collects updated column stats, and scales down ndv for other column stats if the number of rows
* decreases after this Filter operator.
*/
def outputColumnStats(rowsBeforeFilter: BigInt, rowsAfterFilter: BigInt)
: AttributeMap[ColumnStat] = {
val newColumnStats = originalMap.map { case (attr, oriColStat) =>
// Update ndv based on the overall filter selectivity: scale down ndv if the number of rows
// decreases; otherwise keep it unchanged.
val newNdv = EstimationUtils.updateNdv(oldNumRows = rowsBeforeFilter,
newNumRows = rowsAfterFilter, oldNdv = oriColStat.distinctCount)
val colStat = updatedMap.get(attr.exprId).map(_._2).getOrElse(oriColStat)
attr -> colStat.copy(distinctCount = newNdv)
}
AttributeMap(newColumnStats.toSeq)
}
}
@@ -217,32 +217,17 @@ case class InnerOuterEstimation(conf: SQLConf, join: Join) extends Logging {
if (joinKeyStats.contains(a)) {
outputAttrStats += a -> joinKeyStats(a)
} else {
val leftRatio = if (leftRows != 0) {
BigDecimal(outputRows) / BigDecimal(leftRows)
} else {
BigDecimal(0)
}
val rightRatio = if (rightRows != 0) {
BigDecimal(outputRows) / BigDecimal(rightRows)
} else {
BigDecimal(0)
}
val oldColStat = oldAttrStats(a)
val oldNdv = oldColStat.distinctCount
// We only change (scale down) the number of distinct values if the number of rows
// decreases after join, because join won't produce new values even if the number of
// rows increases.
val newNdv = if (join.left.outputSet.contains(a) && leftRatio < 1) {
ceil(BigDecimal(oldNdv) * leftRatio)
} else if (join.right.outputSet.contains(a) && rightRatio < 1) {
ceil(BigDecimal(oldNdv) * rightRatio)
val newNdv = if (join.left.outputSet.contains(a)) {
updateNdv(oldNumRows = leftRows, newNumRows = outputRows, oldNdv = oldNdv)
} else {
oldNdv
updateNdv(oldNumRows = rightRows, newNumRows = outputRows, oldNdv = oldNdv)
}
val newColStat = oldColStat.copy(distinctCount = newNdv)
// TODO: support nullCount updates for specific outer joins
outputAttrStats += a -> oldColStat.copy(distinctCount = newNdv)
outputAttrStats += a -> newColStat
}
}
outputAttrStats
}
@@ -150,7 +150,7 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
val condition = Or(LessThan(attrInt, Literal(3)), Literal(null, IntegerType))
validateEstimatedStats(
Filter(condition, childStatsTestPlan(Seq(attrInt), 10L)),
Seq(attrInt -> colStatInt),
Seq(attrInt -> colStatInt.copy(distinctCount = 3)),
expectedRowCount = 3)
}
@@ -158,7 +158,7 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
val condition = Not(And(LessThan(attrInt, Literal(3)), Literal(null, IntegerType)))
validateEstimatedStats(
Filter(condition, childStatsTestPlan(Seq(attrInt), 10L)),
Seq(attrInt -> colStatInt),
Seq(attrInt -> colStatInt.copy(distinctCount = 8)),
expectedRowCount = 8)
}
@@ -174,7 +174,7 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
val condition = Not(And(LessThan(attrInt, Literal(3)), Not(Literal(null, IntegerType))))
validateEstimatedStats(
Filter(condition, childStatsTestPlan(Seq(attrInt), 10L)),
Seq(attrInt -> colStatInt),
Seq(attrInt -> colStatInt.copy(distinctCount = 8)),
expectedRowCount = 8)
}
@@ -205,7 +205,7 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
test("cint < 3") {
validateEstimatedStats(
Filter(LessThan(attrInt, Literal(3)), childStatsTestPlan(Seq(attrInt), 10L)),
Seq(attrInt -> ColumnStat(distinctCount = 2, min = Some(1), max = Some(3),
Seq(attrInt -> ColumnStat(distinctCount = 3, min = Some(1), max = Some(3),
nullCount = 0, avgLen = 4, maxLen = 4)),
expectedRowCount = 3)
}
@@ -221,7 +221,7 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
test("cint <= 3") {
validateEstimatedStats(
Filter(LessThanOrEqual(attrInt, Literal(3)), childStatsTestPlan(Seq(attrInt), 10L)),
Seq(attrInt -> ColumnStat(distinctCount = 2, min = Some(1), max = Some(3),
Seq(attrInt -> ColumnStat(distinctCount = 3, min = Some(1), max = Some(3),
nullCount = 0, avgLen = 4, maxLen = 4)),
expectedRowCount = 3)
}
@@ -229,7 +229,7 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
test("cint > 6") {
validateEstimatedStats(
Filter(GreaterThan(attrInt, Literal(6)), childStatsTestPlan(Seq(attrInt), 10L)),
Seq(attrInt -> ColumnStat(distinctCount = 4, min = Some(6), max = Some(10),
Seq(attrInt -> ColumnStat(distinctCount = 5, min = Some(6), max = Some(10),
nullCount = 0, avgLen = 4, maxLen = 4)),
expectedRowCount = 5)
}
@@ -245,7 +245,7 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
test("cint >= 6") {
validateEstimatedStats(
Filter(GreaterThanOrEqual(attrInt, Literal(6)), childStatsTestPlan(Seq(attrInt), 10L)),
Seq(attrInt -> ColumnStat(distinctCount = 4, min = Some(6), max = Some(10),
Seq(attrInt -> ColumnStat(distinctCount = 5, min = Some(6), max = Some(10),
nullCount = 0, avgLen = 4, maxLen = 4)),
expectedRowCount = 5)
}
@@ -279,7 +279,7 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
val condition = And(GreaterThan(attrInt, Literal(3)), LessThanOrEqual(attrInt, Literal(6)))
validateEstimatedStats(
Filter(condition, childStatsTestPlan(Seq(attrInt), 10L)),
Seq(attrInt -> ColumnStat(distinctCount = 3, min = Some(3), max = Some(6),
Seq(attrInt -> ColumnStat(distinctCount = 4, min = Some(3), max = Some(6),
nullCount = 0, avgLen = 4, maxLen = 4)),
expectedRowCount = 4)
}
@@ -288,8 +288,7 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
val condition = Or(EqualTo(attrInt, Literal(3)), EqualTo(attrInt, Literal(6)))
validateEstimatedStats(
Filter(condition, childStatsTestPlan(Seq(attrInt), 10L)),
Seq(attrInt -> ColumnStat(distinctCount = 10, min = Some(1), max = Some(10),
nullCount = 0, avgLen = 4, maxLen = 4)),
Seq(attrInt -> colStatInt.copy(distinctCount = 2)),
expectedRowCount = 2)
}
@@ -297,7 +296,7 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
val condition = Not(And(GreaterThan(attrInt, Literal(3)), LessThanOrEqual(attrInt, Literal(6))))
validateEstimatedStats(
Filter(condition, childStatsTestPlan(Seq(attrInt), 10L)),
Seq(attrInt -> colStatInt),
Seq(attrInt -> colStatInt.copy(distinctCount = 6)),
expectedRowCount = 6)
}
@@ -305,7 +304,7 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
val condition = Not(Or(LessThanOrEqual(attrInt, Literal(3)), GreaterThan(attrInt, Literal(6))))
validateEstimatedStats(
Filter(condition, childStatsTestPlan(Seq(attrInt), 10L)),
Seq(attrInt -> colStatInt),
Seq(attrInt -> colStatInt.copy(distinctCount = 5)),
expectedRowCount = 5)
}
@@ -321,7 +320,8 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
val condition = Not(Or(EqualTo(attrInt, Literal(3)), LessThan(attrString, Literal("A8"))))
validateEstimatedStats(
Filter(condition, childStatsTestPlan(Seq(attrInt, attrString), 10L)),
Seq(attrInt -> colStatInt, attrString -> colStatString),
Seq(attrInt -> colStatInt.copy(distinctCount = 9),
attrString -> colStatString.copy(distinctCount = 9)),
expectedRowCount = 9)
}
@@ -336,8 +336,7 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
test("cint NOT IN (3, 4, 5)") {
validateEstimatedStats(
Filter(Not(InSet(attrInt, Set(3, 4, 5))), childStatsTestPlan(Seq(attrInt), 10L)),
Seq(attrInt -> ColumnStat(distinctCount = 10, min = Some(1), max = Some(10),
nullCount = 0, avgLen = 4, maxLen = 4)),
Seq(attrInt -> colStatInt.copy(distinctCount = 7)),
expectedRowCount = 7)
}
@@ -380,7 +379,7 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
validateEstimatedStats(
Filter(LessThan(attrDate, Literal(d20170103, DateType)),
childStatsTestPlan(Seq(attrDate), 10L)),
Seq(attrDate -> ColumnStat(distinctCount = 2, min = Some(dMin), max = Some(d20170103),
Seq(attrDate -> ColumnStat(distinctCount = 3, min = Some(dMin), max = Some(d20170103),
nullCount = 0, avgLen = 4, maxLen = 4)),
expectedRowCount = 3)
}
@@ -421,7 +420,7 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
test("cdouble < 3.0") {
validateEstimatedStats(
Filter(LessThan(attrDouble, Literal(3.0)), childStatsTestPlan(Seq(attrDouble), 10L)),
Seq(attrDouble -> ColumnStat(distinctCount = 2, min = Some(1.0), max = Some(3.0),
Seq(attrDouble -> ColumnStat(distinctCount = 3, min = Some(1.0), max = Some(3.0),
nullCount = 0, avgLen = 8, maxLen = 8)),
expectedRowCount = 3)
}
@@ -487,9 +486,9 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
// partial overlap case
validateEstimatedStats(
Filter(EqualTo(attrInt, attrInt2), childStatsTestPlan(Seq(attrInt, attrInt2), 10L)),
Seq(attrInt -> ColumnStat(distinctCount = 3, min = Some(7), max = Some(10),
Seq(attrInt -> ColumnStat(distinctCount = 4, min = Some(7), max = Some(10),
nullCount = 0, avgLen = 4, maxLen = 4),
attrInt2 -> ColumnStat(distinctCount = 3, min = Some(7), max = Some(10),
attrInt2 -> ColumnStat(distinctCount = 4, min = Some(7), max = Some(10),
nullCount = 0, avgLen = 4, maxLen = 4)),
expectedRowCount = 4)
}
@@ -498,9 +497,9 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
// partial overlap case
validateEstimatedStats(
Filter(GreaterThan(attrInt, attrInt2), childStatsTestPlan(Seq(attrInt, attrInt2), 10L)),
Seq(attrInt -> ColumnStat(distinctCount = 3, min = Some(7), max = Some(10),
Seq(attrInt -> ColumnStat(distinctCount = 4, min = Some(7), max = Some(10),
nullCount = 0, avgLen = 4, maxLen = 4),
attrInt2 -> ColumnStat(distinctCount = 3, min = Some(7), max = Some(10),
attrInt2 -> ColumnStat(distinctCount = 4, min = Some(7), max = Some(10),
nullCount = 0, avgLen = 4, maxLen = 4)),
expectedRowCount = 4)
}
@@ -509,9 +508,9 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
// partial overlap case
validateEstimatedStats(
Filter(LessThan(attrInt, attrInt2), childStatsTestPlan(Seq(attrInt, attrInt2), 10L)),
Seq(attrInt -> ColumnStat(distinctCount = 3, min = Some(1), max = Some(10),
Seq(attrInt -> ColumnStat(distinctCount = 4, min = Some(1), max = Some(10),
nullCount = 0, avgLen = 4, maxLen = 4),
attrInt2 -> ColumnStat(distinctCount = 3, min = Some(7), max = Some(16),
attrInt2 -> ColumnStat(distinctCount = 4, min = Some(7), max = Some(16),
nullCount = 0, avgLen = 4, maxLen = 4)),
expectedRowCount = 4)
}
@@ -531,9 +530,9 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
// partial overlap case
validateEstimatedStats(
Filter(LessThan(attrInt, attrInt4), childStatsTestPlan(Seq(attrInt, attrInt4), 10L)),
Seq(attrInt -> ColumnStat(distinctCount = 3, min = Some(1), max = Some(10),
Seq(attrInt -> ColumnStat(distinctCount = 4, min = Some(1), max = Some(10),
nullCount = 0, avgLen = 4, maxLen = 4),
attrInt4 -> ColumnStat(distinctCount = 3, min = Some(1), max = Some(10),
attrInt4 -> ColumnStat(distinctCount = 4, min = Some(1), max = Some(10),
nullCount = 0, avgLen = 4, maxLen = 4)),
expectedRowCount = 4)
}
@@ -565,6 +564,20 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
expectedRowCount = 0)
}
test("update ndv for columns based on overall selectivity") {
// filter condition: cint > 3 AND cint4 <= 6
val condition = And(GreaterThan(attrInt, Literal(3)), LessThanOrEqual(attrInt4, Literal(6)))
validateEstimatedStats(
Filter(condition, childStatsTestPlan(Seq(attrInt, attrInt4, attrString), 10L)),
Seq(
attrInt -> ColumnStat(distinctCount = 5, min = Some(3), max = Some(10),
nullCount = 0, avgLen = 4, maxLen = 4),
attrInt4 -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(6),
nullCount = 0, avgLen = 4, maxLen = 4),
attrString -> colStatString.copy(distinctCount = 5)),
expectedRowCount = 5)
}
private def childStatsTestPlan(outList: Seq[Attribute], tableRowCount: BigInt): StatsTestPlan = {
StatsTestPlan(
outputList = outList,