From 3d0e174244bc293f11dff0f11ef705ba6cd5fe3a Mon Sep 17 00:00:00 2001
From: gatorsmile <gatorsmile@gmail.com>
Date: Tue, 29 Aug 2017 20:59:01 -0700
Subject: [PATCH] [SPARK-21845][SQL] Make codegen fallback of expressions
 configurable

## What changes were proposed in this pull request?
We should make the codegen fallback of expressions configurable. So far, it is always on; this can mask compilation bugs in our generated code. Thus, we should also disable the codegen fallback when running test cases.
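
For illustration, a minimal sketch of how the flag can be toggled on a live session (that `spark` is an active `SparkSession` is an assumption of this sketch; the key matches the conf introduced below):

```scala
// Sketch: toggling the codegen fallback flag at runtime.
spark.conf.set("spark.sql.codegen.fallback", "false")
// With fallback disabled, a plan whose generated code fails to compile
// now surfaces the compilation error instead of silently falling back
// to the interpreted, non-codegen execution path.
spark.conf.set("spark.sql.codegen.fallback", "true") // restore the default
```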

## How was this patch tested?
Added test cases. The shared test harnesses (`SharedSQLContext`, `TestHive`) now disable the codegen fallback so compilation failures surface during testing.
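
As a condensed sketch of the testing pattern (`df` and `filter` are the pre-existing fixtures of the surrounding `DataFrameSuite` test shown in the diff below):

```scala
// The same oversized query is run twice: with fallback on it must
// succeed via interpreted execution; with fallback off it must fail.
withSQLConf(SQLConf.CODEGEN_FALLBACK.key -> "true") {
  df.filter(filter).count()
}
withSQLConf(SQLConf.CODEGEN_FALLBACK.key -> "false") {
  val e = intercept[SparkException] {
    df.filter(filter).count()  // generated method grows beyond the 64 KB JVM limit
  }.getMessage
  assert(e.contains("grows beyond 64 KB"))
}
```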

Author: gatorsmile <gatorsmile@gmail.com>

Closes #19062 from gatorsmile/fallbackCodegen.
---
 .../org/apache/spark/sql/internal/SQLConf.scala   |  6 +++---
 .../apache/spark/sql/execution/SparkPlan.scala    | 15 +++++----------
 .../sql/execution/WholeStageCodegenExec.scala     |  2 +-
 .../spark/sql/DataFrameFunctionsSuite.scala       |  2 +-
 .../org/apache/spark/sql/DataFrameSuite.scala     | 12 +++++++++++-
 .../apache/spark/sql/test/SharedSQLContext.scala  |  2 ++
 .../org/apache/spark/sql/hive/test/TestHive.scala |  1 +
 7 files changed, 24 insertions(+), 16 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index a685099505..24f51ef163 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -551,9 +551,9 @@ object SQLConf {
     .intConf
     .createWithDefault(100)
 
-  val WHOLESTAGE_FALLBACK = buildConf("spark.sql.codegen.fallback")
+  val CODEGEN_FALLBACK = buildConf("spark.sql.codegen.fallback")
     .internal()
-    .doc("When true, whole stage codegen could be temporary disabled for the part of query that" +
+    .doc("When true, (whole stage) codegen could be temporary disabled for the part of query that" +
       " fail to compile generated code")
     .booleanConf
     .createWithDefault(true)
@@ -1041,7 +1041,7 @@ class SQLConf extends Serializable with Logging {
 
   def wholeStageMaxNumFields: Int = getConf(WHOLESTAGE_MAX_NUM_FIELDS)
 
-  def wholeStageFallback: Boolean = getConf(WHOLESTAGE_FALLBACK)
+  def codegenFallback: Boolean = getConf(CODEGEN_FALLBACK)
 
   def maxCaseBranchesForCodegen: Int = getConf(MAX_CASES_BRANCHES)
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index c7277c21ce..b1db9dd9dd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -56,14 +56,10 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
 
   protected def sparkContext = sqlContext.sparkContext
 
-  // sqlContext will be null when we are being deserialized on the slaves.  In this instance
-  // the value of subexpressionEliminationEnabled will be set by the deserializer after the
-  // constructor has run.
-  val subexpressionEliminationEnabled: Boolean = if (sqlContext != null) {
-    sqlContext.conf.subexpressionEliminationEnabled
-  } else {
-    false
-  }
+  // whether we should fall back when hitting compilation errors caused by codegen
+  private val codeGenFallBack = sqlContext.conf.codegenFallback
+
+  protected val subexpressionEliminationEnabled = sqlContext.conf.subexpressionEliminationEnabled
 
   /** Overridden make copy also propagates sqlContext to copied plan. */
   override def makeCopy(newArgs: Array[AnyRef]): SparkPlan = {
@@ -370,8 +366,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
     try {
       GeneratePredicate.generate(expression, inputSchema)
     } catch {
-      case e @ (_: JaninoRuntimeException | _: CompileException)
-          if sqlContext == null || sqlContext.conf.wholeStageFallback =>
+      case _ @ (_: JaninoRuntimeException | _: CompileException) if codeGenFallBack =>
         genInterpretedPredicate(expression, inputSchema)
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
index bacb7090a7..a41a7ca56a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -382,7 +382,7 @@ case class WholeStageCodegenExec(child: SparkPlan) extends UnaryExecNode with Co
     try {
       CodeGenerator.compile(cleanedSource)
     } catch {
-      case e: Exception if !Utils.isTesting && sqlContext.conf.wholeStageFallback =>
+      case _: Exception if !Utils.isTesting && sqlContext.conf.codegenFallback =>
        // We should have already seen the error message
         logWarning(s"Whole-stage codegen disabled for this plan:\n $treeString")
         return child.execute()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
index 0681b9cbeb..50e475984f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
@@ -422,7 +422,7 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSQLContext {
         v
       }
       withSQLConf(
-        (SQLConf.WHOLESTAGE_FALLBACK.key, codegenFallback.toString),
+        (SQLConf.CODEGEN_FALLBACK.key, codegenFallback.toString),
         (SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, wholeStage.toString)) {
         val df = spark.range(0, 4, 1, 4).withColumn("c", c)
         val rows = df.collect()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 5eb34e587e..13341645e8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -2011,7 +2011,17 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
 
     val filter = (0 until N)
       .foldLeft(lit(false))((e, index) => e.or(df.col(df.columns(index)) =!= "string"))
-    df.filter(filter).count
+
+    withSQLConf(SQLConf.CODEGEN_FALLBACK.key -> "true") {
+      df.filter(filter).count()
+    }
+
+    withSQLConf(SQLConf.CODEGEN_FALLBACK.key -> "false") {
+      val e = intercept[SparkException] {
+        df.filter(filter).count()
+      }.getMessage
+      assert(e.contains("grows beyond 64 KB"))
+    }
   }
 
   test("SPARK-20897: cached self-join should not fail") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
index 1f073d5f64..cd8d0708d8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
@@ -24,6 +24,7 @@ import org.scalatest.concurrent.Eventually
 
 import org.apache.spark.{DebugFilesystem, SparkConf}
 import org.apache.spark.sql.{SparkSession, SQLContext}
+import org.apache.spark.sql.internal.SQLConf
 
 /**
  * Helper trait for SQL test suites where all tests share a single [[TestSparkSession]].
@@ -34,6 +35,7 @@ trait SharedSQLContext extends SQLTestUtils with BeforeAndAfterEach with Eventua
     new SparkConf()
       .set("spark.hadoop.fs.file.impl", classOf[DebugFilesystem].getName)
       .set("spark.unsafe.exceptionOnMemoryLeak", "true")
+      .set(SQLConf.CODEGEN_FALLBACK.key, "false")
   }
 
   /**
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 10c9a2de65..0f6a81b6f8 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -51,6 +51,7 @@ object TestHive
       "TestSQLContext",
       new SparkConf()
         .set("spark.sql.test", "")
+        .set(SQLConf.CODEGEN_FALLBACK.key, "false")
         .set("spark.sql.hive.metastore.barrierPrefixes",
           "org.apache.spark.sql.hive.execution.PairSerDe")
         .set("spark.sql.warehouse.dir", TestHiveContext.makeWarehouseDir().toURI.getPath)
-- 
GitLab