diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index 41d40aa926fbb3e0ee18d785b2c7d38d4c702bb1..b97fa54446e0c20dfb6c6e63a325a2eea56fe886 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -22,7 +22,7 @@ import java.util.Locale
 import org.apache.spark.sql.{AnalysisException, SaveMode, SparkSession}
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.catalog._
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, RowOrdering}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Expression, InputFileBlockLength, InputFileBlockStart, InputFileName, RowOrdering}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.command.DDLUtils
@@ -409,6 +409,50 @@ object HiveOnlyCheck extends (LogicalPlan => Unit) {
   }
 }
 
+
+/**
+ * A rule to do various checks before reading a table, e.g. that expressions exposing input
+ * file metadata (such as input_file_name) read from at most one file-based source.
+ */
+object PreReadCheck extends (LogicalPlan => Unit) {
+  def apply(plan: LogicalPlan): Unit = {
+    plan.foreach {
+      case operator: LogicalPlan =>
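+        // transformExpressionsUp is used here only to visit every input file expression in
+        // the operator; each expression is returned unchanged after the check.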
+        operator transformExpressionsUp {
+          case e @ (_: InputFileName | _: InputFileBlockLength | _: InputFileBlockStart) =>
+            checkNumInputFileBlockSources(e, operator)
+            e
+        }
+    }
+  }
+
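+  /**
+   * Returns the number of sources below `operator` that can provide input file metadata and
+   * fails the analysis if the expression `e` could observe more than one such source at a time.
+   */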
+  private def checkNumInputFileBlockSources(e: Expression, operator: LogicalPlan): Int = {
+    operator match {
+      case _: CatalogRelation => 1
+      case LogicalRelation(_: HadoopFsRelation, _, _) => 1
+      case _: LeafNode => 0
+      // A Union has multiple children, but each output row comes from exactly one of them, so
+      // the input file expressions never observe more than one source at a time.
+      case u: Union =>
+        if (u.children.map(checkNumInputFileBlockSources(e, _)).sum >= 1) 1 else 0
+      case o =>
+        val numInputFileBlockSources = o.children.map(checkNumInputFileBlockSources(e, _)).sum
+        if (numInputFileBlockSources > 1) {
+          e.failAnalysis(s"'${e.prettyName}' does not support more than one source")
+        } else {
+          numInputFileBlockSources
+        }
+    }
+  }
+}
+
+
 /**
  * A rule to do various checks before inserting into or writing to a data source table.
  */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
index 267f76217df84cbd2a7569df86138754a2cfc6e5..37f4f8d4ab65e54bb03a0d556eede73ee74e5a66 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
@@ -168,6 +168,7 @@ abstract class BaseSessionStateBuilder(
 
     override val extendedCheckRules: Seq[LogicalPlan => Unit] =
       PreWriteCheck +:
+        PreReadCheck +:
         HiveOnlyCheck +:
         customCheckRules
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
index bc708ca88d7e1e5d9b1bee073f17e3dccf4a38ee..7c45be21961d31d3f44a5cb5782aefa8970db68b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
@@ -530,6 +530,65 @@ class ColumnExpressionSuite extends QueryTest with SharedSQLContext {
     )
   }
 
+  test("input_file_name, input_file_block_start, input_file_block_length - more than one source") {
+    withTempView("tempView1") {
+      withTable("tab1", "tab2") {
+        val data = sparkContext.parallelize(0 to 9).toDF("id")
+        data.write.saveAsTable("tab1")
+        data.write.saveAsTable("tab2")
+        data.createOrReplaceTempView("tempView1")
+        Seq("input_file_name", "input_file_block_start", "input_file_block_length").foreach { f =>
+          val e = intercept[AnalysisException] {
+            sql(s"SELECT *, $f() FROM tab1 JOIN tab2 ON tab1.id = tab2.id")
+          }.getMessage
+          assert(e.contains(s"'$f' does not support more than one source"))
+        }
+
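+        // Runs a "SELECT *, input_file_name()" query over the given FROM clause and either
+        // expects an AnalysisException or checks the number of returned rows.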
+        def checkResult(
+            fromClause: String,
+            exceptionExpected: Boolean,
+            numExpectedRows: Int = 0): Unit = {
+          val stmt = s"SELECT *, input_file_name() FROM ($fromClause)"
+          if (exceptionExpected) {
+            val e = intercept[AnalysisException](sql(stmt)).getMessage
+            assert(e.contains("'input_file_name' does not support more than one source"))
+          } else {
+            assert(sql(stmt).count() == numExpectedRows)
+          }
+        }
+
+        checkResult(
+          "SELECT * FROM tab1 UNION ALL SELECT * FROM tab2 UNION ALL SELECT * FROM tab2",
+          exceptionExpected = false,
+          numExpectedRows = 30)
+
+        checkResult(
+          "(SELECT * FROM tempView1 NATURAL JOIN tab2) UNION ALL SELECT * FROM tab2",
+          exceptionExpected = false,
+          numExpectedRows = 20)
+
+        checkResult(
+          "(SELECT * FROM tab1 UNION ALL SELECT * FROM tab2) NATURAL JOIN tempView1",
+          exceptionExpected = false,
+          numExpectedRows = 20)
+
+        checkResult(
+          "(SELECT * FROM tempView1 UNION ALL SELECT * FROM tab2) NATURAL JOIN tab2",
+          exceptionExpected = true)
+
+        checkResult(
+          "(SELECT * FROM tab1 NATURAL JOIN tab2) UNION ALL SELECT * FROM tab2",
+          exceptionExpected = true)
+
+        checkResult(
+          "(SELECT * FROM tab1 UNION ALL SELECT * FROM tab2) NATURAL JOIN tab2",
+          exceptionExpected = true)
+      }
+    }
+  }
+
   test("input_file_name, input_file_block_start, input_file_block_length - FileScanRDD") {
     withTempPath { dir =>
       val data = sparkContext.parallelize(0 to 10).toDF("id")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index 6676099d426ba902764618cdc4e5713ab2e80ee9..a5cf40c3581c614b4a67f24efe785fcdb6e9da5e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -127,8 +127,7 @@ class FileStreamSinkSuite extends StreamTest {
       // Verify that MetadataLogFileIndex is being used and the correct partitioning schema has
       // been inferred
       val hadoopdFsRelations = outputDf.queryExecution.analyzed.collect {
-        case LogicalRelation(baseRelation, _, _) if baseRelation.isInstanceOf[HadoopFsRelation] =>
-          baseRelation.asInstanceOf[HadoopFsRelation]
+        case LogicalRelation(baseRelation: HadoopFsRelation, _, _) => baseRelation
       }
       assert(hadoopdFsRelations.size === 1)
       assert(hadoopdFsRelations.head.location.isInstanceOf[MetadataLogFileIndex])
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
index e16c9e46b7723fc5796005777f77ea9dbd161205..92cb4ef11c9e3ff92dd397b585dbde0dec2f71e6 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
@@ -69,22 +69,23 @@ class HiveSessionStateBuilder(session: SparkSession, parentState: Option[Session
   override protected def analyzer: Analyzer = new Analyzer(catalog, conf) {
     override val extendedResolutionRules: Seq[Rule[LogicalPlan]] =
       new ResolveHiveSerdeTable(session) +:
-      new FindDataSourceTable(session) +:
-      new ResolveSQLOnFile(session) +:
-      customResolutionRules
+        new FindDataSourceTable(session) +:
+        new ResolveSQLOnFile(session) +:
+        customResolutionRules
 
     override val postHocResolutionRules: Seq[Rule[LogicalPlan]] =
       new DetermineTableStats(session) +:
-      RelationConversions(conf, catalog) +:
-      PreprocessTableCreation(session) +:
-      PreprocessTableInsertion(conf) +:
-      DataSourceAnalysis(conf) +:
-      HiveAnalysis +:
-      customPostHocResolutionRules
+        RelationConversions(conf, catalog) +:
+        PreprocessTableCreation(session) +:
+        PreprocessTableInsertion(conf) +:
+        DataSourceAnalysis(conf) +:
+        HiveAnalysis +:
+        customPostHocResolutionRules
 
     override val extendedCheckRules: Seq[LogicalPlan => Unit] =
       PreWriteCheck +:
-      customCheckRules
+        PreReadCheck +:
+        customCheckRules
   }
 
   /**