diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 57d7d30e0eca2ca66e1ce20644489974d6cb99ee..e02b502b7b4d5eb9f3a55c878c71626842fe07f2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -252,24 +252,8 @@ private[spark] object SQLConf {
         "not be provided to ExchangeCoordinator.",
       isPublic = false)
 
-  val TUNGSTEN_ENABLED = booleanConf("spark.sql.tungsten.enabled",
-    defaultValue = Some(true),
-    doc = "When true, use the optimized Tungsten physical execution backend which explicitly " +
-          "manages memory and dynamically generates bytecode for expression evaluation.")
-
-  val CODEGEN_ENABLED = booleanConf("spark.sql.codegen",
-    defaultValue = Some(true),  // use TUNGSTEN_ENABLED as default
-    doc = "When true, code will be dynamically generated at runtime for expression evaluation in" +
-      " a specific query.",
-    isPublic = false)
-
-  val UNSAFE_ENABLED = booleanConf("spark.sql.unsafe.enabled",
-    defaultValue = Some(true),  // use TUNGSTEN_ENABLED as default
-    doc = "When true, use the new optimized Tungsten physical execution backend.",
-    isPublic = false)
-
   val SUBEXPRESSION_ELIMINATION_ENABLED = booleanConf("spark.sql.subexpressionElimination.enabled",
-    defaultValue = Some(true),  // use CODEGEN_ENABLED as default
+    defaultValue = Some(true),
     doc = "When true, common subexpressions will be eliminated.",
     isPublic = false)
 
@@ -475,6 +459,9 @@ private[spark] object SQLConf {
     val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
     val EXTERNAL_SORT = "spark.sql.planner.externalSort"
     val USE_SQL_AGGREGATE2 = "spark.sql.useAggregate2"
+    val TUNGSTEN_ENABLED = "spark.sql.tungsten.enabled"
+    val CODEGEN_ENABLED = "spark.sql.codegen"
+    val UNSAFE_ENABLED = "spark.sql.unsafe.enabled"
   }
 }
 
@@ -541,14 +528,10 @@ private[sql] class SQLConf extends Serializable with CatalystConf {
 
   private[spark] def sortMergeJoinEnabled: Boolean = getConf(SORTMERGE_JOIN)
 
-  private[spark] def codegenEnabled: Boolean = getConf(CODEGEN_ENABLED, getConf(TUNGSTEN_ENABLED))
-
   def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE)
 
-  private[spark] def unsafeEnabled: Boolean = getConf(UNSAFE_ENABLED, getConf(TUNGSTEN_ENABLED))
-
   private[spark] def subexpressionEliminationEnabled: Boolean =
