From 64f04154e3078ec7340da97e3c2b07cf24e89098 Mon Sep 17 00:00:00 2001
From: Edoardo Vacchi <uncommonnonsense@gmail.com>
Date: Mon, 14 Sep 2015 14:56:04 -0700
Subject: [PATCH] [SPARK-6981] [SQL] Factor out SparkPlanner and QueryExecution
 from SQLContext

Alternative to PR #6122; in this case the refactored out classes are replaced by inner classes with the same name for backwards binary compatibility

   * process in a lighter-weight, backwards-compatible way

Author: Edoardo Vacchi <uncommonnonsense@gmail.com>

Closes #6356 from evacchi/sqlctx-refactoring-lite.
---
 .../org/apache/spark/sql/DataFrame.scala      |   4 +-
 .../org/apache/spark/sql/SQLContext.scala     | 138 ++----------------
 .../spark/sql/execution/QueryExecution.scala  |  85 +++++++++++
 .../spark/sql/execution/SQLExecution.scala    |   2 +-
 .../spark/sql/execution/SparkPlanner.scala    |  92 ++++++++++++
 .../spark/sql/execution/SparkStrategies.scala |   2 +-
 6 files changed, 195 insertions(+), 128 deletions(-)
 create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
 create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 1a687b2374..3e61123c14 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -37,7 +37,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, ScalaReflection, SqlParser}
-import org.apache.spark.sql.execution.{EvaluatePython, ExplainCommand, FileRelation, LogicalRDD, SQLExecution}
+import org.apache.spark.sql.execution.{EvaluatePython, ExplainCommand, FileRelation, LogicalRDD, QueryExecution, SQLExecution}
 import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, LogicalRelation}
 import org.apache.spark.sql.execution.datasources.json.JacksonGenerator
 import org.apache.spark.sql.sources.HadoopFsRelation
