diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
index 75ae588c18ec6fd74d1c30c544bde942dcb6a92c..b805cfe88be63eb51f0f2bc2d650e2c6691b940a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
@@ -49,6 +49,11 @@ trait CatalystConf {
   def resolver: Resolver = {
     if (caseSensitiveAnalysis) caseSensitiveResolution else caseInsensitiveResolution
   }
+
+  /**
+   * When true, enables CBO (cost-based optimization) for estimating plan statistics.
+   */
+  def cboEnabled: Boolean
 }
 
 
@@ -62,5 +67,6 @@ case class SimpleCatalystConf(
     maxCaseBranchesForCodegen: Int = 20,
     runSQLonFile: Boolean = true,
     crossJoinEnabled: Boolean = false,
+    cboEnabled: Boolean = false,
     warehousePath: String = "/user/hive/warehouse")
   extends CatalystConf
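
The new `cboEnabled` member is abstract on the `CatalystConf` trait and defaults to `false` in `SimpleCatalystConf`, so CBO stays strictly opt-in. A minimal sketch of how catalyst-side code or a test would opt in (variable names are illustrative):

```scala
import org.apache.spark.sql.catalyst.SimpleCatalystConf

// The default keeps the pre-existing behavior: CBO is off.
val defaultConf = SimpleCatalystConf(caseSensitiveAnalysis = true)
assert(!defaultConf.cboEnabled)

// Opting in is a single named argument.
val cboConf = SimpleCatalystConf(caseSensitiveAnalysis = true, cboEnabled = true)
assert(cboConf.cboEnabled)
```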
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index dfd66aac2dd44930b0b22291b29e33ec8965e285..d1f90e6a1a5935333c395523ffd5376221cdfa33 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -82,7 +82,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf)
       EliminateOuterJoin,
       PushPredicateThroughJoin,
       PushDownPredicate,
-      LimitPushDown,
+      LimitPushDown(conf),
       ColumnPruning,
       InferFiltersFromConstraints,
       // Operator combine
@@ -209,7 +209,7 @@ object RemoveAliasOnlyProject extends Rule[LogicalPlan] {
 /**
  * Pushes down [[LocalLimit]] beneath UNION ALL and beneath the streamed inputs of outer joins.
  */
-object LimitPushDown extends Rule[LogicalPlan] {
+case class LimitPushDown(conf: CatalystConf) extends Rule[LogicalPlan] {
 
   private def stripGlobalLimitIfPresent(plan: LogicalPlan): LogicalPlan = {
     plan match {
@@ -253,7 +253,7 @@ object LimitPushDown extends Rule[LogicalPlan] {
         case FullOuter =>
           (left.maxRows, right.maxRows) match {
             case (None, None) =>
-              if (left.statistics.sizeInBytes >= right.statistics.sizeInBytes) {
+              if (left.planStats(conf).sizeInBytes >= right.planStats(conf).sizeInBytes) {
                 join.copy(left = maybePushLimit(exp, left))
               } else {
                 join.copy(right = maybePushLimit(exp, right))
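
Because the full-outer-join branch now consults `planStats(conf)` instead of `statistics`, `LimitPushDown` changes from an `object` to a `case class` that carries the conf. A hedged sketch of the new call pattern (`analyzedPlan` is a hypothetical resolved plan):

```scala
import org.apache.spark.sql.catalyst.SimpleCatalystConf
import org.apache.spark.sql.catalyst.optimizer.LimitPushDown

// The rule is now instantiated with the active conf. With cboEnabled = false
// (the default), planStats(conf) falls back to the default statistics, so the
// rule behaves exactly as the old object did.
val rule = LimitPushDown(SimpleCatalystConf(caseSensitiveAnalysis = true))
// val optimized = rule(analyzedPlan)
```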
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index b0a4145f37767b307a4c797986e4c578e7d4c362..4f634cb29ddb96107494b765138f17b230484140 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.plans.logical
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.CatalystConf
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.QueryPlan
@@ -94,6 +95,29 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
     Statistics(sizeInBytes = children.map(_.statistics.sizeInBytes).product)
   }
 
+  /**
+   * Returns CBO-estimated statistics if CBO is enabled, otherwise the default statistics.
+   */
+  final def planStats(conf: CatalystConf): Statistics = {
+    if (conf.cboEnabled) {
+      if (estimatedStats.isEmpty) {
+        estimatedStats = Some(cboStatistics(conf))
+      }
+      estimatedStats.get
+    } else {
+      statistics
+    }
+  }
+
+  /**
+   * Returns the statistics estimated by CBO. If a plan doesn't override this, it returns the
+   * default statistics.
+   */
+  protected def cboStatistics(conf: CatalystConf): Statistics = statistics
+
+  /** A cache for the CBO-estimated statistics, so they are computed only once. */
+  private var estimatedStats: Option[Statistics] = None
+
   /**
    * Returns the maximum number of rows that this plan may compute.
    *
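
`planStats` is the single, `final` entry point: with CBO enabled it computes `cboStatistics(conf)` once and memoizes it in `estimatedStats`; with CBO disabled the existing `statistics` path is untouched. Plans that can produce better estimates override the `protected` hook. A hypothetical leaf node, sketched only to show the override shape:

```scala
import org.apache.spark.sql.catalyst.CatalystConf
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}

// Hypothetical relation: the default estimate is a coarse size, while the
// CBO estimate refines it with a row count (values are made up).
case class SampleRelation(output: Seq[Attribute]) extends LeafNode {
  override lazy val statistics: Statistics = Statistics(sizeInBytes = 1000)
  override protected def cboStatistics(conf: CatalystConf): Statistics =
    Statistics(sizeInBytes = 100, rowCount = Some(10))
}
```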
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala
index dcbc79365c3aa561106efa24977071f52cc40553..9ec99835c68cb65208cb88b4733546ea0c7228ee 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.catalyst.optimizer
 
+import org.apache.spark.sql.catalyst.SimpleCatalystConf
 import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
@@ -32,7 +33,7 @@ class LimitPushdownSuite extends PlanTest {
       Batch("Subqueries", Once,
         EliminateSubqueryAliases) ::
       Batch("Limit pushdown", FixedPoint(100),
-        LimitPushDown,
+        LimitPushDown(SimpleCatalystConf(caseSensitiveAnalysis = true)),
         CombineLimits,
         ConstantFolding,
         BooleanSimplification) :: Nil
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index ba82ec156e85040e3bcc20331329ad9a38310b7b..81cd5ef3402720b340bc9d064e2690a37ff1d52f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -114,9 +114,9 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
      * Matches a plan whose output should be small enough to be used in broadcast join.
      */
     private def canBroadcast(plan: LogicalPlan): Boolean = {
-      plan.statistics.isBroadcastable ||
-        (plan.statistics.sizeInBytes >= 0 &&
-          plan.statistics.sizeInBytes <= conf.autoBroadcastJoinThreshold)
+      plan.planStats(conf).isBroadcastable ||
+        (plan.planStats(conf).sizeInBytes >= 0 &&
+          plan.planStats(conf).sizeInBytes <= conf.autoBroadcastJoinThreshold)
     }
 
     /**
@@ -126,7 +126,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
      * dynamic.
      */
     private def canBuildLocalHashMap(plan: LogicalPlan): Boolean = {
-      plan.statistics.sizeInBytes < conf.autoBroadcastJoinThreshold * conf.numShufflePartitions
+      plan.planStats(conf).sizeInBytes < conf.autoBroadcastJoinThreshold * conf.numShufflePartitions
     }
 
     /**
@@ -137,7 +137,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
      * use the size of bytes here as estimation.
      */
     private def muchSmaller(a: LogicalPlan, b: LogicalPlan): Boolean = {
-      a.statistics.sizeInBytes * 3 <= b.statistics.sizeInBytes
+      a.planStats(conf).sizeInBytes * 3 <= b.planStats(conf).sizeInBytes
     }
 
     private def canBuildRight(joinType: JoinType): Boolean = joinType match {
@@ -206,7 +206,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
 
       case logical.Join(left, right, joinType, condition) =>
         val buildSide =
-          if (right.statistics.sizeInBytes <= left.statistics.sizeInBytes) {
+          if (right.planStats(conf).sizeInBytes <= left.planStats(conf).sizeInBytes) {
             BuildRight
           } else {
             BuildLeft
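
Every size-based join-planning decision (broadcast eligibility, local hash map eligibility, build-side selection) now goes through `planStats(conf)`, so flipping the flag can change the chosen physical plan. A sketch of the build-side heuristic in isolation (`chooseBuildRight` is an illustrative helper, not part of the patch):

```scala
import org.apache.spark.sql.catalyst.CatalystConf
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// With CBO enabled, the smaller *estimated* side is built, which may differ
// from the default size-in-bytes heuristic after selective filters.
def chooseBuildRight(left: LogicalPlan, right: LogicalPlan, conf: CatalystConf): Boolean =
  right.planStats(conf).sizeInBytes <= left.planStats(conf).sizeInBytes
```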
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 304dcb691b32399b4a7403d02b3d5aa5c09d1ce9..322cc7c928838e4d4dd49233fda65299b17faf4b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -642,6 +642,12 @@ object SQLConf {
       .doubleConf
       .createWithDefault(0.05)
 
+  val CBO_ENABLED =
+    SQLConfigBuilder("spark.sql.cbo.enabled")
+      .doc("Enables CBO for estimation of plan statistics when set true.")
+      .booleanConf
+      .createWithDefault(false)
+
   object Deprecated {
     val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
   }
@@ -841,6 +847,9 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
   override def crossJoinEnabled: Boolean = getConf(SQLConf.CROSS_JOINS_ENABLED)
 
   def ndvMaxError: Double = getConf(NDV_MAX_ERROR)
+
+  override def cboEnabled: Boolean = getConf(SQLConf.CBO_ENABLED)
+
   /** ********************** SQLConf functionality methods ************ */
 
   /** Set Spark SQL configuration properties. */
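
Since `SQLConf` implements `CatalystConf`, the session conf is what ultimately reaches `planStats`. The flag is an ordinary runtime conf entry; a minimal usage sketch, assuming `spark` is an active `SparkSession`:

```scala
// Toggle programmatically...
spark.conf.set("spark.sql.cbo.enabled", "true")
// ...or from SQL:
spark.sql("SET spark.sql.cbo.enabled=true")
```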
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/statsEstimation/StatsEstimationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/statsEstimation/StatsEstimationSuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..78f2ce1d57ef011f3e4b2200959a2d22c93c9cb3
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/statsEstimation/StatsEstimationSuite.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.statsEstimation
+
+import org.apache.spark.sql.catalyst.CatalystConf
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference}
+import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan, Statistics}
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types.IntegerType
+
+
+class StatsEstimationSuite extends SharedSQLContext {
+  test("statistics for a plan based on the cbo switch") {
+    val expectedDefaultStats =
+      Statistics(
+        sizeInBytes = 40,
+        rowCount = Some(10),
+        attributeStats = AttributeMap(Seq(
+          AttributeReference("c1", IntegerType)() -> ColumnStat(10, Some(1), Some(10), 0, 4, 4))),
+        isBroadcastable = false)
+    val expectedCboStats =
+      Statistics(
+        sizeInBytes = 4,
+        rowCount = Some(1),
+        attributeStats = AttributeMap(Seq(
+          AttributeReference("c1", IntegerType)() -> ColumnStat(1, Some(5), Some(5), 0, 4, 4))),
+        isBroadcastable = false)
+
+    val plan = DummyLogicalPlan(defaultStats = expectedDefaultStats, cboStats = expectedCboStats)
+    withSQLConf("spark.sql.cbo.enabled" -> "true") {
+      // Use the statistics estimated by CBO
+      assert(plan.planStats(spark.sessionState.conf) == expectedCboStats)
+    }
+    withSQLConf("spark.sql.cbo.enabled" -> "false") {
+      // Use the default statistics
+      assert(plan.planStats(spark.sessionState.conf) == expectedDefaultStats)
+    }
+  }
+}
+
+/**
+ * This class is used for unit-testing the CBO switch. It mimics a logical plan that has both
+ * default statistics and CBO-estimated statistics.
+ */
+private case class DummyLogicalPlan(
+    defaultStats: Statistics,
+    cboStats: Statistics) extends LogicalPlan {
+  override lazy val statistics = defaultStats
+  override def cboStatistics(conf: CatalystConf): Statistics = cboStats
+  override def output: Seq[Attribute] = Nil
+  override def children: Seq[LogicalPlan] = Nil
+}