From 2974406d17a3831c1897b8d99261419592f8042f Mon Sep 17 00:00:00 2001
From: gatorsmile <gatorsmile@gmail.com>
Date: Tue, 5 Sep 2017 09:04:03 -0700
Subject: [PATCH] [SPARK-21845][SQL][TEST-MAVEN] Make codegen fallback of
 expressions configurable

## What changes were proposed in this pull request?

We should make the codegen fallback of expressions configurable. So far, it is always on, which can hide compilation bugs in our generated code. Thus, we should also disable the codegen fallback when running test cases.

## How was this patch tested?

Added test cases.

Author: gatorsmile <gatorsmile@gmail.com>

Closes #19119 from gatorsmile/fallbackCodegen.
---
 .../org/apache/spark/sql/internal/SQLConf.scala       |  6 +++---
 .../org/apache/spark/sql/execution/SparkPlan.scala    | 11 ++++++-----
 .../spark/sql/execution/WholeStageCodegenExec.scala   |  2 +-
 .../apache/spark/sql/DataFrameFunctionsSuite.scala    |  2 +-
 .../scala/org/apache/spark/sql/DataFrameSuite.scala   | 12 +++++++++++-
 .../org/apache/spark/sql/test/SharedSQLContext.scala  |  2 ++
 .../org/apache/spark/sql/hive/test/TestHive.scala     |  1 +
 7 files changed, 25 insertions(+), 11 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index c407874381..db5d65c9b2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -559,9 +559,9 @@ object SQLConf {
     .intConf
     .createWithDefault(100)
 
-  val WHOLESTAGE_FALLBACK = buildConf("spark.sql.codegen.fallback")
+  val CODEGEN_FALLBACK = buildConf("spark.sql.codegen.fallback")
     .internal()
-    .doc("When true, whole stage codegen could be temporary disabled for the part of query that" +
-      " fail to compile generated code")
+    .doc("When true, (whole stage) codegen could be temporarily disabled for the part of a query" +
+      " that fails to compile generated code")
     .booleanConf
     .createWithDefault(true)
 
@@ -1051,7 +1051,7 @@ class SQLConf extends Serializable with Logging {
 
   def wholeStageMaxNumFields: Int = getConf(WHOLESTAGE_MAX_NUM_FIELDS)
 
-  def wholeStageFallback: Boolean = getConf(WHOLESTAGE_FALLBACK)
+  def codegenFallback: Boolean = getConf(CODEGEN_FALLBACK)
 
   def maxCaseBranchesForCodegen: Int = getConf(MAX_CASES_BRANCHES)
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index c7277c21ce..b263f100e6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -56,15 +56,17 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
 
   protected def sparkContext = sqlContext.sparkContext
 
-  // sqlContext will be null when we are being deserialized on the slaves. In this instance
-  // the value of subexpressionEliminationEnabled will be set by the deserializer after the
-  // constructor has run.
+  // sqlContext will be null when SparkPlan nodes are created without an active session.
+  // So far, this only happens in test cases.
   val subexpressionEliminationEnabled: Boolean = if (sqlContext != null) {
     sqlContext.conf.subexpressionEliminationEnabled
   } else {
     false
   }
 
+  // Whether we should fall back when hitting compilation errors caused by codegen
+  private val codeGenFallBack = (sqlContext == null) || sqlContext.conf.codegenFallback
+
   /** Overridden make copy also propagates sqlContext to copied plan. */
   override def makeCopy(newArgs: Array[AnyRef]): SparkPlan = {
     SparkSession.setActiveSession(sqlContext.sparkSession)
@@ -370,8 +372,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
     try {
       GeneratePredicate.generate(expression, inputSchema)
     } catch {
-      case e @ (_: JaninoRuntimeException | _: CompileException)
-          if sqlContext == null || sqlContext.conf.wholeStageFallback =>
+      case _ @ (_: JaninoRuntimeException | _: CompileException) if codeGenFallBack =>
         genInterpretedPredicate(expression, inputSchema)
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
index bacb7090a7..a41a7ca56a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -382,7 +382,7 @@ case class WholeStageCodegenExec(child: SparkPlan) extends UnaryExecNode with Co
     try {
       CodeGenerator.compile(cleanedSource)
     } catch {
-      case e: Exception if !Utils.isTesting && sqlContext.conf.wholeStageFallback =>
+      case _: Exception if !Utils.isTesting && sqlContext.conf.codegenFallback =>
         // We should have already seen the error message
         logWarning(s"Whole-stage codegen disabled for this plan:\n $treeString")
         return child.execute()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
index 0681b9cbeb..50e475984f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
@@ -422,7 +422,7 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSQLContext {
       v
     }
     withSQLConf(
-      (SQLConf.WHOLESTAGE_FALLBACK.key, codegenFallback.toString),
+      (SQLConf.CODEGEN_FALLBACK.key, codegenFallback.toString),
       (SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, wholeStage.toString)) {
       val df = spark.range(0, 4, 1, 4).withColumn("c", c)
       val rows = df.collect()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 5eb34e587e..13341645e8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -2011,7 +2011,17 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
     val filter = (0 until N)
       .foldLeft(lit(false))((e, index) => e.or(df.col(df.columns(index)) =!= "string"))
 
-    df.filter(filter).count
+
+    withSQLConf(SQLConf.CODEGEN_FALLBACK.key -> "true") {
+      df.filter(filter).count()
+    }
+
+    withSQLConf(SQLConf.CODEGEN_FALLBACK.key -> "false") {
+      val e = intercept[SparkException] {
+        df.filter(filter).count()
+      }.getMessage
+      assert(e.contains("grows beyond 64 KB"))
+    }
   }
 
   test("SPARK-20897: cached self-join should not fail") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
index 1f073d5f64..cd8d0708d8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
@@ -24,6 +24,7 @@ import org.scalatest.concurrent.Eventually
 
 import org.apache.spark.{DebugFilesystem, SparkConf}
 import org.apache.spark.sql.{SparkSession, SQLContext}
+import org.apache.spark.sql.internal.SQLConf
 
 /**
  * Helper trait for SQL test suites where all tests share a single [[TestSparkSession]].
@@ -34,6 +35,7 @@ trait SharedSQLContext extends SQLTestUtils with BeforeAndAfterEach with Eventua
     new SparkConf()
       .set("spark.hadoop.fs.file.impl", classOf[DebugFilesystem].getName)
       .set("spark.unsafe.exceptionOnMemoryLeak", "true")
+      .set(SQLConf.CODEGEN_FALLBACK.key, "false")
   }
 
   /**
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 10c9a2de65..0f6a81b6f8 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -51,6 +51,7 @@ object TestHive
     "TestSQLContext",
     new SparkConf()
       .set("spark.sql.test", "")
+      .set(SQLConf.CODEGEN_FALLBACK.key, "false")
      .set("spark.sql.hive.metastore.barrierPrefixes",
        "org.apache.spark.sql.hive.execution.PairSerDe")
      .set("spark.sql.warehouse.dir", TestHiveContext.makeWarehouseDir().toURI.getPath)
--
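
Not part of the patch: for anyone who wants to try the flag outside the test suites, here is a minimal, self-contained sketch of how the conf key this patch renames (`spark.sql.codegen.fallback`) is set on a session. The object name and local-mode settings are made up for illustration; only the conf key and the Spark APIs used are from the patch or stock Spark.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical usage sketch (not in this patch). Shows both settings of the
// flag backed by SQLConf.CODEGEN_FALLBACK.
object CodegenFallbackDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("codegen-fallback-demo")
      // Default ("true"): if generated code fails to compile, execution
      // quietly falls back to the interpreted path.
      .config("spark.sql.codegen.fallback", "true")
      .getOrCreate()

    // The conf is per-session, so it can also be flipped at runtime.
    // "false" is what SharedSQLContext and TestHive now set, so that
    // compilation failures surface as exceptions instead of being masked.
    spark.conf.set("spark.sql.codegen.fallback", "false")

    spark.stop()
  }
}
```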