diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
index 58d11751353b31dc5d54a4076733bb7aa03f2b4b..4911443dd6dde968b33c4327059009aae60a6425 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
@@ -53,7 +53,9 @@ private[sql] class DataFrameImpl protected[sql](
   def this(sqlContext: SQLContext, logicalPlan: LogicalPlan) = {
     this(sqlContext, {
       val qe = sqlContext.executePlan(logicalPlan)
-      qe.analyzed  // This should force analysis and throw errors if there are any
+      if (sqlContext.conf.dataFrameEagerAnalysis) {
+        qe.analyzed  // This should force analysis and throw errors if there are any
+      }
       qe
     })
   }
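
The hunk above gates the eager-analysis step behind the new flag: when
spark.sql.dataframe.eagerAnalysis is true (the default), forcing qe.analyzed in
the constructor surfaces resolution errors as soon as the DataFrame is built;
when it is false, construction succeeds and the same errors only show up once
the plan is executed. A minimal sketch of the user-visible difference, assuming
a registered table "people" and a nonexistent column "noSuchColumn" (both names
are hypothetical, used for illustration only):

    import org.apache.spark.sql.SQLContext

    def eagerVsLazy(sqlContext: SQLContext): Unit = {
      // Default behavior: the unresolved column is reported here, while the
      // DataFrame is still being constructed.
      try {
        sqlContext.table("people").select("noSuchColumn")
      } catch {
        case e: Exception => println(s"caught at construction: $e")
      }

      // With the flag off, construction succeeds and the identical error is
      // only thrown when an action forces execution.
      sqlContext.setConf("spark.sql.dataframe.eagerAnalysis", "false")
      val df = sqlContext.table("people").select("noSuchColumn")
      try {
        df.collect()
      } catch {
        case e: Exception => println(s"caught at execution: $e")
      }
    }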
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 5ef3bb022fc5a1cc1b35f0afed7fc432ce655c21..180f5e765fb91e3270c66a9e6f2b06442183c493 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -52,6 +52,9 @@ private[spark] object SQLConf {
   // This is used to set the default data source
   val DEFAULT_DATA_SOURCE_NAME = "spark.sql.default.datasource"
 
+  // Whether to eagerly analyze a DataFrame as soon as it is constructed.
+  val DATAFRAME_EAGER_ANALYSIS = "spark.sql.dataframe.eagerAnalysis"
+
   object Deprecated {
     val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
   }
@@ -173,6 +176,9 @@ private[sql] class SQLConf extends Serializable {
   private[spark] def defaultDataSourceName: String =
     getConf(DEFAULT_DATA_SOURCE_NAME, "org.apache.spark.sql.parquet")
 
+  private[spark] def dataFrameEagerAnalysis: Boolean =
+    getConf(DATAFRAME_EAGER_ANALYSIS, "true").toBoolean
+
   /** ********************** SQLConf functionality methods ************ */
 
   /** Set Spark SQL configuration properties. */
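
The SQLConf additions follow the file's existing pattern: a string key in the
companion object plus a typed accessor that reads the raw setting and falls
back to a default ("true" here, so existing behavior is preserved). Because
SQLConf itself is private[spark], user code would toggle the flag by its key; a
sketch, assuming an existing SQLContext named sqlContext:

    // Programmatically, via the configuration API:
    sqlContext.setConf("spark.sql.dataframe.eagerAnalysis", "false")

    // Or through a SQL SET command:
    sqlContext.sql("SET spark.sql.dataframe.eagerAnalysis=true")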
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 74c29459d2e47752d65b2d1cdf3b6f3f332571ac..77fd3165f151f5497e3e643ee3b3df93fbcd6e36 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -17,19 +17,23 @@
 
 package org.apache.spark.sql
 
+import scala.language.postfixOps
+
 import org.apache.spark.sql.Dsl._
 import org.apache.spark.sql.types._
-
-/* Implicits */
+import org.apache.spark.sql.test.TestSQLContext
 import org.apache.spark.sql.test.TestSQLContext.logicalPlanToSparkQuery
 import org.apache.spark.sql.test.TestSQLContext.implicits._
 
-import scala.language.postfixOps
 
 class DataFrameSuite extends QueryTest {
   import org.apache.spark.sql.TestData._
 
   test("analysis error should be eagerly reported") {
+    val oldSetting = TestSQLContext.conf.dataFrameEagerAnalysis
+    // Enable eager analysis so that errors are reported immediately.
+    TestSQLContext.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, "true")
+
     intercept[Exception] { testData.select('nonExistentName) }
     intercept[Exception] {
       testData.groupBy('key).agg(Map("nonExistentName" -> "sum"))
@@ -40,6 +44,13 @@ class DataFrameSuite extends QueryTest {
     intercept[Exception] {
       testData.groupBy($"abcd").agg(Map("key" -> "sum"))
     }
+
+    // No more eager analysis once the flag is turned off
+    TestSQLContext.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, "false")
+    testData.select('nonExistentName)
+
+    // Restore the flag to its original value from before this test.
+    TestSQLContext.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, oldSetting.toString)
   }
 
   test("table scan") {