Commit 64f04154 authored by Edoardo Vacchi, committed by Michael Armbrust

[SPARK-6981] [SQL] Factor out SparkPlanner and QueryExecution from SQLContext

Alternative to PR #6122; here the classes factored out of SQLContext are shadowed by deprecated inner classes of the same name, preserving backwards binary compatibility

   * carries out the refactoring in a lighter-weight, backwards-compatible way

Author: Edoardo Vacchi <uncommonnonsense@gmail.com>

Closes #6356 from evacchi/sqlctx-refactoring-lite.
parent 7e32387a
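The compatibility trick is easiest to see in miniature. Below is a minimal, self-contained sketch of the pattern this patch applies (illustrative names, not the actual Spark classes): the logic moves to a top-level class, and a deprecated inner class with the old name simply extends it, so bytecode compiled against the old inner type keeps linking.

    // Hypothetical stand-ins for SQLContext/SparkPlanner; not Spark code.
    class StandalonePlanner(val ctx: Context) {
      def plan(query: String): String = s"plan for: $query"
    }

    class Context { self =>
      // Same name and position as the old inner class, so existing code that
      // references Context#Planner still compiles and links; it now merely
      // forwards to the factored-out implementation.
      @deprecated("use StandalonePlanner directly", "1.6.0")
      class Planner extends StandalonePlanner(self)

      // New call sites use the top-level class.
      val planner: StandalonePlanner = new StandalonePlanner(this)
    }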
@@ -37,7 +37,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, ScalaReflection, SqlParser}
import org.apache.spark.sql.execution.{EvaluatePython, ExplainCommand, FileRelation, LogicalRDD, SQLExecution}
import org.apache.spark.sql.execution.{EvaluatePython, ExplainCommand, FileRelation, LogicalRDD, QueryExecution, SQLExecution}
import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, LogicalRelation}
import org.apache.spark.sql.execution.datasources.json.JacksonGenerator
import org.apache.spark.sql.sources.HadoopFsRelation
@@ -114,7 +114,7 @@ private[sql] object DataFrame {
@Experimental
class DataFrame private[sql](
@transient val sqlContext: SQLContext,
@DeveloperApi @transient val queryExecution: SQLContext#QueryExecution) extends Serializable {
@DeveloperApi @transient val queryExecution: QueryExecution) extends Serializable {
// Note for Spark contributors: if adding or updating any action in `DataFrame`, please make sure
// you wrap it with `withNewExecutionId` if this action doesn't call other actions.
......
@@ -38,6 +38,10 @@ import org.apache.spark.sql.catalyst.optimizer.{DefaultOptimizer, Optimizer}
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.apache.spark.sql.catalyst.{InternalRow, ParserDialect, _}
import org.apache.spark.sql.execution.{Filter, _}
import org.apache.spark.sql.{execution => sparkexecution}
import org.apache.spark.sql.execution._
import org.apache.spark.sql.sources._
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.ui.{SQLListener, SQLTab}
@@ -188,9 +192,11 @@ class SQLContext(@transient val sparkContext: SparkContext)
protected[sql] def parseSql(sql: String): LogicalPlan = ddlParser.parse(sql, false)
protected[sql] def executeSql(sql: String): this.QueryExecution = executePlan(parseSql(sql))
protected[sql] def executeSql(sql: String):
org.apache.spark.sql.execution.QueryExecution = executePlan(parseSql(sql))
protected[sql] def executePlan(plan: LogicalPlan) = new this.QueryExecution(plan)
protected[sql] def executePlan(plan: LogicalPlan) =
new sparkexecution.QueryExecution(this, plan)
@transient
protected[sql] val tlSession = new ThreadLocal[SQLSession]() {
@@ -781,77 +787,11 @@ class SQLContext(@transient val sparkContext: SparkContext)
}.toArray
}
protected[sql] class SparkPlanner extends SparkStrategies {
val sparkContext: SparkContext = self.sparkContext
val sqlContext: SQLContext = self
def codegenEnabled: Boolean = self.conf.codegenEnabled
def unsafeEnabled: Boolean = self.conf.unsafeEnabled
def numPartitions: Int = self.conf.numShufflePartitions
def strategies: Seq[Strategy] =
experimental.extraStrategies ++ (
DataSourceStrategy ::
DDLStrategy ::
TakeOrderedAndProject ::
HashAggregation ::
Aggregation ::
LeftSemiJoin ::
EquiJoinSelection ::
InMemoryScans ::
BasicOperators ::
CartesianProduct ::
BroadcastNestedLoopJoin :: Nil)
/**
* Used to build table scan operators where complex projection and filtering are done using
* separate physical operators. This function returns the given scan operator with Project and
* Filter nodes added only when needed. For example, a Project operator is only used when the
* final desired output requires complex expressions to be evaluated or when columns can be
* further pruned after filtering has been done.
*
* The `prunePushedDownFilters` parameter is used to remove those filters that can be optimized
* away by the filter pushdown optimization.
*
* The required attributes for both filtering and expression evaluation are passed to the
* provided `scanBuilder` function so that it can avoid unnecessary column materialization.
*/
def pruneFilterProject(
projectList: Seq[NamedExpression],
filterPredicates: Seq[Expression],
prunePushedDownFilters: Seq[Expression] => Seq[Expression],
scanBuilder: Seq[Attribute] => SparkPlan): SparkPlan = {
val projectSet = AttributeSet(projectList.flatMap(_.references))
val filterSet = AttributeSet(filterPredicates.flatMap(_.references))
val filterCondition =
prunePushedDownFilters(filterPredicates).reduceLeftOption(catalyst.expressions.And)
// Right now we still use a projection even if the only evaluation is applying an alias
// to a column. Since this is a no-op, it could be avoided. However, using this
// optimization with the current implementation would change the output schema.
// TODO: Decouple final output schema from expression evaluation so this copy can be
// avoided safely.
if (AttributeSet(projectList.map(_.toAttribute)) == projectSet &&
filterSet.subsetOf(projectSet)) {
// When it is possible to just use column pruning to get the right projection and
// when the columns of this projection are enough to evaluate all filter conditions,
// just do a scan followed by a filter, with no extra project.
val scan = scanBuilder(projectList.asInstanceOf[Seq[Attribute]])
filterCondition.map(Filter(_, scan)).getOrElse(scan)
} else {
val scan = scanBuilder((projectSet ++ filterSet).toSeq)
Project(projectList, filterCondition.map(Filter(_, scan)).getOrElse(scan))
}
}
}
@deprecated("use org.apache.spark.sql.SparkPlanner", "1.6.0")
protected[sql] class SparkPlanner extends sparkexecution.SparkPlanner(this)
@transient
protected[sql] val planner = new SparkPlanner
protected[sql] val planner: sparkexecution.SparkPlanner = new sparkexecution.SparkPlanner(this)
@transient
protected[sql] lazy val emptyResult = sparkContext.parallelize(Seq.empty[InternalRow], 1)
@@ -898,59 +838,9 @@ class SQLContext(@transient val sparkContext: SparkContext)
protected[sql] lazy val conf: SQLConf = new SQLConf
}
/**
* :: DeveloperApi ::
* The primary workflow for executing relational queries using Spark. Designed to allow easy
* access to the intermediate phases of query execution for developers.
*/
@DeveloperApi
protected[sql] class QueryExecution(val logical: LogicalPlan) {
def assertAnalyzed(): Unit = analyzer.checkAnalysis(analyzed)
lazy val analyzed: LogicalPlan = analyzer.execute(logical)
lazy val withCachedData: LogicalPlan = {
assertAnalyzed()
cacheManager.useCachedData(analyzed)
}
lazy val optimizedPlan: LogicalPlan = optimizer.execute(withCachedData)
// TODO: Don't just pick the first one...
lazy val sparkPlan: SparkPlan = {
SparkPlan.currentContext.set(self)
planner.plan(optimizedPlan).next()
}
// executedPlan should not be used to initialize any SparkPlan. It should
// only be used for execution.
lazy val executedPlan: SparkPlan = prepareForExecution.execute(sparkPlan)
/** Internal version of the RDD. Avoids copies and has no schema */
lazy val toRdd: RDD[InternalRow] = executedPlan.execute()
protected def stringOrError[A](f: => A): String =
try f.toString catch { case e: Throwable => e.toString }
def simpleString: String =
s"""== Physical Plan ==
|${stringOrError(executedPlan)}
""".stripMargin.trim
override def toString: String = {
def output =
analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}").mkString(", ")
s"""== Parsed Logical Plan ==
|${stringOrError(logical)}
|== Analyzed Logical Plan ==
|${stringOrError(output)}
|${stringOrError(analyzed)}
|== Optimized Logical Plan ==
|${stringOrError(optimizedPlan)}
|== Physical Plan ==
|${stringOrError(executedPlan)}
|Code Generation: ${stringOrError(executedPlan.codegenEnabled)}
""".stripMargin.trim
}
}
@deprecated("use org.apache.spark.sql.QueryExecution", "1.6.0")
protected[sql] class QueryExecution(logical: LogicalPlan)
extends sparkexecution.QueryExecution(this, logical)
/**
* Parses the data type in our internal string representation. The data type string should
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution
import org.apache.spark.annotation.{Experimental, DeveloperApi}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.{InternalRow, optimizer}
import org.apache.spark.sql.{SQLContext, Row}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
/**
* :: DeveloperApi ::
* The primary workflow for executing relational queries using Spark. Designed to allow easy
* access to the intermediate phases of query execution for developers.
*/
@DeveloperApi
class QueryExecution(val sqlContext: SQLContext, val logical: LogicalPlan) {
val analyzer = sqlContext.analyzer
val optimizer = sqlContext.optimizer
val planner = sqlContext.planner
val cacheManager = sqlContext.cacheManager
val prepareForExecution = sqlContext.prepareForExecution
def assertAnalyzed(): Unit = analyzer.checkAnalysis(analyzed)
lazy val analyzed: LogicalPlan = analyzer.execute(logical)
lazy val withCachedData: LogicalPlan = {
assertAnalyzed()
cacheManager.useCachedData(analyzed)
}
lazy val optimizedPlan: LogicalPlan = optimizer.execute(withCachedData)
// TODO: Don't just pick the first one...
lazy val sparkPlan: SparkPlan = {
SparkPlan.currentContext.set(sqlContext)
planner.plan(optimizedPlan).next()
}
// executedPlan should not be used to initialize any SparkPlan. It should be
// only used for execution.
lazy val executedPlan: SparkPlan = prepareForExecution.execute(sparkPlan)
/** Internal version of the RDD. Avoids copies and has no schema */
lazy val toRdd: RDD[InternalRow] = executedPlan.execute()
protected def stringOrError[A](f: => A): String =
try f.toString catch { case e: Throwable => e.toString }
def simpleString: String =
s"""== Physical Plan ==
|${stringOrError(executedPlan)}
""".stripMargin.trim
override def toString: String = {
def output =
analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}").mkString(", ")
s"""== Parsed Logical Plan ==
|${stringOrError(logical)}
|== Analyzed Logical Plan ==
|${stringOrError(output)}
|${stringOrError(analyzed)}
|== Optimized Logical Plan ==
|${stringOrError(optimizedPlan)}
|== Physical Plan ==
|${stringOrError(executedPlan)}
|Code Generation: ${stringOrError(executedPlan.codegenEnabled)}
""".stripMargin.trim
}
}
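Since every phase above is a lazy val, a developer can stop the pipeline at any intermediate step. A hypothetical REPL-style use follows; `queryExecution` is the DataFrame field changed earlier in this diff, while the DataFrame `df` itself is assumed to exist:

    import org.apache.spark.sql.execution.QueryExecution

    val qe: QueryExecution = df.queryExecution  // assumes a DataFrame `df` in scope
    qe.analyzed       // logical plan after analysis only
    qe.optimizedPlan  // forces analysis, cache substitution, and the optimizer
    qe.executedPlan   // forces the full chain down to the physical plan
    println(qe)       // the toString above prints all phases at once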
@@ -37,7 +37,7 @@ private[sql] object SQLExecution {
* we can connect them with an execution.
*/
def withNewExecutionId[T](
sqlContext: SQLContext, queryExecution: SQLContext#QueryExecution)(body: => T): T = {
sqlContext: SQLContext, queryExecution: QueryExecution)(body: => T): T = {
val sc = sqlContext.sparkContext
val oldExecutionId = sc.getLocalProperty(EXECUTION_ID_KEY)
if (oldExecutionId == null) {
......
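For context, a call site has roughly the following shape. This is a hedged sketch only: the signature matches the hunk above, but the body is invented, and since the object is private[sql] the real callers are the DataFrame actions mentioned earlier in this diff.

    // Runs `body` with a fresh execution id stored as a SparkContext local
    // property, so the SQL UI can tie the jobs spawned by this action back
    // to a single query execution.
    SQLExecution.withNewExecutionId(sqlContext, df.queryExecution) {
      df.queryExecution.toRdd.count()
    }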
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution
import org.apache.spark.SparkContext
import org.apache.spark.annotation.Experimental
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
@Experimental
class SparkPlanner(val sqlContext: SQLContext) extends SparkStrategies {
val sparkContext: SparkContext = sqlContext.sparkContext
def codegenEnabled: Boolean = sqlContext.conf.codegenEnabled
def unsafeEnabled: Boolean = sqlContext.conf.unsafeEnabled
def numPartitions: Int = sqlContext.conf.numShufflePartitions
def strategies: Seq[Strategy] =
sqlContext.experimental.extraStrategies ++ (
DataSourceStrategy ::
DDLStrategy ::
TakeOrderedAndProject ::
HashAggregation ::
Aggregation ::
LeftSemiJoin ::
EquiJoinSelection ::
InMemoryScans ::
BasicOperators ::
CartesianProduct ::
BroadcastNestedLoopJoin :: Nil)
/**
* Used to build table scan operators where complex projection and filtering are done using
* separate physical operators. This function returns the given scan operator with Project and
* Filter nodes added only when needed. For example, a Project operator is only used when the
* final desired output requires complex expressions to be evaluated or when columns can be
* further pruned after filtering has been done.
*
* The `prunePushedDownFilters` parameter is used to remove those filters that can be optimized
* away by the filter pushdown optimization.
*
* The required attributes for both filtering and expression evaluation are passed to the
* provided `scanBuilder` function so that it can avoid unnecessary column materialization.
*/
def pruneFilterProject(
projectList: Seq[NamedExpression],
filterPredicates: Seq[Expression],
prunePushedDownFilters: Seq[Expression] => Seq[Expression],
scanBuilder: Seq[Attribute] => SparkPlan): SparkPlan = {
val projectSet = AttributeSet(projectList.flatMap(_.references))
val filterSet = AttributeSet(filterPredicates.flatMap(_.references))
val filterCondition =
prunePushedDownFilters(filterPredicates).reduceLeftOption(catalyst.expressions.And)
// Right now we still use a projection even if the only evaluation is applying an alias
// to a column. Since this is a no-op, it could be avoided. However, using this
// optimization with the current implementation would change the output schema.
// TODO: Decouple final output schema from expression evaluation so this copy can be
// avoided safely.
if (AttributeSet(projectList.map(_.toAttribute)) == projectSet &&
filterSet.subsetOf(projectSet)) {
// When it is possible to just use column pruning to get the right projection and
// when the columns of this projection are enough to evaluate all filter conditions,
// just do a scan followed by a filter, with no extra project.
val scan = scanBuilder(projectList.asInstanceOf[Seq[Attribute]])
filterCondition.map(Filter(_, scan)).getOrElse(scan)
} else {
val scan = scanBuilder((projectSet ++ filterSet).toSeq)
Project(projectList, filterCondition.map(Filter(_, scan)).getOrElse(scan))
}
}
}
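A worked example (not part of the patch) of the two plan shapes `pruneFilterProject` can produce, for a relation with columns (a, b, c):

    // Case 1: the projection is pure column pruning and the filter only
    // touches projected columns.
    //   SELECT a, b FROM t WHERE a > 1
    //   projectList = [a, b], filterPredicates = [a > 1]
    //   projectSet = {a, b} matches the output and filterSet = {a} is a
    //   subset of it, so no Project node is needed:
    //     Filter(a > 1, Scan(a, b))
    //
    // Case 2: the output needs expression evaluation, or a filter references
    // a column outside the projection.
    //   SELECT a + b AS s FROM t WHERE c > 1
    //   the alias `s` must be evaluated and `c` is not in the output, so the
    //   scan reads all required columns and both operators are added:
    //     Project(a + b AS s, Filter(c > 1, Scan(a, b, c)))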
@@ -31,7 +31,7 @@ import org.apache.spark.sql.types._
import org.apache.spark.sql.{SQLContext, Strategy, execution}
private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
self: SQLContext#SparkPlanner =>
self: SparkPlanner =>
object LeftSemiJoin extends Strategy with PredicateHelper {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
......
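The one-line change above works because SparkStrategies now constrains its self-type to the top-level SparkPlanner instead of the path-dependent inner type SQLContext#SparkPlanner. A minimal sketch of the self-type idiom, with made-up names:

    // A trait with a self-type can only be mixed into the named type, and it
    // can use that type's members without extending it.
    trait Strategies { self: Planner =>
      def describe: String = s"strategies for ${self.name}"
    }

    class Planner(val name: String) extends Strategies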