diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 24f51ef16310693cedd79a30a6e39c8a256e12a7..a685099505ee82b1956dea61e80def615972bc84 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -551,9 +551,9 @@ object SQLConf {
     .intConf
     .createWithDefault(100)
 
-  val CODEGEN_FALLBACK = buildConf("spark.sql.codegen.fallback")
+  val WHOLESTAGE_FALLBACK = buildConf("spark.sql.codegen.fallback")
     .internal()
-    .doc("When true, (whole stage) codegen could be temporary disabled for the part of query that" +
+    .doc("When true, whole stage codegen could be temporary disabled for the part of query that" +
       " fail to compile generated code")
     .booleanConf
     .createWithDefault(true)
@@ -1041,7 +1041,7 @@ class SQLConf extends Serializable with Logging {
 
   def wholeStageMaxNumFields: Int = getConf(WHOLESTAGE_MAX_NUM_FIELDS)
 
-  def codegenFallback: Boolean = getConf(CODEGEN_FALLBACK)
+  def wholeStageFallback: Boolean = getConf(WHOLESTAGE_FALLBACK)
 
   def maxCaseBranchesForCodegen: Int = getConf(MAX_CASES_BRANCHES)
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index b1db9dd9dd8ac4935111b511c6bba7b2aad87952..c7277c21cebb2d02efc5a632054c427d9a81ec9d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -56,10 +56,14 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
 
   protected def sparkContext = sqlContext.sparkContext
 
-  // whether we should fallback when hitting compilation errors caused by codegen
-  private val codeGenFallBack = sqlContext.conf.codegenFallback
-
-  protected val subexpressionEliminationEnabled = sqlContext.conf.subexpressionEliminationEnabled
+  // sqlContext will be null when we are being deserialized on the slaves. In this instance
+  // the value of subexpressionEliminationEnabled will be set by the deserializer after the
+  // constructor has run.
+  val subexpressionEliminationEnabled: Boolean = if (sqlContext != null) {
+    sqlContext.conf.subexpressionEliminationEnabled
+  } else {
+    false
+  }
 
   /** Overridden make copy also propagates sqlContext to copied plan. */
   override def makeCopy(newArgs: Array[AnyRef]): SparkPlan = {
@@ -366,7 +370,8 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
     try {
       GeneratePredicate.generate(expression, inputSchema)
     } catch {
-      case _ @ (_: JaninoRuntimeException | _: CompileException) if codeGenFallBack =>
+      case e @ (_: JaninoRuntimeException | _: CompileException)
+          if sqlContext == null || sqlContext.conf.wholeStageFallback =>
         genInterpretedPredicate(expression, inputSchema)
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
index a41a7ca56a0a1ae1a7fa7a8223e22a1cf0158467..bacb7090a70ab7edfc1d16a65a4446691e90ad11 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -382,7 +382,7 @@ case class WholeStageCodegenExec(child: SparkPlan) extends UnaryExecNode with Co
     try {
      CodeGenerator.compile(cleanedSource)
    } catch {
-      case _: Exception if !Utils.isTesting && sqlContext.conf.codegenFallback =>
+      case e: Exception if !Utils.isTesting && sqlContext.conf.wholeStageFallback =>
        // We should already saw the error message
        logWarning(s"Whole-stage codegen disabled for this plan:\n $treeString")
        return child.execute()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
index 50e475984f4580e4428da5dd90e3ace0c8db12f5..0681b9cbeb1d84fc5e349bc47802afd314ab60fd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
@@ -422,7 +422,7 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSQLContext {
       v
     }
     withSQLConf(
-      (SQLConf.CODEGEN_FALLBACK.key, codegenFallback.toString),
+      (SQLConf.WHOLESTAGE_FALLBACK.key, codegenFallback.toString),
       (SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, wholeStage.toString)) {
       val df = spark.range(0, 4, 1, 4).withColumn("c", c)
       val rows = df.collect()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 13341645e8ff88329e299d718314b16273620986..5eb34e587e95f4e92df1409a6ebc80e01a39c58a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -2011,17 +2011,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
 
     val filter = (0 until N)
       .foldLeft(lit(false))((e, index) => e.or(df.col(df.columns(index)) =!= "string"))
-
-    withSQLConf(SQLConf.CODEGEN_FALLBACK.key -> "true") {
-      df.filter(filter).count()
-    }
-
-    withSQLConf(SQLConf.CODEGEN_FALLBACK.key -> "false") {
-      val e = intercept[SparkException] {
-        df.filter(filter).count()
-      }.getMessage
-      assert(e.contains("grows beyond 64 KB"))
-    }
+    df.filter(filter).count
   }
 
   test("SPARK-20897: cached self-join should not fail") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
index cd8d0708d8a32bc87df6eec1eb660564b11094ba..1f073d5f64c6b72019e781ceb9faca394649dcba 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
@@ -24,7 +24,6 @@ import org.scalatest.concurrent.Eventually
 
 import org.apache.spark.{DebugFilesystem, SparkConf}
 import org.apache.spark.sql.{SparkSession, SQLContext}
-import org.apache.spark.sql.internal.SQLConf
 
 /**
  * Helper trait for SQL test suites where all tests share a single [[TestSparkSession]].
@@ -35,7 +34,6 @@ trait SharedSQLContext extends SQLTestUtils with BeforeAndAfterEach with Eventua
     new SparkConf()
      .set("spark.hadoop.fs.file.impl", classOf[DebugFilesystem].getName)
      .set("spark.unsafe.exceptionOnMemoryLeak", "true")
-      .set(SQLConf.CODEGEN_FALLBACK.key, "false")
  }
 
  /**
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 0f6a81b6f813b0b72c6dc3e3f5d4618f227b0728..10c9a2de6540a1108f72aa46a9ec4175df9507cb 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -51,7 +51,6 @@ object TestHive
     "TestSQLContext",
     new SparkConf()
       .set("spark.sql.test", "")
-      .set(SQLConf.CODEGEN_FALLBACK.key, "false")
       .set("spark.sql.hive.metastore.barrierPrefixes",
         "org.apache.spark.sql.hive.execution.PairSerDe")
       .set("spark.sql.warehouse.dir", TestHiveContext.makeWarehouseDir().toURI.getPath)
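
For context, a minimal sketch of what the fallback controls from the user side. Note that the user-facing key `spark.sql.codegen.fallback` is unchanged by this patch; only the internal constant is renamed from `CODEGEN_FALLBACK` back to `WHOLESTAGE_FALLBACK`. The session setup, app name, and query below are illustrative, not part of the patch.

```scala
import org.apache.spark.sql.SparkSession

// Illustrative local session; any existing SparkSession behaves the same way.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("wholestage-fallback-example")
  // Default is true: if generated code fails to compile, Spark falls back to the
  // interpreted / non-whole-stage path for that part of the query instead of failing.
  .config("spark.sql.codegen.fallback", "true")
  .getOrCreate()

// With the fallback enabled, this query still completes even if whole-stage codegen
// cannot compile its generated code; with the key set to "false", the compile error
// would instead surface as an exception.
spark.range(0, 4).selectExpr("id * 2 AS doubled").show()

spark.stop()
```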