@@ -114,7 +114,7 @@ private[sql] object DataFrame {
 @Experimental
 class DataFrame private[sql](
     @transient val sqlContext: SQLContext,
-    @DeveloperApi @transient val queryExecution: SQLContext#QueryExecution) extends Serializable {
+    @DeveloperApi @transient val queryExecution: QueryExecution) extends Serializable {
 
   // Note for Spark contributors: if adding or updating any action in `DataFrame`, please make sure
   // you wrap it with `withNewExecutionId` if this actions doesn't call other action.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 4e8414af50..e3fdd782e6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -38,6 +38,10 @@ import org.apache.spark.sql.catalyst.optimizer.{DefaultOptimizer, Optimizer}
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
 import org.apache.spark.sql.catalyst.{InternalRow, ParserDialect, _}
+import org.apache.spark.sql.execution.{Filter, _}
+import org.apache.spark.sql.{execution => sparkexecution}
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.sources._
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.ui.{SQLListener, SQLTab}
@@ -188,9 +192,11 @@ class SQLContext(@transient val sparkContext: SparkContext)
 
   protected[sql] def parseSql(sql: String): LogicalPlan = ddlParser.parse(sql, false)
 
-  protected[sql] def executeSql(sql: String): this.QueryExecution = executePlan(parseSql(sql))
+  protected[sql] def executeSql(sql: String):
+    org.apache.spark.sql.execution.QueryExecution = executePlan(parseSql(sql))
 
-  protected[sql] def executePlan(plan: LogicalPlan) = new this.QueryExecution(plan)
+  protected[sql] def executePlan(plan: LogicalPlan) =
+    new sparkexecution.QueryExecution(this, plan)
 
   @transient
   protected[sql] val tlSession = new ThreadLocal[SQLSession]() {
@@ -781,77 +787,11 @@ class SQLContext(@transient val sparkContext: SparkContext)
     }.toArray
   }
 
-  protected[sql] class SparkPlanner extends SparkStrategies {
-    val sparkContext: SparkContext = self.sparkContext
-
-    val sqlContext: SQLContext = self
-
-    def codegenEnabled: Boolean = self.conf.codegenEnabled
-
-    def unsafeEnabled: Boolean = self.conf.unsafeEnabled
-
-    def numPartitions: Int = self.conf.numShufflePartitions
-
-    def strategies: Seq[Strategy] =
-      experimental.extraStrategies ++ (
-      DataSourceStrategy ::
-      DDLStrategy ::
-      TakeOrderedAndProject ::
-      HashAggregation ::
-      Aggregation ::
-      LeftSemiJoin ::
-      EquiJoinSelection ::
-      InMemoryScans ::
-      BasicOperators ::
-      CartesianProduct ::
-      BroadcastNestedLoopJoin :: Nil)
-
-    /**
-     * Used to build table scan operators where complex projection and filtering are done using
-     * separate physical operators.  This function returns the given scan operator with Project and
-     * Filter nodes added only when needed.  For example, a Project operator is only used when the
-     * final desired output requires complex expressions to be evaluated or when columns can be
-     * further eliminated out after filtering has been done.
-     *
-     * The `prunePushedDownFilters` parameter is used to remove those filters that can be optimized
-     * away by the filter pushdown optimization.
-     *
-     * The required attributes for both filtering and expression evaluation are passed to the
-     * provided `scanBuilder` function so that it can avoid unnecessary column materialization.
-     */
-    def pruneFilterProject(
-        projectList: Seq[NamedExpression],
-        filterPredicates: Seq[Expression],
-        prunePushedDownFilters: Seq[Expression] => Seq[Expression],
-        scanBuilder: Seq[Attribute] => SparkPlan): SparkPlan = {
-
-      val projectSet = AttributeSet(projectList.flatMap(_.references))
-      val filterSet = AttributeSet(filterPredicates.flatMap(_.references))
-      val filterCondition =
-        prunePushedDownFilters(filterPredicates).reduceLeftOption(catalyst.expressions.And)
-
-      // Right now we still use a projection even if the only evaluation is applying an alias
-      // to a column.  Since this is a no-op, it could be avoided. However, using this
-      // optimization with the current implementation would change the output schema.
-      // TODO: Decouple final output schema from expression evaluation so this copy can be
-      // avoided safely.
-
-      if (AttributeSet(projectList.map(_.toAttribute)) == projectSet &&
-          filterSet.subsetOf(projectSet)) {
-        // When it is possible to just use column pruning to get the right projection and
-        // when the columns of this projection are enough to evaluate all filter conditions,
-        // just do a scan followed by a filter, with no extra project.
-        val scan = scanBuilder(projectList.asInstanceOf[Seq[Attribute]])
-        filterCondition.map(Filter(_, scan)).getOrElse(scan)
-      } else {
-        val scan = scanBuilder((projectSet ++ filterSet).toSeq)
-        Project(projectList, filterCondition.map(Filter(_, scan)).getOrElse(scan))
-      }
-    }
-  }
+  @deprecated("use org.apache.spark.sql.SparkPlanner", "1.6.0")
+  protected[sql] class SparkPlanner extends sparkexecution.SparkPlanner(this)
 
   @transient
-  protected[sql] val planner = new SparkPlanner
+  protected[sql] val planner: sparkexecution.SparkPlanner = new sparkexecution.SparkPlanner(this)
 
   @transient
   protected[sql] lazy val emptyResult = sparkContext.parallelize(Seq.empty[InternalRow], 1)
@@ -898,59 +838,9 @@ class SQLContext(@transient val sparkContext: SparkContext)
     protected[sql] lazy val conf: SQLConf = new SQLConf
   }
 
