From 4e70e8256ce2f45b438642372329eac7b1e9e8cf Mon Sep 17 00:00:00 2001
From: Davies Liu <davies@databricks.com>
Date: Thu, 6 Aug 2015 17:30:31 -0700
Subject: [PATCH] [SPARK-9228] [SQL] use tungsten.enabled in public for both of
 codegen/unsafe

spark.sql.tungsten.enabled will be the default value for both codegen and unsafe, they are kept internally for debug/testing.

cc marmbrus rxin

Author: Davies Liu <davies@databricks.com>

Closes #7998 from davies/tungsten and squashes the following commits:

c1c16da [Davies Liu] update doc
1a47be1 [Davies Liu] use tungsten.enabled for both of codegen/unsafe
---
 docs/sql-programming-guide.md                 |  6 +++---
 .../scala/org/apache/spark/sql/SQLConf.scala  | 20 ++++++++++++-------
 .../spark/sql/execution/SparkPlan.scala       |  8 +++++++-
 .../spark/sql/execution/joins/HashJoin.scala  |  3 ++-
 .../sql/execution/joins/HashOuterJoin.scala   |  2 +-
 .../sql/execution/joins/HashSemiJoin.scala    |  3 ++-
 6 files changed, 28 insertions(+), 14 deletions(-)

diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 3ea77e8242..6c317175d3 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1884,11 +1884,11 @@ that these options will be deprecated in future release as more optimizations ar
     </td>
   </tr>
   <tr>
-    <td><code>spark.sql.codegen</code></td>
+    <td><code>spark.sql.tungsten.enabled</code></td>
     <td>true</td>
     <td>
-      When true, code will be dynamically generated at runtime for expression evaluation in a specific
-      query. For some queries with complicated expression this option can lead to significant speed-ups.
+      When true, use the optimized Tungsten physical execution backend which explicitly manages memory
+      and dynamically generates bytecode for expression evaluation.
     </td>
   </tr>
   <tr>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index f836122b3e..ef35c133d9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -223,14 +223,21 @@ private[spark] object SQLConf {
     defaultValue = Some(200),
     doc = "The default number of partitions to use when shuffling data for joins or aggregations.")
 
-  val CODEGEN_ENABLED = booleanConf("spark.sql.codegen",
+  val TUNGSTEN_ENABLED = booleanConf("spark.sql.tungsten.enabled",
     defaultValue = Some(true),
+    doc = "When true, use the optimized Tungsten physical execution backend which explicitly " +
+          "manages memory and dynamically generates bytecode for expression evaluation.")
+
+  val CODEGEN_ENABLED = booleanConf("spark.sql.codegen",
+    defaultValue = Some(true),  // use TUNGSTEN_ENABLED as default
     doc = "When true, code will be dynamically generated at runtime for expression evaluation in" +
-      " a specific query.")
+      " a specific query.",
+    isPublic = false)
 
   val UNSAFE_ENABLED = booleanConf("spark.sql.unsafe.enabled",
-    defaultValue = Some(true),
-    doc = "When true, use the new optimized Tungsten physical execution backend.")
+    defaultValue = Some(true),  // use TUNGSTEN_ENABLED as default
+    doc = "When true, use the new optimized Tungsten physical execution backend.",
+    isPublic = false)
 
   val DIALECT = stringConf(
     "spark.sql.dialect",
@@ -427,7 +434,6 @@ private[spark] object SQLConf {
  *
  * SQLConf is thread-safe (internally synchronized, so safe to be used in multiple threads).
  */
-
 private[sql] class SQLConf extends Serializable with CatalystConf {
   import SQLConf._
 
@@ -474,11 +480,11 @@ private[sql] class SQLConf extends Serializable with CatalystConf {
 
   private[spark] def sortMergeJoinEnabled: Boolean = getConf(SORTMERGE_JOIN)
 
-  private[spark] def codegenEnabled: Boolean = getConf(CODEGEN_ENABLED)
+  private[spark] def codegenEnabled: Boolean = getConf(CODEGEN_ENABLED, getConf(TUNGSTEN_ENABLED))
 
   def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE)
 
-  private[spark] def unsafeEnabled: Boolean = getConf(UNSAFE_ENABLED)
+  private[spark] def unsafeEnabled: Boolean = getConf(UNSAFE_ENABLED, getConf(TUNGSTEN_ENABLED))
 
   private[spark] def useSqlAggregate2: Boolean = getConf(USE_SQL_AGGREGATE2)
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 2f29067f56..3fff79cd1b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -55,12 +55,18 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
   protected def sparkContext = sqlContext.sparkContext
 
   // sqlContext will be null when we are being deserialized on the slaves.  In this instance
-  // the value of codegenEnabled will be set by the desserializer after the constructor has run.
+  // the value of codegenEnabled/unsafeEnabled will be set by the desserializer after the
+  // constructor has run.
   val codegenEnabled: Boolean = if (sqlContext != null) {
     sqlContext.conf.codegenEnabled
   } else {
     false
   }
+  val unsafeEnabled: Boolean = if (sqlContext != null) {
+    sqlContext.conf.unsafeEnabled
+  } else {
+    false
+  }
 
   /**
    * Whether the "prepare" method is called.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
index 5e9cd9fd23..22d46d1c3e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
@@ -44,7 +44,8 @@ trait HashJoin {
   override def output: Seq[Attribute] = left.output ++ right.output
 
   protected[this] def isUnsafeMode: Boolean = {
-    (self.codegenEnabled && UnsafeProjection.canSupport(buildKeys)
+    (self.codegenEnabled && self.unsafeEnabled
+      && UnsafeProjection.canSupport(buildKeys)
       && UnsafeProjection.canSupport(self.schema))
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala
index 346337e642..701bd3cd86 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala
@@ -67,7 +67,7 @@ trait HashOuterJoin {
   }
 
   protected[this] def isUnsafeMode: Boolean = {
-    (self.codegenEnabled && joinType != FullOuter
+    (self.codegenEnabled && self.unsafeEnabled && joinType != FullOuter
       && UnsafeProjection.canSupport(buildKeys)
       && UnsafeProjection.canSupport(self.schema))
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashSemiJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashSemiJoin.scala
index 47a7d370f5..82dd6eb7e7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashSemiJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashSemiJoin.scala
@@ -33,7 +33,8 @@ trait HashSemiJoin {
   override def output: Seq[Attribute] = left.output
 
   protected[this] def supportUnsafe: Boolean = {
-    (self.codegenEnabled && UnsafeProjection.canSupport(leftKeys)
+    (self.codegenEnabled && self.unsafeEnabled
+      && UnsafeProjection.canSupport(leftKeys)
       && UnsafeProjection.canSupport(rightKeys)
       && UnsafeProjection.canSupport(left.schema)
       && UnsafeProjection.canSupport(right.schema))
-- 
GitLab