From 7504bc73f20fe0e6546a019ed91c3fd3804287ba Mon Sep 17 00:00:00 2001
From: Davies Liu <davies@databricks.com>
Date: Fri, 10 Jun 2016 21:12:06 -0700
Subject: [PATCH] [SPARK-15759] [SQL] Fall back to non-codegen when generated
 code fails to compile

## What changes were proposed in this pull request?

If a bug in whole-stage codegen causes the generated code to fail compilation, we should fall back to the non-codegen path so that the query can still run.
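
A minimal sketch of the compile-then-fallback pattern in plain Scala (the names here are hypothetical; the actual implementation is in `WholeStageCodegenExec.doExecute` in the diff below):

```scala
// Hypothetical sketch: try the compiled (codegen) path first, and fall back
// to the interpreted path if compilation throws, mirroring the patch's logic.
def executeWithFallback[T](compiled: => T, interpreted: => T, fallbackEnabled: Boolean): T = {
  try {
    compiled
  } catch {
    case e: Exception if fallbackEnabled =>
      // The compilation error was already logged; run without codegen instead.
      interpreted
  }
}
```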

The batch mode of the new Parquet reader depends on codegen and can't easily be switched to non-batch mode, so we still use codegen for the batched scan (for Parquet). Because it supports only primitive types and the number of columns must be smaller than spark.sql.codegen.maxFields (100), compilation should not fail.

The fallback behavior is configurable via `spark.sql.codegen.fallback`.
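
For example, the fallback can be toggled per session (a usage sketch; `spark` is an assumed `SparkSession`):

```scala
// Disable the interpreted fallback so compilation failures surface as errors,
// which is useful when debugging generated code; it is enabled by default.
spark.conf.set("spark.sql.codegen.fallback", "false")
```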

## How was this patch tested?

Tested manually with a buggy operator; the query fell back to the non-codegen path and ran correctly.

Author: Davies Liu <davies@databricks.com>

Closes #13501 from davies/codegen_fallback.
---
 .../org/apache/spark/sql/execution/ExistingRDD.scala  |  5 ++++-
 .../spark/sql/execution/WholeStageCodegenExec.scala   | 11 ++++++++++-
 .../scala/org/apache/spark/sql/internal/SQLConf.scala | 11 ++++++++++-
 3 files changed, 24 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index 9ab98fd124..ee72a70cce 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -234,7 +234,10 @@ private[sql] case class BatchedDataSourceScanExec(
       "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"))
 
   protected override def doExecute(): RDD[InternalRow] = {
-    throw new UnsupportedOperationException
+    // In the case of fallback, this batched scan should never fail because:
+    // 1) only primitive types are supported
+    // 2) the number of columns should be smaller than spark.sql.codegen.maxFields
+    WholeStageCodegenExec(this).execute()
   }
 
   override def simpleString: String = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
index e0d8e35713..ac4c3aae5f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoi
 import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
 
 /**
  * An interface for those physical operators that support codegen.
@@ -339,12 +340,20 @@ case class WholeStageCodegenExec(child: SparkPlan) extends UnaryExecNode with Co
       new CodeAndComment(CodeFormatter.stripExtraNewLines(source), ctx.getPlaceHolderToComments()))
 
     logDebug(s"\n${CodeFormatter.format(cleanedSource)}")
-    CodeGenerator.compile(cleanedSource)
     (ctx, cleanedSource)
   }
 
   override def doExecute(): RDD[InternalRow] = {
     val (ctx, cleanedSource) = doCodeGen()
+    // Try to compile the generated code, and fall back if compilation fails.
+    try {
+      CodeGenerator.compile(cleanedSource)
+    } catch {
+      case e: Exception if !Utils.isTesting && sqlContext.conf.wholeStageFallback =>
+        // We should have already seen the error message
+        logWarning(s"Whole-stage codegen disabled for this plan:\n $treeString")
+        return child.execute()
+    }
     val references = ctx.references.toArray
 
     val durationMs = longMetric("pipelineTime")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 437e093825..27b1fffe27 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -433,7 +433,14 @@ object SQLConf {
     .doc("The maximum number of fields (including nested fields) that will be supported before" +
       " deactivating whole-stage codegen.")
     .intConf
-    .createWithDefault(200)
+    .createWithDefault(100)
+
+  val WHOLESTAGE_FALLBACK = SQLConfigBuilder("spark.sql.codegen.fallback")
+    .internal()
+    .doc("When true, whole stage codegen could be temporary disabled for the part of query that" +
+      " fail to compile generated code")
+    .booleanConf
+    .createWithDefault(true)
 
   val MAX_CASES_BRANCHES = SQLConfigBuilder("spark.sql.codegen.maxCaseBranches")
     .internal()
@@ -605,6 +612,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
 
   def wholeStageMaxNumFields: Int = getConf(WHOLESTAGE_MAX_NUM_FIELDS)
 
+  def wholeStageFallback: Boolean = getConf(WHOLESTAGE_FALLBACK)
+
   def maxCaseBranchesForCodegen: Int = getConf(MAX_CASES_BRANCHES)
 
   def exchangeReuseEnabled: Boolean = getConf(EXCHANGE_REUSE_ENABLED)
-- 
GitLab