-  /**
-   * :: DeveloperApi ::
-   * The primary workflow for executing relational queries using Spark.  Designed to allow easy
-   * access to the intermediate phases of query execution for developers.
-   */
-  @DeveloperApi
-  protected[sql] class QueryExecution(val logical: LogicalPlan) {
-    def assertAnalyzed(): Unit = analyzer.checkAnalysis(analyzed)
-
-    lazy val analyzed: LogicalPlan = analyzer.execute(logical)
-    lazy val withCachedData: LogicalPlan = {
-      assertAnalyzed()
-      cacheManager.useCachedData(analyzed)
-    }
-    lazy val optimizedPlan: LogicalPlan = optimizer.execute(withCachedData)
-
-    // TODO: Don't just pick the first one...
-    lazy val sparkPlan: SparkPlan = {
-      SparkPlan.currentContext.set(self)
-      planner.plan(optimizedPlan).next()
-    }
-    // executedPlan should not be used to initialize any SparkPlan. It should be
-    // only used for execution.
-    lazy val executedPlan: SparkPlan = prepareForExecution.execute(sparkPlan)
-
-    /** Internal version of the RDD. Avoids copies and has no schema */
-    lazy val toRdd: RDD[InternalRow] = executedPlan.execute()
-
-    protected def stringOrError[A](f: => A): String =
-      try f.toString catch { case e: Throwable => e.toString }
-
-    def simpleString: String =
-      s"""== Physical Plan ==
-         |${stringOrError(executedPlan)}
-      """.stripMargin.trim
-
-    override def toString: String = {
-      def output =
-        analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}").mkString(", ")
-
-      s"""== Parsed Logical Plan ==
-         |${stringOrError(logical)}
-         |== Analyzed Logical Plan ==
-         |${stringOrError(output)}
-         |${stringOrError(analyzed)}
-         |== Optimized Logical Plan ==
-         |${stringOrError(optimizedPlan)}
-         |== Physical Plan ==
-         |${stringOrError(executedPlan)}
-         |Code Generation: ${stringOrError(executedPlan.codegenEnabled)}
-      """.stripMargin.trim
-    }
-  }
+  @deprecated("use org.apache.spark.sql.QueryExecution", "1.6.0")
+  protected[sql] class QueryExecution(logical: LogicalPlan)
+    extends sparkexecution.QueryExecution(this, logical)
 
   /**
    * Parses the data type in our internal string representation. The data type string should
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
new file mode 100644
index 0000000000..7bb4133a29
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.annotation.{Experimental, DeveloperApi}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.{InternalRow, optimizer}
+import org.apache.spark.sql.{SQLContext, Row}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+
+/**
+ * :: DeveloperApi ::
+ * The primary workflow for executing relational queries using Spark.  Designed to allow easy
+ * access to the intermediate phases of query execution for developers.
+ */
+@DeveloperApi
+class QueryExecution(val sqlContext: SQLContext, val logical: LogicalPlan) {
+  val analyzer = sqlContext.analyzer
+  val optimizer = sqlContext.optimizer
+  val planner = sqlContext.planner
+  val cacheManager = sqlContext.cacheManager
+  val prepareForExecution = sqlContext.prepareForExecution
+
+  def assertAnalyzed(): Unit = analyzer.checkAnalysis(analyzed)
+
+  lazy val analyzed: LogicalPlan = analyzer.execute(logical)
+  lazy val withCachedData: LogicalPlan = {
+    assertAnalyzed()
+    cacheManager.useCachedData(analyzed)
+  }
+  lazy val optimizedPlan: LogicalPlan = optimizer.execute(withCachedData)
+
+  // TODO: Don't just pick the first one...
+  lazy val sparkPlan: SparkPlan = {
+    SparkPlan.currentContext.set(sqlContext)
+    planner.plan(optimizedPlan).next()
+  }
+  // executedPlan should not be used to initialize any SparkPlan. It should be
+  // only used for execution.
+  lazy val executedPlan: SparkPlan = prepareForExecution.execute(sparkPlan)
+
+  /** Internal version of the RDD. Avoids copies and has no schema */
+  lazy val toRdd: RDD[InternalRow] = executedPlan.execute()
+
+  protected def stringOrError[A](f: => A): String =
+    try f.toString catch { case e: Throwable => e.toString }
+
+  def simpleString: String =
+    s"""== Physical Plan ==
+       |${stringOrError(executedPlan)}
+      """.stripMargin.trim
+
+
+  override def toString: String = {
+    def output =
+      analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}").mkString(", ")
+
+    s"""== Parsed Logical Plan ==
+       |${stringOrError(logical)}
+       |== Analyzed Logical Plan ==
+       |${stringOrError(output)}
+       |${stringOrError(analyzed)}
+       |== Optimized Logical Plan ==
+       |${stringOrError(optimizedPlan)}
+       |== Physical Plan ==
+       |${stringOrError(executedPlan)}
+       |Code Generation: ${stringOrError(executedPlan.codegenEnabled)}
+    """.stripMargin.trim
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
index cee58218a8..1422e15549 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
@@ -37,7 +37,7 @@ private[sql] object SQLExecution {
    * we can connect them with an execution.
    */
   def withNewExecutionId[T](
-      sqlContext: SQLContext, queryExecution: SQLContext#QueryExecution)(body: => T): T = {
+      sqlContext: SQLContext, queryExecution: QueryExecution)(body: => T): T = {
     val sc = sqlContext.sparkContext
     val oldExecutionId = sc.getLocalProperty(EXECUTION_ID_KEY)
     if (oldExecutionId == null) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala
new file mode 100644
index 0000000000..b346f43fae
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.SparkContext
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+
+@Experimental
+class SparkPlanner(val sqlContext: SQLContext) extends SparkStrategies {
+  val sparkContext: SparkContext = sqlContext.sparkContext
+
+  def codegenEnabled: Boolean = sqlContext.conf.codegenEnabled
+
+  def unsafeEnabled: Boolean = sqlContext.conf.unsafeEnabled
+
+  def numPartitions: Int = sqlContext.conf.numShufflePartitions
+
+  def strategies: Seq[Strategy] =
+    sqlContext.experimental.extraStrategies ++ (
+      DataSourceStrategy ::
+      DDLStrategy ::
+      TakeOrderedAndProject ::
+      HashAggregation ::
+      Aggregation ::
+      LeftSemiJoin ::
+      EquiJoinSelection ::
+      InMemoryScans ::
+      BasicOperators ::
+      CartesianProduct ::
+      BroadcastNestedLoopJoin :: Nil)
+
+  /**
+   * Used to build table scan operators where complex projection and filtering are done using
+   * separate physical operators.  This function returns the given scan operator with Project and
+   * Filter nodes added only when needed.  For example, a Project operator is only used when the
+   * final desired output requires complex expressions to be evaluated or when columns can be
+   * further eliminated out after filtering has been done.
+   *
+   * The `prunePushedDownFilters` parameter is used to remove those filters that can be optimized
+   * away by the filter pushdown optimization.
+   *
+   * The required attributes for both filtering and expression evaluation are passed to the
+   * provided `scanBuilder` function so that it can avoid unnecessary column materialization.
+   */
+  def pruneFilterProject(
+      projectList: Seq[NamedExpression],
+      filterPredicates: Seq[Expression],
+      prunePushedDownFilters: Seq[Expression] => Seq[Expression],
+      scanBuilder: Seq[Attribute] => SparkPlan): SparkPlan = {
+
+    val projectSet = AttributeSet(projectList.flatMap(_.references))
+    val filterSet = AttributeSet(filterPredicates.flatMap(_.references))
+    val filterCondition =
+      prunePushedDownFilters(filterPredicates).reduceLeftOption(catalyst.expressions.And)
+
+    // Right now we still use a projection even if the only evaluation is applying an alias
+    // to a column.  Since this is a no-op, it could be avoided. However, using this
+    // optimization with the current implementation would change the output schema.
+    // TODO: Decouple final output schema from expression evaluation so this copy can be
+    // avoided safely.
+
+    if (AttributeSet(projectList.map(_.toAttribute)) == projectSet &&
+        filterSet.subsetOf(projectSet)) {
+      // When it is possible to just use column pruning to get the right projection and
+      // when the columns of this projection are enough to evaluate all filter conditions,
+      // just do a scan followed by a filter, with no extra project.
+      val scan = scanBuilder(projectList.asInstanceOf[Seq[Attribute]])
+      filterCondition.map(Filter(_, scan)).getOrElse(scan)
+    } else {
+      val scan = scanBuilder((projectSet ++ filterSet).toSeq)
+      Project(projectList, filterCondition.map(Filter(_, scan)).getOrElse(scan))
+    }
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 4572d5efc9..5e40d77689 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.types._
 import org.apache.spark.sql.{SQLContext, Strategy, execution}
 
 private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
-  self: SQLContext#SparkPlanner =>
+  self: SparkPlanner =>
 
   object LeftSemiJoin extends Strategy with PredicateHelper {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-- 
GitLab