diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index 9ab98fd124a348ea5082778d5e62aeae90050f54..ee72a70cced1a21fb1ec54362c0732b9b798fe10 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -234,7 +234,10 @@ private[sql] case class BatchedDataSourceScanExec(
     "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"))
 
   protected override def doExecute(): RDD[InternalRow] = {
-    throw new UnsupportedOperationException
+    // In the case of fallback, this batched scan should never fail because:
+    // 1) only primitive types are supported, and
+    // 2) the number of columns is smaller than spark.sql.codegen.maxFields.
+    WholeStageCodegenExec(this).execute()
   }
 
   override def simpleString: String = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
index e0d8e357132917424fae3c5e05566be01db8dfcc..ac4c3aae5f8ee2ce24525dd33e7d4300f040378e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoi
 import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
 
 /**
  * An interface for those physical operators that support codegen.
@@ -339,12 +340,20 @@ case class WholeStageCodegenExec(child: SparkPlan) extends UnaryExecNode with Co
       new CodeAndComment(CodeFormatter.stripExtraNewLines(source), ctx.getPlaceHolderToComments()))
 
     logDebug(s"\n${CodeFormatter.format(cleanedSource)}")
-    CodeGenerator.compile(cleanedSource)
     (ctx, cleanedSource)
   }
 
   override def doExecute(): RDD[InternalRow] = {
     val (ctx, cleanedSource) = doCodeGen()
+    // Try to compile the generated code, and fall back to the child plan if compilation fails.
+    try {
+      CodeGenerator.compile(cleanedSource)
+    } catch {
+      case e: Exception if !Utils.isTesting && sqlContext.conf.wholeStageFallback =>
+        // We should have already seen the error message.
+        logWarning(s"Whole-stage codegen disabled for this plan:\n $treeString")
+        return child.execute()
+    }
     val references = ctx.references.toArray
 
     val durationMs = longMetric("pipelineTime")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 437e093825f6ea745aa963f7d629655512e519fa..27b1fffe27a707a09600f62f75ac30bf0991febf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -433,7 +433,14 @@ object SQLConf {
     .doc("The maximum number of fields (including nested fields) that will be supported before" +
       " deactivating whole-stage codegen.")
     .intConf
-    .createWithDefault(200)
+    .createWithDefault(100)
+
+  val WHOLESTAGE_FALLBACK = SQLConfigBuilder("spark.sql.codegen.fallback")
+    .internal()
+    .doc("When true, whole-stage codegen could be temporarily disabled for the parts of a query" +
+      " that fail to compile generated code.")
+    .booleanConf
+    .createWithDefault(true)
 
   val MAX_CASES_BRANCHES = SQLConfigBuilder("spark.sql.codegen.maxCaseBranches")
     .internal()
@@ -605,6 +612,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
 
   def wholeStageMaxNumFields: Int = getConf(WHOLESTAGE_MAX_NUM_FIELDS)
 
+  def wholeStageFallback: Boolean = getConf(WHOLESTAGE_FALLBACK)
+
   def maxCaseBranchesForCodegen: Int = getConf(MAX_CASES_BRANCHES)
 
   def exchangeReuseEnabled: Boolean = getConf(EXCHANGE_REUSE_ENABLED)
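
For context, a minimal sketch (not part of the patch) of how the flags touched above could be exercised from user code. The FallbackDemo object, the local master, and the query are illustrative assumptions; spark.sql.codegen.fallback and spark.sql.codegen.maxFields are the configuration keys introduced or changed by this diff, assuming a build that includes it.

import org.apache.spark.sql.SparkSession

object FallbackDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("codegen-fallback-demo")
      .getOrCreate()

    // Enabled by default by this patch; set to false to make a compilation
    // failure of generated code fatal again instead of falling back.
    spark.conf.set("spark.sql.codegen.fallback", "true")

    // Default lowered from 200 to 100 by this patch; plans with more fields
    // skip whole-stage codegen entirely rather than risk a compile error.
    spark.conf.set("spark.sql.codegen.maxFields", "100")

    // Any query works here; if its generated code failed to compile, the plan
    // would run through the non-codegen path with a logged warning.
    spark.range(10).selectExpr("id", "id * 2 AS doubled").show()

    spark.stop()
  }
}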