-    getConf(SUBEXPRESSION_ELIMINATION_ENABLED, codegenEnabled)
+    getConf(SUBEXPRESSION_ELIMINATION_ENABLED)
 
   private[spark] def autoBroadcastJoinThreshold: Int = getConf(AUTO_BROADCASTJOIN_THRESHOLD)
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index b733b26987bcb9acad35452cdea6f2222f0a2a7a..d0e4e068092f90997fca7b1c1c3d5405a038343d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -58,7 +58,7 @@ case class Exchange(
    * Returns true iff we can support the data type, and we are not doing range partitioning.
    */
   private lazy val tungstenMode: Boolean = {
-    unsafeEnabled && codegenEnabled && GenerateUnsafeProjection.canSupport(child.schema) &&
+    GenerateUnsafeProjection.canSupport(child.schema) &&
       !newPartitioning.isInstanceOf[RangePartitioning]
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 77843f53b9bd0b811453b3b58a303b698856b0cc..5da5aea17e25b2672bad3dd5fd49e54537066ed1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -77,7 +77,6 @@ class QueryExecution(val sqlContext: SQLContext, val logical: LogicalPlan) {
        |${stringOrError(optimizedPlan)}
        |== Physical Plan ==
        |${stringOrError(executedPlan)}
-       |Code Generation: ${stringOrError(executedPlan.codegenEnabled)}
     """.stripMargin.trim
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 8650ac500b652c41327a43f55dceecf7eb5f7d78..1b833002f434c16fa9d6846509e8b6023fd7017f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -54,18 +54,8 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
   protected def sparkContext = sqlContext.sparkContext
 
   // sqlContext will be null when we are being deserialized on the slaves.  In this instance
-  // the value of codegenEnabled/unsafeEnabled will be set by the desserializer after the
+  // the value of subexpressionEliminationEnabled will be set by the desserializer after the
   // constructor has run.
-  val codegenEnabled: Boolean = if (sqlContext != null) {
-    sqlContext.conf.codegenEnabled
-  } else {
-    false
-  }
-  val unsafeEnabled: Boolean = if (sqlContext != null) {
-    sqlContext.conf.unsafeEnabled
-  } else {
-    false
-  }
   val subexpressionEliminationEnabled: Boolean = if (sqlContext != null) {
     sqlContext.conf.subexpressionEliminationEnabled
   } else {
@@ -233,83 +223,63 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
 
   protected def newProjection(
       expressions: Seq[Expression], inputSchema: Seq[Attribute]): Projection = {
-    log.debug(
-      s"Creating Projection: $expressions, inputSchema: $inputSchema, codegen:$codegenEnabled")
-    if (codegenEnabled) {
-      try {
-        GenerateProjection.generate(expressions, inputSchema)
-      } catch {
-        case e: Exception =>
-          if (isTesting) {
-            throw e
-          } else {
-            log.error("Failed to generate projection, fallback to interpret", e)
-            new InterpretedProjection(expressions, inputSchema)
-          }
-      }
-    } else {
-      new InterpretedProjection(expressions, inputSchema)
+    log.debug(s"Creating Projection: $expressions, inputSchema: $inputSchema")
+    try {
+      GenerateProjection.generate(expressions, inputSchema)
+    } catch {
+      case e: Exception =>
+        if (isTesting) {
+          throw e
+        } else {
+          log.error("Failed to generate projection, fallback to interpret", e)
+          new InterpretedProjection(expressions, inputSchema)
+        }
     }
   }
 
   protected def newMutableProjection(
-      expressions: Seq[Expression],
-      inputSchema: Seq[Attribute]): () => MutableProjection = {
-    log.debug(
-      s"Creating MutableProj: $expressions, inputSchema: $inputSchema, codegen:$codegenEnabled")
-    if(codegenEnabled) {
-      try {
-        GenerateMutableProjection.generate(expressions, inputSchema)
-      } catch {
-        case e: Exception =>
-          if (isTesting) {
-            throw e
-          } else {
-            log.error("Failed to generate mutable projection, fallback to interpreted", e)
-            () => new InterpretedMutableProjection(expressions, inputSchema)
-          }
-      }
-    } else {
-      () => new InterpretedMutableProjection(expressions, inputSchema)
+      expressions: Seq[Expression], inputSchema: Seq[Attribute]): () => MutableProjection = {
+    log.debug(s"Creating MutableProj: $expressions, inputSchema: $inputSchema")
+    try {
+      GenerateMutableProjection.generate(expressions, inputSchema)
+    } catch {
+      case e: Exception =>
+        if (isTesting) {
+          throw e
+        } else {
+          log.error("Failed to generate mutable projection, fallback to interpreted", e)
+          () => new InterpretedMutableProjection(expressions, inputSchema)
+        }
     }
   }
 
   protected def newPredicate(
       expression: Expression, inputSchema: Seq[Attribute]): (InternalRow) => Boolean = {
-    if (codegenEnabled) {
-      try {
-        GeneratePredicate.generate(expression, inputSchema)
-      } catch {
-        case e: Exception =>
-          if (isTesting) {
-            throw e
-          } else {
-            log.error("Failed to generate predicate, fallback to interpreted", e)
-            InterpretedPredicate.create(expression, inputSchema)
-          }
-      }
-    } else {
-      InterpretedPredicate.create(expression, inputSchema)
+    try {
+      GeneratePredicate.generate(expression, inputSchema)
+    } catch {
+      case e: Exception =>
+        if (isTesting) {
+          throw e
+        } else {
+          log.error("Failed to generate predicate, fallback to interpreted", e)
+          InterpretedPredicate.create(expression, inputSchema)
+        }
     }
   }
 
   protected def newOrdering(
-      order: Seq[SortOrder],
-      inputSchema: Seq[Attribute]): Ordering[InternalRow] = {
-    if (codegenEnabled) {
-      try {
-        GenerateOrdering.generate(order, inputSchema)
-      } catch {
-        case e: Exception =>
-          if (isTesting) {
-            throw e
-          } else {
-            log.error("Failed to generate ordering, fallback to interpreted", e)
-            new InterpretedOrdering(order, inputSchema)
-          }
-      }
-    } else {
-      new InterpretedOrdering(order, inputSchema)
+      order: Seq[SortOrder], inputSchema: Seq[Attribute]): Ordering[InternalRow] = {
+    try {
+      GenerateOrdering.generate(order, inputSchema)
+    } catch {
+      case e: Exception =>
+        if (isTesting) {
+          throw e
+        } else {
+          log.error("Failed to generate ordering, fallback to interpreted", e)
+          new InterpretedOrdering(order, inputSchema)
+        }
     }
   }
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala
index a10d1edcc91aaa326f822eff769362167f1fc464..cf482ae4a05eea71dc7511c9c35cd17dfe861abf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala
@@ -27,10 +27,6 @@ import org.apache.spark.sql.execution.datasources.DataSourceStrategy
 class SparkPlanner(val sqlContext: SQLContext) extends SparkStrategies {
   val sparkContext: SparkContext = sqlContext.sparkContext
 
-  def codegenEnabled: Boolean = sqlContext.conf.codegenEnabled
-
-  def unsafeEnabled: Boolean = sqlContext.conf.unsafeEnabled
-
   def numPartitions: Int = sqlContext.conf.numShufflePartitions
 
   def strategies: Seq[Strategy] =
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index d65cb1bae7fb54c6fe94bcb4b458732294c0dd9f..96242f160aa51c2cdac7bd14aee6de60b9b36008 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -327,8 +327,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
      *               if necessary.
      */
     def getSortOperator(sortExprs: Seq[SortOrder], global: Boolean, child: SparkPlan): SparkPlan = {
-      if (sqlContext.conf.unsafeEnabled && sqlContext.conf.codegenEnabled &&
-        TungstenSort.supportsSchema(child.schema)) {
+      if (TungstenSort.supportsSchema(child.schema)) {
         execution.TungstenSort(sortExprs, global, child)
       } else {
         execution.Sort(sortExprs, global, child)
@@ -368,8 +367,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       case logical.Project(projectList, child) =>
         // If unsafe mode is enabled and we support these data types in Unsafe, use the
         // Tungsten project. Otherwise, use the normal project.
-        if (sqlContext.conf.unsafeEnabled &&
-          UnsafeProjection.canSupport(projectList) && UnsafeProjection.canSupport(child.schema)) {
+        if (UnsafeProjection.canSupport(projectList) && UnsafeProjection.canSupport(child.schema)) {
           execution.TungstenProject(projectList, planLater(child)) :: Nil
         } else {
           execution.Project(projectList, planLater(child)) :: Nil
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/utils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/utils.scala
index 79abf2d5929be1fdf9927784d8e97345b8138ce0..a70e41436c7aa80fb12d2d0d5a750c8c3c4c80c0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/utils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/utils.scala
@@ -59,13 +59,10 @@ object Utils {
       resultExpressions: Seq[NamedExpression],
       child: SparkPlan): Seq[SparkPlan] = {
     // Check if we can use TungstenAggregate.
-    val usesTungstenAggregate =
-      child.sqlContext.conf.unsafeEnabled &&
-      TungstenAggregate.supportsAggregate(
+    val usesTungstenAggregate = TungstenAggregate.supportsAggregate(
         groupingExpressions,
         aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes))
 
-
     // 1. Create an Aggregate Operator for partial aggregations.
 
     val groupingAttributes = groupingExpressions.map(_.toAttribute)
@@ -144,11 +141,9 @@ object Utils {
       child: SparkPlan): Seq[SparkPlan] = {
 
     val aggregateExpressions = functionsWithDistinct ++ functionsWithoutDistinct
-    val usesTungstenAggregate =
-      child.sqlContext.conf.unsafeEnabled &&
-        TungstenAggregate.supportsAggregate(
-          groupingExpressions,
-          aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes))
+    val usesTungstenAggregate = TungstenAggregate.supportsAggregate(
+      groupingExpressions,
+      aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes))
 
     // functionsWithDistinct is guaranteed to be non-empty. Even though it may contain more than one
     // DISTINCT aggregate function, all of those functions will have the same column expression.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
index 8b2755a58757c47b1220d073269e35248131076d..e29c281b951f28917162a6ba291581039476ef12 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
@@ -121,6 +121,33 @@ case class SetCommand(kv: Option[(String, Option[String])]) extends RunnableComm
       }
       (keyValueOutput, runFunc)
 
+    case Some((SQLConf.Deprecated.TUNGSTEN_ENABLED, Some(value))) =>
+      val runFunc = (sqlContext: SQLContext) => {
+        logWarning(
+          s"Property ${SQLConf.Deprecated.TUNGSTEN_ENABLED} is deprecated and " +
+            s"will be ignored. Tungsten will continue to be used.")
+        Seq(Row(SQLConf.Deprecated.TUNGSTEN_ENABLED, "true"))
+      }
+      (keyValueOutput, runFunc)
+
+    case Some((SQLConf.Deprecated.CODEGEN_ENABLED, Some(value))) =>
+      val runFunc = (sqlContext: SQLContext) => {
+        logWarning(
+          s"Property ${SQLConf.Deprecated.CODEGEN_ENABLED} is deprecated and " +
+            s"will be ignored. Codegen will continue to be used.")
+        Seq(Row(SQLConf.Deprecated.CODEGEN_ENABLED, "true"))
+      }
+      (keyValueOutput, runFunc)
+
+    case Some((SQLConf.Deprecated.UNSAFE_ENABLED, Some(value))) =>
+      val runFunc = (sqlContext: SQLContext) => {
+        logWarning(
+          s"Property ${SQLConf.Deprecated.UNSAFE_ENABLED} is deprecated and " +
+            s"will be ignored. Unsafe mode will continue to be used.")
+        Seq(Row(SQLConf.Deprecated.UNSAFE_ENABLED, "true"))
+      }
+      (keyValueOutput, runFunc)
+
     // Configures a single property.
     case Some((key, Some(value))) =>
       val runFunc = (sqlContext: SQLContext) => {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
index 7ce4a517838cb2c6676932c3d50e0e8e7dcd6cbf..997f7f494f4a3a43e5d478d4525a768f7648c530 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
@@ -45,9 +45,7 @@ trait HashJoin {
   override def output: Seq[Attribute] = left.output ++ right.output
 
   protected[this] def isUnsafeMode: Boolean = {
-    (self.codegenEnabled && self.unsafeEnabled
-      && UnsafeProjection.canSupport(buildKeys)
-      && UnsafeProjection.canSupport(self.schema))
+    UnsafeProjection.canSupport(buildKeys) && UnsafeProjection.canSupport(self.schema)
   }
 
   override def outputsUnsafeRows: Boolean = isUnsafeMode
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala
index 15b06b1537f8cce7ce09d350aa61a9d807258404..3633f356b014b6b98295b1170093d9c7c22fd119 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala
@@ -65,9 +65,9 @@ trait HashOuterJoin {
   }
 
   protected[this] def isUnsafeMode: Boolean = {
-    (self.codegenEnabled && self.unsafeEnabled && joinType != FullOuter
-      && UnsafeProjection.canSupport(buildKeys)
-      && UnsafeProjection.canSupport(self.schema))
+    joinType != FullOuter &&
+      UnsafeProjection.canSupport(buildKeys) &&
+      UnsafeProjection.canSupport(self.schema)
   }
 
   override def outputsUnsafeRows: Boolean = isUnsafeMode
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashSemiJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashSemiJoin.scala
index beb141ade616de721148c8898faf0ae3c1dc769e..c7d13e0a72a870d4dc8eb578fe669085609ab25a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashSemiJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashSemiJoin.scala
@@ -34,11 +34,10 @@ trait HashSemiJoin {
   override def output: Seq[Attribute] = left.output
 
   protected[this] def supportUnsafe: Boolean = {
-    (self.codegenEnabled && self.unsafeEnabled
-      && UnsafeProjection.canSupport(leftKeys)
-      && UnsafeProjection.canSupport(rightKeys)
-      && UnsafeProjection.canSupport(left.schema)
-      && UnsafeProjection.canSupport(right.schema))
+    UnsafeProjection.canSupport(leftKeys) &&
+      UnsafeProjection.canSupport(rightKeys) &&
+      UnsafeProjection.canSupport(left.schema) &&
+      UnsafeProjection.canSupport(right.schema)
   }
 
   override def outputsUnsafeRows: Boolean = supportUnsafe
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
index 17030947b7bbc3db45e08f8e17828ddaec9b1af4..7aee8e3dd3fce7b896e4fb9f87b7e191ff08f70f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
@@ -54,10 +54,9 @@ case class SortMergeJoin(
     requiredOrders(leftKeys) :: requiredOrders(rightKeys) :: Nil
 
   protected[this] def isUnsafeMode: Boolean = {
-    (codegenEnabled && unsafeEnabled
-      && UnsafeProjection.canSupport(leftKeys)
-      && UnsafeProjection.canSupport(rightKeys)
-      && UnsafeProjection.canSupport(schema))
+    UnsafeProjection.canSupport(leftKeys) &&
+      UnsafeProjection.canSupport(rightKeys) &&
+      UnsafeProjection.canSupport(schema)
   }
 
   override def outputsUnsafeRows: Boolean = isUnsafeMode
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeOuterJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeOuterJoin.scala
index 7e854e6702f770daccc6e283ad176976012943c7..5f1590c4638363766d6a01e6190e396e78ccef73 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeOuterJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeOuterJoin.scala
@@ -90,10 +90,9 @@ case class SortMergeOuterJoin(
   }
 
   private def isUnsafeMode: Boolean = {
-    (codegenEnabled && unsafeEnabled
-      && UnsafeProjection.canSupport(leftKeys)
-      && UnsafeProjection.canSupport(rightKeys)
-      && UnsafeProjection.canSupport(schema))
+    UnsafeProjection.canSupport(leftKeys) &&
+      UnsafeProjection.canSupport(rightKeys) &&
+      UnsafeProjection.canSupport(schema)
   }
 
   override def outputsUnsafeRows: Boolean = isUnsafeMode
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/HashJoinNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/HashJoinNode.scala
index b1dc719ca85087464a808bba2058b91ea6440f14..aef655727fbbb0c292771de791a1a31e7eb4f6a8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/HashJoinNode.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/HashJoinNode.scala
@@ -46,10 +46,7 @@ trait HashJoinNode {
   private[this] var joinKeys: Projection = _
 
   protected def isUnsafeMode: Boolean = {
-    (codegenEnabled &&
-      unsafeEnabled &&
-      UnsafeProjection.canSupport(schema) &&
-      UnsafeProjection.canSupport(streamedKeys))
+    UnsafeProjection.canSupport(schema) && UnsafeProjection.canSupport(streamedKeys)
   }
 
   private def streamSideKeyGenerator: Projection = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala
index f96b62a67a254521357f70ab74b28041a5a8dd23..d3381eac91d43cda710d779e07c52eca1790e5e7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala
@@ -35,10 +35,6 @@ import org.apache.spark.sql.types.StructType
  */
 abstract class LocalNode(conf: SQLConf) extends QueryPlan[LocalNode] with Logging {
 
-  protected val codegenEnabled: Boolean = conf.codegenEnabled
-
-  protected val unsafeEnabled: Boolean = conf.unsafeEnabled
-
   private[this] lazy val isTesting: Boolean = sys.props.contains("spark.testing")
 
   /**
@@ -111,21 +107,17 @@ abstract class LocalNode(conf: SQLConf) extends QueryPlan[LocalNode] with Loggin
       expressions: Seq[Expression],
       inputSchema: Seq[Attribute]): Projection = {
     log.debug(
-      s"Creating Projection: $expressions, inputSchema: $inputSchema, codegen:$codegenEnabled")
-    if (codegenEnabled) {
-      try {
-        GenerateProjection.generate(expressions, inputSchema)
-      } catch {
-        case NonFatal(e) =>
-          if (isTesting) {
-            throw e
-          } else {
-            log.error("Failed to generate projection, fallback to interpret", e)
-            new InterpretedProjection(expressions, inputSchema)
-          }
-      }
-    } else {
-      new InterpretedProjection(expressions, inputSchema)
+      s"Creating Projection: $expressions, inputSchema: $inputSchema")
+    try {
+      GenerateProjection.generate(expressions, inputSchema)
+    } catch {
+      case NonFatal(e) =>
+        if (isTesting) {
+          throw e
+        } else {
+          log.error("Failed to generate projection, fallback to interpret", e)
+          new InterpretedProjection(expressions, inputSchema)
+        }
     }
   }
 
@@ -133,41 +125,33 @@ abstract class LocalNode(conf: SQLConf) extends QueryPlan[LocalNode] with Loggin
       expressions: Seq[Expression],
       inputSchema: Seq[Attribute]): () => MutableProjection = {
     log.debug(
-      s"Creating MutableProj: $expressions, inputSchema: $inputSchema, codegen:$codegenEnabled")
-    if (codegenEnabled) {
-      try {
-        GenerateMutableProjection.generate(expressions, inputSchema)
-      } catch {
-        case NonFatal(e) =>
-          if (isTesting) {
-            throw e
-          } else {
-            log.error("Failed to generate mutable projection, fallback to interpreted", e)
-            () => new InterpretedMutableProjection(expressions, inputSchema)
-          }
-      }
-    } else {
-      () => new InterpretedMutableProjection(expressions, inputSchema)
+      s"Creating MutableProj: $expressions, inputSchema: $inputSchema")
+    try {
+      GenerateMutableProjection.generate(expressions, inputSchema)
+    } catch {
+      case NonFatal(e) =>
+        if (isTesting) {
+          throw e
+        } else {
+          log.error("Failed to generate mutable projection, fallback to interpreted", e)
+          () => new InterpretedMutableProjection(expressions, inputSchema)
+        }
     }
   }
 
   protected def newPredicate(
       expression: Expression,
       inputSchema: Seq[Attribute]): (InternalRow) => Boolean = {
-    if (codegenEnabled) {
-      try {
-        GeneratePredicate.generate(expression, inputSchema)
-      } catch {
-        case NonFatal(e) =>
-          if (isTesting) {
-            throw e
-          } else {
-            log.error("Failed to generate predicate, fallback to interpreted", e)
-            InterpretedPredicate.create(expression, inputSchema)
-          }
-      }
-    } else {
-      InterpretedPredicate.create(expression, inputSchema)
+    try {
+      GeneratePredicate.generate(expression, inputSchema)
+    } catch {
+      case NonFatal(e) =>
+        if (isTesting) {
+          throw e
+        } else {
+          log.error("Failed to generate predicate, fallback to interpreted", e)
+          InterpretedPredicate.create(expression, inputSchema)
+        }
     }
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 5b8841bc154a5c67acc26a8ad77110003b17afe4..48de693a999d8f178cf7d9e949640d5ae875b923 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -423,8 +423,6 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
 
   private val hadoopConf = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
 
-  private val codegenEnabled = sqlContext.conf.codegenEnabled
-
   private var _partitionSpec: PartitionSpec = _
 
   private class FileStatusCache {
@@ -661,7 +659,6 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
   def buildScan(requiredColumns: Array[String], inputFiles: Array[FileStatus]): RDD[Row] = {
     // Yeah, to workaround serialization...
     val dataSchema = this.dataSchema
-    val codegenEnabled = this.codegenEnabled
     val needConversion = this.needConversion
 
     val requiredOutput = requiredColumns.map { col =>
@@ -678,11 +675,8 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
       }
 
     converted.mapPartitions { rows =>
-      val buildProjection = if (codegenEnabled) {
+      val buildProjection =
         GenerateMutableProjection.generate(requiredOutput, dataSchema.toAttributes)
-      } else {
-        () => new InterpretedMutableProjection(requiredOutput, dataSchema.toAttributes)
-      }
 
       val projectedRows = {
         val mutableProjection = buildProjection()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index f3a7aa280367aba20ad19ba05f5cde443b4591b6..e4f23fe17b757b299876e7577f38c4b81e5bf7df 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -621,11 +621,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
   }
 
   test("SPARK-6899: type should match when using codegen") {
-    withSQLConf(SQLConf.CODEGEN_ENABLED.key -> "true") {
-      checkAnswer(
-        decimalData.agg(avg('a)),
-        Row(new java.math.BigDecimal(2.0)))
-    }
+    checkAnswer(decimalData.agg(avg('a)), Row(new java.math.BigDecimal(2.0)))
   }
 
   test("SPARK-7133: Implement struct, array, and map field accessor") {
@@ -844,31 +840,16 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
   }
 
   test("SPARK-8608: call `show` on local DataFrame with random columns should return same value") {
-    // Make sure we can pass this test for both codegen mode and interpreted mode.
-    withSQLConf(SQLConf.CODEGEN_ENABLED.key -> "true") {
-      val df = testData.select(rand(33))
-      assert(df.showString(5) == df.showString(5))
-    }
-
-    withSQLConf(SQLConf.CODEGEN_ENABLED.key -> "false") {
-      val df = testData.select(rand(33))
-      assert(df.showString(5) == df.showString(5))
-    }
+    val df = testData.select(rand(33))
+    assert(df.showString(5) == df.showString(5))
 
     // We will reuse the same Expression object for LocalRelation.
-    val df = (1 to 10).map(Tuple1.apply).toDF().select(rand(33))
-    assert(df.showString(5) == df.showString(5))
+    val df1 = (1 to 10).map(Tuple1.apply).toDF().select(rand(33))
+    assert(df1.showString(5) == df1.showString(5))
   }
 
   test("SPARK-8609: local DataFrame with random columns should return same value after sort") {
-    // Make sure we can pass this test for both codegen mode and interpreted mode.
-    withSQLConf(SQLConf.CODEGEN_ENABLED.key -> "true") {
-      checkAnswer(testData.sort(rand(33)), testData.sort(rand(33)))
-    }
-
-    withSQLConf(SQLConf.CODEGEN_ENABLED.key -> "false") {
-      checkAnswer(testData.sort(rand(33)), testData.sort(rand(33)))
-    }
+    checkAnswer(testData.sort(rand(33)), testData.sort(rand(33)))
 
     // We will reuse the same Expression object for LocalRelation.
     val df = (1 to 10).map(Tuple1.apply).toDF()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTungstenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTungstenSuite.scala
index 7ae12a7895f7e3243de4c31a3bcf43389a7a392a..68e99d6a6b8168ddae5ebb4d6083d45cdd264729 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTungstenSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTungstenSuite.scala
@@ -31,52 +31,46 @@ class DataFrameTungstenSuite extends QueryTest with SharedSQLContext {
   import testImplicits._
 
   test("test simple types") {
-    withSQLConf(SQLConf.UNSAFE_ENABLED.key -> "true") {
-      val df = sparkContext.parallelize(Seq((1, 2))).toDF("a", "b")
-      assert(df.select(struct("a", "b")).first().getStruct(0) === Row(1, 2))
-    }
+    val df = sparkContext.parallelize(Seq((1, 2))).toDF("a", "b")
+    assert(df.select(struct("a", "b")).first().getStruct(0) === Row(1, 2))
   }
 
   test("test struct type") {
-    withSQLConf(SQLConf.UNSAFE_ENABLED.key -> "true") {
-      val struct = Row(1, 2L, 3.0F, 3.0)
-      val data = sparkContext.parallelize(Seq(Row(1, struct)))
+    val struct = Row(1, 2L, 3.0F, 3.0)
+    val data = sparkContext.parallelize(Seq(Row(1, struct)))
 
-      val schema = new StructType()
-        .add("a", IntegerType)
-        .add("b",
-          new StructType()
-            .add("b1", IntegerType)
-            .add("b2", LongType)
-            .add("b3", FloatType)
-            .add("b4", DoubleType))
+    val schema = new StructType()
+      .add("a", IntegerType)
+      .add("b",
+        new StructType()
+          .add("b1", IntegerType)
+          .add("b2", LongType)
+          .add("b3", FloatType)
+          .add("b4", DoubleType))
 
-      val df = sqlContext.createDataFrame(data, schema)
-      assert(df.select("b").first() === Row(struct))
-    }
+    val df = sqlContext.createDataFrame(data, schema)
+    assert(df.select("b").first() === Row(struct))
   }
 
   test("test nested struct type") {
-    withSQLConf(SQLConf.UNSAFE_ENABLED.key -> "true") {
-      val innerStruct = Row(1, "abcd")
-      val outerStruct = Row(1, 2L, 3.0F, 3.0, innerStruct, "efg")
-      val data = sparkContext.parallelize(Seq(Row(1, outerStruct)))
+    val innerStruct = Row(1, "abcd")
+    val outerStruct = Row(1, 2L, 3.0F, 3.0, innerStruct, "efg")
+    val data = sparkContext.parallelize(Seq(Row(1, outerStruct)))
 
-      val schema = new StructType()
-        .add("a", IntegerType)
-        .add("b",
-          new StructType()
-            .add("b1", IntegerType)
-            .add("b2", LongType)
-            .add("b3", FloatType)
-            .add("b4", DoubleType)
-            .add("b5", new StructType()
-            .add("b5a", IntegerType)
-            .add("b5b", StringType))
-            .add("b6", StringType))
+    val schema = new StructType()
+      .add("a", IntegerType)
+      .add("b",
+        new StructType()
+          .add("b1", IntegerType)
+          .add("b2", LongType)
+          .add("b3", FloatType)
+          .add("b4", DoubleType)
+          .add("b5", new StructType()
+          .add("b5a", IntegerType)
+          .add("b5b", StringType))
+          .add("b6", StringType))
 
-      val df = sqlContext.createDataFrame(data, schema)
-      assert(df.select("b").first() === Row(outerStruct))
-    }
+    val df = sqlContext.createDataFrame(data, schema)
+    assert(df.select("b").first() === Row(outerStruct))
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 19e850a46fdfcfabca58462cef1a8effc81f7f09..acabe32c67bc0df08d1f3c8bb296ebc628371979 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -261,8 +261,6 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
   }
 
   test("aggregation with codegen") {
-    val originalValue = sqlContext.conf.codegenEnabled
-    sqlContext.setConf(SQLConf.CODEGEN_ENABLED, true)
     // Prepare a table that we can group some rows.
     sqlContext.table("testData")
       .unionAll(sqlContext.table("testData"))
@@ -347,7 +345,6 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
         Row(null, null, null, 0) :: Nil)
     } finally {
       sqlContext.dropTempTable("testData3x")
-      sqlContext.setConf(SQLConf.CODEGEN_ENABLED, originalValue)
     }
   }
 
@@ -567,12 +564,6 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
     sortTest()
   }
 
-  test("SPARK-6927 external sorting with codegen on") {
-    withSQLConf(SQLConf.CODEGEN_ENABLED.key -> "true") {
-      sortTest()
-    }
-  }
-
   test("limit") {
     checkAnswer(
       sql("SELECT * FROM testData LIMIT 10"),
@@ -1624,12 +1615,10 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
   }
 
   test("aggregation with codegen updates peak execution memory") {
-    withSQLConf((SQLConf.CODEGEN_ENABLED.key, "true")) {
-      AccumulatorSuite.verifyPeakExecutionMemorySet(sparkContext, "aggregation with codegen") {
-        testCodeGen(
-          "SELECT key, count(value) FROM testData GROUP BY key",
-          (1 to 100).map(i => Row(i, 1)))
-      }
+    AccumulatorSuite.verifyPeakExecutionMemorySet(sparkContext, "aggregation with codegen") {
+      testCodeGen(
+        "SELECT key, count(value) FROM testData GROUP BY key",
+        (1 to 100).map(i => Row(i, 1)))
     }
   }
 
@@ -1783,9 +1772,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
     // This bug will be triggered when Tungsten is enabled and there are multiple
     // SortMergeJoin operators executed in the same task.
     val confs =
-      SQLConf.SORTMERGE_JOIN.key -> "true" ::
-        SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1" ::
-        SQLConf.TUNGSTEN_ENABLED.key -> "true" :: Nil
+      SQLConf.SORTMERGE_JOIN.key -> "true" :: SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1" :: Nil
     withSQLConf(confs: _*) {
       val df1 = (1 to 50).map(i => (s"str_$i", i)).toDF("i", "j")
       val df2 =
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/TungstenSortSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/TungstenSortSuite.scala
index 7a0f0dfd2b7f16b1c2799211382d558d16a9cc43..85486c08894c9e199adda37f24305048dc87c840 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/TungstenSortSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/TungstenSortSuite.scala
@@ -31,19 +31,6 @@ import org.apache.spark.sql.types._
 class TungstenSortSuite extends SparkPlanTest with SharedSQLContext {
   import testImplicits.localSeqToDataFrameHolder
 
-  override def beforeAll(): Unit = {
-    super.beforeAll()
-    sqlContext.conf.setConf(SQLConf.CODEGEN_ENABLED, true)
-  }
-
-  override def afterAll(): Unit = {
-    try {
-      sqlContext.conf.unsetConf(SQLConf.CODEGEN_ENABLED)
-    } finally {
-      super.afterAll()
-    }
-  }
-
   test("sort followed by limit") {
     checkThatPlansAgree(
       (1 to 100).map(v => Tuple1(v)).toDF("a"),
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
index dcbfdca71acb6e8b90d4124dad3d6db7b9230ee1..5b2998c3c76d3b5734b44ac5cd097a5b361c4cd3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.functions._
 import org.apache.spark.sql.{SQLConf, SQLContext, QueryTest}
 
 /**
- * Test various broadcast join operators with unsafe enabled.
+ * Test various broadcast join operators.
  *
  * Tests in this suite we need to run Spark in local-cluster mode. In particular, the use of
  * unsafe map in [[org.apache.spark.sql.execution.joins.UnsafeHashedRelation]] is not triggered
@@ -45,8 +45,6 @@ class BroadcastJoinSuite extends QueryTest with BeforeAndAfterAll {
       .setAppName("testing")
     val sc = new SparkContext(conf)
     sqlContext = new SQLContext(sc)
-    sqlContext.setConf(SQLConf.UNSAFE_ENABLED, true)
-    sqlContext.setConf(SQLConf.CODEGEN_ENABLED, true)
   }
 
   override def afterAll(): Unit = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/HashJoinNodeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/local/HashJoinNodeSuite.scala
index 8c2e78b2a9db7b8c4ae55dc3532ed7ee05684220..44b0d9d4102a1b5d58820594391473ff14c65b63 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/HashJoinNodeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/local/HashJoinNodeSuite.scala
@@ -28,12 +28,9 @@ import org.apache.spark.sql.execution.joins.{HashedRelation, BuildLeft, BuildRig
 class HashJoinNodeSuite extends LocalNodeTest {
 
   // Test all combinations of the two dimensions: with/out unsafe and build sides
-  private val maybeUnsafeAndCodegen = Seq(false, true)
   private val buildSides = Seq(BuildLeft, BuildRight)
-  maybeUnsafeAndCodegen.foreach { unsafeAndCodegen =>
-    buildSides.foreach { buildSide =>
-      testJoin(unsafeAndCodegen, buildSide)
-    }
+  buildSides.foreach { buildSide =>
+    testJoin(buildSide)
   }
 
   /**
@@ -45,10 +42,7 @@ class HashJoinNodeSuite extends LocalNodeTest {
       buildKeys: Seq[Expression],
       buildNode: LocalNode): HashedRelation = {
 
-    val isUnsafeMode =
-      conf.codegenEnabled &&
-        conf.unsafeEnabled &&
-        UnsafeProjection.canSupport(buildKeys)
+    val isUnsafeMode = UnsafeProjection.canSupport(buildKeys)
 
     val buildSideKeyGenerator =
       if (isUnsafeMode) {
@@ -68,15 +62,10 @@ class HashJoinNodeSuite extends LocalNodeTest {
   /**
    * Test inner hash join with varying degrees of matches.
    */
-  private def testJoin(
-      unsafeAndCodegen: Boolean,
-      buildSide: BuildSide): Unit = {
-    val simpleOrUnsafe = if (!unsafeAndCodegen) "simple" else "unsafe"
-    val testNamePrefix = s"$simpleOrUnsafe / $buildSide"
+  private def testJoin(buildSide: BuildSide): Unit = {
+    val testNamePrefix = buildSide
     val someData = (1 to 100).map { i => (i, "burger" + i) }.toArray
     val conf = new SQLConf
-    conf.setConf(SQLConf.UNSAFE_ENABLED, unsafeAndCodegen)
-    conf.setConf(SQLConf.CODEGEN_ENABLED, unsafeAndCodegen)
 
     // Actual test body
     def runTest(leftInput: Array[(Int, String)], rightInput: Array[(Int, String)]): Unit = {
@@ -119,7 +108,7 @@ class HashJoinNodeSuite extends LocalNodeTest {
         .map { case (k, v) => (k, v, k, rightInputMap(k)) }
 
       Seq(makeBinaryHashJoinNode, makeBroadcastJoinNode).foreach { makeNode =>
-        val makeUnsafeNode = if (unsafeAndCodegen) wrapForUnsafe(makeNode) else makeNode
+        val makeUnsafeNode = wrapForUnsafe(makeNode)
         val hashJoinNode = makeUnsafeNode(leftNode, rightNode)
 
         val actualOutput = hashJoinNode.collect().map { row =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/NestedLoopJoinNodeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/local/NestedLoopJoinNodeSuite.scala
index 40299d9d5ee37a31d3b406e6d70a0c03171595f7..252f7cc8971f46da97811ef1b9e5d672674fea29 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/NestedLoopJoinNodeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/local/NestedLoopJoinNodeSuite.scala
@@ -26,30 +26,21 @@ import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight, BuildSide}
 class NestedLoopJoinNodeSuite extends LocalNodeTest {
 
   // Test all combinations of the three dimensions: with/out unsafe, build sides, and join types
-  private val maybeUnsafeAndCodegen = Seq(false, true)
   private val buildSides = Seq(BuildLeft, BuildRight)
   private val joinTypes = Seq(LeftOuter, RightOuter, FullOuter)
-  maybeUnsafeAndCodegen.foreach { unsafeAndCodegen =>
-    buildSides.foreach { buildSide =>
-      joinTypes.foreach { joinType =>
-        testJoin(unsafeAndCodegen, buildSide, joinType)
-      }
+  buildSides.foreach { buildSide =>
+    joinTypes.foreach { joinType =>
+      testJoin(buildSide, joinType)
     }
   }
 
   /**
    * Test outer nested loop joins with varying degrees of matches.
    */
-  private def testJoin(
-      unsafeAndCodegen: Boolean,
-      buildSide: BuildSide,
-      joinType: JoinType): Unit = {
-    val simpleOrUnsafe = if (!unsafeAndCodegen) "simple" else "unsafe"
-    val testNamePrefix = s"$simpleOrUnsafe / $buildSide / $joinType"
+  private def testJoin(buildSide: BuildSide, joinType: JoinType): Unit = {
+    val testNamePrefix = s"$buildSide / $joinType"
     val someData = (1 to 100).map { i => (i, "burger" + i) }.toArray
     val conf = new SQLConf
-    conf.setConf(SQLConf.UNSAFE_ENABLED, unsafeAndCodegen)
-    conf.setConf(SQLConf.CODEGEN_ENABLED, unsafeAndCodegen)
 
     // Actual test body
     def runTest(
@@ -63,7 +54,7 @@ class NestedLoopJoinNodeSuite extends LocalNodeTest {
         resolveExpressions(
           new NestedLoopJoinNode(conf, node1, node2, buildSide, joinType, Some(cond)))
       }
-      val makeUnsafeNode = if (unsafeAndCodegen) wrapForUnsafe(makeNode) else makeNode
+      val makeUnsafeNode = wrapForUnsafe(makeNode)
       val hashJoinNode = makeUnsafeNode(leftNode, rightNode)
       val expectedOutput = generateExpectedOutput(leftInput, rightInput, joinType)
       val actualOutput = hashJoinNode.collect().map { row =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 97162249d99517b079d20ce110b61c22e369c7a6..544c1ef303ae9258c509f837ce2efd30f35a7bf3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -110,33 +110,23 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
   }
 
   test("Project metrics") {
-    withSQLConf(
-      SQLConf.UNSAFE_ENABLED.key -> "false",
-      SQLConf.CODEGEN_ENABLED.key -> "false",
-      SQLConf.TUNGSTEN_ENABLED.key -> "false") {
-      // Assume the execution plan is
-      // PhysicalRDD(nodeId = 1) -> Project(nodeId = 0)
-      val df = person.select('name)
-      testSparkPlanMetrics(df, 1, Map(
-        0L ->("Project", Map(
-          "number of rows" -> 2L)))
-      )
-    }
+    // Assume the execution plan is
+    // PhysicalRDD(nodeId = 1) -> Project(nodeId = 0)
+    val df = person.select('name)
+    testSparkPlanMetrics(df, 1, Map(
+      0L ->("TungstenProject", Map(
+        "number of rows" -> 2L)))
+    )
   }
 
   test("TungstenProject metrics") {
-    withSQLConf(
-      SQLConf.UNSAFE_ENABLED.key -> "true",
-      SQLConf.CODEGEN_ENABLED.key -> "true",
-      SQLConf.TUNGSTEN_ENABLED.key -> "true") {
-      // Assume the execution plan is
-      // PhysicalRDD(nodeId = 1) -> TungstenProject(nodeId = 0)
-      val df = person.select('name)
-      testSparkPlanMetrics(df, 1, Map(
-        0L ->("TungstenProject", Map(
-          "number of rows" -> 2L)))
-      )
-    }
+    // Assume the execution plan is
+    // PhysicalRDD(nodeId = 1) -> TungstenProject(nodeId = 0)
+    val df = person.select('name)
+    testSparkPlanMetrics(df, 1, Map(
+      0L ->("TungstenProject", Map(
+        "number of rows" -> 2L)))
+    )
   }
 
   test("Filter metrics") {
@@ -150,71 +140,30 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
     )
   }
 
-  test("SortBasedAggregate metrics") {
-    // Because SortBasedAggregate may skip different rows if the number of partitions is different,
-    // this test should use the deterministic number of partitions.
-    withSQLConf(
-      SQLConf.UNSAFE_ENABLED.key -> "false",
-      SQLConf.CODEGEN_ENABLED.key -> "true",
-      SQLConf.TUNGSTEN_ENABLED.key -> "true") {
-      // Assume the execution plan is
-      // ... -> SortBasedAggregate(nodeId = 2) -> TungstenExchange(nodeId = 1) ->
-      // SortBasedAggregate(nodeId = 0)
-      val df = testData2.groupBy().count() // 2 partitions
-      testSparkPlanMetrics(df, 1, Map(
-        2L -> ("SortBasedAggregate", Map(
-          "number of input rows" -> 6L,
-          "number of output rows" -> 2L)),
-        0L -> ("SortBasedAggregate", Map(
-          "number of input rows" -> 2L,
-          "number of output rows" -> 1L)))
-      )
-
-      // Assume the execution plan is
-      // ... -> SortBasedAggregate(nodeId = 3) -> TungstenExchange(nodeId = 2)
-      // -> ExternalSort(nodeId = 1)-> SortBasedAggregate(nodeId = 0)
-      // 2 partitions and each partition contains 2 keys
-      val df2 = testData2.groupBy('a).count()
-      testSparkPlanMetrics(df2, 1, Map(
-        3L -> ("SortBasedAggregate", Map(
-          "number of input rows" -> 6L,
-          "number of output rows" -> 4L)),
-        0L -> ("SortBasedAggregate", Map(
-          "number of input rows" -> 4L,
-          "number of output rows" -> 3L)))
-      )
-    }
-  }
-
   test("TungstenAggregate metrics") {
-    withSQLConf(
-      SQLConf.UNSAFE_ENABLED.key -> "true",
-      SQLConf.CODEGEN_ENABLED.key -> "true",
-      SQLConf.TUNGSTEN_ENABLED.key -> "true") {
-      // Assume the execution plan is
-      // ... -> TungstenAggregate(nodeId = 2) -> Exchange(nodeId = 1)
-      // -> TungstenAggregate(nodeId = 0)
-      val df = testData2.groupBy().count() // 2 partitions
-      testSparkPlanMetrics(df, 1, Map(
-        2L -> ("TungstenAggregate", Map(
-          "number of input rows" -> 6L,
-          "number of output rows" -> 2L)),
-        0L -> ("TungstenAggregate", Map(
-          "number of input rows" -> 2L,
-          "number of output rows" -> 1L)))
-      )
+    // Assume the execution plan is
+    // ... -> TungstenAggregate(nodeId = 2) -> Exchange(nodeId = 1)
+    // -> TungstenAggregate(nodeId = 0)
+    val df = testData2.groupBy().count() // 2 partitions
+    testSparkPlanMetrics(df, 1, Map(
+      2L -> ("TungstenAggregate", Map(
+        "number of input rows" -> 6L,
+        "number of output rows" -> 2L)),
+      0L -> ("TungstenAggregate", Map(
+        "number of input rows" -> 2L,
+        "number of output rows" -> 1L)))
+    )
 
-      // 2 partitions and each partition contains 2 keys
-      val df2 = testData2.groupBy('a).count()
-      testSparkPlanMetrics(df2, 1, Map(
-        2L -> ("TungstenAggregate", Map(
-          "number of input rows" -> 6L,
-          "number of output rows" -> 4L)),
-        0L -> ("TungstenAggregate", Map(
-          "number of input rows" -> 4L,
-          "number of output rows" -> 3L)))
-      )
-    }
+    // 2 partitions and each partition contains 2 keys
+    val df2 = testData2.groupBy('a).count()
+    testSparkPlanMetrics(df2, 1, Map(
+      2L -> ("TungstenAggregate", Map(
+        "number of input rows" -> 6L,
+        "number of output rows" -> 4L)),
+      0L -> ("TungstenAggregate", Map(
+        "number of input rows" -> 4L,
+        "number of output rows" -> 3L)))
+    )
   }
 
   test("SortMergeJoin metrics") {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
index 22d2aefd699b50b8f7e002486789ed707329b035..61e3e913c23eaa69ce20993b438acf6b7c035c57 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
@@ -808,54 +808,12 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Te
   }
 }
 
-class SortBasedAggregationQuerySuite extends AggregationQuerySuite {
 
-  var originalUnsafeEnabled: Boolean = _
+class TungstenAggregationQuerySuite extends AggregationQuerySuite
 
-  override def beforeAll(): Unit = {
-    originalUnsafeEnabled = sqlContext.conf.unsafeEnabled
-    sqlContext.setConf(SQLConf.UNSAFE_ENABLED.key, "false")
-    super.beforeAll()
-  }
-
-  override def afterAll(): Unit = {
-    super.afterAll()
-    sqlContext.setConf(SQLConf.UNSAFE_ENABLED.key, originalUnsafeEnabled.toString)
-  }
-}
-
-class TungstenAggregationQuerySuite extends AggregationQuerySuite {
-
-  var originalUnsafeEnabled: Boolean = _
-
-  override def beforeAll(): Unit = {
-    originalUnsafeEnabled = sqlContext.conf.unsafeEnabled
-    sqlContext.setConf(SQLConf.UNSAFE_ENABLED.key, "true")
-    super.beforeAll()
-  }
-
-  override def afterAll(): Unit = {
-    super.afterAll()
-    sqlContext.setConf(SQLConf.UNSAFE_ENABLED.key, originalUnsafeEnabled.toString)
-  }
-}
 
 class TungstenAggregationQueryWithControlledFallbackSuite extends AggregationQuerySuite {
 
-  var originalUnsafeEnabled: Boolean = _
-
-  override def beforeAll(): Unit = {
-    originalUnsafeEnabled = sqlContext.conf.unsafeEnabled
-    sqlContext.setConf(SQLConf.UNSAFE_ENABLED.key, "true")
-    super.beforeAll()
-  }
-
-  override def afterAll(): Unit = {
-    super.afterAll()
-    sqlContext.setConf(SQLConf.UNSAFE_ENABLED.key, originalUnsafeEnabled.toString)
-    sqlContext.conf.unsetConf("spark.sql.TungstenAggregate.testFallbackStartsAt")
-  }
-
   override protected def checkAnswer(actual: => DataFrame, expectedAnswer: Seq[Row]): Unit = {
     (0 to 2).foreach { fallbackStartsAt =>
       sqlContext.setConf(
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala
index 94162da4eae1ac0b8985377744ba667868f28780..a7b7ad0093915ae6a0545491385d20b8149f79d3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala
@@ -37,8 +37,7 @@ class HiveExplainSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
                    "== Parsed Logical Plan ==",
                    "== Analyzed Logical Plan ==",
                    "== Optimized Logical Plan ==",
-                   "== Physical Plan ==",
-                   "Code Generation")
+                   "== Physical Plan ==")
   }
 
   test("explain create table command") {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
index 5f9a447759b482462417591c415deb120a33b32a..5ab477efc4ee0850b0204183b90339e9ebfe8d3c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
@@ -28,11 +28,11 @@ import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectIn
 import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector, ObjectInspectorFactory}
 import org.apache.hadoop.hive.serde2.{AbstractSerDe, SerDeStats}
 import org.apache.hadoop.io.Writable
-import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SQLConf}
+import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
 import org.apache.spark.sql.hive.test.TestHiveSingleton
-
 import org.apache.spark.util.Utils
 
+
 case class Fields(f1: Int, f2: Int, f3: Int, f4: Int, f5: Int)
 
 // Case classes for the custom UDF's.
@@ -92,44 +92,36 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton {
   }
 
   test("Max/Min on named_struct") {
-    def testOrderInStruct(): Unit = {
-      checkAnswer(sql(
-        """
-          |SELECT max(named_struct(
-          |           "key", key,
-          |           "value", value)).value FROM src
-        """.stripMargin), Seq(Row("val_498")))
-      checkAnswer(sql(
-        """
-          |SELECT min(named_struct(
-          |           "key", key,
-          |           "value", value)).value FROM src
-        """.stripMargin), Seq(Row("val_0")))
-
-      // nested struct cases
-      checkAnswer(sql(
-        """
-          |SELECT max(named_struct(
-          |           "key", named_struct(
-                              "key", key,
-                              "value", value),
-          |           "value", value)).value FROM src
-        """.stripMargin), Seq(Row("val_498")))
-      checkAnswer(sql(
-        """
-          |SELECT min(named_struct(
-          |           "key", named_struct(
-                             "key", key,
-                             "value", value),
-          |           "value", value)).value FROM src
-        """.stripMargin), Seq(Row("val_0")))
-    }
-    val codegenDefault = hiveContext.getConf(SQLConf.CODEGEN_ENABLED)
-    hiveContext.setConf(SQLConf.CODEGEN_ENABLED, true)
-    testOrderInStruct()
-    hiveContext.setConf(SQLConf.CODEGEN_ENABLED, false)
-    testOrderInStruct()
-    hiveContext.setConf(SQLConf.CODEGEN_ENABLED, codegenDefault)
+    checkAnswer(sql(
+      """
+        |SELECT max(named_struct(
+        |           "key", key,
+        |           "value", value)).value FROM src
+      """.stripMargin), Seq(Row("val_498")))
+    checkAnswer(sql(
+      """
+        |SELECT min(named_struct(
+        |           "key", key,
+        |           "value", value)).value FROM src
+      """.stripMargin), Seq(Row("val_0")))
+
+    // nested struct cases
+    checkAnswer(sql(
+      """
+        |SELECT max(named_struct(
+        |           "key", named_struct(
+                            "key", key,
+                            "value", value),
+        |           "value", value)).value FROM src
+      """.stripMargin), Seq(Row("val_498")))
+    checkAnswer(sql(
+      """
+        |SELECT min(named_struct(
+        |           "key", named_struct(
+                           "key", key,
+                           "value", value),
+        |           "value", value)).value FROM src
+      """.stripMargin), Seq(Row("val_0")))
   }
 
   test("SPARK-6409 UDAF Average test") {