diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
index 3a7543e2141e999ca1f2fdbc277f8b313cafe130..db7baf6e9bc7d3a6891be44242488d9ab4baa37a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
@@ -32,7 +32,10 @@ import org.apache.spark.sql.internal.SQLConf
  * We may have several join reorder algorithms in the future. This class is the entry of these
  * algorithms, and chooses which one to use.
  */
-case class CostBasedJoinReorder(conf: SQLConf) extends Rule[LogicalPlan] with PredicateHelper {
+object CostBasedJoinReorder extends Rule[LogicalPlan] with PredicateHelper {
+
+  private def conf = SQLConf.get
+
   def apply(plan: LogicalPlan): LogicalPlan = {
     if (!conf.cboEnabled || !conf.joinReorderEnabled) {
       plan
@@ -379,7 +382,7 @@ object JoinReorderDPFilters extends PredicateHelper {
 
     if (conf.joinReorderDPStarFilter) {
       // Compute the tables in a star-schema relationship.
-      val starJoin = StarSchemaDetection(conf).findStarJoins(items, conditions.toSeq)
+      val starJoin = StarSchemaDetection.findStarJoins(items, conditions.toSeq)
       val nonStarJoin = items.filterNot(starJoin.contains(_))
 
       if (starJoin.nonEmpty && nonStarJoin.nonEmpty) {
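The hunks above show the shape of the whole patch: a rule that used to be a `case class` taking a `SQLConf` becomes a singleton `object` that reads the active session's conf lazily through `SQLConf.get`. A minimal sketch of the pattern (`MyCustomRule` and its body are hypothetical, for illustration only):

```scala
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.internal.SQLConf

// Before: `case class MyCustomRule(conf: SQLConf) extends Rule[LogicalPlan]`
// captured one conf at construction time, so a single rule instance could
// not serve sessions with different settings.
object MyCustomRule extends Rule[LogicalPlan] {

  // Read the conf at apply() time: each invocation sees the conf of the
  // session currently executing on this thread.
  private def conf = SQLConf.get

  def apply(plan: LogicalPlan): LogicalPlan = {
    if (!conf.cboEnabled) {
      plan  // feature disabled for this session; leave the plan unchanged
    } else {
      plan  // a real rule would transform the plan here
    }
  }
}
```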
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 946fa7bae0199554bd57791f770499967703c0dc..d82af94dbffb7e9b8c089112f8d6d9f1a4ff13ca 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -34,10 +34,10 @@ import org.apache.spark.sql.types._
  * Abstract class all optimizers should inherit of, contains the standard batches (extending
  * Optimizers can override this.
  */
-abstract class Optimizer(sessionCatalog: SessionCatalog, conf: SQLConf)
+abstract class Optimizer(sessionCatalog: SessionCatalog)
   extends RuleExecutor[LogicalPlan] {
 
-  protected val fixedPoint = FixedPoint(conf.optimizerMaxIterations)
+  protected def fixedPoint = FixedPoint(SQLConf.get.optimizerMaxIterations)
 
   def batches: Seq[Batch] = {
     Batch("Eliminate Distinct", Once, EliminateDistinct) ::
@@ -77,11 +77,11 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: SQLConf)
     Batch("Operator Optimizations", fixedPoint, Seq(
       // Operator push down
       PushProjectionThroughUnion,
-      ReorderJoin(conf),
+      ReorderJoin,
       EliminateOuterJoin,
       PushPredicateThroughJoin,
       PushDownPredicate,
-      LimitPushDown(conf),
+      LimitPushDown,
       ColumnPruning,
       InferFiltersFromConstraints,
       // Operator combine
@@ -92,10 +92,10 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: SQLConf)
       CombineLimits,
       CombineUnions,
       // Constant folding and strength reduction
-      NullPropagation(conf),
+      NullPropagation,
       ConstantPropagation,
       FoldablePropagation,
-      OptimizeIn(conf),
+      OptimizeIn,
       ConstantFolding,
       ReorderAssociativeOperator,
       LikeSimplification,
@@ -117,11 +117,11 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: SQLConf)
       CombineConcats) ++
       extendedOperatorOptimizationRules: _*) ::
     Batch("Check Cartesian Products", Once,
-      CheckCartesianProducts(conf)) ::
+      CheckCartesianProducts) ::
     Batch("Join Reorder", Once,
-      CostBasedJoinReorder(conf)) ::
+      CostBasedJoinReorder) ::
     Batch("Decimal Optimizations", fixedPoint,
-      DecimalAggregates(conf)) ::
+      DecimalAggregates) ::
     Batch("Object Expressions Optimization", fixedPoint,
       EliminateMapObjects,
       CombineTypedFilters) ::
@@ -129,7 +129,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: SQLConf)
       ConvertToLocalRelation,
       PropagateEmptyRelation) ::
     Batch("OptimizeCodegen", Once,
-      OptimizeCodegen(conf)) ::
+      OptimizeCodegen) ::
     Batch("RewriteSubquery", Once,
       RewritePredicateSubquery,
       CollapseProject) :: Nil
@@ -178,8 +178,7 @@ class SimpleTestOptimizer extends Optimizer(
   new SessionCatalog(
     new InMemoryCatalog,
     EmptyFunctionRegistry,
-    new SQLConf().copy(SQLConf.CASE_SENSITIVE -> true)),
-  new SQLConf().copy(SQLConf.CASE_SENSITIVE -> true))
+    new SQLConf().copy(SQLConf.CASE_SENSITIVE -> true)))
 
 /**
  * Remove redundant aliases from a query plan. A redundant alias is an alias that does not change
@@ -288,7 +287,7 @@ object RemoveRedundantProject extends Rule[LogicalPlan] {
 /**
  * Pushes down [[LocalLimit]] beneath UNION ALL and beneath the streamed inputs of outer joins.
  */
-case class LimitPushDown(conf: SQLConf) extends Rule[LogicalPlan] {
+object LimitPushDown extends Rule[LogicalPlan] {
 
   private def stripGlobalLimitIfPresent(plan: LogicalPlan): LogicalPlan = {
     plan match {
@@ -1077,8 +1076,7 @@ object CombineLimits extends Rule[LogicalPlan] {
  * the join between R and S is not a cartesian product and therefore should be allowed.
  * The predicate R.r = S.s is not recognized as a join condition until the ReorderJoin rule.
  */
-case class CheckCartesianProducts(conf: SQLConf)
-  extends Rule[LogicalPlan] with PredicateHelper {
+object CheckCartesianProducts extends Rule[LogicalPlan] with PredicateHelper {
   /**
    * Check if a join is a cartesian product. Returns true if
    * there are no join conditions involving references from both left and right.
@@ -1090,7 +1088,7 @@ case class CheckCartesianProducts(conf: SQLConf)
   }
 
   def apply(plan: LogicalPlan): LogicalPlan =
-    if (conf.crossJoinEnabled) {
+    if (SQLConf.get.crossJoinEnabled) {
       plan
     } else plan transform {
       case j @ Join(left, right, Inner | LeftOuter | RightOuter | FullOuter, condition)
@@ -1112,7 +1110,7 @@ case class CheckCartesianProducts(conf: SQLConf)
  * This uses the same rules for increasing the precision and scale of the output as
  * [[org.apache.spark.sql.catalyst.analysis.DecimalPrecision]].
  */
-case class DecimalAggregates(conf: SQLConf) extends Rule[LogicalPlan] {
+object DecimalAggregates extends Rule[LogicalPlan] {
   import Decimal.MAX_LONG_DIGITS
 
   /** Maximum number of decimal digits representable precisely in a Double */
@@ -1130,7 +1128,7 @@ case class DecimalAggregates(conf: SQLConf) extends Rule[LogicalPlan] {
             we.copy(windowFunction = ae.copy(aggregateFunction = Average(UnscaledValue(e))))
           Cast(
             Divide(newAggExpr, Literal.create(math.pow(10.0, scale), DoubleType)),
-            DecimalType(prec + 4, scale + 4), Option(conf.sessionLocalTimeZone))
+            DecimalType(prec + 4, scale + 4), Option(SQLConf.get.sessionLocalTimeZone))
 
         case _ => we
       }
@@ -1142,7 +1140,7 @@ case class DecimalAggregates(conf: SQLConf) extends Rule[LogicalPlan] {
           val newAggExpr = ae.copy(aggregateFunction = Average(UnscaledValue(e)))
           Cast(
             Divide(newAggExpr, Literal.create(math.pow(10.0, scale), DoubleType)),
-            DecimalType(prec + 4, scale + 4), Option(conf.sessionLocalTimeZone))
+            DecimalType(prec + 4, scale + 4), Option(SQLConf.get.sessionLocalTimeZone))
 
         case _ => ae
       }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/StarSchemaDetection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/StarSchemaDetection.scala
index ca729127e7d1d3c093a849ba7079fba216e27c73..1f20b7661489e453ae07e85c5886aff8b51f383f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/StarSchemaDetection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/StarSchemaDetection.scala
@@ -28,7 +28,9 @@ import org.apache.spark.sql.internal.SQLConf
 /**
  * Encapsulates star-schema detection logic.
  */
-case class StarSchemaDetection(conf: SQLConf) extends PredicateHelper {
+object StarSchemaDetection extends PredicateHelper {
+
+  private def conf = SQLConf.get
 
   /**
    * Star schema consists of one or more fact tables referencing a number of dimension
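`SQLConf.get` is what makes the `object` form workable: it resolves to a per-thread active conf rather than a single global one. The following self-contained sketch (hypothetical `Settings`/`ActiveSettings` names, not Spark's actual implementation) shows the thread-local mechanics this relies on:

```scala
// Hypothetical stand-in for a conf class, for illustration only.
final case class Settings(cboEnabled: Boolean = false, maxIterations: Int = 100)

object ActiveSettings {
  // Each thread sees the settings installed for the session it is currently
  // running; a default instance is used when nothing has been installed.
  private val current = new ThreadLocal[Settings] {
    override def initialValue(): Settings = Settings()
  }

  def get: Settings = current.get()

  // Install `s` for the duration of `body`, restoring the previous value
  // afterwards -- the same shape as executing a query "under" a session.
  def withSettings[T](s: Settings)(body: => T): T = {
    val saved = current.get()
    current.set(s)
    try body finally current.set(saved)
  }
}

object ActiveSettingsDemo extends App {
  assert(!ActiveSettings.get.cboEnabled)  // default outside any block
  ActiveSettings.withSettings(Settings(cboEnabled = true)) {
    assert(ActiveSettings.get.cboEnabled)  // visible only inside the block
  }
}
```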
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
index 66b8ca62e5e4c147ef814bc55f3be828fbcfb0fc..6c83f4790004fd9c058064b89942adfa00795c58 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
@@ -173,12 +173,12 @@ object ReorderAssociativeOperator extends Rule[LogicalPlan] {
  * 2. Replaces [[In (value, seq[Literal])]] with optimized version
  *    [[InSet (value, HashSet[Literal])]] which is much faster.
  */
-case class OptimizeIn(conf: SQLConf) extends Rule[LogicalPlan] {
+object OptimizeIn extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     case q: LogicalPlan => q transformExpressionsDown {
       case expr @ In(v, list) if expr.inSetConvertible =>
         val newList = ExpressionSet(list).toSeq
-        if (newList.size > conf.optimizerInSetConversionThreshold) {
+        if (newList.size > SQLConf.get.optimizerInSetConversionThreshold) {
           val hSet = newList.map(e => e.eval(EmptyRow))
           InSet(v, HashSet() ++ hSet)
         } else if (newList.size < list.size) {
@@ -414,7 +414,7 @@ object LikeSimplification extends Rule[LogicalPlan] {
  * equivalent [[Literal]] values. This rule is more specific with
  * Null value propagation from bottom to top of the expression tree.
  */
-case class NullPropagation(conf: SQLConf) extends Rule[LogicalPlan] {
+object NullPropagation extends Rule[LogicalPlan] {
   private def isNullLiteral(e: Expression): Boolean = e match {
     case Literal(null, _) => true
     case _ => false
@@ -423,9 +423,9 @@ case class NullPropagation(conf: SQLConf) extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     case q: LogicalPlan => q transformExpressionsUp {
       case e @ WindowExpression(Cast(Literal(0L, _), _, _), _) =>
-        Cast(Literal(0L), e.dataType, Option(conf.sessionLocalTimeZone))
+        Cast(Literal(0L), e.dataType, Option(SQLConf.get.sessionLocalTimeZone))
       case e @ AggregateExpression(Count(exprs), _, _, _) if exprs.forall(isNullLiteral) =>
-        Cast(Literal(0L), e.dataType, Option(conf.sessionLocalTimeZone))
+        Cast(Literal(0L), e.dataType, Option(SQLConf.get.sessionLocalTimeZone))
       case ae @ AggregateExpression(Count(exprs), _, false, _) if !exprs.exists(_.nullable) =>
         // This rule should be only triggered when isDistinct field is false.
         ae.copy(aggregateFunction = Count(Literal(1)))
@@ -552,14 +552,14 @@ object FoldablePropagation extends Rule[LogicalPlan] {
 /**
  * Optimizes expressions by replacing according to CodeGen configuration.
  */
-case class OptimizeCodegen(conf: SQLConf) extends Rule[LogicalPlan] {
+object OptimizeCodegen extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
     case e: CaseWhen if canCodegen(e) => e.toCodegen()
   }
 
   private def canCodegen(e: CaseWhen): Boolean = {
     val numBranches = e.branches.size + e.elseValue.size
-    numBranches <= conf.maxCaseBranchesForCodegen
+    numBranches <= SQLConf.get.maxCaseBranchesForCodegen
   }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
index bb97e2c808b9fe20d52ddcfea0617c5a6531affb..edbeaf273fd6f6167d60305466a76e3a969efddb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
@@ -34,7 +34,7 @@ import org.apache.spark.sql.internal.SQLConf
  *
  * If star schema detection is enabled, reorder the star join plans based on heuristics.
  */
-case class ReorderJoin(conf: SQLConf) extends Rule[LogicalPlan] with PredicateHelper {
+object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper {
   /**
    * Join a list of plans together and push down the conditions into them.
    *
@@ -87,8 +87,8 @@ case class ReorderJoin(conf: SQLConf) extends Rule[LogicalPlan] with PredicateHe
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     case ExtractFiltersAndInnerJoins(input, conditions)
         if input.size > 2 && conditions.nonEmpty =>
-      if (conf.starSchemaDetection && !conf.cboEnabled) {
-        val starJoinPlan = StarSchemaDetection(conf).reorderStarJoins(input, conditions)
+      if (SQLConf.get.starSchemaDetection && !SQLConf.get.cboEnabled) {
+        val starJoinPlan = StarSchemaDetection.reorderStarJoins(input, conditions)
         if (starJoinPlan.nonEmpty) {
           val rest = input.filterNot(starJoinPlan.contains(_))
           createOrderedJoin(starJoinPlan ++ rest, conditions)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BinaryComparisonSimplificationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BinaryComparisonSimplificationSuite.scala
index 2a04bd588dc1dee913f570589bb447802d30a670..a313681eeb8f010386f132e413cf42b39dbb6546 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BinaryComparisonSimplificationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BinaryComparisonSimplificationSuite.scala
@@ -33,7 +33,7 @@ class BinaryComparisonSimplificationSuite extends PlanTest with PredicateHelper
       Batch("AnalysisNodes", Once,
         EliminateSubqueryAliases) ::
       Batch("Constant Folding", FixedPoint(50),
-        NullPropagation(conf),
+        NullPropagation,
         ConstantFolding,
         BooleanSimplification,
         SimplifyBinaryComparison,
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala
index c6345b60b744b202b8b98f2c5a319922993391b1..56399f4831a6f754096dacf7fab2de8b43dc6767 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala
@@ -35,7 +35,7 @@ class BooleanSimplificationSuite extends PlanTest with PredicateHelper {
       Batch("AnalysisNodes", Once,
         EliminateSubqueryAliases) ::
       Batch("Constant Folding", FixedPoint(50),
-        NullPropagation(conf),
+        NullPropagation,
         ConstantFolding,
         BooleanSimplification,
         PruneFilters) :: Nil
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala
index ac71887c16f96dfd34c8a9d9a41f353fa9f29d47..87ad81db11b64187930f3e9d1b5a239df22c0721 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala
@@ -32,7 +32,7 @@ class CombiningLimitsSuite extends PlanTest {
       Batch("Combine Limit", FixedPoint(10),
         CombineLimits) ::
       Batch("Constant Folding", FixedPoint(10),
-        NullPropagation(conf),
+        NullPropagation,
         ConstantFolding,
         BooleanSimplification,
         SimplifyConditionals) :: Nil
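The test suites below compensate for the lost constructor parameter in two ways: batches that only list rules simply drop the `(conf)` argument, while tests that used to build a rule with a modified conf now mutate the active conf around the call and restore it afterwards (`withSQLConf` in `OptimizeInSuite`, `beforeAll`/`afterAll` in the reorder suites). A self-contained sketch of that save-and-restore discipline (hypothetical `ConfFixture`, not Spark's helper):

```scala
import scala.collection.mutable

// Hypothetical mutable conf store, for illustration only.
object ConfFixture {
  private val store = mutable.Map[String, String]()

  def set(key: String, value: String): Unit = store(key) = value
  def getOption(key: String): Option[String] = store.get(key)
  def unset(key: String): Unit = store -= key

  // Apply `pairs` for the duration of `body`; put the old values back even
  // if `body` throws, so later tests are not polluted.
  def withConf[T](pairs: (String, String)*)(body: => T): T = {
    val saved = pairs.map { case (k, _) => k -> getOption(k) }
    pairs.foreach { case (k, v) => set(k, v) }
    try body
    finally saved.foreach {
      case (k, Some(v)) => set(k, v)
      case (k, None)    => unset(k)
    }
  }
}
```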
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala
index 25c592b9c1ddeed126f306b9125d1cc3899be205..641c89873dcc4817e5db1f1677226d80517ed844 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala
@@ -33,7 +33,7 @@ class ConstantFoldingSuite extends PlanTest {
       Batch("AnalysisNodes", Once,
         EliminateSubqueryAliases) ::
       Batch("ConstantFolding", Once,
-        OptimizeIn(conf),
+        OptimizeIn,
         ConstantFolding,
         BooleanSimplification) :: Nil
   }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/DecimalAggregatesSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/DecimalAggregatesSuite.scala
index cc4fb3a244a982830a79f7c74828aaa390ee2b60..711294ed619284ca71d0e38bad873ec8c1f02a27 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/DecimalAggregatesSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/DecimalAggregatesSuite.scala
@@ -29,7 +29,7 @@ class DecimalAggregatesSuite extends PlanTest {
 
   object Optimize extends RuleExecutor[LogicalPlan] {
     val batches = Batch("Decimal Optimizations", FixedPoint(100),
-      DecimalAggregates(conf)) :: Nil
+      DecimalAggregates) :: Nil
   }
 
   val testRelation = LocalRelation('a.decimal(2, 1), 'b.decimal(12, 1))
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateMapObjectsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateMapObjectsSuite.scala
index d4f37e2a5e8772f9544f7cd2fe2cd172ab874f4c..157472c2293f9686536e9b0123ee8e9eb52d89e0 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateMapObjectsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateMapObjectsSuite.scala
@@ -31,7 +31,7 @@ class EliminateMapObjectsSuite extends PlanTest {
   object Optimize extends RuleExecutor[LogicalPlan] {
     val batches = {
       Batch("EliminateMapObjects", FixedPoint(50),
-        NullPropagation(conf),
+        NullPropagation,
         SimplifyCasts,
         EliminateMapObjects) :: Nil
     }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
index a6584aa5fbba7850194d4543125ac993db10ec73..2f30a78f032114f6baebb941e941312b6d2ffd7e 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
@@ -37,7 +37,7 @@ class JoinOptimizationSuite extends PlanTest {
       CombineFilters,
       PushDownPredicate,
      BooleanSimplification,
-      ReorderJoin(conf),
+      ReorderJoin,
       PushPredicateThroughJoin,
       ColumnPruning,
       CollapseProject) :: Nil
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala
index 71db4e2e0ec4d9417c380f90b917feecef2180ad..2fb587d50a4cbb3520a673fd4dd0c7f2034bc1ea 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala
@@ -24,25 +24,42 @@ import org.apache.spark.sql.catalyst.plans.{Inner, PlanTest}
 import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
 import org.apache.spark.sql.catalyst.statsEstimation.{StatsEstimationTestBase, StatsTestPlan}
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.{CBO_ENABLED, JOIN_REORDER_ENABLED}
 
 class JoinReorderSuite extends PlanTest with StatsEstimationTestBase {
 
-  override val conf = new SQLConf().copy(CBO_ENABLED -> true, JOIN_REORDER_ENABLED -> true)
-
   object Optimize extends RuleExecutor[LogicalPlan] {
     val batches =
       Batch("Operator Optimizations", FixedPoint(100),
         CombineFilters,
         PushDownPredicate,
-        ReorderJoin(conf),
+        ReorderJoin,
         PushPredicateThroughJoin,
         ColumnPruning,
         CollapseProject) ::
       Batch("Join Reorder", Once,
-        CostBasedJoinReorder(conf)) :: Nil
+        CostBasedJoinReorder) :: Nil
+  }
+
+  var originalConfCBOEnabled = false
+  var originalConfJoinReorderEnabled = false
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    originalConfCBOEnabled = conf.cboEnabled
+    originalConfJoinReorderEnabled = conf.joinReorderEnabled
+    conf.setConf(CBO_ENABLED, true)
+    conf.setConf(JOIN_REORDER_ENABLED, true)
+  }
+
+  override def afterAll(): Unit = {
+    try {
+      conf.setConf(CBO_ENABLED, originalConfCBOEnabled)
+      conf.setConf(JOIN_REORDER_ENABLED, originalConfJoinReorderEnabled)
+    } finally {
+      super.afterAll()
+    }
   }
 
   /** Set up tables and columns for testing */
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala
index d8302dfc9462dc8ad4e375c4c26a30f7a9ed8eda..f50e2e86516f0cdbd1db307d6380585e8f0fca8c 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala
@@ -32,7 +32,7 @@ class LimitPushdownSuite extends PlanTest {
       Batch("Subqueries", Once,
         EliminateSubqueryAliases) ::
       Batch("Limit pushdown", FixedPoint(100),
-        LimitPushDown(conf),
+        LimitPushDown,
         CombineLimits,
         ConstantFolding,
         BooleanSimplification) :: Nil
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeCodegenSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeCodegenSuite.scala
index 9dc6738ba04b3fde2be5974d5038aa1f8c81dedb..b71067c0af3a183d24dfd35b36eef15f4da1360f 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeCodegenSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeCodegenSuite.scala
@@ -28,7 +28,7 @@ class OptimizeCodegenSuite extends PlanTest {
 
   object Optimize extends RuleExecutor[LogicalPlan] {
-    val batches = Batch("OptimizeCodegen", Once, OptimizeCodegen(conf)) :: Nil
+    val batches = Batch("OptimizeCodegen", Once, OptimizeCodegen) :: Nil
   }
 
   protected def assertEquivalent(e1: Expression, e2: Expression): Unit = {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala
index d8937321ecb98438240db595a98a517dc823d616..6a77580b29a2181203e7da8076fadca88045c29d 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala
@@ -34,10 +34,10 @@ class OptimizeInSuite extends PlanTest {
       Batch("AnalysisNodes", Once,
         EliminateSubqueryAliases) ::
       Batch("ConstantFolding", FixedPoint(10),
-        NullPropagation(conf),
+        NullPropagation,
         ConstantFolding,
         BooleanSimplification,
-        OptimizeIn(conf)) :: Nil
+        OptimizeIn) :: Nil
   }
 
   val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
@@ -159,16 +159,20 @@ class OptimizeInSuite extends PlanTest {
         .where(In(UnresolvedAttribute("a"), Seq(Literal(1), Literal(2), Literal(3))))
         .analyze
 
-    val notOptimizedPlan = OptimizeIn(conf)(plan)
-    comparePlans(notOptimizedPlan, plan)
+    withSQLConf(OPTIMIZER_INSET_CONVERSION_THRESHOLD.key -> "10") {
+      val notOptimizedPlan = OptimizeIn(plan)
+      comparePlans(notOptimizedPlan, plan)
+    }
 
     // Reduce the threshold to turning into InSet.
-    val optimizedPlan = OptimizeIn(conf.copy(OPTIMIZER_INSET_CONVERSION_THRESHOLD -> 2))(plan)
-    optimizedPlan match {
-      case Filter(cond, _)
-        if cond.isInstanceOf[InSet] && cond.asInstanceOf[InSet].getHSet().size == 3 =>
-        // pass
-      case _ => fail("Unexpected result for OptimizedIn")
+    withSQLConf(OPTIMIZER_INSET_CONVERSION_THRESHOLD.key -> "2") {
+      val optimizedPlan = OptimizeIn(plan)
+      optimizedPlan match {
+        case Filter(cond, _)
+          if cond.isInstanceOf[InSet] && cond.asInstanceOf[InSet].getHSet().size == 3 =>
+          // pass
+        case _ => fail("Unexpected result for OptimizedIn")
+      }
     }
   }
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala
index a23d6266b28402404210fb9e4a960712d130e51f..ada6e2a43ea0f266095441ead25c098ce9b50dd2 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala
@@ -24,28 +24,46 @@ import org.apache.spark.sql.catalyst.plans.{Inner, PlanTest}
 import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
 import org.apache.spark.sql.catalyst.statsEstimation.{StatsEstimationTestBase, StatsTestPlan}
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf._
 
 class StarJoinCostBasedReorderSuite extends PlanTest with StatsEstimationTestBase {
 
-  override val conf = new SQLConf().copy(
-    CBO_ENABLED -> true,
-    JOIN_REORDER_ENABLED -> true,
-    JOIN_REORDER_DP_STAR_FILTER -> true)
-
   object Optimize extends RuleExecutor[LogicalPlan] {
     val batches =
       Batch("Operator Optimizations", FixedPoint(100),
         CombineFilters,
         PushDownPredicate,
-        ReorderJoin(conf),
+        ReorderJoin,
         PushPredicateThroughJoin,
         ColumnPruning,
         CollapseProject) ::
-      Batch("Join Reorder", Once,
-        CostBasedJoinReorder(conf)) :: Nil
+      Batch("Join Reorder", Once,
+        CostBasedJoinReorder) :: Nil
+  }
+
+  var originalConfCBOEnabled = false
+  var originalConfJoinReorderEnabled = false
+  var originalConfJoinReorderDPStarFilter = false
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    originalConfCBOEnabled = conf.cboEnabled
+    originalConfJoinReorderEnabled = conf.joinReorderEnabled
+    originalConfJoinReorderDPStarFilter = conf.joinReorderDPStarFilter
+    conf.setConf(CBO_ENABLED, true)
+    conf.setConf(JOIN_REORDER_ENABLED, true)
+    conf.setConf(JOIN_REORDER_DP_STAR_FILTER, true)
+  }
+
+  override def afterAll(): Unit = {
+    try {
+      conf.setConf(CBO_ENABLED, originalConfCBOEnabled)
+      conf.setConf(JOIN_REORDER_ENABLED, originalConfJoinReorderEnabled)
+      conf.setConf(JOIN_REORDER_DP_STAR_FILTER, originalConfJoinReorderDPStarFilter)
+    } finally {
+      super.afterAll()
+    }
   }
 
   private val columnInfo: AttributeMap[ColumnStat] = AttributeMap(Seq(
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinReorderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinReorderSuite.scala
index 605c01b7220d1c58053d90099afcb83f9277f3d0..777c5637201ed71c16fe5ae41fe500ecf37f4e52 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinReorderSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinReorderSuite.scala
@@ -24,19 +24,36 @@ import org.apache.spark.sql.catalyst.plans.{Inner, PlanTest}
 import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LocalRelation, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
 import org.apache.spark.sql.catalyst.statsEstimation.{StatsEstimationTestBase, StatsTestPlan}
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.internal.SQLConf.{CASE_SENSITIVE, STARSCHEMA_DETECTION}
+import org.apache.spark.sql.internal.SQLConf._
 
 class StarJoinReorderSuite extends PlanTest with StatsEstimationTestBase {
 
-  override val conf = new SQLConf().copy(CASE_SENSITIVE -> true, STARSCHEMA_DETECTION -> true)
+  var originalConfStarSchemaDetection = false
+  var originalConfCBOEnabled = true
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    originalConfStarSchemaDetection = conf.starSchemaDetection
+    originalConfCBOEnabled = conf.cboEnabled
+    conf.setConf(STARSCHEMA_DETECTION, true)
+    conf.setConf(CBO_ENABLED, false)
+  }
+
+  override def afterAll(): Unit = {
+    try {
+      conf.setConf(STARSCHEMA_DETECTION, originalConfStarSchemaDetection)
+      conf.setConf(CBO_ENABLED, originalConfCBOEnabled)
+    } finally {
+      super.afterAll()
+    }
+  }
 
   object Optimize extends RuleExecutor[LogicalPlan] {
     val batches =
       Batch("Operator Optimizations", FixedPoint(100),
         CombineFilters,
         PushDownPredicate,
-        ReorderJoin(conf),
+        ReorderJoin,
         PushPredicateThroughJoin,
         ColumnPruning,
         CollapseProject) :: Nil
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/complexTypesSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/complexTypesSuite.scala
index 0a18858350e1fd58b53b65ad966498c7608d840b..3634accf1ec21d605ee3b077c40b9bc999fbe1d8 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/complexTypesSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/complexTypesSuite.scala
@@ -37,7 +37,7 @@ class ComplexTypesSuite extends PlanTest{
     Batch("collapse projections", FixedPoint(10),
       CollapseProject) ::
     Batch("Constant Folding", FixedPoint(10),
-      NullPropagation(conf),
+      NullPropagation,
       ConstantFolding,
       BooleanSimplification,
       SimplifyConditionals,
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
index e9679d3361509a17b2d4f09879fb114aa40f08b9..5389bf3389da41953b18d1d5565924fed4af1e26 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
@@ -31,8 +31,8 @@ import org.apache.spark.sql.internal.SQLConf
  */
 trait PlanTest extends SparkFunSuite with PredicateHelper {
 
-  // TODO(gatorsmile): remove this from PlanTest and all the analyzer/optimizer rules
-  protected val conf = new SQLConf().copy(SQLConf.CASE_SENSITIVE -> true)
+  // TODO(gatorsmile): remove this from PlanTest and all the analyzer rules
+  protected def conf = SQLConf.get
 
   /**
    * Since attribute references are given globally unique ids during analysis,
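Note that both `fixedPoint` in `Optimizer` and `conf` in `PlanTest` changed from `val` to `def`. That is essential: a `val` would freeze whatever conf was active when the class was initialized, while a `def` re-reads the thread-local on every access, so mutations made in `beforeAll` or `withSQLConf` are actually observed. A tiny standalone illustration of the difference:

```scala
object ValVsDef extends App {
  var active = "default-conf"

  val capturedOnce: String = active  // evaluated once, at initialization
  def readEachTime: String = active  // re-evaluated on every access

  active = "test-conf"
  println(capturedOnce)  // prints "default-conf" -- stale
  println(readEachTime)  // prints "test-conf"    -- current
}
```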
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
index 3c046ce49428599c01590c4ec2d0e633860fb1d0..5cfad9126986b22037e0705bbf9c4cb391d58954 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
@@ -38,12 +38,10 @@ import org.apache.spark.sql.internal.SQLConf
  * 3. aggregate function on partition columns which have same result w or w/o DISTINCT keyword.
  *    e.g. SELECT col1, Max(col2) FROM tbl GROUP BY col1.
  */
-case class OptimizeMetadataOnlyQuery(
-    catalog: SessionCatalog,
-    conf: SQLConf) extends Rule[LogicalPlan] {
+case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[LogicalPlan] {
 
   def apply(plan: LogicalPlan): LogicalPlan = {
-    if (!conf.optimizerMetadataOnly) {
+    if (!SQLConf.get.optimizerMetadataOnly) {
       return plan
     }
 
@@ -106,7 +104,7 @@ case class OptimizeMetadataOnlyQuery(
         val caseInsensitiveProperties =
           CaseInsensitiveMap(relation.tableMeta.storage.properties)
         val timeZoneId = caseInsensitiveProperties.get(DateTimeUtils.TIMEZONE_OPTION)
-          .getOrElse(conf.sessionLocalTimeZone)
+          .getOrElse(SQLConf.get.sessionLocalTimeZone)
         val partitionData = catalog.listPartitions(relation.tableMeta.identifier).map { p =>
           InternalRow.fromSeq(partAttrs.map { attr =>
             Cast(Literal(p.spec(attr.name)), attr.dataType, Option(timeZoneId)).eval()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
index 1de4f508b89a00dcc3a22c6ed37c9019e37b3329..00ff4c8ac310b3b25dd7fe46c1f0c949a774b302 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
@@ -22,16 +22,14 @@ import org.apache.spark.sql.catalyst.catalog.SessionCatalog
 import org.apache.spark.sql.catalyst.optimizer.Optimizer
 import org.apache.spark.sql.execution.datasources.PruneFileSourcePartitions
 import org.apache.spark.sql.execution.python.ExtractPythonUDFFromAggregate
-import org.apache.spark.sql.internal.SQLConf
 
 class SparkOptimizer(
     catalog: SessionCatalog,
-    conf: SQLConf,
     experimentalMethods: ExperimentalMethods)
-  extends Optimizer(catalog, conf) {
+  extends Optimizer(catalog) {
 
   override def batches: Seq[Batch] = (preOptimizationBatches ++ super.batches :+
-    Batch("Optimize Metadata Only Query", Once, OptimizeMetadataOnlyQuery(catalog, conf)) :+
+    Batch("Optimize Metadata Only Query", Once, OptimizeMetadataOnlyQuery(catalog)) :+
     Batch("Extract Python UDF from Aggregate", Once, ExtractPythonUDFFromAggregate) :+
     Batch("Prune File Source Table Partitions", Once, PruneFileSourcePartitions)) ++
     postHocOptimizationBatches :+
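One boundary of the refactoring is visible in these two files: `OptimizeMetadataOnlyQuery` and `SparkOptimizer` stay classes because they have genuine dependencies (`catalog`, `experimentalMethods`); only the `conf` parameter disappears. A hypothetical rule following the same split (`CatalogLike` and `PruneUsingCatalog` are illustrative stand-ins, not Spark types):

```scala
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.internal.SQLConf

// Illustrative stand-in for a catalog dependency.
trait CatalogLike { def tableNames: Seq[String] }

// Constructor parameters are kept only for real collaborators; plain
// configuration is always read from the active session's conf instead.
case class PruneUsingCatalog(catalog: CatalogLike) extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = {
    if (!SQLConf.get.optimizerMetadataOnly) return plan
    // ... hypothetical pruning that consults catalog.tableNames ...
    plan
  }
}
```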
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
index 9d0148117fadfb3614f1b18011c3ee2b3fc08b50..72d0ddc62303a3a48fbe2493e578a4e6d74aaa75 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
@@ -208,7 +208,7 @@ abstract class BaseSessionStateBuilder(
    * Note: this depends on the `conf`, `catalog` and `experimentalMethods` fields.
    */
   protected def optimizer: Optimizer = {
-    new SparkOptimizer(catalog, conf, experimentalMethods) {
+    new SparkOptimizer(catalog, experimentalMethods) {
       override def extendedOperatorOptimizationRules: Seq[Rule[LogicalPlan]] =
         super.extendedOperatorOptimizationRules ++ customOperatorOptimizationRules
     }