From 875f5079290a06c12d44de657dfeabc913e4a303 Mon Sep 17 00:00:00 2001
From: Davies Liu <davies@databricks.com>
Date: Fri, 5 Feb 2016 15:07:43 -0800
Subject: [PATCH] [SPARK-13215] [SQL] remove fallback in codegen

Since we remove the configuration for codegen, we are heavily reply on codegen (also TungstenAggregate require the generated MutableProjection to update UnsafeRow), should remove the fallback, which could make user confusing, see the discussion in SPARK-13116.

Author: Davies Liu <davies@databricks.com>

Closes #11097 from davies/remove_fallback.
---
 .../spark/sql/execution/SparkPlan.scala       | 36 ++-----------------
 .../spark/sql/execution/aggregate/udaf.scala  | 12 ++-----
 .../spark/sql/execution/local/LocalNode.scala | 26 ++------------
 3 files changed, 8 insertions(+), 66 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index b19b772409..3cc99d3c7b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -200,47 +200,17 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
       inputSchema: Seq[Attribute],
       useSubexprElimination: Boolean = false): () => MutableProjection = {
     log.debug(s"Creating MutableProj: $expressions, inputSchema: $inputSchema")
-    try {
-      GenerateMutableProjection.generate(expressions, inputSchema, useSubexprElimination)
-    } catch {
-      case e: Exception =>
-        if (isTesting) {
-          throw e
-        } else {
-          log.error("Failed to generate mutable projection, fallback to interpreted", e)
-          () => new InterpretedMutableProjection(expressions, inputSchema)
-        }
-    }
+    GenerateMutableProjection.generate(expressions, inputSchema, useSubexprElimination)
   }
 
   protected def newPredicate(
       expression: Expression, inputSchema: Seq[Attribute]): (InternalRow) => Boolean = {
-    try {
-      GeneratePredicate.generate(expression, inputSchema)
-    } catch {
-      case e: Exception =>
-        if (isTesting) {
-          throw e
-        } else {
-          log.error("Failed to generate predicate, fallback to interpreted", e)
-          InterpretedPredicate.create(expression, inputSchema)
-        }
-    }
+    GeneratePredicate.generate(expression, inputSchema)
   }
 
   protected def newOrdering(
       order: Seq[SortOrder], inputSchema: Seq[Attribute]): Ordering[InternalRow] = {
-    try {
-      GenerateOrdering.generate(order, inputSchema)
-    } catch {
-      case e: Exception =>
-        if (isTesting) {
-          throw e
-        } else {
-          log.error("Failed to generate ordering, fallback to interpreted", e)
-          new InterpretedOrdering(order, inputSchema)
-        }
-    }
+    GenerateOrdering.generate(order, inputSchema)
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala
index 5a19920add..812e696338 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala
@@ -20,8 +20,8 @@ package org.apache.spark.sql.execution.aggregate
 import org.apache.spark.Logging
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, InterpretedMutableProjection, MutableRow}
-import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateFunction, ImperativeAggregate}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, MutableRow}
+import org.apache.spark.sql.catalyst.expressions.aggregate.ImperativeAggregate
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection
 import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
 import org.apache.spark.sql.types._
@@ -361,13 +361,7 @@ private[sql] case class ScalaUDAF(
     val inputAttributes = childrenSchema.toAttributes
     log.debug(
       s"Creating MutableProj: $children, inputSchema: $inputAttributes.")
-    try {
-      GenerateMutableProjection.generate(children, inputAttributes)()
-    } catch {
-      case e: Exception =>
-        log.error("Failed to generate mutable projection, fallback to interpreted", e)
-        new InterpretedMutableProjection(children, inputAttributes)
-    }
+    GenerateMutableProjection.generate(children, inputAttributes)()
   }
 
   private[this] lazy val inputToScalaConverters: Any => Any =
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala
index a0dfe996cc..8726e48781 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.execution.local
 
-import scala.util.control.NonFatal
-
 import org.apache.spark.Logging
 import org.apache.spark.sql.{Row, SQLConf}
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
@@ -96,33 +94,13 @@ abstract class LocalNode(conf: SQLConf) extends QueryPlan[LocalNode] with Loggin
       inputSchema: Seq[Attribute]): () => MutableProjection = {
     log.debug(
       s"Creating MutableProj: $expressions, inputSchema: $inputSchema")
-    try {
-      GenerateMutableProjection.generate(expressions, inputSchema)
-    } catch {
-      case NonFatal(e) =>
-        if (isTesting) {
-          throw e
-        } else {
-          log.error("Failed to generate mutable projection, fallback to interpreted", e)
-          () => new InterpretedMutableProjection(expressions, inputSchema)
-        }
-    }
+    GenerateMutableProjection.generate(expressions, inputSchema)
   }
 
   protected def newPredicate(
       expression: Expression,
       inputSchema: Seq[Attribute]): (InternalRow) => Boolean = {
-    try {
-      GeneratePredicate.generate(expression, inputSchema)
-    } catch {
-      case NonFatal(e) =>
-        if (isTesting) {
-          throw e
-        } else {
-          log.error("Failed to generate predicate, fallback to interpreted", e)
-          InterpretedPredicate.create(expression, inputSchema)
-        }
-    }
+    GeneratePredicate.generate(expression, inputSchema)
   }
 }
 
-- 
GitLab