diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index b91518acce33a7cab2d3bb276e9ee4fd8ecd5ca6..4efefdacabf222b1a09d9a99df9a6e5cc62fc4e6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -113,6 +113,13 @@ object SQLConf {
     .longConf
     .createWithDefault(10L * 1024 * 1024)
 
+  val ENABLE_FALL_BACK_TO_HDFS_FOR_STATS =
+    SQLConfigBuilder("spark.sql.enableFallBackToHdfsForStats")
+      .doc("When true, fall back to HDFS if the table statistics are not available from table " +
+        "metadata. This helps determine if a table is small enough for auto broadcast joins.")
+      .booleanConf
+      .createWithDefault(false)
+
   val DEFAULT_SIZE_IN_BYTES = SQLConfigBuilder("spark.sql.defaultSizeInBytes")
     .internal()
     .doc("The default table size used in query planning. By default, it is set to a larger " +
@@ -603,6 +610,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
 
   def autoBroadcastJoinThreshold: Long = getConf(AUTO_BROADCASTJOIN_THRESHOLD)
 
+  def fallBackToHdfsForStatsEnabled: Boolean = getConf(ENABLE_FALL_BACK_TO_HDFS_FOR_STATS)
+
   def preferSortMergeJoin: Boolean = getConf(PREFER_SORTMERGEJOIN)
 
   def enableRadixSort: Boolean = getConf(RADIX_SORT_ENABLED)
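For context, here is a minimal sketch (not part of the patch) of how the new flag could be toggled from user code; the session setup and application name are assumptions:

```scala
import org.apache.spark.sql.SparkSession

// Assumes a Hive-enabled session; names here are illustrative only.
val spark = SparkSession.builder()
  .appName("hdfs-stats-fallback-demo")
  .enableHiveSupport()
  .getOrCreate()

// Runtime equivalent of SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS (default: false).
spark.conf.set("spark.sql.enableFallBackToHdfsForStats", "true")

// The HDFS-based estimate only matters relative to the auto broadcast threshold
// (10 MB by default, per AUTO_BROADCASTJOIN_THRESHOLD above).
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", (10L * 1024 * 1024).toString)
```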
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
index 1671228fd9b42b40a0da8f064eab33d633ab0611..9c820144aee12e7c0445ccf85d29054c06d1e857 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
@@ -17,9 +17,12 @@
 
 package org.apache.spark.sql.hive
 
+import java.io.IOException
+
 import scala.collection.JavaConverters._
 
 import com.google.common.base.Objects
+import org.apache.hadoop.fs.FileSystem
 import org.apache.hadoop.hive.common.StatsSetupConst
 import org.apache.hadoop.hive.metastore.{TableType => HiveTableType}
 import org.apache.hadoop.hive.metastore.api.FieldSchema
@@ -114,17 +117,31 @@ private[hive] case class MetastoreRelation(
       val rawDataSize = hiveQlTable.getParameters.get(StatsSetupConst.RAW_DATA_SIZE)
       // TODO: check if this estimate is valid for tables after partition pruning.
       // NOTE: getting `totalSize` directly from params is kind of hacky, but this should be
-      // relatively cheap if parameters for the table are populated into the metastore.  An
-      // alternative would be going through Hadoop's FileSystem API, which can be expensive if a lot
-      // of RPCs are involved.  Besides `totalSize`, there are also `numFiles`, `numRows`,
-      // `rawDataSize` keys (see StatsSetupConst in Hive) that we can look at in the future.
+      // relatively cheap if parameters for the table are populated into the metastore.
+      // Besides `totalSize`, there are also `numFiles`, `numRows`, `rawDataSize` keys
+      // (see StatsSetupConst in Hive) that we can look at in the future.
       BigInt(
         // When table is external,`totalSize` is always zero, which will influence join strategy
         // so when `totalSize` is zero, use `rawDataSize` instead
-        // if the size is still less than zero, we use default size
-        Option(totalSize).map(_.toLong).filter(_ > 0)
-          .getOrElse(Option(rawDataSize).map(_.toLong).filter(_ > 0)
-            .getOrElse(sparkSession.sessionState.conf.defaultSizeInBytes)))
+        // If the size is still not greater than zero, we try to get the file size from HDFS.
+        // Since this is only an optimization, we return the default size if the HDFS call fails.
+        if (totalSize != null && totalSize.toLong > 0L) {
+          totalSize.toLong
+        } else if (rawDataSize != null && rawDataSize.toLong > 0L) {
+          rawDataSize.toLong
+        } else if (sparkSession.sessionState.conf.fallBackToHdfsForStatsEnabled) {
+          try {
+            val hadoopConf = sparkSession.sessionState.newHadoopConf()
+            val fs: FileSystem = hiveQlTable.getPath.getFileSystem(hadoopConf)
+            fs.getContentSummary(hiveQlTable.getPath).getLength
+          } catch {
+            case e: IOException =>
+              logWarning("Failed to get table size from HDFS.", e)
+              sparkSession.sessionState.conf.defaultSizeInBytes
+          }
+        } else {
+          sparkSession.sessionState.conf.defaultSizeInBytes
+        })
     }
   )
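As a standalone illustration of the fallback path above (a sketch, not the patched method itself), the estimate boils down to a single getContentSummary call; `tablePath`, `hadoopConf`, and `defaultSize` stand in for `hiveQlTable.getPath`, `sessionState.newHadoopConf()`, and `conf.defaultSizeInBytes`:

```scala
import java.io.IOException

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Sketch of the HDFS-backed size estimate; parameter names are placeholders.
def estimateSizeFromHdfs(tablePath: Path, hadoopConf: Configuration, defaultSize: Long): Long = {
  try {
    val fs: FileSystem = tablePath.getFileSystem(hadoopConf)
    // Recursively sums file lengths under the table location; this can issue
    // several NameNode RPCs for tables with many files or partitions.
    fs.getContentSummary(tablePath).getLength
  } catch {
    case _: IOException =>
      // The size is only a planning hint, so a failed lookup degrades to the default.
      defaultSize
  }
}
```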
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 1a7b6c0112279a2cf401014e06f07cc4ef73dddf..f8e00a35a31e2fb35a117da66ca132006b97a02d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.hive
 
+import java.io.{File, PrintWriter}
+
 import scala.reflect.ClassTag
 
 import org.apache.spark.sql.{QueryTest, Row}
@@ -25,8 +27,9 @@ import org.apache.spark.sql.execution.command.AnalyzeTableCommand
 import org.apache.spark.sql.execution.joins._
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SQLTestUtils
 
-class StatisticsSuite extends QueryTest with TestHiveSingleton {
+class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
   import hiveContext.sql
 
   test("parse analyze commands") {
@@ -68,6 +71,51 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton {
       classOf[AnalyzeTableCommand])
   }
 
+  test("MetastoreRelations fallback to HDFS for size estimation") {
+    val enableFallBackToHdfsForStats = hiveContext.conf.fallBackToHdfsForStatsEnabled
+    try {
+      withTempDir { tempDir =>
+
+        // EXTERNAL OpenCSVSerde table pointing to LOCATION
+
+        val file1 = new File(tempDir, "data1")
+        val writer1 = new PrintWriter(file1)
+        writer1.write("1,2")
+        writer1.close()
+
+        val file2 = new File(tempDir, "data2")
+        val writer2 = new PrintWriter(file2)
+        writer2.write("1,2")
+        writer2.close()
+
+        sql(
+          s"""CREATE EXTERNAL TABLE csv_table(page_id INT, impressions INT)
+            ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
+            WITH SERDEPROPERTIES (
+              \"separatorChar\" = \",\",
+              \"quoteChar\"     = \"\\\"\",
+              \"escapeChar\"    = \"\\\\\")
+            LOCATION '$tempDir'
+          """)
+
+        hiveContext.setConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS, true)
+
+        val relation = hiveContext.sessionState.catalog.lookupRelation(TableIdentifier("csv_table"))
+          .asInstanceOf[MetastoreRelation]
+
+        val properties = relation.hiveQlTable.getParameters
+        assert(properties.get("totalSize").toLong <= 0, "external table totalSize must be <= 0")
+        assert(properties.get("rawDataSize").toLong <= 0, "external table rawDataSize must be <= 0")
+
+        val sizeInBytes = relation.statistics.sizeInBytes
+        assert(sizeInBytes === BigInt(file1.length() + file2.length()))
+      }
+    } finally {
+      hiveContext.setConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS, enableFallBackToHdfsForStats)
+      sql("DROP TABLE csv_table ")
+    }
+  }
+
   ignore("analyze MetastoreRelations") {
     def queryTotalSize(tableName: String): BigInt =
       hiveContext.sessionState.catalog.lookupRelation(
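A possible follow-up assertion for the new test (hypothetical, not part of this patch): with the flag enabled and the estimated size under the broadcast threshold, the planner should typically pick a broadcast hash join for the small external table. `BroadcastHashJoinExec` is already available through the suite's `execution.joins._` import:

```scala
// Hypothetical extra check inside the new test body, after the size assertion.
val joinDF = sql(
  "SELECT a.* FROM csv_table a JOIN csv_table b ON a.page_id = b.page_id")
val broadcastJoins = joinDF.queryExecution.sparkPlan.collect {
  case j: BroadcastHashJoinExec => j
}
assert(broadcastJoins.nonEmpty, "expected the small external table to be broadcast")
```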