diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
index 2b98aacdd7264f64f541a43ca3e5c920b39ef6fd..abba8668216f4613f6221fb2ff734e350389b830 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
@@ -19,46 +19,32 @@ package org.apache.spark.sql.catalyst
 
 import org.apache.spark.sql.catalyst.analysis._
 
-private[spark] trait CatalystConf {
+/**
+ * Interface for configuration options used in the catalyst module.
+ */
+trait CatalystConf {
   def caseSensitiveAnalysis: Boolean
 
   def orderByOrdinal: Boolean
   def groupByOrdinal: Boolean
 
+  def optimizerMaxIterations: Int
+
   /**
    * Returns the [[Resolver]] for the current configuration, which can be used to determine if two
    * identifiers are equal.
    */
   def resolver: Resolver = {
-    if (caseSensitiveAnalysis) {
-      caseSensitiveResolution
-    } else {
-      caseInsensitiveResolution
-    }
+    if (caseSensitiveAnalysis) caseSensitiveResolution else caseInsensitiveResolution
   }
 }
 
-/**
- * A trivial conf that is empty. Used for testing when all
- * relations are already filled in and the analyser needs only to resolve attribute references.
- */
-object EmptyConf extends CatalystConf {
-  override def caseSensitiveAnalysis: Boolean = {
-    throw new UnsupportedOperationException
-  }
-  override def orderByOrdinal: Boolean = {
-    throw new UnsupportedOperationException
-  }
-  override def groupByOrdinal: Boolean = {
-    throw new UnsupportedOperationException
-  }
-}
 
 /** A CatalystConf that can be used for local testing. */
 case class SimpleCatalystConf(
     caseSensitiveAnalysis: Boolean,
     orderByOrdinal: Boolean = true,
-    groupByOrdinal: Boolean = true)
-
+    groupByOrdinal: Boolean = true,
+    optimizerMaxIterations: Int = 100)
   extends CatalystConf {
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index de40ddde1bdd970b17a165ceb741720db4a3abed..37ff6ab6f627db5b4f4b63b9b4c9169cdb3c3ee8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -39,14 +39,13 @@ import org.apache.spark.sql.types._
  * Used for testing when all relations are already filled in and the analyzer needs only
  * to resolve attribute references.
  */
-object SimpleAnalyzer
-  extends SimpleAnalyzer(
-    EmptyFunctionRegistry,
+object SimpleAnalyzer extends Analyzer(
+  new SessionCatalog(
+    new InMemoryCatalog,
+    EmptyFunctionRegistry,
+    new SimpleCatalystConf(caseSensitiveAnalysis = true)),
     new SimpleCatalystConf(caseSensitiveAnalysis = true))
 
-class SimpleAnalyzer(functionRegistry: FunctionRegistry, conf: CatalystConf)
-  extends Analyzer(new SessionCatalog(new InMemoryCatalog, functionRegistry, conf), conf)
-
 /**
  * Provides a logical query plan analyzer, which translates [[UnresolvedAttribute]]s and
  * [[UnresolvedRelation]]s into fully typed objects using information in a
@@ -55,9 +54,13 @@ class SimpleAnalyzer(functionRegistry: FunctionRegistry, conf: CatalystConf)
 class Analyzer(
     catalog: SessionCatalog,
     conf: CatalystConf,
-    maxIterations: Int = 100)
+    maxIterations: Int)
   extends RuleExecutor[LogicalPlan] with CheckAnalysis {
 
+  def this(catalog: SessionCatalog, conf: CatalystConf) = {
+    this(catalog, conf, conf.optimizerMaxIterations)
+  }
+
   def resolver: Resolver = {
     if (conf.caseSensitiveAnalysis) {
       caseSensitiveResolution
@@ -66,7 +69,7 @@ class Analyzer(
     }
   }
 
-  val fixedPoint = FixedPoint(maxIterations)
+  protected val fixedPoint = FixedPoint(maxIterations)
 
   /**
    * Override to provide additional rules for the "Resolution" batch.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index bb68ef826f4800fb27b5f9e7fb94887b9e9f6814..6c8f8f40dd92eb6ef9d602531ba5c47eb15c06e4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.optimizer
 import scala.annotation.tailrec
 import scala.collection.immutable.HashSet
 
-import org.apache.spark.sql.catalyst.{CatalystConf, EmptyConf}
+import org.apache.spark.sql.catalyst.{CatalystConf, SimpleCatalystConf}
 import org.apache.spark.sql.catalyst.analysis.{CleanupAliases, DistinctAggregationRewriter, EliminateSubqueryAliases, EmptyFunctionRegistry}
 import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
 import org.apache.spark.sql.catalyst.expressions._
@@ -36,9 +36,11 @@ import org.apache.spark.sql.types._
  * Abstract class all optimizers should inherit of, contains the standard batches (extending
  * Optimizers can override this.
  */
-abstract class Optimizer(
-    conf: CatalystConf,
-    sessionCatalog: SessionCatalog) extends RuleExecutor[LogicalPlan] {
+abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf)
+  extends RuleExecutor[LogicalPlan] {
+
+  protected val fixedPoint = FixedPoint(conf.optimizerMaxIterations)
+
   def batches: Seq[Batch] = {
     // Technically some of the rules in Finish Analysis are not optimizer rules and belong more
     // in the analyzer, because they are needed for correctness (e.g. ComputeCurrentTime).
@@ -59,12 +61,12 @@ abstract class Optimizer(
     // since the other rules might make two separate Unions operators adjacent.
Batch("Union", Once, CombineUnions) :: - Batch("Replace Operators", FixedPoint(100), + Batch("Replace Operators", fixedPoint, ReplaceIntersectWithSemiJoin, ReplaceDistinctWithAggregate) :: - Batch("Aggregate", FixedPoint(100), + Batch("Aggregate", fixedPoint, RemoveLiteralFromGroupExpressions) :: - Batch("Operator Optimizations", FixedPoint(100), + Batch("Operator Optimizations", fixedPoint, // Operator push down SetOperationPushDown, SamplePushDown, @@ -95,11 +97,11 @@ abstract class Optimizer( SimplifyCasts, SimplifyCaseConversionExpressions, EliminateSerialization) :: - Batch("Decimal Optimizations", FixedPoint(100), + Batch("Decimal Optimizations", fixedPoint, DecimalAggregates) :: - Batch("Typed Filter Optimization", FixedPoint(100), + Batch("Typed Filter Optimization", fixedPoint, EmbedSerializerInFilter) :: - Batch("LocalRelation", FixedPoint(100), + Batch("LocalRelation", fixedPoint, ConvertToLocalRelation) :: Batch("Subquery", Once, OptimizeSubqueries) :: Nil @@ -117,15 +119,19 @@ abstract class Optimizer( } /** - * Non-abstract representation of the standard Spark optimizing strategies + * An optimizer used in test code. * * To ensure extendability, we leave the standard rules in the abstract optimizer rules, while * specific rules go to the subclasses */ -object DefaultOptimizer - extends Optimizer( - EmptyConf, - new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry, EmptyConf)) +object SimpleTestOptimizer extends SimpleTestOptimizer + +class SimpleTestOptimizer extends Optimizer( + new SessionCatalog( + new InMemoryCatalog, + EmptyFunctionRegistry, + new SimpleCatalystConf(caseSensitiveAnalysis = true)), + new SimpleCatalystConf(caseSensitiveAnalysis = true)) /** * Pushes operations down into a Sample. diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala index cf26d4843d84f6a9e71747fddff8645d4d4f3db6..faa90fb1c5e35799c76243ed9b03b138855a7914 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala @@ -24,7 +24,7 @@ import org.scalatest.prop.GeneratorDrivenPropertyChecks import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} import org.apache.spark.sql.catalyst.expressions.codegen._ -import org.apache.spark.sql.catalyst.optimizer.DefaultOptimizer +import org.apache.spark.sql.catalyst.optimizer.SimpleTestOptimizer import org.apache.spark.sql.catalyst.plans.logical.{OneRowRelation, Project} import org.apache.spark.sql.types.DataType import org.apache.spark.util.Utils @@ -153,7 +153,7 @@ trait ExpressionEvalHelper extends GeneratorDrivenPropertyChecks { expected: Any, inputRow: InternalRow = EmptyRow): Unit = { val plan = Project(Alias(expression, s"Optimized($expression)")() :: Nil, OneRowRelation) - val optimizedPlan = DefaultOptimizer.execute(plan) + val optimizedPlan = SimpleTestOptimizer.execute(plan) checkEvaluationWithoutCodegen(optimizedPlan.expressions.head, expected, inputRow) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala index 27195d3458b8e700d84faa0fb796e6a5cc6f3107..452792d21c2042e628eff97f5651bed2f0d1b4be 100644 --- 
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala
@@ -25,7 +25,7 @@ import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection
-import org.apache.spark.sql.catalyst.optimizer.DefaultOptimizer
+import org.apache.spark.sql.catalyst.optimizer.SimpleTestOptimizer
 import org.apache.spark.sql.catalyst.plans.logical.{OneRowRelation, Project}
 import org.apache.spark.sql.types._
 
@@ -151,7 +151,7 @@ class MathFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper {
       expression: Expression,
       inputRow: InternalRow = EmptyRow): Unit = {
     val plan = Project(Alias(expression, s"Optimized($expression)")() :: Nil, OneRowRelation)
-    val optimizedPlan = DefaultOptimizer.execute(plan)
+    val optimizedPlan = SimpleTestOptimizer.execute(plan)
     checkNaNWithoutCodegen(optimizedPlan.expressions.head, inputRow)
   }
 
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerExtendableSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerExtendableSuite.scala
index 55af6c5d6ae937117746768cc857816b77e43d40..7112c033eabcea313201da3835e422c2435e4ccb 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerExtendableSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerExtendableSuite.scala
@@ -15,12 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.catalyst
+package org.apache.spark.sql.catalyst.optimizer
 
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.catalyst.analysis.EmptyFunctionRegistry
-import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
-import org.apache.spark.sql.catalyst.optimizer.Optimizer
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
 
@@ -40,10 +37,7 @@ class OptimizerExtendableSuite extends SparkFunSuite {
    * This class represents a dummy extended optimizer that takes the batches of the
    * Optimizer and adds custom ones.
    */
-  class ExtendedOptimizer
-    extends Optimizer(
-      EmptyConf,
-      new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry, EmptyConf)) {
+  class ExtendedOptimizer extends SimpleTestOptimizer {
 
     // rules set to DummyRule, would not be executed anyways
     val myBatches: Seq[Batch] = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
index 8dfbba779dbd58606a9e30c88006af2e7c38602d..08b2d7fcd4882924aa4b25d8b057457f07c96bf1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
@@ -18,14 +18,16 @@
 package org.apache.spark.sql.execution
 
 import org.apache.spark.sql.ExperimentalMethods
-import org.apache.spark.sql.catalyst.CatalystConf
 import org.apache.spark.sql.catalyst.catalog.SessionCatalog
 import org.apache.spark.sql.catalyst.optimizer.Optimizer
+import org.apache.spark.sql.internal.SQLConf
 
 class SparkOptimizer(
-    conf: CatalystConf,
-    sessionCatalog: SessionCatalog,
-    experimentalMethods: ExperimentalMethods) extends Optimizer(conf, sessionCatalog) {
+    catalog: SessionCatalog,
+    conf: SQLConf,
+    experimentalMethods: ExperimentalMethods)
+  extends Optimizer(catalog, conf) {
+
   override def batches: Seq[Batch] = super.batches :+ Batch(
-    "User Provided Optimizers", FixedPoint(100), experimentalMethods.extraOptimizations: _*)
+    "User Provided Optimizers", fixedPoint, experimentalMethods.extraOptimizations: _*)
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 20d9a285483f02a1577a4010894ef83fdb8c9af6..e58b7178e95b207fe29e41caef62e95245672607 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -51,6 +51,11 @@ object SQLConf {
 
   }
 
+  val OPTIMIZER_MAX_ITERATIONS = SQLConfigBuilder("spark.sql.optimizer.maxIterations")
+    .doc("The max number of iterations the optimizer and analyzer runs")
+    .intConf
+    .createWithDefault(100)
+
   val ALLOW_MULTIPLE_CONTEXTS = SQLConfigBuilder("spark.sql.allowMultipleContexts")
     .doc("When set to true, creating multiple SQLContexts/HiveContexts is allowed. " +
       "When set to false, only one SQLContext/HiveContext is allowed to be created " +
@@ -473,6 +478,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
 
   /** ************************ Spark SQL Params/Hints ******************* */
 
+  def optimizerMaxIterations: Int = getConf(OPTIMIZER_MAX_ITERATIONS)
+
   def checkpointLocation: String = getConf(CHECKPOINT_LOCATION)
 
   def filesMaxPartitionBytes: Long = getConf(FILES_MAX_PARTITION_BYTES)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index 10497e4fdfcbc8f7cbb85817d45115bcf15fc343..c30f879dedaa17b6c5dce66545b2992d102d21b5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -80,7 +80,7 @@ private[sql] class SessionState(ctx: SQLContext) {
   /**
    * Logical query plan optimizer.
    */
-  lazy val optimizer: Optimizer = new SparkOptimizer(conf, catalog, experimentalMethods)
+  lazy val optimizer: Optimizer = new SparkOptimizer(catalog, conf, experimentalMethods)
 
   /**
    * Parser that extracts expressions, plans, table identifiers etc. from SQL texts.
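
Note, not part of the patch itself: a minimal sketch of how the new setting is expected to flow through the pieces changed above. The object name MaxIterationsExample and the value 50 are illustrative only; the classes and the spark.sql.optimizer.maxIterations key come from the diff.

    import org.apache.spark.sql.catalyst.SimpleCatalystConf
    import org.apache.spark.sql.catalyst.analysis.{Analyzer, EmptyFunctionRegistry}
    import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}

    object MaxIterationsExample {
      def main(args: Array[String]): Unit = {
        // SimpleCatalystConf now carries optimizerMaxIterations (default 100).
        val conf = SimpleCatalystConf(caseSensitiveAnalysis = true, optimizerMaxIterations = 50)
        val catalog = new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry, conf)

        // The new two-argument Analyzer constructor reads conf.optimizerMaxIterations,
        // so its FixedPoint limit follows the configuration instead of a hard-coded 100.
        val analyzer = new Analyzer(catalog, conf)
        println(conf.optimizerMaxIterations)  // prints 50
      }
    }

On the SQL side the same limit is driven by the new OPTIMIZER_MAX_ITERATIONS entry (spark.sql.optimizer.maxIterations, default 100), which SQLConf.optimizerMaxIterations exposes to SparkOptimizer and the analyzer through CatalystConf.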